diff --git a/api/proto/meridian/control_plane/v1/manifest_history_service.proto b/api/proto/meridian/control_plane/v1/manifest_history_service.proto index 1b66d0a2a..0d7c8139f 100644 --- a/api/proto/meridian/control_plane/v1/manifest_history_service.proto +++ b/api/proto/meridian/control_plane/v1/manifest_history_service.proto @@ -47,6 +47,19 @@ service ManifestHistoryService { get: "/v1/manifests/diff" }; } + + // ExportManifest reconstructs a manifest from live service state. + // Queries each downstream service (reference-data, market-information, party, + // operational-gateway, internal-account) for their current resources and + // assembles them into a complete Manifest proto. Used to migrate existing + // tenants (set up via direct gRPC before the manifest-first model) into the + // manifest-first workflow. This is a read-only operation — no mutations and + // no manifest version is created. + rpc ExportManifest(ExportManifestRequest) returns (ExportManifestResponse) { + option (google.api.http) = { + get: "/v1/manifests/export" + }; + } } // ======================================== @@ -251,3 +264,53 @@ message DiffSummary { // has_breaking_changes indicates whether any action is a breaking change. bool has_breaking_changes = 6; } + +// ======================================== +// Export Manifest +// ======================================== + +// ExportManifestRequest requests a manifest reconstructed from live service state. +// All fields are optional filters to control which sections are included. +message ExportManifestRequest { + // include_sections filters which manifest sections to include in the export. + // If empty, all sections are included. Valid values: + // "instruments", "account_types", "valuation_rules", "sagas", + // "market_data", "organizations", "internal_accounts", + // "operational_gateway", "mappings", "party_types", "payment_rails" + repeated string include_sections = 1 [(buf.validate.field).repeated = { + max_items: 20 + items: { + string: { + min_len: 1 + max_len: 64 + } + } + }]; + + // manifest_version filters the export to match a specific previously applied + // manifest version string. When set, metadata and sections not backed by live + // services (e.g., valuation_rules, seed_data) are sourced from this version. + // When empty, the most recently applied manifest version is used as fallback. + string manifest_version = 2 [(buf.validate.field).string.max_len = 50]; +} + +// ExportManifestResponse contains the reconstructed manifest and export metadata. +message ExportManifestResponse { + // manifest is the reconstructed manifest from live service state. + Manifest manifest = 1; + + // exported_at is the timestamp when the export was generated. + google.protobuf.Timestamp exported_at = 2; + + // checksum is the SHA-256 hash of the reconstructed manifest content. + string checksum = 3; + + // section_sources documents where each section's data was sourced from. + // Keys are section names, values describe the source (e.g., "live:reference-data", + // "fallback:manifest-v1.0"). + map section_sources = 4; + + // warnings contains non-fatal issues encountered during export + // (e.g., a service was unreachable, partial data returned). + repeated string warnings = 5; +} diff --git a/services/control-plane/internal/manifest/export.go b/services/control-plane/internal/manifest/export.go new file mode 100644 index 000000000..d755196ba --- /dev/null +++ b/services/control-plane/internal/manifest/export.go @@ -0,0 +1,489 @@ +package manifest + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "time" + + controlplanev1 "github.com/meridianhub/meridian/api/proto/meridian/control_plane/v1" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// SectionName identifies a manifest section for filtering. +type SectionName string + +// Manifest section names for filtering ExportManifest results. +const ( + SectionInstruments SectionName = "instruments" + SectionAccountTypes SectionName = "account_types" + SectionValuationRules SectionName = "valuation_rules" + SectionSagas SectionName = "sagas" + SectionMarketData SectionName = "market_data" + SectionOrganizations SectionName = "organizations" + SectionInternalAccounts SectionName = "internal_accounts" + SectionOperationalGateway SectionName = "operational_gateway" + SectionMappings SectionName = "mappings" + SectionPartyTypes SectionName = "party_types" + SectionPaymentRails SectionName = "payment_rails" +) + +// allSections is the complete set of exportable sections. +var allSections = map[SectionName]bool{ + SectionInstruments: true, + SectionAccountTypes: true, + SectionValuationRules: true, + SectionSagas: true, + SectionMarketData: true, + SectionOrganizations: true, + SectionInternalAccounts: true, + SectionOperationalGateway: true, + SectionMappings: true, + SectionPartyTypes: true, + SectionPaymentRails: true, +} + +// InstrumentCollector retrieves instruments from the live reference-data service. +type InstrumentCollector interface { + ListInstruments(ctx context.Context) ([]*controlplanev1.InstrumentDefinition, error) +} + +// AccountTypeCollector retrieves account types from the live reference-data service. +type AccountTypeCollector interface { + ListAccountTypes(ctx context.Context) ([]*controlplanev1.AccountTypeDefinition, error) +} + +// SagaCollector retrieves saga definitions from the live saga registry. +type SagaCollector interface { + ListSagas(ctx context.Context) ([]*controlplanev1.SagaDefinition, error) +} + +// MarketDataCollector retrieves market data sources and data sets from the live market-information service. +type MarketDataCollector interface { + ListMarketDataSources(ctx context.Context) ([]*controlplanev1.MarketDataSourceDefinition, error) + ListMarketDataSets(ctx context.Context) ([]*controlplanev1.MarketDataSetDefinition, error) +} + +// OrganizationCollector retrieves organizations from the live party service. +type OrganizationCollector interface { + ListOrganizations(ctx context.Context) ([]*controlplanev1.OrganizationDefinition, error) +} + +// InternalAccountCollector retrieves internal accounts from the live internal-account service. +type InternalAccountCollector interface { + ListInternalAccounts(ctx context.Context) ([]*controlplanev1.InternalAccountDefinition, error) +} + +// OperationalGatewayCollector retrieves provider connections and routes from the live operational-gateway service. +type OperationalGatewayCollector interface { + ListProviderConnections(ctx context.Context) ([]*controlplanev1.ProviderConnectionConfig, error) + ListInstructionRoutes(ctx context.Context) ([]*controlplanev1.InstructionRouteConfig, error) +} + +// ExportCollectors groups all collector interfaces for manifest export. +// All fields are optional — nil collectors result in fallback to the stored manifest +// for that section. +type ExportCollectors struct { + Instruments InstrumentCollector + AccountTypes AccountTypeCollector + Sagas SagaCollector + MarketData MarketDataCollector + Organizations OrganizationCollector + InternalAccounts InternalAccountCollector + OperationalGateway OperationalGatewayCollector +} + +// ExportService reconstructs manifests from live service state. +type ExportService struct { + history *HistoryService + collectors *ExportCollectors +} + +// NewExportService creates a new ExportService. +// history is required for fallback data; collectors may be nil (all sections fall back). +func NewExportService(history *HistoryService, collectors *ExportCollectors) (*ExportService, error) { + if history == nil { + return nil, ErrNilRepository + } + if collectors == nil { + collectors = &ExportCollectors{} + } + return &ExportService{ + history: history, + collectors: collectors, + }, nil +} + +// ExportResult holds the output of an export operation. +type ExportResult struct { + Manifest *controlplanev1.Manifest + ExportedAt time.Time + Checksum string + SectionSources map[string]string + Warnings []string +} + +// Export reconstructs a manifest from live service state. +// includeSections filters which sections to include (empty means all). +// manifestVersion specifies which stored manifest to use for fallback data. +func (s *ExportService) Export(ctx context.Context, includeSections []string, manifestVersion string) (*ExportResult, error) { + // Determine which sections to include. + sections := parseSections(includeSections) + + // Load fallback manifest from history. + fallback, fallbackVersion, err := s.loadFallbackManifest(ctx, manifestVersion) + if err != nil { + return nil, fmt.Errorf("load fallback manifest: %w", err) + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + ExportedAt: time.Now().UTC(), + SectionSources: make(map[string]string), + } + + // Always include version and metadata from fallback. + if fallback != nil { + result.Manifest.Version = fallback.Version + result.Manifest.Metadata = fallback.Metadata + } else { + result.Manifest.Version = "1.0" + result.Manifest.Metadata = &controlplanev1.ManifestMetadata{ + Name: "exported", + Description: "Manifest reconstructed from live service state", + } + } + + // Collect each section. + if sections[SectionInstruments] { + s.collectInstruments(ctx, result, fallback, fallbackVersion) + } + if sections[SectionAccountTypes] { + s.collectAccountTypes(ctx, result, fallback, fallbackVersion) + } + if sections[SectionValuationRules] { + s.collectValuationRules(result, fallback, fallbackVersion) + } + if sections[SectionSagas] { + s.collectSagas(ctx, result, fallback, fallbackVersion) + } + if sections[SectionMarketData] { + s.collectMarketData(ctx, result, fallback, fallbackVersion) + } + if sections[SectionOrganizations] { + s.collectOrganizations(ctx, result, fallback, fallbackVersion) + } + if sections[SectionInternalAccounts] { + s.collectInternalAccounts(ctx, result, fallback, fallbackVersion) + } + if sections[SectionOperationalGateway] { + s.collectOperationalGateway(ctx, result, fallback, fallbackVersion) + } + if sections[SectionMappings] { + s.collectMappings(result, fallback, fallbackVersion) + } + if sections[SectionPartyTypes] { + s.collectPartyTypes(result, fallback, fallbackVersion) + } + if sections[SectionPaymentRails] { + s.collectPaymentRails(result, fallback, fallbackVersion) + } + + // Compute checksum. + checksum, err := manifestChecksum(result.Manifest) + if err != nil { + return nil, fmt.Errorf("compute checksum: %w", err) + } + result.Checksum = checksum + + return result, nil +} + +// parseSections converts include_sections strings to a set of SectionNames. +// Returns all sections if the input is empty. +func parseSections(include []string) map[SectionName]bool { + if len(include) == 0 { + // Return all sections. + result := make(map[SectionName]bool, len(allSections)) + for k, v := range allSections { + result[k] = v + } + return result + } + result := make(map[SectionName]bool, len(include)) + for _, s := range include { + name := SectionName(s) + if allSections[name] { + result[name] = true + } + } + return result +} + +// loadFallbackManifest loads the manifest used as fallback for sections without live collectors. +func (s *ExportService) loadFallbackManifest(ctx context.Context, version string) (*controlplanev1.Manifest, string, error) { + var entity *VersionEntity + var err error + + if version != "" { + entity, err = s.history.GetManifestVersion(ctx, version) + } else { + entity, err = s.history.GetCurrentManifest(ctx) + } + + if err != nil { + if isNotFound(err) { + return nil, "", nil + } + return nil, "", err + } + + manifest, err := unmarshalManifest(entity.ManifestJSON) + if err != nil { + return nil, "", fmt.Errorf("unmarshal fallback manifest: %w", err) + } + return manifest, entity.Version, nil +} + +// isNotFound checks if an error is a not-found condition. +func isNotFound(err error) bool { + return err != nil && errors.Is(err, ErrVersionNotFound) +} + +func (s *ExportService) collectInstruments(ctx context.Context, result *ExportResult, fallback *controlplanev1.Manifest, fallbackVersion string) { + if s.collectors.Instruments != nil { + instruments, err := s.collectors.Instruments.ListInstruments(ctx) + if err != nil { + result.Warnings = append(result.Warnings, fmt.Sprintf("instruments: failed to query live state: %v", err)) + if fallback != nil { + result.Manifest.Instruments = fallback.Instruments + result.SectionSources["instruments"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } + return + } + result.Manifest.Instruments = instruments + result.SectionSources["instruments"] = "live:reference-data" + return + } + if fallback != nil { + result.Manifest.Instruments = fallback.Instruments + result.SectionSources["instruments"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } +} + +func (s *ExportService) collectAccountTypes(ctx context.Context, result *ExportResult, fallback *controlplanev1.Manifest, fallbackVersion string) { + if s.collectors.AccountTypes != nil { + accountTypes, err := s.collectors.AccountTypes.ListAccountTypes(ctx) + if err != nil { + result.Warnings = append(result.Warnings, fmt.Sprintf("account_types: failed to query live state: %v", err)) + if fallback != nil { + result.Manifest.AccountTypes = fallback.AccountTypes + result.SectionSources["account_types"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } + return + } + result.Manifest.AccountTypes = accountTypes + result.SectionSources["account_types"] = "live:reference-data" + return + } + if fallback != nil { + result.Manifest.AccountTypes = fallback.AccountTypes + result.SectionSources["account_types"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } +} + +func (s *ExportService) collectValuationRules(result *ExportResult, fallback *controlplanev1.Manifest, fallbackVersion string) { + // Valuation rules have no dedicated live service — always from fallback. + if fallback != nil { + result.Manifest.ValuationRules = fallback.ValuationRules + result.SectionSources["valuation_rules"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } +} + +func (s *ExportService) collectSagas(ctx context.Context, result *ExportResult, fallback *controlplanev1.Manifest, fallbackVersion string) { + if s.collectors.Sagas != nil { + sagas, err := s.collectors.Sagas.ListSagas(ctx) + if err != nil { + result.Warnings = append(result.Warnings, fmt.Sprintf("sagas: failed to query live state: %v", err)) + if fallback != nil { + result.Manifest.Sagas = fallback.Sagas + result.SectionSources["sagas"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } + return + } + result.Manifest.Sagas = sagas + result.SectionSources["sagas"] = "live:saga-registry" + return + } + if fallback != nil { + result.Manifest.Sagas = fallback.Sagas + result.SectionSources["sagas"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } +} + +func (s *ExportService) collectMarketData(ctx context.Context, result *ExportResult, fallback *controlplanev1.Manifest, fallbackVersion string) { + if s.collectors.MarketData != nil { + sources, srcErr := s.collectors.MarketData.ListMarketDataSources(ctx) + datasets, dsErr := s.collectors.MarketData.ListMarketDataSets(ctx) + + if srcErr != nil || dsErr != nil { + var warning string + if srcErr != nil { + warning = fmt.Sprintf("market_data.sources: %v", srcErr) + } + if dsErr != nil { + if warning != "" { + warning += "; " + } + warning += fmt.Sprintf("market_data.datasets: %v", dsErr) + } + result.Warnings = append(result.Warnings, warning) + if fallback != nil { + result.Manifest.MarketData = fallback.MarketData + result.SectionSources["market_data"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } + return + } + + result.Manifest.MarketData = &controlplanev1.MarketDataConfig{ + Sources: sources, + Datasets: datasets, + } + result.SectionSources["market_data"] = "live:market-information" + return + } + if fallback != nil { + result.Manifest.MarketData = fallback.MarketData + result.SectionSources["market_data"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } +} + +func (s *ExportService) collectOrganizations(ctx context.Context, result *ExportResult, fallback *controlplanev1.Manifest, fallbackVersion string) { + if s.collectors.Organizations != nil { + orgs, err := s.collectors.Organizations.ListOrganizations(ctx) + if err != nil { + result.Warnings = append(result.Warnings, fmt.Sprintf("organizations: failed to query live state: %v", err)) + if fallback != nil { + result.Manifest.Organizations = fallback.Organizations + result.SectionSources["organizations"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } + return + } + result.Manifest.Organizations = orgs + result.SectionSources["organizations"] = "live:party" + return + } + if fallback != nil { + result.Manifest.Organizations = fallback.Organizations + result.SectionSources["organizations"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } +} + +func (s *ExportService) collectInternalAccounts(ctx context.Context, result *ExportResult, fallback *controlplanev1.Manifest, fallbackVersion string) { + if s.collectors.InternalAccounts != nil { + accounts, err := s.collectors.InternalAccounts.ListInternalAccounts(ctx) + if err != nil { + result.Warnings = append(result.Warnings, fmt.Sprintf("internal_accounts: failed to query live state: %v", err)) + if fallback != nil { + result.Manifest.InternalAccounts = fallback.InternalAccounts + result.SectionSources["internal_accounts"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } + return + } + result.Manifest.InternalAccounts = accounts + result.SectionSources["internal_accounts"] = "live:internal-account" + return + } + if fallback != nil { + result.Manifest.InternalAccounts = fallback.InternalAccounts + result.SectionSources["internal_accounts"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } +} + +func (s *ExportService) collectOperationalGateway(ctx context.Context, result *ExportResult, fallback *controlplanev1.Manifest, fallbackVersion string) { + if s.collectors.OperationalGateway != nil { + connections, connErr := s.collectors.OperationalGateway.ListProviderConnections(ctx) + routes, routeErr := s.collectors.OperationalGateway.ListInstructionRoutes(ctx) + + if connErr != nil || routeErr != nil { + var warning string + if connErr != nil { + warning = fmt.Sprintf("operational_gateway.connections: %v", connErr) + } + if routeErr != nil { + if warning != "" { + warning += "; " + } + warning += fmt.Sprintf("operational_gateway.routes: %v", routeErr) + } + result.Warnings = append(result.Warnings, warning) + if fallback != nil { + result.Manifest.OperationalGateway = fallback.OperationalGateway + result.SectionSources["operational_gateway"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } + return + } + + result.Manifest.OperationalGateway = &controlplanev1.OperationalGatewayConfig{ + ProviderConnections: connections, + InstructionRoutes: routes, + } + result.SectionSources["operational_gateway"] = "live:operational-gateway" + return + } + if fallback != nil { + result.Manifest.OperationalGateway = fallback.OperationalGateway + result.SectionSources["operational_gateway"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } +} + +func (s *ExportService) collectMappings(result *ExportResult, fallback *controlplanev1.Manifest, fallbackVersion string) { + // Mappings have no dedicated live service — always from fallback. + if fallback != nil { + result.Manifest.Mappings = fallback.Mappings + result.SectionSources["mappings"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } +} + +func (s *ExportService) collectPartyTypes(result *ExportResult, fallback *controlplanev1.Manifest, fallbackVersion string) { + // Party types — always from fallback for now. + if fallback != nil { + result.Manifest.PartyTypes = fallback.PartyTypes + result.SectionSources["party_types"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } +} + +func (s *ExportService) collectPaymentRails(result *ExportResult, fallback *controlplanev1.Manifest, fallbackVersion string) { + // Payment rails have no dedicated live service — always from fallback. + if fallback != nil { + result.Manifest.PaymentRails = fallback.PaymentRails + result.SectionSources["payment_rails"] = fmt.Sprintf("fallback:manifest-v%s", fallbackVersion) + } +} + +// manifestChecksum computes a SHA-256 checksum of a Manifest proto. +func manifestChecksum(m *controlplanev1.Manifest) (string, error) { + marshaler := protojson.MarshalOptions{ + UseProtoNames: true, + EmitUnpopulated: false, + } + b, err := marshaler.Marshal(m) + if err != nil { + return "", fmt.Errorf("marshal manifest for checksum: %w", err) + } + sum := sha256.Sum256(b) + return fmt.Sprintf("%x", sum), nil +} + +// ToProtoResponse converts an ExportResult to the gRPC response. +func (r *ExportResult) ToProtoResponse() *controlplanev1.ExportManifestResponse { + return &controlplanev1.ExportManifestResponse{ + Manifest: r.Manifest, + ExportedAt: timestamppb.New(r.ExportedAt), + Checksum: r.Checksum, + SectionSources: r.SectionSources, + Warnings: r.Warnings, + } +} diff --git a/services/control-plane/internal/manifest/export_test.go b/services/control-plane/internal/manifest/export_test.go new file mode 100644 index 000000000..c988afb8a --- /dev/null +++ b/services/control-plane/internal/manifest/export_test.go @@ -0,0 +1,648 @@ +package manifest + +import ( + "context" + "errors" + "testing" + + controlplanev1 "github.com/meridianhub/meridian/api/proto/meridian/control_plane/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// --- Mock collectors --- + +type mockInstrumentCollector struct { + instruments []*controlplanev1.InstrumentDefinition + err error +} + +func (m *mockInstrumentCollector) ListInstruments(_ context.Context) ([]*controlplanev1.InstrumentDefinition, error) { + return m.instruments, m.err +} + +type mockAccountTypeCollector struct { + accountTypes []*controlplanev1.AccountTypeDefinition + err error +} + +func (m *mockAccountTypeCollector) ListAccountTypes(_ context.Context) ([]*controlplanev1.AccountTypeDefinition, error) { + return m.accountTypes, m.err +} + +type mockSagaCollector struct { + sagas []*controlplanev1.SagaDefinition + err error +} + +func (m *mockSagaCollector) ListSagas(_ context.Context) ([]*controlplanev1.SagaDefinition, error) { + return m.sagas, m.err +} + +type mockMarketDataCollector struct { + sources []*controlplanev1.MarketDataSourceDefinition + datasets []*controlplanev1.MarketDataSetDefinition + srcErr error + dsErr error +} + +func (m *mockMarketDataCollector) ListMarketDataSources(_ context.Context) ([]*controlplanev1.MarketDataSourceDefinition, error) { + return m.sources, m.srcErr +} + +func (m *mockMarketDataCollector) ListMarketDataSets(_ context.Context) ([]*controlplanev1.MarketDataSetDefinition, error) { + return m.datasets, m.dsErr +} + +type mockOrganizationCollector struct { + orgs []*controlplanev1.OrganizationDefinition + err error +} + +func (m *mockOrganizationCollector) ListOrganizations(_ context.Context) ([]*controlplanev1.OrganizationDefinition, error) { + return m.orgs, m.err +} + +type mockInternalAccountCollector struct { + accounts []*controlplanev1.InternalAccountDefinition + err error +} + +func (m *mockInternalAccountCollector) ListInternalAccounts(_ context.Context) ([]*controlplanev1.InternalAccountDefinition, error) { + return m.accounts, m.err +} + +type mockOperationalGatewayCollector struct { + connections []*controlplanev1.ProviderConnectionConfig + routes []*controlplanev1.InstructionRouteConfig + connErr error + routeErr error +} + +func (m *mockOperationalGatewayCollector) ListProviderConnections(_ context.Context) ([]*controlplanev1.ProviderConnectionConfig, error) { + return m.connections, m.connErr +} + +func (m *mockOperationalGatewayCollector) ListInstructionRoutes(_ context.Context) ([]*controlplanev1.InstructionRouteConfig, error) { + return m.routes, m.routeErr +} + +// --- Test helpers --- + +func testFallbackManifest() *controlplanev1.Manifest { + return &controlplanev1.Manifest{ + Version: "1.0", + Metadata: &controlplanev1.ManifestMetadata{ + Name: "Test Economy", + Industry: "testing", + }, + Instruments: []*controlplanev1.InstrumentDefinition{ + { + Code: "GBP", + Name: "British Pound", + Type: controlplanev1.InstrumentType_INSTRUMENT_TYPE_FIAT, + Dimensions: &controlplanev1.InstrumentDimensions{ + Unit: "GBP", + Precision: 2, + }, + }, + }, + AccountTypes: []*controlplanev1.AccountTypeDefinition{ + { + Code: "CURRENT", + Name: "Current Account", + NormalBalance: controlplanev1.NormalBalance_NORMAL_BALANCE_DEBIT, + }, + }, + ValuationRules: []*controlplanev1.ValuationRule{ + { + FromInstrument: "KWH", + ToInstrument: "GBP", + Method: controlplanev1.ValuationMethod_VALUATION_METHOD_SPOT_RATE, + Source: "nordpool", + }, + }, + Sagas: []*controlplanev1.SagaDefinition{ + { + Name: "test_saga", + Trigger: "api:/v1/test", + Script: "print('hello')", + }, + }, + } +} + +// --- Tests --- + +func TestParseSections(t *testing.T) { + t.Run("empty includes all sections", func(t *testing.T) { + result := parseSections(nil) + assert.Equal(t, len(allSections), len(result)) + for k := range allSections { + assert.True(t, result[k], "expected section %s to be included", k) + } + }) + + t.Run("specific sections", func(t *testing.T) { + result := parseSections([]string{"instruments", "sagas"}) + assert.Len(t, result, 2) + assert.True(t, result[SectionInstruments]) + assert.True(t, result[SectionSagas]) + }) + + t.Run("unknown sections are ignored", func(t *testing.T) { + result := parseSections([]string{"instruments", "nonexistent"}) + assert.Len(t, result, 1) + assert.True(t, result[SectionInstruments]) + }) +} + +func TestManifestChecksum(t *testing.T) { + t.Run("deterministic checksum", func(t *testing.T) { + m := &controlplanev1.Manifest{ + Version: "1.0", + Metadata: &controlplanev1.ManifestMetadata{ + Name: "Test", + }, + } + c1, err := manifestChecksum(m) + require.NoError(t, err) + c2, err := manifestChecksum(m) + require.NoError(t, err) + assert.Equal(t, c1, c2) + assert.Len(t, c1, 64) // SHA-256 hex length + }) + + t.Run("different manifests produce different checksums", func(t *testing.T) { + m1 := &controlplanev1.Manifest{Version: "1.0", Metadata: &controlplanev1.ManifestMetadata{Name: "A"}} + m2 := &controlplanev1.Manifest{Version: "1.0", Metadata: &controlplanev1.ManifestMetadata{Name: "B"}} + c1, _ := manifestChecksum(m1) + c2, _ := manifestChecksum(m2) + assert.NotEqual(t, c1, c2) + }) +} + +func TestExportService_NewExportService(t *testing.T) { + t.Run("nil history returns error", func(t *testing.T) { + _, err := NewExportService(nil, nil) + require.Error(t, err) + }) +} + +func TestExportResult_ToProtoResponse(t *testing.T) { + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{Version: "1.0"}, + Checksum: "abc123", + SectionSources: map[string]string{ + "instruments": "live:reference-data", + }, + Warnings: []string{"test warning"}, + } + resp := result.ToProtoResponse() + assert.Equal(t, "1.0", resp.Manifest.Version) + assert.Equal(t, "abc123", resp.Checksum) + assert.Equal(t, "live:reference-data", resp.SectionSources["instruments"]) + assert.Equal(t, []string{"test warning"}, resp.Warnings) + assert.NotNil(t, resp.ExportedAt) +} + +func TestExportService_Export_AllFallback(t *testing.T) { + // When no collectors are configured, all sections come from fallback. + // We use a mock history service by creating one with a repository that has a stored manifest. + // For unit testing without a DB, we'll test the export logic directly. + + t.Run("no collectors, no fallback returns empty manifest", func(t *testing.T) { + // Create a minimal ExportService with a history that returns not-found. + svc := &ExportService{ + history: &HistoryService{repo: nil}, // will cause panic if called + collectors: &ExportCollectors{}, + } + + // Call export with a custom loadFallbackManifest override. + // Since we can't easily mock the history service in a unit test without a DB, + // we test the collection functions directly. + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{Version: "1.0"}, + SectionSources: make(map[string]string), + } + + // With nil fallback, collect functions should be no-ops. + svc.collectInstruments(context.Background(), result, nil, "") + assert.Nil(t, result.Manifest.Instruments) + assert.Empty(t, result.SectionSources["instruments"]) + }) + + t.Run("no collectors, with fallback populates from fallback", func(t *testing.T) { + svc := &ExportService{ + collectors: &ExportCollectors{}, + } + + fb := testFallbackManifest() + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{Version: "1.0"}, + SectionSources: make(map[string]string), + } + + svc.collectInstruments(context.Background(), result, fb, "1.0") + assert.Equal(t, fb.Instruments, result.Manifest.Instruments) + assert.Equal(t, "fallback:manifest-v1.0", result.SectionSources["instruments"]) + + svc.collectAccountTypes(context.Background(), result, fb, "1.0") + assert.Equal(t, fb.AccountTypes, result.Manifest.AccountTypes) + assert.Equal(t, "fallback:manifest-v1.0", result.SectionSources["account_types"]) + + svc.collectValuationRules(result, fb, "1.0") + assert.Equal(t, fb.ValuationRules, result.Manifest.ValuationRules) + assert.Equal(t, "fallback:manifest-v1.0", result.SectionSources["valuation_rules"]) + + svc.collectSagas(context.Background(), result, fb, "1.0") + assert.Equal(t, fb.Sagas, result.Manifest.Sagas) + assert.Equal(t, "fallback:manifest-v1.0", result.SectionSources["sagas"]) + }) +} + +func TestExportService_Export_LiveCollectors(t *testing.T) { + liveInstruments := []*controlplanev1.InstrumentDefinition{ + { + Code: "USD", + Name: "US Dollar", + Type: controlplanev1.InstrumentType_INSTRUMENT_TYPE_FIAT, + Dimensions: &controlplanev1.InstrumentDimensions{ + Unit: "USD", + Precision: 2, + }, + }, + } + + liveAccountTypes := []*controlplanev1.AccountTypeDefinition{ + { + Code: "SAVINGS", + Name: "Savings Account", + NormalBalance: controlplanev1.NormalBalance_NORMAL_BALANCE_DEBIT, + }, + } + + liveSagas := []*controlplanev1.SagaDefinition{ + { + Name: "live_saga", + Trigger: "api:/v1/live", + Script: "print('live')", + }, + } + + t.Run("instruments from live collector", func(t *testing.T) { + svc := &ExportService{ + collectors: &ExportCollectors{ + Instruments: &mockInstrumentCollector{instruments: liveInstruments}, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + fb := testFallbackManifest() + svc.collectInstruments(context.Background(), result, fb, "1.0") + + assert.Equal(t, liveInstruments, result.Manifest.Instruments) + assert.Equal(t, "live:reference-data", result.SectionSources["instruments"]) + }) + + t.Run("account types from live collector", func(t *testing.T) { + svc := &ExportService{ + collectors: &ExportCollectors{ + AccountTypes: &mockAccountTypeCollector{accountTypes: liveAccountTypes}, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + svc.collectAccountTypes(context.Background(), result, nil, "") + assert.Equal(t, liveAccountTypes, result.Manifest.AccountTypes) + assert.Equal(t, "live:reference-data", result.SectionSources["account_types"]) + }) + + t.Run("sagas from live collector", func(t *testing.T) { + svc := &ExportService{ + collectors: &ExportCollectors{ + Sagas: &mockSagaCollector{sagas: liveSagas}, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + svc.collectSagas(context.Background(), result, nil, "") + assert.Equal(t, liveSagas, result.Manifest.Sagas) + assert.Equal(t, "live:saga-registry", result.SectionSources["sagas"]) + }) +} + +func TestExportService_Export_CollectorError_FallsBack(t *testing.T) { + fb := testFallbackManifest() + + t.Run("instrument collector error falls back with warning", func(t *testing.T) { + svc := &ExportService{ + collectors: &ExportCollectors{ + Instruments: &mockInstrumentCollector{err: errors.New("connection refused")}, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + svc.collectInstruments(context.Background(), result, fb, "1.0") + + assert.Equal(t, fb.Instruments, result.Manifest.Instruments) + assert.Equal(t, "fallback:manifest-v1.0", result.SectionSources["instruments"]) + require.Len(t, result.Warnings, 1) + assert.Contains(t, result.Warnings[0], "connection refused") + }) + + t.Run("saga collector error falls back with warning", func(t *testing.T) { + svc := &ExportService{ + collectors: &ExportCollectors{ + Sagas: &mockSagaCollector{err: errors.New("timeout")}, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + svc.collectSagas(context.Background(), result, fb, "1.0") + + assert.Equal(t, fb.Sagas, result.Manifest.Sagas) + assert.Equal(t, "fallback:manifest-v1.0", result.SectionSources["sagas"]) + require.Len(t, result.Warnings, 1) + assert.Contains(t, result.Warnings[0], "timeout") + }) + + t.Run("collector error without fallback produces warning only", func(t *testing.T) { + svc := &ExportService{ + collectors: &ExportCollectors{ + Instruments: &mockInstrumentCollector{err: errors.New("fail")}, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + svc.collectInstruments(context.Background(), result, nil, "") + + assert.Nil(t, result.Manifest.Instruments) + assert.Empty(t, result.SectionSources["instruments"]) + require.Len(t, result.Warnings, 1) + }) +} + +func TestExportService_Export_MarketData(t *testing.T) { + liveSources := []*controlplanev1.MarketDataSourceDefinition{ + {Code: "ECB", Name: "European Central Bank"}, + } + liveDatasets := []*controlplanev1.MarketDataSetDefinition{ + {Code: "USD_EUR_FX", Unit: "USD/EUR"}, + } + + t.Run("market data from live collector", func(t *testing.T) { + svc := &ExportService{ + collectors: &ExportCollectors{ + MarketData: &mockMarketDataCollector{ + sources: liveSources, + datasets: liveDatasets, + }, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + svc.collectMarketData(context.Background(), result, nil, "") + + require.NotNil(t, result.Manifest.MarketData) + assert.Equal(t, liveSources, result.Manifest.MarketData.Sources) + assert.Equal(t, liveDatasets, result.Manifest.MarketData.Datasets) + assert.Equal(t, "live:market-information", result.SectionSources["market_data"]) + }) + + t.Run("market data source error falls back", func(t *testing.T) { + fb := testFallbackManifest() + fb.MarketData = &controlplanev1.MarketDataConfig{ + Sources: []*controlplanev1.MarketDataSourceDefinition{{Code: "FB"}}, + } + + svc := &ExportService{ + collectors: &ExportCollectors{ + MarketData: &mockMarketDataCollector{srcErr: errors.New("fail")}, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + svc.collectMarketData(context.Background(), result, fb, "1.0") + + assert.Equal(t, fb.MarketData, result.Manifest.MarketData) + assert.Equal(t, "fallback:manifest-v1.0", result.SectionSources["market_data"]) + require.Len(t, result.Warnings, 1) + }) +} + +func TestExportService_Export_OperationalGateway(t *testing.T) { + liveConns := []*controlplanev1.ProviderConnectionConfig{ + {ConnectionId: "stripe-payments", ProviderName: "Stripe"}, + } + liveRoutes := []*controlplanev1.InstructionRouteConfig{ + {InstructionType: "payment.initiate", ConnectionId: "stripe-payments"}, + } + + t.Run("operational gateway from live collector", func(t *testing.T) { + svc := &ExportService{ + collectors: &ExportCollectors{ + OperationalGateway: &mockOperationalGatewayCollector{ + connections: liveConns, + routes: liveRoutes, + }, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + svc.collectOperationalGateway(context.Background(), result, nil, "") + + require.NotNil(t, result.Manifest.OperationalGateway) + assert.Equal(t, liveConns, result.Manifest.OperationalGateway.ProviderConnections) + assert.Equal(t, liveRoutes, result.Manifest.OperationalGateway.InstructionRoutes) + assert.Equal(t, "live:operational-gateway", result.SectionSources["operational_gateway"]) + }) + + t.Run("operational gateway connection error falls back", func(t *testing.T) { + fb := testFallbackManifest() + fb.OperationalGateway = &controlplanev1.OperationalGatewayConfig{ + ProviderConnections: []*controlplanev1.ProviderConnectionConfig{{ConnectionId: "fb"}}, + } + + svc := &ExportService{ + collectors: &ExportCollectors{ + OperationalGateway: &mockOperationalGatewayCollector{connErr: errors.New("fail")}, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + svc.collectOperationalGateway(context.Background(), result, fb, "1.0") + + assert.Equal(t, fb.OperationalGateway, result.Manifest.OperationalGateway) + assert.Equal(t, "fallback:manifest-v1.0", result.SectionSources["operational_gateway"]) + require.Len(t, result.Warnings, 1) + }) +} + +func TestExportService_Export_Organizations(t *testing.T) { + liveOrgs := []*controlplanev1.OrganizationDefinition{ + {Code: "ACME", Name: "Acme Corp", PartyType: "ORGANIZATION"}, + } + + t.Run("organizations from live collector", func(t *testing.T) { + svc := &ExportService{ + collectors: &ExportCollectors{ + Organizations: &mockOrganizationCollector{orgs: liveOrgs}, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + svc.collectOrganizations(context.Background(), result, nil, "") + assert.Equal(t, liveOrgs, result.Manifest.Organizations) + assert.Equal(t, "live:party", result.SectionSources["organizations"]) + }) +} + +func TestExportService_Export_InternalAccounts(t *testing.T) { + liveAccounts := []*controlplanev1.InternalAccountDefinition{ + {Code: "REVENUE_GBP", AccountType: "REVENUE", Instrument: "GBP"}, + } + + t.Run("internal accounts from live collector", func(t *testing.T) { + svc := &ExportService{ + collectors: &ExportCollectors{ + InternalAccounts: &mockInternalAccountCollector{accounts: liveAccounts}, + }, + } + + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + + svc.collectInternalAccounts(context.Background(), result, nil, "") + assert.Equal(t, liveAccounts, result.Manifest.InternalAccounts) + assert.Equal(t, "live:internal-account", result.SectionSources["internal_accounts"]) + }) +} + +func TestExportService_Export_FallbackOnlySections(t *testing.T) { + fb := testFallbackManifest() + + t.Run("valuation rules always from fallback", func(t *testing.T) { + svc := &ExportService{collectors: &ExportCollectors{}} + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + svc.collectValuationRules(result, fb, "1.0") + assert.Equal(t, fb.ValuationRules, result.Manifest.ValuationRules) + assert.Equal(t, "fallback:manifest-v1.0", result.SectionSources["valuation_rules"]) + }) + + t.Run("payment rails always from fallback", func(t *testing.T) { + svc := &ExportService{collectors: &ExportCollectors{}} + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + svc.collectPaymentRails(result, fb, "1.0") + // fb has no payment rails, so should be nil + assert.Nil(t, result.Manifest.PaymentRails) + }) + + t.Run("mappings always from fallback", func(t *testing.T) { + svc := &ExportService{collectors: &ExportCollectors{}} + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + svc.collectMappings(result, fb, "1.0") + assert.Nil(t, result.Manifest.Mappings) + }) + + t.Run("party types always from fallback", func(t *testing.T) { + svc := &ExportService{collectors: &ExportCollectors{}} + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{}, + SectionSources: make(map[string]string), + } + svc.collectPartyTypes(result, fb, "1.0") + assert.Nil(t, result.Manifest.PartyTypes) + }) +} + +func TestExportService_Export_SectionFiltering(t *testing.T) { + t.Run("include only instruments section", func(t *testing.T) { + liveInstruments := []*controlplanev1.InstrumentDefinition{ + { + Code: "EUR", Name: "Euro", Type: controlplanev1.InstrumentType_INSTRUMENT_TYPE_FIAT, + Dimensions: &controlplanev1.InstrumentDimensions{Unit: "EUR", Precision: 2}, + }, + } + + svc := &ExportService{ + collectors: &ExportCollectors{ + Instruments: &mockInstrumentCollector{instruments: liveInstruments}, + Sagas: &mockSagaCollector{sagas: []*controlplanev1.SagaDefinition{{Name: "should_not_appear"}}}, + }, + } + + fb := testFallbackManifest() + result := &ExportResult{ + Manifest: &controlplanev1.Manifest{Version: fb.Version, Metadata: fb.Metadata}, + SectionSources: make(map[string]string), + } + + sections := parseSections([]string{"instruments"}) + // Only instruments section should be collected. + if sections[SectionInstruments] { + svc.collectInstruments(context.Background(), result, fb, "1.0") + } + if sections[SectionSagas] { + svc.collectSagas(context.Background(), result, fb, "1.0") + } + + assert.Equal(t, liveInstruments, result.Manifest.Instruments) + assert.Nil(t, result.Manifest.Sagas) // Sagas not included. + }) +} diff --git a/services/control-plane/internal/manifest/grpc_handler.go b/services/control-plane/internal/manifest/grpc_handler.go index a64233c21..ec69ecfd4 100644 --- a/services/control-plane/internal/manifest/grpc_handler.go +++ b/services/control-plane/internal/manifest/grpc_handler.go @@ -18,8 +18,9 @@ var ErrHistoryServiceRequired = errors.New("history service is required") type HistoryHandler struct { controlplanev1.UnimplementedManifestHistoryServiceServer - history *HistoryService - logger *slog.Logger + history *HistoryService + exporter *ExportService + logger *slog.Logger } // NewHistoryHandler creates a new HistoryHandler. @@ -36,6 +37,17 @@ func NewHistoryHandler(history *HistoryService, logger *slog.Logger) (*HistoryHa }, nil } +// NewHistoryHandlerWithExport creates a HistoryHandler with export support. +// The exporter enables the ExportManifest RPC; when nil, the RPC returns Unimplemented. +func NewHistoryHandlerWithExport(history *HistoryService, exporter *ExportService, logger *slog.Logger) (*HistoryHandler, error) { + h, err := NewHistoryHandler(history, logger) + if err != nil { + return nil, err + } + h.exporter = exporter + return h, nil +} + // GetCurrentManifest retrieves the most recently applied manifest for the tenant. func (h *HistoryHandler) GetCurrentManifest( ctx context.Context, @@ -184,3 +196,24 @@ func diffPlanToProtoSummary(plan *differ.DiffPlan) *controlplanev1.DiffSummary { } return summary } + +// ExportManifest reconstructs a manifest from live service state. +func (h *HistoryHandler) ExportManifest( + ctx context.Context, + req *controlplanev1.ExportManifestRequest, +) (*controlplanev1.ExportManifestResponse, error) { + if h.exporter == nil { + return nil, status.Error(codes.Unimplemented, "export manifest not configured") + } + + result, err := h.exporter.Export(ctx, req.GetIncludeSections(), req.GetManifestVersion()) + if err != nil { + if errors.Is(err, ErrVersionNotFound) { + return nil, status.Error(codes.NotFound, "fallback manifest version not found") + } + h.logger.Error("failed to export manifest", "error", err) + return nil, status.Error(codes.Internal, "failed to export manifest") + } + + return result.ToProtoResponse(), nil +} diff --git a/services/control-plane/internal/manifest/grpc_handler_test.go b/services/control-plane/internal/manifest/grpc_handler_test.go index 506a3a778..3b81e2741 100644 --- a/services/control-plane/internal/manifest/grpc_handler_test.go +++ b/services/control-plane/internal/manifest/grpc_handler_test.go @@ -166,3 +166,36 @@ func TestDiffPlanToProtoSummary_Empty(t *testing.T) { assert.Equal(t, int32(0), summary.Creates) assert.False(t, summary.HasBreakingChanges) } + +func TestExportManifest_NilExporter_ReturnsUnimplemented(t *testing.T) { + repo := &Repository{} + svc, err := NewHistoryService(repo) + require.NoError(t, err) + + handler, err := NewHistoryHandler(svc, nil) + require.NoError(t, err) + + _, err = handler.ExportManifest(context.Background(), &controlplanev1.ExportManifestRequest{}) + st, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, codes.Unimplemented, st.Code()) +} + +func TestNewHistoryHandlerWithExport(t *testing.T) { + repo := &Repository{} + svc, err := NewHistoryService(repo) + require.NoError(t, err) + + exporter, err := NewExportService(svc, nil) + require.NoError(t, err) + + handler, err := NewHistoryHandlerWithExport(svc, exporter, nil) + require.NoError(t, err) + assert.NotNil(t, handler) + assert.NotNil(t, handler.exporter) +} + +func TestNewHistoryHandlerWithExport_NilHistory(t *testing.T) { + _, err := NewHistoryHandlerWithExport(nil, nil, nil) + assert.ErrorIs(t, err, ErrHistoryServiceRequired) +} diff --git a/services/control-plane/service/manifest_history.go b/services/control-plane/service/manifest_history.go index f95554209..a94456810 100644 --- a/services/control-plane/service/manifest_history.go +++ b/services/control-plane/service/manifest_history.go @@ -18,13 +18,18 @@ type ManifestHistoryServiceConfig struct { // Logger is the structured logger. Defaults to slog.Default() if nil. Logger *slog.Logger + + // ExportCollectors provides live service collectors for ExportManifest. + // When nil, the ExportManifest RPC returns Unimplemented. + ExportCollectors *manifest.ExportCollectors } // ErrDBRequired is returned when DB is nil during service registration. var ErrDBRequired = errors.New("manifest history service: database connection is required") // RegisterManifestHistoryService creates and registers the ManifestHistoryService -// on the given gRPC server. +// on the given gRPC server. When ExportCollectors is provided, the ExportManifest +// RPC is enabled; otherwise it returns Unimplemented. func RegisterManifestHistoryService(server *grpc.Server, cfg ManifestHistoryServiceConfig) error { if cfg.DB == nil { return ErrDBRequired @@ -40,7 +45,16 @@ func RegisterManifestHistoryService(server *grpc.Server, cfg ManifestHistoryServ return fmt.Errorf("manifest history service: %w", err) } - handler, err := manifest.NewHistoryHandler(historySvc, cfg.Logger) + var handler *manifest.HistoryHandler + if cfg.ExportCollectors != nil { + exporter, exportErr := manifest.NewExportService(historySvc, cfg.ExportCollectors) + if exportErr != nil { + return fmt.Errorf("manifest export service: %w", exportErr) + } + handler, err = manifest.NewHistoryHandlerWithExport(historySvc, exporter, cfg.Logger) + } else { + handler, err = manifest.NewHistoryHandler(historySvc, cfg.Logger) + } if err != nil { return fmt.Errorf("manifest history handler: %w", err) }