I am running into an issue with Python Locust where tasks appear to be blocked when using the pyodbc library, yet the same code runs concurrently when pyodbc is used from a ThreadPool. I have tested this behavior against both SQL Server and PostgreSQL and reached the same result, which suggests the issue lies with Locust itself rather than with the database or pyodbc.
Locust application:
import logging
import random
import sys
import time

import pyodbc
from gevent import killall
from locust import task, User, TaskSet, events

# SQL Server Connection Details
server = 'localhost'
database = 'test'
username = 'test'
password = 'test'
driver = 'ODBC Driver 17 for SQL Server'
connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"

count = 0
running_greenlets = set()
stop_event = None
table_name = f"SampleTable_{int(time.time())}"


class SampleTaskSet(TaskSet):
    def __init__(self, parent: User):
        super().__init__(parent)
        global count
        count = count + 1
        self.counter = count

    @task
    def execute_query(self):
        global stop_event
        if stop_event and stop_event.is_set():
            logging.info("Stop event set. Exiting task.")
            return
        logging.info("User {} started".format(self.counter))
        global table_name
        query = "select * from {}".format(table_name)
        conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username,
                              PWD=password, Trusted_Connection='no')
        logging.info("User {} connection established".format(self.counter))
        cursor = conn.cursor()
        try:
            logging.info("User %s Executing Query %s ", self.counter, query)
            # Doesn't block task
            # sleep_in_secs = random.randint(5, 10)
            # time.sleep(sleep_in_secs)
            # Blocks task
            cursor.execute(query)
            rows = cursor.fetchall()
            resp_len = len(rows)
            logging.info("User %s completed Query", self.counter)
        except KeyboardInterrupt:
            logging.info("KeyboardInterrupt caught in task")
            if stop_event:
                stop_event.set()
            raise
        finally:
            cursor.close()
            conn.close()


class TestUser(User):
    tasks = [SampleTaskSet]

    def on_locust_stop(*args, **kwargs):
        logging.info("Stopping Locust due to signal")
        killall(running_greenlets)
        sys.exit()


@events.test_start.add_listener
def prep_data(environment, **kwargs):
    global table_name
    conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username, PWD=password,
                          Trusted_Connection='no', autocommit=True)
    cursor = conn.cursor()
    logging.info("Creating table %s", table_name)
    try:
        create_table_query = f"CREATE TABLE {table_name} (id INT, name VARCHAR(255), email VARCHAR(255))"
        cursor.execute(create_table_query)
        logging.info("Created table %s", table_name)
        logging.info("Loading data to table %s", table_name)
        for i in range(100_000):
            insert_query = f"INSERT INTO {table_name} (id, name, email) VALUES (?, ?, ?)"
            cursor.execute(insert_query, (i + 1, f"Name {i + 1}", f"email_{i + 1}@example.com"))
        logging.info("Loaded data to table %s", table_name)
    finally:
        cursor.close()
        conn.close()
Execution command:
locust -f db_locust.py --headless -u10 -r10 --host=localhost
Sample log:
[2024-03-23 01:27:35,156] machine/INFO/root: Loaded data to table SampleTable_1711171627
[2024-03-23 01:27:35,156] machine/INFO/locust.runners: Ramping to 10 users at a rate of 10.00 per second
[2024-03-23 01:27:35,157] machine/INFO/locust.runners: All users spawned: {"TestUser": 10} (10 total users)
[2024-03-23 01:27:35,157] machine/INFO/root: User 1 started
[2024-03-23 01:27:35,158] machine/INFO/root: User 1 connection established
[2024-03-23 01:27:35,158] machine/INFO/root: User 1 Executing Query select * from SampleTable_1711171627
[2024-03-23 01:27:35,344] machine/INFO/root: User 1 completed Query
[2024-03-23 01:27:35,353] machine/INFO/root: User 2 started
[2024-03-23 01:27:35,354] machine/INFO/root: User 2 connection established
[2024-03-23 01:27:35,355] machine/INFO/root: User 2 Executing Query select * from SampleTable_1711171627
[2024-03-23 01:27:35,537] machine/INFO/root: User 2 completed Query
Expectation: the "User x Executing Query" lines should interleave, i.e. all ten users querying concurrently. Instead, each user's query runs to completion before the next user even starts.
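My current understanding of why this happens (please correct me if I'm wrong): Locust runs every user as a gevent greenlet, and all greenlets share a single OS thread, so any call that blocks inside C code stalls the whole event loop until it returns. Locust monkey-patches the standard library on import, which would explain why the commented-out time.sleep variant above does not block, whereas pyodbc waits inside the C-level ODBC driver, out of gevent's reach. Here is a minimal, database-free sketch of the difference (plain gevent, no monkey-patching, with time.sleep standing in for cursor.execute):

import time
import gevent


def cooperative(n):
    # gevent.sleep yields to the event loop, so greenlets overlap
    gevent.sleep(1)
    print(f"cooperative {n} done")


def blocking(n):
    # unpatched time.sleep blocks the single OS thread, like cursor.execute
    time.sleep(1)
    print(f"blocking {n} done")


start = time.monotonic()
gevent.joinall([gevent.spawn(cooperative, i) for i in range(5)])
print(f"cooperative greenlets took {time.monotonic() - start:.1f}s")  # ~1s

start = time.monotonic()
gevent.joinall([gevent.spawn(blocking, i) for i in range(5)])
print(f"blocking greenlets took {time.monotonic() - start:.1f}s")  # ~5s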
Here is a similar version using a thread pool:
import random
import time
import pyodbc
import concurrent.futures
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

num_threads = 10

# SQL Server Connection Details
server = 'localhost'
database = 'test'
username = 'test'
password = 'test'
driver = 'ODBC Driver 17 for SQL Server'
connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"

count = 0
table_name = f"SampleTable_{int(time.time())}"


# Function to execute SELECT query
def run_query(user_id):
    logging.info("User {} started".format(user_id))
    global table_name
    query = "select * from {}".format(table_name)
    conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username,
                          PWD=password, Trusted_Connection='no')
    logging.info("User {} connection established".format(user_id))
    cursor = conn.cursor()
    try:
        logging.info("User %s Executing Query %s ", user_id, query)
        # Doesn't block task
        # sleep_in_secs = random.randint(5, 10)
        # time.sleep(sleep_in_secs)
        # Blocks task
        cursor.execute(query)
        rows = cursor.fetchall()
        resp_len = len(rows)
        logging.info("User %s completed Query", user_id)
    finally:
        cursor.close()
        conn.close()


def prep_data():
    global table_name
    conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username, PWD=password,
                          Trusted_Connection='no', autocommit=True)
    cursor = conn.cursor()
    logging.info("Creating table %s", table_name)
    create_table_query = f"CREATE TABLE {table_name} (id INT, name VARCHAR(255), email VARCHAR(255))"
    cursor.execute(create_table_query)
    logging.info("Created table %s", table_name)
    logging.info("Loading data to table %s", table_name)
    for i in range(100_000):
        insert_query = f"INSERT INTO {table_name} (id, name, email) VALUES (?, ?, ?)"
        cursor.execute(insert_query, (i + 1, f"Name {i + 1}", f"email_{i + 1}@example.com"))
    logging.info("Loaded data to table %s", table_name)
    cursor.close()
    conn.close()


# Create ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    prep_data()
    # Submit tasks to the executor
    futures = [executor.submit(run_query, user_id) for user_id in range(1, num_threads + 1)]
    # Wait for tasks to complete and log completion
    for future in concurrent.futures.as_completed(futures):
        logging.info("Task has completed.")
Any help or feedback would be appreciated. If the issue were with pyodbc itself or with obtaining a connection, it should also block in the thread pool, but it doesn't. Can anyone shed some light on how to work around Locust getting blocked? I have also tried sleep statements in Locust, and those don't block, so the problem appears to be specific to the combination of Locust and pyodbc (see the sketch at the end of the post for the workaround direction I'm exploring).
I tried both versions, Locust and ThreadPool, to check whether the behavior also reproduces in a thread pool. The ThreadPool version works as expected, presumably because pyodbc releases the GIL during the ODBC call, so native threads can run queries concurrently.
Edit: I tried gevent monkey-patching, and it doesn't help pyodbc: cursor.execute() still blocks the greenlet, consistent with my understanding above that monkey-patching only covers the standard library and cannot reach the C-level driver.
"""Spawn multiple workers and wait for them to complete"""
import logging
import gevent
from gevent import monkey
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
monkey.patch_all(sys=True)
import pyodbc
# Setting up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# SQL Server Connection Details
server = 'localhost'
database = 'test'
username = 'test'
password = 'test'
driver = 'ODBC Driver 17 for SQL Server'
connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"
count = 0
# 1m rows table
table_name = "SampleTable_1711329791"
def run_query(user_id, query):
logger.info('User %d started %s' % (user_id, query))
conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username,
PWD=password, Trusted_Connection='no')
logging.info("User {} connection established".format(user_id))
cursor = conn.cursor()
try:
logging.info("User %s Executing Query %s ", user_id, query)
# Doesn't block task
# sleep_in_secs = random.randint(5, 10)
# time.sleep(sleep_in_secs)
# Blocks task
cursor.execute(query)
rows = cursor.fetchall()
resp_len = len(rows)
logging.info("User %s completed Query", user_id)
finally:
cursor.close()
conn.close()
jobs = [gevent.spawn(run_query, user_id, "select * from {}".format(table_name)) for user_id in range(1, 10)]
gevent.wait(jobs)
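Edit 2: The direction I'm currently exploring as a workaround (an untested sketch, not a confirmed fix): offload the blocking pyodbc call to gevent's native thread pool via gevent.get_hub().threadpool, so that only the calling greenlet waits while the event loop keeps serving the other users. Roughly:

import logging
import gevent
import pyodbc
from locust import User, task

# Same connection details and table as in the examples above
server = 'localhost'
database = 'test'
username = 'test'
password = 'test'
driver = 'ODBC Driver 17 for SQL Server'
table_name = "SampleTable_1711329791"


def run_blocking_query(query):
    # The unchanged blocking pyodbc code from above
    conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username,
                          PWD=password, Trusted_Connection='no')
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        return len(cursor.fetchall())
    finally:
        conn.close()


class TestUser(User):
    @task
    def execute_query(self):
        # threadpool.apply() runs the function in a real OS thread and suspends
        # only this greenlet; the gevent event loop keeps running meanwhile
        resp_len = gevent.get_hub().threadpool.apply(
            run_blocking_query, args=("select * from {}".format(table_name),))
        logging.info("Fetched %d rows", resp_len)

Note that the hub's thread pool is small by default (10 threads in the gevent versions I've seen), so for higher user counts it would presumably need resizing via gevent.get_hub().threadpool.maxsize.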