Skip to content

Commit 7be2e99

Browse files
committed
chore(clone): Optimize concurrent processing, improve error handling and annotations
1 parent a1473fe commit 7be2e99

File tree

3 files changed

+67
-51
lines changed

3 files changed

+67
-51
lines changed

internal/application/repository/knowledge.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,17 +150,29 @@ func (r *knowledgeRepository) CheckKnowledgeExists(
150150
return true, &knowledge, nil
151151
}
152152
} else if params.Type == "url" {
153+
// If file hash exists, prioritize exact match using hash
154+
if params.FileHash != "" {
155+
var knowledge types.Knowledge
156+
err := query.Where("type = 'url' AND file_hash = ?", params.FileHash).First(&knowledge).Error
157+
if err == nil && knowledge.ID != "" {
158+
return true, &knowledge, nil
159+
}
160+
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
161+
return false, nil, err
162+
}
163+
}
164+
153165
if params.URL != "" {
154166
var knowledge types.Knowledge
155167
err := query.Where("type = 'url' AND source = ?", params.URL).First(&knowledge).Error
156-
if err != nil {
157-
if errors.Is(err, gorm.ErrRecordNotFound) {
158-
return false, nil, nil
159-
}
168+
if err == nil && knowledge.ID != "" {
169+
return true, &knowledge, nil
170+
}
171+
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
160172
return false, nil, err
161173
}
162-
return true, &knowledge, nil
163174
}
175+
return false, nil, nil
164176
}
165177

166178
// No valid parameters, default to not existing

internal/application/service/knowledge.go

Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"slices"
1515
"sort"
1616
"strings"
17-
"sync"
1817
"time"
1918

2019
"github.com/Tencent/WeKnora/internal/application/service/retriever"
@@ -271,9 +270,11 @@ func (s *knowledgeService) CreateKnowledgeFromURL(ctx context.Context,
271270
// Check if URL already exists in the knowledge base
272271
tenantID := ctx.Value(types.TenantIDContextKey).(uint)
273272
logger.Infof(ctx, "Checking if URL exists, tenant ID: %d", tenantID)
273+
fileHash := calculateStr(url)
274274
exists, existingKnowledge, err := s.repo.CheckKnowledgeExists(ctx, tenantID, kbID, &types.KnowledgeCheckParams{
275-
Type: "url",
276-
URL: url,
275+
Type: "url",
276+
URL: url,
277+
FileHash: fileHash,
277278
})
278279
if err != nil {
279280
logger.Errorf(ctx, "Failed to check knowledge existence: %v", err)
@@ -306,6 +307,7 @@ func (s *knowledgeService) CreateKnowledgeFromURL(ctx context.Context,
306307
KnowledgeBaseID: kbID,
307308
Type: "url",
308309
Source: url,
310+
FileHash: fileHash,
309311
ParseStatus: "pending",
310312
EnableStatus: "disabled",
311313
CreatedAt: time.Now(),
@@ -1375,49 +1377,47 @@ func (s *knowledgeService) CloneKnowledgeBase(ctx context.Context, srcID, dstID
13751377
logger.Infof(ctx, "Knowledge after update to add: %d, delete: %d", len(addKnowledge), len(delKnowledge))
13761378

13771379
batch := 10
1378-
wg := sync.WaitGroup{}
1379-
errCh := make(chan error, len(delKnowledge))
1380+
g, gctx := errgroup.WithContext(ctx)
13801381
for ids := range slices.Chunk(delKnowledge, batch) {
1381-
wg.Add(1)
1382-
go func(ids []string) {
1383-
defer wg.Done()
1384-
if err := s.DeleteKnowledgeList(ctx, ids); err != nil {
1385-
errCh <- fmt.Errorf("delete knowledge %v: %w", ids, err)
1382+
ids = ids
1383+
g.Go(func() error {
1384+
err := s.DeleteKnowledgeList(gctx, ids)
1385+
if err != nil {
1386+
logger.Errorf(gctx, "delete partial knowledge %v: %w", ids, err)
1387+
return err
13861388
}
1387-
}(ids)
1389+
return nil
1390+
})
13881391
}
1389-
wg.Wait()
1390-
close(errCh)
1391-
for err := range errCh {
1392-
if err != nil {
1393-
return err
1394-
}
1392+
err = g.Wait()
1393+
if err != nil {
1394+
logger.Errorf(ctx, "delete total knowledge %d: %v", len(delKnowledge), err)
1395+
return err
13951396
}
13961397

1397-
wg = sync.WaitGroup{}
1398-
errCh = make(chan error, len(addKnowledge)+len(delKnowledge))
1399-
for ids := range slices.Chunk(addKnowledge, batch) {
1400-
wg.Add(1)
1401-
go func(ids []string) {
1402-
defer wg.Done()
1403-
for _, kID := range ids {
1404-
srcKn, err := s.repo.GetKnowledgeByID(ctx, srcKB.TenantID, kID)
1405-
if err != nil {
1406-
errCh <- fmt.Errorf("get knowledge %s: %w", kID, err)
1407-
continue
1408-
}
1409-
if err := s.cloneKnowledge(ctx, srcKn, dstKB); err != nil {
1410-
errCh <- fmt.Errorf("move knowledge %s: %w", kID, err)
1411-
}
1398+
// Copy context out of auto-stop task
1399+
g, gctx = errgroup.WithContext(ctx)
1400+
g.SetLimit(batch)
1401+
for _, knowledge := range addKnowledge {
1402+
knowledge = knowledge
1403+
g.Go(func() error {
1404+
srcKn, err := s.repo.GetKnowledgeByID(gctx, srcKB.TenantID, knowledge)
1405+
if err != nil {
1406+
logger.Errorf(gctx, "get knowledge %s: %w", knowledge, err)
1407+
return err
1408+
}
1409+
err = s.cloneKnowledge(gctx, srcKn, dstKB)
1410+
if err != nil {
1411+
logger.Errorf(gctx, "clone knowledge %s: %w", knowledge, err)
1412+
return err
14121413
}
1413-
}(ids)
1414+
return nil
1415+
})
14141416
}
1415-
wg.Wait()
1416-
close(errCh)
1417-
for err := range errCh {
1418-
if err != nil {
1419-
return err
1420-
}
1417+
err = g.Wait()
1418+
if err != nil {
1419+
logger.Errorf(ctx, "add total knowledge %d: %v", len(addKnowledge), err)
1420+
return err
14211421
}
14221422
return nil
14231423
}

internal/handler/knowledgebase.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package handler
22

33
import (
4+
"context"
45
"net/http"
56

67
"github.com/Tencent/WeKnora/internal/errors"
@@ -279,13 +280,16 @@ func (h *KnowledgeBaseHandler) CopyKnowledgeBase(c *gin.Context) {
279280

280281
logger.Infof(ctx, "Copy knowledge base, ID: %s to ID: %s", req.SourceID, req.TargetID)
281282

282-
err := h.knowledgeService.CloneKnowledgeBase(ctx, req.SourceID, req.TargetID)
283-
if err != nil {
284-
logger.ErrorWithFields(ctx, err, nil)
285-
c.Error(errors.NewInternalServerError(err.Error()))
286-
return
287-
}
288-
logger.Infof(ctx, "Knowledge base copy successfully, ID: %s to ID: %s", req.SourceID, req.TargetID)
283+
go func(ctx context.Context) {
284+
err := h.knowledgeService.CloneKnowledgeBase(ctx, req.SourceID, req.TargetID)
285+
if err != nil {
286+
logger.Errorf(ctx, "Failed to copy knowledge base, ID: %s to ID: %s", req.SourceID, req.TargetID)
287+
return
288+
}
289+
logger.Infof(ctx, "Knowledge base copy from ID: %s to ID: %s successfully", req.SourceID, req.TargetID)
290+
}(logger.CloneContext(ctx))
291+
292+
logger.Infof(ctx, "Knowledge base start copy from ID: %s to ID: %s", req.SourceID, req.TargetID)
289293
c.JSON(http.StatusOK, gin.H{
290294
"success": true,
291295
"message": "Knowledge base copy successfully",

0 commit comments

Comments
 (0)