Go for Java developers (or is the Java concurrency that bad?!)
Nicola Apicella
Posted on November 29, 2017
I am by no means an expert in Go; indeed, quite the opposite. I am currently trying to get familiar with it, starting with the syntax, the memory and the concurrency models. As usual for me, I am trying to contrast it with something I already know, like Java.
So I stumbled upon this interesting talk in which the great Sajma introduced the Go concurrency model with some examples. The slides for the talk and the examples are here. Not far into the talk, a question popped up: think about what it would take to implement the same thing in other languages, like Java.
Is it really that hard? I was not so sure. I mean, Java does not have a 'select' statement, nor does it have built-in channels, but it should not be difficult to replicate the examples in Java, or is it?
So I thought I could have some fun implementing the examples in Java.
Go concurrency
Before getting to the examples, here is a streamlined recap of the talk (by the way, it's a cool talk, so I really suggest watching it).
- Go concurrency
  - The concurrency model is based on Communicating Sequential Processes (Hoare, 1978)
  - Concurrent programs are structured as independent processes that execute sequentially and communicate by passing messages
  - "Don't communicate by sharing memory, share memory by communicating"
  - Go primitives: goroutines, channels and the select statement
- Goroutines
  - A goroutine is lightweight and behaves like a thread, but it is not an OS thread
  - Channels provide communication between goroutines (analogous to a synchronized queue in Java)
  - select multiplexes communication among goroutines
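To make the channel analogy concrete, here is a minimal Java sketch. The class and method names are illustrative and come from neither the talk nor the repository. It uses `java.util.concurrent.SynchronousQueue`, which has no capacity: `put` blocks until another thread calls `take`, roughly like a send on an unbuffered Go channel. A plain platform thread stands in for the goroutine, so the sketch shows the communication pattern, not the lightweight cost model.

```java
import java.util.concurrent.SynchronousQueue;

public class UnbufferedChannelDemo {
    // A SynchronousQueue has no capacity: put() blocks until another thread
    // calls take(), much like a send on an unbuffered Go channel blocks
    // until a receiver is ready.
    static String pingPong() throws InterruptedException {
        SynchronousQueue<String> channel = new SynchronousQueue<>();

        // Rough analogue of `go func() { c <- "ping" }()`; a platform thread
        // stands in for the goroutine.
        Thread sender = new Thread(() -> {
            try {
                channel.put("ping");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();

        // Rough analogue of `msg := <-c`
        String msg = channel.take();
        sender.join();
        return msg;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pingPong()); // prints "ping"
    }
}
```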
The example
In the examples we have to build a hypothetical client that queries Google services (web, image and video). Ideally, we would like to query those services in parallel and aggregate the answers. All the code for the examples is in github.com/napicella/go-for-java-programmers.
So let's get started.
First example: querying Google search in parallel
This is what the Go code looks like:
func Google(query string) (results []Result) {
	c := make(chan Result)
	go func() { c <- Web(query) }()
	go func() { c <- Image(query) }()
	go func() { c <- Video(query) }()

	for i := 0; i < 3; i++ {
		result := <-c
		results = append(results, result)
	}
	return
}
What about Java? My solution involves using CompletableFuture like the following.
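import static java.util.concurrent.CompletableFuture.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public void google(String query) throws ExecutionException, InterruptedException {
    // Start the three searches in parallel (on the common ForkJoinPool)
    @SuppressWarnings("unchecked")
    CompletableFuture<String>[] futures = new CompletableFuture[] {
        supplyAsync(() -> web(query)),
        supplyAsync(() -> image(query)),
        supplyAsync(() -> video(query))
    };

    List<String> result = new ArrayList<>();
    // Once all three have completed, collect their values
    allOf(futures)
        .thenAccept((ignore) -> Arrays.stream(futures)
            .map(this::safeGet)
            .forEach(result::add)).get();
    // Note: calling get is necessary only because I want to print the result
    // before returning from the function
    System.out.println(result);
}

protected String safeGet(Future<String> future) {
    try {
        return future.get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing it
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    return "";
}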
The web, image and video services are just mocks with random sleeps.
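The post does not show the mocks. A hypothetical version could look like the following. The names `MockServices` and `fakeSearch` and the 0-99 ms range are my assumptions, and the actual mocks in the repository may differ. The methods are static so that they can also be used as method references such as `MockServices::web`.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-ins for the mocked services; the real mocks live in the
// repository and may differ.
public class MockServices {
    static String web(String query)   { return fakeSearch("web", query); }
    static String image(String query) { return fakeSearch("image", query); }
    static String video(String query) { return fakeSearch("video", query); }

    // Sleeps for a random 0-99 ms to simulate network latency, then returns
    // a labelled result.
    private static String fakeSearch(String kind, String query) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(100));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return kind + " result for " + query;
    }

    public static void main(String[] args) {
        System.out.println(web("golang")); // prints "web result for golang"
    }
}
```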
So, what's the difference between the Java code and the Go code? The Java code is a bit more verbose, and it does not use message passing between threads as the Go code does. Besides that, they look really similar.
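A `BlockingQueue` can play the role of the channel if the Java version should use message passing too. The sketch below rests on a few assumptions. `search` is a hypothetical mock. Each search runs on its own platform thread. `LinkedBlockingQueue` is unbounded, so a send never blocks, which makes it closer to a buffered Go channel than to the unbuffered one in the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ChannelStyleGoogle {
    // Hypothetical mock service: sleeps briefly, then answers.
    static String search(String kind, String query) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return kind + " result for " + query;
    }

    static List<String> google(String query) throws InterruptedException {
        // The queue plays the role of `c := make(chan Result)`.
        BlockingQueue<String> c = new LinkedBlockingQueue<>();
        for (String kind : new String[] {"web", "image", "video"}) {
            // One thread per service, like `go func() { c <- Web(query) }()`.
            new Thread(() -> c.add(search(kind, query))).start();
        }
        // Only this thread touches `results`, so a plain ArrayList is enough.
        List<String> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            results.add(c.take()); // blocks like `<-c`
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(google("golang"));
    }
}
```

Only the calling thread reads from the queue and appends to `results`, so no synchronized list is needed. The Go code gets the same property from its channel.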
Let's move to the second example.
Second example: timeout
What if we don't want to wait for slow servers? We can use a timeout!
The idea is to wait until either all the servers reply to our request or the timeout goes off.
func Google(query string) (results []Result) {
	// Buffered, so searches that finish after the timeout can still send and exit
	c := make(chan Result, 3)
	go func() { c <- Web(query) }()
	go func() { c <- Image(query) }()
	go func() { c <- Video(query) }()

	timeout := time.After(80 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case result := <-c:
			results = append(results, result)
		case <-timeout:
			fmt.Println("timed out")
			return
		}
	}
	return
}
Let's see what that would look like in Java:
import java.util.Collections;

public void googleWithTimeout(String query) throws ExecutionException, InterruptedException {
    // This is the first difference from the Go example: the result list must
    // be synchronized, because all three tasks add to it concurrently.
    // In Go only the receiving goroutine appends to the slice; the channel
    // takes care of the synchronization.
    List<String> result = Collections.synchronizedList(new ArrayList<>());

    // This is not safe: the timer runs on the same pool as the searches
    // (ForkJoinPool.commonPool() by default), so if all its threads are busy,
    // the timer won't start on time
    CompletableFuture<Void> timeout = runAsync(() -> timeout(TIMEOUT_MILLIS));

    // Completes when either all three searches are done or the timer fires
    anyOf(
        allOf(runAsync(() -> result.add(web(query))),
              runAsync(() -> result.add(image(query))),
              runAsync(() -> result.add(video(query)))),
        timeout
    ).get();
    System.out.println(result);
}

protected Void timeout(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing it
        Thread.currentThread().interrupt();
    }
    return null;
}
There is a substantial difference between the Java example and the Go one: in Java the tasks share the result list, so we need a synchronized list for the code to work. In Go, the goroutines never touch the slice. They send their results on the channel, which is safe for concurrent use, and only the receiving goroutine appends them.
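A closer translation of the Go `select` with a timeout keeps the results in a queue and polls it with whatever time remains before a single deadline. This is only a sketch. `search` and its fixed delays are hypothetical mocks, and `poll(timeout, unit)` returning `null` takes the place of the `case <-timeout` branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ChannelStyleTimeout {
    // Hypothetical mock: answers after `delayMillis` milliseconds.
    static String search(String kind, String query, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return kind + " result for " + query;
    }

    static List<String> googleWithTimeout(String query, long timeoutMillis)
            throws InterruptedException {
        BlockingQueue<String> c = new LinkedBlockingQueue<>();
        String[] kinds = {"web", "image", "video"};
        long[] delays = {10, 20, 500}; // the video mock is deliberately slow
        for (int i = 0; i < kinds.length; i++) {
            String kind = kinds[i];
            long delay = delays[i];
            Thread t = new Thread(() -> c.add(search(kind, query, delay)));
            t.setDaemon(true); // late answers must not keep the JVM alive
            t.start();
        }
        // Like `timeout := time.After(...)`: one deadline for the whole loop.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        List<String> results = new ArrayList<>();
        for (int i = 0; i < kinds.length; i++) {
            long remaining = deadline - System.nanoTime();
            // poll() returns null if nothing arrives in time: the `case <-timeout` branch.
            String result = c.poll(remaining, TimeUnit.NANOSECONDS);
            if (result == null) {
                System.out.println("timed out");
                return results;
            }
            results.add(result);
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(googleWithTimeout("golang", 80));
    }
}
```

The unbounded queue plays the part of the buffered channel in the Go code: a search that answers after the deadline can still complete its `add` and finish instead of blocking. Marking the threads as daemons means they do not keep the JVM alive in the meantime.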
As mentioned in the comment in googleWithTimeout, the timeout is not completely safe. The timer task runs on the same thread pool as the searches (ForkJoinPool.commonPool() by default), so if all of its threads are busy, the timer won't start on time. We can run the timer on a different ExecutorService, or simply create a Thread manually. For example, with a dedicated scheduler:
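import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

protected CompletableFuture<Void> timeout(int millis) {
    // Dedicated scheduler thread, so a busy common pool cannot delay the timer
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    final CompletableFuture<Void> timeout = new CompletableFuture<>();
    executorService.schedule(() -> {
        timeout.complete(null);
        // Release the scheduler thread once the timer has fired
        executorService.shutdown();
    }, millis, TimeUnit.MILLISECONDS);
    return timeout;
}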
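If Java 9 or later is available (an assumption, since the code in this post does not rely on it), `CompletableFuture` has built-in timeout methods. `completeOnTimeout(value, timeout, unit)` completes the future with a fallback value, and `orTimeout(timeout, unit)` completes it exceptionally with a `TimeoutException`. As far as I know, the JDK runs these timers on an internal delay scheduler rather than queuing them on the common pool. The following is a sketch with a hypothetical slow mock:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class OrTimeoutDemo {
    // Hypothetical slow mock service.
    static String slowVideo(String query) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "video result for " + query;
    }

    static String videoOrPlaceholder(String query) throws InterruptedException, ExecutionException {
        // completeOnTimeout (Java 9+) supplies a fallback value if the real
        // answer has not arrived within 80 ms; slowVideo keeps running.
        return CompletableFuture.supplyAsync(() -> slowVideo(query))
                .completeOnTimeout("video timed out", 80, TimeUnit.MILLISECONDS)
                .get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(videoOrPlaceholder("golang")); // prints "video timed out"
    }
}
```

Applied to each search separately, this gives a per-service timeout rather than the single overall timeout of the Go code. Calling `orTimeout` on the `allOf(...)` future would come closer to the original.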
Third example: reducing tail latency using replicated search servers
In Go:
func Google(query string) (results []Result) {
	c := make(chan Result, 3)
	go func() { c <- First(query, Web1, Web2) }()
	go func() { c <- First(query, Image1, Image2) }()
	go func() { c <- First(query, Video1, Video2) }()

	timeout := time.After(80 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case result := <-c:
			results = append(results, result)
		case <-timeout:
			fmt.Println("timed out")
			return
		}
	}
	return
}
where the function First is defined as follows:
func First(query string, replicas ...Search) Result {
	// Buffered, so the replicas that lose the race can still send and exit
	c := make(chan Result, len(replicas))
	searchReplica := func(i int) { c <- replicas[i](query) }
	for i := range replicas {
		go searchReplica(i)
	}
	return <-c
}
Let's see it in Java:
public void googleWithReplicatedServers(String query) throws ExecutionException, InterruptedException {
    List<String> result = Collections.synchronizedList(new ArrayList<>());
    // Unfortunately this does not work as expected because the inner anyOf
    // won't stop the other calls, so the result might end up having
    // duplicates, e.g. [some-image, some-image, some-video]
    anyOf(
        allOf(
            anyOf(runAsync(() -> result.add(web(query))), runAsync(() -> result.add(webReplica(query)))),
            anyOf(runAsync(() -> result.add(image(query))), runAsync(() -> result.add(imageReplica(query)))),
            anyOf(runAsync(() -> result.add(video(query))), runAsync(() -> result.add(videoReplica(query))))
        ),
        timeout(TIMEOUT_MILLIS)
    ).get();
    System.out.println(result);
}
Unfortunately, the code does not quite work. Each task adds its own result to the list. The inner anyOf completes as soon as one replica finishes, but the other replica keeps running and eventually adds its result as well, which puts a duplicate in the result.
Let's correct that:
// replicate servers and use the first response - fixing the problem mentioned
// earlier by using supplyAsync + thenAccept instead of runAsync
public void googleWithReplicatedServers2(String query) throws ExecutionException, InterruptedException {
    List<String> result = Collections.synchronizedList(new ArrayList<>());
    anyOf(
        allOf(
            anyOf(supplyAsync(() -> web(query)),
                  supplyAsync(() -> webReplica(query))).thenAccept((s) -> result.add((String) s)),
            anyOf(supplyAsync(() -> image(query)),
                  supplyAsync(() -> imageReplica(query))).thenAccept((s) -> result.add((String) s)),
            anyOf(supplyAsync(() -> video(query)),
                  supplyAsync(() -> videoReplica(query))).thenAccept((s) -> result.add((String) s))
        ),
        timeout(TIMEOUT_MILLIS)
    ).get();
    System.out.println(result);
}
// same as above, but this time we use the function 'first', which is really
// just a wrapper around CompletableFuture.anyOf
public void googleWithReplicatedServers3(String query) throws ExecutionException, InterruptedException {
    List<String> result = Collections.synchronizedList(new ArrayList<>());
    anyOf(
        allOf(
            first(query, Google::web, Google::webReplica).thenAccept((s) -> result.add((String) s)),
            first(query, Google::image, Google::imageReplica).thenAccept((s) -> result.add((String) s)),
            first(query, Google::video, Google::videoReplica).thenAccept((s) -> result.add((String) s))
        ),
        timeout(TIMEOUT_MILLIS)
    ).get();
    System.out.println(result);
}
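The post does not show `first`. Going by the comment on googleWithReplicatedServers3, a hypothetical version could start every replica with `supplyAsync` and pass the futures to `CompletableFuture.anyOf`. In this sketch the signature (the query plus a varargs list of `Function<String, String>`) is my assumption. It accepts calls such as `first(query, Google::web, Google::webReplica)` only if those are static methods. The mocks below are hypothetical too.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class First {
    // Hypothetical mocks with different latencies.
    static String web(String query)        { return sleepThen(200, "web result for " + query); }
    static String webReplica(String query) { return sleepThen(10, "web replica result for " + query); }

    static String sleepThen(long millis, String value) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    // Hypothetical `first`: start every replica and complete with whichever
    // answers first. Like anyOf itself, it does not stop the slower replicas.
    @SafeVarargs
    static CompletableFuture<Object> first(String query, Function<String, String>... replicas) {
        CompletableFuture<?>[] futures = Arrays.stream(replicas)
                .map(replica -> CompletableFuture.supplyAsync(() -> replica.apply(query)))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.anyOf(futures);
    }

    public static void main(String[] args) {
        System.out.println(first("golang", First::web, First::webReplica).join());
    }
}
```

Returning `CompletableFuture<Object>`, as `anyOf` does, is why the calls above need the `(String) s` cast. The slower replica is not cancelled: it keeps running and its result is ignored. Because `supplyAsync` without an executor uses the common pool, whether the replicas really run in parallel depends on the pool's size. This is the same concern raised about the timeout earlier.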
Conclusions
Besides the fact that I had some fun with CompletableFuture, the clear advantage of Go is that the concurrency model is built into the language itself, which simplifies communication among different agents.
On the other hand, I am not sure why they dropped OOP support, classes for example. I mean, what's wrong with OOP?