Home Technology Experiences with kafka consumer in Node.js

Experiences with kafka consumer in Node.js

by sethurama


Every enterprise is powered by data. We take information in, analyze it, manipulate it, and create more as output. Every application creates data, whether it is log messages, metrics, user activity, outgoing messages, or something else. Every byte of data has a story to tell, something of importance that will inform the next thing to be done. In order to know what that is, we need to get the data from where it is created to where it can be analyzed. The faster we can do this, the more agile and responsive our organizations can be. The less effort we spend on moving data around, the more we can focus on the core business at hand. This is why the pipeline is a critical component in the data-driven enterprise. How we move the data becomes nearly as important as the data itself. Here Enters Kafka the Publish/Subscribe Messaging Queue. This article covers about our experience with asynchronous consumer clients (NodeJS, Vert.x) of Kafka.

Need for speed 

In synchronous languages (Ex: Python) where the processing of messages from kafka is done one by one, most of the time is spent in handshakes between the kafka client and kafka broker thus eventually increasing the latency of message processing. So in applications where processing needs to be quick, this becomes an area of concern. However in asynchronous world this is not a problem as allowing bulk fetch from kafka & parallel consumption/processing of messages helps reduce the latency almost by a factor of number of messages in bulk fetch.

Challenges faced with async consumption

 Moving to asynchronous consumption was’t an easy ride, especially because of the new Jargons at the kafka client side whose implementation details are explicitly not known. The challenges which we faced are,

  • Memory Leaks
  • Loss of messages
  • Continuous Rebalancing

Boilerplate Code 

The below is the most used boilerplate code for one of our consumer, 

on(“message”) {
    //Pauses fetch/poll loop
    //Calls consumer handler
    consumerHandlerFunction().then(function() {  
        //Commits processed offset
        //Resumes fetch/poll loop

Here on(“message”) is an event handler which gets called for every message that is fetched from kafka. The idea is simple. Whenever we process a message, we pause() the poll loop, process the message in the consumerHandlerFunction(). Once the processing is done, commit() the offset and resume() the poll loop. Everything seems good from the above code. The catch here is, this works well in the synchronous world where only one message is fetched at once, but in an asynchronous world where multiple messages are fetched & processed at once this boilerplate code flounders.

Cause for the break down

Kafka client library typically fetches a batch of messages based on the configuration max.poll.records or fetch.max.bytes. Lets say the library fetches 50 messages in a batch, it emits 50 messages. Now there are 50 on(“message”) handlers running asynchronously. In the above code, consumer.resume() is used to invoke the poll loop in kafka, thereby fetching a new batch of 50 messages.So every such resume call would lead to another 50 messages getting fetched at once during the worst case scenario. So 50 resume()s fetch 50*50 = 2500 new messages which lead to 2500 on(“message”) handlers and it repeats. This causes a geometrical progressive rise in the number of messages being processed at once. If there are few lakhs of messages in the queue, consumer would read all of those messages in a short span, thereby leading to a Memory Leak, Infact to be specific a memory increase where the amount of allocation was way higher than the amount of deallocation.

Application Queues to the Rescue

To avoid this scenario, it’s clear that the poll loop (consumer.resume()) should only be called when the whole batch of fetched messages (Ex. 50 messages) have been processed. The challenge here is to group all the asynchronous on(“message”) calls at the application layer, process the whole batch and then resume(). To achieve this we push all the received messages in on(“message”) to an application level queue. This queue would asynchronously execute the consumerHandlerfunction(), for each message and call a callback function when all messages in the queue are processed. The callback function is where we know that the current batch of messages (Ex: 50 messages) have been processed completely. The registered callback function would then resume() the poll loop to fetch the next batch of messages from kafka. This way we ensure that we are reading one batch after another while still retaining the asynchronous processing capability.

This is how new code will look,

//concurrentQueue is a application queue with a concurrent processing capacity 
//equal to the set concurrency 10

let concurrentQueue = cq().limit({concurrency: 10}).process(function (messageData, cb) {
    consumerHandlerFunction(messageData).then( function() { 
      //commit for every message after processing
    .catch(function(err) {
      //commit even in failure scenarios
// on drain event handler, invoked when all pushed messages are processed 
concurrentQueue.drained(function () => {
    // resume once for every batch of messages, once processing is complete
on("message") {
  //push the message to the concurrentQueue

In the above code, concurrentQueue.drained() is the point when the current batch of messages has completed the processing. This is the point where we need to resume() to get the next batch.

A Note on Thread safety

“You can’t have multiple consumers that belong to the same group in one thread and you can’t have multiple threads safely use the same consumer. One consumer per thread is the rule. To run multiple consumers in the same group in one application, you will need to run each in its own thread.” from Confluent-kafka blog

Recommendations for stable consumers

  • If the order of messages are not important, consume & process many at once. Can be configured appropriately by max.poll.records or fetch.max.bytes.
  • One poll per Consumer Thread is the rule, increase scale by increasing the number of messages fetched & processed per poll
  • Optimize for high throughput by increasing the number of partitions and consumers, allowing kafka to grow horizontally.
  • Kafka guarantees at least once message delivery semantics so write your consumer in an idempotent way – meaning even if the same message is sent twice, it has no negative impact on the correctness.
  • If idempotence is not intrinsic in your consumer side code, manage states to preserve idempotence.
  • Tune retries and timeout to a susceptible value as this might slow down the consumers. Failing fast is the key.
  • For retryable errors reque the message or use a delayed queue.
  • If possible try to have a separate thread for heartbeat, to avoid contention between heartbeat events and message processing.


We’ll leave it at that for now, but there is much more to learn about Kafka Consumer patterns, tricks and gotchas. In the forthcoming chapters we will be covering loss of messages & Rebalancing issues which we faced. Stay tuned for more articles on the topic.

You may also like

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d bloggers like this: