Spring Batch Partitioning分区处理

学习使用Spring Batch分区使用多个线程来处理Spring Boot应用程序中的一系列数据集任务。

1.并行处理和分区

1.1 并行处理

大多数批处理问题可以使用单线程解决,但很少有复杂的场景,例如单线程处理需要很长时间执行任务,需要并行处理,可以使用多线程模型来实现。
在非常高的层次上分类,有两种并行处理模式:

  • 单进程,多线程
  • 多进程

这些也分为以下几类:

  • 多线程Step(单进程)
  • 并行Step(单进程)
  • Step(多进程)的远程分块
  • 分区Step(单个或多个进程)

1.2. Spring Batch 处理中的分区

分区使用多个线程来处理一系列数据集。可以通过编程方式定义数据集的范围。在用例中,我们要创建多少个线程以在分区中使用。线程数完全基于需求。

Spring Batch 默认是单线程的。为了进行并行处理,我们需要对批处理作业的步骤进行分区。
当我们要从源系统读取数百万条记录并且我们不能仅仅依靠单个线程来处理所有记录时,分区很有用。我们希望使用多个线程来读取和处理数据以有效地使用系统资源。

分区任务处理模型
分区任务处理模型

2. Spring Batch 分区案例

在本教程中,我们将从一个表中读取一些数据并将其写入另一个表。我们可以在 DB 中创建数百万条记录,以体验使用单线程批处理时的处理时间。在这里,我创建了一些程序/概念的工作来了解原理。

2.1 Maven 依赖项

截至当前,我们使用了最新版本的 Spring Boot,并通过添加 Spring Batch 依赖项,它会自动拉取最新版本。

$title('pom.xml')
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
  </dependency>
 
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
  </dependency>
 
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.2</version>
    <optional>true</optional>
  </dependency>
</dependencies>

2.2. 分区

PartitionerExecutionContext是中央策略接口,用于以实例的形式为分区步骤创建输入参数。通常的目标是创建一组不同的输入值,例如一组不重叠的主键范围,或一组唯一的文件名。

在这里,我们查询表获取MAXMINid 值(假设它们id是连续的),并基于此在所有记录之间创建分区。
对于分区器,我们使用了gridSize = number of threads. 根据您的需求使用您自己的自定义值。

import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
 
public class ColumnRangePartitioner implements Partitioner 
{
  private JdbcOperations jdbcTemplate;
  private String table;
  private String column;
 
  public void setTable(String table) {
    this.table = table;
  }
 
  public void setColumn(String column) {
    this.column = column;
  }
 
  public void setDataSource(DataSource dataSource) {
    jdbcTemplate = new JdbcTemplate(dataSource);
  }
 
  @Override
  public Map<String, ExecutionContext> partition(int gridSize) 
  {
    int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") FROM " + table, Integer.class);
 
    int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") FROM " + table, Integer.class);
 
    int targetSize = (max - min) / gridSize + 1;
 
    Map<String, ExecutionContext> result = new HashMap<>();
 
    int number = 0;
    int start = min;
    int end = start + targetSize - 1;
     
    while (start <= max) 
    {
      ExecutionContext value = new ExecutionContext();
      result.put("partition" + number, value);
       
      if(end >= max) {
        end = max;
      }
       
      value.putInt("minValue", start);
      value.putInt("maxValue", end);
 
      start += targetSize;
      end += targetSize;
 
      number++;
    }
    return result;
  }
}

 

2.3. 任务配置

这是任务配置类,我们在其中创建执行任务所需的 bean。在这个例子中,我们使用了接口SimpleAsyncTaskExecutor的最简单的多线程实现。TaskExecutor
我们在 中使用 partitionerStep为远程(或本地)步骤创建分区步骤构建器。
将这些用于“读取、处理和写入”Step的每个数据块完全发生在不同的线程中。因此,处理的记录可能与输入它的顺序不同。

以下是要查看的内容-

  • 当任务执行器由某个线程池支持时,对任务执行器施加的节流限制。此限制默认为 4,但可以进行不同的配置。
  • 并发限制可能来自 中使用的资源Step,例如使用的DataSource
  • ColumnRangePartitionerExecutionContext – 用于以实例形式为分区步骤创建输入参数的中央策略界面。
  • JdbcPagingItemReader – 此 bean 使用分页读取数据并接受 minValue 和 maxValue 根据范围接受以仅获取这些数据。这里我们将 FetchSize 设置为 1000,但是您可以使用任何值并使其可以从属性文件中进行配置。
  • JdbcBatchItemWriter – 此 bean 将数据写入另一个表。
  • 步骤– 这是批处理任务中的步骤配置。这是读取数据并将其写入 XML 和 JSON 格式
  • Job – 表示任务的批处理域对象
import java.util.HashMap;
import java.util.Map;
 
import javax.sql.DataSource;
 
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
 
import com.example.domain.Customer;
import com.example.mapper.CustomerRowMapper;
 
@Configuration
public class JobConfiguration {
  @Autowired
  private JobBuilderFactory jobBuilderFactory;
 
  @Autowired
  private StepBuilderFactory stepBuilderFactory;
 
  @Autowired
  private DataSource dataSource;
 
  @Bean
  public ColumnRangePartitioner partitioner() 
  {
    ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
    columnRangePartitioner.setColumn("id");
    columnRangePartitioner.setDataSource(dataSource);
    columnRangePartitioner.setTable("customer");
    return columnRangePartitioner;
  }
 
  @Bean
  @StepScope
  public JdbcPagingItemReader<Customer> pagingItemReader(
      @Value("#{stepExecutionContext['minValue']}") Long minValue,
      @Value("#{stepExecutionContext['maxValue']}") Long maxValue) 
  {
    System.out.println("reading " + minValue + " to " + maxValue);
 
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("id", Order.ASCENDING);
     
    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");
    queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);
    queryProvider.setSortKeys(sortKeys);
     
    JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(this.dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(new CustomerRowMapper());
    reader.setQueryProvider(queryProvider);
     
    return reader;
  }
   
   
  @Bean
  @StepScope
  public JdbcBatchItemWriter<Customer> customerItemWriter()
  {
    JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
    itemWriter.setDataSource(dataSource);
    itemWriter.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
 
    itemWriter.setItemSqlParameterSourceProvider
      (new BeanPropertyItemSqlParameterSourceProvider<>());
    itemWriter.afterPropertiesSet();
     
    return itemWriter;
  }
   
  // Master
  @Bean
  public Step step1() 
  {
    return stepBuilderFactory.get("step1")
        .partitioner(slaveStep().getName(), partitioner())
        .step(slaveStep())
        .gridSize(4)
        .taskExecutor(new SimpleAsyncTaskExecutor())
        .build();
  }
   
  // slave step
  @Bean
  public Step slaveStep() 
  {
    return stepBuilderFactory.get("slaveStep")
        .<Customer, Customer>chunk(1000)
        .reader(pagingItemReader(null, null))
        .writer(customerItemWriter())
        .build();
  }
   
  @Bean
  public Job job() 
  {
    return jobBuilderFactory.get("job")
        .start(step1())
        .build();
  }
}

2.4 Entity和Mapper类

这是一个商业模式类。

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
 
@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer 
{
  private Long id;
  private String firstName;
  private String lastName;
  private String birthdate;
}

CustomerRowMapper类用于将结果集映射到Customer域对象

import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;
 
public class CustomerRowMapper implements RowMapper<Customer> {
 
  @Override
  public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
    return Customer.builder()
          .id(rs.getLong("id"))
          .firstName(rs.getString("firstName"))
          .lastName(rs.getString("lastName"))
          .birthdate(rs.getString("birthdate"))
          .build();
  }

2.5. application.properties

用于创建与 MySQL 数据库的数据库连接的配置。

application.properties
spring.datasource.url=jdbc:h2:mem:test
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
 
#Prevents running the job during application context creation 
spring.batch.job.enabled=false

2.6. JDBC 配置和模式文件

这些是模式和 SQL 数据文件。

CREATE TABLE customer (
  id INT PRIMARY KEY,
  firstName VARCHAR(255) NULL,
  lastName VARCHAR(255) NULL,
  birthdate VARCHAR(255) NULL
);
 
CREATE TABLE new_customer (
  id INT PRIMARY KEY,
  firstName VARCHAR(255) NULL,
  lastName VARCHAR(255) NULL,
  birthdate VARCHAR(255) NULL
);

 

INSERT INTO customer VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO customer VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO customer VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO customer VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO customer VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO customer VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO customer VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO customer VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO customer VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO customer VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO customer VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO customer VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO customer VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO customer VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO customer VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO customer VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO customer VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO customer VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO customer VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO customer VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');

2.7. 演示

将应用程序作为 Spring 引导应用程序运行。

import java.util.Date;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@SpringBootApplication
@EnableBatchProcessing
public class LocalPartitioningApplication implements CommandLineRunner
{
  @Autowired
  private JobLauncher jobLauncher;
 
  @Autowired
  private Job job;
   
  public static void main(String[] args) {
    SpringApplication.run(LocalPartitioningApplication.class, args);
  }
 
  @Override
  public void run(String... args) throws Exception {
    JobParameters jobParameters = new JobParametersBuilder()
                .addString("JobId", String.valueOf(System.currentTimeMillis()))
        .addDate("date", new Date())
                .addLong("time",System.currentTimeMillis()).toJobParameters();
     
    JobExecution execution = jobLauncher.run(job, jobParameters);
 
    System.out.println("STATUS :: "+execution.getStatus());
  }
}

应用程序将使用我们创建的分区从一个数据库中读取数据并将其写入另一个表。
执行日志:

2019-12-13 15:03:42.408   ---  c.example.LocalPartitioningApplication   : Started LocalPartitioningApplication 
in 3.504 seconds (JVM running for 4.877)
 
2019-12-13 15:03:42.523   ---  o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] 
launched with the following parameters: [{JobId=1576229622410, date=1576229622410, time=1576229622410}]
 
2019-12-13 15:03:42.603   ---  o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
 
reading 1 to 5
reading 11 to 15
reading 16 to 20
reading 6 to 10
 
2019-12-13 15:03:42.890   --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition0] executed in 173ms
2019-12-13 15:03:42.895   --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition3] executed in 178ms
2019-12-13 15:03:42.895   --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition1] executed in 177ms
2019-12-13 15:03:42.901   --- [cTaskExecutor-4] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:partition2] executed in 182ms
 
2019-12-13 15:03:42.917   ---  o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 314ms
 
2019-12-13 15:03:42.942   ---  o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed 
with the following parameters: [{JobId=1576229622410, date=1576229622410, time=1576229622410}] 
and the following status: [COMPLETED] in 374ms
 
STATUS :: COMPLETED

三、总结

在这个Spring Boot Batch 处理理步骤分区器教程中,我们学习了使用分区来使用多个线程处理批量数据。它使应用程序能够充分利用机器硬件和操作系统功能的潜力。

案例中的源码下载:项目源码(访问密码:9987)

 

原创文章,作者:kirin,如若转载,请注明出处:https://blog.ytso.com/243943.html

(0)
上一篇 2022年4月11日
下一篇 2022年4月11日

相关推荐

发表回复

登录后才能评论