Leveraging Spring State Machine with CockroachDB

Using Spring State Machine 3.x with CockroachDB


This article focuses on a practical use case example for Spring State Machine together with CockroachDB for persistent state storage.

Introduction

A state machine, also called a finite-state machine (FSM) or finite-state automaton, is a mathematical model of computation with roots going back to the 1940s. It describes an abstract machine that is in exactly one of a finite number of states at any given time. Each state represents a status of the system, and the system moves from one state to another through transitions that are triggered by events or signals. You interact with the state machine by sending events, listening to actions, or requesting the current state. Because you progress through a workflow by sending events, the model is a good fit for reactive, event-driven architectures.

This model is elegant and powerful since the behaviour of a system becomes more precise, consistent and readable. It helps to model certain types of complex business logic around the notion of system states and actions.

Use Cases

Typical use cases for state machines are event-driven applications where behaviour changes based on known business events, such as order fulfilment, payment workflows, logistics and loyalty systems.

Gaming

State machines are also commonly used in game engines to tailor certain types of player vs AI behaviour. Modelling behaviour with states and transition pre- and post-conditions avoids many of the if, then, else and switch flow control structures you would otherwise need, which can quickly form an unreadable ball of mud.

(Below) A visual example of a danger assessment FSM prioritizing an event queue:

(Below) Another example of an FSM for closer AI threat assessment:

BPMs

Business process management (BPM) engines are also a type of state machine engine, with a fairly high level of sophistication and tooling. They can be a good fit for very complex and long-running business processes but come with a high cost of complexity and the need for specialists.

Spring Statemachine is a lightweight alternative with a much smaller footprint but it still has all the fundamentals you would need from a state machine engine.

Terminology

  • States - The specific states of the state machine that are finite and predetermined.

  • Events - Something that happens in the system that can cause a state change.

  • Actions - Side-effects in reaction to events fired, which can be calling a method, invoking a foreign API, writing to a database and so on.

  • Transitions - A move from a source state to a target state, triggered by an event.

  • Guards - Pre-conditions as boolean predicates to control transitions.

  • Extended State - Application state that is separate from the state machine, like variables or computed values.
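To make these terms concrete, here is a minimal hand-rolled sketch in plain Java, without Spring Statemachine. It models a coin-operated turnstile; the class name, the variable keys and the rule that a coin must be worth at least 2 are all made up for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Illustrative only: a tiny FSM, not the Spring Statemachine API. */
class TurnstileSketch {
    enum State { LOCKED, UNLOCKED }   // states: finite and predetermined
    enum Event { COIN, PUSH }         // events: things that happen in the system

    /** Transition: source + event -> target, protected by a guard, with an action as side effect. */
    record Transition(State source, Event event, State target,
                      Predicate<Map<String, Integer>> guard,
                      Consumer<Map<String, Integer>> action) {
    }

    // Extended state: plain variables kept next to the machine's current state
    private final Map<String, Integer> extendedState = new HashMap<>();
    private State current = State.LOCKED;

    private final List<Transition> transitions = List.of(
            new Transition(State.LOCKED, Event.COIN, State.UNLOCKED,
                    vars -> vars.getOrDefault("coinValue", 0) >= 2,       // guard
                    vars -> vars.merge("paidEntries", 1, Integer::sum)),  // action
            new Transition(State.UNLOCKED, Event.PUSH, State.LOCKED,
                    vars -> true,
                    vars -> vars.remove("coinValue")));

    /** Sends an event and returns true if a transition was taken. */
    boolean fire(Event event) {
        for (Transition t : transitions) {
            if (t.source() == current && t.event() == event && t.guard().test(extendedState)) {
                t.action().accept(extendedState);
                current = t.target();
                return true;
            }
        }
        return false; // no matching transition, or the guard rejected it
    }

    State current() {
        return current;
    }

    Map<String, Integer> extendedState() {
        return extendedState;
    }

    public static void main(String[] args) {
        TurnstileSketch turnstile = new TurnstileSketch();
        turnstile.extendedState().put("coinValue", 2);
        turnstile.fire(Event.COIN);
        System.out.println(turnstile.current()); // prints UNLOCKED
    }
}
```

A state machine engine provides the same building blocks with far more features, but the moving parts are the ones listed above.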

Creating a Payments State Machine

In this practical example, we are going to model a typical "two-phase" credit card payment workflow and use CockroachDB to store the current state of the payment as it progresses through this flow.

Spring Statemachine (SSM) is modelled around describing states and events using Java enumerations, so let's begin there. In our system, a credit card payment will have the following states:

public enum PaymentState {
    CREATED("Initial state"),
    AUTHORIZED("Charge approved by processor"),
    AUTH_ERROR("Charge declined by processor"),
    ABORTED("Payment aborted before auth"),
    CANCELLED("Payment cancelled before capture"),
    CAPTURED("Payment verified and settled by processor"),
    CAPTURE_ERROR("Authorized charge declined by processor"),
    REVERSED("Captured payment refunded"),
    REVERSE_ERROR("Captured reversal failed");

    private final String note;

    PaymentState(String note) {
        this.note = note;
    }

    public String getNote() {
        return note;
    }
}

Our payment SM has the following events:

public enum PaymentEvent {
    ABORT("Abort payment"),
    AUTHORIZE("Contact processor for charge authorization"),
    AUTH_APPROVED("Processor approved charge"),
    AUTH_DECLINED("Processor rejected charge"),
    CANCEL("Approved charge cancellation"),
    CAPTURE("Authorized amount settlement"),
    CAPTURE_SUCCESS("Capture approved"),
    CAPTURE_FAILED("Capture failed"),
    REVERSE("Captured amount reversal"),
    REVERSE_SUCCESS("Refund successful"),
    REVERSE_FAILED("Refund failure");

    private final String note;

    PaymentEvent(String note) {
        this.note = note;
    }

    public String getNote() {
        return note;
    }
}

In summary, to visualize all of this in a state diagram:

This workflow represents a typical two-phase payment which is a common payment type used for card payments, mobile payments and invoice payments. It's performed in two steps (hence the name) - an authorization that reserves the payer's funds and then a capture of those funds which is a form of settlement.

As the state diagram illustrates, you can abort a new payment before it's authorized and cancel it before it's captured. Capture is a term used to finally charge the payer's card or for the payment to be billed by invoice. After the funds are captured, a reversal can be done to return funds to the payer.

Other payment types omit the capture stage and directly settle funds at the point of authorization, called a one-phase payment.
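Before involving any framework, the two-phase flow can be written down as a plain transition table. The following is a sketch only: the state and event names follow the enums above, the transitions follow the state machine configuration shown later in this article, and the class and helper names are hypothetical.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative transition table for the two-phase payment flow. */
class PaymentFlowSketch {
    enum State {
        CREATED, AUTHORIZED, AUTH_ERROR, ABORTED, CANCELLED,
        CAPTURED, CAPTURE_ERROR, REVERSED, REVERSE_ERROR
    }

    enum Event {
        ABORT, AUTHORIZE, AUTH_APPROVED, AUTH_DECLINED, CANCEL, CAPTURE,
        CAPTURE_SUCCESS, CAPTURE_FAILED, REVERSE, REVERSE_SUCCESS, REVERSE_FAILED
    }

    private static final Map<State, Map<Event, State>> TABLE = new EnumMap<>(State.class);

    private static void allow(State source, Event event, State target) {
        TABLE.computeIfAbsent(source, s -> new EnumMap<Event, State>(Event.class)).put(event, target);
    }

    static {
        // Phase one: authorization
        allow(State.CREATED, Event.AUTHORIZE, State.CREATED);
        allow(State.CREATED, Event.AUTH_APPROVED, State.AUTHORIZED);
        allow(State.CREATED, Event.AUTH_DECLINED, State.AUTH_ERROR);
        allow(State.CREATED, Event.ABORT, State.ABORTED);
        // Phase two: capture
        allow(State.AUTHORIZED, Event.CAPTURE, State.AUTHORIZED);
        allow(State.AUTHORIZED, Event.CAPTURE_SUCCESS, State.CAPTURED);
        allow(State.AUTHORIZED, Event.CAPTURE_FAILED, State.CAPTURE_ERROR);
        allow(State.AUTHORIZED, Event.CANCEL, State.CANCELLED);
        // Reversal after capture
        allow(State.CAPTURED, Event.REVERSE, State.CAPTURED);
        allow(State.CAPTURED, Event.REVERSE_SUCCESS, State.REVERSED);
        allow(State.CAPTURED, Event.REVERSE_FAILED, State.REVERSE_ERROR);
    }

    /** Returns the target state, or empty if the event is not allowed in the current state. */
    static Optional<State> next(State current, Event event) {
        return Optional.ofNullable(TABLE.getOrDefault(current, Map.of()).get(event));
    }

    /** A state without outgoing transitions is an end state. */
    static boolean isEndState(State state) {
        return !TABLE.containsKey(state);
    }

    public static void main(String[] args) {
        System.out.println(next(State.CREATED, Event.AUTH_APPROVED)); // prints Optional[AUTHORIZED]
        System.out.println(next(State.CREATED, Event.CAPTURE));       // prints Optional.empty
    }
}
```

Reading the table row by row gives the same picture as the diagram: a payment cannot be captured before it is authorized, and nothing leaves an end state.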

Maven Dependencies

Add the Maven dependency:

<dependency>
    <groupId>org.springframework.statemachine</groupId>
    <artifactId>spring-statemachine-starter</artifactId>
    <version>3.2.0</version>
</dependency>

Since we are going to use JPA, also add that:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
    <version>3.0.2</version>
</dependency>

State Machine Configuration

Using the specified states and events, let's go ahead and define the state machine:

@Configuration
@EnableStateMachineFactory
public class StateMachineConfiguration extends StateMachineConfigurerAdapter<PaymentState, PaymentEvent> {
    @Override
    public void configure(StateMachineStateConfigurer<PaymentState, PaymentEvent> states) throws Exception {
        states.withStates()
                .initial(PaymentState.CREATED)
                .states(EnumSet.allOf(PaymentState.class))
                .end(PaymentState.REVERSED)
                .end(PaymentState.REVERSE_ERROR)
                .end(PaymentState.AUTH_ERROR)
                .end(PaymentState.CAPTURE_ERROR)
                .end(PaymentState.ABORTED)
                .end(PaymentState.CANCELLED);
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<PaymentState, PaymentEvent> transitions) throws Exception {
        transitions
                // Branches from state CREATED
                .withExternal().source(PaymentState.CREATED).target(PaymentState.CREATED)
                .event(PaymentEvent.AUTHORIZE)
                .action(Actions.errorCallingAction(authAction, errorAction)).guard(paymentIdGuard)
                .and()
                .withExternal().source(PaymentState.CREATED).target(PaymentState.AUTHORIZED)
                .event(PaymentEvent.AUTH_APPROVED)
                .and()
                .withExternal().source(PaymentState.CREATED).target(PaymentState.AUTH_ERROR) // end state
                .event(PaymentEvent.AUTH_DECLINED)
                .and()
                .withExternal().source(PaymentState.CREATED).target(PaymentState.ABORTED) // end state
                .event(PaymentEvent.ABORT)
                .action(Actions.errorCallingAction(abortAction, errorAction))

                // Branches from state AUTHORIZED
                .and()
                .withExternal().source(PaymentState.AUTHORIZED).target(PaymentState.AUTHORIZED)
                .event(PaymentEvent.CAPTURE)
                .action(Actions.errorCallingAction(captureAction, errorAction))
                .and()
                .withExternal().source(PaymentState.AUTHORIZED).target(PaymentState.CAPTURED)
                .event(PaymentEvent.CAPTURE_SUCCESS)
                .and()
                .withExternal().source(PaymentState.AUTHORIZED).target(PaymentState.CAPTURE_ERROR) // end state
                .event(PaymentEvent.CAPTURE_FAILED)
                .and()
                .withExternal().source(PaymentState.AUTHORIZED).target(PaymentState.CANCELLED) // end state
                .event(PaymentEvent.CANCEL)
                .action(Actions.errorCallingAction(cancelAction, errorAction))

                // Branches from state CAPTURED
                .and()
                .withExternal().source(PaymentState.CAPTURED).target(PaymentState.CAPTURED)
                .event(PaymentEvent.REVERSE)
                .action(Actions.errorCallingAction(reverseAction, errorAction))
                .and()
                .withExternal().source(PaymentState.CAPTURED).target(PaymentState.REVERSED) // end state
                .event(PaymentEvent.REVERSE_SUCCESS)
                .and()
                .withExternal().source(PaymentState.CAPTURED).target(PaymentState.REVERSE_ERROR) // end state
                .event(PaymentEvent.REVERSE_FAILED);
    }
}

Now we can wire in the state machine factory for this configuration:

@Autowired
private StateMachineFactory<PaymentState, PaymentEvent> stateMachineFactory;

Next, we ask the factory for an instance of the state machine and start it.

StateMachine<PaymentState, PaymentEvent> sm = stateMachineFactory.getStateMachine(UUID.randomUUID());

sm.startReactively().subscribe();
logger.info("State initially: {}", sm.getState().toString());

sm.sendEvent(Mono.just(MessageBuilder.withPayload(PaymentEvent.AUTHORIZE).build())).subscribe();
logger.info("State after authorize: {}", sm.getState().toString());

sm.sendEvent(Mono.just(MessageBuilder.withPayload(PaymentEvent.AUTH_APPROVED).build())).subscribe();
logger.info("State after auth_approved: {}", sm.getState().toString());

sm.sendEvent(Mono.just(MessageBuilder.withPayload(PaymentEvent.AUTH_DECLINED).build())).subscribe();
logger.info("State after auth_declined: {}", sm.getState().toString());

Actions

Actions are executed around state transitions, where you can perform whatever business logic is needed. In this payment flow, let's focus on one of the actions - the authorize action, which in the real world would call the bank to authorize (or decline) a charge amount.

The authorize action is attached to the very first transition in the configuration:

transitions
        .withExternal().source(PaymentState.CREATED).target(PaymentState.CREATED)
        .event(PaymentEvent.AUTHORIZE)
        .action(Actions.errorCallingAction(authAction, errorAction)).guard(paymentIdGuard)
        .and()
...

This reads: from state CREATED to state CREATED (a self-transition), triggered by the AUTHORIZE event, perform the authorization action and, if it fails, call the errorAction. The pre-condition for this transition is the paymentIdGuard, which simply checks that the payment ID header is present.

@Component
public class AuthorizeAction implements Action<PaymentState, PaymentEvent> {
    // Assumes SLF4J; the original obtained its logger via getLogger() from a base class that is not shown
    private static final Logger logger = LoggerFactory.getLogger(AuthorizeAction.class);

    @Override
    public void execute(StateContext<PaymentState, PaymentEvent> context) {
        Object paymentId = context.getMessageHeader(PaymentServiceImpl.PAYMENT_ID_HEADER);

        int randomErrorProbability = ..

        // Simulate the processor's decision, then feed the outcome back in as a new event
        if (Randomizer.withProbability(() -> true, () -> false, randomErrorProbability)) {
            logger.info("Authorize approved! {}", paymentId);
            context.getStateMachine().sendEvent(MessageBuilder.withPayload(PaymentEvent.AUTH_APPROVED)
                    .setHeader(PaymentServiceImpl.PAYMENT_ID_HEADER, paymentId)
                    .build());
        } else {
            logger.info("Authorize declined! {}", paymentId);
            context.getStateMachine().sendEvent(MessageBuilder.withPayload(PaymentEvent.AUTH_DECLINED)
                    .setHeader(PaymentServiceImpl.PAYMENT_ID_HEADER, paymentId)
                    .build());
        }
    }
}

The authorize action doesn't call any external API. Instead, it uses a probability factor to either approve or decline the authorization, and reports the outcome by sending a new event back to the state machine. The unique payment ID, which acts as extended state here, is passed along as a message header. A guard validates this payment ID and blocks the transition unless the header is set.
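The pattern of a self-transition whose action emits a follow-up event can be sketched without any framework. The example below is a simplified stand-in, not Spring Statemachine code: the Msg record, the "paymentId" header name and the BooleanSupplier that replaces the card processor are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.function.BooleanSupplier;

/** Illustrative only: an action that reports its outcome as a new event. */
class AuthorizeFlowSketch {
    enum State { CREATED, AUTHORIZED, AUTH_ERROR }
    enum Event { AUTHORIZE, AUTH_APPROVED, AUTH_DECLINED }

    /** An event plus headers, loosely comparable to a message with a payload. */
    record Msg(Event event, Map<String, Object> headers) {
    }

    static final String PAYMENT_ID = "paymentId"; // hypothetical header name

    private final Deque<Msg> queue = new ArrayDeque<>();
    private final BooleanSupplier processor; // stand-in for the processor's approve/decline decision
    private State state = State.CREATED;

    AuthorizeFlowSketch(BooleanSupplier processor) {
        this.processor = processor;
    }

    /** Queues the message, drains the queue and returns the resulting state. */
    State send(Msg msg) {
        queue.add(msg);
        while (!queue.isEmpty()) {
            handle(queue.poll());
        }
        return state;
    }

    private void handle(Msg msg) {
        if (state != State.CREATED) {
            return; // this sketch only models the transitions out of CREATED
        }
        switch (msg.event()) {
            case AUTHORIZE -> {
                if (!msg.headers().containsKey(PAYMENT_ID)) {
                    return; // guard: no payment ID, no transition
                }
                // Action on the self-transition: decide, then emit the outcome as an event
                Event outcome = processor.getAsBoolean() ? Event.AUTH_APPROVED : Event.AUTH_DECLINED;
                queue.add(new Msg(outcome, msg.headers()));
            }
            case AUTH_APPROVED -> state = State.AUTHORIZED;
            case AUTH_DECLINED -> state = State.AUTH_ERROR;
        }
    }

    public static void main(String[] args) {
        Msg authorize = new Msg(Event.AUTHORIZE, Map.of(PAYMENT_ID, 42L));
        System.out.println(new AuthorizeFlowSketch(() -> true).send(authorize));  // prints AUTHORIZED
        System.out.println(new AuthorizeFlowSketch(() -> false).send(authorize)); // prints AUTH_ERROR
    }
}
```

Keeping the command (AUTHORIZE) separate from its outcomes (AUTH_APPROVED, AUTH_DECLINED) means the state only changes once the result is known.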

Entity Model

The entity model is simple: a single table and entity called Payment, which holds a few attributes such as the state and charge amount.

@Entity
@Table(name = "payment")
@DynamicInsert
@DynamicUpdate
public class Payment extends AbstractEntity<Long> {
    @Id
    @Column(updatable = false, nullable = false)
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Enumerated(EnumType.STRING)
    private PaymentState state;

    @Column
    private String merchant;

    @Column
    private BigDecimal amount;

    @Override
    public Long getId() {
        return id;
    }
..
}

Repository

To persist and manage payments in the database, we use a simple Spring Data repository:

import org.springframework.data.jpa.repository.JpaRepository;

import io.roach.demo.statemachine.domain.Payment;

public interface PaymentRepository extends JpaRepository<Payment, Long> {
}

The database topic is covered in more detail below.

Service

Next, there's a payment service which acts as a facade against the state machine.

public interface PaymentService {
    Payment createPayment(Payment payment);

    Optional<Payment> findPayment(Long paymentId);

    StateMachine<PaymentState, PaymentEvent> authorizePayment(Long paymentId);

    StateMachine<PaymentState, PaymentEvent> capturePayment(Long paymentId);

    StateMachine<PaymentState, PaymentEvent> refundPayment(Long paymentId);

    StateMachine<PaymentState, PaymentEvent> cancelPayment(Long paymentId);

    StateMachine<PaymentState, PaymentEvent> abortPayment(Long paymentId);
}

The service implementation is pretty standard: each method loads the payment entity by ID and sends an event to the state machine. Notice also the @Transactional annotation with REQUIRES_NEW propagation, signalling that the method is a transaction boundary. It uses explicit transactions to load and update the payment entity.

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @Override
    public StateMachine<PaymentState, PaymentEvent> authorizePayment(Long paymentId) {
        StateMachine<PaymentState, PaymentEvent> sm = load(paymentId);
        sendEvent(paymentId, sm, PaymentEvent.AUTHORIZE);
        return sm;
    }
..

Let's look closer at the load method, which is a bit special. It has to do with the fact that we persist each state transition in the database using an interceptor. The interceptor (listener) needs to be woven into the state machine each time it is loaded in a business method.

First, the payment is loaded by reference which means only the ID is read and other attributes are lazy-initialized, hence the payment object reference here is a lazy proxy.

Then we ask the state machine factory to instantiate a new state machine and we give it the payment ID. To add the interceptor, we need to first stop the state machine, then reset it and finally restart it.

private StateMachine<PaymentState, PaymentEvent> load(Long paymentId) {
    Payment payment = paymentRepository.getReferenceById(paymentId);

    StateMachine<PaymentState, PaymentEvent> sm = stateMachineFactory.getStateMachine(
            Long.toString(payment.getId()));

    sm.stopReactively().block();

    sm.getStateMachineAccessor()
            .doWithAllRegions(sma -> {
                sma.addStateMachineInterceptor(paymentStateChangeInterceptor);
                sma.resetStateMachineReactively(
                                new DefaultStateMachineContext<>(payment.getState(), null, null, null))
                        .block();
            });

    sm.startReactively().block();

    return sm;
}

State Change Interceptor

The state change interceptor is used to update the state of the payment before each state change. It overrides the preStateChange method and uses the payment repository to load and persist the change, all while expecting an active transaction context.

If this method throws an exception of any kind, it will be silently swallowed (but logged) by the state machine and the state transition is denied. This is sort of a pickle since the outer transaction boundary method (the payment service) is not aware of this and will proceed with the commit.

@Component
public class PaymentStateChangeInterceptor extends StateMachineInterceptorAdapter<PaymentState, PaymentEvent> {
    @Autowired
    private PaymentRepository paymentRepository;

    @Override
    public void preStateChange(State<PaymentState, PaymentEvent> state, Message<PaymentEvent> message,
                               Transition<PaymentState, PaymentEvent> transition,
                               StateMachine<PaymentState, PaymentEvent> stateMachine,
                               StateMachine<PaymentState, PaymentEvent> rootStateMachine) {
        super.preStateChange(state, message, transition, stateMachine, rootStateMachine);

        Optional.ofNullable(message).ifPresent(msg -> {
            // No default value here: a missing header must skip persistence rather than look up ID -1
            Long paymentId = msg.getHeaders().get(PaymentServiceImpl.PAYMENT_ID_HEADER, Long.class);
            if (paymentId != null) {
                Assert.isTrue(TransactionSynchronizationManager.isActualTransactionActive(), "No transaction context!");

                Payment payment = paymentRepository.getReferenceById(paymentId);
                payment.setState(state.getId());

                paymentRepository.save(payment);
            }
        });
    }
}
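The swallowed-exception problem, and one way to surface it at the transaction boundary, can be illustrated in plain Java. This is a sketch under stated assumptions: engineTransition stands in for the state machine's internal error handling, serviceMethod for a transactional service method, and the "error" key for an extended state variable. None of these are Spring Statemachine APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: making a swallowed interceptor failure visible to the caller. */
class SwallowedErrorSketch {

    /** Stand-in for the engine: runs the interceptor and swallows whatever it throws. */
    static boolean engineTransition(Runnable interceptor) {
        try {
            interceptor.run();
            return true;  // transition accepted
        } catch (RuntimeException e) {
            return false; // transition denied; the caller never sees the exception
        }
    }

    /** Stand-in for the service method that marks the transaction boundary. */
    static void serviceMethod(Runnable persistState) {
        Map<String, Object> variables = new HashMap<>();

        // The interceptor records its own failure before the engine swallows it
        Runnable interceptor = () -> {
            try {
                persistState.run();
            } catch (RuntimeException e) {
                variables.put("error", e);
                throw e;
            }
        };

        engineTransition(interceptor);

        // Rethrow at the boundary so the transaction rolls back instead of committing
        if (variables.get("error") instanceof RuntimeException e) {
            throw e;
        }
    }

    public static void main(String[] args) {
        serviceMethod(() -> { });
        try {
            serviceMethod(() -> {
                throw new IllegalStateException("could not persist state");
            });
        } catch (IllegalStateException e) {
            System.out.println("Surfaced at the boundary: " + e.getMessage());
        }
    }
}
```

Without the rethrow, serviceMethod would return normally even though the state change was never persisted.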

Handling Retries

If you are familiar with CockroachDB, or any other RDBMS where you use serializable isolation, you are aware of the importance of adopting retry logic for transient SQL errors.

Assuming state machines for the same payment are initialized concurrently by different clients or even threads, there's a chance of conflicting state transitions. Transaction T1 may attempt to transition from CREATED to AUTHORIZED while T2 transitions from CREATED to ABORTED. This must not be allowed, but there's no inter-process or cross-thread coordination between state machine instances. Instead, we depend on the database for that. Since CockroachDB runs transactions under serializable isolation by default, we are guaranteed that no such anomalies occur. To prevent them, the database may raise transient errors with SQL state 40001, which we should intercept and handle.
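The T1/T2 conflict can be pictured with an in-memory analogy. The sketch below is not how CockroachDB resolves conflicts internally and is not part of the demo code; it only shows that when two transitions start from the same source state, at most one may take effect. All names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative only: two transitions competing for the same source state. */
class TransitionConflictSketch {
    enum State { CREATED, AUTHORIZED, ABORTED }

    private final ConcurrentMap<Long, State> payments = new ConcurrentHashMap<>();

    void create(long id) {
        payments.put(id, State.CREATED);
    }

    /** Applies the transition only if the stored state still equals the expected source state. */
    boolean transition(long id, State expected, State target) {
        return payments.replace(id, expected, target);
    }

    State stateOf(long id) {
        return payments.get(id);
    }

    public static void main(String[] args) {
        TransitionConflictSketch store = new TransitionConflictSketch();
        store.create(1L);
        boolean t1 = store.transition(1L, State.CREATED, State.AUTHORIZED); // T1 wins
        boolean t2 = store.transition(1L, State.CREATED, State.ABORTED);    // T2 is rejected
        System.out.println(t1 + " " + t2 + " " + store.stateOf(1L)); // prints true false AUTHORIZED
    }
}
```

In the database, the losing transaction is rejected with a transient error rather than a false return value, which is why retry handling matters.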

State machines are a bit tricky in the sense that there's not always an explicit point in the code that you can identify as the transaction boundary. Retry logic must always surround the transaction boundary. In our case, we do have explicit boundaries marked with @Transactional, but we can't just throw in an around advice that replays the method, because these errors are swallowed downstream in the state machine interceptor.

There are a few different ways to fix this, including:

  • Capture transient exceptions in the interceptor and add them as extended state variables, to be picked up by the service method at the transaction boundary, which then rolls back the transaction. Then add a retry AOP around advice for the service methods.

  • Redesign the transaction semantics and model infrastructure failures in the state machine itself, with an option for retry and recovery. This isn't trivial and blurs the transaction boundaries even further.

  • Use the CockroachDB JDBC driver with internal retries and select-for-update (SFU) query rewrites. The SFU rewrites will likely reduce or eliminate the transient exceptions, and the retry mechanism in the driver itself takes care of any stragglers.

It turns out that for this payment state machine, the JDBC driver alternative works just as well as the first option of using client-side retries.

JDBC Driver Retries

This is enabled by using the CockroachDB JDBC driver with these specific parameters enabled:

  • implicitSelectForUpdate = true

  • retryTransientErrors = true

Example application.yml:

spring:
  datasource:
    url: jdbc:cockroachdb://kai-odin-hnb.aws-eu-north-1.cockroachlabs.cloud:26257/spring_sm_demo?sslmode=require&implicitSelectForUpdate=true&retryTransientErrors=true
    username: guest
    password: UqhyOq3l_M8Yn_Uq0S4VvA
    driver-class-name: io.cockroachdb.jdbc.CockroachDriver

The Maven dependency required:

<dependency>
    <groupId>io.cockroachdb.jdbc</groupId>
    <artifactId>cockroachdb-jdbc-driver</artifactId>
    <version>1.0.0</version>
</dependency>

Client Retries

Using client-side retries is quite straightforward using AOP and an around-advice, intercepting all method joinpoints that are annotated with @Transactional.

You do however need to tag the state machine extended state with the transient errors, which are thrown somewhere in the scope of the transactional business service methods.

// transientException - derived from org.springframework.dao.TransientDataAccessException with underlying SQLException with state 40001.
stateMachine.getExtendedState().getVariables().put("error", transientException);

See full implementation here.

@Configuration
@EnableAspectJAutoProxy
public class AopConfig {
    @Bean
    @Profile("retry-client")
    public TransactionRetryAspect transactionRetryAspect() {
        return new TransactionRetryAspect();
    }
}
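The retry aspect itself is not shown here. As a rough idea of the logic such an around advice would wrap around a transactional method, here is a framework-free sketch. The helper name, the attempt limit and the backoff values are assumptions; only the SQL state 40001 comes from the discussion above.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

/** Illustrative only: retrying a unit of work on serialization failures. */
class RetrySketch {
    static final String SERIALIZATION_FAILURE = "40001";

    /**
     * Calls the work and repeats it while it fails with SQL state 40001,
     * up to maxAttempts, doubling the pause between attempts.
     */
    static <T> T withRetry(Callable<T> work, int maxAttempts, long baseBackoffMillis) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return work.call();
            } catch (SQLException e) {
                boolean retryable = SERIALIZATION_FAILURE.equals(e.getSQLState());
                if (!retryable || attempt >= maxAttempts) {
                    throw e; // not transient, or out of attempts
                }
                Thread.sleep(baseBackoffMillis << (attempt - 1)); // exponential backoff
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new SQLException("restart transaction", SERIALIZATION_FAILURE);
            }
            return "committed";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints committed after 3 attempts
    }
}
```

The important design point is that the whole transactional method is replayed, not just the failing statement, since a serialization failure invalidates everything the transaction has read so far.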

Tests

The following test will summarize the typical payment flow:

    @RepeatedTest(10)
    @Order(1)
    public void whenAuthorizePayment_expectAuthorizedOrRejectedState() {
        Payment payment = paymentService.createPayment(this.payment);

        Assertions.assertEquals(PaymentState.CREATED, payment.getState());

        StateMachine<PaymentState, PaymentEvent> sm = paymentService.authorizePayment(payment.getId());

        Assertions.assertTrue(EnumSet.of(PaymentState.AUTH_ERROR, PaymentState.AUTHORIZED)
                .contains(sm.getState().getId()));

        Payment authedPayment = paymentService.findPayment(payment.getId())
                .orElseThrow(() -> new ObjectRetrievalFailureException(Payment.class, payment.getId()));

        Assertions.assertTrue(EnumSet.of(PaymentState.AUTH_ERROR, PaymentState.AUTHORIZED)
                .contains(authedPayment.getState()));
    }

Conclusion

This article introduced the concept of a state machine and walked through a practical payment example using Spring Statemachine. It covered the terminology used in state machines, such as states, events, actions, transitions, guards and extended state, and showed how to set up a state machine for a two-phase payment flow with actions performed around state transitions. It also showed how to persist each state change in CockroachDB through an interceptor, and how to handle transient serialization errors with driver-level or client-side retries.
