Skip to content

Commit e26edc6

Browse files
committed
removing comments
1 parent 322046b commit e26edc6

File tree

2 files changed

+8
-66
lines changed

2 files changed

+8
-66
lines changed

consumergroup.go

+8-27
Original file line numberDiff line numberDiff line change
@@ -952,11 +952,6 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
952952

953953
cg.withLogger(func(l Logger) {
954954
l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
955-
l.Printf("*******joingroupresponse : %v", response)
956-
})
957-
cg.withLogger(func(l Logger) {
958-
l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
959-
l.Printf("*******in joingroup: joingroupresponse : %v", response)
960955
})
961956

962957
var assignments GroupMemberAssignments
@@ -994,25 +989,23 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
994989
ProtocolType: defaultProtocolType,
995990
}
996991
for _, balancer := range cg.config.GroupBalancers {
997-
// userData, err := balancer.UserData("", make(map[string][]int32), 0)
998-
var userdata []byte
999-
// if err != nil {
1000-
// return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
1001-
// }
992+
userData, err := balancer.UserData(memberID, make(map[string][]int32), -1)
993+
if err != nil {
994+
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
995+
}
1002996
if balancer.ProtocolName() == "sticky" {
1003-
//userdata = append(userdata, cg.userData...)
1004-
userdata = cg.userData
1005-
//userDat.appendcg.userData
997+
userData = cg.userData
1006998
}
1007999
request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
10081000
ProtocolName: balancer.ProtocolName(),
10091001
ProtocolMetadata: groupMetadata{
10101002
Version: 1,
10111003
Topics: cg.config.Topics,
1012-
UserData: userdata,
1004+
UserData: userData,
10131005
}.bytes(),
10141006
})
10151007
}
1008+
10161009
return request, nil
10171010
}
10181011

@@ -1127,12 +1120,6 @@ func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID in
11271120
}
11281121

11291122
balancer, _ := findGroupBalancer(strategy, cg.config.GroupBalancers)
1130-
// if !ok {
1131-
// // NOTE : this shouldn't happen in practice...the broker should not
1132-
// // return successfully from joinGroup unless all members support
1133-
// // at least one common protocol.
1134-
// return nil, fmt.Errorf("unable to find selected balancer, %v, for group, %v", group.GroupProtocol, cg.config.ID)
1135-
// }
11361123

11371124
if memberAssignments != nil {
11381125
request.GroupAssignments = make([]syncGroupRequestGroupAssignmentV0, 0, 1)
@@ -1150,13 +1137,7 @@ func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID in
11501137
if balancer.ProtocolName() == "sticky" {
11511138
var stickyBalancer StickyGroupBalancer
11521139
userDataBytes, _ = stickyBalancer.UserData(memberID, topics32, generationID)
1153-
// if err != nil {
1154-
// return nil, err
1155-
// }
1156-
// userDataBytes,_ = encode(&StickyAssignorUserDataV1{
1157-
// Topics: topics32,
1158-
// Generation: generationID,
1159-
// }, nil)
1140+
// see if we can encode userdata here itself without having to call balancer.Userdata, that would avoid changing UserData()meth signatures and passing strategy var over join and sync group func calls
11601141
}
11611142
request.GroupAssignments = append(request.GroupAssignments, syncGroupRequestGroupAssignmentV0{
11621143
MemberID: memberID,

groupbalancer.go

-39
Original file line numberDiff line numberDiff line change
@@ -374,28 +374,6 @@ func (s StickyGroupBalancer) AssignGroups(members []GroupMember, topicPartitions
374374
topics[topic] = partitions
375375
}
376376
fmt.Printf("hello here1")
377-
// for topic, members := range membersByTopic {
378-
// partitions := findPartitions(topic, topicPartitions)
379-
// partitionCount := len(partitions)
380-
// memberCount := len(members)
381-
382-
// for memberIndex, member := range members {
383-
// assignmentsByTopic, ok := groupAssignments[member.ID]
384-
// if !ok {
385-
// assignmentsByTopic = map[string][]int{}
386-
// groupAssignments[member.ID] = assignmentsByTopic
387-
// }
388-
389-
// minIndex := memberIndex * partitionCount / memberCount
390-
// maxIndex := (memberIndex + 1) * partitionCount / memberCount
391-
392-
// for partitionIndex, partition := range partitions {
393-
// if partitionIndex >= minIndex && partitionIndex < maxIndex {
394-
// assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
395-
// }
396-
// }
397-
// }
398-
// }
399377

400378
// track partition movements during generation of the partition assignment plan
401379
s.movements = partitionMovements{
@@ -448,13 +426,11 @@ func (s StickyGroupBalancer) AssignGroups(members []GroupMember, topicPartitions
448426
unvisitedPartitions[partition] = true
449427
}
450428

451-
////
452429
membersByID := make(map[string]GroupMember)
453430
for _, member := range members {
454431
membersByID[member.ID] = member
455432
}
456433

457-
/////
458434
var unassignedPartitions []topicPartitionAssignment
459435
for memberID, partitions := range currentAssignment {
460436
var keepPartitions []topicPartitionAssignment
@@ -600,23 +576,9 @@ func prepopulateCurrentAssignments(members []GroupMember) (map[string][]topicPar
600576

601577
// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
602578
func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
603-
// userDataV1 := &StickyAssignorUserDataV1{}
604-
// if err := decode(userDataBytes, userDataV1, nil); err != nil {
605-
// userDataV0 := &StickyAssignorUserDataV0{}
606-
// if err := decode(userDataBytes, userDataV0, nil); err != nil {
607-
// return nil, err
608-
// }
609-
// return userDataV0, nil
610-
// }
611-
// return userDataV1, nil
612-
//userDataV2 := &StickyAssignorUserDataV2{}
613579
var userDataV2 = &StickyAssignorUserDataV2{}
614-
fmt.Printf("userdatabytes : %v", userDataBytes)
615-
fmt.Printf("hello hello : userdatabytes : %v", userDataBytes)
616-
fmt.Printf("hello hello : str(userdatabytes) : %v", string(userDataBytes))
617580
b := bytes.NewBuffer(userDataBytes)
618581

619-
fmt.Printf("hello hello : bytes.buffer of userdatabytes : %v", b)
620582
if b.Len() == 0 {
621583
return userDataV2, nil
622584
}
@@ -628,7 +590,6 @@ func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUs
628590
if remain != 0 {
629591
return nil, errors.New("expected 0 remain, got some bytes remaining")
630592
}
631-
fmt.Println("somehere userdatav2", userDataV2)
632593
return userDataV2, nil
633594

634595
}

0 commit comments

Comments
 (0)