Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 126 additions & 23 deletions python/delta_sharing/_internal_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
#

from abc import ABC, abstractmethod
from datetime import datetime
from datetime import datetime, timezone
from typing import Optional
import requests
import base64
import json
from jwcrypto import jwk, jwt
import threading
import requests.sessions
import time
from typing import Dict
import uuid

from delta_sharing.protocol import (
DeltaSharingProfile,
Expand Down Expand Up @@ -113,28 +115,10 @@ def __init__(self, access_token: str, expires_in: int, creation_timestamp: int):
self.creation_timestamp = creation_timestamp


class OAuthClient:
def __init__(
self, token_endpoint: str, client_id: str, client_secret: str, scope: Optional[str] = None
):
self.token_endpoint = token_endpoint
self.client_id = client_id
self.client_secret = client_secret
self.scope = scope

class OAuthClient(ABC):
@abstractmethod
def client_credentials(self) -> OAuthClientCredentials:
credentials = base64.b64encode(
f"{self.client_id}:{self.client_secret}".encode("utf-8")
).decode("utf-8")
headers = {
"accept": "application/json",
"authorization": f"Basic {credentials}",
"content-type": "application/x-www-form-urlencoded",
}
body = f"grant_type=client_credentials{f'&scope={self.scope}' if self.scope else ''}"
response = requests.post(self.token_endpoint, headers=headers, data=body)
response.raise_for_status()
return self.parse_oauth_token_response(response.text)
pass

def parse_oauth_token_response(self, response: str) -> OAuthClientCredentials:
if not response:
Expand Down Expand Up @@ -169,6 +153,94 @@ def parse_oauth_token_response(self, response: str) -> OAuthClientCredentials:
)


class ClientSecretOAuthClient(OAuthClient):
def __init__(
self,
token_endpoint: str,
client_id: str,
client_secret: str,
scope: Optional[str] = None,
):
self.token_endpoint = token_endpoint
self.client_id = client_id
self.client_secret = client_secret
self.scope = scope

def client_credentials(self) -> OAuthClientCredentials:
credentials = base64.b64encode(
f"{self.client_id}:{self.client_secret}".encode("utf-8")
).decode("utf-8")
headers = {
"accept": "application/json",
"authorization": f"Basic {credentials}",
"content-type": "application/x-www-form-urlencoded",
}
body = f"grant_type=client_credentials{f'&scope={self.scope}' if self.scope else ''}"
response = requests.post(self.token_endpoint, headers=headers, data=body)
response.raise_for_status()
return self.parse_oauth_token_response(response.text)


class PrivateKeyOAuthClient(OAuthClient):
def __init__(
self,
token_endpoint: str,
client_id: str,
key_id: str,
private_key: str,
issuer: str,
scope: Optional[str] = None,
resource: Optional[str] = None,
algorithm: Optional[str] = None,
):
self.token_endpoint = token_endpoint
self.client_id = client_id
self.key_id = key_id
self.private_key = private_key
self.issuer = issuer
self.scope = scope
self.resource = resource
if algorithm is None:
algorithm = "RS256"
self.algorithm = algorithm

def client_credentials(self) -> OAuthClientCredentials:
timestamp = int(datetime.now(timezone.utc).timestamp())
jwt_header = {"alg": self.algorithm, "kid": self.key_id}
jwt_claims = {
"aud": self.issuer,
"iss": self.client_id,
Copy link
Collaborator

@moderakh moderakh Aug 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@paalvibe how do you intend to use this client? Specifically, do you plan to use this change with Delta Sharing over OIDC M2M?

From the PR, it sounds like you are sending a self-signed token to your configured token endpoint, which then returns a JWT.

I am curious what the resultant token from your token endpoint looks like?

Could you share a sample of the JWT that your token endpoint returns (and that the client ultimately sends to the Delta Sharing server), along with the corresponding OIDC federation policy configuration the server uses to authenticate the request?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, Moe,

Yes, we are planning to use Delta Sharing over OIDC M2M.

Here is an example of the decoded JWT returned by the token endpoint:

{
  "aud": "https://acme.org",
  "sub": "0192: 315300649;acme:customerdata.gold",
  "scope": "acme:customerdata.gold",
  "iss": "https://sky.maskinporten.no",
  "client_amr": "private_key_jwt",
  "token_type": "Bearer",
  "exp": 1694222211,
  "iat": 1694333311,
  "client_id": "abcd1234-1234-abcd-abcd-12341234abcd",
  "jti": "lwlwlwlw4lwlwlwlwl4lwlw4-lw-lwl4lwl4lwl4lwl4",
  "consumer": {
    "authority": "iso6523-actorid-upis",
    "ID": "0192:315300649"
  }
}

Here are the policy details needed in Databricks OIDC server configuration:

Issuer URL: https://sky.maskinporten.no/
Subject claim: sub
Subject: 0192:315300649;acme:customerdata.gold
Audiences: https://acme.org/

sky.maskinporten.no is the national company identity provider in Norway: https://docs-digdir-no.translate.goog/docs/Maskinporten/maskinporten_skyporten.html?_x_tr_sl=no&_x_tr_tl=en&_x_tr_hl=en&_x_tr_pto=wapp

It is a machine to machine identity service, where access is granted in a maskinporten web interface or API according to organization number e.g. "0192:315300649" and scope e.g. "acme:customerdata.gold".

We have already implemented this for sharing cloud resources on AWS, Azure and GCP, as explained in the documentation above.

We have already tested it with our branch and it works. It would enable any data consumer identified with skyporten to read a Delta Share wherever they are, inside or outside Databricks, e.g. in a local notebook.

More info if of intersest:

At Samferdselsdata.no (Public Transport Sector Data Sharing initiative) we have working with Norwegian Digitalisation Agency (Digdir) to implement Delta Shares directly with the Maskinporten National Orgnumber OAuth2 service for authentication, like we have achieved for IAM based cloud access with Skyporten (https://docs.digdir.no/docs/Maskinporten/maskinporten_skyporten). This would mean that one could simply declare which org number should have access, and avoid any credentials exchange at all. Hopefully, a similar pattern will also be possible to use across Europe soon. In practice this enables country code+orgnumber based delta sharing instead of Entra or email-based access.

"iat": timestamp,
"exp": timestamp + 120,
"jti": str(uuid.uuid4()),
}
if self.scope:
jwt_claims["scope"] = self.scope
if self.resource:
jwt_claims["resource"] = self.resource # In OAuth 2 spec audience is called resource
signed_jwt = self._signed_jwt(jwt_header, jwt_claims)
body = {
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
"assertion": signed_jwt,
}
headers = {
"accept": "application/json",
"content-type": "application/x-www-form-urlencoded",
}
response = requests.post(self.token_endpoint, headers=headers, data=body)
response.raise_for_status()
return self.parse_oauth_token_response(response.text)

def _signed_jwt(self, jwt_header, jwt_claims):
"""Generate a signed JWT token using the private key"""
jwt_token = jwt.JWT(header=jwt_header, claims=jwt_claims)
with open(self.private_key, "rb") as key_file:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes the key_file is available on disk and that the client can read it directly. How would this work in a Spark cluster environment, where the key isn’t necessarily stored on local disk? Where is the key expected to be stored in that case?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have not tested it inside a spark cluster environment, since we envisioned that data scientists would want to read the data in their development context. However, we are happy to get feedback from you on how to get it. In spark, one could get the key from a secrets service, and write the contents to a local path which is passed to DeltaSharingProfile(). However, an alternative or secondary option could be to pass the secret value into DeltaSharingProfile() as parameter.

pem_data = key_file.read()
private_key = jwk.JWK.from_pem(pem_data)
jwt_token.make_signed_token(private_key)
return jwt_token.serialize()


class OAuthClientCredentialsAuthProvider(AuthCredentialProvider):
def __init__(self, oauth_client: OAuthClient, auth_config: AuthConfig = AuthConfig()):
self.auth_config = auth_config
Expand Down Expand Up @@ -210,6 +282,8 @@ def create_auth_credential_provider(profile: DeltaSharingProfile):
if profile.share_credentials_version == 2:
if profile.type == "oauth_client_credentials":
return AuthCredentialProviderFactory.__oauth_client_credentials(profile)
elif profile.type == "oauth_client_private_key":
return AuthCredentialProviderFactory.__oauth_client_private_key(profile)
elif profile.type == "basic":
return AuthCredentialProviderFactory.__auth_basic(profile)
elif profile.share_credentials_version == 1 and (
Expand All @@ -236,7 +310,7 @@ def __oauth_client_credentials(profile):
if profile in AuthCredentialProviderFactory.__oauth_auth_provider_cache:
return AuthCredentialProviderFactory.__oauth_auth_provider_cache[profile]

oauth_client = OAuthClient(
oauth_client = ClientSecretOAuthClient(
token_endpoint=profile.token_endpoint,
client_id=profile.client_id,
client_secret=profile.client_secret,
Expand All @@ -248,6 +322,35 @@ def __oauth_client_credentials(profile):
AuthCredentialProviderFactory.__oauth_auth_provider_cache[profile] = provider
return provider

@staticmethod
def __oauth_client_private_key(profile):
# Once a clientId/privateKey/keyId is exchanged for an accessToken,
# the accessToken can be reused until it expires.
# Resource-claim in JWT-grant is optional, value is set in config.share.audience
# The Python client re-creates DeltaSharingClient for different requests.
# To ensure the OAuth access_token is reused,
# we keep a mapping from profile -> OAuthClientCredentialsAuthProvider.
# This prevents re-initializing OAuthClientCredentialsAuthProvider for the same profile,
# ensuring the access_token can be reused.
if profile in AuthCredentialProviderFactory.__oauth_auth_provider_cache:
return AuthCredentialProviderFactory.__oauth_auth_provider_cache[profile]

oauth_client = PrivateKeyOAuthClient(
token_endpoint=profile.token_endpoint,
client_id=profile.client_id,
key_id=profile.key_id,
private_key=profile.private_key,
issuer=profile.issuer,
resource=profile.audience,
scope=profile.scope,
algorithm=profile.algorithm,
)
provider = OAuthClientCredentialsAuthProvider(
oauth_client=oauth_client, auth_config=AuthConfig()
)
AuthCredentialProviderFactory.__oauth_auth_provider_cache[profile] = provider
return provider

@staticmethod
def __auth_bearer_token(profile):
return BearerTokenAuthProvider(profile.bearer_token, profile.expiration_time)
Expand Down
24 changes: 23 additions & 1 deletion python/delta_sharing/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ class DeltaSharingProfile:
username: Optional[str] = None
password: Optional[str] = None
scope: Optional[str] = None
issuer: Optional[str] = None
audience: Optional[str] = None
private_key: Optional[str] = None
key_id: Optional[str] = None
algorithm: Optional[str] = None

def __post_init__(self):
if self.share_credentials_version > DeltaSharingProfile.CURRENT:
Expand Down Expand Up @@ -78,7 +83,24 @@ def from_json(json) -> "DeltaSharingProfile":
)
elif share_credentials_version == 2:
type = json["type"]
if type == "oauth_client_credentials":
if type == "oauth_client_private_key":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update the share profile format for the new auth type to use the following structure?

{
  "shareCredentialsVersion": 2,
  "endpoint": "https://sharing.delta.io/delta-sharing",
  "type": "oauth_jwt_bearer_private_key_jwt",
  "auth": {
    "tokenEndpoint": "https://test.sky.maskinporten.no/token",
    "clientId": "6ff6dc4a-6924-42a7-a692-309b2323bb5c",
    "issuer": "https://test.sky.maskinporten.no/",
    "audience": "https://sky.kartverket.no/",
    "scope": "entur:skyporten.demo",
    "privateKey" : {
      "keyId": "b7eb622a-58b5-4e3c-9358-45301c4ea584",
      "privateKeyFile": "/home/secrets/delta_share.pem",
      "algorithm": "RS256"
    }
  }
}

We should keep the existing supported format unchanged for backward compatibility.
This new structure would only apply to the new authentication method.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We will make the change.

token_endpoint = json["tokenEndpoint"]
if token_endpoint is not None and token_endpoint.endswith("/"):
token_endpoint = token_endpoint[:-1]
return DeltaSharingProfile(
share_credentials_version=share_credentials_version,
type=type,
endpoint=endpoint,
token_endpoint=token_endpoint,
issuer=json["issuer"],
client_id=json["clientId"],
private_key=json["privateKey"],
key_id=json["keyId"],
audience=json["audience"],
scope=json.get("scope"),
algorithm=json.get("algorithm"),
)
elif type == "oauth_client_credentials":
token_endpoint = json["tokenEndpoint"]
if token_endpoint is not None and token_endpoint.endswith("/"):
token_endpoint = token_endpoint[:-1]
Expand Down
93 changes: 86 additions & 7 deletions python/delta_sharing/tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from unittest.mock import MagicMock
from datetime import datetime, timedelta
from delta_sharing._internal_auth import (
OAuthClient,
ClientSecretOAuthClient,
BasicAuthProvider,
AuthCredentialProviderFactory,
OAuthClientCredentialsAuthProvider,
Expand Down Expand Up @@ -68,7 +68,7 @@ def test_bearer_token_auth_provider_get_expiration_time():


def test_oauth_client_credentials_auth_provider_exchange_token():
oauth_client = MagicMock(spec=OAuthClient)
oauth_client = MagicMock(spec=ClientSecretOAuthClient)
profile = MagicMock()
profile.token_endpoint = "http://example.com/token"
profile.client_id = "client-id"
Expand All @@ -91,7 +91,7 @@ def test_oauth_client_credentials_auth_provider_exchange_token():


def test_oauth_client_credentials_auth_provider_reuse_token():
oauth_client = MagicMock(spec=OAuthClient)
oauth_client = MagicMock(spec=ClientSecretOAuthClient)
profile = MagicMock()
profile.token_endpoint = "http://example.com/token"
profile.client_id = "client-id"
Expand All @@ -114,7 +114,7 @@ def test_oauth_client_credentials_auth_provider_reuse_token():


def test_oauth_client_credentials_auth_provider_refresh_token():
oauth_client = MagicMock(spec=OAuthClient)
oauth_client = MagicMock(spec=ClientSecretOAuthClient)
profile = MagicMock()
profile.token_endpoint = "http://example.com/token"
profile.client_id = "client-id"
Expand All @@ -141,7 +141,7 @@ def test_oauth_client_credentials_auth_provider_refresh_token():


def test_oauth_client_credentials_auth_provider_needs_refresh():
oauth_client = MagicMock(spec=OAuthClient)
oauth_client = MagicMock(spec=ClientSecretOAuthClient)
profile = MagicMock()
profile.token_endpoint = "http://example.com/token"
profile.client_id = "client-id"
Expand All @@ -165,7 +165,7 @@ def test_oauth_client_credentials_auth_provider_needs_refresh():


def test_oauth_client_credentials_auth_provider_is_expired():
oauth_client = MagicMock(spec=OAuthClient)
oauth_client = MagicMock(spec=ClientSecretOAuthClient)
profile = MagicMock()
profile.token_endpoint = "http://example.com/token"
profile.client_id = "client-id"
Expand All @@ -177,7 +177,7 @@ def test_oauth_client_credentials_auth_provider_is_expired():


def test_oauth_client_credentials_auth_provider_get_expiration_time():
oauth_client = MagicMock(spec=OAuthClient)
oauth_client = MagicMock(spec=ClientSecretOAuthClient)
profile = MagicMock()
profile.token_endpoint = "http://example.com/token"
profile.client_id = "client-id"
Expand Down Expand Up @@ -246,6 +246,21 @@ def test_factory_creation():
provider = AuthCredentialProviderFactory.create_auth_credential_provider(profile_oauth)
assert isinstance(provider, OAuthClientCredentialsAuthProvider)

profile_pk = DeltaSharingProfile(
share_credentials_version=2,
type="oauth_client_private_key",
endpoint="https://localhost/delta-sharing/",
token_endpoint="https://localhost/token",
client_id="clientId",
private_key="privateKey",
key_id="keyId",
issuer="issuer",
scope="scope",
audience="audience",
)
provider = AuthCredentialProviderFactory.create_auth_credential_provider(profile_pk)
assert isinstance(provider, OAuthClientCredentialsAuthProvider)


def test_oauth_auth_provider_reused():
profile_oauth1 = DeltaSharingProfile(
Expand Down Expand Up @@ -297,3 +312,67 @@ def test_oauth_auth_provider_with_different_profiles():
provider2 = AuthCredentialProviderFactory.create_auth_credential_provider(profile_oauth2)

assert provider1 != provider2


def test_oauth_private_key_auth_provider_reused():
profile_pk1 = DeltaSharingProfile(
share_credentials_version=2,
type="oauth_client_private_key",
endpoint="https://localhost/delta-sharing/",
token_endpoint="https://localhost/token",
client_id="clientId",
private_key="privateKey",
key_id="keyId",
issuer="issuer",
scope="scope",
audience="audience",
)
provider1 = AuthCredentialProviderFactory.create_auth_credential_provider(profile_pk1)
assert isinstance(provider1, OAuthClientCredentialsAuthProvider)

profile_pk2 = DeltaSharingProfile(
share_credentials_version=2,
type="oauth_client_private_key",
endpoint="https://localhost/delta-sharing/",
token_endpoint="https://localhost/token",
client_id="clientId",
private_key="privateKey",
key_id="keyId",
issuer="issuer",
scope="scope",
audience="audience",
)
provider2 = AuthCredentialProviderFactory.create_auth_credential_provider(profile_pk2)
assert provider1 == provider2


def test_oauth_private_key_auth_provider_with_different_profiles():
profile_pk1 = DeltaSharingProfile(
share_credentials_version=2,
type="oauth_client_private_key",
endpoint="https://localhost/delta-sharing/",
token_endpoint="https://localhost/1/token",
client_id="clientId",
private_key="privateKey",
key_id="keyId",
issuer="issuer",
scope="scope",
audience="audience",
)
provider1 = AuthCredentialProviderFactory.create_auth_credential_provider(profile_pk1)
assert isinstance(provider1, OAuthClientCredentialsAuthProvider)

profile_pk2 = DeltaSharingProfile(
share_credentials_version=2,
type="oauth_client_private_key",
endpoint="https://localhost/delta-sharing/",
token_endpoint="https://localhost/2/token",
client_id="clientId",
private_key="privateKey",
key_id="keyId",
issuer="issuer",
scope="scope",
audience="audience",
)
provider2 = AuthCredentialProviderFactory.create_auth_credential_provider(profile_pk2)
assert provider1 != provider2
Loading
Loading