From f558bf9d881a3fe0fed12c9fa2688406b0353f98 Mon Sep 17 00:00:00 2001 From: BLAZZ Date: Sat, 26 Nov 2022 11:55:29 +0800 Subject: [PATCH 1/7] support auto download flushed binlog and parse event for cloud computing platform (etc. aliyun) --- canal/canal.go | 2 + canal/local.go | 105 ++++++++++++++++++++++++++++++++++ canal/sync.go | 2 +- replication/binlogstreamer.go | 13 +++++ 4 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 canal/local.go diff --git a/canal/canal.go b/canal/canal.go index 27799de80..113305546 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -52,6 +52,8 @@ type Canal struct { ctx context.Context cancel context.CancelFunc + + binFileDownload BinlogFileDownload } // canal will retry fetching unknown table's meta after UnknownTableRetryPeriod diff --git a/canal/local.go b/canal/local.go new file mode 100644 index 000000000..7cf704cf8 --- /dev/null +++ b/canal/local.go @@ -0,0 +1,105 @@ +package canal + +import ( + "context" + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" + "github.com/pingcap/errors" +) + +// BinlogFileDownload download the binlog file from cloud computing platform (etc. aliyun) +type BinlogFileDownload func(mysql.Position) (localBinFilePath string, err error) + +// WithLocalBinlogDownload registers the local bin file download, +// that allows download the flushed binlog file to local (etc. aliyun) +func (c *Canal) WithLocalBinlogDownload(d BinlogFileDownload) { + c.binFileDownload = d +} + +func (c *Canal) adaptLocalBinFileStreamer(syncMasterStreamer *replication.BinlogStreamer, err error) (*LocalBinFileAdapterStreamer, error) { + return &LocalBinFileAdapterStreamer{ + BinlogStreamer: syncMasterStreamer, + syncMasterStreamer: syncMasterStreamer, + canal: c, + binFileDownload: c.binFileDownload, + }, err +} + +// LocalBinFileAdapterStreamer will support to download flushed binlog file for continuous sync in cloud computing platform +type LocalBinFileAdapterStreamer struct { + *replication.BinlogStreamer // the running streamer, it will be localStreamer or sync master streamer + syncMasterStreamer *replication.BinlogStreamer // syncMasterStreamer is the streamer from startSyncer + canal *Canal + binFileDownload BinlogFileDownload +} + +// GetEvent will auto switch the running streamer and return replication.BinlogEvent +func (s *LocalBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { + if s.binFileDownload == nil { // not support to use local bin file + return s.BinlogStreamer.GetEvent(ctx) + } + + ev, err := s.BinlogStreamer.GetEvent(ctx) + + if err == nil { + switch ev.Event.(type) { + case *replication.RotateEvent: // RotateEvent means need to change steamer back to sync master to retry sync + s.BinlogStreamer = s.syncMasterStreamer + } + return ev, err + } + + if err == replication.ErrNeedSyncAgain { // restart master if last sync master syncer has error + s.canal.syncer.Close() + _ = s.canal.prepareSyncer() + + newStreamer, startErr := s.canal.startSyncer() + if startErr == nil { + ev, err = newStreamer.GetEvent(ctx) + } + // set all streamer to the new sync master streamer + s.BinlogStreamer = newStreamer + s.syncMasterStreamer = newStreamer + } + + if mysqlErr, ok := err.(*mysql.MyError); ok { + // change to local binlog file streamer to adapter the steamer + if mysqlErr.Code == mysql.ER_MASTER_FATAL_ERROR_READING_BINLOG && + mysqlErr.Message == "Could not find first log file name in binary log index file" { + gset := s.canal.master.GTIDSet() + if gset == nil || gset.String() == "" { // currently only support xid mode + s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry") + pos := s.canal.master.Position() + newStreamer := newLocalBinFileStreamer(s.binFileDownload, pos) + + s.syncMasterStreamer = s.BinlogStreamer + s.BinlogStreamer = newStreamer + + return newStreamer.GetEvent(ctx) + } + } + } + + return ev, err +} + +func newLocalBinFileStreamer(download BinlogFileDownload, position mysql.Position) *replication.BinlogStreamer { + streamer := replication.NewBinlogStreamer() + binFilePath, err := download(position) + if err != nil { + streamer.CloseWithError(errors.New("local binlog file not exist")) + } + + beginFromHere := false + go replication.NewBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { + if be.Header.LogPos == position.Pos || position.Pos == 4 { // go ahead to check if begin + beginFromHere = true + } + if beginFromHere { + streamer.PutEvent(be) + } + return nil + }) + + return streamer +} diff --git a/canal/sync.go b/canal/sync.go index 1c9c8d308..835fc7a1a 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -35,7 +35,7 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { } func (c *Canal) runSyncBinlog() error { - s, err := c.startSyncer() + s, err := c.adaptLocalBinFileStreamer(c.startSyncer()) if err != nil { return err } diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index 56b8622a8..93e96f52a 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -92,3 +92,16 @@ func newBinlogStreamer() *BinlogStreamer { return s } + +// PutEvent puts event to BinlogStreamer +func (s *BinlogStreamer) PutEvent(ev *BinlogEvent) { + s.ch <- ev +} + +func (s *BinlogStreamer) CloseWithError(err error) { + s.closeWithError(err) +} + +func NewBinlogStreamer() *BinlogStreamer { + return newBinlogStreamer() +} From 226e13734c5424d7f4532afbd728423f7f70d602 Mon Sep 17 00:00:00 2001 From: BLAZZ Date: Mon, 28 Nov 2022 10:49:39 +0800 Subject: [PATCH 2/7] linter bug fix --- canal/local.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/canal/local.go b/canal/local.go index 7cf704cf8..c6d3000f4 100644 --- a/canal/local.go +++ b/canal/local.go @@ -2,6 +2,7 @@ package canal import ( "context" + "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "github.com/pingcap/errors" @@ -90,16 +91,18 @@ func newLocalBinFileStreamer(download BinlogFileDownload, position mysql.Positio streamer.CloseWithError(errors.New("local binlog file not exist")) } - beginFromHere := false - go replication.NewBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { - if be.Header.LogPos == position.Pos || position.Pos == 4 { // go ahead to check if begin - beginFromHere = true - } - if beginFromHere { - streamer.PutEvent(be) - } - return nil - }) + go func(binFilePath string, streamer *replication.BinlogStreamer) { + beginFromHere := false + _ = replication.NewBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { + if be.Header.LogPos == position.Pos || position.Pos == 4 { // go ahead to check if begin + beginFromHere = true + } + if beginFromHere { + streamer.PutEvent(be) + } + return nil + }) + }(binFilePath, streamer) return streamer } From e7f35db3bc3530c1ab367289cae8976cbdfabfe5 Mon Sep 17 00:00:00 2001 From: BLAZZ Date: Tue, 29 Nov 2022 19:33:24 +0800 Subject: [PATCH 3/7] goimports format --- canal/local.go | 216 ++++++++++++++++++++++++------------------------- 1 file changed, 108 insertions(+), 108 deletions(-) diff --git a/canal/local.go b/canal/local.go index c6d3000f4..f308ffc9d 100644 --- a/canal/local.go +++ b/canal/local.go @@ -1,108 +1,108 @@ -package canal - -import ( - "context" - - "github.com/go-mysql-org/go-mysql/mysql" - "github.com/go-mysql-org/go-mysql/replication" - "github.com/pingcap/errors" -) - -// BinlogFileDownload download the binlog file from cloud computing platform (etc. aliyun) -type BinlogFileDownload func(mysql.Position) (localBinFilePath string, err error) - -// WithLocalBinlogDownload registers the local bin file download, -// that allows download the flushed binlog file to local (etc. aliyun) -func (c *Canal) WithLocalBinlogDownload(d BinlogFileDownload) { - c.binFileDownload = d -} - -func (c *Canal) adaptLocalBinFileStreamer(syncMasterStreamer *replication.BinlogStreamer, err error) (*LocalBinFileAdapterStreamer, error) { - return &LocalBinFileAdapterStreamer{ - BinlogStreamer: syncMasterStreamer, - syncMasterStreamer: syncMasterStreamer, - canal: c, - binFileDownload: c.binFileDownload, - }, err -} - -// LocalBinFileAdapterStreamer will support to download flushed binlog file for continuous sync in cloud computing platform -type LocalBinFileAdapterStreamer struct { - *replication.BinlogStreamer // the running streamer, it will be localStreamer or sync master streamer - syncMasterStreamer *replication.BinlogStreamer // syncMasterStreamer is the streamer from startSyncer - canal *Canal - binFileDownload BinlogFileDownload -} - -// GetEvent will auto switch the running streamer and return replication.BinlogEvent -func (s *LocalBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { - if s.binFileDownload == nil { // not support to use local bin file - return s.BinlogStreamer.GetEvent(ctx) - } - - ev, err := s.BinlogStreamer.GetEvent(ctx) - - if err == nil { - switch ev.Event.(type) { - case *replication.RotateEvent: // RotateEvent means need to change steamer back to sync master to retry sync - s.BinlogStreamer = s.syncMasterStreamer - } - return ev, err - } - - if err == replication.ErrNeedSyncAgain { // restart master if last sync master syncer has error - s.canal.syncer.Close() - _ = s.canal.prepareSyncer() - - newStreamer, startErr := s.canal.startSyncer() - if startErr == nil { - ev, err = newStreamer.GetEvent(ctx) - } - // set all streamer to the new sync master streamer - s.BinlogStreamer = newStreamer - s.syncMasterStreamer = newStreamer - } - - if mysqlErr, ok := err.(*mysql.MyError); ok { - // change to local binlog file streamer to adapter the steamer - if mysqlErr.Code == mysql.ER_MASTER_FATAL_ERROR_READING_BINLOG && - mysqlErr.Message == "Could not find first log file name in binary log index file" { - gset := s.canal.master.GTIDSet() - if gset == nil || gset.String() == "" { // currently only support xid mode - s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry") - pos := s.canal.master.Position() - newStreamer := newLocalBinFileStreamer(s.binFileDownload, pos) - - s.syncMasterStreamer = s.BinlogStreamer - s.BinlogStreamer = newStreamer - - return newStreamer.GetEvent(ctx) - } - } - } - - return ev, err -} - -func newLocalBinFileStreamer(download BinlogFileDownload, position mysql.Position) *replication.BinlogStreamer { - streamer := replication.NewBinlogStreamer() - binFilePath, err := download(position) - if err != nil { - streamer.CloseWithError(errors.New("local binlog file not exist")) - } - - go func(binFilePath string, streamer *replication.BinlogStreamer) { - beginFromHere := false - _ = replication.NewBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { - if be.Header.LogPos == position.Pos || position.Pos == 4 { // go ahead to check if begin - beginFromHere = true - } - if beginFromHere { - streamer.PutEvent(be) - } - return nil - }) - }(binFilePath, streamer) - - return streamer -} +package canal + +import ( + "context" + + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" + "github.com/pingcap/errors" +) + +// BinlogFileDownload download the binlog file from cloud computing platform (etc. aliyun) +type BinlogFileDownload func(mysql.Position) (localBinFilePath string, err error) + +// WithLocalBinlogDownload registers the local bin file download, +// that allows download the flushed binlog file to local (etc. aliyun) +func (c *Canal) WithLocalBinlogDownload(d BinlogFileDownload) { + c.binFileDownload = d +} + +func (c *Canal) adaptLocalBinFileStreamer(syncMasterStreamer *replication.BinlogStreamer, err error) (*LocalBinFileAdapterStreamer, error) { + return &LocalBinFileAdapterStreamer{ + BinlogStreamer: syncMasterStreamer, + syncMasterStreamer: syncMasterStreamer, + canal: c, + binFileDownload: c.binFileDownload, + }, err +} + +// LocalBinFileAdapterStreamer will support to download flushed binlog file for continuous sync in cloud computing platform +type LocalBinFileAdapterStreamer struct { + *replication.BinlogStreamer // the running streamer, it will be localStreamer or sync master streamer + syncMasterStreamer *replication.BinlogStreamer // syncMasterStreamer is the streamer from startSyncer + canal *Canal + binFileDownload BinlogFileDownload +} + +// GetEvent will auto switch the running streamer and return replication.BinlogEvent +func (s *LocalBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { + if s.binFileDownload == nil { // not support to use local bin file + return s.BinlogStreamer.GetEvent(ctx) + } + + ev, err := s.BinlogStreamer.GetEvent(ctx) + + if err == nil { + switch ev.Event.(type) { + case *replication.RotateEvent: // RotateEvent means need to change steamer back to sync master to retry sync + s.BinlogStreamer = s.syncMasterStreamer + } + return ev, err + } + + if err == replication.ErrNeedSyncAgain { // restart master if last sync master syncer has error + s.canal.syncer.Close() + _ = s.canal.prepareSyncer() + + newStreamer, startErr := s.canal.startSyncer() + if startErr == nil { + ev, err = newStreamer.GetEvent(ctx) + } + // set all streamer to the new sync master streamer + s.BinlogStreamer = newStreamer + s.syncMasterStreamer = newStreamer + } + + if mysqlErr, ok := err.(*mysql.MyError); ok { + // change to local binlog file streamer to adapter the steamer + if mysqlErr.Code == mysql.ER_MASTER_FATAL_ERROR_READING_BINLOG && + mysqlErr.Message == "Could not find first log file name in binary log index file" { + gset := s.canal.master.GTIDSet() + if gset == nil || gset.String() == "" { // currently only support xid mode + s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry") + pos := s.canal.master.Position() + newStreamer := newLocalBinFileStreamer(s.binFileDownload, pos) + + s.syncMasterStreamer = s.BinlogStreamer + s.BinlogStreamer = newStreamer + + return newStreamer.GetEvent(ctx) + } + } + } + + return ev, err +} + +func newLocalBinFileStreamer(download BinlogFileDownload, position mysql.Position) *replication.BinlogStreamer { + streamer := replication.NewBinlogStreamer() + binFilePath, err := download(position) + if err != nil { + streamer.CloseWithError(errors.New("local binlog file not exist")) + } + + go func(binFilePath string, streamer *replication.BinlogStreamer) { + beginFromHere := false + _ = replication.NewBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { + if be.Header.LogPos == position.Pos || position.Pos == 4 { // go ahead to check if begin + beginFromHere = true + } + if beginFromHere { + streamer.PutEvent(be) + } + return nil + }) + }(binFilePath, streamer) + + return streamer +} From 17fa910fa5a8eced4310ddd871b5eb3ab9905c65 Mon Sep 17 00:00:00 2001 From: BLAZZ Date: Thu, 1 Dec 2022 17:06:41 +0800 Subject: [PATCH 4/7] fix startErr panic and change some name --- canal/canal.go | 2 +- canal/local.go | 39 ++++++++++++++++++++------------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index 1c1fd8d94..001c4ad79 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -55,7 +55,7 @@ type Canal struct { ctx context.Context cancel context.CancelFunc - binFileDownload BinlogFileDownload + binFileDownload BinlogFileDownloader } // canal will retry fetching unknown table's meta after UnknownTableRetryPeriod diff --git a/canal/local.go b/canal/local.go index f308ffc9d..606b32a66 100644 --- a/canal/local.go +++ b/canal/local.go @@ -8,34 +8,34 @@ import ( "github.com/pingcap/errors" ) -// BinlogFileDownload download the binlog file from cloud computing platform (etc. aliyun) -type BinlogFileDownload func(mysql.Position) (localBinFilePath string, err error) +// BinlogFileDownloader downloads the binlog file and return the path to it. It's often used to download binlog backup from RDS service. +type BinlogFileDownloader func(mysql.Position) (localBinFilePath string, err error) -// WithLocalBinlogDownload registers the local bin file download, -// that allows download the flushed binlog file to local (etc. aliyun) -func (c *Canal) WithLocalBinlogDownload(d BinlogFileDownload) { +// WithLocalBinlogDownloader registers the local bin file downloader, +// that allows download the backup binlog file from RDS service to local +func (c *Canal) WithLocalBinlogDownloader(d BinlogFileDownloader) { c.binFileDownload = d } -func (c *Canal) adaptLocalBinFileStreamer(syncMasterStreamer *replication.BinlogStreamer, err error) (*LocalBinFileAdapterStreamer, error) { - return &LocalBinFileAdapterStreamer{ - BinlogStreamer: syncMasterStreamer, - syncMasterStreamer: syncMasterStreamer, +func (c *Canal) adaptLocalBinFileStreamer(remoteBinlogStreamer *replication.BinlogStreamer, err error) (*localBinFileAdapterStreamer, error) { + return &localBinFileAdapterStreamer{ + BinlogStreamer: remoteBinlogStreamer, + syncMasterStreamer: remoteBinlogStreamer, canal: c, binFileDownload: c.binFileDownload, }, err } -// LocalBinFileAdapterStreamer will support to download flushed binlog file for continuous sync in cloud computing platform -type LocalBinFileAdapterStreamer struct { +// localBinFileAdapterStreamer will support to download flushed binlog file for continuous sync in cloud computing platform +type localBinFileAdapterStreamer struct { *replication.BinlogStreamer // the running streamer, it will be localStreamer or sync master streamer - syncMasterStreamer *replication.BinlogStreamer // syncMasterStreamer is the streamer from startSyncer + syncMasterStreamer *replication.BinlogStreamer // syncMasterStreamer is the streamer from canal startSyncer canal *Canal - binFileDownload BinlogFileDownload + binFileDownload BinlogFileDownloader } -// GetEvent will auto switch the running streamer and return replication.BinlogEvent -func (s *LocalBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { +// GetEvent will auto switch the local and remote streamer to get binlog event if possible. +func (s *localBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { if s.binFileDownload == nil { // not support to use local bin file return s.BinlogStreamer.GetEvent(ctx) } @@ -55,9 +55,10 @@ func (s *LocalBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replicatio _ = s.canal.prepareSyncer() newStreamer, startErr := s.canal.startSyncer() - if startErr == nil { - ev, err = newStreamer.GetEvent(ctx) + if startErr != nil { + return nil, startErr } + ev, err = newStreamer.GetEvent(ctx) // set all streamer to the new sync master streamer s.BinlogStreamer = newStreamer s.syncMasterStreamer = newStreamer @@ -68,7 +69,7 @@ func (s *LocalBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replicatio if mysqlErr.Code == mysql.ER_MASTER_FATAL_ERROR_READING_BINLOG && mysqlErr.Message == "Could not find first log file name in binary log index file" { gset := s.canal.master.GTIDSet() - if gset == nil || gset.String() == "" { // currently only support xid mode + if gset == nil || gset.String() == "" { // currently only support position based replication s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry") pos := s.canal.master.Position() newStreamer := newLocalBinFileStreamer(s.binFileDownload, pos) @@ -84,7 +85,7 @@ func (s *LocalBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replicatio return ev, err } -func newLocalBinFileStreamer(download BinlogFileDownload, position mysql.Position) *replication.BinlogStreamer { +func newLocalBinFileStreamer(download BinlogFileDownloader, position mysql.Position) *replication.BinlogStreamer { streamer := replication.NewBinlogStreamer() binFilePath, err := download(position) if err != nil { From b0193e02388f3078f15c70bb5f19014f240397af Mon Sep 17 00:00:00 2001 From: BLAZZ Date: Fri, 2 Dec 2022 10:14:51 +0800 Subject: [PATCH 5/7] name change --- canal/canal.go | 2 +- canal/local.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/canal/canal.go b/canal/canal.go index 001c4ad79..013ee2a45 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -55,7 +55,7 @@ type Canal struct { ctx context.Context cancel context.CancelFunc - binFileDownload BinlogFileDownloader + binFileDownloader BinlogFileDownloader } // canal will retry fetching unknown table's meta after UnknownTableRetryPeriod diff --git a/canal/local.go b/canal/local.go index 606b32a66..6db4a10e4 100644 --- a/canal/local.go +++ b/canal/local.go @@ -14,7 +14,7 @@ type BinlogFileDownloader func(mysql.Position) (localBinFilePath string, err err // WithLocalBinlogDownloader registers the local bin file downloader, // that allows download the backup binlog file from RDS service to local func (c *Canal) WithLocalBinlogDownloader(d BinlogFileDownloader) { - c.binFileDownload = d + c.binFileDownloader = d } func (c *Canal) adaptLocalBinFileStreamer(remoteBinlogStreamer *replication.BinlogStreamer, err error) (*localBinFileAdapterStreamer, error) { @@ -22,7 +22,7 @@ func (c *Canal) adaptLocalBinFileStreamer(remoteBinlogStreamer *replication.Binl BinlogStreamer: remoteBinlogStreamer, syncMasterStreamer: remoteBinlogStreamer, canal: c, - binFileDownload: c.binFileDownload, + binFileDownloader: c.binFileDownloader, }, err } @@ -31,12 +31,12 @@ type localBinFileAdapterStreamer struct { *replication.BinlogStreamer // the running streamer, it will be localStreamer or sync master streamer syncMasterStreamer *replication.BinlogStreamer // syncMasterStreamer is the streamer from canal startSyncer canal *Canal - binFileDownload BinlogFileDownloader + binFileDownloader BinlogFileDownloader } // GetEvent will auto switch the local and remote streamer to get binlog event if possible. func (s *localBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { - if s.binFileDownload == nil { // not support to use local bin file + if s.binFileDownloader == nil { // not support to use local bin file return s.BinlogStreamer.GetEvent(ctx) } @@ -72,7 +72,7 @@ func (s *localBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replicatio if gset == nil || gset.String() == "" { // currently only support position based replication s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry") pos := s.canal.master.Position() - newStreamer := newLocalBinFileStreamer(s.binFileDownload, pos) + newStreamer := newLocalBinFileStreamer(s.binFileDownloader, pos) s.syncMasterStreamer = s.BinlogStreamer s.BinlogStreamer = newStreamer From 3f517c88c7c23273783380169a9fce860fcd733c Mon Sep 17 00:00:00 2001 From: BLAZZ Date: Mon, 5 Dec 2022 13:45:52 +0800 Subject: [PATCH 6/7] process parse error --- canal/local.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/canal/local.go b/canal/local.go index 6db4a10e4..5e98bf7c9 100644 --- a/canal/local.go +++ b/canal/local.go @@ -94,7 +94,7 @@ func newLocalBinFileStreamer(download BinlogFileDownloader, position mysql.Posit go func(binFilePath string, streamer *replication.BinlogStreamer) { beginFromHere := false - _ = replication.NewBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { + err := replication.NewBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { if be.Header.LogPos == position.Pos || position.Pos == 4 { // go ahead to check if begin beginFromHere = true } @@ -103,6 +103,9 @@ func newLocalBinFileStreamer(download BinlogFileDownloader, position mysql.Posit } return nil }) + if err != nil { + streamer.CloseWithError(err) + } }(binFilePath, streamer) return streamer From 9f98edac135b506dc69b87078b00f8a433ae273e Mon Sep 17 00:00:00 2001 From: BLAZZ Date: Fri, 9 Dec 2022 14:28:52 +0800 Subject: [PATCH 7/7] use syncer's parser and support gtid event --- canal/local.go | 43 +++++++++++++++++-------------- replication/binlogsyncer.go | 51 ++++++++++++++++++++++--------------- 2 files changed, 55 insertions(+), 39 deletions(-) diff --git a/canal/local.go b/canal/local.go index 5e98bf7c9..45729e292 100644 --- a/canal/local.go +++ b/canal/local.go @@ -58,47 +58,52 @@ func (s *localBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replicatio if startErr != nil { return nil, startErr } - ev, err = newStreamer.GetEvent(ctx) // set all streamer to the new sync master streamer s.BinlogStreamer = newStreamer s.syncMasterStreamer = newStreamer + + ev, err = newStreamer.GetEvent(ctx) } - if mysqlErr, ok := err.(*mysql.MyError); ok { - // change to local binlog file streamer to adapter the steamer - if mysqlErr.Code == mysql.ER_MASTER_FATAL_ERROR_READING_BINLOG && - mysqlErr.Message == "Could not find first log file name in binary log index file" { - gset := s.canal.master.GTIDSet() - if gset == nil || gset.String() == "" { // currently only support position based replication - s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry") - pos := s.canal.master.Position() - newStreamer := newLocalBinFileStreamer(s.binFileDownloader, pos) + mysqlErr, ok := err.(*mysql.MyError) + // only 'Could not find first log' can create local streamer, ignore other errors + if !ok || mysqlErr.Code != mysql.ER_MASTER_FATAL_ERROR_READING_BINLOG || + mysqlErr.Message != "Could not find first log file name in binary log index file" { + return ev, err + } - s.syncMasterStreamer = s.BinlogStreamer - s.BinlogStreamer = newStreamer + s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry") - return newStreamer.GetEvent(ctx) - } - } - } + // local binlog need next position to find binlog file and begin event + pos := s.canal.master.Position() + newStreamer := s.newLocalBinFileStreamer(s.binFileDownloader, pos) + s.BinlogStreamer = newStreamer - return ev, err + return newStreamer.GetEvent(ctx) } -func newLocalBinFileStreamer(download BinlogFileDownloader, position mysql.Position) *replication.BinlogStreamer { +func (s *localBinFileAdapterStreamer) newLocalBinFileStreamer(download BinlogFileDownloader, position mysql.Position) *replication.BinlogStreamer { streamer := replication.NewBinlogStreamer() binFilePath, err := download(position) if err != nil { streamer.CloseWithError(errors.New("local binlog file not exist")) + return streamer } go func(binFilePath string, streamer *replication.BinlogStreamer) { beginFromHere := false - err := replication.NewBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { + err := s.canal.syncer.GetBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { + if position.Pos < 4 { // binlog first pos is 4, if pos < 4 means canal gives error position info + return nil + } if be.Header.LogPos == position.Pos || position.Pos == 4 { // go ahead to check if begin beginFromHere = true } if beginFromHere { + if err := s.canal.syncer.StorePosAndGTID(be); err != nil { + streamer.CloseWithError(err) + return nil + } streamer.PutEvent(be) } return nil diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 7421e102b..08e17deef 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -794,6 +794,32 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { return errors.Trace(err) } + if err := b.StorePosAndGTID(e); err != nil { + return errors.Trace(err) + } + + needStop := false + select { + case s.ch <- e: + case <-b.ctx.Done(): + needStop = true + } + + if needACK { + err := b.replySemiSyncACK(b.nextPos) + if err != nil { + return errors.Trace(err) + } + } + + if needStop { + return errors.New("sync is been closing...") + } + + return nil +} + +func (b *BinlogSyncer) StorePosAndGTID(e *BinlogEvent) error { if e.Header.LogPos > 0 { // Some events like FormatDescriptionEvent return 0, ignore. b.nextPos.Pos = e.Header.LogPos @@ -830,7 +856,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { break } prev := b.currGset.Clone() - err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID) + err := b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID) if err != nil { return errors.Trace(err) } @@ -847,25 +873,6 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { event.GSet = getCurrentGtidSet() } } - - needStop := false - select { - case s.ch <- e: - case <-b.ctx.Done(): - needStop = true - } - - if needACK { - err := b.replySemiSyncACK(b.nextPos) - if err != nil { - return errors.Trace(err) - } - } - - if needStop { - return errors.New("sync is been closing...") - } - return nil } @@ -902,3 +909,7 @@ func (b *BinlogSyncer) killConnection(conn *client.Conn, id uint32) { } b.cfg.Logger.Infof("kill last connection id %d", id) } + +func (b *BinlogSyncer) GetBinlogParser() *BinlogParser { + return b.parser +}