I am building a microservice application in MERN stack and using RabbitMQ as message broker while running the service I am getting a error after sometime Error: read ECONNRESET at TLSWrap.onStreamRead (node:internal/stream_base_commons:217:20) { errno: -4077, code: 'ECONNRESET', syscall: 'read' } here is my code
import * as amqp from 'amqplib';
import dotenv from 'dotenv'
dotenv.config();
const rabbitURL: any = process.env.RABBIT_MQ
class RabbitMQ {
private static connection: amqp.Connection | null = null;
static async getConnection(): Promise<amqp.Connection> {
try {
if (!RabbitMQ.connection) {
RabbitMQ.connection = await amqp.connect(rabbitURL);
}
return RabbitMQ.connection as amqp.Connection
} catch (error) {
console.error(error);
throw error;
}
}
static async createChannel(): Promise<amqp.Channel> {
try {
const connection = await RabbitMQ.getConnection();
return connection.createChannel();
} catch (error) {
console.error(error);
throw error;
}
}
}
export default RabbitMQ;
this is the consumer file
async gigCreatedConsumer(){
try{
console.log("starting rabbit mq channel ");
const channel = await RabbitMQ.createChannel();
const exchangeName = 'gig-exchange';
const queueName = 'gig-service-queue';
await channel.assertExchange(exchangeName, 'direct', {durable: false});
const {queue} = await channel.assertQueue(queueName, {durable: false});
const routingKey = 'gig-created';
await channel.bindQueue(queue ,exchangeName, routingKey);
return new Promise((resolve ,reject)=>{
channel.consume(queue, (message)=>{
if(message){
try {
const createdGig: any = JSON.parse(message.content.toString());
channel.ack(message);
resolve(createdGig)
} catch (error) {
console.error("error processing gig creation");
channel.ack(message);
reject(error)
}
}
})
})
await channel.close()
}catch(err){
console.error("error setting up consumer", err)
}
},
And I have called the created the function in side the controller and called the function inside index.ts file
index.ts
const app = express();
app.use(cors())
dotenv.config()
app.use(express.urlencoded({ extended: true }))
app.use(express.json());
const PORT = process.env.PORT || 8001;
userController.setup()
userController.gigAccept()
userController.gigReject()
userController.gigDeleteEvent()
app.use(router)
app.use(userRouter)
controller.ts
async gigDeleteEvent() {
try {
const data: any = await userGigConsumers.gigDeleteConsumer();
const gigId = data;
const objId = await GigUserModel.find({ refId: gigId })
const gig = await GigUserModel.findByIdAndDelete(objId[0]._id);
console.log("gig deleted from usergig database");
} catch (error) {
console.log(error);
}
},