任务拆分和执行

集群任务支持用户按业务的要求,通过多层的拆分将一个任务拆分到多个客户端上并发执行。

集群任务的开发可以分成两个阶段:拆分阶段和执行阶段。

  • 拆分阶段:对数据进行分片,不限制拆分层数,将拆分结果上报给服务端,由服务端根据拆分的 chunk(一批待处理数据的索引集合)通知客户端来拉取数据进行处理。

  • 执行阶段:客户端接收到通知后拉取数据进行处理,处理完后继续拉取新的数据,直到数据都处理完成。

应用场景案例

为了便于理解,本文使用一个场景示例来介绍集群任务的开发过程。

假设某基金公司每天需要进行一次用户清算,由于用户规模较大,因此将用户分为 100 张表,每张表有 10 万左右的用户数据。该公司选择使用两层拆分的集群任务,通过集群任务并行处理能力提高数据处理效率。

具体的实现步骤如下:

  1. 任务拆分阶段:将用户数据进行拆分,详情请参见 集群任务拆分阶段

    • 第一层拆分:按用户表维度进行数据拆分。

    • 第二层拆分:按分页维度进行数据拆分。

  2. 任务执行阶段:对每个用户数据进行处理,详情请参见 集群任务执行阶段

    • 执行模式:支持本地执行模式以及远程执行模式。

    • 线程池配置:支持使用默认线程池配置或自定义线程池配置。

  3. 控制台配置集群任务:详情请参见 创建调度任务

任务拆分阶段

任务调度提供了 IClusterJobSplitHandler 接口进行任务拆分,目前支持两种拆分方式:

  • ShardingChunkData 拆分:指定索引的拆分方式,通过索引对这个子任务(chunk)进行唯一标识,客户端会根据指定的索引拉取数据进行处理。要求填充分片规则。

    public class ShardingChunkData implements IChunkData {
        /**
         * 分片号
         */
        private String shardingRule;
    
    ......
    }
    说明

    具体使用代码,可参考 示例工程 中的 ClusterFstSplitHandler

    其中,shardingRule 是一个分片规则标识,例如可以使用 user_00、user_01 … user_99 作为分片标识。结合上述场景案例,第一层任务拆分可以使用 shardingChunkData 对 100 张用户表进行拆分,具体代码如下:

    public class UserSplitterByTable implements IClusterJobSplitHandler<ShardingChunkData> {
    
        @Override
        public SplitChunkDataResult<ShardingChunkData> handle(ClusterJobSplitContext context) {
            SplitChunkDataResult<ShardingChunkData> splitChunkDataResult = new SplitChunkDataResult<>();
            ArrayList<ShardingChunkData> shardingChunkDatas = new ArrayList<>();
            // user_00 ~ user_99
            for (int i = 0; i < 100; i++) {
                String shardingRule = "user_";
                if (i < 10) {
                    shardingRule = shardingRule +"0";
                }
                shardingRule = shardingRule + i;
                // 根据表维度拆分分片。
                ShardingChunkData shardingChunkData = new ShardingChunkData(shardingRule);
                shardingChunkDatas.add(shardingChunkData);
            }
            splitChunkDataResult.setChunkDatum(shardingChunkDatas);
            splitChunkDataResult.setSuccess(true);
            return splitChunkDataResult;
        }
    
        @Override
        public String getName() {
           // 处理器名称。
            return "USER_SPLITTER_BY_TABLE";
        }
    
        @Override
        public ThreadPoolExecutor getThreadPool() {
           // 线程池,建议使用自定义线程池。如果返回 null 则使用 SDK 自带的线程池。
            return null;
        }
    }
  • RangeChunkData 拆分:指定范围的拆分方式,指定每个子任务处理特定范围内的数据,类似于一个分页效果。要求填充起始索引、结束索引和分片规则。

    public class RangeChunkData implements IChunkData {
        /**
         * 分片规则
         */
        private String shardingRule;
    
        /**
         * 起始索引
         */
        private String start;
    
        /**
         * 结束索引
         */
        private String end;
    
    ......
    }
    说明

    具体使用代码,可参考 工程示例 中的 ClusterSecSplitHandler

    参数说明:

    • shardingRule:分片规则,例如指定表 user_01

      例如,以下配置表示处理 user_01 表中 1000 到 2000 的数据。

      RangeChunkData chunk =new RangeChunkData ();
      chunk.setShardingRule("user_01");
      chunk.setStart("1000");
      chunk.setEnd("2000");
    • start:开始索引,记录开始位置。例如 1000。

    • end:结束索引,记录结束位置(包含)。例如 2000。

    在场景案例中,第二层可以使用 RangeChunkData 将用户数据按分页维度进行拆分,具体代码如下:

    public class UserSplitterByPage implements IClusterJobSplitHandler<RangeChunkData> {
    
        @Override
        public SplitChunkDataResult<RangeChunkData> handle(ClusterJobSplitContext context) {
            // 第一层拆分的分片
            ShardingChunkData shardingChunkData = (ShardingChunkData) context.getChunkData();
            String shardingRule = shardingChunkData.getShardingRule();
    
            // 1. 根据分片查询数量
            int count = queryCountByTable(shardingChunkData.getShardingRule());
            SplitChunkDataResult<RangeChunkData> splitChunkDataResult = new SplitChunkDataResult<>();
            ArrayList<RangeChunkData> shardingChunkDatas = new ArrayList<>();
    
            // 2. 做分页处理,每页处理 1000 条
            int pageCount = 1000;
            for (int page = 0; page < count / pageCount; page++) {
                String startRows = String.valueOf(page * pageCount);
                // 包含
                String endRows = String.valueOf((page + 1) * pageCount - 1);
                RangeChunkData rangeChunkData = new RangeChunkData(shardingRule, startRows, endRows);
                shardingChunkDatas.add(rangeChunkData);
            }
    
            splitChunkDataResult.setChunkDatum(shardingChunkDatas);
            splitChunkDataResult.setSuccess(true);
            return splitChunkDataResult;
        }
    
        // mock
        private int queryCountByTable(String shardingRule) {
            if ("user_00".equals(shardingRule)) {
                // user_00 表 10 万用户
                return 100000;
            } else {
                // 其他表 9 万用户
                return 90000;
            }
        }
    
        @Override
        public String getName() {
            return "USER_SPLITTER_BY_PAGE";
        }
    
        @Override
        public ThreadPoolExecutor getThreadPool() {
            return null;
        }
    }

任务执行阶段

集群任务执行阶段分为三个子阶段:

  • Read 阶段:从数据源读取数据。

    读取数据服务需要实现接口 IReader

    public interface IReader<T>{
    
         /**
         * 读取数据
         *
         * @param context
         * @return
         */
            LoadData<T> read(ClusterJobExecuteContext context) throws Exception;
    }

    LoadData 对象数据结构如下:

    public class LoadData<T> extends MultiDataItem<T>{
    
            /**
             * 是否还有未捞取的待处理数据:值为 false 时,处理完本次捞取出的数据就回调服务端;值为 true 时,认为还有数据待处理,处理完本次捞取数据后继续捞取数据。
             */
            private boolean hasMore;
    
            public LoadData(List<T> itemList,boolean hasMore){
                super(itemList);
                this.hasMore=hasMore;
            }
    
            public static boolean isEmpty(LoadData loadData){
                return loadData==null || loadData.isEmpty();
            }
    
            public boolean isHasMore(){
                return hasMore;
            }
    
            public void setHasMore(boolean hasMore){
                this.hasMore=hasMore;
            }
    }

    使用 read 方法分批读取数据时,需要注意设置 hasMore 的值,当值为 true 时,处理完本次读取的数据后会再次进行读取数据。

    如果需要打开关闭文件,可以使用 IStreamReader 接口:

    public interface IStream {
    
        /**
         * 打开流
         *
         * @param context
         * @throws Exception
         */
        void open(ClusterJobExecuteContext context) throws Exception;
    
        /**
         * 关闭流
         *
         * @param context
         * @throws Exception
         */
        void close(ClusterJobExecuteContext context) throws Exception;
    }
    
    public interface IStreamReader<T> extends IStream, IReader<T> {
    
    }

    该接口继承自 IStream 接口,提供了 open 和 close 方法,这两个方法分别会在执行分片前和执行分片后调用。

  • Process 阶段:将读取的数据对象转换为要写入数据源的对象,该阶段不是必须阶段,可以不设置。

    处理数据服务需要实现接口 IProcessor

    public interface IProcessor<I, O> {
    
        /**
         * 数据处理,将读取出的对象加工转换为要处理的对象
         *
         * @param r
         * @return
         */
        DataProcessResult<O> process(ClusterJobExecuteContext context, I i) throws Exception;
    
        /**
         * 数据处理线程池,没有配置时使用handler的线程池
         *
         * @return
         */
        ThreadPoolExecutor getProcessThreadPool();
    }

    当不需要对读取的数据进行转换时,可以不设置该服务,这样会直接把读取的数据交给写服务。该服务可以单独指定线程池,当没有设置时默认使用任务 handler 的线程池。

  • Write 阶段:将数据写入数据源。

    说明

    集群任务 Write 阶段的相关代码,可参考 工程示例 中的 ClusterExecuteHandler

    写数据服务需要实现接口 IWriter

    public interface IWriter<T> {
    
        /**
         * 每次执行的数据块包含的数据量最大值
         *
         * @return
         */
        int getCountPerWrite();
    
        /**
         * 写数据
         *
         * @param context
         * @param dataItem
         * @return
         * @throws Exception
         */
        ClientCommonResult write(ClusterJobExecuteContext context,
                                 IDataItem<T> dataItem) throws Exception;
    
        /**
         * 写数据线程池,没有配置时使用handler的线程池
         *
         * @return
         */
        ThreadPoolExecutor getWriteThreadPool();
    }
    

    getCountPerWrite 方法用来设置每次批量 write 的数据量,值小于等于 0 时认为一次只写入一条数据。该服务可以单独设置线程池,当没有设置时默认使用任务 handler 的线程池。

    IDataItem 就是待处理的数据对象,有两种类型:

    • SingleDataItem:只包含一条数据,当 getCountPerWrite 方法的返回值小于或等于 1 时是该类型。

    • MultiDataItem:包含多条数据,当 getCountPerWrite 方法的返回值大于 1 时是该类型。

    可根据实际情况对 dataItem 进行类型转换,例如:

    switch(dataItem.getType()){
        case SINGLE:
             ((SingleDataItem<Integer>)dataItem).getItem();
             break;
        case MULTIPLE:
             (MultiDataItem<Integer>)dataItem).getItemList();
             break;
        default:
             break;
    }

    如果需要打开关闭文件,可以使用接口 IStreamWriter

    public interface IStreamWriter<T> extends IStream,IWriter<T>{
    
    }

    该接口继承了 IStream 接口,提供了 open 和 close 方法,这两个方法分别会在执行分片前和执行分片后调用。

代码示例

结合上述应用场景案例,在执行阶段,需要根据拆分的分片从数据库或其他数据源中拉取数据进行处理,然后落库。

  • Read 阶段:根据分片信息(shardingRule、start、end)到指定表里拉取指定范围的用户数据。

  • Process 阶段:对拉取到的用户数据进行处理。

  • Write 阶段:将处理后的数据落库。

具体代码如下:

public class UserProcessHandler implements IClusterJobExecuteHandler {
    private final Logger LOGGER = LoggerFactory.getLogger(ClusterJobExecutor.class);
    private ThreadPoolExecutor threadPool;

    @Override
    public void preExecute(ClusterJobExecuteContext context) {
        // 前置处理
        LOGGER.info(String.format("preExecute chunkId:%s, param:%s",
                context.getChunkId(), context.getCustomParams()));
    }

    @Override
    public IReader getReader() {
        return new IReader() {
            @Override
            public LoadData<String> read(ClusterJobExecuteContext context) throws Exception {
                IChunkData chunkData = context.getChunk();
                RangeChunkData rangeChunkData = (RangeChunkData) chunkData;

                // 根据索引去查询数据
                //  您可以根据开始和结束索引,从数据库中分页查询。集群任务按照您设置的分片规则进行分片,然后去执行分片。具体的分片内容和执行逻辑,由您自行实现。
                String shardingRule = rangeChunkData.getShardingRule();
                int start = Integer.parseInt(rangeChunkData.getStart());
                int end = Integer.parseInt(rangeChunkData.getEnd());

                // 模拟查询数据
                List<String> stringList = queryUserInfo(shardingRule, start, end);

               // 一次查询,后面没有数据
                boolean hasMore = false;
                return new LoadData<String>(stringList, hasMore);
            }

            private List<String> queryUserInfo(String shardingRule, int start, int end) {
                List<String> stringList = Lists.newArrayList();
                for (int userId = start; userId <= end; userId++) {
                    stringList.add(shardingRule + "" + userId);
                }
                return stringList;
            }
        };
    }

    @Override
    public IProcessor getProcessor() {
        return new IProcessor() {
            @Override
            public DataProcessResult process(ClusterJobExecuteContext context,
                                             Object data) throws Exception {
                // 处理数据
                System.out.println("process user" + data);
                return new DataProcessResult(true, "", data);
            }

            @Override
            public ThreadPoolExecutor getProcessThreadPool() {
                return null;
            }
        };
    }

    @Override
    public IWriter getWriter() {
        return new IWriter<String>() {
            @Override
            public int getCountPerWrite() {
                return 1;
            }

            @Override
            public ClientCommonResult write(ClusterJobExecuteContext context,
                                            IDataItem<String> dataItem)throwsException {
                // 数据存储
                switch (dataItem.getType()) {
                    case SINGLE:
                        // 单个数据块,只包含一条数据
                        SingleDataItem<String> singleDataItem = (SingleDataItem<String>) dataItem;
                        LOGGER.info(String.format("getWriter write single data:%s", singleDataItem.getItem()));
                        System.out.println(String.format("write single data:%s", singleDataItem.getItem()));
                        break;
                    case MULTIPLE:
                        // 复合数据块,包含多条数据。比如任务停止时会将多条数据传入
                        MultiDataItem<String> multiDataItem = (MultiDataItem<String>) dataItem;
                        LOGGER.info(String.format("getWriter write multi data:%s", multiDataItem.getItemList()));
                        break;
                    default:
                        break;
                }
                return ClientCommonResult.buildSuccessResult();
            }

            @Override
            public ThreadPoolExecutor getWriteThreadPool() {
                return BlockingThreadPool.getThreadPool();
            }
        };
    }

    @Override
    public ILimiter getLimiter() {
        return new DefaultLimiter(10);
    }

    @Override
    public void postExecute(ClusterJobExecuteContext context) {
// 后置处理
        LOGGER.info(String.format("JobExecuteHandler postExecute chunkId:%s, param:%s",
                context.getChunkId(), context.getCustomParams()));
    }

    @Override
    public Progress calProgress(ClusterJobExecuteContext context) {
        return new Progress();
    }

    @Override
    public boolean isProcessAsync() {
        return true;
    }

    @Override
    public String getName() {
        return "USER_PROCESS_HANDLER";
    }

    @Override
    public ThreadPoolExecutor getThreadPool() {
        return this.threadPool;
    }

    public void setThreadPool(ThreadPoolExecutor threadPool) {
        this.threadPool = CommonThreadPool.getThreadPool("REMOTE_EXECUTE");
    }
}

执行模式

集群任务的执行阶段支持两种模式:

本机执行

本机执行时需要实现接口 IClusterJobExecuteHandler,如下:

public interface IJobHandler {

    /**
     * handler的名字
     *
     * @return
     */
    String getName();

    /**
     * 可以留空, 使用默认执行线程池
     *
     * @return
     */
    ThreadPoolExecutor getThreadPool();

}

public interface IClusterJobExecuteHandler<I, O> extends IJobHandler {

    /**
     * 预处理
     *
     * @param context
     */
    void preExecute(ClusterJobExecuteContext context);

    /**
     * 获取数据读取服务
     *
     * @return
     */
    IReader<I> getReader();

    /**
     * 获取数据清洗服务,返回NULL时会跳过数据清洗步骤,直接处理读取出的数据
     *
     * @return
     */
    IProcessor<I, O> getProcessor();

    /**
     * @return
     */
    IWriter<O> getWriter();

    /**
     * 获取限流器,返回NULL时使用默认限流器
     *
     * @return
     */
    ILimiter getLimiter();

    /**
     * 本次执行完的后置处理
     *
     * @param context
     * @return
     */
    void postExecute(ClusterJobExecuteContext context);

    /**
     * 计算处理进度
     *
     * @param context
     * @return
     */
    Progress calProgress(ClusterJobExecuteContext context);

    /**
     * 是否异步处理,开启异步执行后,reader-process-write阶段全部是异步处理
     *
     * @return
     */
    boolean isProcessAsync();
}

实现 IClusterJobExecuteHandler 的类负责处理任务。实现时需要设置:

  • IReader

  • IWriter

  • IProcessor:非必须设置,当没有设置 IProcessor 时会把读取出的数据直接交给 IWriter 服务。

  • ILimiter:没有设置时使用默认的限流服务。

isProcessAsync 返回 true 时,Reader、Process、Write 阶段全部是异步处理,即读取的数据放到队列给 Process 或 Write 消费后会立即开始下一次读取;当返回 false 时,读取的数据需要等待被 Write 消费完后开始下一次读取。

重要

IReader 服务如果配置成了 bean,需要确保服务是无状态的,否则多线程场景下会相互干扰。如果需要存储一些处理过程中的数据,需要在 getReader 方法里新建一个 IReader 服务实例。IWriter 服务也是如此。

远程执行

远程执行时需要实现接口 IRemoteClusterJobExecuteHandler,如下:

public interface IRemoteClusterJobExecuteHandler<T> extends IClusterJobExecuteHandler<T>{

        /**
         * 设置路由策略,没有设置时默认为轮询
         *
         * @return
         */
        IClusterRouter getClusterRouter();

        /**
         * 分发完一次load出来的数据后的休眠时间,防止分发出去的数据还没有被处理完导致的重复捞取,单位是ms,返回值<=0时认为不休眠
         *
         * @return
         */
        int getSleepAfterPerLoad();
}

该接口继承自 IClusterJobExecuteHandler,需要额外两个方法:getClusterRoutergetSleepAfterPerLoad

  • 当远程处理数据时,客户端会把捞取的数据分发给业务集群内的其它机器进行处理,getClusterRouter 方法用来设置分发时的路由规则,目前支持随机(RandomClusterRouter)和轮询(RoundRobinClusterRouter)两种方式,当方法返回 null 时默认为轮询。

  • getSleepAfterPerLoad 方法用来设置处理完读取的数据后的休眠时间,防止分发出去的数据还没有被处理完导致的重复捞取,单位为 ms,仅当 isProcessAsync 返回 false 时生效。

集群内的远程 RPC 调用使用 oneway 模式,所以无法获取数据的处理结果,需要业务方自己记录处理结果。

可以通过实现接口 IRemoteProcessorExecutor 来设置接收远程分发请求的线程池,将实现类发布成 bean 即可,接口如下:

public interface IRemoteProcessorExecutor{

/**
     * 获取线程池
     *
     * @return
     */
     Executor getExecutor();
}

另外,远程调用需要在客户端启动一个 RpcServer:

  • 对于 SOFABoot 项目,在工程的 application.properties 文件中使用配置项:

    com.alipay.sofa.antscheduler.remote.execute.enable=true
    com.alipay.sofa.antscheduler.remote.execute.port=xxx
  • 对于非 SOFABoot 项目,需要在初始化客户端的时候设置下面两个参数:

    public class Config{
    
    ......
    
    /**
       * 是否支持远程执行
       */
    private boolean isEnableRemoteExecute =false;
    
    /**
       * 远程服务端口,没有设置时使用默认端口9989
       */
    private int  remoteServerPort;
    ......
    }

线程池默认配置

IProcessor 和 IWriter 的线程池没有设置时,默认使用 IJobHandler 里设置的线程池。当 IJobHandler 也没有设置时,使用的是客户端默认指定的,配置参数为:

minPoolSize:20
maxPoolSize:300
queueSize:100
keepAliveTime:1小时

当没有设置 IRemoteProcessExecutor 时,默认使用 bolt 协议提供的默认线程池,配置参数为:

minPoolSize:20
maxPoolSize:400
queueSize:600
keepAliveTime:60秒

您也可以对 IProcessor、IWriter、IJobHandler 以及 IRemoteProcessExecutor 自定义设置线程池。