Consumer to RabbitMQ stream - Bad header / Command not implemented 0 buff:0

163 views Asked by At

I have this configuration in docker-compose:

services:

  rabbitmq:
    image: rabbitmq:3-management
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_PLUGINS=rabbitmq_stream   // stream plugin enabled?
      - RABBITMQ_DEFAULT_USER=rabbitmq
      - RABBITMQ_DEFAULT_PASS=rabbitmq

My consumer to the stream looks like:

package main

import (
    "fmt"
    "log"

    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
    // Create an environment
    env, err := stream.NewEnvironment(
        stream.NewEnvironmentOptions().
            SetHost("localhost").
            SetPort(5672).           // 5552
            SetUser("rabbitmq").     // Replace with your username
            SetPassword("rabbitmq"), // Replace with your password
    )

    if err != nil {
        log.Fatalf("Failed to create environment: %s", err)
    }

    // Define a handler for incoming messages
    handleMessage := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
        for _, z := range message.Data {
            fmt.Println("Received message:", string(z))
        }
    }

    // Create a consumer
    _, err = env.NewConsumer(
        "your_stream_name",
        handleMessage,
        stream.NewConsumerOptions().SetConsumerName("my_consumer"),
    )

    if err != nil {
        log.Fatalf("Failed to create consumer: %s", err)
    }

    select {}
}

but I get this error:

2023/12/14 11:03:16 [warn] - Command not implemented 0 buff:0 
2023/12/14 11:03:26 [error] - timeout 10000 ns - waiting Code, operation: commandPeerProperties
2023/12/14 11:03:26 [error] - Can't set the peer-properties. Check if the stream server is running/reachable
2023/12/14 11:03:26 Failed to create environment: timeout 10000 ms - waiting Code, operation: commandPeerProperties

in the RabbitMQ server logs, I see this:

wss-rabbitmq-1 | 2023-12-14 16:01:07.964018+00:00 [info] <0.9.0> Time to start RabbitMQ: 30736437 us wss-rabbitmq-1 | 2023-12-14 16:02:28.713781+00:00 [info] <0.752.0> accepting AMQP connection <0.752.0> (192.168.65.1:33513 -> 192.168.240.3:5672) wss-rabbitmq-1 | 2023-12-14 16:02:28.714007+00:00 [error] <0.752.0> closing AMQP connection <0.752.

wss-rabbitmq-1 | 2023-12-14 16:02:28.714007+00:00 [error] <0.752.0> {bad_header,<<0,0,0,243,0,17,0,1>>} wss-rabbitmq-1 | 2023-12-14 16:03:16.808111+00:00 [info] <0.781.0> accepting AMQP connection <0.781.0> (192.168.65.1:33844 -> 192.168.240.3:5672)

wss-rabbitmq-1 | 2023-12-14 16:03:16.808803+00:00 [error] <0.781.0> closing AMQP connection <0.781.0> (192.168.65.1:33844 -> 192.168.240.3:5672): wss-rabbitmq-1 | 2023-12-14 16:03:16.808803+00:00 [error] <0.781.0> {bad_header,<<0,0,0,243,0,17,0,1>>}0> (192.168.65.1:33513 -> 192.168.240.3:5672): Blockquote

I ensured that the streams plugin was enabled using:

docker exec 1d0e959696d8 rabbitmq-plugins enable rabbitmq_stream

and even though the RMQ logs said the plugin was successfully enabled, I still got the same error..

1

There are 1 answers

2
Gabriele Santomaggio On

The client you are using needs the stream plugin enabled.

rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management

Read the documentation about: https://github.com/rabbitmq/rabbitmq-stream-go-client?tab=readme-ov-file#run-server-with-docker

Then you need to expose the correct port 5552 ( stream port) 5672 is for AMQP

please see this link: https://www.rabbitmq.com/stream-core-plugin-comparison.html