Commit ccfee3e

Merge pull request #119 from nebulabroadcast/coalesce-requests
Coalesce requests
2 parents f6d8dff + 5b81591 · commit ccfee3e

7 files changed: +152 −37 lines


backend/api/order/set_rundown_order.py

Lines changed: 0 additions & 4 deletions
```diff
@@ -52,10 +52,6 @@ async def set_rundown_order(
             )
             continue
 
-        if not item:
-            nebula.log.trace(f"Skipping {item}", user=user.name)
-            continue
-
         # Shut-up mypy
         assert item is not None, "Item should not be None at this point"
         assert item["id_bin"] is not None, "Item w/o bin. Shouldn't happen."
```

backend/api/rundown/get_rundown.py

Lines changed: 7 additions & 9 deletions
```diff
@@ -4,19 +4,19 @@
 from nebula.enum import ObjectStatus, RunMode
 from nebula.helpers.scheduling import get_pending_assets, parse_rundown_date
 
-from .models import RundownRequestModel, RundownResponseModel, RundownRow
+from .models import RundownResponseModel, RundownRow
 
 
-async def get_rundown(request: RundownRequestModel) -> RundownResponseModel:
+async def get_rundown(id_channel: int, date: str | None = None) -> RundownResponseModel:
     """Get a rundown"""
-    if not (channel := nebula.settings.get_playout_channel(request.id_channel)):
-        raise nebula.BadRequestException(f"No such channel: {request.id_channel}")
+    if not (channel := nebula.settings.get_playout_channel(id_channel)):
+        raise nebula.BadRequestException(f"No such channel: {id_channel}")
 
     request_start_time = time.monotonic()
-    start_time = parse_rundown_date(request.date, channel)
+    start_time = parse_rundown_date(date, channel)
     end_time = start_time + (3600 * 24)
     pending_assets = await get_pending_assets(channel.send_action)
-    pskey = f"playout_status/{request.id_channel}"
+    pskey = f"playout_status/{id_channel}"
 
     query = """
         SELECT
@@ -75,9 +75,7 @@ async def get_rundown(request: RundownRequestModel) -> RundownResponseModel:
     last_event = None
     ts_broadcast = ts_scheduled = 0.0
 
-    async for record in nebula.db.iterate(
-        query, request.id_channel, start_time, end_time
-    ):
+    async for record in nebula.db.iterate(query, id_channel, start_time, end_time):
         id_event = record["id_event"]
         id_item = record["id_item"]
         id_bin = record["id_bin"]
```

backend/api/rundown/rundown_request.py

Lines changed: 4 additions & 1 deletion
```diff
@@ -1,4 +1,5 @@
 import nebula
+from nebula.helpers.coalescer import Coalescer
 from server.dependencies import CurrentUser
 from server.request import APIRequest
 
@@ -34,4 +35,6 @@ async def handle(
         if not user.can("rundown_view", request.id_channel):
             raise nebula.ForbiddenException("You are not allowed to view this rundown")
 
-        return await get_rundown(request)
+        coalesce = Coalescer()
+        rundown = await coalesce(get_rundown, request.id_channel, request.date)
+        return rundown
```
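With this change, identical rundown requests that overlap in time share a single `get_rundown` execution: the first call does the database work, and any request for the same channel and date that arrives before it finishes awaits the same task. A minimal sketch of that behavior, using the `Coalescer` introduced later in this diff and a hypothetical stub in place of `get_rundown`:

```python
import asyncio

from nebula.helpers.coalescer import Coalescer

CALLS = 0


async def fake_get_rundown(id_channel: int, date: str | None = None) -> str:
    """Hypothetical stand-in for get_rundown; pretends the DB query is slow."""
    global CALLS
    CALLS += 1
    await asyncio.sleep(0.5)
    return f"rundown for channel {id_channel} on {date}"


async def main() -> None:
    coalesce = Coalescer()
    # Three overlapping requests for the same channel and date...
    results = await asyncio.gather(
        coalesce(fake_get_rundown, 1, "2024-01-01"),
        coalesce(fake_get_rundown, 1, "2024-01-01"),
        coalesce(fake_get_rundown, 1, "2024-01-01"),
    )
    # ...are served by a single execution of the underlying coroutine.
    print(results, "underlying calls:", CALLS)  # underlying calls: 1


asyncio.run(main())
```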

backend/api/scheduler/scheduler.py

Lines changed: 27 additions & 17 deletions
```diff
@@ -1,28 +1,33 @@
 import nebula
-from nebula.helpers.create_new_event import create_new_event
+from nebula.helpers.create_new_event import EventData, create_new_event
 from nebula.helpers.scheduling import parse_rundown_date
 
-from .models import SchedulerRequestModel, SchedulerResponseModel
+from .models import SchedulerResponseModel
 from .utils import delete_events, get_event_at_time, get_events_in_range
 
-
 async def scheduler(
-    request: SchedulerRequestModel,
+    id_channel: int,
+    date: str | None = None,
+    days: int = 7,
+    delete: list[int] | None = None,
+    events: list[EventData] | None = None,
     editable: bool = True,
     user: nebula.User | None = None,
 ) -> SchedulerResponseModel:
     """Modify and display channel schedule"""
-    start_time: float | None = None
-    end_time: float | None = None
 
     username = user.name if user else None
+    start_time: float | None = None
+    end_time: float | None = None
+    delete = delete or []
+    events = events or []
 
-    if not (channel := nebula.settings.get_playout_channel(request.id_channel)):
-        raise nebula.BadRequestException(f"No such channel {request.id_channel}")
+    if not (channel := nebula.settings.get_playout_channel(id_channel)):
+        raise nebula.BadRequestException(f"No such channel {id_channel}")
 
-    if request.date:
-        start_time = parse_rundown_date(request.date, channel)
-        end_time = start_time + (request.days * 86400)
+    if date:
+        start_time = parse_rundown_date(date, channel)
+        end_time = start_time + (days * 86400)
 
     affected_events: list[int] = []
     affected_bins: list[int] = []
@@ -31,14 +36,14 @@ async def scheduler(
     # Delete events
     #
 
-    if request.delete and editable:
-        deleted_event_ids = await delete_events(request.delete, user=user)
+    if delete and editable:
+        deleted_event_ids = await delete_events(delete, user=user)
         affected_events.extend(deleted_event_ids)
     #
    # Create / update events
     #
 
-    for event_data in request.events:
+    for event_data in events:
         if not editable:
             # weird syntax, but keeps indentation level low
             break
@@ -127,11 +132,16 @@ async def scheduler(
     # Return existing events
 
     if (start_time is not None) and (end_time is not None):
-        events = await get_events_in_range(channel.id, start_time, end_time, user=user)
+        c_events = await get_events_in_range(
+            channel.id,
+            start_time,
+            end_time,
+            user=user,
+        )
     else:
-        events = []
+        c_events = []
     return SchedulerResponseModel(
-        events=[e.meta for e in events],
+        events=[e.meta for e in c_events],
         affected_events=affected_events,
         affected_bins=affected_bins,
     )
```

backend/api/scheduler/scheduler_request.py

Lines changed: 26 additions & 1 deletion
```diff
@@ -1,4 +1,5 @@
 import nebula
+from nebula.helpers.coalescer import Coalescer
 from nebula.helpers.scheduling import bin_refresh
 from server.dependencies import CurrentUser, RequestInitiator
 from server.request import APIRequest
@@ -29,8 +30,32 @@ async def handle(
         if not user.can("scheduler_view", request.id_channel):
             raise nebula.ForbiddenException("You are not allowed to view this channel")
 
+        if not (request.events or request.delete):
+            # Read-only request. coalesce the requests and
+            # Return directly
+            coalesce = Coalescer()
+            result = await coalesce(
+                scheduler,
+                request.id_channel,
+                date=request.date,
+                days=request.days,
+                user=user,
+            )
+            return result
+
+        # Write request. Do not coalesce, and send notifications
+
         editable = user.can("scheduler_edit", request.id_channel)
-        result = await scheduler(request, editable, user=user)
+
+        result = await scheduler(
+            request.id_channel,
+            date=request.date,
+            days=request.days,
+            editable=editable,
+            events=request.events,
+            delete=request.delete,
+            user=user,
+        )
 
         if result.affected_bins:
             await bin_refresh(
```

backend/nebula/helpers/coalescer.py

Lines changed: 83 additions & 0 deletions

````diff
@@ -0,0 +1,83 @@
+"""
+This module provides a Coalescer class that prevents multiple identical,
+concurrent asynchronous operations from being executed simultaneously.
+
+If you invoke an async function (e.g., fetch_data(id=1)) through the Coalescer instance,
+and another call to fetch_data(id=1) is already in progress, the Coalescer ensures
+that the second (and any subsequent identical) call will simply await the result
+of the first, ongoing operation, rather than initiating a new, redundant execution.
+
+In essence: It's a smart wrapper for async functions that says:
+
+> If I'm already doing this exact same thing, don't start it again.
+> Just wait for the one already running to finish and share its result.
+
+## Example usage:
+
+```
+async def fetch_data(item_id: int):
+    print(f"Fetching data for {item_id}...")
+    await asyncio.sleep(2)  # Simulate network call
+    return f"Data for {item_id}"
+
+coalescer = Coalescer()
+
+# These calls will be coalesced if made close together
+
+task1 = asyncio.create_task(coalescer(fetch_data, item_id=1))
+task2 = asyncio.create_task(coalescer(fetch_data, item_id=1))  # Will use task1's future
+task3 = asyncio.create_task(coalescer(fetch_data, item_id=2))  # New actual call
+
+result1 = await task1
+result2 = await task2
+result3 = await task3
+print(result1, result2, result3)
+```
+"""
+
+import asyncio
+import hashlib
+from collections.abc import Callable, Coroutine
+from typing import Any, Generic, TypeVar
+
+
+def _hash_args(func: Callable[..., Any], *args: Any, **kwargs: Any) -> str:
+    """Generates a hash from the function arguments and keyword arguments."""
+    func_id = str(id(func))
+    arg_str = str(args)
+    kwarg_str = str(sorted(kwargs.items()))
+    combined_str = arg_str + kwarg_str + func_id
+    return hashlib.md5(combined_str.encode()).hexdigest()  # noqa: S324
+
+
+T = TypeVar("T")
+
+
+class Coalescer(Generic[T]):
+    _instance: "Coalescer[Any] | None" = None
+    lock: asyncio.Lock
+    current_futures: dict[str, asyncio.Task[T]]
+
+    def __new__(cls) -> "Coalescer[Any]":
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+            cls._instance.current_futures = {}
+            cls._instance.lock = asyncio.Lock()
+        return cls._instance
+
+    async def __call__(
+        self,
+        func: Callable[..., Coroutine[Any, Any, T]],
+        *args: Any,
+        **kwargs: Any,
+    ) -> T:
+        key = _hash_args(func, *args, **kwargs)
+        async with self.lock:
+            if key not in self.current_futures:
+                self.current_futures[key] = asyncio.create_task(func(*args, **kwargs))
+
+        try:
+            return await self.current_futures[key]
+        finally:
+            async with self.lock:
+                self.current_futures.pop(key, None)
````
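Two properties of this implementation are worth keeping in mind when wiring it into new endpoints: `Coalescer()` is a process-wide singleton (the `__new__` override always returns the same instance, so all callers share one table of in-flight tasks), and the coalescing key is built from `str(args)`, `str(sorted(kwargs.items()))`, and `id(func)`, so the same call spelled positionally and with keywords is not coalesced. A small sketch illustrating both points (the `fetch` coroutine is hypothetical):

```python
import asyncio

from nebula.helpers.coalescer import Coalescer


async def fetch(item_id: int) -> str:
    """Hypothetical slow lookup."""
    await asyncio.sleep(0.1)
    return f"data {item_id}"


async def main() -> None:
    # Singleton: every Coalescer() returns the same object, so in-flight
    # tasks are shared across the whole process, not per instance.
    assert Coalescer() is Coalescer()

    coalesce = Coalescer()
    # Key sensitivity: positional vs. keyword spellings hash differently,
    # so these two calls run fetch() twice instead of sharing one task.
    a, b = await asyncio.gather(
        coalesce(fetch, 1),           # key derived from args=(1,)
        coalesce(fetch, item_id=1),   # key derived from kwargs={"item_id": 1}
    )
    print(a, b)


asyncio.run(main())
```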

backend/nebula/settings/models.py

Lines changed: 5 additions & 5 deletions
```diff
@@ -98,7 +98,7 @@ class SSOProvider(SettingsModel):
         str,
         Field(
             title="Name",
-            example="myoauth",
+            examples=["myoauth"],
         ),
     ]
 
@@ -107,7 +107,7 @@ class SSOProvider(SettingsModel):
         Field(
             title="Title",
             description="Used on the SSO button on the login page",
-            example="Log in using MyOauth",
+            examples=["Log in using MyOauth"],
         ),
     ]
 
@@ -124,23 +124,23 @@ class SSOProvider(SettingsModel):
         Field(
             title="Entrypoint",
             description="URL to the SSO provider configuration endpoint",
-            example="https://iam.example.com/realms/nebula/.well-known/openid-configuration",
+            examples=["https://iam.example.com/realms/nebula/.well-known/openid-configuration"],
         ),
     ] = None
 
     client_id: Annotated[
         str,
         Field(
             title="Client ID",
-            example="myclientid",
+            examples=["myclientid"],
         ),
     ]
 
     client_secret: Annotated[
         str,
         Field(
             title="Client secret",
-            example="myclientsecret",
+            examples=["myclientsecret"],
         ),
     ]
 
```
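These changes replace the singular `example=` keyword (a Pydantic v1 convention) with Pydantic v2's `examples=`, which takes a list and is emitted into the generated JSON schema. A minimal standalone sketch of the new style (the model below is illustrative, not Nebula's actual `SSOProvider`):

```python
from typing import Annotated

from pydantic import BaseModel, Field


class SSOProviderExample(BaseModel):
    # Pydantic v2: `examples` takes a list of sample values for the field;
    # the singular `example=` keyword is not part of the v2 Field API.
    client_id: Annotated[
        str,
        Field(title="Client ID", examples=["myclientid"]),
    ]


# The examples end up in the JSON schema used for API docs and settings UIs.
print(SSOProviderExample.model_json_schema()["properties"]["client_id"])
```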
