Preface
This article explains, from zero to one, how to get a Spring Batch application up and running.
The tutorial covers reading data from a text file and then writing it to MySQL.
What is Spring Batch
Spring Batch, as a subproject of Spring, is an enterprise Batch framework based on Spring. It can be used to build robust enterprise batch applications. Spring Batch not only provides a unified read and write interface, rich task processing methods, flexible transaction management and concurrent processing, but also supports logging, monitoring, task restart and skip features, which greatly simplifies Batch application development and frees developers from complex task configuration management process. This allows them to focus more on the core business processes.
Environment setup
I use IntelliJ IDEA and build with Gradle.
You can use Spring Initializr to create a Spring Boot application. Address: start.spring.io
First select Gradle Project and then Java, and fill in your Group and Artifact names. Finally, search for the dependencies you need: Batch is a must, and since this project uses JPA to insert data into MySQL, I also added JPA and MySQL. Add others as needed.
Click Generate Project and a Project is created.
The build.gradle file might look something like this:
buildscript {
    ext {
        springBootVersion = '2.0.4.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.demo'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter-batch')
    compile('org.springframework.boot:spring-boot-starter-jdbc')
    compile('org.springframework.boot:spring-boot-starter-data-jpa')
    compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-joda', version: '2.9.4'
    compile group: 'org.jadira.usertype', name: 'usertype.core', version: '6.0.1.GA'
    compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6'
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.batch:spring-batch-test')
}
Spring Batch structure
Plenty has been written on the web about Spring Batch's structure and principles, so I won't elaborate on them in detail; I'll only cover the basic Spring Batch hierarchy here.
Spring Batch's basic unit of work is the Job: one Job performs one batch job. A Job contains one or more Steps, and a Step is a single stage performed by the Job. Inside a Step there is a Tasklet, a reusable unit of work.
Then there is the Chunk. A Chunk is a block of data, and you define how many items make up one. Chunk processing is a loop of reading, processing, and writing data; Spring Batch repeats this loop until the batch data is exhausted.
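The read–process–write loop described above can be sketched in plain Java (a minimal illustration only, with no Spring types; the upper-casing stands in for an arbitrary processing step):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkLoopSketch {

    // Read one item at a time, "process" it, and buffer it;
    // write the whole buffer at once when the chunk is full.
    static List<List<String>> processInChunks(List<String> source, int chunkSize) {
        List<List<String>> written = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        Iterator<String> reader = source.iterator();
        while (reader.hasNext()) {
            buffer.add(reader.next().toUpperCase()); // the "process" step
            if (buffer.size() == chunkSize) {
                written.add(new ArrayList<>(buffer)); // "write" a full chunk in one go
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            written.add(buffer); // flush the final partial chunk
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(processInChunks(List.of("a", "b", "c", "d", "e"), 2));
    }
}
```

With a chunk size of 2 and five items, the sketch writes three chunks: two full ones and a final partial one, mirroring how a chunk-oriented step flushes a trailing partial chunk at the end of the input.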
Build Spring Batch
First, we need a global Configuration to configure all jobs and some global configurations.
The code is as follows:
@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {

    @Bean
    public ApplicationContextFactory firstJobContext() {
        return new GenericApplicationContextFactory(FirstJobConfiguration.class);
    }

    @Bean
    public ApplicationContextFactory secondJobContext() {
        return new GenericApplicationContextFactory(SecondJobConfiguration.class);
    }
}
@EnableBatchProcessing enables batch processing. To run multiple jobs, set its modular attribute to true so that each Job gets its own ApplicationContext.
For example, the above code creates two jobs.
Example background
The example in this post migrates data. The data source is a text file with millions of records, one per line. We use Spring Batch to migrate all of the file's data into the corresponding MySQL table.
Assuming the data we are migrating is called Message, we need a Message entity class that maps to the database:
@Entity
@Table(name = "message")
public class Message {

    @Id
    @Column(name = "object_id", nullable = false)
    private String objectId;

    @Column(name = "content")
    private String content;

    @Column(name = "last_modified_time")
    private LocalDateTime lastModifiedTime;

    @Column(name = "created_time")
    private LocalDateTime createdTime;
}
To build the Job
First we need a Configuration for this Job, which will be loaded in SpringBatchConfiguration.
@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {

    @Bean
    public ApplicationContextFactory messageMigrationJobContext() {
        return new GenericApplicationContextFactory(MessageMigrationJobConfiguration.class);
    }
}
All of the Job-building code below lives in MessageMigrationJobConfiguration.
public class MessageMigrationJobConfiguration {
}
Let’s first define a Job Bean.
@Autowired
private JobBuilderFactory jobBuilderFactory;

@Bean
public Job messageMigrationJob(@Qualifier("messageMigrationStep") Step messageMigrationStep) {
    return jobBuilderFactory.get("messageMigrationJob")
            .start(messageMigrationStep)
            .build();
}
JobBuilderFactory is injected, and the argument to get() is the name of the job.
This job has only one step.
Step
Next, create the Step.
@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Step messageMigrationStep(@Qualifier("jsonMessageReader") FlatFileItemReader<Message> jsonMessageReader,
                                 @Qualifier("messageItemWriter") JpaItemWriter<Message> messageItemWriter,
                                 @Qualifier("errorWriter") Writer errorWriter) {
    return stepBuilderFactory.get("messageMigrationStep")
            .<Message, Message>chunk(CHUNK_SIZE)
            .reader(jsonMessageReader).faultTolerant().skip(JsonParseException.class).skipLimit(SKIP_LIMIT)
            .listener(new MessageItemReadListener(errorWriter))
            .writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
            .listener(new MessageWriteListener())
            .build();
}
StepBuilderFactory is injected, and the argument to get() is the name of the Step.
A Step can be built from many parts: readers, processors, writers, listeners, and so on.
Let's look at how each of them is used.
Chunk
Spring Batch configures Steps with a chunk-based mechanism: it reads one item at a time, processes it, and once a set number of items has accumulated, hands them to the writer in one go. This maximizes write efficiency, and the whole transaction is scoped to the chunk.
For example, if we define the chunk size as 50, Spring Batch processes 50 items and then writes them to the database together.
One important point: before chunk, we must declare the input and output data types. Since the input here is Message and the output is also Message, we write both as Message.
If these types are not declared, an error is reported.
.<Message, Message>chunk(CHUNK_SIZE)
Reader
A Reader, as the name implies, reads data from a data source.
Spring Batch provides a number of useful readers that cover almost every need, such as FlatFileItemReader, JdbcCursorItemReader, and JpaPagingItemReader. You can also implement your own Reader.
In this example, the data source is a text file, so we use FlatFileItemReader. FlatFileItemReader reads data line by line from a file.
First, set the file path, known as the Resource.
Since we need to map each line of text to the Message class, we also implement and set our own LineMapper.
@Bean
public FlatFileItemReader<Message> jsonMessageReader() {
    FlatFileItemReader<Message> reader = new FlatFileItemReader<>();
    reader.setResource(new FileSystemResource(new File(MESSAGE_FILE)));
    reader.setLineMapper(new MessageLineMapper());
    return reader;
}
Line Mapper
A LineMapper takes a line of text plus its line number as input and converts it to a Message.
In this example, a line of text is a JSON object, so we use JsonParser to convert it to Message.
public class MessageLineMapper implements LineMapper<Message> {

    private MappingJsonFactory factory = new MappingJsonFactory();

    @Override
    public Message mapLine(String line, int lineNumber) throws Exception {
        JsonParser parser = factory.createParser(line);
        Map<String, Object> map = (Map) parser.readValueAs(Map.class);
        Message message = new Message();
        ... // Conversion logic
        return message;
    }
}
Processor
In this example there is no need for a Processor: the reader already turns each line of text into a Message, and the writer writes the Message directly to MySQL. Using one works the same way as using a reader or writer.
As its interface shows, you define the input and output types, and process() turns an input I into an output O through some logic.
public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}
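Even though this example skips the Processor, a tiny self-contained sketch shows how the interface is meant to be used. The interface shape is reproduced from above; LengthProcessor is a hypothetical processor, not part of the migration job, chosen so the input and output types differ:

```java
public class ProcessorSketch {

    // Same shape as Spring Batch's ItemProcessor, repeated here to keep the sketch self-contained.
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    // Hypothetical processor: takes a raw String in, returns its trimmed length out.
    static class LengthProcessor implements ItemProcessor<String, Integer> {
        @Override
        public Integer process(String item) {
            return item.trim().length();
        }
    }

    public static void main(String[] args) throws Exception {
        ItemProcessor<String, Integer> processor = new LengthProcessor();
        System.out.println(processor.process("  hello  ")); // prints 5
    }
}
```

In a real Step, such a processor would sit between the reader and the writer via .processor(...), and the chunk declaration would become <I, O> instead of <Message, Message>.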
Writer
Writer writes data to the target data source.
Spring Batch also provides us with many useful writers, such as JpaItemWriter, FlatFileItemWriter, HibernateItemWriter, and JdbcBatchItemWriter. You can also write your own.
In this example, JpaItemWriter writes Message objects directly to the database; it only needs an EntityManagerFactory, which can be injected.
@Autowired
private EntityManagerFactory entityManager;

@Bean
public JpaItemWriter<Message> messageItemWriter() {
    JpaItemWriter<Message> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManager);
    return writer;
}
In addition, you need to configure things like database connections. Since I am using Spring, I will directly configure the following in application.properties:
spring.datasource.url=jdbc:mysql://database
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect
spring.jpa.show-sql=true
spring.jpa.properties.jadira.usertype.autoRegisterUserTypes=true
spring.jackson.serialization.write-dates-as-timestamps=false
spring.batch.initialize-schema=ALWAYS
spring.jpa.hibernate.ddl-auto=update
The spring.datasource.* properties configure the database connection.
spring.batch.initialize-schema=ALWAYS tells Spring Batch to create its default tables in the database.
spring.jpa.show-sql=true prints the SQL that Hibernate runs against the database to the console.
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect sets the MySQL dialect.
Listener
Spring Batch also provides a very comprehensive set of listeners, which you can think of as hooks for observing the result of each stage. You can listen on steps, on jobs, on readers, on writers. There are no listeners you can't find, only listeners you can't think of.
In this case, all I care about is reading and writing, so I only implement ItemReadListener and ItemWriteListener.
When a read error occurs, the error result is written to a separate error list file.
public class MessageItemReadListener implements ItemReadListener<Message> {

    private Writer errorWriter;

    public MessageItemReadListener(Writer errorWriter) {
        this.errorWriter = errorWriter;
    }

    @Override
    public void beforeRead() {
    }

    @Override
    public void afterRead(Message item) {
    }

    @Override
    public void onReadError(Exception ex) {
        try {
            errorWriter.write(format("%s%n", ex.getMessage()));
        } catch (IOException e) {
            // java.io.Writer.write throws a checked IOException, which onReadError cannot declare
            throw new UncheckedIOException(e);
        }
    }
}
When a write error occurs, do the same and write the cause of the error to a separate log.
public class MessageWriteListener implements ItemWriteListener<Message> {

    @Autowired
    private Writer errorWriter;

    @Override
    public void beforeWrite(List<? extends Message> items) {
    }

    @Override
    public void afterWrite(List<? extends Message> items) {
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Message> items) {
        try {
            errorWriter.write(format("%s%n", exception.getMessage()));
            for (Message message : items) {
                errorWriter.write(format("Failed writing message id: %s", message.getObjectId()));
            }
        } catch (IOException e) {
            // java.io.Writer.write throws a checked IOException, which onWriteError cannot declare
            throw new UncheckedIOException(e);
        }
    }
}
The write listener receives a List because items are written a chunk at a time.
Skip
Spring Batch provides a skip mechanism: when something goes wrong, the failing item can be skipped. Without skip, one bad item fails the whole job.
When you enable skip, you must say which exceptions may be skipped and how many items may be skipped; if the number of failures exceeds your skip limit, the job fails.
Skip can be configured on the reader, the writer, and so on.
.writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
Retry
Retry works on the same principle as skip: after a failure, the operation can be retried, up to a configured number of attempts, and it can be configured separately for the reader and the writer.
If both retry and skip are set, all retries happen before skipping starts: with a retry limit of 10 and a skip limit of 20, an item only counts as the first skip after its 10 retries are exhausted.
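No retry code appears in this example's step, but a hedged sketch of what it could look like on the same fault-tolerant builder follows (a fragment only, not part of the example job; DeadlockLoserDataAccessException and RETRY_LIMIT are illustrative choices):

```java
// Hypothetical fragment of the step builder chain shown earlier.
.writer(messageItemWriter)
.faultTolerant()
.retry(DeadlockLoserDataAccessException.class) // which exceptions are worth retrying
.retryLimit(RETRY_LIMIT)                       // attempts per item before skip logic applies
.skip(Exception.class)
.skipLimit(SKIP_LIMIT)
```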
Run the Job
Once everything is ready, it's time to run it.
To run a job, use JobLauncher in the main method to run the job you specify.
The main method takes the job name as its argument, so different jobs can be run by name.
First we get the JobRegistry from the running Spring application, then look up the job by name.
JobLauncher can then run the job, passing any parameters it needs (such as a file path or a file date) via JobParameters. If no parameters are needed, the current time can be passed by default.
public static void main(String[] args) {
    String jobName = args[0];
    try {
        ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args);
        JobRegistry jobRegistry = context.getBean(JobRegistry.class);
        Job job = jobRegistry.getJob(jobName);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        JobExecution jobExecution = jobLauncher.run(job, createJobParams());
        if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
            throw new RuntimeException(format("%s Job execution failed.", jobName));
        }
    } catch (Exception e) {
        throw new RuntimeException(format("%s Job execution failed.", jobName));
    }
}

private static JobParameters createJobParams() {
    return new JobParametersBuilder().addDate("date", new Date()).toJobParameters();
}
Finally, compile the JAR package and execute the following command from the command line to run your Spring Batch.
java -jar YOUR_BATCH_NAME.jar YOUR_JOB_NAME
Debugging
You can set the level of log output in application.properties, such as whether you want to output INFO or DEBUG messages.
Basically, you can locate the problem by looking at the log.
logging.path=build/logs
logging.file=${logging.path}/batch.log
logging.level.com.easystudio=INFO
logging.level.root=INFO
logging.level.org.springframework.jdbc=INFO
logging.level.org.springframework.batch=INFO
logging.level.org.hibernate.SQL=INFO
Spring Batch data tables
If your batch is eventually written to the database, Spring Batch by default creates batch related tables in your database to record the status and results of all job/step runs.
You don't need to care about most of the tables; only a few matter:
batch_job_instance: records each job instance that has run, by name.
batch_job_execution: shows each job run's start time, end time, status, and error message.
batch_step_execution: the table with the most Step detail, such as each step's start time, end time, commit count, read/write counts, status, and error message after a failure.
Conclusion
Spring Batch provides very practical functionality and a complete abstraction of batch-processing scenarios: it can handle small data migrations as well as real big-data applications in large enterprises, all while letting us develop batch applications with little effort.
One last tip: you will run into all kinds of problems while setting up Spring Batch, but anyone who can use Google will find the answers.