Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions iwf/object_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ def __init__(self) -> None:
super().__init__(*DefaultPayloadConverter.default_encoding_payload_converters)


class BinaryNullPayloadConverter(EncodingPayloadConverter):
"""Converter for 'binary/null' payloads supporting None values."""
class UnsetPayloadConverter(EncodingPayloadConverter):
"""Converter for 'unset' payloads supporting None values."""

@property
def encoding(self) -> Union[str, Unset]:
Expand All @@ -253,6 +253,15 @@ def from_payload(
return None


class BinaryNullPayloadConverter(UnsetPayloadConverter):
"""Converter for 'binary/null' payloads supporting None values."""

@property
def encoding(self) -> Union[str, Unset]:
"""See base class."""
return "binary/null"


class BinaryPlainPayloadConverter(EncodingPayloadConverter):
"""Converter for 'binary/plain' payloads supporting bytes values."""

Expand Down Expand Up @@ -533,6 +542,7 @@ def decode(


DefaultPayloadConverter.default_encoding_payload_converters = (
UnsetPayloadConverter(),
BinaryNullPayloadConverter(),
BinaryPlainPayloadConverter(),
JSONPlainPayloadConverter(),
Expand Down
10 changes: 6 additions & 4 deletions iwf/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from iwf.registry import Registry
from iwf.tests.workflows.java_duplicate_rpc_memo_workflow import (
JavaDuplicateRpcMemoWorkflow,
)
from iwf.tests.workflows.abnormal_exit_workflow import AbnormalExitWorkflow
from iwf.tests.workflows.basic_workflow import BasicWorkflow
from iwf.tests.workflows.conditional_complete_workflow import (
ConditionalCompleteWorkflow,
)
from iwf.tests.workflows.describe_workflow import DescribeWorkflow
from iwf.tests.workflows.empty_data_workflow import EmptyDataWorkflow
from iwf.tests.workflows.internal_channel_workflow import InternalChannelWorkflow
from iwf.tests.workflows.internal_channel_workflow_with_no_prefix_channel import (
InternalChannelWorkflowWithNoPrefixChannel,
)
from iwf.tests.workflows.java_duplicate_rpc_memo_workflow import (
JavaDuplicateRpcMemoWorkflow,
)
from iwf.tests.workflows.persistence_data_attributes_workflow import (
PersistenceDataAttributesWorkflow,
)
Expand Down Expand Up @@ -45,6 +46,7 @@
registry.add_workflow(BasicWorkflow())
registry.add_workflow(ConditionalCompleteWorkflow())
registry.add_workflow(DescribeWorkflow())
registry.add_workflow(EmptyDataWorkflow())
registry.add_workflow(InternalChannelWorkflow())
registry.add_workflow(InternalChannelWorkflowWithNoPrefixChannel())
registry.add_workflow(JavaDuplicateRpcMemoWorkflow())
Expand All @@ -54,8 +56,8 @@
registry.add_workflow(RecoveryWorkflow())
registry.add_workflow(RpcMemoWorkflow())
registry.add_workflow(RPCWorkflow())
registry.add_workflow(TimerWorkflow())
registry.add_workflow(StateOptionsOverrideWorkflow())
registry.add_workflow(TimerWorkflow())
registry.add_workflow(WaitForStateWithStateExecutionIdWorkflow())
registry.add_workflow(WaitForStateWithWaitForKeyWorkflow())
registry.add_workflow(WaitInternalChannelWorkflow())
Expand Down
65 changes: 65 additions & 0 deletions iwf/tests/test_empty_data_decodes_properly.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import inspect
import time
import unittest

import httpx

from iwf.client import Client
from iwf.tests.worker_server import registry
from iwf.worker_service import WorkerService


class TestBinaryNullDecodesCorrectly(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.client = Client(registry)

def test_binary_null_input_decodes_correctly(self):
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"

response = httpx.post(
f"http://0.0.0.0:8802/{WorkerService.api_path_workflow_state_execute}",
json={
"DataObjects": [
{"key": "test-da", "value": {"encoding": "binary/null"}}
],
"commandResults": {
"interStateChannelResults": [],
"stateStartApiSucceeded": True,
},
"context": {
"attempt": 1,
"firstAttemptTimestamp": 1747935829,
"stateExecutionId": "State1-1",
"workflowId": wf_id,
"workflowRunId": "0196f734-d037-7432-bd63-e1136cd34dbd",
"workflowStartedTimestamp": 1747904155,
},
"stateInput": {"encoding": "binary/null"},
"stateLocals": [],
"workflowStateId": "State1",
"workflowType": "EmptyDataWorkflow",
},
)
assert response.is_success
response_json = response.json()
self.assertEqual(
response_json,
{
"publishToInterStateChannel": [],
"recordEvents": [],
"stateDecision": {
"nextStates": [
{
"stateId": "_SYS_GRACEFUL_COMPLETING_WORKFLOW",
"stateInput": {
"data": '"success"',
"encoding": "json/plain",
},
}
]
},
"upsertDataObjects": [],
"upsertStateLocals": [],
},
)
37 changes: 37 additions & 0 deletions iwf/tests/workflows/empty_data_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from iwf.command_results import CommandResults
from iwf.communication import Communication
from iwf.persistence import Persistence
from iwf.persistence_schema import PersistenceField, PersistenceSchema
from iwf.state_decision import StateDecision
from iwf.state_schema import StateSchema
from iwf.workflow import ObjectWorkflow
from iwf.workflow_context import WorkflowContext
from iwf.workflow_state import WorkflowState

TEST_DA_KEY = "test-da"


class State1(WorkflowState[None]):
def execute(
self,
ctx: WorkflowContext,
input: None,
command_results: CommandResults,
persistence: Persistence,
communication: Communication,
) -> StateDecision:
assert input is None
test_da = persistence.get_data_attribute(TEST_DA_KEY)
assert test_da is None

return StateDecision.graceful_complete_workflow(output="success")


class EmptyDataWorkflow(ObjectWorkflow):
def get_workflow_states(self) -> StateSchema:
return StateSchema.with_starting_state(State1())

def get_persistence_schema(self) -> PersistenceSchema:
return PersistenceSchema.create(
PersistenceField.data_attribute_def(TEST_DA_KEY, None),
)