Skip to content

Commit

Permalink
added query event in dummyEventHandler
Browse files Browse the repository at this point in the history
Signed-off-by: axfor <[email protected]>
  • Loading branch information
axfor committed Jun 23, 2023
1 parent bea526d commit d6fe43b
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 26 deletions.
10 changes: 5 additions & 5 deletions canal/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestCreateTableExp(t *testing.T) {
t.Fatalf("TestCreateTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestAlterTableExp(t *testing.T) {
t.Fatalf("TestAlterTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestRenameTableExp(t *testing.T) {
t.Fatalf("TestRenameTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestDropTableExp(t *testing.T) {
t.Fatalf("TestDropTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -329,7 +329,7 @@ func TestWithoutSchemeExp(t *testing.T) {
t.Fatalf("TestCreateTableExp:case %s failed\n", s.Query)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down
7 changes: 6 additions & 1 deletion canal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package canal
import (
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/tidb/parser/ast"
)

type EventHandler interface {
Expand All @@ -17,6 +18,8 @@ type EventHandler interface {
OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
// OnQueryEvent is query event include(create user,drop user,create index event,etd.)
OnQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error
String() string
}

Expand All @@ -38,7 +41,9 @@ func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) erro
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error {
return nil
}

func (h *DummyEventHandler) OnQueryEvent(*replication.BinlogEvent, *replication.QueryEvent, ast.StmtNode, *Position) error {
return nil
}
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }

// `SetEventHandler` registers the sync handler, you must register your
Expand Down
69 changes: 54 additions & 15 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,25 +141,29 @@ func (c *Canal) runSyncBinlog() error {
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
continue
}
posInfo := &Position{
Position: pos,
SavePos: savePos,
Force: force,
}
for _, stmt := range stmts {
nodes := parseStmt(stmt)
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
nodes := parseDDLStmt(stmt)
if len(nodes) > 0 {
savePos = true
force = true
// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
return errors.Trace(err)
posInfo.SavePos = true
posInfo.Force = true
err := c.handleDDLEvent(ev, e, nodes, posInfo)
if err != nil {
c.cfg.Logger.Errorf("handle ddl event err %v", err)
}
} else {
err := c.handleQueryEvent(ev, e, stmt, posInfo)
if err != nil {
c.cfg.Logger.Errorf("handle query event err %v", err)
}
}
}
savePos = posInfo.SavePos
force = posInfo.Force
if savePos && e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
Expand All @@ -183,7 +187,7 @@ type node struct {
table string
}

func parseStmt(stmt ast.StmtNode) (ns []*node) {
func parseDDLStmt(stmt ast.StmtNode) (ns []*node) {
switch t := stmt.(type) {
case *ast.RenameTableStmt:
for _, tableInfo := range t.TableToTables {
Expand Down Expand Up @@ -231,6 +235,7 @@ func (c *Canal) updateTable(header *replication.EventHeader, db, table string) (
}
return
}

func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) {
var newDelay uint32
now := uint32(time.Now().Unix())
Expand Down Expand Up @@ -336,3 +341,37 @@ func (c *Canal) CatchMasterPos(timeout time.Duration) error {

return c.WaitUntilPos(pos, timeout)
}

type Position struct {
mysql.Position
SavePos bool
Force bool
}

// handleDDLEvent is handle DDL event
func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, nodes []*node, pos *Position) error {
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err := c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
if len(nodes) > 0 {
// Now we only handle Table Changed DDL, maybe we will support more later.
if err := c.eventHandler.OnDDL(ev.Header, pos.Position, e); err != nil {
return errors.Trace(err)
}
}
return nil
}

// handleQueryEvent is handle some common query events (e.g., DDL,CREATE or DROP USER,GRANT)
// DDL event use handleDDLEvent, others use the handleQueryEvent
func (c *Canal) handleQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error {
if err := c.eventHandler.OnQueryEvent(ev, e, stmt, pos); err != nil {
return errors.Trace(err)
}
return nil
}
10 changes: 6 additions & 4 deletions client/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result,
return nil
}

func (c *Conn) readResultColumns(result *Result) (err error) {
func (c *Conn) readResultColumns(result *Result) error {
var i = 0
var data []byte

var err error
for {
rawPkgLen := len(result.RawPkg)
result.RawPkg, err = c.ReadPacketReuseMem(result.RawPkg)
Expand Down Expand Up @@ -378,8 +378,9 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
}
}

func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
func (c *Conn) readResultRows(result *Result, isBinary bool) error {
var data []byte
var err error

for {
rawPkgLen := len(result.RawPkg)
Expand Down Expand Up @@ -425,10 +426,11 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
return nil
}

func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb SelectPerRowCallback) (err error) {
func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb SelectPerRowCallback) error {
var (
data []byte
row []FieldValue
err error
)

for {
Expand Down
3 changes: 2 additions & 1 deletion mysql/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ const (
FieldValueTypeString
)

func (f *Field) Parse(p FieldData) (err error) {
func (f *Field) Parse(p FieldData) error {
f.Data = p

var n int
var err error
pos := 0
//skip catelog, always def
n, err = SkipLengthEncodedString(p)
Expand Down

0 comments on commit d6fe43b

Please sign in to comment.