Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion azure-storage/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ alpakka {

#azure-credentials
credentials {
# valid values are anon (annonymous), SharedKey, SharedKeyLite, and sas
# valid values are anon (anonymous), SharedKey, SharedKeyLite, sas, and BearerToken
# BearerToken uses Azure AD OAuth2 via azure-identity (supports managed identity, workload identity, etc.)
authorization-type = anon
authorization-type = ${?AZURE_STORAGE_AUTHORIZATION_TYPE}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package azure
package storage

import akka.actor.ClassicActorSystemProvider
import com.azure.core.credential.TokenCredential
import com.typesafe.config.Config

import java.time.{Duration => JavaDuration}
Expand All @@ -22,7 +23,8 @@ final class StorageSettings(val apiVersion: String,
val azureNameKeyCredential: AzureNameKeyCredential,
val sasToken: Option[String],
val retrySettings: RetrySettings,
val algorithm: String) {
val algorithm: String,
val tokenCredential: Option[TokenCredential] = None) {

/** Java API */
def getApiVersion: String = apiVersion
Expand All @@ -45,37 +47,37 @@ final class StorageSettings(val apiVersion: String,
/** Java API */
def getAlgorithm: String = algorithm

/** Java API */
def witApiVersion(apiVersion: String): StorageSettings = copy(apiVersion = apiVersion)
Comment thread
johanandren marked this conversation as resolved.
Outdated

/** Java API */
def withAuthorizationType(authorizationType: String): StorageSettings = copy(authorizationType = authorizationType)

/** Java API */
def withSasToken(sasToken: String): StorageSettings = copy(sasToken = emptyStringToOption(sasToken))

/** Java API */
def withAzureNameKeyCredential(azureNameKeyCredential: AzureNameKeyCredential): StorageSettings =
copy(azureNameKeyCredential = azureNameKeyCredential)

/** Java API */
def withEndPointUrl(endPointUrl: String): StorageSettings = copy(endPointUrl = emptyStringToOption(endPointUrl))

/** Java API */
def withRetrySettings(retrySettings: RetrySettings): StorageSettings = copy(retrySettings = retrySettings)

/** Java API */
def withAlgorithm(algorithm: String): StorageSettings = copy(algorithm = algorithm)

def withTokenCredential(tokenCredential: TokenCredential): StorageSettings =
copy(authorizationType = BearerTokenAuthorizationType, tokenCredential = Some(tokenCredential))

/** Java API */
def getTokenCredential: Optional[TokenCredential] = tokenCredential.toJava

override def toString: String =
s"""StorageSettings(
| apiVersion=$apiVersion,
| authorizationType=$authorizationType,
| endPointUrl=$endPointUrl,
| azureNameKeyCredential=$azureNameKeyCredential,
| sasToken=$sasToken
| sasToken=$sasToken,
| retrySettings=$retrySettings,
| algorithm=$algorithm
| algorithm=$algorithm,
| tokenCredential=${tokenCredential.map(_ => "<provided>").getOrElse("<none>")}
|)""".stripMargin.replaceAll(System.lineSeparator(), "")

override def equals(other: Any): Boolean = other match {
Expand All @@ -86,13 +88,20 @@ final class StorageSettings(val apiVersion: String,
Objects.equals(azureNameKeyCredential, that.azureNameKeyCredential) &&
sasToken == that.sasToken &&
Objects.equals(retrySettings, that.retrySettings) &&
algorithm == that.algorithm
algorithm == that.algorithm &&
tokenCredential == that.tokenCredential

case _ => false
}

override def hashCode(): Int =
Objects.hash(apiVersion, authorizationType, azureNameKeyCredential, sasToken, retrySettings, algorithm)
Objects.hash(apiVersion,
authorizationType,
azureNameKeyCredential,
sasToken,
retrySettings,
algorithm,
tokenCredential)
Comment thread
johanandren marked this conversation as resolved.

private def copy(
apiVersion: String = apiVersion,
Expand All @@ -101,21 +110,41 @@ final class StorageSettings(val apiVersion: String,
azureNameKeyCredential: AzureNameKeyCredential = azureNameKeyCredential,
sasToken: Option[String] = sasToken,
retrySettings: RetrySettings = retrySettings,
algorithm: String = algorithm
algorithm: String = algorithm,
tokenCredential: Option[TokenCredential] = tokenCredential
) =
StorageSettings(apiVersion,
authorizationType,
endPointUrl,
azureNameKeyCredential,
sasToken,
retrySettings,
algorithm)
new StorageSettings(apiVersion,
authorizationType,
endPointUrl,
azureNameKeyCredential,
sasToken,
retrySettings,
algorithm,
tokenCredential)
}

object StorageSettings {
private[storage] val ConfigPath = "alpakka.azure-storage"

private def buildDefaultAzureCredential(): TokenCredential =
try {
val clazz = Class.forName("com.azure.identity.DefaultAzureCredentialBuilder")
val builder = clazz.getDeclaredConstructor().newInstance()
clazz.getMethod("build").invoke(builder).asInstanceOf[TokenCredential]
} catch {
case _: ClassNotFoundException =>
throw new RuntimeException(
"BearerToken authorization type requires the 'com.azure:azure-identity' library on the classpath. " +
"Add it as a dependency to your project."
)
}

private val AuthorizationTypes =
Seq(AnonymousAuthorizationType, SharedKeyAuthorizationType, SharedKeyLiteAuthorizationType, SasAuthorizationType)
Seq(AnonymousAuthorizationType,
SharedKeyAuthorizationType,
SharedKeyLiteAuthorizationType,
SasAuthorizationType,
BearerTokenAuthorizationType)

def apply(
apiVersion: String,
Expand Down Expand Up @@ -167,14 +196,19 @@ object StorageSettings {
val retrySettings =
if (config.hasPath("retry-settings")) RetrySettings(config.getConfig("retry-settings")) else RetrySettings.Default

StorageSettings(
val tokenCredential =
if (authorizationType == BearerTokenAuthorizationType) Some(buildDefaultAzureCredential())
else None

new StorageSettings(
apiVersion = apiVersion,
authorizationType = authorizationType,
endPointUrl = config.getOptionalString("endpoint-url"),
azureNameKeyCredential = AzureNameKeyCredential(credentials),
sasToken = credentials.getOptionalString("sas-token"),
retrySettings = retrySettings,
algorithm = config.getString("signing-algorithm", "HmacSHA256")
algorithm = config.getString("signing-algorithm", "HmacSHA256"),
tokenCredential = tokenCredential
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import akka.NotUsed
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.http.scaladsl.model.headers._
import akka.stream.scaladsl.Source
import com.azure.core.credential.TokenRequestContext

import java.time.Clock
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec
import scala.jdk.FutureConverters._

/** Takes initial request and add signed `Authorization` header and essential XMS headers.
*
Expand All @@ -42,7 +44,19 @@ final case class Signer(initialRequest: HttpRequest, settings: StorageSettings)(
val authorizationType = settings.authorizationType
if (authorizationType == AnonymousAuthorizationType || authorizationType == SasAuthorizationType)
Source.single(requestWithHeaders)
else
else if (authorizationType == BearerTokenAuthorizationType) {
val credential = settings.tokenCredential.getOrElse(
throw new IllegalStateException("TokenCredential must be provided for BearerToken authorization type")
)
val context = new TokenRequestContext().addScopes(Signer.StorageScope)
Source
.future(credential.getToken(context).toFuture.asScala)
.map { accessToken =>
requestWithHeaders.addHeader(
RawHeader(AuthorizationHeaderKey, s"Bearer ${accessToken.getToken}")
)
}
} else
Source.single(
requestWithHeaders.addHeader(
RawHeader(AuthorizationHeaderKey, generateAuthorizationHeader)
Expand Down Expand Up @@ -133,6 +147,7 @@ final case class Signer(initialRequest: HttpRequest, settings: StorageSettings)(
}

object Signer {
private val StorageScope = "https://storage.azure.com/.default"
private val SharedKeyHeaders =
Seq(
`Content-Encoding`.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package object storage {
private[storage] val SharedKeyAuthorizationType = "SharedKey"
private[storage] val SharedKeyLiteAuthorizationType = "SharedKeyLite"
private[storage] val SasAuthorizationType = "sas"
private[storage] val BearerTokenAuthorizationType = "BearerToken"
private[storage] val BlobType = "blob"
private[storage] val FileType = "file"
private[storage] val BlockBlobType = "BlockBlob"
Expand Down
49 changes: 49 additions & 0 deletions azure-storage/src/test/java/docs/javadsl/RequestBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@
import akka.http.javadsl.model.ContentTypes;
import akka.http.javadsl.model.headers.ByteRange;
import akka.http.scaladsl.model.headers.RawHeader;
import akka.stream.alpakka.azure.storage.AzureNameKeyCredential;
import akka.stream.alpakka.azure.storage.RetrySettings;
import akka.stream.alpakka.azure.storage.StorageSettings;
import akka.stream.alpakka.azure.storage.headers.ServerSideEncryption;
import akka.stream.alpakka.azure.storage.requests.CreateFile;
import akka.stream.alpakka.azure.storage.requests.GetBlob;
import akka.stream.alpakka.azure.storage.requests.PutBlockBlob;
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -83,4 +88,48 @@ public void populateAdditionalHeaders() {
Assert.assertEquals(
new RawHeader("If-Match", "foobar"), requestBuilder.additionalHeaders().head());
}

@Test
public void createSettingsWithDefaultAzureCredential() {
// #bearer-token-default
var credential = new DefaultAzureCredentialBuilder().build();

var settings =
StorageSettings.create(
"2024-11-04",
"anon",
java.util.Optional.empty(),
AzureNameKeyCredential.create("myaccount", ""),
java.util.Optional.empty(),
RetrySettings.Default(),
"HmacSHA256")
.withTokenCredential(credential);
// #bearer-token-default

Assert.assertEquals("BearerToken", settings.authorizationType());
Assert.assertTrue(settings.getTokenCredential().isPresent());
}

@Test
public void createSettingsWithManagedIdentityCredential() {
// #bearer-token-managed-identity
// User Assigned Managed Identity
var credential =
new ManagedIdentityCredentialBuilder().clientId("<managed-identity-client-id>").build();

var settings =
StorageSettings.create(
"2024-11-04",
"anon",
java.util.Optional.empty(),
AzureNameKeyCredential.create("myaccount", ""),
java.util.Optional.empty(),
RetrySettings.Default(),
"HmacSHA256")
.withTokenCredential(credential);
// #bearer-token-managed-identity

Assert.assertEquals("BearerToken", settings.authorizationType());
Assert.assertTrue(settings.getTokenCredential().isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package docs.scaladsl

import akka.http.scaladsl.model.ContentTypes
import akka.http.scaladsl.model.headers.{ByteRange, RawHeader}
import akka.stream.alpakka.azure.storage.{AzureNameKeyCredential, StorageSettings}
import akka.stream.alpakka.azure.storage.headers.ServerSideEncryption
import akka.stream.alpakka.azure.storage.requests.{CreateFile, GetBlob, PutBlockBlob}
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
Expand Down Expand Up @@ -67,4 +68,49 @@ class RequestBuilderSpec extends AnyFlatSpec with Matchers with LogCapturing {

requestBuilder.additionalHeaders shouldBe Seq(RawHeader("If-Match", "foobar"))
}

it should "create settings with default Azure credential" in {
//#bearer-token-default
import com.azure.identity.DefaultAzureCredentialBuilder

val credential = new DefaultAzureCredentialBuilder().build()

val settings = StorageSettings(
apiVersion = "2024-11-04",
authorizationType = "anon",
endPointUrl = None,
azureNameKeyCredential = AzureNameKeyCredential("myaccount", Array.empty[Byte]),
sasToken = None,
retrySettings = akka.stream.alpakka.azure.storage.RetrySettings.Default,
algorithm = "HmacSHA256"
).withTokenCredential(credential)
//#bearer-token-default

settings.authorizationType shouldBe "BearerToken"
settings.tokenCredential shouldBe defined
}

it should "create settings with managed identity credential" in {
//#bearer-token-managed-identity
import com.azure.identity.ManagedIdentityCredentialBuilder

// User Assigned Managed Identity
val credential = new ManagedIdentityCredentialBuilder()
.clientId("<managed-identity-client-id>")
.build()

val settings = StorageSettings(
apiVersion = "2024-11-04",
authorizationType = "anon",
endPointUrl = None,
azureNameKeyCredential = AzureNameKeyCredential("myaccount", Array.empty[Byte]),
sasToken = None,
retrySettings = akka.stream.alpakka.azure.storage.RetrySettings.Default,
algorithm = "HmacSHA256"
).withTokenCredential(credential)
//#bearer-token-managed-identity

settings.authorizationType shouldBe "BearerToken"
settings.tokenCredential shouldBe defined
}
}
39 changes: 39 additions & 0 deletions docs/src/main/paradox/azure-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,45 @@ At minimum following configurations needs to be set:
* `account-key`, Account key to use to create authorization signature, mandatory for `SharedKey` or `SharedKeyLite` authorization types, as described [here](https://learn.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key). Environment variable `AZURE_STORAGE_ACCOUNT_KEY` can be set to override this configuration.
* `sas-token` if authorization type is `sas`. Environment variable `AZURE_STORAGE_SAS_TOKEN` can be set to override this configuration.

### Azure AD authentication (BearerToken)

For environments where shared keys or SAS tokens are not desirable, the connector supports OAuth2 bearer token authentication via the [Azure Identity](https://learn.microsoft.com/en-us/java/api/overview/azure/identity-readme?view=azure-java-stable) library. This enables authentication using Managed Identity (system or user-assigned), workload identity, environment credentials, and other mechanisms supported by `DefaultAzureCredential`.

The `azure-identity` library is an optional dependency. To use bearer token authentication, add it to your project:

@@dependency [sbt,Maven,Gradle] {
group=com.azure
artifact=azure-identity
version=1.15.4
}

To use bearer token authentication via configuration, set `authorization-type` to `BearerToken`. This will automatically use `DefaultAzureCredential` which tries multiple credential sources in order:

```
alpakka.azure-storage.credentials {
authorization-type = BearerToken
account-name = "myaccount"
}
```

To use `DefaultAzureCredential` programmatically:

Scala
: @@snip [snip](/azure-storage/src/test/scala/docs/scaladsl/RequestBuilderSpec.scala) { #bearer-token-default }

Java
: @@snip [snip](/azure-storage/src/test/java/docs/javadsl/RequestBuilderTest.java) { #bearer-token-default }

For User Assigned Managed Identity (UAMI), provide the client ID of the managed identity:

Scala
: @@snip [snip](/azure-storage/src/test/scala/docs/scaladsl/RequestBuilderSpec.scala) { #bearer-token-managed-identity }

Java
: @@snip [snip](/azure-storage/src/test/java/docs/javadsl/RequestBuilderTest.java) { #bearer-token-managed-identity }

Any `com.azure.core.credential.TokenCredential` implementation can be used with `withTokenCredential`, including `ClientSecretCredential`, `ClientCertificateCredential`, `WorkloadIdentityCredential`, and others from the `azure-identity` library.

## Building request

Each function takes two parameters `objectPath` and `requestBuilder`. The `objectPath` is a `/` separated string of the path of the blob
Expand Down
Loading