Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit cb68de4

Browse files
committedNov 3, 2023
wip stream
1 parent 9865c12 commit cb68de4

File tree

11 files changed

+154
-108
lines changed

11 files changed

+154
-108
lines changed
 

‎pkg/datasource/csvsource/csv_kline_decoder.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ var (
2121
// ErrInvalidTimeFormat is returned when the CSV price record does not have a valid time unix milli format.
2222
ErrInvalidTimeFormat = errors.New("cannot parse time string")
2323

24+
// ErrInvalidOrderSideFormat is returned when the CSV side record does not have a valid buy or sell string.
25+
ErrInvalidOrderSideFormat = errors.New("cannot parse order side string")
26+
2427
// ErrInvalidPriceFormat is returned when the CSV price record does not prices in expected format.
2528
ErrInvalidPriceFormat = errors.New("OHLC prices must be in valid decimal format")
2629

‎pkg/datasource/csvsource/csv_tick_converter.go

Lines changed: 0 additions & 73 deletions
This file was deleted.

‎pkg/datasource/csvsource/csv_tick_decoder.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@ package csvsource
22

33
import (
44
"encoding/csv"
5-
"strconv"
6-
"strings"
75

86
"github.com/c9s/bbgo/pkg/fixedpoint"
7+
"github.com/c9s/bbgo/pkg/types"
98
)
109

1110
// CSVTickDecoder is an extension point for CSVTickReader to support custom file formats.
@@ -20,19 +19,15 @@ func NewBinanceCSVTickReader(csv *csv.Reader) *CSVTickReader {
2019
}
2120

2221
// BinanceCSVKLineDecoder decodes a CSV record from Binance into a CsvTick.
23-
func BinanceCSVTickDecoder(row []string, intex int) (*CsvTick, error) {
22+
func BinanceCSVTickDecoder(row []string, _ int) (*CsvTick, error) {
2423
if len(row) < 7 {
2524
return nil, ErrNotEnoughColumns
2625
}
27-
timestamp, err := strconv.ParseInt(row[5], 10, 64)
28-
if err != nil {
29-
return nil, ErrInvalidTimeFormat
30-
}
3126
size := fixedpoint.MustNewFromString(row[2])
3227
price := fixedpoint.MustNewFromString(row[1])
3328
hn := price.Mul(size)
3429
return &CsvTick{
35-
Timestamp: timestamp / 1000,
30+
Timestamp: types.MustParseMillisecondTimestamp(row[5]),
3631
Size: size,
3732
Price: price,
3833
HomeNotional: hn,
@@ -55,18 +50,17 @@ func BybitCSVTickDecoder(row []string, index int) (*CsvTick, error) {
5550
if index == 0 {
5651
return nil, nil
5752
}
58-
startTime := strings.Split(row[0], ".")[0]
59-
timestamp, err := strconv.ParseInt(startTime, 10, 64)
53+
side, err := types.StrToSideType(row[2])
6054
if err != nil {
61-
return nil, ErrInvalidTimeFormat
55+
return nil, ErrInvalidOrderSideFormat
6256
}
6357
return &CsvTick{
64-
Timestamp: timestamp,
58+
Timestamp: types.MustParseMillisecondTimestamp(row[0]),
6559
Symbol: row[1],
66-
Side: row[2],
60+
Side: side,
61+
TickDirection: row[5],
6762
Size: fixedpoint.MustNewFromString(row[3]),
6863
Price: fixedpoint.MustNewFromString(row[4]),
69-
TickDirection: row[5],
7064
HomeNotional: fixedpoint.MustNewFromString(row[8]),
7165
ForeignNotional: fixedpoint.MustNewFromString(row[9]),
7266
}, nil

‎pkg/datasource/csvsource/csv_tick_reader.go

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package csvsource
33
import (
44
"encoding/csv"
55
"io"
6+
"time"
67

8+
"github.com/c9s/bbgo/pkg/fixedpoint"
79
"github.com/c9s/bbgo/pkg/types"
810
)
911

@@ -13,6 +15,7 @@ var _ TickReader = (*CSVTickReader)(nil)
1315
type CSVTickReader struct {
1416
csv *csv.Reader
1517
decoder CSVTickDecoder
18+
klines []types.KLine
1619
}
1720

1821
// MakeCSVTickReader is a factory method type that creates a new CSVTickReader.
@@ -23,6 +26,7 @@ func NewCSVTickReader(csv *csv.Reader) *CSVTickReader {
2326
return &CSVTickReader{
2427
csv: csv,
2528
decoder: BinanceCSVTickDecoder,
29+
klines: []types.KLine{},
2630
}
2731
}
2832

@@ -31,28 +35,29 @@ func NewCSVTickReaderWithDecoder(csv *csv.Reader, decoder CSVTickDecoder) *CSVTi
3135
return &CSVTickReader{
3236
csv: csv,
3337
decoder: decoder,
38+
klines: []types.KLine{},
3439
}
3540
}
3641

3742
// ReadAll reads all the KLines from the underlying CSV data.
38-
func (r *CSVTickReader) ReadAll(interval types.Interval) error {
43+
func (r *CSVTickReader) ReadAll(interval types.Interval) (k []types.KLine, err error) {
3944
var i int
4045
for {
4146
tick, err := r.Read(i)
4247
if err == io.EOF {
4348
break
4449
}
4550
if err != nil {
46-
return err
51+
return k, err
4752
}
48-
i++
53+
i++ // used as jump logic inside decoder to skip csv headers in case
4954
if tick == nil {
5055
continue
5156
}
52-
ConvertCsvTickToKLines(tick, interval)
57+
r.CsvTickToKLines(tick, interval)
5358
}
5459

55-
return nil
60+
return k, nil
5661
}
5762

5863
// Read reads the next KLine from the underlying CSV data.
@@ -64,3 +69,65 @@ func (r *CSVTickReader) Read(i int) (*CsvTick, error) {
6469

6570
return r.decoder(rec, i)
6671
}
72+
73+
// Convert ticks to KLine with interval
74+
func (c *CSVTickReader) CsvTickToKLines(tick *CsvTick, interval types.Interval) {
75+
var (
76+
currentCandle = types.KLine{}
77+
high = fixedpoint.Zero
78+
low = fixedpoint.Zero
79+
)
80+
isOpen, t := c.detCandleStart(tick.Timestamp.Time(), interval)
81+
82+
if isOpen {
83+
c.klines = append(c.klines, types.KLine{
84+
StartTime: types.NewTimeFromUnix(t.Unix(), 0),
85+
EndTime: types.NewTimeFromUnix(t.Add(interval.Duration()).Unix(), 0),
86+
Open: tick.Price,
87+
High: tick.Price,
88+
Low: tick.Price,
89+
Close: tick.Price,
90+
Volume: tick.HomeNotional,
91+
})
92+
return
93+
}
94+
95+
currentCandle = c.klines[len(c.klines)-1]
96+
97+
if tick.Price.Float64() > currentCandle.High.Float64() {
98+
high = tick.Price
99+
} else {
100+
high = currentCandle.High
101+
}
102+
103+
if tick.Price.Float64() < currentCandle.Low.Float64() {
104+
low = tick.Price
105+
} else {
106+
low = currentCandle.Low
107+
}
108+
109+
c.klines[len(c.klines)-1] = types.KLine{
110+
StartTime: currentCandle.StartTime,
111+
EndTime: currentCandle.EndTime,
112+
Open: currentCandle.Open,
113+
High: high,
114+
Low: low,
115+
Close: tick.Price,
116+
Volume: currentCandle.Volume.Add(tick.HomeNotional),
117+
}
118+
}
119+
120+
func (c *CSVTickReader) detCandleStart(ts time.Time, interval types.Interval) (isOpen bool, t time.Time) {
121+
if len(c.klines) == 0 {
122+
return true, interval.Convert(ts)
123+
}
124+
var (
125+
current = c.klines[len(c.klines)-1]
126+
end = current.EndTime.Time()
127+
)
128+
if ts.After(end) {
129+
return true, end
130+
}
131+
132+
return false, t
133+
}

‎pkg/datasource/csvsource/csv_tick_reader_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/stretchr/testify/assert"
99

1010
"github.com/c9s/bbgo/pkg/fixedpoint"
11+
"github.com/c9s/bbgo/pkg/types"
1112
)
1213

1314
var assertTickEq = func(t *testing.T, exp, act *CsvTick) {
@@ -28,7 +29,7 @@ func TestCSVTickReader_ReadWithBinanceDecoder(t *testing.T) {
2829
name: "Read Tick",
2930
give: "11782578,6.00000000,1.00000000,14974844,14974844,1698623884463,True,True",
3031
want: &CsvTick{
31-
Timestamp: 1698623884,
32+
Timestamp: types.NewMillisecondTimestampFromInt(1698623884463),
3233
Size: fixedpoint.NewFromFloat(1),
3334
Price: fixedpoint.NewFromFloat(6),
3435
HomeNotional: fixedpoint.NewFromFloat(6),

‎pkg/datasource/csvsource/read_ticks.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
// TickReader is an interface for reading candlesticks.
1313
type TickReader interface {
1414
Read(i int) (*CsvTick, error)
15-
ReadAll(interval types.Interval) error
15+
ReadAll(interval types.Interval) (k []types.KLine, err error)
1616
}
1717

1818
// ReadTicksFromCSV reads all the .csv files in a given directory or a single file into a slice of Ticks.
@@ -23,8 +23,8 @@ func ReadTicksFromCSV(path string, interval types.Interval) ([]types.KLine, erro
2323
}
2424

2525
// ReadTicksFromCSVWithDecoder permits using a custom CSVTickReader.
26-
func ReadTicksFromCSVWithDecoder(path string, interval types.Interval, maker MakeCSVTickReader) ([]types.KLine, error) {
27-
err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error {
26+
func ReadTicksFromCSVWithDecoder(path string, interval types.Interval, maker MakeCSVTickReader) (klines []types.KLine, err error) {
27+
err = filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error {
2828
if err != nil {
2929
return err
3030
}
@@ -41,7 +41,7 @@ func ReadTicksFromCSVWithDecoder(path string, interval types.Interval, maker Mak
4141
//nolint:errcheck // Read ops only so safe to ignore err return
4242
defer file.Close()
4343
reader := maker(csv.NewReader(file))
44-
err = reader.ReadAll(interval)
44+
klines, err = reader.ReadAll(interval)
4545
if err != nil {
4646
return err
4747
}

‎pkg/datasource/csvsource/stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (s *Stream) simulateEvents() error {
7474
}
7575
s.StandardStream.EmitMarketTrade(trade)
7676

77-
ConvertCsvTickToKLines(tick, s.config.Interval)
77+
reader.CsvTickToKLines(tick, s.config.Interval)
7878
kline := klines[len(klines)-1]
7979
if kline.Closed {
8080
s.StandardStream.EmitKLineClosed(kline)

‎pkg/datasource/csvsource/types.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@ package csvsource
22

33
import (
44
"github.com/c9s/bbgo/pkg/fixedpoint"
5+
"github.com/c9s/bbgo/pkg/types"
56
)
67

78
type CsvTick struct {
8-
Timestamp int64 `json:"timestamp"`
9-
Symbol string `json:"symbol"`
10-
Side string `json:"side"`
11-
TickDirection string `json:"tickDirection"`
12-
Size fixedpoint.Value `json:"size"`
13-
Price fixedpoint.Value `json:"price"`
14-
HomeNotional fixedpoint.Value `json:"homeNotional"`
15-
ForeignNotional fixedpoint.Value `json:"foreignNotional"`
9+
Symbol string `json:"symbol"`
10+
TickDirection string `json:"tickDirection"`
11+
Side types.SideType `json:"side"`
12+
Size fixedpoint.Value `json:"size"`
13+
Price fixedpoint.Value `json:"price"`
14+
HomeNotional fixedpoint.Value `json:"homeNotional"`
15+
ForeignNotional fixedpoint.Value `json:"foreignNotional"`
16+
Timestamp types.MillisecondTimestamp `json:"timestamp"`
1617
}

‎pkg/indicator/v2/volume_profile.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"gonum.org/v1/gonum/stat"
99

1010
bbgofloats "github.com/c9s/bbgo/pkg/datatype/floats"
11-
1211
"github.com/c9s/bbgo/pkg/types"
1312
)
1413

‎pkg/indicator/v2/volume_profile_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/stretchr/testify/assert"
1212

1313
"github.com/c9s/bbgo/pkg/datasource/csvsource"
14-
1514
"github.com/c9s/bbgo/pkg/types"
1615
)
1716

0 commit comments

Comments
 (0)
Please sign in to comment.