自己写RPC之实现服务注册与发现

本文开始继续我们的造轮子之旅。

这个系列笔者将带领读者朋友实现简易的基于Netty、curator以及springBoot等技术的一个简易RPC通信轮子。

实现最基本的服务发现、服务注册、RPC通信等功能。该项目命名为: misaka ,她是《某科学的超电磁炮》的女主角御坂美琴的名字。

本文是该系列的第一篇,主要实现服务注册与发现功能。

我选择zookeeper作为服务注册发现的核心组件,使用curator作为与zookeeper通信的客户端。

curator提供了一个服务注册发现的实现, curator-x-discovery ,只需要在项目中引入即可。

建立项目misaka-api,在pom中引入如下依赖:

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-x-discovery -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>4.2.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.44.Final</version>
</dependency>

引入netty作为网络通信组件。

服务发现核心类:DiscoveryService

声明一些变量以便后续使用

/**zk连接地址*/
private static final String connectString = "127.0.0.1:2181";
/**CuratorFramework*/
private CuratorFramework client = null;
/**服务发现实例*/
private ServiceDiscovery<String> discovery = null;
/**服务提供者实例*/
private ServiceProvider<String> provider = null;
/**zk根节点*/
private static final String BASE_PATH = "/misaka";
/**记录服务提供者列表,便于统一进行关闭*/
private List<Closeable> closeableList = Lists.newArrayList();

private Object lock = new Object();
/** 服务注册表*/
private Map<String, ServiceProvider<String>> providers = Maps.newConcurrentMap();

对外提供一个init方法,用于初始化Curator客户端并启动

public void init() {
    // 初始化Curator客户端并启动
    client = CuratorFrameworkFactory.newClient(connectString, new RetryOneTime(1));
    client.start();
    // 构造服务发现
    discovery =
            ServiceDiscoveryBuilder.builder(String.class)
                    .basePath(BASE_PATH)
                    .client(client)
                    .build();
    try {
        discovery.start();
    } catch (Exception e) {
        throw new RuntimeException("启动服务发现异常", e);
    }
}

启动Curator客户端之后,通过ServiceDiscoveryBuilder这个构造器实例化一个ServiceDiscovery对象,并启动服务发现实例。

对外提供一个服务注册方法,用于在应用启动阶段对服务进行注册,将服务元信息写入zookeeper中

public void registeService(ServiceInstanceNode node) throws Exception {
    if (node.getPort() == null) {
        return;
    }
    ServiceInstance<String> serviceInstance =
            ServiceInstance.<String>builder()
                    .payload(node.getPayload())
                    .name(node.getServiceName())
                    .port(Integer.valueOf(node.getPort())).build();
    discovery.registerService(serviceInstance);
}

我们定义了一个ServiceInstanceNode实例,通过ServiceInstanceNode实例初始化一个ServiceInstance实例,ServiceInstance作为服务实例通过discovery.registerService方法注册到zookeeper中。

对外提供一个 getInstanceByName 方法,允许调用方根据服务名查询具体的ServiceInstance实例,从而获取服务的具体信息。

我们看一下ServiceInstance具体有哪些属性:

public class ServiceInstance<T> {
    private final String        name;                   // 服务名称
    private final String        id;                     // 服务id
    private final String        address;                // 可寻址的服务域名或ip
    private final Integer       port;                   // 服务暴露端口
    private final Integer       sslPort;                // ssl端口
    private final T             payload;                // 自定义信息
    ......

可以看到,ServiceInstance包含了服务名称、服务id、服务地址以及端口等信息,我们提供的getInstanceByName方法给客户端,方便客户端获取服务元信息,从而实现服务发现。

getInstanceByName方法逻辑如下:

public ServiceInstance<String> getInstanceByName(String serviceName) throws Exception {
    ServiceProvider<String> provider = providers.get(serviceName);
    if (provider == null) {
        synchronized (lock) {
            provider = providers.get(serviceName);
            if (provider == null) {
                // 随机策略
                provider = discovery.serviceProviderBuilder().serviceName(serviceName)
                        .providerStrategy(new RandomStrategy<String>()).build();
                provider.start();
                closeableList.add(provider);
                providers.put(serviceName, provider);
            }
        }
    }
    return provider.getInstance();
}

providers是基于ConcurrentHashMap实现的一个服务注册表。每次来的时候先从provider中尝试获取ServiceProvider实例,如果获取不到则使用双重检查锁机制从discovery中获取服务提供者provider,这里使用了随机策略RandomStrategy(其余实现还有轮询策略、sticky粘滞策略)。

在serviceProvider中获取到provider提供者 之后,添加到providers中,同时添加到closeableList以便后续调用shutdown对所有的provider进行关闭操作。

提供一个shutdown方法用于关闭资源

public synchronized void shutdown() {
    for (Closeable closeable : closeableList) {
        CloseableUtils.closeQuietly(closeable);
    }
    CloseableUtils.closeQuietly(discovery);
    CloseableUtils.closeQuietly(client);
}

这里我们对closeableList进行迭代,逐个关闭ServiceProvider实例。

ServiceProvider实现了Closeable接口,因此它也是一个Closeable的实例。

到这里,我们就实现了服务注册与发现的核心功能,接着通过一个demo案例去测试一下。

服务注册实现

新建服务misaka-provider,作为服务提供者,它在启动之后会对服务进行注册。

编写RegistyHandler添加@Configuration注解,标记为一个配置类。

注册DiscoveryService实例

@Bean(initMethod = "init", destroyMethod = "shutdown")
public DiscoveryService discoveryService() {
    DiscoveryService discoveryService = new DiscoveryService();
    return discoveryService;
}

声明并向Spring容器中注册DiscoveryService实例,标记初始化方法为init,销毁方法为shutdown

注册服务实现类HelloServiceImpl

定义RPC接口HelloService

public interface HelloService {

    public String sayHello(String name, String content);
}

声明一个sayHello方法,用过dubbo等RPC框架的同学应当很熟悉了。没错,这里的HelloService在Dubbo中就是服务定义接口。

编写实现类HelloServiceImpl实现HelloService

public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello(String name, String content) {
        return new StringBuilder("hello:").append(name).append(", content:").append(content).toString();
    }
}

注册服务实现类

在RegistyHandler注册类中,注册HelloServiceImpl实例,并将其元信息注册到discoveryService中。

@Bean
@ConditionalOnBean(value = DiscoveryService.class)
public HelloServiceImpl helloServiceImpl(DiscoveryService discoveryService) {
    HelloServiceImpl helloService = new HelloServiceImpl();
    ServiceInstanceNode helloServiceNode = new ServiceInstanceNode();
    // 服务注册
    helloServiceNode.setPort(servicePort).setAddress(ipAddress).setServiceName(HelloServiceImpl.class.getName());
    try {
        discoveryService.registeService(helloServiceNode);
    } catch (Exception e) {
        throw new RuntimeException("注册HelloServiceImpl异常");
    }
    return helloService;
}

通过 @ConditionalOnBean(value = DiscoveryService.class) 条件注册告诉Spring容器只有存在DiscoveryService实例才注册HelloServiceImpl。

构造一个ServiceInstanceNode,设置属性后通过discoveryService.registeService方法将元信息注册到zookeeper中。

服务注册部分的开发就告一段落,我们接着看下服务发现部分的代码实现。

服务发现实现

新建服务misaka-consumer,作为服务提供者,它会根据服务名称取zookeeper进行查询,获取具体的服务元信息。

和服务提供者服务类似,定义一个RegistyHandler类,添加注解@Configuration。

注册DiscoveryService实例

@Bean(initMethod = "init", destroyMethod = "shutdown")
public DiscoveryService discoveryService() {
    DiscoveryService discoveryNode = new DiscoveryService();
    return discoveryNode;
}

向Spring容器中注册DiscoveryService实例,用于服务发现。

@Bean(destroyMethod = "shutdown")
@ConditionalOnBean(DiscoveryService.class)
public RemoteClient remoteClient(DiscoveryService discoveryService) {
    RemoteClient remoteClient;
    try {
        // 服务发现
        ServiceInstance<String> serviceInstance = discoveryService.getInstanceByName(helloServiceName);
        LOGGER.info(JSON.toJSONString(serviceInstance));
        ...省略其他逻辑...

    } catch (Exception e) {
        throw new RuntimeException("init RuntimeException error!", e);
    }
    return remoteClient;
}

注册客户端通信RemoteClient实例,这个类的作用为封装Netty用于RPC网络通信,具体逻辑在后续的通信实现部分进行讲解。

我们注意看try-catch中的代码,从DiscoveryService中获取了服务名为 key = helloServiceName 的服务实现,value通过@Value注解获取,具体值配置在application.properties中。

misaka.service.HelloService.name=com.snowalker.misaka.misaka.service.HelloServiceImpl

在RegistyHandler声明如下:

@Value("${misaka.service.HelloService.name}")
private String helloServiceName;

通过discoveryService.getInstanceByName方法获取到helloServiceName对应的具体服务元信息后,我们通过日志进行打印。

接下来先后启动提供者服务,消费者服务,对服务注册发现逻辑进行测试。

测试服务注册及发现

首先启动提供者服务,控制台输出如下:

......
2020-02-03 20:11:34.304  INFO 31092 --- [           main] org.apache.zookeeper.ZooKeeper           : 
Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@31c269fd
2020-02-03 20:11:34.313  INFO 31092 --- [           main] org.apache.zookeeper.ClientCnxnSocket    : 
jute.maxbuffer value is 4194304 Bytes
2020-02-03 20:11:34.317  INFO 31092 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : 
Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2020-02-03 20:11:34.319  INFO 31092 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : 
Socket connection established, initiating session, client: /127.0.0.1:8071, server: 127.0.0.1/127.0.0.1:2181
2020-02-03 20:11:34.320  INFO 31092 --- [           main] o.a.c.f.imps.CuratorFrameworkImpl        : 
Default schema
2020-02-03 20:11:34.323  INFO 31092 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : 
Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x1000fa2eb800032, negotiated timeout = 40000
2020-02-03 20:11:34.327  INFO 31092 --- [ain-EventThread] o.a.c.f.state.ConnectionStateManager     : 
State change: CONNECTED
2020-02-03 20:14:27.180  INFO 31048 --- [           main] c.s.misaka.misaka.config.RegistyHandler  : 
service : com.snowalker.misaka.misaka.service.HelloServiceImpl  registered success

......

可以看到已经与zookeeper建立了链接,并注册helloServiceImpl服务到zookeeper中。

接着启动服务消费者服务,控制台输出如下:

2020-02-03 17:52:49.673  INFO 6824 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : 
Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2020-02-03 17:52:49.674  INFO 6824 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : 
Socket connection established, initiating session, client: /127.0.0.1:4290, server: 127.0.0.1/127.0.0.1:2181
2020-02-03 17:52:49.676  INFO 6824 --- [           main] o.a.c.f.imps.CuratorFrameworkImpl        : 
Default schema
2020-02-03 17:52:49.678  INFO 6824 --- [127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn          : 
Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x1000fa2eb800031, negotiated timeout = 40000
2020-02-03 17:52:49.682  INFO 6824 --- [ain-EventThread] o.a.c.f.state.ConnectionStateManager     : 
State change: CONNECTED
2020-02-03 17:52:49.790  INFO 6824 --- [           main] c.s.m.m.config.RegistyHandler            : 
{"address":"192.168.0.100","enabled":true,"id":"5e44a372-f1e0-44da-8a22-daa81a346f37",
        "name":"com.snowalker.misaka.misaka.service.HelloServiceImpl","port":18083,
        "registrationTimeUTC":1580723559543,"serviceType":"DYNAMIC"}

注意观察最后一行日志,这里打印出了HelloServiceImpl服务的注册元信息,该元信息即是提供者服务启动时注册到zookeeper上的服务元信息。

我们使用zk-cli观察一下zookeeper中的Node节点:

笔者已经在本地部署了一套zk-ui,关于zk-ui的使用可以自行查看附录中的参考链接。

从图中可以看出,服务HelloServiceImpl已经成功注册到zookeeper上,并且能够被服务消费者发现。

小结

作为 “自己写RPC” 系列的第一篇,本文详细的讲解了如何利用curator整合spring Boot框架实现跨服务的服务注册与发现功能,并且给出了详细的代码实现与讲解。

在开发过程中,笔者强烈地体会到掌握zookeeper组件对于后端开发者的必要性,zookeeper真的很强大。

在后续的文章中,我将继续带领读者,实现远程服务调用逻辑。

版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章