diff --git a/c3d/c3d.py b/c3d/c3d.py index 0f5ec8d..ea80051 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -477,21 +477,22 @@ def encode_events(self, events): event_disp_flags = np.zeros(18, dtype=np.uint8) event_labels = np.empty(18, dtype=object) label_bytes = bytearray(18 * 4) - for i, (time, label) in enumerate(events): - if i > 17: + write_count = 0 # Initiate counter in-case events is an empty iterator + for time, label in events: + if write_count == 18: # Don't raise Error, header events are rarely used. warnings.warn('Maximum of 18 events can be encoded in the header, skipping remaining events.') break - event_timings[i] = time - event_labels[i] = label - label_bytes[i * 4:(i + 1) * 4] = label.encode('utf-8') + event_timings[write_count] = time + event_labels[write_count] = label + label_bytes[write_count * 4:(write_count + 1) * 4] = label.encode('utf-8') + write_count += 1 - write_count = min(i + 1, 18) event_disp_flags[:write_count] = 1 # Update event headers in self - self.long_event_labels = 0x3039 # Magic number + self.long_event_labels = True # Bool used instead of magic number 0x3039 self.event_count = write_count # Update event block self.event_timings = event_timings[:write_count] @@ -524,7 +525,7 @@ class Param(object): column-major order. For arrays of strings, the dimensions here will be the number of columns (length of each string) followed by the number of rows (number of strings). - bytes : str + bytes_data : str Raw data for this parameter. handle : File handle positioned at the first byte of a .c3d parameter description. @@ -536,7 +537,7 @@ def __init__(self, desc='', bytes_per_element=1, dimensions=None, - bytes=b'', + bytes_data=b'', handle=None): '''Set up a new parameter, only the name is required.''' self.name = name @@ -544,9 +545,11 @@ def __init__(self, self.desc = desc self.bytes_per_element = bytes_per_element self.dimensions = dimensions or [] - self.bytes = bytes + self.bytes_data = bytes_data if handle: self.read(handle) + elif not isinstance(bytes_data, bytes): + raise ValueError("Expected `bytes_data` input to be a bytes object.") def __repr__(self): return ''.format(self.desc) @@ -554,10 +557,7 @@ def __repr__(self): @property def num_elements(self): '''Return the number of elements in this parameter's array value.''' - e = 1 - for d in self.dimensions: - e *= d - return e + return int(np.prod(self.dimensions)) @property def total_bytes(self): @@ -594,8 +594,8 @@ def write(self, group_id, handle): handle.write(struct.pack('b', self.bytes_per_element)) handle.write(struct.pack('B', len(self.dimensions))) handle.write(struct.pack('B' * len(self.dimensions), *self.dimensions)) - if self.bytes is not None and len(self.bytes) > 0: - handle.write(self.bytes) + if self.bytes_data is not None and len(self.bytes_data) > 0: + handle.write(self.bytes_data) desc = self.desc.encode('utf-8') handle.write(struct.pack('B', len(desc))) handle.write(desc) @@ -610,21 +610,22 @@ def read(self, handle): dims, = struct.unpack('B', handle.read(1)) self.dimensions = [struct.unpack('B', handle.read(1))[ 0] for _ in range(dims)] - self.bytes = b'' + self.bytes_data = b'' if self.total_bytes: - self.bytes = handle.read(self.total_bytes) + self.bytes_data = handle.read(self.total_bytes) + desc_size, = struct.unpack('B', handle.read(1)) self.desc = desc_size and self._dtypes.decode_string(handle.read(desc_size)) or '' def _as(self, dtype): '''Unpack the raw bytes of this param as a single value of the given data type.''' - return np.frombuffer(self.bytes, count=1, dtype=dtype)[0] + return np.frombuffer(self.bytes_data, count=1, dtype=dtype)[0] def _as_array(self, dtype, copy=True): '''Unpack the raw bytes of this param as an array of the given data type.''' assert self.dimensions, \ '{}: cannot get value as {} array!'.format(self.name, dtype) - buffer_view = np.frombuffer(self.bytes, dtype=dtype) + buffer_view = np.frombuffer(self.bytes_data, dtype=dtype) # Reverse shape as the shape is defined in fortran format buffer_view = buffer_view.reshape(self.dimensions[::-1]) if copy: @@ -708,12 +709,12 @@ def float_value(self): @property def bytes_value(self): '''Get the param as a raw byte string.''' - return self.bytes + return self.bytes_data @property def string_value(self): '''Get the param as a unicode string.''' - return self._dtypes.decode_string(self.bytes) + return self._dtypes.decode_string(self.bytes_data) @property def int8_array(self): @@ -763,7 +764,7 @@ def float32_array(self): # _as_array but for DEC assert self.dimensions, \ '{}: cannot get value as {} array!'.format(self.name, self._dtypes.float32) - return DEC_to_IEEE_BYTES(self.bytes).reshape(self.dimensions[::-1]) # Reverse fortran format + return DEC_to_IEEE_BYTES(self.bytes_data).reshape(self.dimensions[::-1]) # Reverse fortran format else: # is_ieee or is_mips return self._as_array(self._dtypes.float32) @@ -829,7 +830,7 @@ def bytes_array(self): if len(self.dimensions) == 0: return np.array([]) elif len(self.dimensions) == 1: - return np.array(self.bytes) + return np.array(self.bytes_data) else: # Convert Fortran shape (data in memory is identical, shape is transposed) word_len = self.dimensions[0] @@ -840,7 +841,7 @@ def bytes_array(self): for i in np.ndindex(*dims): # Calculate byte offset as sum of each array index times the byte step of each dimension. off = np.sum(np.multiply(i, byte_steps)) - byte_arr[i] = self.bytes[off:off+word_len] + byte_arr[i] = self.bytes_data[off:off+word_len] return byte_arr @property @@ -982,7 +983,9 @@ def add_param(self, name, **kwargs): name = name.upper() if name in self._params: raise KeyError('Parameter already exists with key {}'.format(name)) - self._params[name] = Param(name, self._dtypes, **kwargs) + param = Param(name, self._dtypes, **kwargs) + self._params[name] = param + return param def remove_param(self, name): '''Remove the specified parameter. @@ -1159,19 +1162,19 @@ def group_listed(self): def _check_metadata(self): ''' Ensure that the metadata in our file is self-consistent. ''' assert self._header.point_count == self.point_used, ( - 'inconsistent point count! {} header != {} POINT:USED'.format( + 'Inconsistent point count, {} header != {} POINT:USED'.format( self._header.point_count, self.point_used, )) assert self._header.scale_factor == self.point_scale, ( - 'inconsistent scale factor! {} header != {} POINT:SCALE'.format( + 'Inconsistent scale factor, {} header != {} POINT:SCALE'.format( self._header.scale_factor, self.point_scale, )) assert self._header.frame_rate == self.point_rate, ( - 'inconsistent frame rate! {} header != {} POINT:RATE'.format( + 'Inconsistent frame rate, {} header != {} POINT:RATE'.format( self._header.frame_rate, self.point_rate, )) @@ -1181,7 +1184,7 @@ def _check_metadata(self): else: ratio = 0 assert self._header.analog_per_frame == ratio, ( - 'inconsistent analog rate! {} header != {} analog-fps / {} point-fps'.format( + 'Inconsistent analog rate, {} header != {} analog-fps / {} point-fps'.format( self._header.analog_per_frame, self.analog_rate, self.point_rate, @@ -1189,7 +1192,7 @@ def _check_metadata(self): count = self.analog_used * self._header.analog_per_frame assert self._header.analog_count == count, ( - 'inconsistent analog count! {} header != {} analog used * {} per-frame'.format( + 'Inconsistent analog count, {} header != {} analog used * {} per-frame'.format( self._header.analog_count, self.analog_used, self._header.analog_per_frame, @@ -1198,7 +1201,7 @@ def _check_metadata(self): try: start = self.get_uint16('POINT:DATA_START') if self._header.data_block != start: - warnings.warn('inconsistent data block! {} header != {} POINT:DATA_START'.format( + warnings.warn('Inconsistent data block, {} header != {} POINT:DATA_START'.format( self._header.data_block, start)) except AttributeError: warnings.warn('''no pointer available in POINT:DATA_START indicating the start of the data block, using @@ -1430,7 +1433,10 @@ def point_used(self): @property def analog_used(self): - ''' Number of analog measurements, or channels, for each analog data sample. + ''' Number of analog measured variables, or channels, within a analog data sample/frame. + + Does not account for the number of samples for each channel per point frame, + see 'analog_per_frame' to find the total number of samples recorded per frame. ''' try: return self.get_uint16('ANALOG:USED') @@ -1494,11 +1500,17 @@ def analog_resolution(self): @property def point_labels(self): - return self.get('POINT:LABELS').string_array + grp = self.get('POINT:LABELS') + if grp is None: + return None + return grp.string_array @property def analog_labels(self): - return self.get('ANALOG:LABELS').string_array + grp = self.get('ANALOG:LABELS') + if grp is None: + return None + return grp.string_array @property def frame_count(self): @@ -1667,6 +1679,10 @@ def seek_param_section_header(): # read the remaining bytes in the parameter section. bytes = self._handle.read(endbyte - self._handle.tell()) else: + if offset_to_next - 2 < -1: + raise ValueError( + "Corrupt file with invalid offset written to file. Attempted to read parameter " + "with name {} and {} bytes".format(name, offset_to_next - 2)) bytes = self._handle.read(offset_to_next - 2) buf = io.BytesIO(bytes) @@ -1881,11 +1897,51 @@ def to_writer(self, conversion): class GroupEditable(Decorator): ''' Group instance decorator providing convenience functions for Writer editing. ''' - def __init__(self, group): + def __init__(self, group: Group): super(GroupEditable, self).__init__(group) def __contains__(self, key): return key in self._decoratee + + @property + def group(self) -> Group: + return self._decoratee + + def add_param(self, name, desc='', bytes_per_element=1, bytes_data=b'', dimensions=None): + """ Decorate the raw Group.add_param() function to split the inputs if the leading dimension is > 255. + """ + if not isinstance(bytes_data, bytes): + raise ValueError("Expected `bytes_data` to be a bytes object was of type {}".format(type(bytes_data))) + # Dimension must fit in a 8 bit unsigned int + if len(dimensions) == 0 or dimensions[-1] < 255: + # Forward + self.group.add_param(name, + desc=desc, + bytes_per_element=bytes_per_element, + bytes_data=bytes_data, + dimensions=dimensions) + return + + # Split the parameter into partial group parameters + num_param = int((dimensions[-1] - 1) / 255) + 1 # ceil(dim / 255) + elem_per_dim = np.prod(dimensions[:-1]) + + for index in range(num_param): + name_param = name if index == 0 else "{}{}".format(name, index + 1) + # Determine the byte array for the partial parameter + abpe = abs(bytes_per_element) # Take absolute for strings (bpe == -1) + first_byte = elem_per_dim * 255 * index * abpe + last_byte = first_byte + elem_per_dim * 255 * abpe + bytes_param = bytes_data[first_byte:last_byte] + # Determine shape + dimensions_param = list(dimensions) # Create a list copy (assignable) + dimensions_param[-1] = min(255, dimensions_param[-1] - index * 255) + param = self.group.add_param(name_param, + desc=desc, + bytes_per_element=bytes_per_element, + bytes_data=bytes_param, + dimensions=dimensions_param) + # # Add decorator functions (throws on overwrite) # @@ -1900,7 +1956,7 @@ def add(self, name, desc, bpe, format, data, *dimensions): self.add_param(name, desc=desc, bytes_per_element=bpe, - bytes=data, + bytes_data=data, dimensions=list(dimensions)) def add_array(self, name, desc, data, dtype=None): @@ -1913,7 +1969,7 @@ def add_array(self, name, desc, data, dtype=None): ''' if not isinstance(data, np.ndarray): if dtype is not None: - raise ValueError('Must specify dtype when passning non-numpy array type.') + raise ValueError('Must specify dtype when passing non-numpy array type.') data = np.array(data, dtype=dtype) elif dtype is None: dtype = data.dtype @@ -1921,29 +1977,45 @@ def add_array(self, name, desc, data, dtype=None): self.add_param(name, desc=desc, bytes_per_element=dtype.itemsize, - bytes=data, + bytes_data=data.flatten().tobytes(), dimensions=data.shape) def add_str(self, name, desc, data, *dimensions): ''' Add a string parameter. ''' + if len(dimensions) == 0: + if not is_iterable(data): + raise ValueError("Expected bytes or strings, was {}".format(str(type(data)))) + if isinstance(data, str): + # Single string entry + dimensions = (len(data), ) + else: + # List of string entries + label_str, label_max_size = Writer.pack_labels(data) + dimensions = (label_max_size, len(data)) + data = label_str + elif not isinstance(data, str): + raise ValueError("Expected input to be an encodable string matching the dimension input") + self.add_param(name, desc=desc, bytes_per_element=-1, - bytes=data.encode('utf-8'), + bytes_data=data.encode('utf-8'), dimensions=list(dimensions)) - def add_empty_array(self, name, desc, bpe): + def add_empty_array(self, name, desc, bytes_per_element=0): ''' Add an empty parameter block. ''' - self.add_param(name, desc=desc, - bytes_per_element=bpe, dimensions=[0]) + self.add_param(name, + desc=desc, + bytes_per_element=bytes_per_element, + dimensions=[0]) # # Set decorator functions (overwrites) # def set(self, name, *args, **kwargs): - ''' Add or overwrite a parameter with 'bytes' package formated in accordance with 'format'. + ''' Add or overwrite a parameter with 'bytes_data' package formated in accordance with 'format'. ''' try: self.remove_param(name) @@ -2043,8 +2115,8 @@ def from_reader(reader, conversion=None): 'copy' - Reader objects will be deep copied. 'copy_metadata' - Similar to 'copy' but only copies metadata and not point and analog frame data. - 'copy_shallow' - Similar to 'copy' but group parameters are - not copied. + 'copy_shallow' - Similar to 'copy' but group parameters are not copied. + Usefull for stripping away parameter meta data. 'copy_header' - Similar to 'copy_shallow' but only the header is copied (frame data is not copied). @@ -2064,6 +2136,7 @@ def from_reader(reader, conversion=None): is_header_only = conversion == 'copy_header' is_meta_copy = conversion == 'copy_metadata' is_meta_only = is_header_only or is_meta_copy + is_consume = conversion == 'convert' or conversion is None is_shallow_copy = conversion == 'shallow_copy' or is_header_only is_deep_copy = conversion == 'copy' or is_meta_copy @@ -2080,10 +2153,7 @@ def from_reader(reader, conversion=None): if is_consume: writer._header = reader._header - reader._header = None writer._groups = reader._groups - reader._groups = None - del reader elif is_deep_copy: writer._header = copy.deepcopy(reader._header) writer._groups = copy.deepcopy(reader._groups) @@ -2093,7 +2163,7 @@ def from_reader(reader, conversion=None): # Reformat header events writer._header.encode_events(writer._header.events) - # Transfer a minimal set parameters + # Transfer a minimal parameter set writer.set_start_frame(reader.first_frame) writer.set_point_labels(reader.point_labels) writer.set_analog_labels(reader.analog_labels) @@ -2107,6 +2177,9 @@ def from_reader(reader, conversion=None): # Copy frames for (i, point, analog) in reader.read_frames(copy=True, camera_sum=False): writer.add_frames((point, analog)) + if is_consume: + del reader + return writer @property @@ -2200,7 +2273,7 @@ def add_frames(self, frames, index=None): Parameters ---------- - frames : Single or sequence of (point, analog) pairs + frames : Single or sequence of (point, analog) pairs. A sequence or frame of frame data to add to the writer. index : int or None Insert the frame or sequence at the index (the first sequence frame will be inserted at give index). @@ -2224,24 +2297,53 @@ def add_frames(self, frames, index=None): @staticmethod def pack_labels(labels): + ''' Static method used to pack and pad the set of `labels` strings before + passing the output into a `c3d.group.Group.add_str`. + Parameters + ---------- + labels : iterable + List of strings to pack and pad into a single string suitable for encoding in a Parameter entry. + Example + ------- + >>> labels = ['RFT1', 'RFT2', 'RFT3', 'LFT1', 'LFT2', 'LFT3'] + >>> param_str, label_max_size = Writer.pack_labels(labels) + >>> writer.point_group.add_str('LABELS', + 'Point labels.', + label_str, + label_max_size, + len(labels)) + Returns + ------- + param_str : str + String containing `labels` packed into a single variable where + each string is padded to match the longest `labels` string. + label_max_size : int + Number of bytes associated with the longest `label` string, all strings are padded to this length. + ''' labels = np.ravel(labels) # Get longest label name label_max_size = 0 - label_max_size = max(label_max_size, np.max([len(label) for label in labels])) + label_max_size = max(label_max_size, np.max([0] + [len(label) for label in labels])) label_str = ''.join(label.ljust(label_max_size) for label in labels) return label_str, label_max_size def set_point_labels(self, labels): ''' Set point data labels. ''' - label_str, label_max_size = Writer.pack_labels(labels) - self.point_group.add_str('LABELS', 'Point labels.', label_str, label_max_size, len(labels)) + grp = self.point_group + if labels is None: + grp.add_empty_array('LABELS', 'Point labels.') + else: + grp.add_str('LABELS', 'Point labels.', labels) def set_analog_labels(self, labels): ''' Set analog data labels. ''' - label_str, label_max_size = Writer.pack_labels(labels) - self.analog_group.add_str('LABELS', 'Analog labels.', label_str, label_max_size, len(labels)) + grp = self.analog_group + if labels is None: + grp.add_empty_array('LABELS', 'Analog labels.') + else: + grp.add_str('LABELS', 'Analog labels.', labels) def set_analog_general_scale(self, value): ''' Set ANALOG:GEN_SCALE factor (uniform analog scale factor). @@ -2274,11 +2376,11 @@ def set_analog_offsets(self, values): ''' if is_iterable(values): data = np.array([v for v in values], dtype=np.int16) - self.analog_group.set_array('OFFSET', 'Analog channel offsets', data) elif values is None: - self.analog_group.set_empty_array('OFFSET', 'Analog channel offsets', 2) + data = np.zeros((self.analog_used, ), dtype=np.int16) else: raise ValueError('Expected iterable containing analog data offsets.') + self.analog_group.set_array('OFFSET', 'Analog channel offsets', data) def set_start_frame(self, frame=1): ''' Set the 'TRIAL:ACTUAL_START_FIELD' parameter and header.first_frame entry. @@ -2348,6 +2450,7 @@ def write(self, handle): # POINT group group = self.point_group + self._header.point_count = np.uint16(ppf) group.set('USED', 'Number of point samples', 2, '= UINT16_MAX: @@ -2368,6 +2471,7 @@ def write(self, handle): # ANALOG group group = self.analog_group + self._header.analog_count = np.uint16(np.prod(np.shape(analog))) group.set('USED', 'Analog channel count', 2, ' 255: + raise RuntimeError("To much data stored in parameter blocks. Maximum number of blocks is 255, " + "current file contains {} blocks".format(num_param_blocks)) handle.write(struct.pack( - 'BBBB', 0, 0, self.parameter_blocks(), PROCESSOR_INTEL)) + 'BBBB', 0, 0, num_param_blocks, PROCESSOR_INTEL)) for group_id, group in self.group_listed(): group.write(group_id, handle) @@ -2452,12 +2557,14 @@ def _write_frames(self, handle): for points, analog in self._frames: # Transform point data - valid = points[:, 3] >= 0.0 - raw[~valid, 3] = -1 - raw[valid, :3] = points[valid, :3] / point_scale - raw[valid, 3] = np.bitwise_or(np.rint(points[valid, 3] / scale_mag).astype(np.uint8), - (points[valid, 4].astype(np.uint16) << 8), - dtype=np.uint16) + if self.point_used: + # If not empty + valid = points[:, 3] >= 0.0 + raw[~valid, 3] = -1 + raw[valid, :3] = points[valid, :3] / point_scale + raw[valid, 3] = np.bitwise_or(np.rint(points[valid, 3] / scale_mag).astype(np.uint8), + (points[valid, 4].astype(np.uint16) << 8), + dtype=np.uint16) # Transform analog data analog = analog * analog_scales_inv + analog_offsets @@ -2467,4 +2574,5 @@ def _write_frames(self, handle): analog = analog.astype(point_dtype) handle.write(raw.tobytes()) handle.write(analog.tobytes()) + self._pad_block(handle) diff --git a/test/test_c3d.py b/test/test_c3d.py index 1ae41e3..7b4b252 100644 --- a/test/test_c3d.py +++ b/test/test_c3d.py @@ -45,7 +45,7 @@ def test_paramsb(self): for p in g.param_values(): if len(p.dimensions) == 0: val = None - width = len(p.bytes) + width = len(p.bytes_data) if width == 2: val = p.int16_value elif width == 4: diff --git a/test/test_parameter_accessors.py b/test/test_parameter_accessors.py index 5238b83..111b32d 100644 --- a/test/test_parameter_accessors.py +++ b/test/test_parameter_accessors.py @@ -59,7 +59,7 @@ def verify_add_parameter(self, N): for i in range(1, N): test_name = 'TEST_ADD_PARAM_%i' % i arr = self.rnd.uniform(*self.flt_range, size=self.shape).astype(np.float32) - self.group.add_param(test_name, bytes_per_element=4, dimensions=arr.shape, bytes=arr.T.tobytes()) + self.group.add_param(test_name, bytes_per_element=4, dimensions=arr.shape, bytes_data=arr.T.tobytes()) assert self.group.get(test_name) is not None, 'Added group does not exist.' self.assert_group_items() diff --git a/test/test_parameter_bytes_conversion.py b/test/test_parameter_bytes_conversion.py index 14d02c4..0b09a93 100644 --- a/test/test_parameter_bytes_conversion.py +++ b/test/test_parameter_bytes_conversion.py @@ -54,7 +54,7 @@ def test_a_param_float32(self): for i in range(ParameterValueTest.TEST_ITERATIONS): value = np.float32(self.rnd.uniform(*ParameterValueTest.RANGE_32_BIT)) bytes = struct.pack(' 5) - P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes=arr.T.tobytes()) + P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes_data=arr.T.tobytes()) arr_out = P.string_array assert arr.T.shape == arr_out.shape, "Mismatch in 'string_array' converted shape. Was %s, expected %s" %\ @@ -323,7 +323,7 @@ def test_k_parse_random_string_array(self): # 4 dims for wlen in range(10): arr, shape = genRndByteArr(wlen, [7, 5, 3], wlen > 5) - P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes=arr.T.tobytes()) + P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes_data=arr.T.tobytes()) arr_out = P.string_array assert arr.T.shape == arr_out.shape, "Mismatch in 'string_array' converted shape. Was %s, expected %s" %\ @@ -335,7 +335,7 @@ def test_k_parse_random_string_array(self): # 5 dims for wlen in range(10): arr, shape = genRndByteArr(wlen, [7, 6, 5, 3], wlen > 5) - P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes=arr.T.tobytes()) + P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes_data=arr.T.tobytes()) arr_out = P.string_array assert arr.T.shape == arr_out.shape, "Mismatch in 'string_array' converted shape. Was %s, expected %s" %\ diff --git a/test/test_software_examples_write_read.py b/test/test_software_examples_write_read.py index 3f02ae9..f457b5f 100644 --- a/test/test_software_examples_write_read.py +++ b/test/test_software_examples_write_read.py @@ -5,18 +5,24 @@ import unittest import os import test.verify as verify +import numpy as np from test.base import Base from test.zipload import Zipload, TEMP +# Run all examples on all copy tests +MINIMAL_TEST = True -def verify_read_write(zip, file_path, proc_type='INTEL', real=True): + +def verify_read_write(zip, file_path, proc_type='INTEL', real=True, cpy_mode='copy'): ''' Compare read write ouput to original read file. ''' + assert cpy_mode != 'copy_header', "Copy mode not supported for verification" A = c3d.Reader(Zipload._get(zip, file_path)) - cpy_mode = 'copy' if proc_type != 'INTEL': + # Deep copy for data in non-Intel files is not supported cpy_mode = 'shallow_copy' + writer = A.to_writer(cpy_mode) tmp_path = os.path.join(TEMP, 'write_test.c3d') @@ -29,9 +35,232 @@ def verify_read_write(zip, file_path, proc_type='INTEL', real=True): with open(tmp_path, 'rb') as handle: B = c3d.Reader(handle) - verify.equal_headers(test_id, A, B, aname, bname, real, real) - verify.data_is_equal(A, B, aname, bname) + if cpy_mode == 'convert': + # Compare writer instead + verify.equal_headers(test_id, writer, B, aname, bname, real, real) + else: + verify.equal_headers(test_id, A, B, aname, bname, real, real) + verify.data_is_equal(A, B, aname, bname) + + +def create_dummy_writer(labels=None, frames=1): + """ Create a dummy writer populated with the given number of random frames and labels (to not be empty). + """ + writer = c3d.Writer(point_rate=200) + if labels is None: + labels = ['RFT1', 'RFT2', 'RFT3', 'RFT4', 'LFT1', 'LFT2', 'LFT3', 'LFT4', + 'RSK1', 'RSK2', 'RSK3', 'RSK4', 'LSK1', 'LSK2', 'LSK3', 'LSK4', + 'RTH1', 'RTH2', 'RTH3', 'RTH4', 'LTH1', 'LTH2', 'LTH3', 'LTH4' + ] + + if frames == 1: + for _ in range(frames): + writer.add_frames((np.random.randn(len(labels), 5), ())) + elif frames > 1: + new_frames = [] + for __ in range(frames): + new_frames.append((np.random.randn(len(labels), 5), ())) + writer.add_frames(new_frames) + + writer.set_point_labels(labels) + writer.set_analog_labels(None) + return writer + +class GeneratedExamples(Base): + + def test_error_writing_no_frames(self): + """ Verify no frames generates a runtime error (illegal to write empty file). + """ + writer = c3d.Writer(point_rate=200) + writer.set_point_labels(None) + writer.set_analog_labels(None) + + tmp_path = os.path.join(TEMP, 'no-frames.c3d') + + try: + with open(tmp_path, 'wb') as h: + writer.write(h) + assert False, "Expected RuntimeError writing empty file." + except RuntimeError as e: + pass # RuntimeError writing empty file + + def test_error_adding_invalid_frames(self): + """ Verify no frames generates a runtime error (illegal to write empty file). + """ + writer = c3d.Writer(point_rate=200) + writer.set_point_labels(None) + writer.set_analog_labels(None) + + with self.assertRaises(ValueError): + writer.add_frames(((), ()),) + + with self.assertRaises(ValueError): + # Invalid, to few dims + writer.add_frames(np.random.randn(3, 5)) + + with self.assertRaises(ValueError): + # Invalid, expect first dim to contain 2 elements + writer.add_frames(np.random.randn(3, 13, 5)) + + with self.assertRaises(ValueError): + # Mismatch due to invalid second dim + writer.add_frames(np.random.randn(4, 3, 27, 5)) + + with self.assertRaises(ValueError): + # Raise due to analog rate mismatch (invalid 4th dim) + writer.add_frames(np.random.randn(4, 2, 17, 5)) + + with self.assertRaises(ValueError): + # Raise due to extra dimensions + writer.add_frames(np.random.randn(5, 4, 2, 7, 5)) + + def test_writing_single_point_frame(self): + """ Verify writing a file with a single frame. + """ + labels = ['RFT1', 'RFT2', 'RFT3', 'RFT4', 'LFT1', 'LFT2', 'LFT3', 'LFT4', + 'RSK1', 'RSK2', 'RSK3', 'RSK4', 'LSK1', 'LSK2', 'LSK3', 'LSK4', + 'RTH1', 'RTH2', 'RTH3', 'RTH4', 'LTH1', 'LTH2', 'LTH3', 'LTH4' + ] + writer = create_dummy_writer(labels) + + tmp_path = os.path.join(TEMP, 'single-point-frame.c3d') + with open(tmp_path, 'wb') as h: + writer.write(h) + + with open(tmp_path, 'rb') as handle: + B = c3d.Reader(handle) + + verify.equal_headers("test_writing_single_point_frame", writer, B, "Original", "WriteRead", True, True) + + for a, b in zip(labels, B.get('POINT.LABELS').string_array): + assert a == b, "Label missmatch" + + def test_writing_multiple_point_frame(self): + """ Verify writing a file with a single frame. + """ + num_frames = 11 + writer = create_dummy_writer(frames=num_frames) + + tmp_path = os.path.join(TEMP, 'single-point-frame.c3d') + with open(tmp_path, 'wb') as h: + writer.write(h) + + with open(tmp_path, 'rb') as handle: + B = c3d.Reader(handle) + assert B.frame_count == num_frames, "Expected {} point frames was {}".format(num_frames, B.frame_count) + verify.equal_headers("test_writing_multiple_point_frame", writer, B, "Original", "WriteRead", True, True) + + def test_writing_analog_frames(self): + """ Verify writing a file with a single frame. + """ + labels = ['RFT1', 'RFT2', 'RFT3', 'RFT4', 'LFT1', 'LFT2', 'LFT3', 'LFT4', + 'RSK1', 'RSK2', 'RSK3', 'RSK4', 'LSK1', 'LSK2', 'LSK3', 'LSK4', + 'RTH1', 'RTH2', 'RTH3', 'RTH4', 'LTH1', 'LTH2', 'LTH3', 'LTH4' + ] + writer = c3d.Writer(point_rate=12, analog_rate=36) + + # Single frame input + for __ in range(5): + writer.add_frames(((), np.random.randn(len(labels), writer.analog_per_frame),)) + + # Twin frame input + new_frames = [((), np.random.randn(len(labels), writer.analog_per_frame),), + ((), np.random.randn(len(labels), writer.analog_per_frame),)] + writer.add_frames(new_frames) + + # Multi frame input + new_frames = [] + for __ in range(5): + new_frames.append(((), np.random.randn(len(labels), writer.analog_per_frame),)) + writer.add_frames(new_frames) + + writer.set_point_labels(None) + writer.set_analog_labels(labels) + + tmp_path = os.path.join(TEMP, 'single-analog-frame.c3d') + with open(tmp_path, 'wb') as h: + writer.write(h) + + with open(tmp_path, 'rb') as handle: + B = c3d.Reader(handle) + + verify.equal_headers("test_writing_single_point_frame", writer, B, "Original", "WriteRead", True, True) + + assert B.analog_sample_count == 12 * writer.analog_per_frame, "Expected {} samples was {}".format( + B.analog_sample_count, 12 * writer.analog_per_frame) + + for a, b in zip(labels, B.get('ANALOG.LABELS').string_array): + assert a == b, "Label missmatch" + + def test_write_long_str_param(self): + writer = create_dummy_writer() + grp = writer.add_group(66, "UnittestGroup", "Generated for unittest purposes") + + num_param = 10 + data = [] + for index in range(255 * num_param): + value = "Str:" + format(index, '08d') + assert len(value) == 12, "Unittest is invalid, expected string with length 12" + data.append(value) + + grp.add_str("LongString", "String spanning %i parameters." % num_param, data) + + tmp_path = os.path.join(TEMP, 'long_str_parameter.c3d') + with open(tmp_path, 'wb') as h: + writer.write(h) + + with open(tmp_path, 'rb') as handle: + B = c3d.Reader(handle) + + verify.equal_headers("test_write_long_str_param", writer, B, "Original", "WriteRead", True, True) + + str_index = 0 + for index in range(0, num_param): + postfix = "" if index == 0 else str(index + 1) + param_name = "UnittestGroup.LongString" + postfix + agrp = writer.get(param_name) + bgrp = B.get(param_name) + + # Verify string parameter matches initial input and between read/write + read_data = bgrp.string_array + assert np.array_equal(agrp.string_array, read_data), "Expected string data to match" + assert np.array_equal(data[str_index:str_index + len(read_data)], read_data), "Expected string data to match" + str_index += len(read_data) + + def test_write_long_float_param(self): + writer = create_dummy_writer() + grp = writer.add_group(66, "UnittestGroup", "Generated for unittest purposes") + + num_param = 10 + + data = np.random.randn(2, 2, 255 * num_param).astype(np.float32) + grp.add_array("LongFltArray", "Float array spanning {} parameters.".format(num_param), data) + # Fortran to C order for comparison + data = data.reshape(data.shape[::-1]) + + tmp_path = os.path.join(TEMP, 'long_flt_parameter.c3d') + with open(tmp_path, 'wb') as h: + writer.write(h) + + with open(tmp_path, 'rb') as handle: + B = c3d.Reader(handle) + + verify.equal_headers("test_write_long_float_param", writer, B, "Original", "WriteRead", True, True) + prm_index = 0 + for index in range(0, num_param): + postfix = "" if index == 0 else str(index + 1) + param_name = "UnittestGroup.LongFltArray" + postfix + agrp = writer.get(param_name) + bgrp = B.get(param_name) + assert bgrp is not None, "Parameter {} was not written to the file".format(param_name) + + # Verify string parameter matches initial input and between read/write + read_data = bgrp.float32_array + input_data = data[prm_index:prm_index + len(read_data)] + assert np.array_equal(agrp.float32_array, read_data), "Expected array to match between Reader and Writer" + assert np.array_equal(input_data, read_data), "Expected array data to match the input data" + prm_index += len(read_data) class Sample00(Base): @@ -48,8 +277,8 @@ class Sample00(Base): ('Vicon Motion Systems', ['pyCGM2 lower limb CGM24 Walking01.c3d', 'TableTennis.c3d']), ] - def test_read_write_examples(self): - ''' Compare write ouput to original read + def test_read_write_copy_examples(self): + ''' Compare data copied through the `Writer` class to data in the original file ''' print('----------------------------') @@ -62,6 +291,81 @@ def test_read_write_examples(self): print('... OK') print('DONE') + def test_read_write_shallow_copy_examples(self): + ''' Compare shallow copied data written by the `Writer` class to data in the original file + ''' + + print('----------------------------') + print('Shallow-copy') + print('----------------------------') + if MINIMAL_TEST: + zip_files = [self.zip_files[-1]] + else: + zip_files = self.zip_files + for folder, files in zip_files: + print('{} | Validating...'.format(folder)) + for file in files: + verify_read_write(self.ZIP, '{}/{}'.format(folder, file), cpy_mode='shallow_copy') + print('... OK') + print('DONE') + + def test_read_write_convert_examples(self): + ''' Compare data written by a 'convert' `Reader` to data in the original file + ''' + + print('----------------------------') + print('Convert') + print('----------------------------') + if MINIMAL_TEST: + zip_files = [self.zip_files[-1]] + else: + zip_files = self.zip_files + for folder, files in zip_files: + print('{} | Validating...'.format(folder)) + for file in files: + verify_read_write(self.ZIP, '{}/{}'.format(folder, file), cpy_mode='convert') + print('... OK') + print('DONE') + + def test_read_write_header_examples(self): + ''' Compare data written by a 'copy_header' only `Writer` to data in the original file + ''' + + print('----------------------------') + print('copy_header') + print('----------------------------') + if MINIMAL_TEST: + zip_files = [self.zip_files[-1]] + else: + zip_files = self.zip_files + for folder, files in zip_files: + print('{} | Validating...'.format(folder)) + for file in files: + A = c3d.Reader(Zipload._get(self.ZIP, '{}/{}'.format(folder, file))) + writer = A.to_writer('copy_header') + verify.equal_headers('test_read_write_header_examples', A, writer, 'Original', 'Writer Copy', True, True) + print('... OK') + print('DONE') + + def test_read_write_copy_metadata_examples(self): + ''' Compare data written by a 'copy_metadata' only `Writer` to data in the original file + ''' + + print('----------------------------') + print('copy_metadata') + print('----------------------------') + if MINIMAL_TEST: + zip_files = [self.zip_files[-1]] + else: + zip_files = self.zip_files + for folder, files in zip_files: + print('{} | Validating...'.format(folder)) + for file in files: + A = c3d.Reader(Zipload._get(self.ZIP, '{}/{}'.format(folder, file))) + writer = A.to_writer('copy_metadata') + verify.equal_headers('test_read_write_copy_metadata_examples', A, writer, 'Original', 'Writer Copy', True, True) + print('... OK') + print('DONE') class Sample01(Base): diff --git a/test/verify.py b/test/verify.py index 17e6d05..c543cad 100644 --- a/test/verify.py +++ b/test/verify.py @@ -130,7 +130,7 @@ def check_zipfile(file_path): print('----------------------------') print(type(self)) print('----------------------------') - if len(np.shape(self.zip_files)) == 1: + if len(np.array(self.zip_files, dtype=object).shape) == 1: for file in self.zip_files: check_zipfile(file) else: @@ -213,8 +213,8 @@ def equal_headers(test_label, areader, breader, alabel, blabel, areal, breal): assert aheader.max_gap == bheader.max_gap, \ '{}, max_gap: {} {}, {} {}'.format( test_label, alabel, aheader.max_gap, blabel, bheader.max_gap) - assert aheader.long_event_labels == bheader.long_event_labels, \ - '{}, long_event_labels: {} {}, {} {}'.format( + assert isinstance(aheader.long_event_labels, bool) and isinstance(bheader.long_event_labels, bool), \ + '{}, expected long_event_labels to be boolean: {} {}, {} {}'.format( test_label, alabel, aheader.long_event_labels, blabel, bheader.long_event_labels) event_mismatch = max(0, bheader.event_count - aheader.event_count)