Error: read ECONNRESET while using rabbitmq

49 views Asked by At

I am building a microservice application in MERN stack and using RabbitMQ as message broker while running the service I am getting a error after sometime Error: read ECONNRESET at TLSWrap.onStreamRead (node:internal/stream_base_commons:217:20) { errno: -4077, code: 'ECONNRESET', syscall: 'read' } here is my code

import * as amqp from 'amqplib';
import dotenv from 'dotenv'
dotenv.config();


const rabbitURL: any = process.env.RABBIT_MQ
class RabbitMQ {
  private static connection: amqp.Connection | null = null;

  static async getConnection(): Promise<amqp.Connection> {
    try {
      if (!RabbitMQ.connection) {
        RabbitMQ.connection = await amqp.connect(rabbitURL);
      }
      return RabbitMQ.connection as amqp.Connection
    } catch (error) {
      console.error(error);
      throw error;
    }
  }

  static async createChannel(): Promise<amqp.Channel> {
    try {
      const connection = await RabbitMQ.getConnection();
      return connection.createChannel();
    } catch (error) {
      console.error(error);
      throw error;
    }
  }
}

export default RabbitMQ;

this is the consumer file

async gigCreatedConsumer(){
    try{
        console.log("starting rabbit mq channel ");
        const channel = await RabbitMQ.createChannel();
        const exchangeName = 'gig-exchange';
        const queueName = 'gig-service-queue';
        await channel.assertExchange(exchangeName, 'direct', {durable: false});
        const {queue} = await channel.assertQueue(queueName, {durable: false});
        const routingKey = 'gig-created';
        await channel.bindQueue(queue ,exchangeName, routingKey);
        return new Promise((resolve ,reject)=>{
            channel.consume(queue, (message)=>{
                if(message){
                    try {
                        const createdGig: any = JSON.parse(message.content.toString());
                        channel.ack(message);
                        resolve(createdGig)
                    } catch (error) {
                        console.error("error processing gig creation");
                        channel.ack(message);
                        reject(error)
                    }
                }
            })
        })
        await channel.close()
        
    }catch(err){
        console.error("error setting up consumer", err)
    }
},

And I have called the created the function in side the controller and called the function inside index.ts file

index.ts

const app = express();
app.use(cors())


dotenv.config()
app.use(express.urlencoded({ extended: true }))
app.use(express.json());
const PORT = process.env.PORT || 8001;

userController.setup()
userController.gigAccept()
userController.gigReject()
userController.gigDeleteEvent()
app.use(router)
app.use(userRouter)

controller.ts

  async gigDeleteEvent() {
    try {
      const data: any = await userGigConsumers.gigDeleteConsumer();
      const gigId = data;

      const objId = await GigUserModel.find({ refId: gigId })
      const gig = await GigUserModel.findByIdAndDelete(objId[0]._id);
      console.log("gig deleted from usergig database");
    } catch (error) {
      console.log(error);

    }
  },
0

There are 0 answers