Merge results of parallel service requests using CompletableFuture
SUNIL KUMAR L
Posted on January 19, 2020
This post walks through several ways of using Java's CompletableFuture to run multiple service requests in parallel, collect each service/API response, and aggregate the results.
GitHub location: SUNIL-KUMAR-L / JavaCompletableFutureDemo
Step by Step Guide
- Approach 1 : Plain old simple Java way
Let us then introduce CompletableFuture with different approaches:
- Approach 2 : Parallel service calls and collect data using CompletableFuture::get()
- Approach 3 : Parallel service calls and collect data using CompletableFuture::join()
- Approach 4 : Parallel service calls and group futures using CompletableFuture::allOf() and then collect each data
- Approach 5 (My Preference) : Parallel service calls and group futures using CompletableFuture::allOf() and then assemble using CompletableFuture::thenApply()
Approach 1 :
Goal is to make multiple service calls in sequence (one after the other), then collect the responses and aggregate them into a final composite response.
pseudo code
The client supplies an ID to get Person Info + Address Info (the composite PersonWithAddressInfo).
To achieve this, make sequential service requests:
- trigger a Person service call, which queries PersonRepository by ID to get Person Info
- trigger an Address service call, which queries AddressRepository by ID to get Address Info
- finally, return PersonWithAddressInfo, which aggregates Person Info + Address Info.
code
public class Address {
private String addressId;
private String addressLine1;
private String addressLine2;
private String addressLine3;
private String zipCode;
private String city;
private String state;
private String country;
public Address(
String addressId,
String addressLine1,
String addressLine2,
String addressLine3,
String zipCode,
String city,
String state,
String country) {
this.addressId = addressId;
this.addressLine1 = addressLine1;
this.addressLine2 = addressLine2;
this.addressLine3 = addressLine3;
this.zipCode = zipCode;
this.city = city;
this.state = state;
this.country = country;
}
// write toString, hashCode or use lombok annotation
}
public class AddressRepository {
public Address getAddressById(String id){
// hard coded ... change it to real data // fetch from DB or API
return new Address( "1","add1", "add2", "add3","55305",
"Mpls", "MN","USA");
}
}
public class AddressService {
private AddressRepository addressRepository;
public AddressService(AddressRepository addressRepository) {
this.addressRepository = addressRepository;
}
public Address getAddressById(String id) {
return addressRepository.getAddressById(id);
}
}
public class Person {
public Person(String id, String firstName, String age) {
this.id = id;
this.firstName = firstName;
this.age = age;
}
private String id;
private String firstName;
private String age;
// write toString, hashCode or use lombok annotation
}
public class PersonRepository {
public Person getPersonById(String id) {
// hard coded ... change it to real data // fetch from DB or API
return new Person("1", "hello", "22");
}
}
public class PersonService {
private PersonRepository personRepository;
public PersonService(PersonRepository personRepository) {
this.personRepository = personRepository;
}
public Person getPersonById(String id){
return personRepository.getPersonById(id);
}
}
public class PersonWithAddress {
private Person person;
private Address address;
public PersonWithAddress(Person person, Address address) {
this.person = person;
this.address = address;
}
// write toString, hashCode or use lombok annotation
}
public class CFMain {
static PersonRepository personRepository =
new PersonRepository();
static PersonService personService =
new PersonService(personRepository);
static AddressRepository addressRepository =
new AddressRepository();
static AddressService addressService =
new AddressService(addressRepository);
public static void sequential(String personId) {
final Person personById =
personService.getPersonById(personId);
// the address call starts only after the person call has returned
final Address addressById =
addressService.getAddressById(personId);
final PersonWithAddress personWithAddress =
new PersonWithAddress(personById, addressById);
System.out.println(personWithAddress);
}
public static void main(String[] args) {
sequential("1");
}
}
That achieves the goal, but can it be improved?
Yes: execute the service calls in parallel to improve response time (see Approach 2).
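To make the benefit concrete, here is a minimal timing sketch. It is not part of the original repository: the `slowCall` helper and its 200 ms and 300 ms delays are assumptions that stand in for real service latency. Run one after the other, the two calls take about the sum of their delays; started together, they take about as long as the slower one.

```java
import java.util.concurrent.CompletableFuture;

class LatencyDemo {

    // Hypothetical stand-in for a remote call: sleeps, then returns a value.
    static String slowCall(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name;
    }

    // Runs both calls one after the other and returns the elapsed milliseconds.
    static long sequentialMillis() {
        long start = System.nanoTime();
        slowCall("person", 200);
        slowCall("address", 300);
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Starts both calls before waiting for either, then returns the elapsed milliseconds.
    static long parallelMillis() {
        long start = System.nanoTime();
        CompletableFuture<String> person =
                CompletableFuture.supplyAsync(() -> slowCall("person", 200));
        CompletableFuture<String> address =
                CompletableFuture.supplyAsync(() -> slowCall("address", 300));
        person.join();
        address.join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // about the sum of both delays
        System.out.println("sequential: " + sequentialMillis() + " ms");
        // about the longer of the two delays
        System.out.println("parallel:   " + parallelMillis() + " ms");
    }
}
```

The exact numbers vary with the machine and thread scheduling, so treat them as an illustration rather than a benchmark.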
Approach 2 :
Goal is to execute multiple service calls in parallel using CompletableFuture and collect the data from each CompletableFuture using get().
Note that get() throws the checked exceptions InterruptedException and ExecutionException, so callers must handle or declare them.
pseudo code
The client supplies an ID to get Person Info + Address Info (the composite PersonWithAddressInfo).
To achieve this, make parallel service requests:
- trigger a Person service call, which queries PersonRepository by ID to get Person Info, using a CompletableFuture (say CompletableFuture 1)
- trigger an Address service call, which queries AddressRepository by ID to get Address Info, using a CompletableFuture (say CompletableFuture 2)
- now block on each CompletableFuture in turn with CompletableFuture::get to obtain its data
- finally, return PersonWithAddressInfo, which aggregates Person Info + Address Info.
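One detail the steps above leave open is which threads run the calls. By default, supplyAsync(Supplier) uses the shared ForkJoinPool.commonPool(). For blocking I/O such as database or HTTP calls, one option is to pass your own executor as the second argument. The sketch below assumes a fixed pool of two threads and uses hypothetical `findPerson` / `findAddress` helpers in place of the real services.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorDemo {

    // Hypothetical lookups standing in for the Person and Address service calls.
    static String findPerson(String id) { return "person-" + id; }
    static String findAddress(String id) { return "address-" + id; }

    static String fetchBoth(String id) {
        // Assumption: a small fixed pool sized for two concurrent blocking calls.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> person =
                    CompletableFuture.supplyAsync(() -> findPerson(id), pool);
            CompletableFuture<String> address =
                    CompletableFuture.supplyAsync(() -> findAddress(id), pool);
            return person.join() + " + " + address.join();
        } finally {
            pool.shutdown(); // no new tasks are accepted; the worker threads can then exit
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchBoth("1")); // prints "person-1 + address-1"
    }
}
```

In a real application the pool would normally be created once and shared, not created per request as in this sketch.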
code improvement
import java.util.concurrent.CompletableFuture;
public class CFMain {
static PersonRepository personRepository =
new PersonRepository();
static PersonService personService = new PersonService(personRepository);
static AddressRepository addressRepository =
new AddressRepository();
static AddressService addressService = new AddressService(addressRepository);
public static void main(String[] args) {
parallelServiceCallWithCFUsingGet("1");
}
public static void parallelServiceCallWithCFUsingGet(String personId) {
final CompletableFuture<Person> personCompletableFuture =
CompletableFuture.supplyAsync(() ->
personService.getPersonById(personId));
final CompletableFuture<Address> addressCompletableFuture =
CompletableFuture.supplyAsync(() ->
addressService.getAddressById(personId));
try {
final Person person =
personCompletableFuture.get(); // blocks until the person call completes
final Address address =
addressCompletableFuture.get(); // blocks until the address call completes (it has been running in the meantime)
// both calls ran in parallel, so the total wait is roughly that of the slowest service
System.out.println(new PersonWithAddress(person, address));
} catch (InterruptedException exp) {
Thread.currentThread().interrupt(); // restore the interrupt flag for callers
System.err.println(exp);
} catch (Exception exp) { // includes ExecutionException, which wraps a failure inside a service call
System.err.println(exp);
}
}
}
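Because get() reports failures through checked exceptions, it helps to see what each one means. The following sketch is an illustration, not code from the repository. It uses the timed overload get(long, TimeUnit), which additionally throws TimeoutException, and a hypothetical `describe` helper that turns each outcome into a string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GetHandlingDemo {

    // Waits up to timeoutMillis for the future and reports what happened as a string.
    static String describe(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return "value: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (ExecutionException e) {
            // The exception thrown inside the task is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("hello");
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("service down");
        });
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed

        System.out.println(describe(ok, 100));       // prints "value: hello"
        System.out.println(describe(failing, 1000)); // prints "failed: service down"
        System.out.println(describe(never, 50));     // prints "timed out"
    }
}
```

A timeout is worth considering for any remote call, since a plain get() waits indefinitely if the service never answers.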
That achieves the goal, but can it be improved?
Yes: use CompletableFuture join() instead of get() to avoid handling checked exceptions (see Approach 3).
Approach 3 :
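Goal is to execute multiple service calls in parallel using CompletableFuture and collect the data from each CompletableFuture using join().
join() throws only unchecked exceptions (a CompletionException wrapping the original failure, or a CancellationException if the future was cancelled), so no try/catch is required.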
code improvement
import java.util.concurrent.CompletableFuture;
public class CFMain {
static PersonRepository personRepository =
new PersonRepository();
static PersonService personService =
new PersonService(personRepository);
static AddressRepository addressRepository =
new AddressRepository();
static AddressService addressService =
new AddressService(addressRepository);
public static void main(String[] args) {
parallelServiceCallWithCFUsingJoin("1");
}
public static void parallelServiceCallWithCFUsingJoin(String personId) {
final CompletableFuture<Person> personCompletableFuture =
CompletableFuture.supplyAsync(() ->
personService.getPersonById(personId));
final CompletableFuture<Address> addressCompletableFuture =
CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId));
final Person person =
personCompletableFuture.join(); // blocks until the person call completes
final Address address =
addressCompletableFuture.join(); // blocks until the address call completes (it has been running in the meantime)
// both calls ran in parallel, so the total wait is roughly that of the slowest service
System.out.println(new PersonWithAddress(person, address));
}
}
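Not having to catch anything does not mean nothing can fail. When a service call throws, join() rethrows the failure wrapped in a CompletionException, and the original exception is its cause. The sketch below illustrates this with a hypothetical `joinOrMessage` helper. It deliberately ignores cancellation, which would surface as a CancellationException instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinHandlingDemo {

    // Joins the future; on failure returns the message of the underlying cause.
    static String joinOrMessage(CompletableFuture<String> future) {
        try {
            return future.join();
        } catch (CompletionException e) {
            // join() wraps the task's exception; the original is the cause.
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.supplyAsync(() -> "hello");
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalArgumentException("unknown id");
        });
        System.out.println(joinOrMessage(ok));      // prints "hello"
        System.out.println(joinOrMessage(failing)); // prints "failed: unknown id"
    }
}
```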
That achieves the goal, but can it be improved?
Yes: use CompletableFuture allOf(CompletableFuture<?>... cfs) to group all the futures into one that completes when every one of them has completed (see Approach 4).
Approach 4:
Goal is to execute multiple service calls in parallel using CompletableFuture, group the futures with allOf(), and then collect the data from each CompletableFuture using join() or get().
code improvement
import java.util.concurrent.CompletableFuture;
public class CFMain {
static PersonRepository personRepository =
new PersonRepository();
static PersonService personService =
new PersonService(personRepository);
static AddressRepository addressRepository =
new AddressRepository();
static AddressService addressService =
new AddressService(addressRepository);
public static void main(String[] args) {
parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndGet("1");
parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndJoin("1");
}
public static void parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndGet(String personId) {
final CompletableFuture<Person> personCompletableFuture =
CompletableFuture.supplyAsync(() ->
personService.getPersonById(personId));
final CompletableFuture<Address> addressCompletableFuture =
CompletableFuture.supplyAsync(() ->
addressService.getAddressById(personId));
final CompletableFuture<Void> completableFutureAllOf =
CompletableFuture.allOf(personCompletableFuture, addressCompletableFuture);
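try {
completableFutureAllOf.get(); // blocks until both futures complete, i.e. for as long as the slowest service takes
final Person person =
personCompletableFuture.get(); // already complete, returns immediately
final Address address =
addressCompletableFuture.get(); // already complete, returns immediately
System.out.println(new PersonWithAddress(person, address));
} catch (InterruptedException exp) {
Thread.currentThread().interrupt(); // restore the interrupt flag for callers
System.err.println(exp);
} catch (Exception exp) { // includes ExecutionException, which wraps a failure inside a service call
System.err.println(exp);
}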
}
public static void parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndJoin(String personId) {
final CompletableFuture<Person> personCompletableFuture =
CompletableFuture.supplyAsync(() ->
personService.getPersonById(personId));
final CompletableFuture<Address> addressCompletableFuture =
CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId));
final CompletableFuture<Void> completableFutureAllOf =
CompletableFuture.allOf(
personCompletableFuture,
addressCompletableFuture);
completableFutureAllOf.join(); // blocks until both futures complete, i.e. for as long as the slowest service takes
final Person person =
personCompletableFuture.join(); // already complete, returns immediately
final Address address =
addressCompletableFuture.join(); // already complete, returns immediately
System.out.println(new PersonWithAddress(person, address));
}
}
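allOf() returns a CompletableFuture<Void>, so the results still have to be read from the individual futures. When the number of calls is not fixed, a common idiom is a small helper that turns a list of futures into one future of a list. The `allAsList` helper below is a sketch of that idiom, not code from the repository, and the string values stand in for real service responses.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfListDemo {

    // Turns a list of futures into one future holding the list of results, in input order.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        // This function runs only after every future has completed, so join() does not block.
        return all.thenApply(ignored ->
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> calls = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "person-1"),
                CompletableFuture.supplyAsync(() -> "address-1"),
                CompletableFuture.supplyAsync(() -> "orders-1"));
        System.out.println(allAsList(calls).join()); // prints "[person-1, address-1, orders-1]"
    }
}
```

If any of the input futures fails, the future returned by allOf() fails too, so the combined future completes exceptionally and no partial list is produced.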
That achieves the goal, but can it be improved?
Yes: refactor Approach 4 so that the future returned by CompletableFuture::allOf() is chained with thenApply() to assemble the result (see Approach 5).
Approach 5:
Goal is to execute multiple service calls in parallel using CompletableFuture, group them with allOf(), and then aggregate the data using thenApply().
pseudo code
The client supplies an ID to get Person Info + Address Info (the composite PersonWithAddressInfo).
To achieve this, make parallel service requests:
- trigger a Person service call, which queries PersonRepository by ID to get Person Info, using a CompletableFuture (say CompletableFuture 1)
- trigger an Address service call, which queries AddressRepository by ID to get Address Info, using a CompletableFuture (say CompletableFuture 2)
- pass these CompletableFutures to CompletableFuture::allOf
- chain thenApply() onto the future returned by allOf to assemble the composite result
- finally, call CompletableFuture.join() on that future to get PersonWithAddressInfo, which aggregates Person Info + Address Info.
code improvement
import java.util.concurrent.CompletableFuture;
public class CFMain {
static PersonRepository personRepository =
new PersonRepository();
static PersonService personService =
new PersonService(personRepository);
static AddressRepository addressRepository =
new AddressRepository();
static AddressService addressService =
new AddressService(addressRepository);
public static void main(String[] args) {
parallelServiceCallUseAllToMergeFuturesDataUseAllAndThenApply("1");
}
public static void parallelServiceCallUseAllToMergeFuturesDataUseAllAndThenApply(String personId) {
final CompletableFuture<Person> personCompletableFuture =
CompletableFuture.supplyAsync(() ->
personService.getPersonById(personId));
final CompletableFuture<Address> addressCompletableFuture =
CompletableFuture.supplyAsync(() ->
addressService.getAddressById(personId));
final CompletableFuture<Void> completableFutureAllOf =
CompletableFuture.allOf(personCompletableFuture, addressCompletableFuture);
final CompletableFuture<PersonWithAddress> personWithAddressCompletableFuture =
completableFutureAllOf.thenApply(
(voidInput) ->
// runs only after both futures have completed, so these join() calls do not block
new PersonWithAddress(
personCompletableFuture.join(),
addressCompletableFuture.join()));
System.out.println(personWithAddressCompletableFuture.join()); // blocks until the slowest service has responded and the result is assembled
}
}
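For exactly two futures there is also a shorter alternative to allOf() plus thenApply(): thenCombine(), which takes the second future and a function that merges both results. This is a different technique from the approach above, shown only for comparison. The `findPerson` / `findAddress` helpers are hypothetical stand-ins for the Person and Address services.

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineDemo {

    // Hypothetical lookups standing in for the Person and Address service calls.
    static String findPerson(String id) { return "person-" + id; }
    static String findAddress(String id) { return "address-" + id; }

    // Starts both lookups and merges their results once both have completed.
    static CompletableFuture<String> fetchBoth(String id) {
        CompletableFuture<String> person =
                CompletableFuture.supplyAsync(() -> findPerson(id));
        CompletableFuture<String> address =
                CompletableFuture.supplyAsync(() -> findAddress(id));
        return person.thenCombine(address, (p, a) -> p + " + " + a);
    }

    public static void main(String[] args) {
        System.out.println(fetchBoth("1").join()); // prints "person-1 + address-1"
    }
}
```

With three or more calls, thenCombine() has to be chained pairwise, which is where allOf() followed by thenApply() tends to read better.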