
在 Java Web 开发中,Spring 框架为我们提供了强大的功能来处理各种任务,其中批处理作业是一项非常重要的功能。批处理作业通常用于处理大量数据、定时任务等场景,例如每日数据统计、定期清理日志等。本文将详细介绍如何在 Spring 框架中配置和定义批处理作业。
在开始之前,我们需要确保已经搭建好 Spring Boot 开发环境。可以使用 Spring Initializr(https://start.spring.io/)来快速创建一个 Spring Boot 项目,添加以下依赖:
在 Spring Batch 中,一个批处理作业主要由以下几个核心组件组成:
| 组件名称 | 描述 |
| —— | —— |
| Job | 代表一个完整的批处理作业,由多个 Step 组成。 |
| Step | 作业的一个独立执行单元,包含读取数据(ItemReader)、处理数据(ItemProcessor)和写入数据(ItemWriter)三个主要步骤。 |
| ItemReader | 负责从数据源(如数据库、文件等)读取数据。 |
| ItemProcessor | 对读取的数据进行处理和转换。 |
| ItemWriter | 将处理后的数据写入目标数据源。 |
首先,我们创建一个简单的实体类 Person 来表示要处理的数据。
public class Person {private String firstName;private String lastName;public Person() {}public Person(String firstName, String lastName) {this.firstName = firstName;this.lastName = lastName;}// Getters and Setterspublic String getFirstName() {return firstName;}public void setFirstName(String firstName) {this.firstName = firstName;}public String getLastName() {return lastName;}public void setLastName(String lastName) {this.lastName = lastName;}@Overridepublic String toString() {return "Person{" +"firstName='" + firstName + '\'' +", lastName='" + lastName + '\'' +'}';}}
我们使用 FlatFileItemReader 从 CSV 文件中读取数据。
import org.springframework.batch.item.file.FlatFileItemReader;import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;import org.springframework.batch.item.file.mapping.DefaultLineMapper;import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.ClassPathResource;@Configurationpublic class ReaderConfig {@Beanpublic FlatFileItemReader<Person> reader() {FlatFileItemReader<Person> reader = new FlatFileItemReader<>();reader.setResource(new ClassPathResource("persons.csv"));reader.setLineMapper(new DefaultLineMapper<Person>() {{setLineTokenizer(new DelimitedLineTokenizer() {{setNames(new String[]{"firstName", "lastName"});}});setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{setTargetType(Person.class);}});}});return reader;}}
创建一个简单的处理器,将姓氏转换为大写。
import org.springframework.batch.item.ItemProcessor;public class PersonItemProcessor implements ItemProcessor<Person, Person> {@Overridepublic Person process(Person person) throws Exception {String firstName = person.getFirstName();String lastName = person.getLastName().toUpperCase();return new Person(firstName, lastName);}}
使用 JdbcBatchItemWriter 将处理后的数据写入数据库。
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;import org.springframework.batch.item.database.JdbcBatchItemWriter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;@Configurationpublic class WriterConfig {@Beanpublic JdbcBatchItemWriter<Person> writer(DataSource dataSource) {JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());writer.setSql("INSERT INTO persons (first_name, last_name) VALUES (:firstName, :lastName)");writer.setDataSource(dataSource);return writer;}}
import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.launch.support.RunIdIncrementer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;@Configuration@EnableBatchProcessingpublic class BatchConfig {@Autowiredpublic JobBuilderFactory jobBuilderFactory;@Autowiredpublic StepBuilderFactory stepBuilderFactory;@Autowiredpublic DataSource dataSource;@Beanpublic Step step1(ReaderConfig readerConfig, PersonItemProcessor processor, WriterConfig writerConfig) {return stepBuilderFactory.get("step1").<Person, Person> chunk(10).reader(readerConfig.reader()).processor(processor).writer(writerConfig.writer(dataSource)).build();}@Beanpublic Job importUserJob(Step step1) {return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).flow(step1).end().build();}}
创建一个启动类来运行批处理作业。
import org.springframework.batch.core.Job;import org.springframework.batch.core.JobParameters;import org.springframework.batch.core.JobParametersBuilder;import org.springframework.batch.core.launch.JobLauncher;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class BatchJobApplication implements CommandLineRunner {@AutowiredJobLauncher jobLauncher;@AutowiredJob importUserJob;public static void main(String[] args) {SpringApplication.run(BatchJobApplication.class, args);}@Overridepublic void run(String... args) throws Exception {JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters();jobLauncher.run(importUserJob, jobParameters);}}
通过以上步骤,我们成功地在 Spring 框架中配置和定义了一个简单的批处理作业。首先,我们创建了实体类来表示要处理的数据;然后,使用 ItemReader 从 CSV 文件中读取数据,ItemProcessor 对数据进行处理,ItemWriter 将处理后的数据写入数据库;最后,定义了 Step 和 Job 并运行批处理作业。Spring Batch 提供了丰富的组件和配置选项,可以根据不同的业务需求进行灵活扩展和定制。希望本文能帮助你更好地理解和使用 Spring 批处理作业。