Skip to content

Commit

Permalink
Create plan plan config and nodes in a transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
crazytaxii committed Jun 23, 2024
1 parent 48c4293 commit 0d2a967
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 29 deletions.
53 changes: 37 additions & 16 deletions pkg/controller/plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"

"gorm.io/gorm"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -82,27 +83,47 @@ type plan struct {
// 2. 创建部署配置
// 3. 创建节点列表
func (p *plan) Create(ctx context.Context, req *types.CreatePlanRequest) error {
object, err := p.factory.Plan().Create(ctx, &model.Plan{
plan := &model.Plan{
Name: req.Name,
Description: req.Description,
})
if err != nil {
klog.Errorf("failed to create plan %s: %v", req.Name, err)
return errors.ErrServerInternal
}
planId := object.Id

// 创建计划的关联配置
if err = p.CreateConfig(ctx, planId, &req.Config); err != nil {
klog.Errorf("failed to create plan %s config: %v", req.Name, err)
// TODO: 事物优化
_ = p.Delete(ctx, planId)
return errors.ErrServerInternal
withConfig := func(plan *model.Plan, tx *gorm.DB) (*gorm.DB, error) {
if err := p.preCreateConfig(ctx, plan.Id, &req.Config); err != nil {
return nil, err
}

planConfig, err := p.buildPlanConfig(ctx, &req.Config)
if err != nil {
return nil, err
}
planConfig.PlanId = plan.Id

if err := p.factory.Plan().TxCreateConfig(ctx, tx, planConfig); err != nil {
klog.Errorf("failed to create plan(%s) config: %v", plan.Id, err)
return nil, err
}
return tx, nil
}

withNodes := func(plan *model.Plan, tx *gorm.DB) (*gorm.DB, error) {
for _, r := range req.Nodes {
node, err := p.buildNodeFromRequest(plan.Id, &r)
if err != nil {
klog.Errorf("failed to build plan(%d) node from request: %v", plan.Id, err)
return nil, err
}

if err := p.factory.Plan().TxCreateNode(ctx, tx, node); err != nil {
klog.Errorf("failed to create node(%s): %v", r.Name, err)
return nil, err
}
}
return tx, nil
}
// 创建关联节点
if err = p.CreateNodes(ctx, planId, req.Nodes); err != nil {
klog.Errorf("failed to create plan %s nodes: %v", req.Name, err)
_ = p.Delete(ctx, planId)

if _, err := p.factory.Plan().Create(ctx, plan, withConfig, withNodes); err != nil {
klog.Errorf("failed to create plan %s: %v", req.Name, err)
return errors.ErrServerInternal
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/plan/plan_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (p *plan) CreateConfig(ctx context.Context, pid int64, req *types.CreatePla
}
planConfig.PlanId = pid
// 创建配置
if _, err = p.factory.Plan().CreatConfig(ctx, planConfig); err != nil {
if _, err = p.factory.Plan().CreateConfig(ctx, planConfig); err != nil {
klog.Errorf("failed to create plan(%s) config: %v", pid, err)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/plan/plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (p *plan) createNode(ctx context.Context, planId int64, req *types.CreatePl
klog.Errorf("failed to build plan(%d) node from request: %v", planId, err)
return err
}
if _, err = p.factory.Plan().CreatNode(ctx, node); err != nil {
if _, err = p.factory.Plan().CreateNode(ctx, node); err != nil {
klog.Errorf("failed to create node(%s): %v", req.Name, err)
return err
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func (p *plan) CreateOrUpdateNode(ctx context.Context, object *model.Node) error
}
// 不存在则创建
klog.Infof("plan(%d) node(%s) not exist, try to create it.", object.PlanId, object.Name)
_, err = p.factory.Plan().CreatNode(ctx, object)
_, err = p.factory.Plan().CreateNode(ctx, object)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/plan/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (p *plan) createPlanTasksIfNotExist(tasks ...Handler) error {
}

// 不存在记录则新建
if _, err = p.factory.Plan().CreatTask(context.TODO(), &model.Task{
if _, err = p.factory.Plan().CreateTask(context.TODO(), &model.Task{
Name: name,
PlanId: planId,
Step: step,
Expand Down
60 changes: 51 additions & 9 deletions pkg/db/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import (
)

type PlanInterface interface {
Create(ctx context.Context, object *model.Plan) (*model.Plan, error)
Create(ctx context.Context, object *model.Plan, opts ...CreatePlanOption) (*model.Plan, error)
Update(ctx context.Context, pid int64, resourceVersion int64, updates map[string]interface{}) error
Delete(ctx context.Context, pid int64) (*model.Plan, error)
Get(ctx context.Context, pid int64) (*model.Plan, error)
List(ctx context.Context) ([]model.Plan, error)

CreatNode(ctx context.Context, object *model.Node) (*model.Node, error)
CreateNode(ctx context.Context, object *model.Node) (*model.Node, error)
TxCreateNode(ctx context.Context, tx *gorm.DB, object *model.Node) error
UpdateNode(ctx context.Context, nodeId int64, resourceVersion int64, updates map[string]interface{}) error
DeleteNode(ctx context.Context, nodeId int64) (*model.Node, error)
GetNode(ctx context.Context, nodeId int64) (*model.Node, error)
Expand All @@ -44,7 +45,8 @@ type PlanInterface interface {

DeleteNodesByNames(ctx context.Context, planId int64, names []string) error

CreatConfig(ctx context.Context, object *model.Config) (*model.Config, error)
CreateConfig(ctx context.Context, object *model.Config) (*model.Config, error)
TxCreateConfig(ctx context.Context, tx *gorm.DB, object *model.Config) error
UpdateConfig(ctx context.Context, cfgId int64, resourceVersion int64, updates map[string]interface{}) error
DeleteConfig(ctx context.Context, cfgId int64) (*model.Config, error)
GetConfig(ctx context.Context, cfgId int64) (*model.Config, error)
Expand All @@ -53,7 +55,7 @@ type PlanInterface interface {
DeleteConfigByPlan(ctx context.Context, planId int64) error
GetConfigByPlan(ctx context.Context, planId int64) (*model.Config, error)

CreatTask(ctx context.Context, object *model.Task) (*model.Task, error)
CreateTask(ctx context.Context, object *model.Task) (*model.Task, error)
UpdateTask(ctx context.Context, pid int64, name string, updates map[string]interface{}) error
DeleteTask(ctx context.Context, pid int64) error
ListTasks(ctx context.Context, pid int64) ([]model.Task, error)
Expand All @@ -66,12 +68,34 @@ type plan struct {
db *gorm.DB
}

func (p *plan) Create(ctx context.Context, object *model.Plan) (*model.Plan, error) {
type CreatePlanOption func(plan *model.Plan, tx *gorm.DB) (*gorm.DB, error)

func (p *plan) Create(ctx context.Context, object *model.Plan, opts ...CreatePlanOption) (*model.Plan, error) {
now := time.Now()
object.GmtCreate = now
object.GmtModified = now

if err := p.db.WithContext(ctx).Create(object).Error; err != nil {
if opts == nil {
// no transaction
if err := p.db.WithContext(ctx).Create(object).Error; err != nil {
return nil, err
}
return object, nil
}

if err := p.db.WithContext(ctx).Transaction(func(tx *gorm.DB) (err error) {
if err = tx.Create(object).Error; err != nil {
return
}

for _, opt := range opts {
if tx, err = opt(object, tx); err != nil {
return
}
}

return
}); err != nil {
return nil, err
}
return object, nil
Expand Down Expand Up @@ -124,7 +148,7 @@ func (p *plan) List(ctx context.Context) ([]model.Plan, error) {
return objects, nil
}

func (p *plan) CreatNode(ctx context.Context, object *model.Node) (*model.Node, error) {
func (p *plan) CreateNode(ctx context.Context, object *model.Node) (*model.Node, error) {
now := time.Now()
object.GmtCreate = now
object.GmtModified = now
Expand All @@ -135,6 +159,15 @@ func (p *plan) CreatNode(ctx context.Context, object *model.Node) (*model.Node,
return object, nil
}

// TxCreateNode creates a node object in the given transaction.
func (p *plan) TxCreateNode(ctx context.Context, tx *gorm.DB, object *model.Node) error {
now := time.Now()
object.GmtCreate = now
object.GmtModified = now

return tx.WithContext(ctx).Create(object).Error
}

func (p *plan) UpdateNode(ctx context.Context, nodeId int64, resourceVersion int64, updates map[string]interface{}) error {
// 系统维护字段
updates["gmt_modified"] = time.Now()
Expand Down Expand Up @@ -206,7 +239,7 @@ func (p *plan) ListNodes(ctx context.Context, pid int64) ([]model.Node, error) {
return objects, nil
}

func (p *plan) CreatConfig(ctx context.Context, object *model.Config) (*model.Config, error) {
func (p *plan) CreateConfig(ctx context.Context, object *model.Config) (*model.Config, error) {
now := time.Now()
object.GmtCreate = now
object.GmtModified = now
Expand All @@ -217,6 +250,15 @@ func (p *plan) CreatConfig(ctx context.Context, object *model.Config) (*model.Co
return object, nil
}

// TxCreateConfig creates a config object in the given transaction.
func (p *plan) TxCreateConfig(ctx context.Context, tx *gorm.DB, object *model.Config) error {
now := time.Now()
object.GmtCreate = now
object.GmtModified = now

return tx.WithContext(ctx).Create(object).Error
}

func (p *plan) UpdateConfig(ctx context.Context, cid int64, resourceVersion int64, updates map[string]interface{}) error {
// 系统维护字段
updates["gmt_modified"] = time.Now()
Expand Down Expand Up @@ -279,7 +321,7 @@ func (p *plan) GetConfigByPlan(ctx context.Context, planId int64) (*model.Config
return &object, nil
}

func (p *plan) CreatTask(ctx context.Context, object *model.Task) (*model.Task, error) {
func (p *plan) CreateTask(ctx context.Context, object *model.Task) (*model.Task, error) {
now := time.Now()
object.GmtCreate = now
object.GmtModified = now
Expand Down

0 comments on commit 0d2a967

Please sign in to comment.