Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 0 additions & 231 deletions cli/internal/providers/datacatalog/localcatalog/tracking_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,6 @@ type TrackingPlan struct {
LocalID string `json:"id"`
Description string `json:"description,omitempty"`
Rules []*TPRule `json:"rules,omitempty"`
// Event and Props underneath event on the tracking plan
// This is automatically generated by the code when expanding refs
EventProps []*TPEvent `json:"event_props,omitempty"`
}

type TPEvent struct {
Name string
LocalID string
Ref string
Description string
CategoryRef *string
Type string
AllowUnplanned bool
IdentitySection string
Properties []*TPEventProperty
Variants Variants
}

func (e *TPEvent) PropertyByLocalID(localID string) *TPEventProperty {
for _, prop := range e.Properties {
if prop.LocalID == localID {
return prop
}
}
return nil
}

type TPEventProperty struct {
Name string `json:"name"`
Ref string `json:"$ref"`
LocalID string `json:"id"`
Description string `json:"description"`
Type string `json:"type"`
Config map[string]interface{} `json:"config"`
Required bool `json:"required"`
Properties []*TPEventProperty `json:"properties,omitempty"` // NEW: Nested properties
AdditionalProperties *bool `json:"additionalProperties,omitempty"`
}

type TPRule struct {
Expand Down Expand Up @@ -95,200 +58,6 @@ type TPRuleIncludes struct {
Ref string `json:"$ref"`
}

// ExpandRefs simply expands the references being held
// when reading the tracking plan with the actual events and properties
func (tp *TrackingPlan) ExpandRefs(dc *DataCatalog) error {
log.Debug("expanding refs for the tracking plan", "id", tp.LocalID)

expandedEvents := make([]*TPEvent, 0)
for _, rule := range tp.Rules {

switch {
case rule.Event != nil:
tpEvent, err := expandEventRefs(rule, dc)
if err != nil {
return fmt.Errorf("expanding event refs on the rule: %s in tracking plan: %s, err:%w",
rule.LocalID,
tp.LocalID,
err)
}
expandedEvents = append(expandedEvents, tpEvent)

case rule.Includes != nil:
tpEvents, err := expandIncludeRefs(rule, dc)
if err != nil {
return fmt.Errorf("expanding include refs on the rule: %s in tracking plan: %s, err:%w",
rule.LocalID,
tp.LocalID,
err)
}
expandedEvents = append(expandedEvents, tpEvents...)

default:
return fmt.Errorf("both the event and includes section in the rule:%s for tp: %s are nil", rule.LocalID, tp.LocalID)
}

}
tp.EventProps = expandedEvents
return nil
}

// expandIncludeRefs expands the include references in the tracking plan rule definition
// TODO: Make this function recursive to allow for multiple levels of include
func expandIncludeRefs(rule *TPRule, fetcher CatalogResourceFetcher) ([]*TPEvent, error) {
log.Debug("expanding include refs within the rule", "ruleID", rule.LocalID)

if rule.Includes == nil {
return nil, fmt.Errorf("empty rule includes")
}

matches := IncludeRegex.FindStringSubmatch(rule.Includes.Ref)
if len(matches) != 3 {
return nil, fmt.Errorf("includes ref: %s invalid as failed regex match", rule.Includes.Ref)
}

tpGroup, ruleID := matches[1], matches[2]
rules := make([]*TPRule, 0)

if ruleID == "*" {
eventRules, _ := fetcher.TPEventRules(tpGroup)
rules = append(rules, eventRules...) // fetch all the tp rules in the group
} else {
rules = append(rules, fetcher.TPEventRule(tpGroup, ruleID)) // fetch the specific rule with the tpGroup
}

toReturn := make([]*TPEvent, 0)
// Assume rules are now actual rules and not indirections
for _, rule := range rules {

if rule.Event == nil {
continue
}

// This rule should have event ref only
// which we can expand now
event, err := expandEventRefs(rule, fetcher)
if err != nil {
return nil, fmt.Errorf("expanding event ref of the expanded include rule: %s failed, err: %w", rule.LocalID, err)
}

toReturn = append(toReturn, event)
}

return toReturn, nil
}

// expandEventRefs expands the direct event references in the tracking plan rule definition
func expandEventRefs(rule *TPRule, fetcher CatalogResourceFetcher) (*TPEvent, error) {
log.Debug("expanding event refs within the rule", "ruleID", rule.LocalID)

if rule.Event == nil {
return nil, fmt.Errorf("empty rule event")
}

matches := EventRegex.FindStringSubmatch(rule.Event.Ref)
if len(matches) != 3 {
return nil, fmt.Errorf("event ref: %s invalid as failed regex match", rule.Event.Ref)
}

eventGroup, eventID := matches[1], matches[2]
event := fetcher.Event(eventGroup, eventID)
if event == nil {
return nil, fmt.Errorf("looking up event: %s in group: %s failed", eventID, eventGroup)
}

var categoryRef *string
if event.CategoryRef != nil {
catMatches := CategoryRegex.FindStringSubmatch(*event.CategoryRef)
if len(catMatches) != 3 {
return nil, fmt.Errorf("category ref: %s invalid as failed regex match", *event.CategoryRef)
}
categoryRef = event.CategoryRef
}

toReturn := TPEvent{
Name: event.Name,
LocalID: event.LocalID,
Ref: rule.Event.Ref,
Description: event.Description,
CategoryRef: categoryRef,
Type: event.Type,
AllowUnplanned: rule.Event.AllowUnplanned,
IdentitySection: rule.Event.IdentitySection,
Properties: make([]*TPEventProperty, 0),
Variants: rule.Variants,
}

// Load the properties from the data catalog (including nested properties)
// into corresponding event on the tracking plan
for _, prop := range rule.Properties {
property, err := expandPropertyRefs(prop, fetcher)
if err != nil {
return nil, fmt.Errorf("expanding property refs within the property: %s failed, err: %w", prop.Ref, err)
}

toReturn.Properties = append(toReturn.Properties, &TPEventProperty{
Name: property.Name,
Ref: prop.Ref,
LocalID: property.LocalID,
Properties: property.Properties,
Description: property.Description,
Type: property.Type,
Required: prop.Required,
Config: shallowCopy(property.Config),
AdditionalProperties: property.AdditionalProperties,
})
}

return &toReturn, nil
}

func expandPropertyRefs(prop *TPRuleProperty, fetcher CatalogResourceFetcher) (*TPEventProperty, error) {
log.Debug("expanding property refs within the property", "propertyID", prop.Ref)

matches := PropRegex.FindStringSubmatch(prop.Ref)
if len(matches) != 3 {
return nil, fmt.Errorf("property ref: %s invalid as failed regex match", prop.Ref)
}

propertyGroup, propertyID := matches[1], matches[2]
property := fetcher.Property(propertyGroup, propertyID)
if property == nil {
return nil, fmt.Errorf("looking up property: %s in group: %s failed", propertyID, propertyGroup)
}

properties := make([]*TPEventProperty, 0)
for _, nestedProp := range prop.Properties {
nestedProp, err := expandPropertyRefs(nestedProp, fetcher)
if err != nil {
return nil, fmt.Errorf("expanding nested property refs within the property: %s failed, err: %w", prop.Ref, err)
}
properties = append(properties, nestedProp)
}

return &TPEventProperty{
Name: property.Name,
Ref: prop.Ref,
Properties: properties,
LocalID: property.LocalID,
Description: property.Description,
Type: property.Type,
Required: prop.Required,
Config: shallowCopy(property.Config),
AdditionalProperties: prop.AdditionalProperties,
}, nil
}

func shallowCopy(input map[string]any) map[string]any {
output := make(map[string]any, len(input))

for k, v := range input {
output[k] = v
}

return output
}

func ExtractTrackingPlan(s *specs.Spec) (TrackingPlan, error) {
log.Debug("extracting tracking plan from resource definition", "metadata.name", s.Metadata["name"])

Expand Down
85 changes: 43 additions & 42 deletions cli/internal/providers/datacatalog/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ func (p *Provider) Validate(_ *resources.Graph) error {
}

func (p *Provider) ResourceGraph() (*resources.Graph, error) {
if err := inflateRefs(p.dc); err != nil {
return nil, fmt.Errorf("inflating refs: %w", err)
}
// if err := inflateRefs(p.dc); err != nil {
// return nil, fmt.Errorf("inflating refs: %w", err)
// }

return createResourceGraph(p.dc)
}
Expand Down Expand Up @@ -138,17 +138,26 @@ func createResourceGraph(catalog *localcatalog.DataCatalog) (*resources.Graph, e
graph := resources.NewGraph()

// First, pre-calculate all URNs to use for references
propIDToURN := make(map[string]string)
var (
propIDToURN = make(map[string]string)
propIDToProperty = make(map[string]*localcatalog.Property)
)

for _, props := range catalog.Properties {
for _, prop := range props {
propIDToURN[prop.LocalID] = resources.URN(prop.LocalID, pstate.PropertyResourceType)
propIDToProperty[prop.LocalID] = &prop
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Loop variable pointer causes all map entries to share same address

Taking the address of loop variables prop and event (&prop and &event) in the range loops causes all map entries in propIDToProperty and eventIDToEvent to point to the same memory location. In Go versions before 1.22, the loop variable is reused across iterations, so all entries will reference the last value processed. This causes getEntityFromRef to return incorrect property/event data for all lookups except the last one, resulting in tracking plans being created with wrong metadata.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are on go 1.24 and this issue was fixed.

}
}

eventIDToURN := make(map[string]string)
var (
eventIDToURN = make(map[string]string)
eventIDToEvent = make(map[string]*localcatalog.Event)
)
for _, events := range catalog.Events {
for _, event := range events {
eventIDToURN[event.LocalID] = resources.URN(event.LocalID, pstate.EventResourceType)
eventIDToEvent[event.LocalID] = &event
}
}

Expand Down Expand Up @@ -205,6 +214,30 @@ func createResourceGraph(catalog *localcatalog.DataCatalog) (*resources.Graph, e
}
}

getEntityFromRef := func(ref string) any {
parts := strings.Split(ref, "/")
if len(parts) < 4 {
return nil
}

var (
// #/events/group/id => events = parts[1], group = parts[2], id = parts[3]
entityType = parts[1]
id = parts[3]
)

switch entityType {
case localcatalog.KindProperties:
return propIDToProperty[id]

case localcatalog.KindEvents:
return eventIDToEvent[id]

default:
return nil
}
}

// Add properties to the graph
for group, props := range catalog.Properties {
for _, prop := range props {
Expand All @@ -228,8 +261,6 @@ func createResourceGraph(catalog *localcatalog.DataCatalog) (*resources.Graph, e
)),
)
graph.AddResource(resource)

propIDToURN[prop.LocalID] = resource.URN()
}
}

Expand All @@ -252,10 +283,8 @@ func createResourceGraph(catalog *localcatalog.DataCatalog) (*resources.Graph, e
event.LocalID,
)),
)
graph.AddResource(resource)

graph.AddResource(resource)
eventIDToURN[event.LocalID] = resource.URN()
}
}

Expand Down Expand Up @@ -311,7 +340,11 @@ func createResourceGraph(catalog *localcatalog.DataCatalog) (*resources.Graph, e
log.Debug("adding tracking plan to graph", "tp", tp.LocalID, "group", group)

args := pstate.TrackingPlanArgs{}
if err := args.FromCatalogTrackingPlan(tp, getURNFromRef); err != nil {
if err := args.FromCatalogTrackingPlan(
tp,
getURNFromRef,
getEntityFromRef,
); err != nil {
return nil, fmt.Errorf("creating tracking plan args: %w", err)
}

Expand All @@ -328,39 +361,7 @@ func createResourceGraph(catalog *localcatalog.DataCatalog) (*resources.Graph, e
)),
)
graph.AddResource(resource)
graph.AddDependencies(resource.URN(), getDependencies(tp, propIDToURN, eventIDToURN))
}

return graph, nil
}

// getDependencies simply fetch the dependencies on the trackingplan in form of the URN's
// of the properties and events that are used in the tracking plan
func getDependencies(tp *localcatalog.TrackingPlan, propIDToURN, eventIDToURN map[string]string) []string {
dependencies := make([]string, 0)

for _, event := range tp.EventProps {
if urn, ok := eventIDToURN[event.LocalID]; ok {
dependencies = append(dependencies, urn)
}

for _, prop := range event.Properties {
if urn, ok := propIDToURN[prop.LocalID]; ok {
dependencies = append(dependencies, urn)
}
}
}

return dependencies
}

func inflateRefs(catalog *localcatalog.DataCatalog) error {
log.Debug("inflating all the references in the catalog")

for _, tp := range catalog.TrackingPlans {
if err := tp.ExpandRefs(catalog); err != nil {
return fmt.Errorf("expanding refs on tp: %s err: %w", tp.LocalID, err)
}
}
return nil
}
Loading