Skip to content

Commit

Permalink
Merge branch 'ljh' into 'master'
Browse files Browse the repository at this point in the history
fix: get pos update

See merge request liangjunhao/bug-notify!10
  • Loading branch information
paterleng committed Sep 10, 2024
2 parents 07968e8 + 05aa0db commit f2b70cf
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 140 deletions.
2 changes: 1 addition & 1 deletion api/dingdingapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func SendMessage(data model.SendMsg) error {
msg["msgtype"] = data.MsgType
if data.MsgType == "markdown" {
msg[data.MsgType] = map[string]interface{}{
"title": "bug",
"title": "任务",
"text": data.Content,
}
} else {
Expand Down
16 changes: 16 additions & 0 deletions bug-notify.log
Original file line number Diff line number Diff line change
Expand Up @@ -535,3 +535,19 @@
{"level":"INFO","time":"2024-09-10T14:57:25.928+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"INFO","time":"2024-09-10T14:58:26.209+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"INFO","time":"2024-09-10T15:27:35.708+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"INFO","time":"2024-09-10T16:34:19.682+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"INFO","time":"2024-09-10T17:11:48.857+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"INFO","time":"2024-09-10T17:12:28.314+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"INFO","time":"2024-09-10T19:44:43.229+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"INFO","time":"2024-09-10T19:50:27.792+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"INFO","time":"2024-09-10T19:57:52.850+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"INFO","time":"2024-09-10T19:59:10.813+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"ERROR","time":"2024-09-10T19:59:16.892+0800","caller":"bug-notify/main.go:26","msg":"数据中心异常,请联系管理员处理"}
{"level":"INFO","time":"2024-09-10T19:59:42.954+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"ERROR","time":"2024-09-10T19:59:43.270+0800","caller":"bug-notify/main.go:26","msg":"数据中心异常,请联系管理员处理"}
{"level":"INFO","time":"2024-09-10T20:04:38.269+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"ERROR","time":"2024-09-10T20:04:38.465+0800","caller":"bug-notify/main.go:26","msg":"数据中心异常,请联系管理员处理"}
{"level":"INFO","time":"2024-09-10T20:05:00.150+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"ERROR","time":"2024-09-10T20:05:00.390+0800","caller":"bug-notify/main.go:28","msg":"数据中心异常,请联系管理员处理"}
{"level":"INFO","time":"2024-09-10T20:05:25.828+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
{"level":"INFO","time":"2024-09-10T20:05:49.559+0800","caller":"init-tool/logger.go:36","msg":"init logger success"}
177 changes: 40 additions & 137 deletions handle/notify-handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,54 +12,39 @@ import (
"fmt"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"go.uber.org/zap"
"os"
"strconv"
"sync"
)

type MyEventHandler struct {
canal.DummyEventHandler
}

func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
tableMap := make(map[string]int)
for _, t := range init_tool.Conf.Table.TableName {
tableMap[t]++
func (h *MyEventHandler) OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error {
//存储文件
marshal, err := json.Marshal(pos)
if err != nil {
zap.L().Error("转换失败:", zap.Error(err))
return err
}
err = StroageFile(string(marshal))
if err != nil {
zap.L().Error("文件写入失败:", zap.Error(err))
return err
}
return nil
}

if _, ok := tableMap[e.Table.Name]; ok {
c, err := init_tool.GoMysqlConn()
if err != nil {
zap.L().Fatal("创建连接失败")
return err
}
defer c.Close()

c.SetEventHandler(&MyEventHandler{})

masterPos, err := c.GetMasterPos()
var pos uint32
if e.Header != nil {
pos = e.Header.LogPos
fmt.Println("header", e.Header)
}
p := mysql.Position{
Name: masterPos.Name,
Pos: pos,
}

action := e.Action
olddata, newdata := GetData(e)
switch action {
case controller.UPDATE:
UpdateHandle(olddata, newdata, p)
}
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
action := e.Action
newdata := GetData(e)
switch action {
case controller.UPDATE:
UpdateHandle(newdata)
}
fmt.Println("表%s", e.Table)
fmt.Println("数据", e.Rows)
fmt.Println("我是action", e.Action)
s := e.String()
fmt.Println("我是s", s)
return nil
}

Expand Down Expand Up @@ -91,9 +76,6 @@ func NotifyHandle() {
Pos: pos.Pos,
}
c.RunFrom(p)
//masterPos, err := c.GetMasterPos()
//c.RunFrom(masterPos)

}

func Ttttt() {
Expand All @@ -118,66 +100,7 @@ func Ttttt() {
}
}

func InsertHandle(olddata *model.DataChanges, position mysql.Position) {
//对比数据,看有什么变化
//project, err := dao.GetProject(olddata.ProjectID)
//if err != nil {
// zap.L().Error("获取项目失败:", zap.Error(err))
// return
//}
//phone, err := dao.GetPhoneByUserID(olddata.AssignedToID)
//if err != nil {
// return
//}
//takeName, createName, err := GetUserName(olddata.AssignedToID, olddata.AuthorID)
//if err != nil {
// return
//}
//data := model.SendMsg{
// AtMobiles: []string{phone},
// IsAtAll: false,
// Content: fmt.Sprintf(
// "<center><font color=Blue size=5>温馨提醒</font></center>"+
// "---"+
// "> **所属项目:%s**"+
// "---"+
// "> **bug主题:%s**"+
// "---"+
// "> **创建人:%s**"+
// "---"+
// "> **处理人:%s**"+
// "---"+
// "\n @%s \n", project, olddata.Subject, createName, takeName, phone),
// MsgType: "actionCard",
// Url: "http://192.168.10.6:3000/issues/" + strconv.Itoa(int(olddata.ID)),
//}
//file, err := os.ReadFile("pos.txt")
//if err != nil {
// zap.L().Error("读文件失败", zap.Error(err))
//}
//var pos model.Potion
//json.Unmarshal(file, pos)
//err = api.SendMessage(data)
//if err != nil {
// zap.L().Error("消息发送失败:", zap.Error(err))
// return
//}
//if pos.Pos != 0 {
// //存储文件
// marshal, err := json.Marshal(position)
// if err != nil {
// zap.L().Error("转换失败:", zap.Error(err))
// return
// }
// err = StroageFile(string(marshal))
// if err != nil {
// zap.L().Error("文件写入失败:", zap.Error(err))
// return
// }
//}
}

func UpdateHandle(olddata, newdata *model.DataChanges, position mysql.Position) {
func UpdateHandle(newdata *model.DataChanges) {
project, err := dao.GetProject(newdata.ProjectID)
if err != nil {
zap.L().Error("获取项目失败:", zap.Error(err))
Expand Down Expand Up @@ -227,9 +150,9 @@ func UpdateHandle(olddata, newdata *model.DataChanges, position mysql.Position)
"\n--- \n"+
"\n> **所属项目:%s**\n"+
"\n--- \n"+
"\n> **bug主题:%s**\n"+
"\n> **任务主题:%s**\n"+
"\n--- \n"+
"\n> **bug状态:%s**\n"+
"\n> **任务状态:%s**\n"+
"\n--- \n"+
"\n> **创建人:%s** \n"+
"\n--- \n"+
Expand All @@ -239,45 +162,25 @@ func UpdateHandle(olddata, newdata *model.DataChanges, position mysql.Position)
MsgType: "actionCard",
Url: init_tool.Conf.Redmine.URL + strconv.Itoa(int(newdata.ID)),
}
file, err := os.ReadFile(controller.POSFILENAME)
if err != nil {
zap.L().Error("读文件失败", zap.Error(err))
return
}
var pos model.Potion
json.Unmarshal(file, &pos)
err = api.SendMessage(data)
if err != nil {
zap.L().Error("消息发送失败:", zap.Error(err))
return
}
if pos.Pos != 0 {
//存储文件
marshal, err := json.Marshal(position)
if err != nil {
zap.L().Error("转换失败:", zap.Error(err))
return
}
err = StroageFile(string(marshal))
if err != nil {
zap.L().Error("文件写入失败:", zap.Error(err))
return
}
}
}

func GetData(e *canal.RowsEvent) (*model.DataChanges, *model.DataChanges) {
oldData := new(model.DataChanges)
oldData.ID = e.Rows[0][0].(int32)
oldData.ProjectID = e.Rows[0][2].(int32)
oldData.Subject = e.Rows[0][3].(string)
oldData.StatusID = e.Rows[0][7].(int32)
if e.Rows[0][8] != nil {
oldData.AssignedToID = e.Rows[0][8].(int32)
} else {
oldData.AssignedToID = 0
}
oldData.AuthorID = e.Rows[0][11].(int32)
func GetData(e *canal.RowsEvent) *model.DataChanges {
//oldData := new(model.DataChanges)
//oldData.ID = e.Rows[0][0].(int32)
//oldData.ProjectID = e.Rows[0][2].(int32)
//oldData.Subject = e.Rows[0][3].(string)
//oldData.StatusID = e.Rows[0][7].(int32)
//if e.Rows[0][8] != nil {
// oldData.AssignedToID = e.Rows[0][8].(int32)
//} else {
// oldData.AssignedToID = 0
//}
//oldData.AuthorID = e.Rows[0][11].(int32)
newData := new(model.DataChanges)
if e.Action == controller.UPDATE {
newData.ID = e.Rows[1][0].(int32)
Expand All @@ -291,7 +194,7 @@ func GetData(e *canal.RowsEvent) (*model.DataChanges, *model.DataChanges) {
}
newData.AuthorID = e.Rows[1][11].(int32)
}
return oldData, newData
return newData
}

func StroageFile(data string) (err error) {
Expand All @@ -301,17 +204,17 @@ func StroageFile(data string) (err error) {
}
defer file.Close()
writer := bufio.NewWriter(file)

mu := sync.Mutex{}
mu.Lock()
_, err = writer.WriteString(data)
if err != nil {
return
}

mu.Unlock()
err = writer.Flush()
if err != nil {
return
}

return nil
}

Expand Down
5 changes: 5 additions & 0 deletions init-tool/gomysql-conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ func GoMysqlConn() (*canal.Canal, error) {
cfg.Password = Conf.MySQLConfig.Password
cfg.Dump.TableDB = Conf.Table.TableDB
cfg.Dump.Tables = Conf.Table.TableName
var tables []string
for _, name := range Conf.Table.TableName {
tables = append(tables, Conf.Table.TableDB+"."+name)
}
cfg.IncludeTableRegex = tables

c, err := canal.NewCanal(cfg)
return c, err
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bug-notify/handle"
init_tool "bug-notify/init-tool"
"fmt"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
Expand All @@ -14,7 +15,7 @@ func main() {
}
engine := gin.Default()
//启动一个协程用于执行binlog
go safelyRun(handle.Ttttt)
//go safelyRun(handle.Ttttt)
go safelyRun(handle.NotifyHandle)
go safelyRun(handle.TimeingTasks)
engine.Run(init_tool.Conf.ProjectConfig.Address + ":" + init_tool.Conf.ProjectConfig.Port)
Expand All @@ -23,6 +24,7 @@ func main() {
func safelyRun(f func()) {
defer func() {
if r := recover(); r != nil {
fmt.Println(r)
zap.L().Error("数据中心异常,请联系管理员处理")
}
}()
Expand Down
2 changes: 1 addition & 1 deletion pos.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"Name":"binlog.000021","Pos":88670}
{"Name":"binlog.000021","Pos":105090}

0 comments on commit f2b70cf

Please sign in to comment.