Skip to content

Commit 84f4fa4

Browse files
committed
spanner: instrument client lib with opencensus metrics
This change adds six metrics for session management: * in_use_sessions: the current number of sessions that are checked out from the session pool. * max_in_use_sessions: the maximum number of in_use_sessions in the last 10 minutes. This is based on tumbling windows, instead of sliding windows. * max_allowed_sessions: the maximum number of sessions that is configured by the user. * get_sessions_timeout: the cumulative number of get sessions timeouts when pool exhaustion happens. * num_acquired_sessions: the cumulative number of sessions that are checked out from the session pool. * num_released_sessions: the cumulative number of sessions that are released back to the session pool. All metrics are tagged by: * client_id: each database has its own increasing ID sequence. For two different databases, their client IDs all start from "client-1". * database: the database ID. * instance_id: the instance ID. * library_version: the library version from google-cloud-go/internal/version which is a date in YYYYMMDD format. Notes: There are three ways to check out a session from the pool: 1) take(): get a read session called by a user 2) takeWriteSession(): get a read/write session by a user 3) getNextForTx() in healthchecker's worker (session.go:1336): healthchecker's workers convert read sessions to r/w sessions. E.g., when initializing the pool, workers check out 20 read sessions from the pool, turn them into r/w sessions, and put them back to the pool, assuming that MinOpened=100 and WriteSessions=0.2. So some metrics are also emitted by case 3. This might confuse users if they are not aware of this behaviour. Related Java PRs: * googleapis/java-spanner#54 * googleapis/java-spanner#65 * googleapis/java-spanner#67 Change-Id: Ie163b08ef18ac2a47e1669fefab92d61fe8f2a82 Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/52953 Reviewed-by: kokoro <noreply+kokoro@google.com> Reviewed-by: Knut Olav Løite <koloite@gmail.com>
1 parent bad702d commit 84f4fa4

File tree

5 files changed

+257
-15
lines changed

5 files changed

+257
-15
lines changed

client.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ const (
5454
)
5555

5656
var (
57-
validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$")
58-
validInstancePattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+")
57+
validDBPattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)/databases/(?P<database>[^/]+)$")
58+
validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)")
5959
)
6060

6161
func validDatabaseName(db string) error {
@@ -75,6 +75,15 @@ func getInstanceName(db string) (string, error) {
7575
return matches[0], nil
7676
}
7777

78+
func parseDatabaseName(db string) (project, instance, database string, err error) {
79+
matches := validDBPattern.FindStringSubmatch(db)
80+
if len(matches) == 0 {
81+
return "", "", "", fmt.Errorf("Failed to parse database name from %q according to pattern %q",
82+
db, validDBPattern.String())
83+
}
84+
return matches[1], matches[2], matches[3], nil
85+
}
86+
7887
// Client is a client for reading and writing data to a Cloud Spanner database.
7988
// A client is safe to use concurrently, except for its Close method.
8089
type Client struct {

session.go

+79-9
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ import (
2929
"time"
3030

3131
"cloud.google.com/go/internal/trace"
32+
"cloud.google.com/go/internal/version"
3233
vkit "cloud.google.com/go/spanner/apiv1"
34+
"go.opencensus.io/stats"
35+
"go.opencensus.io/tag"
3336
sppb "google.golang.org/genproto/googleapis/spanner/v1"
3437
"google.golang.org/grpc/codes"
3538
"google.golang.org/grpc/metadata"
@@ -520,13 +523,23 @@ type sessionPool struct {
520523
SessionPoolConfig
521524
// hc is the health checker
522525
hc *healthChecker
526+
// rand is a separately sourced random generator.
527+
rand *rand.Rand
528+
// numInUse is the number of sessions that are currently in use (checked out
529+
// from the session pool).
530+
numInUse uint64
531+
// maxNumInUse is the maximum number of sessions in use concurrently in the
532+
// current 10 minute interval.
533+
maxNumInUse uint64
534+
// lastResetTime is the start time of the window for recording maxNumInUse.
535+
lastResetTime time.Time
523536

524537
// mw is the maintenance window containing statistics for the max number of
525538
// sessions checked out of the pool during the last 10 minutes.
526539
mw *maintenanceWindow
527540

528-
// rand is a separately sourced random generator.
529-
rand *rand.Rand
541+
// tagMap is a map of all tags that are associated with the emitted metrics.
542+
tagMap *tag.Map
530543
}
531544

532545
// newSessionPool creates a new session pool.
@@ -557,6 +570,23 @@ func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool,
557570
if config.healthCheckSampleInterval == 0 {
558571
config.healthCheckSampleInterval = time.Minute
559572
}
573+
574+
_, instance, database, err := parseDatabaseName(sc.database)
575+
if err != nil {
576+
return nil, err
577+
}
578+
// Errors should not prevent initializing the session pool.
579+
ctx, err := tag.New(context.Background(),
580+
tag.Upsert(tagClientID, sc.id),
581+
tag.Upsert(tagDatabase, database),
582+
tag.Upsert(tagInstance, instance),
583+
tag.Upsert(tagLibVersion, version.Repo),
584+
)
585+
if err != nil {
586+
logf(pool.sc.logger, "Failed to create tag map, error: %v", err)
587+
}
588+
pool.tagMap = tag.FromContext(ctx)
589+
560590
// On GCE VM, within the same region an healthcheck ping takes on average
561591
// 10ms to finish, given a 5 minutes interval and 10 healthcheck workers, a
562592
// healthChecker can effectively mantain
@@ -573,15 +603,21 @@ func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool,
573603
return nil, err
574604
}
575605
}
606+
pool.recordStat(context.Background(), MaxAllowedSessionsCount, int64(config.MaxOpened))
576607
close(pool.hc.ready)
577608
return pool, nil
578609
}
579610

611+
func (p *sessionPool) recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
612+
ctx = tag.NewContext(ctx, p.tagMap)
613+
recordStat(ctx, m, n)
614+
}
615+
580616
func (p *sessionPool) initPool(numSessions int32) error {
581617
p.mu.Lock()
582618
// Take budget before the actual session creation.
583619
p.numOpened += uint64(numSessions)
584-
recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
620+
p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
585621
p.createReqs += uint64(numSessions)
586622
p.mu.Unlock()
587623
// Asynchronously create the initial sessions for the pool.
@@ -626,7 +662,7 @@ func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) {
626662
defer p.mu.Unlock()
627663
p.createReqs -= uint64(numSessions)
628664
p.numOpened -= uint64(numSessions)
629-
recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
665+
p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
630666
// Notify other waiters blocking on session creation.
631667
close(p.mayGetSession)
632668
p.mayGetSession = make(chan struct{})
@@ -746,7 +782,7 @@ func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
746782
if !done {
747783
// Session creation failed, give budget back.
748784
p.numOpened--
749-
recordStat(ctx, OpenSessionCount, int64(p.numOpened))
785+
p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
750786
}
751787
p.createReqs--
752788
// Notify other waiters blocking on session creation.
@@ -823,6 +859,7 @@ func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
823859
if !p.isHealthy(s) {
824860
continue
825861
}
862+
p.incNumInUse(ctx)
826863
return p.newSessionHandle(s), nil
827864
}
828865

@@ -835,6 +872,7 @@ func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
835872
select {
836873
case <-ctx.Done():
837874
trace.TracePrintf(ctx, nil, "Context done waiting for session")
875+
p.recordStat(ctx, GetSessionTimeoutsCount, 1)
838876
return nil, p.errGetSessionTimeout()
839877
case <-mayGetSession:
840878
}
@@ -846,7 +884,7 @@ func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
846884
// Creating a new session that will be returned directly to the client
847885
// means that the max number of sessions in use also increases.
848886
numCheckedOut := p.currSessionsCheckedOutLocked()
849-
recordStat(ctx, OpenSessionCount, int64(p.numOpened))
887+
p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
850888
p.createReqs++
851889
p.mu.Unlock()
852890
p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
@@ -856,6 +894,7 @@ func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
856894
}
857895
trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()},
858896
"Created session")
897+
p.incNumInUse(ctx)
859898
return p.newSessionHandle(s), nil
860899
}
861900
}
@@ -908,6 +947,7 @@ func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, err
908947
select {
909948
case <-ctx.Done():
910949
trace.TracePrintf(ctx, nil, "Context done waiting for session")
950+
p.recordStat(ctx, GetSessionTimeoutsCount, 1)
911951
return nil, p.errGetSessionTimeout()
912952
case <-mayGetSession:
913953
}
@@ -919,7 +959,7 @@ func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, err
919959
// Creating a new session that will be returned directly to the client
920960
// means that the max number of sessions in use also increases.
921961
numCheckedOut := p.currSessionsCheckedOutLocked()
922-
recordStat(ctx, OpenSessionCount, int64(p.numOpened))
962+
p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
923963
p.createReqs++
924964
p.mu.Unlock()
925965
p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
@@ -945,6 +985,7 @@ func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, err
945985
return nil, toSpannerError(err)
946986
}
947987
}
988+
p.incNumInUse(ctx)
948989
return p.newSessionHandle(s), nil
949990
}
950991
}
@@ -968,6 +1009,9 @@ func (p *sessionPool) recycle(s *session) bool {
9681009
// Broadcast that a session has been returned to idle list.
9691010
close(p.mayGetSession)
9701011
p.mayGetSession = make(chan struct{})
1012+
p.numInUse--
1013+
p.recordStat(context.Background(), InUseSessionsCount, int64(p.numInUse))
1014+
p.recordStat(context.Background(), ReleasedSessionsCount, 1)
9711015
return true
9721016
}
9731017

@@ -992,7 +1036,7 @@ func (p *sessionPool) remove(s *session, isExpire bool) bool {
9921036
if s.invalidate() {
9931037
// Decrease the number of opened sessions.
9941038
p.numOpened--
995-
recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
1039+
p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened))
9961040
// Broadcast that a session has been destroyed.
9971041
close(p.mayGetSession)
9981042
p.mayGetSession = make(chan struct{})
@@ -1005,6 +1049,22 @@ func (p *sessionPool) currSessionsCheckedOutLocked() uint64 {
10051049
return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len())
10061050
}
10071051

1052+
func (p *sessionPool) incNumInUse(ctx context.Context) {
1053+
p.mu.Lock()
1054+
p.incNumInUseLocked(ctx)
1055+
p.mu.Unlock()
1056+
}
1057+
1058+
func (p *sessionPool) incNumInUseLocked(ctx context.Context) {
1059+
p.numInUse++
1060+
p.recordStat(ctx, InUseSessionsCount, int64(p.numInUse))
1061+
p.recordStat(ctx, AcquiredSessionsCount, 1)
1062+
if p.numInUse > p.maxNumInUse {
1063+
p.maxNumInUse = p.numInUse
1064+
p.recordStat(ctx, MaxInUseSessionsCount, int64(p.maxNumInUse))
1065+
}
1066+
}
1067+
10081068
// hcHeap implements heap.Interface. It is used to create the priority queue for
10091069
// session healthchecks.
10101070
type hcHeap struct {
@@ -1305,6 +1365,7 @@ func (hc *healthChecker) worker(i int) {
13051365
session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
13061366
session.checkingHealth = true
13071367
hc.pool.prepareReqs++
1368+
hc.pool.incNumInUseLocked(context.Background())
13081369
return session
13091370
}
13101371
}
@@ -1381,6 +1442,15 @@ func (hc *healthChecker) maintainer() {
13811442
currSessionsOpened := hc.pool.numOpened
13821443
maxIdle := hc.pool.MaxIdle
13831444
minOpened := hc.pool.MinOpened
1445+
1446+
// Reset the start time for recording the maximum number of sessions
1447+
// in the pool.
1448+
now := time.Now()
1449+
if now.After(hc.pool.lastResetTime.Add(10 * time.Minute)) {
1450+
hc.pool.maxNumInUse = hc.pool.numInUse
1451+
hc.pool.recordStat(context.Background(), MaxInUseSessionsCount, int64(hc.pool.maxNumInUse))
1452+
hc.pool.lastResetTime = now
1453+
}
13841454
hc.pool.mu.Unlock()
13851455
// Get the maximum number of sessions in use during the current
13861456
// maintenance window.
@@ -1438,7 +1508,7 @@ func (hc *healthChecker) growPool(ctx context.Context, growToNumSessions uint64)
14381508
break
14391509
}
14401510
p.numOpened++
1441-
recordStat(ctx, OpenSessionCount, int64(p.numOpened))
1511+
p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
14421512
p.createReqs++
14431513
shouldPrepareWrite := p.shouldPrepareWriteLocked()
14441514
p.mu.Unlock()

sessionclient.go

+27
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package spanner
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"log"
2223
"sync"
2324
"time"
@@ -31,6 +32,30 @@ import (
3132
"google.golang.org/grpc/metadata"
3233
)
3334

35+
// cidGen is the package-wide generator that assigns an ID to each new
// session client.
var cidGen = newClientIDGenerator()

// clientIDGenerator hands out sequential client IDs, keeping an independent
// counter for every database name.
type clientIDGenerator struct {
	// mu guards ids.
	mu sync.Mutex
	// ids maps a database name to the last sequence number issued for it.
	ids map[string]int
}

// newClientIDGenerator returns a generator that has not issued any IDs yet.
func newClientIDGenerator() *clientIDGenerator {
	return &clientIDGenerator{ids: make(map[string]int)}
}

// nextID returns the next ID for database, formatted as "client-<n>". The
// sequence for each database starts at 1 and is independent of the sequences
// of other databases. The counter update is performed while holding cg.mu.
func (cg *clientIDGenerator) nextID(database string) string {
	cg.mu.Lock()
	defer cg.mu.Unlock()
	// A database that has not been seen reads as zero from the map, so its
	// first ID is 1 without needing a separate existence check.
	id := cg.ids[database] + 1
	cg.ids[database] = id
	return fmt.Sprintf("client-%d", id)
}
58+
3459
// sessionConsumer is passed to the batchCreateSessions method and will receive
3560
// the sessions that are created as they become available. A sessionConsumer
3661
// implementation must be safe for concurrent use.
@@ -59,6 +84,7 @@ type sessionClient struct {
5984

6085
connPool gtransport.ConnPool
6186
database string
87+
id string
6288
sessionLabels map[string]string
6389
md metadata.MD
6490
batchTimeout time.Duration
@@ -70,6 +96,7 @@ func newSessionClient(connPool gtransport.ConnPool, database string, sessionLabe
7096
return &sessionClient{
7197
connPool: connPool,
7298
database: database,
99+
id: cidGen.nextID(database),
73100
sessionLabels: sessionLabels,
74101
md: md,
75102
batchTimeout: time.Minute,

sessionclient_test.go

+16
Original file line numberDiff line numberDiff line change
@@ -357,3 +357,19 @@ func TestBatchCreateSessions_WithTimeout(t *testing.T) {
357357
}
358358
client.Close()
359359
}
360+
361+
func TestClientIDGenerator(t *testing.T) {
362+
cidGen = newClientIDGenerator()
363+
for _, tt := range []struct {
364+
database string
365+
clientID string
366+
}{
367+
{"db", "client-1"},
368+
{"db-new", "client-1"},
369+
{"db", "client-2"},
370+
} {
371+
if got, want := cidGen.nextID(tt.database), tt.clientID; got != want {
372+
t.Fatalf("Generate wrong client ID: got %v, want %v", got, want)
373+
}
374+
}
375+
}

0 commit comments

Comments
 (0)