Skip to content

Commit 25c78e2

Browse files
committed
fix(datasource): invalidate cursor for deleted knowledge
1 parent 1c31d64 commit 25c78e2

5 files changed

Lines changed: 253 additions & 2 deletions

File tree

internal/application/repository/datasource_repo.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package repository
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"time"
78

@@ -107,6 +108,147 @@ func (r *DataSourceRepository) UpdateSyncState(ctx context.Context, ds *types.Da
107108
return nil
108109
}
109110

111+
// InvalidateCursorItem removes a single item from a data source's incremental
112+
// cursor. This keeps the cursor consistent when a locally synced knowledge item
113+
// is deleted, allowing the next incremental sync to re-fetch that upstream
114+
// item even if its edit timestamp did not change.
115+
func (r *DataSourceRepository) InvalidateCursorItem(
116+
ctx context.Context,
117+
tenantID uint64,
118+
dataSourceID string,
119+
externalID string,
120+
sourceResourceID string,
121+
) error {
122+
if dataSourceID == "" {
123+
return errors.New("data source id is empty")
124+
}
125+
if externalID == "" {
126+
return nil
127+
}
128+
129+
var ds types.DataSource
130+
if err := r.db.WithContext(ctx).
131+
Where("id = ?", dataSourceID).
132+
Where("tenant_id = ?", tenantID).
133+
Where("deleted_at IS NULL").
134+
First(&ds).Error; err != nil {
135+
if errors.Is(err, gorm.ErrRecordNotFound) {
136+
return nil
137+
}
138+
return err
139+
}
140+
if len(ds.LastSyncCursor) == 0 {
141+
return nil
142+
}
143+
144+
var cursor map[string]interface{}
145+
if err := json.Unmarshal(ds.LastSyncCursor, &cursor); err != nil {
146+
return err
147+
}
148+
if !invalidateCursorItem(cursor, externalID, sourceResourceID) {
149+
return nil
150+
}
151+
cursorJSON, err := json.Marshal(cursor)
152+
if err != nil {
153+
return err
154+
}
155+
156+
return r.db.WithContext(ctx).
157+
Model(&types.DataSource{}).
158+
Where("id = ?", dataSourceID).
159+
Where("tenant_id = ?", tenantID).
160+
Update("last_sync_cursor", types.JSON(cursorJSON)).Error
161+
}
162+
163+
func invalidateCursorItem(cursor map[string]interface{}, externalID string, sourceResourceID string) bool {
164+
if cursor == nil || externalID == "" {
165+
return false
166+
}
167+
168+
connectorCursor, hasConnectorCursor := cursor["connector_cursor"].(map[string]interface{})
169+
if !hasConnectorCursor {
170+
connectorCursor = cursor
171+
}
172+
173+
changed := invalidateNestedCursorItem(connectorCursor, externalID, sourceResourceID)
174+
if changed {
175+
return true
176+
}
177+
if hasConnectorCursor {
178+
return invalidateNestedCursorItem(cursor, externalID, sourceResourceID)
179+
}
180+
return false
181+
}
182+
183+
func invalidateNestedCursorItem(node map[string]interface{}, externalID string, sourceResourceID string) bool {
184+
if node == nil {
185+
return false
186+
}
187+
188+
if sourceResourceID != "" {
189+
return invalidateNestedCursorItemForResource(node, externalID, sourceResourceID)
190+
}
191+
192+
return deleteCursorItemRecursive(node, externalID)
193+
}
194+
195+
func invalidateNestedCursorItemForResource(node map[string]interface{}, externalID string, sourceResourceID string) bool {
196+
if node == nil {
197+
return false
198+
}
199+
200+
changed := false
201+
for key, value := range node {
202+
nested, ok := value.(map[string]interface{})
203+
if !ok {
204+
continue
205+
}
206+
if key == sourceResourceID {
207+
if deleteCursorItemRecursive(nested, externalID) {
208+
changed = true
209+
if len(nested) == 0 {
210+
delete(node, key)
211+
}
212+
}
213+
continue
214+
}
215+
if invalidateNestedCursorItemForResource(nested, externalID, sourceResourceID) {
216+
changed = true
217+
if len(nested) == 0 {
218+
delete(node, key)
219+
}
220+
}
221+
}
222+
return changed
223+
}
224+
225+
// deleteCursorItemRecursive deletes the key externalID from node and from every
// nested object beneath it. A nested object that becomes empty because of a
// removal is itself deleted from its parent. It reports whether at least one
// key was removed; a nil node removes nothing.
func deleteCursorItemRecursive(node map[string]interface{}, externalID string) bool {
	if node == nil {
		return false
	}

	_, removed := node[externalID]
	if removed {
		delete(node, externalID)
	}

	for name, raw := range node {
		child, isMap := raw.(map[string]interface{})
		if !isMap || !deleteCursorItemRecursive(child, externalID) {
			continue
		}
		removed = true
		if len(child) == 0 {
			delete(node, name)
		}
	}

	return removed
}
251+
110252
// Delete performs a soft delete
111253
func (r *DataSourceRepository) Delete(ctx context.Context, id string) error {
112254
if id == "" {

internal/application/repository/datasource_repo_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package repository
22

33
import (
44
"context"
5+
"encoding/json"
56
"testing"
67
"time"
78

@@ -51,6 +52,79 @@ func TestDataSourceRepositoryUpdateSyncStateClearsErrorMessage(t *testing.T) {
5152
require.NotNil(t, stored.LastSyncAt)
5253
}
5354

55+
func TestDataSourceRepositoryInvalidateCursorItemRemovesFeishuNode(t *testing.T) {
56+
db := setupDataSourceRepoTestDB(t)
57+
repo := NewDataSourceRepository(db)
58+
cursor := types.JSON(`{
59+
"last_sync_time":"2026-06-10T00:00:00Z",
60+
"connector_cursor":{
61+
"space_node_times":{
62+
"space-1:root-a":{"node-a":"100","node-b":"200"},
63+
"space-1:root-b":{"node-a":"300"}
64+
}
65+
}
66+
}`)
67+
ds := &types.DataSource{
68+
ID: "ds-feishu",
69+
TenantID: 1,
70+
KnowledgeBaseID: "kb-1",
71+
Name: "Feishu",
72+
Type: types.ConnectorTypeFeishu,
73+
Status: types.DataSourceStatusActive,
74+
LastSyncCursor: cursor,
75+
}
76+
require.NoError(t, repo.Create(context.Background(), ds))
77+
78+
require.NoError(t, repo.InvalidateCursorItem(context.Background(), 1, ds.ID, "node-a", "space-1:root-a"))
79+
80+
var stored types.DataSource
81+
require.NoError(t, db.First(&stored, "id = ?", ds.ID).Error)
82+
var decoded map[string]interface{}
83+
require.NoError(t, json.Unmarshal(stored.LastSyncCursor, &decoded))
84+
connectorCursor := decoded["connector_cursor"].(map[string]interface{})
85+
spaceNodeTimes := connectorCursor["space_node_times"].(map[string]interface{})
86+
rootA := spaceNodeTimes["space-1:root-a"].(map[string]interface{})
87+
rootB := spaceNodeTimes["space-1:root-b"].(map[string]interface{})
88+
assert.NotContains(t, rootA, "node-a")
89+
assert.Equal(t, "200", rootA["node-b"])
90+
assert.Equal(t, "300", rootB["node-a"])
91+
}
92+
93+
func TestDataSourceRepositoryInvalidateCursorItemRemovesGenericExternalID(t *testing.T) {
94+
db := setupDataSourceRepoTestDB(t)
95+
repo := NewDataSourceRepository(db)
96+
cursor := types.JSON(`{
97+
"last_sync_time":"2026-06-10T00:00:00Z",
98+
"connector_cursor":{
99+
"page_edit_times":{
100+
"page-a":"2026-06-09T00:00:00Z",
101+
"page-b":"2026-06-09T01:00:00Z"
102+
}
103+
}
104+
}`)
105+
ds := &types.DataSource{
106+
ID: "ds-notion",
107+
TenantID: 1,
108+
KnowledgeBaseID: "kb-1",
109+
Name: "Notion",
110+
Type: types.ConnectorTypeNotion,
111+
Status: types.DataSourceStatusActive,
112+
LastSyncCursor: cursor,
113+
}
114+
require.NoError(t, repo.Create(context.Background(), ds))
115+
116+
require.NoError(t, repo.InvalidateCursorItem(context.Background(), 1, ds.ID, "page-a", ""))
117+
118+
var stored types.DataSource
119+
require.NoError(t, db.First(&stored, "id = ?", ds.ID).Error)
120+
var decoded map[string]interface{}
121+
require.NoError(t, json.Unmarshal(stored.LastSyncCursor, &decoded))
122+
connectorCursor := decoded["connector_cursor"].(map[string]interface{})
123+
pageEditTimes := connectorCursor["page_edit_times"].(map[string]interface{})
124+
assert.NotContains(t, pageEditTimes, "page-a")
125+
assert.Contains(t, pageEditTimes, "page-b")
126+
}
127+
54128
func TestSyncLogRepositoryUpdateResultClearsErrorMessage(t *testing.T) {
55129
db := setupDataSourceRepoTestDB(t)
56130
repo := NewSyncLogRepository(db)

internal/application/service/knowledge.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type knowledgeService struct {
4343
retrieveEngine interfaces.RetrieveEngineRegistry
4444
ownership retriever.TenantStoreOwnership
4545
repo interfaces.KnowledgeRepository
46+
dataSourceRepo interfaces.DataSourceRepository
4647
kbService interfaces.KnowledgeBaseService
4748
tenantRepo interfaces.TenantRepository
4849
tenantService interfaces.TenantService
@@ -84,6 +85,7 @@ const (
8485
func NewKnowledgeService(
8586
config *config.Config,
8687
repo interfaces.KnowledgeRepository,
88+
dataSourceRepo interfaces.DataSourceRepository,
8789
documentReader interfaces.DocumentReader,
8890
kbService interfaces.KnowledgeBaseService,
8991
tenantRepo interfaces.TenantRepository,
@@ -110,6 +112,7 @@ func NewKnowledgeService(
110112
return &knowledgeService{
111113
config: config,
112114
repo: repo,
115+
dataSourceRepo: dataSourceRepo,
113116
kbService: kbService,
114117
tenantRepo: tenantRepo,
115118
tenantService: tenantService,

internal/application/service/knowledge_delete.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,23 @@ func deleteExtractedImages(ctx context.Context, fileSvc interfaces.FileService,
5555
}
5656
}
5757

58+
func (s *knowledgeService) invalidateDataSourceCursorForKnowledge(ctx context.Context, knowledge *types.Knowledge) {
59+
if s.dataSourceRepo == nil || knowledge == nil || len(knowledge.Metadata) == 0 {
60+
return
61+
}
62+
metadata := knowledge.GetMetadata()
63+
dataSourceID := strings.TrimSpace(metadata["datasource_id"])
64+
externalID := strings.TrimSpace(metadata["external_id"])
65+
if dataSourceID == "" || externalID == "" {
66+
return
67+
}
68+
sourceResourceID := strings.TrimSpace(metadata["source_resource_id"])
69+
tenantID := types.MustTenantIDFromContext(ctx)
70+
if err := s.dataSourceRepo.InvalidateCursorItem(ctx, tenantID, dataSourceID, externalID, sourceResourceID); err != nil {
71+
logger.Warnf(ctx, "failed to invalidate data source cursor for deleted knowledge %s: %v", knowledge.ID, err)
72+
}
73+
}
74+
5875
// DeleteKnowledge deletes a knowledge entry and all related resources
5976
func (s *knowledgeService) DeleteKnowledge(ctx context.Context, id string) error {
6077
// Get the knowledge entry
@@ -190,7 +207,11 @@ func (s *knowledgeService) DeleteKnowledge(ctx context.Context, id string) error
190207
return err
191208
}
192209
// Delete the knowledge entry itself from the database
193-
return s.repo.DeleteKnowledge(ctx, ctx.Value(types.TenantIDContextKey).(uint64), id)
210+
if err := s.repo.DeleteKnowledge(ctx, ctx.Value(types.TenantIDContextKey).(uint64), id); err != nil {
211+
return err
212+
}
213+
s.invalidateDataSourceCursorForKnowledge(ctx, knowledge)
214+
return nil
194215
}
195216

196217
// cleanupWikiOnKnowledgeDelete handles wiki pages when a source document is deleted.
@@ -577,7 +598,13 @@ func (s *knowledgeService) DeleteKnowledgeList(ctx context.Context, ids []string
577598
return err
578599
}
579600
// 5. Delete the knowledge entry itself from the database
580-
return s.repo.DeleteKnowledgeList(ctx, tenantInfo.ID, ids)
601+
if err := s.repo.DeleteKnowledgeList(ctx, tenantInfo.ID, ids); err != nil {
602+
return err
603+
}
604+
for _, knowledge := range knowledgeList {
605+
s.invalidateDataSourceCursorForKnowledge(ctx, knowledge)
606+
}
607+
return nil
581608
}
582609

583610
func (s *knowledgeService) cleanupKnowledgeResources(ctx context.Context, knowledge *types.Knowledge) error {

internal/types/interfaces/datasource.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ type DataSourceRepository interface {
8282
// UpdateSyncState updates only fields produced by a sync run.
8383
UpdateSyncState(ctx context.Context, ds *types.DataSource) error
8484

85+
// InvalidateCursorItem removes a synced item from the stored incremental
86+
// cursor so deleting local knowledge lets the next sync fetch it again even
87+
// if the upstream document itself has not changed.
88+
InvalidateCursorItem(ctx context.Context, tenantID uint64, dataSourceID string, externalID string, sourceResourceID string) error
89+
8590
// Delete performs a soft delete
8691
Delete(ctx context.Context, id string) error
8792

0 commit comments

Comments
 (0)