Spring Cloud Data Flow使用 Spring Batch 进行批处理-8
使用 Spring Batch 进行批处理在本指南中,我们开发了一个 Spring Batch 应用程序并将其部署到 Cloud Foundry、Kubernetes 和您的本地计算机。在另一个指南中,我们使用 Data Flow部署Spring Batch 应用程序。本指南介绍了如何从头开始构建此应用程序。如果您愿意,您可以下载一个包含billsetup应用程序源的 zip 文件,将其解压缩,然
使用 Spring Batch 进行批处理
在本指南中,我们开发了一个 Spring Batch 应用程序并将其部署到 Cloud Foundry、Kubernetes 和您的本地计算机。在另一个指南中,我们使用 Data Flow部署Spring Batch 应用程序。
本指南介绍了如何从头开始构建此应用程序。如果您愿意,您可以下载一个包含billsetup
应用程序源的 zip 文件,将其解压缩,然后继续进行部署步骤。
您可以从浏览器下载项目或运行以下命令从命令行下载它:
wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/master/dataflow-website/batch-developer-guides/batch/batchsamples/dist/batchsamples.zip?raw=true -O batchsamples.zip
开发
我们从Spring Initializr开始并创建一个 Spring Batch 应用程序。
假设手机数据提供商需要为客户创建账单。使用数据存储在存储在文件系统上的 JSON 文件中。计费解决方案必须从这些文件中提取数据,从该使用数据生成计费数据,并将其存储在BILL_STATEMENTS
表中。
我们可以在使用 Spring Batch 的单个 Spring Boot 应用程序中实现整个解决方案。但是,对于此示例,我们将解决方案分为两个阶段:
billsetuptask
: 该billsetuptask
应用程序是一个使用 Spring Cloud Task 创建BILL_STATEMENTS
表的 Spring Boot 应用程序。billrun
:该billrun
应用程序是一个 Spring Boot 应用程序,它使用 Spring Cloud Task 和 Spring Batch 从 JSON 文件中读取每一行的使用数据和价格,并将结果数据放入BILL_STATEMENTS
表中。
对于本节,我们创建一个 Spring Cloud Task 和 Spring Batchbillrun
应用程序,它从一个 JSON 文件中读取使用信息,该文件包含每个条目的客户使用数据和价格,并将结果放入BILL_STATEMENTS
表中。
下图显示了该BILL_STATEMENTS
表:
介绍 Spring Batch
Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发健壮的批处理应用程序。Spring Batch 通过提供以下功能,提供了处理大量记录所必需的可重用功能:
- 记录/跟踪
- 基于块的处理
- 声明式 I/O
- 启动/停止/重启
- 重试/跳过
- 资源管理
它还提供更高级的技术服务和功能,通过优化和分区技术实现超大容量和高性能的批处理作业。
对于本指南,我们将重点介绍五个 Spring Batch 组件,如下图所示:
Job
:AJob
是封装了整个批处理过程的实体。一项工作由一个或多个steps
.Step
: AStep
是一个域对象,它封装了批处理作业的独立、顺序阶段。每个都由step
一个ItemReader
、一个ItemProcessor
和一个组成ItemWriter
。ItemReader
:ItemReader
是一种抽象,表示一次检索Step
一个项目的输入。ItemProcessor
:ItemProcessor
是一个抽象,代表一个项目的业务处理。ItemWriter
:ItemWriter
是一个抽象,表示 a 的输出Step
。
在上图中,我们看到每个阶段JobExecution
都存储在一个JobRepository
(在本例中是我们的 MySQL 数据库)中。这意味着 Spring Batch 执行的每个操作都记录在数据库中,用于日志记录和重新启动作业。
注意:您可以在此处阅读有关此过程的更多信息。
我们的批量作业
因此,对于我们的应用程序,我们有一个BillRun
Job
that has one Step
,其中包括:
JsonItemReader
:ItemReader
读取包含使用数据的 JSON 文件。BillProcessor
:ItemProcessor
根据从 发送的每行数据生成价格的JsonItemReader
。JdbcBatchItemWriter
:ItemWriter
将定价Bill
记录写入BILLING_STATEMENT
表中。
初始化
我们使用Spring Initializr来创建我们的应用程序。为此:
- 访问Spring Initializr 站点。
- 选择最新版本的 Spring Boot。
io.spring
使用 Group 名称和 Artifact 名称创建一个新的 Maven 项目billrun
。- 在依赖项文本框中,键入
task
以选择 Cloud Task 依赖项。 - 在依赖项文本框中,键入
jdbc
并选择 JDBC 依赖项。 - 在依赖项文本框中,键入
h2
并选择 H2 依赖项。我们使用 H2 进行单元测试。 - 在Dependencies文本框中,键入
mysql
并选择 MySQL 依赖项(或您喜欢的数据库)。我们使用 MySQL 作为运行时数据库。 - 在Dependencies文本框中,键入
batch
然后选择 Batch。 - 单击生成项目按钮。
- 解压缩
billrun.zip
文件并将项目导入您喜欢的 IDE。
或者,您可以通过下载预构建文件来初始化您的项目。为此:
- 单击此 Spring Initializr 链接以下载预配置
billrun.zip
文件。 - 解压缩 billrun.zip 文件并将项目导入您喜欢的 IDE。
设置 MySQL
如果您没有可用的 MySQL 实例,您可以按照以下说明运行此示例的 MySQL Docker 映像:
-
通过运行以下命令来拉取 MySQL Docker 映像:
复制
docker pull mysql:5.7.25
-
通过运行以下命令启动 MySQL:
复制
docker run -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=password \ -e MYSQL_DATABASE=task -d mysql:5.7.25
构建应用程序
-
下载
https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/dataflow-website/batch-developer-guides/batch/batchsamples/billrun/src/main/resources/usageinfo.json title=usageinfo.json
生成的文件并将其复制到/src/main/resources
目录中。 -
下载
https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/dataflow-website/batch-developer-guides/batch/batchsamples/billrun/src/main/resources/schema.sql title=schema.sql
生成的文件并将其复制到/src/main/resources
目录中。 -
在您最喜欢的 IDE 中,创建
io.spring.billrun.model
包。 -
Usage
在 中创建一个io.spring.billrun.model
类似于Usage.java内容的类。 -
Bill
在其中创建一个io.spring.billrun.model
类似于Bill.java内容的类。 -
在您最喜欢的 IDE 中,创建
io.spring.billrun.configuration
包。 -
为每条记录
ItemProcessor
定价。Usage
为此,请在如下所示的清单中创建一个BillProcessor
类:io.spring.billrun.configuration
复制
public class BillProcessor implements ItemProcessor<Usage, Bill> { @Override public Bill process(Usage usage) { Double billAmount = usage.getDataUsage() * .001 + usage.getMinutes() * .01; return new Bill(usage.getId(), usage.getFirstName(), usage.getLastName(), usage.getDataUsage(), usage.getMinutes(), billAmount); } }
请注意,我们实现了具有我们需要重写
ItemProcessor
的方法的接口。process
我们的参数是一个Usage
对象,返回值是类型Bill
。 -
现在我们可以创建一个 Java 配置来指定
BillRun
Job
. 在这种情况下,我们需要BillingConfiguration
在包中创建一个类,io.spring.billrun.configuration
如下所示:复制
@Configuration @EnableTask @EnableBatchProcessing public class BillingConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Value("${usage.file.name:classpath:usageinfo.json}") private Resource usageResource; @Bean public Job job1(ItemReader<Usage> reader, ItemProcessor<Usage,Bill> itemProcessor, ItemWriter<Bill> writer) { Step step = stepBuilderFactory.get("BillProcessing") .<Usage, Bill>chunk(1) .reader(reader) .processor(itemProcessor) .writer(writer) .build(); return jobBuilderFactory.get("BillJob") .incrementer(new RunIdIncrementer()) .start(step) .build(); } @Bean public JsonItemReader<Usage> jsonItemReader() { ObjectMapper objectMapper = new ObjectMapper(); JacksonJsonObjectReader<Usage> jsonObjectReader = new JacksonJsonObjectReader<>(Usage.class); jsonObjectReader.setMapper(objectMapper); return new JsonItemReaderBuilder<Usage>() .jsonObjectReader(jsonObjectReader) .resource(usageResource) .name("UsageJsonItemReader") .build(); } @Bean public ItemWriter<Bill> jdbcBillWriter(DataSource dataSource) { JdbcBatchItemWriter<Bill> writer = new JdbcBatchItemWriterBuilder<Bill>() .beanMapped() .dataSource(dataSource) .sql("INSERT INTO BILL_STATEMENTS (id, first_name, " + "last_name, minutes, data_usage,bill_amount) VALUES " + "(:id, :firstName, :lastName, :minutes, :dataUsage, " + ":billAmount)") .build(); return writer; } @Bean ItemProcessor<Usage, Bill> billProcessor() { return new BillProcessor(); } }
@EnableBatchProcessing
注释启用 Spring Batch 功能并提供用于设置批处理作业的基本配置。@EnableTask
注释设置了一个,它存储了有关任务执行的TaskRepository
信息(例如任务的开始和结束时间以及退出代码)。在前面的配置中,我们看到我们的ItemReader
bean 是JsonItemReader
. 该JsonItemReader
实例读取资源的内容并将 JSON 数据解组为Usage
对象。JsonItemReader
是ItemReader
Spring Batch 提供的实现之一。我们还看到我们的ItemWriter
bean 是JdbcBatchItemWriter
. 该JdbcBatchItemWriter
实例将结果写入我们的数据库。JdbcBatchItemWriter
是ItemWriter
Spring Batch 提供的实现之一。这ItemProcessor
是我们自己的BillProcessor
. 请注意,所有使用 Spring Batch 提供的类 (Job
,Step
,ItemReader
,ItemWriter
) 的 bean 都是使用 Spring Batch 提供的构建器构建的,这意味着更少的编码。
测试
现在我们已经编写了代码,是时候编写测试了。在这种情况下,我们要确保账单信息已正确插入到BILLING_STATEMENTS
表中。要创建您的测试,请更新BillrunApplicationTests.java,使其类似于以下清单:
复制
package io.spring.billrun;
import io.spring.billrun.model.Bill;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest
public class BillRunApplicationTests {
@Autowired
private DataSource dataSource;
private JdbcTemplate jdbcTemplate;
@BeforeEach
public void setup() {
this.jdbcTemplate = new JdbcTemplate(this.dataSource);
}
@Test
public void testJobResults() {
List<Bill> billStatements = this.jdbcTemplate.query("select id, " +
"first_name, last_name, minutes, data_usage, bill_amount " +
"FROM bill_statements ORDER BY id",
(rs, rowNum) -> new Bill(rs.getLong("id"),
rs.getString("FIRST_NAME"), rs.getString("LAST_NAME"),
rs.getLong("DATA_USAGE"), rs.getLong("MINUTES"),
rs.getDouble("bill_amount")));
assertThat(billStatements.size()).isEqualTo(5);
Bill billStatement = billStatements.get(0);
assertThat(billStatement.getBillAmount()).isEqualTo(6.0);
assertThat(billStatement.getFirstName()).isEqualTo("jane");
assertThat(billStatement.getLastName()).isEqualTo("doe");
assertThat(billStatement.getId()).isEqualTo(1);
assertThat(billStatement.getMinutes()).isEqualTo(500);
assertThat(billStatement.getDataUsage()).isEqualTo(1000);
}
}
对于此测试,我们使用JdbcTemplate
执行查询并检索billrun
. 查询运行后,我们将验证表第一行中的数据是否符合我们的预期。
本地部署
在本节中,我们将部署到本地机器、Cloud Foundry 和 Kubernetes。
现在我们可以构建项目了。
-
从命令行,将目录更改为项目的位置,并通过运行以下 Maven 命令构建项目
./mvnw clean package
: -
使用处理数据库中的使用信息所需的配置运行应用程序。
要配置
billrun
应用程序,请使用以下参数:spring.datasource.url
:将 URL 设置为您的数据库实例。在以下示例中,我们通过task
端口 3306 连接到本地计算机上的 MySQL 数据库。spring.datasource.username
:用于 MySQL 数据库的用户名。在下面的示例中,它是root
.spring.datasource.password
:用于 MySQL 数据库的密码。在下面的示例中,它是password
.spring.datasource.driverClassName
: 用于连接 MySQL 数据库的驱动程序。在下面的示例中,它是com.mysql.jdbc.Driver
.spring.datasource.initialization-mode
:使用此应用程序所需的BILL_STATEMENTS
和表初始化数据库。BILL_USAGE
在下面的示例中,我们声明我们always
想要这样做。这样做不会覆盖已经存在的表。spring.batch.initialize-schema
:使用 Spring Batch 所需的表初始化数据库。在下面的示例中,我们声明我们always
想要这样做。这样做不会覆盖已经存在的表。
复制
java -jar target/billrun-0.0.1-SNAPSHOT.jar \ --spring.datasource.url=jdbc:mysql://localhost:3306/task?useSSL=false \ --spring.datasource.username=root \ --spring.datasource.password=password \ --spring.datasource.driverClassName=com.mysql.jdbc.Driver \ --spring.datasource.initialization-mode=always \ --spring.batch.initialize-schema=always
-
登录
mysql
容器查询BILL_STATEMENTS
表。为此,请运行以下命令:
复制
$ docker exec -it mysql bash -l
# mysql -u root -ppassword
mysql> select * from task.BILL_STATEMENTS;
输出应如下所示:
ID | 名 | 姓 | 分钟 | 数据使用 | 账单金额 |
---|---|---|---|---|---|
1 | 简 | 母鹿 | 500 | 1000 | 6.00 |
2 | 约翰 | 母鹿 | 550 | 1500 | 7.00 |
3 | 梅丽莎 | 史密斯 | 600 | 1550 | 7.55 |
4 | 迈克尔 | 史密斯 | 650 | 1500 | 8.00 |
5 | 玛丽 | 琼斯 | 700 | 1500 | 8.50 |
清理
要停止和删除在 Docker 实例中运行的 MySQL 容器,请运行以下命令:
复制
docker stop mysql
docker rm mysql
Kubernetes
本部分将引导您billrun
在 Kubernetes 上运行应用程序。
设置 Kubernetes 集群
对于这个例子,我们需要一个正在运行的Kubernetes 集群,并且我们部署到minikube
.
验证 Minikube 是否正在运行
要验证 Minikube 是否正在运行,请运行以下命令(显示其输出):
复制
minikube status
host: Running
kubelet: Running
apiserver: Running
kubectl: Correctly Configured: pointing to minikube-vm at 192.168.99.100
安装数据库
我们使用 Spring Cloud Data Flow 的默认配置安装 MySQL 服务器。为此,请运行以下命令:
复制
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.9.1/src/kubernetes/mysql/mysql-deployment.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.9.1/src/kubernetes/mysql/mysql-pvc.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.9.1/src/kubernetes/mysql/mysql-secrets.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.9.1/src/kubernetes/mysql/mysql-svc.yaml
为示例任务应用程序构建 Docker 映像
我们需要为billrun
应用程序构建一个 Docker 映像。
为此,我们使用jib maven 插件。如果您下载了源代码分发版,则 jib 插件已配置。如果您从头开始构建应用程序,请在plugins
in下添加以下内容pom.xml
:
复制
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>0.10.1</version>
<configuration>
<from>
<image>springcloud/openjdk</image>
</from>
<to>
<image>${docker.org}/${project.artifactId}:${docker.version}</image>
</to>
<container>
<useCurrentTimestamp>true</useCurrentTimestamp>
</container>
</configuration>
</plugin>
然后在下面添加引用的属性properties
对于本例,我们使用以下属性:
复制
<docker.org>springcloudtask</docker.org>
<docker.version>${project.version}</docker.version>
现在您可以运行以下命令将映像添加到minikube
Docker 注册表:
复制
eval $(minikube docker-env)
./mvnw clean package jib:dockerBuild
运行以下命令以验证其存在(通过springcloudtask/billrun
在图像列表中查找):
复制
docker images
Spring Cloud Data Flow 已经测试了由Spring Boot 的 gradle/maven 插件、jib maven 插件和docker build
命令创建的容器。
部署应用程序
部署批处理应用程序的最简单方法是作为独立Pod。将批处理应用程序部署为作业或CronJob被认为是生产环境的最佳实践,但超出了本指南的范围。
将以下内容保存到batch-app.yaml
:
复制
apiVersion: v1
kind: Pod
metadata:
name: billrun
spec:
restartPolicy: Never
containers:
- name: task
image: springcloudtask/billrun:0.0.1-SNAPSHOT
env:
- name: SPRING_DATASOURCE_PASSWORD
valueFrom:
secretKeyRef:
name: mysql
key: mysql-root-password
- name: SPRING_DATASOURCE_URL
value: jdbc:mysql://mysql:3306/task
- name: SPRING_DATASOURCE_USERNAME
value: root
- name: SPRING_DATASOURCE_DRIVER_CLASS_NAME
value: com.mysql.jdbc.Driver
initContainers:
- name: init-mysql-database
image: mysql:5.6
env:
- name: MYSQL_PWD
valueFrom:
secretKeyRef:
name: mysql
key: mysql-root-password
command:
[
'sh',
'-c',
'mysql -h mysql -u root -e "CREATE DATABASE IF NOT EXISTS task;"',
]
要启动应用程序,请运行以下命令:
复制
kubectl apply -f batch-app.yaml
任务完成后,您应该会看到类似于以下内容的输出:
复制
kubectl get pods
NAME READY STATUS RESTARTS AGE
mysql-5cbb6c49f7-ntg2l 1/1 Running 0 4h
billrun 0/1 Completed 0 10s
现在您可以删除 pod。为此,请运行以下命令:
复制
kubectl delete -f batch-app.yaml
现在登录mysql
容器查询BILL_STATEMENTS
表。使用获取mysql
pod的名称kubectl get pods
,如前所示。然后登录查询BILL_STATEMENTS
表,如下:
复制
kubectl exec -it mysql-5cbb6c49f7-ntg2l -- /bin/bash
# mysql -u root -p$MYSQL_ROOT_PASSWORD
mysql> select * from task.BILL_STATEMENTS;
输出应如下所示:
ID | 名 | 姓 | 分钟 | 数据使用 | 账单金额 |
---|---|---|---|---|---|
1 | 简 | 母鹿 | 500 | 1000 | 6.00 |
2 | 约翰 | 母鹿 | 550 | 1500 | 7.00 |
3 | 梅丽莎 | 史密斯 | 600 | 1550 | 7.55 |
4 | 迈克尔 | 史密斯 | 650 | 1500 | 8.00 |
5 | 玛丽 | 琼斯 | 700 | 1500 | 8.50 |
要卸载mysql
,请运行以下命令:
复制
kubectl delete all -l app=mysql
数据库特定说明
微软 SQL 服务器
在同时启动多个 Spring Batch 应用程序时,使用 Spring Batch 4.x 和更早版本以及 Microsoft SQL Server 数据库,您可能会收到来自数据库的死锁。此问题已在此问题中报告。一种解决方案是创建序列而不是表并创建一个BatchConfigurer
来使用它们。
删除下表并将它们替换为使用相同名称的序列:
BATCH_STEP_EXECUTION_SEQ
BATCH_JOB_EXECUTION_SEQ
BATCH_JOB_SEQ
**注意:**确保将每个序列值设置为表的当前值id
+ 1。
一旦表被序列替换,然后更新批处理应用程序以覆盖BatchConfigurer
,这样它将使用自己的增量器。以下部分显示了此实现的一个示例:
增量器
创建自己的增量器:
复制
import javax.sql.DataSource;
import org.springframework.jdbc.support.incrementer.AbstractSequenceMaxValueIncrementer;
public class SqlServerSequenceMaxValueIncrementer extends AbstractSequenceMaxValueIncrementer {
SqlServerSequenceMaxValueIncrementer(DataSource dataSource, String incrementerName) {
super(dataSource, incrementerName);
}
@Override
protected String getSequenceQuery() {
return "select next value for " + getIncrementerName();
}
}
批处理配置器
在您的配置中创建您自己的BatchConfigurer
以利用上面显示的增量器:
复制
@Bean
public BatchConfigurer batchConfigurer(DataSource dataSource) {
return new DefaultBatchConfigurer(dataSource) {
protected JobRepository createJobRepository() {
return getJobRepository();
}
@Override
public JobRepository getJobRepository() {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
DefaultDataFieldMaxValueIncrementerFactory incrementerFactory =
new DefaultDataFieldMaxValueIncrementerFactory(dataSource) {
@Override
public DataFieldMaxValueIncrementer getIncrementer(String incrementerType, String incrementerName) {
return getIncrementerForApp(incrementerName);
}
};
factory.setIncrementerFactory(incrementerFactory);
factory.setDataSource(dataSource);
factory.setTransactionManager(this.getTransactionManager());
factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
try {
factory.afterPropertiesSet();
return factory.getObject();
}
catch (Exception exception) {
exception.printStackTrace();
}
return null;
}
private DataFieldMaxValueIncrementer getIncrementerForApp(String incrementerName) {
DefaultDataFieldMaxValueIncrementerFactory incrementerFactory = new DefaultDataFieldMaxValueIncrementerFactory(dataSource);
DataFieldMaxValueIncrementer incrementer = null;
if (dataSource != null) {
String databaseType;
try {
databaseType = DatabaseType.fromMetaData(dataSource).name();
}
catch (MetaDataAccessException e) {
throw new IllegalStateException(e);
}
if (StringUtils.hasText(databaseType) && databaseType.equals("SQLSERVER")) {
if (!isSqlServerTableSequenceAvailable(incrementerName)) {
incrementer = new SqlServerSequenceMaxValueIncrementer(dataSource, incrementerName);
}
}
}
if (incrementer == null) {
try {
incrementer = incrementerFactory.getIncrementer(DatabaseType.fromMetaData(dataSource).name(), incrementerName);
}
catch (Exception exception) {
exception.printStackTrace();
}
}
return incrementer;
}
private boolean isSqlServerTableSequenceAvailable(String incrementerName) {
boolean result = false;
DatabaseMetaData metaData = null;
try {
metaData = dataSource.getConnection().getMetaData();
String[] types = {"TABLE"};
ResultSet tables = metaData.getTables(null, null, "%", types);
while (tables.next()) {
if (tables.getString("TABLE_NAME").equals(incrementerName)) {
result = true;
break;
}
}
}
catch (SQLException sqe) {
sqe.printStackTrace();
}
return result;
}
};
}
**注意:**创建的隔离级别已设置为ISOLATION_REPEATABLE_READ
防止在批处理表中创建条目时出现死锁。
依赖项
要求您使用 Spring Cloud Task 2.3.3 或更高版本。这是因为 Spring Cloud Task 2.3.3 将使用一个TASK_SEQ
可用的序列。
更多推荐
所有评论(0)