A Python wrapper for AzCopy that feels native and gets out of your way.
Performance: AzCopy, written in Go, significantly outperforms Python's Azure SDK for bulk transfers. Go's goroutines provide true parallelism for file I/O and network operations, while Python's GIL limits concurrency. For large-scale transfers, AzCopy can be 5-10x faster.
Python Integration: But switching between Python and bash scripts breaks your workflow. Azpype solves this by wrapping AzCopy in a native Python interface. Now you can:
- Write pure Python scripts with data processing before and after transfers
- Capture and parse output programmatically
- Handle errors with try/except blocks
- Integrate with your existing Python data pipeline
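For instance, the try/except point above means a transfer can participate in ordinary Python retry logic. A minimal sketch — the `run_transfer` callable stands in for a `Copy(...).execute()` call, and the retry helper itself is illustrative, not part of azpype:

```python
def run_with_retries(run_transfer, max_attempts=3):
    """Call run_transfer(), retrying until it succeeds or attempts run out."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return run_transfer()  # e.g. lambda: Copy(src, dst).execute()
        except Exception as exc:  # illustrative catch-all; narrow in real code
            last_error = exc
            print(f"Attempt {attempt} failed: {exc}")
    raise RuntimeError(f"Transfer failed after {max_attempts} attempts") from last_error
```

Because failures surface as ordinary exceptions, this slots into any existing pipeline orchestration without shelling out to bash.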
Additional Benefits:
- Zero-configuration setup - Bundles the right AzCopy binary for your platform
- Smart defaults - YAML config for common settings, override with kwargs when needed
- Rich logging - Structured logs with loguru, daily rotation, and visual command output
- Built-in validation - Checks auth, network, and paths before executing
- Job management - List, resume, and recover failed transfers programmatically
Install with pip:

```shell
pip install azpype
```

That's it. Azpype automatically:
- Downloads the appropriate AzCopy binary (v10.18.1) for your platform
- Creates a config directory at `~/.azpype/`
- Sets up a default configuration file
```python
from azpype.commands.copy import Copy

# Upload a local directory to Azure Blob Storage
Copy(
    source="./data",
    destination="https://myaccount.blob.core.windows.net/mycontainer/"
).execute()

# Download from Azure to local
Copy(
    source="https://myaccount.blob.core.windows.net/mycontainer/data/",
    destination="./downloads"
).execute()
```

The `execute()` method returns an `AzCopyStdoutParser` object with parsed attributes - no manual string parsing needed!
```python
# Execute returns a parsed object with useful attributes
result = Copy(
    source="./data",
    destination="https://myaccount.blob.core.windows.net/mycontainer/"
).execute()

# Access structured data directly
print(f"Job ID: {result.job_id}")
print(f"Files transferred: {result.number_of_file_transfers_completed}")
print(f"Files skipped: {result.number_of_file_transfers_skipped}")
print(f"Bytes transferred: {result.total_bytes_transferred}")
print(f"Elapsed time: {result.elapsed_time} minutes")
print(f"Final status: {result.final_job_status}")

# Use exit code for flow control
if result.exit_code == 0:
    print("Transfer successful!")
else:
    print(f"Transfer failed: {result.stdout}")
```

The parser automatically extracts these attributes from AzCopy output:
| Attribute | Type | Description |
|---|---|---|
| `exit_code` | int | Command exit code (0 = success) |
| `job_id` | str | Unique job identifier for resuming |
| `elapsed_time` | float | Transfer duration in minutes |
| `final_job_status` | str | Status like "Completed", "CompletedWithSkipped", "Failed" |
| `number_of_file_transfers` | int | Total files attempted |
| `number_of_file_transfers_completed` | int | Successfully transferred files |
| `number_of_file_transfers_skipped` | int | Files skipped (already exist, etc.) |
| `number_of_file_transfers_failed` | int | Failed file transfers |
| `total_bytes_transferred` | int | Total data transferred in bytes |
| `total_number_of_transfers` | int | Total transfer operations |
| `stdout` | str | Raw command output if needed |
| `raw_stdout` | str | Unprocessed output with ANSI codes |
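As a rough illustration of the kind of parsing involved — a sketch, not azpype's actual implementation — attributes like these can be pulled out of AzCopy's final job summary with regular expressions:

```python
import re

# Example AzCopy job-summary text (abridged); field names follow AzCopy's output.
SAMPLE = """
Elapsed Time (Minutes): 0.0337
Number of File Transfers: 10
Number of File Transfers Completed: 10
Number of File Transfers Failed: 0
Number of File Transfers Skipped: 0
TotalBytesTransferred: 143360
Final Job Status: Completed
"""

def parse_summary(text):
    """Extract a few summary fields into a dict of typed values."""
    patterns = {
        "elapsed_time": (r"Elapsed Time \(Minutes\):\s*([\d.]+)", float),
        "number_of_file_transfers": (r"Number of File Transfers:\s*(\d+)", int),
        "number_of_file_transfers_failed": (r"Number of File Transfers Failed:\s*(\d+)", int),
        "total_bytes_transferred": (r"TotalBytesTransferred:\s*(\d+)", int),
        "final_job_status": (r"Final Job Status:\s*(\w+)", str),
    }
    result = {}
    for name, (pattern, cast) in patterns.items():
        match = re.search(pattern, text)
        if match:
            result[name] = cast(match.group(1))
    return result
```

With azpype you never have to write this yourself — the attributes above arrive already typed on the result object.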
A fuller example that acts on the parsed results:

```python
def smart_sync_with_monitoring(local_path, remote_path):
    """Sync data and monitor transfer metrics."""
    result = Copy(
        source=local_path,
        destination=remote_path,
        overwrite="ifSourceNewer",
        recursive=True
    ).execute()

    # Make decisions based on parsed results
    if result.exit_code != 0:
        raise Exception(f"Transfer failed: {result.final_job_status}")

    if result.number_of_file_transfers_failed > 0:
        print(f"Warning: {result.number_of_file_transfers_failed} files failed")
        # Could trigger retry logic here

    if result.number_of_file_transfers_skipped == result.number_of_file_transfers:
        print("All files already up-to-date")
        return "NO_CHANGES"

    # Report transfer metrics
    gb_transferred = result.total_bytes_transferred / (1024**3)
    transfer_rate = gb_transferred / (result.elapsed_time / 60)  # GB/hour
    print(f"Transferred {gb_transferred:.2f} GB at {transfer_rate:.2f} GB/hour")
    print(f"Completed: {result.number_of_file_transfers_completed} files")

    return result.job_id  # Return for potential resume operations
```

For service principal (SPN) authentication, set these environment variables:
```python
import os

os.environ["AZCOPY_TENANT_ID"] = "your-tenant-id"
os.environ["AZCOPY_SPA_APPLICATION_ID"] = "your-app-id"
os.environ["AZCOPY_SPA_CLIENT_SECRET"] = "your-secret"
os.environ["AZCOPY_AUTO_LOGIN_TYPE"] = "SPN"
```

Or use a `.env` file:

```
# .env
AZCOPY_TENANT_ID=your-tenant-id
AZCOPY_SPA_APPLICATION_ID=your-app-id
AZCOPY_SPA_CLIENT_SECRET=your-secret
AZCOPY_AUTO_LOGIN_TYPE=SPN
```

```python
from dotenv import load_dotenv
load_dotenv()

from azpype.commands.copy import Copy
Copy(source, destination).execute()
```

To authenticate with a SAS token instead, pass it directly (without the leading `?`):
```python
Copy(
    source="./data",
    destination="https://myaccount.blob.core.windows.net/mycontainer/",
    sas_token="sv=2021-12-02&ss=b&srt=sco&sp=rwdlacyx..."
).execute()
```

Azpype uses a two-level configuration system: a YAML defaults file plus per-call keyword overrides.
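Conceptually, the two levels behave like a dict merge: YAML defaults first, per-call kwargs on top. A sketch of that idea — illustrative only, not azpype's actual internals:

```python
def effective_config(yaml_defaults, **overrides):
    """Overlay runtime keyword overrides on top of YAML defaults."""
    merged = dict(yaml_defaults)   # start from the file's settings
    merged.update(overrides)       # kwargs win on any conflict
    return merged

# YAML defaults (as loaded from copy_config.yaml) plus two runtime overrides
defaults = {"overwrite": "ifSourceNewer", "recursive": True, "concurrency": 16}
config = effective_config(defaults, concurrency=32, dry_run=True)
# config now has concurrency=32 and dry_run=True, other defaults unchanged
```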
Defaults live at `~/.azpype/copy_config.yaml`:

```yaml
# Overwrite strategy at destination
overwrite: 'ifSourceNewer'  # Options: 'true', 'false', 'prompt', 'ifSourceNewer'

# Recursive copy for directories
recursive: true

# Create MD5 hashes during upload
put-md5: true

# Number of parallel transfers
concurrency: 16
```

Override any config value at runtime:
```python
Copy(
    source="./data",
    destination="https://...",
    overwrite="true",         # Override YAML setting
    concurrency=32,           # Increase parallelism
    dry_run=True,             # Test without copying
    exclude_pattern="*.tmp"   # Add exclusion pattern
).execute()
```

Filter which files are transferred with include/exclude patterns:

```python
# Upload only Python files
Copy(
    source="./project",
    destination="https://myaccount.blob.core.windows.net/code/",
    include_pattern="*.py",
    recursive=True
).execute()

# Exclude temporary files
Copy(
    source="./data",
    destination="https://myaccount.blob.core.windows.net/backup/",
    exclude_pattern="*.tmp;*.log;*.cache",
    recursive=True
).execute()
```

Control overwrite behavior at the destination:

```python
# Only upload newer files
Copy(
    source="./local-data",
    destination="https://myaccount.blob.core.windows.net/data/",
    overwrite="ifSourceNewer",
    recursive=True
).execute()

# Never overwrite existing files
Copy(
    source="./archive",
    destination="https://myaccount.blob.core.windows.net/archive/",
    overwrite="false"
).execute()
```

Preview a transfer with a dry run:

```python
# See what would be copied without actually transferring
Copy(
    source="./large-dataset",
    destination="https://myaccount.blob.core.windows.net/datasets/",
    dry_run=True
).execute()
```

Resume failed or cancelled transfers:
```python
from azpype.commands.jobs import Jobs

jobs = Jobs()

# List all jobs
exit_code, output = jobs.list()

# Resume a specific job
jobs.resume(job_id="abc123-def456")

# Find and resume the last failed job
job_id = jobs.last_failed()
if job_id:
    jobs.resume(job_id=job_id)

# Auto-recover (find and resume last failed)
jobs.recover_last_failed()
```

Azpype provides rich logging with automatic rotation:
- Location: `~/.azpype/azpype_YYYY-MM-DD.log`
- Rotation: daily, with 7-day retention and gzip compression
- Console output: color-coded with progress indicators
- Command details: full command, exit codes, and stdout/stderr captured
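azpype implements this with loguru; as a point of reference, the same rotation policy can be approximated with the standard library's `TimedRotatingFileHandler`. This is a stdlib sketch of the policy, not azpype's code:

```python
import logging
import logging.handlers
import tempfile
from pathlib import Path

def make_rotating_logger(log_dir):
    """Create a logger that rotates at midnight and keeps 7 old files."""
    log_path = Path(log_dir) / "azpype.log"
    handler = logging.handlers.TimedRotatingFileHandler(
        log_path, when="midnight", backupCount=7  # daily rotation, 7-day retention
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s | %(levelname)s | COPY | %(message)s")
    )
    logger = logging.getLogger("azpype_demo")
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return logger, log_path

log_dir = tempfile.mkdtemp()
logger, log_path = make_rotating_logger(log_dir)
logger.info("Starting copy operation")
```

The gzip-compression step has no direct stdlib equivalent and would need a custom rotator, which is part of why loguru is the nicer fit here.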
Example log output:

```
2025-08-15 19:09:29 | INFO | COPY | Starting copy operation
2025-08-15 19:09:29 | INFO | COPY | ========== COMMAND EXECUTION ==========
2025-08-15 19:09:29 | INFO | COPY | Command: azcopy copy ./data https://...
2025-08-15 19:09:29 | INFO | COPY | Exit Code: 0
2025-08-15 19:09:29 | INFO | COPY | STDOUT:
2025-08-15 19:09:29 | INFO | COPY | Job abc123 has started
2025-08-15 19:09:29 | INFO | COPY | 100.0%, 10 Done, 0 Failed, 0 Pending
```
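If you ever need to mine these logs, the progress entries follow a stable shape, so a small regex can pull the counts out. This is a hypothetical helper for illustration, not something azpype ships:

```python
import re

PROGRESS = re.compile(
    r"(?P<pct>[\d.]+)%, (?P<done>\d+) Done, (?P<failed>\d+) Failed, (?P<pending>\d+) Pending"
)

def parse_progress(line):
    """Return (percent, done, failed, pending) from an AzCopy progress line, or None."""
    match = PROGRESS.search(line)
    if match is None:
        return None
    return (
        float(match.group("pct")),
        int(match.group("done")),
        int(match.group("failed")),
        int(match.group("pending")),
    )
```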
Common options for the Copy command:

| Option | Type | Description |
|---|---|---|
| `overwrite` | str | How to handle existing files: 'true', 'false', 'prompt', 'ifSourceNewer' |
| `recursive` | bool | Include subdirectories |
| `include_pattern` | str | Include only matching files (wildcards supported) |
| `exclude_pattern` | str | Exclude matching files (wildcards supported) |
| `dry_run` | bool | Preview what would be copied without transferring |
| `concurrency` | int | Number of parallel transfers |
| `block_size_mb` | float | Block size for large files (in MiB) |
| `put_md5` | bool | Create MD5 hashes during upload |
| `check_length` | bool | Verify file sizes after transfer |
| `as_subdir` | bool | Place folder sources as subdirectories |
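The snake_case option names above map naturally onto AzCopy's kebab-case CLI flags. A sketch of that translation — illustrative, as azpype's actual flag-building code may differ:

```python
def kwargs_to_flags(**options):
    """Turn Python kwargs into AzCopy-style --kebab-case flags."""
    flags = []
    for name, value in options.items():
        flag = "--" + name.replace("_", "-")
        if isinstance(value, bool):
            flags.append(f"{flag}={str(value).lower()}")  # AzCopy bools are true/false
        else:
            flags.append(f"{flag}={value}")
    return flags
```

This is why any kwarg you pass to `Copy` lines up one-for-one with an option documented in AzCopy's own help output.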
MIT