-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
119 lines (95 loc) · 3.49 KB
/
utils.py
File metadata and controls
119 lines (95 loc) · 3.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import logging
import os
import re
from enum import Enum
from pathlib import Path
from typing import Tuple
from urllib.parse import unquote, urlparse
from servicex import Sample, ServiceXSpec, dataset
from servicex_local import find_dataset, install_sx_local, Platform
class SXLocationOptions(Enum):
    """Constraint on which ServiceX backend (local or remote) may serve a dataset."""

    # The dataset has to be processed by the local backend.
    mustUseLocal = "mustUseLocal"

    # The dataset has to be processed by the remote backend.
    mustUseRemote = "mustUseRemote"

    # No constraint: either backend is acceptable.
    anyLocation = "anyLocation"
def build_sx_spec(query, ds_name: Tuple[str, list[str]], prefer_local: bool = False):
    """Assemble a single-sample ServiceX spec and pick the backend to run it on.

    Args:
        query: The query object placed in the sample's ``Query`` field.
        ds_name: Dataset identifier handed unchanged to ``find_dataset``.
            NOTE(review): the annotation says 2-tuple; it may have been meant
            as "a string or a list of strings" - confirm against
            ``find_dataset``.
        prefer_local: When true, use the local backend unless the dataset is
            reported as remote-only.

    Returns:
        A ``(spec, backend_name, adaptor)`` tuple. ``adaptor`` is whatever
        ``install_sx_local`` returned when the local backend is selected, and
        ``None`` when the remote backend is selected.
    """
    # The local preference is forwarded so the lookup can take it into account.
    resolved_dataset, allowed_location = find_dataset(
        ds_name, prefer_local=prefer_local
    )

    # A remote-only dataset always wins over the caller's preference; otherwise
    # go local when asked to, or when the dataset is local-only.
    # NOTE(review): assumes find_dataset reports members of this module's
    # SXLocationOptions enum - confirm, otherwise these comparisons never match.
    if allowed_location == SXLocationOptions.mustUseRemote:
        go_local = False
    else:
        go_local = bool(
            prefer_local or allowed_location == SXLocationOptions.mustUseLocal
        )

    # Select code generator, backend name and (local only) adaptor.
    if go_local:
        codegen_name, backend_name, adaptor = install_sx_local(
            "docker://sslhep/servicex_func_adl_xaod_transformer:25.2.41",
            Platform.singularity,
        )
    else:
        codegen_name, backend_name, adaptor = "atlasr25", "af.uchicago", None

    # One sample only; its fixed name is the key used to look up the result.
    only_sample = Sample(
        Name="MySample",
        Dataset=resolved_dataset,
        Query=query,
        Codegen=codegen_name,
    )
    return ServiceXSpec(Sample=[only_sample]), backend_name, adaptor
import servicex_local as sx_local
from func_adl import ObjectStream
from dataclasses import dataclass
from servicex_analysis_utils import to_awk
from servicex import deliver
@dataclass
class RunConfig:
    """Options controlling how a query is executed."""

    # Forwarded as ``ignore_local_cache`` to the deliver call.
    ignore_cache: bool

    # Request execution on the local backend rather than the remote one.
    run_locally: bool
def run_query(
    ds_name: Tuple[str, list[str]],
    query: ObjectStream,
    config: RunConfig = RunConfig(ignore_cache=False, run_locally=False),
):
    """Run ``query`` against ``ds_name`` through ServiceX and return the result.

    Args:
        ds_name: Dataset identifier, forwarded to ``build_sx_spec``.
        query: The func_adl query to execute.
        config: Execution options; ``run_locally`` requests the local backend
            and ``ignore_cache`` is forwarded as ``ignore_local_cache``.

    Returns:
        The ``"MySample"`` entry of ``to_awk`` applied to the delivered result.

    Raises:
        ValueError: If a local run was requested but ``build_sx_spec`` did not
            select the local backend for this dataset.
    """
    spec, backend_name, adaptor = build_sx_spec(query, ds_name, config.run_locally)

    # build_sx_spec only hands back an adaptor when it set up the local
    # backend, so use its answer rather than the caller's wish to decide.
    local_selected = adaptor is not None or backend_name == "local-backend"

    # Previously this check sat in a branch that could never see
    # run_locally=True, so a remote-only dataset was silently sent to the
    # local deliver with no adaptor.
    if config.run_locally and not local_selected:
        raise ValueError(f"Unable to run dataset {ds_name} locally.")

    if local_selected:
        sx_result = sx_local.deliver(
            spec, adaptor=adaptor, ignore_local_cache=config.ignore_cache
        )
    else:
        sx_result = deliver(
            spec, servicex_name=backend_name, ignore_local_cache=config.ignore_cache
        )

    # "MySample" is the sample name assigned when the spec is built.
    return to_awk(sx_result)["MySample"]
from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import DoubleQuotedScalarString
def insert_script_into_yaml(
    corrections_path, yaml_block, *, yaml_path="types.yaml", output_path=None
):
    """Replace the ``contents`` of one named ``files`` entry with a script's lines.

    The file at ``corrections_path`` is read line by line; each line (without
    its trailing newline) becomes a double-quoted YAML string. The first entry
    under the top-level ``files`` key whose ``name`` equals ``yaml_block`` gets
    these lines as its new ``contents``, and the document is written back out.

    Args:
        corrections_path: Path of the text file whose lines are inserted.
        yaml_block: Value of the ``name`` key identifying the entry to update.
        yaml_path: YAML file to read. Defaults to ``"types.yaml"``.
        output_path: Where to write the updated YAML. Defaults to
            ``yaml_path``, i.e. the file is updated in place.

    Raises:
        ValueError: If no entry named ``yaml_block`` exists under ``files``
            (including when the document is empty or has no ``files``).
    """
    if output_path is None:
        output_path = yaml_path

    yaml = YAML()
    yaml.preserve_quotes = True

    # Explicit UTF-8 so the result does not depend on the platform locale.
    with open(corrections_path, "r", encoding="utf-8") as f:
        corrections_lines = [
            DoubleQuotedScalarString(line.rstrip("\n")) for line in f
        ]

    with open(yaml_path, "r", encoding="utf-8") as f:
        yaml_data = yaml.load(f)

    # An empty document loads as None and "files:" with no value is None too;
    # treat both as "no entries" so the caller gets the ValueError below.
    entries = (yaml_data or {}).get("files") or []
    target = next(
        (entry for entry in entries if entry.get("name") == yaml_block), None
    )
    if target is None:
        raise ValueError(
            f"Could not find the block with name {yaml_block} in 'files'"
        )
    target["contents"] = corrections_lines

    # Save to output YAML file
    with open(output_path, "w", encoding="utf-8") as f:
        yaml.dump(yaml_data, f)
    print(f"Updated YAML written to: {output_path}")