Skip to content

Commit e59c45c

Browse files
Yisaerngjaying
andauthored
fix: cherry-pic fix to master-2.2 (#3853)
Signed-off-by: Jiyong Huang <huangjy@emqx.io> Signed-off-by: Song Gao <disxiaofei@163.com> Co-authored-by: Jiyong Huang <huangjy@emqx.io>
1 parent 229f5c8 commit e59c45c

File tree

4 files changed

+19
-15
lines changed

4 files changed

+19
-15
lines changed

extensions/impl/sql/source.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,11 @@ func buildScanValueByColumnType(ctx api.StreamContext, colName, colType string,
330330
return &sql.NullInt64{}
331331
}
332332
return new(int64)
333+
case "TIMESTAMP":
334+
if nullable {
335+
return &sql.NullTime{}
336+
}
337+
return new(time.Time)
333338
default:
334339
ctx.GetLogger().Debugf("sql source meet column %v unknown columnType:%v", colName, colType)
335340
return nil

internal/server/schema_init.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func schemasHandler(w http.ResponseWriter, r *http.Request) {
7777
return
7878
}
7979
if err = sch.Validate(); err != nil {
80-
handleError(w, nil, "Invalid body", logger)
80+
handleError(w, err, "Invalid body", logger)
8181
return
8282
}
8383
err = schema.Register(sch)
@@ -120,7 +120,6 @@ func schemaHandler(w http.ResponseWriter, r *http.Request) {
120120
handleError(w, err, "schema update command error", logger)
121121
return
122122
}
123-
w.WriteHeader(http.StatusOK)
124123
case http.MethodPut:
125124
sch := &schema.Info{Type: def.SchemaType(st), Name: name}
126125
err := json.NewDecoder(r.Body).Decode(sch)

internal/topo/node/source_node.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (m *SourceNode) Run(ctx api.StreamContext, ctrlCh chan<- error) {
258258
defer func() {
259259
m.s.Close(ctx)
260260
m.Close()
261-
if m.notifySub {
261+
if m.notifySub && sig.Ctrl != nil {
262262
sig.Ctrl.Rem(m.name)
263263
}
264264
}()
@@ -282,7 +282,7 @@ func (m *SourceNode) Run(ctx api.StreamContext, ctrlCh chan<- error) {
282282
if err != nil {
283283
return err
284284
}
285-
if m.notifySub {
285+
if m.notifySub && sig.Ctrl != nil {
286286
sig.Ctrl.Add(m.name)
287287
}
288288
return nil

sdk/go/runtime/sink.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package runtime
1616

1717
import (
1818
context2 "context"
19-
"encoding/json"
2019
"fmt"
2120

2221
"go.nanomsg.org/mangos/v3"
@@ -90,16 +89,17 @@ func (s *sinkRuntime) run() {
9089
if err != nil {
9190
s.ctx.GetLogger().Errorf("collect error: %s", err.Error())
9291
}
93-
r := &ackResponse{}
94-
if err != nil {
95-
r.Error = err.Error()
96-
}
97-
data, _ := json.Marshal(r)
98-
if err := s.ackCh.Send(data); err != nil {
99-
s.ctx.GetLogger().Errorf("ack error: %s", err.Error())
100-
_ = s.stop()
101-
return
102-
}
92+
// temporary remove golang plugin sink ack
93+
//r := &ackResponse{}
94+
//if err != nil {
95+
// r.Error = err.Error()
96+
//}
97+
//data, _ := json.Marshal(r)
98+
//if err := s.ackCh.Send(data); err != nil {
99+
// s.ctx.GetLogger().Errorf("ack error: %s", err.Error())
100+
// _ = s.stop()
101+
// return
102+
//}
103103
}
104104
}
105105

0 commit comments

Comments
 (0)