Skip to content

Commit e50ebe8

Browse files
authored
Merge pull request #73 from amosproj/data-ingest-integration
shell data ingest integration for parquet & csv
2 parents 3fa4a56 + 5a20cfa commit e50ebe8

File tree

6 files changed

+1103
-128
lines changed

6 files changed

+1103
-128
lines changed
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
"""
2+
Shell Data Ingestion Script
3+
4+
Downloads Shell sensor data from Azure Blob Storage and saves locally as CSV/Parquet.
5+
6+
Usage:
7+
export AZURE_ACCOUNT_URL="https://azassaexdseq00039ewduni.blob.core.windows.net"
8+
export AZURE_CONTAINER_NAME="rtimedata"
9+
export AZURE_SAS_TOKEN="?sv=2020-10-02&ss=btqf&srt=sco&st=..."
10+
11+
python ingest_shell_data.py --output-format parquet
12+
python ingest_shell_data.py --output-format csv
13+
python ingest_shell_data.py --output-format both
14+
15+
"""
16+
17+
import argparse
18+
import os
19+
import sys
20+
from pathlib import Path
21+
from datetime import datetime
22+
23+
sys.path.insert(0, str(Path(__file__).resolve().parents[3] / "src" / "sdk" / "python"))
24+
from rtdip_sdk.pipelines.sources import PythonAzureBlobSource
25+
26+
# Default for --output-dir: the "data" directory that sits beside this script's directory.
DEFAULT_OUTPUT_DIR = Path(__file__).parent / ".." / "data"
27+
28+
29+
def print_header(text: str):
    """Display *text* between two full-width '=' separator rules."""
    rule = "=" * 80
    print("\n" + rule)
    print(text)
    print(rule)
34+
35+
36+
def print_step(step: str):
    """Announce a pipeline step, underlined with a dashed rule."""
    separator = "-" * 80
    print(f"\n{step}")
    print(separator)
40+
41+
42+
def get_config_value(arg_value, env_var_name, config_name):
    """Resolve one configuration setting, preferring CLI over environment.

    Returns the command-line value when truthy, otherwise the value of
    the *env_var_name* environment variable, otherwise None. Logs which
    source supplied the value.
    """
    if arg_value:
        print(f"Using {config_name} from command-line argument")
        return arg_value

    from_env = os.environ.get(env_var_name)
    if from_env:
        print(f"Using {config_name} from {env_var_name} environment variable")
        return from_env

    return None
54+
55+
56+
def get_credentials(args) -> str:
    """Resolve an Azure credential, preferring CLI args over environment.

    Sources are checked in order: --sas-token, --account-key, the
    AZURE_SAS_TOKEN env var, then the AZURE_ACCOUNT_KEY env var. The
    chosen source is logged.

    Raises:
        ValueError: if no source provides a credential.
    """
    # (value, log message) pairs in strict precedence order.
    candidates = [
        (args.sas_token, "Using SAS token from command-line argument"),
        (args.account_key, "Using account key from command-line argument"),
        (
            os.environ.get("AZURE_SAS_TOKEN"),
            "Using SAS token from AZURE_SAS_TOKEN environment variable",
        ),
        (
            os.environ.get("AZURE_ACCOUNT_KEY"),
            "Using account key from AZURE_ACCOUNT_KEY environment variable",
        ),
    ]
    for value, message in candidates:
        if value:
            print(message)
            return value

    raise ValueError(
        "No Azure credentials provided. Use --sas-token/--account-key or set "
        "AZURE_SAS_TOKEN/AZURE_ACCOUNT_KEY environment variable"
    )
79+
80+
81+
def download_data(account_url: str, container_name: str, credential: str):
    """Read every *.parquet blob in the container as one DataFrame.

    Builds a PythonAzureBlobSource configured to combine all matching
    blobs and read them eagerly in a single batch, then logs the
    resulting shape and columns.
    """
    print_step("Downloading data from Azure Blob Storage")
    print(f"Account URL: {account_url}")
    print(f"Container: {container_name}")

    blob_source = PythonAzureBlobSource(
        account_url=account_url,
        container_name=container_name,
        credential=credential,
        file_pattern="*.parquet",
        combine_blobs=True,
        eager=True,
    )

    print("\nReading data...")
    data = blob_source.read_batch()

    print(f"Shape: {data.shape}")
    print(f"Columns: {data.columns}")

    return data
103+
104+
105+
def save_data(df, output_format: str, output_dir: Path):
    """Write *df* to ShellData.<fmt> under *output_dir*.

    Supports both polars-style (write_parquet / write_csv) and
    pandas-style (to_parquet / to_csv) DataFrames. *output_format* is
    "parquet", "csv", or "both". Returns the list of paths written.
    """
    print_step("Saving data")
    output_dir.mkdir(parents=True, exist_ok=True)

    # Preserve parquet-before-csv order; "both" selects both formats.
    wanted = [fmt for fmt in ("parquet", "csv") if output_format in (fmt, "both")]

    written = []
    for fmt in wanted:
        target = output_dir / f"ShellData.{fmt}"
        print(f"\nSaving to {target}")

        # polars exposes write_<fmt>; pandas exposes to_<fmt>.
        polars_writer = getattr(df, f"write_{fmt}", None)
        if polars_writer is not None:
            polars_writer(target)
        else:
            getattr(df, f"to_{fmt}")(target, index=False)

        size_mb = target.stat().st_size / (1024 * 1024)
        print(f"Saved {size_mb:.2f} MB")
        written.append(target)

    return written
138+
139+
140+
def print_summary(saved_files: list, total_time: float):
    """Print a summary of the ingestion run.

    Args:
        saved_files: Paths written by save_data; may be empty.
        total_time: Elapsed wall-clock time in seconds.
    """
    print_header("INGESTION SUMMARY")

    if not saved_files:
        # An empty list would make saved_files[0] below raise IndexError.
        print("\nNo files were created.")
        print(f"\nTotal time: {total_time:.2f} seconds")
        return

    print("\nFiles created:")
    for file_path in saved_files:
        size_mb = file_path.stat().st_size / (1024 * 1024)
        # Show only the last two path components, e.g. data/ShellData.csv
        print(
            f" • {file_path.relative_to(file_path.parent.parent)} ({size_mb:.2f} MB)"
        )

    print(f"\nTotal time: {total_time:.2f} seconds")
    print("\nData is now ready for preprocessing!")
    print(f"Location: {saved_files[0].parent}")
154+
155+
156+
def _build_arg_parser():
    """Construct the CLI argument parser for the ingestion script."""
    parser = argparse.ArgumentParser(
        description="Ingest Shell sensor data from Azure Blob Storage using RTDIP SDK"
    )

    # Azure configuration (can be provided via CLI or environment variables)
    parser.add_argument(
        "--account-url",
        type=str,
        help="Azure Storage account URL (or use AZURE_ACCOUNT_URL env var)",
    )
    parser.add_argument(
        "--container-name",
        type=str,
        help="Azure Blob container name (or use AZURE_CONTAINER_NAME env var)",
    )

    # Credentials
    parser.add_argument(
        "--sas-token",
        type=str,
        help="Azure SAS token for authentication (should start with '?')",
    )
    parser.add_argument(
        "--account-key", type=str, help="Azure Storage account key for authentication"
    )

    # Output configuration
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=DEFAULT_OUTPUT_DIR,
        help=f"Output directory for downloaded data (default: {DEFAULT_OUTPUT_DIR})",
    )
    parser.add_argument(
        "--output-format",
        type=str,
        choices=["parquet", "csv", "both"],
        default="both",
        help="Output format: parquet, csv, or both (default: both)",
    )
    return parser


def main():
    """CLI entry point: download Shell data from Azure and save it locally.

    Configuration comes from CLI flags with environment variables as
    fallback. Returns a process exit code: 0 on success, 1 on failure.
    """
    args = _build_arg_parser().parse_args()

    print_header("SHELL DATA INGESTION")
    print(f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    start_time = datetime.now()

    try:
        # Resolve configuration: CLI flags win, env vars are the fallback.
        account_url = get_config_value(
            args.account_url, "AZURE_ACCOUNT_URL", "account URL"
        )
        container_name = get_config_value(
            args.container_name, "AZURE_CONTAINER_NAME", "container name"
        )
        credential = get_credentials(args)

        # Fail fast with actionable messages before touching the network.
        if not account_url:
            raise ValueError(
                "Azure account URL is required. Provide via --account-url or AZURE_ACCOUNT_URL environment variable.\n"
                "Example: export AZURE_ACCOUNT_URL='https://azassaexdseq00039ewduni.blob.core.windows.net'"
            )
        if not container_name:
            raise ValueError(
                "Azure container name is required. Provide via --container-name or AZURE_CONTAINER_NAME environment variable.\n"
                "Example: export AZURE_CONTAINER_NAME='rtimedata'"
            )

        # Download, persist, then report.
        df = download_data(
            account_url=account_url,
            container_name=container_name,
            credential=credential,
        )
        saved_files = save_data(df, args.output_format, args.output_dir)

        elapsed = (datetime.now() - start_time).total_seconds()
        print_summary(saved_files, elapsed)

        return 0

    except Exception as exc:  # top-level boundary: report and signal failure
        print(f"\nERROR: Ingestion failed with error: {exc}")
        import traceback

        traceback.print_exc()
        return 1
250+
251+
252+
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())

0 commit comments

Comments
 (0)