Spring Boot + Spring Batch Listener
Overview
In this article, we'll explore how to integrate Spring Boot with Spring Batch Listener.
Spring Batch Tutorial
Spring Batch Overview - Architecture
Spring Boot + Spring Batch + MySQL Simple Example
- Spring Boot + Spring Batch Listener Example
Spring Boot + Spring Batch Job Scheduler Example
Spring Batch Interview Questions and Answers
We will understand the concept of Listeners available as part of Spring Batch framework.
Listeners are entities that help in intercepting the execution of a Job or Step and allowing the user to do certain functionality.
In this example, will read the content of the airport data CSV file and write it to the output text file, while also printing the logs from our listeners using Spring Batch Listener.
Take a look at our suggested posts:
Now, let's create Spring Batch Listener with Spring Boot Application.
Project Structure
Create Spring Boot Maven project
Add the spring-boot-starter-batch and h2 embedded database dependency in pom.xml.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.techgeeknext</groupId>
<artifactId>springbootbatchlistener</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springbootbatchlistener</name>
<description>Spring Boot - Spring Batch Listener Example</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-rest</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
JobExecutionListener
JobExecutionListener
offers interceptions and life-cycle methods for Spring Batch Jobs.
There are two methods beforeJob()
and afterJob()
, which, as the names indicate,
allow us to do whatever we want before the execution of a job begins and after the execution of the job finishes.
package com.techgeeknext.springbootbatchlistener.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
public class SpringBatchJobExecutionListener implements JobExecutionListener {
Logger logger = LoggerFactory.getLogger(SpringBatchJobExecutionListener.class);
public void beforeJob(JobExecution jobExecution) {
logger.info("AirportJobExecutionListener - beforeJob started.");
}
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
logger.info("AirportJobExecutionListener - - afterJob completed successfully");
} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
logger.info("AirportJobExecutionListener - afterJob failed.");
}
}
}
StepExecutionListener
StepExecutionListener
offers interceptions and life-cycle methods for Spring Batch Steps.
There are two methods beforeJob()
and afterJob()
, which, as the names indicate,
allow us to do whatever we want before the execution of a step begins and after the execution of the step finishes.
The afterStep() method returns an ExitStatus, which indicates whether or not the step execution was successful.
package com.techgeeknext.springbootbatchlistener.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
public class SpringBatchStepExecutionListener implements StepExecutionListener {
Logger logger = LoggerFactory.getLogger(SpringBatchStepExecutionListener.class);
@Override
public void beforeStep(StepExecution stepExecution)
{
logger.info("AirportStepListener - called before step.");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.info("AirportStepListener - called after step.");
//ExitStatus indicate that step has completed
return ExitStatus.COMPLETED;
}
}
Configure Listeners
In this class will configure SpringBatchJobExecutionListener
in job, SpringBatchStepExecutionListener
in step.
To configure your job, use @Configuration
and @EnableBatchProcessing
annotations, which adds several important beans to help jobs and saves you a lot of time.
The airportInfoJob() method defines the job, and the airportInfoStep() defines a single step. Jobs are created from steps, where each step can have a reader, a processor, and a writer.
package com.techgeeknext.springbootbatchlistener.config;
import com.techgeeknext.springbootbatchlistener.listener.SpringBatchJobExecutionListener;
import com.techgeeknext.springbootbatchlistener.listener.SpringBatchStepExecutionListener;
import com.techgeeknext.springbootbatchlistener.model.AirportInfo;
import com.techgeeknext.springbootbatchlistener.step.AirportInfoProcessor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
/**
* Spring Batch Configuration Class
*/
@Configuration
@EnableBatchProcessing
public class SpringBatchConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public Job airportInfoJob() {
return jobBuilderFactory.get("airportInfoJob")
.incrementer(new RunIdIncrementer())
.listener(new SpringBatchJobExecutionListener())
.flow(airportInfoStep())
.end()
.build();
}
@Bean
public Step airportInfoStep() {
return stepBuilderFactory.get("airportInfoStep")
.listener(new SpringBatchStepExecutionListener())
.<AirportInfo, String>chunk(10)
.reader(airportInfoItemReader())
.processor(airportInfoProcessor())
.writer(airportInfoItemWriter())
.faultTolerant()
.retryLimit(3)
.retry(Exception.class)
.build();
}
@Bean
public FlatFileItemReader<AirportInfo> airportInfoItemReader() {
return new FlatFileItemReaderBuilder<AirportInfo>()
.name("airportInfoItemReader")
.resource(new ClassPathResource("csv/airports.csv"))
.delimited()
.names(new String[] {"code", "airport","city","state","country"})
.targetType(AirportInfo.class)
.build();
}
@Bean
public AirportInfoProcessor airportInfoProcessor(){
return new AirportInfoProcessor();
}
@Bean
public FlatFileItemWriter<String> airportInfoItemWriter() {
return new FlatFileItemWriterBuilder<String>()
.name("airportInfoItemWriter")
.resource(new FileSystemResource(
"csv/airport-output.txt"))
.lineAggregator(new PassThroughLineAggregator<>())
.build();
}
}
Process Data With ItemProcessor
Create AirportInfoProcessor class by implementing ItemProcessor interface and override process method to write our business logic.
We will read the content of the airport data CSV file and write it to the output file, while also printing the logs from our listeners.
package com.techgeeknext.springbootbatchlistener.step;
import com.techgeeknext.springbootbatchlistener.model.AirportInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import java.util.Date;
public class AirportInfoProcessor
implements ItemProcessor<AirportInfo, String> {
private static final Logger LOGGER =
LoggerFactory.getLogger(AirportInfoProcessor.class);
@Override
public String process(AirportInfo airportInfo) throws Exception {
System.out.println("===Airport Details===");
String message = "Airport Code- "+airportInfo.getCode()+" Airport Name- "
+ airportInfo.getAirport() + " City-" + airportInfo.getCity()+" Country- "+airportInfo.getCountry()
+" State- "+airportInfo.getState();
LOGGER.info("==copied '{}' to output file", message);
return message;
}
}
Input CSV File
Keep input csv file () under resources/csv folder.
Job Launcher
Create RestApi, which will call job launcher to start the job.
package com.techgeeknext.springbootbatchlistener.controller;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SpringBatchJobController {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job airportInfoJob;
/**
* Method to launch the batch job to read the airport details from input csv file and write to output text file
* @return String
* @throws Exception
*/
@RequestMapping("/launch/airport/job")
public String launchJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(airportInfoJob, jobParameters);
return "Airport Job has been invoked from TechGeekNext!!!";
}
}
Test
- Use the http://localhost:8080/launch/airport/job url, this will start the Job.
- Once you hit the job launcher url as above, it starts processing the configured job with steps defined. Also you can notice logs are getting printed defined in our listeners.
- Also, as stated above the airport-output.txt will be created in the root folder (springbootbatch\csv).
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.3.0.RELEASE)
19:14:52.465 INFO 27052 --- [main] c.t.s.SpringBootBatchExampleApplication : Starting SpringBootBatchExampleApplication with PID 27052 (D:\springbootbatchlistener\target\classes started in D:\springbootbatchlistener)
19:14:52.466 INFO 27052 --- [main] c.t.s.SpringBootBatchExampleApplication : No active profile set, falling back to default profiles: default
19:14:54.069 INFO 27052 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
19:14:54.082 INFO 27052 --- [main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
19:14:54.082 INFO 27052 --- [main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.35]
19:14:54.264 INFO 27052 --- [main] o.a.c.c.C.[Tomcat].[localhost].[/]: Initializing Spring embedded WebApplicationContext
19:14:54.265 INFO 27052 --- [main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1761 ms
19:14:54.972 INFO 27052 --- [main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
19:14:55.156 INFO 27052 --- [main] com.zaxxer.hikari.HikariDataSource: HikariPool-1 - Starting...
19:14:55.316 INFO 27052 --- [main] com.zaxxer.hikari.HikariDataSource: HikariPool-1 - Start completed.
19:14:55.395 INFO 27052 --- [main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: H2
19:14:55.414 INFO 27052 --- [main] o.s.b.c.l.support.SimpleJobLauncher: No TaskExecutor has been set, defaulting to synchronous executor.
19:14:55.594 INFO 27052 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
19:14:55.602 INFO 27052 --- [main] c.t.s.SpringBootBatchExampleApplication : Started SpringBootBatchExampleApplication in 3.532 seconds (JVM running for 3.941)
19:15:27.968 INFO 27052 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]: Initializing Spring DispatcherServlet 'dispatcherServlet'
19:15:27.968 INFO 27052 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
19:15:28.017 INFO 27052 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 48 ms
19:15:28.162 INFO 27052 --- [nio-8080-exec-1] o.s.b.c.l.support.SimpleJobLauncher:
Job: [FlowJob: [name=airportInfoJob]] launched with the following parameters: [{time=1621950328051}]
19:15:28.221 INFO 27052 --- [nio-8080-exec-1] c.t.s.l.SpringBatchJobExecutionListener :
AirportJobExecutionListener - beforeJob started.
19:15:28.246 INFO 27052 --- [nio-8080-exec-1] o.s.batch.core.job.SimpleStepHandler :
Executing step: [airportInfoStep]
19:15:28.252 INFO 27052 --- [nio-8080-exec-1] c.t.s.l.SpringBatchStepExecutionListener :
AirportStepListener - called before step.
===Airport Details===
19:15:28.332 INFO 27052 --- [nio-8080-exec-1] c.t.s.step.AirportInfoProcessor :
==copied 'Airport Code- ABE Airport Name- Lehigh Valley International Airport City-Allentown Country- USA State- PA' to output file
===Airport Details===
19:15:28.333 INFO 27052 --- [nio-8080-exec-1] c.t.s.step.AirportInfoProcessor :
==copied 'Airport Code- ABI Airport Name- Abilene Regional Airport City-Abilene Country- USA State- TX' to output file
===Airport Details===
19:15:28.333 INFO 27052 --- [nio-8080-exec-1] c.t.s.step.AirportInfoProcessor :
==copied 'Airport Code- ABQ Airport Name- Albuquerque International Sunport City-Albuquerque Country- USA State- NM' to output file
19:15:28.339 INFO 27052 --- [nio-8080-exec-1] c.t.s.l.SpringBatchStepExecutionListener :
AirportStepListener - called after step.
19:15:28.339 INFO 27052 --- [nio-8080-exec-1] c.t.s.l.SpringBatchStepExecutionListener :
AirportStepListener - called after step.
19:15:28.342 INFO 27052 --- [nio-8080-exec-1] o.s.batch.core.step.AbstractStep :
Step: [airportInfoStep] executed in 95ms
19:15:28.350 INFO 27052 --- [nio-8080-exec-1] c.t.s.l.SpringBatchJobExecutionListener :
AirportJobExecutionListener - - afterJob completed successfully
19:15:28.354 INFO 27052 --- [nio-8080-exec-1] o.s.b.c.l.support.SimpleJobLauncher:
Job: [FlowJob: [name=airportInfoJob]] completed with the following parameters: [{time=1621950328051}] and the following status: [COMPLETED] in 146ms
Download Source Code
The full source code for this article can be found on below.Download it here - Spring Boot + Spring Batch Listener Example