Skip to content

Commit 142368f

Browse files
authored
feat: Add PayloadTransformer port and mapping adapters for operational gateway (#1318)
* feat: add PayloadTransformer port and mapping adapters for operational gateway Introduces the PayloadTransformer port interface with InstructionOutcome and InstructionRoute types. Implements two adapters: - mapping.Transformer: delegates to the shared bidirectional mapping engine using named MappingDefinitions, with header passthrough and HTTP status fallback when provider_status is absent from the mapped result. - passthrough.Transformer: no-op implementation that serializes the instruction payload as-is, used when no mapping is configured. Outbound transform latency: ~2.5us. Inbound: ~1.4us (well under 5ms target). * fix: add nil guards to mapping and passthrough transformers - NewTransformer panics on nil resolver or engine (fail-fast wiring) - TransformOutbound returns ErrTransformFailed for nil instruction or route - TransformInbound returns ErrTransformFailed for nil route - Add nil guard tests for all three cases in both adapters * refactor: address CodeRabbit nitpicks in transformer tests - newTestInstruction accepts *testing.T to surface constructor errors - Remove unused benchmark instruction setup (inst + MarkDispatching/MarkDelivered) - TestTransformer_ImplementsPayloadTransformer checks engine creation error * fix: Set ShouldRetry for transient HTTP failures in passthrough transformer 429 (rate limit) and 5xx (server error) codes are transient and should be retried. Add header-copy isolation test to assert defensive copy semantics. * fix: Set ShouldRetry for transient HTTP failures in mapping transformer Align mapping adapter with passthrough adapter: 429 and 5xx status codes set ShouldRetry=true in both defaultOutcome (no mapping) and the HTTP fallback path in extractOutcome (mapping present but no provider_status field). Explicit ShouldRetry from the mapping definition takes precedence. --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 41f6553 commit 142368f

6 files changed

Lines changed: 1062 additions & 0 deletions

File tree

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
// Package mapping provides the MappingTransformer adapter for the operational gateway.
2+
// It delegates payload transformation to the shared bidirectional mapping engine, which
3+
// applies MappingDefinition proto transforms between the internal instruction format
4+
// and the provider's external format.
5+
package mapping
6+
7+
import (
8+
"context"
9+
"encoding/json"
10+
"fmt"
11+
"log/slog"
12+
13+
mappingv1 "github.com/meridianhub/meridian/api/proto/meridian/mapping/v1"
14+
"github.com/meridianhub/meridian/services/operational-gateway/domain"
15+
"github.com/meridianhub/meridian/services/operational-gateway/ports"
16+
sharedmapping "github.com/meridianhub/meridian/shared/pkg/mapping"
17+
)
18+
19+
// DefinitionResolver looks up a MappingDefinition by name for the current tenant.
20+
// Implementations may call the reference-data gRPC service, query a local cache,
21+
// or read from the manifest.
22+
type DefinitionResolver interface {
23+
// Resolve returns the latest active MappingDefinition with the given name.
24+
// Returns ports.ErrMappingNotFound if no active definition exists.
25+
Resolve(ctx context.Context, name string) (*mappingv1.MappingDefinition, error)
26+
}
27+
28+
// Transformer applies bidirectional MappingDefinition transforms to instruction payloads.
29+
// It implements ports.PayloadTransformer using the shared mapping engine.
30+
type Transformer struct {
31+
resolver DefinitionResolver
32+
engine *sharedmapping.Engine
33+
logger *slog.Logger
34+
}
35+
36+
// NewTransformer creates a new mapping Transformer.
37+
func NewTransformer(resolver DefinitionResolver, engine *sharedmapping.Engine, logger *slog.Logger) *Transformer {
38+
if resolver == nil {
39+
panic("mapping.NewTransformer: resolver must not be nil")
40+
}
41+
if engine == nil {
42+
panic("mapping.NewTransformer: engine must not be nil")
43+
}
44+
if logger == nil {
45+
logger = slog.Default()
46+
}
47+
return &Transformer{
48+
resolver: resolver,
49+
engine: engine,
50+
logger: logger,
51+
}
52+
}
53+
54+
// TransformOutbound serializes the instruction payload to JSON and applies the outbound
55+
// MappingDefinition transform named by route.OutboundMapping.
56+
// When route.OutboundMapping is empty, the instruction payload is returned as-is (passthrough).
57+
// Returns the transformed body bytes and any static route headers.
58+
func (t *Transformer) TransformOutbound(ctx context.Context, instruction *domain.Instruction, route *ports.InstructionRoute) ([]byte, map[string]string, error) {
59+
if instruction == nil {
60+
return nil, nil, fmt.Errorf("%w: instruction is nil", ports.ErrTransformFailed)
61+
}
62+
if route == nil {
63+
return nil, nil, fmt.Errorf("%w: route is nil", ports.ErrTransformFailed)
64+
}
65+
payloadBytes, err := json.Marshal(instruction.Payload)
66+
if err != nil {
67+
return nil, nil, fmt.Errorf("%w: marshaling instruction payload: %w", ports.ErrTransformFailed, err)
68+
}
69+
70+
if route.OutboundMapping == "" {
71+
return payloadBytes, copyHeaders(route.Headers), nil
72+
}
73+
74+
def, err := t.resolver.Resolve(ctx, route.OutboundMapping)
75+
if err != nil {
76+
return nil, nil, fmt.Errorf("%w: resolving outbound mapping %q: %w", ports.ErrTransformFailed, route.OutboundMapping, err)
77+
}
78+
79+
transformed, err := t.engine.TransformOutbound(def, payloadBytes)
80+
if err != nil {
81+
return nil, nil, fmt.Errorf("%w: applying outbound mapping %q: %w", ports.ErrTransformFailed, route.OutboundMapping, err)
82+
}
83+
84+
t.logger.DebugContext(ctx, "outbound transform applied",
85+
"mapping", route.OutboundMapping,
86+
"instruction_id", instruction.ID.String(),
87+
)
88+
89+
return transformed, copyHeaders(route.Headers), nil
90+
}
91+
92+
// TransformInbound applies the inbound MappingDefinition transform named by route.InboundMapping
93+
// to the provider response body and extracts an InstructionOutcome.
94+
// When route.InboundMapping is empty, a default InstructionOutcome is derived from the HTTP
95+
// status code: 2xx is treated as success with no external ID, anything else as a failure.
96+
func (t *Transformer) TransformInbound(ctx context.Context, statusCode int, body []byte, route *ports.InstructionRoute) (*ports.InstructionOutcome, error) {
97+
if route == nil {
98+
return nil, fmt.Errorf("%w: route is nil", ports.ErrTransformFailed)
99+
}
100+
if route.InboundMapping == "" {
101+
return defaultOutcome(statusCode), nil
102+
}
103+
104+
def, err := t.resolver.Resolve(ctx, route.InboundMapping)
105+
if err != nil {
106+
return nil, fmt.Errorf("%w: resolving inbound mapping %q: %w", ports.ErrTransformFailed, route.InboundMapping, err)
107+
}
108+
109+
result, err := t.engine.TransformInbound(def, body)
110+
if err != nil {
111+
return nil, fmt.Errorf("%w: applying inbound mapping %q: %w", ports.ErrTransformFailed, route.InboundMapping, err)
112+
}
113+
114+
t.logger.DebugContext(ctx, "inbound transform applied",
115+
"mapping", route.InboundMapping,
116+
"status_code", statusCode,
117+
)
118+
119+
outcome, err := extractOutcome(result.ProtoJSON, statusCode)
120+
if err != nil {
121+
return nil, fmt.Errorf("%w: extracting outcome from inbound mapping result: %w", ports.ErrTransformFailed, err)
122+
}
123+
124+
return outcome, nil
125+
}
126+
127+
// extractOutcome parses the mapped JSON output into an InstructionOutcome.
128+
// It looks for the well-known fields: external_id, provider_status, should_retry, failure_reason.
129+
// Unknown fields in the mapped output are silently ignored.
130+
func extractOutcome(mappedJSON []byte, statusCode int) (*ports.InstructionOutcome, error) {
131+
var mapped struct {
132+
ExternalID string `json:"external_id"`
133+
ProviderStatus string `json:"provider_status"`
134+
ShouldRetry bool `json:"should_retry"`
135+
FailureReason string `json:"failure_reason"`
136+
}
137+
138+
if len(mappedJSON) > 0 {
139+
if err := json.Unmarshal(mappedJSON, &mapped); err != nil {
140+
return nil, fmt.Errorf("parsing mapped outcome JSON: %w", err)
141+
}
142+
}
143+
144+
outcome := &ports.InstructionOutcome{
145+
ExternalID: mapped.ExternalID,
146+
ProviderStatus: mapped.ProviderStatus,
147+
ShouldRetry: mapped.ShouldRetry,
148+
FailureReason: mapped.FailureReason,
149+
}
150+
151+
// If provider_status is absent from the mapping, fall back to HTTP status code semantics.
152+
if outcome.ProviderStatus == "" {
153+
if isSuccess(statusCode) {
154+
outcome.ProviderStatus = "ACCEPTED"
155+
} else {
156+
outcome.ProviderStatus = "REJECTED"
157+
if outcome.FailureReason == "" {
158+
outcome.FailureReason = fmt.Sprintf("provider returned HTTP %d", statusCode)
159+
}
160+
// Only set ShouldRetry from HTTP semantics when the mapping did not
161+
// explicitly set it (i.e. it is still false/zero).
162+
if !outcome.ShouldRetry {
163+
outcome.ShouldRetry = isTransient(statusCode)
164+
}
165+
}
166+
}
167+
168+
return outcome, nil
169+
}
170+
171+
// defaultOutcome derives an InstructionOutcome from the HTTP status code alone,
172+
// used when no inbound mapping is configured for a route.
173+
func defaultOutcome(statusCode int) *ports.InstructionOutcome {
174+
if isSuccess(statusCode) {
175+
return &ports.InstructionOutcome{
176+
ProviderStatus: "ACCEPTED",
177+
}
178+
}
179+
return &ports.InstructionOutcome{
180+
ProviderStatus: "REJECTED",
181+
FailureReason: fmt.Sprintf("provider returned HTTP %d", statusCode),
182+
ShouldRetry: isTransient(statusCode),
183+
}
184+
}
185+
186+
// isSuccess returns true if the HTTP status code indicates a successful response (2xx).
187+
func isSuccess(statusCode int) bool {
188+
return statusCode >= 200 && statusCode < 300
189+
}
190+
191+
// isTransient returns true for HTTP status codes that indicate a transient failure
192+
// which warrants a retry: 429 (rate limit) and 5xx (server errors).
193+
func isTransient(statusCode int) bool {
194+
return statusCode == 429 || (statusCode >= 500 && statusCode < 600)
195+
}
196+
197+
// copyHeaders returns a shallow copy of the header map, or nil if the map is empty.
198+
func copyHeaders(headers map[string]string) map[string]string {
199+
if len(headers) == 0 {
200+
return nil
201+
}
202+
out := make(map[string]string, len(headers))
203+
for k, v := range headers {
204+
out[k] = v
205+
}
206+
return out
207+
}

0 commit comments

Comments
 (0)