Conversation

@guzman-raphael (Contributor) commented Jun 29, 2025

Depends on #91, #94

Features

  • Add Pipeline model with DOT support.
    • graph_dot input is used to build a petgraph::DiGraph.
    • metadata is a lookup table of node types, e.g. Kernel.
    • input_spec is a map that defines keys required to feed in an input Packet to the Pipeline. Each key can be associated with one or more node(s)/key(s).
    • output_spec is a map that defines keys to create an output Packet. Each key is linked to exactly one node:key.
    • input_spec/output_spec are explicit and flexible but, most importantly, equivalent in structure to Pod, to facilitate composing pipelines of mixed pods and other pipelines.
    • pipeline.make_dot() is also available to make it easier to visualize the compute nodes.
  • Add PipelineJob model.
    • pipeline points to the reference pipeline.
    • input_packet is a map of packet keys to path sets. Notice that each key can have a collection of path sets. This allows batching several inputs in one go. When inputs are batched (length > 1) for keys, the cartesian product will be applied if they correspond to the same input node.
    • output_dir is the root output directory for all produced computations. A pod job's output_dir will be mounted to the following tree structure: {pipeline_job_output_dir}/{node_name}/{input_packet_hash}/. Currently the packet is not hashed and a simple random hash is used.
  • Add PipelineResult model.
    • status captures the final state of a pipeline run.
  • Add JoinOperator which performs cartesian product on parent streams (itertools crate makes this very straightforward).
  • Add MapOperator which allows renaming packet keys.
  • Add async PipelineJob execution algorithm.
    • It requires an orchestrator agent since the job is dispatched to the agent network for processing. An agent is required because it has direct access to the orchestrator and does not place a dependency on user resources to manage a pipeline run. The agent should be installed/started in whichever network/infrastructure topology makes sense for the user/system, e.g. remote or local.
    • All communication between nodes is conducted via an agent_client (facilitated by zenoh crate). This lends itself to allowing even the operator logic to benefit from distributed, coordinated compute (in the future). Currently, there is no coordination between agent nodes.
    • Pipeline job node coordination uses the following communication/topic structure: group/{group}/status/pipeline_job/{pipeline_hash}/{input|output}/{node_name}. Appropriate payloads (e.g. Payload::Stream(Packet)) are published to these topics. Nodes referenced in pipeline.input_spec listen on input while all others listen on output. To signal successful completion of a stream, Payload::End is sent.
    • Once all parent streams have ended, no packets are actively processing on a node, and no packets are queued, the node will publish its own Payload::End. This signals that the node has reached NodeState::Completed.
    • If any packet is unsuccessful (Payload as Cancelled or Failed(..)), the entire node fails immediately and any descendants are cancelled. The node will be marked Failed(..) along with its error message.
    • If a node is unsuccessful (pod_result.state that is not Completed for any packet), the pipeline does not fail immediately. It will continue on a best-effort basis since there is value in evaluating unrelated nodes, e.g. for memoization.
    • If a pipeline run has concluded and all nodes have reached a NodeState::Completed, then the pipeline run was successful. Otherwise, it failed.
    • Operators use mutexes since they may need to share state across packets, e.g. JoinOperator needs to remember prior packets.
    • If any node encounters a Rust error during processing, it crashes the agent. This was intentionally left as-is since it represents undefined behavior; while the feature is young, we should catch more of these cases to properly fix/address them.
  • Add PipelineRun.
    • Similar to pod_run, this provides a way to interact with a pipeline while it is running which can be useful to poll for state.
    • pipeline_run.attach() provides a way to listen for updates from the agent network to track state.
    • pipeline_run.summarize_dot() provides a minimal way to visualize the state at any point in time as a DOT. When combined with a loop and a DOT->SVG tool (like graphviz's dot CLI), this can provide a live animation of the pipeline run progress.
  • Expose agent_client.start_pipeline_run(..). Similar to its orchestrator counterpart, it will return immediately since it will be processed as detached.
  • Expose agent_client.get_pipeline_result(..). Similar to its orchestrator counterpart, it will wait to respond until the pipeline run has concluded.
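The PipelineJob output_dir layout described above amounts to a simple path join. A minimal sketch (pod_output_dir is a hypothetical helper name, not part of the PR):

```rust
use std::path::PathBuf;

/// Sketch of the pod job mount layout: each pod job's output_dir lives under
/// {pipeline_job_output_dir}/{node_name}/{input_packet_hash}/.
/// (Per the PR description, the hash is currently a random stand-in.)
fn pod_output_dir(
    pipeline_job_output_dir: &str,
    node_name: &str,
    input_packet_hash: &str,
) -> PathBuf {
    [pipeline_job_output_dir, node_name, input_packet_hash]
        .iter()
        .collect()
}
```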
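The cartesian-product behavior of JoinOperator above (the real implementation relies on the itertools crate) can be illustrated with a std-only sketch, where a Packet is simplified to a string map:

```rust
use std::collections::BTreeMap;

/// Hypothetical simplification of the real Packet type: key -> value.
type Packet = BTreeMap<String, String>;

/// Every packet from the left parent stream is paired with every packet
/// from the right parent stream, and each pair is merged into one
/// combined packet for the child node.
fn join(left: &[Packet], right: &[Packet]) -> Vec<Packet> {
    let mut out = Vec::with_capacity(left.len() * right.len());
    for l in left {
        for r in right {
            let mut merged = l.clone();
            merged.extend(r.iter().map(|(k, v)| (k.clone(), v.clone())));
            out.push(merged);
        }
    }
    out
}
```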
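MapOperator's key renaming can similarly be sketched (map_keys is an illustrative name, and the string-valued packet is a simplification of the real type):

```rust
use std::collections::HashMap;

/// Renames packet keys according to `renames`; keys without an entry
/// pass through unchanged.
fn map_keys(
    packet: HashMap<String, String>,
    renames: &HashMap<String, String>,
) -> HashMap<String, String> {
    packet
        .into_iter()
        .map(|(k, v)| (renames.get(&k).cloned().unwrap_or(k), v))
        .collect()
}
```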
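The communication/topic structure above reduces to a string template; a minimal sketch (node_topic is a hypothetical helper, with direction being "input" for nodes referenced in pipeline.input_spec and "output" for all others):

```rust
/// Builds the topic a node listens on for pipeline job coordination:
/// group/{group}/status/pipeline_job/{pipeline_hash}/{input|output}/{node_name}
fn node_topic(group: &str, pipeline_hash: &str, direction: &str, node_name: &str) -> String {
    format!("group/{group}/status/pipeline_job/{pipeline_hash}/{direction}/{node_name}")
}
```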
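The completion and success rules above can be condensed into two predicates (the function names and this simplified NodeState are illustrative, not the PR's actual signatures):

```rust
/// Simplified stand-in for the node states described above.
enum NodeState {
    Completed,
    Failed(String),
    Cancelled,
}

/// A node publishes its own Payload::End (reaching NodeState::Completed)
/// only when every parent stream has ended and the node has no packet
/// in flight or queued.
fn should_publish_end(
    parents_total: usize,
    parents_ended: usize,
    in_flight: usize,
    queued: usize,
) -> bool {
    parents_ended == parents_total && in_flight == 0 && queued == 0
}

/// A pipeline run is successful only if every node completed;
/// any Failed(..) or Cancelled node fails the run as a whole.
fn pipeline_succeeded(node_states: &[NodeState]) -> bool {
    node_states.iter().all(|s| matches!(s, NodeState::Completed))
}
```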

Small features and fixes

  • Add graph utilities. When generating a DOT, additional customizations can be supplied to control styling, e.g. title, caption, node colors, extra labels for nodes, etc. The petgraph crate helps in traversing the graph and generating a DOT. The layout crate helps parse a DOT to create a petgraph::DiGraph.
  • Add output_packet to PodResult, which evaluates the checksum on all expected, generated output. This was needed to convert output packets into input packets in a pipeline run. If it fails, then partial output is allowed. If it succeeds, partial output is not allowed. Fix Add output_packet to Pod Result #89
  • Convert pod.command from &str -> Vec<String> to allow more flexibility. Having this made it simpler to create pipeline test cases. Fix Change command from string to a vec of string #95
  • Add random hash generator (via hex and rand crates). Used for creating non-colliding pod job output directories for processing packets (temporary until we hash input packets) and a pipeline job hash (temporary until we can hash pipeline job consistently).
  • Expand RE_AGENT_KEY_EXPR regex to allow capturing of more metadata. Use it as the primary source for loading metadata.
  • Convert exposed enums over CFFI to have variants with named fields since the UX/help in Python is nicer.
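As a rough illustration of the DOT side of the graph utilities above (the actual code uses petgraph traversal and templating; make_dot here is a bare, std-only sketch over an edge list):

```rust
/// Emits a minimal DOT digraph with an optional title label
/// from a list of (source, destination) node-name pairs.
fn make_dot(title: &str, edges: &[(&str, &str)]) -> String {
    let mut dot = String::from("digraph pipeline {\n");
    dot.push_str(&format!("  label=\"{title}\";\n"));
    for (src, dst) in edges {
        dot.push_str(&format!("  \"{src}\" -> \"{dst}\";\n"));
    }
    dot.push_str("}\n");
    dot
}
```

Piping such output through graphviz's dot CLI (e.g. `dot -Tsvg`) is what enables the live-progress visualization mentioned for pipeline_run.summarize_dot().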

Housekeeping

  • Add pipeline_test.ipynb as a DEMO that illustrates how to use the pipeline feature.
  • Add test cargo feature to allow exposing more to integration tests while still keeping the default API private. Features default and test cannot be combined. This allows us to finally make all of core's submodules private by default.
  • Add to crate diagram.
  • Rename orchestrator::Status -> orchestrator::PodStatus to make it more distinct.
  • Simplify RE_MODEL_METADATA regex.
  • Rename agent_client.submit_pod_jobs(..) -> agent_client.start_pod_jobs(..) to be more consistent with orchestrator API design.

codecov bot commented Jun 29, 2025

@Synicix (Contributor) commented Jul 1, 2025

@guzman-raphael Is this supposed to be your version of the pipeline implementation?

@guzman-raphael (Contributor, Author) commented Jul 1, 2025

@Synicix Since the pipeline feature is a big one, I've been getting a head start really to help me review your PRs.

I'm not sure yet if I'll make this PR "ready for review" but at least wanted to make my brainstorming visible in case it helps with discussions.

@Synicix (Contributor) commented Jul 1, 2025

@guzman-raphael
A lot of the core logic and how I do things won't change much with the pending features I plan on adding later today. So it is like 90% ready for review, since I am still proofreading it, plus the missing verification feature for input_spec.


…b, add map operator, modify pod command to Vec<String> as opposed to String to allow more use cases, apply node shape based on kernel, convert enum variants from unnamed to named fields to improve Python UX, remove unused metadata from make_graph, make core crypto/model functions private by default, apply clippy implicit_hasher suggestion.
@guzman-raphael linked an issue Jul 5, 2025 that may be closed by this pull request
@guzman-raphael changed the title from "Add Pipeline model" to "Add Pipeline support" Jul 5, 2025
@guzman-raphael changed the title from "Add Pipeline support" to "Add Pipeline feature" Jul 5, 2025
…e flexible, add stopgap hash for pipeline job (unique each time), add placeholder for pipeline job scheduler, add to crate diagram, simplify store regex, capture more in available event metadata, and remove unused event info + classifier to simplify.
…le to pipeline scheduler, and increase clippy error size threshold.
…pose `PipelineRun` to keep track of pipeline job state, add `get_pipeline_result` to agent, update pipeline demo, create a packet tuple struct, rename `orchestrator::Status` to `PodStatus` to make more clear, and expose optional customization to generated graphs.
… DOT to petgraph, use jinja for DOT templating, remove SVG support in favor of graphviz CLI, remove layout dependency, and make get util more generic.
…flexible, allow pipeline.make_dot(..) to include/exclude style, and move function optional variables to end to be compatible with uniffi defaults.
…-contained groups, reduce error message size, optimize iteration workflow with llvm-cov, and increase resolution of pipeline execution visualization in demo.
…d to PipelineRun/PipelineResult, improve pipeline execution animation in demo, and add monitor note to pipeline demo.
…l utility to verify pipelines in tests, and stream prints during tests invoked using VSCode in-place GUI.
@Synicix (Contributor) commented Aug 28, 2025

Closing this draft to clear up the PR list (saving the PR logs for reference when needed).

@Synicix closed this Aug 28, 2025


Development

Successfully merging this pull request may close these issues: "Change command from string to a vec of string" and "Add output_packet to Pod Result".