How can I read the data from RabbitMQ streams using Quarkus. Is it possible to use SmallRye to achieve this or not? Or do I have to use rabbit client directly?
I would like to
- read data from rabbit stream (not queue)
- replay data from rabbit stream from a certain point in time
I have not found any reference about this using SmallRye wrapper
I have this config
# incoming
mp.messaging.incoming.requestss.connector=smallrye-rabbitmq
mp.messaging.incoming.requestss.exchange.name=quote-requests
mp.messaging.incoming.requestss.queue.name=quote-stream
mp.messaging.incoming.requestss.queue.x-queue-type=stream
but I am getting the following error
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'quote-stream' in vhost '/'', class-id=60, method-id=20)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1378)
... 15 more
RabbitMQ Streams model is an append-only log of messages. Every consumer starts reading a specified offset of the log. Because of this QoS (a.k.a. prefetch count) must be set as an incoming channel. In SmallRye RabbitMQ Connector the
max-outstanding-messagesconfig property controls the QoS valueNOTE: I haven't find possibilities to configure RabbitMQ Streams specific arguments (e.g.
x-stream-offset,x-max-age,x-stream-max-segment-size-bytes, etc), so I assume each client will use the default values of those arguments. It would be great if SmallRye made that.