微信登录

批处理框架 - 核心组件 - 作业、步骤等组件

Java - Web - Spring 《批处理框架 - 核心组件 - 作业、步骤等组件》

在 Java Web 开发中,Spring Batch 是一个强大的批处理框架,它能够高效地处理大量数据。Spring Batch 提供了一系列核心组件,如作业(Job)、步骤(Step)等,这些组件协同工作,使得批处理任务的开发变得更加简单和高效。本文将详细介绍 Spring Batch 的核心组件,并通过示例代码演示如何使用这些组件。

核心组件概述

作业(Job)

作业是 Spring Batch 中的最高级别抽象,它代表了一个完整的批处理任务。一个作业可以包含一个或多个步骤,并且可以配置作业的启动、停止、重试等行为。作业通常由作业存储库(JobRepository)来管理,作业存储库负责记录作业的执行状态和元数据。

步骤(Step)

步骤是作业的一个独立执行单元,它定义了批处理任务中的一个具体处理步骤。每个步骤可以包含一个或多个任务,如读取数据、处理数据、写入数据等。步骤由步骤执行器(StepExecutor)来执行,并且可以配置步骤的执行顺序、重试策略等。

读取器(ItemReader)

读取器负责从数据源中读取数据,如数据库、文件、消息队列等。Spring Batch 提供了多种内置的读取器,如 JdbcCursorItemReaderFlatFileItemReader 等,同时也支持自定义读取器。

处理器(ItemProcessor)

处理器负责对读取器读取的数据进行处理,如数据转换、数据验证、数据过滤等。处理器可以是一个简单的 Java 类,实现 ItemProcessor 接口。

写入器(ItemWriter)

写入器负责将处理器处理后的数据写入目标数据源,如数据库、文件、消息队列等。Spring Batch 提供了多种内置的写入器,如 JdbcBatchItemWriterFlatFileItemWriter 等,同时也支持自定义写入器。

演示代码

1. 添加依赖

首先,在 pom.xml 中添加 Spring Batch 和 Spring Boot 的依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-batch</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.h2database</groupId>
  8. <artifactId>h2</artifactId>
  9. <scope>runtime</scope>
  10. </dependency>
  11. </dependencies>

2. 定义实体类

创建一个简单的实体类 Person,用于表示批处理的数据:

  1. public class Person {
  2. private String firstName;
  3. private String lastName;
  4. public Person() {}
  5. public Person(String firstName, String lastName) {
  6. this.firstName = firstName;
  7. this.lastName = lastName;
  8. }
  9. // Getters and Setters
  10. public String getFirstName() {
  11. return firstName;
  12. }
  13. public void setFirstName(String firstName) {
  14. this.firstName = firstName;
  15. }
  16. public String getLastName() {
  17. return lastName;
  18. }
  19. public void setLastName(String lastName) {
  20. this.lastName = lastName;
  21. }
  22. @Override
  23. public String toString() {
  24. return "Person [firstName=" + firstName + ", lastName=" + lastName + "]";
  25. }
  26. }

3. 定义读取器

创建一个 FlatFileItemReader 来读取 CSV 文件:

  1. import org.springframework.batch.item.file.FlatFileItemReader;
  2. import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
  3. import org.springframework.batch.item.file.mapping.DefaultLineMapper;
  4. import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.core.io.ClassPathResource;
  8. @Configuration
  9. public class ReaderConfig {
  10. @Bean
  11. public FlatFileItemReader<Person> reader() {
  12. FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
  13. reader.setResource(new ClassPathResource("persons.csv"));
  14. reader.setLineMapper(new DefaultLineMapper<Person>() {{
  15. setLineTokenizer(new DelimitedLineTokenizer() {{
  16. setNames(new String[] { "firstName", "lastName" });
  17. }});
  18. setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
  19. setTargetType(Person.class);
  20. }});
  21. }});
  22. return reader;
  23. }
  24. }

4. 定义处理器

创建一个 ItemProcessor 来处理读取的数据:

  1. import org.springframework.batch.item.ItemProcessor;
  2. public class PersonItemProcessor implements ItemProcessor<Person, Person> {
  3. @Override
  4. public Person process(final Person person) throws Exception {
  5. final String firstName = person.getFirstName().toUpperCase();
  6. final String lastName = person.getLastName().toUpperCase();
  7. return new Person(firstName, lastName);
  8. }
  9. }

5. 定义写入器

创建一个 JdbcBatchItemWriter 来将处理后的数据写入数据库:

  1. import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
  2. import org.springframework.batch.item.database.JdbcBatchItemWriter;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import javax.sql.DataSource;
  6. @Configuration
  7. public class WriterConfig {
  8. @Bean
  9. public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
  10. JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
  11. writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
  12. writer.setSql("INSERT INTO persons (first_name, last_name) VALUES (:firstName, :lastName)");
  13. writer.setDataSource(dataSource);
  14. return writer;
  15. }
  16. }

6. 定义步骤和作业

创建一个步骤和作业,并将读取器、处理器和写入器组合在一起:

  1. import org.springframework.batch.core.Job;
  2. import org.springframework.batch.core.Step;
  3. import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
  4. import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
  5. import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import javax.sql.DataSource;
  10. @Configuration
  11. @EnableBatchProcessing
  12. public class BatchConfig {
  13. @Autowired
  14. public JobBuilderFactory jobBuilderFactory;
  15. @Autowired
  16. public StepBuilderFactory stepBuilderFactory;
  17. @Autowired
  18. private ReaderConfig readerConfig;
  19. @Autowired
  20. private WriterConfig writerConfig;
  21. @Bean
  22. public Step step1(DataSource dataSource) {
  23. return stepBuilderFactory.get("step1")
  24. .<Person, Person> chunk(10)
  25. .reader(readerConfig.reader())
  26. .processor(new PersonItemProcessor())
  27. .writer(writerConfig.writer(dataSource))
  28. .build();
  29. }
  30. @Bean
  31. public Job importUserJob(Step step1) {
  32. return jobBuilderFactory.get("importUserJob")
  33. .flow(step1)
  34. .end()
  35. .build();
  36. }
  37. }

7. 运行批处理任务

创建一个 Spring Boot 应用程序,并运行批处理任务:

  1. import org.springframework.boot.CommandLineRunner;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.jdbc.core.JdbcTemplate;
  6. import javax.sql.DataSource;
  7. @SpringBootApplication
  8. public class BatchApplication {
  9. public static void main(String[] args) {
  10. SpringApplication.run(BatchApplication.class, args);
  11. }
  12. @Bean
  13. public CommandLineRunner run(DataSource dataSource) throws Exception {
  14. return args -> {
  15. JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
  16. jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS persons (id INT AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255), last_name VARCHAR(255))");
  17. };
  18. }
  19. }

总结

通过以上示例代码,我们演示了如何使用 Spring Batch 的核心组件来创建一个简单的批处理任务。下面是一个表格总结这些核心组件的作用:

组件名称 作用
作业(Job) 代表一个完整的批处理任务,由一个或多个步骤组成
步骤(Step) 作业的独立执行单元,包含读取、处理和写入数据的逻辑
读取器(ItemReader) 从数据源中读取数据
处理器(ItemProcessor) 对读取的数据进行处理
写入器(ItemWriter) 将处理后的数据写入目标数据源

Spring Batch 的核心组件提供了强大而灵活的功能,使得批处理任务的开发变得更加简单和高效。希望本文能够帮助你更好地理解和使用 Spring Batch。