百木园-与人分享,
就是让自己快乐。

我的Go gRPC之旅、02 四种通信模式

借助gRPC我们可以实现不同进程间通信模式(也称RPC风格)。

repeated 关键字

message Order {
    string id = 1;
    repeated string items = 2;
    string description = 3;
    float price = 4;
    string destination = 5;
}

使用repeated表明这个字段在消息中可以重复出现多次,包括0次。编译成go,结构体会表示成一个切片。

一元RPC模式

01 初识gRPC,感受gRPC的强大魅力 - 小能日记 - 博客园

一元RPC模式也被称为简单RPC模式。在该模式中,当客户端调用服务器端的远程方法时,客户端发送请求至服务器端并获得一个响应,与响应一起发送的还有状态细节以及trailer元数据。

   rpc addOrder(Order) returns (google.protobuf.StringValue);
   rpc getOrder(google.protobuf.StringValue) returns (Order);

编译后

func (s *server) GetOrder(ctx context.Context, orderId *wrapper.StringValue) (*pb.Order, error) {

其中Context对象传递到方法中是因为其包含了一些用于控制gRPC行为的构造,比如截止时间和取消功能。

服务器端流RPC模式

服务端在接收到客户端的请求消息后,会发回一个响应的序列。这种多个响应所组成的序列也被称为”流“。

在将所有的服务端响应发送完毕之后,服务端会以trailer元数据的形式将其状态发送给客户端,从而标记流的结束。

订单服务的客户端发出一个请求之后,会接收到多条响应消息。

    rpc searchOrders(google.protobuf.StringValue) returns (stream Order);

通过使用 returns (stream Order) 将返回参数指定为订单的流。编译后

func (s *server) SearchOrders(searchQuery *wrappers.StringValue, stream pb.OrderManagement_SearchOrdersServer) error {
	for key, order := range orderMap {
		for _, itemStr := range order.Items {
			if strings.Contains(itemStr, searchQuery.Value) {
				err := stream.Send(&order) 
                // 需要处理将消息以流的形式发送给客户端的过程中可能出现的错误
				if err != nil {
					return fmt.Errorf(\"error sending message to stream : %v\", err)
				}
				log.Print(\"Matching Order Found : \" + key)
				break
			}
		}
	}
	return nil
}

pb.OrderManagement_SearchOrdersServer 是服务端流的写入对象,可以写入多个响应。

客户端代码使用Recv方法从客户端流中检索消息,并且持续检索,直到流结束为止,即 io.EOF

searchStream, 	_ := client.SearchOrders(ctx, &wrapper.StringValue{Value: \"Google\"})
for {
    searchOrder, err := searchStream.Recv()
    if err == io.EOF {
        log.Print(\"EOF\")
        break
    }

    if err == nil {
        log.Print(\"Search Result : \", searchOrder)
    }
}

客户端流RPC模式

客户端会发送多个请求给服务端。服务端可以随时结束接收或接收所有消息后再发送响应。

    rpc updateOrders(stream Order) returns (google.protobuf.StringValue);

编译为

func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
	ordersStr := \"Updated Order IDs : \"
	for {
		order, err := stream.Recv()
		if err == io.EOF {
            // 客户端已发送完毕,服务器可以响应
			return stream.SendAndClose(&wrapper.StringValue{Value: \"Orders processed \" + ordersStr})
		}

		if err != nil {
			return err
		}
		orderMap[order.Id] = *order

		log.Printf(\"Order ID : %s - %s\", order.Id, \"Updated\")
		ordersStr += order.Id + \", \"
	}
}

pb.OrderManagement_UpdateOrdersServer是客户端传入消息流的引用对象。

服务端调用该对象的SendAndClose方法可以发送响应,同时标记服务器端消息终结了流。

客户端调用对象的CloseAndRecv方法可以关闭流并接收响应。

	updateStream, err := client.UpdateOrders(ctx)

	if err != nil {
		log.Fatalf(\"%v.UpdateOrders(_) = _, %v\", client, err)
	}

	if err := updateStream.Send(&updOrder1); err != nil {
		log.Fatalf(\"%v.Send(%v) = %v\", updateStream, updOrder1, err)
	}

	if err := updateStream.Send(&updOrder2); err != nil {
		log.Fatalf(\"%v.Send(%v) = %v\", updateStream, updOrder2, err)
	}

	if err := updateStream.Send(&updOrder3); err != nil {
		log.Fatalf(\"%v.Send(%v) = %v\", updateStream, updOrder3, err)
	}
	// 结束流并等待服务端响应
	updateRes, err := updateStream.CloseAndRecv()
	if err != nil {
		log.Fatalf(\"%v.CloseAndRecv() got error %v, want %v\", updateStream, err, nil)
	}
	log.Printf(\"Update Orders Res : %s\", updateRes)

双向流模式

双向流模式中,客户端以消息流的形式发送请求到服务端,服务端也以消息流的形式进行响应。调用必须由客户端发起。流的操作完全独立,客户端和服务端可以按照任意顺序进行读取和写入。

    rpc processOrders(stream google.protobuf.StringValue) returns (stream CombinedShipment);

一旦调用RPC方法,那么无论是客户端还是服务端,都可以在任意时间发送消息。这也包括来自任意一段的流结束标记。编译后

func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {
	...
}

pb.OrderManagement_ProcessOrdersServer 是客户端和服务器端之间消息流的对象引用。既可以Recv方法读取,也可以Send方法写入。

客户端代码中可开启两个线程分别用于发送消息流和读取消息流。调用流引用对象的CloseSend方法可以关闭当前流并通知另一端,但另一端并未关闭,还可以发送数据。

	...
	streamProcOrder, err := client.ProcessOrders(ctx)
	if err != nil {
		log.Fatalf(\"%v.ProcessOrders(_) = _, %v\", client, err)
	}

	if err := streamProcOrder.Send(&wrapper.StringValue{Value:\"102\"}); err != nil {
		log.Fatalf(\"%v.Send(%v) = %v\", client, \"102\", err)
	}

	if err := streamProcOrder.Send(&wrapper.StringValue{Value:\"103\"}); err != nil {
		log.Fatalf(\"%v.Send(%v) = %v\", client, \"103\", err)
	}

	if err := streamProcOrder.Send(&wrapper.StringValue{Value:\"104\"}); err != nil {
		log.Fatalf(\"%v.Send(%v) = %v\", client, \"104\", err)
	}

	channel := make(chan struct{})
	go asncClientBidirectionalRPC(streamProcOrder, channel)
	time.Sleep(time.Millisecond * 1000)

	if err := streamProcOrder.Send(&wrapper.StringValue{Value:\"101\"}); err != nil {
		log.Fatalf(\"%v.Send(%v) = %v\", client, \"101\", err)
	}
	if err := streamProcOrder.CloseSend(); err != nil {
		log.Fatal(err)
	}
// 用channel保证main在读取消息流的go程结束后再结束
	channel <- struct{}{}
}
func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {
	for {
		combinedShipment, errProcOrder := streamProcOrder.Recv()
		if errProcOrder == io.EOF {
			break
		}
		log.Printf(\"Combined shipment : \", combinedShipment.OrdersList)
	}
	<-c
}

来源:https://www.cnblogs.com/linxiaoxu/p/16709673.html
本站部分图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » 我的Go gRPC之旅、02 四种通信模式

相关推荐

  • 暂无文章