Commit e8f233d

feat(utils): add AdlsLakeFileSystem for Azure Data Lake Storage (#70)
**New ADLS Gen2 backend and API enhancements:**

* Added the `AdlsLakeFileSystem` class, providing direct access to Azure Data Lake Storage Gen2 via the Azure SDK, with methods matching `LakeFileSystem` plus additional directory-listing support.
* Updated `__init__.py` to support lazy importing of `AdlsLakeFileSystem`, preventing a hard dependency on Azure packages unless they are needed.
* Extended `pyproject.toml` to include `azure-storage-file-datalake` in the `azure` extra requirements for proper dependency management.

**Documentation improvements:**

* Revised `docs/packages/dataorc-utils/lake/index.md` to explain both `LakeFileSystem` and `AdlsLakeFileSystem`: their APIs, constructor options, path-handling differences, and authentication details.

**Testing enhancements:**

* Consolidated the `LakeFileSystem` tests so they also cover `AdlsLakeFileSystem`.
1 parent 9a03420 commit e8f233d

File tree

5 files changed: +351 −30 lines changed


docs/packages/dataorc-utils/lake/index.md

Lines changed: 98 additions & 5 deletions
@@ -4,19 +4,28 @@ title: dataorc-utils - Lake
 
 # dataorc-utils — Lake
 
-Filesystem utilities for reading and writing to the Data Lake in Databricks pipelines.
+Filesystem utilities for reading and writing to the Data Lake.
 
 ## Overview
 
-The `lake` module provides a unified interface for file operations on Azure Data Lake Storage,
-abstracting away the differences between local development and Databricks runtime environments.
+The `lake` module provides a unified interface for file operations on Azure Data Lake Storage.
+Two implementations are available:
+
+| Class | Backend | Use case |
+|-------|---------|----------|
+| `LakeFileSystem` | Local / FUSE mount (via `fsspec`) | Databricks with mounted storage |
+| `AdlsLakeFileSystem` | ADLS Gen2 SDK (direct) | Any environment — no mounts or dbutils needed |
+
+Both classes expose the **same core API** (`read_text`, `write_text`, `read_json`, `write_json`,
+`exists`, `delete`), so switching between them requires only changing the constructor.
 
 **Key design principle:** The module is **path-agnostic**. It performs pure I/O operations
-without assuming any specific mounting conventions. Path normalization (e.g., `dls://` → `/mnt/...`)
-is the responsibility of your pipeline code.
+without assuming any specific mounting conventions.
 
 ## Quick start
 
+### LakeFileSystem (mount-based)
+
 ```python
 from dataorc_utils.lake import LakeFileSystem
 
@@ -36,6 +45,40 @@ if fs.exists("old_file.txt"):
     fs.delete("old_file.txt")
 ```
 
+### AdlsLakeFileSystem (direct ADLS Gen2)
+
+!!! note "Requires the `azure` extra"
+    Install with: `pip install dataorc-utils[azure]`
+
+```python
+from dataorc_utils.lake import AdlsLakeFileSystem
+
+# Connect directly to ADLS Gen2 — no mounts or dbutils required
+fs = AdlsLakeFileSystem(
+    account_url="https://testdatadevsc.dfs.core.windows.net",
+    container="bronze",
+    base_path="sales/orders",  # optional prefix inside the container
+)
+
+# Same API from here on
+fs.write_text("metadata.txt", "Pipeline run: 2026-02-02")
+content = fs.read_text("metadata.txt")
+
+fs.write_json("config.json", {"version": 1, "status": "complete"})
+config = fs.read_json("config.json")
+
+if fs.exists("old_file.txt"):
+    fs.delete("old_file.txt")
+
+# Bonus: list files in a directory
+files = fs.list_paths("_metadata/")
+```
+
+Authentication uses `DefaultAzureCredential` by default, which supports
+Managed Identity, Azure CLI (`az login`), and environment variables.
+You can also pass a custom credential via the `credential` parameter
+(e.g. `ManagedIdentityCredential()`).
+
 ## API Reference
 
 ### LakeFileSystem
@@ -77,6 +120,48 @@ fs = LakeFileSystem(base_path="/dbfs/mnt/datalakestore/bronze")
 | `exists(path)` | `bool` | Check if a file or directory exists. |
 | `delete(path)` | `bool` | Delete a file. Returns `True` if deleted, `False` if it didn't exist. |
 
+---
+
+### AdlsLakeFileSystem
+
+Direct connection to ADLS Gen2 — no mounts or Databricks utilities required.
+
+#### Constructor
+
+| Parameter | Type | Description |
+|-----------|------|-------------|
+| `account_url` | `str` | Full DFS endpoint, e.g. `"https://<account>.dfs.core.windows.net"` |
+| `container` | `str` | File-system / container name, e.g. `"bronze"` |
+| `base_path` | `str` | Optional prefix inside the container prepended to every path. Defaults to `""`. |
+| `credential` | `Any \| None` | Any Azure credential accepted by the SDK. Defaults to `DefaultAzureCredential()`. |
+
+#### Methods
+
+`AdlsLakeFileSystem` exposes the same text, JSON, and directory methods as `LakeFileSystem`,
+plus one additional method:
+
+##### Text Operations
+
+| Method | Returns | Description |
+|--------|---------|-------------|
+| `read_text(path)` | `str \| None` | Read a UTF-8 text file. Returns `None` if the file doesn't exist. |
+| `write_text(path, content)` | `None` | Write (or overwrite) a UTF-8 text file. |
+
+##### JSON Operations
+
+| Method | Returns | Description |
+|--------|---------|-------------|
+| `read_json(path)` | `dict \| None` | Read and parse a JSON file. Returns `None` if the file doesn't exist or parsing fails. |
+| `write_json(path, data, indent=2)` | `None` | Write a dictionary as JSON. |
+
+##### Directory Operations
+
+| Method | Returns | Description |
+|--------|---------|-------------|
+| `exists(path)` | `bool` | Check if a file exists. |
+| `delete(path)` | `bool` | Delete a file. Returns `True` if deleted, `False` otherwise. |
+| `list_paths(path, recursive=True)` | `list[str]` | List file paths under `path`. Returns paths relative to `base_path`. |
+
 ## Usage in Pipelines
 
 ### With CorePipelineConfig
@@ -105,6 +190,8 @@ fs.write_json("_metadata/run_info.json", {
 
 ### Path Handling
 
+#### LakeFileSystem
+
 The module does **not** perform path normalization. Your pipeline code is responsible for
 providing correct absolute paths for the runtime environment.
 
@@ -119,6 +206,12 @@ fs = LakeFileSystem()
 fs.write_text("/dbfs/mnt/datalakestore/bronze/file.txt", "content")
 ```
 
+#### AdlsLakeFileSystem
+
+Paths are always **relative to the container and `base_path`** — no mount prefixes needed.
+For example, with `container="bronze"` and `base_path="sales/orders"`,
+calling `fs.write_text("file.txt", ...)` resolves to `bronze/sales/orders/file.txt`.
+
 ### Error Handling
 
 The module returns `None` for missing files rather than raising exceptions:

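The path resolution described in the docs above (container-relative paths joined under `base_path`) can be sketched in isolation. `resolve` below is a hypothetical standalone helper mirroring the documented rule, not a function exported by the package:

```python
# Standalone sketch of the base_path + path joining rule documented above.
# `resolve` is a hypothetical helper, not part of dataorc-utils itself.

def resolve(base_path: str, path: str) -> str:
    """Join a container-relative path with an optional base-path prefix."""
    base = base_path.strip("/")   # tolerate "/sales/orders/"-style input
    rel = path.lstrip("/")        # paths are container-relative, never absolute
    return f"{base}/{rel}" if base else rel

print(resolve("sales/orders", "/file.txt"))  # sales/orders/file.txt
print(resolve("", "file.txt"))               # file.txt
```

With `container="bronze"`, the resolved path is then looked up inside that container, giving the `bronze/sales/orders/file.txt` example from the docs.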
packages/dataorc-utils/pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -29,6 +29,7 @@ dev = [
 azure = [
     "azure-identity",
     "azure-keyvault-secrets",
+    "azure-storage-file-datalake",
 ]
 
 [tool.ruff]
`__init__.py` (lake module)

Lines changed: 3 additions & 2 deletions

@@ -1,5 +1,6 @@
-"""Data lake filesystem utilities for Databricks pipelines."""
+"""Data lake filesystem utilities."""
 
+from .adls_filesystem import AdlsLakeFileSystem
 from .filesystem import LakeFileSystem
 
-__all__ = ["LakeFileSystem"]
+__all__ = ["AdlsLakeFileSystem", "LakeFileSystem"]
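The commit message mentions lazy importing so that Azure packages are only required when `AdlsLakeFileSystem` is actually used. One common way to achieve that is a module-level `__getattr__` (PEP 562). The sketch below demonstrates the mechanism with an in-memory stand-in module, since the real deferred import (`from .adls_filesystem import AdlsLakeFileSystem`) needs the package installed; it is an illustration of the pattern, not the package's confirmed implementation:

```python
import sys
import types

# Build a demo module in memory to show PEP 562 lazy attribute access:
# importing the module is cheap, and the "heavy" import only happens on
# the first access of the attribute.
lake = types.ModuleType("lake_demo")

def _module_getattr(name):
    if name == "AdlsLakeFileSystem":
        # In the real package this would be:
        #   from .adls_filesystem import AdlsLakeFileSystem
        # which pulls in the Azure SDK only at this point.
        class AdlsLakeFileSystem:  # dependency-free stand-in for the demo
            pass
        return AdlsLakeFileSystem
    raise AttributeError(f"module 'lake_demo' has no attribute {name!r}")

lake.__getattr__ = _module_getattr  # PEP 562 hook lives in the module dict
sys.modules["lake_demo"] = lake

import lake_demo
cls = lake_demo.AdlsLakeFileSystem  # resolved lazily via __getattr__
```

A production version would typically cache the imported class in the module dict so `__getattr__` only fires once.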
`adls_filesystem.py` (lake module, new file)

Lines changed: 143 additions & 0 deletions

@@ -0,0 +1,143 @@
+"""AdlsLakeFileSystem - ADLS Gen2-backed file operations.
+
+Drop-in alternative for LakeFileSystem that connects directly to
+Azure Data Lake Storage Gen2 via the Azure SDK, removing the
+dependency on Databricks mounts / dbutils.
+
+Requires the ``azure`` extra::
+
+    pip install dataorc-utils[azure]
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+from typing import Any
+
+from azure.identity import DefaultAzureCredential
+from azure.storage.filedatalake import DataLakeServiceClient
+
+logger = logging.getLogger(__name__)
+
+# Type alias for JSON-serializable data (matches LakeFileSystem)
+JSONValue = dict[str, Any] | list[Any] | str | int | float | bool | None
+
+
+class AdlsLakeFileSystem:
+    """ADLS Gen2-backed file operations — drop-in for LakeFileSystem.
+
+    Args:
+        account_url: Full DFS endpoint, e.g.
+            ``"https://<storage_account>.dfs.core.windows.net"``
+        container: File-system / container name, e.g. ``"bronze"``
+        base_path: Optional prefix inside the container prepended to
+            every path.
+        credential: Any Azure credential accepted by the SDK.
+            Defaults to ``DefaultAzureCredential()``.
+
+    Example::
+
+        fs = AdlsLakeFileSystem(
+            account_url="https://testdatadevsc.dfs.core.windows.net",
+            container="bronze",
+            base_path="raw/my_pipeline",
+        )
+        fs.write_json("data.json", {"key": "value"})
+        data = fs.read_json("data.json")
+    """
+
+    def __init__(
+        self,
+        account_url: str,
+        container: str,
+        base_path: str = "",
+        credential: Any | None = None,
+    ):
+        self._credential = credential or DefaultAzureCredential()
+        self._service = DataLakeServiceClient(
+            account_url=account_url,
+            credential=self._credential,
+        )
+        self._fs_client = self._service.get_file_system_client(
+            file_system=container,
+        )
+        self._base_path = base_path.strip("/")
+
+    # ------------------------------------------------------------------
+    # Internal helpers
+    # ------------------------------------------------------------------
+
+    def _resolve(self, path: str) -> str:
+        """Join *path* with the configured base-path prefix."""
+        path = path.lstrip("/")
+        if self._base_path:
+            return f"{self._base_path}/{path}"
+        return path
+
+    # ------------------------------------------------------------------
+    # Text operations
+    # ------------------------------------------------------------------
+
+    def read_text(self, path: str) -> str | None:
+        """Read a UTF-8 text file. Returns ``None`` if the file does not exist."""
+        resolved = self._resolve(path)
+        try:
+            file_client = self._fs_client.get_file_client(resolved)
+            download = file_client.download_file()
+            return download.readall().decode("utf-8")
+        except Exception:
+            logger.debug("Could not read %s", resolved, exc_info=True)
+            return None
+
+    def write_text(self, path: str, content: str) -> None:
+        """Write (or overwrite) a UTF-8 text file.
+
+        Parent "directories" are created implicitly by ADLS Gen2.
+        """
+        resolved = self._resolve(path)
+        file_client = self._fs_client.get_file_client(resolved)
+        file_client.upload_data(content.encode("utf-8"), overwrite=True)
+
+    # ------------------------------------------------------------------
+    # JSON operations
+    # ------------------------------------------------------------------
+
+    def read_json(self, path: str) -> JSONValue:
+        """Read a JSON file. Returns ``None`` if the file does not exist or parsing fails."""
+        content = self.read_text(path)
+        if content is None:
+            return None
+        try:
+            return json.loads(content)
+        except json.JSONDecodeError as exc:
+            logger.warning("Failed to parse JSON from %s: %s", path, exc)
+            return None
+
+    def write_json(self, path: str, data: JSONValue, indent: int = 2) -> None:
+        """Write a JSON file."""
+        self.write_text(path, json.dumps(data, indent=indent, default=str))
+
+    # ------------------------------------------------------------------
+    # Directory / existence helpers
+    # ------------------------------------------------------------------
+
+    def exists(self, path: str) -> bool:
+        """Check whether a file exists."""
+        resolved = self._resolve(path)
+        try:
+            file_client = self._fs_client.get_file_client(resolved)
+            file_client.get_file_properties()
+            return True
+        except Exception:
+            return False
+
+    def delete(self, path: str) -> bool:
+        """Delete a file. Returns ``True`` if deleted, ``False`` otherwise."""
+        resolved = self._resolve(path)
+        try:
+            file_client = self._fs_client.get_file_client(resolved)
+            file_client.delete_file()
+            return True
+        except Exception:
+            return False
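The `read_json` contract in the new module (return `None` for a missing file or unparseable content, rather than raising) can be exercised without an ADLS connection. This sketch isolates just the parse step in a hypothetical `parse_json_or_none` helper:

```python
import json

# Standalone sketch of the read_json error contract: missing content or
# unparseable JSON yields None instead of an exception.
def parse_json_or_none(content):
    if content is None:            # file did not exist
        return None
    try:
        return json.loads(content)
    except json.JSONDecodeError:   # corrupt / non-JSON payload
        return None

print(parse_json_or_none('{"version": 1}'))  # {'version': 1}
print(parse_json_or_none("not json"))        # None
```

This mirrors how pipeline code can branch on the return value (`if config is None: ...`) instead of wrapping every read in try/except.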