Skip to content

Commit d2de27f

Browse files
authored
fix(slack-gateway): preserve channel conversation continuity (#215)
* fix(slack-gateway): preserve channel conversation continuity * fix(slack-gateway): migrate legacy channel conversation ids * fix(api): resolve legacy channel hashes during migration * fix(api): reuse lone legacy channel route during migration * fix(api): migrate unlabeled legacy channel routes * fix(api): allow canonical slack channels after legacy thread fanout * fix(slack-gateway): stop persisting reply ts aliases
1 parent 587b259 commit d2de27f

7 files changed

Lines changed: 962 additions & 119 deletions

File tree

api/channel_conversation_service.go

Lines changed: 160 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,20 @@ const (
2727
)
2828

2929
type channelConversationUpsertRequest struct {
30-
RequestID string `json:"requestId,omitempty"`
31-
Namespace string `json:"namespace,omitempty"`
32-
ConversationID string `json:"conversationId,omitempty"`
33-
PrincipalID string `json:"principalId"`
34-
InstanceID string `json:"instanceId"`
35-
OwnerID string `json:"ownerId"`
36-
Provider string `json:"provider"`
37-
ExternalScopeType string `json:"externalScopeType"`
38-
ExternalTenantID string `json:"externalTenantId"`
39-
ExternalChannelID string `json:"externalChannelId"`
40-
ExternalConversationID string `json:"externalConversationId"`
41-
Title string `json:"title,omitempty"`
42-
CWD string `json:"cwd,omitempty"`
30+
RequestID string `json:"requestId,omitempty"`
31+
Namespace string `json:"namespace,omitempty"`
32+
ConversationID string `json:"conversationId,omitempty"`
33+
PrincipalID string `json:"principalId"`
34+
InstanceID string `json:"instanceId"`
35+
OwnerID string `json:"ownerId"`
36+
Provider string `json:"provider"`
37+
ExternalScopeType string `json:"externalScopeType"`
38+
ExternalTenantID string `json:"externalTenantId"`
39+
ExternalChannelID string `json:"externalChannelId"`
40+
ExternalConversationID string `json:"externalConversationId"`
41+
LookupExternalConversationIDs []string `json:"lookupExternalConversationIds,omitempty"`
42+
Title string `json:"title,omitempty"`
43+
CWD string `json:"cwd,omitempty"`
4344
}
4445

4546
type normalizedChannelConversationIdentity struct {
@@ -93,6 +94,10 @@ func normalizeChannelConversationUpsertRequest(body channelConversationUpsertReq
9394
if body.ExternalConversationID == "" {
9495
return channelConversationUpsertRequest{}, normalizedChannelConversationIdentity{}, echo.NewHTTPError(http.StatusBadRequest, "externalConversationId is required")
9596
}
97+
body.LookupExternalConversationIDs = normalizeLookupExternalConversationIDs(
98+
body.ExternalConversationID,
99+
body.LookupExternalConversationIDs,
100+
)
96101

97102
return body, normalizedChannelConversationIdentity{
98103
principalID: body.PrincipalID,
@@ -104,6 +109,30 @@ func normalizeChannelConversationUpsertRequest(body channelConversationUpsertReq
104109
}, nil
105110
}
106111

112+
func normalizeLookupExternalConversationIDs(primaryID string, lookupIDs []string) []string {
113+
primaryID = strings.TrimSpace(primaryID)
114+
if len(lookupIDs) == 0 {
115+
return nil
116+
}
117+
normalized := make([]string, 0, len(lookupIDs))
118+
seen := map[string]struct{}{}
119+
for _, lookupID := range lookupIDs {
120+
lookupID = strings.TrimSpace(lookupID)
121+
if lookupID == "" || lookupID == primaryID {
122+
continue
123+
}
124+
if _, ok := seen[lookupID]; ok {
125+
continue
126+
}
127+
seen[lookupID] = struct{}{}
128+
normalized = append(normalized, lookupID)
129+
}
130+
if len(normalized) == 0 {
131+
return nil
132+
}
133+
return normalized
134+
}
135+
107136
func channelConversationRouteHash(identity normalizedChannelConversationIdentity, ownerID, instanceID string) string {
108137
sum := sha256.Sum256([]byte(strings.Join([]string{
109138
identity.principalID,
@@ -211,6 +240,15 @@ func channelConversationHasExternalConversationID(conversation *spritzv1.SpritzC
211240
return false
212241
}
213242

243+
func channelConversationHasAnyExternalConversationID(conversation *spritzv1.SpritzConversation, externalConversationIDs []string) bool {
244+
for _, externalConversationID := range externalConversationIDs {
245+
if channelConversationHasExternalConversationID(conversation, externalConversationID) {
246+
return true
247+
}
248+
}
249+
return false
250+
}
251+
214252
func channelConversationMatchesIdentity(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity) bool {
215253
return channelConversationMatchesBaseIdentity(conversation, identity) &&
216254
channelConversationHasExternalConversationID(conversation, identity.externalConversationID)
@@ -275,31 +313,42 @@ func (s *server) getAdminScopedACPReadySpritz(c echo.Context, namespace, instanc
275313
return spritz, nil
276314
}
277315

278-
func (s *server) findChannelConversation(c echo.Context, namespace string, spritz *spritzv1.Spritz, identity normalizedChannelConversationIdentity) (*spritzv1.SpritzConversation, bool, error) {
279-
exactList := &spritzv1.SpritzConversationList{}
280-
if err := s.client.List(
281-
c.Request().Context(),
282-
exactList,
283-
client.InNamespace(namespace),
284-
client.MatchingLabels{
285-
acpConversationLabelKey: acpConversationLabelValue,
286-
acpConversationOwnerLabelKey: ownerLabelValue(spritz.Spec.Owner.ID),
287-
acpConversationSpritzLabelKey: spritz.Name,
288-
channelConversationRouteLabelKey: channelConversationRouteHash(identity, spritz.Spec.Owner.ID, spritz.Name),
289-
},
290-
); err != nil {
291-
return nil, false, err
292-
}
316+
func (s *server) findChannelConversation(c echo.Context, namespace string, spritz *spritzv1.Spritz, identity normalizedChannelConversationIdentity, lookupExternalConversationIDs []string) (*spritzv1.SpritzConversation, bool, error) {
317+
matchExternalConversationIDs := append(
318+
[]string{identity.externalConversationID},
319+
lookupExternalConversationIDs...,
320+
)
293321
var match *spritzv1.SpritzConversation
294-
for i := range exactList.Items {
295-
item := &exactList.Items[i]
296-
if !channelConversationMatchesIdentity(item, identity) {
297-
continue
322+
for _, externalConversationID := range matchExternalConversationIDs {
323+
candidateIdentity := identity
324+
candidateIdentity.externalConversationID = externalConversationID
325+
exactList := &spritzv1.SpritzConversationList{}
326+
if err := s.client.List(
327+
c.Request().Context(),
328+
exactList,
329+
client.InNamespace(namespace),
330+
client.MatchingLabels{
331+
acpConversationLabelKey: acpConversationLabelValue,
332+
acpConversationOwnerLabelKey: ownerLabelValue(spritz.Spec.Owner.ID),
333+
acpConversationSpritzLabelKey: spritz.Name,
334+
channelConversationRouteLabelKey: channelConversationRouteHash(candidateIdentity, spritz.Spec.Owner.ID, spritz.Name),
335+
},
336+
); err != nil {
337+
return nil, false, err
298338
}
299-
if match != nil {
300-
return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous")
339+
for i := range exactList.Items {
340+
item := &exactList.Items[i]
341+
if !channelConversationMatchesBaseIdentity(item, identity) || !channelConversationHasAnyExternalConversationID(item, matchExternalConversationIDs) {
342+
continue
343+
}
344+
if match != nil && item.Name == match.Name {
345+
continue
346+
}
347+
if match != nil {
348+
return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous")
349+
}
350+
match = item.DeepCopy()
301351
}
302-
match = item.DeepCopy()
303352
}
304353

305354
baseList := &spritzv1.SpritzConversationList{}
@@ -320,23 +369,92 @@ func (s *server) findChannelConversation(c echo.Context, namespace string, sprit
320369
); err != nil {
321370
return nil, false, err
322371
}
372+
var fallbackMatch *spritzv1.SpritzConversation
373+
fallbackMatchCount := 0
323374
for i := range baseList.Items {
324375
item := &baseList.Items[i]
325-
if !channelConversationMatchesIdentity(item, identity) {
376+
if !channelConversationMatchesBaseIdentity(item, identity) {
326377
continue
327378
}
328-
if match != nil && item.Name == match.Name {
379+
if channelConversationHasAnyExternalConversationID(item, matchExternalConversationIDs) {
380+
if match != nil && item.Name == match.Name {
381+
continue
382+
}
383+
if match != nil {
384+
return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous")
385+
}
386+
match = item.DeepCopy()
329387
continue
330388
}
389+
390+
// During the Slack cutover, a previously used channel may only have an
391+
// older per-thread/per-message identity. Reuse that lone base-route match
392+
// instead of forking a fresh channel-scoped conversation.
331393
if match != nil {
332-
return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous")
394+
continue
395+
}
396+
if fallbackMatch != nil && item.Name == fallbackMatch.Name {
397+
continue
398+
}
399+
if fallbackMatch == nil {
400+
fallbackMatch = item.DeepCopy()
401+
}
402+
fallbackMatchCount++
403+
}
404+
if match != nil {
405+
return match, true, nil
406+
}
407+
408+
legacyList := &spritzv1.SpritzConversationList{}
409+
if err := s.client.List(
410+
c.Request().Context(),
411+
legacyList,
412+
client.InNamespace(namespace),
413+
client.MatchingLabels{
414+
acpConversationLabelKey: acpConversationLabelValue,
415+
acpConversationOwnerLabelKey: ownerLabelValue(spritz.Spec.Owner.ID),
416+
acpConversationSpritzLabelKey: spritz.Name,
417+
},
418+
); err != nil {
419+
return nil, false, err
420+
}
421+
for i := range legacyList.Items {
422+
item := &legacyList.Items[i]
423+
if strings.TrimSpace(item.Labels[channelConversationBaseRouteLabelKey]) != "" {
424+
continue
425+
}
426+
if !channelConversationMatchesBaseIdentity(item, identity) {
427+
continue
428+
}
429+
if channelConversationHasAnyExternalConversationID(item, matchExternalConversationIDs) {
430+
if match != nil && item.Name == match.Name {
431+
continue
432+
}
433+
if match != nil {
434+
return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous")
435+
}
436+
match = item.DeepCopy()
437+
continue
438+
}
439+
440+
// Some pre-cutover conversations predate the base-route label entirely.
441+
// Reuse that lone legacy match instead of forking a new channel-scoped
442+
// conversation on the first post-deploy top-level message.
443+
if fallbackMatch != nil && item.Name == fallbackMatch.Name {
444+
continue
333445
}
334-
match = item.DeepCopy()
446+
if fallbackMatch == nil {
447+
fallbackMatch = item.DeepCopy()
448+
}
449+
fallbackMatchCount++
450+
}
451+
if match != nil {
452+
return match, true, nil
335453
}
336-
if match == nil {
337-
return nil, false, nil
454+
if fallbackMatchCount == 1 {
455+
return fallbackMatch, true, nil
338456
}
339-
return match, true, nil
457+
return nil, false, nil
340458
}
341459

342460
func applyChannelConversationMetadata(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity, requestID string, spritz *spritzv1.Spritz) {

api/channel_conversations.go

Lines changed: 67 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,52 @@ import (
55

66
"github.com/labstack/echo/v4"
77
apierrors "k8s.io/apimachinery/pkg/api/errors"
8+
"k8s.io/client-go/util/retry"
89
spritzv1 "spritz.sh/operator/api/v1"
910
)
1011

12+
func (s *server) backfillFoundChannelConversation(
13+
c echo.Context,
14+
namespace string,
15+
conversationName string,
16+
identity normalizedChannelConversationIdentity,
17+
spritz *spritzv1.Spritz,
18+
requestID string,
19+
) (*spritzv1.SpritzConversation, error) {
20+
var updated *spritzv1.SpritzConversation
21+
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
22+
current := &spritzv1.SpritzConversation{}
23+
if err := s.client.Get(c.Request().Context(), clientKey(namespace, conversationName), current); err != nil {
24+
return err
25+
}
26+
changed := ensureChannelConversationBaseRouteLabel(current, identity, spritz)
27+
if !channelConversationHasExternalConversationID(current, identity.externalConversationID) {
28+
aliasChanged, err := appendChannelConversationAlias(current, identity.externalConversationID)
29+
if err != nil {
30+
return err
31+
}
32+
changed = changed || aliasChanged
33+
}
34+
if requestID != "" {
35+
if current.Annotations == nil {
36+
current.Annotations = map[string]string{}
37+
}
38+
current.Annotations[requestIDAnnotationKey] = requestID
39+
changed = true
40+
}
41+
if !changed {
42+
updated = current.DeepCopy()
43+
return nil
44+
}
45+
if err := s.client.Update(c.Request().Context(), current); err != nil {
46+
return err
47+
}
48+
updated = current.DeepCopy()
49+
return nil
50+
})
51+
return updated, err
52+
}
53+
1154
func (s *server) upsertChannelConversation(c echo.Context) error {
1255
if !s.acp.enabled {
1356
return writeError(c, http.StatusNotFound, "acp disabled")
@@ -61,7 +104,7 @@ func (s *server) upsertChannelConversation(c echo.Context) error {
61104
if !channelConversationMatchesBaseIdentity(existing, identity) || !channelConversationBelongsToSpritz(existing, spritz) {
62105
return writeError(c, http.StatusConflict, "channel conversation is ambiguous")
63106
}
64-
conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity)
107+
conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity, normalizedBody.LookupExternalConversationIDs)
65108
if err != nil {
66109
if httpErr, ok := err.(*echo.HTTPError); ok {
67110
return writeError(c, httpErr.Code, httpErr.Message.(string))
@@ -71,35 +114,39 @@ func (s *server) upsertChannelConversation(c echo.Context) error {
71114
if found && conversation.Name != existing.Name {
72115
return writeError(c, http.StatusConflict, "channel conversation is ambiguous")
73116
}
74-
changed := ensureChannelConversationBaseRouteLabel(existing, identity, spritz)
75-
aliasChanged, err := appendChannelConversationAlias(existing, identity.externalConversationID)
117+
updatedConversation, err := s.backfillFoundChannelConversation(
118+
c,
119+
namespace,
120+
existing.Name,
121+
identity,
122+
spritz,
123+
normalizedBody.RequestID,
124+
)
76125
if err != nil {
77-
return writeError(c, http.StatusInternalServerError, err.Error())
78-
}
79-
changed = changed || aliasChanged
80-
if normalizedBody.RequestID != "" {
81-
if existing.Annotations == nil {
82-
existing.Annotations = map[string]string{}
83-
}
84-
existing.Annotations[requestIDAnnotationKey] = normalizedBody.RequestID
85-
changed = true
86-
}
87-
if changed {
88-
if err := s.client.Update(c.Request().Context(), existing); err != nil {
89-
return s.writeACPResourceError(c, err)
90-
}
126+
return s.writeACPResourceError(c, err)
91127
}
92-
return writeJSON(c, http.StatusOK, map[string]any{"created": false, "conversation": existing})
128+
return writeJSON(c, http.StatusOK, map[string]any{"created": false, "conversation": updatedConversation})
93129
}
94-
conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity)
130+
conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity, normalizedBody.LookupExternalConversationIDs)
95131
if err != nil {
96132
if httpErr, ok := err.(*echo.HTTPError); ok {
97133
return writeError(c, httpErr.Code, httpErr.Message.(string))
98134
}
99135
return writeError(c, http.StatusInternalServerError, err.Error())
100136
}
101137
if found {
102-
return writeJSON(c, http.StatusOK, map[string]any{"created": false, "conversation": conversation})
138+
updatedConversation, err := s.backfillFoundChannelConversation(
139+
c,
140+
namespace,
141+
conversation.Name,
142+
identity,
143+
spritz,
144+
normalizedBody.RequestID,
145+
)
146+
if err != nil {
147+
return s.writeACPResourceError(c, err)
148+
}
149+
return writeJSON(c, http.StatusOK, map[string]any{"created": false, "conversation": updatedConversation})
103150
}
104151

105152
conversation, err = buildACPConversationResource(spritz, normalizedBody.Title, normalizedBody.CWD)

0 commit comments

Comments
 (0)