客户端/服务端(一元)

  1. product.proto 文件。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
syntax = "proto3";

package ecommerce;

import "google/protobuf/wrappers.proto";

option go_package = "ecommerce/";

// Order is the message returned by the getOrder RPC of OrderManagement.
// It carries an id, a repeated list of item strings, a free-text description,
// a price and a destination.
message Order {
  string id = 1;
  repeated string items = 2;
  string description = 3;
  float price = 4;
  string destination = 5;
}

// OrderManagement exposes a single unary RPC.
service OrderManagement {
  // getOrder takes one StringValue request and returns one Order.
  rpc getOrder(google.protobuf.StringValue) returns (Order);
}
  1. 生成 Go 和 gRPC 文件:
$ protoc -I ./pb \
--go_out ./ecommerce --go_opt paths=source_relative \
--go-grpc_out ./ecommerce --go-grpc_opt paths=source_relative \
./pb/product.proto
  1. 服务端实现。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package main

import (
    "context"
    "crypto/tls"
    "fmt"
    "net"

    pb "example.com/learn-grpc-01/ecommerce"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/wrapperspb"
)

// main loads the server's TLS key pair, listens on TCP port 9090 and serves
// the OrderManagement gRPC service until Serve returns.
func main() {
    // Option 1 (not used here): let gRPC read both files itself.
    //
    //	creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
    //	if err != nil { ... }

    // Option 2: load the key pair first, then wrap it in transport credentials.
    cert, err := tls.LoadX509KeyPair(
        "/root/workspace/learn-grpc-01/key/test.pem",
        "/root/workspace/learn-grpc-01/key/test.key")
    if err != nil {
        fmt.Printf("私钥错误:%v", err)
        return
    }
    creds := credentials.NewServerTLSFromCert(&cert)

    // A failed bind (port in use, missing privileges) must stop the program
    // here; otherwise Serve would be handed a nil listener.
    listen, err := net.Listen("tcp", ":9090")
    if err != nil {
        fmt.Printf("listen on :9090 failed: %v\n", err)
        return
    }

    grpcServer := grpc.NewServer(grpc.Creds(creds))
    pb.RegisterOrderManagementServer(grpcServer, &server{})

    // Start serving; Serve blocks until it fails or the server is stopped.
    if err := grpcServer.Serve(listen); err != nil {
        fmt.Println(err)
    }
}

// 编译器静态检查,*server是否实现了pb.OrderManagementServer接口
var _ pb.OrderManagementServer = (*server)(nil)

var orders = make(map[string]pb.Order, 8)

// init seeds the package-level orders map with two sample orders used as
// test data.
func init() {
    seeds := []struct {
        key, id, destination string
        items                []string
    }{
        {key: "1", id: "1", destination: "101", items: []string{"1", "2", "3", "4", "5", "6"}},
        {key: "2", id: "2", destination: "102", items: []string{"6", "5", "4", "3", "2", "1"}},
    }

    for _, seed := range seeds {
        orders[seed.key] = pb.Order{Id: seed.id, Items: seed.items, Destination: seed.destination}
    }
}

// server is the OrderManagement service implementation registered in main.
// It embeds pb.UnimplementedOrderManagementServer, so any service method it
// does not define itself is provided by the embedded type.
type server struct {
    pb.UnimplementedOrderManagementServer
}

// GetOrder returns the order stored under the ID carried in orderId.
//
// It returns a copy of the stored order and a nil error on success. When no
// order exists for the ID, it returns nil and a gRPC status error with code
// NotFound whose message includes the requested ID.
func (s *server) GetOrder(ctx context.Context, orderId *wrapperspb.StringValue) (*pb.Order, error) {
    // GetValue is nil-safe, so a nil request is treated as the empty ID.
    id := orderId.GetValue()

    ord, exists := orders[id]
    if !exists {
        // The format string needs a verb for the ID; without one the message
        // ends in a "%!(EXTRA ...)" artefact.
        return nil, status.Errorf(codes.NotFound, "Order does not exist. : %s", id)
    }

    // A nil error is how a handler reports codes.OK.
    return &ord, nil
}
  1. 客户端实现。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    pb "example.com/learn-grpc-01/ecommerce"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/protobuf/types/known/wrapperspb"
)

// main creates a TLS-protected client for the OrderManagement service at
// 127.0.0.1:9090, requests order "2" with a 5-second deadline and prints the
// response.
func main() {
    // A missing or unreadable certificate must stop the program here rather
    // than handing nil credentials to the dial option below.
    creds, err := credentials.NewClientTLSFromFile(
        "/root/workspace/learn-grpc-01/key/test.pem",
        "*.heliu.site",
    )
    if err != nil {
        log.Fatalf("load TLS credentials: %v", err)
    }

    // Without TLS this would be grpc.WithTransportCredentials(insecure.NewCredentials()).
    opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}

    // Create the client for the server address; RPCs on it use TLS.
    conn, err := grpc.NewClient("127.0.0.1:9090", opts...)
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }

    defer conn.Close()

    client := pb.NewOrderManagementClient(conn)

    fmt.Printf("now-Time: %s\n", time.Now().Format(time.DateTime))
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Issue the unary RPC; the server implements GetOrder and returns the order.
    resp, err := client.GetOrder(ctx, &wrapperspb.StringValue{Value: "2"})
    if err != nil {
        fmt.Printf("%v\n", err)
        return
    }

    // Expected output:
    // id:"2"  items:"6"  items:"5"  items:"4"  items:"3"  items:"2"  items:"1"  destination:"102"
    fmt.Println(resp.String())
}
  1. github 参考代码:https://github.com/helium-chain/grpc-test01

服务端(流)/客户端(一元)

  1. 简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。

  1. product.proto 文件。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
syntax = "proto3";

package ecommerce;

option go_package = "ecommerce/";

import "google/protobuf/wrappers.proto";

// Order is the message streamed back by the searchOrders RPC of
// OrderManagement. It carries an id, a repeated list of item strings, a
// free-text description, a price and a destination.
message Order {
  string id = 1;
  repeated string items = 2;
  string description = 3;
  float price = 4;
  string destination = 5;
}

// OrderManagement exposes a single server-streaming RPC.
service OrderManagement {
  // searchOrders takes one StringValue request and returns a stream of Order
  // messages.
  rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
}
  1. 生成 Go 和 gRPC 文件:
$ protoc -I ./pb \
--go_out ./ecommerce --go_opt paths=source_relative \
--go-grpc_out ./ecommerce --go-grpc_opt paths=source_relative \
./pb/product.proto
  1. 服务端实现。
    • 因为我们的服务端是流式响应的,因此对于服务端来说函数入参多了一个stream OrderManagement_SearchOrdersServer参数用来写入多个响应,可以把它看作是客户端的对象。
    • 可以通过调用这个流对象的Send(…),来往客户端写入数据。
    • 通过返回nil或者error来表示全部数据写完了。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main

import (
    "crypto/tls"
    "fmt"
    "net"
    "strings"

    pb "example.com/learn-grpc-02/ecommerce"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/wrapperspb"
)

// main loads the server's TLS key pair, listens on TCP port 9090 and serves
// the OrderManagement gRPC service until Serve returns.
func main() {
    // Option 1 (not used here): let gRPC read both files itself.
    //
    //	creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
    //	if err != nil { ... }

    // Option 2: load the key pair first, then wrap it in transport credentials.
    cert, err := tls.LoadX509KeyPair(
        "/root/workspace/learn-grpc-02/key/test.pem",
        "/root/workspace/learn-grpc-02/key/test.key")
    if err != nil {
        fmt.Printf("私钥错误:%v", err)
        return
    }
    creds := credentials.NewServerTLSFromCert(&cert)

    // A failed bind (port in use, missing privileges) must stop the program
    // here; otherwise Serve would be handed a nil listener.
    listen, err := net.Listen("tcp", ":9090")
    if err != nil {
        fmt.Printf("listen on :9090 failed: %v\n", err)
        return
    }

    grpcServer := grpc.NewServer(grpc.Creds(creds))
    pb.RegisterOrderManagementServer(grpcServer, &service{})

    // Start serving; Serve blocks until it fails or the server is stopped.
    if err := grpcServer.Serve(listen); err != nil {
        fmt.Println(err)
    }
}

var _ pb.OrderManagementServer = (*service)(nil)

var orders = make(map[string]pb.Order, 8)

// init seeds the package-level orders map with two sample orders used as
// test data.
func init() {
    seeds := []struct {
        key, id, destination string
        items                []string
    }{
        {key: "1", id: "1", destination: "101", items: []string{"1", "2", "3", "4", "5", "7"}},
        {key: "2", id: "2", destination: "102", items: []string{"6", "5", "4", "3", "2", "0"}},
    }

    for _, seed := range seeds {
        orders[seed.key] = pb.Order{Id: seed.id, Items: seed.items, Destination: seed.destination}
    }
}

// service is the OrderManagement service implementation registered in main.
// It embeds pb.UnimplementedOrderManagementServer, so any service method it
// does not define itself is provided by the embedded type.
type service struct {
    pb.UnimplementedOrderManagementServer
}

// SearchOrders streams every stored order that has at least one item
// containing the query string.
//
// Each matching order is sent exactly once, even when several of its items
// match. Orders come from a map, so the order in which they are streamed is
// not deterministic. Returning nil ends the stream with status OK; a failed
// Send aborts the RPC with an Internal status that wraps the send error.
func (s *service) SearchOrders(query *wrapperspb.StringValue, stream pb.OrderManagement_SearchOrdersServer) error {
    needle := query.GetValue()

    for _, order := range orders {
        for _, item := range order.Items {
            if !strings.Contains(item, needle) {
                continue
            }
            if err := stream.Send(&order); err != nil {
                // A send failure is a transport problem, not a missing order.
                return status.Errorf(codes.Internal, "send order %s: %v", order.Id, err)
            }
            // One matching item is enough; stop so the order is not sent again.
            break
        }
    }

    return nil
}
  1. 客户端实现。
    • 因为我们的服务端是流式响应的,因此 RPC 函数返回值stream是一个流,可以把它看作是服务端的对象。
    • 使用stream的Recv函数来不断从服务端接收数据。
    • 当Recv返回io.EOF代表流已经结束。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "time"

    pb "example.com/learn-grpc-02/ecommerce"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/protobuf/types/known/wrapperspb"
)

// main creates a TLS-protected client for the OrderManagement service at
// 127.0.0.1:9090, calls the server-streaming SearchOrders RPC with the query
// "5" under a 5-second deadline and logs every order received.
func main() {
    // A missing or unreadable certificate must stop the program here rather
    // than handing nil credentials to the dial option below.
    creds, err := credentials.NewClientTLSFromFile(
        "/root/workspace/learn-grpc-02/key/test.pem",
        "*.heliu.site",
    )
    if err != nil {
        log.Fatalf("load TLS credentials: %v", err)
    }

    // Without TLS this would be grpc.WithTransportCredentials(insecure.NewCredentials()).
    opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}

    // Create the client for the server address; RPCs on it use TLS.
    conn, err := grpc.NewClient("127.0.0.1:9090", opts...)
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }

    defer conn.Close()

    client := pb.NewOrderManagementClient(conn)

    fmt.Printf("now-Time: %s\n", time.Now().Format(time.DateTime))
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    stream, err := client.SearchOrders(ctx, &wrapperspb.StringValue{Value: "5"})
    if err != nil {
        log.Fatalf("error when calling SearchOrders: %v", err)
    }

    for {
        order, err := stream.Recv()
        if err == io.EOF {
            // The server finished the stream normally.
            break
        }
        if err != nil {
            // Any other error ends the stream for good; without this check
            // the loop would spin forever logging a nil order.
            log.Fatalf("error when receiving from SearchOrders: %v", err)
        }

        log.Println("SearchOrders:", order)
    }

    // Output:
    // 2024/09/13 12:24:17 SearchOrders: id:"1" items:"1" items:"2" items:"3" items:"4" items:"5" items:"7" destination:"101"
    // 2024/09/13 12:24:17 SearchOrders: id:"2" items:"6" items:"5" items:"4" items:"3" items:"2" items:"0" destination:"102"
}
  1. github 参考代码:https://github.com/helium-chain/grpc-test02

服务端(一元)/客户端(流)

  1. 服务端没有必要等到客户端发送完所有请求再响应,可以在收到部分请求之后就响应。

  1. product.proto 文件。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
syntax = "proto3";

package ecommerce;

option go_package = "ecommerce/";

import "google/protobuf/wrappers.proto";

// Order is the message the client streams to the updateOrders RPC of
// OrderManagement. It carries an id, a repeated list of item strings, a
// free-text description, a price and a destination.
message Order {
  string id = 1;
  repeated string items = 2;
  string description = 3;
  float price = 4;
  string destination = 5;
}

// OrderManagement exposes a single client-streaming RPC.
service OrderManagement {
  // updateOrders takes a stream of Order messages and returns one StringValue.
  rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
}
  1. 生成 Go 和 gRPC 文件:
$ protoc -I ./pb \
--go_out ./ecommerce --go_opt paths=source_relative \
--go-grpc_out ./ecommerce --go-grpc_opt paths=source_relative \
./pb/product.proto
  1. 服务端实现。
    • 因为我们的客户端是流式请求的,因此请求参数stream OrderManagement_UpdateOrdersServer就是流对象。
    • 可以从stream OrderManagement_UpdateOrdersServer的Recv函数读取消息。
    • 当Recv返回io.EOF代表流已经结束。
    • 使用stream OrderManagement_UpdateOrdersServer的SendAndClose函数关闭并发送响应。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package main

import (
    "crypto/tls"
    "fmt"
    "io"
    "log"
    "net"

    pb "example.com/learn-grpc-03/ecommerce"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/protobuf/types/known/wrapperspb"
)

// main loads the server's TLS key pair, listens on TCP port 9090 and serves
// the OrderManagement gRPC service until Serve returns.
func main() {
    // Option 1 (not used here): let gRPC read both files itself.
    //
    //	creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
    //	if err != nil { ... }

    // Option 2: load the key pair first, then wrap it in transport credentials.
    cert, err := tls.LoadX509KeyPair(
        "/root/workspace/learn-grpc-02/key/test.pem",
        "/root/workspace/learn-grpc-02/key/test.key")
    if err != nil {
        fmt.Printf("私钥错误:%v", err)
        return
    }
    creds := credentials.NewServerTLSFromCert(&cert)

    // A failed bind (port in use, missing privileges) must stop the program
    // here; otherwise Serve would be handed a nil listener.
    listen, err := net.Listen("tcp", ":9090")
    if err != nil {
        fmt.Printf("listen on :9090 failed: %v\n", err)
        return
    }

    grpcServer := grpc.NewServer(grpc.Creds(creds))
    pb.RegisterOrderManagementServer(grpcServer, &service{})

    // Start serving; Serve blocks until it fails or the server is stopped.
    if err := grpcServer.Serve(listen); err != nil {
        fmt.Println(err)
    }
}

var orders = make(map[string]pb.Order, 8)

var _ pb.OrderManagementServer = (*service)(nil)

// service is the OrderManagement service implementation registered in main.
// It embeds pb.UnimplementedOrderManagementServer, so any service method it
// does not define itself is provided by the embedded type.
type service struct {
    pb.UnimplementedOrderManagementServer
}

// UpdateOrders reads orders from the client stream and stores each one in the
// package-level orders map under its Id.
//
// When Recv reports io.EOF (the client closed its send side), the method
// replies once via SendAndClose with a summary of the updated IDs and ends
// the RPC. Any other Recv error aborts the RPC and is returned as-is.
//
// NOTE(review): orders is written without synchronisation; concurrent
// UpdateOrders calls would race on it — confirm whether that matters here.
func (s *service) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
    ordersStr := "Updated Order IDs : "
    for {
        order, err := stream.Recv()
        if err == io.EOF {
            // Finished reading the order stream: send the single response.
            return stream.SendAndClose(
                &wrapperspb.StringValue{Value: "Orders processed " + ordersStr})
        }
        if err != nil {
            // On any other error order is nil; return before touching it.
            return err
        }

        // Update order
        orders[order.Id] = *order

        log.Println("Order ID ", order.Id, ": Updated")
        ordersStr += order.Id + ", "
    }
}
  1. 客户端实现。
    • 因为我们的客户端是流式请求的,因此 RPC 函数返回值stream是一个流。
    • 可以通过调用这个流对象的Send(…),来往这个对象写入数据。
    • 使用stream的CloseAndRecv函数关闭发送流并接收服务端的响应。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    pb "example.com/learn-grpc-03/ecommerce"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)

// main creates a TLS-protected client for the OrderManagement service at
// 127.0.0.1:9090, streams two orders through the client-streaming
// UpdateOrders RPC under a 5-second deadline and logs the server's single
// response.
func main() {
    // A missing or unreadable certificate must stop the program here rather
    // than handing nil credentials to the dial option below.
    creds, err := credentials.NewClientTLSFromFile(
        "/root/workspace/learn-grpc-02/key/test.pem",
        "*.heliu.site",
    )
    if err != nil {
        log.Fatalf("load TLS credentials: %v", err)
    }

    // Without TLS this would be grpc.WithTransportCredentials(insecure.NewCredentials()).
    opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}

    // Create the client for the server address; RPCs on it use TLS.
    conn, err := grpc.NewClient("127.0.0.1:9090", opts...)
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }

    defer conn.Close()

    client := pb.NewOrderManagementClient(conn)

    fmt.Printf("now-Time: %s\n", time.Now().Format(time.DateTime))
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    stream, err := client.UpdateOrders(ctx)
    if err != nil {
        // The message names the RPC that was actually called.
        log.Fatalf("error when calling UpdateOrders: %v", err)
    }

    if err := stream.Send(&pb.Order{
        Id:          "1",
        Items:       []string{"6", "5", "4", "3", "2", "1"},
        Destination: "101",
    }); err != nil {
        log.Fatalf("send err: %v", err)
    }

    if err := stream.Send(&pb.Order{
        Id:          "2",
        Items:       []string{"61", "51", "41", "31", "21", "11"},
        Destination: "102",
    }); err != nil {
        log.Fatalf("send err: %v", err)
    }

    // Close the send side and wait for the server's single response.
    res, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("recv err: %v", err)
    }

    log.Printf("Update Orders Res: %s\n", res.GetValue())
}
  1. github 参考代码:https://github.com/helium-chain/grpc-test03

服务端/客户端(流)

  1. product.proto 文件。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
syntax = "proto3";

package ecommerce;

option go_package = "ecommerce/";

import "google/protobuf/wrappers.proto";

// Order is the element type of CombinedShipment.orderList. It carries an id,
// a repeated list of item strings, a free-text description, a price and a
// destination.
message Order {
  string id = 1;
  repeated string items = 2;
  string description = 3;
  float price = 4;
  string destination = 5;
}

// CombinedShipment is the message streamed back by the processOrders RPC.
// It carries an id, a status string and a list of Order messages.
message CombinedShipment {
  string id = 1;
  string status = 2;
  repeated Order orderList = 3;
}

// OrderManagement exposes a single bidirectional-streaming RPC.
service OrderManagement {
  // processOrders takes a stream of StringValue messages and returns a stream
  // of CombinedShipment messages.
  rpc processOrders(stream google.protobuf.StringValue)
      returns (stream CombinedShipment);
}
  1. 生成 Go 和 gRPC 文件:
$ protoc -I ./pb \
--go_out ./ecommerce --go_opt paths=source_relative \
--go-grpc_out ./ecommerce --go-grpc_opt paths=source_relative \
./pb/product.proto
  1. 服务端实现:
    • 函数入参OrderManagement_ProcessOrdersServer是用来写入多个响应和读取多个消息的对象引用。
    • 可以通过调用这个流对象的Send(…),来往这个对象写入响应。
    • 可以通过调用这个流对象的Recv(…)函数读取消息,当Recv返回io.EOF代表流已经结束。
    • 通过返回nil或者error表示全部数据写完了。
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main

import (
    "crypto/tls"
    "fmt"
    "io"
    "log"
    "net"

    pb "example.com/learn-grpc-04/ecommerce"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)

// main loads the server's TLS key pair, listens on TCP port 9090 and serves
// the OrderManagement gRPC service until Serve returns.
func main() {
    // Option 1 (not used here): let gRPC read both files itself.
    //
    //	creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
    //	if err != nil { ... }

    // Option 2: load the key pair first, then wrap it in transport credentials.
    cert, err := tls.LoadX509KeyPair(
        "/root/workspace/learn-grpc-02/key/test.pem",
        "/root/workspace/learn-grpc-02/key/test.key")
    if err != nil {
        fmt.Printf("私钥错误:%v", err)
        return
    }
    creds := credentials.NewServerTLSFromCert(&cert)

    // A failed bind (port in use, missing privileges) must stop the program
    // here; otherwise Serve would be handed a nil listener.
    listen, err := net.Listen("tcp", ":9090")
    if err != nil {
        fmt.Printf("listen on :9090 failed: %v\n", err)
        return
    }

    grpcServer := grpc.NewServer(grpc.Creds(creds))
    pb.RegisterOrderManagementServer(grpcServer, &service{})

    // Start serving; Serve blocks until it fails or the server is stopped.
    if err := grpcServer.Serve(listen); err != nil {
        fmt.Println(err)
    }
}

var _ pb.OrderManagementServer = (*service)(nil)

var orders = make(map[string]pb.Order, 8)

const orderBatchSize = 8

// init seeds the package-level orders map with two sample orders used as
// test data. The map key differs from the order's Id field here, so both are
// listed separately.
func init() {
    seeds := []struct {
        key, id, destination string
        items                []string
    }{
        {key: "101", id: "1", destination: "101", items: []string{"1", "2", "3", "4", "5", "7"}},
        {key: "102", id: "2", destination: "102", items: []string{"6", "5", "4", "3", "2", "0"}},
    }

    for _, seed := range seeds {
        orders[seed.key] = pb.Order{Id: seed.id, Items: seed.items, Destination: seed.destination}
    }
}

// service is the OrderManagement service implementation registered in main.
// It embeds pb.UnimplementedOrderManagementServer, so any service method it
// does not define itself is provided by the embedded type.
type service struct {
    pb.UnimplementedOrderManagementServer
}

// ProcessOrders reads order IDs from the client stream, groups the matching
// orders by destination into combined shipments, and streams the shipments
// back.
//
// Pending shipments are sent after every orderBatchSize accepted orders, and
// once more when the client closes its send side (Recv returns io.EOF), after
// which the method returns nil to end the stream. IDs with no entry in orders
// are logged and skipped, and do not count towards the batch size. Any other
// Recv error, or a failed Send, aborts the RPC with that error.
func (s *service) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {
    // Shipments are held by pointer so messages are never copied by value.
    pending := make(map[string]*pb.CombinedShipment)
    batched := 0

    // flush sends every pending shipment and starts a fresh, empty batch.
    flush := func() error {
        for _, shipment := range pending {
            log.Printf("Shipping : %v -> %v", shipment.Id, len(shipment.OrderList))
            if err := stream.Send(shipment); err != nil {
                return err
            }
        }
        pending = make(map[string]*pb.CombinedShipment)
        batched = 0
        return nil
    }

    for {
        orderId, err := stream.Recv()
        if err == io.EOF {
            // The client is done sending: ship whatever is still pending.
            log.Print("EOF : order stream closed by client")
            return flush()
        }
        if err != nil {
            log.Println(err)
            return err
        }

        // Log only after the error checks, when the request is known valid.
        id := orderId.GetValue()
        log.Printf("Reading Proc order : %s", id)

        ord, known := orders[id]
        if !known {
            // An unknown ID would otherwise create a shipment for an empty
            // destination holding a zero-value order.
            log.Printf("Order %q not found, skipping", id)
            continue
        }

        shipment, found := pending[ord.Destination]
        if !found {
            shipment = &pb.CombinedShipment{Id: "cmb - " + ord.Destination, Status: "Processed!"}
            pending[ord.Destination] = shipment
        }
        shipment.OrderList = append(shipment.OrderList, &ord)

        // Count from zero and reset to zero so every batch holds exactly
        // orderBatchSize orders.
        batched++
        if batched == orderBatchSize {
            if err := flush(); err != nil {
                return err
            }
        }
    }
}
  1. 客户端实现。
    • 函数返回值OrderManagement_ProcessOrdersClient是用来获取多个响应和写入多个消息的对象引用。
    • 可以通过调用这个流对象的Send(…),来往这个对象写入请求消息。
    • 可以通过调用这个流对象的Recv(…)函数读取消息,当Recv返回io.EOF代表流已经结束。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "time"

    pb "example.com/learn-grpc-04/ecommerce"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/protobuf/types/known/wrapperspb"
)

// main creates a TLS-protected client for the OrderManagement service at
// 127.0.0.1:9090 and runs the bidirectional-streaming ProcessOrders RPC under
// a 5-second deadline: a goroutine sends two order IDs and closes the send
// side, while the main goroutine logs every combined shipment received.
func main() {
    // A missing or unreadable certificate must stop the program here rather
    // than handing nil credentials to the dial option below.
    creds, err := credentials.NewClientTLSFromFile(
        "/root/workspace/learn-grpc-02/key/test.pem",
        "*.heliu.site",
    )
    if err != nil {
        log.Fatalf("load TLS credentials: %v", err)
    }

    // Without TLS this would be grpc.WithTransportCredentials(insecure.NewCredentials()).
    opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}

    // Create the client for the server address; RPCs on it use TLS.
    conn, err := grpc.NewClient("127.0.0.1:9090", opts...)
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }

    defer conn.Close()

    client := pb.NewOrderManagementClient(conn)

    fmt.Printf("now-Time: %s\n", time.Now().Format(time.DateTime))
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    stream, err := client.ProcessOrders(ctx)
    if err != nil {
        // The message names the RPC that was actually called.
        log.Fatalf("error when calling ProcessOrders: %v", err)
    }

    // Send on a separate goroutine so receiving can proceed concurrently.
    // A failed send is only logged here: the receive loop below then sees
    // the RPC's error from Recv and ends the program.
    go func() {
        for _, id := range []string{"101", "102"} {
            if err := stream.Send(&wrapperspb.StringValue{Value: id}); err != nil {
                log.Printf("send order id %s: %v", id, err)
                return
            }
        }

        if err := stream.CloseSend(); err != nil {
            log.Printf("close send: %v", err)
        }
    }()

    for {
        combinedShipment, err := stream.Recv()
        if err == io.EOF {
            // The server finished the stream normally.
            break
        }
        if err != nil {
            // On any other error combinedShipment is nil; stop before
            // dereferencing it.
            log.Fatalf("error when receiving from ProcessOrders: %v", err)
        }
        log.Println("Combined shipment : ", combinedShipment.OrderList)
    }
}
  1. github 参考代码:https://github.com/helium-chain/grpc-test04

参考

  1. 写给go开发者的gRPC教程-通信模式