CDC transformations and the Outbox pattern with CockroachDB

Using CDC projection and transformations in CockroachDB along with the Outbox pattern to keep multiple copies in sync

In a previous article, we looked at a common challenge in microservice-style architectures, namely how to keep multiple copies of state in sync. One service calls another service and maintains a locally cached copy of the result, both to reduce chattiness over the network and to increase fault tolerance.

Using the transactional outbox pattern avoids the dual-write problem, and it's straightforward to implement in CockroachDB using a CDC webhook sink along with row-level TTLs to clear out published domain events.
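
For reference, a minimal sketch of an outbox table for that approach (hypothetical names, using CockroachDB's row-level TTL to purge published events after a grace period) could look something like this:

create table outbox
(
    id             uuid        not null default gen_random_uuid(),
    aggregate_type varchar(32) not null, -- e.g. 'product'
    aggregate_id   varchar(64) not null,
    event_type     varchar(32) not null, -- e.g. 'create', 'update' or 'delete'
    payload        jsonb       not null, -- serialized domain event
    created_at     timestamptz not null default clock_timestamp(),

    primary key (id)
) with (ttl_expire_after = '1 hour');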

In this article, we are going to improve on this concept by dropping the outbox table entirely and instead using CDC projections and transformations, a new feature added in CockroachDB v22.2.

Source Code

The source code for the examples in this article can be found on GitHub.

Introducing CDC Transformations

Changefeeds offer a powerful push-based, stream-oriented mechanism to drive data integration pipelines and also provide a backbone for event-driven architectures.

Rather than polling tables and extracting the information, you simply subscribe to changes, which the database itself streams to a sink of your choice whenever state changes.
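
As a minimal sketch (using the product table and webhook endpoint introduced later in this article), a plain changefeed without any transformations could look like:

CREATE CHANGEFEED FOR TABLE product
    INTO 'webhook-https://localhost:8443/order-service/cdc?insecure_tls_skip_verify=true'
    WITH updated, resolved='15s';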

CDC transformations add the ability to both filter and transform the data into the domain events that you want to be produced in the change stream. This reduces complexity since you don't need any outbox table to store the to-be-published events, which in turn means less duplicate storage and less cleanup effort afterward. It also reduces cost, since there are lower storage requirements and fewer events sent over the wire only to be filtered out by the downstream systems.
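
As a sketch of the filtering part, a changefeed query can include a WHERE clause so that, for example, delete events never leave the database. The projection and predicate below are only for illustration:

CREATE CHANGEFEED INTO 'webhook-https://localhost:8443/order-service/cdc?insecure_tls_skip_verify=true'
WITH updated
AS SELECT id, name, price, currency, sku, inventory
FROM product
WHERE NOT cdc_is_delete();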

Use Case Example

In the following example, we have two services. One is the Catalog Service, representing the technical authority for a product catalog. It contains only one table: product.

Next, we have the Order Service, which is the technical authority for order management and workflow. It needs domain knowledge of what a product is but doesn't want ownership of it, just a shallow copy of the product entity to produce purchase orders. To avoid having the order service call the catalog service each time an order is added or changed, it hosts a copy, or materialized view, of the product catalog. This means that whenever the catalog service creates, updates or deletes a product, it should somehow be reflected in the downstream system, in this case the order service.

Typically, you would use some sort of message broker (like Kafka or a pub/sub system) in between the services, which decouples the systems completely and allows for multiple durable subscribers. There is, however, no hard dependency between these components. The main dependency is the changefeed that pushes the products from the catalog to the order service, but that dependency sits in the database tier.

Catalog Service

Assume the product table in the catalog service looks something like this:

create table product
(
    id               uuid           not null default gen_random_uuid(),
    name             varchar(128)   not null,
    description      varchar(256),
    price            numeric(19, 2) not null,
    currency         varchar(3)     not null,
    sku              varchar(128)   not null,
    inventory        int            not null default 0,
    created_by       varchar(24),
    created_at       timestamptz    not null default clock_timestamp(),
    last_modified_by varchar(24),
    last_modified_at timestamptz,

    primary key (id)
);

The next step would be to create a webhook change feed using transformations to tailor the event structure:

CREATE CHANGEFEED INTO 'webhook-https://localhost:8443/order-service/cdc?insecure_tls_skip_verify=true'
WITH schema_change_policy='stop', key_in_value, updated, resolved='15s', webhook_sink_config='{"Flush": {"Messages": 10, "Frequency": "5s"}, "Retry": {"Max": "inf"}}'
AS SELECT
    cdc_updated_timestamp()::int AS event_timestamp,
    'v1' AS event_version,
    'product' AS event_table,
    IF(cdc_is_delete(),'delete',IF(cdc_prev()='null','create','update')) AS event_type,
    cdc_prev() as event_before,
    jsonb_build_object(
        'id', id,
        'name', name,
        'description', description,
        'price', concat(price::string, ' ', currency),
        'sku', sku,
        'inventory', inventory,
        'created_by', created_by,
        'created_at', created_at,
        'last_modified_by', last_modified_by,
        'last_modified_at', last_modified_at
    ) AS event_after
FROM product;

There are a few new functions here that are helpful for both filtering and tailoring the events:

  • cdc_updated_timestamp

  • cdc_is_delete

  • cdc_prev

Order Service

When the catalog service creates or updates products, the receiving endpoint in the order service will see HTTP POST request bodies like this (note that TLS is required for the webhook sink):

{
  "payload" : [ {
    "__crdb__" : {
      "key" : [ "8c8fbc48-b670-490d-9710-08b25500c314" ],
      "topic" : "product",
      "updated" : "1673104217274160332.0000000000"
    },
    "event_after" : {
      "created_at" : "2023-01-07T15:06:15.122Z",
      "created_by" : "bobby_tables",
      "description" : null,
      "id" : "8c8fbc48-b670-490d-9710-08b25500c314",
      "inventory" : 412,
      "last_modified_at" : "2023-01-07T15:06:45.13Z",
      "last_modified_by" : "bobby_tables",
      "name" : "q3hix3OUor0gqf-7sDP_rA",
      "price" : "101.11 USD",
      "sku" : "p-4"
    },
    "event_before" : {
      "created_at" : "2023-01-07T15:06:15.122Z",
      "created_by" : "bobby_tables",
      "currency" : "USD",
      "description" : null,
      "id" : "8c8fbc48-b670-490d-9710-08b25500c314",
      "inventory" : 408,
      "last_modified_at" : "2023-01-07T15:06:15.274Z",
      "last_modified_by" : "bobby_tables",
      "name" : "q3hix3OUor0gqf-7sDP_rA",
      "price" : 42.28,
      "sku" : "p-4"
    },
    "event_table" : "product",
    "event_timestamp" : 1673104187178673238,
    "event_type" : "update",
    "event_version" : "v1"
  } ],
  "length" : 1
}

The endpoint receiving the change feed events (a bit shortened for illustration):

@RestController
@RequestMapping(value = "/order-service/cdc")
public class ChangeFeedController {
    // ...
    @PostMapping(consumes = {MediaType.ALL_VALUE})
    public ResponseEntity<?> onChangeFeedEvent(@RequestBody String body) {
        try {
            String prettyJson = prettyObjectMapper
                    .writerWithDefaultPrettyPrinter()
                    .writeValueAsString(prettyObjectMapper.readTree(body));
            logger.debug("onChangeFeedEvent ({}) body:\n{}", counter.incrementAndGet(), prettyJson);

            // We could use the 'event_table' field to map against change event types, here we only have one type
            ProductEnvelope envelope = objectMapper.readerFor(ProductEnvelope.class).readValue(body);
            AbstractEnvelope.Metadata metadata = envelope.getMetadata();
            if (metadata != null) {
                metadata.getResolvedTimestamp().ifPresent(logicalTimestamp -> {
                    logger.debug("Resolved timestamp: {}", logicalTimestamp);
                });
            }
            envelope.getPayloads().forEach(e -> domainEventListener.onProductChangeEvent(e));
        } catch (JsonProcessingException e) {
            logger.error("", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(e.toString());
        }

        return ResponseEntity.ok().build();
    }
}

It simply maps the request body to a ProductEnvelope object that is modeled around the CDC message structure:

public abstract class AbstractEnvelope<T extends Event<ID>, ID> {
    public static class Metadata {
        @JsonProperty("resolved")
        private String resolved;

        public String getResolved() {
            return resolved;
        }

        public Optional<LogicalTimestamp> getResolvedTimestamp() {
            return resolved != null
                    ? Optional.ofNullable(LogicalTimestamp.parse(resolved))
                    : Optional.empty();
        }
    }

    @JsonProperty("__crdb__")
    private Metadata metadata;

    @JsonProperty("payload")
    private List<Payload<T, ID>> payloads = new ArrayList<>();

    @JsonProperty("length")
    private int length;

    public Metadata getMetadata() {
        return metadata;
    }

    public List<Payload<T, ID>> getPayloads() {
        return payloads;
    }

    public int getLength() {
        return length;
    }
}

public class Payload<T extends Event<ID>, ID> {
    public static class Metadata {
        @JsonProperty("topic")
        private String topic;

        @JsonProperty("updated")
        private String updated;

        @JsonProperty("key")
        private List<String> key = new ArrayList<>();

        public String getTopic() {
            return topic;
        }

        public String getUpdated() {
            return updated;
        }

        public List<String> getKey() {
            return key;
        }
    }

    @JsonProperty("__crdb__")
    private Metadata metadata;

    @JsonProperty("event_table")
    private String table;

    @JsonProperty("event_timestamp")
    private String timestamp;

    @JsonProperty("event_type")
    private String type;

    @JsonProperty("event_before")
    private T before;

    @JsonProperty("event_after")
    private T after;

    public Metadata getMetadata() {
        return metadata;
    }

    public T getBefore() {
        return before;
    }

    public T getAfter() {
        return after;
    }

    public String getTable() {
        return table;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public Optional<LogicalTimestamp> getLogicalTimestamp() {
        return timestamp != null ? Optional.of(LogicalTimestamp.parse(timestamp)) : Optional.empty();
    }

    public Operation getOperation() {
        if ("create".equals(type)) {
            return Operation.insert;
        } else if ("delete".equals(type)) {
            return Operation.delete;
        } else if ("update".equals(type)) {
            return Operation.update;
        } else {
            return Operation.unknown;
        }
    }
}
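
The concrete ProductEnvelope and ProductEvent types are not listed here, but judging from the payload structure above they could look roughly like the sketch below. This is an assumption, not the repository's actual code; in particular, the Event interface is assumed to expose the entity ID, and unknown fields (such as the audit columns) are ignored:

@JsonIgnoreProperties(ignoreUnknown = true)
public class ProductEnvelope extends AbstractEnvelope<ProductEvent, UUID> {
}

@JsonIgnoreProperties(ignoreUnknown = true)
public class ProductEvent implements Event<UUID> {
    @JsonProperty("id")
    private UUID id;

    @JsonProperty("name")
    private String name;

    @JsonProperty("description")
    private String description;

    // The changefeed projection concatenates amount and currency, e.g. "101.11 USD"
    @JsonProperty("price")
    private String price;

    @JsonProperty("sku")
    private String sku;

    @JsonProperty("inventory")
    private int inventory;

    @Override
    public UUID getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public String getDescription() {
        return description;
    }

    public String getPrice() {
        return price;
    }

    public String getSku() {
        return sku;
    }

    public int getInventory() {
        return inventory;
    }
}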

Finally, we have the product service that represents the transaction boundary. It maps the events to JPA entity operations, effectively using UPSERTs and DELETEs.

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void onProductChangeEvent(Payload<ProductEvent, UUID> payload) {
        ProductEvent beforeEvent = payload.getBefore();
        ProductEvent afterEvent = payload.getAfter();

        // Deletes are a bit special
        if (beforeEvent == null && afterEvent == null) {
            Payload.Metadata metadata = payload.getMetadata();
            metadata.getKey().forEach(key -> {
                UUID id = UUID.fromString(key);
                logger.debug("Delete product with ID [{}]", id);
                productRepository.deleteById(id);
            });
            return;
        }

        switch (payload.getOperation()) {
            case insert:
            case update:
                Product proxy = productRepository.findById(afterEvent.getId()).orElseGet(Product::new);
                if (proxy.isNew()) {
                    logger.debug("Create product with ID [{}]: {}", afterEvent.getId(), proxy);
                    proxy.setId(afterEvent.getId());
                } else {
                    logger.debug("Update product with ID [{}]: {}", afterEvent.getId(), proxy);
                }

                Money m = Money.of(afterEvent.getPrice());
                proxy.setPrice(m.getAmount());
                proxy.setCurrency(m.getCurrency().getCurrencyCode());
                proxy.setSku(afterEvent.getSku());
                proxy.setName(afterEvent.getName());
                proxy.setDescription(afterEvent.getDescription());
                proxy.setInventory(afterEvent.getInventory());

                productRepository.save(proxy);
                break;
            default:
                throw new IllegalStateException("Unknown operation: " + payload.getOperation());
        }
    }
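
The Money value type is also not listed. Since the changefeed projection concatenates the amount and currency into a single string (like "101.11 USD"), a minimal parsing sketch could look like this (again an assumption, not the repository's actual implementation):

import java.math.BigDecimal;
import java.util.Currency;

public class Money {
    private final BigDecimal amount;
    private final Currency currency;

    private Money(BigDecimal amount, Currency currency) {
        this.amount = amount;
        this.currency = currency;
    }

    // Parses a value like "101.11 USD" into an amount and an ISO-4217 currency
    public static Money of(String priceAndCurrency) {
        String[] parts = priceAndCurrency.trim().split(" ");
        return new Money(new BigDecimal(parts[0]), Currency.getInstance(parts[1]));
    }

    public BigDecimal getAmount() {
        return amount;
    }

    public Currency getCurrency() {
        return currency;
    }
}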

Demo Project

This tutorial assumes you run everything on a local machine/laptop.

To run the demo, first clone the GitHub repo and build the two components:

git clone git@github.com:kai-niemi/roach-spring-boot.git
cd roach-spring-boot
./mvnw clean install
cd spring-boot-cdc-parent

Create the databases in CockroachDB using the DB console:

CREATE database spring_boot_catalog;
CREATE database spring_boot_order;
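
Changefeeds also depend on rangefeeds, so if they aren't already enabled on your local cluster you may need to run:

SET CLUSTER SETTING kv.rangefeed.enabled = true;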

Then start the catalog and order services:

java -jar catalog-service/target/catalog-service.jar &
java -jar order-service/target/order-service.jar &

When the services come up, you can use your browser to inspect the catalog and the order service.

The catalog service has a custom scheduling resource that can enable and disable periodic product updates, which in turn generate change events.

Conclusion

In this article, we used CDC transformations with a webhook sink in CockroachDB to drive a data integration workflow between two independent services. This eliminates the need to model an outbox table in the source service, along with all the lifecycle management of the published domain events.
