I would appreciate help with parallelism in RabbitMQ.
Given a large JSON file, say one with 1 million items, I want to go through it in parallel so that each item is published to Rabbit using multithreading.
I have read quite a few blog posts, and it seems there is a problem with sharing one connection and one channel across multiple threads; if I understood correctly, the recommendation is to open additional channels (for example, one per thread). In practice I did not really understand how to implement this in code. I tried all kinds of things and nothing worked.
I would like to know whether what I want is possible, or whether I have to process the file iteratively, one item after another. And if it is possible, can I get help with the code?
Here I define the connection and the function that publishes to Rabbit:
import json
import logging
from typing import Any, Dict

from amqpstorm import Connection, Message


class RabbitMQClient:
    def __init__(
        self,
        queues_connection_config: Dict[str, Any],
        queue_name: str,
    ) -> None:
        self.__logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
        self.__queue_name = queue_name
        self.__exchange = queues_connection_config.get("exchange")
        self._connection = Connection(
            queues_connection_config.get("hostname"),
            queues_connection_config.get("username"),
            queues_connection_config.get("password"),
            queues_connection_config.get("port"),
            virtual_host=queues_connection_config.get("virtual_host"),
        )
        self.channel = self._connection.channel()
        self.channel.confirm_deliveries()

    def enqueue(self, queues_item: Dict[str, Any]) -> str:
        message = Message.create(self.channel, json.dumps(queues_item))
        message.publish(routing_key=self.__queue_name, exchange=self.__exchange, mandatory=True)
        self.__logger.debug(f"Enqueued {message.correlation_id}")
        return str(message.correlation_id)
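From what I read, the channel-per-thread idea might look something like the sketch below. This is only my understanding of the pattern, not working code against my broker: `PerThreadChannel` and `dummy_factory` are names I made up, and the factory is a stand-in for something like `lambda: connection.channel()` so the caching behavior can be shown without a live RabbitMQ.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class PerThreadChannel:
    """Cache one channel per thread instead of sharing a single channel."""

    def __init__(self, channel_factory):
        # channel_factory is any zero-argument callable that opens a new
        # channel, e.g. lambda: connection.channel() for an amqpstorm-style
        # connection (assumption: the connection itself may be shared).
        self._factory = channel_factory
        self._local = threading.local()

    def get(self):
        # Each thread sees its own self._local, so the first call from a
        # given thread opens a channel and later calls from that thread
        # reuse it. No thread ever touches another thread's channel.
        if not hasattr(self._local, "channel"):
            self._local.channel = self._factory()
        return self._local.channel


# Demonstration with a dummy factory (no broker needed): count how many
# distinct channels get opened when 4 threads "publish" 100 items.
opened = []

def dummy_factory():
    channel = object()  # hypothetical stand-in for a real channel
    opened.append(channel)
    return channel

channels = PerThreadChannel(dummy_factory)
used = set()
lock = threading.Lock()

def publish(item):
    ch = channels.get()  # real code would call ch to publish the item here
    with lock:
        used.add(id(ch))

with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(publish, range(100)))

print(len(opened) <= 4 and len(opened) == len(used))  # prints True
```

The point of the pattern, as I understand it, is that the executor's worker threads never share a channel: each one lazily opens its own on first use and keeps reusing it.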
Here is my attempt to divide the work across multiple threads, without success:
for i in range(checkpoint, 10 + 1):
    batch_data = [next(items) for _ in range(1000)]
    try:
        start_time = time.time()
        futures = [
            self.executor.submit(self.__queue_client.enqueue, item)
            for item in batch_data
        ]
        # Wait for all threads to complete
        concurrent.futures.wait(futures)
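To show the shape of the batching loop I am aiming for, here is a self-contained sketch that runs without a broker. `fake_enqueue` is a hypothetical stand-in for my `RabbitMQClient.enqueue`; it also uses `itertools.islice` instead of `next()` in a list comprehension, because `next()` raises `StopIteration` on the last partial batch, which may be one of my bugs:

```python
from concurrent.futures import ThreadPoolExecutor, wait
from itertools import islice


def fake_enqueue(item):
    # Stand-in for RabbitMQClient.enqueue; returns a correlation-id string.
    return f"id-{item}"


items = iter(range(2500))  # pretend this iterates the parsed JSON items
batch_size = 1000
published = 0

with ThreadPoolExecutor(max_workers=8) as executor:
    while True:
        # islice never raises StopIteration: the last batch is just shorter,
        # and an empty batch signals that the iterator is exhausted.
        batch = list(islice(items, batch_size))
        if not batch:
            break
        futures = [executor.submit(fake_enqueue, item) for item in batch]
        done, _ = wait(futures)
        # .result() re-raises any exception a worker hit while publishing.
        published += sum(1 for f in done if f.result())

print(published)  # prints 2500
```

With the real client this would only be safe if `enqueue` does not share one channel across the pool's threads, which is exactly the part I have not managed to get right.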