-
Notifications
You must be signed in to change notification settings - Fork 66
Expand file tree
/
Copy path01-improve-retries.patch
More file actions
158 lines (140 loc) · 6.88 KB
/
01-improve-retries.patch
File metadata and controls
158 lines (140 loc) · 6.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
Author: paxx12 <245230251+paxx12@users.noreply.github.com>
Date: Sun Mar 29 23:16:55 2026 +0200
Refactor retry logic to use retry flag instead of counter
diff --git a/src/runtime.py b/src/runtime.py
index 9bf4b8a..dd014f2 100644
--- a/runtime.py
+++ b/runtime.py
@@ -56,72 +56,70 @@ class Runtime:
for i, reader in enumerate(self.rfid_readers):
if self.config.auto_read_mode or self.read_retries_left[i] > 0:
logging.debug(f"Processing reader {reader.name}, retries left: {self.read_retries_left[i]}")
- result = self.process_reader_single(reader)
- self.read_retries_left[i] -= 1
- scan_result = None
-
- if result is not None:
- scan_result, filament = result
-
- for exporter in self.detection_exporters:
- exporter.export_data(scan_result, filament, reader)
-
- if filament is not None:
- logging.info(f"Processed tag with UID {scan_result.uid.hex().upper()} from reader {reader.name}")
-
- for exporter in self.success_exporters:
- exporter.export_data(scan_result, filament, reader)
+ scan_result, filament, retry = self.process_reader_single(reader)
+ if not retry:
self.read_retries_left[i] = 0
-
elif self.read_retries_left[i] <= 0 and not self.config.auto_read_mode:
logging.warning(f"Failed to read from reader {reader.name}, no retries left")
- self.read_retries_left[i] = 0
+ else:
+ self.read_retries_left[i] -= 1
+ logging.info(f"Retrying read for reader {reader.name}, retries left: {self.read_retries_left[i]}")
+ continue
+
+ if scan_result:
+ for exporter in self.detection_exporters:
+ exporter.export_data(scan_result, filament, reader)
- for exporter in self.error_exporters:
- exporter.export_data(scan_result, None, reader)
+ if filament:
+ logging.info(f"Processed tag with UID {scan_result.uid.hex().upper()}")
+
+ for exporter in self.success_exporters:
+ exporter.export_data(scan_result, filament, reader)
time.sleep(self.config.read_interval_seconds)
- def process_reader_single(self, reader: RfidReader) -> tuple[ScanResult, GenericFilament|None]|None:
+ def process_reader_single(self, reader: RfidReader) -> tuple[ScanResult|None, GenericFilament|None, bool]:
reader.start_session()
scan_result = reader.scan()
reader.end_session()
if scan_result is None:
logging.debug("No tag detected")
- return
+ return None, None, True
uid = scan_result.uid.hex()
if self.config.auto_read_mode and reader.is_same_tag(uid):
logging.debug("Same tag detected as last read, skipping processing")
- return
+ return None, None, False
logging.info(f"Detected tag type {scan_result.tag_type.name} with UID {scan_result.uid.hex().upper()}")
filament = None
+ retry = False
if scan_result.tag_type == TagType.MifareClassic1k and isinstance(reader, MifareClassicReader):
- filament = self.process_mifare_classic(reader, scan_result)
+ filament, retry = self.process_mifare_classic(reader, scan_result)
elif scan_result.tag_type == TagType.MifareUltralight and isinstance(reader, MifareUltralightReader):
- filament = self.process_mifare_ultralight(reader, scan_result)
-
- if filament is None:
- logging.warning("Failed to read data from tag")
- return (scan_result, None)
-
- logging.info(filament.pretty_text())
-
- reader.set_last_read_uid(uid)
- return (scan_result, filament)
+ filament, retry = self.process_mifare_ultralight(reader, scan_result)
+
+ if filament:
+ logging.info(filament.pretty_text())
+ reader.set_last_read_uid(uid)
+ else:
+ logging.warning(f"Failed to read data from tag.")
+
+ return (scan_result, filament, retry)
+
+ def process_mifare_classic(self, reader: MifareClassicReader, scan_result: ScanResult) -> tuple[GenericFilament | None, bool]:
+ retry = False
- def process_mifare_classic(self, reader: MifareClassicReader, scan_result: ScanResult) -> GenericFilament | None:
for processor in self.mifare_classic_processors:
reader.start_session()
if reader.scan() == None:
logging.warning("Tag lost before reading")
reader.end_session()
- continue
+ return None, True
logging.debug(f"Attempting to read with processor: {processor.name}")
auth = processor.authenticate_tag(scan_result)
@@ -136,20 +134,20 @@ class Runtime:
if card_data is not None:
logging.debug(f"Read MIFARE Classic card data: {card_data.hex().upper()}")
- return processor.process_tag(scan_result, card_data)
+ return processor.process_tag(scan_result, card_data), False
else:
logging.warning("Failed to read MIFARE Classic card data")
- continue
+ retry = True
- return None
+ return None, retry
- def process_mifare_ultralight(self, reader: MifareUltralightReader, scan_result: ScanResult) -> GenericFilament | None:
+ def process_mifare_ultralight(self, reader: MifareUltralightReader, scan_result: ScanResult) -> tuple[GenericFilament | None, bool]:
reader.start_session()
if reader.scan() == None:
logging.warning("Tag lost before reading")
reader.end_session()
- return None
+ return None, True
card_data = reader.read_mifare_ultralight(scan_result)
reader.end_session()
@@ -160,10 +158,10 @@ class Runtime:
for processor in self.mifare_ultralight_processors:
logging.debug(f"Attempting to read with processor: {processor.name}")
filament = processor.process_tag(scan_result, card_data)
-
if filament is not None:
- return filament
+ return filament, False
+
+ return None, False
else:
logging.warning("Failed to read MIFARE Ultralight card data")
-
- return None
\ No newline at end of file
+ return None, True
\ No newline at end of file