-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinfratek_106a.py
More file actions
398 lines (318 loc) · 15.4 KB
/
infratek_106a.py
File metadata and controls
398 lines (318 loc) · 15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
"""
Infratek 106A / 306A / LEM D4000 Power Analyzer GPIB Driver
Blocking driver operating in single-shot (STOP) mode.
Uses MAV serial poll for query readiness and *OPC? for acquisition complete.
write_termination='\r' is required by the device.
Tested on: IT,306A,0,2.3R6 via GPIB0::5::INSTR
"""
import time
from typing import Literal, Optional
import pyvisa
# ── Literal types for SCPI parameter completion ───────────────────────
Aperture = Literal["100M", "250M", "500M", "1", "2"]
CurrentInput = Literal["IN5", "IN30", "SH"]
SyncSource = Literal["VOLT", "CURR"]
Phase = Literal["L1", "L2", "L3"]
VoltageRange = Literal["Auto", "300M", "1", "3", "10", "30", "100", "300", "1000"]
CurrentRangeIN30 = Literal["Auto", "1", "3", "10", "30", "100"]
CurrentRangeIN5 = Literal["Auto", "15M", "50M", "150M", "500M", "1.5", "5", "15"]
ShuntRange = Literal["Auto", "60M", "200M", "600M", "2", "6"]
DisplayMode = Literal["L1", "L2", "L3", "ALL"]
# ── Aperture lookup ────────────────────────────────────────────────────
APERTURE_SECONDS: dict[Aperture, float] = {
"100M": 0.100,
"250M": 0.250,
"500M": 0.500,
"1": 1.000,
"2": 2.000,
}
class CommandError(Exception):
"""Raised when the device signals a command error via ESB in the status byte."""
class Infratek106A:
"""Blocking driver for Infratek 106A/306A in single-shot (STOP) mode.
Usage:
with Infratek106A() as dev:
dev.configure(current_input="IN30", sync_source="CURR", aperture="250M", phase="L1")
dev.trigger()
v = dev.voltage_rms() # reads L1 (current phase)
v2 = dev.voltage_rms("L2") # switches to L2, then reads
For GUI use: run trigger() and read methods in a worker thread.
"""
_MAV = 0x10 # bit 4: Message Available
_ESB = 0x20 # bit 5: Event Status Bit (summary of ESR via ESE mask)
def __init__(self, resource: str = "GPIB0::5::INSTR", timeout_ms: int = 10000,
warmup_count: int = 1):
self._resource_str = resource
self._timeout_ms = timeout_ms
self._aperture_s: float = 0.250 # default 250ms
self._phase: Phase = "L1" # tracks current FORM:PHAS
self._needs_warmup: bool = True # first trigger discards garbage
self._warmup_count: int = warmup_count
self._rm: Optional[pyvisa.ResourceManager] = None
self._inst: Optional[pyvisa.resources.Resource] = None
# ── Connection ─────────────────────────────────────────────────────
def open(self) -> "Infratek106A":
self._rm = pyvisa.ResourceManager()
self._inst = self._rm.open_resource(
self._resource_str, write_termination="\r"
)
self._inst.timeout = self._timeout_ms
self._init_device()
return self
def close(self):
if self._inst:
self._inst.close()
self._inst = None
if self._rm:
self._rm.close()
self._rm = None
def __enter__(self):
return self.open()
def __exit__(self, *args):
self.close()
def _init_device(self):
"""Reset, configure ESE for CME detection, enter STOP mode."""
self._write("*RST")
time.sleep(1.0)
self._write("*CLS")
time.sleep(0.1)
self._write("*ESE 32") # ESB = CME only
time.sleep(0.05)
self._write("DIS:M ALL")
time.sleep(0.05)
self._write("ACQ:H Stop")
time.sleep(0.3)
# ── Low-level I/O ──────────────────────────────────────────────────
def _write(self, cmd: str):
self._inst.write(cmd)
def _write_acq_batch(self, cmds: list[str]):
"""Send multiple ACQ config commands, then re-enter STOP mode once.
Commands like ACQ:IN, ACQ:SYNC, ACQ:APER implicitly put the
device back into RUN. This batches them with a single ACQ:H Stop.
"""
if not cmds:
return
for cmd in cmds:
self._inst.write(cmd)
time.sleep(1)
self._inst.write("ACQ:H Stop")
time.sleep(0.3)
self._needs_warmup = True
def _wait_mav(self, timeout_s: float = 5.0):
"""Poll STB for MAV."""
deadline = time.perf_counter() + timeout_s
while time.perf_counter() < deadline:
if self._inst.read_stb() & self._MAV:
return
time.sleep(0.005)
raise TimeoutError(f"MAV not set within {timeout_s:.1f}s")
def _query(self, cmd: str, timeout_s: float = 5.0) -> str:
self._inst.write(cmd)
self._wait_mav(timeout_s)
return self._inst.read().strip()
def _query_float(self, cmd: str, timeout_s: float = 5.0) -> float:
return float(self._query(cmd, timeout_s))
# ── Trigger / Acquire ──────────────────────────────────────────────
def trigger(self):
"""*CLS → *TRG → sleep(aperture) → *OPC?. Blocking.
If acquisition settings changed since the last trigger,
warmup_count extra triggers are performed first (results discarded).
"""
if self._needs_warmup:
for _ in range(self._warmup_count):
self._write("*CLS")
self._write("*TRG")
time.sleep(self._aperture_s)
self._query("*OPC?", timeout_s=self._aperture_s + 5.0)
self._needs_warmup = False
self._write("*CLS")
self._write("*TRG")
time.sleep(self._aperture_s)
self._query("*OPC?", timeout_s=self._aperture_s + 5.0)
# ── Configuration ──────────────────────────────────────────────────
def identify(self) -> str:
return self._query("*IDN?")
def configure(
self,
current_input: Optional[CurrentInput] = None,
sync_source: Optional[SyncSource] = None,
aperture: Optional[Aperture] = None,
voltage_range: Optional[VoltageRange] = None,
current_range: Optional[CurrentRangeIN30 | CurrentRangeIN5 | ShuntRange] = None,
phase: Optional[Phase] = None,
warmup_count: Optional[int] = None,
):
"""Configure device settings in one batch.
Only sends commands for parameters that are provided.
All ACQ commands are batched with a single sleep + ACQ:H Stop.
"""
if warmup_count is not None:
self._warmup_count = warmup_count
acq_cmds: list[str] = []
if current_input is not None:
acq_cmds.append(f"ACQ:IN {current_input}")
if sync_source is not None:
acq_cmds.append(f"ACQ:SYNC {sync_source}")
if aperture is not None:
self._aperture_s = APERTURE_SECONDS[aperture]
acq_cmds.append(f"ACQ:APER {aperture}")
if voltage_range is not None:
acq_cmds.append(f"ACQ:RAN:VOLT {voltage_range}")
if current_range is not None:
acq_cmds.append(f"ACQ:RAN:CURR {current_range}")
self._write_acq_batch(acq_cmds)
if phase is not None:
self._write(f"FORM:PH {phase}")
time.sleep(0.05)
self._phase = phase
def error(self) -> int:
return int(self._query("ERR?"))
# ── Phase switching ────────────────────────────────────────────────
def _ensure_phase(self, phase: Optional[Phase]):
"""Switch FORM:PHAS only if different from current."""
if phase is not None and phase != self._phase:
self._write(f"FORM:PH {phase}")
time.sleep(0.05)
self._phase = phase
# ── Read methods (call after trigger) ──────────────────────────────
# Voltage
def voltage_rms(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("VOLT:RMS?")
def voltage_rms_ac(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("VOLT:RMS:AC?")
def voltage_rect(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("VOLT:RECT?")
def voltage_mean(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("VOLT:MEAN?")
def voltage_min(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("VOLT:MIN?")
def voltage_max(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("VOLT:MAX?")
def voltage_peak(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("VOLT:PEAK?")
def voltage_crest(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("VOLT:CREST?")
def voltage_form(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("VOLT:FORM?")
def voltage_thd(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("VOLT:THD?")
# Current
def current_rms(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("CURR:RMS?")
def current_rms_ac(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("CURR:RMS:AC?")
def current_rect(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("CURR:RECT?")
def current_mean(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("CURR:MEAN?")
def current_min(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("CURR:MIN?")
def current_max(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("CURR:MAX?")
def current_peak(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("CURR:PEAK?")
def current_crest(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("CURR:CREST?")
def current_form(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("CURR:FORM?")
def current_thd(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("CURR:THD?")
def current_int(self, phase: Optional[Phase] = None) -> float:
"""Average charge (short-time integration)."""
self._ensure_phase(phase)
return self._query_float("CURR:RECT:INT?")
# Power
def power_active(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("POW:ACT?")
def power_apparent(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("POW:APP?")
def power_reactive(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("POW:REA?")
def power_factor(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("POW:FAC?")
def power_active_ac(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("POW:ACT:AC?")
def power_apparent_ac(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("POW:APP:AC?")
def power_reactive_ac(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("POW:REA:AC?")
def power_factor_ac(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("POW:FAC:AC?")
# Energy
def energy_active(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("EN:ACT?")
def energy_apparent(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("EN:APP?")
def energy_reset(self):
self._write("EN:RES")
def current_reset(self):
"""Reset charge integration values."""
self._write("CURR:RES")
# Frequency
def frequency(self, phase: Optional[Phase] = None) -> float:
self._ensure_phase(phase)
return self._query_float("FREQ?")
# ── FFT harmonics ─────────────────────────────────────────────────
def _query_fft(self, cmd: str, start: int, end: int,
phase: Optional[Phase] = None) -> list[float]:
"""Set FORM:START/END, query FFT, then read remaining values with '?'."""
self._ensure_phase(phase)
self._write(f"FORM:START {start}")
self._write(f"FORM:END {end}")
count = end - start + 1
values: list[float] = []
# First value comes from the actual command
values.append(self._query_float(cmd))
# Remaining values come from consecutive '?' queries
for _ in range(count - 1):
values.append(self._query_float("?"))
return values
def voltage_fft(self, start: int = 1, end: int = 50,
phase: Optional[Phase] = None) -> list[float]:
"""Voltage harmonics from start to end (1–99)."""
return self._query_fft("VOLT:FFT?", start, end, phase)
def current_fft(self, start: int = 1, end: int = 50,
phase: Optional[Phase] = None) -> list[float]:
"""Current harmonics from start to end (1–99)."""
return self._query_fft("CURR:FFT?", start, end, phase)
def power_fft(self, start: int = 1, end: int = 50,
phase: Optional[Phase] = None) -> list[float]:
"""Power harmonics from start to end (1–99)."""
return self._query_fft("POW:FFT?", start, end, phase)
def analog_inputs(self, start: int = 0, end: int = 7) -> list[float]:
"""Read analog input channels (0–7)."""
return self._query_fft("AINP?", start, end)
# Acquisition quality
def acquisition_quality(self) -> int:
"""Returns overload/underload bitmask."""
return int(self._query("ACQ:QU?"))