Skip to content

Commit 978abae

Browse files
committed
kafka
1 parent 13e8eae commit 978abae

File tree

1 file changed

+16
-10
lines changed

1 file changed

+16
-10
lines changed

kafka/offset_test.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,35 +15,41 @@ func TestKafkaSyncProducer(t *testing.T) {
1515
conf.Producer.Return.Successes = true
1616
conf.Producer.Return.Errors = true
1717
conf.Producer.Retry.Max = 1000
18-
conf.Version = sarama.V2_5_0_0
19-
producer, err := sarama.NewSyncProducer([]string{"172.12.12.188:9092"}, conf)
18+
conf.Version = sarama.V2_8_0_0
19+
producer, err := sarama.NewSyncProducer([]string{"172.12.12.165:9092"}, conf)
2020
if err != nil {
2121
t.Error(err)
2222
return
2323
}
2424
defer producer.Close()
2525
fmt.Println(producer.SendMessage(&sarama.ProducerMessage{
2626
Topic: "test_topic11",
27-
Value: sarama.ByteEncoder("1111:2222"),
27+
Value: sarama.ByteEncoder("1"),
2828
}))
2929
select {}
3030
}
3131

3232
func TestConsumer(t *testing.T) {
3333
conf := sarama.NewConfig()
34-
conf.Version = sarama.V2_5_0_0
35-
conf.Consumer.Return.Errors = false
36-
consumer, err := sarama.NewConsumerGroup([]string{"172.12.12.188:9092"}, "test_group", conf)
34+
conf.Version = sarama.V2_8_0_0
35+
conf.Consumer.Return.Errors = true
36+
conf.Consumer.Offsets.Initial = sarama.OffsetNewest
37+
consumer, err := sarama.NewConsumerGroup([]string{"172.12.12.165:9092"}, "test_group", conf)
3738
if err != nil {
3839
t.Error(err)
3940
return
4041
}
4142
defer consumer.Close()
42-
for {
43-
if err := consumer.Consume(context.Background(), []string{"test_topic11"}, &Job{}); err != nil {
44-
fmt.Println("err", err)
43+
ctx := context.Background()
44+
go func() {
45+
for {
46+
j := &Job{}
47+
if err := consumer.Consume(ctx, []string{"test_topic11"}, j); err != nil {
48+
fmt.Println("err", err)
49+
}
4550
}
46-
}
51+
}()
52+
select {}
4753
}
4854

4955
type Job struct {

0 commit comments

Comments
 (0)