Skip to content

Commit 7d9a134

Browse files
wj596wj596
authored andcommitted
v1.0.2
1 parent 3219ae2 commit 7d9a134

File tree

19 files changed

+1264
-121
lines changed

19 files changed

+1264
-121
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,3 +118,9 @@ server_id=1 # 配置 MySQL replaction 需要定义,不要和 go-mysql-transfer
118118

119119
* 9.22 release
120120

121+
**v1.0.2 release**
122+
123+
* Lua引擎增加数据库操作(dbOps)模块、 http操作(httpOps)模块
124+
* 修复enum类型出现的乱码问题
125+
* redis增加Sorted Set数据类型
126+

global/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ func NewConfigWithFile(name string) (*Config, error) {
177177
if err := checkElsConfig(&c); err != nil {
178178
return nil, errors.Trace(err)
179179
}
180+
default:
181+
return nil, errors.Errorf("unsupported target: %s", c.Target)
180182
}
181183

182184
_config = &c
@@ -361,6 +363,10 @@ func checkElsConfig(c *Config) error {
361363
return nil
362364
}
363365

366+
func checkLuaConfig(c *Config) error {
367+
return nil
368+
}
369+
364370
func (c *Config) IsCluster() bool {
365371
if !c.IsZk() && !c.IsEtcd() {
366372
return false

global/model.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
package global
1919

2020
import (
21-
"github.com/siddontang/go-mysql/schema"
2221
"sync"
22+
23+
"github.com/siddontang/go-mysql/schema"
2324
)
2425

2526
type RowRequest struct {
2627
RuleKey string
2728
Action string
29+
OldRow []interface{}
2830
Row []interface{}
2931
}
3032

@@ -39,6 +41,8 @@ type RedisRespond struct {
3941
Structure string
4042
Key string
4143
Field string
44+
Score float64
45+
OldVal interface{}
4246
Val interface{}
4347
}
4448

global/rule.go

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,11 @@ import (
3737
)
3838

3939
const (
40-
RedisStructureString = "1"
41-
RedisStructureHash = "2"
42-
RedisStructureList = "3"
43-
RedisStructureSet = "4"
40+
RedisStructureString = "String"
41+
RedisStructureHash = "Hash"
42+
RedisStructureList = "List"
43+
RedisStructureSet = "Set"
44+
RedisStructureSortedSet = "SortedSet"
4445

4546
ValEncoderJson = "json"
4647
ValEncoderKVCommas = "kv-commas"
@@ -93,12 +94,14 @@ type Rule struct {
9394
RedisHashFieldPrefix string `yaml:"redis_hash_field_prefix"`
9495
// 使用哪个列的值作为hash的field,仅redis_structure为hash时起作用
9596
RedisHashFieldColumn string `yaml:"redis_hash_field_column"`
96-
97-
RedisKeyColumnIndex int
98-
RedisKeyColumnIndexs []int
99-
RedisKeyColumnIndexMap map[string]int
100-
RedisHashFieldColumnIndex int
101-
RedisHashFieldColumnIndexs []int
97+
// Sorted Set(有序集合)的Score
98+
RedisSortedSetScoreColumn string `yaml:"redis_sorted_set_score_column"`
99+
RedisKeyColumnIndex int
100+
RedisKeyColumnIndexs []int
101+
RedisKeyColumnIndexMap map[string]int
102+
RedisHashFieldColumnIndex int
103+
RedisHashFieldColumnIndexs []int
104+
RedisSortedSetScoreColumnIndex int
102105

103106
// ------------------- ROCKETMQ -----------------
104107
RocketmqTopic string `yaml:"rocketmq_topic"` //rocketmq topic名称,可以为空,为空时使用表名称
@@ -191,21 +194,6 @@ func RuleInsList() []*Rule {
191194
return list
192195
}
193196

194-
func StructureName(structure string) string {
195-
switch structure {
196-
case RedisStructureString:
197-
return "string"
198-
case RedisStructureHash:
199-
return "hash"
200-
case RedisStructureList:
201-
return "list"
202-
case RedisStructureSet:
203-
return "set"
204-
}
205-
206-
return ""
207-
}
208-
209197
func (s *Rule) Initialize() error {
210198
if err := s.buildPaddingMap(); err != nil {
211199
return err
@@ -513,8 +501,21 @@ func (s *Rule) initRedisConfig() error {
513501
if s.RedisKeyValue == "" {
514502
return errors.New("empty redis_key_value not allowed in rule")
515503
}
504+
case "SORTEDSET":
505+
s.RedisStructure = RedisStructureSortedSet
506+
if s.RedisKeyValue == "" {
507+
return errors.New("empty redis_key_value not allowed in rule")
508+
}
509+
if s.RedisSortedSetScoreColumn == "" {
510+
return errors.New("empty redis_sorted_set_score_column not allowed in rule")
511+
}
512+
_, index := s.TableColumn(s.RedisSortedSetScoreColumn)
513+
if index < 0 {
514+
return errors.New("redis_sorted_set_score_column must be table column")
515+
}
516+
s.RedisHashFieldColumnIndex = index
516517
default:
517-
return errors.Errorf(" redis_structure must be string or hash or list or set")
518+
return errors.Errorf("redis_structure must be string or hash or list or set")
518519
}
519520

520521
if s.RedisKeyColumn != "" {
@@ -664,9 +665,11 @@ func (s *Rule) PreCompileLuaScript(dataDir string) error {
664665
strings.Contains(script, `HSET(`) ||
665666
strings.Contains(script, `RPUSH(`) ||
666667
strings.Contains(script, `SADD(`) ||
668+
strings.Contains(script, `ZADD(`) ||
667669
strings.Contains(script, `DEL(`) ||
668670
strings.Contains(script, `HDEL(`) ||
669671
strings.Contains(script, `LREM(`) ||
672+
strings.Contains(script, `ZREM(`) ||
670673
strings.Contains(script, `SREM(`)) {
671674

672675
return errors.New("lua script incorrect format")

go.mod

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ require (
1616
github.com/onsi/ginkgo v1.14.0 // indirect
1717
github.com/pingcap/errors v0.11.4
1818
github.com/pingcap/tidb v1.1.0-beta.0.20191115021711-b274eb2079dc
19+
github.com/pkg/errors v0.9.1
1920
github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7
2021
github.com/prometheus/client_golang v1.0.0
2122
github.com/samuel/go-zookeeper v0.0.0-20200724154423-2164a8ac840e
2223
github.com/satori/go.uuid v1.2.0
23-
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24
2424
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07
2525
github.com/siddontang/go-mysql v1.1.0
2626
github.com/streadway/amqp v1.0.0
@@ -31,7 +31,6 @@ require (
3131
go.mongodb.org/mongo-driver v1.4.0
3232
go.uber.org/atomic v1.6.0
3333
go.uber.org/zap v1.15.0
34-
golang.org/x/tools v0.0.0-20191107010934-f79515f33823
3534
gopkg.in/natefinch/lumberjack.v2 v2.0.0
3635
gopkg.in/yaml.v2 v2.3.0
3736
)

main.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,8 @@ func main() {
6464
n := runtime.GOMAXPROCS(runtime.NumCPU())
6565
log.Println(fmt.Sprintf("GOMAXPROCS :%d", n))
6666

67-
68-
//cfgPath = "D:\\transfer\\app_elasticsearch.yml"
69-
//stockFlag =true
67+
cfgPath = "D:\\transfer\\app_redis.yml"
68+
//stockFlag = true
7069

7170
err := service.InitApplication(cfgPath)
7271
if err != nil {

service/endpoint/endpoint.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/vmihailenco/msgpack"
3030

3131
"go-mysql-transfer/global"
32+
"go-mysql-transfer/service/luaengine"
3233
"go-mysql-transfer/storage"
3334
"go-mysql-transfer/util/logutil"
3435
"go-mysql-transfer/util/stringutil"
@@ -42,7 +43,9 @@ type Endpoint interface {
4243
Close()
4344
}
4445

45-
func NewEndpoint(c *global.Config) Endpoint {
46+
func NewEndpoint(c *global.Config, ds *canal.Canal) Endpoint {
47+
luaengine.InitActuator(ds)
48+
4649
if c.IsRedis() {
4750
return newRedisEndpoint(c)
4851
}
@@ -76,23 +79,29 @@ func NewEndpoint(c *global.Config) Endpoint {
7679
}
7780

7881
func convertColumnData(value interface{}, col *schema.TableColumn, rule *global.Rule) interface{} {
82+
if value == nil {
83+
return nil
84+
}
85+
7986
switch col.Type {
8087
case schema.TYPE_ENUM:
8188
switch value := value.(type) {
8289
case int64:
83-
// for binlog, ENUM may be int64, but for dump, enum is string
8490
eNum := value - 1
8591
if eNum < 0 || eNum >= int64(len(col.EnumValues)) {
8692
// we insert invalid enum value before, so return empty
8793
logutil.Warnf("invalid binlog enum index %d, for enum %v", eNum, col.EnumValues)
8894
return ""
8995
}
9096
return col.EnumValues[eNum]
97+
case string:
98+
return value
99+
case []byte:
100+
return string(value)
91101
}
92102
case schema.TYPE_SET:
93103
switch value := value.(type) {
94104
case int64:
95-
// for binlog, SET may be int64, but for dump, SET is string
96105
bitmask := value
97106
sets := make([]string, 0, len(col.SetValues))
98107
for i, s := range col.SetValues {
@@ -105,12 +114,9 @@ func convertColumnData(value interface{}, col *schema.TableColumn, rule *global.
105114
case schema.TYPE_BIT:
106115
switch value := value.(type) {
107116
case string:
108-
// for binlog, BIT is int64, but for dump, BIT is string
109-
// for dump 0x01 is for 1, \0 is for 0
110117
if value == "\x01" {
111118
return int64(1)
112119
}
113-
114120
return int64(0)
115121
}
116122
case schema.TYPE_STRING:
@@ -236,6 +242,28 @@ func keyValueMap(re *global.RowRequest, rule *global.Rule, primitive bool) map[s
236242
return kv
237243
}
238244

245+
func oldKeyValueMap(request *global.RowRequest, rule *global.Rule, primitive bool) map[string]interface{} {
246+
kv := make(map[string]interface{}, len(rule.PaddingMap))
247+
if primitive {
248+
for _, padding := range rule.PaddingMap {
249+
kv[padding.ColumnName] = convertColumnData(request.OldRow[padding.ColumnIndex], padding.ColumnMetadata, rule)
250+
}
251+
return kv
252+
}
253+
254+
for _, padding := range rule.PaddingMap {
255+
kv[padding.WrapName] = convertColumnData(request.OldRow[padding.ColumnIndex], padding.ColumnMetadata, rule)
256+
}
257+
258+
if rule.DefaultColumnValueConfig != "" {
259+
for k, v := range rule.DefaultColumnValueMap {
260+
kv[rule.WrapName(k)] = v
261+
}
262+
}
263+
264+
return kv
265+
}
266+
239267
func primaryKey(re *global.RowRequest, rule *global.Rule) interface{} {
240268
if rule.IsCompositeKey { // 组合ID
241269
var key string

0 commit comments

Comments
 (0)