Rocketmq message loss

38 views Asked by At

I used 2 different consumers to consume data from the same topic. I expected the 2 consumers to return the same result. But I found that there were messages which was caught by the first consumer but failed to be caught by second consumer. I had no idea what caused this. Can someone told me what I can do to guarantee that no message should be lost in consumption?

First I tried to set a time gap which was 1 hour. Then I used 2 different consumers to get messages which were created in an hour.I compared the number of messages caught by 2 consumers, they are different. I tried setting different time gap but the outcome from different consumers were still different.

1

There are 1 answers

0
Francis Lee On

It's the MessageModel, NOT message loss. you can try MessageModel.BROADCASTING

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // consumer.setMessageModel(MessageModel.BROADCASTING); // you can try this
        // consumer.setMessageModel(MessageModel.CLUSTERING); // default 

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}