How to convert librdkafka payload to json to get a parameter value?

819 views Asked by At

I'm playing around with the consumer.c example file for librdkafka and I'm trying to figure out how to convert the rkm payload (that is printed out at line 244) to json so that I can grab a parameter's value from the json.

Right now I'm using jansson but am having some problems with it which I can expand on if needed.

Is there a feature for this in librdkafka or the standard C libraries that I'm unaware of?

1

There are 1 answers

4
Edenhill On

Simply pass the message payload (binary) and length to Jansson, like so:

/* For each consumed message .. */
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);

if (!rkmessage) {
   /* No message available */
   continue;

} else if (rkmessage->err) {
   /* Handle consumer event/error (typically not fatal) */
   handle_consumer_error(rkmessage->err, "%s", rd_kafka_message_errstr(rkmessage));

} else if (rkmessage->len > 0) {
   /* Parse JSON value */
   json_error_t err;
   json_t *json = json_loadb(rkm->payload, rkm->len, JSON_ALLOW_NUL, &err);
   if (!json) {
      handle_consumer_error(RD_KAFKA_RESP_ERR__BAD_MSG,
                            "Failed to parse JSON for message at %s [%"PRId32"] offset %"PRIu64: line %d: %s\n",
                            rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition),
                            rkmessage->offset, err.line, err.text);
   } else {
      /* Process the message */
      process_message(json);

      json_decref(json);
   }
}

rd_kafka_message_destroy(rkmessage);

See the Jansson docs for more info: https://jansson.readthedocs.io/en/latest/tutorial.html