# main.py
# Forked from daniel-gonzalez-sanchez/ngsi-ld-client-tester.
import logging
import os
from time import sleep
from fastapi import FastAPI, Request, status
import json
import ngsi_ld_client
from ngsi_ld_client.models.create_subscription_request import CreateSubscriptionRequest
from ngsi_ld_client.models.subscription_on_change import SubscriptionOnChange
from ngsi_ld_client.models.notification_params import NotificationParams
from ngsi_ld_client.models.endpoint import Endpoint
from ngsi_ld_client.api_client import ApiClient as NGSILDClient
from ngsi_ld_client.configuration import Configuration as NGSILDConfiguration
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Service locations. Each value is read from the environment variable of the
# same name and falls back to the default given here.
# NGSI-LD Context Broker
BROKER_URI = os.getenv("BROKER_URI", "http://orion:1026/ngsi-ld/v1")
# Context Catalog
CONTEXT_CATALOG_URI = os.getenv(
    "CONTEXT_CATALOG_URI",
    "http://context-catalog:8080/context.jsonld",
)
# Notifier (used as the endpoint URI of the subscriptions created at startup)
NOTIFIER_URI = os.getenv("NOTIFIER_URI", "http://notifier-consumer:8082/notify")

# Init NGSI-LD Client
configuration = NGSILDConfiguration(host=BROKER_URI)
# NOTE(review): presumably turns on verbose output in the generated client;
# confirm that this is wanted outside of local testing.
configuration.debug = True
ngsi_ld = NGSILDClient(configuration=configuration)

# Default headers attached to the client: a Link header that points at the
# context document, and a plain-JSON Accept header.
ngsi_ld.set_default_header(
    header_name="Link",
    header_value=(
        f"<{CONTEXT_CATALOG_URI}>; "
        'rel="http://www.w3.org/ns/json-ld#context"; '
        'type="application/ld+json"'
    ),
)
ngsi_ld.set_default_header(header_name="Accept", header_value="application/json")

# Entity types for which a subscription is created at startup.
LIST_ENTITIES = ["IotDevice"]

# Init FastAPI server
app = FastAPI(title="Notifier Tester API", version="1.0.0")
@app.on_event("startup")
async def startup_event():
    """Create one on-change subscription per entity type in ``LIST_ENTITIES``.

    Every subscription watches the ``name`` and ``hasSensor`` attributes,
    asks for ``normalized`` notifications restricted to those two attributes,
    and delivers them to ``NOTIFIER_URI``.

    The subscription ids are fixed
    (``urn:ngsi-ld:Subscription:OnChange:<type>``), so starting the service
    again against a broker that still holds them makes the create call fail.
    A 409 response is therefore logged and skipped instead of aborting
    startup; any other API error is re-raised.
    NOTE(review): assumes the broker answers 409 Conflict for an already
    existing subscription id, as the NGSI-LD API describes - confirm against
    the broker in use.

    Raises:
        ApiException: for any broker response other than 409.
    """
    # Imported here so this block does not depend on a change to the
    # file-level import list.
    from ngsi_ld_client.exceptions import ApiException

    # None of these depend on the entity type, so build them once.
    api_instance = ngsi_ld_client.ContextInformationSubscriptionApi(ngsi_ld)
    endpoint = Endpoint(uri=NOTIFIER_URI, accept="application/json")
    notification_params = NotificationParams(
        endpoint=endpoint,
        format="normalized",
        attributes=["name", "hasSensor"],
        # sysAttrs=True,
        # showChanges=True
    )

    # Periodic subscriptions (kept for reference, currently disabled):
    #
    #   notification_params = NotificationParams(
    #       endpoint=endpoint,
    #       format="normalized"
    #       # attributes=["name"]
    #   )
    #   subs_request = CreateSubscriptionRequest(
    #       id="urn:ngsi-ld:Subscription:Periodic:{0}".format(entity),
    #       type="Subscription",
    #       entities=[{"type": entity}],
    #       description="Periodic subscription Test.",
    #       timeInterval=10,
    #       notification=notification_params
    #   )

    # On-Change Subscriptions
    for entity in LIST_ENTITIES:
        subscription_id = "urn:ngsi-ld:Subscription:OnChange:{0}".format(entity)
        subs_request = CreateSubscriptionRequest(
            id=subscription_id,
            type="Subscription",
            entities=[{"type": entity}],
            description="On-change subscription to entities for changes within the Properties name and hasSensor.",
            watchedAttributes=["name", "hasSensor"],
            notification=notification_params,
        )
        try:
            api_instance.create_subscription(create_subscription_request=subs_request)
        except ApiException as err:
            if err.status != 409:
                raise
            # Left over from a previous run: keep the existing subscription.
            logger.warning(
                "Subscription %s already exists; keeping the existing one.",
                subscription_id,
            )
@app.post("/notify",
          status_code=status.HTTP_200_OK)
async def receiveNotification(request: Request):
    """Log each entity carried by an incoming notification.

    The request body is parsed as JSON and every element of its ``"data"``
    member is written to the module logger at INFO level. Nothing is
    returned; the route is declared with status code 200.

    Args:
        request: The incoming HTTP request whose body holds the notification.

    Raises:
        KeyError: If the parsed body is a mapping without a ``"data"`` member
            (not caught here).
    """
    notification = await request.json()
    for entity in notification["data"]:
        # Pass the entity as a logging argument rather than pre-formatting the
        # message, so the string is only built if the record is emitted.
        # NOTE(review): no logging configuration is visible in this file;
        # confirm that INFO records from this logger are enabled elsewhere.
        logger.info("Entity notification: %s\n", entity)