-
Notifications
You must be signed in to change notification settings - Fork 178
Expand file tree
/
Copy pathtest_in_rdkafka_group.rb
More file actions
123 lines (102 loc) · 3.17 KB
/
test_in_rdkafka_group.rb
File metadata and controls
123 lines (102 loc) · 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
require 'helper'
require 'fluent/test/driver/input'
require 'securerandom'
# Tests for Fluent::Plugin::RdKafkaGroupInput.
#
# The top-level tests only configure the plugin. The nested ConsumeTest and
# ConsumeTopicWithRegexpTest cases additionally talk to a Kafka broker at
# localhost:9092 and create/delete their own uniquely named topics.
class RdkafkaGroupInputTest < Test::Unit::TestCase
  # Tries to load the plugin under test.
  #
  # @return [Boolean] true when the plugin file could be required, false when
  #   requiring it raised LoadError.
  def have_rdkafka
    require 'fluent/plugin/in_rdkafka_group'
    true
  rescue LoadError
    false
  end

  # Omits the test when the plugin cannot be loaded, then calls
  # Fluent::Test.setup. Nested test cases must call +super+ from their own
  # setup so that this guard also applies to them.
  def setup
    omit_unless(have_rdkafka, "rdkafka isn't installed")
    Fluent::Test.setup
  end

  # Unique per run so repeated runs do not share a topic name.
  TOPIC_NAME = "kafka-input-#{SecureRandom.uuid}"

  # Default plugin configuration used by the configuration-only tests.
  CONFIG = %[
    topics #{TOPIC_NAME}
    kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
    <parse>
      @type none
    </parse>
  ]

  # Builds an input test driver for the plugin and configures it.
  #
  # @param conf [String] Fluentd configuration text (defaults to CONFIG).
  # @return the value returned by the driver's +configure+ call, used by the
  #   tests below as the driver (+instance+, +run+, +events+).
  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::RdKafkaGroupInput).configure(conf)
  end

  def test_configure
    d = create_driver
    assert_equal [TOPIC_NAME], d.instance.topics
    assert_equal 'localhost:9092', d.instance.kafka_configs['bootstrap.servers']
  end

  def test_multi_worker_support
    d = create_driver
    assert_true d.instance.multi_workers_ready?
  end

  # Produces one message to a dedicated topic and expects the plugin to emit it.
  class ConsumeTest < self
    TOPIC_NAME = "kafka-input-#{SecureRandom.uuid}"

    def setup
      # Run the parent setup first so this test is omitted (rather than
      # erroring) when the plugin is unavailable, and so Fluent::Test.setup runs.
      super
      connect_kafka(TOPIC_NAME)
    end

    def teardown
      disconnect_kafka(TOPIC_NAME)
    end

    def test_consume
      conf = %[
        topics #{TOPIC_NAME}
        kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
        <parse>
          @type none
        </parse>
      ]
      d = create_driver(conf)

      d.run(expect_records: 1, timeout: 10) do
        # Short pause before producing, kept from the original test.
        # NOTE(review): presumably gives the consumer time to start — confirm.
        sleep 0.1
        @producer.produce("Hello, fluent-plugin-kafka!", topic: TOPIC_NAME)
        @producer.deliver_messages
      end

      expected = {'message' => 'Hello, fluent-plugin-kafka!'}
      # Index 2 of an emitted event is compared against the expected record hash.
      assert_equal expected, d.events[0][2]
    end
  end

  # Subscribes with a regexp-style topic pattern and expects one message from
  # each of the two matching topics.
  class ConsumeTopicWithRegexpTest < self
    TOPIC_NAME1 = "kafka-input-1-#{SecureRandom.uuid}"
    TOPIC_NAME2 = "kafka-input-2-#{SecureRandom.uuid}"
    # Passed verbatim as the +topics+ value in the configuration below.
    TOPIC_NAME_REGEXP = "/kafka-input-(1|2)-.*/"

    def setup
      # See ConsumeTest#setup: the parent guard must run for nested cases too.
      super
      connect_kafka(TOPIC_NAME1, TOPIC_NAME2)
    end

    def teardown
      disconnect_kafka(TOPIC_NAME1, TOPIC_NAME2)
    end

    def test_consume_with_regexp
      conf = %[
        topics #{TOPIC_NAME_REGEXP}
        kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
        <parse>
          @type none
        </parse>
      ]
      d = create_driver(conf)

      d.run(expect_records: 2, timeout: 10) do
        sleep 0.1
        @producer.produce("Hello, fluent-plugin-kafka! in topic 1", topic: TOPIC_NAME1)
        @producer.produce("Hello, fluent-plugin-kafka! in topic 2", topic: TOPIC_NAME2)
        @producer.deliver_messages
      end

      # A pattern is used instead of fixed values, so the assertions do not
      # depend on which topic's message arrives first.
      expected_message_pattern = /Hello, fluent-plugin-kafka! in topic [12]/
      assert_equal 2, d.events.size
      assert_match(expected_message_pattern, d.events[0][2]['message'])
      assert_match(expected_message_pattern, d.events[1][2]['message'])
    end
  end

  private

  # Opens a Kafka client and producer against localhost:9092 and creates the
  # given topics. Stores them in @kafka and @producer for the test body.
  #
  # @param topics [Array<String>] names of the topics to create.
  def connect_kafka(*topics)
    @kafka = Kafka.new(["localhost:9092"], client_id: 'kafka')
    @producer = @kafka.producer
    topics.each { |topic| @kafka.create_topic(topic) }
  end

  # Deletes the given topics and releases the producer and client.
  #
  # Safe to call when setup stopped early (omission or a failure before the
  # client was created). The producer and client are released even if topic
  # deletion raises.
  #
  # @param topics [Array<String>] names of the topics to delete.
  def disconnect_kafka(*topics)
    return unless @kafka

    begin
      topics.each { |topic| @kafka.delete_topic(topic) }
    ensure
      @producer.shutdown if @producer
      @kafka.close
    end
  end
end