Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions src/prefect/cli/_prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from prefect.utilities import urls
from prefect.utilities._git import get_git_remote_origin_url
from prefect.utilities.asyncutils import LazySemaphore
from prefect.utilities.filesystem import get_open_file_limit
from prefect.utilities.filesystem import filter_files, get_open_file_limit
from prefect.utilities.processutils import get_sys_executable, run_process
from prefect.utilities.slugify import slugify

Expand All @@ -75,6 +75,8 @@
# actors to open files.
OPEN_FILE_SEMAPHORE = LazySemaphore(lambda: math.floor(get_open_file_limit() * 0.5))

# Module-level logger, named after this module.
logger = get_logger(__name__)


async def find_flow_functions_in_file(path: anyio.Path) -> list[dict[str, str]]:
decorator_name = "flow"
Expand Down Expand Up @@ -162,18 +164,36 @@ async def find_flow_functions_in_file(path: anyio.Path) -> list[dict[str, str]]:
return decorated_functions


async def search_for_flow_functions(
    directory: str = ".", exclude_patterns: list[str] | None = None
) -> list[dict[str, str]]:
    """
    Search for flow functions in the provided directory. If no directory is provided,
    the current working directory is used.

    Args:
        directory: The directory to search in
        exclude_patterns: List of patterns to exclude from the search, defaults to
            ["**/site-packages/**"]. An empty list is treated the same as `None`
            and also falls back to the default.

    Returns:
        List[Dict]: the flow name, function name, and filepath of all flow functions
            found. An empty list is returned (and the error is logged) if the
            directory could not be listed.
    """
    path = anyio.Path(directory)
    exclude_patterns = exclude_patterns or ["**/site-packages/**"]

    # Only the directory listing is guarded. The listing is materialized inside
    # the `try` so that any error raised while walking the tree surfaces here,
    # before a single coroutine has been created (returning early with pending
    # coroutines would leave them un-awaited).
    try:
        files = list(
            filter_files(
                root=str(path),
                # NOTE(review): presumably gitignore-style patterns - ignore
                # everything, re-include `*.py`, then apply the exclusions.
                # Confirm against `filter_files`.
                ignore_patterns=["*", "!**/*.py", *exclude_patterns],
                include_dirs=False,
            )
        )
    except OSError as e:
        # PermissionError is a subclass of OSError, so this single clause
        # covers both.
        logger.error(f"Error searching for flow functions: {e}")
        return []

    # `path / file` is already an `anyio.Path`; no str round-trip is needed.
    coros: list[Coroutine[Any, Any, list[dict[str, str]]]] = [
        find_flow_functions_in_file(path / file) for file in files
    ]

    return [fn for file_fns in await asyncio.gather(*coros) for fn in file_fns]

Expand Down
22 changes: 22 additions & 0 deletions tests/cli/test_prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,28 @@ async def test_find_flow_functions_in_file_returns_empty_list_on_file_error(
assert await find_flow_functions_in_file(AnyioPath("foo.py")) == []
assert "Could not open foo.py" in caplog.text

async def test_excludes_site_packages(self, project_dir: Path):
    """Flows located under a site-packages directory must not be discovered."""
    # A regular flow at the project root: this one is expected to be found.
    (project_dir / "flow.py").write_text(
        "from prefect import flow\n@flow\ndef regular_flow(): pass"
    )

    # A flow shipped inside an installed package: this one must be skipped.
    installed_root = project_dir / "lib" / "python3.8" / "site-packages"
    installed_root.mkdir(parents=True)
    package_flows_dir = installed_root / "package" / "flows"
    package_flows_dir.mkdir(parents=True)
    (package_flows_dir / "flow.py").write_text(
        "from prefect import flow\n@flow\ndef site_pkg_flow(): pass"
    )

    discovered = await search_for_flow_functions(str(project_dir))

    leaked = [entry for entry in discovered if "site-packages" in entry["filepath"]]
    assert not leaked

    regular = [
        entry
        for entry in discovered
        if entry["flow_name"] == "regular_flow"
        and entry["filepath"].endswith("flow.py")
    ]
    assert regular

async def test_prefect_can_be_imported_from_non_main_thread(self):
"""testing due to `asyncio.Semaphore` error when importing prefect from a worker thread
in python <= 3.9
Expand Down