自己动手从0开始实现一个分布式RPC框架
簡介:?如果一個程序員能清楚的了解RPC框架所具備的要素,掌握RPC框架中涉及的服務注冊發現、負載均衡、序列化協議、RPC通信協議、Socket通信、異步調用、熔斷降級等技術,可以全方位的提升基本素質。雖然也有相關源碼,但是只看源碼容易眼高手低,動手寫一個才是自己真正掌握這門技術的最優路徑。
作者 | 麓行
來源 | 阿里技術公眾號
前言
為什么要自己寫一個RPC框架,我覺得從個人成長上說,如果一個程序員能清楚的了解RPC框架所具備的要素,掌握RPC框架中涉及的服務注冊發現、負載均衡、序列化協議、RPC通信協議、Socket通信、異步調用、熔斷降級等技術,可以全方位的提升基本素質。雖然也有相關源碼,但是只看源碼容易眼高手低,動手寫一個才是自己真正掌握這門技術的最優路徑。
一 什么是RPC
RPC(Remote Procedure Call)遠程過程調用,簡言之就是像調用本地方法一樣調用遠程服務。目前外界使用較多的有gRPC、Dubbo、Spring Cloud等。相信大家對RPC的概念都已經很熟悉了,這里不做過多介紹。
二 分布式RPC框架要素
一款分布式RPC框架離不開三個基本要素:
- 服務提供方 Serivce Provider
- 服務消費方 Servce Consumer
- 注冊中心 Registery
圍繞上面三個基本要素可以進一步擴展服務路由、負載均衡、服務熔斷降級、序列化協議、通信協議等等。
1 注冊中心
主要是用來完成服務注冊和發現的工作。雖然服務調用是服務消費方直接發向服務提供方的,但是現在服務都是集群部署,服務的提供者數量也是動態變化的,所以服務的地址也就無法預先確定。因此如何發現這些服務就需要一個統一注冊中心來承載。
2 服務提供方(RPC服務端)
其需要對外提供服務接口,它需要在應用啟動時連接注冊中心,將服務名及其服務元數據發往注冊中心。同時需要提供服務服務下線的機制。需要維護服務名和真正服務地址映射。服務端還需要啟動Socket服務監聽客戶端請求。
3 服務消費方(RPC客戶端)
客戶端需要有從注冊中心獲取服務的基本能力,它需要在應用啟動時,掃描依賴的RPC服務,并為其生成代理調用對象,同時從注冊中心拉取服務元數據存入本地緩存,然后發起監聽各服務的變動做到及時更新緩存。在發起服務調用時,通過代理調用對象,從本地緩存中獲取服務地址列表,然后選擇一種負載均衡策略篩選出一個目標地址發起調用。調用時會對請求數據進行序列化,并采用一種約定的通信協議進行socket通信。
三 技術選型
1 注冊中心
目前成熟的注冊中心有Zookeeper,Nacos,Consul,Eureka,它們的主要比較如下:
本實現中支持了兩種注冊中心Nacos和Zookeeper,可根據配置進行切換。
2 IO通信框架
本實現采用Netty作為底層通信框架,Netty是一個高性能事件驅動型的非阻塞的IO(NIO)框架。
3 通信協議
TCP通信過程中會根據TCP緩沖區的實際情況進行包的劃分,所以在業務上認為一個完整的包可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題。所以需要對發送的數據包封裝到一種通信協議里。
業界的主流協議的解決方案可以歸納如下:
很明顯1,2都有些局限性,本實現采用方案3,具體協議設計如下:
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+ | BYTE | | | | | | | ........ +--------------------------------------------+--------+-----------------+--------+--------+--------+--------+--------+--------+-----------------+ | magic | version| type | content lenth | content byte[] | | +--------+-----------------------------------------------------------------------------------------+--------------------------------------------+- 第一個字節是魔法數,比如我定義為0X35。
- 第二個字節代表協議版本號,以便對協議進行擴展,使用不同的協議解析器。
- 第三個字節是請求類型,如0代表請求1代表響應。
- 第四個字節表示消息長度,即此四個字節后面此長度的內容是消息content。
4 序列化協議
本實現支持3種序列化協議,JavaSerializer、Protobuf及Hessian可以根據配置靈活選擇。建議選用Protobuf,其序列化后碼流小性能高,非常適合RPC調用,Google自家的gRPC也是用其作為通信協議。
5 負載均衡
本實現支持兩種主要負載均衡策略,隨機和輪詢,其中他們都支持帶權重的隨機和輪詢,其實也就是四種策略。
四 整體架構
五 實現
項目總體結構:
1 服務注冊發現
Zookeeper
Zookeeper采用節點樹的數據模型,類似linux文件系統,/,/node1,/node2 比較簡單。
Zookeeper節點類型是Zookeeper實現很多功能的核心原理,分為持久節點臨時節點、順序節點三種類型的節點。
我們采用的是對每個服務名創建一個持久節點,服務注冊時實際上就是在zookeeper中該持久節點下創建了一個臨時節點,該臨時節點存儲了服務的IP、端口、序列化方式等。
客戶端獲取服務時通過獲取持久節點下的臨時節點列表,解析服務地址數據:
客戶端監聽服務變化:
Nacos
Nacos是阿里開源的微服務管理中間件,用來完成服務之間的注冊發現和配置中心,相當于Spring Cloud的Eureka+Config。
不像Zookeeper需要利用提供的創建節點特性來實現注冊發現,Nacos專門提供了注冊發現功能,所以其使用更加方便簡單。主要關注NamingService接口提供的三個方法registerInstance、getAllInstances、subscribe;registerInstance用來完成服務端服務注冊,getAllInstances用來完成客戶端服務獲取,subscribe用來完成客戶端服務變動監聽,這里就不多做介紹,具體可參照實現源碼。
2 服務提供方 Serivce Provider
在自動配置類OrcRpcAutoConfiguration完成注冊中心和RPC啟動類(RpcBootStarter)的初始化:
服務端的啟動流程如下:
RPC啟動(RpcBootStarter):
上面監聽Spring容器初始化事件時注意,由于Spring包含多個容器,如web容器和核心容器,他們還有父子關系,為了避免重復執行注冊,只處理頂層的容器即可。
3 服務消費方 Servce Consumer
服務消費方需要在應用啟動完成前為依賴的服務創建好代理對象,這里有很多種方法,常見的有兩種:
- 一是在應用的Spring Context初始化完成事件時觸發,掃描所有的Bean,將Bean中帶有OrcRpcConsumer注解的field獲取到,然后創建field類型的代理對象,創建完成后,將代理對象set給此field。后續就通過該代理對象創建服務端連接,并發起調用。
- 二是通過Spring的BeanFactoryPostProcessor,其可以對bean的定義BeanDefinition(配置元數據)進行處理;Spring IOC會在容器實例化任何其他bean之前運行BeanFactoryPostProcessor讀取BeanDefinition,可以修改這些BeanDefinition,也可以新增一些BeanDefinition。
本實現也采用第二種方式,處理流程如下:
BeanFactoryPostProcessor的主要實現:
@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)throws BeansException {this.beanFactory = beanFactory;postProcessRpcConsumerBeanFactory(beanFactory, (BeanDefinitionRegistry)beanFactory);}private void postProcessRpcConsumerBeanFactory(ConfigurableListableBeanFactory beanFactory, BeanDefinitionRegistry beanDefinitionRegistry) {String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();int len = beanDefinitionNames.length;for (int i = 0; i < len; i++) {String beanDefinitionName = beanDefinitionNames[i];BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);String beanClassName = beanDefinition.getBeanClassName();if (beanClassName != null) {Class<?> clazz = ClassUtils.resolveClassName(beanClassName, classLoader);ReflectionUtils.doWithFields(clazz, new FieldCallback() {@Overridepublic void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {parseField(field);}});}}Iterator<Entry<String, BeanDefinition>> it = beanDefinitions.entrySet().iterator();while (it.hasNext()) {Entry<String, BeanDefinition> entry = it.next();if (context.containsBean(entry.getKey())) {throw new IllegalArgumentException("Spring context already has a bean named " + entry.getKey());}beanDefinitionRegistry.registerBeanDefinition(entry.getKey(), entry.getValue());log.info("register OrcRpcConsumerBean definition: {}", entry.getKey());}}private void parseField(Field field) {// 獲取所有OrcRpcConsumer注解OrcRpcConsumer orcRpcConsumer = field.getAnnotation(OrcRpcConsumer.class);if (orcRpcConsumer != null) {// 使用field的類型和OrcRpcConsumer注解一起生成BeanDefinitionOrcRpcConsumerBeanDefinitionBuilder beanDefinitionBuilder = new OrcRpcConsumerBeanDefinitionBuilder(field.getType(), orcRpcConsumer);BeanDefinition beanDefinition = beanDefinitionBuilder.build();beanDefinitions.put(field.getName(), beanDefinition);}}ProxyFactory的主要實現:
public class JdkProxyFactory implements ProxyFactory{@Overridepublic Object getProxy(ServiceMetadata serviceMetadata) {return Proxy.newProxyInstance(serviceMetadata.getClazz().getClassLoader(), new Class[] {serviceMetadata.getClazz()},new ClientInvocationHandler(serviceMetadata));}private class ClientInvocationHandler implements InvocationHandler {private ServiceMetadata serviceMetadata;public ClientInvocationHandler(ServiceMetadata serviceMetadata) {this.serviceMetadata = serviceMetadata;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String serviceId = ServiceUtils.getServiceId(serviceMetadata);// 通過負載均衡器選取一個服務提供方地址ServiceURL service = InvocationServiceSelector.select(serviceMetadata);OrcRpcRequest request = new OrcRpcRequest();request.setMethod(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(args);request.setRequestId(UUID.randomUUID().toString());request.setServiceId(serviceId);OrcRpcResponse response = InvocationClientContainer.getInvocationClient(service.getServerNet()).invoke(request, service);if (response.getStatus() == RpcStatusEnum.SUCCESS) {return response.getData();} else if (response.getException() != null) {throw new OrcRpcException(response.getException().getMessage());} else {throw new OrcRpcException(response.getStatus().name());}}} }本實現只使用JDK動態代理,也可以使用cglib或Javassist實現以獲得更好的性能,JdkProxyFactory中。
4 IO模塊
UML圖如下:
結構比較清晰,分三大模塊:客戶端調用適配模塊、服務端請求響應適配模塊和Netty IO服務模塊。
客戶端調用適配模塊
此模塊比較簡單,主要是為客戶端調用時建立服務端接,并將連接存入緩存,避免后續同服務調用重復建立連接,連接建立成功后發起調用。下面是DefaultInvocationClient的實現:
服務端請求響應適配模塊
服務請求響應模塊也比較簡單,是根據請求中的服務名,從緩存中獲取服務元數據,然后從請求中獲取調用的方法和參數類型信息,反射獲取調用方法信息。然后從spring context中獲取bean進行反射調用。
Netty IO服務模塊
Netty IO服務模塊是核心,稍復雜一些,客戶端和服務端主要處理流程如下:
其中,重點是這四個類的實現:NettyNetClient、NettyNetServer、NettyClientChannelRequestHandler和NettyServerChannelRequestHandler,上面的UML圖和下面流程圖基本上講清楚了它們的關系和一次請求的處理流程,這里就不再展開了。
下面重點講一下編碼解碼器。
在技術選型章節中,提及了采用的通信協議,定義了私有的RPC協議:
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+ | BYTE | | | | | | | ........ +--------------------------------------------+--------+-----------------+--------+--------+--------+--------+--------+--------+-----------------+ | magic | version| type | content lenth | content byte[] | | +--------+-----------------------------------------------------------------------------------------+--------------------------------------------+- 第一個字節是魔法數定義為0X35。
- 第二個字節代表協議版本號。
- 第三個字節是請求類型,0代表請求1代表響應。
- 第四個字節表示消息長度,即此四個字節后面此長度的內容是消息content。
編碼器的實現如下:
@Override protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMsg protocolMsg, ByteBuf byteBuf)throws Exception {// 寫入協議頭byteBuf.writeByte(ProtocolConstant.MAGIC);// 寫入版本byteBuf.writeByte(ProtocolConstant.DEFAULT_VERSION);// 寫入請求類型byteBuf.writeByte(protocolMsg.getMsgType());// 寫入消息長度byteBuf.writeInt(protocolMsg.getContent().length);// 寫入消息內容byteBuf.writeBytes(protocolMsg.getContent()); }解碼器的實現如下:
六 測試
在本人MacBook Pro 13寸,4核I5,16g內存,使用Nacos注冊中心,啟動一個服務器,一個客戶端情況下,采用輪詢負載均衡策略的情況下,使用Apache ab測試。
在啟用8個線程發起10000個請求的情況下,可以做到 18秒完成所有請求,qps550:
在啟用100個線程發起10000個請求的情況下,可以做到 13.8秒完成所有請求,qps724:
七 總結
在實現這個RPC框架的過程中,我也重新學習了很多知識,比如通信協議、IO框架等。也橫向學習了當前最熱的gRPC,借此又看了很多相關的源碼,收獲很大。后續我也會繼續維護升級這個框架,比如引入熔斷降級等機制,做到持續學習持續進步。
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的自己动手从0开始实现一个分布式RPC框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 存储计算解耦合,构建中国人英语语音数据库
- 下一篇: 独家下载!小程序Serverless云上