Python Locust tasks are blocked when used with pyodbc


I am encountering an issue with Python Locust where tasks seem to be blocked when using the pyodbc library, but interestingly, the same tasks work fine when using pyodbc within a ThreadPool. I've tested this behavior with both SQL Server and PostgreSQL databases and arrived at the same conclusion, suggesting that the issue lies within Locust itself rather than the database or pyodbc.

Locust application:


import logging
import random
import sys
import time

import pyodbc
from gevent import killall
from locust import task, User, TaskSet, events

# SQL Server Connection Details
server = 'localhost'
database = 'test'
username = 'test'
password = 'test'
driver = 'ODBC Driver 17 for SQL Server'
connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"

count = 0
running_greenlets = set()
stop_event = None

table_name = f"SampleTable_{int(time.time())}"


class SampleTaskSet(TaskSet):

    def __init__(self, parent: User):
        super().__init__(parent)
        global count
        count = count + 1
        self.counter = count

    @task
    def execute_query(self):
        global stop_event
        if stop_event and stop_event.is_set():
            logging.info("Stop event set. Exiting task.")
            return

        logging.info("User {} started".format(self.counter))
        global table_name
        query = "select * from {}".format(table_name)
        conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username,
                              PWD=password, Trusted_Connection='no')
        logging.info("User {} connection established".format(self.counter))
        cursor = conn.cursor()
        try:
            logging.info("User %s Executing Query %s ", self.counter, query)

            # Doesn't block task
            # sleep_in_secs = random.randint(5, 10)
            # time.sleep(sleep_in_secs)

            # Blocks task
            cursor.execute(query)
            rows = cursor.fetchall()
            resp_len = len(rows)
            logging.info("User %s completed Query", self.counter)

        except KeyboardInterrupt:
            logging.info("KeyboardInterrupt caught in task")
            if stop_event:
                stop_event.set()
            raise

        finally:
            cursor.close()
            conn.close()


class TestUser(User):
    tasks = [SampleTaskSet]


def on_locust_stop(*args, **kwargs):
    logging.info("Stopping Locust due to signal")
    killall(running_greenlets)
    sys.exit()


@events.test_start.add_listener
def prep_data(environment, **kwargs):
    global table_name

    conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username, PWD=password,
                          Trusted_Connection='no', autocommit=True)
    cursor = conn.cursor()

    logging.info("Creating table %s", table_name)
    try:
        create_table_query = f"CREATE TABLE {table_name} (id INT, name VARCHAR(255), email VARCHAR(255))"
        cursor.execute(create_table_query)
        logging.info("Created table %s", table_name)
        logging.info("Loading data to table %s", table_name)

        for i in range(100_000):
            insert_query = f"INSERT INTO {table_name} (id, name, email) VALUES (?, ?, ?)"
            cursor.execute(insert_query, (i + 1, f"Name {i + 1}", f"email_{i + 1}@example.com"))
        logging.info("Loaded data to table %s", table_name)

    finally:
        cursor.close()
        conn.close()

Execution command:

locust -f db_locust.py --headless -u10 -r10 --host=localhost

Sample log:

[2024-03-23 01:27:35,156] machine/INFO/root: Loaded data to table SampleTable_1711171627
[2024-03-23 01:27:35,156] machine/INFO/locust.runners: Ramping to 10 users at a rate of 10.00 per second
[2024-03-23 01:27:35,157] machine/INFO/locust.runners: All users spawned: {"TestUser": 10} (10 total users)
[2024-03-23 01:27:35,157] machine/INFO/root: User 1 started
[2024-03-23 01:27:35,158] machine/INFO/root: User 1 connection established
[2024-03-23 01:27:35,158] machine/INFO/root: User 1 Executing Query select * from SampleTable_1711171627
[2024-03-23 01:27:35,344] machine/INFO/root: User 1 completed Query
[2024-03-23 01:27:35,353] machine/INFO/root: User 2 started
[2024-03-23 01:27:35,354] machine/INFO/root: User 2 connection established
[2024-03-23 01:27:35,355] machine/INFO/root: User 2 Executing Query select * from SampleTable_1711171627
[2024-03-23 01:27:35,537] machine/INFO/root: User 2 completed Query

Expectation: the "User x Executing Query" lines should appear for all users at roughly the same time, i.e. the queries should run concurrently rather than one after another.

Similar version using a ThreadPool:


import random
import time

import pyodbc
import concurrent.futures
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

num_threads = 10
# SQL Server Connection Details
server = 'localhost'
database = 'test'
username = 'test'
password = 'test'
driver = 'ODBC Driver 17 for SQL Server'
connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"

count = 0
table_name = f"SampleTable_{int(time.time())}"

# Function to execute SELECT query
def run_query(user_id):
    logging.info("User {} started".format(user_id))
    global table_name
    query = "select * from {}".format(table_name)
    conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username,
                          PWD=password, Trusted_Connection='no')
    logging.info("User {} connection established".format(user_id))
    cursor = conn.cursor()
    try:
        logging.info("User %s Executing Query %s ", user_id, query)

        # Doesn't block task
        # sleep_in_secs = random.randint(5, 10)
        # time.sleep(sleep_in_secs)

        # Blocks task
        cursor.execute(query)
        rows = cursor.fetchall()
        resp_len = len(rows)
        logging.info("User %s completed Query", user_id)

    finally:
        cursor.close()
        conn.close()

def prep_data():
    global table_name

    conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username, PWD=password,
                          Trusted_Connection='no', autocommit=True)
    cursor = conn.cursor()

    logging.info("Creating table %s", table_name)
    create_table_query = f"CREATE TABLE {table_name} (id INT, name VARCHAR(255), email VARCHAR(255))"
    cursor.execute(create_table_query)
    logging.info("Created table %s", table_name)
    logging.info("Loading data to table %s", table_name)

    for i in range(100_000):
        insert_query = f"INSERT INTO {table_name} (id, name, email) VALUES (?, ?, ?)"
        cursor.execute(insert_query, (i + 1, f"Name {i + 1}", f"email_{i + 1}@example.com"))
    logging.info("Loaded data to table %s", table_name)

    cursor.close()
    conn.close()

# Create ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    prep_data()
    # Submit tasks to the executor
    futures = [executor.submit(run_query, user_id) for user_id in range(1, num_threads + 1)]

    # Wait for tasks to complete and log completion
    for future in concurrent.futures.as_completed(futures):
        logging.info("Task has completed.")

Any help or feedback would be appreciated. If the issue were with pyodbc or with obtaining a connection, the ThreadPool version should block as well, but it doesn't. Can anyone shed some light on how to work around Locust getting blocked here? I have also tried replacing the query with a time.sleep() in the Locust task (see the commented-out lines above), and in that case the tasks are not blocked, so the problem only appears with the combination of Locust and pyodbc.

I tried both versions (Locust and ThreadPool) to see whether the behavior is reproducible outside of Locust; the ThreadPool version works as expected, with the queries running concurrently.

Edit: I also tried gevent monkey-patching, but it does not help with pyodbc: cursor.execute() still blocks the greenlet. As far as I understand, monkey-patching only makes pure-Python I/O (e.g. the socket module) cooperative, while pyodbc is a C extension that does its own blocking I/O, so gevent never gets a chance to switch greenlets while the query is running.

"""Spawn multiple workers and wait for them to complete"""

import logging

import gevent
from gevent import monkey

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

monkey.patch_all(sys=True)

import pyodbc

# Logging was already configured via basicConfig above
logger = logging.getLogger(__name__)

# SQL Server Connection Details
server = 'localhost'
database = 'test'
username = 'test'
password = 'test'
driver = 'ODBC Driver 17 for SQL Server'
connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"
count = 0
# 1m rows table
table_name = "SampleTable_1711329791"


def run_query(user_id, query):
    logger.info('User %d started %s' % (user_id, query))
    conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username,
                          PWD=password, Trusted_Connection='no')
    logging.info("User {} connection established".format(user_id))
    cursor = conn.cursor()
    try:
        logging.info("User %s Executing Query %s ", user_id, query)

        # Doesn't block task
        # sleep_in_secs = random.randint(5, 10)
        # time.sleep(sleep_in_secs)

        # Blocks task
        cursor.execute(query)
        rows = cursor.fetchall()
        resp_len = len(rows)
        logging.info("User %s completed Query", user_id)

    finally:
        cursor.close()
        conn.close()


jobs = [gevent.spawn(run_query, user_id, "select * from {}".format(table_name)) for user_id in range(1, 10)]

gevent.wait(jobs)
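
For completeness, the kind of workaround I am considering (but have not verified) is to push the blocking pyodbc call onto a real OS thread from inside the Locust task, for example via gevent's built-in thread pool, so the greenlet can yield while the query runs. A minimal sketch, assuming gevent.get_hub().threadpool can be used safely from a Locust task (blocking_query is just an illustrative helper; the connection details are the same placeholders as above):

import logging

import gevent
import pyodbc
from locust import User, task

# Same placeholder connection details as above
server = 'localhost'
database = 'test'
username = 'test'
password = 'test'
driver = 'ODBC Driver 17 for SQL Server'
table_name = "SampleTable_1711329791"


def blocking_query(user_id):
    # Runs on a real OS thread from gevent's thread pool, so the
    # blocking ODBC call does not stall the gevent event loop.
    conn = pyodbc.connect(driver=driver, host=server, Database=database, UID=username,
                          PWD=password, Trusted_Connection='no')
    try:
        cursor = conn.cursor()
        cursor.execute("select * from {}".format(table_name))
        rows = cursor.fetchall()
        logging.info("User %s fetched %d rows", user_id, len(rows))
    finally:
        conn.close()


class ThreadOffloadUser(User):

    @task
    def execute_query(self):
        # Offload the blocking call to gevent's native thread pool and wait
        # for the result; the greenlet yields while the thread is busy.
        result = gevent.get_hub().threadpool.spawn(blocking_query, id(self))
        result.get()

If this is a sensible direction, I would still like to know whether Locust has a recommended way of doing this.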
