Java RPC Application with ActiveRPC library
valerialistratova
Posted on November 26, 2020
What is ActiveRPC?
ActiveRPC is a lightweight and convenient Java library for developing high-load distributed applications and Memcached-like solutions. It offers an alternative approach to implementing microservices and avoids the overhead of using HTTP with JSON or XML encoding.
ActiveRPC is powered by ActiveSerializer, one of the world's fastest JVM serializers, runs on TCP, and has a custom high-performance binary streaming protocol. It features highly optimized, flexible server and client implementations along with predefined cloud strategies. With these technologies you can conveniently build even complex scalable solutions such as high-performance microservices or Memcached-like servers.
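To give a rough feel for why a binary encoding can be cheaper than a text one, here is a minimal sketch that encodes the same small request both ways and compares the byte counts. It uses only the JDK: the class name, the request fields, and the use of DataOutputStream are assumptions made for illustration, and this is not ActiveSerializer's actual wire format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustration only: a hypothetical request with a numeric id and a short key.
class EncodingSizeSketch {

    // Text encoding: field names and punctuation travel with every message.
    static byte[] encodeAsJson(int id, String key) {
        String json = "{\"id\":" + id + ",\"key\":\"" + key + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Binary encoding: both sides agree on the field order, so only values are written.
    static byte[] encodeAsBinary(int id, String key) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(id);  // fixed 4 bytes
            out.writeUTF(key); // 2-byte length prefix followed by the encoded characters
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] json = encodeAsJson(12345, "user:42");
        byte[] binary = encodeAsBinary(12345, "user:42");
        System.out.println("JSON bytes:   " + json.length);
        System.out.println("Binary bytes: " + binary.length);
    }
}
```

The sketch ignores HTTP headers entirely, so the real gap between an HTTP/JSON call and a compact binary message over TCP is usually larger than the payload difference alone.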
Basic example
public class RpcExample extends Launcher {
    private static final int SERVICE_PORT = 34765;

    @Inject
    private RpcClient client;

    @Inject
    private RpcServer server;

    @Inject
    private Eventloop eventloop;

    @Provides
    Eventloop eventloop() {
        return Eventloop.create();
    }

    @Provides
    RpcServer rpcServer(Eventloop eventloop) {
        return RpcServer.create(eventloop)
                .withMessageTypes(String.class)
                .withHandler(String.class,
                        request -> Promise.of("Hello " + request))
                .withListenPort(SERVICE_PORT);
    }

    @Provides
    RpcClient rpcClient(Eventloop eventloop) {
        return RpcClient.create(eventloop)
                .withMessageTypes(String.class)
                .withStrategy(server(new InetSocketAddress(SERVICE_PORT)));
    }

    @ProvidesIntoSet
    Initializer<ServiceGraphModuleSettings> configureServiceGraph() {
        // add a logical dependency so that the service graph starts the client only after it has started the server
        return settings -> settings.addDependency(Key.of(RpcClient.class), Key.of(RpcServer.class));
    }

    @Override
    protected Module getModule() {
        return ServiceGraphModule.create();
    }

    @Override
    protected void run() throws ExecutionException, InterruptedException {
        CompletableFuture<Object> future = eventloop.submit(() ->
                client.sendRequest("World", 1000)
        );
        System.out.printf("%nRPC result: %s %n%n", future.get());
    }

    public static void main(String[] args) throws Exception {
        RpcExample example = new RpcExample();
        example.launch(args);
    }
}
The RpcExample class extends the ActiveJ Launcher, which helps us manage logging and the whole application lifecycle.
Next, we use the ActiveInject dependency injection library to provide RpcServer and RpcClient with the relevant configuration and required dependencies. RpcClient sends requests with a String message to the specified server according to the provided RPC strategy (in this case, a single RPC server). For RpcServer we define the message types to process, a corresponding RpcRequestHandler, and a listen port.
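The idea of binding a handler to a message type can be modeled in plain Java. The sketch below is a simplified stand-in and not ActiveRPC code: HandlerRegistrySketch and its methods are hypothetical names, and CompletableFuture is used in place of ActiveJ's Promise. It only shows how a request can be routed to a handler by its class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, JDK-only model of "one handler per message type".
class HandlerRegistrySketch {
    private final Map<Class<?>, Function<Object, CompletableFuture<Object>>> handlers = new HashMap<>();

    // Registers a handler for exactly one request class and returns this for chaining.
    <I> HandlerRegistrySketch withHandler(Class<I> type, Function<I, CompletableFuture<Object>> handler) {
        handlers.put(type, request -> handler.apply(type.cast(request)));
        return this;
    }

    // Looks up the handler by the request's runtime class; fails the future if none is registered.
    CompletableFuture<Object> handle(Object request) {
        Function<Object, CompletableFuture<Object>> handler = handlers.get(request.getClass());
        if (handler == null) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("No handler for " + request.getClass().getName()));
        }
        return handler.apply(request);
    }

    public static void main(String[] args) {
        HandlerRegistrySketch server = new HandlerRegistrySketch()
                .withHandler(String.class, request -> CompletableFuture.completedFuture("Hello " + request));
        System.out.println(server.handle("World").join()); // prints "Hello World"
    }
}
```

In the real example the lookup, serialization, and networking are handled by RpcServer; the sketch keeps only the dispatch step so the role of the handler is easy to see.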
Since we extend Launcher, we also override two methods: getModule, which provides ServiceGraphModule, and run, which contains the main logic of the example.
Finally, we define the main method, which launches our example.
You can find example sources on GitHub.
Combining strategies
In the previous example we used one of the simplest strategies, which works with a single server at the provided port. As mentioned above, ActiveRPC also includes predefined strategies that you can use and combine.
For example, let's combine the Round Robin and First Available strategies. First, we create 4 connections but do not put connection3 into the pool. Then we send 20 requests. As a result, the requests are distributed equally between connection1 (as it is always the first available in its group) and connection4 (as connection3 is not in the pool):
pool.put(ADDRESS_1, connection1);
pool.put(ADDRESS_2, connection2);
// we don't put connection3
pool.put(ADDRESS_4, connection4);
int iterations = 20;
RpcStrategy strategy = roundRobin(
        firstAvailable(servers(ADDRESS_1, ADDRESS_2)),
        firstAvailable(servers(ADDRESS_3, ADDRESS_4)));
RpcSender sender = strategy.createSender(pool);
for (int i = 0; i < iterations; i++) {
    sender.sendRequest(new Object(), 50, assertNoCalls());
}
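To make the expected distribution easier to follow, here is a small JDK-only simulation of the same combination. It is a sketch under stated assumptions and not ActiveRPC's implementation: the class and method names are hypothetical, addresses are plain strings, and the pool is modeled as a set of connected addresses.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: round robin over groups, each group resolved by "first available".
class RoundRobinFirstAvailableSketch {

    // Returns the first address of the group that is present in the pool, or null if none is.
    static String firstAvailable(List<String> group, Set<String> pool) {
        for (String address : group) {
            if (pool.contains(address)) {
                return address;
            }
        }
        return null;
    }

    // Resolves each group to a single address, then cycles over those addresses per request.
    static Map<String, Integer> distribute(List<List<String>> groups, Set<String> pool, int requests) {
        List<String> senders = new ArrayList<>();
        for (List<String> group : groups) {
            String address = firstAvailable(group, pool);
            if (address != null) {
                senders.add(address);
            }
        }
        Map<String, Integer> counts = new LinkedHashMap<>();
        if (senders.isEmpty()) {
            return counts;
        }
        for (int i = 0; i < requests; i++) {
            counts.merge(senders.get(i % senders.size()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Set<String> pool = Set.of("ADDRESS_1", "ADDRESS_2", "ADDRESS_4"); // ADDRESS_3 is not pooled
        Map<String, Integer> counts = distribute(
                List.of(List.of("ADDRESS_1", "ADDRESS_2"), List.of("ADDRESS_3", "ADDRESS_4")),
                pool, 20);
        System.out.println(counts); // prints {ADDRESS_1=10, ADDRESS_4=10}
    }
}
```

ADDRESS_2 receives nothing because ADDRESS_1 always wins inside the first group, and ADDRESS_4 takes the second group's share because ADDRESS_3 has no connection.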
Let's check out one more example, the Type Dispatch strategy.
This strategy distributes requests among shards according to the type of the request. In the example, all String requests are sent to the first shard, which uses the First Valid Result strategy for its servers. Requests of all other types are sent to the second shard, which uses the First Available strategy. As a result, connection1 and connection2 will each process 35 requests and connection3 will process 25 requests, while connection4 and connection5 will process 0 requests, as connection3 is always the first available:
pool.put(ADDRESS_1, connection1);
pool.put(ADDRESS_2, connection2);
pool.put(ADDRESS_3, connection3);
pool.put(ADDRESS_4, connection4);
pool.put(ADDRESS_5, connection5);
int timeout = 50;
int iterationsPerDataStub = 25;
int iterationsPerDataStubWithKey = 35;
RpcSender sender;
RpcStrategy strategy = typeDispatching()
        .on(String.class,
                firstValidResult(servers(ADDRESS_1, ADDRESS_2)))
        .onDefault(
                firstAvailable(servers(ADDRESS_3, ADDRESS_4, ADDRESS_5)));
sender = strategy.createSender(pool);
for (int i = 0; i < iterationsPerDataStub; i++) {
    sender.sendRequest(new Object(), timeout, assertNoCalls());
}
for (int i = 0; i < iterationsPerDataStubWithKey; i++) {
    sender.sendRequest("request", timeout, assertNoCalls());
}
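The request counts above can be reproduced with a short JDK-only simulation. This is a sketch, not ActiveRPC code: the class name is hypothetical, connections are plain strings, and it assumes that First Valid Result sends each request to every server in its shard while First Available uses only the first server in its shard.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of type dispatching between two shards.
class TypeDispatchSketch {

    // Counts how many requests each connection would receive.
    static Map<String, Integer> dispatch(List<Object> requests) {
        List<String> stringShard = List.of("connection1", "connection2");
        List<String> defaultShard = List.of("connection3", "connection4", "connection5");

        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String connection : stringShard) counts.put(connection, 0);
        for (String connection : defaultShard) counts.put(connection, 0);

        for (Object request : requests) {
            if (request instanceof String) {
                // Assumed "first valid result" behavior: every server in the shard gets the request.
                for (String connection : stringShard) {
                    counts.merge(connection, 1, Integer::sum);
                }
            } else {
                // Assumed "first available" behavior: all servers are connected, so the first one is used.
                counts.merge(defaultShard.get(0), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Object> requests = new ArrayList<>();
        for (int i = 0; i < 25; i++) requests.add(new Object());
        for (int i = 0; i < 35; i++) requests.add("request");
        // prints {connection1=35, connection2=35, connection3=25, connection4=0, connection5=0}
        System.out.println(dispatch(requests));
    }
}
```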
In the next tutorial we'll create a Memcached-like solution that can handle up to 10 million requests per second on a single core using ActiveRPC.