
development area: data-driven workflows #224

@oliver-sanders

Description

Note

Discussion issue for the topic of "data-driven workflows".

Purpose: Outline a rough roadmap for potential future development in this area (what work gets undertaken, and on what timeframe, to be decided in later prioritisation). We won't produce a detailed implementation plan or hash out interfaces or technical details at this stage.

Comments: Please leave comments below rather than editing the OP (and please reference the point number rather than just quoting). We'll discuss this at a future Cylc project meeting and write it up (say, as a GH project) when we're done (then close/lock this issue). Please keep it focused (see "Context" and "Action" below)!

Extensions: Suggesting new points is encouraged (if not already covered), but please keep it on topic; more discussions will follow this one, each taken in turn to allow us to focus.

Changelog:

  • [edit 2026-03-31] Added point 2.iv.c
  • [edit 2026-03-31] Added point 1.viii

Data-driven workflows

Topics pertaining to how inputs/outputs are defined and exchanged between tasks/workflows.

Context: As workflow and suite complexity grows (a "suite" here being a collection of inter-dependent workflows), the matter of interfaces increases in priority. There is a growing emphasis on data provenance, traceability and observability, all of which are hard to address at present due to poor visibility of the data flow within and between Cylc workflows. Data-driven paradigms also benefit workflow developers and allow for more decoupled workflows.

Action: There are several things we can do to formalise inputs/outputs in Cylc and facilitate the exchange of metadata (whilst continuing to offer existing paradigms of course).

  1. Support metadata in Cylc messages:
    1. Allow task output messages to carry metadata, e.g., filesystem paths, or API locations for use by downstream tasks.
    2. One approach would be to support patterns in output messages:
      1. I.e., if a message matches an output pattern, the output has been generated. Regex matches could be used to extract the metadata.
      2. E.g.:
        • cylc message -- 'dataset 1 written filepath: ... host: ... archive location: ...'
        • [outputs]dataset-1 = dataset 1 written filepath: (?P<filepath>[^ ]+) host: (?P<host>[^ ]+) ...
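The pattern-matching idea can be sketched in a few lines of Python. The pattern, message text, and function name below are illustrative only (following the example above), not existing Cylc interfaces:

```python
import re

# Illustrative output pattern, as in the example above: a regex with
# named groups, used both to detect that the output was generated and
# to extract its metadata.
DATASET_1_PATTERN = re.compile(
    r"dataset 1 written filepath: (?P<filepath>[^ ]+) host: (?P<host>[^ ]+)"
)

def match_output(message):
    """Return the extracted metadata dict if the message satisfies the
    output pattern, or None if it does not."""
    match = DATASET_1_PATTERN.fullmatch(message)
    return match.groupdict() if match else None
```

E.g., `match_output("dataset 1 written filepath: /data/d1.nc host: hpc1")` would yield `{"filepath": "/data/d1.nc", "host": "hpc1"}`.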
    3. An alternative to using message patterns might be to implement a more formal message payload in a given format, e.g., JSON:
      1. Note, this is basically what we do with xtriggers ATM.
      2. E.g., cylc message -- 'dataset 1 written' -- '{"filepath": "...", "host": "...", "archive location": "..."}'
      3. Slightly harder as it requires cylc message changes.
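A sketch of how a scheduler-side handler might split such a message from its JSON payload. The function and argument layout are assumptions, not the existing cylc message interface:

```python
import json

def parse_message_args(args):
    """Split the arguments after `cylc message --` into the message
    text and an optional JSON metadata payload (empty if absent)."""
    message = args[0]
    payload = json.loads(args[1]) if len(args) > 1 else {}
    return message, payload
```

Tasks without metadata would keep working unchanged, since the payload argument is optional.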
    4. Quick win: the basic functionality is very easy to implement (done in a PoC); we just need to make sure we do this in a way which leaves the door open to future work.
    5. How should we provide this metadata to the downstream task?
      1. In a PoC, I broadcast the metadata as an environment variable called INPUT_<output_name>.
      2. But we can't dump the metadata for all prerequisites into the environment (there are size limits)!
        • A prerequisites file might be an alternative, e.g., a JSON file.
        • Note, the format has to be easy to read in multiple programming languages!
        • Note, the jq command is not POSIX.
      3. Perhaps the delivery mechanism should be more formal.
      4. Should we provide "schema" for this metadata?
        • E.g., there could be a "filesystem" schema which provides "path" and "platform" fields.
        • And a "data bucket" schema which provides a "URL" field or whatnot.
        • And bespoke site-archive schemas which provide whatever metadata fields are required for those solutions.
        • Etc.
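A minimal sketch of what such schemas could amount to, assuming each schema simply names its required fields (the schema names and fields are the illustrative ones from above):

```python
# Hypothetical metadata schemas: each names the fields an output's
# metadata must provide, so downstream tasks know what to rely on.
SCHEMAS = {
    "filesystem": {"path", "platform"},
    "data-bucket": {"url"},
}

def validate_metadata(schema_name, metadata):
    """Raise if the metadata lacks any field its schema requires."""
    missing = SCHEMAS[schema_name] - set(metadata)
    if missing:
        raise ValueError(
            f"metadata missing {sorted(missing)} for schema {schema_name!r}"
        )
```

A fuller solution might use JSON Schema or similar, but the principle is the same: the schema is the contract between producer and consumer.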
    6. This would be desirable for provenance as it allows the data flow through a workflow to be tracked.
      1. CC potential for provenance plugin and previous academic work in this field.
      2. CC data observability - more on this in the "Observability" topic.
        • E.g., monitoring the delivery of key artefacts in a production workflow.
    7. Should we cylc message the job's exit code back to the scheduler to allow it to be captured as an output?
      1. E.g., [outputs]connection-error = $255.
      2. This would be an easy way to communicate error statuses from tasks without having to inject Cylc logic into scripts/executables.
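As a sketch of the idea, assuming a configuration like the `[outputs]connection-error = $255` example above has been parsed into a mapping (the mapping and function name are hypothetical):

```python
# Hypothetical mapping parsed from e.g. "[outputs]connection-error = $255".
EXIT_CODE_OUTPUTS = {255: "connection-error"}

def outputs_for_exit_code(code):
    """Translate a job exit code into the task outputs it satisfies."""
    outputs = ["succeeded"] if code == 0 else ["failed"]
    custom = EXIT_CODE_OUTPUTS.get(code)
    if custom:
        # A non-zero code may additionally satisfy a configured custom
        # output, which downstream tasks can then trigger off.
        outputs.append(custom)
    return outputs
```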
    8. There is currently a bug presenting a barrier to the use of custom messages in production environments.
  2. Suite of suites:
    1. How should we define the interaction of collections of workflows, e.g, an operational suite?
    2. Currently these collections are assembled by hand, but a more configuration-based approach is desirable (reproducibility, ease of collaboration, safer deployment, etc).
    3. Should we be able to define a suite of workflows, say in a configuration file?
    4. Should we define top-level workflow inputs and outputs:
      1. Binding to workflow//cycle/task is fragile and requires knowledge of the workflow/task structure.
      2. Better to bind to more formally defined "hard points", e.g., specific data sets.
        • E.g., global:<cycle>:dataset rather than global-v32//<cycle>/run-model
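The binding idea can be sketched as a lookup table held in the suite configuration, so consumers reference the abstract dataset and never the producer's structure (the table contents and ID formats follow the example above and are illustrative):

```python
# Hypothetical suite-level bindings mapping abstract dataset "hard
# points" onto the concrete tasks which produce them; only the suite
# configuration knows these, not the consuming workflows.
BINDINGS = {
    "global:dataset": "global-v32//{cycle}/run-model",
}

def resolve_hard_point(dataset_id, cycle):
    """Resolve an abstract dataset ID and cycle to the concrete
    workflow//cycle/task which produces it."""
    return BINDINGS[dataset_id].format(cycle=cycle)
```

Swapping in a different model version then means editing one line of the suite configuration rather than every consuming workflow.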
      3. CC: BoM desire to decouple workflows via Kafka.
    5. Should we be able to map the outputs of one workflow onto the inputs of another in the "suite" configuration?
    6. Should we be able to define how the suite should be started / monitored / updated / stopped?
      1. E.g., workflow/run names.
      2. Start arguments, etc.
      3. Strongly linked to CD workflow deployment (see "Supporting Continuous Deployment Paradigm For Cylc Workflows" topic).
    7. Define hierarchical workflow structure:
      1. Suites of suites of workflows...
      2. I.e, modularity at the macro-level.
      3. E.g., this group of workflows covers a particular regional model (obs, model, post-processing, etc).
        • They can be developed and trialled standalone, or as part of a larger collection.
        • Just remap the inputs these workflows require (e.g., global boundary conditions) onto the desired data source.
  3. External triggers:
    1. Xtriggers:
      1. Are a "data-driver" (in that they produce outputs).
      2. Are not an "event-driver" (in that they are driven not by external events but by runahead limiting):
        • It might make more sense to use ext-triggers as the "event-driver" rather than trying to "fake it" with runahead-driven xtriggers.
        • This resolves spawning discrepancies.
      3. It might make more sense to re-implement xtriggers as tasks for various reasons.
    2. Ext-triggers:
      1. Are not a "data-driver" (because they cannot conveniently carry metadata)
        • But they probably should be able to do so!
        • This is a quick win, implemented in a PoC.
        • They could provide an inter-workflow interface for "support output data in Cylc messages" (so that inputs/outputs work the same way within workflows as they do between workflows).
      2. Are an "event-driver".
    3. Consider ways to make it easier to configure bi-modal workflows which can operate in either regime.
      • E.g., easily transition from push events (e.g., ext-triggers) to pull events (i.e., polling) as a workflow-level configuration which can be easily changed without having to re-write the workflow.
    4. Async xtriggers:
      1. Does it actually make sense to run polling tasks in a Cylc event loop (async or otherwise)?
      2. Or should we just run them as standalone background processes (i.e, like regular tasks)?
        • Would we be better off just wrapping the Python function call in a basic poll/sleep loop and leaving it there?
        • Removes the strict requirement for xtriggers to be Python based.
        • Removes the need for a bespoke xtrigger execution model.
        • Subproc-pool throttling can be replaced by task-queue throttling.
        • Note, most of the requirements we have for polling at the MO require remote polling, which is not possible via xtriggers or Python interfaces in general (without developing a Python SSH implementation or relying on an external service) but is very easy via polling tasks.
        • I.e., we could keep the xtrigger "look and feel" but remove the bespoke execution altogether (basically just a subtly different job script).
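The wrap-it-in-a-sleep-loop idea might look like the sketch below. Here `check` stands in for an xtrigger-style function returning `(satisfied, result)`; the function names and signature are illustrative, not an existing Cylc interface:

```python
import time

def poll_until(check, interval=60, max_polls=None, sleep=time.sleep):
    """Run an xtrigger-style check inside an ordinary task by polling
    in a plain sleep loop, rather than in the scheduler's event loop.

    Returns the check's result once satisfied; raises TimeoutError if
    max_polls is set and exhausted.
    """
    polls = 0
    while max_polls is None or polls < max_polls:
        satisfied, result = check()
        if satisfied:
            return result
        polls += 1
        sleep(interval)
    raise TimeoutError("condition not satisfied within max_polls polls")
```

Because the loop runs inside a regular job, the check could equally be a shell command on a remote platform, removing the Python-only restriction.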
      3. Note, local background tasks are currently a problem for auto-restart functionality:
        • If going down either the async xtrigger (in main-loop) or local background task (outside main-loop) approaches, we need the ability to abruptly kill a poller to support auto-restart.
  4. Integrating event brokers:
    1. Event brokers offer a push based alternative to inter-workflow triggering.
      1. Rather than polling for a file, or a task output, subscribe to the event and be notified when it happens.
      2. This requires an external service, BoM has investigated Kafka for this purpose.
    2. Shockingly easy to achieve (demonstrated in a PoC):
      1. Event handler / xtrigger approaches are not scalable due to the number of subprocesses.
      2. But a true push based mechanism using main-loop plugins is within reach.
      3. But it requires a bit of Cylc knowledge, so we need to take the lead on this, a template example would really help.
    3. PoC uses ext-triggers to push outputs into downstream workflow's event queue.
      1. Seems like a good solution.
      2. Any issues with this?
    4. Would require a couple more plugin interfaces before it could be used in production:
      1. The PoC currently hacks process_messages to intercept incoming task outputs; this could do with something better.
      2. Main loop: an interface for intercepting task messages.
      3. The PoC currently adds ext-triggers to the scheduler's internal queue; this might be OK, or we might want to decouple it from the implementation a bit further.
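The PoC's broker-to-ext-trigger bridge can be sketched with a plain queue standing in for the broker (Kafka or otherwise); all names here are illustrative, not the PoC's actual interfaces:

```python
import queue

# A plain queue stands in for the event broker; in practice this would
# be e.g. a Kafka topic the downstream scheduler subscribes to.
broker = queue.Queue()

def publish_output(workflow, cycle, task, output):
    """Upstream side: publish a completed task output as an event."""
    broker.put(f"{workflow}//{cycle}/{task}:{output}")

def drain_events(queue_ext_trigger):
    """Downstream side (main-loop plugin): turn each pending event into
    an ext-trigger via the supplied callback. Returns the count drained."""
    drained = 0
    while True:
        try:
            event = broker.get_nowait()
        except queue.Empty:
            return drained
        queue_ext_trigger(event)
        drained += 1
```

This is push-based end to end: no subprocesses per event, just a queue drained from the scheduler's main loop.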
  5. Automated / configurable synchronisation of task/workflow artefacts:
    1. Artefacts, i.e., the actual datasets that Cylc outputs represent.
    2. Currently, users have to arrange all of the data moving themselves.
    3. E.g, if a task on one platform requires an output produced on another, then they must add in an rsync manually.
    4. Since Cylc is "install target" (aka filesystem) aware, it could run this rsync command on behalf of the user.
      • E.g., to pull a dataset from the install target where it was written to the install target we are currently on, we could call cylc sync workflow//123/abc:dataset.
    5. Would make writing portable workflows easier as it removes "install target" logic and the need for platform-dependent sync tasks.
    6. If tasks define their inputs, then we could even call this from the job script automatically.
    7. The command and interface for this is covered in the "managing workflow files" topic.
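A sketch of what such a sync command could do under the hood, assuming Cylc can map install targets to hosts and run directories. All names, the registry, and the path layout here are assumptions; the real command and interface belong to the "managing workflow files" topic:

```python
# Hypothetical install-target registry: Cylc already knows which
# filesystem ("install target") each platform writes to.
INSTALL_TARGETS = {
    "hpc": {"host": "hpc1", "run_dir": "/hpc/cylc-run"},
    "desktop": {"host": "localhost", "run_dir": "/home/user/cylc-run"},
}

def sync_command(source_target, dest_target, rel_path):
    """Build the rsync that a `cylc sync` could run on the user's
    behalf to pull a dataset from the target where it was written."""
    src = INSTALL_TARGETS[source_target]
    dest = INSTALL_TARGETS[dest_target]
    return [
        "rsync", "-a",
        f"{src['host']}:{src['run_dir']}/{rel_path}",
        f"{dest['run_dir']}/{rel_path}",
    ]
```

The point is that the user never writes the rsync or hard-codes hosts; the install-target awareness Cylc already has supplies both ends of the transfer.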
