
A year and a half with Debezium

by midhunsukumaran

I have just realized that we (at Bigbasket) have been using Debezium for a year and a half. We have used it to build a customized change data pipeline from an AWS MySQL RDS instance for a reporting framework, and I think it’s the right time to talk about it.

For those who don’t know what Debezium is: it’s an open-source, distributed tool for change data capture (CDC). It uses Apache Kafka to store the captured data.

How does it work?

If I had to give a generic definition of how it works, I would put it this way: a tool that reads a database and produces a message for every transaction. Also, don’t forget, it brings the luxury of taking a snapshot of the existing data/database.

It has two high-level modes of operation: complete snapshot + ongoing data (this mode is known as “initial”) and just the ongoing data (the “schema_only” mode).

Some important points to remember:

  • Debezium creates one Kafka topic per database table.
  • The Kafka messages can be in JSON/Avro/Protobuf format (I chose JSON, so that is what I will talk about).
  • Where can we find the actual DB data? The JSON has a key called “payload”, and its value is again a hashmap (dictionary) containing two keys: “before” and “after”. As the names indicate, these hold the previous and the latest data respectively (see the sketch after this list).
  • In snapshot mode, it reads the data from the database using a “select *” query. Also, keep in mind that Debezium locks the database to prevent any schema changes in between. We can override these queries at the table level, and I will cover that part later in this blog.
  • For the ongoing data changes, it reads the binlog, so it won’t hammer the database with any long-running queries (LRQs).
  • Debezium maintains a database history topic. This helps it maintain and recover the data flow.
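
To make the message layout concrete, here is a minimal sketch (in Python, since our consumer is written in Python) of how a Debezium JSON change event can be unpacked. The sample values are purely illustrative; the “payload”, “before”, “after” and “op” keys are what Debezium’s default JSON converter produces.

import json

def unpack_change_event(raw_message: bytes):
    """Extract the useful parts of a Debezium JSON change event."""
    event = json.loads(raw_message)
    payload = event["payload"]            # the actual change data lives here
    return {
        "before": payload.get("before"),  # row state before the change (None for inserts/snapshot reads)
        "after": payload.get("after"),    # row state after the change (None for deletes)
        "op": payload.get("op"),          # "c"=create, "u"=update, "d"=delete, "r"=snapshot read
        "table": payload["source"]["table"],
    }

# Illustrative event for a row of database1.countries (topic "db1.database1.countries"):
sample = json.dumps({"payload": {"before": None,
                                 "after": {"id": 1, "name": "India"},
                                 "op": "r",
                                 "source": {"table": "countries"}}}).encode("utf-8")
print(unpack_change_event(sample))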

Debezium supports multiple databases, including MySQL, PostgreSQL, MongoDB, SQL Server, and Oracle.

We have used it for MySQL databases, so I will explain the data flow in terms of MySQL.

How did we start?

Let me first get into the use case we were trying to solve. We had to set up a stitched table: a denormalized table containing data from hundreds of tables, with close to 100 columns to store these data pieces.

In general, we can classify the tables of a database into two main categories:

  • Reference Table: Tables mainly used by other tables as references (foreign keys) and used by queries to filter the data. Usually the data changes are limited, and the number of rows in such tables will be limited as well. For example, consider a table that stores information about countries. Data updates rarely happen on that table, and it is mainly used in queries to filter data geographically.
  • Data Flowing Table: Tables into which data flows frequently. Something like a purchase history table is an example.

The data transformation I was expecting looks something like this:

Data Transformation: Source DB to Stitched DB

In the above example, you can clearly identify that the tables countries and state are reference tables, while the tables customers and purchase are more of data flowing tables.

So I decided to take a snapshot of these reference tables using Debezium in the initial mode, and the connector configuration looks something like this:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8084/connectors/ \
    -d '{"name": "mysql-connector-db1",
 "config":{
   "name":"mysql-connector-db1",
   "database.port":"3306",
   "database.user":"db_user_name",
   "connector.class":"io.debezium.connector.mysql.MySqlConnector",
   "rows.fetch.size":"2",
   "table.whitelist":"database1.countries,database1.state",
   "database.hostname":"db_host1",
   "database.password":"password",
   "database.server.id":"1000",
   "database.whitelist":"database1",
   "database.server.name":"db1",
   "decimal.handling.mode":"string",
   "include.schema.changes":"true",
   "database.history.kafka.topic":"dbhistory.db1",
   "database.history.kafka.bootstrap.servers":"localhost:9092",
   "database.history.kafka.recovery.attempts":"20",
   "database.history.kafka.recovery.poll.interval.ms":"10000000",
   "snapshot.mode": "initial"
}}'

I need only the ongoing data for the tables customers and purchase, so I created a second connector in schema_only mode:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8084/connectors/ \
    -d '{"name": "mysql-connector-db2",
 "config":{
   "name":"mysql-connector-db2",
   "database.port":"3306",
   "database.user":"db_user_name",
   "connector.class":"io.debezium.connector.mysql.MySqlConnector",
   "rows.fetch.size":"2",
   "table.whitelist":"database1.customers,database1.purchase",
   "database.hostname":"db_host1",
   "database.password":"password",
   "database.server.id":"1001",
   "database.whitelist":"database1",
   "database.server.name":"db2",
   "decimal.handling.mode":"string",
   "include.schema.changes":"true",
   "database.history.kafka.topic":"dbhistory.db2",
   "database.history.kafka.bootstrap.servers":"localhost:9092",
   "database.history.kafka.recovery.attempts":"20",
   "database.history.kafka.recovery.poll.interval.ms":"10000000",
   "snapshot.mode": "schema_only"
}}'

We can actually combine these two connectors into one using snapshot.select.statement.overrides:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8084/connectors/ \
    -d '{"name": "mysql-connector-db3",
 "config":{
   "name":"mysql-connector-db3",
   "database.port":"3306",
   "database.user":"db_user_name",
   "connector.class":"io.debezium.connector.mysql.MySqlConnector",
   "rows.fetch.size":"2",
   "table.whitelist":"database1.countries,database1.state,database1.customers,database1.purchase",
   "database.hostname":"db_host1",
   "database.password":"password",
   "database.server.id":"1002",
   "database.whitelist":"database1",
   "database.server.name":"db3",
   "decimal.handling.mode":"string",
   "include.schema.changes":"true",
   "snapshot.select.statement.overrides": "database1.customers,database1.purchase",
   "snapshot.select.statement.overrides.database1.customers": "select * from customers where id < 0",
   "snapshot.select.statement.overrides.database1.purchase": "select * from purchase where id < 0",
   "database.history.kafka.topic":"dbhistory.db3",
   "database.history.kafka.bootstrap.servers":"localhost:9092",
   "database.history.kafka.recovery.attempts":"20",
   "database.history.kafka.recovery.poll.interval.ms":"10000000",
   "snapshot.mode": "initial"
}}'
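
The where id < 0 overrides return no rows, so the snapshot effectively skips customers and purchase while the reference tables are still snapshotted in full. Once a connector is registered, you can verify that it is running through the Kafka Connect REST API; here is a minimal sketch in Python, assuming the requests library and the same Connect host/port as above:

import requests

# Ask Kafka Connect for the state of the connector and its tasks.
resp = requests.get("http://localhost:8084/connectors/mysql-connector-db3/status")
resp.raise_for_status()
status = resp.json()
print(status["connector"]["state"])       # expected: RUNNING
for task in status["tasks"]:
    print(task["id"], task["state"])      # each task should be RUNNING as well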

Architecture

High Level Architecture

As Debezium runs on Kafka Connect, and Kafka Connect allows you to create multiple connectors, we can have multiple Debezium connectors running in parallel. As I mentioned earlier, Debezium produces messages for every snapshot row and transaction to the Kafka broker(s).

I have written a customized Python consumer script (I can open source this one; let me know in the comments) to consume the data from the Kafka brokers. The consumer takes care of parsing the Kafka messages and does the data transformation shown in the earlier diagram. At the end, the consumer dumps the data into the Stitched DB. A simplified sketch of the idea is given below.
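
This is not our production script, just a minimal sketch assuming the kafka-python client, the JSON message format described earlier, and illustrative topic names following Debezium’s <server>.<database>.<table> convention:

import json
from kafka import KafkaConsumer

# Subscribe to the per-table topics created by the connectors above.
consumer = KafkaConsumer(
    "db1.database1.countries",
    "db1.database1.state",
    "db2.database1.customers",
    "db2.database1.purchase",
    bootstrap_servers=["localhost:9092"],
    group_id="stitch-db-loader",
    auto_offset_reset="earliest",
    # Tombstone records (emitted after deletes) have a null value, so guard against that.
    value_deserializer=lambda m: json.loads(m.decode("utf-8")) if m else None,
)

for message in consumer:
    if message.value is None:
        continue
    payload = message.value["payload"]
    table = payload["source"]["table"]
    row = payload.get("after") or payload.get("before")  # deletes only carry "before"
    # The real script transforms `row` and upserts it into the Stitched DB;
    # here we only print what was captured.
    print(payload.get("op"), table, row)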

What if it crashes?

Like we all know, things may not always work as expected. So before using any tool for a critical project, we need to check the possibility of recovery in case of a crash or a failure. So how does Debezium tackle a failure?

It has “schema_only_recovery” mode!!

Irrespective of the mode in which we are running the connector(s), we can recover them using the schema_only_recovery mode.

But you need to follow certain steps to start the recovery process:

  • Delete the crashed connector(s) using the command below:
curl -i -X DELETE http://localhost:8084/connectors/mysql-connector-db3
  • Delete the database history topic(s) (see the sketch after the connector example below).
  • Start the connector(s) in “schema_only_recovery” mode; a sample connector configuration is given below:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8084/connectors/ \
    -d '{"name": "mysql-connector-db3",
 "config":{
   "name":"mysql-connector-db3",
   "database.port":"3306",
   "database.user":"db_user_name",
   "connector.class":"io.debezium.connector.mysql.MySqlConnector",
   "rows.fetch.size":"2",
   "table.whitelist":"database1.countries,database1.state,database1.customers,database1.purchase",
   "database.hostname":"db_host1",
   "database.password":"password",
   "database.server.id":"1002",
   "database.whitelist":"database1",
   "database.server.name":"db3",
   "decimal.handling.mode":"string",
   "include.schema.changes":"true",
   "snapshot.select.statement.overrides": "database1.customers,database1.purchase",
   "snapshot.select.statement.overrides.database1.customers": "select * from customers where id < 0",
   "snapshot.select.statement.overrides.database1.purchase": "select * from purchase where id < 0",
   "database.history.kafka.topic":"dbhistory.db3",
   "database.history.kafka.bootstrap.servers":"localhost:9092",
   "database.history.kafka.recovery.attempts":"20",
   "database.history.kafka.recovery.poll.interval.ms":"10000000",
   "snapshot.mode": "schema_only_recovery"
}}'
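
For the second step, deleting the database history topic, here is a minimal sketch using kafka-python’s admin client (the topic name matches the database.history.kafka.topic value used above; the kafka-topics command-line tool works just as well):

from kafka.admin import KafkaAdminClient

# Remove the history topic of the crashed connector so the recovery run
# can rebuild it from scratch.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.delete_topics(["dbhistory.db3"])
admin.close()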

Deserialization Failure

I have faced some deserialization issues with Debezium. The connectors were configured to read data from one AWS RDS instance, and it had some non-standard statements in the binlog, which triggered errors while reading the logs. I solved it by setting “event.processing.failure.handling.mode” to “warn”: in this case, the connector logs the error and moves on (the default value is “fail”, in which case the connector crashes when it encounters such an error).
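
An existing connector can be updated in place through the Kafka Connect REST API instead of being recreated. A minimal sketch, assuming the requests library and the connector from the earlier examples:

import requests

# Fetch the current configuration of the connector...
base = "http://localhost:8084/connectors/mysql-connector-db3"
config = requests.get(f"{base}/config").json()

# ...switch the failure handling mode to "warn" so problematic binlog events
# are logged and skipped instead of crashing the connector...
config["event.processing.failure.handling.mode"] = "warn"

# ...and PUT the updated configuration back; Kafka Connect restarts the
# connector with the new settings.
resp = requests.put(f"{base}/config", json=config)
resp.raise_for_status()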

That’s most of it! It’s one of the coolest tools to play with, and it can be handy in many CDC/data pipeline scenarios and use cases. So do explore it. You can find the official documentation here.
