集群任务限流

为避免负载过重,控制数据消费的速率,集群任务提供了动态和静态两种流控能力。

限流原理

1621487022318-e23ce1d5-58fc-46d7-bc63-f75d282b6b79如上图所示,一个 Chunk(分片)的执行分为如下三个阶段,这三个阶段构建成一个循环。

  1. Read 阶段

    Reader 读取一批数据,读取数据接口返回数据列表和后续是否有新的数据标志。

    Reader 根据 limiter 的限流速率向 Processor 推送数据。

  2. Process 阶段

    Processor 接收到数据后,通过多线程的方式对这批数据进行处理。处理后通过队列推送至 Writer。

  3. Write 阶段

    Writer 接收到数据后,通过多线程的方式对数据进行写操作。数据写操作完成之后,Writer 根据是否有新数据标志进行判断是否有数据,如果还有数据,则会继续读取。

限流配置

为避免 Process 阶段和 Write 阶段线程处理过程复杂,数据存放到 Process 阶段的队列时会增加一个限流器,控制数据读取和存放的速率。任务调度提供了两种限流类型:

静态限流

静态限流通过代码配置限流规则,并可通过任务调度控制台动态调整限流速率。优势是在页面没有配置限流的情况下,也可以通过限流规则的初始值进行限速,但需要您手动指定限流器。

框架提供了默认的限流器 DefaultLimiter,指定限流器示例如下:

public class ClusterJobExecuteOneHandler implements IClusterJobExecuteHandler<Integer, Integer> {

    ... // 省略

    @Override
    public ILimiter getLimiter() {
        // 每秒处理 10 条数据。
        return new DefaultLimiter(10);
    }
   
    ...   // 省略
}

如果此处设置为 null,且任务调度控制台没有设置限流规则,则表示不限速。

如果默认的限流器 DefaultLimiter 无法满足您的需求,例如需要根据业务逻辑、CPU、内存、IO 等方式进行限流时,限流模式提供了扩展接口,支持您自定义限流规则。您可以实现 ILimiter 接口,并在 getLimiter() 方法中返回实例。ILimiter 接口说明如下:

public interface ILimiter {

    /**
     * 设置限流速率,当 permitsPerSecond 为 NULL 或 ≤ 0 时表示不限速。
     *
     * @param permitsPerSecond
     */
    void setRate(Integer permitsPerSecond);

    /**
     * 获取许可,阻塞直到获取到许可。
     *
     * @return
     */
    void acquire();
}
  • setRate() 方法: 当管控出现一些配置变更时,框架会调用该方法告知用户。 如果管控没有配置,则 permitsPerSecond 为 null。这部分的实现可以参考 DefaultLimiter 的实现逻辑。

  • acquire() 方法:获取许可,如果不允许则阻塞。

如果想快速实现自定义限流器,可以参考 DefaultLimiter 的实现。

动态限流

动态限流通过任务调度控制台调整限流速率,不管是否提供限流器,动态限流都可以正常工作。 如果没有指定限流器,任务调度客户端使用默认的 DefaultLimiter 限流器。 配置方式有以下两种:

  • 全局生效

    在配置或编辑任务调度时配置的限流规则全局生效,配置生效后,每次任务都会触发限速规则。

    配置任务调度的步骤请参见 创建调度任务。其中,高级选项需开启,且配置 单台机器最大处理速率。如图所示:边框

  • 本次触发生效

    您可以在任务执行前或执行阶段对本次任务进行限速,操作如下:

    1. 登录 SOFAStack 控制台

    2. 在左侧导航栏选择 中间件 > 任务调度 > 任务配置

    3. 单击目标任务名称,然后单击 调度记录 页签。

    4. 在左侧选择正在进行的任务,然后单击 处理数据量 右侧的编辑图标。版本号

    5. 设置指定的处理速率后,单击对号图标。

      限速规则设置完成后立刻生效,客户端会按指定的速率执行任务。