2021-05-12 14:32:11
sofa-rpc 伺服器端原始碼流程走讀
sofa-rpc是阿里開源的一款高效能的rpc框架,這篇文章主要是對sofa-rpc provider啟動服務流程的一個程式碼走讀,下面是我簡單繪製的一個基本的關係流程圖
下面我們根據sofa-rpc程式碼,對流程進行一個跟蹤與走讀。我們以BoltServer的為例
public static void main(String[] args) {
ApplicationConfig application = new ApplicationConfig().setAppName("test-server");
ServerConfig serverConfig = new ServerConfig()
.setPort(22000)
.setDaemon(false);
ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setApplication(application)
.setRef(new HelloServiceImpl())
.setServer(serverConfig)
.setRegister(false);
ProviderConfig<EchoService> providerConfig2 = new ProviderConfig<EchoService>()
.setInterfaceId(EchoService.class.getName())
.setApplication(application)
.setRef(new EchoServiceImpl())
.setServer(serverConfig)
.setRegister(false);
providerConfig.export();
providerConfig2.export();
LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);
}
可以看到sofa-rpc通過ProviderConfig類對服務提供方Provider進行了設定資訊的初始化,同時也提供了export做為服務啟動的入口。
public synchronized void export() {
if (providerBootstrap == null) {
providerBootstrap = Bootstraps.from(this);
}
providerBootstrap.export();
}
根據ProviderConfig中setBootstrap()設定的Bootstrap型別,我們通過Bootstaps.from(this)可以獲取到不同的Bootstrap引導服務,分別是DefaultProviderBootstrap與 DubboProviderBootstrap
/**
* 發布一個服務
*
* @param providerConfig 服務發布者設定
* @param <T> 介面型別
* @return 發布啟動類
*/
public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {
String bootstrap = providerConfig.getBootstrap();
if (StringUtils.isEmpty(bootstrap)) {
// Use default provider bootstrap 無的話就返回預設DefaultProviderBootstrap
bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
providerConfig.setBootstrap(bootstrap);
}
ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)
.getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig });
return (ProviderBootstrap<T>) providerBootstrap;
}
DefaultProviderBootstrap與 DubboProviderBootstrap 都繼承自ProviderBootstrap。
DefaultProviderBootstrap又被BoltProviderBootstrap、Http2ClearTextProviderBootstrap、RestProviderBootstrap三個類所繼承,這其實對應了sofa-rpc中的三種server服務方式。
我們看下DefaultProviderBootstrap服務啟動原始碼
@Override
public void export() {
if (providerConfig.getDelay() > 0) { // 延遲載入,單位毫秒
Thread thread = factory.newThread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(providerConfig.getDelay());
} catch (Throwable ignore) { // NOPMD
}
doExport();
}
});
thread.start();
} else {
doExport();
}
}
private void doExport() {
if (exported) {
return;
}
// 檢查引數
checkParameters();
String appName = providerConfig.getAppName();
//key is the protocol of server,for concurrent safe
Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
// 將處理器註冊到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
String protocol = serverConfig.getProtocol();
String key = providerConfig.buildKey() + ":" + protocol;
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
}
// 注意同一interface,同一uniqleId,不同server情況
AtomicInteger cnt = EXPORTED_KEYS.get(key); // 計數器
if (cnt == null) { // 沒有發布過
cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
}
int c = cnt.incrementAndGet();
hasExportedInCurrent.put(serverConfig.getProtocol(), true);
int maxProxyCount = providerConfig.getRepeatedExportLimit();
if (maxProxyCount > 0) {
if (c > maxProxyCount) {
decrementCounter(hasExportedInCurrent);
// 超過最大數量,直接丟擲異常
throw new SofaRpcRuntimeException("Duplicate provider config with key " + key
+ " has been exported more than " + maxProxyCount + " times!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!");
} else if (c > 1) {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!", key);
}
}
}
}
try {
// 構造請求呼叫器
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
// 初始化註冊中心
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (CommonUtils.isNotEmpty(registryConfigs)) {
for (RegistryConfig registryConfig : registryConfigs) {
RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
}
}
}
// 將處理器註冊到server
for (ServerConfig serverConfig : serverConfigs) {
try {
//構建Server
Server server = serverConfig.buildIfAbsent();
// 註冊序列化介面
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
//啟動服務
server.start();
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
+ serverConfig.getId(), e);
}
}
// 註冊到註冊中心
providerConfig.setConfigListener(new ProviderAttributeListener());
register();
} catch (Exception e) {
decrementCounter(hasExportedInCurrent);
if (e instanceof SofaRpcRuntimeException) {
throw (SofaRpcRuntimeException) e;
} else {
throw new SofaRpcRuntimeException("Build provider proxy error!", e);
}
}
// 記錄一些快取資料
RpcRuntimeContext.cacheProviderConfig(this);
exported = true;
}
程式碼中通過serverConfig.buildIfAbsent()構建Server服務物件,而在buildIfAbsent()函數中我們可以看到,sever是通過SeverFactory工廠獲取到的,在SeverFactory的getSever()中根據SeverConfig的設定獲取Sever的具體範例,並執行Init()進行初始化。
/**
* 啟動服務
*
* @return the server
*/
public synchronized Server buildIfAbsent() {
if (server != null) {
return server;
}
// 提前檢查協定+序列化方式
// ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),
// SerializationType.valueOf(getSerialization()));
//在sever工廠中拿到sever範例
server = ServerFactory.getServer(this);
return server;
}
/**
* 初始化Server範例
*
* @param serverConfig 伺服器端設定
* @return Server
*/
public synchronized static Server getServer(ServerConfig serverConfig) {
try {
Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
if (server == null) {
// 算下網絡卡和埠
resolveServerConfig(serverConfig);
ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
.getExtensionClass(serverConfig.getProtocol());
if (ext == null) {
throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),
"Unsupported protocol of server!");
}
server = ext.getExtInstance();
//服務初始化
server.init(serverConfig);
SERVER_MAP.put(serverConfig.getPort() + "", server);
}
return server;
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
throw new SofaRpcRuntimeException(e.getMessage(), e);
}
}
sofa-rpc提供了三種server型別 BoltServer,RestServer與AbstractHttpServer
BoltServer中通訊底層通過RemotingServer實現的,RemotingServer是基於阿里sofa-bolt通訊框架開發的。
/**
* Bolt伺服器端
*/
protected RemotingServer remotingServer;
@Override
public void start() {
if (started) {
return;
}
synchronized (this) {
if (started) {
return;
}
// 生成阿里基於netty的bolt服務Server物件
remotingServer = initRemotingServer();
try {
if (remotingServer.start(serverConfig.getBoundHost())) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),
serverConfig.getPort());
}
} else {
throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
}
started = true;
if (EventBus.isEnable(ServerStartedEvent.class)) {
EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
}
}
}
AbstractHttpServer 提供http服務,底層通訊通過ServerTransport類實現的
/**
* 伺服器端通訊層
*/
private ServerTransport serverTransport;
@Override
public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
this.serverTransportConfig = convertConfig(serverConfig);
// 啟動執行緒池
this.bizThreadPool = initThreadPool(serverConfig);
// 伺服器端處理器
this.serverHandler = new HttpServerHandler();
// set default transport config
this.serverTransportConfig.setContainer(container);
this.serverTransportConfig.setServerHandler(serverHandler);
}
@Override
public void start() {
if (started) {
return;
}
synchronized (this) {
if (started) {
return;
}
try {
// 啟動執行緒池
this.bizThreadPool = initThreadPool(serverConfig);
this.serverHandler.setBizThreadPool(bizThreadPool);
//範例化服務,具體程式碼見
serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);
started = serverTransport.start();
if (started) {
if (EventBus.isEnable(ServerStartedEvent.class)) {
EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
}
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e);
}
}
}
ServerTransport是個抽象類,具體實現為transport包下AbstractHttp2ServerTransport
/**
* 建構函式
*
* @param transportConfig 伺服器端設定
*/
protected AbstractHttp2ServerTransport(ServerTransportConfig transportConfig) {
super(transportConfig);
}
@Override
public boolean start() {
if (serverBootstrap != null) {
return true;
}
synchronized (this) {
if (serverBootstrap != null) {
return true;
}
boolean flag = false;
SslContext sslCtx = SslContextBuilder.build();
// Configure the server.
EventLoopGroup bossGroup = NettyHelper.getServerBossEventLoopGroup(transportConfig);
//可以看到然是基於Netty
HttpServerHandler httpServerHandler = (HttpServerHandler) transportConfig.getServerHandler();
bizGroup = NettyHelper.getServerBizEventLoopGroup(transportConfig, httpServerHandler.getBizThreadPool());
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, bizGroup)
.channel(transportConfig.iSUSEEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())
.option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr())
.option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator())
.option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
.childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive())
.childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
.childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
transportConfig.getBufferMin(), transportConfig.getBufferMax()))
.childHandler(new Http2ServerChannelInitializer(bizGroup, sslCtx,
httpServerHandler, transportConfig.getPayload()));
// 係結到全部網絡卡 或者 指定網絡卡
ChannelFuture future = serverBootstrap.bind(
new InetSocketAddress(transportConfig.getHost(), transportConfig.getPort()));
ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("HTTP/2 Server bind to {}:{} success!",
transportConfig.getHost(), transportConfig.getPort());
}
} else {
LOGGER.error("HTTP/2 Server bind to {}:{} failed!",
transportConfig.getHost(), transportConfig.getPort());
stop();
}
}
});
try {
channelFuture.await();
if (channelFuture.isSuccess()) {
flag = Boolean.TRUE;
} else {
throw new SofaRpcRuntimeException("Server start fail!", future.cause());
}
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
return flag;
}
}
RestServer 提供Rest服務,底層通訊實現具體可見SofaNettyJaxrsServer。
/**
* Rest伺服器端
*/
protected SofaNettyJaxrsServer httpServer;
@Override
public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
httpServer = buildServer();
}
SofaNettyJaxrsServer中服務啟動的具體程式碼
@Override
public void start() {
// CHANGE: 增加執行緒名字
boolean daemon = serverConfig.isDaemon();
boolean isEpoll = serverConfig.isEpoll();
NamedThreadFactory ioFactory = new NamedThreadFactory("SEV-REST-IO-" + port, daemon);
NamedThreadFactory bizFactory = new NamedThreadFactory("SEV-REST-BIZ-" + port, daemon);
eventLoopGroup = isEpoll ? new EpollEventLoopGroup(ioWorkerCount, ioFactory)
: new NioEventLoopGroup(ioWorkerCount, ioFactory);
eventExecutor = isEpoll ? new EpollEventLoopGroup(executorThreadCount, bizFactory)
: new NioEventLoopGroup(executorThreadCount, bizFactory);
// Configure the server.
bootstrap = new ServerBootstrap()
.group(eventLoopGroup)
.channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childHandler(createChannelInitializer())
.option(ChannelOption.SO_BACKLOG, backlog)
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isKeepAlive()); // CHANGE: setKeepAlive
for (Map.Entry<ChannelOption, Object> entry : channelOptions.entrySet()) {
bootstrap.option(entry.getKey(), entry.getValue());
}
for (Map.Entry<ChannelOption, Object> entry : childChannelOptions.entrySet()) {
bootstrap.childOption(entry.getKey(), entry.getValue());
}
final InetSocketAddress socketAddress;
if (null == hostname || hostname.isEmpty()) {
socketAddress = new InetSocketAddress(port);
} else {
socketAddress = new InetSocketAddress(hostname, port);
}
bootstrap.bind(socketAddress).syncUninterruptibly();
}
OK,以上就是sofa-rpc伺服器端啟動的一個基本的流程,這裡關注的只是簡單的服務啟動流程,沒有深入程式碼功能進行分析,在此基礎上,我們可以進一步探究程式碼的具體實現。
相關文章