Skip to content

Commit 0b66a1c

Browse files
committed
wip updating tx module
Drafting a main script for examplifying main components. This include: * gmail client * iridium message parsing * Binary payload parsing wip tx processing * Implemented fully functional binary payload decoder * Added parsing exception with handling examples Added various comments
1 parent 6eb67b4 commit 0b66a1c

File tree

9 files changed

+609
-1
lines changed

9 files changed

+609
-1
lines changed

src/pypromice/tx/gmail_client.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import email.parser
2+
import imaplib
3+
import logging
4+
from configparser import ConfigParser
5+
from datetime import datetime
6+
from mailbox import Message
7+
from pathlib import Path
8+
from typing import Iterator
9+
10+
__all__ = ["GmailClient"]
11+
12+
logger = logging.getLogger(__name__)
13+
14+
class GmailClient:
15+
16+
def __init__(self, server: str, port: int, account: str, password: str, last_uid: int = 1, chunk_size: int = 500):
17+
18+
self.last_uid = last_uid
19+
self.chunk_size = chunk_size
20+
self.parser = email.parser.Parser()
21+
logger.info("Logging in to email server...")
22+
self.mail_server = imaplib.IMAP4_SSL(server, port)
23+
typ, accountDetails = self.mail_server.login(account, password)
24+
if typ != "OK":
25+
logger.error("Not able to sign in!")
26+
raise Exception("Not able to sign in!")
27+
logger.info("Logged in to email server")
28+
29+
result, data = self.mail_server.select(mailbox='"[Gmail]/All Mail"', readonly=True)
30+
logger.info("mailbox contains %s messages" % data[0])
31+
32+
def __enter__(self):
33+
return self
34+
35+
def __exit__(self, exc_type, exc_val, exc_tb):
36+
self.mail_server.logout()
37+
return False
38+
39+
40+
def fetch_mail(self, uid: int) -> Message | None:
41+
result, data = self.mail_server.uid("fetch", str(uid), "(RFC822)")
42+
if result != "OK":
43+
logger.error(f"Error fetching mail with UID {uid}: {data}")
44+
return None
45+
if not data or len(data) < 2:
46+
logger.error(f"No data returned for UID {uid}")
47+
return None
48+
49+
mail_str = data[0][1].decode()
50+
return self.parser.parsestr(mail_str)
51+
52+
def get_mail_ids(self, date: datetime|str) -> list[int]:
53+
# Issue search command of the form "SEARCH UID 42:*"
54+
if isinstance(date, str):
55+
date = datetime.strptime(date, "%d-%b-%Y")
56+
57+
result, data = self.mail_server.uid("search", None, f"(ON {date.strftime('%d-%b-%Y')})")
58+
if result != "OK":
59+
logger.error(f"Error searching for mails on {date}: {data}")
60+
return []
61+
message_uids = list(map(int, data[0].split()))
62+
logger.info(f"Found {len(message_uids)} messages on {date}")
63+
return message_uids
64+
65+
def fetch_mails(self, uuids: list[int]) -> Iterator[Message]:
66+
logger.info(f"Fetching {len(uuids)} mails")
67+
batch_ids = ','.join(map(str, uuids))
68+
result, data = self.mail_server.uid("fetch", batch_ids, "(RFC822)")
69+
logger.info(f"Parsing string to Messages")
70+
if result != "OK":
71+
logger.info(f"Error fetching mails with UIDs {batch_ids}: {data}")
72+
return
73+
if not data or len(data) < 2:
74+
logger.warning(f"No data returned for UIDs {batch_ids}")
75+
return
76+
for response in data:
77+
if isinstance(response, tuple):
78+
mail_str = response[1].decode()
79+
message = self.parser.parsestr(mail_str)
80+
yield message
81+
82+
def fetch_mails_chunked(self, uuids: list[int], chunk_size: int = 100) -> Iterator[Message]:
83+
"""
84+
Fetch mails in chunks of size chunk_size.
85+
"""
86+
for idx in range(0, len(uuids), chunk_size):
87+
message_uid_chunk = uuids[idx:idx + chunk_size]
88+
yield from self.fetch_mails(message_uid_chunk)
89+
90+
def fetch_new_mails(self, last_uid:int|None = None) -> Iterator[Message]:
91+
if last_uid is None:
92+
last_uid = self.last_uid
93+
94+
# Get the first 100 mails on 2025-01-01
95+
result, data = self.mail_server.uid("search", None, f"(ON 01-Apr-2025)")
96+
message_uuids = data[0].decode().split()
97+
len(message_uuids)
98+
99+
# Issue search command of the form "SEARCH UID 42:*"
100+
result, data = self.mail_server.uid("search", None, f"(UID {last_uid}:*)")
101+
message_uids = list(map(int, data[0].split()))
102+
logger.info(f"Found {len(message_uids)} new messages since UID {last_uid}")
103+
104+
for idx in range(0, len(message_uids), self.chunk_size):
105+
message_uid_chunk = message_uids[idx:idx + self.chunk_size]
106+
yield from self.fetch_mails(message_uid_chunk)
107+
108+
109+
@classmethod
110+
def from_config_files(cls, *init_file_path: Path) -> 'GmailClient':
111+
"""
112+
Create a GmailClient instance from configuration files.
113+
114+
The latest path takes precedence over the previous ones.
115+
116+
"""
117+
configuration = ConfigParser()
118+
for file_path in init_file_path:
119+
configuration.read(file_path)
120+
121+
return cls(**configuration["aws"])
122+

src/pypromice/tx/iridium.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import re
2+
from datetime import datetime
3+
from email.message import Message
4+
5+
import attrs
6+
7+
8+
@attrs.define
9+
class IridiumMessage:
10+
"""
11+
Class to represent an Iridium message.
12+
"""
13+
14+
imei: str
15+
momsn: int
16+
mtmsn: int
17+
time_of_session: datetime
18+
payload_filename: str
19+
payload_bytes: bytes | None
20+
21+
# This is the original subject line of the email
22+
# NOTE:
23+
# This is not a part of the Iridium message, but rather a part of the email a layer above
24+
# It is stored here for payload decoding purposes
25+
subject: str | None
26+
27+
28+
def parse_mail(email: Message) -> IridiumMessage:
29+
assert email["From"] == "sbdservice@sbd.iridium.com"
30+
subject = str(email["Subject"])
31+
32+
sbd_raw = None
33+
sdb_dict = None
34+
data_message: bytes | None = None
35+
filename = None
36+
for payload_message in email.get_payload():
37+
filename: str = payload_message.get_filename()
38+
if filename is None:
39+
# There should only be a single payload without a filename
40+
assert sbd_raw is None
41+
sbd_raw = payload_message.get_payload()
42+
sdb_dict = dict(
43+
re.findall(r"^([^:\n]+):\s*([^\n\r]+)", sbd_raw, re.MULTILINE)
44+
)
45+
else:
46+
assert data_message is None
47+
data_message = payload_message.get_payload(decode=True)
48+
49+
assert sdb_dict is not None
50+
assert data_message is not None
51+
assert filename is not None
52+
53+
date_string = sdb_dict["Time of Session (UTC)"]
54+
date_object = datetime.strptime(date_string, "%a %b %d %H:%M:%S %Y")
55+
56+
return IridiumMessage(
57+
imei=filename.split("_")[0],
58+
momsn=int(sdb_dict["MOMSN"]),
59+
mtmsn=int(sdb_dict["MTMSN"]),
60+
time_of_session=date_object,
61+
payload_bytes=data_message,
62+
payload_filename=filename,
63+
subject=subject,
64+
)
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
from datetime import datetime
2+
from pathlib import Path
3+
4+
import numpy as np
5+
import pandas as pd
6+
7+
__all__ = [
8+
"decode_payload",
9+
"DecodeError",
10+
]
11+
12+
CR_BASIC_EPOCH_OFFSET = datetime(1990, 1, 1, 0, 0, 0, 0).timestamp()
13+
14+
# Load payload formats from CSV file
15+
payload_formats_path = Path(__file__).parent.joinpath("payload_formats.csv")
16+
payload_formats = pd.read_csv(payload_formats_path, index_col=0)
17+
payload_formats = payload_formats[payload_formats["flags"].values != "don’t use"]
18+
19+
20+
class DecodeError(Exception):
21+
22+
def __init__(
23+
self,
24+
format_index: int | None = None,
25+
format_name: str | None = None,
26+
letter_index: int | None = None,
27+
buffer_index: int | None = None,
28+
message: str | None = None,
29+
):
30+
self.format_index = format_index
31+
self.format_name = format_name
32+
self.letter_index = letter_index
33+
self.buffer_index = buffer_index
34+
self.message = message
35+
36+
def as_dict(self) -> dict:
37+
return {
38+
"format_index": self.format_index,
39+
"format_name": self.format_name,
40+
"letter_index": self.letter_index,
41+
"buffer_index": self.buffer_index,
42+
"message": self.message,
43+
}
44+
45+
def __str__(self):
46+
return repr(self)
47+
48+
def __repr__(self):
49+
variables_str = ", ".join(f"{k}={v}" for k, v in self.as_dict().items())
50+
51+
return f"{self.__class__.__name__}({variables_str})"
52+
53+
54+
def parse_gfp2(buffer: bytes) -> float:
55+
"""Two-bit floating point decoder"""
56+
if len(buffer) < 2:
57+
raise ValueError("Buffer too short for gfp2 decoding")
58+
59+
msb = buffer[0]
60+
lsb = buffer[1]
61+
Csign = -2 * (msb & 128) / 128 + 1
62+
CexpM = (msb & 64) / 64
63+
CexpL = (msb & 32) / 32
64+
Cexp = 2 * CexpM + CexpL - 3
65+
Cuppmant = (
66+
4096 * (msb & 16) / 16
67+
+ 2048 * (msb & 8) / 8
68+
+ 1024 * (msb & 4) / 4
69+
+ 512 * (msb & 2) / 2
70+
+ 256 * (msb & 1)
71+
)
72+
Cnum = Csign * (Cuppmant + lsb) * 10**Cexp
73+
return round(Cnum, 3)
74+
75+
76+
def parse_gli4(buffer: bytes) -> int:
77+
"""Four-bit decoder
78+
79+
Parameters
80+
----------
81+
buffer : bytes
82+
List of four values
83+
84+
Returns
85+
-------
86+
float
87+
Decoded value
88+
"""
89+
if len(buffer) < 4:
90+
raise ValueError("Buffer too short for gli4 decoding")
91+
92+
return int.from_bytes(buffer[:4], byteorder="big", signed=True)
93+
94+
95+
def decode_payload(payload: bytes) -> list:
96+
97+
# The original code also had a special case for imei number 300234064121930. This should be handled differently.
98+
bidx = ord(payload[:1])
99+
100+
# The payload format is determined by the first byte of the payload
101+
if bidx not in payload_formats.index:
102+
raise DecodeError(format_index=bidx, message=f"Unknown payload format: {bidx}")
103+
104+
payload_format = payload_formats.loc[bidx]
105+
_, bin_format, bin_name = payload_format.iloc[:3]
106+
# Note: bin_val is just len(bin_format)
107+
indx = 1 # The first byte is the payload format
108+
dataline: list = []
109+
110+
# Iterate over each token in the bin_format and decode the payload
111+
format_letter_index = None
112+
try:
113+
for format_letter_index, type_letter in enumerate(bin_format):
114+
115+
if type_letter == "f":
116+
nan_values = (8191,)
117+
inf_values = (8190,)
118+
neg_inf_values = -8190, -8191
119+
120+
value = parse_gfp2(payload[indx:])
121+
# Be careful with float and int comparison. Float doesn't guarantee exact values.
122+
# Consider using binary patterns for the special values instead.
123+
if value in nan_values:
124+
dataline.append(np.nan)
125+
elif value in inf_values:
126+
dataline.append(np.inf)
127+
elif value in neg_inf_values:
128+
dataline.append(-np.inf)
129+
else:
130+
dataline.append(value)
131+
indx += 2
132+
continue
133+
134+
if type_letter == "l":
135+
# Encoded as a 4 byte two complement integer
136+
137+
value = parse_gli4(payload[indx:])
138+
139+
# The value 2147450879 seems arbitrary, but 2147483648 is the maximum value for a 4-byte signed integer
140+
nan_values = -2147483648, 2147450879
141+
if value in nan_values:
142+
dataline.append(np.nan)
143+
else:
144+
# TODO: Integers doesn't support NaN values and this will cause mixed types which should be avoided
145+
dataline.append(float(value))
146+
indx += 4
147+
continue
148+
149+
if type_letter == "t":
150+
value = parse_gli4(payload[indx:])
151+
time = datetime.fromtimestamp(CR_BASIC_EPOCH_OFFSET + value)
152+
dataline.append(time)
153+
indx += 4
154+
continue
155+
156+
# GPS time or coordinate encoding
157+
if type_letter in ("g", "n", "e"):
158+
nan_values_fp2 = (8191,)
159+
# Check if byte is a 2-bit NAN. This occurs when the GPS data is not
160+
# available and the logger sends a 2-bytes NAN instead of a 4-bytes value
161+
if np.isnan(parse_gfp2(payload[indx:])) in nan_values_fp2:
162+
# This is a 2-byte NAN
163+
# The GPS data is not available
164+
# We need to skip the next 2 bytes
165+
dataline.append(np.nan)
166+
indx += 2
167+
continue
168+
else:
169+
# This is a 4-byte value
170+
value = parse_gli4(payload[indx:])
171+
172+
if type_letter == "g":
173+
value /= 100.0
174+
else: # type_letter in ('n', 'e'):
175+
value /= 100000.0
176+
177+
dataline.append(value)
178+
indx += 4
179+
continue
180+
181+
raise Exception(f"Unknown type letter: {type_letter}")
182+
183+
184+
# TODO: Compute and validate checksum
185+
if indx != len(payload):
186+
raise Exception(f"Payload length mismatch: {indx} != {len(payload)}")
187+
188+
except Exception as e:
189+
raise DecodeError(bidx, bin_name, format_letter_index, indx) from e
190+
191+
return dataline

src/pypromice/tx/payload_types.csv

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
type_letter,bytes,process,nan_values,inf_values,-inf_values,notes
22
f,2,1,8191,8190,"-8190,-8191",value encoded as 2 bytes base-10 floating point (GFP2)
3-
l,4,1,"-2147483648,2147450879",,,value encoded as 4 bytes two's complement integer (GLI4)
3+
l,4,1,"-2147483648,2147450879",,,value encoded as 4 bytes two's complement integer (GLI4) - note the mac nan value is not a max value
44
t,4,%Y-%m-%d %H:%M:%S,,,,timestamp as seconds since 1990-01-01 00:00:00 +0000 encoded as GLI4
55
g,4,100,,,,GPS time encoded as GLI4
66
n,4,100000,,,,GPS latitude encoded as GLI4

0 commit comments

Comments
 (0)