# Using the CDC Kafka Sink in CockroachDB

# Introduction

In this article, we'll demonstrate creating a simple streaming data pipeline using a small micro-batching tool and CockroachDB's CDC [Kafka](https://www.cockroachlabs.com/docs/stable/changefeed-sinks.html#kafka) sink.

# Setup

Prerequisites:

* [CockroachDB](https://www.cockroachlabs.com/docs/v22.2/start-a-local-cluster), with a trial enterprise [license](https://www.cockroachlabs.com/docs/stable/licensing-faqs.html#obtain-a-license).
    
* [Kafka](https://kafka.apache.org/quickstart)
    
* [Pipeline](https://github.com/kai-niemi/roach-pipeline), an open-source Java tool built on top of Spring Batch
    
* Java 17+ Runtime
    
* Linux / macOS
    

## CockroachDB Setup

Initially, create a local cluster of three nodes (or one node, not important):

```sql
cockroach start --port=26257 --http-port=8080 --advertise-addr=localhost:26257 --join=localhost:26257 --insecure --store=datafiles/n1 --background

cockroach start --port=26258 --http-port=8081 --advertise-addr=localhost:26258 --join=localhost:26257 --insecure --store=datafiles/n2 --background

cockroach start --port=26259 --http-port=8082 --advertise-addr=localhost:26259 --join=localhost:26257 --insecure --store=datafiles/n3 --background

cockroach init --insecure --host=localhost:26257
```

Then, setup the source database `tpcc` and the target database `tpcc_copy`:

```sql
cockroach sql --insecure --host=localhost:26257 -e "CREATE database tpcc"

cockroach sql --insecure --host=localhost:26257 -e "CREATE database tpcc_copy"
```

Finally, load the TPC-C fixture (schema and data) to the source database:

```sql
cockroach workload fixtures import tpcc --warehouses=10 'postgres://root@localhost:26257?sslmode=disable'
```

## Kafka Setup

Ref: [https://kafka.apache.org/quickstart](https://kafka.apache.org/quickstart)

Initially, setup a local Kafka server that we'll use as CDC sink (using KRaft over ZK):

```bash
tar -xzf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties
```

Optionally, start a console consumer to see the change events flashing by later. In this example for the `warehouse` table/topic:

```bash
bin/kafka-console-consumer.sh --topic warehouse --from-beginning --bootstrap-server localhost:9092
```

## Pipeline Setup

Initially, clone the repo and build it locally:

```bash
git clone git@github.com:kai-niemi/roach-pipeline.git pipeline
cd pipeline
chmod +x mvnw
./mvnw clean install
```

The executable jar is now available under the `target` folder. Try it out with:

```bash
java -jar target/pipeline.jar --help
```

# Configure the Pipeline

Now we are ready to create `kafka2sql` jobs for each TPC-C table we want to be streamed from the source to the target database.

## Generate Form Templates

First off, we get form templates that are going to be pre-populated with SQL statements for each table in question. We are only using a subset of the TPC-C workload tables, but the process is the same for all tables.

```bash
curl -X GET http://localhost:8090/kafka2sql/form?table=warehouse > warehouse-kafka2sql.json

curl -X GET http://localhost:8090/kafka2sql/form?table=district > district-kafka2sql.json

curl -X GET http://localhost:8090/kafka2sql/form?table=customer > customer-kafka2sql.json
```

Feel free to inspect the JSON files which should give an idea of how the batch jobs are configured and run. At this point, we haven't started anything yet. The JSON files typically don't need any editing if the template settings are properly set (everything defaults to using localhost).

## Submit Batch Jobs

The next step is to POST the forms back which will register the jobs and start them up. The jobs need to be registered in the sorted topology order of the foreign key constraints `(warehouse <- district <- customer)` since we'll be creating tables on-the-fly.

```bash
curl -d "@warehouse-kafka2sql.json" -H "Content-Type:application/json" -X POST http://localhost:8090/kafka2sql

curl -d "@district-kafka2sql.json" -H "Content-Type:application/json" -X POST http://localhost:8090/kafka2sql

curl -d "@customer-kafka2sql.json" -H "Content-Type:application/json" -X POST http://localhost:8090/kafka2sql
```

The final step is to configure the Kafka change feeds for these three tables.

Connect to the source database and execute:

```bash
CREATE CHANGEFEED FOR TABLE warehouse INTO 'kafka://localhost:9092' WITH updated,resolved = '15s';

CREATE CHANGEFEED FOR TABLE district INTO 'kafka://localhost:9092' WITH updated,resolved = '15s';

CREATE CHANGEFEED FOR TABLE customer INTO 'kafka://localhost:9092' WITH updated,resolved = '15s';
```

You should see the target database starting to fill up and eventually reach the same state as the source database. If you would also run the TPC-C workload, you will see any changes reflected also in the target.

# Conclusion

In this article, we looked at creating a simple streaming data pipeline at table level between two separate CockroachDB databases using the CDC Kafka sink.
