Context
When repeatedly calling PutWfSpec with the same workflow definition (no code changes), the revision number sporadically increments. This suggests the compiled WfSpec is not deterministic, causing LittleHorse to detect "changes" that don't exist.
The revision number increments sporadically even with no changes to the workflow:
- Runs 1-4: revision=1 (correct)
- Run 5: revision=2 (WRONG - no change, but revision incremented)
- Runs 6-7: revision=2 (correct)
- Run 8: revision=3 (WRONG - no change, but revision incremented)
- And so on...
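To check the client-side half of this hypothesis, the compiled request can be fingerprinted across separate runs. The sketch below is a diagnostic, not part of the SDK: it assumes the `create_workflow()` helper from the reproduction script in the next section, and uses protobuf's deterministic serialization so that map-field wire ordering is ruled out as the source of variation:

```python
# Hypothetical diagnostic: digest the compiled WfSpec request. Run it several
# times; differing digests mean the compiled message itself varies across
# interpreter runs. deterministic=True sorts protobuf map fields on the wire,
# so any remaining difference comes from the message contents (e.g., variable
# or node ordering), not from map serialization order.
import hashlib

request = create_workflow().compile()
print(hashlib.sha256(request.SerializeToString(deterministic=True)).hexdigest())
```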
Steps To Reproduce
- Save the reproduction script below as lh_version_repro.py
- Start the LittleHorse server via Docker
- Register the mock tasks in one terminal: python lh_version_repro.py tasks
- In another terminal, run the upload repeatedly, 10+ times (a convenience runner is sketched after this list): python lh_version_repro.py upload
- Observe the revision numbers incrementing despite no code changes between runs
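Optionally, the repeated uploads can be driven from a small loop (a sketch, assuming the script is saved under the name above):

```python
# Optional runner: invokes the "upload" mode a dozen times; the script's own
# logging prints the revision number returned for each run.
import subprocess

for _ in range(12):
    subprocess.run(["python", "lh_version_repro.py", "upload"], check=True)
```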
```python
#!/usr/bin/env python3
"""LittleHorse version bug reproduction."""
import argparse
import asyncio
import logging
from typing import Any

import littlehorse
from littlehorse.config import LHConfig
from littlehorse.model import PutWfSpecRequest
from littlehorse.worker import LHTaskWorker, WorkerContext
from littlehorse.workflow import Workflow, WorkflowThread

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

UPDATE_EMBEDDINGS_TASK = "repro-update-embeddings"
UPDATE_RECORD_TASK = "repro-update-record"
DETECT_FACES_TASK = "repro-detect-faces"
GRADE_IMAGE_TASK = "repro-grade-image"
WORKFLOW_NAME = "repro-image-processing"


# Mock task implementations: they return canned payloads so the repro needs
# no real image-processing backend.
async def mock_update_embeddings(
    url: str, record_type: str, tenant_id: str, creator_id: str, record_id: str, ctx: WorkerContext,
) -> dict[str, Any]:
    return {}


async def mock_update_record(
    tenant_id: str, creator_id: str, record_id: str, updates: dict[str, Any], ctx: WorkerContext,
) -> dict[str, Any]:
    return {"updated_fields": list(updates.keys()) if updates else []}


async def mock_detect_faces(
    url: str, parent_id: str, parent_type: str, tenant_id: str, creator_id: str, ctx: WorkerContext,
) -> dict[str, Any]:
    return {"face_count": 1, "best_face_quality_score": 0.95, "face_ids": ["mock-face-001"]}


async def mock_grade_image(
    url: str, tenant_id: str, creator_id: str, record_id: str, face_count: int, best_face_quality: float, ctx: WorkerContext,
) -> dict[str, Any]:
    return {"quality_score": 0.85, "quality_metrics": {"sharpness": 0.80, "resolution": 0.90}}


def image_processing_workflow(wf: WorkflowThread) -> None:
    input_obj = wf.declare_json_obj("input").required().as_public()
    tenant_id = wf.declare_str("tenant_id").as_public().searchable()
    tenant_id.assign(input_obj.with_json_path("$.tenant_id"))
    creator_id = wf.declare_str("creator_id").as_public().searchable()
    creator_id.assign(input_obj.with_json_path("$.creator_id"))
    record_id = wf.declare_str("record_id").as_public().searchable()
    record_id.assign(input_obj.with_json_path("$.record_id"))
    storage_uri = input_obj.with_json_path("$.storage_uri")
    input_obj.with_json_path("$.quality_score").assign(None)
    input_obj.with_json_path("$.quality_metrics").assign(None)
    url = storage_uri

    wf.execute(UPDATE_EMBEDDINGS_TASK, url, "Image", tenant_id, creator_id, record_id, retries=5)
    status_path = input_obj.with_json_path("$.status")
    status_path.assign("embeddings_processed")
    wf.execute(UPDATE_RECORD_TASK, tenant_id, creator_id, record_id, input_obj, retries=5)

    face_result = wf.execute(DETECT_FACES_TASK, url, record_id, "Image", tenant_id, creator_id, retries=5)
    face_count = wf.declare_int("face_count")
    face_count.assign(face_result.with_json_path("$.face_count"))
    best_face_quality = wf.declare_double("best_face_quality")
    best_face_quality.assign(face_result.with_json_path("$.best_face_quality_score"))

    quality_result = wf.execute(GRADE_IMAGE_TASK, url, tenant_id, creator_id, record_id, face_count, best_face_quality, retries=5)
    input_obj.with_json_path("$.quality_score").assign(quality_result.with_json_path("$.quality_score"))
    input_obj.with_json_path("$.quality_metrics").assign(quality_result.with_json_path("$.quality_metrics"))
    status_path.assign("processed")
    wf.execute(UPDATE_RECORD_TASK, tenant_id, creator_id, record_id, input_obj, retries=5)


def create_workflow() -> Workflow:
    return Workflow(WORKFLOW_NAME, image_processing_workflow)


async def run_tasks() -> None:
    config = LHConfig()
    workers = [
        LHTaskWorker(mock_update_embeddings, UPDATE_EMBEDDINGS_TASK, config),
        LHTaskWorker(mock_update_record, UPDATE_RECORD_TASK, config),
        LHTaskWorker(mock_detect_faces, DETECT_FACES_TASK, config),
        LHTaskWorker(mock_grade_image, GRADE_IMAGE_TASK, config),
    ]
    for worker in workers:
        worker.register_task_def()
        logger.info(f"Registered task: {worker._task_def_name}")
    await littlehorse.start(*workers)


async def run_upload() -> None:
    config = LHConfig()
    client = config.stub()
    workflow = create_workflow()
    request: PutWfSpecRequest = workflow.compile()
    response = client.PutWfSpec(request)
    logger.info(
        f"Workflow registered: name={response.id.name}, "
        f"major_version={response.id.major_version}, revision={response.id.revision}"
    )


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("mode", choices=["tasks", "upload"])
    args = parser.parse_args()
    if args.mode == "tasks":
        asyncio.run(run_tasks())
    elif args.mode == "upload":
        asyncio.run(run_upload())


if __name__ == "__main__":
    main()
```
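A further narrowing step, again only a sketch on top of the script above: compile the workflow twice within a single process and compare bytes. CPython fixes its string-hash seed per process, so if randomized set ordering is the culprit, this in-process comparison should pass even while digests differ between separate runs:

```python
# In-process probe: two compiles in the same interpreter should yield
# identical bytes if the only non-determinism is per-process hash seeding
# (PYTHONHASHSEED); a failure here would instead point at timestamps,
# random IDs, or similar being baked into the compiled spec.
a = create_workflow().compile().SerializeToString(deterministic=True)
b = create_workflow().compile().SerializeToString(deterministic=True)
assert a == b, "compile() is non-deterministic even within one process"
```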
Expected Behavior
Uploading the same workflow definition multiple times should return the same revision number, since the workflow hasn't changed. The server should detect that the compiled WfSpec is identical and return the existing revision.
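For illustration only (this is not the kernel's actual code path), the expected idempotency amounts to comparing canonical bytes before minting a new revision:

```python
# Hypothetical idempotency check, sketched in Python for brevity: bump the
# revision only when the canonical serialization of the spec actually changes.
def needs_new_revision(stored_spec_bytes: bytes, incoming_request) -> bool:
    return stored_spec_bytes != incoming_request.SerializeToString(deterministic=True)
```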
Affected Components
Kernel (/server), Python SDK
Environment
- OS: macOS (Intel)
- Client Version: littlehorse Python SDK (latest via pip)
- Server Version: Docker (latest)
- Python: 3.12
Potential Root Cause:
The issue appears to be non-deterministic serialization during workflow compilation. Possible causes:
- Dictionary/map ordering - Protobuf map fields may not serialize deterministically
- Variable ordering - Internal data structures may have non-deterministic iteration order
- Hash-based ordering - Any use of hash() or unordered collections (e.g., a set of variable or node names) could vary between interpreter runs; a demonstration follows this list
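The hash-ordering bullet is easy to demonstrate in isolation. The snippet below is an assumption about what could be happening inside the SDK, not a confirmed code path; the variable names are borrowed from the repro script. The sporadic pattern in the transcript (several identical runs, then a jump) is also consistent with a small number of possible orderings being sampled per run:

```python
# Set iteration order for strings changes between interpreter runs, because
# CPython randomizes str hashing per process unless PYTHONHASHSEED is pinned.
variables = {"tenant_id", "creator_id", "record_id", "face_count"}
print(list(variables))  # save as a script and run twice: order can differ
```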
Impact:
- Unnecessary workflow revisions pollute the revision history
- Makes it difficult to track actual workflow changes
- Could cause confusion in production deployments when "no changes" result in new versions
Additional Context
```
$ python lh_version_repro.py upload
Workflow registered: name=repro-image-processing, major_version=0, revision=1
$ python lh_version_repro.py upload
Workflow registered: name=repro-image-processing, major_version=0, revision=1
$ python lh_version_repro.py upload
Workflow registered: name=repro-image-processing, major_version=0, revision=1
$ python lh_version_repro.py upload
Workflow registered: name=repro-image-processing, major_version=0, revision=1
$ python lh_version_repro.py upload
Workflow registered: name=repro-image-processing, major_version=0, revision=2 # <-- unexpected
$ python lh_version_repro.py upload
Workflow registered: name=repro-image-processing, major_version=0, revision=2
$ python lh_version_repro.py upload
Workflow registered: name=repro-image-processing, major_version=0, revision=2
$ python lh_version_repro.py upload
Workflow registered: name=repro-image-processing, major_version=0, revision=3 # <-- unexpected
$ python lh_version_repro.py upload
Workflow registered: name=repro-image-processing, major_version=0, revision=3
$ python lh_version_repro.py upload
Workflow registered: name=repro-image-processing, major_version=0, revision=4 # <-- unexpected
$ python lh_version_repro.py upload
Workflow registered: name=repro-image-processing, major_version=0, revision=5 # <-- unexpected
```