Skip to content

Commit e16266e

Browse files
authored
Add s5cmd data_store (#44)
* [WIP] Add s5cmd data_store * Fix golint issue, added fmt check to CI * Add go vet check to CI * Update CI to avoid node version deprecation warning * Add s5cmd data_store, upload functions * Fully implemented LocalStorage. Added UploadFromReader func. Zap logger. Code cleanup * Reusing code
1 parent 3d11cce commit e16266e

10 files changed

Lines changed: 468 additions & 111 deletions

File tree

.github/workflows/main.yml

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ jobs:
1010
runs-on: ubuntu-latest
1111
steps:
1212
- name: Checkout
13-
uses: actions/checkout@v2
13+
uses: actions/checkout@v3.4.0
1414
with:
1515
submodules: true
16-
- uses: actions/setup-go@v2
16+
- uses: actions/setup-go@v4.0.0
1717
with:
1818
go-version: '^1.19'
1919
- name: Build
@@ -24,15 +24,19 @@ jobs:
2424
runs-on: ubuntu-latest
2525
steps:
2626
- name: Checkout
27-
uses: actions/checkout@v2
27+
uses: actions/checkout@v3.4.0
2828
with:
2929
submodules: true
30-
- uses: actions/setup-go@v2
30+
- uses: actions/setup-go@v4.0.0
3131
with:
3232
go-version: '^1.19'
3333
- run: make build
3434
- name: ModTidy check
3535
run: make check-modtidy
36+
- name: Fmt check
37+
run: make fmt-check
38+
- name: Vet check
39+
run: make vet
3640
- name: Lint check
3741
run: |
3842
make install_lint
@@ -42,10 +46,10 @@ jobs:
4246
runs-on: ubuntu-latest
4347
steps:
4448
- name: Checkout
45-
uses: actions/checkout@v2
49+
uses: actions/checkout@v3.4.0
4650
with:
4751
submodules: true
48-
- uses: actions/setup-go@v2
52+
- uses: actions/setup-go@v4.0.0
4953
with:
5054
go-version: '^1.19'
5155
- name: "Launch db engine in background"
@@ -60,10 +64,10 @@ jobs:
6064
runs-on: ubuntu-latest
6165
steps:
6266
- name: Checkout
63-
uses: actions/checkout@v2
67+
uses: actions/checkout@v3.4.0
6468
with:
6569
submodules: true
66-
- uses: actions/setup-go@v2
70+
- uses: actions/setup-go@v4.0.0
6771
with:
6872
go-version: '^1.19'
6973
- name: "Launch db engine in background"

Makefile

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
GOFMT_FILES?=$$(find . -name '*.go')
2+
13
build:
24
go build ./...
35

@@ -9,16 +11,28 @@ gitclean:
911
git submodule foreach --recursive git clean -xfd
1012

1113
install_lint:
12-
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v1.50.1
14+
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.0
1315

1416
check-modtidy:
1517
go mod tidy
1618
git diff --exit-code -- go.mod go.sum
1719

1820
lint:
21+
@echo "==> Checking that code complies with golangci-lint requirements..."
1922
golangci-lint --version
2023
golangci-lint run -E gofmt -E gosec -E goconst -E gocritic --timeout 5m
2124

25+
fmt:
26+
gofmt -w -s $(GOFMT_FILES)
27+
28+
fmt-check:
29+
@echo "==> Checking that code complies with gofmt requirements..."
30+
gofmt -l -s $(GOFMT_FILES)
31+
32+
vet:
33+
@echo "==> Checking that code complies with go vet requirements..."
34+
go vet $$(go list ./...)
35+
2236
test: build
2337
go test -race ./...
2438

components/connections/data_store/builder.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,31 @@ package data_store
22

33
import (
44
"fmt"
5+
56
"go.uber.org/zap"
67
)
78

89
const (
10+
S5Storage = "s5"
911
S3Storage = "s3"
1012
LocalStorage = "local"
13+
ContentType = "application/octet-stream"
14+
DataPath = "data"
15+
S3url = "s3://"
1116
)
1217

1318
func NewDataStoreClient(config DataStoreConfig) (DataStoreClient, error) {
19+
if len(config.ContentType) == 0 {
20+
config.ContentType = ContentType
21+
}
22+
if len(config.DataPath) == 0 {
23+
config.DataPath = DataPath
24+
}
1425
zap.S().Infof("[DataStore] - Creating client for service '%s'", config.Service)
1526
switch config.Service {
27+
case S5Storage:
28+
client, err := newS5cmdClient(config)
29+
return DataStoreClient{client}, err
1630
case S3Storage:
1731
client, err := newMinioClient(config)
1832
return DataStoreClient{client}, err
Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package data_store
22

3-
import "context"
3+
import (
4+
"context"
5+
"io"
6+
)
47

58
type IDataStoreClient interface {
69
GetFile(name string, location string) ([]byte, error)
710
UploadFromFile(filePath string, dest string) error
811
UploadFromBytes(data []byte, destFolder string, destName string) error
12+
UploadFromReader(data io.Reader, size int64, destFolder string, destName string) error
913
List(dir string, prefix string) ([]string, error)
1014
ListChan(ctx context.Context, dir string, prefix string) (<-chan string, error)
1115
StorageType() string
@@ -16,10 +20,11 @@ type DataStoreClient struct {
1620
}
1721

1822
type DataStoreConfig struct {
19-
Url string
20-
UseHttps bool
21-
User string
22-
Password string
23-
Service string
24-
DataPath string
23+
Url string
24+
UseHttps bool
25+
User string
26+
Password string
27+
Service string
28+
DataPath string
29+
ContentType string
2530
}

components/connections/data_store/local.go

Lines changed: 77 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,38 @@ package data_store
33
import (
44
"context"
55
"fmt"
6-
"go.uber.org/zap"
76
"io"
87
"os"
9-
"strings"
8+
"path/filepath"
9+
"time"
10+
11+
"go.uber.org/zap"
1012
)
1113

1214
type LocalClient struct {
13-
DataPath string
15+
dataPath string
1416
}
1517

16-
func newLocalClient(config DataStoreConfig) (LocalClient, error) {
17-
return LocalClient{DataPath: config.DataPath}, nil
18+
func newLocalClient(config DataStoreConfig) (*LocalClient, error) {
19+
return &LocalClient{
20+
dataPath: config.DataPath,
21+
}, nil
1822
}
1923

20-
func (c LocalClient) GetFile(object string, bucket string) ([]byte, error) {
21-
targetObject := fmt.Sprintf("%s/%s/%s", c.DataPath, bucket, object)
24+
func (c *LocalClient) GetDataPath() string {
25+
return c.dataPath
26+
}
27+
28+
func (c *LocalClient) GetFile(object string, bucket string) ([]byte, error) {
29+
if len(bucket) == 0 || len(object) == 0 {
30+
zap.S().Errorf("Bucket or object are empty")
31+
return nil, fmt.Errorf("Bucket or object are empty")
32+
}
33+
34+
start := time.Now()
35+
defer elapsed(start, "["+c.StorageType()+"] Get file")
36+
37+
targetObject := fmt.Sprintf("%s/%s/%s", c.GetDataPath(), bucket, object)
2238
data, err := os.ReadFile(targetObject)
2339
if err != nil {
2440
zap.S().Errorf("err when getting object from store: %v", err.Error())
@@ -28,58 +44,82 @@ func (c LocalClient) GetFile(object string, bucket string) ([]byte, error) {
2844
return data, nil
2945
}
3046

31-
func (c LocalClient) List(bucket string, prefix string) ([]string, error) {
32-
var list []string
33-
files, err := os.ReadDir(fmt.Sprintf("%s/%s", c.DataPath, bucket))
47+
func (c *LocalClient) List(bucket string, prefix string) ([]string, error) {
48+
return list(c, bucket, prefix)
49+
}
50+
51+
func (c *LocalClient) ListChan(ctx context.Context, bucket string, prefix string) (<-chan string, error) {
52+
if len(bucket) == 0 || len(prefix) == 0 {
53+
zap.S().Errorf("Bucket or prefix are empty")
54+
return nil, fmt.Errorf("Bucket or prefix are empty")
55+
}
56+
57+
start := time.Now()
58+
defer elapsed(start, "["+c.StorageType()+"] List channel files")
59+
60+
list, err := filepath.Glob(fmt.Sprintf("%s/%s/%s*", c.GetDataPath(), bucket, prefix))
3461
if err != nil {
3562
zap.S().Errorf("could not read directory '%s': %v", bucket, err)
3663
return nil, err
3764
}
3865

39-
for _, file := range files {
40-
fileName := file.Name()
41-
if strings.Contains(fileName, prefix) {
42-
list = append(list, fileName)
66+
outChan := make(chan string, 10)
67+
go func(files []string) {
68+
defer close(outChan)
69+
70+
for _, f := range files {
71+
select {
72+
case <-ctx.Done():
73+
return
74+
default:
75+
outChan <- filepath.Base(f)
76+
}
4377
}
44-
}
78+
}(list)
4579

46-
return list, nil
80+
return outChan, nil
4781
}
4882

49-
func (c LocalClient) ListChan(ctx context.Context, bucket string, prefix string) (<-chan string, error) {
50-
panic("not implemented")
83+
func (c *LocalClient) UploadFromFile(name string, folder string) error {
84+
return uploadFromFile(c, name, folder)
5185
}
5286

53-
func (c LocalClient) UploadFromFile(name string, dest string) error {
54-
file, err := os.Open(name)
55-
if err != nil {
56-
return err
57-
}
58-
defer file.Close()
87+
func (c *LocalClient) UploadFromBytes(data []byte, folder string, name string) error {
88+
return uploadFromBytes(c, data, folder, name)
89+
}
5990

60-
fileStat, err := file.Stat()
61-
if err != nil {
62-
return err
91+
func (c *LocalClient) UploadFromReader(data io.Reader, size int64, folder string, name string) error {
92+
if len(folder) == 0 || len(name) == 0 {
93+
zap.S().Errorf("Folder or name are empty")
94+
return fmt.Errorf("Folder or name are empty")
6395
}
6496

65-
if !fileStat.Mode().IsRegular() {
66-
return fmt.Errorf("%s is not a regular file", name)
67-
}
97+
start := time.Now()
98+
defer elapsed(start, "["+c.StorageType()+"] Upload from reader")
6899

69-
out, err := os.Create(fmt.Sprintf("%s/%s/%s", c.DataPath, dest, name))
100+
destFolder := fmt.Sprintf("%s/%s", c.GetDataPath(), folder)
101+
if _, err := os.Stat(destFolder); err != nil {
102+
if !os.IsNotExist(err) {
103+
return err
104+
}
105+
if err := os.MkdirAll(destFolder, os.ModePerm); err != nil {
106+
return err
107+
}
108+
}
109+
destUrl := fmt.Sprintf("%s/%s", destFolder, name)
110+
out, err := os.Create(destUrl)
70111
if err != nil {
71112
return err
72113
}
73114
defer out.Close()
74115

75-
_, err = io.Copy(out, file)
76-
return err
77-
}
116+
_, err = io.Copy(out, data)
117+
118+
zap.S().Debugf("[%s] Operation: upload, Source: %s, Destination: %s, Size: %d", c.StorageType(), name, destUrl, size)
78119

79-
func (c LocalClient) UploadFromBytes(data []byte, destFolder string, destName string) error {
80-
panic("not implemented")
120+
return err
81121
}
82122

83-
func (c LocalClient) StorageType() string {
123+
func (c *LocalClient) StorageType() string {
84124
return LocalStorage
85125
}

0 commit comments

Comments
 (0)