Skip to content

Latest commit

 

History

History
282 lines (215 loc) · 7.76 KB

File metadata and controls

282 lines (215 loc) · 7.76 KB

Pipelines and stages

A pipeline runs an ordered list of stages. The framework loads config, builds a YT client for the current mode, runs upload when needed, then calls each stage’s run method in sequence.

Pipelines

The pipeline is responsible for:

  • Discovering or registering stages
  • Loading YAML and secrets
  • Initializing the YT client (dev or prod)
  • Uploading code in prod
  • Invoking stages in enabled_stages order
  • Surfacing failures (a raised exception stops the run)

DefaultPipeline

**Prefer DefaultPipeline**

It discovers stages under `stages/` so you rarely need custom registration logic.

Point __main__ at DefaultPipeline.main():

# pipeline.py
from yt_framework.core.pipeline import DefaultPipeline

if __name__ == "__main__":
    DefaultPipeline.main()

Discovery does the following:

  1. List subdirectories of stages/.
  2. Import stage.py from each.
  3. Register every BaseStage subclass found.
  4. Run stages in the order given by configs/config.yamlstages.enabled_stages.

Example layout:

my_pipeline/
├── pipeline.py
├── configs/
│   └── config.yaml
└── stages/
    ├── stage1/
    │   ├── stage.py
    │   └── config.yaml
    └── stage2/
        ├── stage.py
        └── config.yaml

Working tree: 01_hello_world.

BasePipeline

Use BasePipeline when you need explicit registration or setup hooks (conditional stages, tests, unusual layout).

# pipeline.py
from yt_framework.core.pipeline import BasePipeline
from yt_framework.core.registry import StageRegistry
from stages.stage1.stage import Stage1
from stages.stage2.stage import Stage2

class MyPipeline(BasePipeline):
    def setup(self):
        registry = StageRegistry()
        registry.add_stage(Stage1)
        registry.add_stage(Stage2)
        self.set_stage_registry(registry)

if __name__ == "__main__":
    MyPipeline.main()

Reasons you might choose this:

  • Custom setup() work before stages exist
  • Stages not discoverable from a flat stages/ tree
  • Tests that inject a small fixed registry

For normal repos, DefaultPipeline is enough.

Stages

A stage is one unit of work: one BaseStage subclass, one config.yaml, and optional src/ or requirements.txt for uploaded jobs.

BaseStage

BaseStage gives you:

  • Parsed stage config at self.config (from stages/<name>/config.yaml)
  • self.deps.yt_client, pipeline-wide settings, paths
  • self.logger
  • Operation-related context on self.context where applicable

Directory layout

stages/
└── stage_name/
    ├── stage.py          # required
    ├── config.yaml       # required
    ├── src/              # optional (mapper, vanilla, etc.)
    │   ├── mapper.py
    │   └── vanilla.py
    └── requirements.txt  # optional extra pip deps for the job bundle

Minimal stage

# stages/my_stage/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage

class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        self.logger.info("Running my stage")
        return debug

Stage with tables

# stages/my_stage/config.yaml
client:
  input_table: //tmp/my_pipeline/input
  output_table: //tmp/my_pipeline/output
# stages/my_stage/stage.py
class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        input_table = self.config.client.input_table
        output_table = self.config.client.output_table

        rows = list(self.deps.yt_client.read_table(input_table))
        processed = [process_row(row) for row in rows]

        # Default overwrites the output. Pass append=True to append if the table already exists.
        self.deps.yt_client.write_table(output_table, processed)
        return debug

self.deps

Typical fields:

  • self.deps.yt_client — read/write tables, YQL helpers, etc.
  • self.deps.pipeline_config — merged pipeline section from configs/config.yaml
  • self.deps.configs_dir — directory that holds secrets.env
class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        yt = self.deps.yt_client
        mode = self.deps.pipeline_config.pipeline.mode
        table_path = self.config.client.output_table
        self.logger.info("mode=%s table=%s", mode, table_path)
        return debug

Passing data between stages (debug)

debug is a mutable mapping carried from stage to stage. Put small flags or summaries here; put large data in YT tables.

class Stage1(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        debug["result"] = "some value"
        debug["count"] = 42
        return debug

class Stage2(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        result = debug.get("result")
        count = debug.get("count", 0)
        self.logger.info("from stage1: result=%s count=%s", result, count)
        return debug
**Keep `debug` small**

It is an in-memory dict passed through the driver process. Metadata only: for big payloads, write a table and pass the path in `debug` if needed.

Stage configuration

Access nested YAML through self.config:

# stages/my_stage/config.yaml
job:
  multiplier: 2
  prefix: "processed_"

client:
  input_table: //tmp/my_pipeline/input
  output_table: //tmp/my_pipeline/output
  operations:
    map:
      resources:
        memory_limit_gb: 4
        cpu_limit: 2
class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        multiplier = self.config.job.multiplier
        prefix = self.config.job.prefix
        input_table = self.config.client.input_table
        output_table = self.config.client.output_table
        memory = self.config.client.operations.map.resources.memory_limit_gb
        self.logger.info(
            "job %s %s tables %s -> %s map_mem_gb=%s",
            prefix,
            multiplier,
            input_table,
            output_table,
            memory,
        )
        return debug

Lifecycle (conceptual)

  1. Discovery (DefaultPipeline): scan stages/.
  2. Registration: stage classes recorded in a registry.
  3. Construction: one instance per stage with dependencies injected.
  4. Config load: OmegaConf-style object from config.yaml.
  5. Run: run(debug) executes; return value becomes the next stage’s debug.
  6. Next stage: repeat until the list ends or an error is raised.

Order

# configs/config.yaml
stages:
  enabled_stages:
    - create_input
    - process_data
    - validate_output
**Sequential**

Stages run one after another. An uncaught exception aborts the pipeline.

Injection details

self.deps follows PipelineStageDependencies. See Core injection (self.deps) under API reference (yt_framework.core.dependencies).

Multi-stage sample

02_multi_stage_pipeline.

Practices that tend to help

  1. One main responsibility per stage.
  2. Names that match what the stage does in YT terms.
  3. Large payloads in tables, not in debug.
  4. Fail fast: raise on invalid input instead of returning partial success silently.
  5. Log decisions you will need when reading .dev logs or YT operation logs.
  6. Exercise new stages in dev mode before pointing prod traffic at them.

Next steps