Skip to content

Commit cda897d

Browse files
committed
kgo: add log for preferred replicas
1 parent 40589af commit cda897d

File tree

1 file changed

+62
-0
lines changed

1 file changed

+62
-0
lines changed

pkg/kgo/source.go

+62
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"encoding/binary"
66
"fmt"
77
"hash/crc32"
8+
"slices"
89
"sort"
10+
"strings"
911
"sync"
1012
"time"
1113

@@ -262,6 +264,60 @@ func (p *cursorOffsetPreferred) move() {
262264

263265
type cursorPreferreds []cursorOffsetPreferred
264266

267+
func (cs cursorPreferreds) String() string {
268+
type pnext struct {
269+
p int32
270+
next int32
271+
}
272+
ts := make(map[string][]pnext)
273+
for _, c := range cs {
274+
t := c.from.topic
275+
p := c.from.partition
276+
ts[t] = append(ts[t], pnext{p, c.preferredReplica})
277+
}
278+
tsorted := make([]string, 0, len(ts))
279+
for t, ps := range ts {
280+
tsorted = append(tsorted, t)
281+
slices.SortFunc(ps, func(l, r pnext) int {
282+
if l.p < r.p {
283+
return -1
284+
}
285+
if l.p > r.p {
286+
return 1
287+
}
288+
if l.next < r.next {
289+
return -1
290+
}
291+
if l.next > r.next {
292+
return 1
293+
}
294+
return 0
295+
})
296+
}
297+
slices.Sort(tsorted)
298+
299+
sb := new(strings.Builder)
300+
for i, t := range tsorted {
301+
ps := ts[t]
302+
fmt.Fprintf(sb, "%s{", t)
303+
304+
for j, p := range ps {
305+
if j < len(ps)-1 {
306+
fmt.Fprintf(sb, "%d=>%d, ", p.p, p.next)
307+
} else {
308+
fmt.Fprintf(sb, "%d=>%d", p.p, p.next)
309+
}
310+
}
311+
312+
if i < len(tsorted)-1 {
313+
fmt.Fprint(sb, "}, ")
314+
} else {
315+
fmt.Fprint(sb, "}")
316+
}
317+
}
318+
return sb.String()
319+
}
320+
265321
func (cs cursorPreferreds) eachPreferred(fn func(cursorOffsetPreferred)) {
266322
for _, c := range cs {
267323
fn(c)
@@ -832,6 +888,12 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
832888
// These two removals transition responsibility for finishing using the
833889
// cursor from the request's used offsets to the new source or the
834890
// reloading.
891+
if len(preferreds) > 0 {
892+
s.cl.cfg.logger.Log(LogLevelInfo, "fetch partitions returned preferred replicas",
893+
"from_broker", s.nodeID,
894+
"moves", preferreds.String(),
895+
)
896+
}
835897
preferreds.eachPreferred(func(c cursorOffsetPreferred) {
836898
c.move()
837899
deleteReqUsedOffset(c.from.topic, c.from.partition)

0 commit comments

Comments
 (0)