Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
e2a0a51
added support for workload identity federation
brock-acryl Jul 4, 2025
1c792af
using google auth library directly
brock-acryl Jul 5, 2025
5421931
changed the method for reading in the WIF creds
bryanprosser-acryl Jan 28, 2026
a6e4dd5
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Jan 28, 2026
589ac2a
fix: update pydantic validators to use field_validator and model_vali…
bryanprosser-acryl Jan 28, 2026
e47d68a
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Jan 28, 2026
8163634
added some error handling
bryanprosser-acryl Jan 28, 2026
0ccb883
Merge branch 'gcs-wif-support-update' of https://github.com/datahub-p…
bryanprosser-acryl Jan 28, 2026
078ea30
added scope
bryanprosser-acryl Jan 29, 2026
5933bce
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Jan 29, 2026
580c138
added gcs wrapper for WIF
bryanprosser-acryl Jan 29, 2026
82b397f
fixed enum types based on review
bryanprosser-acryl Mar 4, 2026
8e598e6
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Mar 4, 2026
866500a
removed unnecessary comments
bryanprosser-acryl Mar 4, 2026
880adbb
Merge branch 'gcs-wif-support-update' of https://github.com/datahub-p…
bryanprosser-acryl Mar 4, 2026
22943d2
documentation updated to include reference to the WIF in the prerequi…
bryanprosser-acryl Mar 4, 2026
a03a4be
Logic added to cleanup the WIF file (if created)
bryanprosser-acryl Mar 4, 2026
ae769bb
added WIF unit tests
bryanprosser-acryl Mar 4, 2026
4d8782e
Fixed type issue
bryanprosser-acryl Mar 4, 2026
bf8e7d5
Fixed linting issues
bryanprosser-acryl Mar 4, 2026
71e9549
updated markdown based on prettier check
bryanprosser-acryl Mar 4, 2026
da5fd7a
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Mar 4, 2026
5b2c55e
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Mar 4, 2026
80feb8d
test commit
bryanprosser-acryl Mar 23, 2026
4320589
fix(ingestion/gcs): resolve merge conflicts and restructure docs to m…
bryanprosser-acryl Mar 23, 2026
07f1de4
refactor(gcs): update Workload Identity Federation credential handling
bryanprosser-acryl Mar 24, 2026
62e4861
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Mar 24, 2026
1f7a427
feat(gcs): Centralise GCP Workload Identity Federation configuration …
bryanprosser-acryl Mar 24, 2026
0bd874c
Merge branch 'gcs-wif-support-update' of https://github.com/datahub-p…
bryanprosser-acryl Mar 24, 2026
df2df56
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Mar 24, 2026
aa88811
fix(ingestion/gcs): address PR review findings for WIF implementation
bryanprosser-acryl Mar 24, 2026
3bb4e2b
Merge branch 'gcs-wif-support-update' of https://github.com/datahub-p…
bryanprosser-acryl Mar 24, 2026
60564fe
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Mar 24, 2026
60803c4
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Mar 24, 2026
d7e6154
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Mar 24, 2026
0a1e129
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Mar 27, 2026
b0c4239
feat(gcp): Add validation for mutual exclusion of WIF configuration o…
bryanprosser-acryl Mar 27, 2026
4a90c0d
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Mar 27, 2026
f95c495
Merge branch 'master' into gcs-wif-support-update
bryanprosser-acryl Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions metadata-ingestion/docs/sources/gcs/gcs_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ and uses DataHub S3 Data Lake integration source under the hood. Refer section [

Before running ingestion, ensure network connectivity to the source, valid authentication credentials, and read permissions for metadata APIs required by this module.

1. Create a service account with "Storage Object Viewer" Role - https://cloud.google.com/iam/docs/service-accounts-create
2. Make sure you meet following requirements to generate HMAC key - https://cloud.google.com/storage/docs/authentication/managing-hmackeys#before-you-begin
3. Create an HMAC key for service account created above - https://cloud.google.com/storage/docs/authentication/managing-hmackeys#create .
The GCS source supports two authentication methods:

1. **HMAC Keys** (default): Service account key-based authentication using Google Cloud Storage HMAC keys.
2. **Workload Identity Federation**: Keyless, token-based authentication. Recommended for Kubernetes/GKE workloads (avoids storing service account keys), cross-cloud authentication (AWS/Azure → GCP), and environments requiring automatic credential rotation.

#### HMAC Authentication

1. Create a service account with "Storage Object Viewer" role — [Create a service account](https://cloud.google.com/iam/docs/service-accounts-create).
2. Ensure you meet the [requirements to generate an HMAC key](https://cloud.google.com/storage/docs/authentication/managing-hmackeys#before-you-begin).
3. Create an HMAC key for the service account — [Create HMAC keys](https://cloud.google.com/storage/docs/authentication/managing-hmackeys#create).

#### Workload Identity Federation

1. Set up a Workload Identity Pool and Provider in Google Cloud — [Workload Identity Federation](https://cloud.google.com/iam/docs/workload-identity-federation).
2. Configure the credential (file path, inline JSON, or JSON string) that the connector will use to obtain short-lived access tokens. For Kubernetes (GKE), see [Workload Identity](https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity).
51 changes: 51 additions & 0 deletions metadata-ingestion/docs/sources/gcs/gcs_recipe.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# HMAC Authentication (default)
source:
type: gcs
config:
Expand All @@ -6,3 +7,53 @@ source:
credential:
hmac_access_id: <hmac access id>
hmac_access_secret: <hmac access secret>

---

# Workload Identity Federation with configuration file
source:
type: gcs
config:
auth_type: workload_identity_federation
gcp_wif_configuration: "/path/to/gcp_wif_configuration.json"
path_specs:
- include: gs://gcs-ingestion-bucket/parquet_example/{table}/year={partition[0]}/*.parquet

---

# Workload Identity Federation with inline configuration (dict)
source:
type: gcs
config:
auth_type: workload_identity_federation
gcp_wif_configuration_json:
type: external_account
audience: "//iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
subject_token_type: "urn:ietf:params:oauth:token-type:jwt"
token_url: "https://sts.googleapis.com/v1/token"
credential_source:
file: "/var/run/secrets/tokens/gcp-ksa/token"
service_account_impersonation_url: "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/SERVICE_ACCOUNT_EMAIL:generateAccessToken"
path_specs:
- include: gs://gcs-ingestion-bucket/parquet_example/{table}/year={partition[0]}/*.parquet

---

# Workload Identity Federation with JSON string (copy-paste from file)
source:
type: gcs
config:
auth_type: workload_identity_federation
gcp_wif_configuration_json_string: |
{
"type": "external_account",
"audience": "//iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID",
"subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
"token_url": "https://sts.googleapis.com/v1/token",
"credential_source": {
"file": "/var/run/secrets/tokens/gcp-ksa/token"
},
"service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/SERVICE_ACCOUNT_EMAIL:generateAccessToken"
}
path_specs:
- include: gs://gcs-ingestion-bucket/parquet_example/{table}/year={partition[0]}/*.parquet
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import json
import logging
from typing import Any, Dict, Optional, Tuple, Union

from google.auth import load_credentials_from_dict
from google.auth.credentials import Credentials
from google.auth.transport.requests import Request
from pydantic import Field, model_validator

from datahub.configuration.common import ConfigModel

logger = logging.getLogger(__name__)


class GCPWIFConfig(ConfigModel):
"""
Mixin config for GCP Workload Identity Federation (WIF) authentication.

Provides three mutually-exclusive ways to supply the WIF JSON configuration.
Sources that support WIF inherit from this class and call `load_wif_credentials`
to obtain a `google.auth.credentials.Credentials` object.

BigQuery, Dataplex, VertexAI, and other GCP sources can adopt this mixin when
they need WIF support — no changes required to this module.
"""

gcp_wif_configuration: Optional[str] = Field(
default=None,
description=(
"Path to the GCP Workload Identity Federation configuration JSON file. "
"Mutually exclusive with gcp_wif_configuration_json and "
"gcp_wif_configuration_json_string."
),
)

gcp_wif_configuration_json: Optional[Union[str, Dict[str, Any]]] = Field(
default=None,
description=(
"GCP Workload Identity Federation configuration as a JSON string or dict. "
"Mutually exclusive with gcp_wif_configuration and "
"gcp_wif_configuration_json_string."
),
)

gcp_wif_configuration_json_string: Optional[str] = Field(
default=None,
description=(
"GCP Workload Identity Federation configuration as a JSON string "
"(contents of the configuration file). Useful for injecting configuration "
"from secrets managers. Mutually exclusive with gcp_wif_configuration and "
"gcp_wif_configuration_json."
),
)

@model_validator(mode="before")
@classmethod
def _validate_wif_json_format(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validate that the JSON-typed WIF options contain valid JSON."""
if not isinstance(values, dict):
return values

gcp_wif_configuration_json = values.get("gcp_wif_configuration_json")
gcp_wif_configuration_json_string = values.get(
"gcp_wif_configuration_json_string"
)

if gcp_wif_configuration_json:
if isinstance(gcp_wif_configuration_json, str):
try:
json.loads(gcp_wif_configuration_json)
except json.JSONDecodeError as e:
raise ValueError(
f"gcp_wif_configuration_json must be valid JSON: {e}"
) from e
elif not isinstance(gcp_wif_configuration_json, dict):
raise ValueError(
"gcp_wif_configuration_json must be either a JSON string or a dictionary"
)

if gcp_wif_configuration_json_string:
try:
json.loads(gcp_wif_configuration_json_string)
except json.JSONDecodeError as e:
raise ValueError(
f"gcp_wif_configuration_json_string must be valid JSON: {e}"
) from e

return values

@model_validator(mode="after")
def _validate_wif_mutual_exclusion(self) -> "GCPWIFConfig":
"""Validate that at most one WIF configuration option is set."""
provided = [
opt
for opt in [
self.gcp_wif_configuration,
self.gcp_wif_configuration_json,
self.gcp_wif_configuration_json_string,
]
if opt is not None
]
if len(provided) > 1:
raise ValueError(
"Cannot specify multiple WIF configuration options. Use only one of: "
"gcp_wif_configuration, gcp_wif_configuration_json, or gcp_wif_configuration_json_string."
)
return self


def load_wif_credentials(
wif_config: GCPWIFConfig,
) -> Tuple[Credentials, Optional[str]]:
"""
Load GCP Workload Identity Federation credentials from a GCPWIFConfig.

Resolves whichever config option is set to a dict, then calls
`google.auth.load_credentials_from_dict`. Applies the cloud-platform scope
(required for service account impersonation via WIF) and attempts an initial
token refresh to validate the credentials.

Returns:
A tuple of (credentials, project_id). project_id may be None if the WIF
configuration does not specify one.

Raises:
ValueError: If no WIF configuration is provided or if credential loading fails.
"""
if not any(
[
wif_config.gcp_wif_configuration,
wif_config.gcp_wif_configuration_json,
wif_config.gcp_wif_configuration_json_string,
]
):
raise ValueError("No valid WIF configuration provided")
Comment on lines +128 to +135

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be moved up as a pydantic validation
you could also add the validation that only one is set


try:
if wif_config.gcp_wif_configuration:
with open(wif_config.gcp_wif_configuration) as f:
wif_config_dict: Dict[str, Any] = json.load(f)
logger.info(
"Using Workload Identity Federation configuration from file: %s",
wif_config.gcp_wif_configuration,
)
elif wif_config.gcp_wif_configuration_json:
if isinstance(wif_config.gcp_wif_configuration_json, dict):
wif_config_dict = wif_config.gcp_wif_configuration_json
else:
wif_config_dict = json.loads(wif_config.gcp_wif_configuration_json)
logger.info(
"Using Workload Identity Federation configuration from JSON content"
)
else:
wif_config_dict = json.loads(wif_config.gcp_wif_configuration_json_string) # type: ignore[arg-type]
logger.info(
"Using Workload Identity Federation configuration from JSON string"
)

credentials, project_id = load_credentials_from_dict(wif_config_dict)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

# Impersonation (WIF → SA) requires scopes; otherwise IAM returns 400 "Scope required."
credentials = credentials.with_scopes(
["https://www.googleapis.com/auth/cloud-platform"]
)

# Try to refresh credentials to validate they work.
# If refresh fails, log a warning but continue — the caller will refresh
# automatically on the first actual API call.
try:
credentials.refresh(Request())
logger.debug("Successfully refreshed WIF credentials")
except Exception as refresh_error:
logger.warning(
"Failed to refresh WIF credentials during setup (this may be expected): %s",
refresh_error,
)

logger.info("Successfully loaded Workload Identity Federation credentials")
return credentials, project_id

except Exception as e:
raise ValueError(
f"Failed to load Workload Identity Federation credentials: {e}"
) from e
Loading
Loading