-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathconsumer.go
More file actions
152 lines (125 loc) · 3.27 KB
/
consumer.go
File metadata and controls
152 lines (125 loc) · 3.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package main
import (
"context"
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"golang.org/x/sync/errgroup"
)
type Consumer interface {
waitMessages(ctx context.Context, listener Listener)
close()
}
type SimpleConsumer struct {
kafkaConsumer *kafka.Consumer
}
type ConsumerConfig struct {
Topic string
Partitioned bool
}
type PartitionedConsumer struct {
kafkaConsumers []*kafka.Consumer
}
func newConsumer(config *Config) (Consumer, error) {
if config.Consumer.Partitioned {
return newPartitionedConsumer(config)
}
return newSimpleConsumer(config)
}
func newSimpleConsumer(config *Config) (*SimpleConsumer, error) {
kafkaConsumer, err := kafka.NewConsumer(&config.Kafka)
if err != nil {
return nil, err
}
err = kafkaConsumer.Subscribe(config.Consumer.Topic, nil)
if err != nil {
return nil, err
}
return &SimpleConsumer{
kafkaConsumer,
}, nil
}
func (consumer *SimpleConsumer) close() {
consumer.kafkaConsumer.Close()
}
func (consumer *SimpleConsumer) waitMessages(ctx context.Context, listener Listener) {
err := consumerWaitMessages(ctx, consumer.kafkaConsumer, listener)
if err != nil {
fmt.Println("error waiting messages:", err)
}
}
func consumerWaitMessages(ctx context.Context, consumer *kafka.Consumer, listener Listener) error {
fmt.Println("Running consumer " + consumer.String())
for {
select {
case <-ctx.Done():
fmt.Println("Done, exiting consumer loop")
return nil
default:
}
message, err := consumer.ReadMessage(time.Second * 60)
if err != nil {
return err
}
listener.enqueue(message)
}
}
func newPartitionedConsumer(config *Config) (Consumer, error) {
kafkaAdmin, err := kafka.NewAdminClient(&config.Kafka)
if err != nil {
return nil, err
}
defer kafkaAdmin.Close()
metadata, err := kafkaAdmin.GetMetadata(&config.Consumer.Topic, false, 10000)
if err != nil {
return nil, err
}
kafkaErr := metadata.Topics[config.Consumer.Topic].Error
if kafkaErr.Code() != kafka.ErrNoError {
return nil, kafkaErr
}
partitions := metadata.Topics[config.Consumer.Topic].Partitions
fmt.Printf("Subscribing to topic %s %d partitions: %v...\n", config.Consumer.Topic, len(partitions), partitions)
consumers := make([]*kafka.Consumer, len(partitions))
for i, partition := range partitions {
consumer, err := kafka.NewConsumer(&config.Kafka)
if err != nil {
return nil, err
}
err = consumer.Assign([]kafka.TopicPartition{{
Topic: &config.Consumer.Topic,
Partition: partition.ID,
Offset: kafka.OffsetStored,
}})
if err != nil {
return nil, err
}
err = consumer.Subscribe(config.Consumer.Topic, nil)
if err != nil {
return nil, err
}
consumers[i] = consumer
}
fmt.Printf("Assigned %d consumers to %s topic\n", len(consumers), config.Consumer.Topic)
return &PartitionedConsumer{
kafkaConsumers: consumers,
}, nil
}
func (consumer *PartitionedConsumer) close() {
for _, c := range consumer.kafkaConsumers {
err := c.Close()
if err != nil {
fmt.Println("Error closing consumer: " + err.Error())
}
}
}
func (consumer *PartitionedConsumer) waitMessages(ctx context.Context, listener Listener) {
var wg errgroup.Group
for _, c := range consumer.kafkaConsumers {
c := c
wg.Go(func() error {
return consumerWaitMessages(ctx, c, listener)
})
}
wg.Wait()
}