SpringBatch从入门到精通-3.3-并行处理-远程分区(消息聚合)【掘金日新计划】

1,098 阅读9分钟

持续创作,加速成长,6月更文活动来啦!| 掘金·日新计划

持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第5天,点击查看活动详情

SpringBatch从入门到精通-1【掘金日新计划】

SpringBatch从入门到精通-2-StepScope作用域和用法【掘金日新计划】

SpringBatch从入门到精通-3-并行处理【掘金日新计划】

SpringBatch从入门到精通-3.2-并行处理-远程分区【掘金日新计划】

1. 背景

上章讲到了并行处理-远程分区相关内容。但远程分区还是讲的不够细。今天分析一下消息聚合模式。消息是如何聚合的。

  • SpringIntegration分区方式

  • 任务拆分原理

  • SpringIntegration相关概念

  • 生产者是如何聚合完成状态的

    • 数据库模式
    • 队列模式

2. 分区方式

image-20220608200748931

image-20220608200800202

3. 任务拆分原理

  • 实现接口org.springframework.batch.core.partition.support.Partitioner方法
Map<String, ExecutionContext> partition(int gridSize)

分区的关键参数。最后以不同ExecutionContext在不同的分区执行。executionContext 存在数据库中

image-20220608201004393

  • MessageChannelPartitionHandler* 拆分任务总入口*

image-20220608215347432

1.消息发送(MessageProducerSupport.send)

2.接收work消息方式(1.从数据库查询的方式 2.接收反馈replies消息的形式)

3.配置超时时间 用以防止worker进程异常导致master一直无响应持续等待

4.设置分区大小(GridSize)最后调用SimpleStepExecutionSplitter 拆分分区并将分区数据入库

5.setPollInterval(5000) 为判断任务是否结束的标志。Master端需要轮询(或者通过队列监听)判断任务是否完成。

6.setJobExplorer用来设置,判断结束方式,如果设置了走数据库轮询方式。未设置需要设置replys度队列来监听结果

7.setTimeOut 用来设置worker多长时间未执行完,master就认为失败的配置

  • 任务拆分入库发送消息

image-20220608220152727

通过 MessageChannelPartitionHandler.handle(StepExecutionSplitter stepExecutionSplitter, final StepExecution masterStepExecution) 将分区任务发送到消息队列(MessageTemplate.send)

image-20220608220519988

通过SimpleStepExecutionSplitter.split(StepExecution stepExecution, int gridSize)将任务拆分成分区任务,并且分区内容入库

  • 消息实体

image-20220608220545911

GenericMessage [payload=StepExecutionRequest: [jobExecutionId=16, stepExecutionId=649, stepName=slaveStep], headers={sequenceNumber=0, correlationId=16:slaveStep, id=eaef0527-8999-9c21-5688-05d82a2290fc, sequenceSize=9, timestamp=1614217918059}]

batch_step_execution_context 表中存取具体的分区内容参数short_context字段中

image-20220608220607461

  • 接收到消息之后的处理* ChannelAwareMessageListener*

image-20220608221044036

StepExecutionRequestHandler.handle接收到消息。然后通过jobExecutionId,stepExecuutionId获取StepExecution

4 .SpringIntegration相关概念

  • 集成总流程(65种消息集成方式)

image-20220608221438043

  • 消息体

    image-20220608221543858

“消息”是对任何Java对象的一种通用包装,这种包装将会给Java对象附着一些元信息以供消息框架处理。一条“消息”由“消息体”(payload)和“消息头”(header)组成。

消息体可以是任何类型,消息头一般用于保存一些必要信息,比如id、时间戳、过期时间和返回地址等。消息头也可以用来在不同的传输协议之间传递参数。

比如,当需要包装一个文件来创建一个消息时,可以将文件名保存于消息头中,以供下游的消息组件读取使用。

比如,如果一个消息的内容会最终被Mail适配器发出,那么各种属性值(to、from、cc、subject等)可被上游的消息组件保存在消息头中。开发者可以利用消息头来保存任意的键值对。

  • 消息通道

image-20220608221616146

“消息通道”对应着“管道-过滤器”架构中的“管道角色”。消息生产者发送消息到通道,消息消费者从通道接收消息。从而,消息通道解耦了消息组件,同时也为消息拦截和监控提供了便利的切入点。

一个消息通道可以是“点对点”意义的,或者也可以是“发布-订阅”意义的。

u如果是点对点模式的通道,发布到通道中的每个消息,最多只有一个消费者可以接收。

u如果是发布-订阅模式的通道,则会尝试广播消息给其所有的订阅者。

Spring Integration对这两种模式均提供支持。

鉴于“点对点模式”和“发布订阅模式”提供了两种关于“最终有多少消息消费者接收消息”的选择,此处还有另外一项重要考虑:通道是否应该缓存消息?在Spring Integration中,轮询通道(Pollable Channels)具备缓存消息能力。缓存的优势在于它能够调节接入消息流量,从而避免消息消费者负荷过载。然而,正如其名称所示,这也会引入了一些复杂性,只有配置了轮询器后,消息消费者才能从这个通道中接收消息。而另一方面,订阅通道(Subscribable Channel)要求连接它的消费者依从简单的消息驱动模式。

  • 消息终端

    img

转换器(Transformer) 消息转换器”的作用在于“转换消息的内容或结构,返回翻转换后的消息”。可能最为常见的转换器应用方式就是将消息体(Message Payload)从一种格式转换为另一种格式(例如从XML文档转换成java.lang.String字符串)。同样地,转换器也可以被用于添加、删除和修改消息头(Message Header)中的值。

过滤器(Filter) 消息过滤器”可用来限定消息能否被传送到输出通道上。这里仅需要依据一个布尔测试来做判定。该布尔测试检查的范围诸如:“消息是否包含特定类型的消息体”,“检查消息中的某个属性值”,“消息是否包含某个消息头”,等等。如果消息通过检查就会被发送到输出通道,否则,消息将被丢弃(或者更加严格地说,应当抛出异常)。消息过滤器通常结合“发布-订阅通道”一起使用,“发布-订阅通道”会使得多个消费者接收到同样的消息,而基于一定的过滤条件设置过滤器则可以减少所需处理的消息数量。

路由器(Router) 消息路由器”负责确定消息的下一步传送将由哪些通道接收。通常,路由决策都是基于消息内容和(或)消息头中可用的元数据来完成的。消息路由常常代替静态的配置,作为一种动态地、运行时确定输出通道的装置,接入到“服务激活器”或者其他能够响应消息的装置。另外,针对前文所述的消息多播的场景,相对于被动的“消息过滤器”,“消息路由器”提供了一种主动的控制方式,来确定多个消息订阅者中的消息接收范围。

img

消息聚合器(Aggregator): 消息聚合器”基本上就是“消息分解器”的反模式。它也是一种消息终端类型,接收多条消息,然后把他们合并成一条消息。事实上,聚合器通常出现在消息管道线路中的下游位置,且相对该组件的上游位置往往会存在“消息分解器”。从技术上来说,聚合器往往比分解器更复杂,因为它需要维护状态(也就是维护正被聚合的消息),确定被聚合的整组消息何时可用,以及必要地话,还要处理超时的状况。更进一步地,在超时的情况下,聚合器还要明确是仍然发送残缺消息,或是丢弃它们。对此,Spring Integration提供了可配置的超时处理策略。

服务激活器(ServiceActivitor): 服务激活器”是一种将“服务实例”连接到“消息系统”的通用终端。对于该类型的终端,配置输入通道是必须的。而且,若被调用的服务方法具有返回值,那么此情况下或需要配置输出通道。服务激活器会调用指定服务对象上的操作,来处理请求消息。该过程中会抽取请求消息的消息体并作必要的转换(若方法参数非消息类型参数)。每当服务方法产生返回值,这个返回值同样地会作出必要的转换,而成为一条响应消息(若方法返回值非消息类型)。响应消息将会被发送到输出通道上;若没有配置输出通道,且消息的“返回地址”可用,那么该响应将会被发送到返回地址指定的通道上。

img

消息分解器(Splitter): 消息分解器”是另外一种类型的消息终端,它从对应输入通道中接收消息,然后把接收到的一个消息分解成多个消息,最终把它们发送到对应输出通道上。典型应用场景就是把一个“复合消息”分解成包含原消息各子部分的一组“子消息”

通道适配器(Channel Adapter): ”通道适配器”是一种连接消息通道到“其他系统”或是“传输端口”的消息终端。通道适配器分为“接入”或者“接出”两种。通常通道适配器被用来映射消息到其他任何发送/接收系统所需的对象或资源上(比如:文件、HTTP请求,JMS消息等等)。依赖于传输端口的情况下,通道适配器也可以填充或是抽取消息头中的值。

5. 生产者是如何聚合完成状态的

  • 数据库模式

img

MessageChannelPartitionHandler#handle方法

image-20220608224858277

MessageChannelPartitionHandler#pollRepliles方法

Poller<Collection<StepExecution>> poller = new DirectPoller<>(pollInterval);
        Future<Collection<StepExecution>> resultsFuture = poller.poll(callback);
​
        if(timeout >= 0) {
            return resultsFuture.get(timeout, TimeUnit.MILLISECONDS);
        }
        else {
            return resultsFuture.get();
        }

通过轮询的方式来查询数据库stepExecution状态

  • 队列模式

    image-20220608225223739

    解释: 在build过程中,将inputChannel的数据(监听消息reply队列的数据进行聚合,通过调用aggregate然后把数据发送给通道replies整个过程算结束。)

    image-20220608225252273

    Aggregate是split的反过程。Split由一个大人物拆成3个小任务。Aggregate是由三个小任务组合成一个大任务list(并配置超时时间)

代码位置: github.com/jackssybin/…

\