Skip to content

Commit 7fb701e

Browse files
feat(executors): add support for specifying prefix for query path
1 parent 31faa1d commit 7fb701e

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

libs/executors/garf/executors/entrypoints/cli.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,14 @@ def main():
105105
)
106106
for query in queries:
107107
if isinstance(query, garf.executors.workflow.QueryPath):
108+
query_path = query.full_path
108109
if re.match(
109-
'^(http|gs|s3|aruze|hdfs|webhdfs|ssh|scp|sftp)', query.path
110+
'^(http|gs|s3|aruze|hdfs|webhdfs|ssh|scp|sftp)', query_path
110111
):
111-
batch[query.path] = reader_client.read(query.path)
112+
batch[query.path] = reader_client.read(query_path)
112113
else:
113-
query_path = wf_parent / pathlib.Path(query.path)
114+
if not query.prefix:
115+
query_path = wf_parent / pathlib.Path(query.path)
114116
if not query_path.exists():
115117
raise workflow.GarfWorkflowError(
116118
f'Query: {query_path} not found'

libs/executors/garf/executors/workflow.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import os
1717
import pathlib
18+
import re
1819

1920
import pydantic
2021
import smart_open
@@ -37,6 +38,13 @@ class QueryPath(pydantic.BaseModel):
3738
"""Path file with query."""
3839

3940
path: str
41+
prefix: str | None = None
42+
43+
@property
44+
def full_path(self) -> str:
45+
if self.prefix:
46+
return re.sub('/$', '', self.prefix) + '/' + self.path
47+
return self.path
4048

4149

4250
class QueryDefinition(pydantic.BaseModel):

0 commit comments

Comments
 (0)