go-grpc-流式接口(streaming rpc)

上一篇我们介绍了 rpc 最基本的应用,今天我们来看看 rpc 的另外一个数据交互方式 streaming rpc ,也就是流式接口。

streaming rpc 相比于 simple rpc 来说可以很好的解决一个接口发送大量数据的场景。

比如一个订单导出的接口有20万条记录,如果使用 simple rpc 来实现的话。那么我们需要一次性接收到20万记录才能进行下一步的操作。但是如果我们使用 streaming rpc 那么我们就可以接收一条记录处理一条记录,直到所以的数据传输完毕。 这样可以较少服务器的瞬时压力,也更有及时性

下面们来看看 streaming rpc 具体是怎么交互的。

IDL

syntax = "proto3";
package proto;

message Order {
    int32 id = 1;
    string orderSn = 2;
    string date = 3;
}
message OrderList{
    Order order = 1;
}
message OrderSearchParams {
}
message Image{
    string fileName = 1;
    string file = 2;
}
message ImageList{
    Image image = 1;
}
message uploadResponse{
}
message SumData{
    int32 number = 1;
}
service StreamService {
    rpc OrderList(OrderSearchParams) returns (stream OrderList){}; //服务端流式
    rpc UploadFile(stream ImageList) returns (uploadResponse){}; //客户端流式
    rpc SumData(stream SumData) returns (stream SumData){}; //双向流式
}

这里定义了三个方法

  • OrderList 服务器流式,客户端普通rpc调用
  • UploadFile 客户端流式,服务端普通rpc
  • SumData 双向流式

基础结构

server

package main

import (
    "google.golang.org/grpc"
    "iris-grpc-example/proto"
    "log"
    "net"
)
type  StreamServices struct {}
func main()  {
    server := grpc.NewServer()
    proto.RegisterStreamServiceServer(server, &StreamServices{})

    lis, err := net.Listen("tcp", "127.0.0.1:9528")
    if err != nil {
        log.Fatalf("net.Listen err: %v", err)
    }
    server.Serve(lis)
}
func (services *StreamServices)OrderList(params *proto.OrderSearchParams,stream proto.StreamService_OrderListServer) error {
    return  nil
}

func (services *StreamServices)UploadFile(stream proto.StreamService_UploadFileServer) error {
    return  nil
}

func (services *StreamServices)SumData(stream proto.StreamService_SumDataServer) error {
    return  nil
}

clietn

package main
import (
    "github.com/kataras/iris/v12"
    "google.golang.org/grpc"
    "iris-grpc-example/proto"
    "log"
)
var streamClient proto.StreamServiceClient
func main()  {
    app := iris.New()
    app.Logger().SetLevel("debug") //debug
    app.Handle("GET", "/testOrderList", orderList)
    app.Handle("GET", "/testUploadImage", uploadImage)
    app.Handle("GET", "/testSumData", sumData)
    app.Run(iris.Addr("127.0.0.1:8080"))
}
func init() {
    connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure())
    if err != nil {
        log.Fatalln(err)
    }
    streamClient = proto.NewStreamServiceClient(connect)
}

func orderList(ctx iris.Context)  {
    
}

func uploadImage(ctx iris.Context)  {

}

func sumData(ctx iris.Context)  {

}

按照 proto 中的约定,先实现接口并注册一个服务。接下来我们依次来实现三个不同的流式方法。

服务端流式 orderList

server

func (services *StreamServices) OrderList(params *proto.OrderSearchParams, stream proto.StreamService_OrderListServer) error {
    for i := 0; i <= 10; i++ {
        order := proto.Order{
            Id:      int32(i),
            OrderSn: time.Now().Format("20060102150405") + "order_sn",
            Date:    time.Now().Format("2006-01-02 15:04:05"),
        }
        err := stream.Send(&proto.StreamOrderList{
            Order: ℴ,
        })
        if err != nil {
            return err
        }
    }
    return nil
}

gRPC为我们提供一个流的发送方法。 send ,这样我们可以很简单的以流的方式传递数据。

现在我们来查看 streaming.pb.go 中的 send

func (x *streamServiceOrderListServer) Send(m *StreamOrderList) error {
    return x.ServerStream.SendMsg(m)
}

可以看到最终是使用 ServerStream.SendMsg ,查看源码,可以发现,最终是使用了一个结构体。

type serverStream struct {
    ctx   context.Context
    ......

    maxReceiveMessageSize int
    maxSendMessageSize    int
    ......
}

这里我们关心两个值,最大可接收大小,最大发送大小。而再 SendMsg 中也有对于的大小判断,所以发送的消息大小不是无限制的。

// TODO(dfawley): should we be checking len(data) instead?
    if len(payload) > ss.maxSendMessageSize {
        return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
    }

我们可以服务端创建 server 的时候通过 server := grpc.NewServer(grpc.MaxSendMsgSize()) 来指定大小,有可以在客服端创建 client 的时候通过 connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure(),grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize())) 的时候来指定,具体大家可以下来自己详细了解下配置项。

client

func orderList(ctx iris.Context) {
    stream, err := streamClient.OrderList(context.Background(), &proto.OrderSearchParams{})
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    for {
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            ctx.JSON(map[string]string{
                "err": err.Error(),
            })

            return
        }
        ctx.JSON(res)
        log.Println(res)
    }
}

这里在 for 循环中去读取数据,直到取到一个 io.EOF 的结束错误占位符。

客户端流式 uploadImage

server

func (services *StreamServices) UploadFile(stream proto.StreamService_UploadFileServer) error {
    for  {
         res,err := stream.Recv()
         //接收消息结束,发送结果,并关闭
        if err == io.EOF {
            return stream.SendAndClose(&proto.UploadResponse{})
        }
        if err !=nil {
            return err
        }
        fmt.Println(res)
    }
    return nil
}

可以看到这里我们同样使用 for 结合 stream.Recv() 来接收数据流,但是这里我们多一个 SendAndClose ,表示服务器已经接收消息结束,并发生一个正确的响应给客户端。

client

func uploadImage(ctx iris.Context) {
    stream,err := streamClient.UploadFile(context.Background())
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    for i:=1;i<=10 ; i++ {
        img := &proto.Image{FileName:"image"+strconv.Itoa(i),File:"file data"}
        images := &proto.StreamImageList{Image:img}
        err := stream.Send(images)
        if err != nil {
            ctx.JSON(map[string]string{
                "err": err.Error(),
            })
            return
        }
    }
    //发送完毕 关闭并获取服务端返回的消息
    resp, err := stream.CloseAndRecv()
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    ctx.JSON(map[string]interface{}{"result": resp,"message":"success"})
    log.Println(resp)
}

而在客户端发送数据完毕的时候需要使用 CloseAndRecv 需要接收服务端接收完毕的通知以及关闭当前通道。

双向流式 uploadImage

server

func (services *StreamServices) SumData(stream proto.StreamService_SumDataServer) error {
    i := 0
    for {
        err := stream.Send(&proto.StreamSumData{Number: int32(i)})
        if err != nil {
            return err
        }
        res, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        log.Printf("res:%d,i:%d,sum:%d\r\n", res.Number, i, int32(i)+res.Number)
        i++
    }
}

服务端在发送消息的同时,并接收服务端发送的消息。

client

func sumData(ctx iris.Context) {
    stream, err := streamClient.SumData(context.Background())
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    for i := 1; i <= 10; i++ {
        err = stream.Send(&proto.StreamSumData{Number: int32(i)})
        if err == io.EOF {
            break
        }
        if err != nil {
            return
        }
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return
        }
        log.Printf("res number:%d", res.Number)
    }
    stream.CloseSend()
    return
}

上面我们可以看到。客户端有一个执行断开连接的标识 CloseSend() ,而服务器没有, 因为服务端断开连接是隐式的,我们只需要退出循环即可断开连接 。可以灵活的控制。

上面就是 go-gRPC 的流式接口。只是一个简单的例子。如有不妥的地方欢迎指出。感谢。

学习于-煎鱼

期待一起交流

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章