Skip to content

Rename *Id -> *ID, following Go naming convention. #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (batch *Batch) ReadMessage() (Message, error) {
msg.Time = makeTime(timestamp)
msg.Headers = headers
if batch.conn != nil {
msg.GenerationId = batch.conn.generationId
msg.GenerationID = batch.conn.generationID
}

return msg, err
Expand Down
4 changes: 2 additions & 2 deletions commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type commit struct {
topic string
partition int
offset int64
generationId int32
generationID int32
}

// makeCommit builds a commit value from a message, the resulting commit takes
Expand All @@ -16,7 +16,7 @@ func makeCommit(msg Message) commit {
topic: msg.Topic,
partition: msg.Partition,
offset: msg.Offset + 1,
generationId: msg.GenerationId,
generationID: msg.GenerationID,
}
}

Expand Down
10 changes: 5 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (
"time"
)

const undefinedGenerationID int32 = -1

var (
errInvalidWriteTopic = errors.New("writes must NOT set Topic on kafka.Message")
errInvalidWritePartition = errors.New("writes must NOT set Partition on kafka.Message")
)

const undefinedGenerationId int32 = -1

// Conn represents a connection to a kafka broker.
//
// Instances of Conn are safe to use concurrently from multiple goroutines.
Expand Down Expand Up @@ -68,7 +68,7 @@ type Conn struct {

transactionalID *string

generationId int32
generationID int32
}

type apiVersionMap map[apiKey]ApiVersion
Expand Down Expand Up @@ -186,7 +186,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
offset: FirstOffset,
requiredAcks: -1,
transactionalID: emptyToNullable(config.TransactionalID),
generationId: undefinedGenerationId,
generationID: undefinedGenerationID,
}

c.wb.w = &c.wbuf
Expand Down Expand Up @@ -393,7 +393,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
return joinGroupResponseV1{}, Error(response.ErrorCode)
}

c.generationId = response.GenerationID
c.generationID = response.GenerationID
return response, nil
}

Expand Down
2 changes: 1 addition & 1 deletion message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Message struct {

// If the message has been sent by a consumer group, it contains the
// generation's id. Value is -1 if not using consumer groups.
GenerationId int32
GenerationID int32

// This field is used to hold arbitrary data you wish to include, so it
// will be available when handle it on the Writer's `Completion` method,
Expand Down
18 changes: 9 additions & 9 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (r *Reader) unsubscribe() {
// another consumer to avoid such a race.
}

func (r *Reader) subscribe(generationId int32, allAssignments map[string][]PartitionAssignment) {
func (r *Reader) subscribe(generationID int32, allAssignments map[string][]PartitionAssignment) {
offsets := make(map[topicPartition]int64)
for topic, assignments := range allAssignments {
for _, assignment := range assignments {
Expand All @@ -134,7 +134,7 @@ func (r *Reader) subscribe(generationId int32, allAssignments map[string][]Parti
}

r.mutex.Lock()
r.start(generationId, offsets)
r.start(generationID, offsets)
r.mutex.Unlock()

r.withLogger(func(l Logger) {
Expand Down Expand Up @@ -215,7 +215,7 @@ func (o offsetStash) merge(commits []commit) {
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset.offset {
offsetsByPartition[c.partition] = offsetEntry{
offset: c.offset,
generationID: c.generationId,
generationID: c.generationID,
}
}
}
Expand Down Expand Up @@ -874,7 +874,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
r.mutex.Lock()

if !r.closed && r.version == 0 {
r.start(undefinedGenerationId, r.getTopicPartitionOffset())
r.start(undefinedGenerationID, r.getTopicPartitionOffset())
}

version := r.version
Expand Down Expand Up @@ -1095,7 +1095,7 @@ func (r *Reader) SetOffset(offset int64) error {
r.offset = offset

if r.version != 0 {
r.start(undefinedGenerationId, r.getTopicPartitionOffset())
r.start(undefinedGenerationID, r.getTopicPartitionOffset())
}

r.activateReadLag()
Expand Down Expand Up @@ -1233,7 +1233,7 @@ func (r *Reader) readLag(ctx context.Context) {
}
}

func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition]int64) {
func (r *Reader) start(generationID int32, offsetsByPartition map[topicPartition]int64) {
if r.closed {
// don't start child reader if parent Reader is closed
return
Expand Down Expand Up @@ -1271,7 +1271,7 @@ func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition

// backwards-compatibility flags
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
}).run(ctx, generationId, offset)
}).run(ctx, generationID, offset)
}(ctx, key, offset, &r.join)
}
}
Expand Down Expand Up @@ -1308,7 +1308,7 @@ type readerMessage struct {
error error
}

func (r *reader) run(ctx context.Context, generationId int32, offset int64) {
func (r *reader) run(ctx context.Context, generationID int32, offset int64) {
// This is the reader's main loop, it only ends if the context is canceled
// and will keep attempting to reader messages otherwise.
//
Expand Down Expand Up @@ -1361,7 +1361,7 @@ func (r *reader) run(ctx context.Context, generationId int32, offset int64) {
}
continue
}
conn.generationId = generationId
conn.generationID = generationID

// Resetting the attempt counter ensures that if a failure occurs after
// a successful initialization we don't keep increasing the backoff
Expand Down
18 changes: 9 additions & 9 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,44 +1462,44 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
Offsets offsetStash
Config ReaderConfig
ExpectedOffsets offsetStash
GenerationId int32
GenerationID int32
}{
"happy path": {
Invocations: 1,
Offsets: offsets(),
ExpectedOffsets: offsets(),
GenerationId: 1,
GenerationID: 1,
},
"1 retry": {
Fails: 1,
Invocations: 2,
Offsets: offsets(),
ExpectedOffsets: offsets(),
GenerationId: 1,
GenerationID: 1,
},
"out of retries": {
Fails: defaultCommitRetries + 1,
Invocations: defaultCommitRetries,
HasError: true,
Offsets: offsets(),
ExpectedOffsets: offsets(),
GenerationId: 1,
GenerationID: 1,
},
"illegal generation error only 1 generation": {
Fails: 1,
Invocations: 1,
Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 1}}},
ExpectedOffsets: offsetStash{},
Config: ReaderConfig{ErrorOnWrongGenerationCommit: false},
GenerationId: 2,
GenerationID: 2,
},
"illegal generation error only 2 generations": {
Fails: 1,
Invocations: 1,
Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 2}}},
ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}},
Config: ReaderConfig{ErrorOnWrongGenerationCommit: false},
GenerationId: 2,
GenerationID: 2,
},
"illegal generation error only 1 generation - error propagation": {
Fails: 1,
Expand All @@ -1508,7 +1508,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
ExpectedOffsets: offsetStash{},
Config: ReaderConfig{ErrorOnWrongGenerationCommit: true},
HasError: true,
GenerationId: 2,
GenerationID: 2,
},
"illegal generation error only 2 generations - error propagation": {
Fails: 1,
Expand All @@ -1517,7 +1517,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}},
Config: ReaderConfig{ErrorOnWrongGenerationCommit: true},
HasError: true,
GenerationId: 2,
GenerationID: 2,
},
}

Expand All @@ -1530,7 +1530,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
offsetCommitFunc: func(r offsetCommitRequestV2) (offsetCommitResponseV2, error) {
requests = append(requests, r)
count++
if r.GenerationID != test.GenerationId {
if r.GenerationID != test.GenerationID {
return offsetCommitResponseV2{}, IllegalGeneration
}
if count <= test.Fails {
Expand Down