Spring batch 入门学习教程(附源码)

Spring batch 是一个开源的批处理框架.执行一系列的任务. 在 spring batch 中 一个job 是由许多 step 组成的。而每一个 step  又是由 READ-PROCESS-WRITE task或者 单个 task 组成。

1. "READ-PROCESS-WRITE" 处理,根据字面意思理解就可以:
READ 就是从资源文件里面读取数据,比如从xml文件,csv文件,数据库中读取数据.
PROCESS 就是处理读取的数据
WRITE 就是将处理过的数据写入到其他资源文件中去,可以是XML,CSV,或者数据库.
比如:从CSV文件中 读取数据,经过处理之后,保存到数据库. spring batch 提供了很多类去处理这方面的东西。

2.单个task, 也就是处理单个任务。比如在一个step 开始之前或者完成之后清除资源文件等.
3.许多个step 组成在一起,就组成了一个job. 所以他们之间的关系,就如同下面的描述:
程序代码 程序代码

一个 job = 很多steps
一个step = 一个READ-PROCESS-WRITE 或者 一个task.
同样一个job = step1 -->step2--step3 这样链表形式的组成.


Spring batch 例子
考虑如下一个批处理的例子,看起来有点啰嗦,只是为了说明用途:
1. step1 : 从 A 文件夹中读取csv 文件,处理之后,写入到B文件夹中(READ-PROCESS-WRITE)
2. step2 : 从 B 文件夹中读取CSV文件 ,处理之后, 存储到数据库中(READ-PROCESS-WRITE).
3. step3 : 删除B文件夹下的CSV文件。(用到单个task)
4. step4 : 从数据库读取数据,处理之后,生成XML报表文件(READ-PROCESS-WRITE).
5. step5 : 读取XML报表,并发送EMAIL给管理员(用到单个task)

用spring batch 我们可以如下定义这个job:
程序代码 程序代码

<job id="abcJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="step1" next="step2">
      <tasklet>
        <chunk reader="cvsItemReader" writer="cvsItemWriter"  
                    processor="itemProcesser" commit-interval="1" />
      </tasklet>
    </step>
    <step id="step2" next="step3">
      <tasklet>
        <chunk reader="cvsItemReader" writer="databaseItemWriter"  
                    processor="itemProcesser" commit-interval="1" />
      </tasklet>
    </step>
    <step id="step3" next="step4">
      <tasklet ref="fileDeletingTasklet" />
    </step>
    <step id="step4" next="step5">
      <tasklet>
        <chunk reader="databaseItemReader" writer="xmlItemWriter"  
                    processor="itemProcesser" commit-interval="1" />
      </tasklet>
    </step>
    <step id="step5">
        <tasklet ref="sendingEmailTasklet" />
    </step>
  </job>


整个 job 的执行是存储在数据库中的,所以即使是某一个step出错失败,也不需要全部从头开始执行这个job.下面是一个真正的入门教程例子.
采用 jar包如下:
spring-batch-2.2.3 以上版本,但是我在2.2.3版本中发现 org/springframework/batch/core/schema-mysql.sql 里面的的mysql 创建表的语句是有问题的,也就是少了“," 号导致的问题( NOT NULL, 后面几个创建表的语句NOT NULL 后面少了逗号),当然你可以自己修改后再执行,执行完毕后有如下几个表:

xstream-1.3.jar 必须的。
jettison-1.3.3.jar也是必须的, 下载地址:http://jettison.codehaus.org/Download 否则会出现
java.lang.NoClassDefFoundError: org/codehaus/jettison/mapped/MappedXMLOutputFactory错误。
另外我用的spring 是 3.1 版本的,可以下载相关jar包,还有apache common 相关jar包就可以了。
mysql-connect-java-5.1.jar 连接mysql  数据库用的。

假设要将如下 csv 文件读取出来处理之后,写入到一个xml文件之中.
程序代码 程序代码

1001,"213,100",980,"mkyong", 29/7/2013
1002,"320,200",1080,"staff 1", 30/7/2013
1003,"342,197",1200,"staff 2", 31/7/2013


用 FlatFileItemReader 去读取CSV 文件, 用 itemProcessor 去处理数据,用 StaxEventItemWriter 去写数据
job 的定义如下(job-hello-world.xml):
程序代码 程序代码

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    ">

    <import resource="../config/context.xml" />
    <import resource="../config/database.xml" />

    <bean id="report" class="yihaomen.model.Report" scope="prototype" />
    <bean id="itemProcessor" class="yihaomen.CustomItemProcessor" />

    <batch:job id="helloWorldJob">
        <batch:step id="step1">
            <batch:tasklet>
                <batch:chunk reader="cvsFileItemReader" writer="xmlItemWriter" processor="itemProcessor"
                    commit-interval="10">
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="cvsFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">

        <property name="resource" value="classpath:cvs/input/report.csv" />

        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean
                        class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="id,sales,qty,staffName,date" />
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="yihaomen.ReportFieldSetMapper" />
                    
                    <!-- if no data type conversion, use BeanWrapperFieldSetMapper to map by name
                    <bean
                        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <property name="prototypeBeanName" value="report" />
                    </bean>
                     -->
                </property>
            </bean>
        </property>

    </bean>

    <bean id="xmlItemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
        <property name="resource" value="file:xml/outputs/report.xml" />
        <property name="marshaller" ref="reportMarshaller" />
        <property name="rootTagName" value="report" />
    </bean>

    <bean id="reportMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
        <property name="classesToBeBound">
            <list>
                <value>yihaomen.model.Report</value>
            </list>
        </property>
    </bean>


</beans>


映射csv文件到 Report 对象并写XML文件 (通过 jaxb annotations).
程序代码 程序代码

package yihaomen.model;

import java.math.BigDecimal;
import java.util.Date;

import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement(name = "record")
public class Report {

    private int id;
    private BigDecimal sales;
    private int qty;
    private String staffName;
    private Date date;

    @XmlAttribute(name = "id")
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @XmlElement(name = "sales")
    public BigDecimal getSales() {
        return sales;
    }

    public void setSales(BigDecimal sales) {
        this.sales = sales;
    }

    @XmlElement(name = "qty")
    public int getQty() {
        return qty;
    }

    public void setQty(int qty) {
        this.qty = qty;
    }

    @XmlElement(name = "staffName")
    public String getStaffName() {
        return staffName;
    }

    public void setStaffName(String staffName) {
        this.staffName = staffName;
    }

    public Date getDate() {
        return date;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    @Override
    public String toString() {
        return "Report [id=" + id + ", sales=" + sales + ", qty=" + qty + ", staffName=" + staffName + "]";
    }

}

为了转换日期,用了自定义的 FieldSetMapper. 如果没有数据需要转换, BeanWrapperFieldSetMapper 通过名称name 去自动映射值。

程序代码 程序代码

package yihaomen;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import yihaomen.model.Report;

public class ReportFieldSetMapper implements FieldSetMapper<Report> {

    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    
    @Override
    public Report mapFieldSet(FieldSet fieldSet) throws BindException {
        
        Report report = new Report();
        report.setId(fieldSet.readInt(0));
        report.setSales(fieldSet.readBigDecimal(1));
        report.setQty(fieldSet.readInt(2));
        report.setStaffName(fieldSet.readString(3));
        
        //default format yyyy-MM-dd
        //fieldSet.readDate(4);
        String date = fieldSet.readString(4);
        try {
            report.setDate(dateFormat.parse(date));
        } catch (ParseException e) {
            e.printStackTrace();
        }
        
        return report;
        
    }

}


在写入数据之前调用itemProcessor 处理数据
程序代码 程序代码

package yihaomen;

import org.springframework.batch.item.ItemProcessor;

import yihaomen.model.Report;



public class CustomItemProcessor implements ItemProcessor<Report, Report> {

    @Override
    public Report process(Report item) throws Exception {
        
        System.out.println("Processing..." + item);
        return item;
    }

}


spring 配置文件和数据库配置文件
程序代码 程序代码

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

    <!-- stored job-meta in memory -->
    <!--  
    <bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
    </bean>
     -->

     <!-- stored job-meta in database -->
    <bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="databaseType" value="mysql" />
    </bean>

    <bean id="transactionManager"
        class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

</beans>


程序代码 程序代码

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/jdbc
        http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd">

        <!-- connect to MySQL database -->
    <bean id="dataSource"
        class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="com.mysql.jdbc.Driver" />
        <property name="url" value="jdbc:mysql://localhost:3306/test" />
        <property name="username" value="root" />
        <property name="password" value="" />
    </bean>

    <bean id="transactionManager"
        class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

    <!-- create job-meta tables automatically -->
    <jdbc:initialize-database data-source="dataSource">
        <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" />
        <jdbc:script location="org/springframework/batch/core/schema-mysql.sql" />
    </jdbc:initialize-database>

</beans>


运行程序
程序代码 程序代码

package yihaomen;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App {
    public static void main(String[] args) {

        String[] springConfig  =
            {    
                "spring/batch/jobs/job-hello-world.xml"
            };
        
        ApplicationContext context =
                new ClassPathXmlApplicationContext(springConfig);
        
        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("helloWorldJob");

        try {

            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Exit Status : " + execution.getStatus());

        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("Done");

    }
}



运行结果 :

程序代码 程序代码

十二月 03, 2013 8:56:24 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=helloWorldJob]] launched with the following parameters: [{}]
十二月 03, 2013 8:56:24 下午 org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [step1]
Processing...Report [id=1001, sales=213100, qty=980, staffName=yihaomen]
Processing...Report [id=1002, sales=320200, qty=1080, staffName=staff 1]
Processing...Report [id=1003, sales=342197, qty=1200, staffName=staff 2]
十二月 03, 2013 8:56:25 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=helloWorldJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
Exit Status : COMPLETED
Done


结果生成了output.xml 在你工程目录的 xml 目录下。

整个源代码,除去jar包之后下载:


下载文件 Spring batch 入门教程下载


除非申明,文章均为一号门原创,转载请注明本文地址,谢谢!
[本日志由 轻舞肥羊 于 2013-12-15 12:42 AM 编辑]
文章来自: 本站原创
引用通告: 查看所有引用 | 我要引用此文章
Tags: springbatch
相关日志:
评论: 5 | 引用: 0 | 查看次数: -
回复回复看不懂啊[2016-10-18 05:43 PM | del]
真看不懂
回复回复henu632[2014-03-05 02:38 PM | del]
引用来自 小弟 引用来自 小弟
楼主好!有两个问题想请教一下:
1.文章的例子读取的是一个指定路径的文本,但如果我想要处理的是某个文件夹下的所有文件时该如何处理呢?比如我想读取的是一个log文件夹下的日志文件,但这些log文件的数量都是在时刻增加的
2 我可以用JobLauncher同时启动多个job吗?是不是可以获取我log目录下的所有日志文件,然后对应每个日志文件都启动一个job进行处理?
望楼主百忙之中抽空解答,小弟不胜感激


I guess you can use MultiResourceItemReader.class to implement it.
or you can get all files into JobParameterBuilder list and launch job one by one with parameter..
回复回复henu632[2014-03-05 02:35 PM | del]
I guess you can use MultiResourceItemReader.class to implement it.
or you can get all files into JobParameterBuilder list and launch job one by one with parameter..
回复回复轻舞肥羊[2014-01-13 08:01 PM | del]
首先对于文件的路径,这里只是例子,所以用了指定的路径,如果是某个文件夹下所有的文件,道理是一样的,唯一不同的是要递归遍历这个文件夹下,得到所有的文件列表, 得到文件列表之后,你要怎么处理,是你自己写程序处理的。
建议:
1,得到文件列表之后,在自己的业务逻辑中用多线程处理.
2,可以做一个定时任务,比如spring quarts 框架,定时把文件列表写入到数据库表中,其中包好了要处理文件的路径等信息,并设置标记字段,另外一个任务循环这个表处理,处理成功之后,将标志位修改成已经处理的, 这样就不需要多个job 处理了。

重点在于处理的逻辑,而不是job了。没有必要对每一个日志文件做一个job.
回复回复小弟[2014-01-13 05:46 PM | del]
楼主好!有两个问题想请教一下:
1.文章的例子读取的是一个指定路径的文本,但如果我想要处理的是某个文件夹下的所有文件时该如何处理呢?比如我想读取的是一个log文件夹下的日志文件,但这些log文件的数量都是在时刻增加的
2 我可以用JobLauncher同时启动多个job吗?是不是可以获取我log目录下的所有日志文件,然后对应每个日志文件都启动一个job进行处理?
望楼主百忙之中抽空解答,小弟不胜感激
发表评论
昵 称:
密 码: 游客发言不需要密码.
内 容:
验证码: 验证码
选 项:
虽然发表评论不用注册,但是为了保护您的发言权,建议您注册帐号.