Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions K8/CronJobs/minion-reg-over-time.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
---
# CronJob "minion-rot": every 5 minutes, re-clones the pipeline repositories
# (using GitHub credentials from a Secret) and runs the RegistrationOverTime
# populate script on a fixed set of at-compute nodes.
apiVersion: batch/v1  # was batch/v1beta1 — deprecated, removed in Kubernetes >= 1.25 (batch/v1 is GA since 1.21)
kind: CronJob  # This tells kubernetes what kind of class it is working with
metadata:
  name: minion-rot
spec:
  schedule: "*/5 * * * *"  # every 5 minutes
  concurrencyPolicy: Allow  # overlapping runs are permitted
  failedJobsHistoryLimit: 10
  successfulJobsHistoryLimit: 10
  jobTemplate:
    spec:
      parallelism: 1  # how many pods run in parallel per job
      template:
        spec:
          restartPolicy: Never
          hostNetwork: true  # This option will allow the pod to use the host network for internet access
          volumes:
            - name: mnt
              hostPath:
                path: /mnt
          affinity:  # Pin the pod to specific compute nodes
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:  # Require nodes to have this label
                nodeSelectorTerms:
                  - matchExpressions:
                      - key: kubernetes.io/hostname  # match on node hostname
                        operator: In  # Key must have one of the following values
                        values:
                          - at-compute003
                          - at-compute004
                          - at-compute005
                          - at-compute006
                          - at-compute007
                          - at-compute008
                          - at-compute009
                          - at-compute010
                          - at-compute011
                          - at-compute012
                          - at-compute013
                          - at-compute014
          priorityClassName: medium-priority
          containers:
            - name: minion
              image: at-docker.ad.bcm.edu:5000/pipeline:latest
              volumeMounts:
                - name: mnt
                  mountPath: /mnt
              resources:
                requests:
                  cpu: 1
                  memory: 32Gi
                  ephemeral-storage: 50Gi
              env:
                # DataJoint and GitHub credentials come from Secrets; never inline them.
                - name: DJ_HOST
                  valueFrom:
                    secretKeyRef:
                      name: datajoint-credentials
                      key: DJ_HOST
                - name: DJ_USER
                  valueFrom:
                    secretKeyRef:
                      name: datajoint-credentials
                      key: DJ_USER
                - name: DJ_PASS
                  valueFrom:
                    secretKeyRef:
                      name: datajoint-credentials
                      key: DJ_PASS
                - name: GITHUB_USERNAME
                  valueFrom:
                    secretKeyRef:
                      name: github-credentials
                      key: GITHUB_USERNAME
                - name: GITHUB_PASSWORD
                  valueFrom:
                    secretKeyRef:
                      name: github-credentials
                      key: GITHUB_PASSWORD
              command: ["/bin/bash", "-c"]
              args:
                # Block scalar instead of a backslash-continued quoted string:
                # each line is a real shell line, so the script reads and diffs cleanly.
                # rm -rf (was rm -r) so a missing ./pipeline dir on first run does not
                # abort the whole && chain before anything is cloned.
                # NOTE(review): credentials embedded in clone URLs can leak into logs /
                # .git/config — consider a git credential helper. Confirm the
                # ZhuokunDing/pipeline fork (vs cajal/pipeline) is intentional.
                - |-
                  rm -rf pipeline &&
                  git clone https://$(GITHUB_USERNAME):$(GITHUB_PASSWORD)@github.com/ZhuokunDing/pipeline.git &&
                  pip3 install pipeline/python/ &&
                  git clone https://$(GITHUB_USERNAME):$(GITHUB_PASSWORD)@github.com/cajal/stimulus-pipeline.git &&
                  pip3 install stimulus-pipeline/python/ &&
                  git clone https://$(GITHUB_USERNAME):$(GITHUB_PASSWORD)@github.com/cajal/stimuli.git &&
                  pip3 install stimuli/python/ &&
                  python3 /data/pipeline/python/scripts/populate-reg-over-time.py
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ services:


bash:
image: ninai/stimulus-pipeline:latest
image: at-docker:5000/stimulus-pipeline:latest
volumes:
- /mnt/:/mnt/
- /home/pipeline_data/:/tmp/
Expand Down
23 changes: 20 additions & 3 deletions python/pipeline/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
dj.config['cache'] = '/tmp/dj-cache'


schema = dj.schema('pipeline_stack', locals(), create_tables=False)
schema = dj.schema('pipeline_stack', locals(), create_tables=True)

@schema
class StackInfo(dj.Imported):
Expand Down Expand Up @@ -2067,18 +2067,35 @@ def _make_tuples(self, key):
'distance': distance})


@schema
class RegistrationOverTimeTask(dj.Manual):
    """Manually-inserted (stack, scan, field, method) combinations that are
    candidates for registration over time.

    Rows here restrict which keys are eligible for automatic population
    downstream — NOTE(review): presumably consumed by
    RegistrationOverTime.key_source; confirm against that property.
    """

    # DataJoint table definition: the primary key is the composition of the
    # foreign keys below (projections rename 'session'/'channel' so the stack
    # and scan sides can coexist in one key).
    definition = """ # scans that will be candidates for stack.RegistrationOverTime
    -> CorrectedStack.proj(stack_session='session') # animal_id, stack_session, stack_idx, volume_id
    -> shared.Channel.proj(stack_channel='channel')
    -> experiment.Scan.proj(scan_session='session') # animal_id, scan_session, scan_idx
    -> shared.Channel.proj(scan_channel='channel')
    -> shared.Field
    -> shared.RegistrationMethod
    """

@schema
class RegistrationOverTime(dj.Computed):
definition = """ # register a field at different timepoints of recording

-> PreprocessedStack.proj(stack_session='session', stack_channel='channel')
-> RegistrationTask
"""

@property
def key_source(self):
stacks = PreprocessedStack.proj(stack_session='session', stack_channel='channel')
return stacks * RegistrationTask & {'registration_method': 5}
keys = (
PreprocessedStack.proj(stack_session='session', stack_channel='channel')
* (RegistrationTask & {'registration_method': 5})
* experiment.Stack.proj(..., stack_session='session')
) & RegistrationOverTimeTask
return keys


class Chunk(dj.Part):
definition = """ # single registered chunk

Expand Down
29 changes: 29 additions & 0 deletions python/scripts/populate-reg-over-time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/local/bin/python3

'''
For populating stack.RegistrationOverTime (run periodically by the minion CronJob).
Prerequisite:
- insert scans and stacks to RegistrationTask
- run populate-minion.py (this is usually run by the pipeline minion automatically)
- insert scans and stacks of interest into the task table that gates
  RegistrationOverTime.key_source
  (NOTE(review): originally documented as stack.ZDriftQuery, but the
  accompanying stack.py change gates key_source on
  stack.RegistrationOverTimeTask — confirm which table is current)
'''

from pipeline import stack
import logging
import datajoint as dj

## database logging code

# Suppress everything below ERROR globally, but keep DEBUG output from the
# datajoint connection logger so executed SQL appears in the job logs.
logging.basicConfig(level=logging.ERROR)
logging.getLogger('datajoint.connection').setLevel(logging.DEBUG)
# Older datajoint versions do not expose this attribute; guard before setting
# so the script stays compatible with whatever version the image ships.
if hasattr(dj.connection, 'query_log_max_length'):
    dj.connection.query_log_max_length = 3000

# delete errors
# Clear transient failures (InnoDB lock-wait timeouts and SIGTERM kills) from
# the schema's jobs table so those keys get retried on the next populate;
# only entries newer than 2021-12-10 are touched.
err_msg_timeout = 'error_message = "InternalError: (1205, \'Lock wait timeout exceeded; try restarting transaction\')"'
err_msg_sigterm = 'error_message = "SystemExit: SIGTERM received"'
timestamp = 'timestamp > "2021-12-10"'
(stack.schema.jobs & [err_msg_timeout, err_msg_sigterm] & timestamp).delete()

# populate RegistrationOverTime
# reserve_jobs=True lets multiple minion pods cooperate without duplicating
# work; suppress_errors=True records failures in the jobs table instead of
# aborting the whole run.
stack.RegistrationOverTime.populate(reserve_jobs=True, suppress_errors=True)