Skip to content

Commit 5680836

Browse files
authored
Merge pull request #147 from conductor-sdk/add-kafka-queue-support
Added EventClient with support for Queue configuration (e.g. Kafka)
2 parents e8b75f8 + a34d62a commit 5680836

12 files changed

Lines changed: 1004 additions & 2 deletions

File tree

src/conductor/client/event/__init__.py

Whitespace-only changes.
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from conductor.client.http.api_client import ApiClient
2+
from conductor.client.http.api.event_resource_api import EventResourceApi
3+
from conductor.client.event.queue.queue_configuration import QueueConfiguration
4+
5+
6+
class EventClient:
7+
def __init__(self, api_client: ApiClient):
8+
self.client = EventResourceApi(api_client)
9+
10+
def delete_queue_configuration(self, queue_configuration: QueueConfiguration) -> None:
11+
return self.client.delete_queue_config(
12+
queue_name=queue_configuration.queue_name,
13+
queue_type=queue_configuration.queue_type,
14+
)
15+
16+
def get_kafka_queue_configuration(self, queue_topic: str) -> QueueConfiguration:
17+
return self.get_queue_configuration(
18+
queue_type='kafka',
19+
queue_name=queue_topic,
20+
)
21+
22+
def get_queue_configuration(self, queue_type: str, queue_name: str):
23+
return self.client.get_queue_config(queue_type, queue_name)
24+
25+
def put_queue_configuration(self, queue_configuration: QueueConfiguration):
26+
return self.client.put_queue_config(
27+
body=queue_configuration.get_worker_configuration(),
28+
queue_name=queue_configuration.queue_name,
29+
queue_type=queue_configuration.queue_type,
30+
)

src/conductor/client/event/queue/__init__.py

Whitespace-only changes.
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from conductor.client.event.queue.queue_configuration import QueueConfiguration
2+
from conductor.client.event.queue.queue_worker_configuration import QueueWorkerConfiguration
3+
from typing import Any, Dict
4+
5+
6+
class KafkaQueueConfiguration(QueueConfiguration):
7+
def __init__(self, queue_topic_name: str):
8+
super().__init__(queue_topic_name, 'kafka')
9+
10+
def get_worker_configuration(self) -> Dict[str, Any]:
11+
worker_configuration = {}
12+
for required_key in ['consumer', 'producer']:
13+
if required_key not in self.worker_configuration:
14+
raise RuntimeError(f'required key not present: {required_key}')
15+
for key, value in self.worker_configuration.items():
16+
worker_configuration[key] = value.configuration
17+
return worker_configuration
18+
19+
20+
class KafkaConsumerConfiguration(QueueWorkerConfiguration):
21+
def __init__(self, bootstrap_servers_config: str):
22+
super().__init__()
23+
super().add_configuration(
24+
key='bootstrap.servers',
25+
value=bootstrap_servers_config
26+
)
27+
28+
29+
class KafkaProducerConfiguration(QueueWorkerConfiguration):
30+
def __init__(self, bootstrap_servers_config: str):
31+
super().__init__()
32+
super().add_configuration(
33+
key='bootstrap.servers',
34+
value=bootstrap_servers_config
35+
)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from conductor.client.event.queue.queue_worker_configuration import QueueWorkerConfiguration
2+
3+
from abc import ABC
4+
from typing import Any, Dict
5+
6+
7+
class QueueConfiguration(ABC):
8+
WORKER_CONSUMER_KEY = "consumer"
9+
WORKER_PRODUCER_KEY = "producer"
10+
11+
def __init__(self, queue_name: str, queue_type: str):
12+
self.queue_name = queue_name
13+
self.queue_type = queue_type
14+
self.worker_configuration = {}
15+
16+
def add_consumer(self, worker_configuration: QueueWorkerConfiguration) -> None:
17+
self.worker_configuration[self.WORKER_CONSUMER_KEY] = worker_configuration
18+
19+
def add_producer(self, worker_configuration: QueueWorkerConfiguration) -> None:
20+
self.worker_configuration[self.WORKER_PRODUCER_KEY] = worker_configuration
21+
22+
def get_worker_configuration(self) -> Dict[str, Any]:
23+
raise NotImplementedError
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
class QueueWorkerConfiguration:
2+
def __init__(self):
3+
self.configuration = {}
4+
5+
def add_configuration(self, key: str, value: str) -> None:
6+
self.configuration[key] = value

0 commit comments

Comments
 (0)