Skip to content

Commit 6dbb97d

Browse files
authored
Add grpc keep alive server and client config for server and client initialization (#7483)
## What changed? <!-- Describe what has changed in this PR --> 1. Add gRPC keepalive settings to the YAML config. ## Why? <!-- Tell your future self why you have made these changes --> To support configuring gRPC keepalive for each service's server and for the clients that connect to it. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
1 parent f6d43e9 commit 6dbb97d

14 files changed

+235
-49
lines changed

client/clientfactory.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (
128128
if err != nil {
129129
return nil, err
130130
}
131-
132131
client := history.NewClient(
133132
cf.dynConfig,
134133
resolver,
@@ -155,7 +154,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
155154

156155
keyResolver := newServiceKeyResolver(resolver)
157156
clientProvider := func(clientKey string) (interface{}, error) {
158-
connection := cf.rpcFactory.CreateInternodeGRPCConnection(clientKey)
157+
connection := cf.rpcFactory.CreateMatchingGRPCConnection(clientKey)
159158
return matchingservice.NewMatchingServiceClient(connection), nil
160159
}
161160
client := matching.NewClient(

client/history/client_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func (s *testHistoryService) DeleteDLQTasks(
185185
return &historyservice.DeleteDLQTasksResponse{}, nil
186186
}
187187

188-
func (t *testRPCFactory) CreateInternodeGRPCConnection(rpcAddress string) *grpc.ClientConn {
188+
func (t *testRPCFactory) CreateHistoryGRPCConnection(rpcAddress string) *grpc.ClientConn {
189189
t.dialedAddresses = append(t.dialedAddresses, rpcAddress)
190-
return t.base.CreateInternodeGRPCConnection(rpcAddress)
190+
return t.base.CreateHistoryGRPCConnection(rpcAddress)
191191
}

client/history/connections.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type (
5454

5555
// RPCFactory is a subset of the [go.temporal.io/server/common/rpc.RPCFactory] interface to make testing easier.
5656
RPCFactory interface {
57-
CreateInternodeGRPCConnection(rpcAddress string) *grpc.ClientConn
57+
CreateHistoryGRPCConnection(rpcAddress string) *grpc.ClientConn
5858
}
5959

6060
connectionPool interface {
@@ -90,8 +90,7 @@ func (c *connectionPoolImpl) getOrCreateClientConn(addr rpcAddress) clientConnec
9090
if cc, ok = c.mu.conns[addr]; ok {
9191
return cc
9292
}
93-
94-
grpcConn := c.rpcFactory.CreateInternodeGRPCConnection(string(addr))
93+
grpcConn := c.rpcFactory.CreateHistoryGRPCConnection(string(addr))
9594
cc = clientConnection{
9695
historyClient: historyservice.NewHistoryServiceClient(grpcConn),
9796
grpcConn: grpcConn,

client/history/connections_mock.go

+6-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/config/config.go

+114
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package config
2727
import (
2828
"bytes"
2929
"fmt"
30+
"math"
3031
"strings"
3132
"time"
3233

@@ -40,9 +41,14 @@ import (
4041
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"
4142
"go.temporal.io/server/common/primitives"
4243
"go.temporal.io/server/common/telemetry"
44+
"google.golang.org/grpc/keepalive"
4345
"gopkg.in/yaml.v3"
4446
)
4547

48+
const (
49+
infinity = time.Duration(math.MaxInt64)
50+
)
51+
4652
type (
4753
// Config contains the configuration for a set of temporal services
4854
Config struct {
@@ -107,6 +113,39 @@ type (
107113
// forwarded from HTTP to gRPC. Any value with a trailing * will match the prefix before
108114
// the asterisk (eg. `x-internal-*`)
109115
HTTPAdditionalForwardedHeaders []string `yaml:"httpAdditionalForwardedHeaders"`
116+
// KeepAliveServerConfig keep alive configuration for the server
117+
KeepAliveServerConfig KeepAliveServerConfig `yaml:"keepAliveServerConfig"`
118+
// ClientConnectionConfig defines the connection config used by other services
119+
// when they create a gRPC client connection to this service.
120+
ClientConnectionConfig ClientConnectionConfig `yaml:"clientConnectionConfig"`
121+
}
122+
123+
// KeepAliveServerParameters carries optional overrides for the fields of
// keepalive.ServerParameters. All fields are pointers: a nil field leaves the
// matching default chosen by GetKeepAliveServerParameters untouched, while a
// non-nil field replaces it with the pointed-to duration.
KeepAliveServerParameters struct {
	MaxConnectionIdle     *time.Duration `yaml:"maxConnectionIdle"`
	MaxConnectionAge      *time.Duration `yaml:"maxConnectionAge"`
	MaxConnectionAgeGrace *time.Duration `yaml:"maxConnectionAgeGrace"`
	Time                  *time.Duration `yaml:"keepAliveTime"`
	Timeout               *time.Duration `yaml:"keepAliveTimeout"`
}
130+
131+
// KeepAliveClientParameters carries optional overrides for the fields of
// keepalive.ClientParameters. A nil field keeps the default applied by
// GetKeepAliveClientParameters; a non-nil field replaces it.
KeepAliveClientParameters struct {
	Time                *time.Duration `yaml:"keepAliveTime"`
	Timeout             *time.Duration `yaml:"keepAliveTimeout"`
	PermitWithoutStream *bool          `yaml:"keepAlivePermitWithoutStream"`
}
136+
137+
// ClientConnectionConfig groups the client-side connection settings of a
// service. Its only setting is the optional client keepalive block; when
// KeepAliveClientConfig is nil, GetKeepAliveClientParameters returns its
// defaults.
ClientConnectionConfig struct {
	KeepAliveClientConfig *KeepAliveClientParameters `yaml:"keepAliveClientParameters"`
}
140+
141+
// KeepAliveServerEnforcementPolicy carries optional overrides for the fields
// of keepalive.EnforcementPolicy. A nil field keeps the default applied by
// GetKeepAliveEnforcementPolicy; a non-nil field replaces it.
KeepAliveServerEnforcementPolicy struct {
	MinTime             *time.Duration `yaml:"minTime"`
	PermitWithoutStream *bool          `yaml:"permitWithoutStream"`
}
145+
146+
// KeepAliveServerConfig is the server-side keepalive section of a service's
// RPC config. Both members are optional: a nil pointer makes the matching
// getter (GetKeepAliveServerParameters / GetKeepAliveEnforcementPolicy)
// return its defaults.
KeepAliveServerConfig struct {
	KeepAliveServerParameters  *KeepAliveServerParameters        `yaml:"keepAliveServerParameters"`
	KeepAliveEnforcementPolicy *KeepAliveServerEnforcementPolicy `yaml:"keepAliveEnforcementPolicy"`
}
111150

112151
// Global contains config items that apply process-wide to all services
@@ -652,3 +691,78 @@ func (p *JWTKeyProvider) HasSourceURIsConfigured() bool {
652691
}
653692
return false
654693
}
694+
695+
func (k *KeepAliveServerConfig) GetKeepAliveServerParameters() keepalive.ServerParameters {
696+
// the default config is same as grpc default config, same for the below client config and enforcement policy
697+
defaultConfig := keepalive.ServerParameters{
698+
MaxConnectionIdle: infinity,
699+
MaxConnectionAge: infinity,
700+
MaxConnectionAgeGrace: infinity,
701+
Time: 2 * time.Hour,
702+
Timeout: 20 * time.Second,
703+
}
704+
if k == nil || k.KeepAliveServerParameters == nil {
705+
return defaultConfig
706+
}
707+
kp := k.KeepAliveServerParameters
708+
if kp.MaxConnectionIdle != nil {
709+
defaultConfig.MaxConnectionIdle = *kp.MaxConnectionIdle
710+
}
711+
if kp.MaxConnectionAge != nil {
712+
defaultConfig.MaxConnectionAge = *kp.MaxConnectionAge
713+
}
714+
if kp.MaxConnectionAgeGrace != nil {
715+
defaultConfig.MaxConnectionAgeGrace = *kp.MaxConnectionAgeGrace
716+
}
717+
if kp.Time != nil {
718+
defaultConfig.Time = *kp.Time
719+
}
720+
if kp.Timeout != nil {
721+
defaultConfig.Timeout = *kp.Timeout
722+
}
723+
return defaultConfig
724+
}
725+
726+
func (c *ClientConnectionConfig) GetKeepAliveClientParameters() keepalive.ClientParameters {
727+
defaultConfig := keepalive.ClientParameters{
728+
Time: infinity,
729+
Timeout: 20 * time.Second,
730+
PermitWithoutStream: false,
731+
}
732+
733+
if c == nil || c.KeepAliveClientConfig == nil {
734+
return defaultConfig
735+
}
736+
737+
if c.KeepAliveClientConfig.Time != nil {
738+
defaultConfig.Time = *c.KeepAliveClientConfig.Time
739+
}
740+
if c.KeepAliveClientConfig.Timeout != nil {
741+
defaultConfig.Timeout = *c.KeepAliveClientConfig.Timeout
742+
}
743+
if c.KeepAliveClientConfig.PermitWithoutStream != nil {
744+
defaultConfig.PermitWithoutStream = *c.KeepAliveClientConfig.PermitWithoutStream
745+
}
746+
747+
return defaultConfig
748+
}
749+
750+
func (k *KeepAliveServerConfig) GetKeepAliveEnforcementPolicy() keepalive.EnforcementPolicy {
751+
defaultConfig := keepalive.EnforcementPolicy{
752+
MinTime: 5 * time.Minute,
753+
PermitWithoutStream: false,
754+
}
755+
756+
if k == nil || k.KeepAliveEnforcementPolicy == nil {
757+
return defaultConfig
758+
}
759+
760+
if k.KeepAliveEnforcementPolicy.MinTime != nil {
761+
defaultConfig.MinTime = *k.KeepAliveEnforcementPolicy.MinTime
762+
}
763+
if k.KeepAliveEnforcementPolicy.PermitWithoutStream != nil {
764+
defaultConfig.PermitWithoutStream = *k.KeepAliveEnforcementPolicy.PermitWithoutStream
765+
}
766+
767+
return defaultConfig
768+
}

common/resource/fx.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,6 @@ func RPCFactoryProvider(
395395
tracingStatsHandler telemetry.ClientStatsHandler,
396396
monitor membership.Monitor,
397397
) (common.RPCFactory, error) {
398-
svcCfg := cfg.Services[string(svcName)]
399398
frontendURL, frontendHTTPURL, frontendHTTPPort, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver)
400399
if err != nil {
401400
return nil, err
@@ -405,9 +404,8 @@ func RPCFactoryProvider(
405404
if tracingStatsHandler != nil {
406405
options = append(options, grpc.WithStatsHandler(tracingStatsHandler))
407406
}
408-
409407
return rpc.NewFactory(
410-
&svcCfg.RPC,
408+
cfg,
411409
svcName,
412410
logger,
413411
tlsConfigProvider,

common/rpc.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ type RPCFactory interface {
4040
GetGRPCListener() net.Listener
4141
CreateRemoteFrontendGRPCConnection(rpcAddress string) *grpc.ClientConn
4242
CreateLocalFrontendGRPCConnection() *grpc.ClientConn
43-
CreateInternodeGRPCConnection(rpcAddress string) *grpc.ClientConn
43+
CreateHistoryGRPCConnection(rpcAddress string) *grpc.ClientConn
44+
CreateMatchingGRPCConnection(rpcAddress string) *grpc.ClientConn
4445
CreateLocalFrontendHTTPClient() (*FrontendHTTPClient, error)
4546
}
4647

common/rpc/rpc.go

+30-9
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ var _ common.RPCFactory = (*RPCFactory)(nil)
5353

5454
// RPCFactory is an implementation of common.RPCFactory interface
5555
type RPCFactory struct {
56-
config *config.RPC
56+
config *config.Config
5757
serviceName primitives.ServiceName
5858
logger log.Logger
5959

@@ -74,7 +74,7 @@ type RPCFactory struct {
7474
// NewFactory builds a new RPCFactory
7575
// conforming to the underlying configuration
7676
func NewFactory(
77-
cfg *config.RPC,
77+
cfg *config.Config,
7878
sName primitives.ServiceName,
7979
logger log.Logger,
8080
tlsProvider encryption.TLSConfigProvider,
@@ -150,6 +150,11 @@ func (d *RPCFactory) GetInternodeGRPCServerOptions() ([]grpc.ServerOption, error
150150
opts = append(opts, grpc.Creds(credentials.NewTLS(serverConfig)))
151151
}
152152

153+
rpcConfig := d.config.Services[string(d.serviceName)].RPC
154+
kep := rpcConfig.KeepAliveServerConfig.GetKeepAliveEnforcementPolicy()
155+
kp := rpcConfig.KeepAliveServerConfig.GetKeepAliveServerParameters()
156+
opts = append(opts, grpc.KeepaliveEnforcementPolicy(kep), grpc.KeepaliveParams(kp))
157+
153158
return opts, nil
154159
}
155160

@@ -167,7 +172,8 @@ func (d *RPCFactory) GetGRPCListener() net.Listener {
167172
}
168173

169174
func (d *RPCFactory) createGRPCListener() net.Listener {
170-
hostAddress := net.JoinHostPort(getListenIP(d.config, d.logger).String(), convert.IntToString(d.config.GRPCPort))
175+
rpcConfig := d.config.Services[string(d.serviceName)].RPC
176+
hostAddress := net.JoinHostPort(getListenIP(&rpcConfig, d.logger).String(), convert.IntToString(rpcConfig.GRPCPort))
171177

172178
grpcListener, err := net.Listen("tcp", hostAddress)
173179
if err != nil || grpcListener == nil || grpcListener.Addr() == nil {
@@ -220,17 +226,18 @@ func (d *RPCFactory) CreateRemoteFrontendGRPCConnection(rpcAddress string) *grpc
220226
return nil
221227
}
222228
}
229+
keepAliveOption := d.getClientKeepAliveConfig(primitives.FrontendService)
223230

224-
return d.dial(rpcAddress, tlsClientConfig)
231+
return d.dial(rpcAddress, tlsClientConfig, keepAliveOption)
225232
}
226233

227234
// CreateLocalFrontendGRPCConnection creates connection for internal frontend calls
228235
func (d *RPCFactory) CreateLocalFrontendGRPCConnection() *grpc.ClientConn {
229236
return d.dial(d.frontendURL, d.frontendTLSConfig)
230237
}
231238

232-
// CreateInternodeGRPCConnection creates connection for gRPC calls
233-
func (d *RPCFactory) CreateInternodeGRPCConnection(hostName string) *grpc.ClientConn {
239+
// createInternodeGRPCConnection creates connection for gRPC calls
240+
func (d *RPCFactory) createInternodeGRPCConnection(hostName string, serviceName primitives.ServiceName) *grpc.ClientConn {
234241
if c, ok := d.interNodeGrpcConnections.Get(hostName).(*grpc.ClientConn); ok {
235242
return c
236243
}
@@ -243,13 +250,22 @@ func (d *RPCFactory) CreateInternodeGRPCConnection(hostName string) *grpc.Client
243250
return nil
244251
}
245252
}
246-
c := d.dial(hostName, tlsClientConfig)
253+
c := d.dial(hostName, tlsClientConfig, d.getClientKeepAliveConfig(serviceName))
247254
d.interNodeGrpcConnections.Put(hostName, c)
248255
return c
249256
}
250257

251-
func (d *RPCFactory) dial(hostName string, tlsClientConfig *tls.Config) *grpc.ClientConn {
252-
connection, err := Dial(hostName, tlsClientConfig, d.logger, d.dialOptions...)
258+
func (d *RPCFactory) CreateHistoryGRPCConnection(rpcAddress string) *grpc.ClientConn {
259+
return d.createInternodeGRPCConnection(rpcAddress, primitives.HistoryService)
260+
}
261+
262+
func (d *RPCFactory) CreateMatchingGRPCConnection(rpcAddress string) *grpc.ClientConn {
263+
return d.createInternodeGRPCConnection(rpcAddress, primitives.MatchingService)
264+
}
265+
266+
func (d *RPCFactory) dial(hostName string, tlsClientConfig *tls.Config, dialOptions ...grpc.DialOption) *grpc.ClientConn {
267+
dialOptions = append(d.dialOptions, dialOptions...)
268+
connection, err := Dial(hostName, tlsClientConfig, d.logger, dialOptions...)
253269
if err != nil {
254270
d.logger.Fatal("Failed to create gRPC connection", tag.Error(err))
255271
return nil
@@ -258,6 +274,11 @@ func (d *RPCFactory) dial(hostName string, tlsClientConfig *tls.Config) *grpc.Cl
258274
return connection
259275
}
260276

277+
func (d *RPCFactory) getClientKeepAliveConfig(serviceName primitives.ServiceName) grpc.DialOption {
278+
serviceConfig := d.config.Services[string(serviceName)]
279+
return grpc.WithKeepaliveParams(serviceConfig.RPC.ClientConnectionConfig.GetKeepAliveClientParameters())
280+
}
281+
261282
func (d *RPCFactory) GetTLSConfigProvider() encryption.TLSConfigProvider {
262283
return d.tlsFactory
263284
}

common/rpc/test/rpc_common_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ var (
7676
MembershipPort: 7600,
7777
BindOnIP: localhostIPv4,
7878
}
79+
// cfg wraps a copy of rpcTestCfgDefault in a full config.Config, registered
// under the "frontend" service key only.
cfg = &config.Config{
	Services: map[string]config.Service{
		"frontend": {RPC: *rpcTestCfgDefault},
	},
}
7986
serverCfgInsecure = &config.Global{
8087
Membership: config.Membership{
8188
MaxJoinDuration: 5,

0 commit comments

Comments
 (0)