Skip to content

Commit 849872c

Browse files
Replaces print statements with logging (#28)
1 parent 9fcc4f4 commit 849872c

File tree

6 files changed

+63
-65
lines changed

6 files changed

+63
-65
lines changed

keras_remote/backend/execution.py

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,14 @@
1313

1414
import cloudpickle
1515

16+
from absl import logging
17+
1618
from keras_remote.constants import get_default_zone, zone_to_region
1719
from keras_remote.infra import container_builder
1820
from keras_remote.backend import gke_client
19-
from keras_remote.infra import infra
2021
from keras_remote.utils import packager
2122
from keras_remote.utils import storage
2223

23-
logger = infra.logger
24-
2524

2625
@dataclass
2726
class JobContext:
@@ -155,7 +154,7 @@ def _prepare_artifacts(
155154
ctx: JobContext, tmpdir: str, caller_frame_depth: int = 3
156155
) -> None:
157156
"""Phase 1: Package function payload and working directory context."""
158-
logger.info("Packaging function and context...")
157+
logging.info("Packaging function and context...")
159158

160159
# Get caller directory
161160
frame = inspect.stack()[caller_frame_depth]
@@ -170,28 +169,28 @@ def _prepare_artifacts(
170169
packager.save_payload(
171170
ctx.func, ctx.args, ctx.kwargs, ctx.env_vars, ctx.payload_path
172171
)
173-
logger.info(f"Payload serialized to {ctx.payload_path}")
172+
logging.info("Payload serialized to %s", ctx.payload_path)
174173

175174
# Zip working directory
176175
ctx.context_path = os.path.join(tmpdir, "context.zip")
177176
packager.zip_working_dir(caller_path, ctx.context_path)
178-
logger.info(f"Context packaged to {ctx.context_path}")
177+
logging.info("Context packaged to %s", ctx.context_path)
179178

180179
# Find requirements.txt
181180
ctx.requirements_path = _find_requirements(caller_path)
182181
if ctx.requirements_path:
183-
logger.info(f"Found requirements.txt: {ctx.requirements_path}")
182+
logging.info("Found requirements.txt: %s", ctx.requirements_path)
184183
else:
185-
logger.info("No requirements.txt found")
184+
logging.info("No requirements.txt found")
186185

187186

188187
def _build_container(ctx: JobContext) -> None:
189188
"""Phase 2: Build or get cached container image."""
190189
if ctx.container_image:
191190
ctx.image_uri = ctx.container_image
192-
logger.info(f"Using custom container: {ctx.image_uri}")
191+
logging.info("Using custom container: %s", ctx.image_uri)
193192
else:
194-
logger.info("Building container image...")
193+
logging.info("Building container image...")
195194
ctx.image_uri = container_builder.get_or_build_container(
196195
base_image="python:3.12-slim",
197196
requirements_path=ctx.requirements_path,
@@ -203,7 +202,7 @@ def _build_container(ctx: JobContext) -> None:
203202

204203
def _upload_artifacts(ctx: JobContext) -> None:
205204
"""Phase 3: Upload artifacts to Cloud Storage."""
206-
logger.info(f"Uploading artifacts to Cloud Storage (job: {ctx.job_id})...")
205+
logging.info("Uploading artifacts to Cloud Storage (job: %s)...", ctx.job_id)
207206
storage.upload_artifacts(
208207
bucket_name=ctx.bucket_name,
209208
job_id=ctx.job_id,
@@ -215,7 +214,7 @@ def _upload_artifacts(ctx: JobContext) -> None:
215214

216215
def _download_result(ctx: JobContext) -> dict:
217216
"""Phase 6: Download and deserialize result from Cloud Storage."""
218-
logger.info("Downloading result...")
217+
logging.info("Downloading result...")
219218
result_path = storage.download_result(
220219
ctx.bucket_name, ctx.job_id, project=ctx.project
221220
)
@@ -226,14 +225,14 @@ def _download_result(ctx: JobContext) -> dict:
226225

227226
def _cleanup_and_return(ctx: JobContext, result_payload: dict) -> Any:
228227
"""Phase 7: Cleanup Cloud Storage artifacts and handle result."""
229-
logger.info("Cleaning up artifacts...")
228+
logging.info("Cleaning up artifacts...")
230229
storage.cleanup_artifacts(ctx.bucket_name, ctx.job_id, project=ctx.project)
231230

232231
if result_payload["success"]:
233-
logger.info("Remote execution completed successfully")
232+
logging.info("Remote execution completed successfully")
234233
return result_payload["result"]
235234
else:
236-
logger.error(f"Remote execution failed:\n{result_payload['traceback']}")
235+
logging.error("Remote execution failed:\n%s", result_payload['traceback'])
237236
raise result_payload["exception"]
238237

239238

@@ -264,7 +263,7 @@ def execute_remote(ctx: JobContext, backend: BackendClient) -> Any:
264263
_upload_artifacts(ctx)
265264

266265
# Phase 4: Submit job (backend-specific)
267-
logger.info(f"Submitting job to {backend.__class__.__name__}...")
266+
logging.info("Submitting job to %s...", backend.__class__.__name__)
268267
job = backend.submit_job(ctx)
269268

270269
# Phase 5: Wait for completion (with cleanup on failure)

keras_remote/backend/gke_client.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@
66
from kubernetes import client, config
77
from kubernetes.client.rest import ApiException
88

9+
from absl import logging
10+
911
from keras_remote.core.accelerators import TpuConfig
1012
from keras_remote.core import accelerators
11-
from keras_remote.infra import infra
12-
13-
logger = infra.logger
1413

1514

1615
def submit_k8s_job(
@@ -58,10 +57,10 @@ def submit_k8s_job(
5857

5958
try:
6059
created_job = batch_v1.create_namespaced_job(namespace=namespace, body=job)
61-
logger.info(f"Submitted K8s job: {job_name}")
62-
logger.info(f"View job with: kubectl get job {job_name} -n {namespace}")
63-
logger.info(
64-
f"View logs with: kubectl logs -l job-name={job_name} -n {namespace}"
60+
logging.info("Submitted K8s job: %s", job_name)
61+
logging.info("View job with: kubectl get job %s -n %s", job_name, namespace)
62+
logging.info(
63+
"View logs with: kubectl logs -l job-name=%s -n %s", job_name, namespace
6564
)
6665
return created_job
6766
except ApiException as e:
@@ -124,7 +123,7 @@ def wait_for_job(job, namespace="default", timeout=3600, poll_interval=10):
124123

125124
# Check completion conditions
126125
if job_status.status.succeeded and job_status.status.succeeded >= 1:
127-
print(f"[REMOTE] Job {job_name} completed successfully")
126+
logging.info("Job %s completed successfully", job_name)
128127
return "success"
129128

130129
if job_status.status.failed and job_status.status.failed >= 1:
@@ -137,7 +136,7 @@ def wait_for_job(job, namespace="default", timeout=3600, poll_interval=10):
137136

138137
# Job still running
139138
if not logged_running:
140-
logger.info(f"Job {job_name} running...")
139+
logging.info("Job %s running...", job_name)
141140
logged_running = True
142141

143142
time.sleep(poll_interval)
@@ -160,13 +159,13 @@ def cleanup_job(job_name, namespace="default"):
160159
namespace=namespace,
161160
body=client.V1DeleteOptions(propagation_policy="Foreground"),
162161
)
163-
logger.info(f"Deleted K8s job: {job_name}")
162+
logging.info("Deleted K8s job: %s", job_name)
164163
except ApiException as e:
165164
if e.status == 404:
166165
# Job already deleted
167166
pass
168167
else:
169-
logger.warning(f"Failed to delete job {job_name}: {e.reason}")
168+
logging.warning("Failed to delete job %s: %s", job_name, e.reason)
170169

171170

172171
def _parse_accelerator(accelerator):
@@ -339,7 +338,7 @@ def _print_pod_logs(core_v1, job_name, namespace):
339338
logs = core_v1.read_namespaced_pod_log(
340339
pod.metadata.name, namespace, tail_lines=100
341340
)
342-
print(f"[REMOTE] Pod {pod.metadata.name} logs:\n{logs}")
341+
logging.info("Pod %s logs:\n%s", pod.metadata.name, logs)
343342

344343

345344
def _check_pod_scheduling(core_v1, job_name, namespace):

keras_remote/infra/container_builder.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,10 @@
1212
from google.cloud import artifactregistry_v1
1313
from google.cloud import storage
1414
from google.cloud.devtools import cloudbuild_v1
15+
from absl import logging
16+
1517
from keras_remote.constants import zone_to_ar_location, get_default_zone
1618
from keras_remote.core import accelerators
17-
from keras_remote.infra import infra
18-
19-
logger = infra.logger
2019

2120
REMOTE_RUNNER_FILE_NAME = "remote_runner.py"
2221
# Paths relative to this file's location (keras_remote/infra/)
@@ -54,19 +53,18 @@ def get_or_build_container(base_image, requirements_path, accelerator_type, proj
5453

5554
# Check if image exists
5655
if _image_exists(image_uri, project):
57-
logger.info(f"Using cached container: {image_uri}")
56+
logging.info("Using cached container: %s", image_uri)
5857
ar_url = (
5958
"https://console.cloud.google.com/artifacts"
6059
f"/docker/{project}/{ar_location}"
6160
f"/keras-remote/base?project={project}"
6261
)
63-
logger.info(f"View image: {ar_url}")
62+
logging.info("View image: %s", ar_url)
6463
return image_uri
6564

6665
# Build new image
67-
logger.info(
68-
f"Building new container (requirements changed): "
69-
f"{image_uri}"
66+
logging.info(
67+
"Building new container (requirements changed): %s", image_uri
7068
)
7169
return _build_and_push(
7270
base_image, requirements_path, accelerator_type,
@@ -138,7 +136,7 @@ def _image_exists(image_uri, project):
138136
except google_exceptions.NotFound:
139137
return False
140138
except Exception:
141-
logger.warning("Unexpected error checking image existence",
139+
logging.warning("Unexpected error checking image existence",
142140
exc_info=True)
143141
return False
144142

@@ -212,26 +210,26 @@ def _build_and_push(base_image, requirements_path, accelerator_type,
212210
)
213211
)
214212

215-
logger.info("Submitting build to Cloud Build...")
213+
logging.info("Submitting build to Cloud Build...")
216214
operation = build_client.create_build(project_id=project, build=build_config)
217215

218216
# Get build ID from the operation metadata
219217
build_id = operation.metadata.build.id if hasattr(operation, "metadata") else None
220218
if build_id:
221-
logger.info(f"Build ID: {build_id}")
222-
logger.info(f"View build: https://console.cloud.google.com/cloud-build/builds/{build_id}?project={project}")
219+
logging.info("Build ID: %s", build_id)
220+
logging.info("View build: https://console.cloud.google.com/cloud-build/builds/%s?project=%s", build_id, project)
223221

224-
logger.info("Building container image (this may take 5-10 minutes)...")
222+
logging.info("Building container image (this may take 5-10 minutes)...")
225223
result = operation.result(timeout=1200) # 20 minute timeout
226224

227225
if result.status == cloudbuild_v1.Build.Status.SUCCESS:
228-
logger.info(f"Container built successfully: {image_uri}")
226+
logging.info("Container built successfully: %s", image_uri)
229227
ar_url = (
230228
"https://console.cloud.google.com/artifacts"
231229
f"/docker/{project}/{ar_location}"
232230
f"/keras-remote/base?project={project}"
233231
)
234-
logger.info(f"View image: {ar_url}")
232+
logging.info("View image: %s", ar_url)
235233
return image_uri
236234
else:
237235
raise RuntimeError(f"Build failed with status: {result.status}")
@@ -298,7 +296,7 @@ def _upload_build_source(tarball_path, bucket_name, project):
298296
blob.upload_from_filename(tarball_path)
299297

300298
gcs_uri = f"gs://{bucket_name}/{blob_name}"
301-
logger.info(f"Uploaded build source to {gcs_uri}")
302-
logger.info(f"View source: https://console.cloud.google.com/storage/browser/{bucket_name}?project={project}")
299+
logging.info("Uploaded build source to %s", gcs_uri)
300+
logging.info("View source: https://console.cloud.google.com/storage/browser/%s?project=%s", bucket_name, project)
303301

304302
return gcs_uri

keras_remote/runner/remote_runner.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import traceback
1313
import zipfile
1414

15+
from absl import logging
1516
import cloudpickle
1617
from google.cloud import storage
1718

@@ -26,7 +27,9 @@ def main():
2627
"""
2728

2829
if len(sys.argv) < 4:
29-
print("Usage: remote_runner.py <context_gcs> <payload_gcs> <result_gcs>")
30+
logging.error(
31+
"Usage: remote_runner.py <context_gcs> <payload_gcs> <result_gcs>"
32+
)
3033
sys.exit(1)
3134

3235
run_gcs_mode()
@@ -44,7 +47,7 @@ def run_gcs_mode():
4447
payload_gcs = sys.argv[2]
4548
result_gcs = sys.argv[3]
4649

47-
print(f"[REMOTE] Starting GCS execution mode", flush=True)
50+
logging.info("Starting GCS execution mode")
4851

4952
# Define local paths using tempfile
5053
context_path = os.path.join(TEMP_DIR, "context.zip")
@@ -56,7 +59,7 @@ def run_gcs_mode():
5659
storage_client = storage.Client()
5760

5861
# Download artifacts from Cloud Storage
59-
print(f"[REMOTE] Downloading artifacts...", flush=True)
62+
logging.info("Downloading artifacts...")
6063
_download_from_gcs(storage_client, context_gcs, context_path)
6164
_download_from_gcs(storage_client, payload_gcs, payload_path)
6265

@@ -72,7 +75,7 @@ def run_gcs_mode():
7275
sys.path.insert(0, workspace_dir)
7376

7477
# Load and deserialize payload
75-
print("[REMOTE] Loading function payload", flush=True)
78+
logging.info("Loading function payload")
7679
with open(payload_path, "rb") as f:
7780
payload = cloudpickle.load(f)
7881

@@ -81,19 +84,19 @@ def run_gcs_mode():
8184
kwargs = payload["kwargs"]
8285
env_vars = payload.get("env_vars", {})
8386
if env_vars:
84-
print(f"[REMOTE] Setting {len(env_vars)} environment variables", flush=True)
87+
logging.info("Setting %d environment variables", len(env_vars))
8588
os.environ.update(env_vars)
8689

8790
# Execute function and capture result
88-
print(f"[REMOTE] Executing {func.__name__}()", flush=True)
91+
logging.info("Executing %s()", func.__name__)
8992
result = None
9093
exception = None
9194

9295
try:
9396
result = func(*args, **kwargs)
94-
print(f"[REMOTE] Function completed successfully", flush=True)
97+
logging.info("Function completed successfully")
9598
except BaseException as e:
96-
print(f"[REMOTE] ERROR: {type(e).__name__}: {e}", flush=True)
99+
logging.error("%s: %s", type(e).__name__, e)
97100
traceback.print_exc()
98101
sys.stdout.flush()
99102
sys.stderr.flush()
@@ -107,21 +110,21 @@ def run_gcs_mode():
107110
"success": exception is None,
108111
"result": result if exception is None else None,
109112
"exception": exception,
110-
"traceback": traceback.format_exc() if exception else None
113+
"traceback": traceback.format_exc() if exception else None,
111114
}
112115

113116
with open(result_path, "wb") as f:
114117
cloudpickle.dump(result_payload, f)
115118

116119
# Upload result to Cloud Storage
117-
print(f"[REMOTE] Uploading result...", flush=True)
120+
logging.info("Uploading result...")
118121
_upload_to_gcs(storage_client, result_path, result_gcs)
119122

120-
print("[REMOTE] Execution complete", flush=True)
123+
logging.info("Execution complete")
121124
sys.exit(0 if exception is None else 1)
122125

123126
except Exception as e:
124-
print(f"[REMOTE] FATAL ERROR: {e}", flush=True)
127+
logging.fatal("%s", e)
125128
traceback.print_exc()
126129
sys.exit(1)
127130

0 commit comments

Comments (0)