|
| 1 | +import os |
| 2 | + |
| 3 | +import predix.config |
| 4 | +import predix.security.uaa |
| 5 | +import predix.admin.service |
| 6 | +import predix.data.eventhub |
| 7 | + |
| 8 | + |
| 9 | +class EventHub(object): |
| 10 | + """ |
| 11 | + Event Hub is a publisher/subscriber framework for getting information in, out and around the predix cloud |
| 12 | + """ |
| 13 | + |
| 14 | + def __init__(self, plan_name=None, name=None, uaa=None, *args, **kwargs): |
| 15 | + self.service_name = 'predix-event-hub' |
| 16 | + self.plan_name = plan_name or 'Tiered' |
| 17 | + self.use_class = predix.data.eventhub.Eventhub |
| 18 | + |
| 19 | + self.service = predix.admin.service.PredixService(self.service_name, |
| 20 | + self.plan_name, name=name, uaa=uaa) |
| 21 | + |
| 22 | + def exists(self): |
| 23 | + """ |
| 24 | + Returns whether or not this service already exists. |
| 25 | + """ |
| 26 | + return self.service.exists() |
| 27 | + |
| 28 | + def create(self): |
| 29 | + """ |
| 30 | + Create an instance of the Time Series Service with the typical |
| 31 | + starting settings. |
| 32 | + """ |
| 33 | + self.service.create() |
| 34 | + |
| 35 | + os.environ[predix.config.get_env_key(self.use_class, 'host')] = self.get_eventhub_host() |
| 36 | + os.environ[predix.config.get_env_key(self.use_class, 'port')] = self.get_eventhub_grpc_port() |
| 37 | + os.environ[predix.config.get_env_key(self.use_class, 'wss_publish_uri')] = self.get_publish_wss_uri() |
| 38 | + os.environ[predix.config.get_env_key(self.use_class, 'zone_id')] = self.get_zone_id() |
| 39 | + |
| 40 | + def grant_client(self, client_id, publish=False, subscribe=False, publish_protocol=None, publish_topics=None, |
| 41 | + subscribe_topics=None, scope_prefix='predix-event-hub', **kwargs): |
| 42 | + """ |
| 43 | + Grant the given client id all the scopes and authorities |
| 44 | + needed to work with the eventhub service. |
| 45 | + """ |
| 46 | + scopes = ['openid'] |
| 47 | + authorities = ['uaa.resource'] |
| 48 | + |
| 49 | + zone_id = self.get_zone_id() |
| 50 | + # always must be part of base user scope |
| 51 | + scopes.append('%s.zones.%s.user' % (scope_prefix, zone_id)) |
| 52 | + authorities.append('%s.zones.%s.user' % (scope_prefix, zone_id)) |
| 53 | + |
| 54 | + if publish_topics is not None or subscribe_topics is not None: |
| 55 | + raise Exception("multiple topics are not currently available in preidx-py") |
| 56 | + |
| 57 | + if publish_topics is None: |
| 58 | + publish_topics = ['topic'] |
| 59 | + |
| 60 | + if subscribe_topics is None: |
| 61 | + subscribe_topics = ['topic'] |
| 62 | + |
| 63 | + if publish: |
| 64 | + # we are granting just the default topic |
| 65 | + if publish_protocol is None: |
| 66 | + scopes.append('%s.zones.%s.grpc.publish' % (scope_prefix, zone_id)) |
| 67 | + authorities.append('%s.zones.%s.grpc.publish' % (scope_prefix, zone_id)) |
| 68 | + scopes.append('%s.zones.%s.wss.publish' % (scope_prefix, zone_id)) |
| 69 | + authorities.append('%s.zones.%s.wss.publish' % (scope_prefix, zone_id)) |
| 70 | + |
| 71 | + else: |
| 72 | + scopes.append('%s.zones.%s.%s.publish' % (scope_prefix, zone_id, publish_protocol)) |
| 73 | + authorities.append('%s.zones.%s.%s.publish' % (scope_prefix, zone_id, publish_protocol)) |
| 74 | + |
| 75 | + # we are requesting multiple topics |
| 76 | + for topic in publish_topics: |
| 77 | + if publish_protocol is None: |
| 78 | + scopes.append('%s.zones.%s.%s.grpc.publish' % (scope_prefix, zone_id, topic)) |
| 79 | + scopes.append('%s.zones.%s.%s.wss.publish' % (scope_prefix, zone_id, topic)) |
| 80 | + scopes.append('%s.zones.%s.%s.user' % (scope_prefix, zone_id, topic)) |
| 81 | + authorities.append('%s.zones.%s.%s.grpc.publish' % (scope_prefix, zone_id, topic)) |
| 82 | + authorities.append('%s.zones.%s.%s.wss.publish' % (scope_prefix, zone_id, topic)) |
| 83 | + authorities.append('%s.zones.%s.%s.user' % (scope_prefix, zone_id, topic)) |
| 84 | + else: |
| 85 | + scopes.append('%s.zones.%s.%s.%s.publish' % (scope_prefix, zone_id, topic, publish_protocol)) |
| 86 | + authorities.append('%s.zones.%s.%s.%s.publish' % (scope_prefix, zone_id, topic, publish_protocol)) |
| 87 | + if subscribe: |
| 88 | + # we are granting just the default topic |
| 89 | + scopes.append('%s.zones.%s.grpc.subscribe' % (scope_prefix, zone_id)) |
| 90 | + authorities.append('%s.zones.%s.grpc.subscribe' % (scope_prefix, zone_id)) |
| 91 | + |
| 92 | + # we are requesting multiple topics |
| 93 | + for topic in subscribe_topics: |
| 94 | + scopes.append('%s.zones.%s.%s.grpc.subscribe' % (scope_prefix, zone_id, topic)) |
| 95 | + authorities.append('%s.zones.%s.%s.grpc.subscribe' % (scope_prefix, zone_id, topic)) |
| 96 | + |
| 97 | + self.service.uaa.uaac.update_client_grants(client_id, scope=scopes, |
| 98 | + authorities=authorities) |
| 99 | + |
| 100 | + return self.service.uaa.uaac.get_client(client_id) |
| 101 | + |
| 102 | + def get_eventhub_host(self): |
| 103 | + """ |
| 104 | + returns the publish grpc endpoint for ingestion. |
| 105 | + """ |
| 106 | + for protocol in self.service.settings.data['publish']['protocol_details']: |
| 107 | + if protocol['protocol'] == 'grpc': |
| 108 | + return protocol['uri'][0:protocol['uri'].index(':')] |
| 109 | + |
| 110 | + def get_eventhub_grpc_port(self): |
| 111 | + for protocol in self.service.settings.data['publish']['protocol_details']: |
| 112 | + if protocol['protocol'] == 'grpc': |
| 113 | + return str(protocol['uri'][(protocol['uri'].index(':') + 1):]) |
| 114 | + |
| 115 | + def get_publish_wss_uri(self): |
| 116 | + """ |
| 117 | + returns the publish grpc endpoint for ingestion. |
| 118 | +
|
| 119 | + """ |
| 120 | + for protocol in self.service.settings.data['publish']['protocol_details']: |
| 121 | + if protocol['protocol'] == 'wss': |
| 122 | + return protocol['uri'] |
| 123 | + |
| 124 | + def get_zone_id(self): |
| 125 | + return self.service.settings.data['publish']['zone-http-header-value'] |
| 126 | + |
| 127 | + def get_subscribe_uri(self): |
| 128 | + return self.service.settings.data['subscribe']['protocol_details'][0]['uri'] |
| 129 | + |
| 130 | + def make_topic(self, broker_uri, topic_name): |
| 131 | + raise Exception('make topic has not been implemented yet') |
| 132 | + |
| 133 | + def add_to_manifest(self, manifest): |
| 134 | + """ |
| 135 | + Add useful details to the manifest about this service |
| 136 | + so that it can be used in an application. |
| 137 | +
|
| 138 | + :param manifest: An predix.admin.app.Manifest object |
| 139 | + instance that manages reading/writing manifest config |
| 140 | + for a cloud foundry app. |
| 141 | + """ |
| 142 | + # Add this service to list of services |
| 143 | + manifest.add_service(self.service.name) |
| 144 | + |
| 145 | + # Add environment variables |
| 146 | + manifest.add_env_var(predix.config.get_env_key(self.use_class, 'host'), self.get_eventhub_host()) |
| 147 | + manifest.add_env_var(predix.config.get_env_key(self.use_class, 'port'), self.get_eventhub_grpc_port()) |
| 148 | + manifest.add_env_var(predix.config.get_env_key(self.use_class, 'wss_publish_uri'), self.get_publish_wss_uri()) |
| 149 | + manifest.add_env_var(predix.config.get_env_key(self.use_class, 'zone_id'), self.get_zone_id()) |
| 150 | + |
| 151 | + manifest.write_manifest() |
0 commit comments