Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/formats/coco.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ rois = labels.labeled_frames[0].rois
to an `ROI` since there is no extent to rasterize into. When written back to
COCO, `SegmentationMask` objects are exported as RLE.

!!! note "Predicted vs. user segmentation"
    A detection annotation carrying a `score` (i.e. a model prediction) is read
    as a `PredictedSegmentationMask` / `PredictedROI` with that score; annotations
    without a `score` become the `User*` variants. This mirrors how `bbox`
    annotations select `PredictedBoundingBox` vs. `UserBoundingBox`.

## Categories as identities

In a standard COCO dataset the `category` is an object *class* (e.g. `"person"`,
Expand Down
61 changes: 37 additions & 24 deletions sleap_io/io/coco.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from sleap_io.model.instance import Instance, Track
from sleap_io.model.labeled_frame import LabeledFrame
from sleap_io.model.labels import Labels
from sleap_io.model.mask import UserSegmentationMask
from sleap_io.model.roi import UserROI
from sleap_io.model.mask import PredictedSegmentationMask, UserSegmentationMask
from sleap_io.model.roi import PredictedROI, UserROI
from sleap_io.model.skeleton import Edge, Node, Skeleton
from sleap_io.model.video import Video

Expand Down Expand Up @@ -242,29 +242,36 @@ def _decode_segmentation(
height: int,
width: int,
segmentation_format: str,
score: float | None = None,
**kwargs,
) -> tuple[list, list]:
"""Decode a COCO ``segmentation`` field into masks and/or ROIs.

COCO encodes segmentation either as RLE (a dict with ``counts``/``size``) or
as polygons (a list of flat ``[x1, y1, x2, y2, ...]`` rings, where multiple
rings belong to a single object). RLE is always materialized as a
`UserSegmentationMask`. Polygon handling depends on ``segmentation_format``:
segmentation mask. Polygon handling depends on ``segmentation_format``:

- ``"mask"``: rasterize all rings of the annotation into a single
`UserSegmentationMask` at the image resolution. Requires positive
``height`` and ``width``; if either is missing (``<= 0``), the polygon is
kept as ROI(s) instead since rasterization needs the image extent.
- ``"roi"``: keep the native vector geometry as one `UserROI` per ring.
segmentation mask at the image resolution. Requires positive ``height``
and ``width``; if either is missing (``<= 0``), the polygon is kept as
ROI(s) instead since rasterization needs the image extent.
- ``"roi"``: keep the native vector geometry as one ROI per ring.

When ``score`` is provided, predicted variants
(`PredictedSegmentationMask` / `PredictedROI`) carrying the score are
created; otherwise user variants are created.

Args:
segmentation: The COCO ``segmentation`` field (RLE dict, polygon list, or
``None``).
height: Image height in pixels (used to rasterize polygons in mask mode).
width: Image width in pixels (used to rasterize polygons in mask mode).
segmentation_format: Either ``"mask"`` or ``"roi"``.
score: Optional COCO detection confidence. If not ``None``, predicted
mask/ROI variants are created with this score.
**kwargs: Metadata forwarded to the created mask/ROI objects (e.g.
``category``, ``instance``).
``category``, ``instance``, ``track``).

Returns:
A ``(masks, rois)`` tuple of the decoded objects for this annotation.
Expand All @@ -275,10 +282,16 @@ def _decode_segmentation(
if segmentation is None:
return masks, rois

# Predicted variants carry a score; user variants do not.
predicted = score is not None
mask_cls = PredictedSegmentationMask if predicted else UserSegmentationMask
roi_cls = PredictedROI if predicted else UserROI
score_kw = {"score": score} if predicted else {}

if isinstance(segmentation, dict):
# RLE format: always materialize as a segmentation mask.
mask = _decode_coco_rle(segmentation["counts"], segmentation["size"])
masks.append(UserSegmentationMask.from_numpy(mask, **kwargs))
masks.append(mask_cls.from_numpy(mask, **kwargs, **score_kw))
return masks, rois

if isinstance(segmentation, list) and len(segmentation) > 0:
Expand All @@ -298,14 +311,15 @@ def _decode_segmentation(

if segmentation_format == "mask" and height > 0 and width > 0:
if len(polygons) == 1:
roi = UserROI.from_polygon(polygons[0], **kwargs)
roi = roi_cls.from_polygon(polygons[0], **kwargs, **score_kw)
else:
roi = UserROI.from_multi_polygon(polygons, **kwargs)
roi = roi_cls.from_multi_polygon(polygons, **kwargs, **score_kw)
# to_mask preserves the predicted/user variant and score from the ROI.
masks.append(roi.to_mask(height, width))
else:
# ROI mode, or mask mode without image dimensions to rasterize into.
for coords in polygons:
rois.append(UserROI.from_polygon(coords, **kwargs))
rois.append(roi_cls.from_polygon(coords, **kwargs, **score_kw))

return masks, rois

Expand Down Expand Up @@ -397,10 +411,15 @@ def _category_track(cat_name: str) -> Track | None:
image_annotations[image_id] = []
image_annotations[image_id].append(annotation)

# Group images by shape (height, width) for shared Video objects
# Group images by shape (height, width) for shared Video objects. Each
# ``images`` entry becomes its own frame in its shape's video, so the frame
# index is the entry's position within that group. This is keyed by image_id
# directly (not by resolved path) so distinct images that share a file_name
# do not collide.
shape_to_images = {}
image_id_to_path = {}
image_id_to_shape = {}
image_id_to_frame_idx = {}

for image_info in coco_data["images"]:
image_id = image_info["id"]
Expand All @@ -418,6 +437,7 @@ def _category_track(cat_name: str) -> Track | None:
image_id_to_shape[image_id] = shape_key
if shape_key not in shape_to_images:
shape_to_images[shape_key] = []
image_id_to_frame_idx[image_id] = len(shape_to_images[shape_key])
shape_to_images[shape_key].append(str(image_path))
except FileNotFoundError:
# Skip missing images
Expand All @@ -442,16 +462,6 @@ def _category_track(cat_name: str) -> Track | None:
rois = []
masks = []
bboxes = []
image_id_to_frame_idx = {}

# Build frame index mapping for each image
for shape_key, image_paths in shape_to_images.items():
for frame_idx, image_path in enumerate(image_paths):
# Find the image_id for this path
for img_id, path in image_id_to_path.items():
if str(path) == image_path:
image_id_to_frame_idx[img_id] = frame_idx
break

for image_info in coco_data["images"]:
image_id = image_info["id"]
Expand Down Expand Up @@ -541,7 +551,9 @@ def _category_track(cat_name: str) -> Track | None:
)
bboxes.append(bbox_obj)
else:
# Detection-only annotation: create ROIs/masks/bboxes
# Detection-only annotation: create ROIs/masks/bboxes. A
# COCO ``score`` marks a prediction, so it selects predicted
# mask/ROI/bbox variants (mirrors the bbox handling below).
roi_kwargs = dict(
category=cat_name,
track=_category_track(cat_name),
Expand All @@ -554,6 +566,7 @@ def _category_track(cat_name: str) -> Track | None:
img_height,
img_width,
segmentation_format,
score=annotation.get("score"),
**roi_kwargs,
)
masks.extend(seg_masks)
Expand Down
17 changes: 14 additions & 3 deletions sleap_io/model/roi.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,26 +286,37 @@ def __geo_interface__(self) -> dict:
def to_mask(self, height: int, width: int) -> "SegmentationMask":
    """Rasterize this ROI into a binary segmentation mask.

    A `PredictedROI` produces a `PredictedSegmentationMask` carrying its
    `score`; any other ROI produces a `UserSegmentationMask`. Metadata
    (name, category, source, track, instance) is inherited either way.

    Args:
        height: Height of the output mask in pixels.
        width: Width of the output mask in pixels.

    Returns:
        A `SegmentationMask` with the rasterized geometry.
    """
    # Function-scope import, kept from the original block.
    # NOTE(review): presumably avoids a circular import with the mask module
    # -- confirm.
    from sleap_io.model.mask import (
        PredictedSegmentationMask,
        UserSegmentationMask,
    )

    # Rasterize geometry to binary mask
    mask = _rasterize_geometry(self.geometry, height, width)

    # Metadata copied onto the mask regardless of which variant is built.
    kwargs = dict(
        name=self.name,
        category=self.category,
        source=self.source,
        track=self.track,
        instance=self.instance,
    )
    if self.is_predicted:
        # Only the predicted variant receives a score.
        return PredictedSegmentationMask.from_numpy(
            mask, score=self.score, **kwargs
        )
    return UserSegmentationMask.from_numpy(mask, **kwargs)

def explode(self) -> list["ROI"]:
"""Split a multi-geometry ROI into individual ROIs.
Expand Down
117 changes: 116 additions & 1 deletion tests/io/test_coco.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
from sleap_io.model.bbox import PredictedBoundingBox, UserBoundingBox
from sleap_io.model.instance import Track
from sleap_io.model.labels import Labels
from sleap_io.model.mask import UserSegmentationMask
from sleap_io.model.mask import PredictedSegmentationMask, UserSegmentationMask
from sleap_io.model.matching import (
IMAGE_DEDUP_VIDEO_MATCHER,
SHAPE_VIDEO_MATCHER,
)
from sleap_io.model.roi import PredictedROI, UserROI


class TestCOCOBasicLoading:
Expand Down Expand Up @@ -661,6 +662,53 @@ def test_shared_video_objects_for_same_shape(tmp_path):
assert frame_indices_200x150 == [0]


def test_coco_duplicate_filename_distinct_frames(tmp_path):
    """Image entries sharing a file_name must still get distinct frames."""
    (tmp_path / "dup.png").touch()

    def square(lo, hi):
        # One ring in COCO's flat [x1, y1, x2, y2, ...] polygon layout.
        return [[lo, lo, hi, lo, hi, hi, lo, hi]]

    # Three image entries all pointing at the same file_name, with one
    # annotation each. Previously the path-keyed frame-index map collided and
    # raised KeyError.
    image_ids = (10, 11, 12)
    rings = (square(1.0, 10.0), square(2.0, 12.0), square(3.0, 13.0))
    payload = {
        "images": [
            {"id": image_id, "file_name": "dup.png", "height": 40, "width": 40}
            for image_id in image_ids
        ],
        "annotations": [
            {
                "id": ann_id,
                "image_id": image_id,
                "category_id": 1,
                "segmentation": ring,
            }
            for ann_id, (image_id, ring) in enumerate(
                zip(image_ids, rings), start=1
            )
        ],
        "categories": [{"id": 1, "name": "obj"}],
    }
    json_path = tmp_path / "dup.coco.json"
    json_path.write_text(json.dumps(payload))

    # Loading must not raise; every image entry becomes its own frame.
    labels = coco.read_labels(json_path, dataset_root=tmp_path)
    frames = labels.labeled_frames
    assert len(frames) == 3
    assert sorted(lf.frame_idx for lf in frames) == [0, 1, 2]

    # Each frame holds exactly the mask of its own annotation.
    masks_per_frame = [len(lf.masks) for lf in frames]
    assert all(count == 1 for count in masks_per_frame)
    assert sum(masks_per_frame) == 3


def test_grayscale_loading(tmp_path):
"""Test loading images as grayscale."""
import json
Expand Down Expand Up @@ -2478,6 +2526,73 @@ def test_coco_predicted_bbox(tmp_path):
assert h == pytest.approx(40.0)


def test_coco_scored_segmentation_is_predicted(tmp_path):
    """Scored detection annotations load as predicted mask/ROI variants."""
    (tmp_path / "pred.png").touch()

    # Scored polygon -> PredictedSegmentationMask.
    scored_polygon = {
        "id": 1,
        "image_id": 1,
        "category_id": 1,
        "score": 0.9,
        "segmentation": [[2.0, 2.0, 15.0, 2.0, 15.0, 15.0, 2.0, 15.0]],
    }
    # Scored RLE -> PredictedSegmentationMask.
    scored_rle = {
        "id": 2,
        "image_id": 1,
        "category_id": 1,
        "score": 0.7,
        "segmentation": {"counts": [0, 5, 5, 5, 5], "size": [5, 5]},
    }
    # Unscored polygon -> UserSegmentationMask.
    unscored_polygon = {
        "id": 3,
        "image_id": 1,
        "category_id": 1,
        "segmentation": [[20.0, 20.0, 28.0, 20.0, 28.0, 28.0, 20.0, 28.0]],
    }
    spec = {
        "images": [{"id": 1, "file_name": "pred.png", "height": 30, "width": 30}],
        "annotations": [scored_polygon, scored_rle, unscored_polygon],
        "categories": [{"id": 1, "name": "obj"}],
    }
    json_path = tmp_path / "pred_seg.json"
    json_path.write_text(json.dumps(spec))

    def predicted_only(mask_list):
        return [m for m in mask_list if isinstance(m, PredictedSegmentationMask)]

    def user_only(mask_list):
        # Exact type match, so predicted masks are never counted as user masks.
        return [m for m in mask_list if type(m) is UserSegmentationMask]

    # Default mask mode: scored -> predicted (with score), unscored -> user.
    labels = coco.read_labels(json_path, dataset_root=tmp_path)
    loaded_masks = labels.labeled_frames[0].masks
    assert len(loaded_masks) == 3
    loaded_predicted = predicted_only(loaded_masks)
    assert len(loaded_predicted) == 2
    assert len(user_only(loaded_masks)) == 1
    assert sorted(m.score for m in loaded_predicted) == pytest.approx([0.7, 0.9])

    # roi mode: scored polygon -> PredictedROI; the scored RLE is still a mask.
    roi_labels = coco.read_labels(
        json_path, dataset_root=tmp_path, segmentation_format="roi"
    )
    roi_frame = roi_labels.labeled_frames[0]
    loaded_rois = roi_frame.rois
    assert [type(r) for r in loaded_rois] == [PredictedROI, UserROI]
    assert loaded_rois[0].score == pytest.approx(0.9)
    assert len(roi_frame.masks) == 1  # RLE stays a mask

    # Predicted masks survive a .slp round-trip with class and score intact.
    slp_path = tmp_path / "pred_seg.slp"
    sio.save_file(labels, str(slp_path))
    reloaded_masks = sio.load_file(str(slp_path)).labeled_frames[0].masks
    reloaded_predicted = predicted_only(reloaded_masks)
    assert len(reloaded_predicted) == 2
    assert sorted(m.score for m in reloaded_predicted) == pytest.approx([0.7, 0.9])
    assert len(user_only(reloaded_masks)) == 1


def test_coco_bbox_roundtrip(tmp_path):
"""Write bboxes to COCO and read back as BoundingBox."""
video = sio.Video.from_filename(["img1.png"])
Expand Down
17 changes: 17 additions & 0 deletions tests/model/test_roi.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,23 @@ def test_roi_to_mask():
assert not data[0, 0] # Outside the bbox


def test_predicted_roi_to_mask_is_predicted():
    """Rasterizing a PredictedROI keeps the predicted variant and its score."""
    from sleap_io.model.mask import PredictedSegmentationMask, UserSegmentationMask

    canvas = dict(height=20, width=20)

    # Predicted ROI -> predicted mask carrying the score and category.
    predicted_roi = PredictedROI.from_bbox(2, 3, 4, 5, category="cat", score=0.8)
    predicted_mask = predicted_roi.to_mask(**canvas)
    assert isinstance(predicted_mask, PredictedSegmentationMask)
    assert predicted_mask.score == pytest.approx(0.8)
    assert predicted_mask.category == "cat"
    assert predicted_mask.area > 0

    # A user ROI still produces a user mask (no score field).
    plain_mask = UserROI.from_bbox(2, 3, 4, 5).to_mask(**canvas)
    assert type(plain_mask) is UserSegmentationMask


def test_roi_with_video():
video = Video(filename="test.mp4")
roi = UserROI.from_bbox(0, 0, 10, 10, video=video)
Expand Down
Loading