|
4 | 4 | from enum import Enum |
5 | 5 | from functools import cache |
6 | 6 | from pathlib import Path |
| 7 | +from typing import Any, AsyncGenerator |
7 | 8 |
|
8 | 9 | import psutil |
9 | 10 | from loguru import logger |
@@ -69,6 +70,74 @@ def delete_everything(path: Path) -> None: |
69 | 70 | logger.warning(f"Failed to delete: {item}, {exception}") |
70 | 71 |
|
71 | 72 |
|
| 73 | +async def delete_everything_stream(path: Path) -> AsyncGenerator[dict[str, Any], None]: |
| 74 | + """Delete everything in a path and yield information about each file being deleted. |
| 75 | +
|
| 76 | + Args: |
| 77 | + path: Path to delete |
| 78 | +
|
| 79 | + Yields: |
| 80 | + Dictionary containing information about each file being deleted: |
| 81 | + { |
| 82 | + 'path': str, # Path of the file being deleted |
| 83 | + 'size': int, # Size of the file in bytes |
| 84 | + 'type': str, # 'file' or 'directory' |
| 85 | + 'success': bool # Whether deletion was successful |
| 86 | + } |
| 87 | + """ |
| 88 | + if path.is_file() and not file_is_open(path): |
| 89 | + try: |
| 90 | + size = path.stat().st_size |
| 91 | + path.unlink() |
| 92 | + # fmt: off |
| 93 | + yield { |
| 94 | + "path": str(path), |
| 95 | + "size": size, |
| 96 | + "type": "file", |
| 97 | + "success": True |
| 98 | + } |
| 99 | + # fmt: on |
| 100 | + except Exception as exception: |
| 101 | + logger.warning(f"Failed to delete: {path}, {exception}") |
| 102 | + # fmt: off |
| 103 | + yield { |
| 104 | + "path": str(path), |
| 105 | + "size": 0, |
| 106 | + "type": "file", |
| 107 | + "success": False |
| 108 | + } |
| 109 | + # fmt: on |
| 110 | + return |
| 111 | + |
| 112 | + for item in path.glob("*"): |
| 113 | + try: |
| 114 | + if item.is_file() and not file_is_open(item): |
| 115 | + size = item.stat().st_size |
| 116 | + item.unlink() |
| 117 | + # fmt: off |
| 118 | + yield { |
| 119 | + "path": str(item), |
| 120 | + "size": size, |
| 121 | + "type": "file", |
| 122 | + "success": True |
| 123 | + } |
| 124 | + # fmt: on |
| 125 | + if item.is_dir() and not item.is_symlink(): |
| 126 | + # Delete folder contents |
| 127 | + async for info in delete_everything_stream(item): |
| 128 | + yield info |
| 129 | + except Exception as exception: |
| 130 | + logger.warning(f"Failed to delete: {item}, {exception}") |
| 131 | + # fmt: off |
| 132 | + yield { |
| 133 | + "path": str(item), |
| 134 | + "size": 0, |
| 135 | + "type": "directory" if item.is_dir() else "file", |
| 136 | + "success": False |
| 137 | + } |
| 138 | + # fmt: on |
| 139 | + |
| 140 | + |
72 | 141 | def file_is_open(path: Path) -> bool: |
73 | 142 | try: |
74 | 143 | kernel_functions_timeout = str(2) |
|
0 commit comments