
Commit 6b81b70

Merge pull request #34 from Nasdaq/release-0.4.x
Adding support for timeout and number of message settings
2 parents: fee3ba9 + df0e2c1 · commit 6b81b70

File tree

8 files changed: +44 -47 lines
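In outline, the change removes the packaged consumer-properties.json (and the ConsumerConfig constants that read it) and instead takes the consume limits from the caller's Kafka configuration. A minimal sketch of the two new keys follows; the key names ('timeout', 'num_messages') and defaults (10, 500) come from the KafkaConfigLoader changes in this commit, while the bootstrap endpoint is a placeholder.

# Sketch: the two consumer settings this PR makes configurable.
kafka_cfg = {
    "bootstrap.servers": "<streams-endpoint>:9094",  # placeholder
    "timeout": 10,        # seconds to wait per consume() call
    "num_messages": 500,  # maximum messages returned per consume() call
}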

ncdssdk/src/main/python/ncdsclient/NCDSClient.py

+5-10
@@ -1,13 +1,10 @@
 import logging
-from importlib import resources
-from ncdssdk.src.main.python.ncdsclient.internal.utils import ConsumerConfig
 from ncdssdk.src.main.python.ncdsclient.internal.utils.AuthenticationConfigLoader import AuthenticationConfigLoader
 from ncdssdk.src.main.python.ncdsclient.consumer.NasdaqKafkaAvroConsumer import NasdaqKafkaAvroConsumer
 from ncdssdk.src.main.python.ncdsclient.internal.utils import IsItPyTest
-import ncdssdk.src.main.resources as sysresources
-import json
 from confluent_kafka import OFFSET_END
 from ncdssdk.src.main.python.ncdsclient.internal.utils import LoggingConfig
+from ncdssdk.src.main.python.ncdsclient.internal.utils.KafkaConfigLoader import KafkaConfigLoader


 class NCDSClient:
@@ -27,14 +24,12 @@ def __init__(self, security_cfg, kafka_cfg):
         self.nasdaq_kafka_avro_consumer = None
         LoggingConfig.create_logger()
         self.logger = logging.getLogger(__name__)
+        self.kafka_cfg = kafka_cfg
+        self.kafka_config_loader = KafkaConfigLoader()

         if kafka_cfg:
             kafka_cfg['logger'] = logging.getLogger(__name__)

-        with resources.open_text(sysresources, "consumer-properties.json") as f:
-            self.consumer_props = json.load(f)
-            f.close()
-
         try:
             auth_config_loader = AuthenticationConfigLoader()
             if security_cfg is not None and auth_config_loader.validate_security_config(security_cfg):
@@ -103,7 +98,7 @@ def top_messages(self, topic_name, timestamp=None):
         kafka_consumer = self.ncds_kafka_consumer(topic_name, timestamp)
         self.logger.debug("kafka_consumer is now trying to consume")
         records = kafka_consumer.consume(
-            self.consumer_props[ConsumerConfig.NUM_MESSAGES], self.consumer_props[ConsumerConfig.TIMEOUT])
+            self.kafka_cfg[self.kafka_config_loader.NUM_MESSAGES], self.kafka_cfg[self.kafka_config_loader.TIMEOUT])
         return records

     def get_sample_messages(self, topic_name, message_name, all_messages):
@@ -127,7 +122,7 @@ def get_sample_messages(self, topic_name, message_name, all_messages):

         while not found:
             messages = kafka_consumer.consume(
-                self.consumer_props[ConsumerConfig.NUM_MESSAGES], self.consumer_props[ConsumerConfig.TIMEOUT])
+                self.kafka_cfg[self.kafka_config_loader.NUM_MESSAGES], self.kafka_cfg[self.kafka_config_loader.TIMEOUT])
             if not messages or self.end_of_data(kafka_consumer):
                 print(
                     "--------------------------------END of Stream------------------")

ncdssdk/src/main/python/ncdsclient/consumer/NasdaqKafkaAvroConsumer.py

+10-10
@@ -6,10 +6,10 @@
 from ncdssdk.src.main.python.ncdsclient.internal.utils.KafkaConfigLoader import KafkaConfigLoader
 from ncdssdk.src.main.python.ncdsclient.internal.utils import IsItPyTest, SeekToMidnight
 from confluent_kafka import TopicPartition, OFFSET_INVALID, OFFSET_END, OFFSET_BEGINNING
-import ncdssdk.src.main.python.ncdsclient.internal.utils.ConsumerConfig as config
 from datetime import datetime
 from ncdssdk.src.main.python.ncdsclient.internal.utils.Oauth import Oauth
 import datetime
+from pprint import pformat


 class NasdaqKafkaAvroConsumer():
@@ -37,17 +37,17 @@ def __init__(self, security_cfg, kafka_cfg):

         self.logger = logging.getLogger(__name__)

-        kafka_config_loader = KafkaConfigLoader()
+        self.kafka_config_loader = KafkaConfigLoader()
         auth_config_loader = AuthenticationConfigLoader()
         if self.kafka_cfg is None:
             if IsItPyTest.is_py_test():
-                pytest_kafka_cfg = kafka_config_loader.load_test_config()
+                pytest_kafka_cfg = self.kafka_config_loader.load_test_config()
                 self.kafka_props = pytest_kafka_cfg
             else:
                 raise Exception("Kafka Configuration not defined")
         else:
             self.kafka_props = self.kafka_cfg
-            kafka_config_loader.validate_and_add_specific_properties(
+            self.kafka_config_loader.validate_and_add_specific_properties(
                 self.kafka_props)

         if self.security_cfg is None:
@@ -61,6 +61,8 @@ def __init__(self, security_cfg, kafka_cfg):
         self.read_schema_topic.set_security_props(self.security_props)
         self.read_schema_topic.set_kafka_props(self.kafka_props)
         self.client_ID = auth_config_loader.get_client_id(self.security_props)
+        self.logger.info("Consumer Config: ")
+        self.logger.info(pformat(self.kafka_cfg))

     def get_kafka_consumer(self, stream_name, timestamp=None):
         """
@@ -87,8 +89,7 @@ def get_kafka_consumer(self, stream_name, timestamp=None):
         if timestamp is None:
             self.logger.debug("Timestamp is none")

-            auto_offset_cfg = self.kafka_props.get(
-                config.AUTO_OFFSET_RESET_CONFIG)
+            auto_offset_cfg = self.kafka_props.get(self.kafka_config_loader.AUTO_OFFSET_RESET_CONFIG)
             if auto_offset_cfg == "earliest" or auto_offset_cfg == "smallest" or auto_offset_cfg == "beginning":
                 self.logger.debug(
                     f"Auto offset reset config set to: {auto_offset_cfg}")
@@ -104,7 +105,7 @@ def get_kafka_consumer(self, stream_name, timestamp=None):
                 self.logger.debug(
                     "offset: " + str(topic_partition.offset) + ", timestamp: " + str(timestamp))
                 offsets_for_times = kafka_consumer.offsets_for_times(
-                    [topic_partition], timeout=5)
+                    [topic_partition], self.kafka_cfg.TIMEOUT)
             except Exception as e:
                 self.logger.exception(e)
                 sys.exit(0)
@@ -130,9 +131,8 @@ def get_consumer(self, avro_schema, stream_name):
         Returns:
             a :class:`.KafkaAvroConsumer` instance with a key and value deserializer set through the avro_schema parameter
         """
-        if 'auto.offset.reset' not in self.kafka_props:
-            self.kafka_props[config.AUTO_OFFSET_RESET_CONFIG] = "earliest"
-        self.kafka_props[config.GROUP_ID_CONFIG] = f'{self.client_ID}_{stream_name}_{datetime.datetime.today().day}'
+        if 'group.id' not in self.kafka_props:
+            self.kafka_props[self.kafka_config_loader.GROUP_ID_CONFIG] = f'{self.client_ID}_{stream_name}_{datetime.datetime.today().day}'
         return KafkaAvroConsumer(self.kafka_props, avro_schema)

     def get_schema_for_topic(self, topic):
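Two behavioural notes fall out of the get_consumer change: group.id is now derived only when the caller has not supplied one, and the old auto.offset.reset fallback moves into KafkaConfigLoader.validate_and_add_specific_properties. A self-contained sketch of the derived group id, with a hypothetical client id and stream name:

# Sketch of the default group.id format (the f-string is taken from the diff);
# "demo-client" and "EXAMPLE-TOPIC" are hypothetical values.
import datetime

client_ID = "demo-client"
stream_name = "EXAMPLE-TOPIC"
group_id = f'{client_ID}_{stream_name}_{datetime.datetime.today().day}'
print(group_id)  # e.g. "demo-client_EXAMPLE-TOPIC_17" -- rotates with the day of the month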

ncdssdk/src/main/python/ncdsclient/internal/BasicKafkaConsumer.py

+7-1
@@ -5,6 +5,8 @@
                               MessageField)
 import logging

+from ncdssdk.src.main.python.ncdsclient.internal.utils.KafkaConfigLoader import KafkaConfigLoader
+

 class BasicKafkaConsumer(DeserializingConsumer):
     """
@@ -20,8 +22,12 @@ class BasicKafkaConsumer(DeserializingConsumer):
     def __init__(self, config, key_deserializer, value_deserializer):
         config["key.deserializer"] = key_deserializer
         config["value.deserializer"] = value_deserializer.decode
+        kafka_config = config.copy()
+        del kafka_config[KafkaConfigLoader().TIMEOUT]
+        del kafka_config[KafkaConfigLoader().NUM_MESSAGES]
+
         self.logger = logging.getLogger(__name__)
-        super(BasicKafkaConsumer, self).__init__(config)
+        super(BasicKafkaConsumer, self).__init__(kafka_config)

     def ensure_assignment(self):
         """

ncdssdk/src/main/python/ncdsclient/internal/ReadSchemaTopic.py

+6-11
@@ -9,7 +9,6 @@
 import ncdssdk.src.main.resources.schemas as schemas
 from ncdssdk.src.main.python.ncdsclient.internal.KafkaAvroConsumer import KafkaAvroConsumer
 from confluent_kafka import TopicPartition
-import ncdssdk.src.main.python.ncdsclient.internal.utils.ConsumerConfig as config
 from confluent_kafka import OFFSET_BEGINNING


@@ -29,22 +28,18 @@ def __init__(self):
         self.security_props = None
         self.kafka_props = {}
         self.logger = logging.getLogger(__name__)
-
-        with resources.open_text(sysresources, "consumer-properties.json") as f:
-            self.consumer_props = json.load(f)
-            f.close()
-
-        self.num_messages = self.consumer_props[config.NUM_MESSAGES]
-        self.timeout = self.consumer_props[config.TIMEOUT]
+        self.kafka_config_loader = KafkaConfigLoader()

     def read_schema(self, topic):
         auth_config_loader = AuthenticationConfigLoader()
         schema_consumer = self.get_consumer(
             "Control-" + auth_config_loader.get_client_id(self.security_props))
         latest_record = None
+        num_messages = self.kafka_props[self.kafka_config_loader.NUM_MESSAGES]
+        timeout = self.kafka_props[self.kafka_config_loader.TIMEOUT]
         while True:
             schema_messages = schema_consumer.consume(
-                self.num_messages, self.timeout)
+                num_messages, timeout)
             if not schema_messages:
                 break
             for message in reversed(schema_messages):
@@ -110,8 +105,8 @@ def get_consumer(self, client_id):
         if IsItPyTest.is_py_test():
             self.kafka_props = KafkaConfigLoader.load_test_config()

-        self.kafka_props[config.AUTO_OFFSET_RESET_CONFIG] = 'earliest'
-        self.kafka_props[config.GROUP_ID_CONFIG] = f'{client_id}1'
+        self.kafka_props[self.kafka_config_loader.AUTO_OFFSET_RESET_CONFIG] = 'earliest'
+        self.kafka_props[self.kafka_config_loader.GROUP_ID_CONFIG] = f'{client_id}1'

         kafka_avro_consumer = KafkaAvroConsumer(
             self.kafka_props, ctrl_msg_schema)
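read_schema now looks the limits up in self.kafka_props, which starts as an empty dict and is only populated when NasdaqKafkaAvroConsumer calls set_kafka_props with the validated configuration, so the lookups assume the 'num_messages' and 'timeout' keys are already present. A small sketch of that assumption:

# Sketch: kafka_props must already carry the keys read_schema indexes into.
kafka_props = {}                                          # ReadSchemaTopic.__init__ default
kafka_props.update({"bootstrap.servers": "host:9094",     # set_kafka_props with the
                    "timeout": 10, "num_messages": 500})  # validated, defaulted config
num_messages, timeout = kafka_props["num_messages"], kafka_props["timeout"]
print(num_messages, timeout)  # 500 10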

ncdssdk/src/main/python/ncdsclient/internal/utils/ConsumerConfig.py

-4
This file was deleted.

ncdssdk/src/main/python/ncdsclient/internal/utils/KafkaConfigLoader.py

+11
@@ -13,6 +13,10 @@ class KafkaConfigLoader:

     def __init__(self):
         self.BOOTSTRAP_SERVERS = "bootstrap.servers"
+        self.AUTO_OFFSET_RESET_CONFIG = 'auto.offset.reset'
+        self.GROUP_ID_CONFIG = 'group.id'
+        self.TIMEOUT = 'timeout'
+        self.NUM_MESSAGES = 'num_messages'
         self.logger = logging.getLogger(__name__)

     @staticmethod
@@ -38,5 +42,12 @@ def validate_and_add_specific_properties(self, p):
         if not p[self.BOOTSTRAP_SERVERS]:
             raise Exception(
                 "bootstrap.servers Properties is not set in the Kafka Configuration")
+        if not p[self.AUTO_OFFSET_RESET_CONFIG]:
+            self.AUTO_OFFSET_RESET_CONFIG = "earliest"
+        if self.TIMEOUT not in p:
+            p[self.TIMEOUT] = 10
+        if self.NUM_MESSAGES not in p:
+            p[self.NUM_MESSAGES] = 500
+
         self.nasdaq_specific_config(p)
         return p
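The new checks give 'timeout' and 'num_messages' defaults of 10 seconds and 500 messages when the caller omits them. A minimal sketch of that defaulting, written with dict.setdefault rather than the explicit membership tests above:

# Sketch of the consume-setting defaults added in this commit (10 s, 500 msgs);
# key names match KafkaConfigLoader.
def apply_consume_defaults(p: dict) -> dict:
    p.setdefault("timeout", 10)
    p.setdefault("num_messages", 500)
    return p

print(apply_consume_defaults({"bootstrap.servers": "host:9094"}))
# {'bootstrap.servers': 'host:9094', 'timeout': 10, 'num_messages': 500}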

ncdssdk_client/src/main/python/ncdsclient/NCDSSession.py

+4-10
@@ -3,12 +3,12 @@
 import sys
 import logging
 from ncdssdk.src.main.python.ncdsclient.NCDSClient import NCDSClient
+from ncdssdk.src.main.python.ncdsclient.internal.utils.KafkaConfigLoader import KafkaConfigLoader
 from ncdssdk_client.src.main.python.ncdsclient.utils.ValidateInput import ValidateInput
 from confluent_kafka import KafkaException
 from importlib import resources
 import ncdssdk_client.src.main.python.resources as configresources
 import ncdssdk.src.main.resources as sysresources
-from ncdssdk.src.main.python.ncdsclient.internal.utils import ConsumerConfig
 import logging


@@ -35,6 +35,7 @@ def __init__(self, cmd):
     def main(self):
         self.security_cfg = load_auth_properties(self.auth_props_file)
         self.kafka_cfg = load_kafka_config(self.kafka_props_file)
+        self.kafka_config_loader = KafkaConfigLoader()

         cmd_to_validate = ValidateInput(self.cmd)
         cmd_to_validate.validate_user_input()
@@ -118,12 +119,11 @@ def cont_stream_cmd(self):

         try:
             while True:
-                message = consumer.poll(sys.maxsize)
+                message = consumer.poll(self.kafka_cfg[self.kafka_config_loader.NUM_MESSAGES])
                 if message is None:
                     print(f"No Records Found for the Topic: {self.topic}")
                 else:
                     print(f"value :" + str(message.value()))
-                    consumer.commit(message=message, asynchronous=True)

         except KafkaException as e:
             logging.exception(f"Error in cont stream {e.args[0].str()}")
@@ -144,14 +144,10 @@ def filter_stream_cmd(self):
         consumer = ncds_client.ncds_kafka_consumer(
             self.topic) if not self.timestamp else ncds_client.ncds_kafka_consumer(self.topic, self.timestamp)

-        with resources.open_text(sysresources, "consumer-properties.json") as f:
-            consumer_props = json.load(f)
-            f.close()
-
         try:
             while True:
                 messages = consumer.consume(
-                    consumer_props[ConsumerConfig.NUM_MESSAGES], consumer_props[ConsumerConfig.TIMEOUT])
+                    self.kafka_cfg[self.kafka_config_loader.NUM_MESSAGES], self.kafka_cfg[self.kafka_config_loader.TIMEOUT])
                 self.logger.debug(
                     f"number of messages consumed: {len(messages)}")
                 if len(messages) == 0:
@@ -172,8 +168,6 @@ def filter_stream_cmd(self):

         except KeyError:
             pass
-
-            consumer.commit(message=message, asynchronous=True)
         except Exception as e:
             logging.exception(f"Error in filter stream: {e}")

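For reference, the two confluent_kafka calls used by these commands behave differently: poll() returns at most one message and treats its single argument as a timeout in seconds, while consume() takes a message count and a timeout. So in cont_stream_cmd the configured num_messages value bounds the wait in seconds rather than a message count. A standalone sketch of both shapes; the broker address and topic name are hypothetical, and with no broker running poll returns None and consume returns an empty list after the timeout:

# Standalone sketch of the consumption shapes used by cont_stream_cmd and
# filter_stream_cmd; broker address and topic name are hypothetical.
from confluent_kafka import Consumer

consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "demo"})
consumer.subscribe(["example-topic"])

message = consumer.poll(10)        # at most one message, waiting up to 10 seconds
batch = consumer.consume(500, 10)  # up to 500 messages, waiting up to 10 seconds
consumer.close()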

setup.py

+1-1
@@ -8,7 +8,7 @@

 setup(
     name='ncdssdk',
-    version='0.2.0',
+    version='0.4.0',
     description='A Python SDK for developing applications to access the NCDS API',
     long_description=long_description,
     long_description_content_type='text/markdown',
