Spring Batch Remote Chunking with Spring Integration and a Kafka Broker

Posted on January 10, 2023

Knowledge comes through experience, everything else is just information.
Albert Einstein

Introduction

In this article, I share feedback from experience with large-volume processing in banking information systems. I will not define every concept used here; I assume the following are already familiar ☺️:
● Spring Batch (ItemReader, ItemProcessor, ItemWriter)
● Apache Kafka
● MySQL database
● Lombok
● Spring Integration
● Scheduling
If these notions are not yet familiar, please read the official documentation first (see the References section).

Problem

Large volumes of deferred data streams are generally handled by a batch system: a scheduled collection and processing of data in an information system. Two questions seem essential when collecting scheduled data:

  • How to improve performance?

  • How to handle errors (error recovery)?

Throughout this article, I will answer these two questions. To better understand the problem, we will use the following scenario:
➢ A French bank must digitize a set of documents every day (transaction slips, account opening/closing contracts for legal entities or individuals). Storing these documents in the EDM (electronic document management) system is governed by fairly specific business rules and must comply with the GDPR. The bank wants this archiving to run later, between 10 p.m. and 11:59 p.m. To help, we will draw up an architecture to collect this information and store it in the EDM system.
Architecture
To collect the bank's flows, we will use Spring Batch in remote chunking mode, coupled with an Apache Kafka broker, MySQL as the database and Spring Integration 🤔.
First of all, you should know that a batch consists of three major parts:
● Reading from external files, a database, or an external API
● Processing the data read (specific processing linked to business rules, for example: a bank branch code must be 5 alphanumeric characters)
● Writing to different targets (a database, files (xml, csv, text…), APIs)
The following architecture will be very useful for the rest of the article:

Figure 1: General Architecture

We have a Kafka broker that carries the communication between the master and the different workers (through topic partitions).
The following figure (Figure 2), taken from the official documentation, shows this in more depth.

Figure 2: General architecture (from the official Spring Batch documentation).

How it works:
● The master reads the (external) data streams.
● Once the reading is done, the master publishes the data it has read to an Apache Kafka broker, dispatching the flow to different processes called workers (also referred to as slaves).
● Each worker handles the processing of the data it receives and writes it out.
● The master can run several workers in parallel.
● At the end of the workers' processing, the master aggregates all of the workers' writes to produce a single output.
● In the event of a worker error, the error is sent back to the master, which can replay the same stream by launching a new worker.
Advantages
The advantage of this architecture is that, in terms of performance, the master can scale horizontally: the number of workers can simply be increased. In addition, when a worker fails, the data is not lost; it is simply sent back to the broker. A new worker can take over the stream and process it automatically. This automated error recovery avoids manual intervention.

Drawbacks
The only downside of this system is that you have to invest in tooling:
● a server for the broker
● people skilled in these technologies and tools.

PROGRAM SIDE

  1. Technical prerequisites:
  • Java 17
  • Apache Kafka
  • Spring Integration
  • Spring Batch 5.0
  • Spring Boot 3
  • MySQL
  • Docker

PROJECT STRUCTURE

Figure 3: Project structure

DATA
For this article, we will use the following CSV file 🙂, located at the root of the project:

Figure 4: data.csv

We are using a Maven project; here are the dependencies for the article 👍

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- parent, groupId and artifactId are assumptions for this snippet; adapt them to your project -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.1</version>
        <relativePath/>
    </parent>
    <groupId>fr.hn.services</groupId>
    <artifactId>batch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
   <name>batch</name>
   <description>Demo project for Spring Boot</description>
   <properties>
       <java.version>17</java.version>
   </properties>
   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-batch</artifactId>
       </dependency>
       <dependency>
           <groupId>com.mysql</groupId>
           <artifactId>mysql-connector-j</artifactId>
           <scope>runtime</scope>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-data-jpa</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-integration</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-jdbc</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.integration</groupId>
           <artifactId>spring-integration-jpa</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.integration</groupId>
           <artifactId>spring-integration-kafka</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
       </dependency>
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <optional>true</optional>
        </dependency>
    </dependencies>
</project>

Let's now go over the code.

MASTER

We will start with the master. It needs a few dependencies and annotations for its configuration:

@Profile("master")
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@PropertySource("classpath:application-master.properties")
@Component
@Data
//@Import({DatabaseConfig.class})
public class MasterJob {
   @Autowired
   private RemoteChunkingManagerStepBuilderFactory remoteChunkingManagerStepBuilderFactory;
   @Autowired
   private KafkaTemplate<String, TransactionDto> masterKafkaTemplate;

Master configuration

@Profile("master") is required to run the class under the master profile. The class is registered as a Spring component together with other annotations essential to launching the master:
● @Configuration: marks the class as a Spring configuration class
● @EnableBatchProcessing: enables the Spring Batch infrastructure
● @EnableBatchIntegration: enables the Spring Batch / Spring Integration remote chunking builders
● @PropertySource("classpath:application-master.properties"): injects the master configuration file into the application context
A KafkaTemplate is injected into the component to allow communication between the master and the workers; a possible explicit configuration is sketched below.
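In the article's setup, this template most likely comes from the spring.kafka.* properties shown later. If you prefer to define it explicitly (for example in a small Kafka configuration class), a minimal sketch could look like this; the bootstrap address and bean names are assumptions, and ChunkRequestSerializer is the article's own serializer shown further down:

// Minimal sketch, assuming a local broker on localhost:9092.
// Spring Boot can also build an equivalent template from the spring.kafka.* properties.
@Bean
public ProducerFactory<String, TransactionDto> masterProducerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ChunkRequestSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public KafkaTemplate<String, TransactionDto> masterKafkaTemplate(
        ProducerFactory<String, TransactionDto> masterProducerFactory) {
    return new KafkaTemplate<>(masterProducerFactory);
}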
Now we will see how the job is configured.

@Bean(name = "jobMaster")
public Job masterJob(JobRepository jobRepository) {
   return new JobBuilder("jobMaster", jobRepository)
           .incrementer(new RunIdIncrementer())
          // .listener(jobListener)
           .start(masterStep())
           .build();
}

The job is built around a masterStep():

/*
* Configure master step components
*/
@Bean(name = "masterStep")
public TaskletStep masterStep() {
   return this.remoteChunkingManagerStepBuilderFactory.get("masterStep")
           .<TransactionDto, TransactionDto>chunk(10)
           .reader(masterReader())
            .outputChannel(outboundChannel()) // produces ChunkRequest<TransactionDto> messages for the Kafka workers
            .inputChannel(inboundChannel())   // consumes ChunkResponse messages coming back from the Kafka workers
            .allowStartIfComplete(true)
           .build();
}

This masterStep creates a chunk for every 10 items read. The data is read from a CSV file located at the root of the classpath:

@Bean
public FlatFileItemReader<TransactionDto> masterReader() {
   return new FlatFileItemReaderBuilder<TransactionDto>()
           .resource(new ClassPathResource("/data.csv"))
           .name("Transaction")
           .delimited()
           .delimiter(",")
           .names("id", "name", "transactionDate")
            .linesToSkip(1)  // skip the header line of the file
            .fieldSetMapper(fieldSet -> {
                // map each CSV line to a TransactionDto
                TransactionDto data = new TransactionDto();
                data.setId(fieldSet.readInt("id"));
                data.setName(fieldSet.readString("name"));
                data.setTransactionDate(fieldSet.readString("transactionDate"));
                return data;
            })
           .build();
}

Each chunk of 10 items read from the CSV file is then sent to the workers over the output channel:
● outputChannel(outboundChannel())
● outboundFlow

/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel outboundChannel() {
   return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow() {
   var producerMessageHandler = new KafkaProducerMessageHandler<String, TransactionDto>(this.masterKafkaTemplate);
   producerMessageHandler.setTopicExpression(new LiteralExpression("requestsForWokers"));
   return IntegrationFlow.from(outboundChannel())
           .log(LoggingHandler.Level.WARN)
           .handle(producerMessageHandler)
           .get();
}

Spring Integration lets us wire these exchanges through a Kafka topic.
Then, when the workers have finished their processing, the master receives the replies on another channel of the Kafka broker.

/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel inboundChannel() {
   return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ConsumerFactory<String, TransactionDto> consumerFactory) {
   return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory, "repliesFromWokers"))
           .log(LoggingHandler.Level.WARN)
           .channel(inboundChannel())
           .get();
}


Spring Integration receives the workers' replies on this channel.
Note that we can add a listener to the job in order to react to every state of the job. In our scenario, we could query a database to retrieve the last execution date of the batch and all the records whose status is "R" (to recover) or "NA" (not archived).

@Override
public void beforeJob(JobExecution jobExecution) {
   // e.g. query the database for the last execution date and the records flagged "R" or "NA" before the job starts
}
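A minimal sketch of such a listener, wired onto the job with .listener(...) (commented out in the job definition above). The DOCUMENT table and STATUS column are hypothetical names for illustration, not from the original project:

@Component
public class RecoveryJobListener implements JobExecutionListener {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // Count the records still flagged "to recover" or "not archived"
        // before this run (DOCUMENT/STATUS are illustrative names).
        Integer pending = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM DOCUMENT WHERE STATUS IN ('R', 'NA')",
                Integer.class);
        jobExecution.getExecutionContext().putInt("pendingDocuments", pending == null ? 0 : pending);
    }
}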

We can also add a final summary to the job, either as a dedicated tasklet step or at the end of the run: it describes all the processing that was done, and other APIs can then consume it.

@Override
public void afterJob(JobExecution jobExecution) {
   // e.g. generate a summary file of the run
}
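As a rough sketch of that idea in the afterJob callback (the output file name and its content are assumptions, not from the original project):

@Override
public void afterJob(JobExecution jobExecution) {
    // Write a small summary file at the end of the run (path and format are illustrative).
    StringBuilder summary = new StringBuilder();
    summary.append("Job: ").append(jobExecution.getJobInstance().getJobName()).append('\n');
    summary.append("Status: ").append(jobExecution.getStatus()).append('\n');
    for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
        summary.append(stepExecution.getStepName())
               .append(" read=").append(stepExecution.getReadCount())
               .append(" written=").append(stepExecution.getWriteCount())
               .append('\n');
    }
    try {
        Files.writeString(Path.of("batch-summary.txt"), summary.toString());
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}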

WORKER
For the worker ("woker" in the code), we have practically the same configuration:

@Profile("woker")
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@PropertySource("classpath:application-worker.properties")
//@Import({DatabaseConfig.class})
@Slf4j
@Component
public class WokerJob {

   @Autowired
   private RemoteChunkingWorkerBuilder<TransactionDto, TransactionDto> remoteChunkingWorkerBuilder;

   @Autowired
   private KafkaTemplate<String, TransactionDto> transactionKafkaTemplate;

   @Autowired
   private DataSource dataSource;

We also use a database so that each worker can write to it. Here is its step:

@Bean
public IntegrationFlow workerStep() {
   return this.remoteChunkingWorkerBuilder
           .inputChannel(inboundChannel())   // consumes ChunkRequest<TransactionDto> messages from the master via Kafka
           .outputChannel(outboundChannel()) // produces ChunkResponse messages back to the master via Kafka
           .itemProcessor(itemProcessor())
           .itemWriter(itemWriter())
           .build();
}

First, we listen to the flow coming from the master:

/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel inboundChannel() {
   return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ConsumerFactory<String, TransactionDto> consumerFactory) {
   return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory, "requestsForWokers"))
           .log(LoggingHandler.Level.WARN)
           .channel(inboundChannel())
           .get();
}

Then, we process this flow with the ItemProcessor followed by the ItemWriter:


/*
* Configure worker components
*/
@Bean
public ItemProcessor<TransactionDto, TransactionDto> itemProcessor() {
   return item -> {
       System.out.println("processing item " + item);
       return item;
   };
}

@Bean
public ItemWriter<TransactionDto> itemWriter() {
   return new JdbcBatchItemWriterBuilder<TransactionDto>()
           .beanMapped()
           .dataSource(dataSource)
           .sql("INSERT INTO TRANSACTDTO (id,name,transactionDate) VALUES (:id, :name, :transactionDate)")
           .build();
}
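Here the processor only logs each item, but this is where the business rules from our scenario would live. A minimal sketch, assuming we simply want to validate the transactionDate field and filter out bad lines (the bean name and the date pattern are assumptions):

@Bean
public ItemProcessor<TransactionDto, TransactionDto> validatingItemProcessor() {
    // Assumed format of the transactionDate column; adjust to the real file.
    DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    return item -> {
        try {
            LocalDate.parse(item.getTransactionDate(), format);
        } catch (DateTimeParseException e) {
            // Returning null tells Spring Batch to filter the item out of the chunk.
            return null;
        }
        return item;
    };
}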

At the end, we send a reply back to the master so that it can commit each chunk of 10 items.

 /*
    * Configure outbound flow (replies going to the manager)
    */
   @Bean
   public DirectChannel outboundChannel() {
       return new DirectChannel();
   }

   @Bean
   public IntegrationFlow outboundFlow() {
       var producerMessageHandler = new KafkaProducerMessageHandler<String, TransactionDto>(transactionKafkaTemplate);
       producerMessageHandler.setTopicExpression(new LiteralExpression("repliesFromWokers"));
       return IntegrationFlow.from(outboundChannel())
               .log(LoggingHandler.Level.WARN)
               .handle(producerMessageHandler)
               .get();
   }
}

PROJECT CONFIGURATION
application-master.properties

server.port=8087
###
#spring.kafka.producer.group-id=producer-master-g
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=fr.hn.services.batch.utils.ChunkRequestSerializer
spring.kafka.producer.properties.spring.json.trusted.packages=*
#####
spring.kafka.consumer.group-id=consumer-master-g
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=fr.hn.services.batch.utils.ChunkResponseDeserializer
#####
broker.url=tcp://localhost:8087

application-worker.properties

server.port=8088
spring.kafka.consumer.group-id=consumer-woker-g
#spring.kafka.producer.group-id=producer-woker-g
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=fr.hn.services.batch.utils.ChunkResponseSerializer
#########
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=fr.hn.services.batch.utils.ChunkRequestDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
######

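Note that neither file shows the Kafka bootstrap address or the MySQL datasource. With the docker-compose setup below, the missing entries would look roughly like this (the values are assumptions matching the compose file):

# shared by both profiles (values assumed from the docker-compose file below)
spring.kafka.bootstrap-servers=localhost:9092
spring.datasource.url=jdbc:mysql://localhost:3306/db
spring.datasource.username=user
spring.datasource.password=password
# create the Spring Batch metadata tables on startup if they do not exist
spring.batch.jdbc.initialize-schema=always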

docker-compose.yml

version: "3"
services:
 zookeeper:
   image: 'bitnami/zookeeper:latest'
   ports:
     - '2181:2181'
   environment:
     - ALLOW_ANONYMOUS_LOGIN=yes
 kafka:
   image: 'bitnami/kafka:latest'
   ports:
     - '9092:9092'
   environment:
     - KAFKA_BROKER_ID=1
     - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
     - ALLOW_PLAINTEXT_LISTENER=yes
     - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
     - KAFKA_CFG_LISTENERS=CLIENT://:9093,EXTERNAL://:9092
     - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9093,EXTERNAL://localhost:9092
     - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
   depends_on:
     - zookeeper
 kafka-ui:
   image: provectuslabs/kafka-ui:latest
   ports:
     - '8080:8080'
   environment:
     - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9093
   depends_on:
     - kafka
 mysql:
   image: mysql
   ports:
     - '3306:3306'
   command: --default-authentication-plugin=mysql_native_password
   restart: always
   environment:
     MYSQL_DATABASE: 'db'
     # So you don't have to use root, but you can if you like
     MYSQL_USER: 'user'
     # You can use whatever password you like
     MYSQL_PASSWORD: 'password'
     # Password for root access
     MYSQL_ROOT_PASSWORD: 'password'
   volumes:
     - "./scripts/schema.sql:/docker-entrypoint-initdb.d/schema.sql"

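The compose file mounts ./scripts/schema.sql into the MySQL container. A minimal version of that script, inferred from the writer's INSERT statement (the column types are assumptions), could be the following; the Spring Batch metadata tables can be created automatically via spring.batch.jdbc.initialize-schema=always as shown above:

CREATE TABLE IF NOT EXISTS TRANSACTDTO (
    id INT PRIMARY KEY,
    name VARCHAR(255),
    transactionDate VARCHAR(50)
);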

Our POJO is the following:

@Component
@Getter
@Setter
public class TransactionDto implements Serializable {
   private int id;
   private String name;
   private String transactionDate;
}

Our utility classes:
● To deserialize the objects transferred into the broker from

@Component
public class ChunkRequestDeserializer implements Deserializer<ChunkRequest<TransactionDto>> {
   @Override
   public ChunkRequest<TransactionDto> deserialize(String topic, byte[] bytes) {
       if (bytes == null) {
           return null;
       }
       try {
           // standard Java deserialization of the ChunkRequest sent by the master
           return (ChunkRequest<TransactionDto>) new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
       } catch (IOException e) {
           e.printStackTrace();
       }
       return null;
   }
}

● To serialize the ChunkRequest objects sent by the master to the workers:

@Component
public class ChunkRequestSerializer implements Serializer<ChunkRequest<TransactionDto>> {
   @Override
   public byte[] serialize(String s, ChunkRequest<TransactionDto> chunkRequest) {
       if (chunkRequest == null) {
           return new byte[0];
       }
       return SerializationUtils.serialize(chunkRequest);
   }
}


● To serialize the ChunkResponse objects sent back by the workers:

@Component
public class ChunkResponseSerializer implements Serializer<ChunkResponse> {
   @Override
   public byte[] serialize(String s, ChunkResponse chunkResponse) {
       return SerializationUtils.serialize(chunkResponse);
   }
}

● To deserialize the ChunkResponse objects received by the master:

@Component
public class ChunkResponseDeserializer implements Deserializer<ChunkResponse> {
   @Override
   public ChunkResponse deserialize(String s, byte[] bytes) {
       try {
           if (bytes == null) {
               return null;
           }
           return (ChunkResponse) new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
       } catch (IOException e) {
           e.printStackTrace();
       }
       return null;
   }
}


BATCH EXECUTION
To run the batch, we use a scheduled launcher that triggers the job every second.

@Profile("master")
@Component
@Slf4j
@EnableScheduling
public class BatchLauncher {
   @Autowired
   private Job job;
   @Autowired
   private JobLauncher jobLauncher;

   @Scheduled(cron = "* * * * * ?")
   public void perform() throws Exception {
       JobParameters jobParameters = new JobParametersBuilder()
               .addString("joId", String.valueOf(System.currentTimeMillis()))
               .toJobParameters();
       log.info("", job.getName());
       jobLauncher.run(job, jobParameters);
   }
}


RESULTS

Figure 5: Docker images running
Figure 6: Master and worker execution
Figure 7: Tables in the database
Figure 8: TRANSACTDTO table
Figure 9: Kafka broker with partitions
Figure 10: Messages in the broker
Figure 11: Example of a message in the broker

CONCLUSION

In summary, this article shows how to gain performance while also managing error recovery, with an ecosystem made of Spring Batch, Spring Integration and a Kafka broker. The major benefit is that we can always increase the number of workers for more throughput, and we can partition the broker's topics as many times as we want. In addition, each worker runs in its own JVM. The choice of broker is important: Kafka provides stream replay and fault tolerance that other brokers do not always offer. The only drawback is that you have to invest in tools and human resources.

REFERENCES

https://docs.spring.io/spring-batch/docs/current/reference/html/
https://www.h2database.com/html/main.html
https://docs.spring.io/spring-batch/docs/current/reference/html/job.html#javaConfig
https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#spring-integration
https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-chunking
https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/integration/chunk/RemoteChunkingManagerStepBuilderFactory.html
GITHUB
