Airflow dag running forever when trying to insert into ChromaDB


I was trying to trigger a script through Airflow that inserts data into ChromaDB. Run on its own, the script works correctly and the data gets inserted, but when I trigger it through an Airflow DAG, the task keeps running forever and no data gets inserted into the database. The database connection itself is working: the heartbeat succeeds and every other collection method such as collection.count() and collection.get() works; only add and upsert hang.
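For reference, the connectivity checks I ran look roughly like this (the path and collection name are the same placeholders as in the script below):

import chromadb

client = chromadb.PersistentClient(path="/pathToDB")
print(client.heartbeat())            # returns a timestamp, so the client is reachable
collection = client.get_or_create_collection(name='test-db')
print(collection.count())            # works
print(collection.get(limit=5))       # works as well; only add/upsert hang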

This is the function called from Airflow:

def ingest_data(content):
    print("Received from airflow")
    data = content
    if data:
        doc = get_text_chunks_langchain(data)
        print("Sending to Embed")
        create_client_and_embed(doc)
    return "Done"

Processing

import uuid

import chromadb
from langchain.text_splitter import CharacterTextSplitter


def get_text_chunks_langchain(text):
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    docs = text_splitter.split_text(text)
    return docs


def create_client_and_embed(docs):
    print(docs)
    print("Starting Embedding")
    client = chromadb.PersistentClient(path="/pathToDB")
    print("Printing this")
    collection = client.get_or_create_collection(
        name='test-db',
        metadata={"hnsw:space": "cosine"}
    )

    # gets stuck at this line
    collection.add(documents=docs, ids=[str(uuid.uuid4())])

P.S.: This is an external script that the Airflow task calls.
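When I call the same function directly from a plain Python session (outside Airflow), it finishes and the data shows up; roughly:

if __name__ == "__main__":
    # running the script standalone inserts the data without issue
    result = ingest_data("This is some example content to be ingested")
    print(result)  # prints "Done", and the documents are then visible via collection.count()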

The DAG script looks something like this:

def callScript(**kwargs):
    content = kwargs.get("content")
    print(content)
    data = content
    return ingest_data(data)



with dag:
    hello_world=PythonOperator(
        task_id='hello',
        python_callable=callScript,
        op_kwargs={'content': 'This is some example content to be ingested'},
        provide_context=True
    )
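The dag object used in the with block is defined earlier in the same file; it is roughly the following (the dag_id, dates, and the module name for the import are placeholders, and I'm using Airflow 2.x-style imports):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from ingest_script import ingest_data  # placeholder module name for the external script

dag = DAG(
    dag_id='chroma_ingest',            # placeholder dag_id
    start_date=datetime(2024, 1, 1),   # placeholder date
    schedule_interval=None,            # only triggered manually
    catchup=False,
)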

I'm trying to implement an Airflow DAG that triggers a script which inserts my vectors into ChromaDB. The script works fine locally, but it keeps running forever when triggered via the DAG. Help me fix this.
