
Autoscaling Kafka Consumers on Kubernetes

by riflerrick


Time and again, we come across autoscaling based on web requests or response time. Needless to say, autoscaling based on CPU, memory, and other system (host) metrics has become commonplace. In this article we take a deep dive into consumer-group-lag-based autoscaling of Kafka consumers running in Kubernetes.


Usually the main driving factor behind any autoscaling effort is to run systems at their best performance while simultaneously keeping tabs on costs. Autoscaling, especially in the context of app servers, has become even more commonplace since Kubernetes came onto the scene with its native horizontal pod autoscaler.

For those unfamiliar, the Kubernetes native horizontal pod autoscaler (HPA) enables autoscaling of the pods in a given deployment based, out of the box, on just CPU and memory metrics.

Although the native HPA (with CPU and memory metrics) would suffice in many cases, for Kafka consumers we realized we needed more to scale them effectively.
In the context of Kafka, message delivery SLAs are critical. For certain events, a delay in consumption can result in an array of issues, from degraded customer experience to delayed order processing, to name a few. It is also worth noting that the CPU and memory metrics of a Kafka consumer running as a Kubernetes pod may not be correlated with the consumer-group lag on the corresponding topic. It is therefore only fitting that we attempt to scale the consumers based on the lag of the consumer group.
This would address cost optimization without affecting our message delivery SLAs.


Our goal was essentially to arrive at a desired number of consumers given the consumer-group lag. The algorithm used by the Kubernetes native HPA is rather well documented. Anyone with decent Kubernetes experience will tell you that the way to solve such a problem is to run a custom-metrics APIServer and expose a metric (through an API) usable by the native HPA. Folks across the Kubernetes community have very kindly open-sourced custom-metrics APIServers that expose selected metrics from selected data sources. However, our situation was different: we could not find any implementation that suited our exact needs. Furthermore, building an APIServer from scratch for the Kafka consumer-group lag metric is not only a non-trivial problem but also requires a fair bit of engineering effort to conform to Kubernetes-specific SLOs and SLAs. We therefore decided to go with a custom controller: effectively a pod that takes on the responsibility of figuring out the desired number of consumers for a given consumer group and scaling the corresponding deployment accordingly.

While going through all possible use cases and figuring out reliability guarantees, we quickly realized that in order to keep our message delivery SLAs intact, we could not rely on consumer-group lag alone. It is entirely possible for the number of consumers in a consumer group to be just enough to ensure zero lag. Although, from the outside, an extended period of zero lag might make a downscale look safe, in reality a downscale would result in a subsequent lag build-up, which may once again affect message delivery SLAs.
We therefore introduced the message production rate into the picture. Kafka exposes a wide range of metrics through JMX, collectable with tools such as jmxterm. Message production rate is one such metric, and we use it along with consumer-group lag to compute the desired number of consumers at any point in time.

Pilot and Iteration-1

The goal of our custom controller was to autoscale a given deployment based on its consumer-group lag and message production rate. There would essentially be a single autoscale-controller pod catering to a given deployment. Since we know the average latency of every consumer, it is possible to compute the number of messages each consumer in a given consumer group can consume in a period of time, and hence the number of consumers we need to run for a given consumer-group lag and message production rate. We wrote this first controller in Python.
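As a back-of-the-envelope sketch, such a computation might look like the following. The function name, its parameters, and the simple ceiling formula are illustrative, not our production code:

```go
package main

import (
	"fmt"
	"math"
)

// DesiredConsumers estimates replicas for a consumer group.
// lag: current consumer-group lag, in messages
// productionRate: messages produced per second on the topic
// perConsumerRate: messages one consumer can process per second
//   (derived from the average per-message consumer latency)
// drainSeconds: time budget within which the lag should be drained
func DesiredConsumers(lag, productionRate, perConsumerRate, drainSeconds float64) int {
	// messages to handle within the budget: the backlog plus new production
	total := lag + productionRate*drainSeconds
	// capacity of one consumer over the same budget
	perConsumer := perConsumerRate * drainSeconds
	desired := int(math.Ceil(total / perConsumer))
	if desired < 1 {
		desired = 1 // never scale a consumer group to zero
	}
	return desired
}

func main() {
	// e.g. 6000 messages of lag, 100 msg/s produced, each consumer
	// handling 50 msg/s, and a 60s budget to drain the lag
	fmt.Println(DesiredConsumers(6000, 100, 50, 60)) // prints 4
}
```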

The pilot proved to be an effective estimate of how much we could save on running hosts. We were running 52% fewer consumers after introducing the lag-based autoscaler while still maintaining our message delivery SLAs.

Issues with Iteration-1

Although we were able to save costs on the consumers, we were running one HPA controller pod per consumer group, each taking more than 250 millicores of CPU.

250+ millicores per lag-based HPA controller catering to individual consumer-groups

This meant that for N consumer groups we would have to run N HPA controller pods, each consuming that much in resources. As consumer groups increased, the resource consumption of the HPA pods themselves was becoming non-trivial, and we had to address this quickly to really make our cost savings worthwhile.

Optimizations and Iteration-2

We decided to revisit the design of the controller to cater to all consumer-groups at once.


The lag-based HPA we built essentially computes the desired number of consumers for a consumer group every 30 seconds, an interval also called the HPA sync period in HPA literature (30 seconds being our default value, which is twice the default of the Kubernetes native HPA). At its core it is an infinite loop that executes the following operations sequentially:

  • pull the topic-lag for the given consumer group
  • pull the message production rate
  • compute the desired consumers
  • scale the consumers
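The loop above can be sketched as follows; the fetchers and the scaling call are stubs standing in for real network calls, and the values they return are illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// stand-ins for the real metric fetchers and the kubernetes scale call
func topicLag() float64       { return 6000 } // consumer-group lag, in messages
func productionRate() float64 { return 100 }  // messages produced per second
func desiredConsumers(lag, rate float64) int {
	return 4 // stand-in for the real computation from lag and rate
}
func scaleTo(replicas int) { fmt.Println("scaling deployment to", replicas) }

// syncOnce runs one iteration of the HPA sync loop
func syncOnce() int {
	lag := topicLag()                       // 1. pull the topic lag
	rate := productionRate()                // 2. pull the message production rate
	replicas := desiredConsumers(lag, rate) // 3. compute the desired consumers
	scaleTo(replicas)                       // 4. scale the consumers
	return replicas
}

func main() {
	ticker := time.NewTicker(time.Second) // the real controller ticks every sync period (30s)
	defer ticker.Stop()
	for i := 0; i < 2; i++ { // bounded here for demonstration; really an infinite loop
		<-ticker.C
		syncOnce()
	}
}
```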

How effectively the HPA responds and scales therefore also depends on the latency of the data sources from which it pulls the metrics.
In order to cater to all consumer groups, the new version of our controller would have to execute all the above steps, for every consumer group, inside the HPA sync period (in our case, 30 seconds). Furthermore, we wanted the luxury of being able to reduce the sync period. This became our problem statement.

The problem statement bodes well with the idea of an event loop. Given that a lot of time is spent on network I/O, we thought it might be possible to exploit Python asyncio to solve this problem. At the outset, of course, our intention was simply to release another version of the same controller with asyncio integrated. However, Python asyncio requires the corresponding libraries and packages in use to be asyncio-compatible as well. As we started transforming and re-designing the controller, we quickly realized we were ending up re-writing everything.

Although event loops are indeed promising, considering that we were writing an autoscaler for Kubernetes, client support had to be taken into account while re-writing the controller. The golang client for Kubernetes is the gold standard compared to Kubernetes clients in other languages. We therefore turned to golang and explored whether goroutines could be effectively harnessed to solve our problem.
When it comes to concurrency, goroutines offer one of the best solutions. Our goal was to perform all network I/O operations concurrently.

The Design: Scatter/Gather
Considering n different network I/O operations, we spin up n goroutines (the Scatter), with a buffered channel of size n. The channel becomes our means of collecting (the Gather) the results, which in this case are simply the metrics: consumer-group lag and message production throughput. A simple code block helps illustrate the idea.

package main

import (
	"context"
	"fmt"
	"time"
)

type Metric struct {
	metricValue int // the actual metric value
}

func (m *Metric) GetMetric(ctx context.Context, err chan<- error) {
	// get the corresponding metric;
	// some ad hoc computation might be done here too
	m.metricValue = 1 // this is the actual metric; for demonstration, just fetching 1
	if ctx.Err() == nil { // this is to avoid future channel deadlocks
		err <- nil
	}
}

func Scatter(timeout int, metrics []*Metric) (ctx context.Context, cancel context.CancelFunc, e <-chan error) {
	err := make(chan error, len(metrics))
	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*time.Duration(timeout))
	for _, m := range metrics {
		go m.GetMetric(ctx, err) // GetMetric stores the actual metric value in metricValue
	}
	return ctx, cancel, err
}

func Gather(ctx context.Context, cancel context.CancelFunc, total int, err <-chan error) {
	defer cancel()
	completed := 0
	for {
		select {
		case e := <-err:
			if e != nil {
				fmt.Printf("error: %v\n", e)
			}
			completed++
			if completed == total { // every goroutine has reported back
				return
			}
		case <-ctx.Done(): // timeout: stop waiting for stragglers
			return
		}
	}
}

func main() {
	var metrics []*Metric
	for i := 0; i < 10; i++ {
		metrics = append(metrics, new(Metric))
	}
	ctx, cancel, err := Scatter(10, metrics)
	Gather(ctx, cancel, len(metrics), err)
}

Reliability Considerations in Iteration-2

In the above code block, note that we collect metrics under a timeout. The GetMetric method can potentially be part of a metric API that enforces this timeout. Needless to say, timeouts are critical to the reliability of any application.

Kubernetes Shared-Informers
The Kubernetes golang client has multiple means of fetching deployment metadata from the api-server. However, as we explored and experimented with the client, we quickly realized that even concurrent calls to the api-server for fetching the deployment metadata of hundreds of deployments were not going to scale. Sabotaging the api-server by bombarding it with requests from our application was not a pretty solution either. Kubernetes offers an event-driven approach to fetching changes to the deployment metadata of all deployments in the cluster (or within a namespace) using shared-informers. Shared-informers are incredibly common among folks building CRDs (Custom Resource Definitions) and custom controllers with the Kubernetes API. The kubernetes sample-controller (https://github.com/kubernetes/sample-controller/blob/master/controller.go) provides a boilerplate for using shared-informers.

Signal Handling
It goes without saying that signal handling is a crucial requirement for any application. The 12-factor app clearly lists disposability as a fundamental requirement. Our lag-based HPA is no exception.

Liveness Probes
Liveness probes are critical to the reliability of any system on Kubernetes. In our lag-based HPA it might not be obvious where we would want to enforce a liveness probe. On failure, the liveness probe restarts the corresponding container. We would want to restart the lag-based HPA container if it is unable to operate for more than a threshold period of time, similar to a heartbeat. Although we enforce timeouts at all levels, as Murphy's law goes, 'Anything that can go wrong will go wrong.' We included a small HTTP server inside the HPA to expose our liveness probe endpoint. The handler for this endpoint simply checks whether the time passed since the last iteration of the HPA sync loop is beyond a threshold.

With this second iteration, we were able to reduce compute by another 25%. The new controller can autoscale hundreds of deployments with just 0.5 cores of compute, and the computation of the desired number of consumers for all deployments takes barely a second.

Average response time of a second for one lag-based HPA pod catering to all consumer-groups
