From 42a14f53943f4de2933c4e3d0bd44a339a11d5a6 Mon Sep 17 00:00:00 2001 From: Chris Rodgers Date: Wed, 9 Jun 2021 17:00:35 -0400 Subject: [PATCH 1/9] station.l_child can now send START to multiple CHILDID --- autopilot/networking/station.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/autopilot/networking/station.py b/autopilot/networking/station.py index 95704b9c..64fdab6f 100644 --- a/autopilot/networking/station.py +++ b/autopilot/networking/station.py @@ -1270,19 +1270,40 @@ def l_continuous(self, msg:Message): def l_child(self, msg:Message): """ - Telling our child to run a task. + Tell one or more children to start running a task. - Args: - msg (): + By default, the `key` argument passed to `self.send` is 'START'. + However, this can be overriden by providing the desired string + as `msg.value['KEY']`. - Returns: + This checks the pref `CHILDID` to get the names of one or more children. + If that pref is a string, sends the message to just that child. + If that pref is a list, sends the message to each child in the list. + Args: + msg (): A message to send to the child or children. + + Returns: + nothing """ + # Take `KEY` from msg.value['KEY'] if available + # Otherwise, use 'START' if 'KEY' in msg.value.keys(): KEY = msg.value['KEY'] else: KEY = 'START' - self.send(to=prefs.get('CHILDID'), key=KEY, value=msg.value) + + # Get the list of children + childid_pref = prefs.get('CHILDID') + + # Send to one or more children + if isinstance(childid_pref, list): + # It's a list of multiple children, send to each + for childid in pref_childid: + self.send(to=childid, key=KEY, value=msg.value) + else: + # Send to the only child + self.send(to=pref_childid, key=KEY, value=msg.value) def l_forward(self, msg:Message): """ From 02e29fe81bf63198720f2ce647a6eca8824ee9af Mon Sep 17 00:00:00 2001 From: Chris Rodgers Date: Wed, 9 Jun 2021 17:02:37 -0400 Subject: [PATCH 2/9] update prefs to indicate that CHILDID can now be a list --- autopilot/prefs.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/autopilot/prefs.py b/autopilot/prefs.py index 0a1047f3..4a62b909 100644 --- a/autopilot/prefs.py +++ b/autopilot/prefs.py @@ -315,8 +315,9 @@ class Scopes(Enum): "scope": Scopes.LINEAGE }, 'CHILDID': { - 'type': 'str', - "text": "Child ID:", + 'type': 'list', + "text": "List of Child ID:", + 'default': [], "depends": ("LINEAGE", "PARENT"), "scope": Scopes.LINEAGE }, From 881e438111b8a1da53dbbd097b40b25f8572591b Mon Sep 17 00:00:00 2001 From: Chris Rodgers Date: Wed, 9 Jun 2021 17:08:20 -0400 Subject: [PATCH 3/9] reverting commits that belong in multi_child branch --- autopilot/networking/station.py | 31 +++++-------------------------- autopilot/prefs.py | 5 ++--- 2 files changed, 7 insertions(+), 29 deletions(-) diff --git a/autopilot/networking/station.py b/autopilot/networking/station.py index 64fdab6f..95704b9c 100644 --- a/autopilot/networking/station.py +++ b/autopilot/networking/station.py @@ -1270,40 +1270,19 @@ def l_continuous(self, msg:Message): def l_child(self, msg:Message): """ - Tell one or more children to start running a task. - - By default, the `key` argument passed to `self.send` is 'START'. - However, this can be overriden by providing the desired string - as `msg.value['KEY']`. - - This checks the pref `CHILDID` to get the names of one or more children. - If that pref is a string, sends the message to just that child. - If that pref is a list, sends the message to each child in the list. + Telling our child to run a task. Args: - msg (): A message to send to the child or children. + msg (): + + Returns: - Returns: - nothing """ - # Take `KEY` from msg.value['KEY'] if available - # Otherwise, use 'START' if 'KEY' in msg.value.keys(): KEY = msg.value['KEY'] else: KEY = 'START' - - # Get the list of children - childid_pref = prefs.get('CHILDID') - - # Send to one or more children - if isinstance(childid_pref, list): - # It's a list of multiple children, send to each - for childid in pref_childid: - self.send(to=childid, key=KEY, value=msg.value) - else: - # Send to the only child - self.send(to=pref_childid, key=KEY, value=msg.value) + self.send(to=prefs.get('CHILDID'), key=KEY, value=msg.value) def l_forward(self, msg:Message): """ diff --git a/autopilot/prefs.py b/autopilot/prefs.py index 4a62b909..0a1047f3 100644 --- a/autopilot/prefs.py +++ b/autopilot/prefs.py @@ -315,9 +315,8 @@ class Scopes(Enum): "scope": Scopes.LINEAGE }, 'CHILDID': { - 'type': 'list', - "text": "List of Child ID:", - 'default': [], + 'type': 'str', + "text": "Child ID:", "depends": ("LINEAGE", "PARENT"), "scope": Scopes.LINEAGE }, From b270985470a0d0d700b42e4ac72308385999d272 Mon Sep 17 00:00:00 2001 From: PAC Lab Date: Thu, 31 Mar 2022 14:07:01 -0400 Subject: [PATCH 4/9] Add the ex_ContData Task --- autopilot/tasks/ex_ContData.py | 301 +++++++++++++++++++++++++++++++++ 1 file changed, 301 insertions(+) create mode 100644 autopilot/tasks/ex_ContData.py diff --git a/autopilot/tasks/ex_ContData.py b/autopilot/tasks/ex_ContData.py new file mode 100644 index 00000000..0e7f8f71 --- /dev/null +++ b/autopilot/tasks/ex_ContData.py @@ -0,0 +1,301 @@ +"""This module defines the ex_ContData Task. + +This is an example showing how to return both trial-based and continuous data. +""" + +import threading +import itertools +import random +import datetime +import functools +from collections import OrderedDict as odict +import time +import queue +import tables +import numpy as np +import pandas +import autopilot.hardware.gpio +from autopilot.stim.sound import sounds +from autopilot.tasks.task import Task +from autopilot.networking import Net_Node +from autopilot import prefs +from autopilot.hardware import BCM_TO_BOARD +from autopilot.core.loggers import init_logger +from autopilot.stim.sound import jackclient + +# The name of the task +# This declaration allows Subject to identify which class in this file +# contains the task class. +TASK = 'ex_ContData' + + +## Define the Task +class ex_ContData(Task): + """The probabalistic auditory foraging task (PAFT). + + This passes through three stages: + choose_stimulus, wait_for_response, end_of_trial + + The first two stages both return TrialData. The second stage also + returns ContinuousData. + + To understand the stage progression logic, see: + * autopilot.core.pilot.Pilot.run_task - the main loop + * autopilot.tasks.task.Task.handle_trigger - set stage trigger + + To understand the data saving logic, see: + * autopilot.core.terminal.Terminal.l_data - what happens when data is sent + * autopilot.core.subject.Subject.data_thread - how data is saved + """ + + ## Define the class attributes + # This defines params we receive from terminal on session init + # It also determines the params that are available to specify in the + # Protocol creation GUI. + # The params themselves are defined the protocol json. + # Presently these can only be of type int, bool, enum (aka list), or sound + # Defaults cannot be specified here or in the GUI, only in the corresponding + # kwarg in __init__ + PARAMS = odict() + PARAMS['reward'] = { + 'tag':'Reward Duration (ms)', + 'type':'int', + } + + # Per https://docs.auto-pi-lot.com/en/latest/guide/task.html: + # The `TrialData` object is used by the `Subject` class when a task + # is assigned to create the data storage table + # 'trial_num' and 'session_num' get added by the `Subject` class + # 'session_num' is properly set by `Subject`, but 'trial_num' needs + # to be set properly here. + # If they are left unspecified on any given trial, they receive + # a default value, such as 0 for Int32Col. + class TrialData(tables.IsDescription): + # The trial within this session + # Unambigously label this + trial_in_session = tables.Int32Col() + + # If this isn't specified here, it will be added anyway + trial_num = tables.Int32Col() + + # The chosens stimulus and response + # Must specify the max length of the string, we use 64 to be safe + chosen_stimulus = tables.StringCol(64) + chosen_response = tables.StringCol(64) + + # The timestamps + timestamp_trial_start = tables.StringCol(64) + timestamp_response = tables.StringCol(64) + + # Definie continuous data + # https://docs.auto-pi-lot.com/en/latest/guide/task.html + # autopilot.core.subject.Subject.data_thread would like one of the + # keys to be "timestamp" + # Actually, no I think that is extracted automatically from the + # networked message, and should not be defined here + class ContinuousData(tables.IsDescription): + string_data = tables.StringCol(64) + int_data = tables.Int32Col() + + # Per https://docs.auto-pi-lot.com/en/latest/guide/task.html: + # The HARDWARE dictionary maps a hardware type (eg. POKES) and + # identifier (eg. 'L') to a Hardware object. The task uses the hardware + # parameterization in the prefs file (also see setup_pilot) to + # instantiate each of the hardware objects, so their naming system + # must match (ie. there must be a prefs.PINS['POKES']['L'] entry in + # prefs for a task that has a task.HARDWARE['POKES']['L'] object). + HARDWARE = {} + + # This is used by the terminal to plot the results of each trial + PLOT = { + 'data': { + 'target': 'point' + } + } + + + ## Define the class methods + def __init__(self, stage_block, current_trial, step_name, task_type, + subject, step, session, pilot, graduation, reward): + """Initialize a new ex_ContData Task. + + + All arguments are provided by the Terminal. + + Note that this __init__ does not call the superclass __init__, + because that superclass Task inclues functions for punish_block + and so on that we don't want to use. + + Some params, such as `step_name` and `task_type`, are always required + to be specified in the json defining this protocol + + Other params, such as `reward`, are custom to this particular task. + They should be described in the class attribute `PARAMS` above, and + their values should be specified in the protocol json. + + Arguments + --------- + stage_block (:class:`threading.Event`): + used to signal to the carrying Pilot that the current trial + stage is over + current_trial (int): + If not zero, initial number of `trial_counter` + This is set to be 1 greater than the last value of "trial_num" + in the HDF5 file by autopilot.core.subject.Subject.prepare_run + step_name : string + This is passed from the "protocol" json + Currently it is always "ex_ContData" + task_type : string + This is passed from the "protocol" json + Currently it is always "ex_ContData" + subject : string + The name of the subject + step : 0 + Index into the "protocol" json? + session : int + number of times it's been started + pilot : string + The name of this pilot + graduation : dict + Probably a dict of graduation criteria + reward (int): + ms to open solenoids + This is passed from the "protocol" json + """ + + ## These are things that would normally be done in superclass __init__ + # Set up a logger first, so we can debug if anything goes wrong + self.logger = init_logger(self) + + # This threading.Event is checked by Pilot.run_task before + # advancing through stages. Clear it to wait for triggers; set + # it to advance to the next stage. + self.stage_block = stage_block + + # This is needed for sending Node messages + self.subject = subject + + # This is used to count the trials for the "trial_num" HDF5 column + self.counter_trials_across_sessions = itertools.count(int(current_trial)) + + # This is used to count the trials for the "trial_in_session" HDF5 column + self.counter_trials_in_session = itertools.count(0) + + # A dict of hardware triggers + self.triggers = {} + + + ## Define the stages + # Stage list to iterate + # Iterate through these three stages forever + stage_list = [ + self.choose_stimulus, self.wait_for_response, self.end_of_trial] + self.num_stages = len(stage_list) + self.stages = itertools.cycle(stage_list) + + + ## Init hardware -- this sets self.hardware, self.pin_id, and + ## assigns self.handle_trigger to gpio callbacks + self.init_hardware() + + + ## For reporting continuous data to the Terminal + # With instance=True, I get a threading error about current event loop + self.node = Net_Node( + id="T_{}".format(prefs.get('NAME')), + upstream=prefs.get('NAME'), + port=prefs.get('MSGPORT'), + listens={}, + instance=False, + ) + + def choose_stimulus(self): + """A stage that chooses the stimulus""" + # Get timestamp + timestamp_trial_start = datetime.datetime.now() + + # Wait a little before doing anything + self.logger.debug( + 'choose_stimulus: entering stage at {}'.format( + timestamp_trial_start.isoformat())) + time.sleep(3) + + # Choose stimulus randomly + chosen_stimulus = random.choice(['stim0', 'stim1', 'stim2']) + self.logger.debug('choose_stimulus: chose {}'.format(chosen_stimulus)) + + # Continue to the next stage + # CLEAR means "wait for triggers" + # SET means "advance anyway" + self.stage_block.set() + + # Return data about chosen_stim so it will be added to HDF5 + # I think it's best to increment trial_num now, since this is the + # first return from this trial. Even if we don't increment trial_num, + # it will still make another row in the HDF5, but it might warn. + # (This hapepns in autopilot.core.subject.Subject.data_thread) + return { + 'chosen_stimulus': chosen_stimulus, + 'timestamp_trial_start': timestamp_trial_start.isoformat(), + 'trial_num': next(self.counter_trials_across_sessions), + 'trial_in_session': next(self.counter_trials_in_session), + } + + def wait_for_response(self): + """A stage that waits for a response""" + # Wait a little before doing anything + self.logger.debug('wait_for_response: entering stage') + time.sleep(3) + + # Choose response randomly + chosen_response = random.choice(['choice0', 'choice1']) + + # Get timestamp of response + timestamp_response = datetime.datetime.now() + self.logger.debug('wait_for_response: chose {} at {}'.format( + chosen_response, timestamp_response.isoformat())) + + # Directly report continuous data to terminal (aka _T) + # Otherwise it can be encoded in the returned data, but that is only + # once per stage + # subject is needed by core.terminal.Terminal.l_data + # pilot is needed by networking.station.Terminal_Station.l_data + # timestamp and continuous are needed by subject.Subject.data_thread + self.node.send( + to='_T', + key='DATA', + value={ + 'subject': self.subject, + 'pilot': prefs.get('NAME'), + 'continuous': True, + 'string_data': 'a long string', + 'int_data': 3, + 'timestamp': datetime.datetime.now().isoformat(), + }, + ) + + # Continue to the next stage + self.stage_block.set() + + # Return data about chosen_stim so it will be added to HDF5 + # Could also return continuous data here + return { + 'chosen_response': chosen_response, + 'timestamp_response': timestamp_response.isoformat(), + } + + def end_of_trial(self): + """A stage that ends the trial""" + # Wait a little before doing anything + self.logger.debug('end_of_trial: entering stage') + time.sleep(3) + + # Cleanup logic could go here + + # Continue to the next stage + self.stage_block.set() + + # Return TRIAL_END so the Terminal knows the trial is over + return { + 'TRIAL_END': True, + } \ No newline at end of file From 920f3ed6bd285db07086fb019d257788464dd906 Mon Sep 17 00:00:00 2001 From: PAC Lab Date: Thu, 31 Mar 2022 10:40:08 -0400 Subject: [PATCH 5/9] access column names as .columns.keys; catch KeyError and AttributeError --- autopilot/core/subject.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/autopilot/core/subject.py b/autopilot/core/subject.py index acddb184..97ff6a32 100644 --- a/autopilot/core/subject.py +++ b/autopilot/core/subject.py @@ -545,7 +545,7 @@ def assign_protocol(self, protocol, step_n=0): cont_group = h5f.create_group(step_group, "continuous_data") # save data names as attributes - data_names = tuple(task_class.ContinuousData.keys()) + data_names = tuple(task_class.ContinuousData.columns.keys()) cont_group._v_attrs['data'] = data_names #cont_descriptor = task_class.ContinuousData @@ -818,7 +818,7 @@ def data_thread(self, queue): cont_tables = {} cont_rows = {} - except AttributeError: + except (KeyError, AttributeError): continuous_table = False # start getting data @@ -830,13 +830,16 @@ def data_thread(self, queue): # there must be a more elegant way to check if something is a key and it is true... # yet here we are if 'continuous' in data.keys(): + print("in continuous") for k, v in data.items(): # if this isn't data that we're expecting, ignore it if k not in cont_data: + print("{} not in cont_data".format(k)) continue # if we haven't made a table yet, do it if k not in cont_tables.keys(): + print("{} not in cont_tables.keys".format(k)) # make atom for this data try: # if it's a numpy array... @@ -862,10 +865,13 @@ def data_thread(self, queue): }, filters=self.continuous_filter) cont_rows[k] = cont_tables[k].row + + print("created") cont_rows[k][k] = v cont_rows[k]['timestamp'] = data['timestamp'] cont_rows[k].append() + print("stored") # continue, the rest is for handling trial data continue From 419aa8f6f87710e7260b33c5143915e8d1db22cb Mon Sep 17 00:00:00 2001 From: PAC Lab Date: Thu, 31 Mar 2022 14:18:40 -0400 Subject: [PATCH 6/9] remove debug statements --- autopilot/core/subject.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/autopilot/core/subject.py b/autopilot/core/subject.py index 97ff6a32..80489896 100644 --- a/autopilot/core/subject.py +++ b/autopilot/core/subject.py @@ -830,16 +830,13 @@ def data_thread(self, queue): # there must be a more elegant way to check if something is a key and it is true... # yet here we are if 'continuous' in data.keys(): - print("in continuous") for k, v in data.items(): # if this isn't data that we're expecting, ignore it if k not in cont_data: - print("{} not in cont_data".format(k)) continue # if we haven't made a table yet, do it if k not in cont_tables.keys(): - print("{} not in cont_tables.keys".format(k)) # make atom for this data try: # if it's a numpy array... @@ -865,13 +862,10 @@ def data_thread(self, queue): }, filters=self.continuous_filter) cont_rows[k] = cont_tables[k].row - - print("created") cont_rows[k][k] = v cont_rows[k]['timestamp'] = data['timestamp'] cont_rows[k].append() - print("stored") # continue, the rest is for handling trial data continue From 720074838e9b494cdaa8045932c2c41b6b38cef2 Mon Sep 17 00:00:00 2001 From: PAC Lab Date: Thu, 31 Mar 2022 14:26:20 -0400 Subject: [PATCH 7/9] Revert "remove debug statements" This reverts commit 419aa8f6f87710e7260b33c5143915e8d1db22cb. --- autopilot/core/subject.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/autopilot/core/subject.py b/autopilot/core/subject.py index 80489896..97ff6a32 100644 --- a/autopilot/core/subject.py +++ b/autopilot/core/subject.py @@ -830,13 +830,16 @@ def data_thread(self, queue): # there must be a more elegant way to check if something is a key and it is true... # yet here we are if 'continuous' in data.keys(): + print("in continuous") for k, v in data.items(): # if this isn't data that we're expecting, ignore it if k not in cont_data: + print("{} not in cont_data".format(k)) continue # if we haven't made a table yet, do it if k not in cont_tables.keys(): + print("{} not in cont_tables.keys".format(k)) # make atom for this data try: # if it's a numpy array... @@ -862,10 +865,13 @@ def data_thread(self, queue): }, filters=self.continuous_filter) cont_rows[k] = cont_tables[k].row + + print("created") cont_rows[k][k] = v cont_rows[k]['timestamp'] = data['timestamp'] cont_rows[k].append() + print("stored") # continue, the rest is for handling trial data continue From e3950c587abb92b6abc23492e7295ae1eb9c0c5c Mon Sep 17 00:00:00 2001 From: PAC Lab Date: Thu, 31 Mar 2022 11:35:58 -0400 Subject: [PATCH 8/9] same dtype fix for timestamp --- autopilot/core/subject.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/autopilot/core/subject.py b/autopilot/core/subject.py index 97ff6a32..af5cfcff 100644 --- a/autopilot/core/subject.py +++ b/autopilot/core/subject.py @@ -840,19 +840,28 @@ def data_thread(self, queue): # if we haven't made a table yet, do it if k not in cont_tables.keys(): print("{} not in cont_tables.keys".format(k)) - # make atom for this data - try: - # if it's a numpy array... - col_atom = tables.Atom.from_type(v.dtype.name, v.shape) - except AttributeError: - temp_array = np.array(v) - col_atom = tables.Atom.from_type(temp_array.dtype.name, temp_array.shape) + + # Make it an array if it's not already + varr = np.asarray(v) + + # Special case string + if 'str' in varr.dtype.name: + col_atom = tables.StringAtom(len(v)) + else: + col_atom = tables.Atom.from_type( + varr.dtype.name, varr.shape) + # should have come in with a timestamp # TODO: Log if no timestamp is received try: - temp_timestamp_arr = np.array(data['timestamp']) - timestamp_atom = tables.Atom.from_type(temp_timestamp_arr.dtype.name, - temp_timestamp_arr.shape) + varr_timestamp = np.asarray(data['timestamp']) + # Special case string + if 'str' in varr_timestamp.dtype.name: + timestamp_atom = tables.StringAtom(len(data['timestamp'])) + else: + timestamp_atom = tables.Atom.from_type( + varr_timestamp.dtype.name, + varr_timestamp.shape) except KeyError: self.logger.warning('no timestamp sent with continuous data') From 91e603b682c14b40993fa38675cb3c1dc32e968c Mon Sep 17 00:00:00 2001 From: PAC Lab Date: Thu, 31 Mar 2022 14:29:01 -0400 Subject: [PATCH 9/9] remove debug statements --- autopilot/core/subject.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/autopilot/core/subject.py b/autopilot/core/subject.py index af5cfcff..289adf44 100644 --- a/autopilot/core/subject.py +++ b/autopilot/core/subject.py @@ -830,17 +830,13 @@ def data_thread(self, queue): # there must be a more elegant way to check if something is a key and it is true... # yet here we are if 'continuous' in data.keys(): - print("in continuous") for k, v in data.items(): # if this isn't data that we're expecting, ignore it if k not in cont_data: - print("{} not in cont_data".format(k)) continue # if we haven't made a table yet, do it if k not in cont_tables.keys(): - print("{} not in cont_tables.keys".format(k)) - # Make it an array if it's not already varr = np.asarray(v) @@ -874,13 +870,10 @@ def data_thread(self, queue): }, filters=self.continuous_filter) cont_rows[k] = cont_tables[k].row - - print("created") cont_rows[k][k] = v cont_rows[k]['timestamp'] = data['timestamp'] cont_rows[k].append() - print("stored") # continue, the rest is for handling trial data continue