创建批处理服务
本指南将引导您完成创建基本批处理驱动解决方案的过程。
你将建造什么
您将构建一个从 CSV 电子表格导入数据、使用自定义代码对其进行转换并将最终结果存储在数据库中的服务。
你需要什么
- 约15分钟
- 最喜欢的文本编辑器或 IDE
- JDK 1.8或更高版本
- Gradle 4+或Maven 3.2+
- 您还可以将代码直接导入 IDE:
- 弹簧工具套件 (STS)
- IntelliJ IDEA
如何完成本指南
像大多数 Spring入门指南一样,您可以从头开始并完成每个步骤,也可以绕过您已经熟悉的基本设置步骤。无论哪种方式,您最终都会得到工作代码。
要从头开始,请继续从 Spring Initializr 开始。
要跳过基础知识,请执行以下操作:
- 下载并解压本指南的源代码库,或使用Git克隆它:git clone https://github.com/spring-guides/gs-batch-processing.git
- 光盘进入gs-batch-processing/initial
- 继续创建商务舱。
完成后,您可以对照中的代码检查结果
gs-batch-processing/complete。
业务数据
通常,您的客户或业务分析师会提供电子表格。对于这个简单的示例,您可以在以下位置找到一些虚构的数据src/main/resources/sample-data.csv:
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
此电子表格的每一行都包含名字和姓氏,以逗号分隔。这是一种相当常见的模式,Spring 无需定制即可处理。
接下来,您需要编写一个 SQL 脚本来创建一个表来存储数据。您可以在以下位置找到这样的脚本src/main/resources/schema-all.sql:
DROP TABLE people IF EXISTS;
CREATE TABLE people (
person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
first_name VARCHAR(20),
last_name VARCHAR(20)
);
Spring Bootschema-@@platform@@.sql在启动期间自动运行。-all是所有平台的默认设置。
从 Spring Initializr 开始
您可以使用这个预先初始化的项目并单击 Generate 下载 ZIP 文件。此项目配置为适合本教程中的示例。
手动初始化项目:
- 导航到https://start.spring.io。该服务提取应用程序所需的所有依赖项,并为您完成大部分设置。
- 选择 Gradle 或 Maven 以及您要使用的语言。本指南假定您选择了 Java。
- 单击Dependencies并选择Spring Batch和HyperSQL Database。
- 单击生成。
- 下载生成的 ZIP 文件,该文件是根据您的选择配置的 Web 应用程序的存档。
如果您的 IDE 具有 Spring Initializr 集成,您可以从您的 IDE 完成此过程。
你也可以从 Github 上 fork 项目并在你的 IDE 或其他编辑器中打开它。
创建商务舱
现在您可以看到数据输入和输出的格式,您可以编写代码来表示一行数据,如以下示例(来自src/main/java/com/example/batchprocessing/Person.java)所示:
package com.example.batchprocessing;
public class Person {
private String lastName;
private String firstName;
public Person() {
}
public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getFirstName() {
return firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@Override
public String toString() {
return "firstName: " + firstName + ", lastName: " + lastName;
}
}
创建中间处理器
批处理中的一个常见范例是摄取数据,对其进行转换,然后将其通过管道输出到其他地方。在这里,您需要编写一个简单的转换器,将名称转换为大写。以下清单(来自src/main/java/com/example/batchprocessing/PersonItemProcessor.java)显示了如何执行此操作:
package com.example.batchprocessing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
public class PersonItemProcessor implements ItemProcessor
{
private static final Logger log =
LoggerFactory.getLogger(PersonItemProcessor.class);
@Override
public Person process(final Person person) throws Exception {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
log.info("Converting (" + person + ") into (" + transformedPerson + ")");
return transformedPerson;
}
}
PersonItemProcessor实现 Spring Batch 的ItemProcessor接口。这使得将代码连接到您将在本指南后面定义的批处理作业变得很容易。根据界面,您会收到一个传入的Person对象,然后将其转换为大写的Person.
输入和输出类型不必相同。事实上,在读取一个数据源之后,有时应用程序的数据流需要不同的数据类型。
将批处理作业放在一起
现在您需要将实际的批处理作业放在一起。Spring Batch 提供了许多实用程序类来减少编写自定义代码的需要。相反,您可以专注于业务逻辑。
要配置您的作业,您必须首先创建一个 Spring@Configuration类,如下例所示src/main/java/com/exampe/batchprocessing/BatchConfiguration.java:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
...
}
对于初学者,@EnableBatchProcessing注释添加了许多支持作业并为您节省大量工作的关键 bean。此示例使用基于内存的数据库(由 提供@EnableBatchProcessing),这意味着完成后,数据就消失了。它还自动连接下面需要的几个工厂。现在将以下 bean 添加到您的BatchConfiguration类中以定义读取器、处理器和写入器:
@Bean
public FlatFileItemReader reader() {
return new FlatFileItemReaderBuilder()
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper() {{
setTargetType(Person.class);
}})
.build();
}
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
public JdbcBatchItemWriter writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder()
.itemSqlParameterSourceProvider(new
BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName,
:lastName)")
.dataSource(dataSource)
.build();
}
第一段代码定义了输入、处理器和输出。
- reader()创建一个ItemReader. 它查找一个名为的文件sample-data.csv,并使用足够的信息解析每个行项目,以将其转换为Person.
- processor()创建一个PersonItemProcessor您之前定义的实例,用于将数据转换为大写。
- writer(DataSource)创建一个ItemWriter. 这个针对 JDBC 目标,并自动获取由@EnableBatchProcessing. 它包括插入单个 所需的 SQL 语句Person,由 Java bean 属性驱动。
最后一个块(来自src/main/java/com/example/batchprocessing/BatchConfiguration.java)显示了实际的作业配置:
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step
step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JdbcBatchItemWriter writer) {
return stepBuilderFactory.get("step1")
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
第一种方法定义了作业,第二种方法定义了一个步骤。作业是由步骤构建的,其中每个步骤都可能涉及读取器、处理器和写入器。
在此作业定义中,您需要一个增量器,因为作业使用数据库来维护执行状态。然后列出每个步骤(尽管此作业只有一个步骤)。作业结束,Java API 生成一个完美配置的作业。
在步骤定义中,您定义一次写入多少数据。在这种情况下,它一次最多写入十个记录。接下来,您使用之前注入的 bean 配置读取器、处理器和写入器。
chunk()是前缀,因为它是一个通用方法。这表示每个处理“块”的输入和输出类型,并与ItemReader和对齐ItemWriter。
批处理配置的最后一点是在作业完成时获得通知的一种方式。以下示例(来自src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java)显示了这样一个类:
package com.example.batchprocessing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import
org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@Component
public class JobCompletionNotificationListener extends
JobExecutionListenerSupport {
private static final Logger log =
LoggerFactory.getLogger(JobCompletionNotificationListener.class);
private final JdbcTemplate jdbcTemplate;
@Autowired
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB FINISHED! Time to verify the results");
jdbcTemplate.query("SELECT first_name, last_name FROM people",
(rs, row) -> new Person(
rs.getString(1),
rs.getString(2))
).forEach(person -> log.info("Found <" + person + "> in the
database."));
}
}
}
JobCompletionNotificationListener监听作业的时间,BatchStatus.COMPLETED然后用于JdbcTemplate检查结果。
使应用程序可执行
尽管批处理可以嵌入到 Web 应用程序和 WAR 文件中,但下面演示的更简单的方法可以创建一个独立的应用程序。您将所有内容打包在一个可执行的 JAR 文件中,由一个很好的旧 Javamain()方法驱动。
Spring Initializr 为您创建了一个应用程序类。对于这个简单的示例,它无需进一步修改即可工作。以下清单(来自
src/main/java/com/example/batchprocessing/BatchProcessingApplication.java)显示了应用程序类:
JobCompletionNotificationListener监听作业的时间,BatchStatus.COMPLETED然后用于JdbcTemplate检查结果。
使应用程序可执行
尽管批处理可以嵌入到 Web 应用程序和 WAR 文件中,但下面演示的更简单的方法可以创建一个独立的应用程序。您将所有内容打包在一个可执行的 JAR 文件中,由一个很好的旧 Javamain()方法驱动。
Spring Initializr 为您创建了一个应用程序类。对于这个简单的示例,它无需进一步修改即可工作。以下清单(来自
src/main/java/com/example/batchprocessing/BatchProcessingApplication.java)显示了应用程序类:
package com.example.batchprocessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BatchProcessingApplication {
public static void main(String[] args) throws Exception {
System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class,
args)));
}
}
@SpringBootApplication是一个方便的注释,它添加了以下所有内容:
- @Configuration: 将类标记为应用程序上下文的 bean 定义源。
- @EnableAutoConfiguration:告诉 Spring Boot 根据类路径设置、其他 bean 和各种属性设置开始添加 bean。例如,如果spring-webmvc位于类路径上,则此注释将应用程序标记为 Web 应用程序并激活关键行为,例如设置DispatcherServlet.
- @ComponentScan: 告诉 Spring 在包中查找其他组件、配置和服务com/example,让它找到控制器。
该main()方法使用 Spring Boot 的SpringApplication.run()方法来启动应用程序。您是否注意到没有一行 XML?也没有web.xml文件。这个 Web 应用程序是 100% 纯 Java,您不必处理任何管道或基础设施的配置。
请注意SpringApplication.exit()并System.exit()确保 JVM 在作业完成后退出。有关更多详细信息,请参阅Spring Boot 参考文档中的应用程序退出部分。
出于演示目的,有代码可以创建一个JdbcTemplate、查询数据库并打印出批处理作业插入的人员姓名。
构建一个可执行的 JAR
您可以使用 Gradle 或 Maven 从命令行运行应用程序。您还可以构建一个包含所有必要依赖项、类和资源的单个可执行 JAR 文件并运行它。构建可执行 jar 可以在整个开发生命周期、跨不同环境等中轻松地作为应用程序交付、版本化和部署服务。
如果您使用 Gradle,则可以使用./gradlew bootRun. 或者,您可以使用构建 JAR 文件./gradlew build,然后运行 JAR 文件,如下所示:
java -jar build/libs/gs-batch-processing-0.1.0.jar
如果您使用 Maven,则可以使用./mvnw spring-boot:run. 或者,您可以使用构建 JAR 文件,./mvnw clean package然后运行该 JAR 文件,如下所示:
java -jar 目标/gs-batch-processing-0.1.0.jar
此处描述的步骤创建了一个可运行的 JAR。您还可以构建经典的 WAR 文件。
该作业为每个被转换的人打印一行。作业运行后,您还可以看到查询数据库的输出。它应该类似于以下输出:
java -jar 目标/gs-batch-processing-0.1.0.jar
概括
恭喜!您构建了一个批处理作业,该作业从电子表格中提取数据,对其进行处理,然后将其写入数据库。