在 Java Web 开发中,Spring 框架为我们提供了强大的功能来处理各种任务,其中批处理作业是一项非常重要的功能。批处理作业通常用于处理大量数据、定时任务等场景,例如每日数据统计、定期清理日志等。本文将详细介绍如何在 Spring 框架中配置和定义批处理作业。
在开始之前,我们需要确保已经搭建好 Spring Boot 开发环境。可以使用 Spring Initializr(https://start.spring.io/)来快速创建一个 Spring Boot 项目,添加以下依赖:
在 Spring Batch 中,一个批处理作业主要由以下几个核心组件组成:
| 组件名称 | 描述 |
| —— | —— |
| Job | 代表一个完整的批处理作业,由多个 Step 组成。 |
| Step | 作业的一个独立执行单元,包含读取数据(ItemReader)、处理数据(ItemProcessor)和写入数据(ItemWriter)三个主要步骤。 |
| ItemReader | 负责从数据源(如数据库、文件等)读取数据。 |
| ItemProcessor | 对读取的数据进行处理和转换。 |
| ItemWriter | 将处理后的数据写入目标数据源。 |
首先,我们创建一个简单的实体类 Person
来表示要处理的数据。
public class Person {
private String firstName;
private String lastName;
public Person() {}
public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
// Getters and Setters
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@Override
public String toString() {
return "Person{" +
"firstName='" + firstName + '\'' +
", lastName='" + lastName + '\'' +
'}';
}
}
我们使用 FlatFileItemReader
从 CSV 文件中读取数据。
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
@Configuration
public class ReaderConfig {
@Bean
public FlatFileItemReader<Person> reader() {
FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("persons.csv"));
reader.setLineMapper(new DefaultLineMapper<Person>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[]{"firstName", "lastName"});
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}});
}});
return reader;
}
}
创建一个简单的处理器,将姓氏转换为大写。
import org.springframework.batch.item.ItemProcessor;
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
@Override
public Person process(Person person) throws Exception {
String firstName = person.getFirstName();
String lastName = person.getLastName().toUpperCase();
return new Person(firstName, lastName);
}
}
使用 JdbcBatchItemWriter
将处理后的数据写入数据库。
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class WriterConfig {
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("INSERT INTO persons (first_name, last_name) VALUES (:firstName, :lastName)");
writer.setDataSource(dataSource);
return writer;
}
}
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public DataSource dataSource;
@Bean
public Step step1(ReaderConfig readerConfig, PersonItemProcessor processor, WriterConfig writerConfig) {
return stepBuilderFactory.get("step1")
.<Person, Person> chunk(10)
.reader(readerConfig.reader())
.processor(processor)
.writer(writerConfig.writer(dataSource))
.build();
}
@Bean
public Job importUserJob(Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.flow(step1)
.end()
.build();
}
}
创建一个启动类来运行批处理作业。
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BatchJobApplication implements CommandLineRunner {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job importUserJob;
public static void main(String[] args) {
SpringApplication.run(BatchJobApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(importUserJob, jobParameters);
}
}
通过以上步骤,我们成功地在 Spring 框架中配置和定义了一个简单的批处理作业。首先,我们创建了实体类来表示要处理的数据;然后,使用 ItemReader
从 CSV 文件中读取数据,ItemProcessor
对数据进行处理,ItemWriter
将处理后的数据写入数据库;最后,定义了 Step
和 Job
并运行批处理作业。Spring Batch 提供了丰富的组件和配置选项,可以根据不同的业务需求进行灵活扩展和定制。希望本文能帮助你更好地理解和使用 Spring 批处理作业。