diff --git a/GoMybatis.go b/GoMybatis.go index 4c6f43c..a787ae6 100644 --- a/GoMybatis.go +++ b/GoMybatis.go @@ -92,15 +92,11 @@ func WriteMapper(bean reflect.Value, xml []byte, sessionFactory *SessionFactory, } var session Session var err error - if len(args) == 1 && args[0].IsValid() == true && !args[0].IsNil() { - session = sessionFactory.NewSession(beanName, SessionType_TransationRM, args[0].Interface().(*TransationRMClientConfig)) - } else { - session = sessionFactory.NewSession(beanName, SessionType_Default, nil) - } if session != nil { returnValue.Elem().Set(reflect.ValueOf(session).Elem().Addr().Convert(*returnType.ReturnOutType)) } else { - err = utils.NewError("GoMybatis", "Create Session fail.") + //err = utils.NewError("GoMybatis", "Create Session fail.arg session not exist!") + session = sessionFactory.NewSession(beanName, SessionType_Default) } return buildReturnValues(returnType, returnValue, err) } @@ -331,7 +327,7 @@ func exeMethodByXml(elementType ElementType, beanName string, sessionFactory *Se } //session if session == nil { - session = sessionFactory.NewSession(beanName, SessionType_Default, nil) + session = sessionFactory.NewSession(beanName, SessionType_Default) //not arg session,just close! defer closeSession(sessionFactory, session) } diff --git a/GoMybatisEngine_test.go b/GoMybatisEngine_test.go index 72568cd..78d9f51 100644 --- a/GoMybatisEngine_test.go +++ b/GoMybatisEngine_test.go @@ -7,6 +7,7 @@ import ( "sync" "testing" "time" + _ "github.com/go-sql-driver/mysql" ) //假设Mysql 数据库查询时间为0,框架单协程的Benchmark性能 diff --git a/README-en.md b/README-en.md index 2aef224..641e1fa 100644 --- a/README-en.md +++ b/README-en.md @@ -22,9 +22,6 @@ -Template elementHigh development efficiency template, one line of code to achieve additions, deletions, modifications, logical deletions, optimistic lock version number
#### Asynchronous Message Queue Logging System ![Image text](https://zhuxiujia.github.io/gomybatis.io/assets/log_system.png) -#### Over-remote substitution for micro-service members to handle transactions supports micro-services during the transition from single database (Mysql, postgresql) to distributed database (TiDB, cockroachdb...) -![Image text](https://zhuxiujia.github.io/gomybatis.io/assets/tx.png) - Database Driven List ``` Mysql: github.com/go-sql-driver/mysql diff --git a/README.md b/README.md index 7f8ecec..2bc821b 100644 --- a/README.md +++ b/README.md @@ -27,9 +27,6 @@ #### 异步消息队列日志系统 ![Image text](https://zhuxiujia.github.io/gomybatis.io/assets/log_system.png) -#### 通过远程代替微服务成员 处理事务支持 处于 单数据库(Mysql,postgresql)-分布式数据库(TiDB,cockroachdb...)过渡期间的微服务 -![Image text](https://zhuxiujia.github.io/gomybatis.io/assets/tx.png) - 数据库驱动列表 ``` Mysql: github.com/go-sql-driver/mysql diff --git a/SessionFactory.go b/SessionFactory.go index 16f9144..225cdb0 100644 --- a/SessionFactory.go +++ b/SessionFactory.go @@ -11,7 +11,7 @@ func (it SessionFactory) New(Engine SessionEngine) SessionFactory { return it } -func (it *SessionFactory) NewSession(mapperName string, sessionType SessionType, config *TransationRMClientConfig) Session { +func (it *SessionFactory) NewSession(mapperName string, sessionType SessionType) Session { if it.SessionMap == nil || it.Engine == nil { panic("[GoMybatis] SessionFactory not init! you must call method SessionFactory.New(*)") } @@ -35,16 +35,6 @@ func (it *SessionFactory) NewSession(mapperName string, sessionType SessionType, panic(err) } break - case SessionType_TransationRM: - if config == nil { - panic("[GoMybatis] SessionFactory can not create TransationRMSession,config *TransationRMClientConfig is nil!") - } - var transationRMSession = TransationRMSession{}.New(mapperName, config.TransactionId, &TransationRMClient{ - RetryTime: config.RetryTime, - Addr: config.Addr, - }, config.Status) - newSession = Session(*transationRMSession) - break default: panic("[GoMybatis] newSession() must have a SessionType!") } diff --git a/SessionType.go b/SessionType.go index 41c5d2c..b0c73ff 100644 --- a/SessionType.go +++ b/SessionType.go @@ -5,5 +5,4 @@ type SessionType = int const ( SessionType_Default SessionType = iota //默认session类型 SessionType_Local //本地session - SessionType_TransationRM //远程session ) diff --git a/TransactionDefinition.go b/TransactionDefinition.go deleted file mode 100644 index 6b77eab..0000000 --- a/TransactionDefinition.go +++ /dev/null @@ -1,42 +0,0 @@ -package GoMybatis - -import "time" - -type PROPAGATION int -type ISOLATION int - -//隔离级别 -const ( - ISOLATION_DEFAULT ISOLATION = iota - 1 - ISOLATION_READ_UNCOMMITTED - ISOLATION_READ_COMMITTED - ISOLATION_REPEATABLE_READ - ISOLATION_SERIALIZABLE -) - -//传播行为 -const ( - PROPAGATION_REQUIRED PROPAGATION = iota - PROPAGATION_SUPPORTS - PROPAGATION_MANDATORY - PROPAGATION_REQUIRES_NEW - PROPAGATION_NOT_SUPPORTED - PROPAGATION_NEVER - PROPAGATION_NESTED -) - -type TransactionDefinition struct { - PropagationBehavior PROPAGATION - IsolationLevel ISOLATION - Timeout time.Duration - IsReadOnly bool -} - -func (it TransactionDefinition) Default() TransactionDefinition { - return TransactionDefinition{ - PropagationBehavior: PROPAGATION_REQUIRED, - IsolationLevel: ISOLATION_DEFAULT, - Timeout: 0, - IsReadOnly: false, - } -} diff --git a/TransactionFactory.go b/TransactionFactory.go deleted file mode 100644 index 5cd3d0d..0000000 --- a/TransactionFactory.go +++ /dev/null @@ -1,49 +0,0 @@ -package GoMybatis - -type TransactionFactory struct { - TransactionStatuss map[string]*TransactionStatus - SessionFactory *SessionFactory -} - -func (it TransactionFactory) New(SessionFactory *SessionFactory) TransactionFactory { - it.TransactionStatuss = make(map[string]*TransactionStatus) - it.SessionFactory = SessionFactory - return it -} - -func (it *TransactionFactory) GetTransactionStatus(mapperName string, transactionId string) (*TransactionStatus, error) { - var Session Session - var result = it.TransactionStatuss[transactionId] - if result == nil { - Session = it.SessionFactory.NewSession(mapperName, SessionType_Default, nil) - var transaction = Transaction{ - Id: transactionId, - Session: Session, - } - var transactionStatus = TransactionStatus{ - IsNewTransaction: true, - Transaction: &transaction, - } - result = &transactionStatus - it.TransactionStatuss[transactionId] = result - } - result.MapperName = mapperName - return result, nil -} - -func (it *TransactionFactory) SetTransactionStatus(transactionId string, transaction *TransactionStatus) { - if transactionId == "" { - return - } - it.TransactionStatuss[transactionId] = transaction -} - -func (it *TransactionFactory) Append(mapperName string, transactionId string, transaction TransactionStatus) { - if transactionId == "" { - return - } - var old, _ = it.GetTransactionStatus(mapperName, transactionId) - if old != nil { - it.SetTransactionStatus(transactionId, old) - } -} diff --git a/TransactionFactory_test.go b/TransactionFactory_test.go deleted file mode 100644 index 5e3f805..0000000 --- a/TransactionFactory_test.go +++ /dev/null @@ -1 +0,0 @@ -package GoMybatis diff --git a/TransactionManager.go b/TransactionManager.go deleted file mode 100644 index b205943..0000000 --- a/TransactionManager.go +++ /dev/null @@ -1,243 +0,0 @@ -package GoMybatis - -import ( - "github.com/zhuxiujia/GoMybatis/utils" - "log" -) - -type Transaction_Status int - -const ( - Transaction_Status_NO Transaction_Status = iota //非事务 - Transaction_Status_Prepare //准备事务 - Transaction_Status_Commit //提交事务 - Transaction_Status_Rollback //回滚事务 -) - -func (status Transaction_Status) ToString() string { - switch status { - case Transaction_Status_NO: - return "Transaction_Status_NO" - case Transaction_Status_Prepare: - return "Transaction_Status_Prepare" - case Transaction_Status_Commit: - return "Transaction_Status_Commit" - case Transaction_Status_Rollback: - return "Transaction_Status_Rollback" - default: - return "not init Transaction_Status!" - } -} - -type ActionType int - -const ( - ActionType_Exec ActionType = iota //执行 - ActionType_Query //查询 -) - -type TransactionReqDTO struct { - Status Transaction_Status - TransactionId string //事务id(不可空) - OwnerId string //所有者 - Sql string //sql内容(可空) - ActionType ActionType - MapperName string //mapper名称 -} - -type TransactionRspDTO struct { - TransactionId string //事务id(不可空) - Error string - Success int - Query []map[string][]byte - Exec Result -} - -type TransactionManager interface { - GetTransaction(mapperName string, def *TransactionDefinition, transactionId string, OwnerId string) (*TransactionStatus, error) - Commit(mapperName string, transactionId string) error - Rollback(mapperName string, transactionId string) error -} - -type DefaultTransationManager struct { - TransactionManager - SessionFactory *SessionFactory - TransactionFactory *TransactionFactory -} - -func (it DefaultTransationManager) New(SessionFactory *SessionFactory, TransactionFactory *TransactionFactory) DefaultTransationManager { - it.SessionFactory = SessionFactory - it.TransactionFactory = TransactionFactory - return it -} - -func (it DefaultTransationManager) GetTransaction(mapperName string, def *TransactionDefinition, transactionId string, OwnerId string) (*TransactionStatus, error) { - if transactionId == "" { - return nil, utils.NewError("TransactionManager", " transactionId ="+transactionId+" transations is nil!") - } - if def == nil { - var d = TransactionDefinition{}.Default() - def = &d - } - //TODO equal mapperName - var transationStatus, err = it.TransactionFactory.GetTransactionStatus(mapperName, transactionId) - if err != nil { - return nil, err - } - if def.PropagationBehavior == PROPAGATION_REQUIRED { - //todo doBegin - if transationStatus.IsNewTransaction { - //新事务,则调用begin - transationStatus.OwnerId = OwnerId - var err = transationStatus.Begin() - if err == nil { - if def.Timeout != 0 { - //transation out of time,default not set out of time - //事务超时,时间大于0则启动超时机制 - transationStatus.DelayFlush(def.Timeout) - } - return transationStatus, err - } - } - } - return transationStatus, nil -} - -func (it DefaultTransationManager) Commit(mapperName string, transactionId string) error { - //TODO equal mapperName - var transactions, err = it.TransactionFactory.GetTransactionStatus(mapperName, transactionId) - if err != nil { - log.Println(err) - return err - } - return transactions.Commit() -} - -func (it DefaultTransationManager) Rollback(mapperName string, transactionId string) error { - //TODO equal mapperName - var transactions, err = it.TransactionFactory.GetTransactionStatus(mapperName, transactionId) - if err != nil { - log.Println(err) - return err - } - return transactions.Rollback() -} - -//执行事务 -func (it DefaultTransationManager) DoTransaction(dto TransactionReqDTO) TransactionRspDTO { - var transcationStatus *TransactionStatus - var err error - - transcationStatus, err = it.GetTransaction(dto.MapperName, nil, dto.TransactionId, dto.OwnerId) - if transcationStatus == nil || transcationStatus.Transaction == nil || transcationStatus.Transaction.Session == nil { - return TransactionRspDTO{ - TransactionId: dto.TransactionId, - Error: "Transaction does not exist,id=" + dto.TransactionId, - } - } - if err != nil { - return TransactionRspDTO{ - TransactionId: dto.TransactionId, - Error: err.Error(), - } - } - if err != nil { - return TransactionRspDTO{ - TransactionId: dto.TransactionId, - Error: err.Error(), - } - } - log.Println("[TransactionManager] do transactionId=", dto.TransactionId, ",sessionId=", transcationStatus.Transaction.Session.Id(), "status=", dto.Status.ToString()) - - if dto.Status == Transaction_Status_NO { - defer transcationStatus.Flush() //关闭 - return it.DoAction(dto, transcationStatus) - } else if dto.Status == Transaction_Status_Prepare { - return it.DoAction(dto, transcationStatus) - } else if dto.Status == Transaction_Status_Commit { - if transcationStatus.OwnerId == dto.OwnerId { //PROPAGATION_REQUIRED 情况下 子事务 不可提交 - defer transcationStatus.Flush() //关闭 - err = transcationStatus.Commit() - if err != nil { - return TransactionRspDTO{ - TransactionId: dto.TransactionId, - Error: err.Error(), - } - } - //TODO equal mapperName - var transaction, err = it.TransactionFactory.GetTransactionStatus(dto.MapperName, dto.TransactionId) - if err != nil { - log.Println(err) - } else { - transaction.Flush() - } - } - } else if dto.Status == Transaction_Status_Rollback { - defer transcationStatus.Flush() //关闭,//PROPAGATION_REQUIRED 情况下 子事务 可关闭 - err = transcationStatus.Rollback() - if err != nil { - return TransactionRspDTO{ - TransactionId: dto.TransactionId, - Error: err.Error(), - } - } - } else { - err = utils.NewError("TransactionManager", " arg have no action!") - } - var errString = "" - if err != nil { - errString = err.Error() - } - return TransactionRspDTO{ - TransactionId: dto.TransactionId, - Error: errString, - } -} - -//执行数据库操作 -func (it DefaultTransationManager) DoAction(dto TransactionReqDTO, transcationStatus *TransactionStatus) TransactionRspDTO { - if transcationStatus.IsCompleted { - var TransactionRspDTO = TransactionRspDTO{ - TransactionId: dto.TransactionId, - Error: "[TransactionManager] transaction fail!it is completed!", - } - return TransactionRspDTO - } - if dto.Sql == "" { - var TransactionRspDTO = TransactionRspDTO{ - TransactionId: dto.TransactionId, - } - return TransactionRspDTO - } - if dto.ActionType == ActionType_Exec { - log.Println("[TransactionManager] TransactionId:", dto.TransactionId, ",Exec:", dto.Sql) - var res, e = transcationStatus.Transaction.Session.Exec(dto.Sql) - var err string - if e != nil { - err = e.Error() - return TransactionRspDTO{ - TransactionId: dto.TransactionId, - Error: err, - } - } else { - return TransactionRspDTO{ - TransactionId: dto.TransactionId, - Exec: *res, - Error: err, - } - } - } else { - log.Println("[TransactionManager] Query ", dto.Sql) - var res, e = transcationStatus.Transaction.Session.Query(dto.Sql) - var err string - if e != nil { - err = e.Error() - } - var TransactionRspDTO = TransactionRspDTO{ - TransactionId: dto.TransactionId, - Query: res, - Error: err, - } - return TransactionRspDTO - } -} diff --git a/TransactionManager_test.go b/TransactionManager_test.go deleted file mode 100644 index d40ef12..0000000 --- a/TransactionManager_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package GoMybatis - -import ( - "errors" - "fmt" - _ "github.com/go-sql-driver/mysql" - "github.com/zhuxiujia/GoMybatis/example" - "github.com/zhuxiujia/GoMybatis/utils" - "strconv" - "testing" -) - -var manager DefaultTransationManager - -func TestManager(t *testing.T) { - if example.MysqlUri == "" || example.MysqlUri == "*" { - fmt.Println("no mysql config in project, you must set the mysql link!") - return - } - var engine = GoMybatisEngine{}.New() - err := engine.Open("mysql", example.MysqlUri) //此处请按格式填写你的mysql链接,这里用*号代替 - if err != nil { - t.Fatal(err) - } - var SessionFactory = SessionFactory{}.New(&engine) - var TransactionFactory = TransactionFactory{}.New(&SessionFactory) - manager = DefaultTransationManager{}.New(&SessionFactory, &TransactionFactory) - - var TestPropertyServiceA TestPropertyServiceA - var TestPropertyServiceB TestPropertyServiceB - var TestOrderService = TestOrderService{ - TestPropertyServiceA: TestPropertyServiceA, - TestPropertyServiceB: TestPropertyServiceB, - } - err = TestOrderService.Transform(utils.CreateUUID(), "20181023162632152fd236d6877ff4", "20180926172013b85403d3715d46ed", 100) - if err != nil { - t.Fatal(err) - } -} - -type TestPropertyServiceA struct{} - -//单事务2 -func (TestPropertyServiceA) Add(transactionId string, id string, amt int) error { - var sql = "UPDATE `test`.`biz_property` SET `pool_amount`= (pool_amount+" + strconv.Itoa(amt) + ") WHERE `user_id`='" + id + "';" - //todo proxy send error - var dto = TransactionReqDTO{ - TransactionId: transactionId, - OwnerId: utils.CreateUUID(), - Status: Transaction_Status_Prepare, - ActionType: ActionType_Exec, - Sql: sql, - } - var result = manager.DoTransaction(dto) - fmt.Println(dto.TransactionId, result.Exec) - dto.Status = Transaction_Status_Commit - rspDTO := manager.DoTransaction(dto) //commit - if rspDTO.Error != "" { - return errors.New(rspDTO.Error) - } - return nil -} - -type TestPropertyServiceB struct{} - -//单事务1 -func (TestPropertyServiceB) Reduce(transactionId string, id string, amt int) error { - var sql = "UPDATE `test`.`biz_property` SET `pool_amount`= (pool_amount-" + strconv.Itoa(amt) + ") WHERE `user_id`='" + id + "';" - //todo proxy send error - var dto = TransactionReqDTO{ - TransactionId: transactionId, - OwnerId: utils.CreateUUID(), - Status: Transaction_Status_Prepare, - ActionType: ActionType_Exec, - Sql: sql, - } - var result = manager.DoTransaction(dto) - fmt.Println(dto.TransactionId, result.Exec) - dto.Status = Transaction_Status_Commit - rspDTO := manager.DoTransaction(dto) //commit - if rspDTO.Error != "" { - return errors.New(rspDTO.Error) - } - return nil -} - -type TestOrderService struct { - TestPropertyServiceA TestPropertyServiceA //A微服务 - TestPropertyServiceB TestPropertyServiceB //B微服务 -} - -//嵌套事务 -func (it TestOrderService) Transform(transactionId string, outid string, inId string, amount int) error { - var OwnerId = utils.CreateUUID() - var dto = TransactionReqDTO{ - TransactionId: transactionId, - OwnerId: OwnerId, - Status: Transaction_Status_Prepare, - ActionType: ActionType_Exec, - Sql: "", - } - rspDTO := manager.DoTransaction(dto) //开启事务 - if rspDTO.Error != "" { - return errors.New(rspDTO.Error) - } - //事务id=2018092d6172014a2a4c8a949f1004623,已存在的事务不可提交commit,只能提交状态rollback和Pause - var e1 = it.TestPropertyServiceB.Reduce(transactionId, outid, amount) - if e1 != nil { - return e1 - } - - dto.Status = Transaction_Status_Rollback - rspDTO = manager.DoTransaction(dto) - if rspDTO.Error != "" { - return errors.New(rspDTO.Error) - } - //事务id=2018092d6172014a2a4c8a949f1004623,已存在的事务不可提交commit,只能提交状态rollback和Pause - var e = it.TestPropertyServiceA.Add(transactionId, inId, amount) - if e != nil { - return e - } - - //manager.Rollback(transactionId) - //事务id=2018092d6172014a2a4c8a949f1004623,原始事务可提交commit,rollback和Pause - dto.Status = Transaction_Status_Commit - rspDTO = manager.DoTransaction(dto) - if rspDTO.Error != "" { - return errors.New(rspDTO.Error) - } - return nil -} diff --git a/TransactionStatus.go b/TransactionStatus.go deleted file mode 100644 index 9ba11e0..0000000 --- a/TransactionStatus.go +++ /dev/null @@ -1,69 +0,0 @@ -package GoMybatis - -import ( - "errors" - "time" -) - -type TransactionStatus struct { - MapperName string - OwnerId string //所有者id - IsNewTransaction bool //是否新启动的事务 - HasSavepoint bool //是否保存点 - IsRollbackOnly bool //是否只允许rollback - IsCompleted bool //是否完成 - HasSetDelayClose bool //是否设置了延迟关闭/回滚 - Transaction *Transaction //事务对象 -} - -type Transaction struct { - Id string - Session Session -} - -func (it *TransactionStatus) Rollback() error { - if it.IsCompleted == true { - return errors.New("[TransactionManager] can not Rollback() a completed Transaction!") - } - it.IsCompleted = true - defer it.Flush() //close session - return it.Transaction.Session.Rollback() -} - -func (it *TransactionStatus) Commit() error { - if it.IsCompleted == true { - return errors.New("[TransactionManager] can not Commit() a completed Transaction!") - } - it.IsCompleted = true - defer it.Flush() //close session - return it.Transaction.Session.Commit() -} - -func (it *TransactionStatus) Begin() error { - if it.IsNewTransaction == false { - return errors.New("[TransactionManager] can not Begin() a old Transaction!") - } - it.IsNewTransaction = false - return it.Transaction.Session.Begin() -} - -func (it *TransactionStatus) Flush() { - if it.Transaction != nil && it.Transaction.Session != nil { - it.Transaction.Session.Close() - it.Transaction.Session = nil - it.Transaction = nil - } -} - -//延迟关闭 -func (it *TransactionStatus) DelayFlush(t time.Duration) { - if it.HasSetDelayClose == false { - go func() { - time.Sleep(t) - if it.IsCompleted == false { - it.Rollback() - } - }() - it.HasSetDelayClose = true - } -} diff --git a/TransationRMClient.go b/TransationRMClient.go deleted file mode 100644 index 8b1fa90..0000000 --- a/TransationRMClient.go +++ /dev/null @@ -1,91 +0,0 @@ -package GoMybatis - -import ( - "github.com/zhuxiujia/GoMybatis/utils" - "net/rpc" - "net/rpc/jsonrpc" -) - -const ConnectError = "connection is shut down" -const CallMethod = "TransationRMServer.Msg" - -type TransationRMClientConfig struct { - Addr string - RetryTime int - TransactionId string - Status Transaction_Status -} - -type RemoteSessionEngine struct { - SessionEngine - Client *TransationRMClient -} - -func (it RemoteSessionEngine) New(Client *TransationRMClient) RemoteSessionEngine { - it.Client = Client - return it -} - -func (it *RemoteSessionEngine) NewSession() Session { - var TransationSession = TransationRMSession{Client: it.Client} - var session = Session(&TransationSession) - return session -} - -type TransationRMClient struct { - Client *rpc.Client - Addr string - RetryTime int -} - -func (it *TransationRMClient) Link(addr string) (*rpc.Client, error) { - it.Addr = addr - var client, error = it.autoLink() - if error != nil { - return client, error - } else { - it.Client = client - return client, nil - } -} -func (it *TransationRMClient) autoLink() (*rpc.Client, error) { - if it.Client != nil { - it.Client.Close() - it.Client = nil - } - return jsonrpc.Dial("tcp", it.Addr) -} - -func (it *TransationRMClient) Call(arg TransactionReqDTO, result *TransactionRspDTO) error { - var error error - if it.Client == nil { - if it.Addr != "" { - var c, err = it.Link(it.Addr) - if err != nil { - return err - } - it.Client = c - } else { - error = utils.NewError("TransationRMClient", " link have no addr!") - return error - } - } - error = it.Client.Call(CallMethod, arg, result) - if error != nil && error.Error() == ConnectError { - for i := 0; i < it.RetryTime; i++ { - it.autoLink() - error = it.Client.Call(CallMethod, arg, result) - if error == nil { - break - } - } - } - return error -} - -func (it *TransationRMClient) Close() error { - if it.Client != nil { - return it.Client.Close() - } - return nil -} diff --git a/TransationRMClient_test.go b/TransationRMClient_test.go deleted file mode 100644 index 84596c1..0000000 --- a/TransationRMClient_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package GoMybatis - -import ( - "fmt" - _ "github.com/go-sql-driver/mysql" - "github.com/zhuxiujia/GoMybatis/example" - "testing" -) - -func TestTransationRM(t *testing.T) { - if example.MysqlUri == "" || example.MysqlUri == "*" { - fmt.Println("no mysql config in project, you must set the mysql link!") - return - } - var addr = "127.0.0.1:17235" - go ServerTransationTcp(addr, "mysql", example.MysqlUri) //事务服务器节点1 - - var TransationRMClient = TransationRMClient{ - RetryTime: 3, - Addr: addr, - } - - var transationRMServerSession = TransationRMSession{ - Client: &TransationRMClient, - OwnerId: "1234", - } - - var e error - - e = transationRMServerSession.Begin() - if e != nil { - t.Fatal(e) - } - - result, e := transationRMServerSession.Exec("UPDATE `test`.`biz_activity` SET `name`='rs168-10' WHERE `id`='170';") - if e != nil { - t.Fatal(e) - } - - fmt.Println(result) - - e = transationRMServerSession.Commit() - if e != nil { - t.Fatal(e) - } -} diff --git a/TransationRMServer.go b/TransationRMServer.go deleted file mode 100644 index 489a59d..0000000 --- a/TransationRMServer.go +++ /dev/null @@ -1,57 +0,0 @@ -package GoMybatis - -import ( - "log" - "net" - "net/rpc" - "net/rpc/jsonrpc" -) - -type TransationRMServer struct { - DefaultTransationManager *DefaultTransationManager -} - -func (it TransationRMServer) Msg(arg TransactionReqDTO, result *TransactionRspDTO) (err error) { - defer func() { - if err := recover(); err != nil { - log.Println("[TransationRMServer]work failed:", err) - } - }() - var rsp = it.DefaultTransationManager.DoTransaction(arg) - *result = rsp - return nil -} - -//开启一个事务节点 -func ServerTransationTcp(addr string, driverName, dataSourceName string) { - transationRMServer := new(TransationRMServer) - var engine = GoMybatisEngine{}.New() - err := engine.Open(driverName, dataSourceName) - if err != nil { - panic(err.Error()) - } - var SessionFactory = SessionFactory{}.New(&engine) - var TransactionFactory = TransactionFactory{}.New(&SessionFactory) - var manager = DefaultTransationManager{}.New(&SessionFactory, &TransactionFactory) - transationRMServer.DefaultTransationManager = &manager - - //注册rpc服务 - err = rpc.Register(transationRMServer) - if err != nil { - panic(err) - } - var tcpUrl = addr - - l, e := net.Listen("tcp", tcpUrl) - if e != nil { - log.Fatalf("[TransationRMServer]net rpc.Listen tcp :0: %v", e) - panic(e) - } - for { - conn, e := l.Accept() - if e != nil { - continue - } - go jsonrpc.ServeConn(conn) - } -} diff --git a/TransationRMServerSession.go b/TransationRMServerSession.go deleted file mode 100644 index 594ed41..0000000 --- a/TransationRMServerSession.go +++ /dev/null @@ -1,74 +0,0 @@ -package GoMybatis - -import ( - "github.com/zhuxiujia/GoMybatis/utils" -) - -type TransationRMSession struct { - MapperName string - TransactionId string - OwnerId string - Client *TransationRMClient - Status Transaction_Status //默认0,非事务 -} - -func (it TransationRMSession) New(mapperName string, TransactionId string, Client *TransationRMClient, Status Transaction_Status) *Session { - it.OwnerId = utils.CreateUUID() - it.TransactionId = TransactionId - it.Client = Client - it.Status = Status - it.MapperName = mapperName - var Session = Session(&it) - return &Session -} - -func (it *TransationRMSession) Id() string { - return it.TransactionId -} - -func (it *TransationRMSession) Query(sqlorArgs string) ([]map[string][]byte, error) { - var result TransactionRspDTO - var error = it.Client.Call(TransactionReqDTO{MapperName: it.MapperName, Status: it.Status, TransactionId: it.TransactionId, Sql: sqlorArgs, ActionType: ActionType_Query, OwnerId: it.OwnerId}, &result) - if error == nil && result.Error != "" { - error = utils.NewError("TransationRMSession", result.Error) - } - return result.Query, error -} - -func (it *TransationRMSession) Exec(sqlorArgs string) (*Result, error) { - var result TransactionRspDTO - var error = it.Client.Call(TransactionReqDTO{MapperName: it.MapperName, Status: it.Status, TransactionId: it.TransactionId, Sql: sqlorArgs, ActionType: ActionType_Exec, OwnerId: it.OwnerId}, &result) - if error == nil && result.Error != "" { - error = utils.NewError("TransationRMSession", result.Error) - } - return &result.Exec, error -} - -func (it *TransationRMSession) Rollback() error { - it.Status = Transaction_Status_Rollback - var result TransactionRspDTO - return it.Client.Call(TransactionReqDTO{MapperName: it.MapperName, Status: it.Status, TransactionId: it.TransactionId, ActionType: ActionType_Exec, OwnerId: it.OwnerId}, &result) -} - -func (it *TransationRMSession) Commit() error { - it.Status = Transaction_Status_Commit - var result TransactionRspDTO - return it.Client.Call(TransactionReqDTO{MapperName: it.MapperName, Status: it.Status, TransactionId: it.TransactionId, ActionType: ActionType_Exec, OwnerId: it.OwnerId}, &result) -} - -func (it *TransationRMSession) Begin() error { - it.TransactionId = utils.CreateUUID() - it.Status = Transaction_Status_Prepare - var result TransactionRspDTO - var err = it.Client.Call(TransactionReqDTO{MapperName: it.MapperName, Status: it.Status, TransactionId: it.TransactionId, ActionType: ActionType_Exec, OwnerId: it.OwnerId}, &result) - return err -} - -func (it *TransationRMSession) Close() { - if it.Status == Transaction_Status_Prepare { - it.Rollback() - } - if it.Client != nil { - it.Client.Close() - } -} diff --git a/TransationRMServer_test.go b/TransationRMServer_test.go deleted file mode 100644 index 5e3f805..0000000 --- a/TransationRMServer_test.go +++ /dev/null @@ -1 +0,0 @@ -package GoMybatis diff --git a/example/Example_test.go b/example/Example_test.go index 9163745..96cdd8a 100644 --- a/example/Example_test.go +++ b/example/Example_test.go @@ -33,8 +33,7 @@ type ExampleActivityMapper struct { DeleteById func(id string) (int64, error) `mapperParams:"id"` Choose func(deleteFlag int) ([]Activity, error) `mapperParams:"deleteFlag"` SelectLinks func(column string) ([]Activity, error) `mapperParams:"column"` - NewSession func(config *GoMybatis.TransationRMClientConfig) (GoMybatis.Session, error) //参数:config,传nil为本地session,传值则为远程 remote session - //NewSession func() (GoMybatis.Session, error) //NewSession也可以无参数写法 + NewSession func() (GoMybatis.Session, error) //session为事务操作 } //初始化mapper文件和结构体 @@ -201,7 +200,7 @@ func Test_local_Transation(t *testing.T) { return } //使用事务 - var session, err = exampleActivityMapper.NewSession(nil) + var session, err = exampleActivityMapper.NewSession() if err != nil { t.Fatal(err) } @@ -219,50 +218,6 @@ func Test_local_Transation(t *testing.T) { session.Close() //关闭事务 } -//远程事务示例,可用于分布式微服务(单数据库,多个微服务) -func Test_Remote_Transation(t *testing.T) { - if MysqlUri == "" || MysqlUri == "*" { - fmt.Println("no database url define in MysqlConfig.go , you must set the mysql link!") - return - } - //启动GoMybatis独立节点事务服务器,通过rpc调用 - var remoteAddr = "127.0.0.1:17235" - //go GoMybatis.ServerTransationTcp(remoteAddr, "mysql", MysqlUri) - - //开始使用 - //关键,使用远程Session替换本地Session调用 - var transationRMSession, err = exampleActivityMapper.NewSession(&GoMybatis.TransationRMClientConfig{ - Addr: remoteAddr, - RetryTime: 3, - TransactionId: "12345678", - Status: GoMybatis.Transaction_Status_NO, - }) - if err != nil { - t.Fatal(err) - } - - //开启远程事务 - err = transationRMSession.Begin() - if err != nil { - t.Fatal(err) - } - //使用mapper - var activityBean = Activity{ - Id: "170", - Name: "rs168-11", - } - _, err = exampleActivityMapper.UpdateById(&transationRMSession, activityBean) - if err != nil { - panic(err) - } - //提交远程事务 - transationRMSession.Commit() - //回滚远程事务 - //transationRMSession.Rollback() - - transationRMSession.Close() -} - func Test_choose(t *testing.T) { if MysqlUri == "" || MysqlUri == "*" { fmt.Println("no database url define in MysqlConfig.go , you must set the mysql link!")