Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion azure-storage/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ alpakka {

#azure-credentials
credentials {
# valid values are anon (annonymous), SharedKey, SharedKeyLite, and sas
# valid values are anon (anonymous), SharedKey, SharedKeyLite, sas, and BearerToken
# BearerToken uses Azure AD OAuth2 via azure-identity (supports managed identity, workload identity, etc.)
authorization-type = anon
authorization-type = ${?AZURE_STORAGE_AUTHORIZATION_TYPE}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package azure
package storage

import akka.actor.ClassicActorSystemProvider
import com.azure.core.credential.TokenCredential
import com.typesafe.config.Config

import java.time.{Duration => JavaDuration}
Expand All @@ -22,7 +23,8 @@ final class StorageSettings(val apiVersion: String,
val azureNameKeyCredential: AzureNameKeyCredential,
val sasToken: Option[String],
val retrySettings: RetrySettings,
val algorithm: String) {
val algorithm: String,
val tokenCredential: Option[TokenCredential] = None) {

/** Java API */
def getApiVersion: String = apiVersion
Expand All @@ -45,37 +47,40 @@ final class StorageSettings(val apiVersion: String,
/** Java API */
def getAlgorithm: String = algorithm

/** Java API */
def witApiVersion(apiVersion: String): StorageSettings = copy(apiVersion = apiVersion)
def withApiVersion(apiVersion: String): StorageSettings = copy(apiVersion = apiVersion)

@deprecated("Use withApiVersion instead", "azure-storage 10.0.0")
def witApiVersion(apiVersion: String): StorageSettings = withApiVersion(apiVersion)

/** Java API */
def withAuthorizationType(authorizationType: String): StorageSettings = copy(authorizationType = authorizationType)

/** Java API */
def withSasToken(sasToken: String): StorageSettings = copy(sasToken = emptyStringToOption(sasToken))

/** Java API */
def withAzureNameKeyCredential(azureNameKeyCredential: AzureNameKeyCredential): StorageSettings =
copy(azureNameKeyCredential = azureNameKeyCredential)

/** Java API */
def withEndPointUrl(endPointUrl: String): StorageSettings = copy(endPointUrl = emptyStringToOption(endPointUrl))

/** Java API */
def withRetrySettings(retrySettings: RetrySettings): StorageSettings = copy(retrySettings = retrySettings)

/** Java API */
def withAlgorithm(algorithm: String): StorageSettings = copy(algorithm = algorithm)

def withTokenCredential(tokenCredential: TokenCredential): StorageSettings =
copy(authorizationType = BearerTokenAuthorizationType, tokenCredential = Some(tokenCredential))

/** Java API */
def getTokenCredential: Optional[TokenCredential] = tokenCredential.toJava

override def toString: String =
s"""StorageSettings(
| apiVersion=$apiVersion,
| authorizationType=$authorizationType,
| endPointUrl=$endPointUrl,
| azureNameKeyCredential=$azureNameKeyCredential,
| sasToken=$sasToken
| sasToken=$sasToken,
| retrySettings=$retrySettings,
| algorithm=$algorithm
| algorithm=$algorithm,
| tokenCredential=${tokenCredential.map(_ => "<provided>").getOrElse("<none>")}
|)""".stripMargin.replaceAll(System.lineSeparator(), "")

override def equals(other: Any): Boolean = other match {
Expand All @@ -86,13 +91,21 @@ final class StorageSettings(val apiVersion: String,
Objects.equals(azureNameKeyCredential, that.azureNameKeyCredential) &&
sasToken == that.sasToken &&
Objects.equals(retrySettings, that.retrySettings) &&
algorithm == that.algorithm
algorithm == that.algorithm &&
tokenCredential == that.tokenCredential

case _ => false
}

override def hashCode(): Int =
Objects.hash(apiVersion, authorizationType, azureNameKeyCredential, sasToken, retrySettings, algorithm)
Objects.hash(apiVersion,
authorizationType,
endPointUrl,
azureNameKeyCredential,
sasToken,
retrySettings,
algorithm,
tokenCredential)
Comment thread
johanandren marked this conversation as resolved.

private def copy(
apiVersion: String = apiVersion,
Expand All @@ -101,21 +114,41 @@ final class StorageSettings(val apiVersion: String,
azureNameKeyCredential: AzureNameKeyCredential = azureNameKeyCredential,
sasToken: Option[String] = sasToken,
retrySettings: RetrySettings = retrySettings,
algorithm: String = algorithm
algorithm: String = algorithm,
tokenCredential: Option[TokenCredential] = tokenCredential
) =
StorageSettings(apiVersion,
authorizationType,
endPointUrl,
azureNameKeyCredential,
sasToken,
retrySettings,
algorithm)
new StorageSettings(apiVersion,
authorizationType,
endPointUrl,
azureNameKeyCredential,
sasToken,
retrySettings,
algorithm,
tokenCredential)
}

object StorageSettings {
private[storage] val ConfigPath = "alpakka.azure-storage"

private def buildDefaultAzureCredential(): TokenCredential =
try {
val clazz = Class.forName("com.azure.identity.DefaultAzureCredentialBuilder")
val builder = clazz.getDeclaredConstructor().newInstance()
clazz.getMethod("build").invoke(builder).asInstanceOf[TokenCredential]
} catch {
case _: ClassNotFoundException =>
throw new RuntimeException(
"BearerToken authorization type requires the 'com.azure:azure-identity' library on the classpath. " +
"Add it as a dependency to your project."
)
}

private val AuthorizationTypes =
Seq(AnonymousAuthorizationType, SharedKeyAuthorizationType, SharedKeyLiteAuthorizationType, SasAuthorizationType)
Seq(AnonymousAuthorizationType,
SharedKeyAuthorizationType,
SharedKeyLiteAuthorizationType,
SasAuthorizationType,
BearerTokenAuthorizationType)

def apply(
apiVersion: String,
Expand Down Expand Up @@ -167,14 +200,19 @@ object StorageSettings {
val retrySettings =
if (config.hasPath("retry-settings")) RetrySettings(config.getConfig("retry-settings")) else RetrySettings.Default

StorageSettings(
val tokenCredential =
if (authorizationType == BearerTokenAuthorizationType) Some(buildDefaultAzureCredential())
else None

new StorageSettings(
apiVersion = apiVersion,
authorizationType = authorizationType,
endPointUrl = config.getOptionalString("endpoint-url"),
azureNameKeyCredential = AzureNameKeyCredential(credentials),
sasToken = credentials.getOptionalString("sas-token"),
retrySettings = retrySettings,
algorithm = config.getString("signing-algorithm", "HmacSHA256")
algorithm = config.getString("signing-algorithm", "HmacSHA256"),
tokenCredential = tokenCredential
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import akka.NotUsed
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.http.scaladsl.model.headers._
import akka.stream.scaladsl.Source
import com.azure.core.credential.TokenRequestContext

import java.time.Clock
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec
import scala.jdk.FutureConverters._

/** Takes initial request and add signed `Authorization` header and essential XMS headers.
*
Expand All @@ -42,7 +44,19 @@ final case class Signer(initialRequest: HttpRequest, settings: StorageSettings)(
val authorizationType = settings.authorizationType
if (authorizationType == AnonymousAuthorizationType || authorizationType == SasAuthorizationType)
Source.single(requestWithHeaders)
else
else if (authorizationType == BearerTokenAuthorizationType) {
val credential = settings.tokenCredential.getOrElse(
throw new IllegalStateException("TokenCredential must be provided for BearerToken authorization type")
)
val context = new TokenRequestContext().addScopes(Signer.StorageScope)
Source
.future(credential.getToken(context).toFuture.asScala)
.map { accessToken =>
requestWithHeaders.addHeader(
RawHeader(AuthorizationHeaderKey, s"Bearer ${accessToken.getToken}")
)
}
} else
Source.single(
requestWithHeaders.addHeader(
RawHeader(AuthorizationHeaderKey, generateAuthorizationHeader)
Expand Down Expand Up @@ -133,6 +147,7 @@ final case class Signer(initialRequest: HttpRequest, settings: StorageSettings)(
}

object Signer {
private val StorageScope = "https://storage.azure.com/.default"
private val SharedKeyHeaders =
Seq(
`Content-Encoding`.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package object storage {
private[storage] val SharedKeyAuthorizationType = "SharedKey"
private[storage] val SharedKeyLiteAuthorizationType = "SharedKeyLite"
private[storage] val SasAuthorizationType = "sas"
private[storage] val BearerTokenAuthorizationType = "BearerToken"
private[storage] val BlobType = "blob"
private[storage] val FileType = "file"
private[storage] val BlockBlobType = "BlockBlob"
Expand Down
49 changes: 49 additions & 0 deletions azure-storage/src/test/java/docs/javadsl/RequestBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@
import akka.http.javadsl.model.ContentTypes;
import akka.http.javadsl.model.headers.ByteRange;
import akka.http.scaladsl.model.headers.RawHeader;
import akka.stream.alpakka.azure.storage.AzureNameKeyCredential;
import akka.stream.alpakka.azure.storage.RetrySettings;
import akka.stream.alpakka.azure.storage.StorageSettings;
import akka.stream.alpakka.azure.storage.headers.ServerSideEncryption;
import akka.stream.alpakka.azure.storage.requests.CreateFile;
import akka.stream.alpakka.azure.storage.requests.GetBlob;
import akka.stream.alpakka.azure.storage.requests.PutBlockBlob;
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -83,4 +88,48 @@ public void populateAdditionalHeaders() {
Assert.assertEquals(
new RawHeader("If-Match", "foobar"), requestBuilder.additionalHeaders().head());
}

@Test
public void createSettingsWithDefaultAzureCredential() {
// #bearer-token-default
var credential = new DefaultAzureCredentialBuilder().build();

var settings =
StorageSettings.create(
"2024-11-04",
"anon",
java.util.Optional.empty(),
AzureNameKeyCredential.create("myaccount", ""),
java.util.Optional.empty(),
RetrySettings.Default(),
"HmacSHA256")
.withTokenCredential(credential);
// #bearer-token-default

Assert.assertEquals("BearerToken", settings.authorizationType());
Assert.assertTrue(settings.getTokenCredential().isPresent());
}

@Test
public void createSettingsWithManagedIdentityCredential() {
// #bearer-token-managed-identity
// User Assigned Managed Identity
var credential =
new ManagedIdentityCredentialBuilder().clientId("<managed-identity-client-id>").build();

var settings =
StorageSettings.create(
"2024-11-04",
"anon",
java.util.Optional.empty(),
AzureNameKeyCredential.create("myaccount", ""),
java.util.Optional.empty(),
RetrySettings.Default(),
"HmacSHA256")
.withTokenCredential(credential);
// #bearer-token-managed-identity

Assert.assertEquals("BearerToken", settings.authorizationType());
Assert.assertTrue(settings.getTokenCredential().isPresent());
}
}
Loading