优雅关闭涉及两方面,一个是 RPC 框架作为客户端,一个是 RPC 框架作为服务端。
作为服务端
RPC 框架作为服务端,在关闭时,不应该直接暴力关闭。关闭方式如下:
查看 RPC 框架:
com.alipay.sofa.rpc.context.RpcRuntimeContext
在静态初始化块中,添加一个
ShutdownHook
。// 增加 JVM 关闭事件。 if(RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK,true)){ Runtime.getRuntime().addShutdownHook(newThread(newRunnable(){ @Override public void run(){ if(LOGGER.isWarnEnabled()){ LOGGER.warn("SOFA RPC Framework catch JVM shutdown event, Run shutdown hook now."); } destroy(false); } },"SOFA-RPC-ShutdownHook")); }
ShutdownHook
的作用是当发布平台或用户执行kill pid
时,会先执行ShutdownHook
中的逻辑。在销毁操作中,RPC 框架会先执行向注册中心取消服务注册、关闭服务端口等动作。示例如下:private static void destroy(boolean active){ RpcRunningState.setShuttingDown(true); for(Destroyable.DestroyHook destroyHook : DESTROY_HOOKS){ destroyHook.preDestroy(); } List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>(); for(ProviderBootstrap bootstrap : EXPORTED_PROVIDER_CONFIGS){ providerConfigs.add(bootstrap.getProviderConfig()); } // 先反注册服务端。 List<Registry> registries =RegistryFactory.getRegistries(); if(CommonUtils.isNotEmpty(registries) && CommonUtils.isNotEmpty(providerConfigs)){ for(Registry registry : registries){ registry.batchUnRegister(providerConfigs); } } // 关闭启动的端口。 ServerFactory.destroyAll(); // 关闭发布的服务。 for(ProviderBootstrap bootstrap : EXPORTED_PROVIDER_CONFIGS){ bootstrap.unExport(); } // 关闭调用的服务。 for(ConsumerBootstrap bootstrap : REFERRED_CONSUMER_CONFIGS){ ConsumerConfig config = bootstrap.getConsumerConfig(); if(!CommonUtils.isFalse(config.getParameter(RpcConstants.HIDDEN_KEY_DESTROY))){ // 除非不让主动 unrefer。 bootstrap.unRefer(); } } // 关闭注册中心。 RegistryFactory.destroyAll(); // 关闭客户端的一些公共资源。 ClientTransportFactory.closeAll(); // 卸载模块。 if(!RpcRunningState.isUnitTestMode()){ ModuleFactory.uninstallModules(); } // 卸载钩子。 for(Destroyable.DestroyHook destroyHook : DESTROY_HOOKS){ destroyHook.postDestroy(); } // 清理缓存。 RpcCacheManager.clearAll(); RpcRunningState.setShuttingDown(false); if(LOGGER.isWarnEnabled()){ LOGGER.warn("SOFA RPC Framework has been release all resources {}...", active ?"actively ":""); } }
其中以 Bolt 为例,关闭端口并不是一个立刻执行的动作,而是会判断当前服务端上面的连接和队列的任务,先处理完队列中的任务再缓慢关闭。
@Override public void destroy(){ if(!started){ return; } int stopTimeout = serverConfig.getStopTimeout(); if(stopTimeout >0){// 需要等待结束时间 AtomicInteger count = boltServerProcessor.processingCount; // 有正在执行的请求 或者 队列里有请求 if(count.get()>0|| bizThreadPool.getQueue().size()>0){ long start =RpcRuntimeContext.now(); if(LOGGER.isInfoEnabled()){ LOGGER.info("There are {} call in processing and {} call in queue, wait {} ms to end", count, bizThreadPool.getQueue().size(), stopTimeout); } while((count.get()>0|| bizThreadPool.getQueue().size()>0) && RpcRuntimeContext.now()- start < stopTimeout){ // 等待返回结果 try{ Thread.sleep(10); }catch(InterruptedException ignore){ } } } // 关闭前检查已有请求? } // 关闭线程池 bizThreadPool.shutdown(); stop(); }
作为客户端
RPC 框架作为客户端时,实际上就是 Cluster 的关闭。关闭调用的服务这一步,可以查看 com.alipay.sofa.rpc.client.AbstractCluster
。
/**
* 优雅关闭的钩子。
*/
protected class GracefulDestroyHook implements DestroyHook{
@Override
public void preDestroy(){
// 准备关闭连接。
int count = countOfInvoke.get();
final int timeout = consumerConfig.getDisconnectTimeout();// 等待结果超时时间。
if(count >0){// 有正在调用的请求。
long start =RpcRuntimeContext.now();
if(LOGGER.isWarnEnabled()){
LOGGER.warn("There are {} outstanding call in client, will close transports util return",
count);
}
while(countOfInvoke.get()>0 && RpcRuntimeContext.now()- start < timeout){// 等待返回结果。
try{
Thread.sleep(10);
}catch(InterruptedException ignore){
}
}
}
}
@Override
public void postDestroy(){
}
}
客户端会逐步将正在调用的请求处理完成才会下线。
说明
优雅关闭是需要和发布平台联动的。如果强制 kill,那么任何优雅关闭的方案都不会生效。后续会考虑在 SOFABoot 层面提供一个统一的 API,来给发布平台调用,而不是依赖 hook 的逻辑。
文档内容是否对您有帮助?