Spring Batch Remote Chunking with Spring Integration and a Kafka Broker
Posted on January 10, 2023
Knowledge comes through experience, everything else is just information.
Albert Einstein
Introduction
In this article, I share feedback from experience on large-volume processing in banking information systems. I will not define every concept here; I assume the following are already familiar ☺️:
● Spring Batch (ItemReader, ItemProcessor, ItemWriter)
● Apache Kafka
● MySQL database
● Lombok
● Spring Integration
● Spring scheduling
If these notions are not yet acquired, it is essential to read the official documentation first: see the references section.
Problem
The handling of large volumes of deferred data streams is generally delegated to a batch system. A batch is a scheduled collection of data in an information system. Two questions seem essential when collecting scheduled data:
How to improve performance?
How to handle errors (error recovery)?
Throughout this article, I will answer these two questions.
To better understand the problem, we will use the following scenario:
➢ A French bank must digitize a set of documents every day (transaction slips, account opening/closing contracts for legal entities or individuals). Storing these documents in the EDM (electronic document management) system must follow fairly specific business rules while complying with the GDPR. The bank wants the EDM loading to take place later, between 10 p.m. and 11:59 p.m. To help us, we are going to draw up an architecture to collect this information and put it into the EDM.
Architecture
To collect the bank's flows, we will use Spring Batch in remote chunking mode, coupled with an Apache Kafka broker, MySQL as our database, and Spring Integration 🤔.
First of all, you should know that a batch consists of three major parts (illustrated just after this list):
● Reading from external files, from a database, or from an external API
● Processing of the data read (specific processing linked to business rules, for example: the branch code of a bank must be 5 alphanumeric characters)
● Writing to different targets (database, files (XML, CSV, text…), APIs).
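These three parts correspond to Spring Batch's ItemReader, ItemProcessor and ItemWriter interfaces. As a minimal illustration only (not code from the project):
// Minimal sketch of the three chunk-oriented roles (illustrative only)
ItemReader<String> reader = () -> null;                                    // reading: return null once the input is exhausted
ItemProcessor<String, String> processor = item -> item.trim();             // processing: apply a business rule to each item
ItemWriter<String> writer = chunk -> chunk.forEach(System.out::println);   // writing: persist a chunk of items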
The following architecture will be very useful for the rest of the article:
Figure 1: General Architecture
We have a Kafka broker which allows communication between the master and the different workers.
The following figure (Figure 2) from the general documentation shows us this in depth.
Figure 2: General architecture.
Sequence:
● The master reads the (external) data streams.
● Once the reading is done, the master publishes the read data to the Apache Kafka broker, which dispatches the flow to different processes called slaves (workers).
● Each slave handles the processing of the data it receives and its writing.
● The master can launch several slaves in parallel.
● At the end of slave processing, the master aggregates all of the writes from the slaves to produce a single output.
● In the event of a slave error, the error is sent back to the master, which can replay the same stream by launching a new slave.
Advantage
The advantage of this architecture is that, in terms of performance, the master can scale horizontally, that is, increase the number of slaves. In addition, when a slave fails, the data is not lost: it is simply sent back to the broker, and a new slave can pick up the stream and process it automatically. This automated error recovery avoids manual intervention.
Drawbacks
The only drawback of this system is that you have to invest in tooling:
● Have a server for the broker
● Have people skilled in these technologies and tools.
PROGRAM SIDE
- Technical prerequisites:
- Java 17
- Apache Kafka
- Spring Integration
- Spring Batch 5.0
- Spring Boot 3
- MySQL
- Docker
PROJECT STRUCTURE
DATA
For the article, we will use this CSV file 🙂, which is at the root of the project:
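A hypothetical extract (illustrative values only) with the expected columns id, name and transactionDate:
id,name,transactionDate
1,TRX-0001,2023-01-09
2,TRX-0002,2023-01-09
3,TRX-0003,2023-01-10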
We are using a Maven project; here are the dependencies for the article 👍
<?xml version="1.0" encoding="UTF-8"?>
<version>0.0.1-SNAPSHOT</version>
<name>batch</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
We will go over the code part:
MASTER
We will start with the master and its configuration. We will need some dependencies and annotations for its configuration:
@Profile("master")
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@PropertySource("classpath:application-master.properties")
@Component
@Data
//@Import({DatabaseConfig.class})
public class MasterJob {
@Autowired
private RemoteChunkingManagerStepBuilderFactory remoteChunkingManagerStepBuilderFactory;
@Autowired
private KafkaTemplate<String, TransactionDto> masterKafkaTemplate;
Figure 4: Master configuration
@Profile("master") is required to run the master. The class is registered as a Spring component together with other annotations essential for launching the master:
@Configuration: Spring configuration class
@EnableBatchProcessing: enables the Spring Batch infrastructure
@EnableBatchIntegration: enables the Spring Batch Integration builders used for remote chunking
@PropertySource("classpath:application-master.properties"): injects the master configuration file into the batch.
A KafkaTemplate is injected into the component to allow communication between the master and the workers through the Kafka topics.
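The definition of masterKafkaTemplate is not shown in the listings. If the auto-configured template does not match the required generic type, one possible explicit definition (hypothetical, simply reusing Spring Boot's Kafka producer properties) would be:
// Hypothetical beans, not shown in the article: build the template from application-master.properties
@Bean
public ProducerFactory<String, TransactionDto> masterProducerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}

@Bean
public KafkaTemplate<String, TransactionDto> masterKafkaTemplate(ProducerFactory<String, TransactionDto> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}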
Now we will see how the job is configured.
@Bean(name = "jobMaster")
public Job masterJob(JobRepository jobRepository) {
return new JobBuilder("jobMaster", jobRepository)
.incrementer(new RunIdIncrementer())
// .listener(jobListener)
.start(masterStep())
.build();
}
For the job to run, we provide it with a masterStep():
/*
* Configure master step components
*/
@Bean(name = "masterStep")
public TaskletStep masterStep() {
return this.remoteChunkingManagerStepBuilderFactory.get("masterStep")
.<TransactionDto, TransactionDto>chunk(10)
.reader(masterReader())
.outputChannel(outboundChannel()) // sends ChunkRequest messages to the Kafka workers
.inputChannel(inboundChannel()) // receives ChunkResponse messages from the Kafka workers
.allowStartIfComplete(true)
.build();
}
This masterStep will commit a chunk for every 10 items read. We read the items from the CSV file at the root of the project.
@Bean
public FlatFileItemReader<TransactionDto> masterReader() {
return new FlatFileItemReaderBuilder<TransactionDto>()
.resource(new ClassPathResource("/data.csv"))
.name("Transaction")
.delimited()
.delimiter(",")
.names("id", "name", "transactionDate")
.linesToSkip(1) //skipping the header of the file
.fieldSetMapper(fieldSet -> {
TransactionDto data = new TransactionDto();
data.setId(fieldSet.readInt("id"));
data.setName(fieldSet.readString("name"));
data.setTransactionDate(fieldSet.readString("transactionDate"));
return data;
})
.build();
}
Then, every 10 elements read from the CSV file are sent to the workers through:
● outputChannel(outboundChannel())
● outboundFlow
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel outboundChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow() {
var producerMessageHandler = new KafkaProducerMessageHandler<String, TransactionDto>(this.masterKafkaTemplate);
producerMessageHandler.setTopicExpression(new LiteralExpression("requestsForWokers"));
return IntegrationFlow.from(outboundChannel())
.log(LoggingHandler.Level.WARN)
.handle(producerMessageHandler)
.get();
}
Spring Integration allows us to configure this exchange through a Kafka topic.
Then, when the workers have finished their processing, the master receives the responses through another channel from the Kafka broker.
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel inboundChannel() {
return new QueueChannel();
}
@Bean
public IntegrationFlow inboundFlow(ConsumerFactory<String, TransactionDto> consumerFactory) {
return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory, "repliesFromWokers"))
.log(LoggingHandler.Level.WARN)
.channel(inboundChannel())
.get();
}
Spring Integration allows the master to receive the workers' replies.
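The typed ConsumerFactory<String, TransactionDto> injected into inboundFlow is not defined in the listings; assuming it simply reuses the consumer properties of the active profile, a possible bean (hypothetical) would be:
// Hypothetical bean, not shown in the article: group id and deserializers come from the active profile's properties
@Bean
public ConsumerFactory<String, TransactionDto> consumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}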
Note that we can add a listener to the job in order to react to all of the job's states. According to the scenario above, we could query a database to retrieve the last execution date of the batch and all the data whose status is "R" (recovery) or "NA" (not archived).
@Override
public void beforeJob(JobExecution jobExecution) {
// check if we need to query the database for some processing
}
We can also add a tasklet at the end of the job to produce a summary describing all the processing, which other APIs can then consume (a sketch is given after the listener snippet below).
@Override
public void afterJob(JobExecution jobExecution) {
// here we could create a summary file
}
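As an illustration of that idea (a sketch, not part of the project's code), a summary tasklet could be registered as the last step of the job:
// Hypothetical summary step: aggregates the write counts of all steps at the end of the job
@Bean
public Step summaryStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    Tasklet summaryTasklet = (contribution, chunkContext) -> {
        JobExecution jobExecution = chunkContext.getStepContext().getStepExecution().getJobExecution();
        long written = jobExecution.getStepExecutions().stream()
                .mapToLong(StepExecution::getWriteCount)
                .sum();
        System.out.println("Job " + jobExecution.getJobInstance().getJobName() + " wrote " + written + " items");
        return RepeatStatus.FINISHED;
    };
    return new StepBuilder("summaryStep", jobRepository)
            .tasklet(summaryTasklet, transactionManager)
            .build();
}
Such a step could then be chained after the masterStep with .next(...) in the job definition.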
WORKER
For the worker, we will have practically the same configuration.
@Profile("woker")
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@PropertySource("classpath:application-worker.properties")
//@Import({DatabaseConfig.class})
@Slf4j
@Component
public class WokerJob {
@Autowired
private RemoteChunkingWorkerBuilder<TransactionDto, TransactionDto> remoteChunkingWorkerBuilder;
@Autowired
private KafkaTemplate<String, TransactionDto> transactionKafkaTemplate;
@Autowired
private DataSource dataSource;
We also use a database so that each worker can write to it. Here is its step.
@Bean
public IntegrationFlow workerStep() {
return this.remoteChunkingWorkerBuilder
.inputChannel(inboundChannel()) // consumes ChunkRequest messages coming from the Kafka master
.outputChannel(outboundChannel()) // produces ChunkResponse messages going back to the Kafka master
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.build();
}
First we listen to the flow coming from the master.
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel inboundChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ConsumerFactory<String, TransactionDto> consumerFactory) {
return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory, "requestsForWokers"))
.log(LoggingHandler.Level.WARN)
.channel(inboundChannel())
.get();
}
Then, we process this flow via the itemProcessor and then the itemWriter.
/*
* Configure worker components
*/
@Bean
public ItemProcessor<TransactionDto, TransactionDto> itemProcessor() {
return item -> {
System.out.println("processing item " + item);
return item;
};
}
@Bean
public ItemWriter<TransactionDto> itemWriter() {
return new JdbcBatchItemWriterBuilder<TransactionDto>()
.beanMapped()
.dataSource(dataSource)
.sql("INSERT INTO TRANSACTDTO (id,name,transactionDate) VALUES (:id, :name, :transactionDate)")
.build();
}
At the end, we send information back to the master so that the master commits the chunk for every 10 elements.
/*
* Configure outbound flow (replies going to the manager)
*/
@Bean
public DirectChannel outboundChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow() {
var producerMessageHandler = new KafkaProducerMessageHandler<String, TransactionDto>(transactionKafkaTemplate);
producerMessageHandler.setTopicExpression(new LiteralExpression("repliesFromWokers"));
return IntegrationFlow.from(outboundChannel())
.log(LoggingHandler.Level.WARN)
.handle(producerMessageHandler)
.get();
}
}
PROJECT CONFIGURATION
application-master.properties
server.port=8087
###
#spring.kafka.producer.group-id=producer-master-g
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=fr.hn.services.batch.utils.ChunkRequestSerializer
spring.kafka.producer.properties.spring.json.trusted.packages=*
#####
spring.kafka.consumer.group-id=consumer-master-g
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=fr.hn.services.batch.utils.ChunkResponseDeserializer
#####
broker.url=tcp://localhost:8087
application-worker.properties:
server.port=8088
spring.kafka.consumer.group-id=consumer-woker-g
#spring.kafka.producer.group-id=producer-woker-g
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=fr.hn.services.batch.utils.ChunkResponseSerializer
#########
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=fr.hn.services.batch.utils.ChunkRequestDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
######
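Neither file sets spring.kafka.bootstrap-servers; when it is omitted, Spring Boot defaults to localhost:9092, which matches the EXTERNAL listener exposed by the docker-compose file below. To make it explicit, the following line could be added to both files (it is not in the original configuration):
spring.kafka.bootstrap-servers=localhost:9092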
docker-compose.yml
version: "3"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9093,EXTERNAL://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9093,EXTERNAL://localhost:9092
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- '8080:8080'
environment:
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9093
depends_on:
- kafka
mysql:
image: mysql
ports:
- '3306:3306'
command: --default-authentication-plugin=mysql_native_password
restart: always
environment:
MYSQL_DATABASE: 'db'
# So you don't have to use root, but you can if you like
MYSQL_USER: 'user'
# You can use whatever password you like
MYSQL_PASSWORD: 'password'
# Password for root access
MYSQL_ROOT_PASSWORD: 'password'
volumes:
- "./scripts/schema.sql:/docker-entrypoint-initdb.d/schema.sql"
Our transaction POJO is the following:
@Component
@Getter
@Setter
public class TransactionDto implements Serializable {
private int id;
private String name;
private String transactionDate;
}
Our utility classes:
● To deserialize, on the worker side, the ChunkRequest objects sent to the broker by the master:
@Component
public class ChunkRequestDeserializer implements Deserializer<ChunkRequest<TransactionDto>> {
@Override
public ChunkRequest deserialize(String s, byte[] bytes) {
try {
return (ChunkRequest) new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
● To serialize the ChunkRequest objects sent by the master to the broker:
@Component
public class ChunkRequestSerializer implements Serializer<ChunkRequest<TransactionDto>> {
@Override
public byte[] serialize(String s, ChunkRequest<TransactionDto> chunkRequest) {
if (chunkRequest == null) {
return new byte[0];
}
return SerializationUtils.serialize(chunkRequest);
}
}
● To serialize the ChunkResponse objects sent back by the workers:
@Component
public class ChunkResponseSerializer implements Serializer<ChunkResponse> {
@Override
public byte[] serialize(String s, ChunkResponse chunkResponse) {
return SerializationUtils.serialize(chunkResponse);
}
}
● To deserialize the ChunkResponse objects on the master side:
@Component
public class ChunkResponseDeserializer implements Deserializer<ChunkResponse> {
@Override
public ChunkResponse deserialize(String s, byte[] bytes) {
try {
if (bytes == null) {
return null;
}
return (ChunkResponse) new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
BATCH EXECUTION
To run the batch, we use a scheduled launcher that triggers the batch every second.
@Profile("master")
@Component
@Slf4j
@EnableScheduling
public class BatchLauncher {
@Autowired
private Job job;
@Autowired
private JobLauncher jobLauncher;
@Scheduled(cron = "* * * * * ?")
public void perform() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobId", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
log.info("Launching job {}", job.getName());
jobLauncher.run(job, jobParameters);
}
}
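To try everything locally, one possible way (assuming the project uses the spring-boot-maven-plugin) is to start the Docker stack and then launch the two profiles in separate terminals; the profile names must match the @Profile annotations ("master" and "woker"):
docker compose up -d
mvn spring-boot:run -Dspring-boot.run.profiles=woker
mvn spring-boot:run -Dspring-boot.run.profiles=master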
RESULTS
Figure 5 : Docker Images running
Figure 6 : Master and worker execution.
Figure 7 : Tables in db.
Figure 8 : Table TRANSACTDTO.
Figure 9 : Kafka broker with partitions.
Figure 10 : Messages in broker.
Figure 11 : Example of message in the broker.
CONCLUSION
In summary, this article shows how to gain performance while managing error recovery. The ecosystem combines Spring Batch, Spring Integration and a Kafka listener. The major benefit is that we can always increase the number of workers for more throughput, and we can partition the broker as many times as we want. In addition, each worker runs in its own JVM. The choice of broker matters: Kafka provides stream replay and fault tolerance compared to other brokers. The only drawback is that you have to invest in tooling and human resources.
REFERENCES
https://docs.spring.io/spring-batch/docs/current/reference/html/
https://www.h2database.com/html/main.html
https://docs.spring.io/spring-batch/docs/current/reference/html/job.html#javaConfig
https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#spring-integration
https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-chunking
https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/integration/chunk/RemoteChunkingManagerStepBuilderFactory.html
GITHUB