Skip to content

Commit e0ec268

Browse files
committed
wip csv source stream
1 parent 145a3d1 commit e0ec268

File tree

3 files changed

+47
-26
lines changed

3 files changed

+47
-26
lines changed

pkg/datasource/csvsource/csv_tick_converter.go

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package csvsource
22

33
import (
4-
"encoding/csv"
54
"time"
65

76
"github.com/c9s/bbgo/pkg/fixedpoint"
@@ -11,14 +10,12 @@ import (
1110
type ICSVTickConverter interface {
1211
LatestKLine() (k *types.KLine)
1312
GetKLineResult() []types.KLine
14-
CsvTickToKLine(tick *CsvTick, interval types.Interval)
13+
CsvTickToKLine(tick *CsvTick, interval types.Interval) (closesKLine bool)
1514
}
1615

1716
// CSVTickConverter takes a tick and internally converts it to a KLine slice
1817
type CSVTickConverter struct {
19-
csv *csv.Reader
20-
decoder CSVTickDecoder
21-
klines []types.KLine
18+
klines []types.KLine
2219
}
2320

2421
func NewCSVTickConverter() ICSVTickConverter {
@@ -41,7 +38,7 @@ func (c *CSVTickConverter) GetKLineResult() []types.KLine {
4138
}
4239

4340
// Convert ticks to KLine with interval
44-
func (c *CSVTickConverter) CsvTickToKLine(tick *CsvTick, interval types.Interval) {
41+
func (c *CSVTickConverter) CsvTickToKLine(tick *CsvTick, interval types.Interval) (closesKLine bool) {
4542
var (
4643
currentCandle = types.KLine{}
4744
high = fixedpoint.Zero
@@ -50,15 +47,25 @@ func (c *CSVTickConverter) CsvTickToKLine(tick *CsvTick, interval types.Interval
5047
isOpen, t := c.detCandleStart(tick.Timestamp.Time(), interval)
5148

5249
if isOpen {
50+
k := c.LatestKLine()
5351
c.klines = append(c.klines, types.KLine{
54-
StartTime: types.NewTimeFromUnix(t.Unix(), 0),
55-
EndTime: types.NewTimeFromUnix(t.Add(interval.Duration()).Unix(), 0),
56-
Open: tick.Price,
57-
High: tick.Price,
58-
Low: tick.Price,
59-
Close: tick.Price,
60-
Volume: tick.HomeNotional,
52+
Exchange: tick.Exchange,
53+
Symbol: tick.Symbol,
54+
Interval: interval,
55+
StartTime: types.NewTimeFromUnix(t.Unix(), 0),
56+
EndTime: types.NewTimeFromUnix(t.Add(interval.Duration()).Unix(), 0),
57+
Open: tick.Price,
58+
High: tick.Price,
59+
Low: tick.Price,
60+
Close: tick.Price,
61+
Volume: tick.HomeNotional,
62+
QuoteVolume: tick.ForeignNotional,
63+
Closed: false,
6164
})
65+
if k != nil {
66+
k.Closed = true // k is pointer
67+
closesKLine = true
68+
}
6269
return
6370
}
6471

@@ -77,14 +84,21 @@ func (c *CSVTickConverter) CsvTickToKLine(tick *CsvTick, interval types.Interval
7784
}
7885

7986
c.klines[len(c.klines)-1] = types.KLine{
80-
StartTime: currentCandle.StartTime,
81-
EndTime: currentCandle.EndTime,
82-
Open: currentCandle.Open,
83-
High: high,
84-
Low: low,
85-
Close: tick.Price,
86-
Volume: currentCandle.Volume.Add(tick.HomeNotional),
87+
StartTime: currentCandle.StartTime,
88+
EndTime: currentCandle.EndTime,
89+
Exchange: tick.Exchange,
90+
Symbol: tick.Symbol,
91+
Interval: interval,
92+
Open: currentCandle.Open,
93+
High: high,
94+
Low: low,
95+
Close: tick.Price,
96+
Volume: currentCandle.Volume.Add(tick.HomeNotional),
97+
QuoteVolume: currentCandle.QuoteVolume.Add(tick.ForeignNotional),
98+
Closed: false,
8799
}
100+
101+
return
88102
}
89103

90104
func (c *CSVTickConverter) detCandleStart(ts time.Time, interval types.Interval) (isOpen bool, t time.Time) {

pkg/datasource/csvsource/csv_tick_decoder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ func BinanceCSVTickDecoder(row []string, _ int) (*CsvTick, error) {
3333
qty := fixedpoint.MustNewFromString(row[2])
3434
baseQty := fixedpoint.MustNewFromString(row[2])
3535
price := fixedpoint.MustNewFromString(row[1])
36-
// isBuyerMaker=false trade will qualify as BUY.
3736
id, err := strconv.ParseUint(row[0], 10, 64)
3837
if err != nil {
3938
return nil, err
@@ -42,6 +41,7 @@ func BinanceCSVTickDecoder(row []string, _ int) (*CsvTick, error) {
4241
if err != nil {
4342
return nil, err
4443
}
44+
// isBuyerMaker=false trade will qualify as BUY.
4545
side := types.SideTypeBuy
4646
if isBuyerMaker {
4747
side = types.SideTypeSell

pkg/datasource/csvsource/stream.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io/fs"
66
"os"
77
"path/filepath"
8+
"time"
89

910
"github.com/c9s/bbgo/pkg/fixedpoint"
1011
"github.com/c9s/bbgo/pkg/types"
@@ -22,6 +23,7 @@ type Stream struct {
2223

2324
type CsvStreamConfig struct {
2425
Interval types.Interval
26+
RateLimit time.Duration `json:"csvPath"`
2527
CsvPath string `json:"csvPath"`
2628
Symbol string `json:"symbol"`
2729
BaseCoin string `json:"baseCoin"`
@@ -45,8 +47,11 @@ func NewStream(cfg *CsvStreamConfig) *Stream {
4547
return stream
4648
}
4749

48-
func (s *Stream) simulateEvents() error {
50+
func (s *Stream) Simulate() error {
4951
var i int
52+
converter := NewCSVTickConverter()
53+
54+
// iterate equity series at csv path and stream
5055
err := filepath.WalkDir(s.config.CsvPath, func(path string, d fs.DirEntry, err error) error {
5156
if err != nil {
5257
return err
@@ -63,7 +68,6 @@ func (s *Stream) simulateEvents() error {
6368
}
6469
//nolint:errcheck // Read ops only so safe to ignore err return
6570
defer file.Close()
66-
converter := NewCSVTickConverter()
6771
reader := NewBybitCSVTickReader(csv.NewReader(file))
6872
tick, err := reader.Read(i)
6973
if err != nil {
@@ -76,14 +80,17 @@ func (s *Stream) simulateEvents() error {
7680
}
7781
s.StandardStream.EmitMarketTrade(*trade)
7882

79-
converter.CsvTickToKLine(tick, s.config.Interval)
8083
kline := converter.LatestKLine()
81-
if kline.Closed {
84+
closesKline := converter.CsvTickToKLine(tick, s.config.Interval)
85+
if closesKline {
8286
s.StandardStream.EmitKLineClosed(*kline)
8387
} else {
88+
kline = converter.LatestKLine()
8489
s.StandardStream.EmitKLine(*kline)
8590
}
86-
91+
// allow for execution time of indicators and strategy
92+
time.Sleep(s.config.RateLimit) // Max execution time for tradingview strategy
93+
// to optimize exec time consider callback channel once a strategy has finished running another tick is emitted
8794
return nil
8895
})
8996
if err != nil {

0 commit comments

Comments
 (0)