grpc.Client
type ClientConn
- ClientConn表示到概念端点的虚拟连接,以执行rpc。
- 根据配置、负载等,ClientConn可以自由地拥有零个或多个到端点的实际连接。它还可以自由地确定要使用的实际端点,并可以在每个RPC中更改它,从而允许客户端负载平衡。
- ClientConn封装了一系列功能,包括名称解析、TCP连接建立(带有重试和退出)和TLS握手。它还通过重新解析名称和重新连接来处理已建立连接上的错误。
type ClientConn struct {
// contains filtered or unexported fields
}
NewClient(target, opts)
- NewClient为提供的目标URI创建一个新的gRPC “channel”。无I/O操作。对rpc使用ClientConn将自动使其连接。Connect可以用于手动创建连接,但对于大多数用户来说,这是不必要的。
- 目标名称语法在https://github.com/grpc/grpc/blob/master/doc/naming.md中定义。例如,要使用DNS解析器,应该对目标应用" DNS:///“前缀。
- WithBlock, WithTimeout, WithReturnConnectionError和FailOnNonTempDialError返回的DialOptions将被该函数忽略。
// NewClient creates a new gRPC "channel" for the target URI provided. No I/O
// is performed. Use of the ClientConn for RPCs will automatically cause it to
// connect. Connect may be used to manually create a connection, but for most
// users this is unnecessary.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md. e.g. to use dns
// resolver, a "dns:///" prefix should be applied to the target.
//
// The DialOptions returned by WithBlock, WithTimeout,
// WithReturnConnectionError, and FailOnNonTempDialError are ignored by this
// function.
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())
// Apply dial options.
disableGlobalOpts := false
for _, opt := range opts {
if _, ok := opt.(*disableGlobalDialOptions); ok {
disableGlobalOpts = true
break
}
}
if !disableGlobalOpts {
for _, opt := range globalDialOptions {
opt.apply(&cc.dopts)
}
}
for _, opt := range opts {
opt.apply(&cc.dopts)
}
// Determine the resolver to use.
if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
return nil, err
}
for _, opt := range globalPerTargetDialOptions {
opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts)
}
chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)
if err := cc.validateTransportCredentials(); err != nil {
return nil, err
}
if cc.dopts.defaultServiceConfigRawJSON != nil {
scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
if scpr.Err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
}
cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
}
cc.mkp = cc.dopts.copts.KeepaliveParams
if err = cc.initAuthority(); err != nil {
return nil, err
}
// Register ClientConn with channelz. Note that this is only done after
// channel creation cannot fail.
cc.channelzRegistration(target)
channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
return cc, nil
}
(cc) CanonicalTarget()
- CanonicalTarget返回ClientConn的规范目标字符串。
// CanonicalTarget returns the canonical target string of the ClientConn.
func (cc *ClientConn) CanonicalTarget() string {
return cc.parsedTarget.String()
}
示例
func main() {
// 配置ssl,"*.heliu.site"在实际开发中从浏览器中取获取,证书路径使用绝对路径
creds, _ := credentials.NewClientTLSFromFile(
"/root/workspace/learn-grpc/key/test.pem",
"*.heliu.site",
)
var opts []grpc.DialOption
// 不带TLS这里是grpc.WithTransportCredentials(insecure.NewCredentials())
opts = append(opts, grpc.WithTransportCredentials(creds))
opts = append(opts, grpc.WithPerRPCCredentials(&ClientTokenAuth{}))
// 添加客户端拦截器
opts = append(opts, grpc.WithUnaryInterceptor(interceptor.UnaryClientInterceptor()))
// 添加流拦截器
opts = append(opts, grpc.WithStreamInterceptor(interceptor.StreamClientInterceptor()))
// 连接server端,使用ssl加密通信
conn, err := grpc.NewClient("127.0.0.1:9090", opts...)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ----------- CanonicalTarget: dns:///127.0.0.1:9090 -----------
log.Printf("CanonicalTarget: %s\n", conn.CanonicalTarget())
// ...
}
(cc) Close()
- Close关闭ClientConn和所有底层连接。
// Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error {
defer func() {
cc.cancel()
<-cc.csMgr.pubSub.Done()
}()
// Prevent calls to enter/exit idle immediately, and ensure we are not
// currently entering/exiting idle mode.
cc.idlenessMgr.Close()
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return ErrClientConnClosing
}
conns := cc.conns
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
// We can safely unlock and continue to access all fields now as
// cc.conns==nil, preventing any further operations on cc.
cc.mu.Unlock()
cc.resolverWrapper.close()
// The order of closing matters here since the balancer wrapper assumes the
// picker is closed before it is closed.
cc.pickerWrapper.close()
cc.balancerWrapper.close()
<-cc.resolverWrapper.serializer.Done()
<-cc.balancerWrapper.serializer.Done()
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
cc.addTraceEvent("deleted")
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
// trace reference to the entity being deleted, and thus prevent it from being
// deleted right away.
channelz.RemoveEntry(cc.channelz.ID)
return nil
}
示例
func main() {
// 配置ssl,"*.heliu.site"在实际开发中从浏览器中取获取,证书路径使用绝对路径
creds, _ := credentials.NewClientTLSFromFile(
"/root/workspace/learn-grpc/key/test.pem",
"*.heliu.site",
)
var opts []grpc.DialOption
// 不带TLS这里是grpc.WithTransportCredentials(insecure.NewCredentials())
opts = append(opts, grpc.WithTransportCredentials(creds))
opts = append(opts, grpc.WithPerRPCCredentials(&ClientTokenAuth{}))
// 添加客户端拦截器
opts = append(opts, grpc.WithUnaryInterceptor(interceptor.UnaryClientInterceptor()))
// 添加流拦截器
opts = append(opts, grpc.WithStreamInterceptor(interceptor.StreamClientInterceptor()))
// 连接server端,使用ssl加密通信
conn, err := grpc.NewClient("127.0.0.1:9090", opts...)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
// ----------- Close -----------
defer conn.Close()
log.Printf("CanonicalTarget: %s\n", conn.CanonicalTarget())
// ...
}
(cc) Connect()
- 如果channel空闲,Connect将导致ClientConn中的所有子通道尝试连接。在返回之前不等待连接尝试开始。
- 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// Connect causes all subchannels in the ClientConn to attempt to connect if
// the channel is idle. Does not wait for the connection attempts to begin
// before returning.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
cc.addTraceEvent(err.Error())
return
}
// If the ClientConn was not in idle mode, we need to call ExitIdle on the
// LB policy so that connections can be created.
cc.mu.Lock()
cc.balancerWrapper.exitIdle()
cc.mu.Unlock()
}
详解
- 它用于通知客户端连接的负载均衡器策略(LB policy)退出空闲模式,并尝试建立连接。这个函数通常在客户端连接空闲时调用,以便在必要时重新建立连接。
- 作用:
- 重新连接:当客户端连接处于空闲状态时,可以调用这个函数来尝试重新建立连接。
- 状态更新:通知负载均衡器策略客户端连接已准备好接受新的请求,并应该开始创建新的连接。
- 使用场景:
- 空闲连接管理:当客户端连接长时间没有活动时,可以通过调用这个函数来重新激活连接。
- 负载均衡策略:当客户端连接的负载均衡策略需要根据连接状态进行调整时,可以使用这个函数。
- 需要注意的是,
Connect
函数是实验性的,可能会在未来的gRPC版本中更改或移除。因此,在生产环境中使用这个函数之前,应该仔细评估其稳定性和适用性。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, "localhost:50051", grpc.WithBlock())
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 等待连接状态从DOWN变为READY
if conn.WaitForStateChange(ctx, connectivity.StateDown) {
fmt.Println("Connection state changed to READY")
} else {
fmt.Println("Connection state change timeout")
}
// 在需要时重新连接
conn.Connect()
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们尝试建立与服务器端口50051的gRPC连接。如果连接成功,我们使用
WaitForStateChange
函数等待连接状态从DOWN
变为READY
。如果连接状态在5秒内没有改变,我们将打印出超时的消息。随后,我们调用Connect
函数通知客户端连接的负载均衡器策略退出空闲模式,并尝试建立新的连接。
(cc) GetMethodConfig(method)
- GetMethodConfig获取输入法的方法配置。如果有一个完全匹配的输入法(即/service/method),我们返回相应的MethodConfig。如果输入法没有精确匹配,我们在服务(即/service/)下查找服务的默认配置,然后查找所有服务的默认配置(空字符串)。
- 如果服务有默认的MethodConfig,我们就返回它。否则,我们返回一个空的MethodConfig。
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the service's default
// config under the service (i.e /service/) and then for the default for all services (empty string).
//
// If there is a default MethodConfig for the service, we return it.
// Otherwise, we return an empty MethodConfig.
func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
// TODO: Avoid the locking here.
cc.mu.RLock()
defer cc.mu.RUnlock()
return getMethodConfig(cc.sc, method)
}
(cc) GetState()
- GetState返回ClientConn的connectivity.State。
// GetState returns the connectivity.State of ClientConn.
func (cc *ClientConn) GetState() connectivity.State {
return cc.csMgr.getState()
}
详解
- 它用于获取客户端连接当前的连接状态。这个函数在客户端需要了解其当前连接状态时非常有用,例如,当客户端需要根据连接状态调整其行为时。
- 作用:
- 状态监控:允许客户端检查其连接状态,以便根据需要进行相应的处理。
- 行为调整:客户端可以根据连接状态调整其行为,例如,当连接不可用时,可以暂停发送请求。
- 使用场景:
- 连接状态监控:在需要监控连接状态变化的应用场景中,如客户端需要根据连接状态调整其行为。
- 异常处理:在处理连接问题时,客户端可以检查连接状态来决定如何响应。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, "localhost:50051", grpc.WithBlock())
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 获取连接状态
state := conn.GetState()
fmt.Printf("Connection state: %s\n", state)
// 假设在连接过程中遇到了短暂故障
if conn.WaitForStateChange(ctx, connectivity.StateIdle) {
fmt.Println("Connection state changed to READY")
} else {
fmt.Println("Connection state change timeout")
}
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们尝试建立与服务器端口50051的gRPC连接。如果连接成功,我们使用
GetState
函数获取连接状态,并打印出来。随后,我们假设遇到了网络故障,并调用WaitForStateChange
函数等待连接状态从IDLE
变为READY
。如果在5秒内状态没有改变,我们将打印出超时的消息。
(cc) Invoke(ctx, method, args, reply, opts)
- Invoke通过网络发送RPC请求,并在收到响应后返回。这通常由生成的代码调用。
- 调用返回的所有错误都与状态包兼容。
// Invoke sends the RPC request on the wire and returns after response is
// received. This is typically called by generated code.
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
return invoke(ctx, method, args, reply, cc, opts...)
}
详解
- 它用于发送一个单向的RPC请求到服务器端,并等待接收响应。这个函数通常由gRPC代码生成器调用,用于客户端调用服务器端的单向RPC方法。
- 作用:
- 发送请求:允许客户端发送一个单向的RPC请求到服务器端。
- 接收响应:等待服务器端的响应,并将响应数据存储在
reply
参数中。
- 使用场景:
- 单向RPC调用:当客户端只需要发送请求而不需要接收响应时,可以使用这个函数。
- 服务调用:客户端可以通过这个函数调用服务器端的服务。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, "localhost:50051", grpc.WithBlock())
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 调用服务器端的单向RPC方法
err = conn.Invoke(ctx, "MyService.MyMethod", nil, nil, grpc.FailOnNonTempDialError(true))
if err != nil {
fmt.Printf("Failed to invoke RPC: %v\n", err)
return
}
// 处理响应(如果有的话)
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们尝试建立与服务器端口50051的gRPC连接。如果连接成功,我们使用
Invoke
函数调用服务器端的单向RPC方法。如果有响应,我们可以通过reply
参数来处理。最后,我们关闭连接以释放资源。
(cc) NewStream(ctx, desc, method, opts)
-
NewStream为客户端创建一个新的流。这通常由生成的代码调用。ctx用于流的生命周期。
-
为了确保资源不会因为返回的流而泄露,必须执行以下操作之一:
- 在ClientConn上调用Close。
- 取消提供的上下文。
- 调用RecvMsg,直到返回非nil错误。例如,protobuf生成的客户端流式RPC可能会使用辅助函数CloseAndRecv(请注意,CloseSend不Recv,因此不能保证释放所有资源)。
- 接收一个非nil,非io。报头或发送消息出错。
-
如果上述情况都没有发生,则会泄露一个程序和一个上下文,并且grpc将不会使用stats.End消息调用可选配置的stats处理程序。
// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
// 1. Call Close on the ClientConn.
// 2. Cancel the context provided.
// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
// client-streaming RPC, for instance, might use the helper function
// CloseAndRecv (note that CloseSend does not Recv, therefore is not
// guaranteed to release all resources).
// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
if cc.dopts.streamInt != nil {
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
}
return newClientStream(ctx, desc, cc, method, opts...)
}
详解
- 它用于创建一个新的客户端流(
ClientStream
)。这个函数通常由gRPC代码生成器调用,用于创建与服务器端服务的通信通道。 - 作用:
- 创建通信通道:允许客户端创建一个新的流,以便与服务器端服务进行通信。
- 行为配置:通过传递不同的
CallOption
参数,可以配置流的发送和接收行为。
- 使用场景:
- 服务调用:当客户端需要调用服务器端的服务时,可以使用这个函数创建一个新的流。
- 请求发送:客户端可以通过这个流发送请求消息给服务器端。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, "localhost:50051", grpc.WithBlock())
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 创建一个新的流
stream, err := conn.NewStream(ctx, &StreamDesc{
// 流描述
}, "MyService.MyMethod", grpc.FailOnNonTempDialError(true))
if err != nil {
fmt.Printf("Failed to create stream: %v\n", err)
return
}
// 发送请求消息
// ...
// 接收响应消息
// ...
// 关闭流
stream.CloseSend()
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们尝试建立与服务器端口50051的gRPC连接。如果连接成功,我们使用
NewStream
函数创建一个新的流,并指定要调用的RPC方法和一些流选项。接下来,我们可以通过这个流发送请求消息给服务器端,并接收响应消息。最后,我们关闭流以释放资源。
(cc) ResetConnectBackoff()
- ResetConnectBackoff唤醒所有暂时失败的子通道,并使它们立即尝试另一个连接。它还重置用于后续尝试的后退时间,而不管当前状态如何。
- 一般来说,不应该使用这个函数。在默认情况下,典型的服务或网络中断会导致合理的客户端重新连接策略。但是,如果先前不可用的网络变得可用,则可以使用此命令触发立即重新连接。
- 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// ResetConnectBackoff wakes up all subchannels in transient failure and causes
// them to attempt another connection immediately. It also resets the backoff
// times used for subsequent attempts regardless of the current state.
//
// In general, this function should not be used. Typical service or network
// outages result in a reasonable client reconnection strategy by default.
// However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (cc *ClientConn) ResetConnectBackoff() {
cc.mu.Lock()
conns := cc.conns
cc.mu.Unlock()
for ac := range conns {
ac.resetConnectBackoff()
}
}
详解
- 它用于重置客户端连接的退避重试策略。这个函数在客户端遇到短暂故障并希望立即尝试重新连接时非常有用。
- 作用:
- 立即重试:当客户端遇到短暂故障(如网络问题)时,可以使用这个函数来立即重试连接,而不是等待预设的退避重试间隔。
- 状态重置:无论当前连接状态如何,这个函数都会重置后续尝试的退避时间。
- 使用场景:
- 网络恢复:当网络故障恢复时,客户端可以调用这个函数来立即尝试重新连接。
- 异常处理:在某些情况下,客户端可能需要绕过正常的退避重试策略,例如,当连接是由于客户端自身的问题(如客户端的代码错误)导致失败时。
- 需要注意的是,
ResetConnectBackoff
函数是实验性的,可能会在未来的gRPC版本中更改或移除。因此,在生产环境中使用这个函数之前,应该仔细评估其稳定性和适用性。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, "localhost:50051", grpc.WithBlock())
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 假设在连接过程中遇到了短暂故障
if conn.WaitForStateChange(ctx, connectivity.StateIdle) {
fmt.Println("Connection state changed to READY")
} else {
fmt.Println("Connection state change timeout")
}
// 网络故障恢复,重置连接退避
conn.ResetConnectBackoff()
// 再次尝试连接
if conn.WaitForStateChange(ctx, connectivity.StateIdle) {
fmt.Println("Connection state changed to READY")
} else {
fmt.Println("Connection state change timeout")
}
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们尝试建立与服务器端口50051的gRPC连接。如果连接成功,我们使用
WaitForStateChange
函数等待连接状态从IDLE
变为READY
。如果在5秒内状态没有改变,我们将打印出超时的消息。随后,我们假设遇到了网络故障,并调用ResetConnectBackoff
函数重置连接退避。最后,我们再次尝试连接并等待状态变化。
(cc) Target()
- Target返回ClientConn的目标字符串。
// Target returns the target string of the ClientConn.
func (cc *ClientConn) Target() string {
return cc.target
}
示例
func main() {
// 配置ssl,"*.heliu.site"在实际开发中从浏览器中取获取,证书路径使用绝对路径
creds, _ := credentials.NewClientTLSFromFile(
"/root/workspace/learn-grpc/key/test.pem",
"*.heliu.site",
)
var opts []grpc.DialOption
// 不带TLS这里是grpc.WithTransportCredentials(insecure.NewCredentials())
opts = append(opts, grpc.WithTransportCredentials(creds))
opts = append(opts, grpc.WithPerRPCCredentials(&ClientTokenAuth{}))
// 添加客户端拦截器
opts = append(opts, grpc.WithUnaryInterceptor(interceptor.UnaryClientInterceptor()))
// 添加流拦截器
opts = append(opts, grpc.WithStreamInterceptor(interceptor.StreamClientInterceptor()))
// 连接server端,使用ssl加密通信
conn, err := grpc.NewClient("127.0.0.1:9090", opts...)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// Target(): 127.0.0.1:9090
log.Printf("Target(): %s\n", conn.Target())
// 建立连接
client := pb.NewSayHelloClient(conn)
// ...
}
(cc) WaitForStateChange(ctx, sourceState)
- WaitForStateChange等待,直到ClientConn的connectivity.State从sourceState更改或ctx过期。
- 前一种情况返回真值,后一种情况返回假值。
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
ch := cc.csMgr.getNotifyChan()
if cc.csMgr.getState() != sourceState {
return true
}
select {
case <-ctx.Done():
return false
case <-ch:
return true
}
}
详解
- 它用于等待客户端连接的状态从指定的源状态(
sourceState
)改变。这个函数在客户端需要等待连接状态变化时非常有用,例如,当客户端需要检测到网络的恢复时。 - 作用:
- 状态监控:允许客户端在连接状态改变时进行响应,例如,当网络中断后恢复时。
- 错误处理:在等待状态变化的过程中,如果超时,客户端可以进行适当的错误处理。
- 使用场景:
- 网络恢复检测:当客户端连接的网络出现问题,可以使用这个函数来检测网络是否恢复。
- 连接状态监控:在需要监控连接状态变化的应用场景中,如客户端需要根据连接状态调整其行为。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, "localhost:50051", grpc.WithBlock())
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 等待连接状态从DOWN变为READY
if conn.WaitForStateChange(ctx, connectivity.StateDown) {
fmt.Println("Connection state changed to READY")
} else {
fmt.Println("Connection state change timeout")
}
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们尝试建立与服务器端口50051的gRPC连接。如果连接成功,我们使用
WaitForStateChange
函数等待连接状态从DOWN
变为READY
。如果在5秒内状态没有改变,我们将打印出超时的消息。
type DialOption
- DialOption配置我们如何设置连接。
type DialOption interface {
apply(*dialOptions)
}
WithAuthority(a)
- WithAuthority返回一个DialOption,该DialOption指定要用作:authority伪头和身份验证握手中的服务器名称的值。
- 在HTTP/2协议中,:authority 是一个伪头部字段,用于携带请求的原始主机和端口信息。这个字段相当于HTTP/1.1中的Host头部字段,但在HTTP/2中,由于采用了新的头部压缩机制,:authority 被用作一个伪头部来区分实际的请求头部。
- 具体来说,:authority 字段通常包含以下内容:
- 主机名(例如 www.example.com)
- 端口号(如果使用了非标准端口,例如 :8080)
// WithAuthority returns a DialOption that specifies the value to be used as the
// :authority pseudo-header and as the server name in authentication handshake.
func WithAuthority(a string) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.authority = a
})
}
详情
- 用于指定在连接到gRPC服务器时使用的权威(authority)信息。这个权威信息通常包含服务器的域名或IP地址,并且是连接请求中的一个重要部分,因为它用于服务器身份验证和RPC方法的选择。
- 作用:
- 服务器识别:指定权威信息有助于服务器识别客户端的请求,并选择适当的RPC方法。
- 身份验证:在某些情况下,权威信息用于服务器端的认证过程。
- 使用场景:
- 客户端配置:在客户端代码中,可以通过传递
WithAuthority
选项来指定要连接的服务器的权威信息。 - 服务发现:在服务发现场景中,客户端可能需要根据服务注册表中的权威信息来连接到正确的服务器。
- 客户端配置:在客户端代码中,可以通过传递
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 指定要连接的服务器的权威信息
opts := []grpc.DialOption{
grpc.WithAuthority("example.com:50051"),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithAuthority
选项指定要连接的服务器的权威信息。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
WithChainStreamInterceptor(interceptors)
- WithChainStreamInterceptor返回一个DialOption,用于指定流rpc的链式拦截器。
- 第一个拦截器将是最外层的,而最后一个拦截器将是围绕实际调用的最内层的包装器。
- 此方法添加的所有拦截器都将被链接,并且由WithStreamInterceptor定义的拦截器将始终被添加到链中。
// WithChainStreamInterceptor returns a DialOption that specifies the chained
// interceptor for streaming RPCs. The first interceptor will be the outer most,
// while the last interceptor will be the inner most wrapper around the real call.
// All interceptors added by this method will be chained, and the interceptor
// defined by WithStreamInterceptor will always be prepended to the chain.
func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.chainStreamInts = append(o.chainStreamInts, interceptors...)
})
}
详情
- 用于为gRPC客户端的流式RPC(streaming RPC)添加一个拦截器链(interceptor chain)。这个拦截器链会在客户端发送流式RPC请求和接收响应时被调用。
- 作用:
- 拦截器逻辑:允许客户端在流式RPC的各个阶段添加自定义逻辑。
- 功能扩展:通过组合不同的拦截器,可以实现更复杂的客户端功能,例如请求和响应的修改、错误处理、日志记录等。
- 使用场景:
- 日志记录:在流式RPC的发送和接收过程中添加日志记录功能。
- 请求修改:在发送请求之前对请求进行修改。
- 响应修改:在接收响应之前对响应进行修改。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
// 定义一个简单的StreamClientInterceptor
func loggingInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
stream, err := cc.NewStream(ctx, desc, method, opts...)
if err != nil {
return nil, err
}
return &loggingStream{stream}, nil
}
type loggingStream struct {
grpc.ClientStream
}
func (s *loggingStream) SendMsg(m interface{}) error {
fmt.Printf("Sending message: %v\n", m)
return s.ClientStream.SendMsg(m)
}
func (s *loggingStream) RecvMsg(m interface{}) error {
fmt.Printf("Received message: %v\n", m)
return s.ClientStream.RecvMsg(m)
}
func (s *loggingStream) CloseSend() error {
fmt.Println("Closing send side of the stream")
return s.ClientStream.CloseSend()
}
func (s *loggingStream) Context() context.Context {
return s.ClientStream.Context()
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 创建一个包含拦截器的DialOption
opts := []grpc.DialOption{
grpc.WithChainStreamInterceptor(loggingInterceptor),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们定义了一个简单的
loggingInterceptor
函数,它会在流式RPC的发送和接收过程中添加日志记录功能。然后,我们使用WithChainStreamInterceptor
选项将这个拦截器添加到客户端的连接配置中。当客户端建立与服务器端口的连接并进行流式RPC调用时,loggingInterceptor
将会被调用,并在发送和接收消息时打印日志。
WithChainUnaryInterceptor(interceptors)
- WithChainUnaryInterceptor返回一个DialOption,用于指定一元rpc的链式拦截器。
- 第一个拦截器将是最外层的,而最后一个拦截器将是围绕实际调用的最内层的包装器。
- 通过这个方法添加的所有拦截器都将被链接起来,并且由WithUnaryInterceptor定义的拦截器将始终被添加到链中。
// WithChainUnaryInterceptor returns a DialOption that specifies the chained
// interceptor for unary RPCs. The first interceptor will be the outer most,
// while the last interceptor will be the inner most wrapper around the real call.
// All interceptors added by this method will be chained, and the interceptor
// defined by WithUnaryInterceptor will always be prepended to the chain.
func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
})
}
详情
- 用于为gRPC客户端的单向RPC(unary RPC)添加一个拦截器链(interceptor chain)。这个拦截器链会在客户端发送单向RPC请求和接收响应时被调用。
- 作用:
- 拦截器逻辑:允许客户端在单向RPC的各个阶段添加自定义逻辑。
- 功能扩展:通过组合不同的拦截器,可以实现更复杂的客户端功能,例如请求和响应的修改、错误处理、日志记录等。
- 使用场景:
- 日志记录:在单向RPC的发送和接收过程中添加日志记录功能。
- 请求修改:在发送请求之前对请求进行修改。
- 响应修改:在接收响应之前对响应进行修改。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
// 定义一个简单的UnaryClientInterceptor
func loggingInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fmt.Printf("Sending request: %v\n", req)
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
return err
}
fmt.Printf("Received response: %v\n", reply)
return nil
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 创建一个包含拦截器的DialOption
opts := []grpc.DialOption{
grpc.WithChainUnaryInterceptor(loggingInterceptor),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们定义了一个简单的
loggingInterceptor
函数,它会在单向RPC的发送和接收过程中添加日志记录功能。然后,我们使用WithChainUnaryInterceptor
选项将这个拦截器添加到客户端的连接配置中。当客户端建立与服务器端口的连接并进行单向RPC调用时,loggingInterceptor
将会被调用,并在发送请求和接收响应时打印日志。
WithChannelzParentID(c)
- WithChannelzParentID返回一个DialOption,它指定当前ClientConn父节点的channelz ID。此函数用于嵌套通道创建(例如grpclb dial)。
- 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// WithChannelzParentID returns a DialOption that specifies the channelz ID of
// current ClientConn's parent. This function is used in nested channel creation
// (e.g. grpclb dial).
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithChannelzParentID(c channelz.Identifier) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.channelzParent = c
})
}
- 用于配置客户端连接选项,以指定当前客户端连接的父级通道ID。这个函数通常用于嵌套通道创建的情况,例如使用gRPC负载均衡器(grpclb)进行拨号时。
- 功能说明:
WithChannelzParentID
函数允许你为gRPC客户端连接指定一个父级通道ID。在gRPC中,通道(channel)是客户端和服务器之间通信的抽象,而通道ID是用于标识和管理这些通道的机制。通过指定一个父级通道ID,你可以跟踪和管理嵌套的通道,这在使用gRPC负载均衡器或其他类型的通道管理时非常有用。 - 参数说明:c:这是一个
channelz.Identifier
类型的值,用于标识当前客户端连接的父级通道。
package main
import (
"context"
"log"
"google.golang.org/grpc"
"google.golang.org/grpc/channelz"
)
func main() {
serverAddr := "localhost:50051"
// 创建一个channelz标识符
parentID := channelz.Identifier{
ChannelType: channelz.CHANNEL_TYPE_GRPC_CLIENT,
Address: "localhost:50051",
}
// 创建gRPC连接时应用WithChannelzParentID选项
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(),
grpc.WithChannelzParentID(parentID),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们创建了一个channelz标识符,并使用了
WithChannelzParentID
选项来创建到服务器的连接。 - 注意事项:
- 这是一个实验性的API,可能会在未来版本中发生变化或被移除。
- 在实际应用中,你可能需要根据你的应用程序架构和通道管理需求来设置父级通道ID。
- 通过使用
WithChannelzParentID
,你可以根据应用程序的需求来跟踪和管理嵌套的gRPC通道,特别是在需要复杂的通道管理时。
WithConnectParams(p)
- WithConnectParams将ClientConn配置为使用提供的ConnectParams来创建和维护到服务器的连接。
- 作为ConnectParams的一部分指定的回退配置覆盖https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md中指定的所有默认值。考虑使用后退。DefaultConfig作为基础,在您只想覆盖回退配置的子集的情况下。
// WithConnectParams configures the ClientConn to use the provided ConnectParams
// for creating and maintaining connections to servers.
//
// The backoff configuration specified as part of the ConnectParams overrides
// all defaults specified in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md. Consider
// using the backoff.DefaultConfig as a base, in cases where you want to
// override only a subset of the backoff configuration.
func WithConnectParams(p ConnectParams) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.bs = internalbackoff.Exponential{Config: p.Backoff}
o.minConnectTimeout = func() time.Duration {
return p.MinConnectTimeout
}
})
}
- 用于配置客户端连接选项,以使用提供的
ConnectParams
配置来创建和维护与服务器的连接。 - 功能说明:
WithConnectParams
函数允许你提供一个自定义的连接参数配置,这个配置将用于gRPC客户端在创建和维护与服务器的连接时使用。这个选项可以用来配置连接重试的策略,包括重试的间隔、最大尝试次数等。 - 参数说明:p:这是一个
ConnectParams
类型的值,包含了连接参数的配置。
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
)
type ConnectParams struct {
Backoff BackoffConfig
MinConnectTimeout time.Duration
}
func main() {
serverAddr := "localhost:50051"
// 设置连接参数
connectParams := ConnectParams{
Backoff: BackoffConfig{
InitialBackoff: 100 * time.Millisecond,
MaxBackoff: 1 * time.Second,
Multiplier: 1.5,
},
MinConnectTimeout: 10 * time.Second,
}
// 创建gRPC连接时应用WithConnectParams选项
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(),
grpc.WithConnectParams(connectParams),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们设置了一个连接参数配置,并使用了
WithConnectParams
选项来创建到服务器的连接。 - 注意事项:
- 连接参数配置中的
Backoff
字段可以覆盖gRPC默认的连接重试策略。 - 你可以使用这个选项来配置连接重试的策略,包括重试的间隔、最大尝试次数等。
- 连接参数配置中的
- 通过使用
WithConnectParams
,你可以根据应用程序的需求来配置gRPC客户端的连接行为,特别是在需要自定义连接重试策略时。
WithContextDialer(f)
- WithContextDialer返回一个DialOption,它设置一个dialer来创建连接。如果FailOnNonTempDialError()设置为true,并且f返回一个错误,gRPC检查错误的Temporary()方法来决定是否应该尝试重新连接到网络地址。
- 注意:所有支持的Go版本(截至2023年12月)都覆盖了操作系统默认的TCP保持时间和间隔为15秒。要使TCP保持连接与操作系统默认的保持连接时间和间隔,请使用net。Dialer将KeepAlive字段设置为负值,并将SO_KEEPALIVE套接字选项从Control字段设置为true。有关如何做到这一点的具体示例,请参见internal.NetDialerWithTCPKeepalive()。
// WithContextDialer returns a DialOption that sets a dialer to create
// connections. If FailOnNonTempDialError() is set to true, and an error is
// returned by f, gRPC checks the error's Temporary() method to decide if it
// should try to reconnect to the network address.
//
// Note: All supported releases of Go (as of December 2023) override the OS
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
// with OS defaults for keepalive time and interval, use a net.Dialer that sets
// the KeepAlive field to a negative value, and sets the SO_KEEPALIVE socket
// option to true from the Control field. For a concrete example of how to do
// this, see internal.NetDialerWithTCPKeepalive().
//
// For more information, please see [issue 23459] in the Go github repo.
//
// [issue 23459]: https://github.com/golang/go/issues/23459
func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.Dialer = f
})
}
- 用于配置客户端连接选项,以设置一个上下文特定的拨号器(dialer)来创建连接。
- 功能说明:
WithContextDialer
函数允许你提供一个自定义的拨号器,这个拨号器将在创建gRPC客户端连接时使用。你可以使用这个选项来配置网络连接的各个方面,比如超时设置、TCP保活参数等。 - 参数说明:f:这是一个函数类型,接受一个
context.Context
和一个string
类型的地址,并返回一个net.Conn
和一个可能的错误。这个函数将用于创建实际的网络连接。
package main
import (
"context"
"log"
"net"
"time"
"google.golang.org/grpc"
)
func main() {
serverAddr := "localhost:50051"
// 创建一个自定义的拨号器函数
dialer := func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 15 * time.Second,
}
return d.DialContext(ctx, "tcp", addr)
}
// 创建gRPC连接时应用WithContextDialer选项
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(),
grpc.WithContextDialer(dialer),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们创建了一个自定义的拨号器函数,它配置了拨号超时和TCP保活参数。然后,我们使用
WithContextDialer
选项来创建到服务器的连接。 - 注意事项:
- 由于Go 1.19及更高版本会覆盖TCP保活的时间和间隔为15秒,如果你需要使用操作系统的默认保活参数,你可以使用
net.Dialer
并设置KeepAlive
为负值,以及设置SO_KEEPALIVE
套接字选项为真。 - 这个选项是一个实验性的API,可能会在未来版本中发生变化或被移除。
- 你可以使用这个选项来配置网络连接的各个方面,比如超时设置、TCP保活参数等。
- 由于Go 1.19及更高版本会覆盖TCP保活的时间和间隔为15秒,如果你需要使用操作系统的默认保活参数,你可以使用
- 通过使用
WithContextDialer
,你可以根据应用程序的需求来配置gRPC客户端的连接行为,特别是在需要自定义网络连接参数时。
WithCredentialsBundle(b)
- WithCredentialsBundle返回一个DialOption来为ClientConn.WithCreds设置一个凭据包。它不应该与WithTransportCredentials一起使用。
- 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// WithCredentialsBundle returns a DialOption to set a credentials bundle for
// the ClientConn.WithCreds. This should not be used together with
// WithTransportCredentials.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithCredentialsBundle(b credentials.Bundle) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.CredsBundle = b
})
}
- 用于配置客户端连接选项,以设置一个凭证捆绑包(credentials bundle)用于客户端连接。
- 功能说明:
WithCredentialsBundle
函数允许你提供一个凭证捆绑包,这个捆绑包包含了用于认证和加密通信的凭证。这个选项通常用于需要客户端和服务端使用相同或兼容的凭证进行认证的情况。 - 参数说明:b:这是一个
credentials.Bundle
类型的值,包含了用于认证和加密通信的凭证。
package main
import (
"context"
"log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
func main() {
serverAddr := "localhost:50051"
// 创建一个凭证捆绑包,包含TLS证书和客户端证书
bundle := credentials.NewBundle()
bundle.Add("path/to/client_cert.pem", "path/to/client_key.pem", "path/to/ca_cert.pem")
// 创建gRPC连接时应用WithCredentialsBundle选项
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(), // 或者使用其他传输凭证选项
grpc.WithCredentialsBundle(bundle),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们创建了一个凭证捆绑包,并设置了客户端连接的传输凭证选项。然后,我们使用
WithCredentialsBundle
选项来创建到服务器的连接。 - 注意事项:
- 这是一个实验性的API,可能会在未来版本中发生变化或被移除。
- 不要与
WithTransportCredentials
一起使用。WithTransportCredentials
用于设置传输级别的凭证,而WithCredentialsBundle
用于设置客户端连接级别的凭证。 - 在实际应用中,你可能需要根据你的证书链和信任策略来设置凭证捆绑包。
- 通过使用
WithCredentialsBundle
,你可以根据应用程序的需求来配置gRPC客户端的认证方式,特别是在需要客户端和服务端使用相同或兼容的凭证进行认证时。
WithDefaultCallOptions(cos)
- WithDefaultCallOptions返回一个DialOption,它为连接上的调用设置默认的callooptions。
// WithDefaultCallOptions returns a DialOption which sets the default
// CallOptions for calls over the connection.
func WithDefaultCallOptions(cos ...CallOption) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.callOptions = append(o.callOptions, cos...)
})
}
详解
- 用于为gRPC客户端的每个RPC调用设置默认的选项(CallOptions)。这些选项会被应用到所有通过该连接发起的RPC调用中,除非在调用时显式覆盖这些选项。
- 作用:
- 默认配置:允许客户端为通过连接发起的所有RPC调用设置默认的行为选项。
- 简化调用:通过设置默认选项,可以简化客户端代码,因为不需要在每个RPC调用中都显式指定这些选项。
- 使用场景:
- 性能优化:通过设置默认的超时时间、压缩选项等,可以优化客户端的整体性能。
- 错误处理:通过设置默认的重试策略,可以改善客户端在处理网络问题时的行为。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置RPC调用的默认超时时间为5秒
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024 * 1024)), // 设置最大接收消息大小为1MB
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithDefaultCallOptions
选项为通过连接发起的所有RPC调用设置默认的超时时间。我们还将最大接收消息大小设置为1MB。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
WithDefaultServiceConfig(s)
- 用于配置客户端连接选项,以提供一个默认的服务配置。
// WithDefaultServiceConfig returns a DialOption that configures the default
// service config, which will be used in cases where:
//
// 1. WithDisableServiceConfig is also used, or
//
// 2. The name resolver does not provide a service config or provides an
// invalid service config.
//
// The parameter s is the JSON representation of the default service config.
// For more information about service configs, see:
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
// For a simple example of usage, see:
// examples/features/load_balancing/client/main.go
func WithDefaultServiceConfig(s string) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.defaultServiceConfigRawJSON = &s
})
}
- 功能说明:
WithDefaultServiceConfig
函数允许你提供一个默认的服务配置,这个配置将在上述情况下被gRPC客户端使用。服务配置通常包含诸如负载均衡策略、重试策略、超时设置等配置信息。如果你不希望gRPC客户端依赖于解析器提供的服务配置,或者解析器提供的服务配置不符合你的需求,你可以使用这个选项来提供一个默认的服务配置。 - 参数说明:s:这是一个
string
类型的值,表示服务配置的JSON表示形式。这个配置将被gRPC客户端使用。
package main
import (
"context"
"log"
"google.golang.org/grpc"
)
func main() {
serverAddr := "localhost:50051"
// 设置默认服务配置的JSON表示形式
defaultServiceConfigJSON := `{
"loadBalancingConfig": {
"grpclb": {
"childPolicy": [
{
"round_robin": {}
}
]
}
}
}`
// 创建gRPC连接时应用WithDefaultServiceConfig选项
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(),
grpc.WithDefaultServiceConfig(defaultServiceConfigJSON),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们设置了客户端连接的默认服务配置,并使用
WithDefaultServiceConfig
选项来创建到服务器的连接。 - 注意事项:
- 服务配置的JSON格式必须符合gRPC的服务配置规范。
- 如果同时使用了
WithDisableServiceConfig
和WithDefaultServiceConfig
,gRPC客户端将使用WithDefaultServiceConfig
提供的服务配置。 - 如果你不希望gRPC客户端使用任何服务配置,你可以只使用
WithDisableServiceConfig
选项。
- 通过使用
WithDefaultServiceConfig
,你可以根据应用程序的需求来配置gRPC客户端的行为,特别是在需要控制负载均衡策略和重试策略时。
WithDisableHealthCheck()
- 用于配置客户端连接选项,以禁用客户端连接对所有子连接的健康检查。
// WithDisableHealthCheck disables the LB channel health checking for all
// SubConns of this ClientConn.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithDisableHealthCheck() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.disableHealthCheck = true
})
}
- 功能说明:
WithDisableHealthCheck
函数允许你禁用客户端连接对所有子连接的健康检查。在gRPC中,负载均衡器(LB)通道会定期对子连接进行检查,以确保它们仍然处于活动状态。如果一个子连接不再响应健康检查,负载均衡器会将其从可用连接列表中移除,并可能尝试建立一个新的子连接。- 使用这个选项,你可以告诉gRPC不要进行健康检查,这意味着即使子连接不再响应,它们也不会被从可用连接列表中移除,也不会尝试建立新的子连接。
package main
import (
"context"
"log"
"google.golang.org/grpc"
)
func main() {
serverAddr := "localhost:50051"
// 创建gRPC连接时应用WithDisableHealthCheck选项
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(),
grpc.WithDisableHealthCheck(),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们通过
WithDisableHealthCheck
选项来创建到服务器的连接,这会导致gRPC客户端不会对任何子连接进行健康检查。 - 注意事项:
- 这是一个实验性的API,可能会在未来版本中发生变化或被移除。
- 禁用健康检查可能会导致在子连接不再响应时,客户端不会自动尝试建立新的连接,这可能会影响到服务的可用性和可靠性。
- 在某些情况下,禁用健康检查可能是有用的,比如当你知道子连接可能会暂时不可用,但不需要gRPC自动重新建立连接时。
- 通过使用
WithDisableHealthCheck
,你可以更细粒度地控制gRPC客户端的健康检查行为,特别是在需要避免健康检查可能导致的问题时。
WithDisableRetry()
- 用于配置客户端连接选项,以禁用重试机制,即使服务配置启用了它们。
// WithDisableRetry returns a DialOption that disables retries, even if the
// service config enables them. This does not impact transparent retries, which
// will happen automatically if no data is written to the wire or if the RPC is
// unprocessed by the remote server.
func WithDisableRetry() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.disableRetry = true
})
}
- 功能说明:
WithDisableRetry
函数允许你禁用gRPC客户端的重试机制。在gRPC中,重试机制允许客户端在遇到可重试的错误(如网络问题或服务端错误)时重新发起RPC调用。但是,有些情况下,你可能不希望客户端进行任何重试,即使服务配置中启用了重试。使用这个选项,你可以强制禁用重试。
package main
import (
"context"
"log"
"google.golang.org/grpc"
)
func main() {
serverAddr := "localhost:50051"
// 创建gRPC连接时应用WithDisableRetry选项
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(),
grpc.WithDisableRetry(),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们通过
WithDisableRetry
选项来创建到服务器的连接,这会导致gRPC客户端不会进行任何重试,即使服务配置中启用了重试。 - 注意事项:
- 这个选项仅禁用显式的重试机制。透明的重试仍然会发生,如果满足以下条件:没有数据写入到网络,或者RPC在远程服务器上未被处理。
- 禁用重试可能会导致在遇到网络问题或服务端错误时,客户端不会尝试重新连接,这可能会影响到服务的可用性和可靠性。
- 在某些情况下,禁用重试可能是有用的,比如当你知道服务端可能无法处理重试请求,或者当你想要手动控制错误处理逻辑时。
- 通过使用
WithDisableRetry
,你可以更细粒度地控制gRPC客户端的重试行为,特别是在需要避免重试可能导致的问题时。
WithDisableServiceConfig()
- 用于配置客户端连接选项,以忽略通过解析器提供的任何服务配置,并给解析器一个提示,让它不要获取服务配置。
// WithDisableServiceConfig returns a DialOption that causes gRPC to ignore any
// service config provided by the resolver and provides a hint to the resolver
// to not fetch service configs.
//
// Note that this dial option only disables service config from resolver. If
// default service config is provided, gRPC will use the default service config.
func WithDisableServiceConfig() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.disableServiceConfig = true
})
}
- 功能说明:
WithDisableServiceConfig
函数允许你禁用通过名称解析器传递给gRPC客户端的服务配置。服务配置通常包含诸如负载均衡策略、重试策略、超时设置等配置信息。使用这个选项,你可以告诉gRPC忽略这些配置,并且不会从解析器中获取它们。
package main
import (
"context"
"log"
"google.golang.org/grpc"
)
func main() {
serverAddr := "localhost:50051"
// 创建gRPC连接时应用WithDisableServiceConfig选项
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(),
grpc.WithDisableServiceConfig(),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们通过
WithDisableServiceConfig
选项来创建到服务器的连接,这会导致gRPC忽略任何通过解析器提供的服务配置。 - 注意事项:
- 这个选项仅禁用通过解析器提供的服务配置。如果提供了默认的服务配置,gRPC仍然会使用这个默认配置。
- 禁用服务配置可能会导致gRPC客户端的行为与预期不同,特别是如果服务配置包含了重要的负载均衡或重试策略。
- 在某些情况下,禁用服务配置可能是有用的,比如当你想要手动控制所有配置或者当你知道解析器提供的服务配置不适用于你的应用程序时。
- 通过使用
WithDisableServiceConfig
,你可以更细粒度地控制gRPC客户端的行为,尤其是在需要避免解析器提供的配置对客户端造成不必要影响的情况下。
WithIdleTimeout(d)
- 用于配置客户端连接的空闲超时时间。
// WithIdleTimeout returns a DialOption that configures an idle timeout for the
// channel. If the channel is idle for the configured timeout, i.e there are no
// ongoing RPCs and no new RPCs are initiated, the channel will enter idle mode
// and as a result the name resolver and load balancer will be shut down. The
// channel will exit idle mode when the Connect() method is called or when an
// RPC is initiated.
//
// A default timeout of 30 minutes will be used if this dial option is not set
// at dial time and idleness can be disabled by passing a timeout of zero.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithIdleTimeout(d time.Duration) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.idleTimeout = d
})
}
- 功能说明:
WithIdleTimeout
函数允许你设置客户端连接在空闲状态下的超时时间。如果连接在指定的超时时间内没有任何活动(即没有正在进行的RPC调用,也没有新的RPC调用被发起),则连接将进入空闲模式,并关闭名称解析器和负载均衡器。连接将在调用Connect()
方法或发起新的RPC调用时退出空闲模式。 - 参数说明:d:这是一个
time.Duration
类型的值,表示空闲超时时间。当连接空闲超过这个时间时,将触发空闲模式。
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
)
func main() {
serverAddr := "localhost:50051"
// 设置空闲超时时间为10分钟
idleTimeout := 10 * time.Minute
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(),
grpc.WithIdleTimeout(idleTimeout),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们设置了客户端连接的空闲超时时间为10分钟,并使用
WithIdleTimeout
选项来创建到服务器的连接。 - 注意事项:
- 如果在建立连接时没有设置此选项,默认的空闲超时时间为30分钟。
- 可以通过传递一个超时时间为0的值来禁用空闲超时功能。
- 这是一个实验性的API,可能会在未来版本中发生变化或被移除。
- 通过使用
WithIdleTimeout
,你可以根据应用程序的需求来管理客户端连接的生命周期,特别是在需要节省资源或减少不必要的网络活动时。例如,如果应用程序预期会有较长的空闲时间,则可以设置一个较短的空闲超时时间,以关闭不必要的连接并释放资源。
WithInitialConnWindowSize(s)
- 于配置客户端连接的初始连接窗口大小。
// WithInitialConnWindowSize returns a DialOption which sets the value for
// initial window size on a connection. The lower bound for window size is 64K
// and any value smaller than that will be ignored.
func WithInitialConnWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
})
}
- 功能说明:
WithInitialConnWindowSize
函数允许你设置在客户端连接上用于流量控制的初始窗口大小。在gRPC中,流量控制是基于窗口大小的,用于限制发送方在没有得到接收方确认的情况下可以发送的数据量。与WithInitialWindowSize
不同,WithInitialConnWindowSize
影响的是整个连接的初始窗口大小,而不仅仅是单个流。 - 参数说明:s:这是一个
int32
类型的值,表示初始连接窗口大小(以字节为单位)。这个值用于控制客户端在连接建立时可以发送的数据量。
package main
import (
"context"
"log"
"google.golang.org/grpc"
)
func main() {
serverAddr := "localhost:50051"
// 设置初始连接窗口大小为2MB
initialConnWindowSize := int32(2 << 20) // 2MB in bytes
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(),
grpc.WithInitialConnWindowSize(initialConnWindowSize),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们设置了客户端连接的初始窗口大小为2MB,并使用
WithInitialConnWindowSize
选项来创建到服务器的连接。 - 注意事项:
- 初始连接窗口大小有一个下限,即64KB。如果传入的值小于64KB,则会被忽略,并使用默认值。
- 设置一个较大的初始连接窗口大小可以减少小数据包发送的频率,从而可能提高性能,特别是在发送大量数据时。
- 但是,设置过大的窗口大小可能会导致网络拥塞,特别是当网络带宽有限或接收方处理能力不足时。
- 通过使用
WithInitialConnWindowSize
,你可以根据网络条件和应用程序的需求来调整客户端的流量控制窗口大小,以优化数据传输效率。这对于管理连接级的数据传输特别有用,尤其是在需要发送大量初始数据或者需要处理多个并发流的情况下。
WithInitialWindowSize(s)
- 用于配置客户端连接的初始窗口大小。
// WithInitialWindowSize returns a DialOption which sets the value for initial
// window size on a stream. The lower bound for window size is 64K and any value
// smaller than that will be ignored.
func WithInitialWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialWindowSize = s
})
}
- 功能说明:
WithInitialWindowSize
函数允许你设置在流上用于流量控制的初始窗口大小。在gRPC中,流量控制是基于窗口大小的,用于限制发送方在没有得到接收方确认的情况下可以发送的数据量。 - 参数说明:s:这是一个
int32
类型的值,表示初始窗口大小(以字节为单位)。这个值用于控制客户端在开始时可以发送的数据量。
package main
import (
"context"
"log"
"google.golang.org/grpc"
)
func main() {
serverAddr := "localhost:50051"
// 设置初始窗口大小为1MB
initialWindowSize := int32(1 << 20) // 1MB in bytes
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(),
grpc.WithInitialWindowSize(initialWindowSize),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们设置了客户端连接的初始窗口大小为1MB,并使用
WithInitialWindowSize
选项来创建到服务器的连接。 - 注意事项:
- 初始窗口大小有一个下限,即64KB。如果传入的值小于64KB,则会被忽略,并使用默认值。
- 设置一个较大的初始窗口大小可以减少小数据包发送的频率,从而可能提高性能,特别是在发送大量数据时。
- 但是,设置过大的窗口大小可能会导致网络拥塞,特别是当网络带宽有限或接收方处理能力不足时。
- 通过使用
WithInitialWindowSize
,你可以根据网络条件和应用程序的需求来调整客户端的流量控制窗口大小,以优化数据传输效率。
WithKeepaliveParams(kp)
- 用于配置客户端连接的保活(keepalive)参数。
// WithKeepaliveParams returns a DialOption that specifies keepalive parameters
// for the client transport.
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
if kp.Time < internal.KeepaliveMinPingTime {
logger.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime)
kp.Time = internal.KeepaliveMinPingTime
}
return newFuncDialOption(func(o *dialOptions) {
o.copts.KeepaliveParams = kp
})
}
- 功能说明:
WithKeepaliveParams
函数允许你设置客户端传输层的保活参数。保活机制可以确保在长时间没有数据传输的情况下,客户端与服务器之间的连接仍然是活跃的。这对于维护连接的稳定性,特别是在存在网络设备(如NAT、防火墙)可能导致连接超时的情况下,是非常有用的。 - 参数说明:kp:这是一个
keepalive.ClientParameters
类型的结构体,包含了以下字段来配置保活参数:Time
: 客户端发送保活ping的时间间隔。如果设置为0,则禁用保活ping。Timeout
: 等待服务器响应保活ping的超时时间。如果服务器在此时间内没有响应,连接将被关闭。PermitWithoutStream
: 即使没有活动流,也允许发送保活ping。
- 函数内部逻辑:
- 如果传入的
kp.Time
小于internal.KeepaliveMinPingTime
(gRPC内部定义的最小保活ping时间间隔),则函数会调整kp.Time
到最小值,并打印一条警告信息。 - 然后函数返回一个
DialOption
,该选项包含一个函数,该函数将kp
赋值给dialOptions
结构体的copts.KeepaliveParams
字段。
- 如果传入的
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
func main() {
serverAddr := "localhost:50051"
// 设置保活参数
keepaliveParams := keepalive.ClientParameters{
Time: 10 * time.Second, // 发送保活ping的时间间隔
Timeout: 5 * time.Second, // 等待服务器响应保活ping的超时时间
PermitWithoutStream: true, // 即使没有活动流也允许发送保活ping
}
conn, err := grpc.Dial(serverAddr,
grpc.WithInsecure(),
grpc.WithKeepaliveParams(keepaliveParams),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,我们设置了客户端的保活参数,并使用
WithKeepaliveParams
选项来创建到服务器的连接。 - 注意事项:
- 如果
kp.Time
设置得太低,可能会对服务器造成不必要的负担,因为服务器需要频繁地处理保活ping。 kp.Timeout
应该小于或等于服务器配置的保活超时时间,以避免客户端过早地关闭连接。PermitWithoutStream
选项允许在没有活动流的情况下发送保活ping,这对于维护空闲连接非常有用。
- 如果
- 通过使用
WithKeepaliveParams
,你可以根据应用程序的需求和网络环境来优化客户端的保活行为,从而确保连接的稳定性。
WithMaxCallAttempts(n)
- 用于配置客户端在发起调用时的最大尝试次数。
// WithMaxCallAttempts returns a DialOption that configures the maximum number
// of attempts per call (including retries and hedging) using the channel.
// Service owners may specify a higher value for these parameters, but higher
// values will be treated as equal to the maximum value by the client
// implementation. This mitigates security concerns related to the service
// config being transferred to the client via DNS.
//
// A value of 5 will be used if this dial option is not set or n < 2.
func WithMaxCallAttempts(n int) DialOption {
return newFuncDialOption(func(o *dialOptions) {
if n < 2 {
n = defaultMaxCallAttempts
}
o.maxCallAttempts = n
})
}
- 功能说明:
WithMaxCallAttempts
函数的作用是设置每个调用(包括重试和hedging)的最大尝试次数。以下是对其功能的详细解释:- 重试次数:当客户端发起一个调用,并且遇到可重试的错误(例如网络问题)时,客户端会尝试重新发起调用,直到达到设定的最大尝试次数。
- Hedging:Hedging是一种gRPC特有的机制,允许客户端在等待初始调用结果的同时,发送额外的请求。这些额外的请求会并行执行,目的是提高响应速度。如果其中一个调用成功,其他的调用将被取消。
- 参数说明:
- n:这是函数的参数,表示客户端应该尝试的最大次数。如果
n
小于2,函数将使用默认值defaultMaxCallAttempts
(通常为5)。
- n:这是函数的参数,表示客户端应该尝试的最大次数。如果
package main
import (
"context"
"log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
serverAddr := "localhost:50051"
maxCallAttempts := 3 // 设置最大尝试次数为3
conn, err := grpc.Dial(serverAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithMaxCallAttempts(maxCallAttempts),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// ... 使用连接进行后续操作 ...
}
- 在这个示例中,如果
maxCallAttempts
小于2,那么客户端将使用默认的最大尝试次数5。 - 注意事项:
- 当不设置
WithMaxCallAttempts
或设置小于2的值时,默认的最大尝试次数是5。 - 在实际应用中,应使用安全的凭据替换
insecure.NewCredentials()
。 WithMaxCallAttempts
选项是在客户端发起连接时设置的,它影响所有通过该连接发出的调用。
- 当不设置
- 通过使用
WithMaxCallAttempts
,你可以根据服务的不稳定性和性能需求,调整客户端的调用行为,以增强系统的健壮性。
WithMaxHeaderListSize(s)
- 该函数允许你设置客户端愿意接受的最大(未压缩)头部列表的大小。这在需要限制传入头部大小以防止潜在的资源耗尽攻击或处理过大的头部时非常有用。
// WithMaxHeaderListSize returns a DialOption that specifies the maximum
// (uncompressed) size of header list that the client is prepared to accept.
func WithMaxHeaderListSize(s uint32) DialOption {
return MaxHeaderListSizeDialOption{
MaxHeaderListSize: s,
}
}
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
// 服务地址
serverAddr := "localhost:50051"
// 设置最大头部列表大小为1MB(以字节为单位)
maxHeaderListSize := uint32(1 << 20) // 1MB
// 创建gRPC连接时应用WithMaxHeaderListSize选项
conn, err := grpc.Dial(serverAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithMaxHeaderListSize(maxHeaderListSize),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// 假设你有一个服务客户端,你可以使用conn创建客户端实例
// client := NewYourServiceClient(conn)
// ... 使用client进行后续操作 ...
// 这里只是一个示例,所以我们将等待一段时间后关闭连接
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
// 假设你调用了某个RPC方法
// response, err := client.YourRPCMethod(ctx, &YourRequest{})
// if err != nil {
// log.Fatalf("could not call: %v", err)
// }
// log.Printf("Response: %v", response)
// 暂停一段时间后关闭连接
<-ctx.Done()
}
- 在这个示例中,我们首先导入了必要的包,然后设置了服务器的地址和想要设置的最大头部列表大小。接着,我们使用
grpc.Dial
函数创建了一个到服务器的连接,并通过grpc.WithMaxHeaderListSize
传递了我们的配置选项。
WithNoProxy()
- 用于禁用代理(proxy)的使用。这个选项适用于客户端连接(
ClientConn
),当它被设置时,gRPC将不会通过代理与服务器进行通信。
// WithNoProxy returns a DialOption which disables the use of proxies for this
// ClientConn. This is ignored if WithDialer or WithContextDialer are used.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithNoProxy() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.UseProxy = false
})
}
- 作用:
- 代理禁用:允许客户端在建立与服务器的连接时禁用代理。
- 直接连接:通过禁用代理,客户端可以直接与服务器建立连接,绕过任何中间代理服务器。
- 使用场景:
- 代理限制:在客户端不能使用代理服务器或代理服务器不可靠的场景中,可以禁用代理。
- 安全性:在需要确保通信安全性的场景中,通过禁用代理来避免通过不可信的中间节点。
- 需要注意的是,
WithNoProxy
函数是gRPC框架中的一个实验性API,可能会在未来的gRPC版本中更改或移除。因此,在生产环境中使用这个函数之前,应该仔细评估其稳定性和适用性。
WithPerRPCCredentials(creds)
- 用于为每个单向RPC(unary RPC)设置凭证(credentials)。这个选项允许客户端在每次RPC调用时传递特定的凭证,这通常是用于认证的凭证。
// WithPerRPCCredentials returns a DialOption which sets credentials and places
// auth state on each outbound RPC.
func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
})
}
详解
- 作用:
- 认证:允许客户端在每次RPC调用时传递认证凭证,以验证客户端的身份。
- 安全通信:通过设置凭证,可以确保RPC调用是在安全的环境中进行。
- 使用场景:
- 客户端认证:在需要客户端认证的场景中,使用这个函数可以为每个RPC调用设置凭证。
- 安全性要求:在需要保证通信安全性的场景中,通过设置凭证来增强安全性。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
// 定义一个简单的PerRPCCredentials实现
func customCredentials() (credentials.PerRPCCredentials, error) {
// 实现凭证的获取逻辑
return nil, nil
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 创建一个包含凭证的DialOption
opts := []grpc.DialOption{
grpc.WithPerRPCCredentials(customCredentials),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithPerRPCCredentials
选项创建了一个包含自定义凭证的DialOption。这个凭证将被用于每次RPC调用。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
WithReadBufferSize(s)
- 用于设置客户端连接(
ClientConn
)的读缓冲区(read buffer)的大小。这个缓冲区的大小决定了在一次系统调用中可以读取的最大数据量。
// WithReadBufferSize lets you set the size of read buffer, this determines how
// much data can be read at most for each read syscall.
//
// The default value for this buffer is 32KB. Zero or negative values will
// disable read buffer for a connection so data framer can access the
// underlying conn directly.
func WithReadBufferSize(s int) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.ReadBufferSize = s
})
}
详解
- 作用:
- 性能调优:适当调整读缓冲区的大小可以优化客户端接收数据的性能。
- 资源管理:通过设置合适的读缓冲区大小,可以有效地管理客户端端的内存使用。
- 默认值: 如果没有设置这个选项,gRPC客户端默认的读缓冲区大小是32KB。
- 使用场景:
- 网络条件优化:在网络条件变化时,调整读缓冲区大小可以帮助客户端更好地适应不同的网络条件。
- 接收性能优化:在需要优化接收性能的应用场景中,调整读缓冲区大小可以提高效率。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置客户端连接的读缓冲区大小为100KB
opts := []grpc.DialOption{
grpc.WithReadBufferSize(100 * 1024),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithReadBufferSize
选项将客户端连接的读缓冲区大小设置为100KB。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
WithResolvers(rs)
- 用于在客户端连接(
ClientConn
)上注册一系列解析器(resolver)实现,而不需要通过全局注册的方式。这些解析器将被用于解析客户端在当前Dial操作中使用的方案(scheme)。
// WithResolvers allows a list of resolver implementations to be registered
// locally with the ClientConn without needing to be globally registered via
// resolver.Register. They will be matched against the scheme used for the
// current Dial only, and will take precedence over the global registry.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithResolvers(rs ...resolver.Builder) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.resolvers = append(o.resolvers, rs...)
})
}
详解
- 作用:
- 本地解析器注册:允许客户端在本地注册解析器,而不需要全局注册。
- 优先级:本地注册的解析器将优先于全局注册的解析器。
- 使用场景:
- 定制解析器:在需要使用自定义解析器或优先使用特定解析器的场景中,可以使用这个函数。
- 实验性特性:这个函数是一个实验性特性,可能会在未来的gRPC版本中更改或移除。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
// 定义一个简单的解析器实现
func customResolver(ctx context.Context, s string) (addrs []resolver.Address, err error) {
// 解析地址并返回解析结果
return []resolver.Address{}, nil
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 创建一个包含解析器的DialOption
opts := []grpc.DialOption{
grpc.WithResolvers(customResolver),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithResolvers
选项创建了一个包含自定义解析器的DialOption。这个解析器将用于解析地址。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
WithSharedWriteBuffer(val)
- 用于设置客户端连接(
ClientConn
)的写缓冲区(write buffer)是否可以共享。这个选项影响客户端连接发送数据的缓冲机制。
// WithSharedWriteBuffer allows reusing per-connection transport write buffer.
// If this option is set to true every connection will release the buffer after
// flushing the data on the wire.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithSharedWriteBuffer(val bool) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.SharedWriteBuffer = val
})
}
详解
- 作用:
- 缓冲机制控制:允许客户端控制写缓冲区的共享行为,这可以影响客户端发送数据的效率。
- 性能调优:通过调整写缓冲区的共享行为,可以优化客户端的性能。
- 默认值: 默认情况下,写缓冲区是独立的,每个连接都有自己的缓冲区。
- 使用场景:
- 性能优化:在需要优化发送性能的应用场景中,可以通过设置写缓冲区是否共享来优化效率。
- 资源管理:在需要管理客户端资源的场景中,通过设置写缓冲区是否共享来控制资源的使用。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置写缓冲区可以共享
opts := []grpc.DialOption{
grpc.WithSharedWriteBuffer(true),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithSharedWriteBuffer
选项将写缓冲区设置为可以共享。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
WithStatsHandler(h)
- 用于为gRPC客户端连接指定统计(stats)处理程序。这个处理程序用于收集和报告关于客户端操作的统计信息,这些信息可以帮助开发者监控客户端性能、诊断问题,并进行性能调优。
// WithStatsHandler returns a DialOption that specifies the stats handler for
// all the RPCs and underlying network connections in this ClientConn.
func WithStatsHandler(h stats.Handler) DialOption {
return newFuncDialOption(func(o *dialOptions) {
if h == nil {
logger.Error("ignoring nil parameter in grpc.WithStatsHandler ClientOption")
// Do not allow a nil stats handler, which would otherwise cause
// panics.
return
}
o.copts.StatsHandlers = append(o.copts.StatsHandlers, h)
})
}
详解
- 作用:
- 性能监控:通过收集客户端操作的统计信息,可以监控客户端性能,及时发现性能瓶颈。
- 问题诊断:当客户端出现问题时,统计信息可以帮助诊断问题原因。
- 性能调优:根据统计数据,可以对客户端配置进行调整,以优化性能。
- 使用场景:
- 性能监控:在需要监控客户端性能的应用场景中,可以通过设置统计处理程序来收集统计信息。
- 问题诊断:在需要诊断客户端问题的应用场景中,可以通过设置统计处理程序来收集诊断信息。
- 性能调优:在需要优化客户端性能的应用场景中,可以通过设置统计处理程序来收集性能数据。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
// 定义一个简单的统计处理程序
func loggingStatsHandler(s stats.RawClientStats) {
// 处理统计信息,例如记录到日志
fmt.Printf("Received %d bytes on stream", s.StreamBytesReceived)
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 创建一个包含统计处理程序的DialOption
opts := []grpc.DialOption{
grpc.WithStatsHandler(loggingStatsHandler),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithStatsHandler
选项创建了一个包含统计处理程序的DialOption。这个统计处理程序将记录每个流接收到的字节数。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
WithStreamInterceptor(f)
- 用于为gRPC客户端的流式RPC(streaming RPC)添加一个拦截器(interceptor)。这个拦截器会在客户端发送流式RPC请求和接收响应时被调用。
// WithStreamInterceptor returns a DialOption that specifies the interceptor for
// streaming RPCs.
func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.streamInt = f
})
}
详解
- 作用:
- 拦截器逻辑:允许客户端在流式RPC的各个阶段添加自定义逻辑。
- 功能扩展:通过组合不同的拦截器,可以实现更复杂的客户端功能,例如请求和响应的修改、错误处理、日志记录等。
- 使用场景:
- 日志记录:在流式RPC的发送和接收过程中添加日志记录功能。
- 请求修改:在发送请求之前对请求进行修改。
- 响应修改:在接收响应之前对响应进行修改。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
// 定义一个简单的StreamClientInterceptor
func loggingInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
stream, err := cc.NewStream(ctx, desc, method, opts...)
if err != nil {
return nil, err
}
return &loggingStream{stream}, nil
}
type loggingStream struct {
grpc.ClientStream
}
func (s *loggingStream) SendMsg(m interface{}) error {
fmt.Printf("Sending message: %v\n", m)
return s.ClientStream.SendMsg(m)
}
func (s *loggingStream) RecvMsg(m interface{}) error {
fmt.Printf("Received message: %v\n", m)
return s.ClientStream.RecvMsg(m)
}
func (s *loggingStream) CloseSend() error {
fmt.Println("Closing send side of the stream")
return s.ClientStream.CloseSend()
}
func (s *loggingStream) Context() context.Context {
return s.ClientStream.Context()
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 创建一个包含拦截器的DialOption
opts := []grpc.DialOption{
grpc.WithStreamInterceptor(loggingInterceptor),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们定义了一个简单的
loggingInterceptor
函数,它会在流式RPC的发送和接收过程中添加日志记录功能。然后,我们使用WithStreamInterceptor
选项将这个拦截器添加到客户端的连接配置中。当客户端建立与服务器端口的连接并进行流式RPC调用时,loggingInterceptor
将会被调用,并在发送和接收消息时打印日志。
WithTransportCredentials(creds)
- 用于配置客户端连接层面的安全凭证(例如TLS/SSL)。这个选项允许客户端在建立与服务器的连接时使用指定的传输级安全凭证。
// WithTransportCredentials returns a DialOption which configures a connection
// level security credentials (e.g., TLS/SSL). This should not be used together
// with WithCredentialsBundle.
func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.TransportCredentials = creds
})
}
详解
- 作用:
- 安全连接:允许客户端在连接到服务器时使用TLS/SSL等安全协议,以加密传输的数据。
- 证书验证:客户端可以验证服务器的证书,确保连接到的是预期的服务器。
- 使用场景:
- 数据加密:在需要保护传输数据隐私和完整性的场景中,使用TLS/SSL等安全协议。
- 身份验证:在需要验证服务器身份的场景中,使用证书验证机制。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 配置TLS/SSL传输级安全凭证
creds, err := credentials.NewClientTLSFromFile("server.crt", "")
if err != nil {
fmt.Printf("Failed to load TLS credentials: %v\n", err)
return
}
// 使用传输级安全凭证的DialOption
opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithTransportCredentials
选项配置了TLS/SSL传输级安全凭证。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
WithUnaryInterceptor(f)
- 用于为gRPC客户端的单向RPC(unary RPC)添加一个拦截器(interceptor)。这个拦截器会在客户端发送单向RPC请求和接收响应时被调用。
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for
// unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.unaryInt = f
})
}
详解
- 作用:
- 拦截器逻辑:允许客户端在单向RPC的各个阶段添加自定义逻辑。
- 功能扩展:通过组合不同的拦截器,可以实现更复杂的客户端功能,例如请求和响应的修改、错误处理、日志记录等。
- 使用场景:
- 日志记录:在单向RPC的发送和接收过程中添加日志记录功能。
- 请求修改:在发送请求之前对请求进行修改。
- 响应修改:在接收响应之前对响应进行修改。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
// 定义一个简单的UnaryClientInterceptor
func loggingInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fmt.Printf("Sending request: %v\n", req)
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
return err
}
fmt.Printf("Received response: %v\n", reply)
return nil
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 创建一个包含拦截器的DialOption
opts := []grpc.DialOption{
grpc.WithUnaryInterceptor(loggingInterceptor),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们定义了一个简单的
loggingInterceptor
函数,它会在单向RPC的发送和接收过程中添加日志记录功能。然后,我们使用WithUnaryInterceptor
选项将这个拦截器添加到客户端的连接配置中。当客户端建立与服务器端口的连接并进行单向RPC调用时,loggingInterceptor
将会被调用,并在发送请求和接收响应时打印日志。
WithUserAgent(s)
- 设置 UserAgent。
// WithUserAgent returns a DialOption that specifies a user agent string for all
// the RPCs.
func WithUserAgent(s string) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.UserAgent = s + " " + grpcUA
})
}
详解
- 用于为所有通过该客户端连接发起的RPC设置用户代理(User Agent)字符串。用户代理字符串通常包含客户端软件的类型、版本、所用操作系统和其他环境信息,它被包含在RPC请求的
User-Agent
头信息中。 - 作用:
- 身份标识:允许客户端在RPC请求中包含用户代理信息,这有助于服务器识别客户端的类型和版本。
- 统计分析:服务器可以通过用户代理信息来分析客户端的使用情况。
- 使用场景:
- 服务监控:在需要监控客户端和服务器交互的场合,用户代理信息可以提供有用的上下文。
- 统计分析:在需要分析客户端和服务器交互的统计数据的场合,用户代理信息可以提供额外的信息。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置RPC调用的用户代理为"my-client/1.0"
opts := []grpc.DialOption{
grpc.WithUserAgent("my-client/1.0"),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithUserAgent
选项为通过连接发起的所有RPC设置用户代理为"my-client/1.0”。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
WithWriteBufferSize(s)
- 设置传输层写缓冲,默认值为32KB。可以设置为0或负数,禁止写缓冲
// WithWriteBufferSize determines how much data can be batched before doing a
// write on the wire. The default value for this buffer is 32KB.
//
// Zero or negative values will disable the write buffer such that each write
// will be on underlying connection. Note: A Send call may not directly
// translate to a write.
func WithWriteBufferSize(s int) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.WriteBufferSize = s
})
}
详解
- 作用:
- 性能调优:适当调整写缓冲区的大小可以优化客户端发送数据的性能。
- 资源管理:通过设置合适的写缓冲区大小,可以有效地管理客户端端的内存使用。
- 默认值: 如果没有设置这个选项,gRPC客户端默认的写缓冲区大小是32KB。
- 使用场景:
- 网络条件优化:在网络条件变化时,调整写缓冲区大小可以帮助客户端更好地适应不同的网络条件。
- 发送性能优化:在需要优化发送性能的应用场景中,调整写缓冲区大小可以提高效率。
- 用于设置客户端连接(
ClientConn
)的写缓冲区(write buffer)的大小。这个缓冲区的大小决定了在一次系统调用中可以发送的最大数据量。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置客户端连接的写缓冲区大小为100KB
opts := []grpc.DialOption{
grpc.WithWriteBufferSize(100 * 1024),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithWriteBufferSize
选项将客户端连接的写缓冲区大小设置为100KB。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
type CallOption
- callloption在调用启动前配置调用,或在调用完成后从调用中提取信息。
// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
type CallOption interface {
// 在调用被发送到任何服务器之前被调用。
// 如果before返回一个非nil错误,RPC就会因为这个错误而失败。
before(*callInfo) error
// After在呼叫完成后被调用。
// After不能返回错误,因此应该通过输出参数报告任何失败。
after(*callInfo, *csAttempt)
}
CallContentSubtype(contentSubtype)
- CallContentSubtype返回一个callloption,它将为调用设置内容子类型。例如,如果content-subtype是"json",那么网络上的Content-Type将是"application/grpc+json"。在包含在Content-Type中之前,content-子类型被转换为小写。详细信息请参见https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests上的内容类型。
- 如果没有使用ForceCodec, content-子类型将用于查找由RegisterCodec控制的注册表中使用的编解码器。有关注册的详细信息,请参阅RegisterCodec的文档。查找content-subtype不区分大小写。如果没有找到这样的编解码器,调用将导致code.internal错误。
- 如果还使用了ForceCodec,那么该编解码器将用于所有请求和响应消息,并将内容子类型设置为请求的给定contentSubtype。
// CallContentSubtype returns a CallOption that will set the content-subtype
// for a call. For example, if content-subtype is "json", the Content-Type over
// the wire will be "application/grpc+json". The content-subtype is converted
// to lowercase before being included in Content-Type. See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
//
// If ForceCodec is not also used, the content-subtype will be used to look up
// the Codec to use in the registry controlled by RegisterCodec. See the
// documentation on RegisterCodec for details on registration. The lookup of
// content-subtype is case-insensitive. If no such Codec is found, the call
// will result in an error with code codes.Internal.
//
// If ForceCodec is also used, that Codec will be used for all request and
// response messages, with the content-subtype set to the given contentSubtype
// here for requests.
func CallContentSubtype(contentSubtype string) CallOption {
return ContentSubtypeCallOption{ContentSubtype: strings.ToLower(contentSubtype)}
}
详解
- 用于设置RPC调用内容的子类型(content-subtype)。这个子类型信息会被添加到通过HTTP/2协议传输的RPC请求的
Content-Type
头信息中。 - 作用:
- 内容类型定义:允许客户端定义RPC调用内容的子类型,这有助于客户端和服务器端根据内容类型选择合适的编解码器(Codec)。
- 定制内容传输:通过设置不同的内容子类型,可以实现不同的内容传输格式,例如JSON、Protobuf等。
- 使用场景:
- 协议兼容性:当客户端和服务器端使用不同的协议时,可以通过设置不同的内容子类型来保持兼容性。
- 内容格式调整:在需要调整RPC调用内容格式时,可以通过设置不同的内容子类型来实现。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置RPC调用的内容子类型为"json"
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.CallContentSubtype("json")),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithDefaultCallOptions
选项为通过连接发起的所有RPC调用设置默认的内容子类型为"json"。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
ForceCodec(codec)
- 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// ForceCodec returns a CallOption that will set codec to be used for all
// request and response messages for a call. The result of calling Name() will
// be used as the content-subtype after converting to lowercase, unless
// CallContentSubtype is also used.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details. Also see the documentation on RegisterCodec and
// CallContentSubtype for more details on the interaction between Codec and
// content-subtype.
//
// This function is provided for advanced users; prefer to use only
// CallContentSubtype to select a registered codec instead.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceCodec(codec encoding.Codec) CallOption {
return ForceCodecCallOption{Codec: codec}
}
详解
- 用于强制指定一个编解码器(Codec)用于RPC调用的所有请求和响应消息。这个编解码器会覆盖任何通过
CallContentSubtype
设置的内容子类型所选择的编解码器。 - 作用:
- 强制编解码器:允许客户端在RPC调用中强制使用指定的编解码器,而不受内容子类型的影响。
- 定制消息编码:当需要自定义消息的编码方式时,可以使用这个函数来指定编解码器。
- 使用场景:
- 协议兼容性:当客户端和服务器端使用不同的编解码器时,可以通过设置不同的编解码器来实现兼容性。
- 内容格式调整:在需要调整RPC调用内容格式时,可以通过设置不同的编解码器来实现。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置RPC调用的编解码器为JSON
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.ForceCodec(&jsonpb.Marshaler{})),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithDefaultCallOptions
选项为通过连接发起的所有RPC调用设置默认的编解码器为JSON。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
ForceCodecV2(codec)
- 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// ForceCodecV2 returns a CallOption that will set codec to be used for all
// request and response messages for a call. The result of calling Name() will
// be used as the content-subtype after converting to lowercase, unless
// CallContentSubtype is also used.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details. Also see the documentation on RegisterCodec and
// CallContentSubtype for more details on the interaction between Codec and
// content-subtype.
//
// This function is provided for advanced users; prefer to use only
// CallContentSubtype to select a registered codec instead.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceCodecV2(codec encoding.CodecV2) CallOption {
return ForceCodecV2CallOption{CodecV2: codec}
}
详解
- 用于在gRPC v2版本中强制指定一个编解码器(Codec)用于RPC调用的所有请求和响应消息。这个编解码器会覆盖任何通过
CallContentSubtype
设置的内容子类型所选择的编解码器。 - 作用:
- 强制编解码器:允许客户端在RPC调用中强制使用指定的编解码器,而不受内容子类型的影响。
- 定制消息编码:当需要自定义消息的编码方式时,可以使用这个函数来指定编解码器。
- 使用场景:
- 协议兼容性:当客户端和服务器端使用不同的编解码器时,可以通过设置不同的编解码器来实现兼容性。
- 内容格式调整:在需要调整RPC调用内容格式时,可以通过设置不同的编解码器来实现。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置RPC调用的编解码器为JSON
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.ForceCodecV2(&jsonpb.Marshaler{})),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithDefaultCallOptions
选项为通过连接发起的所有RPC调用设置默认的编解码器为JSON。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。 - 需要注意的是,
ForceCodecV2
函数是gRPC v2版本中的一个实验性API,可能会在未来的gRPC版本中更改或移除。因此,在生产环境中使用这个函数之前,应该仔细评估其稳定性和适用性。
Header(md)
- Header返回一个calllooptions,用于检索一元RPC的Header元数据。
- 在 gRPC 中,grpc.Header 是一个 grpc.CallOption,它用于在客户端接收来自服务端的响应头部元数据。响应头部元数据通常包含有关响应的额外信息,如状态码、警告或其他自定义头部字段。
// Header returns a CallOptions that retrieves the header metadata
// for a unary RPC.
func Header(md *metadata.MD) CallOption {
return HeaderCallOption{HeaderAddr: md}
}
使用示例
- 首先,你需要创建一个 metadata.MD 类型的变量来存储响应的头部元数据。
var headerMD metadata.MD
- 当你发起 gRPC 调用时,将 grpc.Header 选项添加到调用选项列表中,并将你创建的 metadata.MD 容器传递给它。
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func main() {
// 假设 client 是一个 gRPC 客户端实例,req 是请求的结构体实例
// resp 是响应的结构体实例,ctx 是上下文
// 在这个例子中,SomeUnaryRPC 是一个一元 gRPC 调用方法,req 是请求消息,resp 是响应消息。
// grpc.Header(&headerMD) 选项告诉 gRPC 客户端在调用完成时填充 headerMD 变量。
// 发起调用并使用 grpc.Header 选项
resp, err := client.SomeUnaryRPC(ctx, req, grpc.Header(&headerMD))
if err != nil {
// 处理错误
log.Fatalf("RPC failed: %v", err)
}
// 检查头部元数据
if headerVal, ok := headerMD["header-key"]; ok {
// 使用 headerVal
log.Printf("Header 'header-key': %v", headerVal)
}
}
- 调用完成后,你可以检查 headerMD 变量来读取服务端发送的头部元数据。在上面的代码示例中,我们检查了是否存在一个名为 “header-key” 的头部,并打印了它的值。
- 请注意以下几点:
- grpc.Header 选项仅用于获取响应头部元数据,并不影响响应的正文。
- 在一元调用中,头部元数据通常在响应正文之前发送,所以你可以在读取响应正文之前检查头部元数据。
- 如果你想要在调用结束时获取尾部元数据,可以使用 grpc.Trailer 选项。
- 在流式调用(如服务器流或客户端流)中,头部元数据同样可以通过 grpc.Header 选项在首次读取消息之前获取。
- 使用 grpc.Header 可以帮助你在客户端处理服务端发送的额外信息,例如进行认证、日志记录或其他业务逻辑处理。
Header 和 Trailer 区别
- 在 gRPC 中,Header 和 Trailer 都用于传递元数据,但它们在 RPC 调用中的使用时机和目的有所不同:
- Header:
- 发送时机:Header 通常在 RPC 调用开始时发送,即在响应消息之前发送。
- 内容:它们通常包含与请求相关的元数据,比如请求的身份验证信息、内容类型、压缩算法等。
- 用途:Header 可以用来影响请求的处理,比如授权检查或者请求路由。
- 读取时机:在服务器端,Header 可以在处理请求的任何时间点读取;在客户端,通常在发送请求后立即读取响应的 Header。
- Trailer:
- 发送时机:Trailer 在 RPC 调用结束时发送,即在所有响应消息发送完毕后发送。
- 内容:它们通常包含关于整个调用过程的状态信息,比如状态码、错误消息或者调用持续时间等。
- 用途:Trailer 用于提供关于整个调用结果的信息,特别是如果调用失败,它们可以提供额外的错误详情。
- 读取时机:在服务器端,Trailer 通常在响应发送完毕后设置;在客户端,通常在调用完成(成功或失败)后读取 Trailer。
- 主要区别:
- 发送时间点:Header 在调用开始时发送,而 Trailer 在调用结束时发送。
- 内容类型:Header 通常包含请求相关的元数据,Trailer 则包含响应相关的元数据。
- 读取时机:Header 在处理请求之前或期间读取,Trailer 在响应结束后读取。
- 在实际应用中,Header 和 Trailer 的使用取决于特定的需求和场景。例如,如果你需要在请求被完全处理之前就提供一些信息,那么使用 Header 是合适的;如果你需要在请求处理完毕后提供状态信息,那么使用 Trailer 是更好的选择。
MaxCallRecvMsgSize(bytes)
- MaxCallRecvMsgSize返回一个CallOption,它设置客户端可以接收的最大消息大小(以字节为单位)。如果没有设置,gRPC使用默认的4MB。
- 它允许你设置客户端接收消息的最大大小。这个选项可以用来覆盖 gRPC 默认的最大接收消息大小限制,这对于处理大型消息非常有用。
- 默认情况下,gRPC 限制了客户端和服务端可以发送和接收的消息大小,这个限制在 gRPC-Go 中默认设置为 4MB。如果你预期会接收到大于这个大小的消息,你可以使用 grpc.MaxCallRecvMsgSize 来增加这个限制。
// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size
// in bytes the client can receive. If this is not set, gRPC uses the default
// 4MB.
func MaxCallRecvMsgSize(bytes int) CallOption {
return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: bytes}
}
使用示例
- 首先,你需要确定你希望的最大消息大小。这通常是一个整数值,表示字节数。
const maxRecvMsgSize = 10 * 1024 * 1024 // 10 MB
- 在你发起 gRPC 调用时,将 grpc.MaxCallRecvMsgSize 选项添加到调用选项列表中,并传递你设定的最大消息大小。
import (
"context"
"google.golang.org/grpc"
)
func main() {
// 假设 client 是一个 gRPC 客户端实例,req 是请求的结构体实例
// resp 是响应的结构体实例,ctx 是上下文
// 在这个例子中,SomeUnaryRPC 是一个一元 gRPC 调用方法,
// 我们通过 grpc.MaxCallRecvMsgSize(maxRecvMsgSize) 选项设置了最大接收消息大小。
// 发起调用并设置最大接收消息大小
err, resp := client.SomeUnaryRPC(ctx, req, grpc.MaxCallRecvMsgSize(maxRecvMsgSize))
if err != nil {
// 处理错误
log.Fatalf("RPC failed: %v", err)
}
// ...
}
- 注意:
- 设置一个过大的 MaxCallRecvMsgSize 可能会增加客户端和服务端的内存使用,并可能导致性能问题。
- 如果你接收到一个超出设定大小的消息,gRPC 会返回一个错误,通常为 rpc error: code = ResourceExhausted desc = grpc: received message larger than max (X vs. Y),其中 X 是接收到的消息大小,Y 是最大允许大小。
- 你也可以在创建客户端连接时全局设置最大接收消息大小,使用 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRecvMsgSize)) 选项。
- 使用 grpc.MaxCallRecvMsgSize 可以确保你的客户端能够安全地处理大型消息,而不会因为默认大小限制而失败。
MaxCallSendMsgSize(bytes)
- MaxCallSendMsgSize返回一个CallOption,它设置客户端可以发送的最大消息大小(以字节为单位)。如果没有设置,gRPC使用默认的
math.MaxInt32
。 - 它允许你设置客户端发送消息的最大大小。这个选项可以用来覆盖 gRPC 默认的最大发送消息大小限制,这对于发送大型消息非常有用。
// MaxCallSendMsgSize returns a CallOption which sets the maximum message size
// in bytes the client can send. If this is not set, gRPC uses the default
// `math.MaxInt32`.
func MaxCallSendMsgSize(bytes int) CallOption {
return MaxSendMsgSizeCallOption{MaxSendMsgSize: bytes}
}
使用示例
- 首先,你需要确定你希望的最大消息大小。这通常是一个整数值,表示字节数。
const maxSendMsgSize = 10 * 1024 * 1024 // 10 MB
- 在你发起 gRPC 调用时,将 grpc.MaxCallSendMsgSize 选项添加到调用选项列表中,并传递你设定的最大消息大小。
import (
"context"
"google.golang.org/grpc"
)
func main() {
// 假设 client 是一个 gRPC 客户端实例,req 是请求的结构体实例
// resp 是响应的结构体实例,ctx 是上下文
// 在这个例子中,SomeUnaryRPC 是一个一元 gRPC 调用方法,
// 我们通过 grpc.MaxCallSendMsgSize(maxSendMsgSize) 选项设置了最大发送消息大小。
// 发起调用并设置最大发送消息大小
resp, err := client.SomeUnaryRPC(ctx, req, grpc.MaxCallSendMsgSize(maxSendMsgSize))
if err != nil {
// 处理错误
log.Fatalf("RPC failed: %v", err)
}
// ...
}
- 注意:
- 设置一个过大的 MaxCallSendMsgSize 可能会增加客户端和服务端的内存使用,并可能导致性能问题。
- 如果你发送的消息超出设定大小,gRPC 会返回一个错误,通常为 rpc error: code = ResourceExhausted desc = grpc: trying to send message larger than max (X vs. Y),其中 X 是尝试发送的消息大小,Y 是最大允许大小。
- 你也可以在创建客户端连接时全局设置最大发送消息大小,使用 grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxSendMsgSize)) 选项。
- 使用 grpc.MaxCallSendMsgSize 可以确保你的客户端能够安全地发送大型消息,而不会因为默认大小限制而失败。
MaxRetryRPCBufferSize(bytes)
- MaxRetryRPCBufferSize返回一个CallOption,它限制了为重试目的缓冲RPC请求所使用的内容量。
- 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory
// used for buffering this RPC's requests for retry purposes.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func MaxRetryRPCBufferSize(bytes int) CallOption {
return MaxRetryRPCBufferSizeCallOption{bytes}
}
详解
- 用于设置用于重试目的的RPC请求缓冲区的大小限制。这个选项用于控制内存的使用,特别是在需要多次重试RPC请求时。
- 作用:
- 内存控制:允许客户端控制用于重试目的的RPC请求缓冲区的大小,以防止内存耗尽。
- 性能优化:通过限制缓冲区大小,可以优化客户端在重试RPC请求时的性能。
- 使用场景:
- 高重试策略:在需要频繁重试RPC请求的场景中,可以通过设置缓冲区大小限制来避免内存问题。
- 资源管理:在处理大量重试请求时,通过设置合理的缓冲区大小,可以更好地管理客户端资源。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置RPC调用的最大重试请求缓冲区大小为1MB
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxRetryRPCBufferSize(1024 * 1024)),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithDefaultCallOptions
选项为通过连接发起的所有RPC调用设置默认的最大重试请求缓冲区大小为1MB。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。 - 需要注意的是,
MaxRetryRPCBufferSize
函数是gRPC框架中的一个实验性API,可能会在未来的gRPC版本中更改或移除。因此,在生产环境中使用这个函数之前,应该仔细评估其稳定性和适用性。
OnFinish(onFinish)
- OnFinish返回一个callloption,它配置一个回调,当调用完成时调用。传递给回调的错误是RPC的状态,可能是nil。提供的onFinish回调只会被gRPC调用一次。这主要由流拦截器使用,当RPC完成时将收到通知以及有关RPC状态的信息。
- 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// OnFinish returns a CallOption that configures a callback to be called when
// the call completes. The error passed to the callback is the status of the
// RPC, and may be nil. The onFinish callback provided will only be called once
// by gRPC. This is mainly used to be used by streaming interceptors, to be
// notified when the RPC completes along with information about the status of
// the RPC.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func OnFinish(onFinish func(err error)) CallOption {
return OnFinishCallOption{
OnFinish: onFinish,
}
}
详解
- 用于配置一个回调函数,当RPC调用完成时(无论是成功还是失败)都会被调用。这个回调函数接收一个错误参数,该错误表示RPC调用的状态,如果RPC成功完成,则该错误为nil。
- 作用:
- 状态通知:允许客户端在RPC调用完成后收到通知,以便进行相应的处理,例如清理资源、记录日志等。
- 错误处理:当RPC调用失败时,客户端可以捕获错误并进行适当的错误处理。
- 使用场景:
- 资源管理:在RPC调用完成后,客户端可以释放资源或执行其他清理操作。
- 日志记录:记录RPC调用的完成状态,以便于后续分析和调试。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置RPC调用的完成回调
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.OnFinish(func(err error) {
if err != nil {
fmt.Printf("RPC failed: %v\n", err)
} else {
fmt.Println("RPC completed successfully")
}
})),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithDefaultCallOptions
选项为通过连接发起的所有RPC调用设置默认的完成回调。当RPC调用完成后,无论成功还是失败,都会调用这个回调函数,并打印出相应的信息。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。 - 需要注意的是,
OnFinish
函数是gRPC框架中的一个实验性API,可能会在未来的gRPC版本中更改或移除。因此,在生产环境中使用这个函数之前,应该仔细评估其稳定性和适用性。
Peer(p)
- Peer返回一个callloption,用于检索一元RPC的对等信息。在RPC完成后,peer字段将被填充。
// Peer returns a CallOption that retrieves peer information for a unary RPC.
// The peer field will be populated *after* the RPC completes.
func Peer(p *peer.Peer) CallOption {
return PeerCallOption{PeerAddr: p}
}
详解
- 用于获取与单向RPC(unary RPC)相关的对端(peer)信息。这个信息会在RPC调用完成后填充到
p
指向的peer.Peer
结构体中。 - 作用:
- 对端信息获取:允许客户端在RPC调用完成后获取对端的信息,例如对端地址、端口、认证信息等。
- 安全性检查:通过获取对端信息,客户端可以进行安全性检查,例如验证对端的身份。
- 使用场景:
- 安全性验证:在需要验证对端身份或进行安全性检查的场景中,可以通过获取对端信息来验证。
- 性能监控:在需要监控RPC调用性能的场景中,可以通过获取对端信息来分析网络延迟和带宽使用情况。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置RPC调用的对端信息收集
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.Peer(new(peer.Peer))),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithDefaultCallOptions
选项为通过连接发起的所有RPC调用设置默认的对端信息收集。当RPC调用完成后,peer.Peer
结构体中的信息将被填充。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。 - 需要注意的是,
Peer
函数是gRPC框架中的一个实验性API,可能会在未来的gRPC版本中更改或移除。因此,在生产环境中使用这个函数之前,应该仔细评估其稳定性和适用性。
PerRPCCredentials(creds)
- PerRPCCredentials 返回为调用设置 credentials.PerRPCCredentials 的 CallOption。
- 该方法可以指定方法带上指定的认证,而不是使用全局的 grpc.WithPerRPCCredentials。
// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
// for a call.
func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
return PerRPCCredsCallOption{Creds: creds}
}
使用示例
- 全局配置 PerRPCCredentials。
func main() {
// 配置ssl,"*.heliu.site"在实际开发中从浏览器中取获取,证书路径使用绝对路径
creds, _ := credentials.NewClientTLSFromFile(
"/root/workspace/learn-grpc/key/test.pem",
"*.heliu.site",
)
var opts []grpc.DialOption
// 不带TLS这里是grpc.WithTransportCredentials(insecure.NewCredentials())
opts = append(opts, grpc.WithTransportCredentials(creds))
opts = append(opts, grpc.WithPerRPCCredentials(&ClientTokenAuth{}))
// 添加客户端拦截器
opts = append(opts, grpc.WithUnaryInterceptor(interceptor.UnaryClientInterceptor()))
// 添加流拦截器
opts = append(opts, grpc.WithStreamInterceptor(interceptor.StreamClientInterceptor()))
// 连接server端,使用ssl加密通信
conn, err := grpc.NewClient("127.0.0.1:9090", opts...)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// 建立连接
client := pb.NewSayHelloClient(conn)
var header metadata.MD
var tr metadata.MD
var opts1 []grpc.CallOption = []grpc.CallOption{
grpc.WaitForReady(false),
grpc.Header(&header),
grpc.Trailer(&tr),
}
// 执行rpc调用(这个方法在服务器端来实现并返回结构)
resp, err := client.SayHello(ctx, &pb.HelloRequest{RequestName: "gh", Age: 12}, opts1...)
if err != nil {
fmt.Printf("%v\n", err)
return
}
//...
}
- 指定方法设置。
func main() {
// 配置ssl,"*.heliu.site"在实际开发中从浏览器中取获取,证书路径使用绝对路径
creds, _ := credentials.NewClientTLSFromFile(
"/root/workspace/learn-grpc/key/test.pem",
"*.heliu.site",
)
var opts []grpc.DialOption
// 不带TLS这里是grpc.WithTransportCredentials(insecure.NewCredentials())
opts = append(opts, grpc.WithTransportCredentials(creds))
//opts = append(opts, grpc.WithPerRPCCredentials(&ClientTokenAuth{}))
// 添加客户端拦截器
opts = append(opts, grpc.WithUnaryInterceptor(interceptor.UnaryClientInterceptor()))
// 添加流拦截器
opts = append(opts, grpc.WithStreamInterceptor(interceptor.StreamClientInterceptor()))
// 连接server端,使用ssl加密通信
conn, err := grpc.NewClient("127.0.0.1:9090", opts...)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// 建立连接
client := pb.NewSayHelloClient(conn)
var header metadata.MD
var tr metadata.MD
var opts1 []grpc.CallOption = []grpc.CallOption{
grpc.WaitForReady(false),
grpc.Header(&header),
grpc.Trailer(&tr),
grpc.PerRPCCredentials(&ClientTokenAuth{}),
}
// 执行rpc调用(这个方法在服务器端来实现并返回结构)
resp, err := client.SayHello(ctx, &pb.HelloRequest{RequestName: "gh", Age: 12}, opts1...)
if err != nil {
fmt.Printf("%v\n", err)
return
}
//...
}
StaticMethod()
- StaticMethod返回一个callloption,它指定正在对静态方法进行调用,这意味着该方法在编译时是已知的,并且在运行时不会更改。这可以用作统计插件的信号,表明该方法可以安全地作为度量的关键。
// StaticMethod returns a CallOption which specifies that a call is being made
// to a method that is static, which means the method is known at compile time
// and doesn't change at runtime. This can be used as a signal to stats plugins
// that this method is safe to include as a key to a measurement.
func StaticMethod() CallOption {
return StaticMethodCallOption{}
}
详解
- 作用:
- 统计测量:允许客户端在RPC调用中指定方法是静态的,这样统计插件就可以安全地将这些方法作为统计测量(如响应时间)的键。
- 性能分析:在需要分析RPC调用性能的场景中,通过指定方法是静态的,可以更准确地进行性能分析。
- 使用场景:
- 性能监控:在需要监控RPC调用性能的场景中,通过指定方法是静态的,可以更准确地进行性能监控。
- 统计插件集成:在集成统计插件时,可以通过使用
StaticMethod
来确保统计的准确性。
- 用于指定一个RPC调用是静态方法调用。静态方法指的是在编译时已知且在运行时不会更改的方法。这个信息可以被统计插件(stats plugins)用来确定哪些方法可以安全地作为测量(measurement)的键。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置RPC调用的静态方法标志
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.StaticMethod()),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithDefaultCallOptions
选项为通过连接发起的所有RPC调用设置默认的静态方法标志。当RPC调用完成后,统计插件可以利用这个信息来更准确地进行性能分析。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。 - 需要注意的是,
StaticMethod
函数是gRPC框架中的一个实验性API,可能会在未来的gRPC版本中更改或移除。因此,在生产环境中使用这个函数之前,应该仔细评估其稳定性和适用性。
Trailer(md)
- Trailer返回一个calllooptions,用于检索一元RPC的Trailer元数据。
// Trailer returns a CallOptions that retrieves the trailer metadata
// for a unary RPC.
func Trailer(md *metadata.MD) CallOption {
return TrailerCallOption{TrailerAddr: md}
}
详解
- 用于在单向RPC(unary RPC)调用完成后获取响应的尾随元数据(trailer metadata)。尾随元数据是RPC响应的元数据的一部分,它会在响应主体之后传输。
- 作用:
- 元数据获取:允许客户端在RPC调用完成后获取响应的尾随元数据,以便进行后续处理,例如日志记录、统计分析等。
- 使用场景:
- 日志记录:在需要记录RPC调用响应的尾随元数据以进行日志记录的场景中。
- 统计分析:在需要分析RPC调用响应的尾随元数据以进行统计分析的场景中。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置RPC调用的尾随元数据获取
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.Trailer(&metadata.MD{})),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithDefaultCallOptions
选项为通过连接发起的所有RPC调用设置默认的尾随元数据获取。当RPC调用完成后,metadata.MD
结构体中的元数据将被填充。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
UseCompressor(name)
- UseCompressor返回一个CallOption,用来设置发送请求时使用的压缩器。如果也设置了WithCompressor,则UseCompressor具有更高的优先级。
- 注意:此API是实验性的,可能会在以后的版本中更改或删除。
// UseCompressor returns a CallOption which sets the compressor used when
// sending the request. If WithCompressor is also set, UseCompressor has
// higher priority.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func UseCompressor(name string) CallOption {
return CompressorCallOption{CompressorType: name}
}
- 在 gRPC 中,UseCompressor 是一个客户端或服务器端的选项,它允许你指定在 gRPC 调用中使用的压缩算法。gRPC 支持多种压缩算法,例如 gzip、deflate、zlib 等。“google.golang.org/grpc/encoding/gzip”。
- 通常,gRPC 服务器和客户端之间的压缩是自动处理的,不需要手动干预。如果你需要使用非标准的压缩算法,你可能需要在 gRPC 的 encoding 包中实现自定义的编码器和解码器。
详解
- 用于指定在发送请求时使用的压缩器。这个选项允许客户端选择一个压缩器来压缩请求体,以减少网络传输的数据量。
- 作用:
- 压缩数据传输:允许客户端压缩发送到服务器的请求数据,以减少网络带宽的使用。
- 性能优化:通过压缩数据,可以优化客户端和服务器之间的数据传输性能。
- 使用场景:
- 带宽优化:在网络带宽有限或网络延迟较高的场景中,通过压缩数据可以优化性能。
- 性能调优:在需要优化RPC调用性能的场景中,可以通过设置压缩器来优化传输效率。
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 设置RPC调用的压缩器为GZIP
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")),
}
conn, err := grpc.DialContext(ctx, "localhost:50051", opts...)
if err != nil {
fmt.Printf("Failed to dial: %v\n", err)
return
}
defer conn.Close()
// 客户端可以继续使用这个连接进行RPC调用
// ...
}
- 在这个例子中,我们创建了一个上下文,设置了5秒的超时时间。然后,我们使用
WithDefaultCallOptions
选项为通过连接发起的所有RPC调用设置默认的压缩器为GZIP。最后,我们使用这些选项来建立与服务器端口50051的gRPC连接。如果连接成功,客户端可以继续使用这个连接进行RPC调用。
WaitForReady(waitForReady)
- WaitForReady在客户端处于TRANSIENT_FAILURE状态时配置RPC的行为,当所有地址连接失败时发生。
- 如果waitForReady为false, RPC将立即失败。否则,客户端将等待,直到连接可用或RPC的截止日期到达。
- 缺省情况下,rpc不等待准备好。
// WaitForReady configures the RPC's behavior when the client is in
// TRANSIENT_FAILURE, which occurs when all addresses fail to connect. If
// waitForReady is false, the RPC will fail immediately. Otherwise, the client
// will wait until a connection becomes available or the RPC's deadline is
// reached.
//
// By default, RPCs do not "wait for ready".
func WaitForReady(waitForReady bool) CallOption {
return FailFastCallOption{FailFast: !waitForReady}
}
使用示例
- 在 gRPC 中,WaitForReady 是一个调用选项,它控制着 RPC 在尝试建立连接时应该如何处理网络不可用或服务不可达的情况。
- 默认情况下,如果没有立即建立连接,gRPC 调用会快速失败。但是,如果你设置了 WaitForReady 选项,那么 gRPC 将会等待连接准备好再发送 RPC 请求。这在服务端可能暂时不可用或者网络状况不佳的情况下非常有用。
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
func main() {
// 假设你有一个服务端的地址
serverAddr := "localhost:50051"
// 连接到 gRPC 服务器
conn, err := grpc.Dial(serverAddr, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := NewYourServiceClient(conn)
// 设置 context,这里使用了 context.WithTimeout 来设置超时时间
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
// 使用 WaitForReady 选项
callOpts := []grpc.CallOption{grpc.WaitForReady(true)}
// 假设有一个叫做 YourRPCMethod 的方法
_, err = c.YourRPCMethod(ctx, &YourRequest{}, callOpts...)
if err != nil {
// 检查错误是否由于服务不可用
if grpc.Code(err) == codes.Unavailable {
log.Printf("Server is unavailable, but we are waiting for it to be ready again")
} else {
log.Fatalf("could not call: %v", err)
}
}
// 其他代码逻辑...
}
- 在上面的代码中,YourServiceClient 和 YourRPCMethod 应该替换为你的具体服务客户端和方法。通过传递 grpc.WaitForReady(true) 到 YourRPCMethod,客户端会等待直到连接准备好,或者直到超时。
- 使用 WaitForReady 时需要谨慎,因为它可能会导致客户端在等待可用连接时挂起,如果服务端长时间不可用,这可能会导致客户端长时间等待。因此,通常建议与超时选项一起使用。
type Peer
- 包对等体定义了与rpc和相应的实用程序相关联的各种对等体信息。
NewContext(ctx, p)
- NewContext 创建一个附加对等信息的 context。
func NewContext(ctx context.Context, p *Peer) context.Context
type Peer
- Peer包含RPC的对端信息,如地址、认证信息等。
type Peer struct {
// Addr为对端地址。
Addr net.Addr
// LocalAddr是本地地址。
LocalAddr net.Addr
// AuthInfo是传输的认证信息。
// 如果没有使用传输安全性,则为nil。
AuthInfo credentials.AuthInfo
}
FromContext(ctx)
- 如果对等体存在,FromContext返回ctx形式的对等体信息。
func FromContext(ctx context.Context) (p *Peer, ok bool)
(p) String()
- String确保Peer类型实现Stringer接口,以便有效地打印带有peerKey值的上下文。
func (p *Peer) String() string