跟着gRPC源码简单学习RPC原理

RPC(Remote Procedure Call/远程过程调用)是一种服务间交互的方式,达到 像本地方法一样调用远程方法 的目的。

看看源码,简单了解gRPC是怎么实现RPC的,以gRPC官方代码示例 helloworld 为例。

服务注册

先看一下服务端怎么注册服务。

helloworld.pb.go 文件中,会有 RegisterGreeterServer 方法以及 _Greeter_serviceDesc 变量,

_Greeter_serviceDesc 描述了服务的属性。 RegisterGreeterServer 方法会向gRPC服务端 s 注册服务 srv

//...
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
    s.RegisterService(&_Greeter_serviceDesc, srv)
}
//...
var _Greeter_serviceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.Greeter", // ServiceName包括`.proto`文件中的package名称
    HandlerType: (*GreeterServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "SayHello",
            Handler:    _Greeter_SayHello_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "helloworld.proto",
}

server.go 中的 RegisterService 方法,会判断ServiceServer是否实现sd中描述的HandlerType;如果实现了则调用 s.register 方法注册。

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
    ht := reflect.TypeOf(sd.HandlerType).Elem()
    st := reflect.TypeOf(ss)
    if !st.Implements(ht) { // 判断ServiceServer是否实现sd中描述的HandlerType
        grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
    }
    s.register(sd, ss)
}

register 根据 Method 创建对应的map,并将名称作为键,方法描述(指针)作为值,添加到相应的map中。

最后将{服务名称:服务}添加到服务端。

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.printf("RegisterService(%q)", sd.ServiceName)
    if s.serve {// 服务端是否启动
        grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
    }
    if _, ok := s.m[sd.ServiceName]; ok {//服务是否已经注册
        grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
    }
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc), // map of Methods
        sd:     make(map[string]*StreamDesc), // map of Stream
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv // 添加服务到服务端
}

服务调用

服务端注册了服务之后,接下来就是调用服务。

客户端调用服务

helloworld.pb.go 中的 SayHello 方法中使用 c.cc.Invoke 远程调用服务端:

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
    out := new(HelloReply)
    // 远程调用服务端的方法
    err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

Invoke 的定义如下, "/helloworld.Greeter/SayHello" 对应的是 method 参数。

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error

method 被传入后,依次被 invokenewClientStream 方法调用,保存在 cs.callHdr.Method ;后续在 newAttemptLocked 方法中生成 cs.attempt.t

cs.SendMsg 最终通过 csAttempt.sendMsg 方法中调用 a.t.Write 方法写出请求 req

最后 cs.RecvMsg 轮询获取返回结果。

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}

服务端执行调用

handleStream 方法会处理请求中的 Method 字段,获得服务名与方法名。调用 s.processUnaryRPC , 调用 md.Handler 方法(即 _Greeter_SayHello_Handler 方法)。

最终调用 s.sendResponse 方法返回结果。

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(HelloRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(GreeterServer).SayHello(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/helloworld.Greeter/SayHello",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
    }
    return interceptor(ctx, in, info, handler)
}
我来评几句
登录后评论

已发表评论数()

相关站点

热门文章