|
5 | 5 | "encoding/binary"
|
6 | 6 | "fmt"
|
7 | 7 | "hash/crc32"
|
| 8 | + "slices" |
8 | 9 | "sort"
|
| 10 | + "strings" |
9 | 11 | "sync"
|
10 | 12 | "time"
|
11 | 13 |
|
@@ -262,6 +264,60 @@ func (p *cursorOffsetPreferred) move() {
|
262 | 264 |
|
263 | 265 | type cursorPreferreds []cursorOffsetPreferred
|
264 | 266 |
|
| 267 | +func (cs cursorPreferreds) String() string { |
| 268 | + type pnext struct { |
| 269 | + p int32 |
| 270 | + next int32 |
| 271 | + } |
| 272 | + ts := make(map[string][]pnext) |
| 273 | + for _, c := range cs { |
| 274 | + t := c.from.topic |
| 275 | + p := c.from.partition |
| 276 | + ts[t] = append(ts[t], pnext{p, c.preferredReplica}) |
| 277 | + } |
| 278 | + tsorted := make([]string, 0, len(ts)) |
| 279 | + for t, ps := range ts { |
| 280 | + tsorted = append(tsorted, t) |
| 281 | + slices.SortFunc(ps, func(l, r pnext) int { |
| 282 | + if l.p < r.p { |
| 283 | + return -1 |
| 284 | + } |
| 285 | + if l.p > r.p { |
| 286 | + return 1 |
| 287 | + } |
| 288 | + if l.next < r.next { |
| 289 | + return -1 |
| 290 | + } |
| 291 | + if l.next > r.next { |
| 292 | + return 1 |
| 293 | + } |
| 294 | + return 0 |
| 295 | + }) |
| 296 | + } |
| 297 | + slices.Sort(tsorted) |
| 298 | + |
| 299 | + sb := new(strings.Builder) |
| 300 | + for i, t := range tsorted { |
| 301 | + ps := ts[t] |
| 302 | + fmt.Fprintf(sb, "%s{", t) |
| 303 | + |
| 304 | + for j, p := range ps { |
| 305 | + if j < len(ps)-1 { |
| 306 | + fmt.Fprintf(sb, "%d=>%d, ", p.p, p.next) |
| 307 | + } else { |
| 308 | + fmt.Fprintf(sb, "%d=>%d", p.p, p.next) |
| 309 | + } |
| 310 | + } |
| 311 | + |
| 312 | + if i < len(tsorted)-1 { |
| 313 | + fmt.Fprint(sb, "}, ") |
| 314 | + } else { |
| 315 | + fmt.Fprint(sb, "}") |
| 316 | + } |
| 317 | + } |
| 318 | + return sb.String() |
| 319 | +} |
| 320 | + |
265 | 321 | func (cs cursorPreferreds) eachPreferred(fn func(cursorOffsetPreferred)) {
|
266 | 322 | for _, c := range cs {
|
267 | 323 | fn(c)
|
@@ -832,6 +888,12 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
|
832 | 888 | // These two removals transition responsibility for finishing using the
|
833 | 889 | // cursor from the request's used offsets to the new source or the
|
834 | 890 | // reloading.
|
| 891 | + if len(preferreds) > 0 { |
| 892 | + s.cl.cfg.logger.Log(LogLevelInfo, "fetch partitions returned preferred replicas", |
| 893 | + "from_broker", s.nodeID, |
| 894 | + "moves", preferreds.String(), |
| 895 | + ) |
| 896 | + } |
835 | 897 | preferreds.eachPreferred(func(c cursorOffsetPreferred) {
|
836 | 898 | c.move()
|
837 | 899 | deleteReqUsedOffset(c.from.topic, c.from.partition)
|
|
0 commit comments