Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cli/installers/resources/default-source-providers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ spec:
- database
- host
- port
- password
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make user and password optional here along with identity block, and make the mgmt_api enforce that atleast one of them is present?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should avoid making global assumptions about how all sources should work in the Management API. Ideally we want to offload this validation to the implementor of each source.

- user
- tables
---
apiVersion: v1
Expand Down
4 changes: 2 additions & 2 deletions sources/relational/debezium-reactivator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<version.debezium>2.7.3.Final</version.debezium>
<version.debezium>3.2.2.Final</version.debezium>
</properties>

<dependencies>
Expand Down Expand Up @@ -69,7 +69,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.9.2</version>
<version>1.18.0</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,16 @@ private Configuration createBaseConfig() {
.with("tombstones.on.delete", false)
// Used as a namespace for the connector storage.
.with("topic.prefix", cleanSourceId)

.with("errors.max.retries", "3")
.build();
}

private void startEngine(ChangePublisher changePublisher, Configuration config, RelationalGraphMapping mappings) {
final Properties props = config.asProperties();
engine = DebeziumEngine.create(Json.class)
.using(props)
.using(OffsetCommitPolicy.always())
.using(OffsetCommitPolicy.always())
.using((success, message, error) -> {
if (!success && error != null) {
log.error("Error in Debezium engine: {}", error.getMessage());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.drasi.databases;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.core.credential.TokenRequestContext;

public class AzureIdentityTokenFetcher {
private static final Logger log = LoggerFactory.getLogger(AzureIdentityTokenFetcher.class);

private static final String SCOPE = "https://ossrdbms-aad.database.windows.net/.default";

public String getToken() {

log.info("Fetching Azure AD token");

// Fetch Azure AD token
var credential = new DefaultAzureCredentialBuilder().build();
var tokenContext = new TokenRequestContext().addScopes(SCOPE);
String accessToken = credential.getToken(tokenContext).block().getToken();

return accessToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import io.drasi.models.NodeMapping;
import io.drasi.models.RelationalGraphMapping;
import io.drasi.source.sdk.Reactivator;
import io.drasi.source.sdk.SourceProxy;

import org.checkerframework.checker.units.qual.t;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -90,7 +90,7 @@ public String getTablesListConfigName() {
@Override
public Configuration createConnectorConfig(Configuration baseConfig) {
var publicationSlotName = "rg_" + baseConfig.getString("name");
return Configuration.create()
var result = Configuration.create()
// Start with the base configuration.
.with(baseConfig)
// Specify the Postgres connector class.
Expand All @@ -104,8 +104,22 @@ public Configuration createConnectorConfig(Configuration baseConfig) {
// Name of replication slot for streaming changes from the database. Default is debezium.
.with("slot.name", publicationSlotName)
// If started first time, start from beginning, else start from last stored LSN.
.with("snapshot.mode", "no_data")
.build();
.with("snapshot.mode", "no_data");

String identityType = SourceProxy.GetConfigValue("IDENTITY_TYPE");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we reconsider (maybe in a separate PR) adding fields to source-provider instead of relying on undocumented environment variables like IDENTITY_TYPE ?

Should the SDK provide a better way of achieving this? I worry that we're not using the schema-driven approach properly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have varying degrees of support for abstracting that environment variable across our SDKs, I think the .net version has this. We also do not yet have any documentation on building a source, I think that would be the place to document these. Could you clarify what you mean by "schema-driven approach"?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By schema-driven I meant that source-provider defines the schema of the Source Config, so it can drive what fields are supported in the config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The identity type is a higher level field in the YAML that is applicable to all sources / reactions. The config schema is set for the configuration section, which is specific per component.

if ("MicrosoftEntraWorkloadID".equals(identityType)) {

var tokenFetcher = new AzureIdentityTokenFetcher();
var accessToken = tokenFetcher.getToken();

result = result
.with("database.password", accessToken)
.with("database.sslmode", "require");

log.info("Using Azure Identity PostgreSQL Driver for authentication");
}

return result.build();
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions sources/relational/sql-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.4</version>
<version>42.7.8</version>
</dependency>

<dependency>
Expand All @@ -60,7 +60,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.9.2</version>
<version>1.18.0</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.drasi;

import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.core.credential.AccessToken;
import io.drasi.source.sdk.SourceProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class AzureIdentityPostgreSQLConnection {

private static final Logger log = LoggerFactory.getLogger(AzureIdentityPostgreSQLConnection.class);
private static final String POSTGRES_SCOPE = "https://ossrdbms-aad.database.windows.net/.default";

public static Connection getConnection() throws SQLException {
try {
log.info("Using Azure Identity for PostgreSQL authentication");

// Get access token using DefaultAzureCredential
var credential = new DefaultAzureCredentialBuilder().build();
AccessToken token = credential.getTokenSync(
new com.azure.core.credential.TokenRequestContext()
.addScopes(POSTGRES_SCOPE)
);

// Set up connection properties
var props = new Properties();
props.setProperty("user", SourceProxy.GetConfigValue("user"));
props.setProperty("password", token.getToken());

// For Azure PostgreSQL with managed identity, we need to set these additional properties
props.setProperty("ssl", "true");
props.setProperty("sslmode", "require");

String connectionUrl = "jdbc:postgresql://"
+ SourceProxy.GetConfigValue("host") + ":"
+ SourceProxy.GetConfigValue("port") + "/"
+ SourceProxy.GetConfigValue("database");

log.debug("Connecting to PostgreSQL using Azure Identity token");
return DriverManager.getConnection(connectionUrl, props);

} catch (Exception e) {
log.error("Failed to authenticate using Azure Identity", e);
throw new SQLException("Azure Identity authentication failed", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ public void close() {
private static Connection getConnection() throws SQLException {
switch (SourceProxy.GetConfigValue("connector")) {
case "PostgreSQL":
// Check if Azure Identity should be used
String identityType = SourceProxy.GetConfigValue("IDENTITY_TYPE");
if ("MicrosoftEntraWorkloadID".equals(identityType)) {
return AzureIdentityPostgreSQLConnection.getConnection();
}

// Default PostgreSQL connection with username/password
var propsPG = new Properties();
propsPG.setProperty("user", SourceProxy.GetConfigValue("user"));
propsPG.setProperty("password", SourceProxy.GetConfigValue("password"));
Expand Down
Loading