I want to consume messages from a RabbitMQ service and, for each message I receive, do something with it (e.g. put the message into a database, or process the message and send a reply via RabbitMQ through another queue).
My current RabbitMQ consumer code is as follows:
const all = require('bluebird').all;
const basename = require('path').basename;
/**
 * Starts a RabbitMQ consumer that logs every message it receives.
 *
 * The routing keys ("severities") are taken from the command line
 * (`process.argv.slice(2)`). The function connects to the broker described by
 * the local `config` object, asserts the durable direct exchange
 * `test-exchange` and the durable queue `test`, binds the queue to the
 * exchange once per severity, and only then starts consuming.
 *
 * When no severity is given, a usage hint is printed and the process exits
 * with status 1.
 *
 * NOTE(review): `amqp` is not defined by any line visible in this file;
 * presumably it is the amqplib module required elsewhere -- confirm.
 *
 * @returns {Promise<void>} Settles once the consumer has been registered.
 *   Connection and setup errors are reported through `console.warn` instead
 *   of rejecting the returned promise.
 */
function receive() {
  const severities = process.argv.slice(2);
  if (severities.length < 1) {
    console.warn('Usage: %s [info] [warning] [error]',
      basename(process.argv[1]));
    process.exit(1);
  }

  const config = {
    protocol: 'amqp',
    hostname: 'localhost',
    port: 5672,
    username: 'rumesh',
    password: 'password',
    locale: 'en_US',
    frameMax: 0,
    heartbeat: 0,
    vhost: '/',
  };

  // Returning the chain lets callers wait for the consumer to be ready.
  return amqp.connect(config).then(function (conn) {
    process.once('SIGINT', function () {
      // close() is asynchronous; report a failed shutdown instead of leaving
      // a floating promise.
      conn.close().catch(console.warn);
    });

    return conn.createChannel().then(function (ch) {
      const exchange = 'test-exchange';
      const exchangeType = 'direct';

      // Per-message handler: every delivery on the queue ends up here.
      function logMessage(msg) {
        // The consume callback is invoked with null when the broker cancels
        // the consumer; there is nothing to log in that case.
        if (msg === null) {
          return;
        }
        console.log(" [x] %s:'%s'",
          msg.fields.routingKey,
          msg.content.toString());
      }

      return ch.assertExchange(exchange, exchangeType, {durable: true})
        .then(function () {
          return ch.assertQueue('test', {durable: true});
        })
        .then(function (qok) {
          const queue = qok.queue;
          // Each bindQueue promise must be returned so that consuming only
          // starts after every binding exists and bind errors reach the
          // final catch. The fourth bindQueue parameter is a table of
          // binding arguments, not options, so no `durable` flag is passed.
          const bindings = severities.map(function (sev) {
            return ch.bindQueue(queue, exchange, sev);
          });
          return Promise.all(bindings).then(function () {
            return queue;
          });
        })
        .then(function (queue) {
          return ch.consume(queue, logMessage, {noAck: true});
        })
        .then(function () {
          console.log(' [*] Waiting for logs. To exit press CTRL+C.');
        });
    });
  }).catch(console.warn);
}
module.exports = receive;
I'd suggest you create a handler function like onNewMessage that gets called each time you get a new message on a queue.
You can encode messages in lots of ways since you can send binary data via AMQP.
JSON is definitely one way to send messages, and it is very convenient to deal with in Node.js.
Here's some sample code that connects to a server, then sends and receives messages: