Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/data-sources/confluent_catalog_integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,7 @@ In addition to the preceding arguments, the following attributes are exported:
- `endpoint` - (Required String) The catalog integration connection endpoint for Snowflake Open Catalog.
- `warehouse` - (Required String) Warehouse name of the Snowflake Open Catalog.
- `allowed_scope` - (Required String) Allowed scope of the Snowflake Open Catalog.
- `unity` (Optional Configuration Block) supports the following:
- `workspace_endpoint` - (Required String) The Databricks workspace URL associated with the Unity Catalog.
- `catalog_name` - (Required String) The name of the catalog within Unity Catalog.
- `suspended` - (Optional Boolean) Indicates whether the Catalog Integration should be suspended.
5 changes: 5 additions & 0 deletions docs/resources/confluent_catalog_integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ The following arguments are supported:
- `client_secret` - (Required String, Sensitive) The client secret of the catalog integration.
- `warehouse` - (Required String) Warehouse name of the Snowflake Open Catalog, for example, `catalog-name`.
- `allowed_scope` - (Required String) Allowed scope of the Snowflake Open Catalog.
- `unity` (Optional Configuration Block) supports the following (see [Integrate Tableflow with Unity Catalog in Confluent Cloud](https://docs.confluent.io/cloud/current/topics/tableflow/how-to-guides/catalog-integration/integrate-with-unity-catalog.html) for more details):
- `workspace_endpoint` - (Required String) The Databricks workspace URL associated with the Unity Catalog, for example, `https://user1.cloud.databricks.com`.
- `catalog_name` - (Required String) The name of the catalog within Unity Catalog.
- `client_id` - (Required String, Sensitive) The OAuth client ID used to authenticate with the Unity Catalog.
- `client_secret` - (Required String, Sensitive) The OAuth client secret used for authentication with the Unity Catalog.
- `credentials` (Optional Configuration Block) supports the following:
- `key` - (Required String) The Tableflow API Key.
- `secret` - (Required String, Sensitive) The Tableflow API Secret.
Expand Down
2 changes: 1 addition & 1 deletion docs/resources/confluent_tableflow_topic.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ description: |-

-> **Note:** It is recommended to set `lifecycle { prevent_destroy = true }` on production instances to prevent accidental tableflow topic deletion. This setting rejects plans that would destroy or recreate the tableflow topic, such as attempting to change uneditable attributes. Read more about it in the [Terraform docs](https://www.terraform.io/language/meta-arguments/lifecycle#prevent_destroy).

-> **Note:** Make sure to use `confluent_catalog_integration` [resource](https://registry.terraform.io/providers/confluentinc/confluent/latest/docs/resources/confluent_catalog_integration) if you want to integrate Tableflow with AWS Glue Catalog or Snowflake Open Catalog.
-> **Note:** Make sure to use `confluent_catalog_integration` [resource](https://registry.terraform.io/providers/confluentinc/confluent/latest/docs/resources/confluent_catalog_integration) if you want to integrate Tableflow with AWS Glue Catalog, Snowflake Open Catalog, or Unity Catalog.

## Example Usage

Expand Down
22 changes: 22 additions & 0 deletions internal/provider/data_source_catalog_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func catalogIntegrationDataSource() *schema.Resource {
paramCredentials: credentialsSchema(),
paramAwsGlue: awsGlueDataSourceSchema(),
paramSnowflake: snowflakeDataSourceSchema(),
paramUnity: unityDataSourceSchema(),
},
}
}
Expand Down Expand Up @@ -129,3 +130,24 @@ func snowflakeDataSourceSchema() *schema.Schema {
Computed: true,
}
}

Copy link

Copilot AI Nov 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unityDataSourceSchema function is missing a documentation comment explaining its purpose, similar to how other schema functions in the file have descriptive comments.

Suggested change
// unityDataSourceSchema returns the schema definition for the Unity Catalog integration
// data source, including fields for the Databricks workspace endpoint and catalog name.

Copilot uses AI. Check for mistakes.
// unityDataSourceSchema returns the schema for the read-only "unity"
// configuration block of the confluent_catalog_integration data source.
// All fields are Computed: they are populated from the Tableflow API
// response rather than supplied in user configuration.
func unityDataSourceSchema() *schema.Schema {
	return &schema.Schema{
		Type: schema.TypeList,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				paramWorkspaceEndpoint: {
					Type:        schema.TypeString,
					Computed:    true,
					Description: "The Databricks workspace URL associated with the Unity Catalog.",
				},
				paramCatalogName: {
					Type:        schema.TypeString,
					Computed:    true,
					Description: "The name of the catalog within Unity Catalog.",
				},
			},
		},
		// The block itself is computed as well; the API decides whether a
		// Unity configuration is present on the catalog integration.
		Computed: true,
	}
}
111 changes: 110 additions & 1 deletion internal/provider/data_source_catalog_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
CatalogIntegrationDataSourceScenarioName = "confluent_catalog_integration Data Source Lifecycle"
)

func TestAccDataSourceCatalogIntegration(t *testing.T) {
func TestAccDataSourceCatalogIntegrationAwsGlue(t *testing.T) {
ctx := context.Background()

wiremockContainer, err := setupWiremock(ctx)
Expand Down Expand Up @@ -60,12 +60,121 @@ func TestAccDataSourceCatalogIntegration(t *testing.T) {
resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "suspended", "false"),
resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "aws_glue.#", "1"),
resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "snowflake.#", "0"),
resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "unity.#", "0"),
resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "aws_glue.0.provider_integration_id", "cspi-stgce89r7"),
),
},
},
})
}

// TestAccDataSourceCatalogIntegrationSnowflake verifies that the
// confluent_catalog_integration data source exposes the "snowflake"
// configuration block (and leaves "aws_glue" and "unity" empty) when the
// mocked Tableflow API returns a Snowflake-backed catalog integration.
func TestAccDataSourceCatalogIntegrationSnowflake(t *testing.T) {
	ctx := context.Background()

	wiremockContainer, err := setupWiremock(ctx)
	if err != nil {
		t.Fatal(err)
	}
	defer wiremockContainer.Terminate(ctx)

	mockServerUrl := wiremockContainer.URI
	wiremockClient := wiremock.NewClient(mockServerUrl)
	// nolint:errcheck
	defer wiremockClient.Reset()

	// nolint:errcheck
	defer wiremockClient.ResetAllScenarios()

	// Fail fast if the fixture is missing rather than stubbing an empty
	// body and producing a confusing failure inside the Terraform step.
	readCatalogIntegrationResponse, err := os.ReadFile("../testdata/catalog_integration/read_created_snowflake_ci.json")
	if err != nil {
		t.Fatal(err)
	}
	if err := wiremockClient.StubFor(wiremock.Get(wiremock.URLPathEqualTo("/tableflow/v1/catalog-integrations/tci-abc123")).
		InScenario(CatalogIntegrationDataSourceScenarioName).
		WhenScenarioStateIs(wiremock.ScenarioStateStarted).
		WillReturn(
			string(readCatalogIntegrationResponse),
			contentTypeJSONHeader,
			http.StatusOK,
		)); err != nil {
		t.Fatal(err)
	}

	CatalogIntegrationResourceName := "data.confluent_catalog_integration.main"

	resource.Test(t, resource.TestCase{
		PreCheck:          func() { testAccPreCheck(t) },
		ProviderFactories: testAccProviderFactories,
		Steps: []resource.TestStep{
			{
				Config: testAccCheckDataSourceCatalogIntegration(mockServerUrl, "tci-abc123"),
				Check: resource.ComposeTestCheckFunc(
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "id", "tci-abc123"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "display_name", "catalog_integration_1"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "environment.#", "1"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "environment.0.id", "env-abc123"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "kafka_cluster.#", "1"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "kafka_cluster.0.id", "lkc-00000"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "suspended", "false"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "aws_glue.#", "0"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "unity.#", "0"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "snowflake.#", "1"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "snowflake.0.endpoint", "https://vuser1_polaris.snowflakecomputing.com/"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "snowflake.0.warehouse", "warehouse-name"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "snowflake.0.allowed_scope", "allowed-scope"),
				),
			},
		},
	})
}

// TestAccDataSourceCatalogIntegrationUnity verifies that the
// confluent_catalog_integration data source exposes the "unity"
// configuration block (and leaves "aws_glue" and "snowflake" empty) when
// the mocked Tableflow API returns a Unity Catalog-backed integration.
func TestAccDataSourceCatalogIntegrationUnity(t *testing.T) {
	ctx := context.Background()

	wiremockContainer, err := setupWiremock(ctx)
	if err != nil {
		t.Fatal(err)
	}
	defer wiremockContainer.Terminate(ctx)

	mockServerUrl := wiremockContainer.URI
	wiremockClient := wiremock.NewClient(mockServerUrl)
	// nolint:errcheck
	defer wiremockClient.Reset()

	// nolint:errcheck
	defer wiremockClient.ResetAllScenarios()

	// Fail fast if the fixture is missing rather than stubbing an empty
	// body and producing a confusing failure inside the Terraform step.
	readCatalogIntegrationResponse, err := os.ReadFile("../testdata/catalog_integration/read_created_unity_ci.json")
	if err != nil {
		t.Fatal(err)
	}
	if err := wiremockClient.StubFor(wiremock.Get(wiremock.URLPathEqualTo("/tableflow/v1/catalog-integrations/tci-abc123")).
		InScenario(CatalogIntegrationDataSourceScenarioName).
		WhenScenarioStateIs(wiremock.ScenarioStateStarted).
		WillReturn(
			string(readCatalogIntegrationResponse),
			contentTypeJSONHeader,
			http.StatusOK,
		)); err != nil {
		t.Fatal(err)
	}

	CatalogIntegrationResourceName := "data.confluent_catalog_integration.main"

	resource.Test(t, resource.TestCase{
		PreCheck:          func() { testAccPreCheck(t) },
		ProviderFactories: testAccProviderFactories,
		Steps: []resource.TestStep{
			{
				Config: testAccCheckDataSourceCatalogIntegration(mockServerUrl, "tci-abc123"),
				Check: resource.ComposeTestCheckFunc(
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "id", "tci-abc123"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "display_name", "catalog_integration_1"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "environment.#", "1"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "environment.0.id", "env-abc123"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "kafka_cluster.#", "1"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "kafka_cluster.0.id", "lkc-00000"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "suspended", "false"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "aws_glue.#", "0"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "snowflake.#", "0"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "unity.#", "1"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "unity.0.workspace_endpoint", "https://user1.cloud.databricks.com"),
					resource.TestCheckResourceAttr(CatalogIntegrationResourceName, "unity.0.catalog_name", "catalog_name"),
				),
			},
		},
	})
}

func testAccCheckDataSourceCatalogIntegration(mockServerUrl, resourceId string) string {
Expand Down
Loading