How should it be handled if the number of messages is less than the number of threads?


Sorry for my poor English.

When using multi-threading to process messages from a RabbitMQ message queue, how should it be handled if the number of messages is less than the number of threads?

My code allows a maximum of 5 worker threads for processing. If there are 13 URLs in the message queue, 3 URLs are left unprocessed when the program finishes; if there are 12 URLs, 2 are left unprocessed (in both cases, the leftover count is the remainder after the last full batch of 5).

I'm not experienced with RabbitMQ, but I suspect the problem is in the logic of my RabbitMQ code.

import asyncio
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed

import aio_pika

# messagesList, linkList, logger and pageCollect are defined elsewhere in my script


async def processTask(message: aio_pika.abc.AbstractIncomingMessage):
    try:        
        messagesList.append(message)
        # once 5 messages have accumulated, process them as a batch
        if len(messagesList) == 5:
            messages_to_process = messagesList[:]  
            
            max_workers = 5 if len(messages_to_process) >= 5 else len(messages_to_process)
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in
                           messages_to_process}
                for future in as_completed(futures):
                    messageItem = futures[future]
                    try:
                        resultDict = future.result()
                        if resultDict:
                            await messageItem.ack()
                            
                            logger.info(f'{messageItem.delivery_tag}: {resultDict}')
                        else:
                            await messageItem.reject(requeue=True)
                    except Exception as e:
                        logger.error(f'{messageItem.delivery_tag}: {e}')
                        await messageItem.reject(requeue=True)
            linkList.clear()
            messagesList.clear()

        # when message number < 5
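        # note: declaration_result.message_count is the message count reported
        # when the queue was declared; it is not a live count of the backlog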
        remaining_messages = queue.declaration_result.message_count
        if remaining_messages > 0:
            messages_to_process = messagesList[:]
            max_workers = len(messages_to_process)
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                futures = {pool.submit(pageCollect, messageItem): messageItem for messageItem in
                           messages_to_process}
                for future in as_completed(futures):
                    # look up the message belonging to this future; as_completed
                    # yields futures in completion order, so popping items off the
                    # list would pair results with the wrong messages
                    messageItem = futures[future]
                    try:
                        resultDict = future.result()
                        if resultDict:
                            await messageItem.ack()
                            logger.info(f'{messageItem.delivery_tag}: {resultDict}')
                        else:
                            await messageItem.reject(requeue=True)
                    except Exception as e:
                        logger.error(f'{messageItem.delivery_tag}: {e}')
                        await messageItem.reject(requeue=True)
            messagesList.clear()

        await asyncio.sleep(0.1)

    except Exception as e:
        print(e)


async def main(loop):
    try:
        # connect
        connection = await aio_pika.connect_robust(host='XX.XX.X.XX', port=5672, login='admin', password='admin',
                                                   virtualhost='my_vhost', loop=loop)
        
        global channel
        channel = await connection.channel()
        # QoS: no prefetch_count is set here, so there is no limit on how many
        # messages are delivered in advance (e.g. prefetch_count=5 would cap it)
        await channel.set_qos()
        crawler_exchange = await channel.declare_exchange(name='crawler_exchange', type='fanout')
        
        queueName = "myqueue"
        global queue
        queue = await channel.declare_queue(queueName, durable=True)
        await queue.bind(crawler_exchange, routing_key="myqueue")

        
        rstqueueName = "Result"
        global resultQuequ
        resultQuequ = await channel.declare_queue(rstqueueName, durable=True)
        
        global resultExchange
        resultExchange = await channel.declare_exchange(name='resultExchange', type='direct')
        
        await resultQuequ.bind(resultExchange, routing_key="allResult")

        # get message
        await queue.consume(processTask)
        logger.info(f"Waiting for messages at {queue.name}. To exit press CTRL+C")
        return connection

    except Exception as e:
        logger.error(f"failed: {e}")
        logger.error(traceback.format_exc())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    connection = loop.run_until_complete(main(loop))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info("Received exit signal")
    finally:
        loop.run_until_complete(connection.close())
        loop.close()

I have tried to modify my code with the help of GPT-4 and Claude 3, but their code doesn't work properly. How should I fix the problem? Thank you.

What I need: when the message queue holds 5 or more messages, 5 worker threads should be used; when it holds fewer than 5, the number of worker threads should equal the number of messages. This keeps processing fast and ensures no URLs are left unprocessed in the queue.
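
For reference, this is roughly the direction I am imagining, shown as a minimal, untested sketch (processOne is just a name I made up here; it reuses my existing pageCollect, logger and queue setup, and assumes prefetch_count=5 is set on the channel): handle every message as soon as it arrives and let a shared thread pool with max_workers=5 cap the concurrency, instead of waiting for a batch of exactly 5.

# untested sketch, not working code: per-message processing with a shared pool
pool = ThreadPoolExecutor(max_workers=5)  # at most 5 URLs processed at once

async def processOne(message: aio_pika.abc.AbstractIncomingMessage):
    loop = asyncio.get_running_loop()
    try:
        # run the blocking crawl in a worker thread so the event loop stays free
        resultDict = await loop.run_in_executor(pool, pageCollect, message)
        if resultDict:
            await message.ack()
            logger.info(f'{message.delivery_tag}: {resultDict}')
        else:
            await message.reject(requeue=True)
    except Exception as e:
        logger.error(f'{message.delivery_tag}: {e}')
        await message.reject(requeue=True)

# in main():
#     await channel.set_qos(prefetch_count=5)
#     await queue.consume(processOne)

With something like this, when fewer than 5 messages are in the queue, only that many pageCollect calls run at once and nothing waits for a full batch. Is this the right direction, or is there a better way?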
