-
Notifications
You must be signed in to change notification settings - Fork 355
Expand file tree
/
Copy pathATS.py
More file actions
934 lines (794 loc) · 34.4 KB
/
ATS.py
File metadata and controls
934 lines (794 loc) · 34.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
from __future__ import annotations
import ctypes
import logging
import sys
import time
import warnings
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast
import numpy as np
import numpy.typing as npt
from qcodes.instrument import Instrument, InstrumentBaseKWArgs
from .ats_api import AlazarATSAPI
from .constants import NUMBER_OF_CHANNELS_FROM_BYTE_REPR, max_buffer_size
from .helpers import CapabilityHelper
from .utils import TraceParameter
if TYPE_CHECKING:
from collections.abc import Generator, Sequence
from typing import Unpack
logger = logging.getLogger(__name__)
OutputType = TypeVar("OutputType")
CtypesTypes = (
type[ctypes.c_uint8]
| type[ctypes.c_uint16]
| type[ctypes.c_uint32]
| type[ctypes.c_int32]
| type[ctypes.c_float]
)
class AlazarTechATS(Instrument):
"""
This is the BaseClass for the qcodes drivers for Alazar data acquisition cards.
This should not be instantiated directly, but should be subclassed for a specific
card should be used.
Args:
name: name for this instrument
system_id: target system id for this board
board_id: target board id within the system for this board
dll_path: path to the ATS driver dll library file
api: AlazarATSAPI interface, defaults to the dll api. This argument
makes it possible to provide another api, e.g. for a simulated
driver for which the binary Alazar drivers do not need to be
installed.
"""
# override dll_path in your init script or in the board constructor
# if you have it somewhere else
dll_path = "C:\\WINDOWS\\System32\\ATSApi"
api: AlazarATSAPI
# override channels in a subclass if needed
channels = 2
@classmethod
def find_boards(cls, dll_path: str | None = None) -> list[dict[str, Any]]:
"""
Find connected Alazar boards
Args:
dll_path: path to the Alazar API DLL library
Returns:
list of board info dictionaries for each connected board
"""
api = AlazarATSAPI(dll_path or cls.dll_path)
system_count = api.num_of_systems()
boards = []
for system_id in range(1, system_count + 1):
board_count = api.boards_in_system_by_system_id(system_id)
for board_id in range(1, board_count + 1):
boards.append(cls.get_board_info(api, system_id, board_id))
return boards
@classmethod
def get_board_info(
cls, api: AlazarATSAPI, system_id: int, board_id: int
) -> dict[str, str | int]:
"""
Get the information from a connected Alazar board
Args:
api: An AlazarATSAPI that wraps around the CTypes CDLL
system_id: id of the Alazar system
board_id: id of the board within the alazar system
Return:
Dictionary containing
- system_id
- board_id
- board_kind (as string)
- max_samples
- bits_per_sample
"""
# make a temporary instrument for this board, to make it easier
# to get its info
board = cls(
f"alazar_temp_{time.perf_counter_ns()}",
system_id=system_id,
board_id=board_id,
)
handle = board._handle
board_model = api.get_board_model(handle)
max_s, bps = api.get_channel_info_(handle)
board.close()
return {
"system_id": system_id,
"board_id": board_id,
"board_kind": board_model,
"max_samples": max_s,
"bits_per_sample": bps,
}
def __init__(
self,
name: str,
system_id: int = 1,
board_id: int = 1,
dll_path: str | None = None,
api: AlazarATSAPI | None = None,
**kwargs: Unpack[InstrumentBaseKWArgs],
) -> None:
super().__init__(name, **kwargs)
self.api = api or AlazarATSAPI(dll_path or self.dll_path)
self._parameters_synced = False
self._handle = self.api.get_board_by_system_id(system_id, board_id)
if not self._handle:
raise Exception(
f"AlazarTech_ATS not found at system {system_id}, board {board_id}"
)
self.capability = CapabilityHelper(self.api, self._handle)
self.buffer_list: list[Buffer] = []
def get_idn(self) -> dict[str, str | int | None]: # type: ignore[override]
# TODO return type is inconsistent with the super class. We should consider
# if ints and floats are allowed as values in the dict
"""
This methods gets the most relevant information of this instrument
The firmware version reported should match the version number of
downloadable fw files from AlazarTech. But note that the firmware
version has often been found to be incorrect for several firmware
versions. At the time of writing it is known to be correct for the
9360 (v 21.07) and 9373 (v 30.04) but incorrect for several earlier
versions. In Alazar DSO this is reported as FPGA Version.
Returns:
Dictionary containing
- 'firmware': as string
- 'model': as string
- 'serial': board serial number
- 'vendor': 'AlazarTech'
- 'CPLD_version': version of the CPLD
- 'driver_version': version of the driver dll
- 'SDK_version': version of the SDK
- 'latest_cal_date': date of the latest calibration (as string)
- 'memory_size': size of the memory in samples
- 'asopc_type': type of asopc (as decimal number)
- 'pcie_link_speed': the speed of a single pcie link (in GB/s)
- 'pcie_link_width': number of pcie links
- 'bits_per_sample': number of bits per one sample
- 'max_samples': board memory size in samples
"""
max_s, bps = self.api.get_channel_info_(self._handle)
pcie_link_speed = str(self.capability.query_pcie_link_speed()) + "GB/s"
return {
"firmware": self.capability.query_firmware_version(),
"model": self.api.get_board_model(self._handle),
"max_samples": max_s,
"bits_per_sample": bps,
"serial": self.capability.query_serial(),
"vendor": "AlazarTech",
"CPLD_version": self.api.get_cpld_version_(self._handle),
"driver_version": self.api.get_driver_version_(),
"SDK_version": self.api.get_sdk_version_(),
"latest_cal_date": self.capability.query_latest_calibration(),
"memory_size": str(self.capability.query_memory_size()),
"asopc_type": self.capability.query_asopc_type(),
"pcie_link_speed": pcie_link_speed,
"pcie_link_width": str(self.capability.query_pcie_link_width()),
}
@contextmanager
def syncing(self) -> Generator[None, None, None]:
"""
Context manager for syncing settings to Alazar card. It will
automatically call sync_settings_to_card at the end of the
context.
Example:
This is intended to be used around multiple parameter sets
to ensure syncing is done exactly once::
with alazar.syncing():
alazar.trigger_source1('EXTERNAL')
alazar.trigger_level1(100)
"""
yield
self.sync_settings_to_card()
def sync_settings_to_card(self) -> None:
"""
Syncs all parameters to Alazar card
"""
if self.clock_source() == "EXTERNAL_CLOCK_10MHz_REF":
sample_rate = self.external_sample_rate
if self.external_sample_rate() == "UNDEFINED":
raise RuntimeError(
"Using external 10 MHz Ref but external sample_rate is not set"
)
if self.sample_rate() != "UNDEFINED":
warnings.warn(
"Using external 10 MHz Ref but parameter sample_"
"rate is set. This will have no effect and "
"is ignored"
)
# mark the unused parameter as up to date
self.sample_rate._set_updated()
else:
if self.sample_rate() == "UNDEFINED":
raise RuntimeError(
"Using Internal clock but parameter sample_rate is not set"
)
if self.external_sample_rate() != "UNDEFINED":
warnings.warn(
"Using Internal clock but parameter external_sample_rate is set."
"This will have no effect and is ignored"
)
# mark the unused parameter as up to date
self.external_sample_rate._set_updated()
sample_rate = self.sample_rate
self.api.set_capture_clock(
self._handle,
self.clock_source,
sample_rate,
self.clock_edge,
self.decimation,
)
for i in range(1, self.channels + 1):
self.api.input_control(
self._handle,
2 ** (i - 1),
self.parameters["coupling" + str(i)],
self.parameters["channel_range" + str(i)],
self.parameters["impedance" + str(i)],
)
if self.parameters.get("bwlimit" + str(i), None) is not None:
self.api.set_bw_limit(
self._handle, 2 ** (i - 1), self.parameters["bwlimit" + str(i)]
)
self.api.set_trigger_operation(
self._handle,
self.trigger_operation,
self.trigger_engine1,
self.trigger_source1,
self.trigger_slope1,
self.trigger_level1,
self.trigger_engine2,
self.trigger_source2,
self.trigger_slope2,
self.trigger_level2,
)
self.api.set_external_trigger(
self._handle, self.external_trigger_coupling, self.external_trigger_range
)
self.api.set_trigger_delay(self._handle, self.trigger_delay)
self.api.set_trigger_time_out(self._handle, self.timeout_ticks)
self.api.configure_aux_io(self._handle, self.aux_io_mode, self.aux_io_param)
self._parameters_synced = True
def allocate_and_post_buffer(
self, sample_type: CtypesTypes, n_bytes: int
) -> Buffer:
buffer = Buffer(sample_type, n_bytes)
self.api.post_async_buffer(
self._handle, ctypes.cast(buffer.addr, ctypes.c_void_p), buffer.size_bytes
)
return buffer
def acquire( # noqa: D417 (missing args documentation)
self,
mode: str | None = None,
samples_per_record: int | None = None,
records_per_buffer: int | None = None,
buffers_per_acquisition: int | None = None,
channel_selection: str | None = None,
transfer_offset: int | None = None,
external_startcapture: str | None = None,
enable_record_headers: str | None = None,
alloc_buffers: str | None = None,
fifo_only_streaming: str | None = None,
interleave_samples: str | None = None,
get_processed_data: str | None = None,
allocated_buffers: int | None = None,
buffer_timeout: int | None = None,
acquisition_controller: AcquisitionController[OutputType] | None = None,
) -> OutputType:
"""
Perform a single acquisition with the Alazar board, and set certain
parameters to the appropriate values
for the parameters, see the ATS-SDK programmer's guide
Args:
mode:
samples_per_record:
records_per_buffer:
buffers_per_acquisition:
channel_selection:
transfer_offset:
external_startcapture:
enable_record_headers:
alloc_buffers:
fifo_only_streaming:
interleave_samples:
get_processed_data:
allocated_buffers:
buffer_timeout:
acquisition_controller: An instance of an acquisition controller
that handles the dataflow of an acquisition
Returns:
Whatever is given by acquisition_controller.post_acquire method
"""
if acquisition_controller is None:
raise RuntimeError("Cannot call acquire without an acquisition_controller")
# region set parameters from args
start_func = time.perf_counter()
if self._parameters_synced is False:
raise RuntimeError(
"You must sync parameters to Alazar card "
"before calling acquire by calling "
"sync_settings_to_card"
)
self._set_if_present("mode", mode)
self._set_if_present("samples_per_record", samples_per_record)
self._set_if_present("records_per_buffer", records_per_buffer)
self._set_if_present("buffers_per_acquisition", buffers_per_acquisition)
self._set_if_present("channel_selection", channel_selection)
self._set_if_present("transfer_offset", transfer_offset)
self._set_if_present("external_startcapture", external_startcapture)
self._set_if_present("enable_record_headers", enable_record_headers)
self._set_if_present("alloc_buffers", alloc_buffers)
self._set_if_present("fifo_only_streaming", fifo_only_streaming)
self._set_if_present("interleave_samples", interleave_samples)
self._set_if_present("get_processed_data", get_processed_data)
self._set_if_present("allocated_buffers", allocated_buffers)
self._set_if_present("buffer_timeout", buffer_timeout)
# endregion
mode = self.mode.get()
if mode not in ("TS", "NPT"):
raise Exception(
"Only the 'TS' and 'NPT' modes are implemented at this point"
)
# -----set final configurations-----
buffers_per_acquisition = cast("int", self.buffers_per_acquisition())
samples_per_record = cast("int", self.samples_per_record())
records_per_buffer = cast("int", self.records_per_buffer())
# bits per sample
_, bits_per_sample = self.api.get_channel_info_(self._handle)
# channels
channels_binrep = self.channel_selection.raw_value
number_of_channels = self.get_num_channels(channels_binrep)
# In the following we need to consider the size of the buffer
# in two different scenarios as several Alazar cards have sample sizes
# that are in fractions of bytes. (such as 12 bits).
# We are transferring data padded to
# whole bytes. I.e a sample of 12 bits will take up 16 bits when
# transferred so we are allocating buffers of that size.
# However, when calculating internal limitations on the card we are
# using the fractional sizes of samples
# number of bytes per sample rounded up to the nearest integer
whole_bytes_per_sample = (bits_per_sample + 7) // 8
transfer_record_size = whole_bytes_per_sample * samples_per_record
transfer_buffer_size = (
transfer_record_size * records_per_buffer * number_of_channels
)
sample_type: type[ctypes.c_uint16 | ctypes.c_uint8] = (
ctypes.c_uint16 if whole_bytes_per_sample > 1 else ctypes.c_uint8
)
internal_buffer_size_requested = (
bits_per_sample * samples_per_record * records_per_buffer
) // 8
if mode == "TS":
transfer_buffer_size //= buffers_per_acquisition
internal_buffer_size_requested //= buffers_per_acquisition
if internal_buffer_size_requested > max_buffer_size:
raise RuntimeError(
f"Requested a buffer of size: "
f"{internal_buffer_size_requested / 1024**2}"
f" MB. The maximum supported size is "
f"{max_buffer_size / 1024**2} MB "
f"(recommended is <8MB)."
)
# Set record size for NPT mode
if mode == "NPT":
pretriggersize = 0 # pretriggersize is 0 for NPT always
post_trigger_size = samples_per_record
self.api.set_record_size(self._handle, pretriggersize, post_trigger_size)
# set acquisition parameters here for NPT, TS mode
samples_per_buffer = 0
acquire_flags = (
self.mode.raw_value
| self.external_startcapture.raw_value
| self.enable_record_headers.raw_value
| self.alloc_buffers.raw_value
| self.fifo_only_streaming.raw_value
| self.interleave_samples.raw_value
| self.get_processed_data.raw_value
)
if mode == "NPT":
records_per_acquisition = records_per_buffer * buffers_per_acquisition
self.api.before_async_read(
self._handle,
self.channel_selection.raw_value,
self.transfer_offset.raw_value,
samples_per_record,
records_per_buffer,
records_per_acquisition,
acquire_flags,
)
elif mode == "TS":
if samples_per_record % buffers_per_acquisition != 0:
self.log.warning(
"buffers_per_acquisition is not a divisor "
"of samples per record which it should be "
"in TS mode, rounding down in samples per "
"buffer calculation"
)
samples_per_buffer = int(samples_per_record / buffers_per_acquisition)
if self.records_per_buffer() != 1:
self.log.warning(
"records_per_buffer should be 1 in TS mode, defauling to 1"
)
self.records_per_buffer.set(1)
records_per_buffer = cast("int", self.records_per_buffer())
self.api.before_async_read(
self._handle,
self.channel_selection.raw_value,
self.transfer_offset.raw_value,
samples_per_buffer,
records_per_buffer,
buffers_per_acquisition,
acquire_flags,
)
self.clear_buffers()
# make sure that allocated_buffers <= buffers_per_acquisition
allocated_buffers = cast("int", self.allocated_buffers())
buffers_per_acquisition = cast("int", self.buffers_per_acquisition())
if allocated_buffers > buffers_per_acquisition:
self.log.warning(
"'allocated_buffers' should be <= "
"'buffers_per_acquisition'. Defaulting "
"'allocated_buffers' to "
f"{buffers_per_acquisition}"
)
self.allocated_buffers.set(buffers_per_acquisition)
allocated_buffers = cast("int", self.allocated_buffers())
buffer_recycling = buffers_per_acquisition > allocated_buffers
# post buffers to Alazar
try:
for _ in range(allocated_buffers):
buf = self.allocate_and_post_buffer(sample_type, transfer_buffer_size)
self.buffer_list.append(buf)
# -----start capture here-----
acquisition_controller.pre_start_capture()
start = time.perf_counter() # Keep track of when acquisition started
# call the startcapture method
self.api.start_capture(self._handle)
acquisition_controller.pre_acquire()
# buffer handling from acquisition
buffers_completed = 0
bytes_transferred = 0
buffer_timeout = cast("int", self.buffer_timeout())
done_setup = time.perf_counter()
while buffers_completed < self.buffers_per_acquisition.get():
# Wait for the buffer at the head of the list of available
# buffers to be filled by the board.
buf = self.buffer_list[buffers_completed % allocated_buffers]
self.api.wait_async_buffer_complete(
self._handle, ctypes.cast(buf.addr, ctypes.c_void_p), buffer_timeout
)
acquisition_controller.buffer_done_callback(buffers_completed)
# if buffers must be recycled, extract data and repost them
# otherwise continue to next buffer
if buffer_recycling:
acquisition_controller.handle_buffer(buf.buffer, buffers_completed)
self.api.post_async_buffer(
self._handle,
ctypes.cast(buf.addr, ctypes.c_void_p),
buf.size_bytes,
)
buffers_completed += 1
bytes_transferred += buf.size_bytes
finally:
# stop measurement here
done_capture = time.perf_counter()
self.api.abort_async_read(self._handle)
time_done_abort = time.perf_counter()
# -----cleanup here-----
# extract data if not yet done
if not buffer_recycling:
for i, buf in enumerate(self.buffer_list):
acquisition_controller.handle_buffer(buf.buffer, i)
time_done_handling = time.perf_counter()
# free up memory
self.clear_buffers()
time_done_free_mem = time.perf_counter()
# check if all parameters are up to date
# Getting IDN is very slow so skip that
for _, p in self.parameters.items():
if isinstance(p, TraceParameter):
if p.synced_to_card is False:
raise RuntimeError(
f"TraceParameter {p} not synced to "
f"Alazar card detected. Aborting. Data "
f"may be corrupt"
)
# Compute the total transfer time, and display performance information.
end_time = time.perf_counter()
tot_time = end_time - start_func
transfer_time_sec = end_time - start
presetup_time = start - start_func
setup_time = done_setup - start
capture_time = done_capture - done_setup
abort_time = time_done_abort - done_capture
handling_time = time_done_handling - time_done_abort
free_mem_time = time_done_free_mem - time_done_handling
buffers_per_sec: float = 0
bytes_per_sec: float = 0
records_per_sec: float = 0
if transfer_time_sec > 0:
buffers_per_sec = buffers_completed / transfer_time_sec
bytes_per_sec = bytes_transferred / transfer_time_sec
records_per_sec = records_per_buffer * buffers_completed / transfer_time_sec
if self.log.isEnabledFor(logging.DEBUG):
self.log.debug(
"Captured %d buffers (%f buffers per sec)",
buffers_completed,
buffers_per_sec,
)
self.log.debug(
"Captured %d records (%f records per sec)",
records_per_buffer * buffers_completed,
records_per_sec,
)
self.log.debug(
"Transferred %d bytes (%f bytes per sec)",
bytes_transferred,
bytes_per_sec,
)
self.log.debug(f"Pre setup took {presetup_time}")
self.log.debug(f"Pre capture setup took {setup_time}")
self.log.debug(f"Capture took {capture_time}")
self.log.debug(f"abort took {abort_time}")
self.log.debug(f"handling took {handling_time}")
self.log.debug(f"free mem took {free_mem_time}")
self.log.debug(f"tot acquire time is {tot_time}")
# return result
return acquisition_controller.post_acquire()
def _set_if_present(self, param_name: str, value: str | float | None) -> None:
if value is not None:
parameter = self.parameters[param_name]
parameter.set(value)
def _set_list_if_present(
self, param_base: str, value: Sequence[str | float]
) -> None:
if value is not None:
for i, v in enumerate(value):
parameter = self.parameters[param_base + str(i + 1)]
parameter.set(v)
def clear_buffers(self) -> None:
"""
This method uncommits all buffers that were committed by the driver.
This method only has to be called when the acquistion crashes, otherwise
the driver will uncommit the buffers itself
Returns:
None
"""
for b in self.buffer_list:
b.free_mem()
self.log.debug("buffers cleared")
self.buffer_list = []
def signal_to_volt(self, channel: int, signal: float) -> float:
"""
Convert a value from a buffer to an actual value in volts based on the
ranges of the channel
Args:
channel: number of the channel where the signal value came from
signal: the value that needs to be converted
Returns:
the corresponding value in volts
"""
return ((signal - 127.5) / 127.5) * (
self.parameters["channel_range" + str(channel)].get()
)
def get_sample_rate(self, include_decimation: bool = True) -> float:
"""
Obtain the effective sampling rate of the acquisition
based on clock speed and decimation
Returns:
the number of samples (per channel) per second
"""
if (
self.clock_source.get() == "EXTERNAL_CLOCK_10MHz_REF"
and "external_sample_rate" in self.parameters
):
rate = self.external_sample_rate.get()
# if we are using an external ref clock the sample rate
# is set as an integer and not value mapped so we use a different
# parameter to represent it
elif self.sample_rate.get() == "EXTERNAL_CLOCK":
raise Exception(
"External clock is used, alazar driver "
"could not determine sample speed."
)
else:
rate = self.sample_rate.get()
if rate == "1GHz_REFERENCE_CLOCK":
rate = 1e9
if include_decimation:
decimation = self.decimation.get()
else:
decimation = 0
if decimation > 0:
return rate / decimation
else:
return rate
@staticmethod
def get_num_channels(byte_rep: int) -> int:
"""
Return the number of channels for a specific channel mask
Each single channel is represented by a bitarray with one
non zero entry i.e. powers of two. All multichannel masks can be
constructed by summing the single channel ones. However, not all
configurations are supported. See table 4 Input Channel Configurations
on page 241 of the Alazar SDK manual. This contains the complete
mapping for all current Alazar cards. It's left to the driver to
ensure that only the ones supported for a specific card can be
selected
"""
n_ch = NUMBER_OF_CHANNELS_FROM_BYTE_REPR.get(byte_rep, None)
if n_ch is None:
raise RuntimeError(f"Invalid channel configuration {byte_rep!r} supplied")
return n_ch
def _read_register(self, offset: int) -> int:
return self.api.read_register_(self._handle, offset)
def _write_register(self, offset: int, value: int) -> None:
self.api.write_register_(self._handle, offset, value)
def _setup_ctypes_for_windll_lib_functions() -> None:
"""
Set up ``argtypes`` and ``restype`` for functions from ``ctypes.windll``
libraries, which are used in this module.
"""
if sys.platform == "win32":
ctypes.windll.kernel32.VirtualAlloc.argtypes = [
ctypes.c_void_p,
ctypes.c_long,
ctypes.c_long,
ctypes.c_long,
]
ctypes.windll.kernel32.VirtualAlloc.restype = ctypes.c_void_p
ctypes.windll.kernel32.VirtualFree.argtypes = [
ctypes.c_void_p,
ctypes.c_long,
ctypes.c_long,
]
ctypes.windll.kernel32.VirtualFree.restype = ctypes.c_int
_setup_ctypes_for_windll_lib_functions()
class Buffer:
"""Buffer suitable for DMA transfers.
AlazarTech digitizers use direct memory access (DMA) to transfer
data from digitizers to the computer's main memory. This class
abstracts a memory buffer on the host, and ensures that all the
requirements for DMA transfers are met.
Buffer export a 'buffer' member, which is a NumPy array view
of the underlying memory buffer
Args:
c_sample_type: The datatype of the buffer to create. Should be a valid
ctypes type.
size_bytes: The size of the buffer to allocate, in bytes.
"""
def __init__(self, c_sample_type: CtypesTypes, size_bytes: int):
self.size_bytes = size_bytes
self.buffer: npt.NDArray
bytes_per_sample = {
ctypes.c_uint8: 1,
ctypes.c_uint16: 2,
ctypes.c_uint32: 4,
ctypes.c_int32: 4,
ctypes.c_float: 4,
}.get(c_sample_type, 0)
self._allocated = True
if sys.platform == "win32":
MEM_COMMIT = 0x1000
PAGE_READWRITE = 0x4
self.addr = ctypes.windll.kernel32.VirtualAlloc(
0, ctypes.c_long(size_bytes), MEM_COMMIT, PAGE_READWRITE
)
else:
self._allocated = True
ctypes_array = (c_sample_type * (size_bytes // bytes_per_sample))()
self.addr = ctypes.addressof(ctypes_array)
ctypes_array = (c_sample_type * (size_bytes // bytes_per_sample)).from_address(
self.addr
)
self.buffer = np.ctypeslib.as_array(ctypes_array)
self.ctypes_buffer = ctypes_array
def free_mem(self) -> None:
"""
Uncommit memory allocated with this buffer object
"""
self._allocated = False
if sys.platform == "win32":
MEM_RELEASE = 0x8000
ctypes.windll.kernel32.VirtualFree(
ctypes.c_void_p(self.addr), 0, MEM_RELEASE
)
def __del__(self) -> None:
"""
If python garbage collects this object, __del__ should be called and it
is the last chance to uncommit the memory to prevent a memory leak.
This method is not very reliable so users should not rely on this
functionality
"""
if self._allocated:
self.free_mem()
logger.warning(
"Buffer prevented memory leak; Memory released to Windows.\n"
"Memory should have been released before buffer was deleted."
)
class AcquisitionInterface(Generic[OutputType]):
"""
This class represents all choices that the end-user has to make regarding
the data-acquisition. this class should be subclassed to program these
choices.
The basic structure of an acquisition is:
- Call to :meth:`AlazarTech_ATS.acquire` internal configuration
- Call to :meth:`AcquisitionInterface.pre_start_capture`
- Call to the start capture of the Alazar board
- Call to :meth:`AcquisitionInterface.pre_acquire`
- Loop over all buffers that need to be acquired
dump each buffer to acquisitioncontroller.handle_buffer
(only if buffers need to be recycled to finish the acquisiton)
- Dump remaining buffers to :meth:`AcquisitionInterface.handle_buffer`
alazar internals
- Return return value from :meth:`AcquisitionController.post_acquire`
"""
def pre_start_capture(self) -> None:
"""
Use this method to prepare yourself for the data acquisition
The Alazar instrument will call this method right before
'AlazarStartCapture' is called
"""
pass
def pre_acquire(self) -> None:
"""
This method is called immediately after 'AlazarStartCapture' is called
"""
pass
def handle_buffer(
self, buffer: npt.NDArray, buffer_number: int | None = None
) -> None:
"""
This method should store or process the information that is contained
in the buffers obtained during the acquisition.
Args:
buffer: np.array with the data from the Alazar card
buffer_number: counter for which buffer we are handling
"""
raise NotImplementedError("This method should be implemented in a subclass")
def post_acquire(self) -> OutputType:
"""
This method should return any information you want to save from this
acquisition. The acquisition method from the Alazar driver will use
this data as its own return value
Returns:
this function should return all relevant data that you want
to get form the acquisition
"""
raise NotImplementedError("This method should be implemented in a subclass")
def buffer_done_callback(self, buffers_completed: int) -> None:
"""
This method is called when a buffer is completed. It can be used
if you want to implement an event that happens for each buffer.
You will probably want to combine this with `AUX_IN_TRIGGER_ENABLE`
to wait before starting capture of the next buffer.
Args:
buffers_completed: how many buffers have been completed and copied
to local memory at the time of this callback.
"""
pass
class AcquisitionController(Instrument, AcquisitionInterface[Any], Generic[OutputType]):
"""
Compatibility class. The methods of :class:`AcquisitionController`
have been extracted. This class is the base class fro AcquisitionInterfaces
that are intended to be QCoDeS instruments at the same time.
"""
def __init__(
self, name: str, alazar_name: str, **kwargs: Unpack[InstrumentBaseKWArgs]
):
"""
Args:
name: The name of the AcquisitionController
alazar_name: The name of the alazar instrument.
**kwargs: kwargs are forwarded to base class.
"""
super().__init__(name, **kwargs)
self._alazar: AlazarTechATS = self.find_instrument(
alazar_name, instrument_class=AlazarTechATS
)
def _get_alazar(self) -> AlazarTechATS:
"""
Returns a reference to the alazar instrument. A call to self._alazar is
quicker, so use that if in need for speed
:return: reference to the Alazar instrument
"""
return self._alazar