-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnats_mapper.go
More file actions
159 lines (130 loc) · 4.49 KB
/
nats_mapper.go
File metadata and controls
159 lines (130 loc) · 4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT
package idmapper
import (
"context"
"fmt"
"strings"
"time"
"github.com/linuxfoundation/lfx-v2-survey-service/internal/domain"
"github.com/nats-io/nats.go"
)
const (
// NATS subject for v1-sync-helper lookup
lookupSubject = "lfx.lookup_v1_mapping"
// Default request timeout
defaultTimeout = 5 * time.Second
)
// Config holds the configuration for the NATS-based ID mapper
type Config struct {
URL string
Timeout time.Duration
}
// NATSMapper implements IDMapper using NATS messaging to the v1-sync-helper service
type NATSMapper struct {
conn *nats.Conn
timeout time.Duration
}
// NewNATSMapper creates a new NATS-based ID mapper
func NewNATSMapper(cfg Config) (*NATSMapper, error) {
if cfg.URL == "" {
return nil, fmt.Errorf("NATS URL is required")
}
timeout := cfg.Timeout
if timeout == 0 {
timeout = defaultTimeout
}
// Connect to NATS server
conn, err := nats.Connect(cfg.URL)
if err != nil {
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
}
return &NATSMapper{
conn: conn,
timeout: timeout,
}, nil
}
// Close closes the NATS connection
func (m *NATSMapper) Close() {
if m.conn != nil {
m.conn.Close()
}
}
// MapProjectV2ToV1 maps a v2 project UID to v1 project SFID
func (m *NATSMapper) MapProjectV2ToV1(ctx context.Context, v2UID string) (string, error) {
if v2UID == "" {
return "", domain.NewValidationError("v2 project UID is required")
}
// Request format: project.uid.{v2_uuid} returns {v1_sfid}
key := fmt.Sprintf("project.uid.%s", v2UID)
return m.lookup(ctx, key)
}
// MapProjectV1ToV2 maps a v1 project SFID to v2 project UID
func (m *NATSMapper) MapProjectV1ToV2(ctx context.Context, v1SFID string) (string, error) {
if v1SFID == "" {
return "", domain.NewValidationError("v1 project SFID is required")
}
// Request format: project.sfid.{v1_sfid} returns {v2_uuid}
key := fmt.Sprintf("project.sfid.%s", v1SFID)
return m.lookup(ctx, key)
}
// MapCommitteeV2ToV1 maps a v2 committee UID to v1 committee SFID
// The NATS response format is {project_sfid}:{committee_sfid}, but we only return the committee SFID
func (m *NATSMapper) MapCommitteeV2ToV1(ctx context.Context, v2UID string) (string, error) {
if v2UID == "" {
return "", domain.NewValidationError("v2 committee UID is required")
}
// Request format: committee.uid.{v2_uuid} returns {project_sfid}:{committee_sfid}
key := fmt.Sprintf("committee.uid.%s", v2UID)
response, err := m.lookup(ctx, key)
if err != nil {
return "", err
}
// Parse the response to extract only the committee SFID
// Format: "projectSFID:committeeSFID" -> we want "committeeSFID"
// If no colon present, assume the response is already just the committee SFID
parts := strings.Split(response, ":")
if len(parts) == 1 {
return response, nil
}
if len(parts) != 2 {
return "", domain.NewUnavailableError(fmt.Sprintf("unexpected committee mapping format: %s", response))
}
committeeSFID := parts[1]
if committeeSFID == "" {
return "", domain.NewUnavailableError("committee SFID is empty in mapping response")
}
return committeeSFID, nil
}
// MapCommitteeV1ToV2 maps a v1 committee SFID to v2 committee UID
func (m *NATSMapper) MapCommitteeV1ToV2(ctx context.Context, v1SFID string) (string, error) {
if v1SFID == "" {
return "", domain.NewValidationError("v1 committee SFID is required")
}
// Request format: committee.sfid.{v1_sfid} returns {v2_uuid}
key := fmt.Sprintf("committee.sfid.%s", v1SFID)
return m.lookup(ctx, key)
}
// lookup performs the NATS request/reply lookup
func (m *NATSMapper) lookup(ctx context.Context, key string) (string, error) {
// Send request with timeout
msg, err := m.conn.RequestWithContext(ctx, lookupSubject, []byte(key))
if err != nil {
if err == context.DeadlineExceeded || err == nats.ErrTimeout {
return "", domain.NewUnavailableError("v1-sync-helper lookup timed out", err)
}
return "", domain.NewUnavailableError("failed to lookup ID mapping", err)
}
// Parse response
response := string(msg.Data)
// Check for error response (prefixed with "error: ")
if after, ok := strings.CutPrefix(response, "error: "); ok {
errMsg := after
return "", domain.NewUnavailableError(fmt.Sprintf("v1-sync-helper error: %s", errMsg))
}
// Empty response means not found - return as validation error since client provided invalid ID
if response == "" {
return "", domain.NewValidationError(fmt.Sprintf("invalid ID: mapping not found for %s", key))
}
return response, nil
}