-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdatafile.py
More file actions
140 lines (117 loc) · 5.49 KB
/
datafile.py
File metadata and controls
140 lines (117 loc) · 5.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Standard Python modules
import os
# PAMGuard modules
from binary_reader.reader import Reader, FileFormatError
from binary_reader.field_type import Field, Type
# data chunk processor function
from modules.datatypes import PGChunk
from modules.chunk import Chunk, structures
from modules.datachunk import read_data
# module processor classes
from modules.genericmodule import UnimplementedModule
from modules.rightwhale import RightWhaleEdgeDetector
from modules.clipdata import ClipData
class DataFile(Chunk):
"""
Representation of a PAMGuard data file.
"""
def __init__(self, datafile, debug=True):
"""
Read specified PAMGuard data file
:param datafile: path to open datafile
:param debug: debug mode
"""
reader = Reader(datafile) # low-level binary reader
super().__init__(reader, fields=[], debug=debug)
# We use this to test if a particular type of chunk has
# been processed.
self.chunks_processed = set()
self.set_field("data", []) # Module data chunks are stored as a list
# module processor will be set once we know the module type
processor = None
# Prime the loop by reading the first chunk
# todo: Should this always be of type FILE_HEADER first? Check it?
current = Chunk(self.get_reader(), structures["chunk"])
# Loop until all chunks processed
while current is not None:
# identifier contains module type
if current.identifier == PGChunk.FILE_HEADER:
self.chunks_processed.add(current.identifier)
self.set_field("fileHeader", current)
# File header has fields which identify
# the module type, version, etc.
current.read_fields(structures["file-description"])
# File headers may have a set of bytes that are
# not interpreted here. If we skip these
# currently hardcoded to skip...
read_extra_data = False
if current.extraInfoLen > 0:
if read_extra_data:
# User wants the byte array
current.read_arrays(
[Field("extraInfo", Type("int", 8))],
current.extraInfoLen)
else:
# Skip data
current._reader.seek(self.module.extraInfoLen, os.SEEK_CUR)
# Module processors are classes designed to read
# module-specific data. Module processors are selected
# based on the moduleName field in the PAMGuard file.
processors = {
"Clip generator": ClipData,
"Right Whale Edge Detector": RightWhaleEdgeDetector,
}
if current.moduleName in processors:
processor = processors[current.moduleName](self)
else:
# We don't know the module, set the processor to a default
# module that will throw an error when constructed.
processor = UnimplementedModule(self)
self.set_field("module", processor)
elif current.identifier == PGChunk.FILE_FOOTER:
if PGChunk.FILE_HEADER not in self.chunks_processed:
raise FileFormatError("File footer before header",
self.get_reader())
current.read_fields(structures["file-footer"])
if self.fileHeader.file_format >= 3:
current.read_fields(structures["file-footer-v3+"])
current.read_fields(structures["file-footer-remainder"])
self.set_field("fileFooter", current)
elif current.identifier == PGChunk.MODULE_HEADER:
self.chunks_processed.add(current.identifier)
self.module.read_header()
elif current.identifier == PGChunk.MODULE_FOOTER:
if PGChunk.MODULE_HEADER not in self.chunks_processed:
raise FileFormatError(
"Module footer before header", self.get_reader())
self.chunks_processed.add(current.identifier)
self.module.read_footer()
elif current.identifier == PGChunk.DATA:
self.chunks_processed.add(current.identifier)
position = read_data(current, self)
self.data.append(current)
if self._debug:
loc_len = current._reader.tell()
loc_ident = loc_len + 2
print(f"Reading next chunk at file offset: 0x{loc_len:x}")
# Get start of next data chunk
try:
current = Chunk(self.get_reader(), structures["chunk"])
except EOFError:
current = None
if self._debug and current is not None:
print(current)
def __getitem__(self, index):
"""
Access a data item
:param index: position within .data
:return: self.data[index]
"""
return self.data[index]
if __name__ == "__main__":
# Look at loadPamguardBinaryFile for entry point in Matlab code
# df = DataFile("sample_data/RW_Edge_Detector_Right_Whale_Edge_Detector_Edges_20090328_000000.pgdf")
df = DataFile("sample_data/Clip_Generator_Clip_generator_Clips_20090328_000000.pgdf")
# Show big picture
print(df)
print('all done')