Skip to content

feat: Charset parameter for Mysql connection #1027

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 38 commits into
base: master
Choose a base branch
from

Conversation

sigalikanevsky
Copy link

No description provided.

eitamring and others added 30 commits January 6, 2022 15:02
…ests

Removed FlushBinlog which was only used for testing
…deleted_while_reading

Fix/eitam/add skip when table is deleted while reading
…_decode

Added support to convert byte to string dynamically
…_to_date

Change mysql_date default zero based value to nil
…_to_date

Change mysql_MYSQL_TYPE_TIMESTAMP2 and MYSQL_TYPE_TIMESTAMP to be def…
…_to_date

Change decodeDatetime2 default to zero
Change decodeTimestamp2 to return nil in case sec = 0
…p_log

Changed SP is missing to debug level
Copy link
Collaborator

@lance6716 lance6716 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix git conflict

@@ -132,7 +132,7 @@ type MyEventHandler struct {
}

func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
log.Infof("%s %v\n", e.Action, e.Rows)
log.Debugf("%s %v\n", e.Action, e.Rows)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the log level should be kept, other developers may rely on this behaviour.
Also for other files.

@@ -65,6 +65,10 @@ func NewCanal(cfg *Config) (*Canal, error) {

c.ctx, c.cancel = context.WithCancel(context.Background())

if cfg.WaitTimeBetweenConnectionSeconds > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be <= 0 to set the default value?

@@ -270,38 +279,10 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
return errors.Errorf("%s not supported now", e.Header.EventType)
}
events := newRowsEvent(t, action, ev.Rows, e.Header)
events.Header.Gtid = c.SyncedGTIDSet()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why this is changed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@@ -697,7 +711,8 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
return
}

log.Errorf("retry sync err: %v, wait 1s and retry again", err)
log.Errorf("retry sync err: %v, wait %s s and retry again", err, b.cfg.WaitTimeBetweenConnectionSeconds)
time.Sleep(b.cfg.WaitTimeBetweenConnectionSeconds)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the sleep is at line 705. Don't need to add extra time.Sleep.

@@ -719,7 +734,11 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {

switch data[0] {
case OK_HEADER:
if err = b.parseEvent(s, data); err != nil {
if err = b.parseEvent(b.nextPos.Name, s, data); err != nil {
// if the error is errMissingTableMapEvent skip
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data may be lost if we don't return error

case MYSQL_TYPE_STRING:
v, n = decodeString(data, length)
if !utf8.Valid(data) && e.Table.isLatin {
v, n = decodeStringLatin1(data, length)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The decoding and converting should be done at upper layer, not here. replication package aims to provide a golang representation of binlog.

Copy link
Collaborator

@dveeden dveeden left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reference an issue or describe the changes in the description of the PR

@@ -1182,6 +1217,29 @@ func decodeString(data []byte, length int) (v string, n int) {
return
}

func decodeStringLatin1(data []byte, length int) (v string, n int) {
// Define the Latin1 decoder
decoder := charmap.ISO8859_1.NewDecoder()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that latin1 in MySQL is cp1252 and not ISO 8859-1:

"MySQL's latin1 is the same as the Windows cp1252 character set. This means it is the same as the official ISO 8859-1 or IANA (Internet Assigned Numbers Authority) latin1, except that IANA latin1 treats the code points between 0x80 and 0x9f as “undefined,” whereas cp1252, and therefore MySQL's latin1, assign characters for those positions. For example, 0x80 is the Euro sign. For the “undefined” entries in cp1252, MySQL translates 0x81 to Unicode 0x0081, 0x8d to 0x008d, 0x8f to 0x008f, 0x90 to 0x0090, and 0x9d to 0x009d. "

source: https://dev.mysql.com/doc/refman/8.4/en/charset-we-sets.html

Suggested change
decoder := charmap.ISO8859_1.NewDecoder()
decoder := charmap.Windows1252.NewDecoder()

@@ -1182,6 +1217,29 @@ func decodeString(data []byte, length int) (v string, n int) {
return
}

func decodeStringLatin1(data []byte, length int) (v string, n int) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to make things a bit more generic, so it could work with multiple charsets and not just latin1.

However I think a vast majority of MySQL installations use either latin1 or utf8/utf8mb4. Maybe some ascii, but that's a subset of both latin1 and utf8/utf8mb4...

@dveeden
Copy link
Collaborator

dveeden commented May 6, 2025

@sigalikanevsky Could you fix the conflicts by merging the master branch into your branch (or rebasing on top of the master branch)?

Comment on lines -276 to -303
func (c *Canal) FlushBinlog() error {
_, err := c.Execute("FLUSH BINARY LOGS")
return errors.Trace(err)
}

func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
timer := time.NewTimer(timeout)
for {
select {
case <-timer.C:
return errors.Errorf("wait position %v too long > %s", pos, timeout)
default:
err := c.FlushBinlog()
if err != nil {
return errors.Trace(err)
}
curPos := c.master.Position()
if curPos.Compare(pos) >= 0 {
return nil
} else {
log.Debugf("master pos is %v, wait catching %v", curPos, pos)
time.Sleep(100 * time.Millisecond)
}
}
}

return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this break the API of the package

I don't see why they were moved as functions accepting Canal

@@ -142,7 +143,15 @@ func (c *Canal) runSyncBinlog() error {
case *replication.QueryEvent:
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
if err != nil {
log.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
msg := err.Error()
if strings.Contains(strings.ToLower(msg), strings.ToLower("procedure")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parsing error message like this is a bad practice.

Don't you have an error code should could catch?

Comment on lines -25 to +26
log.Infof("start sync binlog at binlog file %v", pos)
log.Debugf("start sync binlog at binlog file %v", pos)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might think there is a need to change the log lever for these information messages.

But it should be done in another PR, not this one.

It brings confusion, especially because there are no links between these changes and the topic of the PR

@@ -270,38 +279,10 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
return errors.Errorf("%s not supported now", e.Header.EventType)
}
events := newRowsEvent(t, action, ev.Rows, e.Header)
events.Header.Gtid = c.SyncedGTIDSet()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Comment on lines +33 to +34
// MaxWaitTimeBetween Check connection
WaitTimeBetweenConnectionSeconds time.Duration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// MaxWaitTimeBetween Check connection
WaitTimeBetweenConnectionSeconds time.Duration
// WaitTimeBetweenConnectionSeconds is the delay between two connections when an connection fails
WaitTimeBetweenConnectionSeconds time.Duration

Also, it's a time.Duration, so for naming you should avoid mentioning seconds

Suggested change
// MaxWaitTimeBetween Check connection
WaitTimeBetweenConnectionSeconds time.Duration
// DelayBetweenConnection is the delay between two connections when an connection fails
DelayBetweenConnection time.Duration

if err = b.parseEvent(s, data); err != nil {
if err = b.parseEvent(b.nextPos.Name, s, data); err != nil {
// if the error is errMissingTableMapEvent skip
if strings.Contains(strings.ToLower(errors.Cause(err).Error()), errMissingTableMapEvent.Error()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way to test the error is absolutely discouraged.

Using errors.Is maybe?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants