Creating a data warehouse with Apache Doris, Kafka, and Debezium

Over the last few weeks I’ve been looking at ways to create a data warehouse. The aim is to bring together disparate sources of data so that they can be analysed to create reports and dashboards. There are many open-source and closed-source options available, like Snowflake, ClickHouse, and Apache Pinot, but I ended up settling on Apache Doris. In this post I will go over how you can replicate data into Doris using Kafka, with Debezium streaming a Postgres table into the warehouse.

Setting things up

My setup consists of Docker containers for Kafka, Debezium, Postgres, and Doris. In a nutshell, I’ll create a table in Postgres, have Debezium stream it to Kafka, and then use a routine load in Doris to consume the topic and ingest it into a table. Once the JSON generated by Debezium has been written to that table, I’ll create a view on top of it to make the data look relational again.

Before we begin

Doris ships with “external table” ingestion for Postgres. However, the caveat is that you’d have to import the data in small batches manually. Additionally, an external table is configured for a single database table. If you have a setup where every tenant gets their own copy of a table, the data may be split among multiple databases (for MySQL) or schemas (for Postgres) on the same cluster, and setting up external tables for each of them becomes difficult; new databases or schemas can be added dynamically, and each would require an external table to be set up for it. In contrast, you can have Debezium capture tables based on a regex pattern: any table in any database or schema that matches the pattern is a valid candidate for Debezium to stream from.

Another reason for using Debezium is to avoid reingesting the data when new columns are added to the table. Since we will store the raw JSON payload that Debezium sends, we can simply update the view and extract the new column from the JSON.

The final reason for using Debezium is to support databases for which Doris has no external table support, such as MongoDB. Debezium allows us to stream these databases into Kafka and from there into Doris.

Getting started

We’ll begin by bringing up the containers. For the purpose of this demo, we will create a table called people and add some rows to it. Here is what a row from this table looks like:

| customer_id | id     | name         | created_at                 | updated_at | deleted_at |
|-------------|--------|--------------|----------------------------|------------|------------|
| 1           | 108923 | Cynthia Ford | 2023-11-02 15:50:42.158417 | NULL       | NULL       |
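
For context, here is a minimal sketch of what the source table in Postgres could look like; the column types and the primary key are assumptions based on the sample row above, not the exact DDL I used.

-- A hypothetical Postgres definition for the source table; types are
-- inferred from the sample row above.
CREATE TABLE people (
    customer_id BIGINT NOT NULL,
    id          BIGINT NOT NULL,
    name        TEXT,
    created_at  TIMESTAMP DEFAULT now(),
    updated_at  TIMESTAMP,
    deleted_at  TIMESTAMP,
    PRIMARY KEY (customer_id, id)
);

-- A sample row, matching the one shown above.
INSERT INTO people (customer_id, id, name)
VALUES (1, 108923, 'Cynthia Ford');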

After the table is populated, we’ll create a Debezium source connector to stream the table to Kafka. The configuration for this connector looks like this:

{
  "name": "postgres_source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.user": "postgres",
    "database.password": "my-secret-pw",
    "database.dbname": "postgres",
    "database.server.name": "postgres",
    "table.include.list": ".*\\.people",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "time.precision.mode": "connect",
    "tombstones.on.delete": "false",
    "snapshot.mode": "initial",
    "heartbeat.interval.ms": "1000",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "event.processing.failure.handling.mode": "skip",
    "producer.override.compression.type": "snappy"
  }
}

Notice how we’re specifying a regex pattern for table.include.list in the configuration. This would match the people table in any schema within the postgres database. We are also using the route transformer to merge the tables from the various schemas into a single Kafka topic: Debezium names its Postgres topics server.schema.table, so an event published to, say, postgres.public.people or postgres.tenant_a.people is routed to a topic called simply people.

Now that the data is in Kafka, we can begin ingesting it into Doris. We will do this as a two-step process. First, we will ingest it into a landing area where we store the raw JSON payload along with the primary key. Second, we will create a view on top of this data and extract the keys that we need.

We’ll create a database called ingest where we will store the raw data.

CREATE DATABASE ingest;

Within this database, we will create a table called people where the data from Kafka will be stored.

CREATE TABLE IF NOT EXISTS ingest.people (
    id BIGINT,
    customer_id BIGINT,
    source JSONB,
    op VARCHAR,
    ts_ms BIGINT
)
UNIQUE KEY (id, customer_id)
DISTRIBUTED BY HASH (customer_id) BUCKETS 1
PROPERTIES (
    "replication_num" = "1",
    "enable_unique_key_merge_on_write" = "true"
);

The source column will store the JSON generated by Debezium; we will extract column values from it when we create the view. We are specifying the combination of id and customer_id as the UNIQUE KEY and enabling merge-on-write in the table properties. This allows us to keep only the latest version of each row as we read updates from Debezium. Since all of this is running on my local machine, I’ve set the replication to 1; without this setting Doris would not allow creating the table, as there aren’t at least 3 Doris backend (BE) instances running.

To begin loading data from Kafka, we will create a ROUTINE LOAD. This will read data from Kafka in batches and write them to the people table.

CREATE ROUTINE LOAD ingest.load_people ON people
PROPERTIES
(
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "format" = "json",
    "jsonpaths" = "[
        \"$.payload.after.id\",
        \"$.payload.after.customer_id\",
        \"$.payload.after\",
        \"$.payload.op\",
        \"$.payload.ts_ms\"
    ]"
)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.0.108:9092",
    "kafka_topic" = "people",
    "property.group.id" = "people",
    "property.client.id" = "1",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

In the definition of the routine load we specify the table to write into using the ON clause. In the properties we specify that we are ingesting JSON objects from Kafka. We also specify jsonpaths, which allows us to select keys from within the JSON payload; here we extract the keys that become columns in the raw people table. Since jsonpaths is a stringified JSON list, we need to escape the quotes. Finally, we specify the Kafka broker and the topic to read from.

You can see the running task using:

SHOW ALL ROUTINE LOAD FOR ingest.load_people;
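
While experimenting it can also be handy to pause and resume the job, and to peek at the landing table to confirm rows are arriving. A couple of commands along these lines (the sanity-check query is just an illustration):

-- Pause or resume the routine load job by name.
PAUSE ROUTINE LOAD FOR ingest.load_people;
RESUME ROUTINE LOAD FOR ingest.load_people;

-- Quick sanity check that rows are landing in the raw table.
SELECT id, customer_id, op, ts_ms FROM ingest.people LIMIT 5;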

After waiting for some time to let the task ingest the rows, we can create the view. We’ll start by creating the database.

CREATE DATABASE warehouse;

Finally, we will create the view:

CREATE VIEW warehouse.people AS
SELECT
    id,
    customer_id,
    REPLACE(json_extract(source, '$.name'), "\"", '') AS name,
    FROM_UNIXTIME(CAST(REPLACE(json_extract(source, '$.created_at'), "\"", '') AS BIGINT) / 1000) AS created_at,
    FROM_UNIXTIME(CAST(REPLACE(json_extract(source, '$.updated_at'), "\"", '') AS BIGINT) / 1000) AS updated_at,
    FROM_UNIXTIME(CAST(REPLACE(json_extract(source, '$.deleted_at'), "\"", '') AS BIGINT) / 1000) AS deleted_at
FROM ingest.people;

In the SQL above, we are extracting the fields from the JSON that we stored. We process them to strip the quotation marks and to convert the epoch milliseconds sent by Debezium into a DATETIME value. Since we have enabled merge-on-write, subsequent updates to the source table in Postgres are reflected automatically in the view.
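
As a quick illustration of that merge-on-write behaviour, consider updating the sample row on the Postgres side; once Debezium and the routine load have picked up the change, querying the view for that key should return a single, updated row rather than two versions. A hedged sketch (the new name is made up):

-- On the Postgres side: update the sample row (hypothetical values).
UPDATE people
SET name = 'Cynthia Ford-Smith', updated_at = now()
WHERE customer_id = 1 AND id = 108923;

-- In Doris, after the change has been ingested: the key resolves to the
-- latest version of the row thanks to the unique key and merge-on-write.
SELECT * FROM warehouse.people WHERE customer_id = 1 AND id = 108923;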

We can now see the data stored in the view.

SELECT * FROM warehouse.people LIMIT 1;

and this gives us:

| customer_id | id    | name           | created_at          | updated_at | deleted_at |
|-------------|-------|----------------|---------------------|------------|------------|
| 1           | 28910 | Douglas Becker | 2023-11-03 11:42:09 | NULL       | NULL       |
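
Earlier I mentioned that adding a column to the source table doesn’t require reingesting anything, because the raw JSON is already sitting in Doris. As a hedged sketch, if the Postgres table later gained an email column (a hypothetical example), the view could simply be redefined to extract one more field:

-- Hypothetical: the source table gained an `email` column. Redefine the
-- view to pull it out of the JSON payload that is already being ingested.
ALTER VIEW warehouse.people AS
SELECT
    id,
    customer_id,
    REPLACE(json_extract(source, '$.name'), "\"", '') AS name,
    REPLACE(json_extract(source, '$.email'), "\"", '') AS email,
    FROM_UNIXTIME(CAST(REPLACE(json_extract(source, '$.created_at'), "\"", '') AS BIGINT) / 1000) AS created_at,
    FROM_UNIXTIME(CAST(REPLACE(json_extract(source, '$.updated_at'), "\"", '') AS BIGINT) / 1000) AS updated_at,
    FROM_UNIXTIME(CAST(REPLACE(json_extract(source, '$.deleted_at'), "\"", '') AS BIGINT) / 1000) AS deleted_at
FROM ingest.people;

Rows ingested before the column existed simply won’t have the email key in their payload, so the extraction should return NULL for them, while newer rows carry the value.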

That’s it. That’s how you can set up a real-time data warehouse using Doris, Debezium, and Kafka.