Skip to content

Commit 6e5797b

Browse files
committed
rules/merge_inputs: Switch to shared remote file support
Removes the need for custom rules to download from AWS S3 and adds support for HTTPS, but requires Snakemake >=8.0.0.
1 parent 40bcee8 commit 6e5797b

2 files changed

Lines changed: 25 additions & 56 deletions

File tree

rules/main.smk

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
from snakemake.utils import min_version
2+
3+
# Minimum Snakemake version needed for the storage plugins used in remote_files.smk
4+
min_version("8.0.0")
5+
16
# constrain the wildcards to not include `_` which we use to separate "parts" of filenames (where a part may be a wildcard itself)
27
wildcard_constraints:
38
subtype = "[^_/]+",
@@ -14,7 +19,6 @@ SEGMENTS = ["pb2", "pb1", "pa", "ha","np", "na", "mp", "ns"]
1419
# (2) Filter the other segments by simply force-including the same strains as (1)
1520
SAME_STRAINS = bool(config.get('same_strains_per_segment', False))
1621

17-
NEXTSTRAIN_PUBLIC_BUCKET = "s3://nextstrain-data/"
1822

1923
rule all:
2024
input:
@@ -33,6 +37,7 @@ rule test_target:
3337
class InvalidConfigError(Exception):
3438
pass
3539

40+
include: "../shared/vendored/snakemake/remote_files.smk"
3641
# This uses the `InvalidConfigError` defined above
3742
include: "merge_inputs.smk"
3843

rules/merge_inputs.smk

Lines changed: 19 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -13,60 +13,50 @@ def _parse_config_input(input):
1313
in which case it must not include the wildcard substring.
1414
1515
Returns a dictionary with optional keys:
16-
- metadata:string - the relative path to the metadata file. If the original data was remote then this represents
17-
the output of a rule which downloads the file
18-
- metadata_location:string - the URI for the remote file if applicable else `None`
19-
- sequences:function. Takes in wildcards and returns the relative path to the sequences FASTA for the provided
16+
- metadata:string - path or url to the metadata file.
17+
- sequences:function. Takes in wildcards and returns path or url to the sequences FASTA for the provided
2018
segment wildcard, or returns `None` if this input doesn't define sequences for the provided segment.
21-
- sequences_location:function. Takes in wildcards and returns the URI for the remote file, or `None`, where applicable.
2219
2320
Raises InvalidConfigError
2421
"""
25-
name = input['name']
26-
lambda_none = lambda w: None
27-
28-
info = {'metadata': None, 'metadata_location': None, 'sequences': lambda_none, 'sequences_location': lambda_none}
29-
30-
def _source(uri, *, s3, local):
31-
if uri.startswith('s3://'):
32-
return s3
33-
elif uri.lower().startswith('http://') or uri.lower().startswith('https://'):
34-
raise InvalidConfigError("Workflow cannot yet handle HTTP[S] inputs")
35-
return local
36-
37-
if location:=input.get('metadata', False):
38-
info['metadata'] = _source(location, s3=f"data/{name}/metadata.tsv", local=location)
39-
info['metadata_location'] = _source(location, s3=location, local=None)
22+
info = {
23+
"name": input["name"],
24+
"metadata": path_or_url(input["metadata"]) if input.get("metadata") else None,
25+
"sequences": None,
26+
}
4027

4128
if location:=input.get('sequences', False):
4229
if isinstance(location, dict):
43-
info['sequences'] = lambda w: _source(location[w.segment], s3=f"data/{name}/sequences_{w.segment}.fasta", local=location[w.segment]) \
44-
if w.segment in location \
45-
else None
46-
info['sequences_location'] = lambda w: _source(location[w.segment], s3=location[w.segment], local=None) \
30+
info['sequences'] = lambda w: path_or_url(location[w.segment]) \
4731
if w.segment in location \
4832
else None
4933
elif isinstance(location, str):
50-
info['sequences'] = _source(location, s3=lambda w: f"data/{name}/sequences_{w.segment}.fasta", local=lambda w: location.format(segment=w.segment))
51-
info['sequences_location'] = _source(location, s3=lambda w: location.format(segment=w.segment), local=lambda_none)
34+
info['sequences'] = lambda w: path_or_url(location)
5235
else:
53-
raise InvalidConfigError(f"Config input for {name} specifies sequences in an unknown format; must be dict or string")
36+
raise InvalidConfigError(f"Config input for {info['name']} specifies sequences in an unknown format; must be dict or string")
5437

5538
return info
5639

57-
5840
def _gather_inputs():
5941
all_inputs = [*config['inputs'], *config.get('additional_inputs', [])]
6042

6143
if len(all_inputs)==0:
6244
raise InvalidConfigError("Config must define at least one element in config.inputs or config.additional_inputs lists")
6345
if not all([isinstance(i, dict) for i in all_inputs]):
64-
raise InvalidConfigError("All of the elements in config.inputs and config.additional_inputs lists must be dictionaries"
46+
raise InvalidConfigError("All of the elements in config.inputs and config.additional_inputs lists must be dictionaries. "
6547
"If you've used a command line '--config' double check your quoting.")
6648
if len({i['name'] for i in all_inputs})!=len(all_inputs):
6749
raise InvalidConfigError("Names of inputs (config.inputs and config.additional_inputs) must be unique")
6850
if not all(['name' in i and ('sequences' in i or 'metadata' in i) for i in all_inputs]):
6951
raise InvalidConfigError("Each input (config.inputs and config.additional_inputs) must have a 'name' and 'metadata' and/or 'sequences'")
52+
if not any(['metadata' in i for i in all_inputs]):
53+
raise InvalidConfigError("At least one input must have 'metadata'")
54+
if not any (['sequences' in i for i in all_inputs]):
55+
raise InvalidConfigError("At least one input must have 'sequences'")
56+
57+
available_keys = set(['name', 'metadata', 'sequences'])
58+
if any([len(set(el.keys())-available_keys)>0 for el in all_inputs]):
59+
raise InvalidConfigError(f"Each input (config.inputs and config.additional_inputs) can only include keys of {', '.join(available_keys)}")
7060

7161
return {i['name']: _parse_config_input(i) for i in all_inputs}
7262

@@ -80,32 +70,6 @@ def input_sequences(wildcards):
8070
inputs = list(filter(None, [info['sequences'](wildcards) for info in input_sources.values() if info.get('sequences', None)]))
8171
return inputs[0] if len(inputs)==1 else "results/sequences_merged_{segment}.fasta"
8272

83-
rule download_s3_sequences:
84-
output:
85-
sequences = "data/{input_name}/sequences_{segment}.fasta",
86-
params:
87-
address = lambda w: input_sources[w.input_name]['sequences_location'](w),
88-
no_sign_request=lambda w: "--no-sign-request" \
89-
if input_sources[w.input_name]['sequences_location'](w).startswith(NEXTSTRAIN_PUBLIC_BUCKET) \
90-
else "",
91-
shell:
92-
"""
93-
aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.sequences}
94-
"""
95-
96-
rule download_s3_metadata:
97-
output:
98-
metadata = "data/{input_name}/metadata.tsv",
99-
params:
100-
address = lambda w: input_sources[w.input_name]['metadata_location'],
101-
no_sign_request=lambda w: "--no-sign-request" \
102-
if input_sources[w.input_name]['metadata_location'].startswith(NEXTSTRAIN_PUBLIC_BUCKET) \
103-
else "",
104-
shell:
105-
"""
106-
aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.metadata}
107-
"""
108-
10973
rule merge_metadata:
11074
"""
11175
This rule should only be invoked if there are multiple defined metadata inputs

0 commit comments

Comments
 (0)