Skip to content

Commit d7001cf

Browse files
author
Jimmy
committed
complete full refactor, added a lot of features including binary messages through zeromq, fixed the rpc manager, major cleanup
1 parent a1d2371 commit d7001cf

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+6251
-5342
lines changed

.gitmodules

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
[submodule "generated/proto/ticker"]
2-
path = generated/proto/ticker
3-
url = https://github.com/RoseLabsMx/ftso-data-sources-proto-files.git
41
[submodule "datasource/cryptocurrency/mexc/protobuf"]
52
path = datasource/cryptocurrency/mexc/protobuf
63
url = https://github.com/mexcdevelop/websocket-proto.git

README.md

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Flow goes like this:
1414
# Supported data sources:
1515
### Crypto:
1616
binance,binance.us,bitfinex,bitget,bitmart,bitrue,bitstamp,bybit,coinex,coinbase,cryptocom,digifinex,fmfw,gateio,
17-
hitbtc,huobi,kraken,kucoin,lbank,mexc,okx,pionex,toobit,whitebit,xt
17+
hitbtc,htx,kraken,kucoin,lbank,mexc,okx,pionex,toobit,whitebit,xt
1818

1919
### Stocks, Commodities, Forex:
2020
tiingo,metalsdev
@@ -28,11 +28,9 @@ Flow goes like this:
2828
`git clone https://github.com/LightFTSO/ftso-data-sources.git`
2929
2. cd into it
3030
`cd ftso-data-sources`
31-
3. (optionally clone submodules for protobuf codegen)
32-
`git submodule update --init --recursive`
33-
4. Create the config.yaml (see the section below)
31+
3. Create the config.yaml (see the section below)
3432
`touch config.yaml`
35-
5. Run it
33+
4. Run it
3634
Locally with `make run`:
3735
You need go 1.22+ installed, see https://go.dev/doc/install
3836
```bash
@@ -43,11 +41,7 @@ make run
4341
Run in docker:
4442
`docker compose up -d ftso-data-sources`
4543

46-
Supported go version: 1.22+
47-
48-
# Protobuf Definitions and Codegen
49-
50-
See https://github.com/RoseLabsMx/ftso-data-sources-proto-files
44+
Supported go version: 1.25.5+
5145

5246
# Configuration
5347
Modify the following sample configuration, by default, the program will look for a file called `config.yaml` in it's root folder or you can specify the file with the `-config <file>` flag
@@ -78,7 +72,7 @@ datasources:
7872
- source: fmfw
7973
- source: gateio
8074
- source: hitbtc
81-
- source: huobi
75+
- source: htx
8276
- source: kucoin
8377
- source: kraken
8478
- source: lbank
@@ -105,10 +99,10 @@ stats:
10599
interval: 60s
106100

107101
# see https://mqtt.org/
108-
mqtt:
109-
enabled: false
110-
url: "tcp://localhost:1883"
111-
qos_level: 1
102+
zeromq:
103+
enabled: false
104+
port: 9998
105+
flushinterval: 5ms
112106

113107
# See https://redis.io/docs/latest/develop/data-types/timeseries/
114108
redis_ts:
@@ -132,6 +126,8 @@ file_output:
132126
websocket_server:
133127
enabled: false
134128
ticker_endpoint: /tickers
129+
flush_interval: 50ms
130+
serialization_protocol: json
135131

136132
assets:
137133
forex:
@@ -338,15 +334,6 @@ Response:
338334
}
339335
```
340336
341-
# Counting ticker rate
342-
You can count the number of tickers per second enabling the file-output consumer, using /dev/stdout as the output file
343-
or using the MQTT consumer, in another terminal connect to it using a client program and pipe the output to the program `pv`, e.g.:
344-
345-
`./ftso-data-sources | pv --line-mode --timer --rate > /dev/null`
346-
Outputs:
347-
`03:22 [ 496 /s]`
348-
For more info on `pv`, visit [https://docs.oracle.com/cd/E86824_01/html/E54763/pv-1.html](Oracle's man pages)
349-
350337
# Contributing
351338
352339
Contributions are welcome! Please feel free to submit a PR.

config/config.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package config
22

33
import (
4-
"log/slog"
4+
"fmt"
5+
"os"
56
"path"
67

78
"github.com/spf13/viper"
@@ -35,7 +36,7 @@ type ConfigOptions struct {
3536
RedisOptions consumer.RedisOptions `mapstructure:"redis_ts"`
3637
WebsocketConsumerOptions consumer.WebsocketConsumerOptions `mapstructure:"websocket_server"`
3738
FileConsumerOptions consumer.FileConsumerOptions `mapstructure:"file_output"`
38-
MQTTConsumerOptions consumer.MqttConsumerOptions `mapstructure:"mqtt"`
39+
ZMQConsumerOptions consumer.ZMQConsumerOptions `mapstructure:"zeromq"`
3940

4041
TickerTransformationOptions []tickertopic.TransformationOptions `mapstructure:"ticker_transformations"`
4142
}
@@ -64,15 +65,27 @@ func LoadConfig(configFile string) (config ConfigOptions, err error) {
6465
return ConfigOptions{}, err
6566
}
6667

67-
config.WebsocketConsumerOptions.Port = config.Port
68-
6968
Config = config
7069
return config, nil
7170
}
7271

73-
func SaveConfig() error {
74-
slog.Info("Saving current configuraton to backup file")
75-
err := viper.WriteConfigAs("config.original.yaml")
72+
// SafeWriteConfig writes the config to a temp file and atomically moves it
73+
// to the destination. This prevents file corruption on crash.
74+
func WriteConfig(filename string) error {
75+
// 1. Define a temporary filename (e.g., config.yaml.tmp)
76+
tempFile := "tmp." + filename
77+
78+
// 2. Write the viper config to the temporary file
79+
if err := viper.WriteConfigAs(tempFile); err != nil {
80+
return fmt.Errorf("failed to write temp config: %w", err)
81+
}
82+
83+
// 3. Atomically rename temp file to original file
84+
// This operation is atomic on POSIX systems (Linux/macOS) and safe on Windows
85+
// (if the file exists, it is replaced).
86+
if err := os.Rename(tempFile, filename); err != nil {
87+
return fmt.Errorf("failed to move temp config to %s: %w", filename, err)
88+
}
7689

77-
return err
90+
return nil
7891
}

config/defaults.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ func setDefaults() {
1212
viper.SetDefault("message_buffer_size", 0)
1313
viper.SetDefault("port", 9999)
1414

15-
viper.SetDefault("assets.crypto", constants.BASES_CRYPTO)
15+
viper.SetDefault("assets.crypto", constants.AssetList{})
1616
viper.SetDefault("assets.commodities", constants.AssetList{})
1717
viper.SetDefault("assets.forex", constants.AssetList{})
1818
viper.SetDefault("assets.stocks", constants.AssetList{})
@@ -25,9 +25,6 @@ func setDefaults() {
2525
viper.SetDefault("file_output.enabled", false)
2626
viper.SetDefault("file_output.filename", "")
2727

28-
viper.SetDefault("mqtt.enabled", false)
29-
viper.SetDefault("mqtt.qos_level", 0)
30-
3128
viper.SetDefault("redis_ts.enabled", false)
3229
viper.SetDefault("redis_ts.client_options.initaddress", []string{"127.0.0.1:6379"})
3330
viper.SetDefault("redis_ts.client_options.username", "")
@@ -38,14 +35,12 @@ func setDefaults() {
3835

3936
viper.SetDefault("websocket_server.enabled", false)
4037
viper.SetDefault("websocket_server.ticker_endpoint", "/tickers")
41-
viper.SetDefault("websocket_server.flush_interval", "500ms")
38+
viper.SetDefault("websocket_server.flush_interval", "50ms")
4239
viper.SetDefault("websocket_server.serialization_protocol", "json")
4340

44-
viper.SetDefault("questdb.enabled", false)
45-
viper.SetDefault("questdb.flush_interval", "10s")
46-
viper.SetDefault("questdb.client_options.address", "127.0.0.0.1:9000")
47-
viper.SetDefault("questdb.client_options.schema", "http")
48-
viper.SetDefault("questdb.individual_feed_table", true)
41+
viper.SetDefault("zeromq.enabled", "")
42+
viper.SetDefault("zeromq.port", 9998)
43+
viper.SetDefault("zeromq.flush_interval", "5ms")
4944

5045
viper.SetDefault("ticker_transformations", tickertopic.TransformationOptions{})
5146
}

config/update.go

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,56 @@
11
package config
22

33
import (
4+
"fmt"
45
"log/slog"
56

67
"github.com/spf13/viper"
78
)
89

9-
func UpdateConfig(newConfig ConfigOptions, saveCurrentConfig bool) {
10-
// save the configuration object before changes are applied, for backup purposes
11-
if saveCurrentConfig {
12-
SaveConfig()
13-
}
10+
func UpdateConfig(newConfig ConfigOptions) {
11+
fmt.Printf("%+v\n", newConfig)
1412

13+
// Core
1514
viper.Set("env", newConfig.Env)
1615
viper.Set("log_level", newConfig.LogLevel)
1716
viper.Set("message_buffer_size", newConfig.MessageBufferSize)
1817
viper.Set("port", newConfig.Port)
1918

20-
viper.Set("assets.crypto", newConfig.Assets.Crypto)
21-
viper.Set("assets.commodities", newConfig.Assets.Commodities)
22-
viper.Set("assets.forex", newConfig.Assets.Forex)
23-
viper.Set("assets.stocks", newConfig.Assets.Stocks)
19+
// Assets
20+
viper.Set("assets", newConfig.Assets)
2421

25-
viper.Set("stats.enabled", newConfig.Stats.Enabled)
26-
viper.Set("stats.interval", newConfig.Stats.Interval)
22+
// Stats
23+
viper.Set("stats", newConfig.Stats)
2724

25+
// Datasources
2826
viper.Set("datasources", newConfig.Datasources)
2927

30-
viper.Set("file_output.enabled", newConfig.FileConsumerOptions.Enabled)
31-
viper.Set("file_output.filename", newConfig.FileConsumerOptions.OutputFilename)
32-
33-
viper.Set("mqtt.enabled", newConfig.MQTTConsumerOptions.Enabled)
34-
viper.Set("mqtt.qos_level", newConfig.MQTTConsumerOptions.QOSLevel)
28+
// File Output
29+
viper.Set("file_output", newConfig.FileConsumerOptions)
3530

31+
// Redis
3632
viper.Set("redis_ts.enabled", newConfig.RedisOptions.Enabled)
37-
viper.Set("redis_ts.client_options.initaddress", newConfig.RedisOptions.ClientOptions.InitAddress)
38-
viper.Set("redis_ts.client_options.username", newConfig.RedisOptions.ClientOptions.Username)
39-
viper.Set("redis_ts.client_options.password", newConfig.RedisOptions.ClientOptions.Password)
40-
viper.Set("redis_ts.ts.retention", newConfig.RedisOptions.TsOptions.Retention)
41-
viper.Set("redis_ts.ts.chunksize", newConfig.RedisOptions.TsOptions.ChunkSize)
42-
viper.Set("redis_ts.ts.maxmemory", newConfig.RedisOptions.TsOptions.MaxMemory)
43-
44-
viper.Set("websocket_server.enabled", newConfig.WebsocketConsumerOptions.Enabled)
45-
viper.Set("websocket_server.ticker_endpoint", newConfig.WebsocketConsumerOptions.TickersEndpoint)
46-
viper.Set("websocket_server.flush_interval", newConfig.WebsocketConsumerOptions.FlushInterval)
47-
viper.Set("websocket_server.serialization_protocol", newConfig.WebsocketConsumerOptions.SerializationProtocol)
33+
viper.Set("redis_ts.client_options", newConfig.RedisOptions.ClientOptions)
34+
viper.Set("redis_ts.ts", newConfig.RedisOptions.TsOptions)
35+
36+
// Websocket
37+
viper.Set("websocket_server", newConfig.WebsocketConsumerOptions)
38+
39+
// ZeroMQ
40+
viper.Set("zeromq", newConfig.ZMQConsumerOptions)
4841

42+
// Ticker Transformations
4943
viper.Set("ticker_transformations", newConfig.TickerTransformationOptions)
5044

51-
if saveCurrentConfig {
52-
slog.Info("Saving new config")
53-
err := viper.WriteConfig()
54-
if err != nil {
55-
slog.Error("error saving new config to file", "error", err)
56-
}
45+
slog.Info("Persisting new configuration to disk")
46+
47+
// Get the actual config file path from viper (defaulting if not set)
48+
configFile := viper.ConfigFileUsed()
49+
if configFile == "" {
50+
configFile = "config.yaml"
51+
}
52+
53+
if err := WriteConfig(configFile); err != nil {
54+
slog.Error("CRITICAL: Failed to save config file", "error", err)
5755
}
5856
}

constants/constants.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,6 @@ import (
77
type AssetName string
88
type AssetList []string
99

10-
var BASES_CRYPTO = AssetList{"FLR", "SGB", "BTC", "XRP", "LTC", "XLM", "DOGE",
11-
"ADA", "ALGO", "ETH", "FIL", "ARB", "AVAX", "BNB", "MATIC", "SOL", "USDC", "USDT", "XDC",
12-
"TRX", "DOT", "SHIB", "UNI", "HBAR", "NEAR", "VET", "RNDR", "STRK", "AAVE", "QNT",
13-
"XTZ", "GALA", "ATOM", "ETC", "BEAM", "IMX", "STX", "APT", "OP", "ICP", "INJ", "TIA", "GRT", "SUI", "LDO"}
14-
1510
var BASES_FOREX = AssetList{
1611
"EUR",
1712
"JPY",

0 commit comments

Comments
 (0)