使用 Spring Batch 进行批处理

在本指南中,我们开发了一个 Spring Batch 应用程序并将其部署到 Cloud Foundry、Kubernetes 和您的本地计算机。在另一个指南中,我们使用 Data Flow部署Spring Batch 应用程序。

本指南介绍了如何从头开始构建此应用程序。如果您愿意,您可以下载一个包含billsetup应用程序源的 zip 文件,将其解压缩,然后继续进行部署步骤。

您可以从浏览器下载项目或运行以下命令从命令行下载它:

wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/master/dataflow-website/batch-developer-guides/batch/batchsamples/dist/batchsamples.zip?raw=true -O batchsamples.zip
开发

我们从Spring Initializr开始并创建一个 Spring Batch 应用程序。

假设手机数据提供商需要为客户创建账单。使用数据存储在存储在文件系统上的 JSON 文件中。计费解决方案必须从这些文件中提取数据,从该使用数据生成计费数据,并将其存储在BILL_STATEMENTS表中。

我们可以在使用 Spring Batch 的单个 Spring Boot 应用程序中实现整个解决方案。但是,对于此示例,我们将解决方案分为两个阶段:

  1. billsetuptask: 该billsetuptask应用程序是一个使用 Spring Cloud Task 创建BILL_STATEMENTS表的 Spring Boot 应用程序。
  2. billrun:该billrun应用程序是一个 Spring Boot 应用程序,它使用 Spring Cloud Task 和 Spring Batch 从 JSON 文件中读取每一行的使用数据和价格,并将结果数据放入BILL_STATEMENTS表中。

对于本节,我们创建一个 Spring Cloud Task 和 Spring Batchbillrun应用程序,它从一个 JSON 文件中读取使用信息,该文件包含每个条目的客户使用数据和价格,并将结果放入BILL_STATEMENTS表中。

下图显示了该BILL_STATEMENTS表:

BILL_STATMENTS

介绍 Spring Batch

Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发健壮的批处理应用程序。Spring Batch 通过提供以下功能,提供了处理大量记录所必需的可重用功能:

  • 记录/跟踪
  • 基于块的处理
  • 声明式 I/O
  • 启动/停止/重启
  • 重试/跳过
  • 资源管理

它还提供更高级的技术服务和功能,通过优化和分区技术实现超大容量和高性能的批处理作业。

对于本指南,我们将重点介绍五个 Spring Batch 组件,如下图所示:

BILL_STATMENTS

  • Job:AJob是封装了整个批处理过程的实体。一项工作由一个或多个steps.
  • Step: AStep是一个域对象,它封装了批处理作业的独立、顺序阶段。每个都由step一个ItemReader、一个ItemProcessor和一个组成ItemWriter
  • ItemReader:ItemReader是一种抽象,表示一次检索Step一个项目的输入。
  • ItemProcessor:ItemProcessor是一个抽象,代表一个项目的业务处理。
  • ItemWriter:ItemWriter是一个抽象,表示 a 的输出Step

在上图中,我们看到每个阶段JobExecution都存储在一个JobRepository(在本例中是我们的 MySQL 数据库)中。这意味着 Spring Batch 执行的每个操作都记录在数据库中,用于日志记录和重新启动作业。

注意:您可以在此处阅读有关此过程的更多信息。

我们的批量作业

因此,对于我们的应用程序,我们有一个BillRun Jobthat has one Step,其中包括:

  • JsonItemReader:ItemReader读取包含使用数据的 JSON 文件。
  • BillProcessorItemProcessor根据从 发送的每行数据生成价格的JsonItemReader
  • JdbcBatchItemWriter:ItemWriter将定价Bill记录写入BILLING_STATEMENT表中。
初始化

我们使用Spring Initializr来创建我们的应用程序。为此:

  1. 访问Spring Initializr 站点
  2. 选择最新版本的 Spring Boot。
  3. io.spring使用 Group 名称和 Artifact 名称创建一个新的 Maven 项目billrun
  4. 依赖项文本框中,键入task以选择 Cloud Task 依赖项。
  5. 依赖项文本框中,键入jdbc并选择 JDBC 依赖项。
  6. 依赖项文本框中,键入h2并选择 H2 依赖项。我们使用 H2 进行单元测试。
  7. Dependencies文本框中,键入mysql并选择 MySQL 依赖项(或您喜欢的数据库)。我们使用 MySQL 作为运行时数据库。
  8. Dependencies文本框中,键入batch然后选择 Batch。
  9. 单击生成项目按钮。
  10. 解压缩billrun.zip文件并将项目导入您喜欢的 IDE。

或者,您可以通过下载预构建文件来初始化您的项目。为此:

  1. 单击此 Spring Initializr 链接以下载预配置billrun.zip文件。
  2. 解压缩 billrun.zip 文件并将项目导入您喜欢的 IDE。
设置 MySQL

如果您没有可用的 MySQL 实例,您可以按照以下说明运行此示例的 MySQL Docker 映像:

  1. 通过运行以下命令来拉取 MySQL Docker 映像:

    复制

    docker pull mysql:5.7.25
    
  2. 通过运行以下命令启动 MySQL:

    复制

    docker run -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=password \
    -e MYSQL_DATABASE=task -d mysql:5.7.25
    
构建应用程序
  1. 下载https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/dataflow-website/batch-developer-guides/batch/batchsamples/billrun/src/main/resources/usageinfo.json title=usageinfo.json生成的文件并将其复制到/src/main/resources目录中。

  2. 下载https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/dataflow-website/batch-developer-guides/batch/batchsamples/billrun/src/main/resources/schema.sql title=schema.sql生成的文件并将其复制到/src/main/resources目录中。

  3. 在您最喜欢的 IDE 中,创建io.spring.billrun.model包。

  4. Usage在 中创建一个io.spring.billrun.model类似于Usage.java内容的类。

  5. Bill在其中创建一个io.spring.billrun.model类似于Bill.java内容的类。

  6. 在您最喜欢的 IDE 中,创建io.spring.billrun.configuration包。

  7. 为每条记录ItemProcessor定价。Usage为此,请在如下所示的清单中创建一个BillProcessor类:io.spring.billrun.configuration

    复制

    public class BillProcessor implements ItemProcessor<Usage, Bill> {
    
      @Override
      public Bill process(Usage usage) {
         Double billAmount = usage.getDataUsage() * .001 + usage.getMinutes() * .01;
         return new Bill(usage.getId(), usage.getFirstName(), usage.getLastName(),
               usage.getDataUsage(), usage.getMinutes(), billAmount);
      }
    }
    

    请注意,我们实现了具有我们需要重写ItemProcessor的方法的接口。process我们的参数是一个Usage对象,返回值是类型Bill

  8. 现在我们可以创建一个 Java 配置来指定BillRun Job. 在这种情况下,我们需要BillingConfiguration在包中创建一个类,io.spring.billrun.configuration如下所示:

    复制

    @Configuration
    @EnableTask
    @EnableBatchProcessing
    public class BillingConfiguration {
      @Autowired
      public JobBuilderFactory jobBuilderFactory;
    
      @Autowired
      public StepBuilderFactory stepBuilderFactory;
    
      @Value("${usage.file.name:classpath:usageinfo.json}")
      private Resource usageResource;
    
      @Bean
      public Job job1(ItemReader<Usage> reader,
        ItemProcessor<Usage,Bill> itemProcessor, ItemWriter<Bill> writer) {
          Step step = stepBuilderFactory.get("BillProcessing")
                  .<Usage, Bill>chunk(1)
                  .reader(reader)
                  .processor(itemProcessor)
                  .writer(writer)
                  .build();
    
          return jobBuilderFactory.get("BillJob")
                  .incrementer(new RunIdIncrementer())
                  .start(step)
                  .build();
      }
    
      @Bean
      public JsonItemReader<Usage> jsonItemReader() {
    
          ObjectMapper objectMapper = new ObjectMapper();
          JacksonJsonObjectReader<Usage> jsonObjectReader =
                  new JacksonJsonObjectReader<>(Usage.class);
          jsonObjectReader.setMapper(objectMapper);
    
          return new JsonItemReaderBuilder<Usage>()
                  .jsonObjectReader(jsonObjectReader)
                  .resource(usageResource)
                  .name("UsageJsonItemReader")
                  .build();
      }
    
      @Bean
      public ItemWriter<Bill> jdbcBillWriter(DataSource dataSource) {
          JdbcBatchItemWriter<Bill> writer = new JdbcBatchItemWriterBuilder<Bill>()
                          .beanMapped()
                  .dataSource(dataSource)
                  .sql("INSERT INTO BILL_STATEMENTS (id, first_name, " +
                     "last_name, minutes, data_usage,bill_amount) VALUES " +
                     "(:id, :firstName, :lastName, :minutes, :dataUsage, " +
                     ":billAmount)")
                  .build();
          return writer;
      }
    
      @Bean
      ItemProcessor<Usage, Bill> billProcessor() {
          return new BillProcessor();
      }
    }
    

    @EnableBatchProcessing注释启用 Spring Batch 功能并提供用于设置批处理作业的基本配置。@EnableTask注释设置了一个,它存储了有关任务执行的TaskRepository信息(例如任务的开始和结束时间以及退出代码)。在前面的配置中,我们看到我们的ItemReaderbean 是JsonItemReader. 该JsonItemReader实例读取资源的内容并将 JSON 数据解组为Usage对象。JsonItemReaderItemReaderSpring Batch 提供的实现之一。我们还看到我们的ItemWriterbean 是JdbcBatchItemWriter. 该JdbcBatchItemWriter实例将结果写入我们的数据库。JdbcBatchItemWriterItemWriterSpring Batch 提供的实现之一。这ItemProcessor是我们自己的BillProcessor. 请注意,所有使用 Spring Batch 提供的类 ( Job, Step, ItemReader, ItemWriter) 的 bean 都是使用 Spring Batch 提供的构建器构建的,这意味着更少的编码。

测试

现在我们已经编写了代码,是时候编写测试了。在这种情况下,我们要确保账单信息已正确插入到BILLING_STATEMENTS表中。要创建您的测试,请更新BillrunApplicationTests.java,使其类似于以下清单:

复制

package io.spring.billrun;

import io.spring.billrun.model.Bill;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
public class BillRunApplicationTests {

	@Autowired
	private DataSource dataSource;

	private JdbcTemplate jdbcTemplate;

	@BeforeEach
	public void setup() {
		this.jdbcTemplate = new JdbcTemplate(this.dataSource);
	}

	@Test
	public void testJobResults() {
		List<Bill> billStatements = this.jdbcTemplate.query("select id, " +
						"first_name, last_name, minutes, data_usage, bill_amount " +
						"FROM bill_statements ORDER BY id",
				(rs, rowNum) -> new Bill(rs.getLong("id"),
						rs.getString("FIRST_NAME"), rs.getString("LAST_NAME"),
						rs.getLong("DATA_USAGE"), rs.getLong("MINUTES"),
						rs.getDouble("bill_amount")));

		assertThat(billStatements.size()).isEqualTo(5);
		Bill billStatement = billStatements.get(0);
		assertThat(billStatement.getBillAmount()).isEqualTo(6.0);
		assertThat(billStatement.getFirstName()).isEqualTo("jane");
		assertThat(billStatement.getLastName()).isEqualTo("doe");
		assertThat(billStatement.getId()).isEqualTo(1);
		assertThat(billStatement.getMinutes()).isEqualTo(500);
		assertThat(billStatement.getDataUsage()).isEqualTo(1000);

	}
}

对于此测试,我们使用JdbcTemplate执行查询并检索billrun. 查询运行后,我们将验证表第一行中的数据是否符合我们的预期。

本地部署

在本节中,我们将部署到本地机器、Cloud Foundry 和 Kubernetes。

现在我们可以构建项目了。

  1. 从命令行,将目录更改为项目的位置,并通过运行以下 Maven 命令构建项目./mvnw clean package

  2. 使用处理数据库中的使用信息所需的配置运行应用程序。

    要配置billrun应用程序,请使用以下参数:

    • spring.datasource.url:将 URL 设置为您的数据库实例。在以下示例中,我们通过task端口 3306 连接到本地计算机上的 MySQL 数据库。
    • spring.datasource.username:用于 MySQL 数据库的用户名。在下面的示例中,它是root.
    • spring.datasource.password:用于 MySQL 数据库的密码。在下面的示例中,它是password.
    • spring.datasource.driverClassName: 用于连接 MySQL 数据库的驱动程序。在下面的示例中,它是com.mysql.jdbc.Driver.
    • spring.datasource.initialization-mode:使用此应用程序所需的BILL_STATEMENTS和表初始化数据库。BILL_USAGE在下面的示例中,我们声明我们always想要这样做。这样做不会覆盖已经存在的表。
    • spring.batch.initialize-schema:使用 Spring Batch 所需的表初始化数据库。在下面的示例中,我们声明我们always想要这样做。这样做不会覆盖已经存在的表。

    复制

    java -jar target/billrun-0.0.1-SNAPSHOT.jar \
    --spring.datasource.url=jdbc:mysql://localhost:3306/task?useSSL=false \
    --spring.datasource.username=root \
    --spring.datasource.password=password \
    --spring.datasource.driverClassName=com.mysql.jdbc.Driver \
    --spring.datasource.initialization-mode=always \
    --spring.batch.initialize-schema=always
    
  3. 登录mysql容器查询BILL_STATEMENTS表。为此,请运行以下命令:

复制

$ docker exec -it mysql bash -l
# mysql -u root -ppassword
mysql> select * from task.BILL_STATEMENTS;

输出应如下所示:

ID分钟数据使用账单金额
1母鹿50010006.00
2约翰母鹿55015007.00
3梅丽莎史密斯60015507.55
4迈克尔史密斯65015008.00
5玛丽琼斯70015008.50

清理

要停止和删除在 Docker 实例中运行的 MySQL 容器,请运行以下命令:

复制

docker stop mysql
docker rm mysql
Kubernetes

本部分将引导您billrun在 Kubernetes 上运行应用程序。

设置 Kubernetes 集群

对于这个例子,我们需要一个正在运行的Kubernetes 集群,并且我们部署到minikube.

验证 Minikube 是否正在运行

要验证 Minikube 是否正在运行,请运行以下命令(显示其输出):

复制

minikube status
host: Running
kubelet: Running
apiserver: Running
kubectl: Correctly Configured: pointing to minikube-vm at 192.168.99.100
安装数据库

我们使用 Spring Cloud Data Flow 的默认配置安装 MySQL 服务器。为此,请运行以下命令:

复制

kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.9.1/src/kubernetes/mysql/mysql-deployment.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.9.1/src/kubernetes/mysql/mysql-pvc.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.9.1/src/kubernetes/mysql/mysql-secrets.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.9.1/src/kubernetes/mysql/mysql-svc.yaml
为示例任务应用程序构建 Docker 映像

我们需要为billrun应用程序构建一个 Docker 映像。

为此,我们使用jib maven 插件。如果您下载了源代码分发版,则 jib 插件已配置。如果您从头开始构建应用程序,请在pluginsin下添加以下内容pom.xml

复制

<plugin>
    <groupId>com.google.cloud.tools</groupId>
    <artifactId>jib-maven-plugin</artifactId>
    <version>0.10.1</version>
    <configuration>
        <from>
            <image>springcloud/openjdk</image>
        </from>
        <to>
            <image>${docker.org}/${project.artifactId}:${docker.version}</image>
        </to>
        <container>
            <useCurrentTimestamp>true</useCurrentTimestamp>
        </container>
    </configuration>
</plugin>

然后在下面添加引用的属性properties对于本例,我们使用以下属性:

复制

<docker.org>springcloudtask</docker.org>
<docker.version>${project.version}</docker.version>

现在您可以运行以下命令将映像添加到minikubeDocker 注册表:

复制

eval $(minikube docker-env)
./mvnw clean package jib:dockerBuild

运行以下命令以验证其存在(通过springcloudtask/billrun在图像列表中查找):

复制

docker images

Spring Cloud Data Flow 已经测试了由Spring Boot 的 gradle/maven 插件jib maven 插件docker build命令创建的容器。

部署应用程序

部署批处理应用程序的最简单方法是作为独立Pod。将批处理应用程序部署为作业CronJob被认为是生产环境的最佳实践,但超出了本指南的范围。

将以下内容保存到batch-app.yaml

复制

apiVersion: v1
kind: Pod
metadata:
  name: billrun
spec:
  restartPolicy: Never
  containers:
    - name: task
      image: springcloudtask/billrun:0.0.1-SNAPSHOT
      env:
        - name: SPRING_DATASOURCE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: mysql
              key: mysql-root-password
        - name: SPRING_DATASOURCE_URL
          value: jdbc:mysql://mysql:3306/task
        - name: SPRING_DATASOURCE_USERNAME
          value: root
        - name: SPRING_DATASOURCE_DRIVER_CLASS_NAME
          value: com.mysql.jdbc.Driver
  initContainers:
    - name: init-mysql-database
      image: mysql:5.6
      env:
        - name: MYSQL_PWD
          valueFrom:
            secretKeyRef:
              name: mysql
              key: mysql-root-password
      command:
        [
          'sh',
          '-c',
          'mysql -h mysql -u root -e "CREATE DATABASE IF NOT EXISTS task;"',
        ]

要启动应用程序,请运行以下命令:

复制

kubectl apply -f batch-app.yaml

任务完成后,您应该会看到类似于以下内容的输出:

复制

kubectl get pods
NAME                     READY   STATUS      RESTARTS   AGE
mysql-5cbb6c49f7-ntg2l   1/1     Running     0          4h
billrun                  0/1     Completed   0          10s

现在您可以删除 pod。为此,请运行以下命令:

复制

kubectl delete -f batch-app.yaml

现在登录mysql容器查询BILL_STATEMENTS表。使用获取mysqlpod的名称kubectl get pods,如前所示。然后登录查询BILL_STATEMENTS表,如下:

复制

kubectl exec -it mysql-5cbb6c49f7-ntg2l -- /bin/bash
# mysql -u root -p$MYSQL_ROOT_PASSWORD
mysql> select * from task.BILL_STATEMENTS;

输出应如下所示:

ID分钟数据使用账单金额
1母鹿50010006.00
2约翰母鹿55015007.00
3梅丽莎史密斯60015507.55
4迈克尔史密斯65015008.00
5玛丽琼斯70015008.50

要卸载mysql,请运行以下命令:

复制

kubectl delete all -l app=mysql
数据库特定说明
微软 SQL 服务器

在同时启动多个 Spring Batch 应用程序时,使用 Spring Batch 4.x 和更早版本以及 Microsoft SQL Server 数据库,您可能会收到来自数据库的死锁。此问题已在此问题中报告。一种解决方案是创建序列而不是表并创建一个BatchConfigurer来使用它们。
删除下表并将它们替换为使用相同名称的序列:

  • BATCH_STEP_EXECUTION_SEQ
  • BATCH_JOB_EXECUTION_SEQ
  • BATCH_JOB_SEQ

**注意:**确保将每个序列值设置为表的当前值id+ 1。

一旦表被序列替换,然后更新批处理应用程序以覆盖BatchConfigurer,这样它将使用自己的增量器。以下部分显示了此实现的一个示例:

增量器

创建自己的增量器:

复制

import javax.sql.DataSource;

import org.springframework.jdbc.support.incrementer.AbstractSequenceMaxValueIncrementer;

public class SqlServerSequenceMaxValueIncrementer extends AbstractSequenceMaxValueIncrementer {

	SqlServerSequenceMaxValueIncrementer(DataSource dataSource, String incrementerName) {
		super(dataSource, incrementerName);
	}
	@Override
	protected String getSequenceQuery() {
		return "select next value for " + getIncrementerName();
	}
}
批处理配置器

在您的配置中创建您自己的BatchConfigurer以利用上面显示的增量器:

复制

@Bean
public BatchConfigurer batchConfigurer(DataSource dataSource) {
    return new DefaultBatchConfigurer(dataSource) {
        protected JobRepository createJobRepository() {
            return getJobRepository();
        }

        @Override
        public JobRepository getJobRepository() {
            JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
            DefaultDataFieldMaxValueIncrementerFactory incrementerFactory =
                    new DefaultDataFieldMaxValueIncrementerFactory(dataSource) {
                        @Override
                        public DataFieldMaxValueIncrementer getIncrementer(String incrementerType, String incrementerName) {
                            return getIncrementerForApp(incrementerName);
                        }
                    };
            factory.setIncrementerFactory(incrementerFactory);
            factory.setDataSource(dataSource);
            factory.setTransactionManager(this.getTransactionManager());
            factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
            try {
                factory.afterPropertiesSet();
                return factory.getObject();
            }
            catch (Exception exception) {
                exception.printStackTrace();
            }

            return null;
        }

        private DataFieldMaxValueIncrementer getIncrementerForApp(String incrementerName) {

            DefaultDataFieldMaxValueIncrementerFactory incrementerFactory = new DefaultDataFieldMaxValueIncrementerFactory(dataSource);
            DataFieldMaxValueIncrementer incrementer = null;
            if (dataSource != null) {
                String databaseType;
                try {
                    databaseType = DatabaseType.fromMetaData(dataSource).name();
                }
                catch (MetaDataAccessException e) {
                    throw new IllegalStateException(e);
                }
                if (StringUtils.hasText(databaseType) && databaseType.equals("SQLSERVER")) {
                    if (!isSqlServerTableSequenceAvailable(incrementerName)) {
                        incrementer = new SqlServerSequenceMaxValueIncrementer(dataSource, incrementerName);
                    }
                }
            }
            if (incrementer == null) {
                try {
                    incrementer = incrementerFactory.getIncrementer(DatabaseType.fromMetaData(dataSource).name(), incrementerName);
                }
                catch (Exception exception) {
                    exception.printStackTrace();
                }
            }
            return incrementer;
        }

        private boolean isSqlServerTableSequenceAvailable(String incrementerName) {
            boolean result = false;
            DatabaseMetaData metaData = null;
            try {
                metaData = dataSource.getConnection().getMetaData();
                String[] types = {"TABLE"};
                ResultSet tables = metaData.getTables(null, null, "%", types);
                while (tables.next()) {
                    if (tables.getString("TABLE_NAME").equals(incrementerName)) {
                        result = true;
                        break;
                    }
                }
            }
            catch (SQLException sqe) {
                sqe.printStackTrace();
            }
            return result;
        }
    };
}

**注意:**创建的隔离级别已设置为ISOLATION_REPEATABLE_READ防止在批处理表中创建条目时出现死锁。

依赖项

要求您使用 Spring Cloud Task 2.3.3 或更高版本。这是因为 Spring Cloud Task 2.3.3 将使用一个TASK_SEQ可用的序列。

Logo

开源、云原生的融合云平台

更多推荐