Skip to content

Commit 92af799

Browse files
resource monitor
1 parent abd695e commit 92af799

File tree

12 files changed

+86
-54
lines changed

12 files changed

+86
-54
lines changed

projects/sbos-minimal/sbos/minimal/interfaces/actuation/actuation_interface.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ async def actuate(self, domain, entity_id, value):
4545
)
4646
actuation_time = time.time() - start # for benchmark
4747
except Exception as e:
48+
logger.exception(e)
4849
success, detail = False, f"{e}"
4950
return success, detail, driver_time, actuation_time
5051

projects/sbos-minimal/sbos/minimal/interfaces/actuation/metasys/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77

88
class MetasysActuation(BaseActuation):
99
def __init__(self, *args, **kwargs):
10-
pass
10+
self.channel_address = "derconnect-brick-bm.sdsc.edu:50051"
1111

1212
async def actuate(self, entity_id, value, external_references):
1313
sensor_id = external_references[
1414
"https://brickschema.org/schema/Brick/ref#metasysID"
1515
]
16-
logger.info("metasys: {} {}", sensor_id, value)
17-
async with grpc.aio.insecure_channel("172.17.0.1:50051") as channel:
16+
logger.info("metasys actuate: {} -> {} {}", entity_id, sensor_id, value)
17+
async with grpc.aio.insecure_channel(self.channel_address) as channel:
1818
stub = actuate_pb2_grpc.ActuateStub(channel)
1919
response: actuate_pb2.Response = await stub.TemporaryOverride(
2020
actuate_pb2.TemporaryOverrideAction(
@@ -28,8 +28,8 @@ async def read(self, entity_id, external_references):
2828
sensor_id = external_references[
2929
"https://brickschema.org/schema/Brick/ref#metasysID"
3030
]
31-
logger.info("metasys read: {}", sensor_id)
32-
async with grpc.aio.insecure_channel("172.17.0.1:50051") as channel:
31+
logger.info("metasys read: {} -> {}", entity_id, sensor_id)
32+
async with grpc.aio.insecure_channel(self.channel_address) as channel:
3333
stub = actuate_pb2_grpc.ActuateStub(channel)
3434
response: actuate_pb2.Response = await stub.ReadObjectCurrent(
3535
actuate_pb2.ReadObjectCurrentAction(

projects/sbos-minimal/sbos/minimal/interfaces/timeseries/asyncpg/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ async def add_history_data(
404404
):
405405
table_name = self.get_history_table_name(domain_name)
406406
async with self.pool.acquire() as conn:
407+
logger.info("{}", domain_name)
407408
res = await conn.execute(
408409
f"""INSERT INTO {table_name} (uuid, user_id, app_name, domain_user_app, time, value)
409410
VALUES ('{entity_id}', '{user_id}', '{app_name}', '{domain_user_app}', '{time}', '{value}');"""

projects/sbos-minimal/sbos/minimal/services/actuation.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from fastapi import APIRouter, Body, Depends
66
from fastapi_restful.cbv import cbv
77
from starlette.requests import Request
8+
from loguru import logger
89

910
from sbos.minimal import models, schemas
1011
from sbos.minimal.interfaces import ActuationInterface, BaseTimeseries, GraphDB
@@ -66,11 +67,12 @@ async def actuate_entity(self, domain, jwt_payload, entity_id, actuation_payload
6667
) = await self.actuation_iface.actuate(
6768
domain, entity_id, actuation_payload[0]
6869
)
70+
logger.info("success: {}, detail: {}", success, detail)
6971
await self.ts_db.add_history_data(
7072
domain.name,
7173
entity_id,
7274
jwt_payload["sub"],
73-
jwt_payload["app_name"],
75+
jwt_payload["app"],
7476
jwt_payload.get("domain_user_app", ""),
7577
arrow.now(),
7678
actuation_payload[0],
@@ -81,6 +83,7 @@ async def actuate_entity(self, domain, jwt_payload, entity_id, actuation_payload
8183
(policy_time, guard_time, driver_time, actuation_time),
8284
)
8385
except Exception as e:
86+
logger.exception(e)
8487
return False, f"{e}", (policy_time, guard_time, driver_time, actuation_time)
8588

8689
@router.post(

projects/sbos-monitor/sbos/monitor/__main__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,10 @@ async def validate_resources_in_domain(domain, domain_data):
5151
violated_constraints[entity_id] = real_value
5252
else:
5353
logger.error(response)
54+
logger.error(response_dict)
5455

5556
if len(violated_constraints) > 0:
56-
url = f"{playground_api_base}/domains/{domain}/resources"
57+
url = f"{playground_api_base}/domains/{domain}/resources/notify"
5758
data = {k: [str(v)] for k, v in violated_constraints.items()}
5859
response = await client.post(url, json=data, headers=headers)
5960
response_dict = response.json()

projects/sbos-monitor/sbos/monitor/config/settings/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class MonitorBaseSettings(DatabaseMongoDBSettings):
2424
# VERSION: str = "0.1.0"
2525
PLAYGROUND_HOST: str = Field(default="localhost", description="The hostname of playground server.")
2626
PLAYGROUND_PORT: int = Field(default=9000, description="The port of playground server")
27-
PLAYGROUND_JWT_TOKEN: str = Field(default="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyQGV4YW1wbGUuY29tIiwiYXVkIjpbImJyaWNrIl0sImV4cCI6MTc0NjM4MDAzMX0.LQ8k5qEBTo_6abTNzPdmX3Vkty6iP678Hb2w51NWclo")
27+
PLAYGROUND_JWT_TOKEN: str = Field(default="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJsaXV5aDk3MDYxNUBnbWFpbC5jb20iLCJhdWQiOlsiYnJpY2siXSwiZXhwIjoxNzQ2ODI1MzgyfQ.f2p6KSknZ0oWWY769mpZzZWEqDb4HK3h40WhF0Tlj7Q")
2828
POLLING_INTERVAL: float = Field(default=5, description="The interval of polling in seconds.")
2929

3030
class Config:

projects/sbos-playground/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ services:
9494
environment:
9595
ME_CONFIG_BASICAUTH_USERNAME: ${MONGO_USERNAME:-root}
9696
ME_CONFIG_BASICAUTH_PASSWORD: ${MONGO_PASSWORD:-pass}
97-
ME_CONFIG_MONGODB_ADMINUSERNAME: ${MONGO_USERNAME:-r oot}
97+
ME_CONFIG_MONGODB_ADMINUSERNAME: ${MONGO_USERNAME:-root}
9898
ME_CONFIG_MONGODB_ADMINPASSWORD: ${MONGO_PASSWORD:-pass}
9999
# ME_CONFIG_MONGODB_URL: mongodb://${MONGO_USERNAME:-root}:${MONGO_PASSWORD:-pass}@mongo:27017/
100100
ME_CONFIG_MONGODB_URL: mongodb://mongo:27017/

projects/sbos-playground/sbos/playground/interfaces/scheduling_policy/base.py

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from sbos.minimal.interfaces import ActuationInterface, AsyncpgTimeseries
77

88
from sbos.playground import models, schemas
9-
from sbos.playground.interfaces.app_management import stop_container
109

1110

1211
class SchedulingPolicyBase(ABC):
@@ -30,38 +29,6 @@ async def find_victim(
3029
):
3130
raise NotImplementedError()
3231

33-
async def kill_app(self, domain_user_app_id):
34-
domain_user_app: models.DomainUserApp = await models.DomainUserApp.get(
35-
domain_user_app_id
36-
)
37-
if domain_user_app is not None:
38-
logger.info("kill app: {}", domain_user_app)
39-
try:
40-
stop_container(domain_user_app.container_id)
41-
except Exception as e:
42-
logger.exception(e)
43-
44-
async def relinquish_entity(self, entity_id):
45-
logger.info("relinquish entity: {}", entity_id)
46-
try:
47-
await self.actuation_iface.actuate(entity_id, "null")
48-
except Exception as e:
49-
logger.exception(e)
50-
32+
@abstractmethod
5133
async def schedule(self, notify_resource: schemas.ResourceConstraintRead):
52-
entity_ids = await self.blame(notify_resource.entity_id)
53-
# get history with entity_ids (postgres)
54-
history = await self.ts_db.get_history_data(self.domain.name, entity_ids)
55-
# [('additionalProp1', 'admin', 'sb', datetime.datetime(2023, 7, 1, 18, 8, 0, 963859)),
56-
# ('additionalProp3', 'admin', 'sb', datetime.datetime(2023, 7, 1, 18, 8, 1, 205563)),
57-
# ('additionalProp1', 'admin', 'sb', datetime.datetime(2023, 7, 2, 2, 37, 25))]
58-
logger.info(history)
59-
victim_apps, victim_entity_ids = await self.find_victim(
60-
history, notify_resource
61-
)
62-
logger.info("victim apps: {}", victim_apps)
63-
logger.info("victim entity_ids: {}", victim_entity_ids)
64-
await asyncio.gather(
65-
*[self.kill_app(app) for app in victim_apps],
66-
*[self.relinquish_entity(entity_id) for entity_id in victim_entity_ids]
67-
)
34+
raise NotImplementedError()

projects/sbos-playground/sbos/playground/interfaces/scheduling_policy/naive.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
import itertools
22
from typing import Any, List
33

4-
from sbos.playground import schemas
4+
from loguru import logger
5+
from sbos.minimal.interfaces import ActuationInterface, AsyncpgTimeseries
6+
7+
from sbos.playground import models, schemas
8+
from sbos.playground.interfaces.app_management import stop_container
59
from sbos.playground.interfaces.scheduling_policy.base import SchedulingPolicyBase
610

711

812
class SchedulingPolicyNaive(SchedulingPolicyBase):
913
async def blame(self, entity_id: str) -> List[str]:
10-
return ["additionalProp1", "additionalProp3"]
14+
return ["http://ucsd.edu/ontology/building/Center_Hall#AH-6.WC-ADJ"]
1115

1216
async def find_victim(
1317
self, history: List[Any], notify_resource: schemas.ResourceConstraintRead
@@ -20,3 +24,52 @@ async def find_victim(
2024
victim_app = app_latest_use_time[-1][3]
2125
victim_entity = app_latest_use_time[-1][0]
2226
return [victim_app], [victim_entity]
27+
28+
async def kill_app(self, domain_user_app_id):
29+
domain_user_app: models.DomainUserApp = await models.DomainUserApp.get(
30+
domain_user_app_id
31+
)
32+
if domain_user_app is not None and domain_user_app.status == schemas.DockerStatus.RUNNING:
33+
logger.info("kill app: {}", domain_user_app)
34+
try:
35+
loop = asyncio.get_running_loop()
36+
container = await loop.run_in_executor(None, stop_container, domain_user_app.container_id)
37+
domain_user_app.status = container.status
38+
await domain_user_app.save()
39+
return True
40+
except Exception as e:
41+
logger.exception(e)
42+
43+
return False
44+
async def relinquish_entity(self, entity_id):
45+
logger.info("relinquish entity: {}", entity_id)
46+
try:
47+
await self.actuation_iface.actuate(self.domain, entity_id, "null")
48+
except Exception as e:
49+
logger.exception(e)
50+
51+
async def process_app_and_entity(self, domain_user_app_id, entity_id):
52+
killed = await self.kill_app(domain_user_app_id)
53+
if killed:
54+
await self.relinquish_entity(entity_id)
55+
56+
async def schedule(self, notify_resource: schemas.ResourceConstraintRead):
57+
entity_ids = await self.blame(notify_resource.entity_id)
58+
# get history with entity_ids (postgres)
59+
history = await self.ts_db.get_history_data(self.domain.name, entity_ids)
60+
# [('additionalProp1', 'admin', 'sb', datetime.datetime(2023, 7, 1, 18, 8, 0, 963859)),
61+
# ('additionalProp3', 'admin', 'sb', datetime.datetime(2023, 7, 1, 18, 8, 1, 205563)),
62+
# ('additionalProp1', 'admin', 'sb', datetime.datetime(2023, 7, 2, 2, 37, 25))]
63+
logger.info(history)
64+
victim_apps, victim_entity_ids = await self.find_victim(
65+
history, notify_resource
66+
)
67+
logger.info("victim apps: {}", victim_apps)
68+
logger.info("victim entity_ids: {}", victim_entity_ids)
69+
if len(victim_apps) > 0 and len(victim_entity_ids) > 0:
70+
await self.process_app_and_entity(victim_apps[0], victim_entity_ids[0])
71+
72+
# await asyncio.gather(
73+
# *[self.kill_app(app) for app in victim_apps],
74+
# *[self.relinquish_entity(entity_id) for entity_id in victim_entity_ids]
75+
# )

projects/sbos-playground/sbos/playground/schemas/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,5 @@
4040
DomainResourceConstraintRead,
4141
ResourceConstraintRead,
4242
ResourceConstraintUpdate,
43+
ResourceConstraintDelete,
4344
)

0 commit comments

Comments
 (0)