• gRPC的拦截器和其他框架的拦截器(也称middleware)作用是一样的。利用拦截器我们可以在不侵入业务逻辑的前提下修改或者记录服务端或客户端的请求与响应,利用拦截器我们可以实现诸如日志记录、权限认证、限流等诸多功能
  • gRPC 拦截器分为服务端拦截器和客户端拦截器。
  • 服务端拦截器:是gRPC服务器在到达实际RPC方法之前调用的函数。它可以用于多种用途,例如日志记录、跟踪、速率限制、身份验证和授权。
  • 客户端拦截器:是gRPC客户端在调用实际RPC之前调用的函数。
  • 拦截器分为以下几种:
    • UnaryClientInterceptor:在客户端拦截所有一元 gRPC 调用。
    • UnaryServerInterceptor:在服务端拦截所有一元 gRPC 调用。
    • StreamClientInterceptor:在客户端拦截所有流 gRPC 调用。
    • StreamServerInterceptor:在服务端拦截所有流 gRPC 调用。

一元拦截器

客户端

  1. 作用:使用客户端元数据丰富消息的地方,例如有关客户端运行的硬件或操作系统的一些信息,或者可能启动跟踪流程。
  2. 客户端拦截器类型:
    • ctx context.Context:单个请求的上下文,一般和goroutine配合使用,起到超时控制的效果。
    • method string:当前调用的 RPC 方法名称。格式:/服务名/方法名
    • req any:本次请求的参数,只有在处理前阶段修改才有效。该参数是接口,存储的请求参数的地址,比如*server.HelloRequest
    • reply any:本次请求响应,需要在处理后阶段才能获取到。该参数是接口,存储的响应参数的地址,比如*server.HelloReply
    • cc *ClientConn:客户端与服务端的链接。
    • invoker UnaryInvoker:可以看作是当前RPC方法,一般在拦截器中调用invoker能达到调用RPC方法的效果,当然底层也是gRPC调用。
    • opts ...CallOption:RPC调用的所有配置项,包含设置到conn上的,也包含配置在每一个调用上的。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// UnaryClientInterceptor intercepts the execution of a unary RPC on the client.
// Unary interceptors can be specified as a DialOption, using
// WithUnaryInterceptor() or WithChainUnaryInterceptor(), when creating a
// ClientConn. When a unary interceptor(s) is set on a ClientConn, gRPC
// delegates all unary RPC invocations to the interceptor, and it is the
// responsibility of the interceptor to call invoker to complete the processing
// of the RPC.
//
// method is the RPC name. req and reply are the corresponding request and
// response messages. cc is the ClientConn on which the RPC was invoked. invoker
// is the handler to complete the RPC and it is the responsibility of the
// interceptor to call it. opts contain all applicable call options, including
// defaults from the ClientConn as well as per-call options.
//
// The returned error must be compatible with the status package.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
  1. 作为客户端拦截器,可以在处理前检查 req 看看本次请求带没带 token 之类的鉴权数据,没有的话就可以在拦截器中终止或带上。
  2. 使用示例:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        // method: /SayHello/SayHello
        // req: 接口装载的 *server.HelloRequest
        // reply: 接口返回的 *server.HelloReply
        // cc: grpc.ClientConn
        // invoker: grpc.UnaryInvoker
        // opts: []grpc.CallOption

        fmt.Printf("req: %v\n", req)

        // 获取 gRPC 方法名称
        // method: /SayHello/SayHello
        fmt.Printf("opts: %s\n", opts)

        // 获取 gRPC 方法的请求参数
        // req: 接口装载的 *server.HelloRequest
        fmt.Printf("req: %v\n", req)
        fmt.Printf("type: %T\n", req)

        // 获取 gRPC 方法的返回参数
        // reply: 接口返回的 *server.HelloReply
        fmt.Printf("reply: %v\n", reply)
        fmt.Printf("type: %T\n", reply)

        // 获取 gRPC 方法的 gRPC 客户端连接
        // cc: grpc.Client

        // 1) 预处理阶段
        start := time.Now()

        cos := runtime.GOOS // 获取操作系统
        // 将操作系统信息附加到元数据传出请求
        ctx = metadata.AppendToOutgoingContext(ctx, "client-os", cos)

        // 2) 调用 gRPC 方法
        // invoker 这里是真正调用 RPC 方法的地方。
        // 因此我们可以在调用前后增加自己的逻辑,比如调用前检查以下参数之类的,调用后记录下本次请求处理耗时等
        err := invoker(ctx, method, req, reply, cc, opts...)

        // 3) 后处理阶段
        end := time.Now()
        //  RPC: /SayHello/SayHello, client-os: linux, req: requestName:"gh", start-time: 2024-09-03 21:25:51.715107181 +0800 CST m=+0.000534541, end-time: 2024-09-03 21:25:51.717004906 +0800 CST m=+0.002432245, err: <nil>
        log.Printf("RPC: %s, client-os: %s, req: %v, start-time: %s, end-time: %s, err: %v\n", method, cos, req, start, end, err)
        // *server.HelloRequest
        fmt.Printf("req: %v\n", req)
        fmt.Printf("type: %T\n", req)

        return err
    }
}
  1. 拦截器其实就是一个函数,可以分为预处理调用RPC后处理三个阶段。
  2. 客户端通过 grpc.WithUnaryInterceptor() 方法指定要添加的拦截器:
    • 拦截器中的 ctx context.Context 就是这里的 client.SayHello(context.Background(), ...) 这里的 ctx。
    • 拦截器中的 method string 就是这里的 /SayHello/SayHello
    • 拦截器中的 req, reply any*pb.HelloRequest*pb.HelloResponse
    • 拦截器中的 cc *ClientConn 就是这里的 conn, err := grpc.NewClient(...) 这里的 conn。
    • 拦截器中的 invoker UnaryInvoker 相当于 client.SayHello() 方法调用。
    • 拦截器中的 opts ...CallOption 相当于这里的 client.SayHello() 的第三个参数。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
    // ...

    var opts []grpc.DialOption
    // 不带TLS这里是grpc.WithTransportCredentials(insecure.NewCredentials())
    opts = append(opts, grpc.WithTransportCredentials(creds))
    opts = append(opts, grpc.WithPerRPCCredentials(&ClientTokenAuth{}))
    // 添加客户端拦截器
    opts = append(opts, grpc.WithUnaryInterceptor(interceptor.UnaryClientInterceptor()))

    // 连接server端,使用ssl加密通信
    conn, err := grpc.NewClient("127.0.0.1:9090", opts...)

    defer conn.Close()

    // 建立连接
    client := pb.NewSayHelloClient(conn)
    // 执行rpc调用(这个方法在服务器端来实现并返回结构)
    resp, err := client.SayHello(context.Background(), &pb.HelloRequest{RequestName: "gh", Age: 12})

    // ...
}

服务端

  1. 作用:可以对请求的真实性进行一些检查,例如对其进行授权,或者检查某些字段是否存在/验证请求。
  2. 服务器拦截器类型:
    • ctx context.Context:单个请求的上下文。
    • req any:RPC服务的请求结构体,接口也就是 *service.HelloRequest
    • info *UnaryServerInfo:RPC的服务信息。
      • info.FullMethod:请求方法名称,/服务名/方法名称
      • info.Server:也就是当前调用方法的实现类,比如 *main.server。
    • handler UnaryHandler:它包装了服务实现,通过调用它我们可以完成RPC并获取到响应。所以在调用它之前我们可以进行改写req或ctx、记录逻辑开始时间等操作。调用完handler即完成了RPC并获取到响应,我们不仅可以记录响应还可以改写响应。
1
2
3
4
5
// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
// contains all the information of this RPC the interceptor can operate on. And handler is the wrapper
// of the service method implementation. It is the responsibility of the interceptor to invoke handler
// to complete the RPC.
type UnaryServerInterceptor func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (resp any, err error)
  1. 使用示例:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        // req:请求参数类型 *service.HelloRequest
        // info.FullMethod:请求方法名称,**/服务名/方法名称**

        fmt.Printf("req: %T\n", req)

        // 1) 预处理

        start := time.Now()

        // 从传入上下文获取元数据
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
            return nil, fmt.Errorf("上下文中没有元数据")
        }

        // 检索客户端操作系统,如果它不存在说明此值为空
        os := md.Get("client-os")
        // 获取客户端IP地址
        ip, err := getClientIp(ctx)
        if err != nil {
            return nil, err
        }

        // 2) 调用 RPC 方法
        // 真正执行 RPC 方法,与 invoker 的调用不一样。因此我们可以在真正执行前后检查数据
        // 比如查看客户端操作系统和客户端IP地址、记录请求参数,耗时,错误信息等
        m, err := handler(ctx, req)

        // 3) 后处理
        end := time.Now()

        fmt.Printf("RPC: %s, client-os: '%v' and  IP: '%v', req: %v, start-time: %s, end-time: %s, err: %v",
            info.FullMethod, os, ip, req, start.Format(time.RFC3339), end.Format(time.RFC3339), err)

        return m, err
    }
}

func getClientIp(ctx context.Context) (string, error) {
    p, ok := peer.FromContext(ctx)

    if !ok {
        return "", fmt.Errorf("没有获取到IP")
    }

    return p.Addr.String(), nil
}
  1. 客户端通过 grpc.UnaryInterceptor() 方法指定要添加的拦截器:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func main() {
    // ...

    // 开启端口
    listen, _ := net.Listen("tcp", ":9090")
    // 创建grpc服务
    grpcServer := grpc.NewServer(grpc.Creds(creds),
        grpc.UnaryInterceptor(interceptor.UnaryServerInterceptor()))
    // 在grpc服务端中注册我们自己编写的服务
    pb.RegisterSayHelloServer(grpcServer, &server{})

    // ...
}

github

  1. github 代码:https://github.com/helium-chain/interceptor

流式拦截器

  1. 流拦截器过程和一元拦截器有所不同,同样分为三阶段:预处理、调用RPC方法、后处理。
  2. 预处理阶段和一元拦截器类似,参数调用RPC方法和后处理这两个阶段则完全不同。
  3. StreamAPI 的请求和响应都是通过 Stream 进行传递的,也就是通过 Streamer 调用 SendMsg 和 RecvMsg 这两个方法获取的。
  4. 然后 Streamer 又调用 RPC 方法来获取的,所以再流拦截器中我们可以对 Streamer 进行包装,然后实现 SendMsg 和 RecvMsg 这两个方法。

客户端

  1. 作用:比如将10个对象列表传输到服务器,例如文件或视频的块,我们可以再发送每个块之前拦截,并验证校验和等内容是否有效,将元数据添加到帧等。
  2. 客户端拦截器类型:
    • ctx context.Context:单个请求的上下文,一般和goroutine配合使用,起到超时控制的效果。
    • desc *StreamDesc:流描述信息。
    • cc *ClientConn:客户端信息。
    • method string:当前调用的 RPC 方法名称。格式:/服务名/方法名。
    • streamer Streamer:完成RPC请求的调用。
    • opts …CallOption:RPC调用的所有配置项,包含设置到conn上的,也包含配置在每一个调用上的。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// StreamClientInterceptor intercepts the creation of a ClientStream. Stream
// interceptors can be specified as a DialOption, using WithStreamInterceptor()
// or WithChainStreamInterceptor(), when creating a ClientConn. When a stream
// interceptor(s) is set on the ClientConn, gRPC delegates all stream creations
// to the interceptor, and it is the responsibility of the interceptor to call
// streamer.
//
// desc contains a description of the stream. cc is the ClientConn on which the
// RPC was invoked. streamer is the handler to create a ClientStream and it is
// the responsibility of the interceptor to call it. opts contain all applicable
// call options, including defaults from the ClientConn as well as per-call
// options.
//
// StreamClientInterceptor may return a custom ClientStream to intercept all I/O
// operations. The returned error must be compatible with the status package.
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
  1. 使用示例:
func StreamClientInterceptor() grpc.StreamClientInterceptor {
    return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
        // 1) 预处理
        // 1234-desc: &{Channel 0x846e20 true true}
        fmt.Printf("1234-desc: %v\n", desc)
        //  method: /SayHello/Channel
        log.Printf("method: %v\n", method) // /SayHello/Channel

        // 因为SendMsg和RecvMsg方法是ClientStream接口内方法,我们需要先调用streamer函数获取到ClientStream
        // 再对它进行封装,实现自己的 SendMsg 和 RecvMsg 方法。
        stream, err := streamer(ctx, desc, cc, method, opts...)
        return newStreamClient(stream), err
    }
}

type streamClient struct {
    grpc.ClientStream
}

func newStreamClient(c grpc.ClientStream) grpc.ClientStream {
    return &streamClient{c}
}

func (s *streamClient) SendMsg(m interface{}) error {
    // 2) 发送前,我们可以再这里对发送的消息处理
    fmt.Printf("SendMsg: %v\n m: %T\n", m, m) // SendMsg: value:"张三"\n m: *service.Request
    return s.ClientStream.SendMsg(m)
}

func (s *streamClient) RecvMsg(m interface{}) error {
    // 3) 在这里,我们可以对接收到的消息进行处理(发送前)
    fmt.Printf("RecvMsg: %v\n m:%T\n", m, m) // RecvMsg: m:*service.Response
    return s.ClientStream.RecvMsg(m)
    // 发送后
}
  1. 客户端通过 grpc.WithStreamInterceptor() 方法指定要添加的拦截器:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func main() {
    // ...
    var opts []grpc.DialOption
    // 不带TLS这里是grpc.WithTransportCredentials(insecure.NewCredentials())
    opts = append(opts, grpc.WithTransportCredentials(creds))
    opts = append(opts, grpc.WithPerRPCCredentials(&ClientTokenAuth{}))
    // 添加客户端拦截器
    opts = append(opts, grpc.WithUnaryInterceptor(interceptor.UnaryClientInterceptor()))
    // 添加流拦截器
    opts = append(opts, grpc.WithStreamInterceptor(interceptor.StreamClientInterceptor()))

    // 连接server端,使用ssl加密通信
    conn, err := grpc.NewClient("127.0.0.1:9090", opts...)
    // ...
}

服务端

  1. 作用:比如我们在接收上述文件快想验证在传输过程中没有丢失任何内容,并在存储之前再次验证校验和。
  2. 服务端拦截器类型:
    • srv any:服务实现,也就是 *main.server。
    • ss ServerStream:服务端视角的流。无论是哪一种流式RPC对于服务端来说发送(SendMsg)就代表着响应数据,接收(RecvMsg)就代表着请求数据,不同的流式RPC的区别就在于是多次发送数据(服务器端流式 RPC)还是多次接收数据(客户端流式 RPC)或者两者均有(双向流式 RPC)。因此仅使用这一个抽象就代表了所有的流式RPC场景
      • 通过该参数能设置Header、Trailer,获取 ctx,SendMsg、RecvMsg。
    • info *StreamServerInfo:RPC的服务信息
      • FullMethod string:方法名称。
      • IsClientStream bool:客户端是流模式吗?
      • IsServerStream bool:服务端是流模式吗?
    • handler StreamHandler:它包装了服务实现,通过调用它我们可以完成RPC。
1
2
3
4
5
// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server.
// info contains all the information of this RPC the interceptor can operate on. And handler is the
// service method implementation. It is the responsibility of the interceptor to invoke handler to
// complete the RPC.
type StreamServerInterceptor func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
  1. 使用示例:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func StreamServerInterceptor() grpc.StreamServerInterceptor {
    return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        // 2024-srv: *main.server
        fmt.Printf("2024-srv: %T\n", srv)
        // 2024-ss: *grpc.serverStream
        fmt.Printf("2024-ss: %T\n", ss)
        // 2024-info: &{/SayHello/Channel true true}
        fmt.Printf("2024-info: %v\n", info)

        // ss 接口包含:
        //    SetHeader(metadata.MD) error
        //    SendHeader(metadata.MD) error
        //    SetTrailer(metadata.MD)
        //    Context() context.Context
        //    SendMsg(m any) error
        //    RecvMsg(m any) error

        wrapper := newStreamServer(ss)
        return handler(srv, wrapper)
    }
}

type streamServe struct {
    grpc.ServerStream
}

func newStreamServer(s grpc.ServerStream) grpc.ServerStream {
    return &streamServe{s}
}

func (s *streamServe) SendMsg(m interface{}) error {
    return s.ServerStream.SendMsg(m)
}

func (s *streamServe) RecvMsg(m interface{}) error {
    return s.ServerStream.RecvMsg(m)
}
  1. 服务端通过 grpc.StreamInterceptor() 方法指定要添加的拦截器:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func main() {
    // ...
    
    // 开启端口
    listen, _ := net.Listen("tcp", ":9090")
    // 创建grpc服务
    grpcServer := grpc.NewServer(grpc.Creds(creds),
        grpc.UnaryInterceptor(interceptor.UnaryServerInterceptor()),
        grpc.StreamInterceptor(interceptor.StreamServerInterceptor()))
    // 在grpc服务端中注册我们自己编写的服务
    pb.RegisterSayHelloServer(grpcServer, &server{})
    
    // ...
}

github

  1. github 代码:https://github.com/helium-chain/grpc-stream

拦截器链

  1. 服务器只能配置一个 unary interceptor和 stream interceptor,否则会报错,客户端也是,虽然不会报错,但是只有最后一个才起作用。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 服务端拦截器
s := grpc.NewServer(
    grpc.UnaryInterceptor(orderUnaryServerInterceptor),
    grpc.StreamInterceptor(orderStreamServerInterceptor),
)

// 客户端拦截器
conn, err := grpc.Dial("127.0.0.1:8009",
    grpc.WithInsecure(),
    grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),
    grpc.WithStreamInterceptor(orderStreamClientInterceptor),
)
  1. 如果你想配置多个,可以使用拦截器链。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 服务端拦截器
s := grpc.NewServer(
  grpc.ChainUnaryInterceptor(
      orderUnaryServerInterceptor1,
      orderUnaryServerInterceptor2,
  ),
  grpc.ChainStreamInterceptor(
      orderServerStreamInterceptor1,
      orderServerStreamInterceptor2,
  ),
)

// 客户端拦截器
conn, err := grpc.Dial("127.0.0.1:8009",
  grpc.WithInsecure(),
  grpc.WithChainUnaryInterceptor(
      orderUnaryClientInterceptor1,
      orderUnaryClientInterceptor2,
  ),
  grpc.WithChainStreamInterceptor(
      orderStreamClientInterceptor1,
      orderStreamClientInterceptor2,
  ),
)

第三方

  1. https://github.com/grpc-ecosystem/go-grpc-middleware

注意

  1. 默认的 gRPC 只能配置一个拦截器。如果想设置多个拦截器链可以自己实现,也可以使用第三方包。
  2. 比如,https://github.com/grpc-ecosystem/go-grpc-middleware。

参考

  1. 参考文档:https://blog.51cto.com/u_16213678/11565016