微信登录

作业配置 - 作业定义 - 定义批处理作业

Java - Web - Spring 《作业配置 - 作业定义 - 定义批处理作业》

引言

在 Java Web 开发中,Spring 框架为我们提供了强大的功能来处理各种任务,其中批处理作业是一项非常重要的功能。批处理作业通常用于处理大量数据、定时任务等场景,例如每日数据统计、定期清理日志等。本文将详细介绍如何在 Spring 框架中配置和定义批处理作业。

环境准备

在开始之前,我们需要确保已经搭建好 Spring Boot 开发环境。可以使用 Spring Initializr(https://start.spring.io/)来快速创建一个 Spring Boot 项目,添加以下依赖:

  • Spring Batch
  • Spring Boot Starter JDBC
  • H2 Database(用于测试)

定义批处理作业的基本概念

在 Spring Batch 中,一个批处理作业主要由以下几个核心组件组成:
| 组件名称 | 描述 |
| —— | —— |
| Job | 代表一个完整的批处理作业,由多个 Step 组成。 |
| Step | 作业的一个独立执行单元,包含读取数据(ItemReader)、处理数据(ItemProcessor)和写入数据(ItemWriter)三个主要步骤。 |
| ItemReader | 负责从数据源(如数据库、文件等)读取数据。 |
| ItemProcessor | 对读取的数据进行处理和转换。 |
| ItemWriter | 将处理后的数据写入目标数据源。 |

演示代码

1. 创建实体类

首先,我们创建一个简单的实体类 Person 来表示要处理的数据。

  1. public class Person {
  2. private String firstName;
  3. private String lastName;
  4. public Person() {}
  5. public Person(String firstName, String lastName) {
  6. this.firstName = firstName;
  7. this.lastName = lastName;
  8. }
  9. // Getters and Setters
  10. public String getFirstName() {
  11. return firstName;
  12. }
  13. public void setFirstName(String firstName) {
  14. this.firstName = firstName;
  15. }
  16. public String getLastName() {
  17. return lastName;
  18. }
  19. public void setLastName(String lastName) {
  20. this.lastName = lastName;
  21. }
  22. @Override
  23. public String toString() {
  24. return "Person{" +
  25. "firstName='" + firstName + '\'' +
  26. ", lastName='" + lastName + '\'' +
  27. '}';
  28. }
  29. }

2. 创建 ItemReader

我们使用 FlatFileItemReader 从 CSV 文件中读取数据。

  1. import org.springframework.batch.item.file.FlatFileItemReader;
  2. import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
  3. import org.springframework.batch.item.file.mapping.DefaultLineMapper;
  4. import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.core.io.ClassPathResource;
  8. @Configuration
  9. public class ReaderConfig {
  10. @Bean
  11. public FlatFileItemReader<Person> reader() {
  12. FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
  13. reader.setResource(new ClassPathResource("persons.csv"));
  14. reader.setLineMapper(new DefaultLineMapper<Person>() {{
  15. setLineTokenizer(new DelimitedLineTokenizer() {{
  16. setNames(new String[]{"firstName", "lastName"});
  17. }});
  18. setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
  19. setTargetType(Person.class);
  20. }});
  21. }});
  22. return reader;
  23. }
  24. }

3. 创建 ItemProcessor

创建一个简单的处理器,将姓氏转换为大写。

  1. import org.springframework.batch.item.ItemProcessor;
  2. public class PersonItemProcessor implements ItemProcessor<Person, Person> {
  3. @Override
  4. public Person process(Person person) throws Exception {
  5. String firstName = person.getFirstName();
  6. String lastName = person.getLastName().toUpperCase();
  7. return new Person(firstName, lastName);
  8. }
  9. }

4. 创建 ItemWriter

使用 JdbcBatchItemWriter 将处理后的数据写入数据库。

  1. import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
  2. import org.springframework.batch.item.database.JdbcBatchItemWriter;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import javax.sql.DataSource;
  6. @Configuration
  7. public class WriterConfig {
  8. @Bean
  9. public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
  10. JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
  11. writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
  12. writer.setSql("INSERT INTO persons (first_name, last_name) VALUES (:firstName, :lastName)");
  13. writer.setDataSource(dataSource);
  14. return writer;
  15. }
  16. }

5. 定义 Step 和 Job

  1. import org.springframework.batch.core.Job;
  2. import org.springframework.batch.core.Step;
  3. import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
  4. import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
  5. import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
  6. import org.springframework.batch.core.launch.support.RunIdIncrementer;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import javax.sql.DataSource;
  11. @Configuration
  12. @EnableBatchProcessing
  13. public class BatchConfig {
  14. @Autowired
  15. public JobBuilderFactory jobBuilderFactory;
  16. @Autowired
  17. public StepBuilderFactory stepBuilderFactory;
  18. @Autowired
  19. public DataSource dataSource;
  20. @Bean
  21. public Step step1(ReaderConfig readerConfig, PersonItemProcessor processor, WriterConfig writerConfig) {
  22. return stepBuilderFactory.get("step1")
  23. .<Person, Person> chunk(10)
  24. .reader(readerConfig.reader())
  25. .processor(processor)
  26. .writer(writerConfig.writer(dataSource))
  27. .build();
  28. }
  29. @Bean
  30. public Job importUserJob(Step step1) {
  31. return jobBuilderFactory.get("importUserJob")
  32. .incrementer(new RunIdIncrementer())
  33. .flow(step1)
  34. .end()
  35. .build();
  36. }
  37. }

6. 运行批处理作业

创建一个启动类来运行批处理作业。

  1. import org.springframework.batch.core.Job;
  2. import org.springframework.batch.core.JobParameters;
  3. import org.springframework.batch.core.JobParametersBuilder;
  4. import org.springframework.batch.core.launch.JobLauncher;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.CommandLineRunner;
  7. import org.springframework.boot.SpringApplication;
  8. import org.springframework.boot.autoconfigure.SpringBootApplication;
  9. @SpringBootApplication
  10. public class BatchJobApplication implements CommandLineRunner {
  11. @Autowired
  12. JobLauncher jobLauncher;
  13. @Autowired
  14. Job importUserJob;
  15. public static void main(String[] args) {
  16. SpringApplication.run(BatchJobApplication.class, args);
  17. }
  18. @Override
  19. public void run(String... args) throws Exception {
  20. JobParameters jobParameters = new JobParametersBuilder()
  21. .addLong("time", System.currentTimeMillis())
  22. .toJobParameters();
  23. jobLauncher.run(importUserJob, jobParameters);
  24. }
  25. }

总结

通过以上步骤,我们成功地在 Spring 框架中配置和定义了一个简单的批处理作业。首先,我们创建了实体类来表示要处理的数据;然后,使用 ItemReader 从 CSV 文件中读取数据,ItemProcessor 对数据进行处理,ItemWriter 将处理后的数据写入数据库;最后,定义了 StepJob 并运行批处理作业。Spring Batch 提供了丰富的组件和配置选项,可以根据不同的业务需求进行灵活扩展和定制。希望本文能帮助你更好地理解和使用 Spring 批处理作业。