Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 122 additions & 20 deletions ophyd/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,42 @@ def __and__(self, other):
with the same base API.

It will finish when both `self` or `other` finish.

Parameters
----------
other: StatusBase
Another status object to combine with this one.
"""
return AndStatus(self, other)

def __or__(self, other):
"""
Returns a new 'composite' status object, OrStatus,
with the same base API.

It will finish when either `self` or `other` finishes.

Parameters
----------
other: StatusBase
Another status object to combine with this one.
"""
return OrStatus(self, other)


class AndStatus(StatusBase):
"a Status that has composes two other Status objects using logical and"
"""
A Status that has composes two other Status objects using logical and.
If any of the two Status objects fails, the combined status will fail
with the exception of the first Status to fail.

Parameters
----------
left: StatusBase
The left-hand Status object
right: StatusBase
The right-hand Status object
"""

def __init__(self, left, right, **kwargs):
self.left = left
Expand All @@ -583,25 +613,22 @@ def inner(status):
with self._lock:
if self._externally_initiated_completion:
return
with self.left._lock:
with self.right._lock:
l_success = self.left.success
r_success = self.right.success
l_done = self.left.done
r_done = self.right.done

# At least one is done.
# If it failed, do not wait for the second one.
if (not l_success) and l_done:
self._finished(success=False)
elif (not r_success) and r_done:
self._finished(success=False)

elif l_success and r_success and l_done and r_done:
# Both are done, successfully.
self._finished(success=True)
# Else one is done, successfully, and we wait for #2,
# when this function will be called again.

# Return if status is already done..
if self.done:
return

with status._lock:
if status.done and not status.success:
self.set_exception(status.exception()) # st._exception
return
if (
self.left.done
and self.right.done
and self.left.success
and self.right.success
):
self.set_finished()

self.left.add_callback(inner)
self.right.add_callback(inner)
Expand All @@ -627,6 +654,81 @@ def __contains__(self, status: StatusBase) -> bool:
return False


class OrStatus(StatusBase):
"""
A Status that has composes two other Status objects using logical or.
If any of the status objects succeeds, the combined status will succeed.
It will only fail if both status objects fail, with the exception of all
status objects combined.

Parameters
----------
left: StatusBase
The left-hand Status object
right: StatusBase
The right-hand Status object
"""

def __init__(self, left, right, **kwargs):
self.left = left
self.right = right
super().__init__(**kwargs)
self._trace_attributes["left"] = self.left._trace_attributes
self._trace_attributes["right"] = self.right._trace_attributes

def inner(status):
with self._lock:
if self._externally_initiated_completion:
return

# Return if status is already done..
if self.done:
return

with status._lock:
if status.done and status.success:
self.set_finished()
return
if (
self.left.done
and not self.left.success
and self.right.done
and not self.right.success
):
exceptions = [
st.exception()
for st in [self.left, self.right]
if st.done and not st.success and st.exception() is not None
]
combined_exceptions = RuntimeError(
"; ".join(f"{type(exc).__name__}: {exc}" for exc in exceptions)
)
self.set_exception(combined_exceptions)

self.left.add_callback(inner)
self.right.add_callback(inner)

def __repr__(self):
return "({self.left!r} & {self.right!r})".format(self=self)

def __str__(self):
return (
"{0}(done={1.done}, "
"success={1.success})"
"".format(self.__class__.__name__, self)
)

def __contains__(self, status: StatusBase) -> bool:
for child in [self.left, self.right]:
if child == status:
return True
if isinstance(child, OrStatus):
if status in child:
return True

return False


class Status(StatusBase):
"""
Track the status of a potentially-lengthy action like moving or triggering.
Expand Down
79 changes: 79 additions & 0 deletions ophyd/tests/test_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,3 +607,82 @@ def _handle_failure(self):
st.wait(1)
time.sleep(0.1) # Wait for callbacks to run.
assert state


def test_and_status():
"""Test AndStatus"""
dev = Device("Tst:Prefix", name="test")
st1 = StatusBase()
st2 = StatusBase()
st3 = DeviceStatus(dev)
and_status = st1 & st2 & st3

# Finish in success
assert and_status.done is False
st1.set_finished()
assert and_status.done is False
st2.set_finished()
assert and_status.done is False
st3.set_finished()
assert and_status.done is True
assert and_status.success is True

# Failure
st1 = StatusBase()
st2 = StatusBase()
st3 = DeviceStatus(dev)
and_status = st1 & st2 & st3

assert and_status.done is False
st1.set_finished()
assert and_status.done is False
exc = Exception("Test exception")
st2.set_exception(exc)
assert and_status.done is True
assert and_status.success is False
assert st2.success is False
assert st3.success is False

# Not resolved before failure
assert st3.done is False

# Already resolved before failure
assert st1.success is True
assert isinstance(and_status.exception(), Exception)
assert str(and_status.exception()) == "Test exception"
assert and_status.exception() == exc


def test_or_status():
"""Test OrStatus"""
dev = Device("Tst:Prefix", name="test")
st1 = StatusBase()
st2 = StatusBase()
st3 = DeviceStatus(dev)
or_status = st1 | st2 | st3

# Finish in success
assert or_status.done is False
st1.set_finished()
assert or_status.done is True
assert or_status.success is True

st1 = StatusBase()
or_status = st1 | st2 | st3
assert or_status.done is False
assert or_status.success is False
st1.set_exception(Exception("Test exception"))
assert or_status.done is False
assert or_status.success is False
st2.set_exception(RuntimeError("Test exception 2"))
assert or_status.done is False
assert or_status.success is False
st3.set_exception(ValueError("Test exception 3"))
assert or_status.done is True
assert or_status.success is False
assert isinstance(or_status.exception(), RuntimeError)
assert str(or_status.exception()) == (
"RuntimeError: Exception: Test exception; "
"RuntimeError: Test exception 2; "
"ValueError: Test exception 3"
)