Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion .github/workflows/ux-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ jobs:
workers: 1
memory: 7168
timeout: 1200
- backend: gcs-local
services: "minio,postgresql,metadata-service,fake-gcs-server"
workers: 4
Comment thread
greptile-apps[bot] marked this conversation as resolved.
memory: 6144
timeout: 900
- backend: sfn-batch
services: "minio,postgresql,metadata-service,localbatch,ddb-local,sfn-local"
workers: 2
Expand Down Expand Up @@ -151,6 +156,10 @@ jobs:
pip install --upgrade pip
pip install -e ".[dev]"

- name: Install GCS dependencies
if: matrix.backend == 'gcs-local'
run: pip install google-cloud-storage

- name: Set up minikube
uses: medyagh/setup-minikube@aba8d5ff1666d19b9549133e3b92e70d4fc52cb7
with:
Expand Down Expand Up @@ -226,7 +235,7 @@ jobs:
SERVICES: ${{ matrix.services }}

- name: Pre-pull python:3.9 into minikube
if: matrix.backend != 'local' && matrix.backend != 'sfn-batch'
if: matrix.backend != 'local' && matrix.backend != 'sfn-batch' && matrix.backend != 'gcs-local'
run: minikube image pull python:3.9

- name: Save minikube images to cache
Expand All @@ -253,6 +262,12 @@ jobs:
if: matrix.backend == 'airflow-kubernetes'
run: devtools/ci/wait-airflow-api.sh

- name: Set GCS emulator environment
if: matrix.backend == 'gcs-local'
run: |
echo "METAFLOW_DEFAULT_DATASTORE=gs" >> $GITHUB_ENV
echo "STORAGE_EMULATOR_HOST=http://localhost:4443" >> $GITHUB_ENV

- name: Clean up completed pods and start background cleanup
if: matrix.backend == 'airflow-kubernetes'
run: |
Expand Down
3 changes: 3 additions & 0 deletions devtools/Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ components = {
"ddb-local": [],
"sfn-local": ["ddb-local"],
"airflow": ["postgresql"],
"fake-gcs-server": [],
}

# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -93,6 +94,7 @@ load('./tilt/localbatch.tiltfile', 'setup_localbatch')
load('./tilt/ddb_local.tiltfile', 'setup_ddb_local')
load('./tilt/sfn_local.tiltfile', 'setup_sfn_local')
load('./tilt/airflow.tiltfile', 'setup_airflow')
load('./tilt/fake_gcs_server.tiltfile', 'setup_fake_gcs_server')

_SETUP = {
"minio": setup_minio,
Expand All @@ -104,6 +106,7 @@ _SETUP = {
"ddb-local": setup_ddb_local,
"sfn-local": setup_sfn_local,
"airflow": setup_airflow,
"fake-gcs-server": setup_fake_gcs_server,
}

# ---------------------------------------------------------------------------
Expand Down
22 changes: 22 additions & 0 deletions devtools/tilt/fake_gcs_server.tiltfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load('./_result.tiltfile', 'new_result')

def setup_fake_gcs_server(ctx):
    # Deploy a fake-gcs-server emulator plus a one-shot bucket-init Job and a
    # Secret carrying the emulator endpoint, so flows can exercise the GCS
    # datastore backend without real Google Cloud credentials.
    #
    # `ctx` follows the same setup-function signature as the other component
    # setups loaded by devtools/Tiltfile; it is unused here.

    # Register all three manifests with Tilt.
    k8s_yaml(read_file('./tilt/k8s/fake-gcs-server.yaml'))
    k8s_yaml(read_file('./tilt/k8s/gcs-bucket-init-job.yaml'))
    k8s_yaml(read_file('./tilt/k8s/fake-gcs-secret.yaml'))

    # Forward 4443 so host-side processes (tests, CLI) can reach the emulator
    # at localhost:4443; the link surfaces the bucket-listing endpoint in the
    # Tilt UI for quick inspection.
    k8s_resource(
        'fake-gcs-server',
        port_forwards=['4443:4443'],
        links=[link('http://localhost:4443/storage/v1/b', 'fake-gcs-server buckets')],
        labels=['fake-gcs-server'],
    )

    # Bucket creation must wait until the emulator is serving.
    k8s_resource('gcs-bucket-init', resource_deps=['fake-gcs-server'], labels=['fake-gcs-server'])

    # config: points Metaflow's GS datastore root at the bucket the init Job
    # creates. shell_env: host-side clients need STORAGE_EMULATOR_HOST to
    # route to the emulator instead of real GCS.
    return new_result(
        config={"METAFLOW_DATASTORE_SYSROOT_GS": "gs://metaflow-test/metaflow"},
        shell_env={"STORAGE_EMULATOR_HOST": "http://localhost:4443"},
        config_resources=['gcs-bucket-init'],
        k8s_secrets=['fake-gcs-secret'],
    )
7 changes: 7 additions & 0 deletions devtools/tilt/k8s/fake-gcs-secret.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Secret consumed by in-cluster workloads (see fake_gcs_server.tiltfile,
# which registers it via k8s_secrets) so that google-cloud-storage clients
# running inside the cluster target the fake-gcs-server Service instead of
# real GCS.
apiVersion: v1
kind: Secret
metadata:
  name: fake-gcs-secret
type: Opaque
stringData:
  # In-cluster DNS name of the emulator Service; host-side processes use
  # http://localhost:4443 via the Tilt port-forward instead.
  STORAGE_EMULATOR_HOST: http://fake-gcs-server:4443
40 changes: 40 additions & 0 deletions devtools/tilt/k8s/fake-gcs-server.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: fake-gcs-server
spec:
replicas: 1
selector:
matchLabels:
app: fake-gcs-server
template:
metadata:
labels:
app: fake-gcs-server
spec:
containers:
- name: fake-gcs-server
image: fsouza/fake-gcs-server:latest
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Mutable latest image tag reduces reproducibility

fsouza/fake-gcs-server:latest can silently pick up a breaking upstream release between CI runs, making failures hard to diagnose. Pinning to a specific release tag (e.g. 1.15.0) keeps the environment reproducible. The same applies to curlimages/curl:latest in gcs-bucket-init-job.yaml.

# Port 4443 is the fake-gcs-server default; we keep it for
# compatibility even though we use -scheme http (not HTTPS).
args: ["-scheme", "http", "-host", "0.0.0.0", "-port", "4443"]
ports:
- containerPort: 4443
resources:
requests:
cpu: 50m
memory: 64Mi
limits:
cpu: 200m
memory: 128Mi
---
apiVersion: v1
kind: Service
metadata:
name: fake-gcs-server
spec:
selector:
app: fake-gcs-server
ports:
- port: 4443
targetPort: 4443
27 changes: 27 additions & 0 deletions devtools/tilt/k8s/gcs-bucket-init-job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apiVersion: batch/v1
kind: Job
metadata:
name: gcs-bucket-init
spec:
ttlSecondsAfterFinished: 120
template:
spec:
restartPolicy: OnFailure
containers:
- name: init
image: curlimages/curl:latest
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Mutable latest image tag

curlimages/curl:latest is a floating tag; pinning to a digest or specific version (e.g. curlimages/curl:8.7.1) makes the bucket-init job deterministic across CI runs.

command: ["/bin/sh", "-ec"]
args:
- |
curl -sf -X POST \
http://fake-gcs-server:4443/storage/v1/b \
-H "Content-Type: application/json" \
-d '{"name":"metaflow-test"}'
echo "Bucket 'metaflow-test' created successfully"
Comment on lines +12 to +20
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Non-idempotent bucket init breaks on retry

With restartPolicy: OnFailure, if the container is killed after the bucket is created (e.g., OOM eviction, node pressure) but before the Job records success, Kubernetes restarts the container. The second attempt POSTs to an already-existing bucket, gets a 409, and curl -sf treats that as a failure — causing repeated retries until the Job's backoffLimit is exhausted and it enters a permanent Failed state. Handling the 409 makes the script safe to retry:

Suggested change
image: curlimages/curl:latest
command: ["/bin/sh", "-ec"]
args:
- |
curl -sf -X POST \
http://fake-gcs-server:4443/storage/v1/b \
-H "Content-Type: application/json" \
-d '{"name":"metaflow-test"}'
echo "Bucket 'metaflow-test' created successfully"
command: ["/bin/sh", "-ec"]
args:
- |
HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST \
http://fake-gcs-server:4443/storage/v1/b \
-H "Content-Type: application/json" \
-d '{"name":"metaflow-test"}')
[ "$HTTP_STATUS" = "200" ] || [ "$HTTP_STATUS" = "409" ] || \
(echo "Unexpected status: $HTTP_STATUS" && exit 1)
echo "Bucket 'metaflow-test' ready (status: $HTTP_STATUS)"

resources:
requests:
cpu: 25m
memory: 32Mi
limits:
cpu: 100m
memory: 64Mi
17 changes: 12 additions & 5 deletions metaflow/plugins/gcp/gs_storage_client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@ def _get_gs_storage_client_default():
cache_key = _get_cache_key()
if cache_key not in _client_cache:
from google.cloud import storage
import google.auth

credentials, project_id = google.auth.default(scopes=storage.Client.SCOPE)
_client_cache[cache_key] = storage.Client(
credentials=credentials, project=project_id
)
if os.environ.get("STORAGE_EMULATOR_HOST"):
# When a storage emulator is configured, create a plain Client()
# which auto-detects the emulator and uses anonymous credentials.
# Calling google.auth.default() would fail without real GCP creds.
_client_cache[cache_key] = storage.Client()
else:
import google.auth

credentials, project_id = google.auth.default(scopes=storage.Client.SCOPE)
_client_cache[cache_key] = storage.Client(
credentials=credentials, project=project_id
)
return _client_cache[cache_key]


Expand Down
28 changes: 28 additions & 0 deletions test/ux/core/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,37 @@ def _set_devstack_env():
os.environ.setdefault("AWS_ENDPOINT_URL_EVENTBRIDGE", "http://localhost:7777")


def _setup_gcs_emulator():
    """Point Metaflow's GCS client factory at a local storage emulator.

    This is a no-op unless ``STORAGE_EMULATOR_HOST`` is present in the
    environment. When it is set, a plain ``storage.Client()`` from
    google-cloud-storage auto-detects the emulator and uses anonymous
    credentials -- but Metaflow's default GCP client provider calls
    ``google.auth.default()`` first, which fails when no real GCP
    credentials exist. To work around that, the factory module's
    ``get_gs_storage_client`` is replaced with a callable returning a
    single pre-built, emulator-aware client. Missing optional
    dependencies (google-cloud-storage or the Metaflow GCP plugin) make
    this a silent no-op.
    """
    if not os.environ.get("STORAGE_EMULATOR_HOST"):
        return

    try:
        from google.cloud import storage
        from metaflow.plugins.gcp import gs_storage_client_factory as factory

        # Build one shared client up front; the replacement factory simply
        # hands it back on every call.
        emulator_client = storage.Client()
        factory.get_gs_storage_client = lambda: emulator_client
    except ImportError:
        # Best-effort: backends without the GCS extras just skip the patch.
        pass


def pytest_configure(config):
    """pytest hook: prepare the process environment before collection.

    Runs before the test modules are collected, and therefore before any
    module-level ``import metaflow`` in the test files -- Metaflow reads its
    configuration from the environment at import time, so both setup calls
    must happen first.
    """
    # Set devstack endpoint defaults first, then (only when
    # STORAGE_EMULATOR_HOST is present) patch the GCS client factory to use
    # the emulator.
    _set_devstack_env()
    _setup_gcs_emulator()
10 changes: 8 additions & 2 deletions test/ux/core/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,15 @@ def verify_run_provenance(run: Run, decospecs: Any) -> None:
start_task = run["start"].task

ds_type = start_task.metadata_dict.get("ds-type")
# Only enforce the S3 check when the test environment uses a remote datastore.
# Only enforce the remote datastore check when the test environment uses one.
# Local-only CI environments (METAFLOW_DEFAULT_DATASTORE=local) do not have MinIO.
if os.environ.get("METAFLOW_DEFAULT_DATASTORE", "") != "local":
default_ds = os.environ.get("METAFLOW_DEFAULT_DATASTORE", "")
if default_ds == "gs":
assert ds_type == "gs", (
f"Expected datastore type 'gs' (GCS), got {ds_type!r}. "
f"Artifacts may be stored locally — check METAFLOW_HOME / METAFLOW_PROFILE."
)
elif default_ds != "local":
assert ds_type == "s3", (
f"Expected datastore type 's3' (MinIO), got {ds_type!r}. "
f"Artifacts may be stored locally — check METAFLOW_HOME / METAFLOW_PROFILE."
Expand Down
8 changes: 8 additions & 0 deletions test/ux/ux_test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ backends:
decospec: null
enabled: true

# GCS local: runs flows locally but uses Google Cloud Storage (fake-gcs-server)
# instead of S3/MinIO for the datastore. Tests the GCS datastore backend.
- name: gcs-local
scheduler_type: null
cluster: null
decospec: null
enabled: true

# Argo Workflows + Kubernetes (devstack: minikube + argo-workflows)
- name: argo-kubernetes
scheduler_type: argo-workflows
Expand Down
Loading