Sorry for my poor English.
When using multi-threading to process messages from a RabbitMQ message queue, how should it be handled if the number of messages is less than the number of threads?
My code that a maximum of 5 threads can be used for processing. If there are 13 URLs in the message queue, when the program finishes running, there will be 3 URLs left unprocessed. If there are 12 URLs in the message queue, then 2 URLs will be left unprocessed.
I'm not good at RabbitMQ, but I guess the problem must be related to the logic of the RabbitMQ code.
async def processTask(message: aio_pika.abc.AbstractIncomingMessage):
try:
messagesList.append(message)
# when message number =5
if len(messagesList) == 5:
messages_to_process = messagesList[:]
max_workers = 5 if len(messages_to_process) >= 5 else len(messages_to_process)
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in
messages_to_process}
for future in as_completed(futures):
messageItem = futures[future]
try:
resultDict = future.result()
if resultDict:
await messageItem.ack()
logger.info(f'{messageItem.delivery_tag}: {resultDict}')
else:
await messageItem.reject(requeue=True)
except Exception as e:
logger.error(f'{messageItem.delivery_tag}: {e}')
await messageItem.reject(requeue=True)
linkList.clear()
messagesList.clear()
# when message number < 5
remaining_messages = queue.declaration_result.message_count
if remaining_messages > 0:
messages_to_process = messagesList[:]
max_workers = len(messages_to_process)
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in messagesList}
for future in as_completed(futures):
messageItem = messages_to_process.pop(0)
try:
resultDict = future.result()
if resultDict:
await messageItem.ack()
logger.info(f'{messageItem.delivery_tag}: {resultDict}')
else:
await messageItem.reject(requeue=True)
except Exception as e:
logger.error(f'{messageItem.delivery_tag}: {e}')
await messageItem.reject(requeue=True)
messagesList.clear()
await asyncio.sleep(0.1)
except Exception as e:
print(e)
async def main(loop):
try:
# connect
connection = await aio_pika.connect_robust(host='XX.XX.X.XX', port=5672, login='admin', password='admin',
virtualhost='my_vhost', loop=loop)
global channel
channel = await connection.channel()
# Will take no more than 10 messages in advance
await channel.set_qos() #prefetch_count=5
crawler_exchange = await channel.declare_exchange(name='crawler_exchange', type='fanout')
queueName = "myqueue"
global queue
queue = await channel.declare_queue(queueName, durable=True)
await queue.bind(crawler_exchange, routing_key="myqueue")
rstqueueName = "Result"
global resultQuequ
resultQuequ = await channel.declare_queue(rstqueueName, durable=True)
global resultExchange
resultExchange = await channel.declare_exchange(name='resultExchange', type='direct')
await resultQuequ.bind(resultExchange, routing_key="allResult")
# get message
await queue.consume(processTask)
logger.info(f"Waiting for messages at {queue.name}. To exit press CTRL+C")
return connection
except Exception as e:
logger.error(f"failed: {e}")
logger.error(traceback.format_exc())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
connection = loop.run_until_complete(main(loop))
try:
loop.run_forever()
except KeyboardInterrupt:
logger.info("Received exit signal")
finally:
loop.run_until_complete(connection.close())
loop.close()
I have tried to modify my code with the help of gpt4 and claude3 , but their code doesn't work properly.How should I fix the problem ? Thank you.
What I need: When the number of messages in the message queue is >= 5, there should be 5 worker threads. When the number of messages is < 5, the number of worker threads should equal the number of messages. This can improve processing speed and ensure that there are no unprocessed URLs left in the message queue.