Spring batch 入门学习教程(附源码)

摘要: Spring batch 是一个开源的批处理框架.执行一系列的任务. 在 spring batch 中 一个job 是由许多 step 组成的。而每一个 step  又是由 READ-PROCESS-WRITE task或者 单个 task 组成。1. "READ-PROCESS-WRITE" 处理,根据字面意思理解就可以:READ 就是从资源文件里面读取数据,比如从xml文件,csv文件,数据库中读取数据.PROCESS 就是处理读取的数据WRITE 就是将处理过的数据写入到其他资源文件中去,可以是XML,CSV,或者数据库.比如:从CSV文件中 读取数据,经过处理之后,保存到数据库. spring batch 提供了很多类去处理这方面的东西。2.单个task, 也就是处理单个任务。比如在一个step 开始之前或者完成之后清除资源文件等.3.许多个step 组成在一起,就组成了一个job.

Spring batch 是一个开源的批处理框架.执行一系列的任务. 在 spring batch 中 一个job 是由许多 step 组成的。而每一个 step 又是由 READ-PROCESS-WRITE task或者 单个 task 组成。

1. "READ-PROCESS-WRITE" 处理,根据字面意思理解就可以:
READ 就是从资源文件里面读取数据,比如从xml文件,csv文件,数据库中读取数据.
PROCESS 就是处理读取的数据
WRITE 就是将处理过的数据写入到其他资源文件中去,可以是XML,CSV,或者数据库.
比如:从CSV文件中 读取数据,经过处理之后,保存到数据库. spring batch 提供了很多类去处理这方面的东西。

2.单个task, 也就是处理单个任务。比如在一个step 开始之前或者完成之后清除资源文件等.
3.许多个step 组成在一起,就组成了一个job. 所以他们之间的关系,就如同下面的描述:

一个 job = 很多steps
一个step = 一个READ-PROCESS-WRITE 或者 一个task.
同样一个job = step1 -->step2--step3 这样链表形式的组成.


Spring batch 例子
考虑如下一个批处理的例子,看起来有点啰嗦,只是为了说明用途:
1. step1 : 从 A 文件夹中读取csv 文件,处理之后,写入到B文件夹中(READ-PROCESS-WRITE)
2. step2 : 从 B 文件夹中读取CSV文件 ,处理之后, 存储到数据库中(READ-PROCESS-WRITE).
3. step3 : 删除B文件夹下的CSV文件。(用到单个task)
4. step4 : 从数据库读取数据,处理之后,生成XML报表文件(READ-PROCESS-WRITE).
5. step5 : 读取XML报表,并发送EMAIL给管理员(用到单个task)

用spring batch 我们可以如下定义这个job:
 
	
	  
		
	  
	
	
	  
		
	  
	
	
	  
	
	
	  
		
	  
	
	
		
	
  


整个 job 的执行是存储在数据库中的,所以即使是某一个step出错失败,也不需要全部从头开始执行这个job.下面是一个真正的入门教程例子.
采用 jar包如下:
spring-batch-2.2.3 以上版本,但是我在2.2.3版本中发现 org/springframework/batch/core/schema-mysql.sql 里面的的mysql 创建表的语句是有问题的,也就是少了“," 号导致的问题( NOT NULL, 后面几个创建表的语句NOT NULL 后面少了逗号),当然你可以自己修改后再执行,执行完毕后有如下几个表:

xstream-1.3.jar 必须的。
jettison-1.3.3.jar也是必须的, 下载地址:http://jettison.codehaus.org/Download 否则会出现
java.lang.NoClassDefFoundError: org/codehaus/jettison/mapped/MappedXMLOutputFactory错误。
另外我用的spring 是 3.1 版本的,可以下载相关jar包,还有apache common 相关jar包就可以了。
mysql-connect-java-5.1.jar 连接mysql 数据库用的。

假设要将如下 csv 文件读取出来处理之后,写入到一个xml文件之中.
1001,"213,100",980,"mkyong", 29/7/2013
1002,"320,200",1080,"staff 1", 30/7/2013
1003,"342,197",1200,"staff 2", 31/7/2013


用 FlatFileItemReader 去读取CSV 文件, 用 itemProcessor 去处理数据,用 StaxEventItemWriter 去写数据
job 的定义如下(job-hello-world.xml):


	
	

	
	

	
		
			
				
				
			
		
	

	

		

		
			
				
					
						
					
				
				
				    
				    
				    
				
			
		

	

	
		
		
		
	

	
		
			
				yihaomen.model.Report
			
		
	





映射csv文件到 Report 对象并写XML文件 (通过 jaxb annotations).
package yihaomen.model;

import java.math.BigDecimal;
import java.util.Date;

import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement(name = "record")
public class Report {

	private int id;
	private BigDecimal sales;
	private int qty;
	private String staffName;
	private Date date;

	@XmlAttribute(name = "id")
	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	@XmlElement(name = "sales")
	public BigDecimal getSales() {
		return sales;
	}

	public void setSales(BigDecimal sales) {
		this.sales = sales;
	}

	@XmlElement(name = "qty")
	public int getQty() {
		return qty;
	}

	public void setQty(int qty) {
		this.qty = qty;
	}

	@XmlElement(name = "staffName")
	public String getStaffName() {
		return staffName;
	}

	public void setStaffName(String staffName) {
		this.staffName = staffName;
	}

	public Date getDate() {
		return date;
	}

	public void setDate(Date date) {
		this.date = date;
	}

	@Override
	public String toString() {
		return "Report [id=" + id + ", sales=" + sales + ", qty=" + qty + ", staffName=" + staffName + "]";
	}

}

为了转换日期,用了自定义的 FieldSetMapper. 如果没有数据需要转换, BeanWrapperFieldSetMapper 通过名称name 去自动映射值。

package yihaomen;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import yihaomen.model.Report;

public class ReportFieldSetMapper implements FieldSetMapper {

	private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
	
	@Override
	public Report mapFieldSet(FieldSet fieldSet) throws BindException {
		
		Report report = new Report();
		report.setId(fieldSet.readInt(0));
		report.setSales(fieldSet.readBigDecimal(1));
		report.setQty(fieldSet.readInt(2));
		report.setStaffName(fieldSet.readString(3));
		
		//default format yyyy-MM-dd
		//fieldSet.readDate(4);
		String date = fieldSet.readString(4);
		try {
			report.setDate(dateFormat.parse(date));
		} catch (ParseException e) {
			e.printStackTrace();
		}
		
		return report;
		
	}

}


在写入数据之前调用itemProcessor 处理数据
package yihaomen;

import org.springframework.batch.item.ItemProcessor;

import yihaomen.model.Report;



public class CustomItemProcessor implements ItemProcessor {

	@Override
	public Report process(Report item) throws Exception {
		
		System.out.println("Processing..." + item);
		return item;
	}

}


spring 配置文件和数据库配置文件

 
	
	
 
 	 
	
		
		
		
	
 
	
 
	
		
	
 




 
        
	
		
		
		
		
	
 
	
 
	
	
		
		
	
 



运行程序
package yihaomen;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App {
	public static void main(String[] args) {

		String[] springConfig  = 
			{	
				"spring/batch/jobs/job-hello-world.xml" 
			};
		
		ApplicationContext context = 
				new ClassPathXmlApplicationContext(springConfig);
		
		JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
		Job job = (Job) context.getBean("helloWorldJob");

		try {

			JobExecution execution = jobLauncher.run(job, new JobParameters());
			System.out.println("Exit Status : " + execution.getStatus());

		} catch (Exception e) {
			e.printStackTrace();
		}

		System.out.println("Done");

	}
}



运行结果 :

十二月 03, 2013 8:56:24 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=helloWorldJob]] launched with the following parameters: [{}]
十二月 03, 2013 8:56:24 下午 org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [step1]
Processing...Report [id=1001, sales=213100, qty=980, staffName=yihaomen]
Processing...Report [id=1002, sales=320200, qty=1080, staffName=staff 1]
Processing...Report [id=1003, sales=342197, qty=1200, staffName=staff 2]
十二月 03, 2013 8:56:25 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=helloWorldJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
Exit Status : COMPLETED
Done


结果生成了output.xml 在你工程目录的 xml 目录下。

整个源代码,除去jar包之后下载:


Spring batch 入门教程下载

上一篇: 再次在BAE上用DJANGO搭建了一个网站
下一篇: Spring batch Tasklet 例子(源码下载)

Avatar

看不懂啊 评论于: 2016-10-18

真看不懂

Avatar

henu632 评论于: 2014-03-05

[quote=小弟]楼主好!有两个问题想请教一下:
1.文章的例子读取的是一个指定路径的文本,但如果我想要处理的是某个文件夹下的所有文件时该如何处理呢?比如我想读取的是一个log文件夹下的日志文件,但这些log文件的数量都是在时刻增加的
2 我可以用JobLauncher同时启动多个job吗?是不是可以获取我log目录下的所有日志文件,然后对应每个日志文件都启动一个job进行处理?
望楼主百忙之中抽空解答,小弟不胜感激[/quote]

I guess you can use MultiResourceItemReader.class to implement it.
or you can get all files into JobParameterBuilder list and launch job one by one with parameter..

Avatar

henu632 评论于: 2014-03-05

I guess you can use MultiResourceItemReader.class to implement it.
or you can get all files into JobParameterBuilder list and launch job one by one with parameter..

Avatar

轻舞肥羊 评论于: 2014-01-13

首先对于文件的路径,这里只是例子,所以用了指定的路径,如果是某个文件夹下所有的文件,道理是一样的,唯一不同的是要递归遍历这个文件夹下,得到所有的文件列表, 得到文件列表之后,你要怎么处理,是你自己写程序处理的。
建议:
1,得到文件列表之后,在自己的业务逻辑中用多线程处理.
2,可以做一个定时任务,比如spring quarts 框架,定时把文件列表写入到数据库表中,其中包好了要处理文件的路径等信息,并设置标记字段,另外一个任务循环这个表处理,处理成功之后,将标志位修改成已经处理的, 这样就不需要多个job 处理了。

重点在于处理的逻辑,而不是job了。没有必要对每一个日志文件做一个job. 

Avatar

小弟 评论于: 2014-01-13

楼主好!有两个问题想请教一下:
1.文章的例子读取的是一个指定路径的文本,但如果我想要处理的是某个文件夹下的所有文件时该如何处理呢?比如我想读取的是一个log文件夹下的日志文件,但这些log文件的数量都是在时刻增加的
2 我可以用JobLauncher同时启动多个job吗?是不是可以获取我log目录下的所有日志文件,然后对应每个日志文件都启动一个job进行处理?
望楼主百忙之中抽空解答,小弟不胜感激
 评论 ( What Do You Think )
名称
邮箱
网址
评论
验证
   
 

 


  • 微信公众号

  • 我的微信

站点声明:

1、一号门博客CMS,由Python, MySQL, Nginx, Wsgi 强力驱动

2、部分文章或者资源来源于互联网, 有时候很难判断是否侵权, 若有侵权, 请联系邮箱:summer@yihaomen.com, 同时欢迎大家注册用户,主动发布无版权争议的 文章/资源.

3、鄂ICP备14001754号-3, 鄂公网安备 42280202422812号