Skip to content

Commit 88e9ffe

Browse files
authored
Merge pull request #77 from CocaineCong/feature-add-proto
feat:support upload by file streaming in proto & refactor redis module
2 parents ea702cc + f1f7ddf commit 88e9ffe

14 files changed

Lines changed: 571 additions & 39 deletions

File tree

app/gateway/http/index_platform.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/pkg/errors"
2525

2626
"github.com/CocaineCong/tangseng/app/gateway/rpc"
27+
"github.com/CocaineCong/tangseng/consts/e"
2728
pb "github.com/CocaineCong/tangseng/idl/pb/index_platform"
2829
"github.com/CocaineCong/tangseng/pkg/ctl"
2930
log "github.com/CocaineCong/tangseng/pkg/logger"
@@ -47,3 +48,27 @@ func BuildIndexByFiles(ctx *gin.Context) {
4748

4849
ctx.JSON(http.StatusOK, ctl.RespSuccess(ctx, r))
4950
}
51+
52+
func UploadIndexByFiles(ctx *gin.Context) {
53+
var req pb.BuildIndexReq
54+
if err := ctx.ShouldBind(&req); err != nil {
55+
log.LogrusObj.Errorf("Bind:%v", err)
56+
ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "绑定参数错误"))
57+
return
58+
}
59+
file, fileHeader, _ := ctx.Request.FormFile("file")
60+
if fileHeader == nil {
61+
err := errors.New(e.GetMsg(e.ErrorUploadFile))
62+
ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "上传错误"))
63+
log.LogrusObj.Error(err)
64+
return
65+
}
66+
r, err := rpc.UploadByStream(ctx, &req, file, fileHeader.Size)
67+
if err != nil {
68+
log.LogrusObj.Errorf("rpc.BuildIndex failed, original error: %T %v", errors.Cause(err), errors.Cause(err))
69+
ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "UploadIndexByFiles RPC服务调用错误"))
70+
return
71+
}
72+
73+
ctx.JSON(http.StatusOK, ctl.RespSuccess(ctx, r))
74+
}

app/gateway/routes/index_platform.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,6 @@ func IndexPlatformRegisterHandlers(rg *gin.RouterGroup) {
2727
indexPlatformGroup := rg.Group("/index_platform")
2828
{
2929
indexPlatformGroup.POST("/build_index", http.BuildIndexByFiles)
30+
indexPlatformGroup.POST("/upload_index", http.UploadIndexByFiles)
3031
}
3132
}

app/gateway/rpc/index_platform.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@ package rpc
1919

2020
import (
2121
"context"
22+
"io"
23+
"mime/multipart"
2224

2325
"github.com/pkg/errors"
2426

2527
pb "github.com/CocaineCong/tangseng/idl/pb/index_platform"
28+
log "github.com/CocaineCong/tangseng/pkg/logger"
2629
)
2730

2831
// BuildIndex 建立索引的RPC调用
@@ -35,3 +38,30 @@ func BuildIndex(ctx context.Context, req *pb.BuildIndexReq) (resp *pb.BuildIndex
3538

3639
return
3740
}
41+
42+
func UploadByStream(ctx context.Context, req *pb.BuildIndexReq, file multipart.File, fileSize int64) (resp *pb.UploadResponse, err error) {
43+
stream, err := IndexPlatformClient.UploadFile(ctx)
44+
if err != nil {
45+
err = errors.WithMessage(err, "IndexPlatformClient.UploadStream err")
46+
return
47+
}
48+
buf := make([]byte, 1024*1024) // 1MB chunks
49+
for {
50+
n, errx := file.Read(buf)
51+
if errx == io.EOF {
52+
break
53+
}
54+
if err = stream.Send(&pb.FileChunk{
55+
Content: buf[:n],
56+
}); err != nil {
57+
log.LogrusObj.Error("stream.Send", err)
58+
return
59+
}
60+
}
61+
resp, err = stream.CloseAndRecv()
62+
if err != nil && err != io.EOF {
63+
return nil, err
64+
}
65+
66+
return resp, nil
67+
}

app/index_platform/service/index_platform.go

Lines changed: 111 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ import (
2121
"context"
2222
"fmt"
2323
"hash/fnv"
24+
"io"
2425
"os"
2526
"sort"
2627
"strings"
2728
"sync"
2829

29-
"github.com/pkg/errors"
30-
3130
"github.com/RoaringBitmap/roaring"
3231
cmap "github.com/orcaman/concurrent-map/v2"
32+
"github.com/pkg/errors"
3333
"github.com/spf13/cast"
3434

3535
"github.com/CocaineCong/tangseng/app/index_platform/analyzer"
@@ -234,3 +234,112 @@ func iHash(key string) int64 { // nolint:golint,unused
234234
_, _ = h.Write([]byte(key))
235235
return int64(h.Sum32() & 0x7fffffff)
236236
}
237+
238+
func (s *IndexPlatformSrv) UploadFile(stream pb.IndexPlatformService_UploadFileServer) (err error) {
239+
ctx := stream.Context()
240+
// 时间估计
241+
invertedIndex := cmap.New[*roaring.Bitmap]() // 倒排索引
242+
dictTrie := trie.NewTrie() // 前缀树
243+
// mapreduce 这个是用chan和goroutine来代替master和worker的rpc调用,避免了频繁的rpc调用
244+
_, _ = mapreduce.MapReduce(func(source chan<- []byte) {
245+
chunk, err := stream.Recv()
246+
if err == io.EOF {
247+
_ = stream.SendAndClose(&pb.UploadResponse{
248+
Code: e.SUCCESS,
249+
Message: cconsts.IndexPlatformUploadSuccess,
250+
})
251+
}
252+
source <- chunk.Content
253+
}, func(item []byte, writer mapreduce.Writer[[]*types.KeyValue], cancel func(error)) {
254+
// 控制并发
255+
var wg sync.WaitGroup
256+
ch := make(chan struct{}, 3)
257+
258+
keyValueList := make([]*types.KeyValue, 0, 1e3)
259+
lines := strings.Split(string(item), "\r\n")
260+
for _, line := range lines[1:] {
261+
ch <- struct{}{}
262+
wg.Add(1)
263+
docStruct, _ := input_data.Doc2Struct(line) // line 转 docs struct
264+
if docStruct.DocId == 0 {
265+
continue
266+
}
267+
268+
// 分词
269+
tokens, _ := analyzer.GseCutForBuildIndex(docStruct.DocId, docStruct.Body)
270+
for _, v := range tokens {
271+
if v.Token == "" || v.Token == " " {
272+
continue
273+
}
274+
keyValueList = append(keyValueList, &types.KeyValue{Key: v.Token, Value: cast.ToString(v.DocId)})
275+
dictTrie.Insert(v.Token)
276+
}
277+
278+
// 建立正排索引
279+
go func(docStruct *types.Document) {
280+
err = input_data.DocData2Kfk(docStruct)
281+
if err != nil {
282+
logs.LogrusObj.Error(err)
283+
}
284+
defer wg.Done()
285+
<-ch
286+
}(docStruct)
287+
}
288+
wg.Wait()
289+
290+
// // 构建前缀树 // TODO: kafka异步处理一下前缀树的插入,不然占着这里的资源
291+
// go func(tokenList []string) {
292+
// err = input_data.DocTrie2Kfk(tokenList)
293+
// if err != nil {
294+
// logs.LogrusObj.Error("DocTrie2Kfk", err)
295+
// }
296+
// }(tokenList)
297+
298+
// shuffle 排序过程
299+
sort.Sort(types.ByKey(keyValueList))
300+
writer.Write(keyValueList)
301+
}, func(pipe <-chan []*types.KeyValue, writer mapreduce.Writer[string], cancel func(error)) {
302+
for values := range pipe {
303+
for _, v := range values { // 构建倒排索引
304+
if value, ok := invertedIndex.Get(v.Key); ok {
305+
value.AddInt(cast.ToInt(v.Value))
306+
invertedIndex.Set(v.Key, value)
307+
} else {
308+
docIds := roaring.NewBitmap()
309+
docIds.AddInt(cast.ToInt(v.Value))
310+
invertedIndex.Set(v.Key, docIds)
311+
}
312+
}
313+
}
314+
})
315+
316+
// 存储倒排索引
317+
go func() {
318+
newCtx := clone.NewContextWithoutDeadline()
319+
newCtx.Clone(ctx)
320+
err = storeInvertedIndexByHash(newCtx, invertedIndex)
321+
if err != nil {
322+
logs.LogrusObj.Error("storeInvertedIndexByHash error ", err)
323+
}
324+
}()
325+
326+
logs.LogrusObj.Infoln("storeInvertedIndexByHash End")
327+
328+
// 存储前缀树
329+
go func() {
330+
newCtx := clone.NewContextWithoutDeadline()
331+
newCtx.Clone(ctx)
332+
err = storeDictTrieByHash(newCtx, dictTrie)
333+
if err != nil {
334+
logs.LogrusObj.Error("storeDictTrieByHash error ", err)
335+
logs.LogrusObj.Errorf("stack trace: \n%+v\n", err)
336+
}
337+
}()
338+
339+
return nil
340+
}
341+
342+
func (s *IndexPlatformSrv) DownloadFile(file *pb.FileRequest, req pb.IndexPlatformService_DownloadFileServer) (err error) {
343+
344+
return nil
345+
}

consts/e/code.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,18 @@ const (
2222
ERROR = 500
2323
InvalidParams = 400
2424

25-
//成员错误
25+
// 成员错误
2626
ErrorExistUser = 10002
2727
ErrorNotExistUser = 10003
2828
ErrorFailEncryption = 10006
2929
ErrorNotCompare = 10007
30+
ErrorUploadFile = 10008
3031

3132
HaveSignUp = 20001
3233
ErrorActivityTimeout = 20002
3334

34-
ErrorAuthCheckTokenFail = 30001 //token 错误
35-
ErrorAuthCheckTokenTimeout = 30002 //token 过期
35+
ErrorAuthCheckTokenFail = 30001 // token 错误
36+
ErrorAuthCheckTokenTimeout = 30002 // token 过期
3637
ErrorAuthToken = 30003
3738
ErrorAuth = 30004
3839
ErrorAuthNotFound = 30005

consts/e/msg.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ var MsgFlags = map[int]string{
3030
ErrorNotCompare: "不匹配",
3131
ErrorDatabase: "数据库操作出错,请重试",
3232
ErrorAuthNotFound: "Token不能为空",
33+
ErrorUploadFile: "上传错误",
3334
}
3435

3536
// GetMsg 获取状态码对应信息

consts/index_platform.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package consts
2+
3+
// IndexPlatformUploadSuccess is the message text placed in the UploadResponse
// that the index platform sends back for a successful upload.
const IndexPlatformUploadSuccess = "上传成功"

idl/index_platform.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,21 @@ message BuildIndexResp {
1515

1616
// IndexPlatformService exposes index building plus streamed file upload and
// download.
service IndexPlatformService {
1717
rpc BuildIndexService(BuildIndexReq) returns (BuildIndexResp);
18+
rpc UploadFile(stream FileChunk) returns (UploadResponse); // client-side streaming
19+
rpc DownloadFile(FileRequest) returns (stream FileChunk); // server-side streaming
20+
}
21+
22+
// FileChunk is one message of a streamed file transfer; content holds that
// message's raw bytes.
message FileChunk {
23+
bytes content = 1;
24+
}
25+
26+
// UploadResponse is the single reply of the UploadFile RPC: a numeric code and
// a text message.
message UploadResponse {
27+
// @inject_tag:form:"code" uri:"code"
28+
int64 code = 1;
29+
// @inject_tag:form:"message" uri:"message"
30+
string message = 2;
31+
}
32+
33+
// FileRequest is the request of the DownloadFile RPC; it carries a filename.
message FileRequest {
34+
string filename = 1;
1835
}

0 commit comments

Comments
 (0)