Skip to content

Commit ede2c39

Browse files
authored
Merge pull request #24 from ThisWillGoWell/event-hub-addition
event hub integration
2 parents 93a1bce + b8bb624 commit ede2c39

14 files changed

+2660
-0
lines changed

docs/services/eventhub.inc

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
2+
Event Hub
3+
---------
4+
5+
Event Hub uses gRPC for publishing and subscribing messages. A full-duplex
6+
streaming RPC framework, gRPC uses protocol buffers for wire protocol
7+
implemented over HTTP/2 and using header compression, multiplexing TCP
8+
connections, and flow control. Protocol Buffers is a binary protocol further
9+
suited for IoT devices that publish high-velocity data over networks with low
10+
bandwidth.
11+
12+
For more details see the official `Event Hub`_ service documentation.
13+
14+
.. _Event Hub: https://docs.predix.io/en-US/content/service/data_management/event_hub/
15+
16+
Example
17+
.......
18+
19+
Here is a simple demo to create a service instance.
20+
21+
::
22+
23+
# How-To Create the service
24+
# IMPORTANT: You must have already configured UAA and client
25+
26+
import predix.admin.app
27+
admin = predix.admin.app.Manifest()
28+
admin.create_eventhub(publish=True, subscribe=True)
29+
30+
::
31+
32+
# How-To Publish Messages
33+
34+
import predix.app
35+
app = predix.app.Manifest()
36+
eh = app.get_eventhub()
37+
38+
acks = {}
39+
for i in range(10, 0, -1):
40+
msg_id = str(i)
41+
msg_body = 'Hello World {}'.format(msg_id)
42+
eh.publisher.add_message(msg_id, msg_body)
43+
44+
acks[msg_id] = False
45+
46+
print("Publishing messages.")
47+
eh.publisher.publish_queue()
48+
49+
for response in eh.publisher.ack_generator():
50+
for ack in response.ack:
51+
msg_id = ack.id
52+
print("Message {} acknowledged.".format(msg_id))
53+
del acks[msg_id]
54+
55+
if not acks.keys():
56+
print("All messages delivered.")
57+
break
58+
59+
::
60+
61+
# How-To Subscribe Messages
62+
63+
import predix.app
64+
app = predix.app.Manifest()
65+
eh = app.get_eventhub()
66+
67+
for msg in eh.subscriber.subscribe():
68+
print(msg)
69+
70+
Find more examples :ref:`eventhub-cookbook`.
71+
72+
Event Hub API
73+
.............
74+
75+
.. automodule:: predix.data.eventhub
76+
:members:
77+
78+
Event Hub Administration
79+
........................
80+
81+
.. automodule:: predix.admin.eventhub
82+
:members:
83+

predix/admin/app.py

+13
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,19 @@ def create_dbaas(self, **kwargs):
205205
pg.add_to_manifest(self)
206206
return pg
207207

208+
def create_eventhub(self, **kwargs):
209+
"""
210+
todo make it so the client can be customised to publish/subscribe
211+
Creates an instance of eventhub service
212+
"""
213+
eventhub = predix.admin.eventhub.EventHub(**kwargs)
214+
eventhub.create()
215+
eventhub.add_to_manifest(self)
216+
eventhub.grant_client(client_id=self.get_client_id(), **kwargs)
217+
eventhub.add_to_manifest(self)
218+
return eventhub
219+
220+
208221
def get_service_marketplace(self, available=True, unavailable=False,
209222
deprecated=False):
210223
"""

predix/admin/eventhub.py

+151
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import os
2+
3+
import predix.config
4+
import predix.security.uaa
5+
import predix.admin.service
6+
import predix.data.eventhub
7+
8+
9+
class EventHub(object):
10+
"""
11+
Event Hub is a publisher/subscriber framework for getting information in, out and around the predix cloud
12+
"""
13+
14+
def __init__(self, plan_name=None, name=None, uaa=None, *args, **kwargs):
15+
self.service_name = 'predix-event-hub'
16+
self.plan_name = plan_name or 'Tiered'
17+
self.use_class = predix.data.eventhub.Eventhub
18+
19+
self.service = predix.admin.service.PredixService(self.service_name,
20+
self.plan_name, name=name, uaa=uaa)
21+
22+
def exists(self):
23+
"""
24+
Returns whether or not this service already exists.
25+
"""
26+
return self.service.exists()
27+
28+
def create(self):
29+
"""
30+
Create an instance of the Time Series Service with the typical
31+
starting settings.
32+
"""
33+
self.service.create()
34+
35+
os.environ[predix.config.get_env_key(self.use_class, 'host')] = self.get_eventhub_host()
36+
os.environ[predix.config.get_env_key(self.use_class, 'port')] = self.get_eventhub_grpc_port()
37+
os.environ[predix.config.get_env_key(self.use_class, 'wss_publish_uri')] = self.get_publish_wss_uri()
38+
os.environ[predix.config.get_env_key(self.use_class, 'zone_id')] = self.get_zone_id()
39+
40+
def grant_client(self, client_id, publish=False, subscribe=False, publish_protocol=None, publish_topics=None,
41+
subscribe_topics=None, scope_prefix='predix-event-hub', **kwargs):
42+
"""
43+
Grant the given client id all the scopes and authorities
44+
needed to work with the eventhub service.
45+
"""
46+
scopes = ['openid']
47+
authorities = ['uaa.resource']
48+
49+
zone_id = self.get_zone_id()
50+
# always must be part of base user scope
51+
scopes.append('%s.zones.%s.user' % (scope_prefix, zone_id))
52+
authorities.append('%s.zones.%s.user' % (scope_prefix, zone_id))
53+
54+
if publish_topics is not None or subscribe_topics is not None:
55+
raise Exception("multiple topics are not currently available in preidx-py")
56+
57+
if publish_topics is None:
58+
publish_topics = ['topic']
59+
60+
if subscribe_topics is None:
61+
subscribe_topics = ['topic']
62+
63+
if publish:
64+
# we are granting just the default topic
65+
if publish_protocol is None:
66+
scopes.append('%s.zones.%s.grpc.publish' % (scope_prefix, zone_id))
67+
authorities.append('%s.zones.%s.grpc.publish' % (scope_prefix, zone_id))
68+
scopes.append('%s.zones.%s.wss.publish' % (scope_prefix, zone_id))
69+
authorities.append('%s.zones.%s.wss.publish' % (scope_prefix, zone_id))
70+
71+
else:
72+
scopes.append('%s.zones.%s.%s.publish' % (scope_prefix, zone_id, publish_protocol))
73+
authorities.append('%s.zones.%s.%s.publish' % (scope_prefix, zone_id, publish_protocol))
74+
75+
# we are requesting multiple topics
76+
for topic in publish_topics:
77+
if publish_protocol is None:
78+
scopes.append('%s.zones.%s.%s.grpc.publish' % (scope_prefix, zone_id, topic))
79+
scopes.append('%s.zones.%s.%s.wss.publish' % (scope_prefix, zone_id, topic))
80+
scopes.append('%s.zones.%s.%s.user' % (scope_prefix, zone_id, topic))
81+
authorities.append('%s.zones.%s.%s.grpc.publish' % (scope_prefix, zone_id, topic))
82+
authorities.append('%s.zones.%s.%s.wss.publish' % (scope_prefix, zone_id, topic))
83+
authorities.append('%s.zones.%s.%s.user' % (scope_prefix, zone_id, topic))
84+
else:
85+
scopes.append('%s.zones.%s.%s.%s.publish' % (scope_prefix, zone_id, topic, publish_protocol))
86+
authorities.append('%s.zones.%s.%s.%s.publish' % (scope_prefix, zone_id, topic, publish_protocol))
87+
if subscribe:
88+
# we are granting just the default topic
89+
scopes.append('%s.zones.%s.grpc.subscribe' % (scope_prefix, zone_id))
90+
authorities.append('%s.zones.%s.grpc.subscribe' % (scope_prefix, zone_id))
91+
92+
# we are requesting multiple topics
93+
for topic in subscribe_topics:
94+
scopes.append('%s.zones.%s.%s.grpc.subscribe' % (scope_prefix, zone_id, topic))
95+
authorities.append('%s.zones.%s.%s.grpc.subscribe' % (scope_prefix, zone_id, topic))
96+
97+
self.service.uaa.uaac.update_client_grants(client_id, scope=scopes,
98+
authorities=authorities)
99+
100+
return self.service.uaa.uaac.get_client(client_id)
101+
102+
def get_eventhub_host(self):
103+
"""
104+
returns the publish grpc endpoint for ingestion.
105+
"""
106+
for protocol in self.service.settings.data['publish']['protocol_details']:
107+
if protocol['protocol'] == 'grpc':
108+
return protocol['uri'][0:protocol['uri'].index(':')]
109+
110+
def get_eventhub_grpc_port(self):
111+
for protocol in self.service.settings.data['publish']['protocol_details']:
112+
if protocol['protocol'] == 'grpc':
113+
return str(protocol['uri'][(protocol['uri'].index(':') + 1):])
114+
115+
def get_publish_wss_uri(self):
116+
"""
117+
returns the publish grpc endpoint for ingestion.
118+
119+
"""
120+
for protocol in self.service.settings.data['publish']['protocol_details']:
121+
if protocol['protocol'] == 'wss':
122+
return protocol['uri']
123+
124+
def get_zone_id(self):
125+
return self.service.settings.data['publish']['zone-http-header-value']
126+
127+
def get_subscribe_uri(self):
128+
return self.service.settings.data['subscribe']['protocol_details'][0]['uri']
129+
130+
def make_topic(self, broker_uri, topic_name):
131+
raise Exception('make topic has not been implemented yet')
132+
133+
def add_to_manifest(self, manifest):
134+
"""
135+
Add useful details to the manifest about this service
136+
so that it can be used in an application.
137+
138+
:param manifest: An predix.admin.app.Manifest object
139+
instance that manages reading/writing manifest config
140+
for a cloud foundry app.
141+
"""
142+
# Add this service to list of services
143+
manifest.add_service(self.service.name)
144+
145+
# Add environment variables
146+
manifest.add_env_var(predix.config.get_env_key(self.use_class, 'host'), self.get_eventhub_host())
147+
manifest.add_env_var(predix.config.get_env_key(self.use_class, 'port'), self.get_eventhub_grpc_port())
148+
manifest.add_env_var(predix.config.get_env_key(self.use_class, 'wss_publish_uri'), self.get_publish_wss_uri())
149+
manifest.add_env_var(predix.config.get_env_key(self.use_class, 'zone_id'), self.get_zone_id())
150+
151+
manifest.write_manifest()

predix/app.py

+6
Original file line numberDiff line numberDiff line change
@@ -246,3 +246,9 @@ def get_dbaas(self, **kwargs):
246246
import predix.data.dbaas
247247
pg = predix.data.dbaas.PostgreSQL(**kwargs)
248248
return pg
249+
250+
def get_eventhub(self, publish_config=None, subscribe_config=None):
251+
import predix.data.eventhub.client
252+
eventhub = predix.data.eventhub.client.Eventhub(subscribe_config=subscribe_config,
253+
publish_config=publish_config)
254+
return eventhub

0 commit comments

Comments
 (0)