Skip to content

Commit e24bd55

Browse files
committed
Finish
1 parent acdb468 commit e24bd55

2 files changed

Lines changed: 18 additions & 26 deletions

File tree

crates/fluxqueue-worker/src/task.rs

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,7 @@ impl TaskRegistry {
100100
Ok(Some(context))
101101
})
102102
} else {
103-
Err(anyhow!(
104-
"Context '{}' wasn't found in the registry",
105-
&context_name
106-
))
103+
Ok(None)
107104
}
108105
}
109106
}
@@ -179,15 +176,11 @@ impl PythonDispatcher {
179176
}
180177
}
181178

182-
struct CoroWithContext {
183-
args: Py<PyTuple>,
184-
kwargs: Py<PyDict>,
185-
context: Arc<Py<PyAny>>,
186-
}
187-
188179
struct MaybeCoro {
189180
func: Arc<Py<PyAny>>,
190-
with_context: Option<CoroWithContext>,
181+
args: Py<PyTuple>,
182+
kwargs: Py<PyDict>,
183+
context: Option<Arc<Py<PyAny>>>,
191184
}
192185

193186
async fn run_task(
@@ -244,15 +237,11 @@ async fn run_task(
244237
let is_coroutine = is_coroutine(py, task_data.func.clone())?;
245238

246239
if is_coroutine {
247-
let with_context = context.map(|context| CoroWithContext {
240+
Ok(Some(MaybeCoro {
241+
func: task_data.func.clone(),
248242
args: args_tuple.unbind(),
249243
kwargs: kwargs_dict.unbind(),
250244
context,
251-
});
252-
253-
Ok(Some(MaybeCoro {
254-
func: task_data.func.clone(),
255-
with_context,
256245
}))
257246
} else {
258247
task_data
@@ -283,23 +272,28 @@ async fn run_task(
283272

284273
if let Some(maybe_coro) = maybe_coro {
285274
let fut = Python::attach(|py| {
286-
if let Some(with_context) = maybe_coro.with_context {
275+
if let Some(context) = maybe_coro.context {
287276
let task_metadata = get_task_metadata(py, task.clone())
288277
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
289-
let result = with_context.context.call_method1(
278+
let result = context.call_method1(
290279
py,
291280
"_run_async_task",
292281
(
293282
maybe_coro.func.as_any(),
294283
task_metadata,
295-
with_context.args,
296-
Some(with_context.kwargs),
284+
maybe_coro.args,
285+
Some(maybe_coro.kwargs),
297286
),
298287
)?;
299288
into_future(result.into_bound(py))
300289
} else {
301-
let func = maybe_coro.func.clone_ref(py);
302-
into_future(func.into_bound(py))
290+
let result = maybe_coro
291+
.func
292+
.call(py, maybe_coro.args, Some(maybe_coro.kwargs.bind(py)))
293+
.map_err(|e| anyhow!("Failed to call Python function: {:?}", e))
294+
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
295+
296+
into_future(result.into_bound(py))
303297
}
304298
})?;
305299
fut.await?;

python/fluxqueue/context.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@ class Context:
1616

1717
def __init__(self) -> None:
1818
self._thread_local = threading.local()
19-
self._metadata_var: ContextVar[TaskMetadata] = ContextVar(
20-
"task_metadata", default=None
21-
)
19+
self._metadata_var: ContextVar[TaskMetadata] = ContextVar("task_metadata")
2220

2321
@property
2422
def thread_storage(self) -> dict[str, Any]:

0 commit comments

Comments
 (0)