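1 Scenario
Read a CSV file, process the records, and save them to a database. (In this demo the writer only prints each record to the console.)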
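2 Project structure
Group | Purpose | File
Application | Main (startup) class | DemoApplication.java
Application | Read the input file | UserItemReader.java
Application | Process the data | UserItemProcessor.java
Application | Write the output | UserItemWriter.java
Batch job scheduling | Scheduling configuration | QuartzConfiguration.java
Batch job scheduling | Scheduled job launcher | QuartzJobLauncher.java
Supporting files | Data file | User.txt
Supporting files | Entity (the object passed between steps) | User.java
Supporting files | Maven configuration file | pom.xml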
2.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zy</groupId>
    <artifactId>SpringBatchDemo1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>SpringBatchDemo1</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.10.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- In-memory H2 database, used at runtime for the Spring Batch job repository -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2.2 User.java
package com.zy.model;

// Entity passed from the reader through the processor to the writer.
public class User {

    private String id;
    private String name;
    private String age;

    public User(String id, String name, String age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User [id=" + id + ", name=" + name + ", age=" + age + "]";
    }
}
2.3 UserItemReader.java
package com.zy.reader;

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.LineTokenizer;
import org.springframework.core.io.ClassPathResource;
import org.springframework.validation.BindException;

import com.zy.model.User;

// Reads the records in User.txt and maps each line to a User.
public class UserItemReader extends FlatFileItemReader<User> {

    public UserItemReader() {
        createReader();
    }

    private void createReader() {
        this.setResource(new ClassPathResource("data/User.txt"));
        this.setLinesToSkip(1); // skip the header line
        this.setLineMapper(userLineMapper());
    }

    private LineMapper<User> userLineMapper() {
        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(userLineTokenizer());
        lineMapper.setFieldSetMapper(new UserFieldStepMapper());
        lineMapper.afterPropertiesSet();
        return lineMapper;
    }

    // Splits each line on the default delimiter (a comma) and names the columns.
    private LineTokenizer userLineTokenizer() {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"ID", "NAME", "AGE"});
        return tokenizer;
    }

    // Builds a User from the named fields of one line.
    private static class UserFieldStepMapper implements FieldSetMapper<User> {
        @Override
        public User mapFieldSet(FieldSet fieldSet) throws BindException {
            return new User(fieldSet.readString("ID"),
                    fieldSet.readString("NAME"),
                    fieldSet.readString("AGE"));
        }
    }
}
2.4 User.txt
ID,NAME,AGE
1,zy,28
2,tom,20
3,terry,30
4,lerry,18
5,bob,25
6,linda,27
7,marry,39
8,long,22
9,kin,33
10,王五,40
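To make the reader's job concrete, here is a minimal plain-Java sketch of what "skip the header, split each line on commas, and name the columns" amounts to. It is only an illustration: in the project this work is done by FlatFileItemReader and DelimitedLineTokenizer, and the class and method names below (DelimitedLineSketch, tokenize, readAll) are hypothetical helpers, not Spring Batch APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; not how Spring Batch is implemented.
class DelimitedLineSketch {

    // Hypothetical helper: split one comma-delimited line and pair each token with a column name.
    static Map<String, String> tokenize(String line, String[] names) {
        String[] tokens = line.split(",", -1);
        if (tokens.length != names.length) {
            throw new IllegalArgumentException(
                    "Expected " + names.length + " fields but got " + tokens.length + ": " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], tokens[i]);
        }
        return fields;
    }

    // Skip the first linesToSkip lines (the header), then tokenize every remaining line.
    static List<Map<String, String>> readAll(List<String> lines, int linesToSkip, String[] names) {
        List<Map<String, String>> records = new ArrayList<>();
        for (int i = linesToSkip; i < lines.size(); i++) {
            records.add(tokenize(lines.get(i), names));
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("ID,NAME,AGE", "1,zy,28", "2,tom,20");
        String[] names = {"ID", "NAME", "AGE"};
        for (Map<String, String> record : readAll(lines, 1, names)) {
            System.out.println(record);
        }
    }
}
```

Each resulting map plays the role of the FieldSet that the reader's field-set mapper turns into a User.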
2.5 UserItemProcessor.java
package com.zy.processor;

import org.springframework.batch.item.ItemProcessor;

import com.zy.model.User;

public class UserItemProcessor implements ItemProcessor<User, User> {

    // Keeps users older than 20. Returning null tells Spring Batch to filter
    // the item out, so it never reaches the writer.
    @Override
    public User process(User item) throws Exception {
        if (Integer.parseInt(item.getAge()) > 20) {
            return item;
        }
        return null;
    }
}
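The filter rule above relies on Integer.parseInt, which throws NumberFormatException when the age column is not a number. The following plain-Java sketch isolates the rule and shows one possible way to handle bad input. Treating a missing or malformed age as "filter out" is an assumption made for this example, not something the original processor does, and AgeFilterSketch is a hypothetical name.

```java
// Plain-Java illustration of the "age > 20" rule; not part of the original project.
class AgeFilterSketch {

    // Returns true when the record should be kept (age strictly greater than 20).
    static boolean keep(String age) {
        if (age == null) {
            return false; // assumption: a missing age is filtered out
        }
        try {
            return Integer.parseInt(age.trim()) > 20;
        } catch (NumberFormatException e) {
            return false; // assumption: a malformed age is filtered out instead of failing the step
        }
    }

    public static void main(String[] args) {
        System.out.println(keep("28"));  // kept
        System.out.println(keep("20"));  // filtered: the comparison is strict
        System.out.println(keep("abc")); // filtered: not a number
    }
}
```

Note that the comparison is strict, which is why tom (age 20) does not appear in the job output.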
2.6 UserItemWriter.java
package com.zy.writer;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.zy.model.User;

public class UserItemWriter implements ItemWriter<User> {

    // Receives one chunk of processed items at a time and prints each of them.
    @Override
    public void write(List<? extends User> items) throws Exception {
        for (User user : items) {
            System.out.println(user);
        }
    }
}
2.7 QuartzJobLauncher
package com.zy.QuartzConfiguration;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class QuartzJobLauncher extends QuartzJobBean {

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        // The job name, launcher, and locator are handed over through the Quartz JobDataMap.
        JobDetail jobDetail = context.getJobDetail();
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        String jobName = jobDataMap.getString("jobName");
        JobLauncher jobLauncher = (JobLauncher) jobDataMap.get("jobLauncher");
        JobLocator jobLocator = (JobLocator) jobDataMap.get("jobLocator");

        System.out.println("jobName : " + jobName);
        System.out.println("jobLauncher : " + jobLauncher);
        System.out.println("jobLocator : " + jobLocator);

        JobKey key = context.getJobDetail().getKey();
        System.out.println(key.getName() + " : " + key.getGroup());

        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Current time : " + sf.format(new Date()));

        try {
            // Look up the Spring Batch job by name and run it.
            Job job = jobLocator.getJob(jobName);
            JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2.8 QuartzConfiguration
package com.zy.QuartzConfiguration;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

@Configuration
public class QuartzConfiguration {

    // The injected implementation is SimpleJobLauncher.
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobLocator jobLocator;

    // Registers jobs with the JobRegistry, which is injected automatically.
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
        jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
        return jobRegistryBeanPostProcessor;
    }

    // Quartz job definition; the data map carries what QuartzJobLauncher needs.
    @Bean
    public JobDetailFactoryBean jobDetailFactoryBean() {
        JobDetailFactoryBean jobFactory = new JobDetailFactoryBean();
        jobFactory.setJobClass(QuartzJobLauncher.class);
        jobFactory.setGroup("my_group");
        jobFactory.setName("my_job");
        Map<String, Object> map = new HashMap<>();
        map.put("jobName", "zyJob");
        map.put("jobLauncher", jobLauncher);
        map.put("jobLocator", jobLocator);
        jobFactory.setJobDataAsMap(map);
        return jobFactory;
    }

    @Bean
    public CronTriggerFactoryBean cronTriggerFactoryBean() {
        CronTriggerFactoryBean cTrigger = new CronTriggerFactoryBean();
        System.out.println("------- : " + jobDetailFactoryBean().getObject());
        cTrigger.setJobDetail(jobDetailFactoryBean().getObject());
        cTrigger.setStartDelay(3000);
        cTrigger.setName("my_trigger");
        cTrigger.setGroup("trigger_group");
        cTrigger.setCronExpression("0/3 * * * * ?"); // fire the job every 3 seconds
        return cTrigger;
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() {
        SchedulerFactoryBean schedulerFactor = new SchedulerFactoryBean();
        schedulerFactor.setTriggers(cronTriggerFactoryBean().getObject());
        return schedulerFactor;
    }
}
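The cron expression "0/3 * * * * ?" uses the start/step form in its seconds field: start at second 0 and fire every 3 seconds. As a rough plain-Java sketch (not Quartz code; CronIncrementSketch and expand are hypothetical names), the seconds on which the trigger fires within each minute can be listed like this:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of a cron "start/step" field; it does not use the Quartz API.
class CronIncrementSketch {

    // Expand a "start/step" field such as "0/3" over the range 0..max (inclusive).
    static List<Integer> expand(String field, int max) {
        String[] parts = field.split("/");
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        List<Integer> values = new ArrayList<>();
        for (int v = start; v <= max; v += step) {
            values.add(v);
        }
        return values;
    }

    public static void main(String[] args) {
        // Seconds field of "0/3 * * * * ?": every third second from 0 up to 57.
        System.out.println(expand("0/3", 59));
    }
}
```

This matches the timestamps in the job output, where consecutive runs are three seconds apart.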
2.9 BatchConfiguration
package com.zy.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import com.zy.QuartzConfiguration.QuartzConfiguration;
import com.zy.model.User;
import com.zy.processor.UserItemProcessor;
import com.zy.reader.UserItemReader;
import com.zy.writer.UserItemWriter;

@Configuration
@EnableBatchProcessing
//@Import({QuartzConfiguration.class})
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // Create the job. The name "zyJob" is the one QuartzJobLauncher looks up.
    @Bean
    public Job jobMethod() {
        return jobBuilderFactory.get("zyJob")
                .start(stepMethod())
                .build();
    }

    // Create the step: read, process, and write in chunks of 3 items.
    @Bean
    public Step stepMethod() {
        return stepBuilderFactory.get("myStep1")
                .<User, User>chunk(3)
                .reader(new UserItemReader())
                .processor(new UserItemProcessor())
                .writer(new UserItemWriter())
                .allowStartIfComplete(true) // let the step run again on every scheduled launch
                .build();
    }
}
3 Job execution output
2019-04-30 21:31:48.049 INFO 9344 --- [ryBean_Worker-5] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=zyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
jobName : zyJob
jobLauncher : org.springframework.batch.core.launch.support.SimpleJobLauncher@2d27244d
jobLocator : org.springframework.batch.core.configuration.support.MapJobRegistry@6fc00b5
my_job : my_group
Current time : 2019-04-30 21:31:51
2019-04-30 21:31:51.012 INFO 9344 --- [ryBean_Worker-6] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=zyJob]] launched with the following parameters: [{}]
2019-04-30 21:31:51.028 INFO 9344 --- [ryBean_Worker-6] o.s.batch.core.job.SimpleStepHandler : Executing step: [myStep1]
User [id=1, name=zy, age=28]
User [id=3, name=terry, age=30]
User [id=5, name=bob, age=25]
User [id=6, name=linda, age=27]
User [id=7, name=marry, age=39]
User [id=8, name=long, age=22]
User [id=9, name=kin, age=33]
User [id=10, name=ww, age=40]
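4 Concept summary
Term | Description
Job Repository | Stores the state of Jobs and Steps while they execute.
Job Launcher | Launches jobs; it is the entry point for running a Job.
Job | A batch job. It is made up of one or more Steps and encapsulates the whole batch operation.
Step | One stage in the execution of a Job. A Job is assembled from one or more Steps.
Tasklet | The concrete logic a Step executes. It can be run repeatedly and can be configured to run synchronously or asynchronously.
Chunk | A collection of a given number of Items. You can define the read, process, and write operations for a chunk, as well as the commit interval.
Item | A single data record.
ItemReader | Reads Items from a data source (file system, database, queue, and so on).
ItemProcessor | Processes the data before it is written (for example cleansing, transformation, filtering, or validation).
ItemWriter | Writes Items in batches to a data source (file system, database, queue, and so on).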
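5 Spring Batch structure
This section describes the basic hierarchy of Spring Batch.
The basic unit of execution in Spring Batch is the Job; each Job carries out one batch-processing task.
A Job contains one or more Steps, and each Step is a single stage of the work the Job has to do.
As the figure below shows, a Step contains a Tasklet. A Tasklet is a unit of work, and it is reusable.
Next comes the Chunk. A chunk is a block of data, and you define how many items make up one chunk.
Inside a chunk the same cycle repeats: read data, process data, write data. Spring Batch keeps repeating this cycle until all the batch data has been processed.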
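The read, process, write cycle can be sketched in plain Java as follows. This is a simplified illustration under stated assumptions, not Spring Batch's actual implementation: there are no transactions, restarts, or job repository, ChunkLoopSketch and run are hypothetical names, and skipping an empty batch is a choice made for this sketch. The chunk size of 3 and the "age > 20" rule mirror the demo configuration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of chunk-oriented processing; not Spring Batch code.
class ChunkLoopSketch {

    // Repeats read -> process -> write until the reader is exhausted.
    // Returns the batches that were handed to the "writer", in order.
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int chunkSize) {
        List<List<O>> written = new ArrayList<>();
        while (reader.hasNext()) {
            // 1. Read: collect up to chunkSize items.
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // 2. Process: a null result means the item is filtered out.
            List<O> outputs = new ArrayList<>();
            for (I item : items) {
                O result = processor.apply(item);
                if (result != null) {
                    outputs.add(result);
                }
            }
            // 3. Write: hand the surviving items over as one batch.
            // Assumption of this sketch: an empty batch is simply skipped.
            if (!outputs.isEmpty()) {
                written.add(outputs);
            }
        }
        return written;
    }

    public static void main(String[] args) {
        // The ages from User.txt, in file order.
        List<Integer> ages = Arrays.asList(28, 20, 30, 18, 25, 27, 39, 22, 33, 40);
        Function<Integer, Integer> olderThan20 = age -> age > 20 ? age : null;
        System.out.println(run(ages.iterator(), olderThan20, 3));
    }
}
```

With the ten ages from User.txt this prints [[28, 30], [25, 27], [39, 22, 33], [40]]: four chunks are read, and the two filtered records (ages 20 and 18) never reach the writer.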
Original article by ItWorker. If you repost it, please credit the source: https://blog.ytso.com/192385.html