CockroachDB JDBC Driver: Part III - Bulk Rewrites

Enhancing the CockroachDB JDBC Driver with Batch Rewrites for Improved Performance

The CockroachDB JDBC driver wraps the PostgreSQL JDBC driver (pgJDBC) and offers performance optimizations that are transparent to applications.

Introduction

This article highlights one specific optimization: batch DML rewrites for bulk operations using a FROM clause with array unnesting. It's a mouthful, but conceptually it's a transparent, driver-level rewrite of batched DML statements such as:

UPDATE product SET inventory=?, price=? WHERE id=1
UPDATE product SET inventory=?, price=? WHERE id=2
UPDATE product SET inventory=?, price=? WHERE id=3
...

These can be collapsed into a single UPDATE statement using FROM:

UPDATE product SET inventory=data_table.new_inventory, price=data_table.new_price 
FROM (select unnest(?) as id, unnest(?) as new_inventory, unnest(?) as new_price) as data_table WHERE product.id=data_table.id

This dramatically improves performance for bulk operations that send a series of non-aggregated UPDATEs to independent rows. The technique is not limited to UPDATEs; INSERT and UPSERT statements can be rewritten the same way. For those it may also help because it allows batch sizes above 128, which is the limit in the pgJDBC driver for rewriting INSERTs.
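To make the shape of the rewritten statement concrete, here is a minimal sketch of a string builder that produces it. The class `UnnestUpdateSql` and its `build` method are hypothetical names used for illustration only; this is not the driver's actual rewrite logic, which this article does not show.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not taken from the CockroachDB JDBC driver.
final class UnnestUpdateSql {
    /** Builds an UPDATE .. FROM (select unnest(?) ..) statement for the given columns. */
    static String build(String table, String keyColumn, List<String> setColumns) {
        // SET col=data_table.new_col for every updated column
        String assignments = setColumns.stream()
                .map(c -> c + "=data_table.new_" + c)
                .collect(Collectors.joining(", "));
        // one array parameter per updated column
        String projections = setColumns.stream()
                .map(c -> "unnest(?) as new_" + c)
                .collect(Collectors.joining(", "));
        return "UPDATE " + table + " SET " + assignments
                + " FROM (select unnest(?) as " + keyColumn + ", " + projections + ") as data_table"
                + " WHERE " + table + "." + keyColumn + "=data_table." + keyColumn;
    }

    public static void main(String[] args) {
        System.out.println(build("product", "id", List.of("inventory", "price")));
    }
}
```

Note that in the generated statement the key array is the first parameter, followed by one array per updated column, so the bind order differs from the row-by-row UPDATE, where the key comes last.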

Consider the following example schema:

create table if not exists product
(
    id        uuid           not null default gen_random_uuid(),
    inventory int            not null,
    name      varchar(128)   not null,
    price     numeric(19, 2) not null,
    sku       varchar(128)   not null unique,
    primary key (id)
);

Next, let's add a few rows:

insert into product (inventory,name,price,sku)
values (10, 'A', 12.50, gen_random_uuid()),
       (10, 'B', 13.50, gen_random_uuid()),
       (10, 'C', 14.50, gen_random_uuid()),
       (10, 'D', 15.50, gen_random_uuid());

select inventory,name,price,sku from product order by name;

The result has the same shape as that of the next statement; the only difference is that the rows below come from temporary, in-statement data rather than from the table:

select unnest(ARRAY[10, 10, 10, 10]) as inventory,
       unnest(ARRAY['A', 'B', 'C', 'D']) as name,
       unnest(ARRAY[12.50, 13.50, 14.50, 15.50]) as price,
       unnest(ARRAY[gen_random_uuid(),
                    gen_random_uuid(),
                    gen_random_uuid(),
                    gen_random_uuid()]) as sku
order by name;

The unnest function expands arrays into a temporary table, with each array representing a column. Now, let's turn this into an INSERT that selects from the unnested arrays:

insert into product (inventory,name,price,sku)
(select unnest(ARRAY[10, 10, 10, 10]) as inventory,
        unnest(ARRAY['A', 'B', 'C', 'D']) as name,
        unnest(ARRAY[12.50, 13.50, 14.50, 15.50]) as price,
        unnest(ARRAY[gen_random_uuid(),
                     gen_random_uuid(),
                     gen_random_uuid(),
                     gen_random_uuid()]) as sku);

That creates four new products from the contents of the arrays. Put into a JDBC prepared statement, it looks like this:

try (PreparedStatement ps = connection.prepareStatement(
        "INSERT INTO product(id,inventory,price,name,sku)"
                + " select"
                + "  unnest(?) as id,"
                + "  unnest(?) as inventory, unnest(?) as price,"
                + "  unnest(?) as name, unnest(?) as sku")) {
    // chunks is a segmented stream of products
    chunks.forEach(chunk -> {
        List<Integer> qty = new ArrayList<>();
        List<BigDecimal> price = new ArrayList<>();
        List<UUID> ids = new ArrayList<>();
        List<String> name = new ArrayList<>();
        List<String> sku = new ArrayList<>();

        chunk.forEach(product -> {
            ids.add(product.getId());
            qty.add(product.getInventory());
            price.add(product.getPrice());
            name.add(product.getName());
            sku.add(product.getSku());
        });

        try {
            ps.setArray(1, ps.getConnection().createArrayOf("UUID", ids.toArray()));
            ps.setArray(2, ps.getConnection().createArrayOf("BIGINT", qty.toArray()));
            ps.setArray(3, ps.getConnection().createArrayOf("DECIMAL", price.toArray()));
            ps.setArray(4, ps.getConnection().createArrayOf("VARCHAR", name.toArray()));
            ps.setArray(5, ps.getConnection().createArrayOf("VARCHAR", sku.toArray()));

            ps.executeLargeUpdate();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    });
}

This is technically equivalent to using addBatch() and executeLargeBatch(), with the important difference that the batch API only gets rewritten into multi-row statements when the pgJDBC driver's reWriteBatchedInserts property is set to true.
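The list-per-column bookkeeping in the array example above can be factored into a small helper that pivots rows into one array per column. This is a sketch under assumptions: `Columns.of` is a hypothetical name, and `Item` is a minimal stand-in for the article's Product class.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical helper; not part of pgJDBC or the CockroachDB JDBC driver.
final class Columns {
    /** Extracts one attribute from every row into an Object[], in row order. */
    static <T, R> Object[] of(List<T> rows, Function<T, R> getter) {
        return rows.stream().map(getter).toArray();
    }

    // Minimal stand-in for the article's Product class (assumed shape).
    static final class Item {
        final UUID id;
        final int inventory;
        final BigDecimal price;

        Item(UUID id, int inventory, BigDecimal price) {
            this.id = id;
            this.inventory = inventory;
            this.price = price;
        }
    }

    public static void main(String[] args) {
        List<Item> chunk = List.of(
                new Item(UUID.randomUUID(), 10, new BigDecimal("12.50")),
                new Item(UUID.randomUUID(), 20, new BigDecimal("13.50")));
        Object[] ids = of(chunk, i -> i.id);
        Object[] qty = of(chunk, i -> i.inventory);
        Object[] price = of(chunk, i -> i.price);
        System.out.println(ids.length + " " + qty[1] + " " + price[0]); // 2 20 12.50
    }
}
```

Each resulting array can then be passed to Connection.createArrayOf in the same way as the ids.toArray() calls in the example, which keeps the position of every row aligned across all columns.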

From a performance standpoint, both approaches are equivalent up to a batch size of 128, which is the hardcoded limit on how many rows the pgJDBC driver rewrites into a single INSERT. The rewrite does not go beyond that, so the array unnesting approach performs better past this limit. Depending on the workload, you can go up to a batch size of around 16-32K before performance starts to level out again.
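A back-of-the-envelope calculation shows why the limit matters. The sketch below only counts statements, assuming each statement carries at most a fixed number of rows; the workload size of 100,000 rows is an arbitrary assumption, and actual throughput depends on the workload and on how the driver sends the statements.

```java
// Arithmetic illustration only; the names and the workload size are assumptions.
final class StatementCount {
    /** Number of statements needed when each statement carries at most rowsPerStatement rows. */
    static long statements(long totalRows, long rowsPerStatement) {
        return (totalRows + rowsPerStatement - 1) / rowsPerStatement; // ceiling division
    }

    public static void main(String[] args) {
        long rows = 100_000; // assumed workload size
        System.out.println(statements(rows, 128));    // 782
        System.out.println(statements(rows, 16_384)); // 7
    }
}
```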

The hardcoded limit of 128 may be appropriate for PostgreSQL, but CockroachDB is not PostgreSQL (it is only compatible at the wire protocol level) and can leverage much larger bulk statements. Until this limit is removed or made configurable in pgJDBC, there are only two options:

  • Modify the pgJDBC driver and build a custom library. This is quite straightforward but requires maintaining a separate fork.

  • Rewrite INSERTs, UPSERTs and UPDATEs with unnesting of arrays.

Bulk Inserts

To recap, here's an UPSERT example that runs right at the pgJDBC driver's limit of 128 rows for INSERT rewrites:

List<Product> products = Arrays.asList(
        Product.builder().withName("A").withInventory(1).withPrice(new BigDecimal("10.15")).build(),
        Product.builder().withName("B").withInventory(2).withPrice(new BigDecimal("11.15")).build(),
        Product.builder().withName("C").withInventory(3).withPrice(new BigDecimal("12.15")).build()
        // .. etc to several 1000s
);

Stream<List<Product>> chunks = chunkedStream(products.stream(), 128);

dataSource.addDataSourceProperty("reWriteBatchedInserts",true);

try (Connection connection = dataSource.getConnection()) {
    connection.setAutoCommit(true);

    chunks.forEach(chunk -> {
        try (PreparedStatement ps = connection.prepareStatement(
                "INSERT INTO product (id,inventory,price,name,sku) values (?,?,?,?,?) ON CONFLICT (id) DO NOTHING")) {
            for (Product product : chunk) {
                ps.setObject(1, product.getId());
                ps.setObject(2, product.getInventory());
                ps.setObject(3, product.getPrice());
                ps.setObject(4, product.getName());
                ps.setObject(5, product.getSku());
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (SQLException ex) {
            throw new RuntimeException(ex);
        }
    });
}

For completeness, here is the chunkedStream method, which slices a stream into chunks of at most chunkSize elements:

    public static <T> Stream<List<T>> chunkedStream(Stream<T> stream, int chunkSize) {
        AtomicInteger idx = new AtomicInteger();
        return stream.collect(Collectors.groupingBy(x -> idx.getAndIncrement() / chunkSize)).values().stream();
    }

This UPSERT statement is executed using implicit transactions (auto-commit is enabled), and it's fairly fast with the reWriteBatchedInserts property set. The pgJDBC rewrite feature works for both regular INSERTs and INSERT .. ON CONFLICT, aka UPSERTs.
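One caveat with the chunkedStream method above: Collectors.groupingBy makes no guarantees about the type of map it returns, so the order of the chunks is not guaranteed, and the shared counter assumes a sequential stream. When the products are already in a list, a simpler order-preserving alternative could look like the following sketch, where `Chunks.of` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical alternative to chunkedStream for in-memory lists.
final class Chunks {
    /** Splits a list into consecutive sublists of at most chunkSize elements, preserving order. */
    static <T> List<List<T>> of(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, items.size());
            chunks.add(items.subList(from, to)); // a view on the source list, not a copy
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(of(List.of(1, 2, 3, 4, 5), 2)); // [[1, 2], [3, 4], [5]]
    }
}
```

The last chunk may be smaller than the others, which is also true of chunkedStream.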

Bulk Updates

Given the previous example, it's fair to assume UPDATEs work in the same way:

try (PreparedStatement ps = connection.prepareStatement(
    "UPDATE product SET inventory=?, price=? WHERE id=?")) {

    chunk.forEach(product -> {
        try {
            ps.setInt(1, product.getInventory());
            ps.setBigDecimal(2, product.getPrice());
            ps.setObject(3, product.getId());
            ps.addBatch();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    });
    ps.executeLargeBatch();  
} catch (SQLException ex) {
    throw new RuntimeException(ex);
}

However, no batching is done here at the JDBC driver level, because the reWriteBatchedInserts property only covers INSERT statements. To apply individual row UPDATEs in bulk, you can use arrays again:

try (PreparedStatement ps = connection.prepareStatement(
        "UPDATE product SET inventory=data_table.new_inventory, price=data_table.new_price "
                + "FROM (select "
                + "unnest(?) as id, "
                + "unnest(?) as new_inventory, "
                + "unnest(?) as new_price) as data_table "
                + "WHERE product.id=data_table.id")) {
    List<Integer> qty = new ArrayList<>();
    List<BigDecimal> price = new ArrayList<>();
    List<UUID> ids = new ArrayList<>();

    chunk.forEach(product -> {
        qty.add(product.addInventoryQuantity(1));
        price.add(product.getPrice().add(new BigDecimal("1.00")));
        ids.add(product.getId());
    });

    ps.setArray(1, ps.getConnection()
            .createArrayOf("UUID", ids.toArray()));
    ps.setArray(2, ps.getConnection()
            .createArrayOf("BIGINT", qty.toArray()));
    ps.setArray(3, ps.getConnection()
            .createArrayOf("DECIMAL", price.toArray()));

    ps.executeLargeUpdate(); 
} catch (SQLException e) {
    throw new RuntimeException(e);
}

The performance improvement with this approach is substantial, since a whole chunk of rows is updated by a single statement. The only problem is that it's rather clunky and requires refactoring application code.

The CockroachDB JDBC driver can, however, rewrite bulk DML operations on behalf of applications, which makes the optimization transparent. See the GitHub repo https://github.com/cloudneutral/cockroachdb-jdbc for more details.

Conclusion

This article discussed a feature of the CockroachDB JDBC driver that optimizes batch DML with array unnesting, allowing for much larger batch sizes than the INSERT rewrite in the pgJDBC driver. It also covered the performance benefit of this approach and the code refactoring required when applying it by hand. The approach can be used for INSERTs and UPSERTs, as well as individual row UPDATEs.