-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
added query event in dummy event handler #791
Conversation
Signed-off-by: axfor <[email protected]>
52ce4ff
to
d6fe43b
Compare
Signed-off-by: axfor <[email protected]>
canal/sync.go
Outdated
@@ -336,3 +341,37 @@ func (c *Canal) CatchMasterPos(timeout time.Duration) error { | |||
|
|||
return c.WaitUntilPos(pos, timeout) | |||
} | |||
|
|||
type Position struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on mysql.Position
adds SavePos
and Force
fields
I think queryEvent contains multiple sub-events, pos save logic, and it is up to the end-user event to decide whether to save or not
func (h *DummyEventHandler) OnQueryEvent(header *replication.EventHeader, stmt ast.StmtNode, pos *Position, e *replication.QueryEvent) error {
//....
pos.SavePos =true
return nil
}
Of course, it can also be handled uniformly in the main loop of canal/sync.go
events.
https://github.com/axfor/go-mysql/blob/feature-axx/canal/sync.go#L161
func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
//....
for _, stmt := range stmts {
nodes := parseDDLStmt(stmt)
if len(nodes) > 0 {
// OnDDL
} else {
err := c.handleQueryEvent(ev.Header, stmt, posInfo, e)
if err != nil {
c.cfg.Logger.Errorf("handle query event err %v", err)
} else {
savePos = true // for save pos
force = true // for save pos
}
}
//....
}
//....
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is up to the end-user event to decide whether to save or not
I think it's more obvious to use return value to do it.
OnQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos mysql.Position) (savePos bool, force bool, err error)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I'll modify it
update commit of onQueryEvent Co-authored-by: lance6716 <[email protected]>
Signed-off-by: axfor <[email protected]>
@@ -17,18 +18,22 @@ type EventHandler interface { | |||
OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error | |||
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately. | |||
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error | |||
// OnQueryEvent is query event include (create user, drop user, create index, etc.) | |||
// Note: the OnQueryEvent has lower priority than OnDDL even | |||
OnQueryEvent(header *replication.EventHeader, stmt ast.StmtNode, pos mysql.Position, e *replication.QueryEvent) (bool, bool, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use named return in this interface, or explain what's the meaning of two bool
s in comments. So users of this library will understand when they implement this interface.
if err != nil { | ||
return savePos, force, errors.Trace(err) | ||
} | ||
return savePos, force, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if err != nil { | |
return savePos, force, errors.Trace(err) | |
} | |
return savePos, force, nil | |
return savePos, force, errors.Trace(err) |
@axfor could you check the open conversations and fix the conflicts? |
Closing this as it has become stale |
added more query event