OpenSearch indexing script disconnects from server: ConnectionRefusedError

48 views Asked by At

I am encountering a ConnectionRefusedError while running an OpenSearch indexing script. Initially, the script worked without issues, indexing approximately 200 files, each containing around 30,000 documents.

However, now it fails with the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 198, in _new_conn
    sock = connection.create_connection(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/connection.py", line 85, in create_connection
    raise err
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/connection.py", line 73, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/connection/http_urllib3.py", line 264, in perform_request
    response = self.pool.urlopen(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 847, in urlopen
    retries = retries.increment(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/retry.py", line 445, in increment
    raise reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/util.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 793, in urlopen
    response = self._make_request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 491, in _make_request
    raise new_e
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 467, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 1099, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 616, in connect
    self.sock = sock = self._new_conn()
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 213, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "indexing.py", line 162, in <module>
    create_index(index_name, path, model, batch_size=100)
  File "indexing.py", line 139, in create_index
    client.bulk(body=batch, refresh=True)
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/client/utils.py", line 181, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/client/__init__.py", line 462, in bulk
    return self.transport.perform_request(
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/transport.py", line 446, in perform_request
    raise e
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/transport.py", line 409, in perform_request
    status, headers_response, data = connection.perform_request(
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/connection/http_urllib3.py", line 279, in perform_request
    raise ConnectionError("N/A", str(e), e)
opensearchpy.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused) caused by: NewConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused)

When encountering this error, I restart OpenSearch and resume indexing from the exact file where the process stopped. However, upon resuming, the same error occurs again.

Here's an overview of my setup and the steps I've taken to troubleshoot:

Environment:

  • Operating System: Ubuntu
  • Python Version: Python 3.8.10
  • OpenSearch Version: 2.11.1

Script Overview:

The script reads JSON files from a directory and indexes them into an OpenSearch cluster. It uses the opensearchpy library to interact with OpenSearch and the sentence-transformers library for document embeddings. This is the code:

import pandas as pd
import os
import json
import torch
import time
from opensearchpy import OpenSearch
from sentence_transformers import SentenceTransformer

path = "../outputNJSONextracted"  # Directory containing your JSON files
model_card = 'sentence-transformers/msmarco-distilbert-base-tas-b'

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device {device}")


host = '127.0.0.1' #host = '54.93.99.186' 
port = 9200
auth = ('admin','IVIngi2024!') #('admin', 'admin')  

client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = False,
    ssl_assert_hostname = False,
    ssl_show_warn = False,
    timeout=30, 
    max_retries=10
)
print("Connection opened...")

index_name = 'medline-faiss-hnsw-3' # change the index name

index_body = {
 "settings": {
    "index": {
     "knn": "true",
     "refresh_interval" : -1,
     #default_pipeline": "medline-ingest-pipeline",  # embedding in script
     "number_of_shards": 5,
     "number_of_replicas": 0
    }
  },
  "mappings": {
    "properties": {
         "embedding_abstract": {
                        "type": "knn_vector",
                        "dimension": 768,
                        "method":{
                            "engine":"faiss",
                            "name": "hnsw",
                            "space_type": "innerproduct"
                        }
                       },
          "title":{"type":"text"},
          "abstract":{"type":"text"},
          "pmid":{"type":"keyword"},
          "journal":{"type":"text"},
          "pubdate":{"type":"date"},
          "authors":{"type": "text"}
    }
  }
}

response = client.indices.create(index_name, body=index_body)
print(response)

def create_index(index_name, directory_path, model, batch_size=100):
    j = 0
    documents = set()

    files_number = 0
    for filename in sorted(os.listdir(directory_path)):
        start_time = time.time()
        if filename.endswith(".json"):
            print(f"Starting indexing {filename} ...")
            # Construct the full file path
            file_path = os.path.join(directory_path, filename)
            # Read the JSON file
            with open(file_path, 'r') as file:
                # Initialize an empty list to store dictionaries
                dictionaries = []
                
                # Read the file line by line
                for line in file:
                    # Parse each line as JSON and append it to the list
                    dictionaries.append(json.loads(line))
            # Create a DataFrame
            df = pd.DataFrame(dictionaries)
            # Select only the required columns
            df = df[['pmid', 'title', 'abstract', 'journal', 'authors', 'pubdate']]
            # Output the file name
            batch = []
            for i, row in df.iterrows(): 
                pmid = row["pmid"]
                if pmid in documents:
                    continue
                else:
                    documents.add(pmid)
                    embedding = model.encode(row["abstract"])
                    doc = {
                        "pmid": pmid,
                        "abstract": row["abstract"],
                        "title": row["title"],
                        "authors": row['authors'],
                        "journal": row['journal'],
                        "pubdate": row['pubdate'],
                        "embedding_abstract": embedding
                        }
                    batch.append({"index": {"_index": index_name, "_id": pmid}})
                    batch.append(doc)
                    j += 1
                if len(batch) >= batch_size*2:
                    client.bulk(body=batch, refresh=True)
                    batch = []
                 
            if batch:
                client.bulk(body=batch, refresh=True)
                print(f"Indexed remaining documents")

            files_number += 1
            print(f"Processed file: {filename} in {time.time()-start_time}")
            print("Number of currently documents indexed ",j)
            if files_number % 100 == 0:
                print("-"*50)
                print(f"Files indexed = {files_number}")
                print()

    print("Total documents inserted = ", j)
    
    

model = SentenceTransformer(model_card)
model.to(device)
print("Creating indexing...")
start = time.time()
create_index(index_name, path, model, batch_size=100)
print(f"Time neeeded {time.time() - start}")

Troubleshooting Steps Taken:

  1. Increased heap size in jvm.options file to 8GB (the max that I can).
  2. Attempted to reduce batch size to mitigate potential HTTP request size issues.

Despite these efforts, the script continues to encounter connection issues.

I suspect that the problem may be related to sending large HTTP requests or some configuration issue with the OpenSearch server. However, I'm unsure how to proceed in diagnosing and resolving the issue.

Any insights or suggestions would be greatly appreciated. Thank you!

Specific Expectations:

I'm looking for:

Guidance on configuring my Python scripts to ensure they function properly. Suggestions for optimizing or adjusting configuration settings within OpenSearch to resolve any issues related to indexing.

0

There are 0 answers