# test_measurement_context_manager.py (forked from microsoft/Qcodes)
from __future__ import annotations
import logging
import os
import random
import re
import traceback
from time import sleep
from typing import Any
import hypothesis.strategies as hst
import numpy as np
import pytest
import xarray as xr
from hypothesis import HealthCheck, given, settings
from numpy.testing import assert_allclose, assert_array_equal
from pytest import LogCaptureFixture
import qcodes as qc
import qcodes.validators as vals
from qcodes.dataset.data_set import DataSet, load_by_id
from qcodes.dataset.descriptions.param_spec import ParamSpecBase
from qcodes.dataset.experiment_container import new_experiment
from qcodes.dataset.export_config import DataExportType
from qcodes.dataset.measurements import Measurement
from qcodes.dataset.sqlite.connection import atomic_transaction
from qcodes.parameters import (
DelegateParameter,
ManualParameter,
Parameter,
expand_setpoints_helper,
)
from qcodes.station import Station
from qcodes.validators import ComplexNumbers
from tests.common import retry_until_does_not_throw
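# Measurement.run() should log, at INFO level, the setting of the run
# timestamp and the start and finish of the measurement (with its guid).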
def test_log_messages(caplog: LogCaptureFixture, meas_with_registered_param) -> None:
caplog.set_level(logging.INFO)
with meas_with_registered_param.run():
pass
assert "Set the run_timestamp of run_id" in caplog.text
assert "Starting measurement with guid" in caplog.text
assert "Finished measurement with guid" in caplog.text
def test_log_includes_extra_info(
caplog: LogCaptureFixture, meas_with_registered_param
) -> None:
caplog.set_level(logging.INFO)
meas_with_registered_param._extra_log_info = "some extra info"
with meas_with_registered_param.run():
pass
assert "some extra info" in caplog.text
@pytest.mark.usefixtures("experiment")
def test_register_parameter_arg_types(DAC, DMM):
"""Test basis and setpoints argument types."""
meas = Measurement()
meas.register_parameter(DAC.ch1)
with pytest.raises(TypeError):
meas.register_parameter(DMM.v1, basis=DAC.ch1)
with pytest.raises(TypeError):
meas.register_parameter(DMM.v1, setpoints=DAC.ch1)
with pytest.raises(TypeError):
meas.register_parameter(DMM.v1, basis="foo")
with pytest.raises(TypeError):
meas.register_parameter(DMM.v1, setpoints="foo")
with pytest.raises(TypeError):
meas.register_parameter(DMM.v1, basis=(DAC.ch1, 3)) # type: ignore[arg-type]
with pytest.raises(TypeError):
meas.register_parameter(DMM.v1, setpoints=(DAC.ch1, 3)) # type: ignore[arg-type]
def test_register_parameter_numbers(DAC, DMM) -> None:
"""
Test the registration of scalar QCoDeS parameters
"""
parameters = [DAC.ch1, DAC.ch2, DMM.v1, DMM.v2]
not_parameters = ("", "Parameter", 0, 1.1, Measurement)
meas = Measurement()
for not_a_parameter in not_parameters:
with pytest.raises(ValueError):
meas.register_parameter(not_a_parameter) # type: ignore[arg-type]
my_param = DAC.ch1
meas.register_parameter(my_param)
assert len(meas.parameters) == 1
paramspec = meas.parameters[str(my_param)]
assert paramspec.name == str(my_param)
assert paramspec.label == my_param.label
assert paramspec.unit == my_param.unit
assert paramspec.type == "numeric"
# we allow the registration of the EXACT same parameter twice...
meas.register_parameter(my_param)
# ... but not a different parameter with a new name
attrs = ["label", "unit"]
vals = ["new label", "new unit"]
for attr, val in zip(attrs, vals):
old_val = getattr(my_param, attr)
setattr(my_param, attr, val)
match = re.escape("Parameter already registered in this Measurement.")
with pytest.raises(ValueError, match=match):
meas.register_parameter(my_param)
setattr(my_param, attr, old_val)
assert len(meas.parameters) == 1
paramspec = meas.parameters[str(my_param)]
assert paramspec.name == str(my_param)
assert paramspec.label == my_param.label
assert paramspec.unit == my_param.unit
assert paramspec.type == "numeric"
for parameter in parameters:
with pytest.raises(ValueError):
meas.register_parameter(my_param, setpoints=(parameter,))
with pytest.raises(ValueError):
meas.register_parameter(my_param, basis=(parameter,))
meas.register_parameter(DAC.ch2)
meas.register_parameter(DMM.v1)
meas.register_parameter(DMM.v2)
meas.unregister_parameter(my_param)
meas.register_parameter(my_param, basis=(DAC.ch2,), setpoints=(DMM.v1, DMM.v2))
assert set(meas.parameters.keys()) == {
str(DAC.ch2),
str(DMM.v1),
str(DMM.v2),
str(my_param),
}
paramspec = meas.parameters[str(my_param)]
assert paramspec.name == str(my_param)
meas = Measurement()
meas.register_parameter(DAC.ch1)
meas.register_parameter(DAC.ch2, setpoints=(DAC.ch1,))
with pytest.raises(ValueError):
meas.register_parameter(DMM.v1, setpoints=(DAC.ch2,))
def test_register_custom_parameter(DAC) -> None:
"""
Test the registration of custom parameters
"""
meas = Measurement()
name = "V_modified"
unit = "V^2"
label = "square of the voltage"
meas.register_custom_parameter(name, label, unit)
assert len(meas.parameters) == 1
assert isinstance(meas.parameters[name], ParamSpecBase)
assert meas.parameters[name].unit == unit
assert meas.parameters[name].label == label
assert meas.parameters[name].type == "numeric"
newunit = "V^3"
newlabel = "cube of the voltage"
meas.unregister_parameter(name)
meas.register_custom_parameter(name, newlabel, newunit)
assert len(meas.parameters) == 1
assert isinstance(meas.parameters[name], ParamSpecBase)
assert meas.parameters[name].unit == newunit
assert meas.parameters[name].label == newlabel
with pytest.raises(ValueError):
meas.register_custom_parameter(name, label, unit, setpoints=(DAC.ch1,))
with pytest.raises(ValueError):
meas.register_custom_parameter(name, label, unit, basis=(DAC.ch2,))
meas.register_parameter(DAC.ch1)
meas.register_parameter(DAC.ch2)
meas.register_custom_parameter("strange_dac")
meas.unregister_parameter(name)
meas.register_custom_parameter(
name, label, unit, setpoints=(DAC.ch1, str(DAC.ch2)), basis=("strange_dac",)
)
assert len(meas.parameters) == 4
meas.parameters[name]
with pytest.raises(ValueError):
meas.register_custom_parameter(
"double dependence", "label", "unit", setpoints=(name,)
)
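# A DelegateParameter is registered with the type of its source parameter:
# a complex-valued source yields a 'complex' paramtype.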
def test_register_delegate_parameters() -> None:
x_param = Parameter("x", set_cmd=None, get_cmd=None)
complex_param = Parameter(
"complex_param", get_cmd=None, set_cmd=None, vals=ComplexNumbers()
)
delegate_param = DelegateParameter("delegate", source=complex_param)
meas = Measurement()
meas.register_parameter(x_param)
meas.register_parameter(delegate_param, setpoints=(x_param,))
assert len(meas.parameters) == 2
assert meas.parameters["delegate"].type == "complex"
assert meas.parameters["x"].type == "numeric"
def test_register_delegate_parameters_with_late_source() -> None:
x_param = Parameter("x", set_cmd=None, get_cmd=None)
complex_param = Parameter(
"complex_param", get_cmd=None, set_cmd=None, vals=ComplexNumbers()
)
delegate_param = DelegateParameter("delegate", source=None)
meas = Measurement()
meas.register_parameter(x_param)
delegate_param.source = complex_param
meas.register_parameter(delegate_param, setpoints=(x_param,))
assert len(meas.parameters) == 2
assert meas.parameters["delegate"].type == "complex"
assert meas.parameters["x"].type == "numeric"
def test_register_delegate_parameters_with_late_source_chain():
x_param = Parameter("x", set_cmd=None, get_cmd=None)
complex_param = Parameter(
"complex_param", get_cmd=None, set_cmd=None, vals=ComplexNumbers()
)
delegate_inner = DelegateParameter("delegate_inner", source=None)
delegate_outer = DelegateParameter("delegate_outer", source=None)
meas = Measurement()
meas.register_parameter(x_param)
delegate_outer.source = delegate_inner
delegate_inner.source = complex_param
meas.register_parameter(delegate_outer, setpoints=(x_param,))
assert len(meas.parameters) == 2
assert meas.parameters["delegate_outer"].type == "complex"
assert meas.parameters["x"].type == "numeric"
def test_unregister_parameter(DAC, DMM) -> None:
"""
Test the unregistering of parameters.
"""
DAC.add_parameter("impedance", get_cmd=lambda: 5)
meas = Measurement()
meas.register_parameter(DAC.ch2)
meas.register_parameter(DMM.v1)
meas.register_parameter(DMM.v2)
meas.register_parameter(DAC.ch1, basis=(DMM.v1, DMM.v2), setpoints=(DAC.ch2,))
with pytest.raises(ValueError):
meas.unregister_parameter(DAC.ch2)
with pytest.raises(ValueError):
meas.unregister_parameter(str(DAC.ch2))
with pytest.raises(ValueError):
meas.unregister_parameter(DMM.v1)
with pytest.raises(ValueError):
meas.unregister_parameter(DMM.v2)
meas.unregister_parameter(DAC.ch1)
assert set(meas.parameters.keys()) == {str(DAC.ch2), str(DMM.v1), str(DMM.v2)}
meas.unregister_parameter(DAC.ch2)
assert set(meas.parameters.keys()) == {str(DMM.v1), str(DMM.v2)}
not_parameters = [DAC, DMM, 0.0, 1]
for notparam in not_parameters:
with pytest.raises(ValueError):
meas.unregister_parameter(notparam)
# unregistering something not registered should silently "succeed"
meas.unregister_parameter("totes_not_registered")
meas.unregister_parameter(DAC.ch2)
meas.unregister_parameter(DAC.ch2)
@pytest.mark.usefixtures("experiment")
@pytest.mark.parametrize("bg_writing", [True, False])
def test_mixing_array_and_numeric(DAC, bg_writing) -> None:
"""
Test that mixing array and numeric types is okay
"""
meas = Measurement()
meas.register_parameter(DAC.ch1, paramtype="numeric")
meas.register_parameter(DAC.ch2, paramtype="array")
with meas.run(write_in_background=bg_writing) as datasaver:
datasaver.add_result(
(DAC.ch1, np.array([DAC.ch1(), DAC.ch1()])),
(DAC.ch2, np.array([DAC.ch2(), DAC.ch1()])),
)
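# The next three tests cover how the dataset name and table name are formed:
# the default 'results' name, a name assigned via the .name attribute, and a
# name passed to the Measurement constructor.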
def test_measurement_name_default(experiment, DAC, DMM) -> None:
fmt = experiment.format_string
exp_id = experiment.exp_id
default_name = "results"
meas = Measurement()
assert meas.name == ""
meas.register_parameter(DAC.ch1)
meas.register_parameter(DMM.v1, setpoints=[DAC.ch1])
with meas.run() as datasaver:
run_id = datasaver.run_id
expected_name = fmt.format(default_name, exp_id, run_id)
ds = datasaver.dataset
assert isinstance(ds, DataSet)
assert ds.table_name == expected_name
assert ds.name == default_name
def test_measurement_name_changed_via_attribute(experiment, DAC, DMM) -> None:
fmt = experiment.format_string
exp_id = experiment.exp_id
name = "yolo"
meas = Measurement()
meas.name = name
meas.register_parameter(DAC.ch1)
meas.register_parameter(DMM.v1, setpoints=[DAC.ch1])
with meas.run() as datasaver:
run_id = datasaver.run_id
expected_name = fmt.format("results", exp_id, run_id)
ds = datasaver.dataset
assert isinstance(ds, DataSet)
assert ds.table_name == expected_name
assert ds.name == name
def test_measurement_name_set_as_argument(experiment, DAC, DMM) -> None:
fmt = experiment.format_string
exp_id = experiment.exp_id
name = "yolo"
meas = Measurement(name=name, exp=experiment)
meas.register_parameter(DAC.ch1)
meas.register_parameter(DMM.v1, setpoints=[DAC.ch1])
with meas.run() as datasaver:
run_id = datasaver.run_id
expected_name = fmt.format("results", exp_id, run_id)
ds = datasaver.dataset
assert isinstance(ds, DataSet)
assert ds.table_name == expected_name
assert ds.name == name
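# write_period must be a number of at least 1 ms; strings and smaller values
# are rejected with a ValueError. The test after this one repeats the check
# for a write_period taken from the qcodes config.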
@settings(deadline=None)
@given(wp=hst.one_of(hst.integers(), hst.floats(allow_nan=False), hst.text()))
@pytest.mark.usefixtures("empty_temp_db")
def test_setting_write_period(wp) -> None:
new_experiment("firstexp", sample_name="no sample")
meas = Measurement()
meas.register_custom_parameter(name="dummy")
if isinstance(wp, str):
with pytest.raises(ValueError):
meas.write_period = wp # type: ignore[assignment]
elif wp < 1e-3:
with pytest.raises(ValueError):
meas.write_period = wp
else:
meas.write_period = wp
assert meas._write_period == float(wp)
with meas.run() as datasaver:
assert datasaver.write_period == float(wp)
@settings(deadline=None)
@given(wp=hst.one_of(hst.integers(), hst.floats(allow_nan=False), hst.text()))
@pytest.mark.usefixtures("experiment")
def test_setting_write_period_from_config(wp) -> None:
qc.config.dataset.write_period = wp
if isinstance(wp, str):
with pytest.raises(ValueError):
Measurement()
elif wp < 1e-3:
with pytest.raises(ValueError):
Measurement()
else:
meas = Measurement()
assert meas.write_period == float(wp)
meas.register_custom_parameter(name="dummy")
with meas.run() as datasaver:
assert datasaver.write_period == float(wp)
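# The write_in_background default is likewise taken from the qcodes config.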
@pytest.mark.parametrize("write_in_background", [True, False])
@pytest.mark.usefixtures("experiment")
def test_setting_write_in_background_from_config(write_in_background) -> None:
qc.config.dataset.write_in_background = write_in_background
meas = Measurement()
meas.register_custom_parameter(name="dummy")
with meas.run() as datasaver:
ds = datasaver.dataset
assert isinstance(ds, DataSet)
assert ds._writer_status.write_in_background is write_in_background
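# The register_*/add_* methods return the Measurement itself, so calls can be
# chained fluently.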
@pytest.mark.usefixtures("experiment")
def test_method_chaining(DAC) -> None:
(
Measurement()
.register_parameter(DAC.ch1)
.register_custom_parameter(name="freqax", label="Frequency axis", unit="Hz")
.add_before_run((lambda: None), ())
.add_after_run((lambda: None), ())
.add_subscriber((lambda values, idx, state: None), state=[])
)
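# add_before_run actions execute on entering the run() context and
# add_after_run actions on exiting it, each in registration order; argument
# sequences whose length does not match the action's signature are rejected.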
@pytest.mark.usefixtures("experiment")
@settings(deadline=None, suppress_health_check=(HealthCheck.function_scoped_fixture,))
@given(words=hst.lists(elements=hst.text(), min_size=4, max_size=10))
def test_enter_and_exit_actions(DAC, words) -> None:
    # we use a list to check that the functions are executed
    # in the correct order
def action(lst, word):
lst.append(word)
meas = Measurement()
meas.register_parameter(DAC.ch1)
testlist: list[str] = []
splitpoint = round(len(words) / 2)
for n in range(splitpoint):
meas.add_before_run(action, (testlist, words[n]))
for m in range(splitpoint, len(words)):
meas.add_after_run(action, (testlist, words[m]))
assert len(meas.enteractions) == splitpoint
assert len(meas.exitactions) == len(words) - splitpoint
with meas.run() as _:
assert testlist == words[:splitpoint]
assert testlist == words
meas = Measurement()
with pytest.raises(ValueError):
meas.add_before_run(action, "no list!")
with pytest.raises(ValueError):
meas.add_after_run(action, testlist)
def test_subscriptions(experiment, DAC, DMM) -> None:
"""
    Test that subscribers are called at the moment the data is flushed to
    the database.
    Note that for the purpose of this test, the flush_data_to_database method
    is called explicitly instead of waiting for the data to be flushed
    automatically once the write_period has elapsed after an add_result call.
"""
def collect_all_results(results, length, state):
"""
Updates the *state* to contain all the *results* acquired
during the experiment run
"""
        # Because, by default, subscribers only hold 1 data value in their
        # internal queue, this assignment should work (i.e. not overwrite
        # values in the "state" object), assuming that at the start of the
        # experiment both the dataset and the *state* objects have the
        # same length.
state[length] = results
def collect_values_larger_than_7(results, length, state):
"""
Appends to the *state* only the values from *results*
that are larger than 7
"""
for result_tuple in results:
state += [value for value in result_tuple if value > 7]
meas = Measurement(exp=experiment)
meas.register_parameter(DAC.ch1)
meas.register_parameter(DMM.v1, setpoints=(DAC.ch1,))
# key is the number of the result tuple,
# value is the result tuple itself
all_results_dict: dict[str, Any] = {}
values_larger_than_7: list[float] = []
meas.add_subscriber(collect_all_results, state=all_results_dict)
assert len(meas.subscribers) == 1
meas.add_subscriber(collect_values_larger_than_7, state=values_larger_than_7)
assert len(meas.subscribers) == 2
meas.write_period = 0.2
with meas.run() as datasaver:
# Assert that the measurement, runner, and datasaver
# have added subscribers to the dataset
ds = datasaver.dataset
assert isinstance(ds, DataSet)
assert len(ds.subscribers) == 2
assert all_results_dict == {}
assert values_larger_than_7 == []
dac_vals_and_dmm_vals = list(zip(range(5), range(3, 8)))
values_larger_than_7__expected = []
for num in range(5):
(dac_val, dmm_val) = dac_vals_and_dmm_vals[num]
values_larger_than_7__expected += [
val for val in (dac_val, dmm_val) if val > 7
]
datasaver.add_result((DAC.ch1, dac_val), (DMM.v1, dmm_val))
# Ensure that data is flushed to the database despite the write
# period, so that the database triggers are executed, which in turn
# add data to the queues within the subscribers
datasaver.flush_data_to_database()
            # In order to make this test deterministic, we need to ensure that
            # enough time has passed between the moment the data is flushed to
            # the database and the moment the "state" object (that is passed to
            # the subscriber constructor) is updated by the corresponding
            # subscriber's callback function. At the moment, there is no robust
            # way to ensure this. The reason is that each subscriber has an
            # internal queue which is populated via a trigger call from the SQL
            # database, hence from this "main" thread it is difficult to say
            # whether the queue is empty because the subscriber callbacks have
            # already been executed or because the triggers of the SQL database
            # have not been executed yet.
#
# In order to overcome this problem, a special decorator is used to
# wrap the assertions. This is going to ensure that some time is
# given to the Subscriber threads to finish exhausting the queue.
@retry_until_does_not_throw(
exception_class_to_expect=AssertionError, delay=0.5, tries=20
)
def assert_states_updated_from_callbacks():
assert values_larger_than_7 == values_larger_than_7__expected
assert list(all_results_dict.keys()) == [
result_index for result_index in range(1, num + 1 + 1)
]
assert_states_updated_from_callbacks()
# Ensure that after exiting the "run()" context,
# all subscribers get unsubscribed from the dataset
assert len(ds.subscribers) == 0
# Ensure that the triggers for each subscriber
# have been removed from the database
get_triggers_sql = "SELECT * FROM sqlite_master WHERE TYPE = 'trigger';"
triggers = atomic_transaction(ds.conn, get_triggers_sql).fetchall()
assert len(triggers) == 0
def test_subscribers_called_at_exiting_context_if_queue_is_not_empty(
experiment, DAC
) -> None:
"""
    Upon exiting the "run()" context, verify that if the queue is not empty,
    the subscriber's callback is still called on that data.
    This situation is created by setting the minimum length of the queue
    to a number that is larger than the number of values written to the dataset.
"""
def collect_x_vals(results, length, state):
"""
Collects first elements of results tuples in *state*
"""
index_of_x = 0
state += [res[index_of_x] for res in results]
meas = Measurement(exp=experiment)
meas.register_parameter(DAC.ch1)
collected_x_vals: list[float] = []
meas.add_subscriber(collect_x_vals, state=collected_x_vals)
given_x_vals = [0, 1, 2, 3]
with meas.run() as datasaver:
        # Set the minimum queue size of the subscriber to more than
# the total number of values being added to the dataset;
# this way the subscriber callback is not called before
# we exit the "run()" context.
ds = datasaver.dataset
assert isinstance(ds, DataSet)
subscriber = next(iter(ds.subscribers.values()))
subscriber.min_queue_length = int(len(given_x_vals) + 1)
for x in given_x_vals:
datasaver.add_result((DAC.ch1, x))
# Verify that the subscriber callback is not called yet
assert collected_x_vals == []
# Verify that the subscriber callback is finally called
assert collected_x_vals == given_x_vals
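# Subscribers should eventually receive every data point of a large
# (2000-3000 point) run; reruns are allowed since this relies on the timing
# of the database triggers.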
@pytest.mark.serial
@pytest.mark.flaky(reruns=5)
def test_subscribers_called_for_all_data_points(experiment, DAC, DMM) -> None:
N = random.randint(2000, 3000)
def sub_get_x_vals(results, length, state):
"""
A list of all x values
"""
state += [res[0] for res in results]
def sub_get_y_vals(results, length, state):
"""
A list of all y values
"""
state += [res[1] for res in results]
meas = Measurement(exp=experiment)
meas.register_parameter(DAC.ch1)
meas.register_parameter(DMM.v1, setpoints=(DAC.ch1,))
xvals: list[float] = []
yvals: list[float] = []
meas.add_subscriber(sub_get_x_vals, state=xvals)
meas.add_subscriber(sub_get_y_vals, state=yvals)
given_xvals = range(N)
given_yvals = range(1, N + 1)
with meas.run() as datasaver:
for x, y in zip(given_xvals, given_yvals):
datasaver.add_result((DAC.ch1, x), (DMM.v1, y))
assert xvals == list(given_xvals)
assert yvals == list(given_yvals)
# There is no way around it: this test is slow. We test that write_period
# works and hence we must wait for some time to elapse. Sorry.
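# test_datasaver_scalars: nothing is written to the database before
# write_period has elapsed, after which the buffered results are flushed;
# results for unregistered parameters or with missing setpoints raise ValueError.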
@pytest.mark.serial
@settings(
max_examples=5,
deadline=None,
suppress_health_check=(HealthCheck.function_scoped_fixture,),
)
@given(
breakpoint=hst.integers(min_value=1, max_value=19),
write_period=hst.floats(min_value=0.1, max_value=1.5),
set_values=hst.lists(elements=hst.floats(), min_size=20, max_size=20),
get_values=hst.lists(elements=hst.floats(), min_size=20, max_size=20),
)
def test_datasaver_scalars(
experiment, DAC, DMM, set_values, get_values, breakpoint, write_period
) -> None:
no_of_runs = len(experiment)
station = Station(DAC, DMM)
meas = Measurement(station=station)
meas.write_period = write_period
assert meas.write_period == write_period
meas.register_parameter(DAC.ch1)
meas.register_parameter(DMM.v1, setpoints=(DAC.ch1,))
with meas.run() as datasaver:
for set_v, get_v in zip(set_values[:breakpoint], get_values[:breakpoint]):
datasaver.add_result((DAC.ch1, set_v), (DMM.v1, get_v))
assert datasaver._dataset.number_of_results == 0
sleep(write_period * 1.1)
datasaver.add_result(
(DAC.ch1, set_values[breakpoint]), (DMM.v1, get_values[breakpoint])
)
        # tests that work with time intervals are often flaky,
        # so we add a bit more wait time here if the expected number
        # of points has not been written yet
for _ in range(10):
if not datasaver.points_written == breakpoint + 1:
sleep(write_period * 1.1)
assert datasaver.points_written == breakpoint + 1
assert datasaver.run_id == no_of_runs + 1
with meas.run() as datasaver:
with pytest.raises(ValueError):
datasaver.add_result((DAC.ch2, 1), (DAC.ch2, 2))
with pytest.raises(ValueError):
datasaver.add_result((DMM.v1, 0))
# More assertions of setpoints, labels and units in the DB!
def test_datasaver_inst_metadata(experiment, DAC_with_metadata, DMM) -> None:
"""
Check that additional instrument metadata is captured into the dataset snapshot
"""
station = Station(DAC_with_metadata, DMM)
meas = Measurement(station=station)
meas.register_parameter(DAC_with_metadata.ch1)
meas.register_parameter(DMM.v1, setpoints=(DAC_with_metadata.ch1,))
with meas.run() as datasaver:
for set_v in range(10):
DAC_with_metadata.ch1.set(set_v)
datasaver.add_result((DAC_with_metadata.ch1, set_v), (DMM.v1, DMM.v1.get()))
assert datasaver.dataset.snapshot is not None
station_snapshot = datasaver.dataset.snapshot["station"]
assert station_snapshot["instruments"]["dummy_dac"]["metadata"] == {
"dac": "metadata"
}
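# An exception raised inside the run() context is re-raised, and its full
# traceback is stored in the dataset metadata under 'measurement_exception'.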
def test_exception_happened_during_measurement_is_stored_in_dataset_metadata(
experiment,
) -> None:
meas = Measurement()
meas.register_custom_parameter(name="nodata")
class SomeMeasurementException(Exception):
pass
dataset = None
    # `pytest.raises` is used here instead of a custom try-except for convenience
with pytest.raises(SomeMeasurementException, match="foo") as e:
with meas.run() as datasaver:
dataset = datasaver.dataset
raise SomeMeasurementException("foo")
assert dataset is not None
metadata = dataset.metadata
assert "measurement_exception" in metadata
expected_exception_string = "".join(
traceback.format_exception(e.type, e.value, e.tb)
)
exception_string = metadata["measurement_exception"]
assert exception_string == expected_exception_string
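# add_result accepts numpy arrays, lists and tuples for array-valued
# parameters, but all array-like values in a single call must have the same length.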
@pytest.mark.parametrize("bg_writing", [True, False])
@settings(max_examples=10, deadline=None)
@given(N=hst.integers(min_value=2, max_value=500))
@pytest.mark.usefixtures("empty_temp_db")
def test_datasaver_arrays_lists_tuples(bg_writing, N) -> None:
new_experiment("firstexp", sample_name="no sample")
meas = Measurement()
meas.register_custom_parameter(name="freqax", label="Frequency axis", unit="Hz")
meas.register_custom_parameter(
name="signal",
label="qubit signal",
unit="Majorana number",
setpoints=("freqax",),
)
with meas.run(write_in_background=bg_writing) as datasaver:
freqax = np.linspace(1e6, 2e6, N)
signal = np.random.randn(N)
datasaver.add_result(("freqax", freqax), ("signal", signal))
assert datasaver.points_written == N
ds = datasaver.dataset
assert isinstance(ds, DataSet)
assert not ds.conn.atomic_in_progress
with meas.run(write_in_background=bg_writing) as datasaver:
freqax = np.linspace(1e6, 2e6, N)
signal = np.random.randn(N - 1)
with pytest.raises(ValueError):
datasaver.add_result(("freqax", freqax), ("signal", signal))
meas.register_custom_parameter(
name="gate_voltage", label="Gate tuning potential", unit="V"
)
meas.unregister_parameter("signal")
meas.register_custom_parameter(
name="signal",
label="qubit signal",
unit="Majorana flux",
setpoints=("freqax", "gate_voltage"),
)
# save arrays
with meas.run(write_in_background=bg_writing) as datasaver:
freqax = np.linspace(1e6, 2e6, N)
signal1 = np.random.randn(N)
datasaver.add_result(
("freqax", freqax), ("signal", signal1), ("gate_voltage", 0)
)
assert datasaver.points_written == N
ds = datasaver.dataset
assert isinstance(ds, DataSet)
assert not ds.conn.atomic_in_progress
# save lists
with meas.run(write_in_background=bg_writing) as datasaver:
freqax2 = list(np.linspace(1e6, 2e6, N))
signal2 = list(np.random.randn(N))
datasaver.add_result(
("freqax", freqax2), ("signal", signal2), ("gate_voltage", 0)
)
assert datasaver.points_written == N
# save tuples
with meas.run(write_in_background=bg_writing) as datasaver:
freqax3 = tuple(np.linspace(1e6, 2e6, N))
signal3 = tuple(np.random.randn(N))
datasaver.add_result(
("freqax", freqax3), ("signal", signal3), ("gate_voltage", 0)
)
assert datasaver.points_written == N
@pytest.mark.parametrize("bg_writing", [True, False])
@settings(max_examples=10, deadline=None)
@given(N=hst.integers(min_value=2, max_value=500))
@pytest.mark.usefixtures("empty_temp_db")
def test_datasaver_numeric_and_array_paramtype(bg_writing, N) -> None:
"""
Test saving one parameter with 'numeric' paramtype and one parameter with
'array' paramtype
"""
new_experiment("firstexp", sample_name="no sample")
meas = Measurement()
meas.register_custom_parameter(
name="numeric_1", label="Magnetic field", unit="T", paramtype="numeric"
)
meas.register_custom_parameter(
name="array_1",
label="Alazar signal",
unit="V",
paramtype="array",
setpoints=("numeric_1",),
)
signal = np.random.randn(113)
with meas.run(bg_writing) as datasaver:
datasaver.add_result(("numeric_1", 3.75), ("array_1", signal))
assert datasaver.points_written == 1
ds = datasaver.dataset
assert isinstance(ds, DataSet)
assert ds.parameters is not None
data = ds.get_parameter_data(*ds.parameters.split(","))
assert (data["numeric_1"]["numeric_1"] == np.array([3.75])).all()
assert np.allclose(data["array_1"]["array_1"], signal)
@pytest.mark.parametrize("bg_writing", [True, False])
@pytest.mark.usefixtures("empty_temp_db")
def test_datasaver_numeric_after_array_paramtype(bg_writing) -> None:
"""
Test that passing values for 'array' parameter in `add_result` before
passing values for 'numeric' parameter works.
"""
new_experiment("firstexp", sample_name="no sample")
meas = Measurement()
meas.register_custom_parameter(
name="numeric_1", label="Magnetic field", unit="T", paramtype="numeric"
)
meas.register_custom_parameter(
name="array_1",
label="Alazar signal",
unit="V",
paramtype="array",
setpoints=("numeric_1",),
)
signal = np.random.randn(113)
with meas.run(write_in_background=bg_writing) as datasaver:
        # it is important that the 'array' data comes first and the 'numeric' after
datasaver.add_result(("array_1", signal), ("numeric_1", 3.75))
assert datasaver.points_written == 1
ds = datasaver.dataset
assert isinstance(ds, DataSet)
assert ds.parameters is not None
data = ds.get_parameter_data(*ds.parameters.split(","))
assert (data["numeric_1"]["numeric_1"] == np.array([3.75])).all()
assert np.allclose(data["array_1"]["array_1"], signal)
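# Values that cannot be stored, such as a Parameter object or a set, are
# rejected with a ValueError.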
@pytest.mark.parametrize("bg_writing", [True, False])
@pytest.mark.usefixtures("experiment")
def test_datasaver_foul_input(bg_writing) -> None:
meas = Measurement()
meas.register_custom_parameter(
"foul", label="something unnatural", unit="Fahrenheit"
)
foul_stuff = [Parameter("foul"), {1, 2, 3}]
with meas.run(bg_writing) as datasaver:
for ft in foul_stuff:
with pytest.raises(ValueError):
datasaver.add_result(("foul", ft)) # type: ignore[arg-type]
@settings(max_examples=10, deadline=None)
@given(N=hst.integers(min_value=2, max_value=500))
@pytest.mark.usefixtures("empty_temp_db")
@pytest.mark.parametrize("bg_writing", [True, False])
@pytest.mark.parametrize("storage_type", ["numeric", "array"])
def test_datasaver_unsized_arrays(N, storage_type, bg_writing) -> None:
new_experiment("firstexp", sample_name="no sample")
meas = Measurement()
meas.register_custom_parameter(
name="freqax", label="Frequency axis", unit="Hz", paramtype=storage_type
)
meas.register_custom_parameter(
name="signal",
label="qubit signal",
unit="Majorana number",
setpoints=("freqax",),
paramtype=storage_type,
)
    # note that np.array(some_number) is not the same as the number;
    # it's also not an array with a shape. Check here that we handle it
    # correctly.
with meas.run(write_in_background=bg_writing) as datasaver:
freqax = np.linspace(1e6, 2e6, N)
np.random.seed(0)
signal = np.random.randn(N)
for i in range(N):
myfreq = np.array(freqax[i])
assert myfreq.shape == ()
mysignal = np.array(signal[i])
assert mysignal.shape == ()
datasaver.add_result(("freqax", myfreq), ("signal", mysignal))
assert datasaver.points_written == N
ds = datasaver.dataset
assert isinstance(ds, DataSet)