Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 42 additions & 38 deletions src/asammdf/blocks/mdf_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -1206,9 +1206,11 @@ def _get_name_with_indices(ch_name: str, ch_parent_name: str, indices: list[int]

# update channel name
new_block.name = _get_name_with_indices(new_block.name, channel.name, nd_coords)
new_block.ignore_save = True

# append to channel list
channels.append(new_block)
grp.signal_data.append(None)

# update channel dependencies
if (deps := dependencies[cn_id]) is not None:
Expand All @@ -1230,6 +1232,7 @@ def _get_name_with_indices(ch_name: str, ch_parent_name: str, indices: list[int]
orig_name = channel.name
for cn_id in range(index, ch_len):
nd_coords = _get_nd_coords(0, multipliers)
channels[cn_id].original_name = channels[cn_id].name
name = _get_name_with_indices(channels[cn_id].name, orig_name, nd_coords)
entries = self.channels_db.pop(channels[cn_id].name)
channels[cn_id].name = name
Expand Down Expand Up @@ -10975,13 +10978,23 @@ def save(
for i, gp in enumerate(self.groups):
channels = gp.channels

prev_channel = None
for j, channel in enumerate(channels):
# ignore copied channels created as part of CA-CN nested structures
if channel.ignore_save:
continue

if channel.attachment is not None:
channel.attachment_addr = self.attachments[channel.attachment].address
elif channel.attachment_nr:
channel.attachment_addr = 0

address = channel.to_blocks(address, blocks, defined_texts, cc_map, si_map)
if channel not in blocks:
address = channel.to_blocks(address, blocks, defined_texts, cc_map, si_map)
# link cn_cn_next address between root channels
if prev_channel:
prev_channel.next_ch_addr = channel.address
prev_channel = channel

if channel.channel_type == v4c.CHANNEL_TYPE_SYNC:
if channel.attachment is not None:
Expand Down Expand Up @@ -11080,45 +11093,36 @@ def save(
channel.data_block_addr = 0

dep_list = gp.channel_dependencies[j]
if dep_list:
if all(isinstance(dep, ChannelArrayBlock) for dep in dep_list):
dep_list = typing.cast(list[ChannelArrayBlock], dep_list)
for dep in dep_list:
dep.address = address
address += dep.block_len
blocks.append(dep)
for k, dep in enumerate(dep_list[:-1]):
dep.composition_addr = dep_list[k + 1].address
dep_list[-1].composition_addr = 0

channel.component_addr = dep_list[0].address
if not dep_list:
channel.component_addr = 0
continue

# update composition channels' addresses
composed_channel = None
for dep in dep_list:
if isinstance(dep, ChannelArrayBlock):
comp_addr = dep.address = address
blocks.append(dep)
address += dep.block_len
else:
dep_list = typing.cast(list[tuple[int, int]], dep_list)
index = dep_list[0][1]
addr_ = gp.channels[index].address

group_channels = gp.channels
if group_channels:
for j, channel in enumerate(group_channels[:-1]):
channel.next_ch_addr = group_channels[j + 1].address
group_channels[-1].next_ch_addr = 0

# channel dependencies
j = len(channels) - 1
while j >= 0:
dep_list = gp.channel_dependencies[j]
if dep_list and all(isinstance(dep, tuple) for dep in dep_list):
dep_list = typing.cast(list[tuple[int, int]], dep_list)
index = dep_list[0][1]
channels[j].component_addr = channels[index].address
index = dep_list[-1][1]
channels[j].next_ch_addr = channels[index].next_ch_addr
channels[index].next_ch_addr = 0

for _, ch_nr in dep_list:
channels[ch_nr].source_addr = 0
j -= 1
index = dep[1]
composition = channels[index]
address = composition.to_blocks(address, blocks, defined_texts, cc_map, si_map)
comp_addr = composition.address

# link cn_composition channel for root channel
if composed_channel is None:
channel.component_addr = comp_addr
# link ca_composition address for CA blocks
elif isinstance(composed_channel, ChannelArrayBlock):
composed_channel.composition_addr = comp_addr
# link cn_cn_next address between sibling dependency channels
else:
index = composed_channel[1]
composition = channels[index]
composition.next_ch_addr = comp_addr

composed_channel = dep

# channel group
if gp.channel_group.flags & v4c.FLAG_CG_VLSD:
Expand Down
10 changes: 9 additions & 1 deletion src/asammdf/blocks/v4_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,11 @@ class Channel:
channel has no conversion
* ``display_name`` - str : channel display name; this is extracted from the
XML channel comment
* ``ignore_save`` - bool : ignore channel when saving a measurement (is set
to True for nested channels derived from channel arrays)
* ``name`` - str : channel name
* ``original_name`` - str : channel name without positional indeces from
channel arrays
* ``source`` - SourceInformation : channel source information; None if the
channel has no source information
* ``unit`` - str : channel unit
Expand Down Expand Up @@ -531,6 +535,7 @@ class Channel:
"fast_path",
"flags",
"id",
"ignore_save",
"links_nr",
"lower_ext_limit",
"lower_limit",
Expand All @@ -539,6 +544,7 @@ class Channel:
"name",
"name_addr",
"next_ch_addr",
"original_name",
"pos_invalidation_bit",
"precision",
"reserved0",
Expand Down Expand Up @@ -1101,6 +1107,8 @@ def __init__(self, **kwargs: Unpack[ChannelKwargs]) -> None:

self.standard_C_size = True
self.fast_path: tuple[int, int, int, int, int, np.dtype[Any]] | None = None
self.ignore_save = False
self.original_name = self.name

def __getitem__(self, item: str) -> object:
return getattr(self, item)
Expand All @@ -1116,7 +1124,7 @@ def to_blocks(
cc_map: dict[bytes | int, int],
si_map: dict[bytes | int, int],
) -> int:
text = self.name
text = self.name if not self.original_name else self.original_name
if text in defined_texts:
self.name_addr = defined_texts[text]
else:
Expand Down
4 changes: 4 additions & 0 deletions src/asammdf/mdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,10 @@ def header(self) -> v3b.HeaderBlock | v4b.HeaderBlock:
def groups(self) -> list[mdf_v3.Group] | list[mdf_v4.Group]:
return self._mdf.groups

@groups.setter
def groups(self, groups: list[mdf_v3.Group] | list[mdf_v4.Group]) -> None:
self._mdf.groups = groups

@property
def channels_db(self) -> ChannelsDB:
return self._mdf.channels_db
Expand Down
Loading