High-Performance Batch Processing Using Apache Spark and Spring Batch

Batch processing deals with large amounts of data: it is a method of running high-volume, repetitive data jobs, where each job performs a specific task.

By Reza Ganji · May. 16, 21 · Tutorial

Batch processing deals with a large amount of data; it is a method of running high-volume, repetitive data jobs, where each job performs a specific task without user interaction. This kind of processing has existed since the beginning of computing and still continues, and new methods, algorithms, and tools are still being introduced. The entry point of a batch processing system is offline data gathered in any format, such as CSV, RDBMS, or well-formed JSON or XML files; after processing the data and performing the specified operation on every record, the desired output is generated.

Batch Processing System Flow

Note: Batch processing and stream processing are two different concepts.

Batch Processing vs. Stream Processing

Sometimes these two concepts are considered the same, but in fact they are completely different. In batch processing, data is gathered once and sent to the batch processing system; stream processing is continuous and real-time. In batch processing systems, jobs are executed by a scheduler, and the result of the processing is not available at the moment the jobs are submitted.

Batch Processing                                                  | Stream Processing
Data is collected over time.                                      | Data streams are continuous.
Once collected, the data is sent to a batch processing system.    | Data is processed piece by piece.
Lengthy; meant for large quantities of data that are not time-sensitive. | Real-time; the result of processing is ready immediately.
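The contrast above can be sketched in a few lines of plain Java (an illustrative example, not part of the card issuer system that follows): the batch style waits for the whole data set before processing starts, while the stream style updates its result as each element "arrives."

```java
import java.util.Iterator;
import java.util.List;

public class BatchVsStream {

    // Batch: the whole data set is collected before processing starts.
    static int processBatch(List<Integer> collected) {
        int total = 0;
        for (int value : collected) {
            total += value; // processing begins only after collection is done
        }
        return total;
    }

    // Stream: each element is processed the moment it is available.
    static int processStream(Iterator<Integer> source) {
        int total = 0;
        while (source.hasNext()) {
            total += source.next(); // the result is updated piece by piece
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4);
        System.out.println(processBatch(data));              // prints 10
        System.out.println(processStream(data.iterator()));  // prints 10
    }
}
```

Both compute the same answer; the difference is *when* processing happens relative to data arrival.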

Case Study

Let's take a look at a sample system that relies on batch processing. Because my professional working area is developing banking and payment systems, most of my samples are banking systems; suppose that you have a card issuer system that issues named gift cards for given names.

The input of this system is the name of the cardholder and the output is a gift card. It takes one second to issue each card, so everything goes well as long as the system issues just one card; but consider a company that wants to issue cards for 100,000 employees. That takes 100,000 seconds if done sequentially, but if you have 100 threads, each issuing one card at a time, you can issue that number of cards 100x faster.

Let's Do Something Crazy

We decide to develop a batch card issuer system. First of all, you need a reader that is responsible for reading data from the source and delivering it to the processor.

The starting point is an abstract reader class:

Java

import java.util.Collections;
import java.util.List;

public abstract class CardBatchReader {
    final List<String> list;

    public CardBatchReader(List<String> list) {
        this.list = Collections.unmodifiableList(list);
    }

    public abstract String getNextItem();

    public Integer getNumberOfItems() {
        return list.size();
    }
}



Round-robin implementation for the reader:

Java

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinCardBatchReader extends CardBatchReader {

    // AtomicInteger keeps the index consistent when getNextItem() is called
    // from multiple threads at once (as the parallel issuer service does).
    private final AtomicInteger idx = new AtomicInteger(0);

    public RoundRobinCardBatchReader(List<String> cardNumberList) {
        super(cardNumberList);
    }

    public String getNextItem() {
        return list.get(idx.getAndIncrement());
    }
}



A sample card issuer service:

Java

import java.util.stream.IntStream;

public class CardIssuerService {
    private final CardBatchReader reader;

    public CardIssuerService(CardBatchReader reader) {
        this.reader = reader;
    }

    private void IssueCard() {
        // suppose this is the real card issue service
        System.out.println("Card issued for " + reader.getNextItem()
                + " in thread :-> " + Thread.currentThread().getId());
    }

    public void IssueCards() {
        // a parallel stream issues the cards concurrently
        IntStream.range(0, reader.getNumberOfItems())
                .parallel()
                .forEach(c -> IssueCard());
    }
}



Finally, a batch executor:

Java

import java.util.Arrays;
import java.util.List;

public class BatchExecuter {

    static List<String> list = Arrays.asList(
            "Alex",
            "Bob",
            "Mostafa",
            "Yashar",
            "Alireza",
            "Fatemeh",
            "Jalal",
            "arash"
    ); // this data can arrive from anywhere

    public static void main(String[] args) {
        CardBatchReader reader = new RoundRobinCardBatchReader(list);
        CardIssuerService cardIssuerService = new CardIssuerService(reader);
        cardIssuerService.IssueCards();
    }
}



Finally, the result:

BatchExecuter

It was a simple snippet of code that simulates a batch system. As you can see, for 8 people we have 8 threads, and each thread issues one card; it seems that we have solved the problem!

But there are a couple of questions:

  1. Why does each thread not issue more than one card?
  2. How can I stop the batch?
  3. What happens with transaction management?
  4. And so many other questions...

All of these requirements could be added to our code, but why? Other people have done all this for us before; why should we spend time doing it again?

Batch processing systems are mostly based on threads and parallel processing, and everybody can design their own batch processing system, but why reinvent the wheel? There are many excellent batch processing frameworks to choose from; Spring Batch and JBatch are two of them. In this article we will focus on Spring Batch.

Spring Batch

Spring Batch provides reusable functions that are essential in processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management. It also provides more advanced technical services and features that will enable extremely high-volume and high-performance batch jobs through optimization and partitioning techniques. Simple as well as complex, high-volume batch jobs can leverage the framework in a highly scalable manner to process significant volumes of information.

Features:

  • Transaction management
  • Chunk-based processing
  • Declarative I/O
  • Start/stop/restart/retry
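Chunk-based processing deserves a quick sketch: Spring Batch reads items one at a time, buffers them into a chunk, and hands the whole chunk to the writer in a single transaction. The loop below is a hedged, framework-free illustration of that idea; the names (runChunkLoop, writtenChunks) are ours, not Spring Batch APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkLoopSketch {

    // Simulates the read -> process -> write-in-chunks cycle.
    static List<List<String>> runChunkLoop(List<String> source, int chunkSize) {
        List<List<String>> writtenChunks = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        for (String item : source) {          // "read" one item at a time
            chunk.add(item.toUpperCase());    // "process" the item
            if (chunk.size() == chunkSize) {  // chunk boundary reached:
                writtenChunks.add(chunk);     // "write" the chunk (one transaction)
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            writtenChunks.add(chunk);         // flush the final partial chunk
        }
        return writtenChunks;
    }

    public static void main(String[] args) {
        List<List<String>> out = runChunkLoop(List.of("a", "b", "c", "d", "e"), 2);
        System.out.println(out); // [[A, B], [C, D], [E]]
    }
}
```

If a chunk fails, only that chunk's transaction rolls back, which is what makes restart and skip practical for large inputs.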

I do not want to go deep into the details of Spring Batch; you can find all the information about it here. In this part, we try to solve the card issuer problem with Spring Batch, for use in the next parts of the article.

Before we start, let's review the problem and architecture again: we have a big file containing names, and we want to issue a card for each name. Regularly, the first step is storing them in the database; after revising the records (e.g., removing duplicate names or eliminating null records), Spring Batch comes into operation, reads the data from the database, processes it, and issues the cards.

Spring Batch Flow


OK, first of all, you should create a new Spring Boot project with the following Maven dependencies:

XML

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tosan</groupId>
    <artifactId>Tosan-Flowable</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.hsqldb</groupId>
            <artifactId>hsqldb</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>



Like every Spring Boot-based project, you need an application class; don't forget:

Java

@EnableBatchProcessing

In your application class:

Java

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableBatchProcessing
// @EnableScheduling is required for the @Scheduled job launcher used later
@EnableScheduling
public class SpringBatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchApplication.class, args);
    }
}



Obviously, the CardHolder class is needed as the base entity:

Java

import java.io.Serializable;

public class CardHolder implements Serializable {
    private String expireDate;
    private String cardHolderName;

    public String getExpireDate() {
        return expireDate;
    }

    public void setExpireDate(String expireDate) {
        this.expireDate = expireDate;
    }

    public String getCardHolderName() {
        return cardHolderName;
    }

    public void setCardHolderName(String cardHolderName) {
        this.cardHolderName = cardHolderName;
    }

    @Override
    public int hashCode() {
        return super.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        return super.equals(obj);
    }

    @Override
    public String toString() {
        return "Card issued with expired date [ " + expireDate + " ]  and  name [" + cardHolderName + "]";
    }
}



The data entry point of our batch system is the reader, so here is the code of CardHolderItemReader:

Java

import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;

public class CardHolderItemReader implements ItemReader<CardHolder> {

    private List<CardHolder> customerList;
    private int idx = 0;

    @Autowired
    CardHoldersRepository cardHoldersRepository;

    @Override
    public CardHolder read() throws Exception {
        // Load lazily: the @Autowired field is still null while the
        // constructor runs, so fetching the data there would fail.
        if (customerList == null) {
            customerList = cardHoldersRepository.getCardHolders();
        }
        // Returning null tells Spring Batch that the input is exhausted.
        if (idx >= customerList.size()) {
            return null;
        }
        return customerList.get(idx++);
    }
}


The main part: the processor that processes the CardHolder items and issues the cards!

Java

import org.springframework.batch.item.validator.ValidatingItemProcessor;

public class CardIssueProcessor extends ValidatingItemProcessor<CardHolder> {

    public CardIssueProcessor() {
        super(item -> {
            if (item != null) {
                // connect to the card issuer service and issue the card
                System.out.println("card Issued for " + item.getCardHolderName());
            }
        });
        setFilter(true);
    }
}



Batch config class: besides all the Spring Batch-related code, it has a scheduled method named 'run' that fires the job every 500 milliseconds.

Java

package com.config;

import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.Arrays;

@Configuration
public class CardReportJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private StepBuilderFactory stepBuilders;

    @Autowired
    private JobLauncher jobLauncher;

    @Bean
    public Job customerReportJob() {
        return jobBuilders.get("customerReportJob")
                .start(taskletStep())
                .next(chunkStep())
                .build();
    }

    @Bean
    public Step taskletStep() {
        return stepBuilders.get("taskletStep")
                .tasklet(tasklet())
                .build();
    }

    @Bean
    public Tasklet tasklet() {
        // a placeholder tasklet that finishes immediately
        return (stepContribution, chunkContext) -> RepeatStatus.FINISHED;
    }

    @Bean
    public Step chunkStep() {
        return stepBuilders.get("chunkStep")
                .<CardHolder, CardHolder>chunk(20)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @StepScope
    @Bean
    public ItemReader<CardHolder> reader() {
        return new CardHolderItemReader();
    }

    @StepScope
    @Bean
    public ItemProcessor<CardHolder, CardHolder> processor() {
        final CompositeItemProcessor<CardHolder, CardHolder> processor = new CompositeItemProcessor<>();
        processor.setDelegates(Arrays.asList(new CardIssueProcessor()));
        return processor;
    }

    @StepScope
    @Bean
    public ItemWriter<CardHolder> writer() {
        return new CardHolderItemWriter();
    }

    @Scheduled(fixedRate = 500)
    public void run() throws Exception {
        // Each launch needs unique JobParameters; otherwise the second run is
        // rejected because the job instance has already completed.
        jobLauncher.run(
                customerReportJob(),
                new JobParametersBuilder()
                        .addLong("launchTime", System.currentTimeMillis())
                        .toJobParameters()
        );
    }
}



Here is a writer class that stores the log and result of the operation in a file named 'output.txt':

Java

import org.springframework.batch.item.ItemWriter;

import javax.annotation.PreDestroy;
import java.io.*;
import java.util.List;

public class CardHolderItemWriter implements ItemWriter<CardHolder>, Closeable {
    private final PrintWriter writer;

    public CardHolderItemWriter() {
        OutputStream out;
        try {
            out = new FileOutputStream("output.txt");
        } catch (FileNotFoundException e) {
            out = System.out;
        }
        this.writer = new PrintWriter(out);
    }

    @Override
    public void write(final List<? extends CardHolder> items) throws Exception {
        for (CardHolder item : items) {
            writer.println(item.toString());
        }
        writer.flush(); // make each chunk visible in the file immediately
    }

    @PreDestroy
    @Override
    public void close() throws IOException {
        writer.close();
    }
}



Spring UI

Cards Issued List


But there is an important problem: we need to write the file into the database before starting the batch. It is possible to handle the data in memory instead, but that is not a good idea for such huge files.

Integrating Spark and Spring Batch

The main problem with using a database as the storage for files before the batch operation is performance, because you need to write millions of records into the database, read them back, and sometimes revise them. So what is the solution? The answer is Spark. Spark can replace the database at the starting point of batches:

Spark in the Flow

In the previous article, I gave a full explanation of Spark. In this part, we want to integrate it with Spring Batch. The first step is the creation of a SparkDataFactory that connects to Spark, reads data from the file, and returns the list of CardHolders:


Java

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

import java.util.List;

public class SparkDataFactory {

    private static final String CSV_URL = "HolderNames.csv";
    private static volatile SparkDataFactory me;

    private SparkDataFactory() {
    }

    public static SparkDataFactory getInstance() {
        // Double-checked locking: re-check inside the synchronized block so
        // only one instance is ever created.
        if (me == null) {
            synchronized (SparkDataFactory.class) {
                if (me == null) {
                    me = new SparkDataFactory();
                }
            }
        }
        return me;
    }

    public List<CardHolder> readData() {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

        JavaRDD<CardHolder> peopleRDD = spark.read()
                .textFile(CSV_URL)
                .javaRDD()
                .map(line -> {
                    String[] parts = line.split(",");
                    CardHolder holder = new CardHolder();
                    holder.setCardHolderName(parts[0]);
                    holder.setExpireDate(parts[1]);
                    return holder;
                });

        return peopleRDD.collect();
    }
}



The next step is replacing the Spring Data repository with a Spark reader.

Remember that you can read the data in whatever size you like; for example, you can read it in lists of 100 records at a time:

import org.springframework.batch.item.ItemReader;
import java.util.List;

public class CardHolderItemReader implements ItemReader<CardHolder> {

    private final List<CardHolder> customerList;
    private int idx = 0;

    public CardHolderItemReader() {
        customerList = SparkDataFactory.getInstance().readData();
    }

    @Override
    public CardHolder read() throws Exception {
        // Returning null tells Spring Batch that the input is exhausted.
        if (idx >= customerList.size()) {
            return null;
        }
        return customerList.get(idx++);
    }
}
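As a hedged sketch of the "100 records at a time" idea, here is a framework-free pager in plain Java. PagedItemReader and readPage are illustrative names, not Spring Batch or Spark APIs; a real implementation could pull each page lazily from Spark instead of holding everything in memory.

```java
import java.util.List;

public class PagedItemReader {
    private final List<String> source; // stands in for the Spark-backed data
    private final int pageSize;
    private int position = 0;

    PagedItemReader(List<String> source, int pageSize) {
        this.source = source;
        this.pageSize = pageSize;
    }

    // Returns the next page of at most pageSize items, or null when the data
    // is exhausted, mirroring the ItemReader end-of-input convention.
    List<String> readPage() {
        if (position >= source.size()) {
            return null;
        }
        int end = Math.min(position + pageSize, source.size());
        List<String> page = source.subList(position, end);
        position = end;
        return page;
    }

    public static void main(String[] args) {
        PagedItemReader reader = new PagedItemReader(List.of("a", "b", "c", "d", "e"), 2);
        List<String> page;
        while ((page = reader.readPage()) != null) {
            System.out.println(page); // pages of 2, 2, then 1 items
        }
    }
}
```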


The execution result is:

Execution Result

OK, we did it! In a simple non-functional test, I measured the performance of the two approaches, database-based and Spark-based; the results show about a 60x improvement in batch execution time for 2 million records when Spark is used.
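A comparison like this can be timed with a minimal harness such as the sketch below. It is illustrative only: the two Runnables stand in for the database-based and Spark-based pipelines, and real measurements need warm-up runs and repetition before the ratio means anything.

```java
public class BatchTimer {

    // Runs the given pipeline once and returns the elapsed wall-clock time
    // in milliseconds.
    static long timeMillis(Runnable pipeline) {
        long start = System.nanoTime();
        pipeline.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        long dbMillis = timeMillis(() -> { /* run the database-based batch */ });
        long sparkMillis = timeMillis(() -> { /* run the Spark-based batch */ });
        System.out.println("db=" + dbMillis + "ms, spark=" + sparkMillis + "ms");
    }
}
```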

What Is Next: Clouds

You can use Spring Cloud Data Flow, part of Spring Cloud, to build your batch application on cloud infrastructure. Spring Cloud Data Flow provides tools to create complex topologies for batch data; you can find more information about it here. It will be interesting. Enjoy it!
