I can’t attach a RabbitMQ client to a Locust user

111 views Asked by At

I am trying to run a load test that injects messages into RabbitMQ, but I can't get the Locust user class to load the custom client I created. Has anyone managed to do this?

import os
from locust import HttpUser, task, TaskSet, run_single_user
from kombu import Connection, Exchange, Queue


class KombuClient:
    """Small wrapper around a kombu ``Connection`` used to publish messages.

    The broker URL, exchange, queue and routing key are read from
    ``LOCUST_AMQP_*`` environment variables, each with a fallback default.
    """

    def __init__(self, *args, **kwargs):
        """Open the broker connection and a channel on it.

        Any positional or keyword arguments are handed to ``super().__init__``.
        """
        super().__init__(*args, **kwargs)
        broker_url = os.getenv(
            "LOCUST_AMQP_CONFIG", "amqp://guest:guest@localhost:5672//"
        )
        self.connection = Connection(hostname=broker_url)
        self.channel = self.connection.channel()

    def send_message(self, message):
        """Declare the queue, publish ``message`` and print a confirmation."""
        exchange_name = os.getenv("LOCUST_AMQP_EXCHANGE", "test_exchange")
        queue_name = os.getenv("LOCUST_AMQP_QUEUE", "test_queue")
        key = os.getenv("LOCUST_AMQP_KEY", "test_key")

        target_exchange = Exchange(name=exchange_name, type="direct")

        # Bind the queue to this connection and declare it before publishing.
        target_queue = Queue(
            name=queue_name,
            exchange=target_exchange,
            routing_key=key,
        )
        target_queue.maybe_bind(self.connection)
        target_queue.declare()

        self.connection.Producer(
            exchange=target_exchange,
            routing_key=key,
        ).publish(message)
        print("INFO: Sent message: '{}'".format(message))

    def close_connection(self):
        """Release the underlying kombu connection."""
        self.connection.release()

class UserBehavior(TaskSet):
    """Task set with a single task that publishes a greeting."""

    @task
    def send_hello_message(self):
        """Publish "Hello World!" through ``self.client``."""
        # I can make it work by calling
        #     KombuClient().send_message("Hello World!")
        # instead of the line below, but that is not 100% correct.
        self.client.send_message("Hello World!")


class RabbitMQUser(HttpUser):
    """Locust user whose tasks are taken from ``UserBehavior``."""

    # Task sets executed by this user.
    tasks = [UserBehavior]
    host = "http://localhost"

    # Wait bounds between tasks.
    max_wait = 9000
    min_wait = 5000

    # NOTE: this binds the KombuClient class itself, not an instance of it.
    client = KombuClient
  

# When the file is executed as a script, hand RabbitMQUser to run_single_user.
if __name__ == "__main__":
    run_single_user(RabbitMQUser)

In general I can inject the messages, but Locust does not recognize the transaction and does not show any user running.

I have tried to inspect the Locust code to see what the client expected class is, but I haven’t found any useful information.

I guess the API changed, as I am using Locust 2.15.1 and there is no HttpClient class anymore.

2

There are 2 answers

1
Solowalker On BEST ANSWER

This question is covered in the Locust FAQ and examples can be found in the docs. What you need to do is fire the request event with all your request info and stats. You're using HttpUser but overriding the client so you're making it just a normal User which doesn't know about what you're doing and you need to tell it what to record and when.

Example taken from the docs for the gRPC scenario:

# Fire the request event with the request info and stats for one call.
# response_time is the elapsed perf_counter time converted to milliseconds.
self.environment.events.request.fire(
        request_type="grpc",
        name=call_details.method,
        response_time=(time.perf_counter() - start_perf_counter) * 1000,
        response_length=response_length,
        response=response,
        context=None,
        exception=exception,
    )

Call this with your own stats. See the linked doc page for explanations as to what the parameters are meant to be, but since you're doing a custom user you can make them be and mean whatever you want.

0
Allan Noguera On

Here is the updated code. I still need to add the changes @Solowalker suggested, but if you put it all together you have the whole answer. I will try to update the code once everything is working.

import os
from locust import task, TaskSet, User, run_single_user
from locust.clients import HttpSession
from kombu import Connection, Exchange, Queue


class KombuClient(HttpSession):
    """RabbitMQ publisher that reports every publish to Locust as a request.

    The class still derives from ``HttpSession`` so the existing
    ``KombuClient(host, user=..., request_event=...)`` construction keeps
    working. ``send_message`` fires the ``request_event`` passed to the
    constructor, which is what makes Locust record the transaction.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``HttpSession`` and open the AMQP connection.

        The broker URL is read from ``LOCUST_AMQP_CONFIG`` and falls back to a
        local guest account.
        """
        super().__init__(*args, **kwargs)
        self.connection = Connection(
            hostname=os.getenv(
                "LOCUST_AMQP_CONFIG", "amqp://guest:guest@localhost:5672//"
            ),
        )
        self.channel = self.connection.channel()

    def send_message(self, message):
        """Publish ``message`` and fire a Locust request event for it.

        The exchange, queue and routing key come from ``LOCUST_AMQP_EXCHANGE``,
        ``LOCUST_AMQP_QUEUE`` and ``LOCUST_AMQP_KEY``. The event is fired on
        success and on failure; on failure the exception is attached to the
        event and then re-raised, so callers see the same error as before.

        Raises:
            Exception: whatever the queue declaration or the publish raises.
        """
        # Local import so this block does not depend on the file-level imports.
        import time

        exchange_name = os.getenv("LOCUST_AMQP_EXCHANGE", "test_exchange")
        routing_key = os.getenv("LOCUST_AMQP_KEY", "test_key")

        failure = None
        started = time.perf_counter()
        try:
            exchange = Exchange(name=exchange_name, type="direct")
            queue = Queue(
                name=os.getenv("LOCUST_AMQP_QUEUE", "test_queue"),
                exchange=exchange,
                routing_key=routing_key,
            )
            queue.maybe_bind(self.connection)
            queue.declare()

            producer = self.connection.Producer(
                exchange=exchange,
                routing_key=routing_key,
            )
            producer.publish(message)
        except Exception as exc:
            # Keep a reference for the event, then let the error propagate.
            failure = exc
            raise
        finally:
            # NOTE(review): relies on HttpSession storing the constructor's
            # ``request_event`` argument as ``self.request_event`` - confirm
            # against the installed Locust version.
            self.request_event.fire(
                request_type="AMQP",
                name="{}/{}".format(exchange_name, routing_key),
                response_time=(time.perf_counter() - started) * 1000,
                # Only str/bytes payloads have a meaningful length here.
                response_length=(
                    len(message) if isinstance(message, (str, bytes)) else 0
                ),
                response=None,
                context=None,
                exception=failure,
            )
        print("INFO: Sent message: '{}'".format(message))

    def close_connection(self):
        """Release the underlying kombu connection."""
        self.connection.release()


class UserBehavior(TaskSet):
    """Task set whose only task publishes a fixed greeting."""

    @task
    def send_hello_message(self):
        """Send the greeting through ``self.client``."""
        greeting = "Hello World!"
        self.client.send_message(message=greeting)


class RabbitMQUser(User):
    """Locust user that publishes to RabbitMQ through a ``KombuClient``.

    This is the concrete user class that is meant to run, so it is not
    marked ``abstract``: Locust does not spawn users of abstract classes.
    """

    host = "http://localhost"
    tasks = [UserBehavior]
    # NOTE(review): recent Locust versions configure pauses with ``wait_time``
    # (e.g. ``between(5, 9)``); confirm whether min_wait/max_wait still apply.
    min_wait = 5000
    max_wait = 9000

    def __init__(self, environment):
        """Create the user and attach a ``KombuClient`` as ``self.client``.

        Args:
            environment: the Locust environment; its ``events.request`` hook
                is passed to the client.
        """
        super().__init__(environment)
        self.client = KombuClient(
            self.host,
            user=self,
            request_event=environment.events.request,
        )

    def on_stop(self):
        """Release the client's broker connection when the user stops."""
        self.client.close_connection()


# When the file is executed as a script, hand RabbitMQUser to run_single_user.
if __name__ == "__main__":
    run_single_user(RabbitMQUser)