forked from LovisaLugnegard/exjobb
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpython_kafka_consumer_perf.py
More file actions
45 lines (36 loc) · 1.52 KB
/
Copy pathpython_kafka_consumer_perf.py
File metadata and controls
45 lines (36 loc) · 1.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import time
from kafka import KafkaConsumer
from myvariables import kafka_server, topic, max_msg_size
# file1 = open("consumer1_res.txt", "a")
# file2
def python_kafka_consumer_performance(consumer_number):
file = open("consmer_res" + str(consumer_number) + ".txt", "a")
# topic = TOPIC
msg_count = 0
print("in multip!")
print(topic)
file.write("\n{}".format(time.time()))
# file.write(str(time.perf_counter()))
consumer = KafkaConsumer(group_id='my-group',
auto_offset_reset='earliest',
bootstrap_servers=[kafka_server + ":9092"],
consumer_timeout_ms=20000,
max_partition_fetch_bytes=max_msg_size)
msg_consumed_count = 0
print("msg_count: {}".format(msg_count))
consumer.subscribe([topic])
consumer_start = time.time()
for message in consumer:
# print("hejhej")
# print("{}, msg nb: {}".format(consumer_number, msg_consumed_count))
msg_consumed_count += 1
file.write("\n{}".format(time.time()))
# img = cv2.imdecode(np.frombuffer(message.value, dtype=np.uint16), -1)
# fin2 = Image.fromarray(img)
# if msg_consumed_count >= msg_count:
# break
consumer_timing = time.time() - consumer_start - 2 # consumer waits 2 sec before closing if there are no new
# messages
print("{} consumer_time: {} msg_count: {}".format(consumer_number, consumer_timing, msg_consumed_count))
consumer.close()
return "done!"