Functions

ClientSupportedCompressors(ctx)

  1. ClientSupportedCompressors返回客户端通过grpc-accept-encoding报头发布的压缩器名称。
  2. 所提供的上下文必须是传递给服务器处理程序的context。
// ClientSupportedCompressors returns compressor names advertised by the client
// via grpc-accept-encoding header.
//
// The context provided must be the context passed to the server's handler.
//
// # Experimental
//
// Notice: This function is EXPERIMENTAL and may be changed or removed in a
// later release.
func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
    stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
    if !ok || stream == nil {
        return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
    }

    return stream.ClientAdvertisedCompressors(), nil
}

Method(ctx)

  1. 方法返回服务器上下文的方法字符串。返回的字符串格式为"/service/method"。
  2. 用于服务端获取请求方法名称。
// Method returns the method string for the server context.  The returned
// string is in the format of "/service/method".
func Method(ctx context.Context) (string, bool) {
    s := ServerTransportStreamFromContext(ctx)
    if s == nil {
        return "", false
    }
    return s.Method(), true
}

MethodFromServerStream(stream)

  1. MethodFromServerStream返回输入流的方法字符串。返回的字符串格式为"/service/method"。
  2. 用于服务端获取请求方法名称(流)。
// MethodFromServerStream returns the method string for the input stream.
// The returned string is in the format of "/service/method".
func MethodFromServerStream(stream ServerStream) (string, bool) {
    return Method(stream.Context())
}

SendHeader(ctx, md)

  1. SendHeader发送报头元数据。它可能最多被调用一次,并且可能不会在任何导致发送头的事件之后被调用(参见SetHeader以获得完整的列表)。将发送由SetHeader()设置的md和header。
  2. 返回的错误与状态包兼容。但是,状态码通常与客户端应用程序看到的RPC状态不匹配,因此不应该依赖于此目的。
// SendHeader sends header metadata. It may be called at most once, and may not
// be called after any event that causes headers to be sent (see SetHeader for
// a complete list).  The provided md and headers set by SetHeader() will be
// sent.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
func SendHeader(ctx context.Context, md metadata.MD) error {
    stream := ServerTransportStreamFromContext(ctx)
    if stream == nil {
        return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
    }
    if err := stream.SendHeader(md); err != nil {
        return toRPCErr(err)
    }
    return nil
}

SetHeader(ctx, md)

  1. SetHeader设置从服务器发送到客户端的报头元数据。所提供的上下文必须是传递给服务器处理程序的上下文。
  2. 流式rpc应该更喜欢ServerStream的SetHeader方法。
  3. 当多次调用时,所有提供的元数据将被合并。当发生以下情况之一时,所有元数据将被发送出去:
    • 调用grpc.SendHeader,对于流处理程序,调用stream.SendHeader。
    • 发送第一条响应消息。对于一元处理程序,这在处理程序返回时发生;对于流处理程序,这可能在流的SendMsg方法被调用时发生。
    • 发送RPC状态(错误或成功)。这在处理程序返回时发生。
  4. 如果在上述任何事件之后调用SetHeader将失败。
  5. 返回的错误与状态包兼容。但是,状态码通常与客户端应用程序看到的RPC状态不匹配,因此不应该依赖于此目的。
// SetHeader sets the header metadata to be sent from the server to the client.
// The context provided must be the context passed to the server's handler.
//
// Streaming RPCs should prefer the SetHeader method of the ServerStream.
//
// When called multiple times, all the provided metadata will be merged.  All
// the metadata will be sent out when one of the following happens:
//
//   - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
//   - The first response message is sent.  For unary handlers, this occurs when
//     the handler returns; for streaming handlers, this can happen when stream's
//     SendMsg method is called.
//   - An RPC status is sent out (error or success).  This occurs when the handler
//     returns.
//
// SetHeader will fail if called after any of the events above.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
func SetHeader(ctx context.Context, md metadata.MD) error {
    if md.Len() == 0 {
        return nil
    }
    stream := ServerTransportStreamFromContext(ctx)
    if stream == nil {
        return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
    }
    return stream.SetHeader(md)
}

SetTrailer(ctx, md)

  1. SetTrailer设置RPC返回时将发送的尾部元数据。当多次调用时,所有提供的元数据将被合并。
  2. 返回的错误与状态包兼容。但是,状态码通常与客户端应用程序看到的RPC状态不匹配,因此不应该依赖于此目的。
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
func SetTrailer(ctx context.Context, md metadata.MD) error {
    if md.Len() == 0 {
        return nil
    }
    stream := ServerTransportStreamFromContext(ctx)
    if stream == nil {
        return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
    }
    return stream.SetTrailer(md)
}

Types

type Server

  1. Server是为RPC请求提供服务的gRPC服务器。
type Server struct {
    // contains filtered or unexported fields
}

NewServer(opt)

  1. NewServer创建一个gRPC服务器,该服务器没有注册服务,并且还没有开始接受请求。
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
    opts := defaultServerOptions
    for _, o := range globalServerOptions {
        o.apply(&opts)
    }
    for _, o := range opt {
        o.apply(&opts)
    }
    s := &Server{
        lis:      make(map[net.Listener]bool),
        opts:     opts,
        conns:    make(map[string]map[transport.ServerTransport]bool),
        services: make(map[string]*serviceInfo),
        quit:     grpcsync.NewEvent(),
        done:     grpcsync.NewEvent(),
        channelz: channelz.RegisterServer(""),
    }
    chainUnaryServerInterceptors(s)
    chainStreamServerInterceptors(s)
    s.cv = sync.NewCond(&s.mu)
    if EnableTracing {
        _, file, line, _ := runtime.Caller(1)
        s.events = newTraceEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
    }

    if s.opts.numServerWorkers > 0 {
        s.initServerWorkers()
    }

    channelz.Info(logger, s.channelz, "Server created")
    return s
}

(s) GetServiceInfo()

  1. GetServiceInfo返回一个从服务名称到ServiceInfo的映射。服务名包括包名,形式为<package>.< Service >
// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
    ret := make(map[string]ServiceInfo)
    for n, srv := range s.services {
        methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
        for m := range srv.methods {
            methods = append(methods, MethodInfo{
                Name:           m,
                IsClientStream: false,
                IsServerStream: false,
            })
        }
        for m, d := range srv.streams {
            methods = append(methods, MethodInfo{
                Name:           m,
                IsClientStream: d.ClientStreams,
                IsServerStream: d.ServerStreams,
            })
        }

        ret[n] = ServiceInfo{
            Methods:  methods,
            Metadata: srv.mdata,
        }
    }
    return ret
}
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
    Methods []MethodInfo
    // Metadata is the metadata specified in ServiceDesc when registering service.
    Metadata any
}

// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
    // Name is the method name only, without the service name or package name.
    Name string
    // IsClientStream indicates whether the RPC is a client streaming RPC.
    IsClientStream bool
    // IsServerStream indicates whether the RPC is a server streaming RPC.
    IsServerStream bool
}
示例
// 服务端

func main() {
    // ...
    listen, _ := net.Listen("tcp", ":9090")
    // 创建grpc服务
    grpcServer := grpc.NewServer(grpc.Creds(creds),
        grpc.UnaryInterceptor(interceptor.UnaryServerInterceptor()),
        grpc.StreamInterceptor(interceptor.StreamServerInterceptor())) // 在grpc服务端中注册我们自己编写的服务

    // -------  GetServiceInfo  -------
    // GetServiceInfo01: map[]
    fmt.Printf("GetServiceInfo01: %v\n", grpcServer.GetServiceInfo())
    pb.RegisterSayHelloServer(grpcServer, &server{})
    // -------  GetServiceInfo  -------
    // GetServiceInfo02: map[SayHello:{[{SayHello false false} {Channel true true}] hello.proto}]
    fmt.Printf("GetServiceInfo02: %v\n", grpcServer.GetServiceInfo())

    // 启动服务
    err := grpcServer.Serve(listen)
    // ...
}

(s) GracefulStop()

  1. GracefulStop优雅停止gRPC服务器。它阻止服务器接受新的连接和rpc,并阻塞,直到所有挂起的rpc都完成。
// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
    s.stop(true)
}

(s) RegisterService(sd, ss)

  1. RegisterService将服务及其实现注册到gRPC服务器。它从IDL生成的代码中调用。这必须在调用Serve之前调用。
  2. 如果ss为非nil(对于遗留代码),则检查其类型以确保它实现sd.HandlerType。
// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
    if ss != nil {
        ht := reflect.TypeOf(sd.HandlerType).Elem()
        st := reflect.TypeOf(ss)
        if !st.Implements(ht) {
            logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
        }
    }
    s.register(sd, ss)
}
示例
// 服务端

func main() {
    // ...
    listen, _ := net.Listen("tcp", ":9090")
    // 创建grpc服务
    grpcServer := grpc.NewServer(grpc.Creds(creds),
        grpc.UnaryInterceptor(interceptor.UnaryServerInterceptor()),
        grpc.StreamInterceptor(interceptor.StreamServerInterceptor())) // 在grpc服务端中注册我们自己编写的服务

    // GetServiceInfo01: map[]
    fmt.Printf("GetServiceInfo01: %v\n", grpcServer.GetServiceInfo())
    // -------  RegisterService  -------
    pb.RegisterSayHelloServer(grpcServer, &server{})
    // GetServiceInfo02: map[SayHello:{[{SayHello false false} {Channel true true}] hello.proto}]
    fmt.Printf("GetServiceInfo02: %v\n", grpcServer.GetServiceInfo())

    // 启动服务
    err := grpcServer.Serve(listen)
    // ...
}

// grpc
func RegisterSayHelloServer(s grpc.ServiceRegistrar, srv SayHelloServer) {
    // -------  RegisterService  -------
    s.RegisterService(&SayHello_ServiceDesc, srv)
}

(s) Serve(lis)

  1. service接受侦听器列表上的传入连接,为每个连接创建一个新的ServerTransport和服务例程。服务例程读取gRPC请求,然后调用已注册的处理程序来响应它们。发球时回球。Accept失败并出现致命错误。当此方法返回时,它将被关闭。除非调用Stop或GracefulStop,否则服务将返回一个非nil错误。
  2. 注意:所有支持的Go版本(截至2023年12月)都覆盖了操作系统默认的TCP保持时间和间隔为15秒。要启用TCP keepalive与操作系统默认的keepalive时间和间隔,调用者需要做以下两件事:
    • 传递通过在网络上调用Listen方法创建的ne.tListener。ListenConfig的’KeepAlive’字段设置为负值。这将导致Go标准库不会覆盖操作系统默认的TCP保持时间间隔和时间。但这也会导致Go标准库默认情况下不启用TCP keepalive。
    • 覆盖传入网络上的Accept方法。监听器并设置SO_KEEPALIVE套接字选项以启用TCP keepalive,与操作系统默认值。
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Note: All supported releases of Go (as of December 2023) override the OS
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
// with OS defaults for keepalive time and interval, callers need to do the
// following two things:
//   - pass a net.Listener created by calling the Listen method on a
//     net.ListenConfig with the `KeepAlive` field set to a negative value. This
//     will result in the Go standard library not overriding OS defaults for TCP
//     keepalive interval and time. But this will also result in the Go standard
//     library not enabling TCP keepalives by default.
//   - override the Accept method on the passed in net.Listener and set the
//     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
func (s *Server) Serve(lis net.Listener) error {
    s.mu.Lock()
    s.printf("serving")
    s.serve = true
    if s.lis == nil {
        // Serve called after Stop or GracefulStop.
        s.mu.Unlock()
        lis.Close()
        return ErrServerStopped
    }

    s.serveWG.Add(1)
    defer func() {
        s.serveWG.Done()
        if s.quit.HasFired() {
            // Stop or GracefulStop called; block until done and return nil.
            <-s.done.Done()
        }
    }()

    ls := &listenSocket{
        Listener: lis,
        channelz: channelz.RegisterSocket(&channelz.Socket{
            SocketType:    channelz.SocketTypeListen,
            Parent:        s.channelz,
            RefName:       lis.Addr().String(),
            LocalAddr:     lis.Addr(),
            SocketOptions: channelz.GetSocketOption(lis)},
        ),
    }
    s.lis[ls] = true

    defer func() {
        s.mu.Lock()
        if s.lis != nil && s.lis[ls] {
            ls.Close()
            delete(s.lis, ls)
        }
        s.mu.Unlock()
    }()

    s.mu.Unlock()
    channelz.Info(logger, ls.channelz, "ListenSocket created")

    var tempDelay time.Duration // how long to sleep on accept failure
    for {
        rawConn, err := lis.Accept()
        if err != nil {
            if ne, ok := err.(interface {
                Temporary() bool
            }); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                s.mu.Lock()
                s.printf("Accept error: %v; retrying in %v", err, tempDelay)
                s.mu.Unlock()
                timer := time.NewTimer(tempDelay)
                select {
                case <-timer.C:
                case <-s.quit.Done():
                    timer.Stop()
                    return nil
                }
                continue
            }
            s.mu.Lock()
            s.printf("done serving; Accept = %v", err)
            s.mu.Unlock()

            if s.quit.HasFired() {
                return nil
            }
            return err
        }
        tempDelay = 0
        // Start a new goroutine to deal with rawConn so we don't stall this Accept
        // loop goroutine.
        //
        // Make sure we account for the goroutine so GracefulStop doesn't nil out
        // s.conns before this conn can be added.
        s.serveWG.Add(1)
        go func() {
            s.handleRawConn(lis.Addr().String(), rawConn)
            s.serveWG.Done()
        }()
    }
}
示例
// 服务端

func main() {
    // ...
    listen, _ := net.Listen("tcp", ":9090")
    // 创建grpc服务
    grpcServer := grpc.NewServer(grpc.Creds(creds),
        grpc.UnaryInterceptor(interceptor.UnaryServerInterceptor()),
        grpc.StreamInterceptor(interceptor.StreamServerInterceptor())) // 在grpc服务端中注册我们自己编写的服务

    // GetServiceInfo01: map[]
    fmt.Printf("GetServiceInfo01: %v\n", grpcServer.GetServiceInfo())
    pb.RegisterSayHelloServer(grpcServer, &server{})
    // GetServiceInfo02: map[SayHello:{[{SayHello false false} {Channel true true}] hello.proto}]
    fmt.Printf("GetServiceInfo02: %v\n", grpcServer.GetServiceInfo())

    // 启动服务
    // -------  Serve  -------
    err := grpcServer.Serve(listen)
    // ...
}

(s) ServeHTTP(w, r)

  1. 注意:此API是实验性的,可能会在以后的版本中更改或删除。
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

(s) Stop()

  1. Stop表示停止gRPC服务器。它立即关闭所有打开的连接和侦听器。它将取消服务器端上所有活动的rpc,而客户端上相应的挂起rpc将收到连接错误的通知。
// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
    s.stop(false)
}

type ServerOption

ChainStreamInterceptor(interceptors)

  1. ChainStreamInterceptor返回一个ServerOption,用于指定流rpc的链式拦截器。
  2. 第一个拦截器将是最外层的,而最后一个拦截器将是围绕实际调用的最内层的包装器。
  3. 此方法添加的所有流拦截器将被链接。
// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
// for streaming RPCs. The first interceptor will be the outer most,
// while the last interceptor will be the inner most wrapper around the real call.
// All stream interceptors added by this method will be chained.
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.chainStreamInts = append(o.chainStreamInts, interceptors...)
    })
}
示例
// 服务端拦截器
s := grpc.NewServer(
  grpc.ChainUnaryInterceptor(
    orderUnaryServerInterceptor1,
    orderUnaryServerInterceptor2,
  ),
  grpc.ChainStreamInterceptor(
    orderServerStreamInterceptor1,
    orderServerStreamInterceptor2,
  ),
)

ChainUnaryInterceptor(interceptors)

  1. ChainUnaryInterceptor返回一个ServerOption,用于指定一元rpc的链式拦截器。
  2. 第一个拦截器将是最外层的,而最后一个拦截器将是围绕实际调用的最内层的包装器。
  3. 此方法添加的所有一元拦截器都将被链接。
// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
// for unary RPCs. The first interceptor will be the outer most,
// while the last interceptor will be the inner most wrapper around the real call.
// All unary interceptors added by this method will be chained.
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
    })
}
示例
// 服务端拦截器
s := grpc.NewServer(
  grpc.ChainUnaryInterceptor(
    orderUnaryServerInterceptor1,
    orderUnaryServerInterceptor2,
  ),
  grpc.ChainStreamInterceptor(
    orderServerStreamInterceptor1,
    orderServerStreamInterceptor2,
  ),
)

ConnectionTimeout(d)

  1. ConnectionTimeout返回一个ServerOption,它为所有新连接设置连接建立超时(包括HTTP/2握手)。
  2. 如果不设置,则默认为120秒。零或负值将导致立即超时。
  3. 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections.  If this is not set, the default is 120 seconds.  A zero or
// negative value will result in an immediate timeout.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ConnectionTimeout(d time.Duration) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.connectionTimeout = d
    })
}
  1. 服务端也可以控制连接创建的超时时间,如果没有在设定的时间内建立连接,服务端就会主动断连,避免浪费服务端的端口、内存等资源。
s := grpc.NewServer(
    grpc.ConnectionTimeout(3*time.Second),
)

Creds(c)

  1. Creds返回一个为服务器连接设置凭据的ServerOption。
// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.creds = c
    })
}
示例
func main() {
    // 绝对地址
    // 方法一
    creds, err1 := credentials.NewServerTLSFromFile(
        "/root/workspace/learn-grpc/key/test.pem",
        "/root/workspace/learn-grpc/key/test.key",
    )

    if err1 != nil {
        fmt.Printf("证书错误:%v", err1)
        return
    }

    // 方法二
    //cert, err := tls.LoadX509KeyPair("", "")
    //if err != nil {
    //	fmt.Printf("私钥错误:%v", err)
    //	return
    //}
    //
    //creds := credentials.NewServerTLSFromCert(&cert)
    // 开启端口
    listen, _ := net.Listen("tcp", ":9090")
    // 创建grpc服务
    grpcServer := grpc.NewServer(grpc.Creds(creds), // ---------  Creds  ---------
        grpc.UnaryInterceptor(interceptor.UnaryServerInterceptor()),
        grpc.StreamInterceptor(interceptor.StreamServerInterceptor())) // 在grpc服务端中注册我们自己编写的服务

    // GetServiceInfo01: map[]
    fmt.Printf("GetServiceInfo01: %v\n", grpcServer.GetServiceInfo())
    pb.RegisterSayHelloServer(grpcServer, &server{})
    // GetServiceInfo02: map[SayHello:{[{SayHello false false} {Channel true true}] hello.proto}]
    fmt.Printf("GetServiceInfo02: %v\n", grpcServer.GetServiceInfo())

    // 启动服务
    err := grpcServer.Serve(listen)
    if err != nil {
        fmt.Println(err)
        return
    }
}

ForceServerCodec(codec)

  1. ForceServerCodec返回一个ServerOption,该选项为消息封送和反封送设置编解码器。
  2. 这将覆盖任何按内容子类型查找注册在RegisterCodec的编解码器。
  3. 详细信息请参见https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests上的内容类型。关于编码之间交互的更多细节,请参见RegisterCodec和CallContentSubtype的文档。编解码器和内容子类型。
  4. 此功能为高级用户提供;建议使用encoding.RegisterCodec注册编解码器。服务器将根据传入请求的报头自动使用注册的编解码器。参见https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec。将在整个1.x中得到支持。
  5. 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// ForceServerCodec returns a ServerOption that sets a codec for message
// marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered
// with RegisterCodec.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details. Also see the documentation on RegisterCodec and
// CallContentSubtype for more details on the interaction between encoding.Codec
// and content-subtype.
//
// This function is provided for advanced users; prefer to register codecs
// using encoding.RegisterCodec.
// The server will automatically use registered codecs based on the incoming
// requests' headers. See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceServerCodec(codec encoding.Codec) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.codec = newCodecV1Bridge(codec)
    })
}

ForceServerCodecV2(codecV2)

  1. ForceServerCodecV2相当于ForceServerCodec,但用于新的CodecV2接口。
  2. 将在整个1.x中得到支持。
  3. 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// ForceServerCodecV2 is the equivalent of ForceServerCodec, but for the new
// CodecV2 interface.
//
// Will be supported throughout 1.x.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceServerCodecV2(codecV2 encoding.CodecV2) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.codec = codecV2
    })
}

HeaderTableSize(s)

  1. HeaderTableSize返回一个ServerOption,用于设置流的动态报头表的大小。
  2. 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// HeaderTableSize returns a ServerOption that sets the size of dynamic
// header table for stream.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func HeaderTableSize(s uint32) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.headerTableSize = &s
    })
}

InTapHandle(h)

  1. InTapHandle返回一个ServerOption,它为要创建的所有服务器传输设置tap句柄。只能安装一个。
// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func InTapHandle(h tap.ServerInHandle) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        if o.inTapHandle != nil {
            panic("The tap handle was already set and may not be reset.")
        }
        o.inTapHandle = h
    })
}

InitialConnWindowSize(s)

  1. InitialConnWindowSize返回一个为连接设置窗口大小的ServerOption。窗口大小的下限是64K,任何小于这个值的值都将被忽略。
// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.initialConnWindowSize = s
    })
}
详解
  1. 指定了服务器端连接的初始窗口大小。这个窗口大小用于流量控制,决定了在没有接收到确认的情况下,服务器可以发送多少数据。
  2. 作用:这个选项设置了服务器端在建立新连接时分配给该连接的初始窗口大小。窗口大小决定了在没有接收到客户端的流量控制确认(ACK)之前,服务器可以发送的数据量。
    • 通过限制初始发送的数据量,可以避免在不知道网络状况的情况下发送过多数据,导致网络拥塞。
    • 适当设置初始窗口大小可以加快数据的初始传输,从而提高连接建立的速度。
  3. 使用这个函数的场景可能包括:
    • 当你希望优化网络流量控制,特别是在网络延迟较高或者带宽有限的情况下。
    • 当你的应用程序需要发送大量数据,并且希望减少因流量控制而导致的延迟。
  4. 在TCP协议中,窗口大小是一个流量控制机制,用于限制发送方在未收到接收方的确认(ACK)之前可以发送的数据量。窗口大小通常以字节为单位,它可以根据网络状况动态调整。
  5. 总的来说,初始连接窗口大小是TCP连接中一个重要的性能调优参数,它影响着数据传输的效率和网络的稳定性。

InitialWindowSize(s)

  1. InitialWindowSize返回一个为流设置窗口大小的ServerOption。窗口大小的下限是64K,任何小于这个值的值都将被忽略。
// InitialWindowSize returns a ServerOption that sets window size for stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialWindowSize(s int32) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.initialWindowSize = s
    })
}
详解
  1. 它指定了为每个流(stream)的初始窗口大小。这个窗口大小用于流量控制,决定了在没有接收到确认的情况下,服务器可以发送多少数据给特定的流。
  2. 作用: 这个选项设置了服务器端在为每个新流分配的初始窗口大小。窗口大小决定了在没有接收到客户端的流量控制确认(ACK)之前,服务器可以发送的数据量。
  3. 需要注意的是,InitialConnWindowSizeInitialWindowSize的区别在于前者是针对整个连接的,而后者是针对单个流的。调整InitialWindowSize可能会影响服务器处理多个并发流时的性能和资源分配。

KeepaliveEnforcementPolicy(kep)

  1. KeepaliveEnforcementPolicy返回一个为服务器设置keepalive强制策略的ServerOption。
// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.keepalivePolicy = kep
    })
}
详解
  1. 它指定了服务器端的保活(keepalive)强制策略。这个策略用于管理客户端的保活行为,确保客户端在指定的时间内发送保活探测消息,以维持连接的活跃性。
  2. 作用: 这个选项设置了服务器端如何强制执行保活策略。保活强制策略包括以下几个关键配置:
    • MinTime:客户端发送保活探测消息的最小间隔时间。
    • PermitWithoutStream:即使没有活动流,是否允许客户端发送保活探测消息。
  3. 使用场景: 使用保活强制策略的场景可能包括:
    • 确保客户端定期发送保活探测消息,以检测和维持连接的活跃性。
    • 防止因客户端不发送保活探测消息而导致的连接长时间空闲。
    • 在网络不稳定或存在网络设备配置导致连接可能被意外关闭的情况下,通过保活探测来维持连接。
package main

import (
    "context"
    "net"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        // 错误处理
    }

    keepalivePolicy := keepalive.EnforcementPolicy{
        MinTime:             5 * time.Second,  // 客户端发送保活探测消息的最小间隔时间
        PermitWithoutStream: true,             // 允许在没有活动流的情况下发送保活探测消息
    }

    opts := []grpc.ServerOption{
        grpc.KeepaliveEnforcementPolicy(keepalivePolicy),
    }

    s := grpc.NewServer(opts...)
    // 注册服务...

    s.Serve(lis)
}
  1. 在这个例子中,我们设置了保活强制策略,要求客户端至少每5秒发送一次保活探测消息,并且即使没有活动流也可以发送。我们还设置了连接的最大空闲时间和最大存活时间,以确保连接在这些时间范围内保持活跃,或者被适当地关闭。

KeepaliveParams(kp)

  1. KeepaliveParams返回一个ServerOption,用于设置服务器的keepalive和max-age参数。
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
    if kp.Time > 0 && kp.Time < internal.KeepaliveMinServerPingTime {
        logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
        kp.Time = internal.KeepaliveMinServerPingTime
    }

    return newFuncServerOption(func(o *serverOptions) {
        o.keepaliveParams = kp
    })
}
详解
  1. 它指定了服务器端的保活(keepalive)参数和最大存活时间(max-age)参数。这些参数用于管理服务器如何发送保活探测消息,以及如何处理连接的存活时间。
  2. 作用: 这个选项设置了服务器端发送保活探测消息的参数,以及如何处理连接的存活时间。keepalive.ServerParameters包含以下关键配置:
    • Time:服务器发送保活探测消息的间隔时间。如果设置的时间小于1秒,将会被调整为最小周期1秒。
    • Timeout:服务器等待保活探测消息响应的超时时间。如果在这个时间内没有收到响应,服务器可能会关闭连接。
    • MaxConnectionAge:连接的最大存活时间。如果连接存活时间超过这个值,服务器可能会关闭连接。
    • MaxConnectionAgeGrace:在MaxConnectionAge之后,服务器允许连接存在的额外时间,以便完成正在进行的工作。
    • MaxConnectionIdle:连接的最大空闲时间,超过这个时间且没有活动流,服务器可能会关闭连接。
  3. 使用场景: 使用保活参数的场景可能包括:
    • 确保服务器定期发送保活探测消息,以检测连接的活跃性。
    • 在网络不稳定的情况下,通过保活探测来维持连接。
    • 设置连接的最大存活时间,以避免长时间占用资源。
package main

import (
    "context"
    "net"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        // 错误处理
    }

    keepaliveParams := keepalive.ServerParameters{
        Time:                5 * time.Second,  // 服务器发送保活探测消息的间隔时间
        Timeout:             2 * time.Second,  // 服务器等待保活探测消息响应的超时时间
        MaxConnectionAge:    30 * time.Second, // 连接的最大存活时间
        MaxConnectionAgeGrace: 5 * time.Second, // 连接最大存活时间之后的额外时间
    }

    opts := []grpc.ServerOption{
        grpc.KeepaliveParams(keepaliveParams),
    }

    s := grpc.NewServer(opts...)
    // 注册服务...

    s.Serve(lis)
}
  1. 在这个例子中,我们设置了服务器每5秒发送一次保活探测消息,并且如果在2秒内没有收到响应,则认为连接超时。我们还设置了连接的最大存活时间为30秒,并在达到最大存活时间后提供5秒的额外时间来处理可能的结束工作。这些设置有助于确保连接的稳定性和资源的有效管理。

MaxConcurrentStreams(n)

  1. MaxConcurrentStreams返回一个ServerOption,它将对每个ServerTransport的并发流数量施加限制。
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
    if n == 0 {
        n = math.MaxUint32
    }
    return newFuncServerOption(func(o *serverOptions) {
        o.maxConcurrentStreams = n
    })
}
详解
  1. 它指定了每个服务器传输(ServerTransport)可以处理的最大并发流(stream)数量。
  2. 这个选项用于限制服务器在任何给定时间点可以同时处理的流数量,从而可以控制服务器的资源使用。
  3. 作用:这个选项限制了服务器可以同时处理的最大流数量。如果设置为0,则默认为无限制,即math.MaxUint32,这实际上意味着没有限制。
  4. 使用场景: 使用最大并发流数量的场景可能包括:
    • 当服务器资源有限,需要避免过载。
    • 当需要保证服务质量,通过限制并发流数量来保证每个流的处理质量。
    • 在某些情况下,出于安全考虑,需要限制单个客户端可以发起的流数量。
package main

import (
    "context"
    "net"

    "google.golang.org/grpc"
)

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        // 错误处理
    }

    opts := []grpc.ServerOption{
        grpc.MaxConcurrentStreams(100), // 设置每个服务器传输的最大并发流数量为100
    }

    s := grpc.NewServer(opts...)
    // 注册服务...

    s.Serve(lis)
}
  1. 在这个例子中,我们将每个服务器传输的最大并发流数量设置为100。这意味着在任何给定时间点,服务器最多可以处理100个并发流。如果尝试创建更多的流,新的流可能会被服务器拒绝,直到一些现有的流完成或被取消。
  2. 需要注意的是,这个设置是针对每个服务器传输的,而不是整个服务器。在多连接场景下,每个连接(即每个服务器传输)都有自己的并发流限制。

MaxHeaderListSize(s)

  1. MaxHeaderListSize返回一个ServerOption,用于设置服务器准备接受的报头列表的最大(未压缩)大小。
// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
    return MaxHeaderListSizeServerOption{
        MaxHeaderListSize: s,
    }
}
详解
  1. 它指定了服务器愿意接受的头列表(header list)的最大(未压缩)大小。这个选项用于限制客户端可以在HTTP/2请求头中发送的头信息总大小,以防止过大的头信息攻击或资源消耗。
  2. 作用:这个选项限制了单个HTTP/2请求中头信息的最大未压缩大小。如果客户端尝试发送一个超过这个大小的头列表,服务器可能会拒绝该请求。
  3. 使用场景:使用最大头列表大小的场景可能包括:
    • 防止恶意客户端发送大量头信息,导致服务器资源耗尽(例如,拒绝服务攻击)。
    • 限制客户端发送的头信息大小,以保持服务器性能和稳定性。
package main

import (
    "context"
    "net"

    "google.golang.org/grpc"
)

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        // 错误处理
    }

    opts := []grpc.ServerOption{
        grpc.MaxHeaderListSize(16384), // 设置头列表的最大大小为16KB
    }

    s := grpc.NewServer(opts...)
    // 注册服务...

    s.Serve(lis)
}
  1. 在这个例子中,我们将头列表的最大大小设置为16KB。这意味着如果客户端尝试发送超过16KB的头信息,服务器可能会拒绝该请求。这个设置有助于保护服务器免受过大的头信息攻击,并确保服务器资源得到合理使用。

MaxRecvMsgSize(m)

  1. MaxRecvMsgSize返回一个ServerOption来设置服务器可以接收的最大消息大小(以字节为单位)。如果没有设置,gRPC使用默认的4MB。
// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.maxReceiveMessageSize = m
    })
}
详解
  1. 它用于设置服务器能够接收的最大消息大小(以字节为单位)。
  2. 这个选项非常重要,因为它可以帮助服务器防止因接收过大的消息而导致的资源耗尽或性能问题。
  3. 作用
    • 资源保护:通过限制消息大小,可以防止客户端发送过大的消息,从而保护服务器资源不被耗尽。
    • 性能优化:限制消息大小有助于避免服务器处理大消息时出现的性能瓶颈。
  4. 使用场景
    • 当服务器预期会处理大量或大尺寸的消息时,设置一个合理的最大接收消息大小是很有用的。
    • 在分布式系统中,限制消息大小可以帮助维持各个服务之间的健康通信。
package main

import (
    "net"

    "google.golang.org/grpc"
)

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        // 错误处理
    }

    // 设置服务器最大接收消息大小为10MB
    opts := []grpc.ServerOption{
        grpc.MaxRecvMsgSize(10 * 1024 * 1024),
    }

    s := grpc.NewServer(opts...)
    // 注册服务...

    s.Serve(lis)
}
  1. 在这个例子中,我们将服务器配置为接收最大10MB的消息。如果客户端尝试发送超过这个大小的消息,服务器将拒绝接收该消息,并可能返回一个错误。这样的配置有助于确保服务器能够稳定运行,即使在处理大量数据的情况下。

MaxSendMsgSize(m)

  1. MaxSendMsgSize返回一个ServerOption来设置服务器可以发送的最大消息大小(以字节为单位)。如果没有设置,gRPC使用默认的math.MaxInt32
// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default `math.MaxInt32`.
func MaxSendMsgSize(m int) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.maxSendMessageSize = m
    })
}
详解
  1. 它用于设置服务器能够发送的最大消息大小(以字节为单位)。这个选项对于控制服务器响应的大小非常重要,尤其是在需要限制发送到客户端的数据量时。
  2. 默认值: 如果不显式设置 MaxSendMsgSize,gRPC服务器默认的最大发送消息大小是 math.MaxInt32,这是一个非常大的值,实际上相当于没有限制。
  3. 作用
    • 流量控制:通过限制发送消息的大小,可以防止服务器发送过大的响应,从而帮助控制网络流量。
    • 资源管理:在处理大型数据集或大量数据传输时,限制发送大小有助于避免服务器资源过度使用。
  4. 使用场景
    • 当服务器需要发送大量数据到客户端,但又不希望占用过多网络带宽或客户端资源时。
    • 在某些网络环境下,可能需要对发送的数据大小进行限制,以避免网络拥塞。
package main

import (
    "net"

    "google.golang.org/grpc"
)

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        // 错误处理
    }

    // 设置服务器最大发送消息大小为10MB
    opts := []grpc.ServerOption{
        grpc.MaxSendMsgSize(10 * 1024 * 1024),
    }

    s := grpc.NewServer(opts...)
    // 注册服务...

    s.Serve(lis)
}
  1. 在这个例子中,我们将服务器配置为发送最大10MB的消息。如果服务器尝试发送超过这个大小的消息,它将返回一个错误。这样的配置有助于确保服务器在发送大量数据时不会超过网络或客户端的处理能力。

NumStreamWorkers(numServerWorkers)

  1. NumStreamWorkers返回一个ServerOption,用于设置应该用于处理传入流的工作线程goroutine的数量。
  2. 将此值设置为零(默认值)将禁用worker并为每个流生成一个新的goroutine。
  3. 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// NumStreamWorkers returns a ServerOption that sets the number of worker
// goroutines that should be used to process incoming streams. Setting this to
// zero (default) will disable workers and spawn a new goroutine for each
// stream.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NumStreamWorkers(numServerWorkers uint32) ServerOption {
    // TODO: If/when this API gets stabilized (i.e. stream workers become the
    // only way streams are processed), change the behavior of the zero value to
    // a sane default. Preliminary experiments suggest that a value equal to the
    // number of CPUs available is most performant; requires thorough testing.
    return newFuncServerOption(func(o *serverOptions) {
        o.numServerWorkers = numServerWorkers
    })
}

ReadBufferSize(s)

  1. ReadBufferSize允许您设置读缓冲区的大小,这决定了一个读系统调用最多可以读取多少数据。
  2. 这个缓冲区的默认值是32KB。0或负值将禁用连接的读缓冲区,以便数据帧可以直接访问底层连接。
// ReadBufferSize lets you set the size of read buffer, this determines how much
// data can be read at most for one read syscall. The default value for this
// buffer is 32KB. Zero or negative values will disable read buffer for a
// connection so data framer can access the underlying conn directly.
func ReadBufferSize(s int) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.readBufferSize = s
    })
}
详解
  1. 它指定了服务器端在接收数据时使用的读缓冲区的大小。这个缓冲区的大小决定了在一次系统调用中可以读取的最大数据量。
  2. 作用
    • 性能调优:适当调整读缓冲区的大小可以优化服务器接收数据的性能。
    • 资源管理:通过设置合适的读缓冲区大小,可以有效地管理服务器端的内存使用。
  3. 默认值: 如果没有设置这个选项,gRPC服务器默认的读缓冲区大小是32KB。
  4. 使用场景
    • 当服务器需要处理大量数据时,可能需要调整读缓冲区大小以优化性能。
    • 在网络环境变化时,调整读缓冲区大小可以帮助服务器更好地适应不同的网络条件。
package main

import (
    "net"

    "google.golang.org/grpc"
)

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        // 错误处理
    }

    // 设置服务器读缓冲区大小为100KB
    opts := []grpc.ServerOption{
        grpc.ReadBufferSize(100 * 1024),
    }

    s := grpc.NewServer(opts...)
    // 注册服务...

    s.Serve(lis)
}
  1. 在这个例子中,我们将服务器读缓冲区大小设置为100KB。这意味着服务器在每次系统调用中最多可以读取100KB的数据。如果设置为0或负数,服务器将不使用读缓冲区,而是直接访问底层的连接。这样的配置有助于确保服务器在接收大量数据时能够高效地处理。

WriteBufferSize(s)

  1. WriteBufferSize决定在对网络执行写操作之前可以批处理多少数据。这个缓冲区的默认值是32KB。
  2. 0或负值将禁用写缓冲区,这样每次写都将在底层连接上进行。注意:Send调用不能直接转换为write。
// WriteBufferSize determines how much data can be batched before doing a write
// on the wire. The default value for this buffer is 32KB. Zero or negative
// values will disable the write buffer such that each write will be on underlying
// connection. Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.writeBufferSize = s
    })
}
详解
  1. 它指定了服务器端在发送数据时使用的写缓冲区的大小。这个缓冲区的大小决定了在一次系统调用中可以发送的最大数据量。
  2. 作用
    • 性能调优:适当调整写缓冲区的大小可以优化服务器发送数据的性能。
    • 资源管理:通过设置合适的写缓冲区大小,可以有效地管理服务器端的内存使用。
  3. 默认值: 如果没有设置这个选项,gRPC服务器默认的写缓冲区大小是32KB。
  4. 使用场景
    • 当服务器需要处理大量数据时,可能需要调整写缓冲区大小以优化性能。
    • 在网络环境变化时,调整写缓冲区大小可以帮助服务器更好地适应不同的网络条件。
package main

import (
    "net"

    "google.golang.org/grpc"
)

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        // 错误处理
    }

    // 设置服务器写缓冲区大小为100KB
    opts := []grpc.ServerOption{
        grpc.WriteBufferSize(100 * 1024),
    }

    s := grpc.NewServer(opts...)
    // 注册服务...

    s.Serve(lis)
}
  1. 在这个例子中,我们将服务器写缓冲区大小设置为100KB。这意味着服务器在每次系统调用中最多可以发送100KB的数据。如果设置为0或负数,服务器将不使用写缓冲区,而是直接在底层连接上发送数据。这样的配置有助于确保服务器在发送大量数据时能够高效地处理。

SharedWriteBuffer(val)

  1. SharedWriteBuffer允许重用每个连接的传输写缓冲区。
  2. 如果将此选项设置为true,则每个连接都会在刷新连接上的数据后释放缓冲区。
  3. 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// SharedWriteBuffer allows reusing per-connection transport write buffer.
// If this option is set to true every connection will release the buffer after
// flushing the data on the wire.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SharedWriteBuffer(val bool) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.sharedWriteBuffer = val
    })
}

StatsHandler(h)

  1. StatsHandler返回一个ServerOption,用于设置服务器的统计处理程序。
// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        if h == nil {
            logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
            // Do not allow a nil stats handler, which would otherwise cause
            // panics.
            return
        }
        o.statsHandlers = append(o.statsHandlers, h)
    })
}
详解
  1. 它指定了服务器应该使用的统计(stats)处理程序。这个处理程序用于收集和报告关于服务器操作的统计信息,这些信息可以帮助开发者监控服务器性能、诊断问题,并进行性能调优。
  2. 作用: 这个选项允许你指定一个统计处理程序,该处理程序会在服务器操作时收集统计信息。这些统计信息可以通过不同的方式进行报告,例如通过日志记录、统计服务或仪表板。
  3. 使用场景
    • 性能监控:通过收集服务器操作的统计信息,可以监控服务器的性能,及时发现性能瓶颈。
    • 问题诊断:当服务出现问题时,统计信息可以帮助诊断问题原因。
    • 性能调优:根据统计数据,可以对服务器配置进行调整,以优化性能。
package main

import (
    "context"
    "net"

    "google.golang.org/grpc"
    "google.golang.org/grpc/stats"
)

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        // 错误处理
    }

    // 创建一个简单的统计处理程序
    statsHandler := stats.HandlerFunc(func(s stats.RawServerStats) {
        // 处理统计信息,例如记录到日志
        // logger.Printf("Received %d bytes on stream", s.StreamBytesReceived)
    })

    opts := []grpc.ServerOption{
        grpc.StatsHandler(statsHandler),
    }

    s := grpc.NewServer(opts...)
    // 注册服务...

    s.Serve(lis)
}
  1. 在这个例子中,我们创建了一个简单的统计处理程序,它将记录每个流接收到的字节数。这只是一个简单的例子,实际应用中可以根据需要实现更复杂的统计逻辑。通过设置StatsHandler,我们可以在服务器操作时收集这些统计信息。

StreamInterceptor(i)

  1. StreamInterceptor返回一个为服务器设置StreamServerInterceptor的ServerOption。只能安装一个流拦截器。
// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        if o.streamInt != nil {
            panic("The stream server interceptor was already set and may not be reset.")
        }
        o.streamInt = i
    })
}
示例
func main() {
    // 绝对地址
    // 方法一
    creds, err1 := credentials.NewServerTLSFromFile(
        "/root/workspace/learn-grpc/key/test.pem",
        "/root/workspace/learn-grpc/key/test.key",
    )

    if err1 != nil {
        fmt.Printf("证书错误:%v", err1)
        return
    }

    // 方法二
    //cert, err := tls.LoadX509KeyPair("", "")
    //if err != nil {
    //	fmt.Printf("私钥错误:%v", err)
    //	return
    //}
    //
    //creds := credentials.NewServerTLSFromCert(&cert)
    // 开启端口
    listen, _ := net.Listen("tcp", ":9090")
    // 创建grpc服务
    grpcServer := grpc.NewServer(grpc.Creds(creds),
        grpc.UnaryInterceptor(interceptor.UnaryServerInterceptor()),
        // ---------  StreamServerInterceptor  ---------
        grpc.StreamInterceptor(interceptor.StreamServerInterceptor())) 

    // GetServiceInfo01: map[]
    fmt.Printf("GetServiceInfo01: %v\n", grpcServer.GetServiceInfo())
    pb.RegisterSayHelloServer(grpcServer, &server{})
    // GetServiceInfo02: map[SayHello:{[{SayHello false false} {Channel true true}] hello.proto}]
    fmt.Printf("GetServiceInfo02: %v\n", grpcServer.GetServiceInfo())

    // 启动服务
    err := grpcServer.Serve(listen)
    if err != nil {
        fmt.Println(err)
        return
    }
}

UnaryInterceptor(i)

  1. UnaryInterceptor返回一个为服务器设置UnaryServerInterceptor的ServerOption。
  2. 只能安装一个一元拦截器。多个拦截器的构造(例如,链接)可以在调用者处实现。
// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        if o.unaryInt != nil {
            panic("The unary server interceptor was already set and may not be reset.")
        }
        o.unaryInt = i
    })
}
示例
func main() {
    // 绝对地址
    // 方法一
    creds, err1 := credentials.NewServerTLSFromFile(
        "/root/workspace/learn-grpc/key/test.pem",
        "/root/workspace/learn-grpc/key/test.key",
    )

    if err1 != nil {
        fmt.Printf("证书错误:%v", err1)
        return
    }

    // 方法二
    //cert, err := tls.LoadX509KeyPair("", "")
    //if err != nil {
    //	fmt.Printf("私钥错误:%v", err)
    //	return
    //}
    //
    //creds := credentials.NewServerTLSFromCert(&cert)
    // 开启端口
    listen, _ := net.Listen("tcp", ":9090")
    // 创建grpc服务
    grpcServer := grpc.NewServer(grpc.Creds(creds),
        // ---------  UnaryServerInterceptor  ---------
        grpc.UnaryInterceptor(interceptor.UnaryServerInterceptor()),
        grpc.StreamInterceptor(interceptor.StreamServerInterceptor())) 

    // GetServiceInfo01: map[]
    fmt.Printf("GetServiceInfo01: %v\n", grpcServer.GetServiceInfo())
    pb.RegisterSayHelloServer(grpcServer, &server{})
    // GetServiceInfo02: map[SayHello:{[{SayHello false false} {Channel true true}] hello.proto}]
    fmt.Printf("GetServiceInfo02: %v\n", grpcServer.GetServiceInfo())

    // 启动服务
    err := grpcServer.Serve(listen)
    if err != nil {
        fmt.Println(err)
        return
    }
}

UnknownServiceHandler(streamHandler)

  1. UnknownServiceHandler返回一个允许添加自定义未知服务处理程序的ServerOption。
  2. 所提供的方法是一个双流RPC服务处理程序,当接收到未注册的服务或方法的请求时,将调用该处理程序,而不是返回“未实现”gRPC错误。
  3. 处理函数和流拦截器(如果设置了)可以完全访问ServerStream,包括它的Context。
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function and stream interceptor (if set) have full access to
// the ServerStream, including its Context.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.unknownStreamDesc = &StreamDesc{
            StreamName: "unknown_service_handler",
            Handler:    streamHandler,
            // We need to assume that the users of the streamHandler will want to use both.
            ClientStreams: true,
            ServerStreams: true,
        }
    })
}
详解
  1. 它允许你添加一个自定义的未知服务处理程序。这个处理程序会在接收到请求未知服务或方法时被调用,而不是返回gRPC的“未实现”错误。
  2. 作用
    • 定制响应:通过设置自定义的处理程序,你可以控制对未知服务的响应,而不是简单地返回错误。
    • 扩展性:允许你为未知服务提供额外的功能,例如日志记录、异常处理或自定义响应。
  3. 使用场景
    • 日志记录:记录所有接收到的未知服务请求,以便于后续分析和调试。
    • 异常处理:捕获所有未知服务的请求,并进行异常处理。
    • 自定义响应:为未知服务提供自定义的响应,例如返回特定的错误消息或状态码。
package main

import (
    "context"
    "net"

    "google.golang.org/grpc"
)

// 自定义的StreamHandler
type CustomStreamHandler struct{}

func (c *CustomStreamHandler) Send(m *CustomMessage) error {
    // 发送自定义响应
    return nil
}

func (c *CustomStreamHandler) Recv() (*CustomMessage, error) {
    // 接收请求并处理
    return nil, nil
}

func (c *CustomStreamHandler) CloseSend() error {
    // 关闭发送
    return nil
}

func (c *CustomStreamHandler) Context() context.Context {
    // 获取上下文
    return nil
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        // 错误处理
    }

    // 设置自定义的UnknownServiceHandler
    opts := []grpc.ServerOption{
        grpc.UnknownServiceHandler(new(CustomStreamHandler)),
    }

    s := grpc.NewServer(opts...)
    // 注册服务...

    s.Serve(lis)
}
  1. 在这个例子中,我们创建了一个自定义的CustomStreamHandler结构体,它实现了StreamHandler接口。通过设置UnknownServiceHandler,我们为未知服务提供了一个自定义的处理程序。当服务器接收到未知服务的请求时,它会调用这个处理程序,而不是返回“未实现”错误。

WaitForHandlers(w)

  1. WaitForHandlers导致Stop等待,直到所有未完成的方法处理程序都退出后才返回。
  2. 如果为false, Stop将在所有连接关闭后立即返回,但方法处理程序可能仍在运行。默认情况下,Stop不等待方法处理程序返回。
  3. 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// WaitForHandlers cause Stop to wait until all outstanding method handlers have
// exited before returning.  If false, Stop will return as soon as all
// connections have closed, but method handlers may still be running. By
// default, Stop does not wait for method handlers to return.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WaitForHandlers(w bool) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        o.waitForHandlers = w
    })
}