Asynchronous Services
Introduction
The R6 OSGi specifications include support for asynchronous programming using OSGi services. Apache Aries aims to provide small, compliant implementations of these specifications to enable asynchronous programming in enterprise applications. The two key specifications are OSGi Promises and the Async service.
OSGi Promises
One of the fundamental pieces of an asynchronous programming system is the Promise. A Promise is a holder type that represents an asynchronous calculation or computation. Since Java 5 the JDK has contained java.util.concurrent.Future to perform this function. Java’s Future type is, however, fatally flawed as it has no callback to notify the user when it resolves. Instead the user must make a blocking call to get().
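To make the limitation concrete, here is a minimal sketch that uses only the JDK. The class name BlockingFutureExample and the stand-in method calculateReallyHardSum() (which simply returns a fixed value) are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingFutureExample {

    // Stand-in for a slow calculation; the value is arbitrary.
    static int calculateReallyHardSum() {
        return 42;
    }

    // Submits the work and then waits for it. The Future interface offers
    // no way to register a callback, so the caller has to block (or poll).
    static int computeAndWait() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = BlockingFutureExample::calculateReallyHardSum;
            Future<Integer> future = executor.submit(task);
            // The calling thread is parked here until the result is available.
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("The calculator returned " + computeAndWait());
    }
}
```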
OSGi Promises fix this problem by defining a Promise interface which allows the user to register callbacks that will be called when the promise resolves. These callbacks are lambda-friendly SAM types, but the Promise API itself has no dependencies on Java 8. The Aries version (org.apache.aries.async.promise.api) has a minimum requirement of JDK 6.
Creating OSGi Promises
Creating a Promise is easy. The org.osgi.util.promise.Deferred type is a factory for a single promise, and can also be used to resolve or fail the promise:
    Deferred<Integer> deferred = new Deferred<>();

    new Thread(() -> {
        int result = calculateReallyHardSum();
        deferred.resolve(result);
    }).start();

    Promise<Integer> p = deferred.getPromise();
    ...
But wait - what if something goes wrong? How do we signal to the user that there was a problem and the result is never coming? The answer is very easy - Promises have the concept of failure. A promise will either resolve or fail at most once. For well-written code this rule is usually the same as "A promise will either resolve or fail exactly once". Promises are thread safe and effectively immutable, meaning they can be shared with other code.
    Deferred<Integer> deferred = new Deferred<>();

    new Thread(() -> {
        try {
            int result = calculateErrorProneSum();
            deferred.resolve(result);
        } catch (Exception e) {
            deferred.fail(e);
        }
    }).start();

    Promise<Integer> p = deferred.getPromise();
    ...
Using OSGi Promises
Once you have a promise, what do you do with it? It’s easy to get the value from a promise using the getValue() method, or you can use the getFailure() method to get the failure cause. Unfortunately both of these methods block until the promise resolves. The isDone() method does tell you whether the Promise has completed, but blocking and polling really aren’t the right way to use a Promise.
Promises work best when you register callbacks and/or transformations. The Promise API has a variety of useful methods for doing work with the Promise when it resolves. For example we can run a task after the promise completes successfully:
    Promise<Integer> promise = ...;
    promise.then(p -> {
        System.out.println("The calculator returned " + p.getValue());
        return null;
    });
We can also register callbacks to handle failures:
    Promise<Integer> promise = ...;
    promise.then(p -> {
        System.out.println("The calculator returned " + p.getValue());
        return null;
    }, p -> p.getFailure().printStackTrace());
Chaining OSGi Promises
In the previous examples our success callback returned null - why? Well the return value from a success callback is always a promise (null is a shortcut for a promise resolved with null). The promise returned by the callback represents an asynchronous execution flow in a process known as "chaining". The overall completion of this chain is represented by a third promise, returned to the caller of the then() method.
1. The caller registers a success callback, and receives a "chained" promise
2. The original promise completes successfully
3. The success callback runs and returns a promise representing another piece of asynchronous work
4. The promise returned by the success callback completes successfully
5. The "chained" promise completes with the same value as the promise from step 4.
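The steps above can be sketched as follows. This is a minimal illustration that assumes the org.osgi.util.promise API is on the classpath. The class ChainingExample and the helper formatSum() are hypothetical names introduced here; formatSum() returns an already resolved promise for brevity, but it could equally start further asynchronous work.

```java
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.Promises;

public class ChainingExample {

    // Hypothetical second stage of work, represented by its own promise.
    static Promise<String> formatSum(int sum) {
        return Promises.resolved("sum=" + sum);
    }

    // Registers the success callback and hands back the "chained" promise.
    static Promise<String> chain(Promise<Integer> original) {
        return original.then(p -> formatSum(p.getValue()));
    }

    public static void main(String[] args) throws Exception {
        Deferred<Integer> deferred = new Deferred<>();

        // Step 1: register the callback and receive the chained promise.
        Promise<String> chained = chain(deferred.getPromise());

        // Step 2: the original promise completes, which triggers steps 3 to 5.
        deferred.resolve(42);

        // Blocking is used here only to show the result in a small demo.
        System.out.println(chained.getValue());
    }
}
```

The chained promise has the type returned by the callback (String here), not the type of the original promise.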
Other Promise behaviours
As well as simple callbacks, Promises also provide advanced mapping and recovery features. For example, a promise can be wrapped so that if the original work fails, a new value can be supplied using a recovery function.
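A minimal sketch of both features, assuming the org.osgi.util.promise API is on the classpath. It uses the map() and recover() methods of Promise; the class RecoveryExample and its helper methods are hypothetical names used only for illustration.

```java
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;

public class RecoveryExample {

    // Wraps a promise so that a failure is replaced by the fallback value.
    static Promise<Integer> withFallback(Promise<Integer> original, int fallback) {
        return original.recover(failed -> fallback);
    }

    // Maps the eventual value without blocking for it.
    static Promise<Integer> doubled(Promise<Integer> original) {
        return original.map(value -> value * 2);
    }

    public static void main(String[] args) throws Exception {
        Deferred<Integer> deferred = new Deferred<>();
        Promise<Integer> safe = withFallback(deferred.getPromise(), -1);

        // The original work fails, so the wrapped promise resolves with the fallback.
        deferred.fail(new IllegalStateException("calculation failed"));

        // Blocking is used here only to show the result in a small demo.
        System.out.println(safe.getValue());
    }
}
```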
The org.osgi.util.promise.Promises utility class also provides useful functions for working with promises. For example, it has helper methods that wrap an existing value or failure in a Promise, and a way of aggregating a group of promises into a single promise.
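The following sketch shows these helpers together, assuming the org.osgi.util.promise API is on the classpath. It uses Promises.resolved(), Promises.failed() and Promises.all(); the class PromisesUtilityExample and its sum() method are hypothetical names used only for illustration.

```java
import java.util.List;

import org.osgi.util.promise.Promise;
import org.osgi.util.promise.Promises;

public class PromisesUtilityExample {

    // Aggregates two promises and adds their values once both have resolved.
    // If either input fails, the aggregate (and so the result) fails too.
    static Promise<Integer> sum(Promise<Integer> a, Promise<Integer> b) {
        Promise<List<Integer>> all = Promises.all(a, b);
        return all.map(values -> values.get(0) + values.get(1));
    }

    public static void main(String[] args) throws Exception {
        // Wrap an existing value and an existing failure in promises.
        Promise<Integer> ready = Promises.resolved(40);
        Promise<Integer> broken = Promises.failed(new IllegalStateException("no value"));

        // Blocking is used here only to show the results in a small demo.
        System.out.println(sum(ready, Promises.resolved(2)).getValue());
        System.out.println(sum(ready, broken).getFailure() != null);
    }
}
```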
The Async service
Most OSGi services have a synchronous API. This is usually the easiest way to think about, write, and use services. The main problem with this is that long running service calls can cause applications to run slowly, and making the calls asynchronous is both verbose and error-prone.
The Async service is designed to take away the boilerplate code needed to invoke a service asynchronously, and to convert any synchronous API into an asynchronous API, returning promises instead of values.
Using the Async service
The Async service is available in the service registry and is very easy to use. First we need to mediate the service. Mediating is a bit like creating a mock object: the mediator records method calls made against it so that they can be transformed into asynchronous calls. Mediation can be applied to a concrete object, or to a ServiceReference. It is better to use a ServiceReference when one is available, as the Async service can then track the availability of the backing service.
    Async async = ...;
    ServiceReference<MyService> ref = ...;
    MyService mediator = async.mediate(ref, MyService.class);
or
    Async async = ...;
    MyService svc = ...;
    MyService mediator = async.mediate(svc, MyService.class);
Once a service has been mediated the mediator should be called just like the real service object, and the return value passed to the Async service’s call() method. This returns a promise representing the asynchronous work.
    Promise<Integer> promise = async.call(mediator.calculateReallyHardSum());
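To give a feel for the "record, then replay asynchronously" idea behind mediation, here is a toy sketch that uses only the JDK. It is not the Aries implementation and does not use the OSGi API: the class ToyAsync is a hypothetical stand-in built on java.lang.reflect.Proxy, it returns a CompletableFuture rather than an OSGi Promise, and the MyService interface is declared locally for the example.

```java
import java.lang.reflect.Array;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

public class MediatorSketch {

    /** Hypothetical service interface used only for illustration. */
    public interface MyService {
        int calculateReallyHardSum();
    }

    /** Toy stand-in for the Async service. This is NOT how Aries implements it. */
    public static class ToyAsync {

        // The most recent call recorded by a mediator on the current thread.
        private final ThreadLocal<Callable<Object>> recorded = new ThreadLocal<>();

        public <T> T mediate(T target, Class<T> iface) {
            Object proxy = Proxy.newProxyInstance(
                    iface.getClassLoader(), new Class<?>[] {iface},
                    (self, method, args) -> {
                        // Record the call instead of running it.
                        recorded.set(() -> method.invoke(target, args));
                        return placeholder(method.getReturnType());
                    });
            return iface.cast(proxy);
        }

        // The argument is only the mediator's placeholder; the real work
        // is the recorded call, which is replayed on another thread.
        @SuppressWarnings("unchecked")
        public <R> CompletableFuture<R> call(R ignoredPlaceholder) {
            Callable<Object> invocation = recorded.get();
            recorded.remove();
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return (R) invocation.call();
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
        }

        // Primitive return types need a non-null default (0, false, ...).
        private static Object placeholder(Class<?> type) {
            if (!type.isPrimitive() || type == void.class) {
                return null;
            }
            return Array.get(Array.newInstance(type, 1), 0);
        }
    }

    public static void main(String[] args) {
        ToyAsync async = new ToyAsync();
        MyService real = () -> 42;
        MyService mediator = async.mediate(real, MyService.class);

        // Same calling pattern as with the real Async service.
        CompletableFuture<Integer> result = async.call(mediator.calculateReallyHardSum());
        System.out.println(result.join());
    }
}
```

The sketch also shows why the mediator's return value should be passed straight to call(): on its own it is only a placeholder, not the result of the service method.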
Void methods
Void methods don’t have a return value to pass to the Async service, and should use the no-args version of call() instead.
    mediator.longRunningVoidMethod();
    Promise<?> promise = async.call();
Fire and Forget calls
Sometimes the user does not care when a piece of work finishes, or what value it returns, or even whether it was successful. These sorts of calls are called "fire and forget" calls, and are also supported by the async service using the execute() method.
The execute() method still returns a promise, but that promise only represents whether the fire and forget call started successfully, not whether it has completed.
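A minimal sketch of a fire and forget call on a void method, assuming the org.osgi.service.async and org.osgi.util.promise APIs are on the classpath and that an Async service instance has already been obtained from the service registry. The AuditLog interface and the FireAndForgetExample class are hypothetical names used only for illustration.

```java
import org.osgi.service.async.Async;
import org.osgi.util.promise.Promise;

public class FireAndForgetExample {

    /** Hypothetical service interface used only for illustration. */
    public interface AuditLog {
        void record(String message);
    }

    // Starts the call in the background. The returned promise only reports
    // whether the call was started, not whether record() has finished.
    public static Promise<Void> recordInBackground(Async async, AuditLog log, String message) {
        AuditLog mediator = async.mediate(log, AuditLog.class);

        // Recorded by the mediator rather than executed on this thread.
        mediator.record(message);

        // No-args form, because record() has no return value to pass in.
        return async.execute();
    }
}
```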
Getting Started
Releases of the Async implementation can be found in Maven Central in the org.apache.aries.async group. This bundle provides a convenient all-in-one download.
The Asynchronous Services source code can be found in the Apache Aries codebase in the async directory: https://svn.apache.org/repos/asf/aries/trunk/async