I am encountering a ConnectionRefusedError while running an OpenSearch indexing script. Initially, the script worked without issues, indexing approximately 200 files, each containing around 30,000 documents.
However, now it fails with the following error:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 198, in _new_conn
sock = connection.create_connection(
File "/usr/local/lib/python3.8/dist-packages/urllib3/util/connection.py", line 85, in create_connection
raise err
File "/usr/local/lib/python3.8/dist-packages/urllib3/util/connection.py", line 73, in create_connection
sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/opensearchpy/connection/http_urllib3.py", line 264, in perform_request
response = self.pool.urlopen(
File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 847, in urlopen
retries = retries.increment(
File "/usr/local/lib/python3.8/dist-packages/urllib3/util/retry.py", line 445, in increment
raise reraise(type(error), error, _stacktrace)
File "/usr/local/lib/python3.8/dist-packages/urllib3/util/util.py", line 39, in reraise
raise value
File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 793, in urlopen
response = self._make_request(
File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 491, in _make_request
raise new_e
File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 467, in _make_request
self._validate_conn(conn)
File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 1099, in _validate_conn
conn.connect()
File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 616, in connect
self.sock = sock = self._new_conn()
File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 213, in _new_conn
raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "indexing.py", line 162, in <module>
create_index(index_name, path, model, batch_size=100)
File "indexing.py", line 139, in create_index
client.bulk(body=batch, refresh=True)
File "/usr/local/lib/python3.8/dist-packages/opensearchpy/client/utils.py", line 181, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/opensearchpy/client/__init__.py", line 462, in bulk
return self.transport.perform_request(
File "/usr/local/lib/python3.8/dist-packages/opensearchpy/transport.py", line 446, in perform_request
raise e
File "/usr/local/lib/python3.8/dist-packages/opensearchpy/transport.py", line 409, in perform_request
status, headers_response, data = connection.perform_request(
File "/usr/local/lib/python3.8/dist-packages/opensearchpy/connection/http_urllib3.py", line 279, in perform_request
raise ConnectionError("N/A", str(e), e)
opensearchpy.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused) caused by: NewConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused)
When encountering this error, I restart OpenSearch and resume indexing from the exact file where the process stopped. However, upon resuming, the same error occurs again.
Here's an overview of my setup and the steps I've taken to troubleshoot:
Environment:
- Operating System: Ubuntu
- Python Version: Python 3.8.10
- OpenSearch Version: 2.11.1
Script Overview:
The script reads JSON files from a directory and indexes them into an OpenSearch cluster. It uses the opensearchpy library to interact with OpenSearch and the sentence-transformers library for document embeddings. This is the code:
import pandas as pd
import os
import json
import torch
import time
from opensearchpy import OpenSearch
from sentence_transformers import SentenceTransformer
path = "../outputNJSONextracted" # Directory containing your JSON files
model_card = 'sentence-transformers/msmarco-distilbert-base-tas-b'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device {device}")
host = '127.0.0.1' #host = '54.93.99.186'
port = 9200
auth = ('admin','IVIngi2024!') #('admin', 'admin')
client = OpenSearch(
hosts = [{'host': host, 'port': port}],
http_auth = auth,
use_ssl = True,
verify_certs = False,
ssl_assert_hostname = False,
ssl_show_warn = False,
timeout=30,
max_retries=10
)
print("Connection opened...")
index_name = 'medline-faiss-hnsw-3' # change the index name
index_body = {
"settings": {
"index": {
"knn": "true",
"refresh_interval" : -1,
#default_pipeline": "medline-ingest-pipeline", # embedding in script
"number_of_shards": 5,
"number_of_replicas": 0
}
},
"mappings": {
"properties": {
"embedding_abstract": {
"type": "knn_vector",
"dimension": 768,
"method":{
"engine":"faiss",
"name": "hnsw",
"space_type": "innerproduct"
}
},
"title":{"type":"text"},
"abstract":{"type":"text"},
"pmid":{"type":"keyword"},
"journal":{"type":"text"},
"pubdate":{"type":"date"},
"authors":{"type": "text"}
}
}
}
response = client.indices.create(index_name, body=index_body)
print(response)
def create_index(index_name, directory_path, model, batch_size=100):
j = 0
documents = set()
files_number = 0
for filename in sorted(os.listdir(directory_path)):
start_time = time.time()
if filename.endswith(".json"):
print(f"Starting indexing {filename} ...")
# Construct the full file path
file_path = os.path.join(directory_path, filename)
# Read the JSON file
with open(file_path, 'r') as file:
# Initialize an empty list to store dictionaries
dictionaries = []
# Read the file line by line
for line in file:
# Parse each line as JSON and append it to the list
dictionaries.append(json.loads(line))
# Create a DataFrame
df = pd.DataFrame(dictionaries)
# Select only the required columns
df = df[['pmid', 'title', 'abstract', 'journal', 'authors', 'pubdate']]
# Output the file name
batch = []
for i, row in df.iterrows():
pmid = row["pmid"]
if pmid in documents:
continue
else:
documents.add(pmid)
embedding = model.encode(row["abstract"])
doc = {
"pmid": pmid,
"abstract": row["abstract"],
"title": row["title"],
"authors": row['authors'],
"journal": row['journal'],
"pubdate": row['pubdate'],
"embedding_abstract": embedding
}
batch.append({"index": {"_index": index_name, "_id": pmid}})
batch.append(doc)
j += 1
if len(batch) >= batch_size*2:
client.bulk(body=batch, refresh=True)
batch = []
if batch:
client.bulk(body=batch, refresh=True)
print(f"Indexed remaining documents")
files_number += 1
print(f"Processed file: {filename} in {time.time()-start_time}")
print("Number of currently documents indexed ",j)
if files_number % 100 == 0:
print("-"*50)
print(f"Files indexed = {files_number}")
print()
print("Total documents inserted = ", j)
model = SentenceTransformer(model_card)
model.to(device)
print("Creating indexing...")
start = time.time()
create_index(index_name, path, model, batch_size=100)
print(f"Time neeeded {time.time() - start}")
Troubleshooting Steps Taken:
- Increased heap size in jvm.options file to 8GB (the max that I can).
- Attempted to reduce batch size to mitigate potential HTTP request size issues.
Despite these efforts, the script continues to encounter connection issues.
I suspect that the problem may be related to sending large HTTP requests or some configuration issue with the OpenSearch server. However, I'm unsure how to proceed in diagnosing and resolving the issue.
Any insights or suggestions would be greatly appreciated. Thank you!
Specific Expectations:
I'm looking for:
Guidance on configuring my Python scripts to ensure they function properly. Suggestions for optimizing or adjusting configuration settings within OpenSearch to resolve any issues related to indexing.