3 changes: 3 additions & 0 deletions .gitignore
@@ -44,3 +44,6 @@ __pycache__/

# mkdocs
site/

# memories (local development notes)
memories/
4 changes: 4 additions & 0 deletions pyproject.toml
@@ -75,6 +75,7 @@ kinesis = ["quixstreams[aws]", "mypy-boto3-kinesis>=1.35.65,<2.0"]
s3 = ["quixstreams[aws]", "mypy-boto3-s3>=1.35.65,<2.0"]
redis = ["redis[hiredis]>=5.2.0,<6"]
azure-file = ["quixstreams[azure]"]
quixdatalake = ["pandas>=1.0.0,<3.0", "pyarrow>=17.0.0", "quixportal>=0.1.0"]

[tool.setuptools.packages.find]
include = ["quixstreams*"]
@@ -129,6 +130,9 @@ module = [
"confluent_kafka.*",
"pyarrow.*",
"jsonpath_ng.*",
"quixportal.*",
"requests.*",
"pandas.*",
]
ignore_missing_imports = true

285 changes: 285 additions & 0 deletions quixstreams/sinks/core/_blob_storage_client.py
@@ -0,0 +1,285 @@
"""
Blob Storage Client

Unified blob storage client that abstracts away provider-specific implementations.
Uses quixportal for flexible blob storage access (Azure, AWS S3, GCP, MinIO, local).
"""

import concurrent.futures
import logging
from typing import Any, Callable, Dict, List, Optional

try:
    from quixportal import get_filesystem
    from quixportal.storage.config import BlobStorageProvider, load_config_from_env
except ImportError as exc:
    raise ImportError(
        f"Package {exc.name} is missing: "
        'run "pip install quixstreams[quixdatalake] '
        '--extra-index-url https://pkgs.dev.azure.com/quix-analytics/53f7fe95-59fe-4307-b479-2473b96de6d1/_packaging/public/pypi/simple/" '
        "to use QuixLakeBlobStorageSink"
    ) from exc

logger = logging.getLogger(__name__)


def get_bucket_name() -> str:
"""
Extract the bucket/container name from the quixportal configuration.

This reads the Quix__BlobStorage__Connection__Json environment variable
and extracts the bucket name based on the provider type.

:returns: The bucket name for S3/MinIO or container name for Azure
:raises ValueError: If configuration is missing or bucket name cannot be determined
"""
config = load_config_from_env()

if config.provider in (
BlobStorageProvider.S3,
BlobStorageProvider.MINIO,
BlobStorageProvider.S3_COMPATIBLE,
BlobStorageProvider.GCP,
):
if config.s3_compatible:
return config.s3_compatible.bucket_name
raise ValueError(f"S3-compatible config missing for provider {config.provider}")

elif config.provider == BlobStorageProvider.AZURE:
if config.azure_blob_storage:
return config.azure_blob_storage.container_name
raise ValueError("Azure blob storage config missing")

elif config.provider == BlobStorageProvider.LOCAL:
# For local storage, use the directory name as "bucket"
if config.local_storage:
import os

return os.path.basename(config.local_storage.directory_path.rstrip("/\\"))
raise ValueError("Local storage config missing")

raise ValueError(f"Unsupported provider: {config.provider}")


class BlobStorageClient:
"""
Unified blob storage client that abstracts away provider-specific implementations.
Uses quixportal for flexible blob storage access.
"""

def __init__(self, base_path: str = "", max_workers: int = 10):
"""
Initialize blob storage client via quixportal.

:param base_path: Optional base path prefix for all operations.
If empty, filesystem is used without DirFileSystem wrapper.
:param max_workers: Maximum number of concurrent upload workers.
"""
# Only pass base_path if non-empty to avoid quixportal wrapping with DirFileSystem(base_path=".")
# which causes "Access denied" errors on GCP S3-compatible storage
if base_path:
self._fs = get_filesystem(base_path=base_path)
else:
self._fs = get_filesystem()
self.base_path = base_path
self._max_workers = max_workers
self._executor: Optional[concurrent.futures.ThreadPoolExecutor] = None

def _get_executor(self) -> concurrent.futures.ThreadPoolExecutor:
"""Get or create thread pool executor for concurrent uploads."""
if self._executor is None:
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self._max_workers
)
return self._executor

    def list_objects(self, prefix: str, max_keys: int = 100) -> List[Dict[str, Any]]:
        """
        List objects in blob storage with a given prefix.

        :param prefix: Object prefix/path
        :param max_keys: Maximum number of objects to return
        :returns: List of dicts with 'Key' and 'Size' keys
        """
        try:
            result: List[Dict[str, Any]] = []
            found_files: set[str] = set()

            # Build base pattern
            base_pattern = prefix if prefix else ""
            if base_pattern and not base_pattern.endswith("/"):
                base_pattern = base_pattern + "/"

            # Try multiple depth levels (up to 10 levels deep)
            for depth in range(0, 10):
                if len(result) >= max_keys:
                    break

                if depth == 0:
                    pattern = f"{base_pattern}*" if base_pattern else "*"
                else:
                    pattern = f"{base_pattern}{'*/' * depth}*"

                try:
                    files = self._fs.glob(pattern)
                    for f in files:
                        if f in found_files:
                            continue
                        if len(result) >= max_keys:
                            break
                        try:
                            if self._fs.exists(f):
                                size = self._fs.size(f)
                                result.append(
                                    {"Key": f, "Size": size if size is not None else 0}
                                )
                                found_files.add(f)
                        except Exception as e:
                            logger.debug(f"Error processing {f}: {e}")
                except Exception as e:
                    logger.debug(f"Glob pattern at depth {depth} failed: {e}")
                    continue

            return result
        except Exception as e:
            logger.warning(f"Error listing objects with prefix {prefix}: {e}")
            return []

    def put_object(self, key: str, body: bytes) -> None:
        """
        Put/upload data to blob storage.

        :param key: Object key/path
        :param body: Object data as bytes
        """
        try:
            with self._fs.open(key, "wb") as f:
                f.write(body)
            logger.debug(f"Uploaded {key}")
        except Exception as e:
            logger.error(f"Error uploading {key}: {e}")
            raise

    def put_object_async(
        self, key: str, body: bytes, callback: Optional[Callable] = None
    ) -> concurrent.futures.Future:
        """
        Asynchronously put/upload data to blob storage.

        :param key: Object key/path
        :param body: Object data as bytes
        :param callback: Optional callback function to call on completion
        :returns: Future object that can be used to wait for completion
        """

        def _upload():
            self.put_object(key, body)
            if callback:
                callback(key)
            return key

        future = self._get_executor().submit(_upload)
        return future

    def head_object(self, key: str) -> Optional[Dict[str, Any]]:
        """
        Get metadata about an object without reading its contents.

        :param key: Object key
        :returns: Dict with 'Key' and 'Size' keys, or None if object doesn't exist
        """
        try:
            if self._fs.exists(key):
                size = self._fs.size(key)
                return {"Key": key, "Size": size if size is not None else 0}
            return None
        except Exception as e:
            logger.warning(f"Error getting metadata for {key}: {e}")
            return None

    def exists(self, key: str) -> bool:
        """
        Check if an object exists in blob storage.

        :param key: Object key
        :returns: True if object exists, False otherwise
        """
        try:
            return self._fs.exists(key)
        except Exception as e:
            logger.warning(f"Error checking existence of {key}: {e}")
            return False

    def delete_object(self, key: str) -> None:
        """
        Delete a single object from blob storage.

        :param key: Object key
        """
        try:
            if self._fs.exists(key):
                try:
                    self._fs.rm_file(key)
                except (NotImplementedError, AttributeError):
                    self._fs.rm(key)
                logger.debug(f"Deleted {key}")
        except Exception as e:
            logger.warning(f"Failed to delete {key}: {e}")

    def delete_objects(self, keys: List[str]) -> None:
        """
        Delete multiple objects from blob storage.

        :param keys: List of object keys to delete
        """
        for key in keys:
            self.delete_object(key)

    def get_object(self, key: str) -> bytes:
        """
        Get/download data from blob storage.

        :param key: Object key
        :returns: Object data as bytes
        """
        try:
            with self._fs.open(key, "rb") as f:
                return f.read()
        except Exception as e:
            logger.error(f"Error downloading {key}: {e}")
            raise

    def ensure_path_exists(self, auto_create: bool = True) -> bool:
        """
        Ensure the base path/bucket is accessible, optionally creating it.

        :param auto_create: If True, attempt to create the path if it doesn't exist
        :returns: True if path exists or was created successfully, False otherwise
        """
        try:
            # Try to list the root to verify access
            self._fs.ls("")
            logger.info("Successfully connected to blob storage")
            return True
        except Exception as e:
            if auto_create:
                try:
                    # Try to create the path/bucket
                    self._fs.mkdir("")
                    logger.info("Created blob storage path")
                    return True
                except Exception as create_error:
                    logger.warning(
                        f"Could not create blob storage path: {create_error}. "
                        "Continuing anyway - path may already exist or be created on first write."
                    )
                    return True  # Continue anyway with fallback behavior
            else:
                logger.error(f"Failed to access blob storage: {e}")
                return False

    def shutdown(self):
        """Shutdown the thread pool executor."""
        if self._executor is not None:
            self._executor.shutdown(wait=True)
            self._executor = None
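
A minimal usage sketch of the new `BlobStorageClient` (not taken from the PR; the object keys and worker count are placeholders). It assumes the `Quix__BlobStorage__Connection__Json` environment variable is configured for a supported provider and that the `quixstreams[quixdatalake]` extra is installed. The import path mirrors the new file's location, though the leading underscore marks the module as internal.

```python
# Hypothetical usage sketch: keys and worker count are placeholders, and the
# Quix__BlobStorage__Connection__Json environment variable is assumed to be set.
from quixstreams.sinks.core._blob_storage_client import BlobStorageClient, get_bucket_name

client = BlobStorageClient(base_path="", max_workers=4)

# Verify connectivity (optionally creating the path) before writing.
if client.ensure_path_exists(auto_create=True):
    print(f"Writing to bucket: {get_bucket_name()}")

    # Synchronous upload.
    client.put_object("events/part-0000.parquet", b"...parquet bytes...")

    # Asynchronous upload with a completion callback; the returned Future
    # can also be waited on explicitly.
    future = client.put_object_async(
        "events/part-0001.parquet",
        b"...parquet bytes...",
        callback=lambda key: print(f"uploaded {key}"),
    )
    future.result(timeout=60)

    # List what landed under the prefix.
    for obj in client.list_objects(prefix="events/", max_keys=10):
        print(obj["Key"], obj["Size"])

# Wait for in-flight uploads and release the thread pool.
client.shutdown()
```

Since `put_object_async` returns a `concurrent.futures.Future`, callers can either block on `result()` or rely on the callback, and `shutdown()` waits for pending uploads before releasing the pool.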
71 changes: 71 additions & 0 deletions quixstreams/sinks/core/_catalog_client.py
@@ -0,0 +1,71 @@
"""
REST Catalog Client

Simple HTTP client for REST Catalog API interactions.
"""

from typing import Optional

import requests


class CatalogClient:
"""Simple HTTP client for REST Catalog API interactions."""

def __init__(self, base_url: str, auth_token: Optional[str] = None):
"""
Initialize the catalog client.

:param base_url: Base URL of the REST Catalog API
:param auth_token: Optional authentication token for API requests
"""
self.base_url = base_url.rstrip("/")
self.auth_token = auth_token
self._session = requests.Session()

# Set up authentication header if token is provided
if self.auth_token:
self._session.headers["Authorization"] = f"Bearer {self.auth_token}"

def get(self, path: str, timeout: int = 30) -> requests.Response:
"""
Make a GET request to the catalog API.

:param path: API endpoint path (will be appended to base_url)
:param timeout: Request timeout in seconds
:returns: Response object from requests library
"""
url = f"{self.base_url}{path}"
return self._session.get(url, timeout=timeout)

    def post(
        self, path: str, json: Optional[dict] = None, timeout: int = 30
    ) -> requests.Response:
        """
        Make a POST request to the catalog API.

        :param path: API endpoint path (will be appended to base_url)
        :param json: JSON payload to send in request body
        :param timeout: Request timeout in seconds
        :returns: Response object from requests library
        """
        url = f"{self.base_url}{path}"
        return self._session.post(url, json=json, timeout=timeout)

    def put(
        self, path: str, json: Optional[dict] = None, timeout: int = 30
    ) -> requests.Response:
        """
        Make a PUT request to the catalog API.

        :param path: API endpoint path (will be appended to base_url)
        :param json: JSON payload to send in request body
        :param timeout: Request timeout in seconds
        :returns: Response object from requests library
        """
        url = f"{self.base_url}{path}"
        return self._session.put(url, json=json, timeout=timeout)

    def __str__(self):
        """String representation showing the base URL."""
        return self.base_url
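
Similarly, a hedged sketch of how `CatalogClient` might be exercised; the URL, token, and endpoint paths are placeholders rather than values from the PR, and only the methods shown in the diff are used.

```python
# Hypothetical usage sketch: base URL, token, and paths are placeholders.
from quixstreams.sinks.core._catalog_client import CatalogClient

catalog = CatalogClient(
    base_url="https://catalog.example.com/",  # trailing slash is stripped
    auth_token="example-token",               # sent as a Bearer header when provided
)

# GET with the default 30-second timeout.
response = catalog.get("/v1/namespaces")
response.raise_for_status()
print(response.json())

# POST and PUT accept an optional JSON body and per-call timeout.
created = catalog.post("/v1/namespaces", json={"name": "telemetry"}, timeout=10)
print(created.status_code, str(catalog))  # __str__ returns the base URL
```

Because the methods return raw `requests.Response` objects, callers remain responsible for status checking and JSON decoding.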