为避免负载过重,控制数据消费的速率,集群任务提供了动态和静态两种流控能力。
限流原理
如上图所示,一个 Chunk(分片)的执行分为如下三个阶段,这三个阶段构建成一个循环。
Read 阶段
Reader 读取一批数据,读取数据接口返回数据列表和后续是否有新的数据标志。
Reader 根据 limiter 的限流速率向 Processor 推送数据。
Process 阶段
Processor 接收到数据后,通过多线程的方式对这批数据进行处理。处理后通过队列推送至 Writer。
Write 阶段
Writer 接收到数据后,通过多线程的方式对数据进行写操作。数据写操作完成之后,Writer 根据是否有新数据标志进行判断是否有数据,如果还有数据,则会继续读取。
限流配置
为避免 Process 阶段和 Write 阶段线程处理过程复杂,数据存放到 Process 阶段的队列时会增加一个限流器,控制数据读取和存放的速率。任务调度提供了两种限流类型:
静态限流
静态限流通过代码配置限流规则,并可通过任务调度控制台动态调整限流速率。优势是在页面没有配置限流的情况下,也可以通过限流规则的初始值进行限速,但需要您手动指定限流器。
框架提供了默认的限流器 DefaultLimiter,指定限流器示例如下:
public class ClusterJobExecuteOneHandler implements IClusterJobExecuteHandler<Integer, Integer> {
... // 省略
@Override
public ILimiter getLimiter() {
// 每秒处理 10 条数据。
return new DefaultLimiter(10);
}
... // 省略
}
如果此处设置为 null,且任务调度控制台没有设置限流规则,则表示不限速。
如果默认的限流器 DefaultLimiter 无法满足您的需求,例如需要根据业务逻辑、CPU、内存、IO 等方式进行限流时,限流模式提供了扩展接口,支持您自定义限流规则。您可以实现 ILimiter
接口,并在 getLimiter()
方法中返回实例。ILimiter
接口说明如下:
public interface ILimiter {
/**
* 设置限流速率,当 permitsPerSecond 为 NULL 或 ≤ 0 时表示不限速。
*
* @param permitsPerSecond
*/
void setRate(Integer permitsPerSecond);
/**
* 获取许可,阻塞直到获取到许可。
*
* @return
*/
void acquire();
}
setRate()
方法: 当管控出现一些配置变更时,框架会调用该方法告知用户。 如果管控没有配置,则permitsPerSecond
为 null。这部分的实现可以参考 DefaultLimiter 的实现逻辑。acquire()
方法:获取许可,如果不允许则阻塞。
如果想快速实现自定义限流器,可以参考 DefaultLimiter 的实现。
动态限流
动态限流通过任务调度控制台调整限流速率,不管是否提供限流器,动态限流都可以正常工作。 如果没有指定限流器,任务调度客户端使用默认的 DefaultLimiter 限流器。 配置方式有以下两种:
全局生效
在配置或编辑任务调度时配置的限流规则全局生效,配置生效后,每次任务都会触发限速规则。
配置任务调度的步骤请参见 创建调度任务。其中,高级选项需开启,且配置 单台机器最大处理速率。如图所示:
本次触发生效
您可以在任务执行前或执行阶段对本次任务进行限速,操作如下:
登录 SOFAStack 控制台。
在左侧导航栏选择 中间件 > 任务调度 > 任务配置。
单击目标任务名称,然后单击 调度记录 页签。
在左侧选择正在进行的任务,然后单击 处理数据量 右侧的图标。
设置指定的处理速率后,单击图标。
限速规则设置完成后立刻生效,客户端会按指定的速率执行任务。