Skip to content

refactor(ak1 -> dak/ak2): Ressurect systematics handling (first eager then delayed) #786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 55 additions & 7 deletions src/coffea/nanoevents/methods/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
class _ClassMethodFn:
def __init__(self, attr: str, **kwargs: Any) -> None:
self.attr = attr
self.kwargs = kwargs

def __call__(self, coll: awkward.Array, *args: Any, **kwargs: Any) -> awkward.Array:
return getattr(coll, self.attr)(*args, **kwargs)
allkwargs = self.kwargs
allkwargs.update(kwargs)
return getattr(coll, self.attr)(*args, **allkwargs)


@awkward.mixin_class(behavior)
Expand All @@ -36,12 +39,35 @@ def add_kind(cls, kind: str):
"""
cls._systematic_kinds.add(kind)

def _ensure_systematics(self):
def _ensure_systematics(self, _dask_array_=None):
"""
Make sure that the parent object always has a field called '__systematics__'.
"""
if "__systematics__" not in awkward.fields(self):
self["__systematics__"] = {}
if _dask_array_ is not None:
x = awkward.Array(
awkward.Array([{}]).layout.to_typetracer(forget_length=True)
)
_dask_array_._meta["__systematics__"] = x

def add_systematics_hack(array):
if awkward.backend(array) == "typetracer":
array["__systematics__"] = x
return array
array["__systematics__"] = {}
return array

temp = dask_awkward.map_partitions(
add_systematics_hack,
_dask_array_,
label="ensure-systematics",
meta=_dask_array_._meta,
)
_dask_array_._meta = temp._meta
_dask_array_._dask = temp._dask
_dask_array_._name = temp._name
else:
self["__systematics__"] = {}

@property
def systematics(self):
Expand Down Expand Up @@ -93,13 +119,31 @@ def add_systematic(
kind: str,
what: Union[str, List[str], Tuple[str]],
varying_function: Callable,
_dask_array_=None,
):
"""
name: str, name of the systematic variation / uncertainty source
kind: str, the name of the kind of systematic variation
what: Union[str, List[str], Tuple[str]], name what gets varied, this could be a list or tuple of column names
varying_function: Union[function, bound method], a function that describes how 'what' is varied, it must close over all non-event-data arguments.
"""
if _dask_array_ is not None:
print("self", repr(self))
print("name", name)
print("kind", kind)
print("what", repr(what))
print("vf ", varying_function)
print("da ", _dask_array_, type(_dask_array_))
_dask_array_.map_partitions(
_ClassMethodFn(
"add_systematic",
name=name,
kind=kind,
varying_function=varying_function,
),
what=what,
)

self._ensure_systematics()

if name in awkward.fields(self["__systematics__"]):
Expand All @@ -122,23 +166,27 @@ def add_systematic(
if what == "weight" and "__ones__" not in awkward.fields(
flat["__systematics__"]
):
flat["__systematics__", "__ones__"] = numpy.ones(
len(flat), dtype=numpy.float32
)
fields = awkward.fields(flat["__systematics__"])
as_dict = {field: flat["__systematics__", field] for field in fields}
as_dict["__ones__"] = numpy.ones(len(flat), dtype=numpy.float32)
flat["__systematics__"] = awkward.zip(as_dict, depth_limit=1)

rendered_type = flat.layout.parameters["__record__"]
as_syst_type = awkward.with_parameter(flat, "__record__", kind)
as_syst_type._build_variations(name, what, varying_function)
variations = as_syst_type.describe_variations()

flat["__systematics__", name] = awkward.zip(
fields = awkward.fields(flat["__systematics__"])
as_dict = {field: flat["__systematics__", field] for field in fields}
as_dict[name] = awkward.zip(
{
v: getattr(as_syst_type, v)(name, what, rendered_type)
for v in variations
},
depth_limit=1,
with_name=f"{name}Systematics",
)
flat["__systematics__"] = awkward.zip(as_dict, depth_limit=1)

self["__systematics__"] = wrap(flat["__systematics__"])
self.behavior[("__typestr__", f"{name}Systematics")] = f"{kind}"
Expand Down
32 changes: 17 additions & 15 deletions src/coffea/nanoevents/methods/systematics/UpDownSystematic.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ def _build_variations(self, name, what, varying_function, *args, **kwargs):
self[what] if what != "weight" else self["__systematics__", "__ones__"]
)

self["__systematics__", f"__{name}__"] = awkward.virtual(
varying_function,
args=(whatarray, *args),
kwargs=kwargs,
length=len(whatarray),
fields = awkward.fields(self["__systematics__"])
as_dict = {field: self["__systematics__", field] for field in fields}
as_dict[f"__{name}__"] = varying_function(
whatarray,
*args,
**kwargs,
)
self["__systematics__"] = awkward.zip(as_dict, depth_limit=1)

def describe_variations(self):
"""Show the map of variation names to indices."""
Expand Down Expand Up @@ -53,20 +55,20 @@ def get_variation(self, name, what, astype, updown):

def up(self, name, what, astype):
"""Return the "up" variation of this observable."""
return awkward.virtual(
self.get_variation,
args=(name, what, astype, "up"),
length=len(self),
parameters=self[what].layout.parameters if what != "weight" else None,
return self.get_variation(
name,
what,
astype,
"up",
)

def down(self, name, what, astype):
"""Return the "down" variation of this observable."""
return awkward.virtual(
self.get_variation,
args=(name, what, astype, "down"),
length=len(self),
parameters=self[what].layout.parameters if what != "weight" else None,
return self.get_variation(
name,
what,
astype,
"down",
)


Expand Down
2 changes: 1 addition & 1 deletion src/coffea/nanoevents/schemas/nanoaod.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def _build_collections(self, field_names, input_contents):
output[name].setdefault("parameters", {})
output[name]["parameters"].update({"collection_name": name})

return output.keys(), output.values()
return list(output.keys()), list(output.values())

@classmethod
def behavior(cls):
Expand Down
11 changes: 4 additions & 7 deletions src/coffea/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,14 @@ def deprecate(exception, version, date=None):

# re-nest a record array into a ListArray
def awkward_rewrap(arr, like_what, gfunc):
behavior = awkward._util.behaviorof(like_what)
func = partial(gfunc, data=arr.layout)
layout = awkward.operations.convert.to_layout(like_what)
newlayout = awkward._util.recursively_apply(layout, func)
return awkward._util.wrap(newlayout, behavior=behavior)
return awkward.transform(func, like_what, behavior=like_what.behavior)


# we're gonna assume that the first record array we encounter is the flattened data
def rewrap_recordarray(layout, depth, data):
if isinstance(layout, awkward.layout.RecordArray):
return lambda: data
def rewrap_recordarray(layout, depth, data, **kwargs):
if isinstance(layout, awkward.contents.RecordArray):
return data
return None


Expand Down