I need to know how to manually ack the messages on the queue direct from the Consumer I created and to set a retry strategy of 5 times, each attemp increasing the time like second try 5min, third try, 10min after second try failed, fourth 15min...
I'm kinda lost in the Rabbit documentation, I learned a bit of the concept but the practical use is still a mistery to me...
I'm using Symfony 6.1 and my old_sound_rabbit_mq.yaml looks like this:
old_sound_rabbit_mq:
connections:
default:
host: '%rabbitmqHost%'
port: '%rabbitmqPort%'
user: '%rabbitmqUser%'
password: '%rabbitmqPassword%'
vhost: '%rabbitmqVhost%'
consumers:
upload_file:
connection: default
exchange_options: { name: 'upload_file_exchange', type: direct, durable: true, auto_delete: false }
queue_options: { name: 'upload_file_queue', durable: true, auto_delete: false, arguments: { 'x-max-priority': [ 'I', 20 ] } }
callback: App\Consumer\UploadFileConsumer
qos_options: { prefetch_size: 0, prefetch_count: 1, global: false }
This is my consumer:
<?php
declare(strict_types=1);
namespace App\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
class UploadFileConsumer implements ConsumerInterface
{
public function execute(AMQPMessage $msg): void
{
try {
// do something with $msg, if all is good then ack the msg and remove from queue
} catch (\Exception $e) {
// keep message in queue, don't ack it, keep it in queue retry 5 times then stop consumer if no success
}
}
}
AMQPMessage provides both
ack()andnack()methods for this purpose.https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Message/AMQPMessage.php#L98-L128
So likely what you want is:
Though I'm not familiar with a way to limit the number of times a message is re-queued without modifying the headers/payload and re-queueing it as a new message. Alternatively, you can set a TTL value and messages will eventually time out of the queue. You can also create a dead-letter exchange if you want to inspect nack'ed/expired messages. [just make sure to clean it out, otherwise you'll have new problems]
If I had to kludge in a "re-queue X times" I'd suggest a cache with a built-in TTL like Redis, the key is the message ID, and the value is the number of retries.
Edit:
Spitballing some workflows for "Task A must complete before Task B can begin", in order of decreasing preference:
and always beware of "infinite requeue" if your queue does not have a defined TTL, as messages will continue to build up, and your consumers will be working constantly on tasks that may never be completed successfully.