diff --git a/README.md b/README.md index eec66ad..7d9640f 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ See [ITX Proxy Implementation Architecture](docs/itx-proxy-implementation.md) fo - **JWT Authentication**: Secure authentication via Heimdall JWT tokens - **OAuth2 M2M**: Machine-to-machine authentication with ITX using Auth0 - **ID Mapping**: Automatic v1/v2 ID translation via NATS +- **Event Processing**: Real-time sync of v1 survey data to v2 indexer and FGA (see [Event Processing](docs/event-processing.md)) - **OpenFGA Authorization**: Fine-grained access control - **OpenAPI Spec**: Auto-generated from Goa design - **Kubernetes Ready**: Includes Helm charts with health checks and probes @@ -179,6 +180,15 @@ The service is configured via environment variables: - `NATS_URL` - NATS server URL for ID mapping - `ID_MAPPING_DISABLED` - Disable ID mapping for local dev (default: false) +### Event Processing + +- `EVENT_PROCESSING_ENABLED` - Enable/disable event processing (default: true) +- `EVENT_CONSUMER_NAME` - JetStream consumer name (default: survey-service-kv-consumer) +- `EVENT_STREAM_NAME` - JetStream stream name (default: KV_v1-objects) +- `EVENT_FILTER_SUBJECT` - NATS subject filter (default: $KV.v1-objects.>) + +See [Event Processing Documentation](docs/event-processing.md) for details. 
+ ## Docker ### Build Image @@ -238,11 +248,13 @@ kubectl logs -n lfx -l app=lfx-v2-survey-service │ └── survey/v1/design/ # API design (DSL) ├── cmd/ # Application entry points │ └── survey-api/ # Main service binary +│ └── eventing/ # Event processing handlers ├── gen/ # Generated code (from Goa) ├── internal/ # Private application code │ ├── domain/ # Domain interfaces and types │ ├── infrastructure/ # Infrastructure implementations │ │ ├── auth/ # JWT authentication +│ │ ├── eventing/ # Event processing infrastructure │ │ ├── idmapper/ # ID mapping (NATS) │ │ └── proxy/ # ITX proxy client │ ├── logging/ # Structured logging @@ -253,6 +265,7 @@ kubectl logs -n lfx -l app=lfx-v2-survey-service │ └── models/itx/ # ITX API models ├── docs/ # Documentation │ ├── api-contracts/ # API contract documentation +│ ├── event-processing.md # Event processing guide │ └── itx-proxy-implementation.md # Architecture guide ├── charts/ # Helm charts │ └── lfx-v2-survey-service/ diff --git a/cmd/survey-api/eventing/event_processor.go b/cmd/survey-api/eventing/event_processor.go new file mode 100644 index 0000000..dad25a8 --- /dev/null +++ b/cmd/survey-api/eventing/event_processor.go @@ -0,0 +1,156 @@ +// Copyright The Linux Foundation and each contributor to LFX. 
+// SPDX-License-Identifier: MIT + +package eventing + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/linuxfoundation/lfx-v2-survey-service/internal/domain" + "github.com/linuxfoundation/lfx-v2-survey-service/internal/infrastructure/eventing" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +const ( + V1MappingsBucket = "v1-mappings" +) + +// EventProcessor handles NATS KV bucket event processing +type EventProcessor struct { + natsConn *nats.Conn + jsInstance jetstream.JetStream + consumer jetstream.Consumer + consumeCtx jetstream.ConsumeContext + publisher domain.EventPublisher + idMapper domain.IDMapper + mappingsKV jetstream.KeyValue + logger *slog.Logger + config eventing.Config +} + +// NewEventProcessor creates a new event processor +func NewEventProcessor( + cfg eventing.Config, + idMapper domain.IDMapper, + logger *slog.Logger, +) (*EventProcessor, error) { + // Connect to NATS + conn, err := nats.Connect(cfg.NATSURL, + nats.DrainTimeout(30*time.Second), + nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { + if sub != nil { + logger.With("error", err, "subject", sub.Subject).Error("NATS async error encountered") + } else { + logger.With("error", err).Error("NATS async error encountered") + } + }), + nats.ClosedHandler(func(nc *nats.Conn) { + logger.Warn("NATS connection closed") + }), + ) + if err != nil { + return nil, fmt.Errorf("failed to connect to NATS: %w", err) + } + + // Create JetStream context + jsContext, err := jetstream.New(conn) + if err != nil { + conn.Close() + return nil, fmt.Errorf("failed to create JetStream context: %w", err) + } + + // Initialize publisher + publisher := eventing.NewNATSPublisher(conn, logger) + + // Access the V1 mappings KV bucket + mappingsKV, err := jsContext.KeyValue(context.Background(), V1MappingsBucket) + if err != nil { + conn.Close() + return nil, fmt.Errorf("failed to access %s KV bucket: %w", V1MappingsBucket, err) + } + + return 
&EventProcessor{ + natsConn: conn, + jsInstance: jsContext, + publisher: publisher, + idMapper: idMapper, + mappingsKV: mappingsKV, + logger: logger, + config: cfg, + }, nil +} + +// Start starts the event processor +func (ep *EventProcessor) Start(ctx context.Context) error { + ep.logger.Info("Starting event processor", "consumer_name", ep.config.ConsumerName) + + // Create or update consumer + consumer, err := ep.jsInstance.CreateOrUpdateConsumer(ctx, ep.config.StreamName, jetstream.ConsumerConfig{ + Name: ep.config.ConsumerName, + Durable: ep.config.ConsumerName, + DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy, + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubject: ep.config.FilterSubject, + MaxDeliver: ep.config.MaxDeliver, + AckWait: ep.config.AckWait, + MaxAckPending: ep.config.MaxAckPending, + Description: "Durable/shared KV bucket watcher for survey service", + }) + if err != nil { + return fmt.Errorf("failed to create or update consumer: %w", err) + } + ep.consumer = consumer + + // Start consuming messages + consumeCtx, err := consumer.Consume(func(msg jetstream.Msg) { + kvMessageHandler(ctx, msg, ep.publisher, ep.idMapper, ep.mappingsKV, ep.logger) + }, jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, err error) { + ep.logger.With("error", err).Error("KV consumer error encountered") + })) + if err != nil { + return fmt.Errorf("failed to start consuming messages: %w", err) + } + ep.consumeCtx = consumeCtx + + ep.logger.Info("Event processor started successfully") + + // Block until context is cancelled + <-ctx.Done() + + ep.logger.Info("Event processor context cancelled") + return nil +} + +// Stop stops the event processor gracefully +func (ep *EventProcessor) Stop() error { + ep.logger.Info("Stopping event processor...") + + // Stop the consumer + if ep.consumeCtx != nil { + ep.consumeCtx.Stop() + ep.logger.Info("Consumer stopped") + } + + // Drain and close the NATS connection + if ep.natsConn != nil { + if err := 
ep.natsConn.Drain(); err != nil { + ep.logger.With("error", err).Error("Error draining NATS connection") + } + ep.natsConn.Close() + ep.logger.Info("NATS connection closed") + } + + // Close the publisher + if ep.publisher != nil { + if err := ep.publisher.Close(); err != nil { + ep.logger.With("error", err).Error("Error closing publisher") + } + } + + ep.logger.Info("Event processor stopped successfully") + return nil +} diff --git a/cmd/survey-api/eventing/kv_handler.go b/cmd/survey-api/eventing/kv_handler.go new file mode 100644 index 0000000..fe9cbce --- /dev/null +++ b/cmd/survey-api/eventing/kv_handler.go @@ -0,0 +1,225 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package eventing + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/linuxfoundation/lfx-v2-survey-service/internal/domain" + "github.com/nats-io/nats.go/jetstream" +) + +const ( + V1ObjectsBucket = "v1-objects" +) + +// kvEntry implements a mock jetstream.KeyValueEntry interface for the handler +type kvEntry struct { + key string + value []byte + operation jetstream.KeyValueOp +} + +func (e *kvEntry) Key() string { + return e.key +} + +func (e *kvEntry) Value() []byte { + return e.value +} + +func (e *kvEntry) Operation() jetstream.KeyValueOp { + return e.operation +} + +func (e *kvEntry) Bucket() string { + return V1ObjectsBucket +} + +func (e *kvEntry) Created() time.Time { + return time.Now() +} + +func (e *kvEntry) Delta() uint64 { + return 0 +} + +func (e *kvEntry) Revision() uint64 { + return 0 +} + +// kvMessageHandler processes KV update messages from the consumer +func kvMessageHandler( + ctx context.Context, + msg jetstream.Msg, + publisher domain.EventPublisher, + idMapper domain.IDMapper, + mappingsKV jetstream.KeyValue, + logger *slog.Logger, +) { + // Parse the message as a KV entry + headers := msg.Headers() + subject := msg.Subject() + + // Extract key from the subject 
($KV.{bucket}.{key}) + key := "" + if len(subject) > len(fmt.Sprintf("$KV.%s.", V1ObjectsBucket)) { + key = subject[len(fmt.Sprintf("$KV.%s.", V1ObjectsBucket)):] + } + + // Determine operation from headers + operation := jetstream.KeyValuePut // Default to PUT + if opHeader := headers.Get("KV-Operation"); opHeader != "" { + switch opHeader { + case "DEL": + operation = jetstream.KeyValueDelete + case "PURGE": + operation = jetstream.KeyValuePurge + } + } + + // Create a mock KV entry for the handler + entry := &kvEntry{ + key: key, + value: msg.Data(), + operation: operation, + } + + // Process the KV entry and check if retry is needed + shouldRetry := kvHandler(ctx, entry, publisher, idMapper, mappingsKV, logger) + + // Handle message acknowledgment based on retry decision + if shouldRetry { + // Get message metadata to determine retry attempt number + metadata, err := msg.Metadata() + if err != nil { + logger.With("error", err, "key", key).Warn("failed to get message metadata, using default delay") + metadata = &jetstream.MsgMetadata{NumDelivered: 1} + } + + // Calculate exponential backoff delay based on delivery attempt + // Attempts: 1st retry = 2s, 2nd retry = 10s, 3rd+ retry = 20s + var delay time.Duration + switch metadata.NumDelivered { + case 1: + delay = 2 * time.Second + case 2: + delay = 10 * time.Second + default: + // This case won't be hit if MaxDeliver is set to 3 or less + delay = 20 * time.Second + } + + // NAK the message with exponential backoff delay + // This allows time for parent objects (e.g., surveys) to be stored before retrying child objects (e.g., survey responses) + if err := msg.NakWithDelay(delay); err != nil { + logger.With("error", err, "key", key).Error("failed to NAK KV JetStream message for retry") + } else { + logger.With("key", key, "attempt", metadata.NumDelivered, "delay_seconds", delay.Seconds()).Debug("NAKed KV message for retry with exponential backoff") + } + } else { + // Acknowledge the message + if err := msg.Ack(); 
err != nil { + logger.With("error", err, "key", key).Error("failed to acknowledge KV JetStream message") + } + } +} + +// kvHandler routes KV entries by operation type +// Returns true if the message should be retried (NAK), false if it should be acknowledged (ACK) +func kvHandler( + ctx context.Context, + entry jetstream.KeyValueEntry, + publisher domain.EventPublisher, + idMapper domain.IDMapper, + mappingsKV jetstream.KeyValue, + logger *slog.Logger, +) bool { + switch entry.Operation() { + case jetstream.KeyValuePut: + return handleKVPut(ctx, entry, publisher, idMapper, mappingsKV, logger) + case jetstream.KeyValueDelete, jetstream.KeyValuePurge: + return handleKVDelete(ctx, entry, publisher, idMapper, mappingsKV, logger) + default: + logger.With("key", entry.Key(), "operation", entry.Operation()).Debug("ignoring unknown KV operation") + return false // ACK unknown operations + } +} + +// handleKVPut processes PUT operations by routing to specific handlers based on key prefix +func handleKVPut( + ctx context.Context, + entry jetstream.KeyValueEntry, + publisher domain.EventPublisher, + idMapper domain.IDMapper, + mappingsKV jetstream.KeyValue, + logger *slog.Logger, +) bool { + key := entry.Key() + value := entry.Value() + + // Unmarshal the data + var v1Data map[string]interface{} + if err := json.Unmarshal(value, &v1Data); err != nil { + logger.With("error", err, "key", key).Error("failed to unmarshal KV data") + return false // Permanent error, ACK and skip + } + + // Extract key prefix (before first period) + parts := strings.SplitN(key, ".", 2) + if len(parts) == 0 { + logger.With("key", key).Warn("invalid key format") + return false // ACK invalid keys + } + prefix := parts[0] + + // Route to specific handlers based on prefix + switch prefix { + case "itx-surveys": + return handleSurveyUpdate(ctx, key, v1Data, publisher, idMapper, mappingsKV, logger) + case "itx-survey-responses": + return handleSurveyResponseUpdate(ctx, key, v1Data, publisher, idMapper, 
mappingsKV, logger) + default: + // Not a survey-related key, ACK and skip + return false + } +} + +// handleKVDelete processes DELETE and PURGE operations +func handleKVDelete( + ctx context.Context, + entry jetstream.KeyValueEntry, + publisher domain.EventPublisher, + idMapper domain.IDMapper, + mappingsKV jetstream.KeyValue, + logger *slog.Logger, +) bool { + key := entry.Key() + logger.With("key", key, "operation", entry.Operation()).Debug("received delete/purge operation") + + // Extract key prefix (before first period) + parts := strings.SplitN(key, ".", 2) + if len(parts) < 2 { + logger.With("key", key).Warn("skipping delete - invalid key format") + return false // Permanent error, ACK and skip + } + + prefix := parts[0] + uid := parts[1] // The UID is everything after the first period + + // Route to appropriate delete handler based on prefix + switch prefix { + case "itx-surveys": + return handleSurveyDelete(ctx, uid, publisher, mappingsKV, logger) + case "itx-survey-responses": + return handleSurveyResponseDelete(ctx, uid, publisher, mappingsKV, logger) + default: + return false // ACK unsupported types + } +} diff --git a/cmd/survey-api/eventing/survey_event_handler.go b/cmd/survey-api/eventing/survey_event_handler.go new file mode 100644 index 0000000..5a6ac39 --- /dev/null +++ b/cmd/survey-api/eventing/survey_event_handler.go @@ -0,0 +1,523 @@ +// Copyright The Linux Foundation and each contributor to LFX. 
+// SPDX-License-Identifier: MIT + +package eventing + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strconv" + "strings" + + indexerConstants "github.com/linuxfoundation/lfx-v2-indexer-service/pkg/constants" + "github.com/linuxfoundation/lfx-v2-survey-service/internal/domain" + "github.com/nats-io/nats.go/jetstream" +) + +const errKey = "error" + +// SurveyDBRaw represents raw survey data from v1 DynamoDB/NATS KV bucket +// This is only used for unmarshaling - numeric fields come as strings from DynamoDB +type SurveyDBRaw struct { + ID string `json:"id"` + SurveyMonkeyID string `json:"survey_monkey_id"` + IsProjectSurvey bool `json:"is_project_survey"` + StageFilter string `json:"stage_filter"` + CreatorUsername string `json:"creator_username"` + CreatorName string `json:"creator_name"` + CreatorID string `json:"creator_id"` + CreatedAt string `json:"created_at"` + LastModifiedAt string `json:"last_modified_at"` + LastModifiedBy string `json:"last_modified_by"` + SurveyTitle string `json:"survey_title"` + SurveySendDate string `json:"survey_send_date"` + SurveyCutoffDate string `json:"survey_cutoff_date"` + SurveyReminderRateDays int `json:"survey_reminder_rate_days"` + SendImmediately bool `json:"send_immediately"` + EmailSubject string `json:"email_subject"` + EmailBody string `json:"email_body"` + EmailBodyText string `json:"email_body_text"` + CommitteeCategory string `json:"committee_category"` + Committees []SurveyCommitteeDBRaw `json:"committees"` + CommitteeVotingEnabled bool `json:"committee_voting_enabled"` + SurveyStatus string `json:"survey_status"` + NPSValue int `json:"nps_value"` + NumPromoters int `json:"num_promoters"` + NumPassives int `json:"num_passives"` + NumDetractors int `json:"num_detractors"` + TotalRecipients int `json:"total_recipients"` + TotalSentRecipients int `json:"total_recipients_sent"` + TotalResponses int `json:"total_responses"` + TotalRecipientsOpened int `json:"total_recipients_opened"` + 
TotalRecipientsClicked int `json:"total_recipients_clicked"` + TotalDeliveryErrors int `json:"total_delivery_errors"` + IsNPSSurvey bool `json:"is_nps_survey"` + CollectorURL string `json:"collector_url"` +} + +// UnmarshalJSON implements custom unmarshaling to handle both string and int inputs for numeric fields. +func (s *SurveyDBRaw) UnmarshalJSON(data []byte) error { + // Use a temporary struct with interface{} types for numeric fields + tmp := struct { + ID string `json:"id"` + SurveyMonkeyID string `json:"survey_monkey_id"` + IsProjectSurvey bool `json:"is_project_survey"` + StageFilter string `json:"stage_filter"` + CreatorUsername string `json:"creator_username"` + CreatorName string `json:"creator_name"` + CreatorID string `json:"creator_id"` + CreatedAt string `json:"created_at"` + LastModifiedAt string `json:"last_modified_at"` + LastModifiedBy string `json:"last_modified_by"` + SurveyTitle string `json:"survey_title"` + SurveySendDate string `json:"survey_send_date"` + SurveyCutoffDate string `json:"survey_cutoff_date"` + SurveyReminderRateDays interface{} `json:"survey_reminder_rate_days"` + SendImmediately bool `json:"send_immediately"` + EmailSubject string `json:"email_subject"` + EmailBody string `json:"email_body"` + EmailBodyText string `json:"email_body_text"` + CommitteeCategory string `json:"committee_category"` + Committees []SurveyCommitteeDBRaw `json:"committees"` + CommitteeVotingEnabled bool `json:"committee_voting_enabled"` + SurveyStatus string `json:"survey_status"` + NPSValue interface{} `json:"nps_value"` + NumPromoters interface{} `json:"num_promoters"` + NumPassives interface{} `json:"num_passives"` + NumDetractors interface{} `json:"num_detractors"` + TotalRecipients interface{} `json:"total_recipients"` + TotalSentRecipients interface{} `json:"total_recipients_sent"` + TotalResponses interface{} `json:"total_responses"` + TotalRecipientsOpened interface{} `json:"total_recipients_opened"` + TotalRecipientsClicked interface{} 
`json:"total_recipients_clicked"` + TotalDeliveryErrors interface{} `json:"total_delivery_errors"` + IsNPSSurvey bool `json:"is_nps_survey"` + CollectorURL string `json:"collector_url"` + }{} + + if err := json.Unmarshal(data, &tmp); err != nil { + return err + } + + // Helper function to convert interface{} to int + convertToInt := func(v interface{}) (int, error) { + if v == nil { + return 0, nil + } + switch val := v.(type) { + case string: + if val == "" { + return 0, nil + } + return strconv.Atoi(val) + case float64: + return int(val), nil + case int: + return val, nil + default: + return 0, fmt.Errorf("invalid type for numeric field: %T", v) + } + } + + // Assign string and bool fields directly + s.ID = tmp.ID + s.SurveyMonkeyID = tmp.SurveyMonkeyID + s.IsProjectSurvey = tmp.IsProjectSurvey + s.StageFilter = tmp.StageFilter + s.CreatorUsername = tmp.CreatorUsername + s.CreatorName = tmp.CreatorName + s.CreatorID = tmp.CreatorID + s.CreatedAt = tmp.CreatedAt + s.LastModifiedAt = tmp.LastModifiedAt + s.LastModifiedBy = tmp.LastModifiedBy + s.SurveyTitle = tmp.SurveyTitle + s.SurveySendDate = tmp.SurveySendDate + s.SurveyCutoffDate = tmp.SurveyCutoffDate + s.SendImmediately = tmp.SendImmediately + s.EmailSubject = tmp.EmailSubject + s.EmailBody = tmp.EmailBody + s.EmailBodyText = tmp.EmailBodyText + s.CommitteeCategory = tmp.CommitteeCategory + s.Committees = tmp.Committees + s.CommitteeVotingEnabled = tmp.CommitteeVotingEnabled + s.SurveyStatus = tmp.SurveyStatus + s.IsNPSSurvey = tmp.IsNPSSurvey + s.CollectorURL = tmp.CollectorURL + + // Convert numeric fields + var err error + if s.SurveyReminderRateDays, err = convertToInt(tmp.SurveyReminderRateDays); err != nil { + return fmt.Errorf("failed to convert survey_reminder_rate_days: %w", err) + } + if s.NPSValue, err = convertToInt(tmp.NPSValue); err != nil { + return fmt.Errorf("failed to convert nps_value: %w", err) + } + if s.NumPromoters, err = convertToInt(tmp.NumPromoters); err != nil { + return 
fmt.Errorf("failed to convert num_promoters: %w", err) + } + if s.NumPassives, err = convertToInt(tmp.NumPassives); err != nil { + return fmt.Errorf("failed to convert num_passives: %w", err) + } + if s.NumDetractors, err = convertToInt(tmp.NumDetractors); err != nil { + return fmt.Errorf("failed to convert num_detractors: %w", err) + } + if s.TotalRecipients, err = convertToInt(tmp.TotalRecipients); err != nil { + return fmt.Errorf("failed to convert total_recipients: %w", err) + } + if s.TotalSentRecipients, err = convertToInt(tmp.TotalSentRecipients); err != nil { + return fmt.Errorf("failed to convert total_recipients_sent: %w", err) + } + if s.TotalResponses, err = convertToInt(tmp.TotalResponses); err != nil { + return fmt.Errorf("failed to convert total_responses: %w", err) + } + if s.TotalRecipientsOpened, err = convertToInt(tmp.TotalRecipientsOpened); err != nil { + return fmt.Errorf("failed to convert total_recipients_opened: %w", err) + } + if s.TotalRecipientsClicked, err = convertToInt(tmp.TotalRecipientsClicked); err != nil { + return fmt.Errorf("failed to convert total_recipients_clicked: %w", err) + } + if s.TotalDeliveryErrors, err = convertToInt(tmp.TotalDeliveryErrors); err != nil { + return fmt.Errorf("failed to convert total_delivery_errors: %w", err) + } + + return nil +} + +// SurveyCommitteeDBRaw represents raw committee data from v1 DynamoDB +type SurveyCommitteeDBRaw struct { + CommitteeID string `json:"committee_id"` // v1 SFID + CommitteeName string `json:"committee_name"` + ProjectID string `json:"project_id"` // v1 SFID + ProjectName string `json:"project_name"` + NPSValue int `json:"nps_value"` + NumPromoters int `json:"num_promoters"` + NumPassives int `json:"num_passives"` + NumDetractors int `json:"num_detractors"` + TotalRecipients int `json:"total_recipients"` + TotalSentRecipients int `json:"total_recipients_sent"` + TotalResponses int `json:"total_responses"` + TotalRecipientsOpened int `json:"total_recipients_opened"` + 
TotalRecipientsClicked int `json:"total_recipients_clicked"` + TotalDeliveryErrors int `json:"total_delivery_errors"` +} + +// UnmarshalJSON implements custom unmarshaling to handle both string and int inputs for numeric fields. +func (c *SurveyCommitteeDBRaw) UnmarshalJSON(data []byte) error { + // Use a temporary struct with interface{} types for numeric fields + tmp := struct { + CommitteeID string `json:"committee_id"` + CommitteeName string `json:"committee_name"` + ProjectID string `json:"project_id"` + ProjectName string `json:"project_name"` + NPSValue interface{} `json:"nps_value"` + NumPromoters interface{} `json:"num_promoters"` + NumPassives interface{} `json:"num_passives"` + NumDetractors interface{} `json:"num_detractors"` + TotalRecipients interface{} `json:"total_recipients"` + TotalSentRecipients interface{} `json:"total_recipients_sent"` + TotalResponses interface{} `json:"total_responses"` + TotalRecipientsOpened interface{} `json:"total_recipients_opened"` + TotalRecipientsClicked interface{} `json:"total_recipients_clicked"` + TotalDeliveryErrors interface{} `json:"total_delivery_errors"` + }{} + + if err := json.Unmarshal(data, &tmp); err != nil { + return err + } + + // Helper function to convert interface{} to int + convertToInt := func(v interface{}) (int, error) { + if v == nil { + return 0, nil + } + switch val := v.(type) { + case string: + if val == "" { + return 0, nil + } + return strconv.Atoi(val) + case float64: + return int(val), nil + case int: + return val, nil + default: + return 0, fmt.Errorf("invalid type for numeric field: %T", v) + } + } + + // Assign string fields directly + c.CommitteeID = tmp.CommitteeID + c.CommitteeName = tmp.CommitteeName + c.ProjectID = tmp.ProjectID + c.ProjectName = tmp.ProjectName + + // Convert numeric fields + var err error + if c.NPSValue, err = convertToInt(tmp.NPSValue); err != nil { + return fmt.Errorf("failed to convert nps_value: %w", err) + } + if c.NumPromoters, err = 
convertToInt(tmp.NumPromoters); err != nil { + return fmt.Errorf("failed to convert num_promoters: %w", err) + } + if c.NumPassives, err = convertToInt(tmp.NumPassives); err != nil { + return fmt.Errorf("failed to convert num_passives: %w", err) + } + if c.NumDetractors, err = convertToInt(tmp.NumDetractors); err != nil { + return fmt.Errorf("failed to convert num_detractors: %w", err) + } + if c.TotalRecipients, err = convertToInt(tmp.TotalRecipients); err != nil { + return fmt.Errorf("failed to convert total_recipients: %w", err) + } + if c.TotalSentRecipients, err = convertToInt(tmp.TotalSentRecipients); err != nil { + return fmt.Errorf("failed to convert total_recipients_sent: %w", err) + } + if c.TotalResponses, err = convertToInt(tmp.TotalResponses); err != nil { + return fmt.Errorf("failed to convert total_responses: %w", err) + } + if c.TotalRecipientsOpened, err = convertToInt(tmp.TotalRecipientsOpened); err != nil { + return fmt.Errorf("failed to convert total_recipients_opened: %w", err) + } + if c.TotalRecipientsClicked, err = convertToInt(tmp.TotalRecipientsClicked); err != nil { + return fmt.Errorf("failed to convert total_recipients_clicked: %w", err) + } + if c.TotalDeliveryErrors, err = convertToInt(tmp.TotalDeliveryErrors); err != nil { + return fmt.Errorf("failed to convert total_delivery_errors: %w", err) + } + + return nil +} + +// handleSurveyUpdate processes a survey update from itx-surveys records +// Returns true if the message should be retried (NAK), false if it should be acknowledged (ACK) +func handleSurveyUpdate( + ctx context.Context, + key string, + v1Data map[string]interface{}, + publisher domain.EventPublisher, + idMapper domain.IDMapper, + mappingsKV jetstream.KeyValue, + logger *slog.Logger, +) bool { + funcLogger := logger.With("key", key, "handler", "survey") + + funcLogger.DebugContext(ctx, "processing survey update") + + // Convert v1Data map to survey data with proper v2 format + surveyData, err := 
convertMapToSurveyData(ctx, v1Data, idMapper, funcLogger) + if err != nil { + funcLogger.With(errKey, err).ErrorContext(ctx, "failed to convert v1Data to survey") + return false // Permanent error, ACK and skip + } + + // Extract the survey UID + if surveyData.UID == "" { + funcLogger.ErrorContext(ctx, "missing or invalid uid in survey data") + return false // Permanent error, ACK and skip + } + funcLogger = funcLogger.With("survey_uid", surveyData.UID) + + // Check if survey has at least one valid parent reference (committee or project) + hasValidParent := false + for _, committee := range surveyData.Committees { + if committee.CommitteeUID != "" || committee.ProjectUID != "" { + hasValidParent = true + break + } + } + + if !hasValidParent { + funcLogger.InfoContext(ctx, "skipping survey sync - no valid parent references found") + return false // Permanent issue, ACK and skip + } + + // Determine action (created vs updated) by checking if mapping exists + mappingKey := fmt.Sprintf("survey.%s", surveyData.UID) + indexerAction := indexerConstants.ActionCreated + if _, err := mappingsKV.Get(ctx, mappingKey); err == nil { + indexerAction = indexerConstants.ActionUpdated + } + + // Publish to indexer and FGA-sync + if err := publisher.PublishSurveyEvent(ctx, string(indexerAction), surveyData); err != nil { + funcLogger.With(errKey, err).ErrorContext(ctx, "failed to publish survey event") + // Check if this is a transient error that should be retried + if isTransientError(err) { + return true // NAK for retry + } + return false // Permanent error, ACK and skip + } + + // Store mapping to track that we've seen this survey + if _, err := mappingsKV.Put(ctx, mappingKey, []byte("1")); err != nil { + funcLogger.With(errKey, err).WarnContext(ctx, "failed to store survey mapping") + // Don't retry on mapping storage failures + } + + funcLogger.InfoContext(ctx, "successfully sent survey indexer and access messages") + return false // Success, ACK the message +} + +// 
convertMapToSurveyData converts v1 survey data to v2 format with proper types and UIDs +func convertMapToSurveyData( + ctx context.Context, + v1Data map[string]interface{}, + idMapper domain.IDMapper, + logger *slog.Logger, +) (*domain.SurveyData, error) { + // Convert map to JSON bytes, then to SurveyDBRaw to handle string fields + jsonBytes, err := json.Marshal(v1Data) + if err != nil { + return nil, fmt.Errorf("failed to marshal v1Data to JSON: %w", err) + } + + var surveyDB SurveyDBRaw + if err := json.Unmarshal(jsonBytes, &surveyDB); err != nil { + return nil, fmt.Errorf("failed to unmarshal JSON into SurveyDBRaw: %w", err) + } + + // Build v2 survey data struct - numeric fields are now properly typed after UnmarshalJSON + surveyData := &domain.SurveyData{ + UID: surveyDB.ID, + ID: surveyDB.ID, + SurveyMonkeyID: surveyDB.SurveyMonkeyID, + IsProjectSurvey: surveyDB.IsProjectSurvey, + StageFilter: surveyDB.StageFilter, + CreatorUsername: surveyDB.CreatorUsername, + CreatorName: surveyDB.CreatorName, + CreatorID: surveyDB.CreatorID, + CreatedAt: surveyDB.CreatedAt, + LastModifiedAt: surveyDB.LastModifiedAt, + LastModifiedBy: surveyDB.LastModifiedBy, + SurveyTitle: surveyDB.SurveyTitle, + SurveySendDate: surveyDB.SurveySendDate, + SurveyCutoffDate: surveyDB.SurveyCutoffDate, + SurveyReminderRateDays: surveyDB.SurveyReminderRateDays, + SendImmediately: surveyDB.SendImmediately, + EmailSubject: surveyDB.EmailSubject, + EmailBody: surveyDB.EmailBody, + EmailBodyText: surveyDB.EmailBodyText, + CommitteeCategory: surveyDB.CommitteeCategory, + CommitteeVotingEnabled: surveyDB.CommitteeVotingEnabled, + SurveyStatus: surveyDB.SurveyStatus, + NPSValue: surveyDB.NPSValue, + NumPromoters: surveyDB.NumPromoters, + NumPassives: surveyDB.NumPassives, + NumDetractors: surveyDB.NumDetractors, + TotalRecipients: surveyDB.TotalRecipients, + TotalSentRecipients: surveyDB.TotalSentRecipients, + TotalResponses: surveyDB.TotalResponses, + TotalRecipientsOpened: 
surveyDB.TotalRecipientsOpened, + TotalRecipientsClicked: surveyDB.TotalRecipientsClicked, + TotalDeliveryErrors: surveyDB.TotalDeliveryErrors, + IsNPSSurvey: surveyDB.IsNPSSurvey, + CollectorURL: surveyDB.CollectorURL, + } + + // Process committees array - numeric fields are now properly typed after UnmarshalJSON + for _, committeeDB := range surveyDB.Committees { + committeeData := domain.SurveyCommitteeData{ + CommitteeID: committeeDB.CommitteeID, + CommitteeName: committeeDB.CommitteeName, + ProjectID: committeeDB.ProjectID, + ProjectName: committeeDB.ProjectName, + NPSValue: committeeDB.NPSValue, + NumPromoters: committeeDB.NumPromoters, + NumPassives: committeeDB.NumPassives, + NumDetractors: committeeDB.NumDetractors, + TotalRecipients: committeeDB.TotalRecipients, + TotalSentRecipients: committeeDB.TotalSentRecipients, + TotalResponses: committeeDB.TotalResponses, + TotalRecipientsOpened: committeeDB.TotalRecipientsOpened, + TotalRecipientsClicked: committeeDB.TotalRecipientsClicked, + TotalDeliveryErrors: committeeDB.TotalDeliveryErrors, + } + + // Map v1 committee ID (SFID) to v2 committee UID + if committeeDB.CommitteeID != "" { + committeeUID, err := idMapper.MapCommitteeV1ToV2(ctx, committeeDB.CommitteeID) + if err != nil { + logger.With(errKey, err, "field", "committee_id", "value", committeeDB.CommitteeID). + WarnContext(ctx, "failed to get v2 committee UID from v1 committee ID") + // Don't set committee_uid if mapping fails + } else { + committeeData.CommitteeUID = committeeUID + } + } + + // Map v1 project ID (SFID) to v2 project UID + if committeeDB.ProjectID != "" { + projectUID, err := idMapper.MapProjectV1ToV2(ctx, committeeDB.ProjectID) + if err != nil { + logger.With(errKey, err, "field", "project_id", "value", committeeDB.ProjectID). 
+ WarnContext(ctx, "failed to get v2 project UID from v1 project ID") + // Don't set project_uid if mapping fails + } else { + committeeData.ProjectUID = projectUID + logger.With("v1_project_id", committeeDB.ProjectID, "v2_project_uid", projectUID). + DebugContext(ctx, "mapped project v1 ID to v2 UID") + } + } + + surveyData.Committees = append(surveyData.Committees, committeeData) + } + + return surveyData, nil +} + +// handleSurveyDelete processes a survey delete from itx-surveys records +// Returns true if the message should be retried (NAK), false if it should be acknowledged (ACK) +func handleSurveyDelete( + ctx context.Context, + uid string, + publisher domain.EventPublisher, + mappingsKV jetstream.KeyValue, + logger *slog.Logger, +) bool { + funcLogger := logger.With("survey_uid", uid, "handler", "survey_delete") + + funcLogger.DebugContext(ctx, "processing survey delete") + + // Create minimal survey data for delete event + surveyData := &domain.SurveyData{ + UID: uid, + ID: uid, + } + + // Publish delete event to indexer and FGA-sync + if err := publisher.PublishSurveyEvent(ctx, string(indexerConstants.ActionDeleted), surveyData); err != nil { + funcLogger.With(errKey, err).ErrorContext(ctx, "failed to publish survey delete event") + // Check if this is a transient error that should be retried + if isTransientError(err) { + return true // NAK for retry + } + return false // Permanent error, ACK and skip + } + + // Remove mapping from v1-mappings KV + mappingKey := fmt.Sprintf("survey.%s", uid) + if err := mappingsKV.Delete(ctx, mappingKey); err != nil { + funcLogger.With(errKey, err).WarnContext(ctx, "failed to delete survey mapping") + // Don't retry on mapping deletion failures + } + + funcLogger.InfoContext(ctx, "successfully sent survey delete indexer and access messages") + return false // Success, ACK the message +} + +// isTransientError determines if an error is transient and should be retried +func isTransientError(err error) bool { + if err == 
nil { + return false + } + + errStr := err.Error() + // NATS publish errors, timeouts, connection issues + if strings.Contains(errStr, "timeout") || strings.Contains(errStr, "connection") || + strings.Contains(errStr, "unavailable") || strings.Contains(errStr, "deadline") { + return true + } + + return false +} diff --git a/cmd/survey-api/eventing/survey_response_event_handler.go b/cmd/survey-api/eventing/survey_response_event_handler.go new file mode 100644 index 0000000..2c7e38c --- /dev/null +++ b/cmd/survey-api/eventing/survey_response_event_handler.go @@ -0,0 +1,381 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package eventing + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strconv" + + indexerConstants "github.com/linuxfoundation/lfx-v2-indexer-service/pkg/constants" + "github.com/linuxfoundation/lfx-v2-survey-service/internal/domain" + "github.com/nats-io/nats.go/jetstream" +) + +// SurveyResponseDBRaw represents raw survey response data from v1 DynamoDB/NATS KV bucket +type SurveyResponseDBRaw struct { + ID string `json:"id"` + SurveyID string `json:"survey_id"` + SurveyMonkeyRespondent string `json:"survey_monkey_respondent_id"` + Email string `json:"email"` + CommitteeMemberID string `json:"committee_member_id,omitempty"` + FirstName string `json:"first_name"` + LastName string `json:"last_name"` + CreatedAt string `json:"created_at"` + ResponseDatetime string `json:"response_datetime"` + LastReceivedTime string `json:"last_received_time"` + NumAutomatedRemindersReceived int `json:"num_automated_reminders_received"` + Username string `json:"username"` + VotingStatus string `json:"voting_status"` + Role string `json:"role"` + JobTitle string `json:"job_title"` + MembershipTier string `json:"membership_tier"` + Organization domain.SurveyResponseOrgData `json:"organization"` + Project SurveyResponseProjectDBRaw `json:"project"` + CommitteeID string `json:"committee_id"` // v1 SFID + 
CommitteeVotingEnabled bool `json:"committee_voting_enabled"` + SurveyLink string `json:"survey_link"` + NPSValue int `json:"nps_value"` + SurveyMonkeyQuestionAnswers []domain.SurveyMonkeyQuestionAnswers `json:"survey_monkey_question_answers"` + SESMessageID string `json:"ses_message_id"` + SESBounceType string `json:"ses_bounce_type"` + SESBounceSubtype string `json:"ses_bounce_subtype"` + SESBounceDiagnosticCode string `json:"ses_bounce_diagnostic_code"` + SESComplaintExists bool `json:"ses_complaint_exists"` + SESComplaintType string `json:"ses_complaint_type"` + SESComplaintDate string `json:"ses_complaint_date"` + SESDeliverySuccessful bool `json:"ses_delivery_successful"` + EmailOpenedFirstTime string `json:"email_opened_first_time"` + EmailOpenedLastTime string `json:"email_opened_last_time"` + LinkClickedFirstTime string `json:"link_clicked_first_time"` + LinkClickedLastTime string `json:"link_clicked_last_time"` + Excluded bool `json:"excluded"` +} + +// UnmarshalJSON implements custom unmarshaling to handle both string and int inputs for numeric fields. 
+func (r *SurveyResponseDBRaw) UnmarshalJSON(data []byte) error { + // Use a temporary struct with interface{} types for numeric fields + tmp := struct { + ID string `json:"id"` + SurveyID string `json:"survey_id"` + SurveyMonkeyRespondent string `json:"survey_monkey_respondent_id"` + Email string `json:"email"` + CommitteeMemberID string `json:"committee_member_id,omitempty"` + FirstName string `json:"first_name"` + LastName string `json:"last_name"` + CreatedAt string `json:"created_at"` + ResponseDatetime string `json:"response_datetime"` + LastReceivedTime string `json:"last_received_time"` + NumAutomatedRemindersReceived interface{} `json:"num_automated_reminders_received"` + Username string `json:"username"` + VotingStatus string `json:"voting_status"` + Role string `json:"role"` + JobTitle string `json:"job_title"` + MembershipTier string `json:"membership_tier"` + Organization domain.SurveyResponseOrgData `json:"organization"` + Project SurveyResponseProjectDBRaw `json:"project"` + CommitteeID string `json:"committee_id"` + CommitteeVotingEnabled bool `json:"committee_voting_enabled"` + SurveyLink string `json:"survey_link"` + NPSValue interface{} `json:"nps_value"` + SurveyMonkeyQuestionAnswers []domain.SurveyMonkeyQuestionAnswers `json:"survey_monkey_question_answers"` + SESMessageID string `json:"ses_message_id"` + SESBounceType string `json:"ses_bounce_type"` + SESBounceSubtype string `json:"ses_bounce_subtype"` + SESBounceDiagnosticCode string `json:"ses_bounce_diagnostic_code"` + SESComplaintExists bool `json:"ses_complaint_exists"` + SESComplaintType string `json:"ses_complaint_type"` + SESComplaintDate string `json:"ses_complaint_date"` + SESDeliverySuccessful bool `json:"ses_delivery_successful"` + EmailOpenedFirstTime string `json:"email_opened_first_time"` + EmailOpenedLastTime string `json:"email_opened_last_time"` + LinkClickedFirstTime string `json:"link_clicked_first_time"` + LinkClickedLastTime string `json:"link_clicked_last_time"` + 
Excluded bool `json:"excluded"` + }{} + + if err := json.Unmarshal(data, &tmp); err != nil { + return err + } + + // Helper function to convert interface{} to int + convertToInt := func(v interface{}) (int, error) { + if v == nil { + return 0, nil + } + switch val := v.(type) { + case string: + if val == "" { + return 0, nil + } + return strconv.Atoi(val) + case float64: + return int(val), nil + case int: + return val, nil + default: + return 0, fmt.Errorf("invalid type for numeric field: %T", v) + } + } + + // Assign all fields + r.ID = tmp.ID + r.SurveyID = tmp.SurveyID + r.SurveyMonkeyRespondent = tmp.SurveyMonkeyRespondent + r.Email = tmp.Email + r.CommitteeMemberID = tmp.CommitteeMemberID + r.FirstName = tmp.FirstName + r.LastName = tmp.LastName + r.CreatedAt = tmp.CreatedAt + r.ResponseDatetime = tmp.ResponseDatetime + r.LastReceivedTime = tmp.LastReceivedTime + r.Username = tmp.Username + r.VotingStatus = tmp.VotingStatus + r.Role = tmp.Role + r.JobTitle = tmp.JobTitle + r.MembershipTier = tmp.MembershipTier + r.Organization = tmp.Organization + r.Project = tmp.Project + r.CommitteeID = tmp.CommitteeID + r.CommitteeVotingEnabled = tmp.CommitteeVotingEnabled + r.SurveyLink = tmp.SurveyLink + r.SurveyMonkeyQuestionAnswers = tmp.SurveyMonkeyQuestionAnswers + r.SESMessageID = tmp.SESMessageID + r.SESBounceType = tmp.SESBounceType + r.SESBounceSubtype = tmp.SESBounceSubtype + r.SESBounceDiagnosticCode = tmp.SESBounceDiagnosticCode + r.SESComplaintExists = tmp.SESComplaintExists + r.SESComplaintType = tmp.SESComplaintType + r.SESComplaintDate = tmp.SESComplaintDate + r.SESDeliverySuccessful = tmp.SESDeliverySuccessful + r.EmailOpenedFirstTime = tmp.EmailOpenedFirstTime + r.EmailOpenedLastTime = tmp.EmailOpenedLastTime + r.LinkClickedFirstTime = tmp.LinkClickedFirstTime + r.LinkClickedLastTime = tmp.LinkClickedLastTime + r.Excluded = tmp.Excluded + + // Convert numeric fields + var err error + if r.NumAutomatedRemindersReceived, err = 
convertToInt(tmp.NumAutomatedRemindersReceived); err != nil { + return fmt.Errorf("failed to convert num_automated_reminders_received: %w", err) + } + if r.NPSValue, err = convertToInt(tmp.NPSValue); err != nil { + return fmt.Errorf("failed to convert nps_value: %w", err) + } + + return nil +} + +// SurveyResponseProjectDBRaw represents raw project data from v1 +type SurveyResponseProjectDBRaw struct { + ID string `json:"id"` // v1 SFID + Name string `json:"name"` +} + +// handleSurveyResponseUpdate processes a survey response update from itx-survey-responses records +// Returns true if the message should be retried (NAK), false if it should be acknowledged (ACK) +func handleSurveyResponseUpdate( + ctx context.Context, + key string, + v1Data map[string]interface{}, + publisher domain.EventPublisher, + idMapper domain.IDMapper, + mappingsKV jetstream.KeyValue, + logger *slog.Logger, +) bool { + funcLogger := logger.With("key", key, "handler", "survey_response") + + funcLogger.DebugContext(ctx, "processing survey response update") + + // Convert v1Data map to survey response data with proper v2 format + responseData, err := convertMapToSurveyResponseData(ctx, v1Data, idMapper, funcLogger) + if err != nil { + funcLogger.With(errKey, err).ErrorContext(ctx, "failed to convert v1Data to survey response") + return false // Permanent error, ACK and skip + } + + // Extract the survey response UID + if responseData.UID == "" { + funcLogger.ErrorContext(ctx, "missing or invalid uid in survey response data") + return false // Permanent error, ACK and skip + } + funcLogger = funcLogger.With("survey_response_id", responseData.UID) + + // Check if parent survey exists in mappings before proceeding + if responseData.SurveyID == "" { + funcLogger.ErrorContext(ctx, "survey response missing required parent survey ID") + return false // Permanent error, ACK and skip + } + funcLogger = funcLogger.With("survey_id", responseData.SurveyID) + surveyMappingKey := fmt.Sprintf("survey.%s", 
responseData.SurveyID) + if _, err := mappingsKV.Get(ctx, surveyMappingKey); err != nil { + funcLogger.With(errKey, err).InfoContext(ctx, "parent survey not found in mappings, will retry survey response sync") + return true // NAK for retry - survey may not be processed yet + } + + // Check if parent project exists in mappings + if responseData.Project.ProjectUID == "" { + funcLogger.With("project_id", responseData.Project.ID).InfoContext(ctx, "skipping survey response sync - parent project not found in mappings") + return false // Permanent issue, ACK and skip + } + + // Determine action (created vs updated) by checking if mapping exists + mappingKey := fmt.Sprintf("survey_response.%s", responseData.UID) + indexerAction := indexerConstants.ActionCreated + if _, err := mappingsKV.Get(ctx, mappingKey); err == nil { + indexerAction = indexerConstants.ActionUpdated + } + + // Publish to indexer and FGA-sync + if err := publisher.PublishSurveyResponseEvent(ctx, string(indexerAction), responseData); err != nil { + funcLogger.With(errKey, err).ErrorContext(ctx, "failed to publish survey response event") + // Check if this is a transient error that should be retried + if isTransientError(err) { + return true // NAK for retry + } + return false // Permanent error, ACK and skip + } + + // Store mapping to track that we've seen this survey response + if _, err := mappingsKV.Put(ctx, mappingKey, []byte("1")); err != nil { + funcLogger.With(errKey, err).WarnContext(ctx, "failed to store survey response mapping") + // Don't retry on mapping storage failures + } + + funcLogger.InfoContext(ctx, "successfully sent survey response indexer and access messages") + return false // Success, ACK the message +} + +// convertMapToSurveyResponseData converts v1 survey response data to v2 format with proper types and UIDs +func convertMapToSurveyResponseData( + ctx context.Context, + v1Data map[string]interface{}, + idMapper domain.IDMapper, + logger *slog.Logger, +) 
(*domain.SurveyResponseData, error) { + // Convert map to JSON bytes, then to SurveyResponseDBRaw to handle string/raw fields + jsonBytes, err := json.Marshal(v1Data) + if err != nil { + return nil, fmt.Errorf("failed to marshal v1Data to JSON: %w", err) + } + + var responseDB SurveyResponseDBRaw + if err := json.Unmarshal(jsonBytes, &responseDB); err != nil { + return nil, fmt.Errorf("failed to unmarshal JSON into SurveyResponseDBRaw: %w", err) + } + + // Build v2 survey response data struct - numeric fields are now properly typed after UnmarshalJSON + responseData := &domain.SurveyResponseData{ + UID: responseDB.ID, + ID: responseDB.ID, + SurveyID: responseDB.SurveyID, + SurveyUID: responseDB.SurveyID, // survey_id becomes survey_uid in v2 + SurveyMonkeyRespondent: responseDB.SurveyMonkeyRespondent, + Email: responseDB.Email, + CommitteeMemberID: responseDB.CommitteeMemberID, + FirstName: responseDB.FirstName, + LastName: responseDB.LastName, + CreatedAt: responseDB.CreatedAt, + ResponseDatetime: responseDB.ResponseDatetime, + LastReceivedTime: responseDB.LastReceivedTime, + NumAutomatedRemindersReceived: responseDB.NumAutomatedRemindersReceived, + Username: responseDB.Username, + VotingStatus: responseDB.VotingStatus, + Role: responseDB.Role, + JobTitle: responseDB.JobTitle, + MembershipTier: responseDB.MembershipTier, + Organization: responseDB.Organization, + CommitteeID: responseDB.CommitteeID, + CommitteeVotingEnabled: responseDB.CommitteeVotingEnabled, + SurveyLink: responseDB.SurveyLink, + NPSValue: responseDB.NPSValue, + SurveyMonkeyQuestionAnswers: responseDB.SurveyMonkeyQuestionAnswers, + SESMessageID: responseDB.SESMessageID, + SESBounceType: responseDB.SESBounceType, + SESBounceSubtype: responseDB.SESBounceSubtype, + SESBounceDiagnosticCode: responseDB.SESBounceDiagnosticCode, + SESComplaintExists: responseDB.SESComplaintExists, + SESComplaintType: responseDB.SESComplaintType, + SESComplaintDate: responseDB.SESComplaintDate, + SESDeliverySuccessful: 
responseDB.SESDeliverySuccessful, + EmailOpenedFirstTime: responseDB.EmailOpenedFirstTime, + EmailOpenedLastTime: responseDB.EmailOpenedLastTime, + LinkClickedFirstTime: responseDB.LinkClickedFirstTime, + LinkClickedLastTime: responseDB.LinkClickedLastTime, + Excluded: responseDB.Excluded, + } + + // Process project with ID mapping + responseData.Project = domain.SurveyResponseProjectData{ + ID: responseDB.Project.ID, + Name: responseDB.Project.Name, + } + + if responseDB.Project.ID != "" { + projectUID, err := idMapper.MapProjectV1ToV2(ctx, responseDB.Project.ID) + if err != nil { + logger.With(errKey, err, "field", "project.id", "value", responseDB.Project.ID). + WarnContext(ctx, "failed to get v2 project UID from v1 project ID") + // Don't set project_uid if mapping fails - will be caught by validation + } else { + responseData.Project.ProjectUID = projectUID + } + } + + // Map v1 committee ID (SFID) to v2 committee UID + if responseDB.CommitteeID != "" { + committeeUID, err := idMapper.MapCommitteeV1ToV2(ctx, responseDB.CommitteeID) + if err != nil { + logger.With(errKey, err, "field", "committee_id", "value", responseDB.CommitteeID). 
+ WarnContext(ctx, "failed to get v2 committee UID from v1 committee ID") + // Don't set committee_uid if mapping fails + } else { + responseData.CommitteeUID = committeeUID + } + } + + return responseData, nil +} + +// handleSurveyResponseDelete processes a survey response delete from itx-survey-responses records +// Returns true if the message should be retried (NAK), false if it should be acknowledged (ACK) +func handleSurveyResponseDelete( + ctx context.Context, + uid string, + publisher domain.EventPublisher, + mappingsKV jetstream.KeyValue, + logger *slog.Logger, +) bool { + funcLogger := logger.With("survey_response_uid", uid, "handler", "survey_response_delete") + + funcLogger.DebugContext(ctx, "processing survey response delete") + + // Create minimal survey response data for delete event + responseData := &domain.SurveyResponseData{ + UID: uid, + ID: uid, + } + + // Publish delete event to indexer and FGA-sync + if err := publisher.PublishSurveyResponseEvent(ctx, string(indexerConstants.ActionDeleted), responseData); err != nil { + funcLogger.With(errKey, err).ErrorContext(ctx, "failed to publish survey response delete event") + // Check if this is a transient error that should be retried + if isTransientError(err) { + return true // NAK for retry + } + return false // Permanent error, ACK and skip + } + + // Remove mapping from v1-mappings KV + mappingKey := fmt.Sprintf("survey_response.%s", uid) + if err := mappingsKV.Delete(ctx, mappingKey); err != nil { + funcLogger.With(errKey, err).WarnContext(ctx, "failed to delete survey response mapping") + // Don't retry on mapping deletion failures + } + + funcLogger.InfoContext(ctx, "successfully sent survey response delete indexer and access messages") + return false // Success, ACK the message +} diff --git a/cmd/survey-api/main.go b/cmd/survey-api/main.go index b907f33..32422f0 100644 --- a/cmd/survey-api/main.go +++ b/cmd/survey-api/main.go @@ -13,10 +13,12 @@ import ( "syscall" "time" + apieventing 
"github.com/linuxfoundation/lfx-v2-survey-service/cmd/survey-api/eventing" surveysvr "github.com/linuxfoundation/lfx-v2-survey-service/gen/http/survey/server" surveysvc "github.com/linuxfoundation/lfx-v2-survey-service/gen/survey" "github.com/linuxfoundation/lfx-v2-survey-service/internal/domain" "github.com/linuxfoundation/lfx-v2-survey-service/internal/infrastructure/auth" + "github.com/linuxfoundation/lfx-v2-survey-service/internal/infrastructure/eventing" "github.com/linuxfoundation/lfx-v2-survey-service/internal/infrastructure/idmapper" "github.com/linuxfoundation/lfx-v2-survey-service/internal/infrastructure/proxy" "github.com/linuxfoundation/lfx-v2-survey-service/internal/logging" @@ -40,6 +42,12 @@ func run() int { // Load configuration from environment cfg := loadConfig() + // Validate configuration + if err := cfg.validate(); err != nil { + fmt.Fprintf(os.Stderr, "Configuration error: %v\n", err) + return 1 + } + // Initialize structured logging logging.InitStructureLogConfig() logger := slog.Default() @@ -91,6 +99,49 @@ func run() int { idMapper = natsMapper } + // Create shutdown channel for coordinating graceful shutdown + shutdown := make(chan struct{}, 1) + + // Initialize event processor (if enabled) + var eventProcessor *apieventing.EventProcessor + var eventProcessorCtx context.Context + var eventProcessorCancel context.CancelFunc + if cfg.EventProcessingEnabled { + logger.Info("Event processing is ENABLED - initializing event processor") + ep, err := apieventing.NewEventProcessor(eventing.Config{ + NATSURL: cfg.NATSURL, + ConsumerName: cfg.EventConsumerName, + StreamName: cfg.EventStreamName, + FilterSubject: cfg.EventFilterSubject, + MaxDeliver: 3, + AckWait: 30 * time.Second, + MaxAckPending: 1000, + }, idMapper, logger) + if err != nil { + logger.Error("Failed to initialize event processor", "error", err) + return 1 + } + eventProcessor = ep + + // Create context for event processor lifecycle + eventProcessorCtx, eventProcessorCancel = 
context.WithCancel(context.Background()) + + // Start event processor in goroutine + go func() { + if err := eventProcessor.Start(eventProcessorCtx); err != nil { + logger.Error("Event processor error", "error", err) + // Signal shutdown instead of calling os.Exit + select { + case shutdown <- struct{}{}: + default: + } + } + }() + logger.Info("Event processor started in background") + } else { + logger.Info("Event processing is DISABLED - skipping event processor initialization") + } + // Initialize service layer surveyService := service.NewSurveyService(jwtAuth, proxyClient, idMapper, logger) @@ -146,18 +197,41 @@ func run() int { logger.Info("HTTP server listening", "addr", srv.Addr) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Error("HTTP server error", "error", err) - os.Exit(1) + // Signal shutdown instead of calling os.Exit + select { + case shutdown <- struct{}{}: + default: + } } }() - // Wait for interrupt signal + // Wait for interrupt signal or shutdown event quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - <-quit + + select { + case <-quit: + logger.Info("Received interrupt signal") + case <-shutdown: + logger.Info("Received shutdown signal from background goroutine") + } logger.Info("Shutting down server...") - // Graceful shutdown with timeout + // Stop event processor first (if enabled) + if eventProcessor != nil { + logger.Info("Stopping event processor...") + // Cancel the event processor context to stop the Start method + if eventProcessorCancel != nil { + eventProcessorCancel() + } + // Then stop the consumer and cleanup resources + if err := eventProcessor.Stop(); err != nil { + logger.Error("Error stopping event processor", "error", err) + } + } + + // Graceful shutdown of HTTP server with timeout ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -172,45 +246,46 @@ func run() int { // config holds the application configuration 
type config struct { - Port string - JWKSURL string - Audience string - MockLocalPrincipal string - ITXBaseURL string - ITXAuth0Domain string - ITXClientID string - ITXPrivateKey string - ITXAudience string - ITXTimeout time.Duration - NATSURL string - NATSTimeout time.Duration - IDMappingDisabled bool + Port string + JWKSURL string + Audience string + MockLocalPrincipal string + ITXBaseURL string + ITXAuth0Domain string + ITXClientID string + ITXPrivateKey string + ITXAudience string + ITXTimeout time.Duration + NATSURL string + NATSTimeout time.Duration + IDMappingDisabled bool + EventProcessingEnabled bool + EventConsumerName string + EventStreamName string + EventFilterSubject string } // loadConfig loads configuration from environment variables func loadConfig() config { - cfg := config{ - Port: getEnv("PORT", "8080"), - JWKSURL: getEnv("JWKS_URL", "http://heimdall:4457/.well-known/jwks"), - Audience: getEnv("AUDIENCE", "lfx-v2-survey-service"), - MockLocalPrincipal: getEnv("JWT_AUTH_DISABLED_MOCK_LOCAL_PRINCIPAL", ""), - ITXBaseURL: getEnv("ITX_BASE_URL", "https://api.dev.itx.linuxfoundation.org/"), - ITXAuth0Domain: getEnv("ITX_AUTH0_DOMAIN", "linuxfoundation-dev.auth0.com"), - ITXClientID: getEnv("ITX_CLIENT_ID", ""), - ITXPrivateKey: getEnv("ITX_CLIENT_PRIVATE_KEY", ""), - ITXAudience: getEnv("ITX_AUDIENCE", "https://api.dev.itx.linuxfoundation.org/"), - ITXTimeout: 30 * time.Second, - NATSURL: getEnv("NATS_URL", "nats://nats:4222"), - NATSTimeout: 5 * time.Second, - IDMappingDisabled: getEnv("ID_MAPPING_DISABLED", "") == "true", + return config{ + Port: getEnv("PORT", "8080"), + JWKSURL: getEnv("JWKS_URL", "http://heimdall:4457/.well-known/jwks"), + Audience: getEnv("AUDIENCE", "lfx-v2-survey-service"), + MockLocalPrincipal: getEnv("JWT_AUTH_DISABLED_MOCK_LOCAL_PRINCIPAL", ""), + ITXBaseURL: getEnv("ITX_BASE_URL", "https://api.dev.itx.linuxfoundation.org/"), + ITXAuth0Domain: getEnv("ITX_AUTH0_DOMAIN", "linuxfoundation-dev.auth0.com"), + ITXClientID: 
getEnv("ITX_CLIENT_ID", ""), + ITXPrivateKey: getEnv("ITX_CLIENT_PRIVATE_KEY", ""), + ITXAudience: getEnv("ITX_AUDIENCE", "https://api.dev.itx.linuxfoundation.org/"), + ITXTimeout: 30 * time.Second, + NATSURL: getEnv("NATS_URL", "nats://nats:4222"), + NATSTimeout: 5 * time.Second, + IDMappingDisabled: getEnv("ID_MAPPING_DISABLED", "") == "true", + EventProcessingEnabled: getEnv("EVENT_PROCESSING_ENABLED", "true") == "true", + EventConsumerName: getEnv("EVENT_CONSUMER_NAME", "survey-service-kv-consumer"), + EventStreamName: getEnv("EVENT_STREAM_NAME", "KV_v1-objects"), + EventFilterSubject: getEnv("EVENT_FILTER_SUBJECT", "$KV.v1-objects.>"), } - - if err := cfg.validate(); err != nil { - slog.Error("Configuration validation failed", "error", err) - os.Exit(1) - } - - return cfg } // validate checks that required configuration values are set diff --git a/docs/event-processing.md b/docs/event-processing.md new file mode 100644 index 0000000..94f59ed --- /dev/null +++ b/docs/event-processing.md @@ -0,0 +1,375 @@ +# Event Processing + +## Overview + +The survey service implements NATS KV bucket event processing to automatically sync survey and survey response data from the v1 system to the v2 system. This enables real-time data synchronization, search indexing, and access control updates without manual intervention. 
+ +## Architecture + +### Components + +``` +┌─────────────────┐ +│ v1 DynamoDB │ +│ (via KV) │ +└────────┬────────┘ + │ + ├─ itx-surveys:* + └─ itx-survey-responses:* + │ + v +┌─────────────────────────────────┐ +│ NATS KV Bucket: v1-objects │ +└────────┬────────────────────────┘ + │ + v +┌─────────────────────────────────┐ +│ Event Processor │ +│ (JetStream Consumer) │ +└────────┬────────────────────────┘ + │ + ├─ Transform v1 → v2 + ├─ Map IDs (v1 SFID → v2 UUID) + │ + v +┌────────────────┬────────────────┐ +│ │ │ +│ Indexer │ FGA-Sync │ +│ Service │ Service │ +│ │ │ +│ (Search Index) │ (Access Control) +└────────────────┴────────────────┘ +``` + +### Event Flow + +1. **Watch**: Event processor watches the `v1-objects` KV bucket for keys matching: + - `itx-surveys:*` - Survey data + - `itx-survey-responses:*` - Survey response data + +2. **Transform**: Converts v1 format to v2 format: + - String fields → proper types (strings to ints) + - v1 SFIDs → v2 UUIDs (committees, projects) + - Preserves all data including SurveyMonkey answers + +3. **Publish**: Sends transformed data to two downstream services: + - **Indexer Service** (`lfx.index.survey`, `lfx.index.survey_response`) + - Enables search functionality + - Includes parent references (committee, project) + - Provides access control metadata + + - **FGA-Sync Service** (`lfx.fga-sync.update_access`, `lfx.fga-sync.delete_access`) + - Updates Fine-Grained Authorization (FGA) tuples + - Manages viewer/auditor permissions + - Links surveys to committees and projects + +4. 
**Track**: Records processed events in `v1-mappings` KV bucket for deduplication + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `EVENT_PROCESSING_ENABLED` | `true` | Enable/disable event processing | +| `EVENT_CONSUMER_NAME` | `survey-service-kv-consumer` | JetStream consumer name | +| `EVENT_STREAM_NAME` | `KV_v1-objects` | JetStream stream name | +| `EVENT_FILTER_SUBJECT` | `$KV.v1-objects.>` | NATS subject filter pattern | +| `NATS_URL` | `nats://nats:4222` | NATS server URL | + +### Consumer Configuration + +- **Delivery Policy**: `DeliverLastPerSubjectPolicy` - Processes the latest version of each key +- **Ack Policy**: `AckExplicitPolicy` - Requires explicit acknowledgment +- **Max Deliver**: `3` - Retries transient failures up to 3 times +- **Ack Wait**: `30s` - Timeout before message redelivery +- **Max Ack Pending**: `1000` - Maximum unacknowledged messages + +## Data Transformation + +### Survey Data + +**v1 Format (DynamoDB/KV)**: + +- All numeric fields stored as strings (e.g., `"nps_value": "8"`) +- v1 SFIDs for committees and projects +- Committee array with per-committee statistics + +**v2 Format (Transformed)**: + +- Proper types (integers, booleans) +- v2 UUIDs for committees and projects +- Mapped via IDMapper service +- Preserved committee array structure + +**Example Transformation**: + +```json +// v1 Input +{ + "id": "survey-123", + "nps_value": "8", + "total_responses": "42", + "committees": [{ + "committee_id": "a094V00000A1BcdQAF", // v1 SFID + "project_id": "a094V00000A1XyzQAF", // v1 SFID + "nps_value": "9" + }] +} + +// v2 Output +{ + "uid": "survey-123", + "nps_value": 8, // Integer + "total_responses": 42, // Integer + "committees": [{ + "committee_uid": "550e8400-e29b-41d4-a716-446655440000", // v2 UUID + "project_uid": "6ba7b810-9dad-11d1-80b4-00c04fd430c8", // v2 UUID + "nps_value": 9 + }] +} +``` + +### Survey Response Data + +**v1 Format**: Similar 
string-based fields, v1 references + +**v2 Format**: + +- Proper types +- Mapped project/committee/survey UIDs +- Preserved SurveyMonkey question answers (no transformation) + +## Error Handling + +### Transient Errors (Retry) + +These errors trigger NAK (negative acknowledgment) for automatic retry: + +- NATS connection timeouts +- IDMapper service unavailable +- Network failures +- Temporary downstream service outages + +**Action**: Message redelivered up to `MaxDeliver` times (3 attempts) + +### Permanent Errors (Skip) + +These errors trigger ACK to skip and move on: + +- Invalid JSON structure +- Missing required fields (e.g., empty `id`) +- No parent references (survey/response orphaned) +- Malformed data + +**Action**: Log warning and continue processing other messages + +### ID Mapping Failures + +When v1→v2 ID mapping fails: + +- Log warning with v1 ID +- Skip setting v2 UID for that reference +- Continue processing with remaining valid data +- Survey/response may be skipped if critical references missing + +## Operations + +### Starting the Service + +Event processing starts automatically when the service starts: + +```bash +# Default (event processing enabled) +./survey-api + +# Explicitly enable +EVENT_PROCESSING_ENABLED=true ./survey-api + +# Disable event processing +EVENT_PROCESSING_ENABLED=false ./survey-api +``` + +### Monitoring + +**Log Messages**: + +``` +INFO Event processing is ENABLED - initializing event processor +INFO Event processor started in background +INFO processing survey update key=itx-surveys:survey-123 +INFO successfully sent survey indexer and access messages survey_id=survey-123 +``` + +**Consumer Status**: + +```bash +# Check consumer status +nats consumer info KV_v1-objects survey-service-kv-consumer +``` + +### Lifecycle + +1. **Startup**: Event processor initializes after IDMapper +2. **Running**: Processes events in background goroutine +3. 
**Shutdown**: Graceful shutdown sequence: + - Context cancellation stops the consumer + - Consumer drains pending messages + - NATS connection closed + - HTTP server shutdown + +### Troubleshooting + +**No events processing**: + +- Check `EVENT_PROCESSING_ENABLED=true` +- Verify NATS connection: `NATS_URL` +- Check consumer exists: `nats consumer ls KV_v1-objects` + +**Events failing repeatedly**: + +- Check logs for permanent errors +- Verify IDMapper service is running +- Confirm indexer and FGA-sync services are available + +**Duplicate processing**: + +- Check `v1-mappings` KV bucket for tracking entries +- Verify consumer name is unique per instance + +**ID mapping failures**: + +- Ensure IDMapper service has v1↔v2 mappings populated +- Check project/committee references exist in v1 system + +## Deduplication + +The service uses the `v1-mappings` KV bucket to track processed events: + +**Key Pattern**: + +- Surveys: `survey.{uid}` +- Responses: `survey_response.{uid}` + +**Value**: `"1"` (a presence marker indicating the event has been processed) + +**Logic**: + +- If mapping exists → **UPDATE** operation +- If mapping missing → **CREATE** operation +- After processing → Store/update mapping entry + +This ensures: + +- First event creates the resource +- Subsequent events update the resource +- No duplicate resources in downstream services + +## Performance Considerations + +**Concurrency**: + +- Single consumer per service instance +- Messages processed sequentially per consumer +- Multiple service instances = parallel processing + +**Throughput**: + +- `MaxAckPending=1000` allows up to 1000 in-flight messages +- Adjust based on processing speed and resource availability + +**Backpressure**: + +- Consumer automatically pauses when `MaxAckPending` reached +- Resumes when pending count drops + +**Resource Usage**: + +- Event processor runs in background goroutine (low overhead) +- NATS connection shared with IDMapper +- Memory footprint minimal (streaming model) + +## Related Services + +### 
IDMapper Service + +- Maps v1 SFIDs ↔ v2 UUIDs +- Required for event processing +- Queries via NATS request-reply pattern + +### Indexer Service + +- Receives transformed survey/response data +- Indexes in OpenSearch for search functionality +- Handles `ActionCreated`, `ActionUpdated`, `ActionDeleted` + +### FGA-Sync Service + +- Receives access control updates +- Manages OpenFGA authorization tuples +- Links resources to parent entities (committees, projects) + +## Development + +### Testing Event Processing + +1. **Disable in local development**: + + ```bash + export EVENT_PROCESSING_ENABLED=false + ``` + +2. **Watch consumer activity**: + + ```bash + nats consumer next KV_v1-objects survey-service-kv-consumer --count 10 + ``` + +3. **Trigger test event**: + + ```bash + # Put test survey in v1-objects KV + nats kv put v1-objects itx-surveys:test-123 '{"id":"test-123",...}' + ``` + +4. **Check processing logs**: + + ```bash + # Look for processing messages + grep "processing survey update" logs/survey-api.log + ``` + +### Code Structure + +``` +cmd/survey-api/eventing/ +├── event_processor.go # Lifecycle management +├── kv_handler.go # Event routing by key prefix +├── survey_event_handler.go # Survey transformation logic +└── survey_response_event_handler.go # Response transformation logic + +internal/domain/ +├── event_models.go # v2 data models +└── event_publisher.go # Publisher interface + +internal/infrastructure/eventing/ +├── event_config.go # Configuration structs +└── nats_publisher.go # NATS publishing implementation +``` + +### Adding New Event Types + +To add processing for new entity types: + +1. Create handler in `cmd/survey-api/eventing/{entity}_event_handler.go` +2. Add routing logic in `kv_handler.go` +3. Define v2 model in `internal/domain/event_models.go` +4. Add publisher method in `internal/infrastructure/eventing/nats_publisher.go` +5. 
Update documentation + +## References + +- [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream) +- [NATS KV Store](https://docs.nats.io/nats-concepts/jetstream/key-value-store) +- [OpenFGA Authorization](https://openfga.dev/) +- [Voting Service PR #8](https://github.com/linuxfoundation/lfx-v2-voting-service/pull/8) - Reference implementation diff --git a/go.mod b/go.mod index 8238565..94616dc 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/auth0/go-auth0 v1.33.0 github.com/auth0/go-jwt-middleware/v2 v2.3.1 github.com/google/uuid v1.6.0 + github.com/linuxfoundation/lfx-v2-indexer-service v0.4.14 github.com/nats-io/nats.go v1.48.0 github.com/remychantenay/slog-otel v1.3.4 goa.design/goa/v3 v3.24.1 diff --git a/go.sum b/go.sum index 3b05315..86b9219 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,10 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lestrrat-go/blackmagic v1.0.3 h1:94HXkVLxkZO9vJI/w2u1T0DAoprShFd13xtnSINtDWs= github.com/lestrrat-go/blackmagic v1.0.3/go.mod h1:6AWFyKNNj0zEXQYfTMPfZrAXUWUfTIZ5ECEUEJaijtw= github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE= @@ -50,6 +54,8 @@ github.com/lestrrat-go/jwx/v2 v2.1.6 h1:hxM1gfDILk/l5ylers6BX/Eq1m/pnxe9NBwW6lVf github.com/lestrrat-go/jwx/v2 v2.1.6/go.mod h1:Y722kU5r/8mV7fYDifjug0r8FK8mZdw0K0GpJw/l8pU= github.com/lestrrat-go/option 
v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU= github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= +github.com/linuxfoundation/lfx-v2-indexer-service v0.4.14 h1:GQpAYAjEUNJgg/r4gxGG/teWAAp0eN5+pt/eiM3uaXo= +github.com/linuxfoundation/lfx-v2-indexer-service v0.4.14/go.mod h1:j013GdKST/hMWFhciRuzJd0sy764sNtlmO3gqmsnaCA= github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d h1:Zj+PHjnhRYWBK6RqCDBcAhLXoi3TzC27Zad/Vn+gnVQ= github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d/go.mod h1:WZy8Q5coAB1zhY9AOBJP0O6J4BuDfbupUDavKY+I3+s= github.com/manveru/gobdd v0.0.0-20131210092515-f1a17fdd710b h1:3E44bLeN8uKYdfQqVQycPnaVviZdBLbizFhU49mtbe4= @@ -64,6 +70,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/remychantenay/slog-otel v1.3.4 h1:xoM41ayLff2U8zlK5PH31XwD7Lk3W9wKfl4+RcmKom4= github.com/remychantenay/slog-otel v1.3.4/go.mod h1:ZkazuFMICKGDrO0r1njxKRdjTt/YcXKn6v2+0q/b0+U= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -108,8 +116,9 @@ golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc= golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= 
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/dnaeon/go-vcr.v3 v3.2.0 h1:Rltp0Vf+Aq0u4rQXgmXgtgoRDStTnFN83cWgSGSoRzM= gopkg.in/dnaeon/go-vcr.v3 v3.2.0/go.mod h1:2IMOnnlx9I6u9x+YBsM3tAMx6AlOxnJ0pWxQAzZ79Ag= gopkg.in/go-jose/go-jose.v2 v2.6.3 h1:nt80fvSDlhKWQgSWyHyy5CfmlQr+asih51R8PTWNKKs= diff --git a/internal/domain/errors.go b/internal/domain/errors.go index 956a3b1..e8c489f 100644 --- a/internal/domain/errors.go +++ b/internal/domain/errors.go @@ -9,7 +9,7 @@ import "errors" type ErrorType int const ( - ErrorTypeValidation ErrorType = iota // 400 Bad Request + ErrorTypeValidation ErrorType = iota // 400 Bad Request ErrorTypeNotFound // 404 Not Found ErrorTypeConflict // 409 Conflict ErrorTypeInternal // 500 Internal Server Error diff --git a/internal/domain/event_models.go b/internal/domain/event_models.go new file mode 100644 index 0000000..9c01186 --- /dev/null +++ b/internal/domain/event_models.go @@ -0,0 +1,134 @@ +// Copyright The Linux Foundation and each contributor to LFX. 
+// SPDX-License-Identifier: MIT + +package domain + +// SurveyData represents v2 survey data after transformation from v1 format +type SurveyData struct { + UID string `json:"uid"` // v2 UID (same as ID) + ID string `json:"id"` // v1 ID + SurveyMonkeyID string `json:"survey_monkey_id"` + IsProjectSurvey bool `json:"is_project_survey"` + StageFilter string `json:"stage_filter"` + CreatorUsername string `json:"creator_username"` + CreatorName string `json:"creator_name"` + CreatorID string `json:"creator_id"` + CreatedAt string `json:"created_at"` + LastModifiedAt string `json:"last_modified_at"` + LastModifiedBy string `json:"last_modified_by"` + SurveyTitle string `json:"survey_title"` + SurveySendDate string `json:"survey_send_date"` + SurveyCutoffDate string `json:"survey_cutoff_date"` + SurveyReminderRateDays int `json:"survey_reminder_rate_days"` + SendImmediately bool `json:"send_immediately"` + EmailSubject string `json:"email_subject"` + EmailBody string `json:"email_body"` + EmailBodyText string `json:"email_body_text"` + CommitteeCategory string `json:"committee_category"` + Committees []SurveyCommitteeData `json:"committees"` + CommitteeVotingEnabled bool `json:"committee_voting_enabled"` + SurveyStatus string `json:"survey_status"` + NPSValue int `json:"nps_value"` + NumPromoters int `json:"num_promoters"` + NumPassives int `json:"num_passives"` + NumDetractors int `json:"num_detractors"` + TotalRecipients int `json:"total_recipients"` + TotalSentRecipients int `json:"total_recipients_sent"` + TotalResponses int `json:"total_responses"` + TotalRecipientsOpened int `json:"total_recipients_opened"` + TotalRecipientsClicked int `json:"total_recipients_clicked"` + TotalDeliveryErrors int `json:"total_delivery_errors"` + IsNPSSurvey bool `json:"is_nps_survey"` + CollectorURL string `json:"collector_url"` +} + +// SurveyCommitteeData represents committee data with v2 UIDs +type SurveyCommitteeData struct { + CommitteeUID string `json:"committee_uid"` // v2 
UID + CommitteeID string `json:"committee_id"` // v1 SFID + CommitteeName string `json:"committee_name"` + ProjectUID string `json:"project_uid"` // v2 UID + ProjectID string `json:"project_id"` // v1 SFID + ProjectName string `json:"project_name"` + NPSValue int `json:"nps_value"` + NumPromoters int `json:"num_promoters"` + NumPassives int `json:"num_passives"` + NumDetractors int `json:"num_detractors"` + TotalRecipients int `json:"total_recipients"` + TotalSentRecipients int `json:"total_recipients_sent"` + TotalResponses int `json:"total_responses"` + TotalRecipientsOpened int `json:"total_recipients_opened"` + TotalRecipientsClicked int `json:"total_recipients_clicked"` + TotalDeliveryErrors int `json:"total_delivery_errors"` +} + +// SurveyResponseData represents v2 survey response data after transformation from v1 format +type SurveyResponseData struct { + UID string `json:"uid"` // v2 UID (same as ID) + ID string `json:"id"` // v1 ID + SurveyID string `json:"survey_id"` // v1 survey ID + SurveyUID string `json:"survey_uid"` // v2 survey UID + SurveyMonkeyRespondent string `json:"survey_monkey_respondent_id"` + Email string `json:"email"` + CommitteeMemberID string `json:"committee_member_id,omitempty"` + FirstName string `json:"first_name"` + LastName string `json:"last_name"` + CreatedAt string `json:"created_at"` + ResponseDatetime string `json:"response_datetime"` + LastReceivedTime string `json:"last_received_time"` + NumAutomatedRemindersReceived int `json:"num_automated_reminders_received"` + Username string `json:"username"` + VotingStatus string `json:"voting_status"` + Role string `json:"role"` + JobTitle string `json:"job_title"` + MembershipTier string `json:"membership_tier"` + Organization SurveyResponseOrgData `json:"organization"` + Project SurveyResponseProjectData `json:"project"` + CommitteeUID string `json:"committee_uid"` // v2 UID + CommitteeID string `json:"committee_id"` // v1 SFID + CommitteeVotingEnabled bool 
`json:"committee_voting_enabled"` + SurveyLink string `json:"survey_link"` + NPSValue int `json:"nps_value"` + SurveyMonkeyQuestionAnswers []SurveyMonkeyQuestionAnswers `json:"survey_monkey_question_answers"` + SESMessageID string `json:"ses_message_id"` + SESBounceType string `json:"ses_bounce_type"` + SESBounceSubtype string `json:"ses_bounce_subtype"` + SESBounceDiagnosticCode string `json:"ses_bounce_diagnostic_code"` + SESComplaintExists bool `json:"ses_complaint_exists"` + SESComplaintType string `json:"ses_complaint_type"` + SESComplaintDate string `json:"ses_complaint_date"` + SESDeliverySuccessful bool `json:"ses_delivery_successful"` + EmailOpenedFirstTime string `json:"email_opened_first_time"` + EmailOpenedLastTime string `json:"email_opened_last_time"` + LinkClickedFirstTime string `json:"link_clicked_first_time"` + LinkClickedLastTime string `json:"link_clicked_last_time"` + Excluded bool `json:"excluded"` +} + +// SurveyMonkeyQuestionAnswers contains a SurveyMonkey response +type SurveyMonkeyQuestionAnswers struct { + QuestionID string `json:"question_id"` + QuestionText string `json:"question_text"` + QuestionFamily string `json:"question_family"` + QuestionSubtype string `json:"question_subtype"` + Answers []SurveyMonkeyAnswer `json:"answers"` +} + +// SurveyMonkeyAnswer contains a SurveyMonkey answer to a question +type SurveyMonkeyAnswer struct { + ChoiceID string `json:"choice_id"` + Text string `json:"text"` +} + +// SurveyResponseProjectData contains project data with v2 UIDs +type SurveyResponseProjectData struct { + ProjectUID string `json:"project_uid"` // v2 UID + ID string `json:"id"` // v1 SFID + Name string `json:"name"` +} + +// SurveyResponseOrgData contains organization data +type SurveyResponseOrgData struct { + ID string `json:"id"` + Name string `json:"name"` +} diff --git a/internal/domain/event_publisher.go b/internal/domain/event_publisher.go new file mode 100644 index 0000000..f40c3d7 --- /dev/null +++ 
b/internal/domain/event_publisher.go @@ -0,0 +1,21 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package domain + +import "context" + +// EventPublisher defines the interface for publishing survey and survey response events +// to the indexer and FGA-sync services +type EventPublisher interface { + // PublishSurveyEvent publishes a survey event to indexer and FGA-sync + // action should be "created", "updated", or "deleted" + PublishSurveyEvent(ctx context.Context, action string, survey *SurveyData) error + + // PublishSurveyResponseEvent publishes a survey response event to indexer and FGA-sync + // action should be "created", "updated", or "deleted" + PublishSurveyResponseEvent(ctx context.Context, action string, response *SurveyResponseData) error + + // Close closes the publisher connection + Close() error +} diff --git a/internal/infrastructure/eventing/event_config.go b/internal/infrastructure/eventing/event_config.go new file mode 100644 index 0000000..fc2d335 --- /dev/null +++ b/internal/infrastructure/eventing/event_config.go @@ -0,0 +1,17 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package eventing + +import "time" + +// Config holds the configuration for the event processor +type Config struct { + NATSURL string + ConsumerName string + StreamName string + FilterSubject string + MaxDeliver int + AckWait time.Duration + MaxAckPending int +} diff --git a/internal/infrastructure/eventing/nats_publisher.go b/internal/infrastructure/eventing/nats_publisher.go new file mode 100644 index 0000000..e3c6653 --- /dev/null +++ b/internal/infrastructure/eventing/nats_publisher.go @@ -0,0 +1,380 @@ +// Copyright The Linux Foundation and each contributor to LFX. 
+// SPDX-License-Identifier: MIT + +package eventing + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "slices" + + indexerConstants "github.com/linuxfoundation/lfx-v2-indexer-service/pkg/constants" + indexerTypes "github.com/linuxfoundation/lfx-v2-indexer-service/pkg/types" + "github.com/linuxfoundation/lfx-v2-survey-service/internal/domain" + "github.com/nats-io/nats.go" +) + +// NATS subject constants for survey operations +const ( + // IndexSurveySubject is the subject for survey indexing + IndexSurveySubject = "lfx.index.survey" + + // IndexSurveyResponseSubject is the subject for survey response indexing + IndexSurveyResponseSubject = "lfx.index.survey_response" + + // UpdateAccessSubject is the subject for FGA access control updates + UpdateAccessSubject = "lfx.fga-sync.update_access" + + // DeleteAccessSubject is the subject for FGA access control deletions + DeleteAccessSubject = "lfx.fga-sync.delete_access" +) + +// GenericFGAMessage represents a generic FGA message +type GenericFGAMessage struct { + ObjectType string `json:"object_type"` + Operation string `json:"operation"` + Data map[string]interface{} `json:"data"` +} + +// NATSPublisher implements the EventPublisher interface +type NATSPublisher struct { + conn *nats.Conn + logger *slog.Logger +} + +// NewNATSPublisher creates a new NATS publisher +func NewNATSPublisher(conn *nats.Conn, logger *slog.Logger) *NATSPublisher { + return &NATSPublisher{ + conn: conn, + logger: logger, + } +} + +// PublishSurveyEvent publishes a survey event to indexer and FGA-sync +func (p *NATSPublisher) PublishSurveyEvent(ctx context.Context, action string, survey *domain.SurveyData) error { + // Send to indexer + if err := p.sendSurveyIndexerMessage(ctx, IndexSurveySubject, indexerConstants.MessageAction(action), survey); err != nil { + return fmt.Errorf("failed to send survey indexer message: %w", err) + } + + // Send to FGA-sync - different message for delete vs create/update + if action == 
string(indexerConstants.ActionDeleted) { + if err := p.sendDeleteAccessMessage("survey", survey.UID); err != nil { + return fmt.Errorf("failed to send survey delete access message: %w", err) + } + } else { + if err := p.sendSurveyAccessMessage(survey); err != nil { + return fmt.Errorf("failed to send survey access message: %w", err) + } + } + + return nil +} + +// PublishSurveyResponseEvent publishes a survey response event to indexer and FGA-sync +func (p *NATSPublisher) PublishSurveyResponseEvent(ctx context.Context, action string, response *domain.SurveyResponseData) error { + // Send to indexer + if err := p.sendSurveyResponseIndexerMessage(ctx, IndexSurveyResponseSubject, indexerConstants.MessageAction(action), response); err != nil { + return fmt.Errorf("failed to send survey response indexer message: %w", err) + } + + // Send to FGA-sync - different message for delete vs create/update + if action == string(indexerConstants.ActionDeleted) { + if err := p.sendDeleteAccessMessage("survey_response", response.UID); err != nil { + return fmt.Errorf("failed to send survey response delete access message: %w", err) + } + } else { + if err := p.sendSurveyResponseAccessMessage(response); err != nil { + return fmt.Errorf("failed to send survey response access message: %w", err) + } + } + + return nil +} + +// Close closes the publisher connection +func (p *NATSPublisher) Close() error { + // NATS connection is managed by the event processor, so we don't close it here + return nil +} + +// appendIfNotExists adds a value to a slice only if it doesn't already exist +func appendIfNotExists(slice []string, value string) []string { + if !slices.Contains(slice, value) { + return append(slice, value) + } + return slice +} + +// sendSurveyIndexerMessage routes to the appropriate indexer message handler based on action +func (p *NATSPublisher) sendSurveyIndexerMessage(ctx context.Context, subject string, action indexerConstants.MessageAction, data *domain.SurveyData) error { + // 
Build IndexingConfig (needed for both create/update and delete) + nameAndAliases := []string{} + parentRefs := []string{} + + if data.SurveyTitle != "" { + nameAndAliases = append(nameAndAliases, data.SurveyTitle) + } + + // Add committee and project references from committees array + for _, committee := range data.Committees { + if committee.CommitteeUID != "" { + parentRefs = append(parentRefs, fmt.Sprintf("committee:%s", committee.CommitteeUID)) + } + if committee.ProjectUID != "" { + projectRef := fmt.Sprintf("project:%s", committee.ProjectUID) + parentRefs = appendIfNotExists(parentRefs, projectRef) + } + } + + indexingConfig := &indexerTypes.IndexingConfig{ + ObjectID: data.UID, + AccessCheckObject: fmt.Sprintf("survey:%s", data.UID), + AccessCheckRelation: "viewer", + HistoryCheckObject: fmt.Sprintf("survey:%s", data.UID), + HistoryCheckRelation: "auditor", + SortName: data.SurveyTitle, + NameAndAliases: nameAndAliases, + ParentRefs: parentRefs, + Fulltext: data.SurveyTitle, + } + + if action == indexerConstants.ActionDeleted { + return p.sendIndexerDeleteMessage(ctx, subject, action, data.UID, indexingConfig) + } + + return p.sendIndexerCreateUpdateMessage(ctx, subject, action, data, indexingConfig) +} + +// sendSurveyAccessMessage sends the message to the NATS server for the survey access control +func (p *NATSPublisher) sendSurveyAccessMessage(survey *domain.SurveyData) error { + // Build committee and project references + committeeRefs := []string{} + projectRefs := []string{} + + for _, committee := range survey.Committees { + if committee.CommitteeUID != "" { + committeeRefs = append(committeeRefs, committee.CommitteeUID) + } + if committee.ProjectUID != "" { + projectRefs = appendIfNotExists(projectRefs, committee.ProjectUID) + } + } + + references := map[string][]string{} + if len(committeeRefs) > 0 { + references["committee"] = committeeRefs + } + if len(projectRefs) > 0 { + references["project"] = projectRefs + } + + // Skip sending access message 
if there are no references + if len(references) == 0 { + return nil + } + + accessMsg := GenericFGAMessage{ + ObjectType: "survey", + Operation: "update_access", + Data: map[string]interface{}{ + "uid": survey.UID, + "public": false, + "references": references, + }, + } + + accessMsgBytes, err := json.Marshal(accessMsg) + if err != nil { + return fmt.Errorf("failed to marshal access message: %w", err) + } + + // Publish the message to NATS + if err := p.conn.Publish(UpdateAccessSubject, accessMsgBytes); err != nil { + return fmt.Errorf("failed to publish access message to subject %s: %w", UpdateAccessSubject, err) + } + + return nil +} + +// sendSurveyResponseIndexerMessage routes to the appropriate indexer message handler based on action +func (p *NATSPublisher) sendSurveyResponseIndexerMessage(ctx context.Context, subject string, action indexerConstants.MessageAction, data *domain.SurveyResponseData) error { + // Build IndexingConfig (needed for both create/update and delete) + nameAndAliases := []string{} + parentRefs := []string{} + + if data.Email != "" { + nameAndAliases = append(nameAndAliases, data.Email) + } + if data.Project.ProjectUID != "" { + parentRefs = append(parentRefs, fmt.Sprintf("project:%s", data.Project.ProjectUID)) + } + if data.SurveyUID != "" { + parentRefs = append(parentRefs, fmt.Sprintf("survey:%s", data.SurveyUID)) + } + + indexingConfig := &indexerTypes.IndexingConfig{ + ObjectID: data.UID, + AccessCheckObject: fmt.Sprintf("survey:%s", data.SurveyUID), + AccessCheckRelation: "viewer", + HistoryCheckObject: fmt.Sprintf("survey_response:%s", data.UID), + HistoryCheckRelation: "auditor", + SortName: data.Email, + NameAndAliases: nameAndAliases, + ParentRefs: parentRefs, + Fulltext: fmt.Sprintf("%s %s %s", data.Email, data.FirstName, data.LastName), + } + + if action == indexerConstants.ActionDeleted { + return p.sendIndexerDeleteMessage(ctx, subject, action, data.UID, indexingConfig) + } + + return p.sendIndexerCreateUpdateMessage(ctx, 
subject, action, data, indexingConfig) +} + +// sendSurveyResponseAccessMessage sends the message to the NATS server for the survey response access control +func (p *NATSPublisher) sendSurveyResponseAccessMessage(data *domain.SurveyResponseData) error { + relations := map[string][]string{} + if data.Username != "" { + relations["writer"] = []string{data.Username} + relations["viewer"] = []string{data.Username} + } + + references := map[string][]string{} + if data.Project.ProjectUID != "" { + references["project"] = []string{data.Project.ProjectUID} + } + if data.SurveyUID != "" { + references["survey"] = []string{data.SurveyUID} + } + + // Skip sending access message if there are no relations or references + if len(relations) == 0 && len(references) == 0 { + return nil + } + + accessMsg := GenericFGAMessage{ + ObjectType: "survey_response", + Operation: "update_access", + Data: map[string]interface{}{ + "uid": data.UID, + "public": false, + "relations": relations, + "references": references, + }, + } + + accessMsgBytes, err := json.Marshal(accessMsg) + if err != nil { + return fmt.Errorf("failed to marshal access message: %w", err) + } + + // Publish the message to NATS + if err := p.conn.Publish(UpdateAccessSubject, accessMsgBytes); err != nil { + return fmt.Errorf("failed to publish access message to subject %s: %w", UpdateAccessSubject, err) + } + + return nil +} + +// sendDeleteAccessMessage sends a delete access message to FGA-sync +func (p *NATSPublisher) sendDeleteAccessMessage(objectType string, uid string) error { + // Construct delete access message + deleteMsg := GenericFGAMessage{ + ObjectType: objectType, + Operation: "delete_access", + Data: map[string]interface{}{ + "uid": uid, + }, + } + + deleteMsgBytes, err := json.Marshal(deleteMsg) + if err != nil { + return fmt.Errorf("failed to marshal delete access message: %w", err) + } + + // Publish the message to NATS + if err := p.conn.Publish(DeleteAccessSubject, deleteMsgBytes); err != nil { + return 
fmt.Errorf("failed to publish delete access message to subject %s: %w", DeleteAccessSubject, err) + } + + return nil +} + +// sendIndexerDeleteMessage sends a generic delete message to the indexer with just the UID +func (p *NATSPublisher) sendIndexerDeleteMessage(ctx context.Context, subject string, action indexerConstants.MessageAction, uid string, indexingConfig *indexerTypes.IndexingConfig) error { + headers := p.buildHeaders(ctx) + + message := indexerTypes.IndexerMessageEnvelope{ + Action: action, + Headers: headers, + Data: uid, + IndexingConfig: indexingConfig, + } + + messageBytes, err := json.Marshal(message) + if err != nil { + return fmt.Errorf("failed to marshal indexer delete message for subject %s: %w", subject, err) + } + + p.logger.With("subject", subject, "action", action, "uid", uid).DebugContext(ctx, "constructed indexer delete message") + + // Publish the message to NATS + if err := p.conn.Publish(subject, messageBytes); err != nil { + return fmt.Errorf("failed to publish indexer delete message to subject %s: %w", subject, err) + } + + return nil +} + +// sendIndexerCreateUpdateMessage sends a generic create/update message to the indexer with full object and IndexingConfig +func (p *NATSPublisher) sendIndexerCreateUpdateMessage(ctx context.Context, subject string, action indexerConstants.MessageAction, data interface{}, indexingConfig *indexerTypes.IndexingConfig) error { + headers := p.buildHeaders(ctx) + + public := false + indexingConfig.Public = &public + + // Construct the indexer message + message := indexerTypes.IndexerMessageEnvelope{ + Action: action, + Headers: headers, + Data: data, + IndexingConfig: indexingConfig, + } + + messageBytes, err := json.Marshal(message) + if err != nil { + return fmt.Errorf("failed to marshal indexer message for subject %s: %w", subject, err) + } + + p.logger.With("subject", subject, "action", action).DebugContext(ctx, "constructed indexer message") + + // Publish the message to NATS + if err := 
p.conn.Publish(subject, messageBytes); err != nil { + return fmt.Errorf("failed to publish indexer message to subject %s: %w", subject, err) + } + + return nil +} + +// buildHeaders extracts headers from context for NATS messages +func (p *NATSPublisher) buildHeaders(ctx context.Context) map[string]string { + headers := make(map[string]string) + + // Extract authorization from context if available + if authorization, ok := ctx.Value("authorization").(string); ok { + headers["authorization"] = authorization + } else { + // Fallback for system-generated events + headers["authorization"] = "Bearer survey-service" + } + + // Extract principal from context if available + if principal, ok := ctx.Value("principal").(string); ok { + headers["x-on-behalf-of"] = principal + } + + return headers +} diff --git a/pkg/models/itx/models.go b/pkg/models/itx/models.go index 01238cd..d83ef1a 100644 --- a/pkg/models/itx/models.go +++ b/pkg/models/itx/models.go @@ -13,62 +13,62 @@ type GetSurveyParams struct { // ScheduleSurveyRequest represents the request to schedule a survey in ITX type ScheduleSurveyRequest struct { - IsProjectSurvey *bool `json:"is_project_survey,omitempty"` - StageFilter *string `json:"stage_filter,omitempty"` - CreatorUsername *string `json:"creator_username,omitempty"` - CreatorName *string `json:"creator_name,omitempty"` - CreatorID *string `json:"creator_id,omitempty"` - SurveyMonkeyID *string `json:"survey_monkey_id,omitempty"` - SurveyTitle *string `json:"survey_title,omitempty"` - SendImmediately *bool `json:"send_immediately,omitempty"` - SurveySendDate *string `json:"survey_send_date,omitempty"` // RFC3339 string - SurveyCutoffDate *string `json:"survey_cutoff_date,omitempty"` // RFC3339 string - SurveyReminderRateDays *int `json:"survey_reminder_rate_days,omitempty"` - EmailSubject *string `json:"email_subject,omitempty"` - EmailBody *string `json:"email_body,omitempty"` // HTML - EmailBodyText *string `json:"email_body_text,omitempty"` // Plain text - 
Committees []string `json:"committees,omitempty"` - CommitteeVotingEnabled *bool `json:"committee_voting_enabled,omitempty"` + IsProjectSurvey *bool `json:"is_project_survey,omitempty"` + StageFilter *string `json:"stage_filter,omitempty"` + CreatorUsername *string `json:"creator_username,omitempty"` + CreatorName *string `json:"creator_name,omitempty"` + CreatorID *string `json:"creator_id,omitempty"` + SurveyMonkeyID *string `json:"survey_monkey_id,omitempty"` + SurveyTitle *string `json:"survey_title,omitempty"` + SendImmediately *bool `json:"send_immediately,omitempty"` + SurveySendDate *string `json:"survey_send_date,omitempty"` // RFC3339 string + SurveyCutoffDate *string `json:"survey_cutoff_date,omitempty"` // RFC3339 string + SurveyReminderRateDays *int `json:"survey_reminder_rate_days,omitempty"` + EmailSubject *string `json:"email_subject,omitempty"` + EmailBody *string `json:"email_body,omitempty"` // HTML + EmailBodyText *string `json:"email_body_text,omitempty"` // Plain text + Committees []string `json:"committees,omitempty"` + CommitteeVotingEnabled *bool `json:"committee_voting_enabled,omitempty"` } // SurveyScheduleResponse represents the response from scheduling a survey type SurveyScheduleResponse struct { - ID string `json:"id"` - SurveyMonkeyID *string `json:"survey_monkey_id,omitempty"` - IsProjectSurvey *bool `json:"is_project_survey,omitempty"` - StageFilter *string `json:"stage_filter,omitempty"` - CreatorUsername *string `json:"creator_username,omitempty"` - CreatorName *string `json:"creator_name,omitempty"` - CreatorID *string `json:"creator_id,omitempty"` - CreatedAt *string `json:"created_at,omitempty"` // RFC3339 string - LastModifiedAt *string `json:"last_modified_at,omitempty"` // RFC3339 string - LastModifiedBy *string `json:"last_modified_by,omitempty"` - SurveyTitle *string `json:"survey_title,omitempty"` - SurveyStatus string `json:"survey_status"` // scheduled, sending, sent, cancelled - ResponseStatus *string 
`json:"response_status,omitempty"` // scheduled, open, closed - SurveySendDate *string `json:"survey_send_date,omitempty"` // RFC3339 string - SurveyCutoffDate *string `json:"survey_cutoff_date,omitempty"` // RFC3339 string - SurveyReminderRateDays *int `json:"survey_reminder_rate_days,omitempty"` - EmailSubject *string `json:"email_subject,omitempty"` - EmailBody *string `json:"email_body,omitempty"` // HTML - EmailBodyText *string `json:"email_body_text,omitempty"` // Plain text - CommitteeCategory *string `json:"committee_category,omitempty"` - Committees []SurveyCommittee `json:"committees,omitempty"` - CommitteeVotingEnabled *bool `json:"committee_voting_enabled,omitempty"` - SurveyURL *string `json:"survey_url,omitempty"` - SendImmediately *bool `json:"send_immediately,omitempty"` - TotalRecipients *int `json:"total_recipients,omitempty"` - TotalResponses *int `json:"total_responses,omitempty"` - IsNPSSurvey *bool `json:"is_nps_survey,omitempty"` - NPSValue *float64 `json:"nps_value,omitempty"` - NumPromoters *int `json:"num_promoters,omitempty"` - NumPassives *int `json:"num_passives,omitempty"` - NumDetractors *int `json:"num_detractors,omitempty"` - TotalBouncedEmails *int `json:"total_bounced_emails,omitempty"` - NumAutomatedRemindersToSend *int `json:"num_automated_reminders_to_send,omitempty"` - NumAutomatedRemindersSent *int `json:"num_automated_reminders_sent,omitempty"` - NextAutomatedReminderAt *string `json:"next_automated_reminder_at,omitempty"` // RFC3339 string - LatestAutomatedReminderSentAt *string `json:"latest_automated_reminder_sent_at,omitempty"` // RFC3339 string + ID string `json:"id"` + SurveyMonkeyID *string `json:"survey_monkey_id,omitempty"` + IsProjectSurvey *bool `json:"is_project_survey,omitempty"` + StageFilter *string `json:"stage_filter,omitempty"` + CreatorUsername *string `json:"creator_username,omitempty"` + CreatorName *string `json:"creator_name,omitempty"` + CreatorID *string `json:"creator_id,omitempty"` + CreatedAt 
*string `json:"created_at,omitempty"` // RFC3339 string + LastModifiedAt *string `json:"last_modified_at,omitempty"` // RFC3339 string + LastModifiedBy *string `json:"last_modified_by,omitempty"` + SurveyTitle *string `json:"survey_title,omitempty"` + SurveyStatus string `json:"survey_status"` // scheduled, sending, sent, cancelled + ResponseStatus *string `json:"response_status,omitempty"` // scheduled, open, closed + SurveySendDate *string `json:"survey_send_date,omitempty"` // RFC3339 string + SurveyCutoffDate *string `json:"survey_cutoff_date,omitempty"` // RFC3339 string + SurveyReminderRateDays *int `json:"survey_reminder_rate_days,omitempty"` + EmailSubject *string `json:"email_subject,omitempty"` + EmailBody *string `json:"email_body,omitempty"` // HTML + EmailBodyText *string `json:"email_body_text,omitempty"` // Plain text + CommitteeCategory *string `json:"committee_category,omitempty"` + Committees []SurveyCommittee `json:"committees,omitempty"` + CommitteeVotingEnabled *bool `json:"committee_voting_enabled,omitempty"` + SurveyURL *string `json:"survey_url,omitempty"` + SendImmediately *bool `json:"send_immediately,omitempty"` + TotalRecipients *int `json:"total_recipients,omitempty"` + TotalResponses *int `json:"total_responses,omitempty"` + IsNPSSurvey *bool `json:"is_nps_survey,omitempty"` + NPSValue *float64 `json:"nps_value,omitempty"` + NumPromoters *int `json:"num_promoters,omitempty"` + NumPassives *int `json:"num_passives,omitempty"` + NumDetractors *int `json:"num_detractors,omitempty"` + TotalBouncedEmails *int `json:"total_bounced_emails,omitempty"` + NumAutomatedRemindersToSend *int `json:"num_automated_reminders_to_send,omitempty"` + NumAutomatedRemindersSent *int `json:"num_automated_reminders_sent,omitempty"` + NextAutomatedReminderAt *string `json:"next_automated_reminder_at,omitempty"` // RFC3339 string + LatestAutomatedReminderSentAt *string `json:"latest_automated_reminder_sent_at,omitempty"` // RFC3339 string } // SurveyCommittee 
represents a committee associated with a survey @@ -87,8 +87,8 @@ type SurveyCommittee struct { type UpdateSurveyRequest struct { CreatorID *string `json:"creator_id,omitempty"` SurveyTitle *string `json:"survey_title,omitempty"` - SurveySendDate *string `json:"survey_send_date,omitempty"` // RFC3339 string - SurveyCutoffDate *string `json:"survey_cutoff_date,omitempty"` // RFC3339 string + SurveySendDate *string `json:"survey_send_date,omitempty"` // RFC3339 string + SurveyCutoffDate *string `json:"survey_cutoff_date,omitempty"` // RFC3339 string SurveyReminderRateDays *int `json:"survey_reminder_rate_days,omitempty"` EmailSubject *string `json:"email_subject,omitempty"` EmailBody *string `json:"email_body,omitempty"` @@ -109,27 +109,27 @@ type BulkResendRequest struct { // PreviewSendResponse represents the response from preview_send endpoint type PreviewSendResponse struct { - AffectedProjects []LFXProject `json:"affected_projects,omitempty"` - AffectedCommittees []ExcludedCommittee `json:"affected_committees,omitempty"` - AffectedRecipients []ITXPreviewRecipient `json:"affected_recipients,omitempty"` + AffectedProjects []LFXProject `json:"affected_projects,omitempty"` + AffectedCommittees []ExcludedCommittee `json:"affected_committees,omitempty"` + AffectedRecipients []ITXPreviewRecipient `json:"affected_recipients,omitempty"` } // LFXProject represents a project in the preview send response type LFXProject struct { - ID string `json:"id"` - Name string `json:"name"` - Slug string `json:"slug"` - Status string `json:"status"` + ID string `json:"id"` + Name string `json:"name"` + Slug string `json:"slug"` + Status string `json:"status"` LogoURL *string `json:"logo_url,omitempty"` } // ExcludedCommittee represents a committee in the preview send response type ExcludedCommittee struct { - ProjectID string `json:"project_id"` - ProjectName string `json:"project_name"` - CommitteeID string `json:"committee_id"` - CommitteeName string `json:"committee_name"` - 
CommitteeCategory string `json:"committee_category"` + ProjectID string `json:"project_id"` + ProjectName string `json:"project_name"` + CommitteeID string `json:"committee_id"` + CommitteeName string `json:"committee_name"` + CommitteeCategory string `json:"committee_category"` } // ITXPreviewRecipient represents a recipient in the preview send response @@ -145,26 +145,26 @@ type ITXPreviewRecipient struct { // SurveyResults represents aggregated survey results type SurveyResults struct { - SurveyResults []SurveyResultItem `json:"survey_results"` - CommentResults []CommentResult `json:"comment_results,omitempty"` - NumRecipients int `json:"num_recipients"` - NumResponses int `json:"num_responses"` - SurveyEndTime *time.Time `json:"survey_end_time,omitempty"` + SurveyResults []SurveyResultItem `json:"survey_results"` + CommentResults []CommentResult `json:"comment_results,omitempty"` + NumRecipients int `json:"num_recipients"` + NumResponses int `json:"num_responses"` + SurveyEndTime *time.Time `json:"survey_end_time,omitempty"` } // SurveyResultItem represents results for a single survey question type SurveyResultItem struct { - QuestionID string `json:"question_id"` - QuestionText string `json:"question_text"` - QuestionType string `json:"question_type"` - Responses []QuestionResponse `json:"responses"` + QuestionID string `json:"question_id"` + QuestionText string `json:"question_text"` + QuestionType string `json:"question_type"` + Responses []QuestionResponse `json:"responses"` } // QuestionResponse represents a response summary for a question type QuestionResponse struct { - Answer string `json:"answer"` - Count int `json:"count"` - Percentage float64 `json:"percentage"` + Answer string `json:"answer"` + Count int `json:"count"` + Percentage float64 `json:"percentage"` } // CommentResult represents comment/text responses @@ -176,23 +176,23 @@ type CommentResult struct { // SurveyResponse represents a response from a survey participant type SurveyResponse 
struct { - SurveyResponseUID string `json:"survey_response_uid"` - SurveyUID string `json:"survey_uid"` - ProjectUID string `json:"project_uid"` - ResponseStatus string `json:"response_status"` // submitted, in_progress - SubmittedAt *time.Time `json:"submitted_at,omitempty"` - UserName *string `json:"user_name,omitempty"` - UserEmail *string `json:"user_email,omitempty"` - Answers []SurveyAnswer `json:"answers,omitempty"` + SurveyResponseUID string `json:"survey_response_uid"` + SurveyUID string `json:"survey_uid"` + ProjectUID string `json:"project_uid"` + ResponseStatus string `json:"response_status"` // submitted, in_progress + SubmittedAt *time.Time `json:"submitted_at,omitempty"` + UserName *string `json:"user_name,omitempty"` + UserEmail *string `json:"user_email,omitempty"` + Answers []SurveyAnswer `json:"answers,omitempty"` } // SurveyAnswer represents an answer to a survey question type SurveyAnswer struct { - QuestionID string `json:"question_id"` - AnswerText *string `json:"answer_text,omitempty"` // For text questions - ChoiceIDs []string `json:"choice_ids,omitempty"` // For multiple_choice - RatingValue *int `json:"rating_value,omitempty"` // For rating questions - YesNoValue *bool `json:"yes_no_value,omitempty"` // For yes_no questions + QuestionID string `json:"question_id"` + AnswerText *string `json:"answer_text,omitempty"` // For text questions + ChoiceIDs []string `json:"choice_ids,omitempty"` // For multiple_choice + RatingValue *int `json:"rating_value,omitempty"` // For rating questions + YesNoValue *bool `json:"yes_no_value,omitempty"` // For yes_no questions } // CreateSurveyResponseRequest represents the request to submit a survey response @@ -244,9 +244,9 @@ type UserEmail struct { // ExclusionUser represents the user information in an extended exclusion type ExclusionUser struct { - ID *string `json:"id,omitempty"` - Username *string `json:"username,omitempty"` - Emails []UserEmail `json:"emails,omitempty"` + ID *string 
`json:"id,omitempty"` + Username *string `json:"username,omitempty"` + Emails []UserEmail `json:"emails,omitempty"` } // ExtendedExclusion represents an exclusion with user information diff --git a/pkg/utils/string.go b/pkg/utils/string.go new file mode 100644 index 0000000..93e356c --- /dev/null +++ b/pkg/utils/string.go @@ -0,0 +1,21 @@ +// Copyright The Linux Foundation and each contributor to LFX. +// SPDX-License-Identifier: MIT + +package utils + +// Contains checks if a string contains a substring +func Contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && (s[:len(substr)] == substr || + s[len(s)-len(substr):] == substr || + containsMiddle(s, substr))) +} + +// containsMiddle checks if substr appears in the middle of s +func containsMiddle(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} diff --git a/scripts/delete_survey_documents.sh b/scripts/delete_survey_documents.sh new file mode 100755 index 0000000..ef84a8b --- /dev/null +++ b/scripts/delete_survey_documents.sh @@ -0,0 +1,258 @@ +#!/bin/bash +# Copyright The Linux Foundation and each contributor to LFX. +# SPDX-License-Identifier: MIT + +# Script to delete all survey and survey_response documents from OpenSearch +# This is a temporary utility script for cleaning up test/migration data + +set -e + +# Check for required dependencies +if ! command -v jq &> /dev/null; then + echo "Error: jq is not installed. Please install jq to run this script." + echo " - macOS: brew install jq" + echo " - Ubuntu/Debian: apt-get install jq" + echo " - RHEL/CentOS: yum install jq" + exit 1 +fi + +if ! command -v curl &> /dev/null; then + echo "Error: curl is not installed. Please install curl to run this script." 
+ exit 1 +fi + +# Configuration +OPENSEARCH_URL="${OPENSEARCH_URL:-http://opensearch-cluster-master.lfx.svc.cluster.local:9200}" +INDEX_NAME="${INDEX_NAME:-resources}" + +echo "================================================" +echo "Survey & Survey Response Cleanup Script" +echo "================================================" +echo "OpenSearch URL: $OPENSEARCH_URL" +echo "OpenSearch Index: $INDEX_NAME" +echo "" +echo "This will delete ALL OpenSearch documents with type:" +echo " - survey" +echo " - survey_response" +echo "" +read -p "Are you sure you want to proceed? (yes/no): " CONFIRM + +if [ "$CONFIRM" != "yes" ]; then + echo "Aborted." + exit 0 +fi + +echo "" +echo "Step 1: Counting survey documents..." +SURVEY_RESPONSE=$(curl -s -w "\n%{http_code}" -X GET "${OPENSEARCH_URL}/${INDEX_NAME}/_count" \ + -H 'Content-Type: application/json' \ + -d '{ + "query": { + "term": { + "object_type": "survey" + } + } + }') + +HTTP_CODE=$(echo "$SURVEY_RESPONSE" | tail -n1) +SURVEY_BODY=$(echo "$SURVEY_RESPONSE" | sed '$d') + +if [ "$HTTP_CODE" != "200" ]; then + echo "Error: Failed to query OpenSearch (HTTP $HTTP_CODE)" + echo "Response: $SURVEY_BODY" + exit 1 +fi + +SURVEY_COUNT=$(echo "$SURVEY_BODY" | jq -r '.count') +if [ "$SURVEY_COUNT" = "null" ] || [ -z "$SURVEY_COUNT" ]; then + echo "Error: Invalid response from OpenSearch" + echo "Response: $SURVEY_BODY" + exit 1 +fi + +echo "Found $SURVEY_COUNT survey documents" + +echo "" +echo "Step 2: Counting survey_response documents..." 
+RESPONSE_RESPONSE=$(curl -s -w "\n%{http_code}" -X GET "${OPENSEARCH_URL}/${INDEX_NAME}/_count" \ + -H 'Content-Type: application/json' \ + -d '{ + "query": { + "term": { + "object_type": "survey_response" + } + } + }') + +HTTP_CODE=$(echo "$RESPONSE_RESPONSE" | tail -n1) +RESPONSE_BODY=$(echo "$RESPONSE_RESPONSE" | sed '$d') + +if [ "$HTTP_CODE" != "200" ]; then + echo "Error: Failed to query OpenSearch (HTTP $HTTP_CODE)" + echo "Response: $RESPONSE_BODY" + exit 1 +fi + +RESPONSE_COUNT=$(echo "$RESPONSE_BODY" | jq -r '.count') +if [ "$RESPONSE_COUNT" = "null" ] || [ -z "$RESPONSE_COUNT" ]; then + echo "Error: Invalid response from OpenSearch" + echo "Response: $RESPONSE_BODY" + exit 1 +fi + +echo "Found $RESPONSE_COUNT survey_response documents" + +TOTAL_COUNT=$((SURVEY_COUNT + RESPONSE_COUNT)) +echo "" +echo "Total documents to delete: $TOTAL_COUNT" + +if [ "$TOTAL_COUNT" -eq 0 ]; then + echo "No documents to delete. Exiting." + exit 0 +fi + +echo "" +read -p "Proceed with deletion? (yes/no): " CONFIRM_DELETE + +if [ "$CONFIRM_DELETE" != "yes" ]; then + echo "Aborted." + exit 0 +fi + +# Delete OpenSearch documents for surveys +echo "" +echo "Step 3: Deleting OpenSearch survey documents..." 
+SURVEY_DELETE_RESPONSE=$(curl -s -w "\n%{http_code}" -X POST "${OPENSEARCH_URL}/${INDEX_NAME}/_delete_by_query?conflicts=proceed" \ + -H 'Content-Type: application/json' \ + -d '{ + "query": { + "term": { + "object_type": "survey" + } + } + }') + +HTTP_CODE=$(echo "$SURVEY_DELETE_RESPONSE" | tail -n1) +SURVEY_RESULT=$(echo "$SURVEY_DELETE_RESPONSE" | sed '$d') + +if [ "$HTTP_CODE" != "200" ]; then + echo "Error: Failed to delete survey documents (HTTP $HTTP_CODE)" + echo "Response: $SURVEY_RESULT" + exit 1 +fi + +SURVEY_DELETED=$(echo "$SURVEY_RESULT" | jq -r '.deleted') +if [ "$SURVEY_DELETED" = "null" ]; then + SURVEY_DELETED=0 +fi + +echo "Deleted $SURVEY_DELETED survey documents from OpenSearch" + +# Delete OpenSearch documents for survey responses +echo "" +echo "Step 4: Deleting OpenSearch survey_response documents..." +RESPONSE_DELETE_RESPONSE=$(curl -s -w "\n%{http_code}" -X POST "${OPENSEARCH_URL}/${INDEX_NAME}/_delete_by_query?conflicts=proceed" \ + -H 'Content-Type: application/json' \ + -d '{ + "query": { + "term": { + "object_type": "survey_response" + } + } + }') + +HTTP_CODE=$(echo "$RESPONSE_DELETE_RESPONSE" | tail -n1) +RESPONSE_RESULT=$(echo "$RESPONSE_DELETE_RESPONSE" | sed '$d') + +if [ "$HTTP_CODE" != "200" ]; then + echo "Error: Failed to delete survey_response documents (HTTP $HTTP_CODE)" + echo "Response: $RESPONSE_RESULT" + exit 1 +fi + +RESPONSE_DELETED=$(echo "$RESPONSE_RESULT" | jq -r '.deleted') +if [ "$RESPONSE_DELETED" = "null" ]; then + RESPONSE_DELETED=0 +fi + +echo "Deleted $RESPONSE_DELETED survey_response documents from OpenSearch" + +TOTAL_DELETED=$((SURVEY_DELETED + RESPONSE_DELETED)) + +echo "" +echo "================================================" +echo "Cleanup Complete" +echo "================================================" +echo "OpenSearch documents deleted: $TOTAL_DELETED" +echo "" +echo "Waiting 5 seconds for OpenSearch to process deletions..." 
+sleep 5 + +# Verify OpenSearch cleanup +echo "" +echo "Step 5: Verifying OpenSearch cleanup..." + +VERIFY_SURVEY_RESPONSE=$(curl -s -w "\n%{http_code}" -X GET "${OPENSEARCH_URL}/${INDEX_NAME}/_count" \ + -H 'Content-Type: application/json' \ + -d '{ + "query": { + "term": { + "object_type": "survey" + } + } + }') + +HTTP_CODE=$(echo "$VERIFY_SURVEY_RESPONSE" | tail -n1) +VERIFY_SURVEY_BODY=$(echo "$VERIFY_SURVEY_RESPONSE" | sed '$d') + +if [ "$HTTP_CODE" = "200" ]; then + REMAINING_SURVEY=$(echo "$VERIFY_SURVEY_BODY" | jq -r '.count') + if [ "$REMAINING_SURVEY" = "null" ]; then + REMAINING_SURVEY=0 + fi +else + echo "Warning: Failed to verify survey document count (HTTP $HTTP_CODE)" + REMAINING_SURVEY="unknown" +fi + +VERIFY_RESPONSE_RESPONSE=$(curl -s -w "\n%{http_code}" -X GET "${OPENSEARCH_URL}/${INDEX_NAME}/_count" \ + -H 'Content-Type: application/json' \ + -d '{ + "query": { + "term": { + "object_type": "survey_response" + } + } + }') + +HTTP_CODE=$(echo "$VERIFY_RESPONSE_RESPONSE" | tail -n1) +VERIFY_RESPONSE_BODY=$(echo "$VERIFY_RESPONSE_RESPONSE" | sed '$d') + +if [ "$HTTP_CODE" = "200" ]; then + REMAINING_RESPONSE=$(echo "$VERIFY_RESPONSE_BODY" | jq -r '.count') + if [ "$REMAINING_RESPONSE" = "null" ]; then + REMAINING_RESPONSE=0 + fi +else + echo "Warning: Failed to verify survey_response document count (HTTP $HTTP_CODE)" + REMAINING_RESPONSE="unknown" +fi + +echo "Remaining survey documents: $REMAINING_SURVEY" +echo "Remaining survey_response documents: $REMAINING_RESPONSE" + +if [ "$REMAINING_SURVEY" = "unknown" ] || [ "$REMAINING_RESPONSE" = "unknown" ]; then + echo "Total remaining: unknown (verification failed)" + echo "" + echo "⚠ Warning: Could not verify cleanup completion" +else + TOTAL_REMAINING=$((REMAINING_SURVEY + REMAINING_RESPONSE)) + echo "Total remaining: $TOTAL_REMAINING" + + echo "" + if [ "$TOTAL_REMAINING" -eq 0 ]; then + echo "✓ All OpenSearch documents successfully removed!" 
+ else + echo "⚠ Warning: $TOTAL_REMAINING documents still remain." + fi +fi