Skip to content

Commit 3b1c4d9

Browse files
authored
Merge pull request #934 from oom-ai/jinghan/export_stream
Jinghan/implement API `ChannelExportStream`
2 parents c4b25a4 + 228277a commit 3b1c4d9

File tree

4 files changed

+76
-1
lines changed

4 files changed

+76
-1
lines changed

internal/database/offline/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88

99
type ExportOpt struct {
1010
SnapshotTable string
11+
CdcTable *string
12+
UnixMilli *int64
1113
EntityName string
1214
Features types.FeatureList
1315
Limit *uint64

pkg/oomstore/export.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"fmt"
77
"os"
88

9+
"github.com/oom-ai/oomstore/internal/database/dbutil"
10+
911
"github.com/pkg/errors"
1012
"github.com/spf13/cast"
1113

@@ -14,7 +16,7 @@ import (
1416
)
1517

1618
/*
17-
Export feature values of a particular revision.
19+
ChannelExport exports batch feature values of a particular revision.
1820
Usage Example:
1921
exportResult, err := store.Export(ctx, opt)
2022
if err != nil {
@@ -93,3 +95,48 @@ func (s *OomStore) Export(ctx context.Context, opt types.ExportOpt) error {
9395
}
9496
return exportResult.CheckStreamError()
9597
}
98+
99+
// ChannelExportStream exports the latest streaming feature values up to the given timestamp.
100+
// Currently, this API can only export features in one feature group.
101+
func (s *OomStore) ChannelExportStream(ctx context.Context, opt types.ChannelExportStreamOpt) (*types.ExportResult, error) {
102+
if err := validateFeatureFullNames(opt.FeatureFullNames); err != nil {
103+
return nil, errors.WithStack(err)
104+
}
105+
features, err := s.ListFeature(ctx, types.ListFeatureOpt{
106+
FeatureFullNames: &opt.FeatureFullNames,
107+
})
108+
if err != nil {
109+
return nil, errors.WithStack(err)
110+
}
111+
if len(features.GroupIDs()) != 1 {
112+
return nil, fmt.Errorf("expected 1 group, got %d groups", len(features.GroupIDs()))
113+
}
114+
group := features[0].Group
115+
revisions, err := s.ListRevision(ctx, &group.ID)
116+
if err != nil {
117+
return nil, errors.WithStack(err)
118+
}
119+
revision := revisions.Before(opt.UnixMilli)
120+
if revision == nil {
121+
return nil, fmt.Errorf("no feature values up to %d, use a later timestamp", opt.UnixMilli)
122+
}
123+
if revision.SnapshotTable == "" {
124+
if err = s.Snapshot(ctx, group.Name); err != nil {
125+
return nil, errors.WithStack(err)
126+
}
127+
}
128+
129+
snapshotTable := dbutil.OfflineStreamSnapshotTableName(group.ID, revision.Revision)
130+
cdcTable := dbutil.OfflineStreamCdcTableName(group.ID, revision.Revision)
131+
stream, exportErr := s.offline.Export(ctx, offline.ExportOpt{
132+
SnapshotTable: snapshotTable,
133+
CdcTable: &cdcTable,
134+
UnixMilli: &opt.UnixMilli,
135+
EntityName: group.Entity.Name,
136+
Features: features,
137+
Limit: opt.Limit,
138+
})
139+
140+
header := append([]string{group.Entity.Name}, features.Names()...)
141+
return types.NewExportResult(header, stream, exportErr), nil
142+
}

pkg/oomstore/types/options.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ type ChannelExportOpt struct {
3636
Limit *uint64
3737
}
3838

39+
type ChannelExportStreamOpt struct {
40+
UnixMilli int64
41+
FeatureFullNames []string
42+
Limit *uint64
43+
}
44+
3945
type ExportOpt struct {
4046
RevisionID int
4147
FeatureNames []string

pkg/oomstore/types/revision.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package types
22

33
import (
4+
"sort"
45
"time"
56
)
67

@@ -62,6 +63,25 @@ func (l RevisionList) Filter(filter func(*Revision) bool) (rs RevisionList) {
6263
return
6364
}
6465

66+
func (l RevisionList) Before(unixMilli int64) *Revision {
67+
if len(l) == 0 {
68+
return nil
69+
}
70+
sort.Slice(l, func(i, j int) bool {
71+
return l[i].Revision < l[j].Revision
72+
})
73+
if l[0].Revision > unixMilli {
74+
return nil
75+
}
76+
var i int
77+
for i = range l {
78+
if l[i].Revision > unixMilli {
79+
break
80+
}
81+
}
82+
return l[i-1]
83+
}
84+
6585
func (l RevisionList) GroupIDs() []int {
6686
groupIDMap := make(map[int]struct{})
6787
for _, r := range l {

0 commit comments

Comments
 (0)