Skip to content

Commit e649366

Browse files
Vlad/ab 683 (#1190)
* feat: persist outputs to KV * fix: types
1 parent 5eed65e commit e649366

4 files changed

Lines changed: 91 additions & 0 deletions

File tree

src/writer/blueprints.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import writer.blocks.base_block
1515
import writer.core
1616
import writer.core_ui
17+
from writer.journal import JournalRecord
1718
from writer.ss_types import BlueprintExecutionError, BlueprintExecutionLog, WriterConfigurationError
1819

1920
MAX_DAG_DEPTH = 32
@@ -744,6 +745,12 @@ def run(self) -> Optional[Any]:
744745
return self._execute(executor, event)
745746

746747
def _execute(self, executor: ThreadPoolExecutor, abort_event: threading.Event) -> Optional[Any]:
748+
journal_record = JournalRecord(
749+
execution_environment=self.execution_environment,
750+
title=self.status_logger.title,
751+
graph=self.graph
752+
)
753+
747754
while self.queue or self.futures:
748755
while self.queue:
749756
node: GraphNode = self.queue.pop(0)
@@ -756,6 +763,7 @@ def _execute(self, executor: ThreadPoolExecutor, abort_event: threading.Event) -
756763
if abort_event.is_set():
757764
self._cancel_all_jobs()
758765
self.status_logger.log("Terminated.", entry_type="info", exit="aborted")
766+
journal_record.save(result="stopped")
759767
return None
760768
else:
761769
continue
@@ -768,11 +776,13 @@ def _execute(self, executor: ThreadPoolExecutor, abort_event: threading.Event) -
768776
except BlueprintExecutionError as e:
769777
self._cancel_all_jobs()
770778
self.status_logger.log("Execution failed", entry_type="error", exit=str(e))
779+
journal_record.save(result="error")
771780
raise e
772781
except BaseException as e:
773782
abort_event.set()
774783
self._cancel_all_jobs()
775784
self.status_logger.log("Execution failed.", entry_type="error", exit=str(e))
785+
journal_record.save(result="error")
776786
raise BlueprintExecutionError(
777787
f"Blueprint execution was stopped due to an error - {e.__class__.__name__}: {e}"
778788
) from e
@@ -793,6 +803,7 @@ def _execute(self, executor: ThreadPoolExecutor, abort_event: threading.Event) -
793803
self.queue.append(next_node)
794804

795805
self.status_logger.log("Execution completed.", entry_type="info", exit="completed")
806+
journal_record.save(result="success")
796807
return None
797808

798809
def _cancel_local_jobs(self):

src/writer/journal.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import logging
2+
from datetime import datetime, timezone
3+
from typing import TYPE_CHECKING, Any, Dict, Literal, Union
4+
5+
from writer.core import Config
6+
from writer.keyvalue_storage import writer_kv_storage
7+
8+
if TYPE_CHECKING:
9+
from writer.blueprints import Graph, GraphNode
10+
from writer.core_ui import Component
11+
12+
13+
logger = logging.getLogger("journal")
14+
15+
16+
class JournalRecord:
17+
def __init__(
18+
self,
19+
execution_environment: Dict,
20+
title: str,
21+
graph: "Graph"
22+
):
23+
self.started_at = datetime.now(timezone.utc)
24+
self.instance_type = "editor" if Config.mode == "edit" else "agent"
25+
26+
self.trigger = {
27+
"event": execution_environment.get("context", {}).get("event"),
28+
"component": {}
29+
}
30+
31+
component: "Union[GraphNode, Component]"
32+
if self.trigger["event"] == "wf-run-blueprint":
33+
component = graph.nodes[0].component
34+
self.trigger["component"]["type"] = "blueprint"
35+
self.trigger["component"]["id"] = graph.nodes[0].component.parentId
36+
else:
37+
component = graph.get_start_nodes()[0]
38+
self.trigger["component"]["type"] = "block"
39+
self.trigger["component"]["id"] = graph.get_start_nodes()[0].id
40+
41+
if "API" in title:
42+
if getattr(component, "type", "") == "blueprints_crontrigger":
43+
self.trigger["type"] = "Cron"
44+
else:
45+
self.trigger["type"] = "API"
46+
elif "UI" in title:
47+
self.trigger["type"] = "UI"
48+
else:
49+
self.trigger["type"] = "On demand"
50+
51+
self.graph = graph
52+
53+
def to_dict(self) -> Dict[str, Any]:
54+
block_outputs = {}
55+
for graph_node in self.graph.nodes:
56+
block_outputs[graph_node.id] = {"result": graph_node.result, "outcome": graph_node.outcome}
57+
return {
58+
"timestamp": self.started_at.isoformat(),
59+
"instanceType": self.instance_type,
60+
"trigger": self.trigger,
61+
"blockOutputs": block_outputs,
62+
}
63+
64+
def construct_key(self) -> str:
65+
return f"wf-journal-{self.instance_type[0]}-{int(self.started_at.timestamp() * 1000)}"
66+
67+
def save(self, result: Literal["success", "error", "stopped"]) -> None:
68+
if "journal" not in Config.feature_flags or not writer_kv_storage.is_accessible():
69+
return
70+
data = self.to_dict()
71+
data["result"] = result
72+
writer_kv_storage.save(self.construct_key(), data)

src/writer/keyvalue_storage.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,11 @@ def _request(self, request_func: _WrappedRequestFunc) -> httpx.Response:
8080
response.raise_for_status()
8181
return response
8282

83+
def is_accessible(self) -> bool:
84+
if None in self._get_agent_ids():
85+
return False
86+
if None in (self.api_key, self.api_url):
87+
return False
88+
return True
89+
8390
writer_kv_storage = KeyValueStorage()

src/writer/logs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ def get_logger(level: str = WRITER_LOG_LEVEL):
246246
"writer": get_logger(),
247247
"app": get_logger(),
248248
"from_app": get_logger(),
249+
"journal": get_logger(),
249250
"kv_storage": get_logger(),
250251
"vault": get_logger(),
251252
"exec_logger": {

0 commit comments

Comments
 (0)