集群任务支持用户按业务的要求,通过多层的拆分将一个任务拆分到多个客户端上并发执行。
集群任务的开发可以分成两个阶段:拆分阶段和执行阶段。
拆分阶段:对数据进行分片,不限制拆分层数,将拆分结果上报给服务端,由服务端根据拆分的 chunk(一批待处理数据的索引集合)通知客户端来拉取数据进行处理。
执行阶段:客户端接收到通知后拉取数据进行处理,处理完后继续拉取新的数据,直到数据都处理完成。
应用场景案例
为了便于理解,本文使用一个场景示例来介绍集群任务的开发过程。
假设某基金公司每天需要进行一次用户清算,由于用户规模较大,因此将用户分为 100 张表,每张表有 10 万左右的用户数据。该公司选择使用两层拆分的集群任务,通过集群任务并行处理能力提高数据处理效率。
具体的实现步骤如下:
任务拆分阶段:将用户数据进行拆分,详情请参见 集群任务拆分阶段。
第一层拆分:按用户表维度进行数据拆分。
第二层拆分:按分页维度进行数据拆分。
任务执行阶段:对每个用户数据进行处理,详情请参见 集群任务执行阶段。
执行模式:支持本地执行模式以及远程执行模式。
线程池配置:支持使用默认线程池配置或自定义线程池配置。
控制台配置集群任务:详情请参见 创建调度任务。
任务拆分阶段
任务调度提供了 IClusterJobSplitHandler
接口进行任务拆分,目前支持两种拆分方式:
ShardingChunkData 拆分:指定索引的拆分方式,通过索引对这个子任务(chunk)进行唯一标识,客户端会根据指定的索引拉取数据进行处理。要求填充分片规则。
public class ShardingChunkData implements IChunkData { /** * 分片号 */ private String shardingRule; ...... }
说明具体使用代码,可参考 示例工程 中的
ClusterFstSplitHandler
。其中,
shardingRule
是一个分片规则标识,例如可以使用user_00、user_01 … user_99
作为分片标识。结合上述场景案例,第一层任务拆分可以使用shardingChunkData
对 100 张用户表进行拆分,具体代码如下:public class UserSplitterByTable implements IClusterJobSplitHandler<ShardingChunkData> { @Override public SplitChunkDataResult<ShardingChunkData> handle(ClusterJobSplitContext context) { SplitChunkDataResult<ShardingChunkData> splitChunkDataResult = new SplitChunkDataResult<>(); ArrayList<ShardingChunkData> shardingChunkDatas = new ArrayList<>(); // user_00 ~ user_99 for (int i = 0; i < 100; i++) { String shardingRule = "user_"; if (i < 10) { shardingRule = shardingRule +"0"; } shardingRule = shardingRule + i; // 根据表维度拆分分片。 ShardingChunkData shardingChunkData = new ShardingChunkData(shardingRule); shardingChunkDatas.add(shardingChunkData); } splitChunkDataResult.setChunkDatum(shardingChunkDatas); splitChunkDataResult.setSuccess(true); return splitChunkDataResult; } @Override public String getName() { // 处理器名称。 return "USER_SPLITTER_BY_TABLE"; } @Override public ThreadPoolExecutor getThreadPool() { // 线程池,建议使用自定义线程池。如果返回 null 则使用 SDK 自带的线程池。 return null; } }
RangeChunkData 拆分:指定范围的拆分方式,指定每个子任务处理特定范围内的数据,类似于一个分页效果。要求填充起始索引、结束索引和分片规则。
public class RangeChunkData implements IChunkData { /** * 分片规则 */ private String shardingRule; /** * 起始索引 */ private String start; /** * 结束索引 */ private String end; ...... }
说明具体使用代码,可参考 工程示例 中的
ClusterSecSplitHandler
。参数说明:
shardingRule
:分片规则,例如指定表user_01
。例如,以下配置表示处理
user_01
表中 1000 到 2000 的数据。RangeChunkData chunk =new RangeChunkData (); chunk.setShardingRule("user_01"); chunk.setStart("1000"); chunk.setEnd("2000");
start
:开始索引,记录开始位置。例如 1000。end
:结束索引,记录结束位置(包含)。例如 2000。
在场景案例中,第二层可以使用 RangeChunkData 将用户数据按分页维度进行拆分,具体代码如下:
public class UserSplitterByPage implements IClusterJobSplitHandler<RangeChunkData> { @Override public SplitChunkDataResult<RangeChunkData> handle(ClusterJobSplitContext context) { // 第一层拆分的分片 ShardingChunkData shardingChunkData = (ShardingChunkData) context.getChunkData(); String shardingRule = shardingChunkData.getShardingRule(); // 1. 根据分片查询数量 int count = queryCountByTable(shardingChunkData.getShardingRule()); SplitChunkDataResult<RangeChunkData> splitChunkDataResult = new SplitChunkDataResult<>(); ArrayList<RangeChunkData> shardingChunkDatas = new ArrayList<>(); // 2. 做分页处理,每页处理 1000 条 int pageCount = 1000; for (int page = 0; page < count / pageCount; page++) { String startRows = String.valueOf(page * pageCount); // 包含 String endRows = String.valueOf((page + 1) * pageCount - 1); RangeChunkData rangeChunkData = new RangeChunkData(shardingRule, startRows, endRows); shardingChunkDatas.add(rangeChunkData); } splitChunkDataResult.setChunkDatum(shardingChunkDatas); splitChunkDataResult.setSuccess(true); return splitChunkDataResult; } // mock private int queryCountByTable(String shardingRule) { if ("user_00".equals(shardingRule)) { // user_00 表 10 万用户 return 100000; } else { // 其他表 9 万用户 return 90000; } } @Override public String getName() { return "USER_SPLITTER_BY_PAGE"; } @Override public ThreadPoolExecutor getThreadPool() { return null; } }
任务执行阶段
集群任务执行阶段分为三个子阶段:
Read 阶段:从数据源读取数据。
读取数据服务需要实现接口
IReader
:public interface IReader<T>{ /** * 读取数据 * * @param context * @return */ LoadData<T> read(ClusterJobExecuteContext context) throws Exception; }
LoadData 对象数据结构如下:
public class LoadData<T> extends MultiDataItem<T>{ /** * 是否还有未捞取的待处理数据:值为 false 时,处理完本次捞取出的数据就回调服务端;值为 true 时,认为还有数据待处理,处理完本次捞取数据后继续捞取数据。 */ private boolean hasMore; public LoadData(List<T> itemList,boolean hasMore){ super(itemList); this.hasMore=hasMore; } public static boolean isEmpty(LoadData loadData){ return loadData==null || loadData.isEmpty(); } public boolean isHasMore(){ return hasMore; } public void setHasMore(boolean hasMore){ this.hasMore=hasMore; } }
使用 read 方法分批读取数据时,需要注意设置
hasMore
的值,当值为 true 时,处理完本次读取的数据后会再次进行读取数据。如果需要打开关闭文件,可以使用
IStreamReader
接口:public interface IStream { /** * 打开流 * * @param context * @throws Exception */ void open(ClusterJobExecuteContext context) throws Exception; /** * 关闭流 * * @param context * @throws Exception */ void close(ClusterJobExecuteContext context) throws Exception; } public interface IStreamReader<T> extends IStream, IReader<T> { }
该接口继承自
IStream
接口,提供了 open 和 close 方法,这两个方法分别会在执行分片前和执行分片后调用。Process 阶段:将读取的数据对象转换为要写入数据源的对象,该阶段不是必须阶段,可以不设置。
处理数据服务需要实现接口
IProcessor
:public interface IProcessor<I, O> { /** * 数据处理,将读取出的对象加工转换为要处理的对象 * * @param r * @return */ DataProcessResult<O> process(ClusterJobExecuteContext context, I i) throws Exception; /** * 数据处理线程池,没有配置时使用handler的线程池 * * @return */ ThreadPoolExecutor getProcessThreadPool(); }
当不需要对读取的数据进行转换时,可以不设置该服务,这样会直接把读取的数据交给写服务。该服务可以单独指定线程池,当没有设置时默认使用任务 handler 的线程池。
Write 阶段:将数据写入数据源。
说明集群任务 Write 阶段的相关代码,可参考 工程示例 中的
ClusterExecuteHandler
。写数据服务需要实现接口
IWriter
:public interface IWriter<T> { /** * 每次执行的数据块包含的数据量最大值 * * @return */ int getCountPerWrite(); /** * 写数据 * * @param context * @param dataItem * @return * @throws Exception */ ClientCommonResult write(ClusterJobExecuteContext context, IDataItem<T> dataItem) throws Exception; /** * 写数据线程池,没有配置时使用handler的线程池 * * @return */ ThreadPoolExecutor getWriteThreadPool(); }
getCountPerWrite
方法用来设置每次批量 write 的数据量,值小于等于 0 时认为一次只写入一条数据。该服务可以单独设置线程池,当没有设置时默认使用任务 handler 的线程池。IDataItem
就是待处理的数据对象,有两种类型:SingleDataItem
:只包含一条数据,当getCountPerWrite
方法的返回值小于或等于 1 时是该类型。MultiDataItem
:包含多条数据,当getCountPerWrite
方法的返回值大于 1 时是该类型。
可根据实际情况对
dataItem
进行类型转换,例如:switch(dataItem.getType()){ case SINGLE: ((SingleDataItem<Integer>)dataItem).getItem(); break; case MULTIPLE: (MultiDataItem<Integer>)dataItem).getItemList(); break; default: break; }
如果需要打开关闭文件,可以使用接口
IStreamWriter
:public interface IStreamWriter<T> extends IStream,IWriter<T>{ }
该接口继承了
IStream
接口,提供了 open 和 close 方法,这两个方法分别会在执行分片前和执行分片后调用。
代码示例
结合上述应用场景案例,在执行阶段,需要根据拆分的分片从数据库或其他数据源中拉取数据进行处理,然后落库。
Read 阶段:根据分片信息(shardingRule、start、end)到指定表里拉取指定范围的用户数据。
Process 阶段:对拉取到的用户数据进行处理。
Write 阶段:将处理后的数据落库。
具体代码如下:
public class UserProcessHandler implements IClusterJobExecuteHandler {
private final Logger LOGGER = LoggerFactory.getLogger(ClusterJobExecutor.class);
private ThreadPoolExecutor threadPool;
@Override
public void preExecute(ClusterJobExecuteContext context) {
// 前置处理
LOGGER.info(String.format("preExecute chunkId:%s, param:%s",
context.getChunkId(), context.getCustomParams()));
}
@Override
public IReader getReader() {
return new IReader() {
@Override
public LoadData<String> read(ClusterJobExecuteContext context) throws Exception {
IChunkData chunkData = context.getChunk();
RangeChunkData rangeChunkData = (RangeChunkData) chunkData;
// 根据索引去查询数据
// 您可以根据开始和结束索引,从数据库中分页查询。集群任务按照您设置的分片规则进行分片,然后去执行分片。具体的分片内容和执行逻辑,由您自行实现。
String shardingRule = rangeChunkData.getShardingRule();
int start = Integer.parseInt(rangeChunkData.getStart());
int end = Integer.parseInt(rangeChunkData.getEnd());
// 模拟查询数据
List<String> stringList = queryUserInfo(shardingRule, start, end);
// 一次查询,后面没有数据
boolean hasMore = false;
return new LoadData<String>(stringList, hasMore);
}
private List<String> queryUserInfo(String shardingRule, int start, int end) {
List<String> stringList = Lists.newArrayList();
for (int userId = start; userId <= end; userId++) {
stringList.add(shardingRule + "" + userId);
}
return stringList;
}
};
}
@Override
public IProcessor getProcessor() {
return new IProcessor() {
@Override
public DataProcessResult process(ClusterJobExecuteContext context,
Object data) throws Exception {
// 处理数据
System.out.println("process user" + data);
return new DataProcessResult(true, "", data);
}
@Override
public ThreadPoolExecutor getProcessThreadPool() {
return null;
}
};
}
@Override
public IWriter getWriter() {
return new IWriter<String>() {
@Override
public int getCountPerWrite() {
return 1;
}
@Override
public ClientCommonResult write(ClusterJobExecuteContext context,
IDataItem<String> dataItem)throwsException {
// 数据存储
switch (dataItem.getType()) {
case SINGLE:
// 单个数据块,只包含一条数据
SingleDataItem<String> singleDataItem = (SingleDataItem<String>) dataItem;
LOGGER.info(String.format("getWriter write single data:%s", singleDataItem.getItem()));
System.out.println(String.format("write single data:%s", singleDataItem.getItem()));
break;
case MULTIPLE:
// 复合数据块,包含多条数据。比如任务停止时会将多条数据传入
MultiDataItem<String> multiDataItem = (MultiDataItem<String>) dataItem;
LOGGER.info(String.format("getWriter write multi data:%s", multiDataItem.getItemList()));
break;
default:
break;
}
return ClientCommonResult.buildSuccessResult();
}
@Override
public ThreadPoolExecutor getWriteThreadPool() {
return BlockingThreadPool.getThreadPool();
}
};
}
@Override
public ILimiter getLimiter() {
return new DefaultLimiter(10);
}
@Override
public void postExecute(ClusterJobExecuteContext context) {
// 后置处理
LOGGER.info(String.format("JobExecuteHandler postExecute chunkId:%s, param:%s",
context.getChunkId(), context.getCustomParams()));
}
@Override
public Progress calProgress(ClusterJobExecuteContext context) {
return new Progress();
}
@Override
public boolean isProcessAsync() {
return true;
}
@Override
public String getName() {
return "USER_PROCESS_HANDLER";
}
@Override
public ThreadPoolExecutor getThreadPool() {
return this.threadPool;
}
public void setThreadPool(ThreadPoolExecutor threadPool) {
this.threadPool = CommonThreadPool.getThreadPool("REMOTE_EXECUTE");
}
}
执行模式
集群任务的执行阶段支持两种模式:
本机执行
本机执行时需要实现接口 IClusterJobExecuteHandler
,如下:
public interface IJobHandler {
/**
* handler的名字
*
* @return
*/
String getName();
/**
* 可以留空, 使用默认执行线程池
*
* @return
*/
ThreadPoolExecutor getThreadPool();
}
public interface IClusterJobExecuteHandler<I, O> extends IJobHandler {
/**
* 预处理
*
* @param context
*/
void preExecute(ClusterJobExecuteContext context);
/**
* 获取数据读取服务
*
* @return
*/
IReader<I> getReader();
/**
* 获取数据清洗服务,返回NULL时会跳过数据清洗步骤,直接处理读取出的数据
*
* @return
*/
IProcessor<I, O> getProcessor();
/**
* @return
*/
IWriter<O> getWriter();
/**
* 获取限流器,返回NULL时使用默认限流器
*
* @return
*/
ILimiter getLimiter();
/**
* 本次执行完的后置处理
*
* @param context
* @return
*/
void postExecute(ClusterJobExecuteContext context);
/**
* 计算处理进度
*
* @param context
* @return
*/
Progress calProgress(ClusterJobExecuteContext context);
/**
* 是否异步处理,开启异步执行后,reader-process-write阶段全部是异步处理
*
* @return
*/
boolean isProcessAsync();
}
实现 IClusterJobExecuteHandler
的类负责处理任务。实现时需要设置:
IReader
IWriter
IProcessor:非必须设置,当没有设置 IProcessor 时会把读取出的数据直接交给 IWriter 服务。
ILimiter:没有设置时使用默认的限流服务。
当 isProcessAsync
返回 true
时,Reader、Process、Write 阶段全部是异步处理,即读取的数据放到队列给 Process 或 Write 消费后会立即开始下一次读取;当返回 false
时,读取的数据需要等待被 Write 消费完后开始下一次读取。
IReader 服务如果配置成了 bean,需要确保服务是无状态的,否则多线程场景下会相互干扰。如果需要存储一些处理过程中的数据,需要在 getReader
方法里新建一个 IReader 服务实例。IWriter 服务也是如此。
远程执行
远程执行时需要实现接口 IRemoteClusterJobExecuteHandler
,如下:
public interface IRemoteClusterJobExecuteHandler<T> extends IClusterJobExecuteHandler<T>{
/**
* 设置路由策略,没有设置时默认为轮询
*
* @return
*/
IClusterRouter getClusterRouter();
/**
* 分发完一次load出来的数据后的休眠时间,防止分发出去的数据还没有被处理完导致的重复捞取,单位是ms,返回值<=0时认为不休眠
*
* @return
*/
int getSleepAfterPerLoad();
}
该接口继承自 IClusterJobExecuteHandler
,需要额外两个方法:getClusterRouter
和 getSleepAfterPerLoad
。
当远程处理数据时,客户端会把捞取的数据分发给业务集群内的其它机器进行处理,
getClusterRouter
方法用来设置分发时的路由规则,目前支持随机(RandomClusterRouter
)和轮询(RoundRobinClusterRouter
)两种方式,当方法返回null
时默认为轮询。getSleepAfterPerLoad
方法用来设置处理完读取的数据后的休眠时间,防止分发出去的数据还没有被处理完导致的重复捞取,单位为 ms,仅当isProcessAsync
返回false
时生效。
集群内的远程 RPC 调用使用 oneway 模式,所以无法获取数据的处理结果,需要业务方自己记录处理结果。
可以通过实现接口 IRemoteProcessorExecutor
来设置接收远程分发请求的线程池,将实现类发布成 bean 即可,接口如下:
public interface IRemoteProcessorExecutor{
/**
* 获取线程池
*
* @return
*/
Executor getExecutor();
}
另外,远程调用需要在客户端启动一个 RpcServer:
对于 SOFABoot 项目,在工程的
application.properties
文件中使用配置项:com.alipay.sofa.antscheduler.remote.execute.enable=true com.alipay.sofa.antscheduler.remote.execute.port=xxx
对于非 SOFABoot 项目,需要在初始化客户端的时候设置下面两个参数:
public class Config{ ...... /** * 是否支持远程执行 */ private boolean isEnableRemoteExecute =false; /** * 远程服务端口,没有设置时使用默认端口9989 */ private int remoteServerPort; ...... }
线程池默认配置
IProcessor 和 IWriter 的线程池没有设置时,默认使用 IJobHandler 里设置的线程池。当 IJobHandler 也没有设置时,使用的是客户端默认指定的,配置参数为:
minPoolSize:20
maxPoolSize:300
queueSize:100
keepAliveTime:1小时
当没有设置 IRemoteProcessExecutor 时,默认使用 bolt 协议提供的默认线程池,配置参数为:
minPoolSize:20
maxPoolSize:400
queueSize:600
keepAliveTime:60秒
您也可以对 IProcessor、IWriter、IJobHandler 以及 IRemoteProcessExecutor 自定义设置线程池。