Using the Inbox Pattern with CockroachDB

The transactional inbox pattern using Pub-Sub and CockroachDB

Introduction

In a previous article, we looked at using CDC projections and transformations in CockroachDB to implement the Outbox Pattern for keeping multiple copies of state in sync in a microservices-style architecture.

In this article, we are going to look at the other end of the pipe, where events arrive into a system for processing rather than being delivered downstream. For that, we'll use the Inbox pattern.

Both patterns help solve the same challenge: how to provide exactly-once processing semantics when the best you have between heterogeneous systems is an at-least-once delivery guarantee.

Source Code

The source code for the examples in this article can be found on GitHub.

Delivery vs Processing Semantics

The two concepts of delivery and processing are often mixed up or treated as the same thing. The "at-most-once", "at-least-once" and notorious "exactly-once" guarantees are usually discussed in the context of message delivery between heterogeneous systems. Of these, "exactly-once" delivery is still very much impossible to achieve, short of disproving certain impossibility theorems or blurring the meaning of the terms.

Is this magical pixie dust I can sprinkle on my application?

No, not quite. Exactly-once processing is an end-to-end guarantee, and the application itself has to be designed not to violate it.

Message delivery refers to the passing of messages between components over an asynchronous network full of traps, delays, re-ordered packets and all sorts of dangers (runners through the valley).

"Exactly-once delivery" is often defined as the message-passing system filtering out duplicate events by maintaining state at the consumer side and de-duplicating at the producer side. De-duplication of re-delivered messages is still involved; it's just encapsulated within a closed (homogeneous) system sharing the same protocol. Calling it exactly-once delivery is a misnomer; exactly-once processing is a more accurate term for what goes on.

Why is this important? The taxonomy itself isn't, but the implications are, whenever you depend on certain guarantees to protect application invariants. If you simply assume that a message-passing system delivers each message exactly once, without reading the fine print on what's required at your end, you could be in a world of hurt.

With regard to the Inbox Pattern, we are not talking about a closed system but about separate systems that don't share an agreement protocol for message passing. If, for example, you have systems A and B with a message-passing system C in between, what you can rely on are the "at-most-once" and "at-least-once" delivery guarantees.

"At-most-once" means a message arrives either once or not at all. It's not particularly useful except for fire-and-forget type of stuff. "At-least-once" means a message can arrive multiple times due to re-delivery caused by failures or uncertainty (handled by timeouts).

The re-delivery part is the tricky one, since you could end up with multiple side effects from processing a message when the intended purpose is a single effect. Sending multiple e-mails is one classic example; double-charging payments is another. Multiple side effects, partial outcomes or copies ending up out of sync are the messaging equivalent of inconsistency in a database.
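To make the difference concrete, here is a toy Java simulation. It is not a real broker API; the class and method names are made up for illustration. The only thing that varies is when the consumer acknowledges: before processing (at-most-once) or after it (at-least-once), with a single simulated crash between the two steps.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation, not a real broker API: it only models *when* the consumer acks.
final class DeliverySemantics {

    // At-most-once: ack on receipt, then process.
    // A crash after the ack means the broker never redelivers, so the message is lost.
    static List<String> atMostOnce(String message, boolean crashBeforeProcessing) {
        List<String> sideEffects = new ArrayList<>();
        if (!crashBeforeProcessing) {
            sideEffects.add("processed " + message);
        }
        return sideEffects;
    }

    // At-least-once: process, then ack.
    // A crash before the ack makes the broker redeliver, so processing runs again.
    static List<String> atLeastOnce(String message, boolean crashBeforeAck) {
        List<String> sideEffects = new ArrayList<>();
        boolean acked = false;
        while (!acked) {                      // broker keeps delivering until it sees an ack
            sideEffects.add("processed " + message);
            if (crashBeforeAck) {
                crashBeforeAck = false;       // only one crash; the next delivery succeeds
                continue;
            }
            acked = true;
        }
        return sideEffects;
    }

    public static void main(String[] args) {
        System.out.println(atMostOnce("m1", true));   // []
        System.out.println(atLeastOnce("m1", true));  // [processed m1, processed m1]
    }
}
```

With a crash, the at-most-once variant produces no side effect at all, while the at-least-once variant produces two. The second case is the one the rest of this article deals with.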

Exactly What?

Exactly-once processing semantics as an end-to-end guarantee, on the other hand, is fully achievable. Put simply, it means ensuring exactly one visible side effect, even in the face of duplicate processing. There are three ways to get there: an atomic commit protocol (XA/2PC) that all involved heterogeneous systems participate in, natural idempotency (immutable, append-only writes), or idempotency by de-duplication, for example by transactionally storing a consumer offset or by using UPSERTs.
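The sketch below shows idempotency by de-duplication via a stored consumer offset. It assumes messages carry a monotonically increasing offset, as in a partitioned log. The consumer is hypothetical. It keeps its business state and its last applied offset in one immutable value that is swapped as a unit, standing in for a single database transaction.

```java
// Sketch: idempotency by de-duplication using a stored consumer offset.
// Assumes offsets increase monotonically per partition, as in a log-based broker.
final class OffsetDedupConsumer {

    // Business state plus the last applied offset, always replaced together.
    record State(long total, long lastOffset) {}

    private State state = new State(0, -1);

    // Applies the amount unless this offset was already applied.
    // In a database, both fields would be updated in one transaction.
    synchronized boolean apply(long offset, long amount) {
        if (offset <= state.lastOffset()) {
            return false;                     // duplicate or stale re-delivery: no-op
        }
        state = new State(state.total() + amount, offset);
        return true;
    }

    synchronized State state() {
        return state;
    }
}
```

Replaying an offset that was already applied returns false and leaves the total untouched. The offset only works as a de-duplication key because it is committed together with the effect. Storing it separately would reintroduce the dual-write problem.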

If we can atomically commit both the passing of a message and the side effects of its processing, we have achieved effectively-once semantics. We want the side effect(s) to become visible if, and only if, the processing is successful. When 2PC is not an option for ensuring atomicity, one alternative is to make sure that the effects of a re-delivered message never become visible a second time, so that all involved heterogeneous systems remain in sync in the presence of faults.

Let's look at a practical example:

When using a classic pub-sub system (like ActiveMQ) to publish domain events and consume these events with the at-least-once guarantee, we need to ensure idempotency in the consuming process if the broker and consumer don't share the same atomic commit protocol.

If part of the processing is to write to the database and the transaction fails, the message delivery will not be acked to the broker. The broker will then attempt to redeliver it until it gives up and hands it over to its dead-letter queue (DLQ).

If the database commit is indeed successful but the acknowledgement to the broker gets lost, due to the app server node crashing or quantum entanglement delaying the response, the broker will again attempt a re-delivery.

When redelivery occurs, the event is de-duplicated (a no-op) because we use UPSERTs when writing to the database. If the database commit succeeds but the commit acknowledgement to the consumer times out, the same outcome unfolds.
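The scenario above can be sketched as a small in-memory simulation. The map stands in for a table with the event ID as primary key, and putIfAbsent mirrors INSERT .. ON CONFLICT DO NOTHING. The class name and the lostAcks parameter are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Toy model: the database commit succeeds, but the ack to the broker is lost
// (or times out), so the broker re-delivers the same event.
final class RedeliverySimulation {
    // Stand-in for a table with the event ID as primary key.
    final Map<UUID, String> table = new LinkedHashMap<>();
    int deliveries = 0;

    // Mirrors INSERT .. ON CONFLICT DO NOTHING: returns the number of rows written.
    int insertIfAbsent(UUID id, String payload) {
        return table.putIfAbsent(id, payload) == null ? 1 : 0;
    }

    // Delivers until acked; the first 'lostAcks' acknowledgements never reach the broker.
    void deliver(UUID id, String payload, int lostAcks) {
        boolean acked = false;
        while (!acked) {
            deliveries++;
            insertIfAbsent(id, payload);      // commit succeeds; duplicates are no-ops
            if (lostAcks > 0) {
                lostAcks--;                   // ack lost: broker will redeliver
                continue;
            }
            acked = true;
        }
    }
}
```

Two lost acks lead to three deliveries, yet the table ends up holding a single row.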

Timeouts are particularly nasty since the outcome is indeterminate: you can't tell whether an operation took place or not. An ack means it did, a nack means it did not. When there's no answer, you just can't tell, since the information is absent. You can always relax the constraints and run with assumptions, but that doesn't change the fact.
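One way to keep that indeterminacy explicit in code is to model the result of waiting for an acknowledgement as three-valued rather than boolean. In this sketch, CompletableFuture stands in for whatever async reply the client library provides. The Outcome and AckWaiter names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// What a caller can actually know after waiting for an acknowledgement.
enum Outcome { ACK, NACK, UNKNOWN }

final class AckWaiter {

    // A true/false reply is definitive; a timeout or a failed reply is not:
    // the remote side may or may not have applied the operation.
    static Outcome await(CompletableFuture<Boolean> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS) ? Outcome.ACK : Outcome.NACK;
        } catch (TimeoutException | ExecutionException e) {
            return Outcome.UNKNOWN;           // absence of information, not a "no"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.UNKNOWN;
        }
    }
}
```

The important bit is that UNKNOWN must not be collapsed into NACK. Retrying on UNKNOWN is only safe if the receiving side de-duplicates.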

The Outbox Pattern

To recap, the transactional outbox pattern avoids the non-atomic dual-write problem, where different systems risk ending up out of sync. It's straightforward to implement in CockroachDB, for example by using a CDC webhook sink along with row-level TTL to clear out published events.

A variant of the outbox could be called the "anti-outbox", for lack of a better name. If you use CDC transformations directly on the source tables to produce the change events, you don't need an explicit outbox table to store them. That saves storage costs and avoids the cleanup overhead that comes with managing outbox events.

The Inbox Pattern

The Inbox Pattern is the inverse of the Outbox Pattern. It refers to storing incoming messages or events from a message-passing system directly in persistent storage, like a database, and deferring the processing to a later time (or to a different system). This is a natural fit for pub-sub systems that don't retain messages after delivery, and it helps reduce queue/topic size and backpressure against the publishers.
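The store-now, process-later split can be sketched in memory as follows. This is an illustration, not the demo's code. The RECEIVED/PROCESSED statuses are assumptions loosely modelled on the status column of the journal table shown later. processPending stands in for whatever drives the processing, for example a change feed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

// In-memory sketch of an inbox: receiving only records the event, processing happens later.
// Status names are hypothetical, loosely modelled on the journal table's status column.
final class InMemoryInbox {
    enum Status { RECEIVED, PROCESSED }

    private record InboxRow(String payload, Status status) {}

    private final Map<UUID, InboxRow> rows = new LinkedHashMap<>();

    // Called from the broker listener. Returns false for a duplicate; either way it is safe to ack.
    synchronized boolean receive(UUID id, String payload) {
        return rows.putIfAbsent(id, new InboxRow(payload, Status.RECEIVED)) == null;
    }

    // Called later (e.g. when a change feed fires) to process events not yet handled.
    synchronized int processPending(Consumer<String> handler) {
        int processed = 0;
        for (Map.Entry<UUID, InboxRow> e : rows.entrySet()) {
            if (e.getValue().status() == Status.RECEIVED) {
                handler.accept(e.getValue().payload());
                e.setValue(new InboxRow(e.getValue().payload(), Status.PROCESSED));
                processed++;
            }
        }
        return processed;
    }
}
```

The listener's only job is the cheap, idempotent receive. The business handler runs later and at most once per stored row in this single-process model.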

Using the Inbox Pattern, we first store the event in the database (as-is) and then acknowledge the message to the broker. If the database commit is unsuccessful, the message will be re-delivered. If an acknowledgement gets lost or times out between the broker, app or database, the message will also be re-delivered.

Upon message delivery, we de-duplicate by using information in the event to check whether it has been observed before. We do this with an INSERT INTO .. ON CONFLICT DO NOTHING, a close relative of UPSERT that skips existing rows rather than overwriting them. This requires that the event contains an ID that can be used for de-duplication (like a UUID).
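With plain JDBC, the de-duplicating insert could look roughly like the sketch below. It assumes the journal table shown below, whose primary key is computed from payload ->> 'id', so the ID is not bound explicitly. It also assumes a PostgreSQL-compatible JDBC driver on the classpath. The "RECEIVED" status value and the JdbcInbox name are hypothetical.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Sketch of inbox de-duplication with plain JDBC against the journal table.
// The id column is computed from the payload, so only payload and metadata are inserted.
final class JdbcInbox {
    private static final String INSERT_SQL =
            "INSERT INTO journal (event_type, status, payload) VALUES (?, ?, ?::JSONB) "
                    + "ON CONFLICT DO NOTHING";

    // Returns true if the event was stored, false if it was a re-delivered duplicate.
    // Run inside the local transaction that must commit before the broker is acked.
    static boolean storeIfAbsent(Connection conn, String eventType, String payloadJson)
            throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            ps.setString(1, eventType);
            ps.setString(2, "RECEIVED");          // hypothetical initial status
            ps.setString(3, payloadJson);
            return ps.executeUpdate() == 1;       // 0 rows: the event ID already existed
        }
    }
}
```

executeUpdate returns 0 when the conflict clause skips the row, which tells the caller the event was a re-delivery. Either way, the broker message can be acknowledged once the transaction commits.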

There's no atomic commit protocol spanning the message-passing system, application and database, but none is needed thanks to the message de-duplication. There are just local database transactions.

The actual business processing of the event, with other potentially business-relevant side effects, can be achieved by hooking up a change feed on the inbox table. After the local database transaction commits, a change event is emitted either to Kafka for downstream processing or to a self-subscribing webhook endpoint that can trigger another business process.

That closes the cycle, and we have achieved effectively-once processing semantics end-to-end.

If you are still awake, you may ask: isn't this just pushing the problem around? When the message arrives at the CDC endpoint, aren't we dealing with the same issue again? In terms of at-least-once semantics, yes. But at that stage, we have eliminated the risk of the ingress pub-sub system and the egress database disagreeing on the outcome of the message exchange, and the event is durably stored.
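On the change feed side, one way to handle those re-deliveries is to remember, per key, the highest row version already processed. This sketch assumes each message carries the row's "updated" HLC timestamp, as emitted when the changefeed is created with the updated option. It relies on CockroachDB's ordering guarantee that a previously unseen version of a row is never emitted at a lower timestamp than one already emitted. The class name is hypothetical. In a real system, the in-memory map would have to be persisted together with the side effects.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Sketch: skip changefeed messages for row versions that were already processed.
// 'updatedHlc' is the changefeed's "updated" timestamp, e.g. "1673515264571000000.0000000000".
final class ChangefeedDeduper {
    private final Map<String, BigDecimal> lastSeen = new HashMap<>();

    // Returns true if this row version is newer than anything seen for the key.
    synchronized boolean shouldProcess(String key, String updatedHlc) {
        BigDecimal ts = new BigDecimal(updatedHlc);   // HLC strings compare correctly as decimals
        BigDecimal previous = lastSeen.get(key);
        if (previous != null && ts.compareTo(previous) <= 0) {
            return false;                             // duplicate or older version: skip
        }
        lastSeen.put(key, ts);
        return true;
    }
}
```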

Use Case Example

The example use case is a customer registration workflow. A system fires off customer registration domain events to a pub-sub topic. These events are then received by an inbox subscriber that writes them to the database.

As far as this example goes, that's where things end. A continuation could be to hook a change feed up to the inbox (journal) table and further curate the event as it progresses through the registration journey.

Let's look at a few implementation details. First, the inbox event table schema, called journal:

CREATE SEQUENCE journal_seq START 1 INCREMENT 1;

CREATE TABLE journal
(
    id          uuid primary key as ((payload ->> 'id')::UUID) stored,
    event_type  varchar(15) not null,
    status      varchar(64) not null,
    sequence_no int         default nextval('journal_seq'),
    payload     json,
    tag         varchar(64),
    updated_at  timestamptz default clock_timestamp(),

    INVERTED INDEX event_payload (payload)
);

CREATE INDEX idx_journal_main ON journal (event_type) STORING (status, sequence_no, payload, tag, updated_at);

The event publisher is straightforward:

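import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class RegistrationEventProducer {
    protected final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private JmsTemplate jmsTemplate;

    @Value("${active-mq.topic}")
    private String topic;

    // Converts the event to a JMS message and publishes it to the configured destination
    public void sendMessage(RegistrationEvent event) {
        jmsTemplate.convertAndSend(topic, event);
    }
}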
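De-duplication only works if the ID is assigned once, when the event is created, and reused on every retry. Below is a sketch of what the event and a publish retry could look like. RegistrationEvent here is a hypothetical record; the demo's actual class may differ, and the fields are taken from the sample payload further down. PublishRetry is likewise hypothetical and stands in for whatever retry mechanism wraps the JmsTemplate call.

```java
import java.io.Serializable;
import java.time.OffsetDateTime;
import java.util.UUID;
import java.util.function.Consumer;

// Hypothetical event shape; the demo's actual RegistrationEvent class may differ.
// The ID is assigned once at creation and travels with every retry and re-delivery.
record RegistrationEvent(UUID id, String name, String email, String jurisdiction,
                         OffsetDateTime createdAt) implements Serializable {

    static RegistrationEvent create(String name, String email, String jurisdiction) {
        return new RegistrationEvent(UUID.randomUUID(), name, email, jurisdiction, OffsetDateTime.now());
    }
}

// Hypothetical retry wrapper around a send call such as the producer above.
final class PublishRetry {

    // Resends the *same* event (same ID) until it succeeds or attempts run out.
    // Returns the number of attempts used.
    static int sendWithRetry(RegistrationEvent event, Consumer<RegistrationEvent> send, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                send.accept(event);
                return attempt;
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e;                  // give up and surface the last failure
                }
            }
        }
    }
}
```

A fresh UUID per attempt would turn each retry into a new event that the inbox could no longer recognize as a duplicate.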

The consumer is also straightforward:

@Component
public class RegistrationConsumer {
   ...

    @JmsListener(destination = "${active-mq.topic}", containerFactory = "jmsListenerContainerFactory")
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void receiveMessage(RegistrationEvent event) {

        // De-duplicate on the event ID, since the best we get is at-least-once delivery:
        // a re-delivered event finds its existing journal row and becomes a no-op.
        // With plain JDBC, a single INSERT INTO .. ON CONFLICT DO NOTHING does the same.
        RegistrationJournal journal = registrationJournalRepository.findById(event.getId())
                .orElseGet(RegistrationJournal::new);

        if (journal.isNew()) {
            journal.setEvent(event);
            registrationJournalRepository.save(journal);
        }
    }
}

Then it's just a matter of configuring a change feed on the journal table and doing the processing at the business end of it (not included in the demo).

Implementation Tutorial

This tutorial assumes you run everything on a local machine/laptop.

Prerequisites

  • ActiveMQ 5

  • CockroachDB v22.1+

  • JDK 19+ (OpenJDK compatible)

  • Maven 3.1+ (optional)

ActiveMQ Setup

Although ActiveMQ 5 claims to be JDK 1.8 compatible, it's compiled with a newer class-file version, so JDK 19 or higher is needed.

Linux:

wget https://downloads.apache.org/activemq/5.17.3/apache-activemq-5.17.3-bin.tar.gz
tar zxvf apache-activemq-5.17.3-bin.tar.gz
cd apache-activemq-5.17.3/bin
./activemq console

OSX:

brew install apache-activemq

The admin UI is available at http://127.0.0.1:8161/admin/.

The default login is admin/admin.

Running

To run the demo, first clone the GitHub repo and build the component:

git clone git@github.com:kai-niemi/roach-spring-boot.git
cd roach-spring-boot
./mvnw clean install
cd spring-boot-inbox

Create the database in CockroachDB using a SQL client:

CREATE DATABASE spring_boot;

Then start the inbox service:

java -jar target/spring-boot-inbox.jar &

When the service comes up, you can use your browser to inspect the API.

Next, either fetch a registration form and submit it, or inline the payload:

Using the form method:

curl http://localhost:8090/registration/form > form.json

curl -v -d "@form.json" -H "Content-Type:application/json" -X POST http://localhost:8090/registration/

Using the inlined method:

curl -v -d '{"name":"User","email":"user@email.com","jurisdiction":"mga","createdAt":"2023-01-12T09:21:04.571+00:00"}' -H "Content-Type:application/json" -X POST http://localhost:8090/registration/
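If you'd rather script the submission from Java than use curl, the JDK's built-in java.net.http client (Java 11+) can send the same request. This is a minimal sketch; RegistrationClient is a hypothetical name, and the URL matches the local defaults used in this tutorial.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper mirroring the curl POST above, using the JDK's java.net.http client.
final class RegistrationClient {
    static final String BASE_URL = "http://localhost:8090";   // local default from this tutorial

    // Builds the same request curl sends: POST /registration/ with a JSON body.
    static HttpRequest registrationRequest(String json) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/registration/"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Sends the request and returns the HTTP status code.
    static int submit(String json) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(registrationRequest(json), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

With the service running, calling submit with the same JSON as the inlined curl example should register a customer and return the HTTP status code.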

To observe that the events were received and stored in the journal:

curl http://localhost:8090/journal/registration-events?jurisdiction=mga

Conclusion

In this article, we looked at the Inbox Pattern, as a counterpart to the more commonly discussed Outbox Pattern, for providing exactly-once processing semantics without using 2PC.
