Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/internal/xdsclient: Add counter metrics for valid and invalid resource updates #8038

Merged
merged 3 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ var (
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials any // func (*grpc.Server) credentials.TransportCredentials
// MetricsRecorderForServer returns the MetricsRecorderList derived from a
// server's stats handlers.
MetricsRecorderForServer any // func (*grpc.Server) estats.MetricsRecorder
// CanonicalString returns the canonical string of the code defined here:
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md.
//
Expand Down
3 changes: 3 additions & 0 deletions resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/serviceconfig"
)
Expand Down Expand Up @@ -175,6 +176,8 @@ type BuildOptions struct {
// Authority is the effective authority of the clientconn for which the
// resolver is built.
Authority string
// MetricsRecorder is the metrics recorder to do recording.
MetricsRecorder stats.MetricsRecorder
}

// An Endpoint is one network endpoint, or server, which may have multiple
Expand Down
1 change: 1 addition & 0 deletions resolver_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (ccr *ccResolverWrapper) start() error {
CredsBundle: ccr.cc.dopts.copts.CredsBundle,
Dialer: ccr.cc.dopts.copts.Dialer,
Authority: ccr.cc.authority,
MetricsRecorder: ccr.cc.metricsRecorderList,
}
var err error
// The delegating resolver is used unless:
Expand Down
5 changes: 5 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/mem"
Expand Down Expand Up @@ -82,6 +84,9 @@ func init() {
internal.BinaryLogger = binaryLogger
internal.JoinServerOptions = newJoinServerOption
internal.BufferPool = bufferPool
internal.MetricsRecorderForServer = func(srv *Server) estats.MetricsRecorder {
return istats.NewMetricsRecorderList(srv.opts.statsHandlers)
}
}

var statusOK = status.New(codes.OK, "")
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/resolver/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ var (
NewWRR any // func() wrr.WRR

// NewXDSClient is the function used to create a new xDS client.
NewXDSClient any // func(string) (xdsclient.XDSClient, func(), error)
NewXDSClient any // func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error)
)
21 changes: 14 additions & 7 deletions xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
rand "math/rand/v2"
"sync/atomic"

estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
Expand All @@ -50,13 +51,16 @@ const Scheme = "xds"
// the provided config and a new xDS client in that pool.
func newBuilderWithConfigForTesting(config []byte) (resolver.Builder, error) {
return &xdsResolverBuilder{
newXDSClient: func(name string) (xdsclient.XDSClient, func(), error) {
newXDSClient: func(name string, mr estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) {
config, err := bootstrap.NewConfigFromContents(config)
if err != nil {
return nil, nil, err
}
pool := xdsclient.NewPool(config)
return pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: name})
return pool.NewClientForTesting(xdsclient.OptionsForTesting{
Name: name,
MetricsRecorder: mr,
})
},
}, nil
}
Expand All @@ -66,8 +70,11 @@ func newBuilderWithConfigForTesting(config []byte) (resolver.Builder, error) {
// specific xds client pool being used.
func newBuilderWithPoolForTesting(pool *xdsclient.Pool) (resolver.Builder, error) {
return &xdsResolverBuilder{
newXDSClient: func(name string) (xdsclient.XDSClient, func(), error) {
return pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: name})
newXDSClient: func(name string, mr estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) {
return pool.NewClientForTesting(xdsclient.OptionsForTesting{
Name: name,
MetricsRecorder: mr,
})
},
}, nil
}
Expand All @@ -82,7 +89,7 @@ func init() {
}

type xdsResolverBuilder struct {
newXDSClient func(string) (xdsclient.XDSClient, func(), error)
newXDSClient func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error)
}

// Build helps implement the resolver.Builder interface.
Expand Down Expand Up @@ -115,11 +122,11 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
r.serializerCancel = cancel

// Initialize the xDS client.
newXDSClient := rinternal.NewXDSClient.(func(string) (xdsclient.XDSClient, func(), error))
newXDSClient := rinternal.NewXDSClient.(func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error))
if b.newXDSClient != nil {
newXDSClient = b.newXDSClient
}
client, closeFn, err := newXDSClient(target.String())
client, closeFn, err := newXDSClient(target.String(), opts.MetricsRecorder)
if err != nil {
return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion xds/internal/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc/codes"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/testutils"
Expand Down Expand Up @@ -257,7 +258,7 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
// client is closed.
origNewClient := rinternal.NewXDSClient
closeCh := make(chan struct{})
rinternal.NewXDSClient = func(string) (xdsclient.XDSClient, func(), error) {
rinternal.NewXDSClient = func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) {
bc := e2e.DefaultBootstrapContents(t, uuid.New().String(), "dummy-management-server-address")
config, err := bootstrap.NewConfigFromContents(bc)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"sync/atomic"

"google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
Expand Down Expand Up @@ -87,6 +88,8 @@ type authority struct {
xdsClientSerializer *grpcsync.CallbackSerializer // Serializer to run call ins from the xDS client, owned by this authority.
xdsClientSerializerClose func() // Function to close the above serializer.
logger *igrpclog.PrefixLogger // Logger for this authority.
target string // The gRPC Channel target.
metricsRecorder stats.MetricsRecorder // The metrics recorder used for emitting metrics.

// The below defined fields must only be accessed in the context of the
// serializer callback, owned by this authority.
Expand Down Expand Up @@ -120,6 +123,8 @@ type authorityBuildOptions struct {
serializer *grpcsync.CallbackSerializer // Callback serializer for invoking watch callbacks
getChannelForADS xdsChannelForADS // Function to acquire a reference to an xdsChannel
logPrefix string // Prefix for logging
target string // Target for the gRPC Channel that owns xDS Client/Authority
metricsRecorder stats.MetricsRecorder // metricsRecorder to emit metrics
}

// newAuthority creates a new authority instance with the provided
Expand All @@ -143,6 +148,8 @@ func newAuthority(args authorityBuildOptions) *authority {
xdsClientSerializerClose: cancel,
logger: igrpclog.NewPrefixLogger(l, logPrefix),
resources: make(map[xdsresource.Type]map[string]*resourceState),
target: args.target,
metricsRecorder: args.metricsRecorder,
}

// Create an ordered list of xdsChannels with their server configs. The
Expand Down Expand Up @@ -358,6 +365,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig
// On error, keep previous version of the resource. But update status
// and error.
if uErr.Err != nil {
xdsClientResourceUpdatesInvalidMetric.Record(a.metricsRecorder, 1, a.target, serverConfig.ServerURI(), rType.TypeName())
state.md.ErrState = md.ErrState
state.md.Status = md.Status
for watcher := range state.watchers {
Expand All @@ -369,6 +377,8 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig
continue
}

xdsClientResourceUpdatesValidMetric.Record(a.metricsRecorder, 1, a.target, serverConfig.ServerURI(), rType.TypeName())

if state.deletionIgnored {
state.deletionIgnored = false
a.logger.Infof("A valid update was received for resource %q of type %q after previously ignoring a deletion", name, rType.TypeName())
Expand Down
15 changes: 8 additions & 7 deletions xds/internal/xdsclient/client_refcounted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/google/uuid"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
)
Expand Down Expand Up @@ -60,7 +61,7 @@ func (s) TestClientNew_Single(t *testing.T) {
defer func() { xdsClientImplCloseHook = origClientImplCloseHook }()

// The first call to New() should create a new client.
_, closeFunc, err := pool.NewClient(t.Name())
_, closeFunc, err := pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand All @@ -76,7 +77,7 @@ func (s) TestClientNew_Single(t *testing.T) {
closeFuncs := make([]func(), count)
for i := 0; i < count; i++ {
func() {
_, closeFuncs[i], err = pool.NewClient(t.Name())
_, closeFuncs[i], err = pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{})
if err != nil {
t.Fatalf("%d-th call to New() failed with error: %v", i, err)
}
Expand Down Expand Up @@ -114,7 +115,7 @@ func (s) TestClientNew_Single(t *testing.T) {

// Calling New() again, after the previous Client was actually closed,
// should create a new one.
_, closeFunc, err = pool.NewClient(t.Name())
_, closeFunc, err = pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand Down Expand Up @@ -156,7 +157,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {

// Create two xDS clients.
client1Name := t.Name() + "-1"
_, closeFunc1, err := pool.NewClient(client1Name)
_, closeFunc1, err := pool.NewClient(client1Name, &stats.NoopMetricsRecorder{})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand All @@ -171,7 +172,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {
}

client2Name := t.Name() + "-2"
_, closeFunc2, err := pool.NewClient(client2Name)
_, closeFunc2, err := pool.NewClient(client2Name, &stats.NoopMetricsRecorder{})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand All @@ -193,7 +194,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {
defer wg.Done()
for i := 0; i < count; i++ {
var err error
_, closeFuncs1[i], err = pool.NewClient(client1Name)
_, closeFuncs1[i], err = pool.NewClient(client1Name, &stats.NoopMetricsRecorder{})
if err != nil {
t.Errorf("%d-th call to New() failed with error: %v", i, err)
}
Expand All @@ -203,7 +204,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {
defer wg.Done()
for i := 0; i < count; i++ {
var err error
_, closeFuncs2[i], err = pool.NewClient(client2Name)
_, closeFuncs2[i], err = pool.NewClient(client2Name, &stats.NoopMetricsRecorder{})
if err != nil {
t.Errorf("%d-th call to New() failed with error: %v", i, err)
}
Expand Down
26 changes: 25 additions & 1 deletion xds/internal/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpclog"
Expand Down Expand Up @@ -60,6 +61,21 @@ var (
xdsClientImplCloseHook = func(string) {}

defaultExponentialBackoff = backoff.DefaultExponential.Backoff

xdsClientResourceUpdatesValidMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.xds_client.resource_updates_valid",
Description: "A counter of resources received that were considered valid. The counter will be incremented even for resources that have not changed.",
Unit: "resource",
Labels: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"},
Default: false,
})
xdsClientResourceUpdatesInvalidMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.xds_client.resource_updates_invalid",
Description: "A counter of resources received that were considered invalid.",
Unit: "resource",
Labels: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"},
Default: false,
})
)

// clientImpl is the real implementation of the xDS client. The exported Client
Expand All @@ -78,6 +94,8 @@ type clientImpl struct {
serializer *grpcsync.CallbackSerializer // Serializer for invoking resource watcher callbacks.
serializerClose func() // Function to close the serializer.
logger *grpclog.PrefixLogger // Logger for this client.
metricsRecorder estats.MetricsRecorder // Metrics recorder for metrics.
target string // The gRPC target for this client.

// The clientImpl owns a bunch of channels to individual xDS servers
// specified in the bootstrap configuration. Authorities acquire references
Expand Down Expand Up @@ -111,9 +129,11 @@ func init() {
}

// newClientImpl returns a new xdsClient with the given config.
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration, mr estats.MetricsRecorder, target string) (*clientImpl, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &clientImpl{
metricsRecorder: mr,
target: target,
done: grpcsync.NewEvent(),
authorities: make(map[string]*authority),
config: config,
Expand All @@ -139,6 +159,8 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, s
serializer: c.serializer,
getChannelForADS: c.getChannelForADS,
logPrefix: clientPrefix(c),
target: target,
metricsRecorder: c.metricsRecorder,
})
}
c.topLevelAuthority = newAuthority(authorityBuildOptions{
Expand All @@ -147,6 +169,8 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, s
serializer: c.serializer,
getChannelForADS: c.getChannelForADS,
logPrefix: clientPrefix(c),
target: target,
metricsRecorder: c.metricsRecorder,
})
c.logger = prefixLogger(c)
return c, nil
Expand Down
Loading
Loading