Using the Coordinator to optimise Transactions

The transaction control service provides a transactional scope around resource access, but sometimes it makes sense to delay the work until it can be batched efficiently into a single transaction.

Bulk resource access

In the simplest case, each insert runs in its own transaction:

public void persistMessage(String message) {
    txControl.required(() -> {
            PreparedStatement ps = connection.prepareStatement(
                    "Insert into TEST_TABLE values ( ? )");
            ps.setString(1, message);
            return ps.executeUpdate();
        });
}

If called a large number of times from an external service:

List<String> messages = getMessages();

messages.stream()
        .forEach(svc::persistMessage);

Then this code can become quite slow as the message list grows, because every message pays for its own commit.

The naive approach

The obvious way to reduce overhead is to batch all of the inserts into a single transaction:

List<String> messages = getMessages();

txControl.required(() -> {
        messages.stream()
            .forEach(svc::persistMessage);
        return null;
    });

This reuses the same physical connection each time, and it avoids repeated commits, so it should be faster, right?

In fact, this approach can be slower on some databases: building up a very large transaction can reduce the rate at which data is inserted.

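Using the Coordinator

By adding in the Coordinator we can dramatically improve our performance. If a Coordination is active on the current thread, addParticipant returns true and the message is buffered in that Coordination's variables; otherwise the insert runs immediately in its own transaction, as before. The buffered messages are written as a single JDBC batch when the Coordination ends: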

public void persistMessage(String message) {
    if(coordinator.addParticipant(this)) {
        ((List<String>)coordinator.peek().getVariables()
            .computeIfAbsent(getClass(), k -> new ArrayList<String>()))
            .add(message);
    } else {
        txControl.required(() -> {
                PreparedStatement ps = connection.prepareStatement(
                        "Insert into TEST_TABLE values ( ? )");
                ps.setString(1, message);
                return ps.executeUpdate();
            });
    }
}

@Override
public void ended(Coordination coord) throws Exception {
    txControl.required(() -> {
            List<String> l = (List<String>) coord.getVariables()
                            .get(getClass());

            PreparedStatement ps = connection.prepareStatement(
                    "Insert into TEST_TABLE values ( ? )");

            l.stream().forEach(s -> {
                    try {
                        ps.setString(1, s);
                        ps.addBatch();
                    } catch (SQLException sqle) {
                        throw new RuntimeException(sqle);
                    }
                });

            return ps.executeBatch();
        });
}
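To see the buffering mechanics in isolation, here is a minimal sketch that needs neither OSGi nor JDBC. It is a simplified stand-in, not the Coordinator API: BufferingWriter, its begin/persist/end methods, and the bulkWriter sink are hypothetical names chosen for illustration. The sketch only shows that messages persisted inside an open scope reach the sink in one call, while messages persisted outside a scope are written one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for the Coordinator pattern; these are NOT the OSGi types.
class BufferingWriter {
    // Hypothetical sink; in the real service this would be one batched insert.
    private final Consumer<List<String>> bulkWriter;
    // Non-null only while a scope is open.
    private List<String> buffer;

    BufferingWriter(Consumer<List<String>> bulkWriter) {
        this.bulkWriter = bulkWriter;
    }

    // Plays the role of beginning a coordination.
    void begin() {
        buffer = new ArrayList<>();
    }

    // Plays the role of persistMessage: buffer if a scope is open, else write at once.
    void persist(String message) {
        if (buffer != null) {
            buffer.add(message);
        } else {
            bulkWriter.accept(List.of(message));
        }
    }

    // Plays the role of the coordination ending and the participant being notified.
    void end() {
        List<String> pending = buffer;
        buffer = null;
        if (pending != null && !pending.isEmpty()) {
            bulkWriter.accept(pending);
        }
    }
}

class BufferingDemo {
    // Returns how many times the sink was called for the given messages.
    static int countWrites(List<String> messages, boolean scoped) {
        List<List<String>> writes = new ArrayList<>();
        BufferingWriter writer = new BufferingWriter(writes::add);
        if (scoped) {
            writer.begin();
        }
        try {
            messages.forEach(writer::persist);
        } finally {
            if (scoped) {
                writer.end();
            }
        }
        return writes.size();
    }

    public static void main(String[] args) {
        List<String> messages = List.of("a", "b", "c");
        System.out.println(countWrites(messages, false)); // prints 3
        System.out.println(countWrites(messages, true));  // prints 1
    }
}
```

The real service gets the same effect from the Coordinator: the scope is the Coordination, and the flush happens in the ended callback.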

Now, if we do our bulk add inside a coordination:

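// Keep the Coordination returned by begin so that we end exactly the one we started
Coordination coordination = coordinator.begin("foo", MINUTES.toMillis(5));
try {
    messages.stream()
            .forEach(this::persistMessage);
} finally {
    coordination.end();
}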

Then we find that it is much faster! This is because we can use the more efficient JDBC batch API (addBatch and executeBatch), and because we can batch up a suitable number of inserts in a single transaction.
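The ended callback shown earlier writes the whole buffer in one transaction. If "a suitable number" needs to be capped, one option is to split the buffered list into fixed-size chunks and write each chunk in its own transaction. The following is a sketch under that assumption; the Chunks class, the partition helper, and the chunk size of 2 are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class Chunks {
    // Splits items into consecutive sublists of at most chunkSize elements.
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> buffered = List.of("m1", "m2", "m3", "m4", "m5");
        // In the service, each chunk would be written by its own
        // txControl.required(...) call using addBatch and executeBatch.
        for (List<String> chunk : partition(buffered, 2)) {
            System.out.println(chunk);
        }
    }
}
```

A good chunk size depends on the database and the row size, so it is best found by measurement rather than fixed in advance.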