Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/prebuiltconfigs/tools/dataplex.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ tools:
search_entries:
kind: dataplex-search-entries
source: dataplex-source
description: Use this tool to search for entries in Dataplex Catalog based on the provided search query.
description: Searches the entries in Dataplex Catalog based on the provided search query.
lookup_entry:
kind: dataplex-lookup-entry
source: dataplex-source
description: Use this tool to retrieve a specific entry from Dataplex Catalog.
description: Retrieves a specific entry from Dataplex Catalog.
search_aspect_types:
kind: dataplex-search-aspect-types
source: dataplex-source
description: Use this tool to find aspect types relevant to the query.
description: Find aspect types relevant to the query.
lookup_context:
kind: dataplex-lookup-context
source: dataplex-source
description: Use this tool to retrieve rich metadata regarding one or more data assets along with their relationships.
description: Retrieves rich metadata regarding one or more data assets along with their relationships.

toolsets:
discovery:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net/http"
"strings"

"cloud.google.com/go/dataplex/apiv1/dataplexpb"
"github.com/goccy/go-yaml"
Expand Down Expand Up @@ -65,9 +66,13 @@ func (cfg Config) ToolConfigType() string {
}

func (cfg Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error) {
name := parameters.NewStringParameter("name", "The project to which the request should be attributed in the following form: projects/{project}/locations/{location}.")
resources := parameters.NewArrayParameter("resources", "A list of up to 10 resources names for which metadata is needed.", parameters.NewStringParameter("resource", "Name of a resource in the following format: projects/{project}/locations/{location}/entryGroups/{group}/entries/{entry}."))
params := parameters.Parameters{name, resources}
resources := parameters.NewArrayParameter("resources",
"Required. A list of up to 10 resource names from same project and location.",
parameters.NewStringParameter("resource",
"Name of a resource in the following format: projects/{project_number}/locations/{location}/entryGroups/{group}/entries/{entry}."+
" Example for a BigQuery table: 'projects/{project_number}/locations/{location}/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}'."+
" This is the same value which is returned by the search_entries tool's response in the dataplexEntry.name field."))
params := parameters.Parameters{resources}

mcpManifest := tools.GetMcpManifest(cfg.Name, cfg.Description, cfg.AuthRequired, params, nil)

Expand Down Expand Up @@ -102,12 +107,33 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
}

paramsMap := params.AsMap()
name, _ := paramsMap["name"].(string)
resourcesSlice, err := parameters.ConvertAnySliceToTyped(paramsMap["resources"].([]any), "string")
if err != nil {
return nil, util.NewAgentError(fmt.Sprintf("can't convert resources to array of strings: %s", err), err)
}
resources := resourcesSlice.([]string)

if len(resources) == 0 {
err := fmt.Errorf("resources cannot be empty")
return nil, util.NewAgentError(err.Error(), err)
}
var name string
for i, resource := range resources {
parts := strings.Split(resource, "/")
if len(parts) < 4 || parts[0] != "projects" || parts[2] != "locations" {
err := fmt.Errorf("invalid resource format at index %d, must be in the format of projects/{project_number}/locations/{location}/entryGroups/{group}/entries/{entry}", i)
return nil, util.NewAgentError(err.Error(), err)
}

currentName := strings.Join(parts[:4], "/")
if i == 0 {
name = currentName
} else if name != currentName {
err := fmt.Errorf("all resources must belong to the same project and location. Please make separate calls for each distinct project and location combination.")
return nil, util.NewAgentError(err.Error(), err)
}
}

resp, err := source.LookupContext(ctx, name, resources)
if err != nil {
return nil, util.ProcessGcpError(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ func TestParseFromYamlDataplexLookupContext(t *testing.T) {
source: my-instance
description: some description
parameters:
- name: name
type: string
description: some name description
- name: resources
type: array
description: some resources description
Expand All @@ -81,7 +78,6 @@ func TestParseFromYamlDataplexLookupContext(t *testing.T) {
Description: "some description",
AuthRequired: []string{},
Parameters: []parameters.Parameter{
parameters.NewStringParameter("name", "some name description"),
parameters.NewArrayParameter("resources", "some resources description", parameters.NewStringParameter("resource", "some resource description")),
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net/http"
"strings"

dataplexpb "cloud.google.com/go/dataplex/apiv1/dataplexpb"
"github.com/goccy/go-yaml"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (cfg Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error)

**Type:** Integer

**Description:** Specifies the parts of the entry and its aspects to return.
**Description:** Optional. Specifies the parts of the entry and its aspects to return.

**Possible Values:**

Expand All @@ -80,11 +81,10 @@ func (cfg Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error)
* 4 (ALL): Return the entry and both required and optional aspects (at most 100 aspects)
`

name := parameters.NewStringParameter("name", "The project to which the request should be attributed in the following form: projects/{project}/locations/{location}.")
view := parameters.NewIntParameterWithDefault("view", 2, viewDesc)
aspectTypes := parameters.NewArrayParameterWithDefault("aspectTypes", []any{}, "Limits the aspects returned to the provided aspect types. It only works when used together with CUSTOM view.", parameters.NewStringParameter("aspectType", "The types of aspects to be included in the response in the format `projects/{project}/locations/{location}/aspectTypes/{aspectType}`."))
entry := parameters.NewStringParameter("entry", "The resource name of the Entry in the following form: projects/{project}/locations/{location}/entryGroups/{entryGroup}/entries/{entry}.")
params := parameters.Parameters{name, view, aspectTypes, entry}
aspectTypes := parameters.NewArrayParameterWithDefault("aspectTypes", []any{}, "Optional. Limits the aspects returned to the provided aspect types. It only works when used together with CUSTOM view.", parameters.NewStringParameter("aspectType", "The types of aspects to be included in the response in the format `projects/{project}/locations/{location}/aspectTypes/{aspectType}`."))
entry := parameters.NewStringParameter("entry", "Required. The resource name of the Entry in the following form: projects/{project}/locations/{location}/entryGroups/{entryGroup}/entries/{entry}.")
params := parameters.Parameters{entry, view, aspectTypes}

mcpManifest := tools.GetMcpManifest(cfg.Name, cfg.Description, cfg.AuthRequired, params, nil)

Expand Down Expand Up @@ -119,10 +119,16 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
}

paramsMap := params.AsMap()
name, _ := paramsMap["name"].(string)
entry, _ := paramsMap["entry"].(string)
view, _ := paramsMap["view"].(int)
aspectTypeSlice, err := parameters.ConvertAnySliceToTyped(paramsMap["aspectTypes"].([]any), "string")

parts := strings.Split(entry, "/")
if len(parts) < 4 || parts[0] != "projects" || parts[2] != "locations" {
err = fmt.Errorf("invalid entry format: must be in the form projects/{project}/locations/{location}/entryGroups/{entryGroup}/entries/{entry}")
return nil, util.NewAgentError(err.Error(), err)
}
name := strings.Join(parts[:4], "/")
if err != nil {
return nil, util.NewAgentError(fmt.Sprintf("can't convert aspectTypes to array of strings: %s", err), err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ func TestParseFromYamlDataplexLookupEntry(t *testing.T) {
source: my-instance
description: some description
parameters:
- name: name
type: string
description: some name description
- name: view
type: string
description: some view description
Expand All @@ -88,7 +85,6 @@ func TestParseFromYamlDataplexLookupEntry(t *testing.T) {
Description: "some description",
AuthRequired: []string{},
Parameters: []parameters.Parameter{
parameters.NewStringParameter("name", "some name description"),
parameters.NewStringParameter("view", "some view description"),
parameters.NewArrayParameterWithDefault("aspectTypes", []any{}, "some aspect types description", parameters.NewStringParameter("aspectType", "some aspect type description")),
parameters.NewStringParameter("entry", "some entry description"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ func (cfg Config) ToolConfigType() string {
}

func (cfg Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error) {
query := parameters.NewStringParameter("query", "The query against which entries in scope should be matched.")
query := parameters.NewStringParameter("query",
"A query string for searching entries, following Dataplex search syntax. " +
"Supports logical operators (AND, OR, NOT) and grouping. " +
"For example, to find a table that might have been renamed, you could use 'type:table (name:books OR fiction)'. " +
"This can be more efficient than multiple separate calls." +
"Warning: Performing broad searches without specific filters (e.g., type:table) can be slow and consume significant resources. When performing exploratory searches, always use the pageSize parameter to limit the number of results returned.")
scope := parameters.NewStringParameterWithDefault("scope", "", "A scope limits the search space to a particular project or organization. It must be in the format: organizations/<org_id> or projects/<project_id> or projects/<project_number>.")
pageSize := parameters.NewIntParameterWithDefault("pageSize", 5, "Number of results in the search page.")
orderBy := parameters.NewStringParameterWithDefault("orderBy", "relevance", "Specifies the ordering of results. Supported values are: relevance, last_modified_timestamp, last_modified_timestamp asc")
Expand Down
45 changes: 35 additions & 10 deletions tests/dataplex/dataplex_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func runDataplexToolGetTest(t *testing.T) {
{
name: "get my-dataplex-lookup-entry-tool",
toolName: "my-dataplex-lookup-entry-tool",
expectedParams: []string{"name", "view", "aspectTypes", "entry"},
expectedParams: []string{"entry", "view", "aspectTypes"},
},
{
name: "get my-dataplex-search-aspect-types-tool",
Expand Down Expand Up @@ -581,7 +581,7 @@ func runDataplexLookupEntryToolInvokeTest(t *testing.T, tableName string, datase
name: "Success - Entry Found with Authorization",
api: "http://127.0.0.1:5000/api/tool/my-auth-dataplex-lookup-entry-tool/invoke",
requestHeader: map[string]string{"my-google-auth_token": idToken},
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"name\":\"projects/%s/locations/us\", \"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s\"}", DataplexProject, DataplexProject, DataplexProject, datasetName))),
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s\"}", DataplexProject, DataplexProject, datasetName))),
wantStatusCode: 200,
expectResult: true,
wantContentKey: "name",
Expand All @@ -590,7 +590,7 @@ func runDataplexLookupEntryToolInvokeTest(t *testing.T, tableName string, datase
name: "Failure - Invalid Authorization Token",
api: "http://127.0.0.1:5000/api/tool/my-auth-dataplex-lookup-entry-tool/invoke",
requestHeader: map[string]string{"my-google-auth_token": "invalid_token"},
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"name\":\"projects/%s/locations/us\", \"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s\"}", DataplexProject, DataplexProject, DataplexProject, datasetName))),
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s\"}", DataplexProject, DataplexProject, datasetName))),
wantStatusCode: 401,
expectResult: false,
wantContentKey: "name",
Expand All @@ -599,24 +599,32 @@ func runDataplexLookupEntryToolInvokeTest(t *testing.T, tableName string, datase
name: "Failure - Without Authorization Token",
api: "http://127.0.0.1:5000/api/tool/my-auth-dataplex-lookup-entry-tool/invoke",
requestHeader: map[string]string{},
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"name\":\"projects/%s/locations/us\", \"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s\"}", DataplexProject, DataplexProject, DataplexProject, datasetName))),
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s\"}", DataplexProject, DataplexProject, datasetName))),
wantStatusCode: 401,
expectResult: false,
wantContentKey: "name",
},
{
name: "Failure - Invalid Entry Format",
api: "http://127.0.0.1:5000/api/tool/my-dataplex-lookup-entry-tool/invoke",
requestHeader: map[string]string{},
requestBody: bytes.NewBuffer([]byte(`{"entry":"invalid/entry/format"}`)),
wantStatusCode: 400,
expectResult: false,
},
{
name: "Failure - Entry Not Found or Permission Denied",
api: "http://127.0.0.1:5000/api/tool/my-dataplex-lookup-entry-tool/invoke",
requestHeader: map[string]string{},
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"name\":\"projects/%s/locations/us\", \"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s\"}", DataplexProject, DataplexProject, DataplexProject, "non-existent-dataset"))),
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s\"}", DataplexProject, DataplexProject, "non-existent-dataset"))),
wantStatusCode: 200,
expectResult: false,
},
{
name: "Success - Entry Found with Basic View",
api: "http://127.0.0.1:5000/api/tool/my-dataplex-lookup-entry-tool/invoke",
requestHeader: map[string]string{},
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"name\":\"projects/%s/locations/us\", \"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s\", \"view\": %d}", DataplexProject, DataplexProject, DataplexProject, datasetName, tableName, 1))),
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s\", \"view\": %d}", DataplexProject, DataplexProject, datasetName, tableName, 1))),
wantStatusCode: 200,
expectResult: true,
wantContentKey: "name",
Expand All @@ -626,15 +634,15 @@ func runDataplexLookupEntryToolInvokeTest(t *testing.T, tableName string, datase
name: "Failure - Entry with Custom View without Aspect Types",
api: "http://127.0.0.1:5000/api/tool/my-dataplex-lookup-entry-tool/invoke",
requestHeader: map[string]string{},
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"name\":\"projects/%s/locations/us\", \"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s\", \"view\": %d}", DataplexProject, DataplexProject, DataplexProject, datasetName, tableName, 3))),
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s\", \"view\": %d}", DataplexProject, DataplexProject, datasetName, tableName, 3))),
wantStatusCode: 200,
expectResult: false,
},
{
name: "Success - Entry Found with only Schema Aspect",
api: "http://127.0.0.1:5000/api/tool/my-dataplex-lookup-entry-tool/invoke",
requestHeader: map[string]string{},
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"name\":\"projects/%s/locations/us\", \"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s\", \"aspectTypes\":[\"projects/dataplex-types/locations/global/aspectTypes/schema\"], \"view\": %d}", DataplexProject, DataplexProject, DataplexProject, datasetName, tableName, 3))),
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf("{\"entry\":\"projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s\", \"aspectTypes\":[\"projects/dataplex-types/locations/global/aspectTypes/schema\"], \"view\": %d}", DataplexProject, DataplexProject, datasetName, tableName, 3))),
wantStatusCode: 200,
expectResult: true,
wantContentKey: "aspects",
Expand Down Expand Up @@ -834,9 +842,8 @@ func runDataplexLookupContextToolInvokeTest(t *testing.T, tableName string, data
t.Fatalf("error getting Google ID token: %s", err)
}

name := fmt.Sprintf("projects/%s/locations/us", DataplexProject)
resourceName := fmt.Sprintf("projects/%s/locations/us/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s", DataplexProject, DataplexProject, datasetName, tableName)
requestBodyFmt := fmt.Sprintf(`{"name":"%s","resources":["%s"]}`, name, resourceName)
requestBodyFmt := fmt.Sprintf(`{"resources":["%s"]}`, resourceName)

testCases := []struct {
name string
Expand Down Expand Up @@ -874,6 +881,24 @@ func runDataplexLookupContextToolInvokeTest(t *testing.T, tableName string, data
expectResult: false,
wantContentKey: "context",
},
{
name: "Failure - Invalid Resource Format",
api: "http://127.0.0.1:5000/api/tool/my-dataplex-lookup-context-tool/invoke",
requestHeader: map[string]string{},
requestBody: bytes.NewBufferString(`{"resources":["projects/test-project/invalid-format"]}`),
wantStatusCode: 400,
expectResult: false,
wantContentKey: "context",
},
{
name: "Failure - Resources with different locations",
api: "http://127.0.0.1:5000/api/tool/my-dataplex-lookup-context-tool/invoke",
requestHeader: map[string]string{},
requestBody: bytes.NewBufferString(`{"resources":["projects/test-project/locations/us/entryGroups/g1/entries/e1", "projects/test-project/locations/europe-west1/entryGroups/g2/entries/e2"]}`),
wantStatusCode: 400,
expectResult: false,
wantContentKey: "context",
},
}

for _, tc := range testCases {
Expand Down