Skip to content

Commit

Permalink
feat(pixiu): add pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
jihu committed Dec 10, 2024
1 parent 855c928 commit f9fb99e
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 102 deletions.
8 changes: 4 additions & 4 deletions pkg/client/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type WrapObject struct {
type Task struct {
sync.RWMutex

Lister func(ctx context.Context, planId int64, opts ...db.Options) ([]model.Task, int64, error)
Lister func(ctx context.Context, planId int64, opts ...db.Options) ([]model.Task, error)
items map[int64]WrapObject
}

Expand All @@ -45,7 +45,7 @@ func NewTaskCache() *Task {
return t
}

func (t *Task) SetLister(Lister func(ctx context.Context, planId int64, opts ...db.Options) ([]model.Task, int64, error)) {
func (t *Task) SetLister(Lister func(ctx context.Context, planId int64, opts ...db.Options) ([]model.Task, error)) {
t.Lock()
defer t.Unlock()

Expand Down Expand Up @@ -127,7 +127,7 @@ func (t *Task) WaitForCacheSync(planId int64) error {
return nil
}

tasks, _, err := t.Lister(context.TODO(), planId)
tasks, err := t.Lister(context.TODO(), planId)
if err != nil {
return fmt.Errorf("failed to get plan(%d) tasks from database: %v", planId, err)
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func (t *Task) syncTasks() {
continue
}

newTasks, _, err := t.Lister(context.TODO(), planId)
newTasks, err := t.Lister(context.TODO(), planId)
if err != nil {
klog.Errorf("[syncTasks] failed to list plan(%d) tasks: %v", planId, err)
delete(t.items, planId)
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller/audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,16 @@ func (a *audit) Get(ctx context.Context, aid int64) (*types.Audit, error) {
func (a *audit) List(ctx context.Context, req *types.PageRequest) (*types.PageResponse, error) {
var ts []types.Audit

objects, total, err := a.factory.Audit().List(ctx, req.BuildPageNation()...)
total, err := a.factory.Audit().Count(ctx)
if err != nil {
klog.Errorf("failed to get tenant count: %v", err)
return nil, errors.ErrServerInternal
}
if total == 0 {
return &types.PageResponse{}, nil
}

objects, err := a.factory.Audit().List(ctx, req.BuildPageNation()...)
if err != nil {
klog.Errorf("failed to get tenants: %v", err)
return nil, errors.ErrServerInternal
Expand Down
14 changes: 11 additions & 3 deletions pkg/controller/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,15 @@ func (c *cluster) Get(ctx context.Context, cid int64) (*types.Cluster, error) {
func (c *cluster) List(ctx context.Context, req *types.PageRequest) (*types.PageResponse, error) {
opts := append(ctrlutil.MakeDbOptions(ctx), req.BuildPageNation()...)

objects, total, err := c.factory.Cluster().List(ctx, opts...)
total, err := c.factory.Cluster().Count(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get cluster count: %v", err)
}
if total == 0 {
return &types.PageResponse{}, nil
}

objects, err := c.factory.Cluster().List(ctx, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -698,7 +706,7 @@ func (c *cluster) GetKubernetesMetaFromPlan(ctx context.Context, planId int64) (
return nil, err
}

nodes, _, err := c.factory.Plan().ListNodes(ctx, planId)
nodes, err := c.factory.Plan().ListNodes(ctx, planId)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -840,7 +848,7 @@ func (c *cluster) GetClusterStatusFromPlanTask(planId int64) (model.ClusterStatu

// 尝试获取最新的任务状态
// 获取失败也不中断返回
if tasks, _, err := c.factory.Plan().ListTasks(context.TODO(), planId); err == nil {
if tasks, err := c.factory.Plan().ListTasks(context.TODO(), planId); err == nil {
if len(tasks) == 0 {
status = model.ClusterStatusUnStart
} else {
Expand Down
17 changes: 13 additions & 4 deletions pkg/controller/plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,19 @@ func (p *plan) List(ctx context.Context, req *types.PageRequest) (*types.PageRes
opts = req.BuildPageNation()
}

objects, total, err := p.factory.Plan().List(ctx, opts...)
total, err := p.factory.Plan().Count(ctx)
if err != nil {
klog.Errorf("failed to get plan count: %v", err)
return nil, err
}
if total == 0 {
return &types.PageResponse{}, nil
}

objects, err := p.factory.Plan().List(ctx, opts...)
if err != nil {
klog.Errorf("failed to get plans: %v", err)
return nil, errors.ErrServerInternal
return nil, err
}

for _, object := range objects {
Expand Down Expand Up @@ -379,7 +388,7 @@ func (p *plan) preStart(ctx context.Context, pid int64) error {
// TaskIsRunning
// 校验是否有任务正在运行
func (p *plan) TaskIsRunning(ctx context.Context, planId int64) (bool, error) {
tasks, _, err := p.factory.Plan().ListTasks(ctx, planId)
tasks, err := p.factory.Plan().ListTasks(ctx, planId)
if err != nil {
klog.Errorf("failed to get tasks of plan %d: %v", planId, err)
return false, errors.ErrServerInternal
Expand Down Expand Up @@ -414,7 +423,7 @@ func (p *plan) model2Type(o *model.Plan) (*types.Plan, error) {

// 尝试获取最新的任务状态
// 获取失败也不中断返回
if tasks, _, err := p.factory.Plan().ListTasks(context.TODO(), o.Id); err == nil {
if tasks, err := p.factory.Plan().ListTasks(context.TODO(), o.Id); err == nil {
if len(tasks) == 0 {
status = model.UnStartPlanStatus
} else {
Expand Down
15 changes: 12 additions & 3 deletions pkg/controller/plan/plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (p *plan) UpdateNode(ctx context.Context, pid int64, nodeId int64, req *typ
// 新增没有的节点
// 更新已存在的节点
func (p *plan) updateNodesIfNeeded(ctx context.Context, planId int64, req *types.UpdatePlanRequest) error {
oldNodes, _, err := p.factory.Plan().ListNodes(ctx, planId)
oldNodes, err := p.factory.Plan().ListNodes(ctx, planId)
if err != nil {
return err
}
Expand Down Expand Up @@ -171,10 +171,19 @@ func (p *plan) ListNodes(ctx context.Context, pid int64, req *types.PageRequest)
opts = req.BuildPageNation()
}

objects, total, err := p.factory.Plan().ListNodes(ctx, pid, opts...)
total, err := p.factory.Plan().NodeCount(ctx, pid)
if err != nil {
klog.Errorf("failed to get plan(%d) node count: %v", pid, err)
return nil, err
}
if total == 0 {
return &types.PageResponse{}, nil
}

objects, err := p.factory.Plan().ListNodes(ctx, pid, opts...)
if err != nil {
klog.Errorf("failed to get plan(%d) nodes: %v", pid, err)
return nil, errors.ErrServerInternal
return nil, err
}

for _, object := range objects {
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller/plan/plan_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,16 @@ func (p *plan) RunTask(ctx context.Context, planId int64, taskId int64) error {
func (p *plan) ListTasks(ctx context.Context, planId int64, req *types.PageRequest) (*types.PageResponse, error) {
var tasks []types.PlanTask

objects, total, err := p.factory.Plan().ListTasks(ctx, planId, req.BuildPageNation()...)
total, err := p.factory.Plan().TaskCount(ctx, planId)
if err != nil {
klog.Errorf("failed to get plan(%d) task count: %v", planId, err)
return nil, err
}
if total == 0 {
return &types.PageResponse{}, nil
}

objects, err := p.factory.Plan().ListTasks(ctx, planId, req.BuildPageNation()...)
if err != nil {
klog.Errorf("failed to get plan(%d) tasks: %v", planId, err)
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/plan/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (t TaskData) validate() error {
}

func (p *plan) getTaskData(ctx context.Context, planId int64) (TaskData, error) {
nodes, _, err := p.factory.Plan().ListNodes(ctx, planId)
nodes, err := p.factory.Plan().ListNodes(ctx, planId)
if err != nil {
return TaskData{}, err
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func (p *plan) GetRunner(osImage string) (string, error) {
// 任务启动时设置为运行中,结束时同步为结束状态(成功或者失败)
// TODO: 后续优化,判断对应部署容器是否在运行,根据容器的运行结果同步状态
func (p *plan) syncStatus(ctx context.Context, planId int64) error {
tasks, _, err := p.factory.Plan().ListTasks(ctx, planId)
tasks, err := p.factory.Plan().ListTasks(ctx, planId)
if err != nil {
return err
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/tenant/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,19 @@ func (t *tenant) Get(ctx context.Context, tid int64) (*types.Tenant, error) {
func (t *tenant) List(ctx context.Context, req *types.PageRequest) (*types.PageResponse, error) {
var ts []types.Tenant

objects, total, err := t.factory.Tenant().List(ctx, req.BuildPageNation()...)
total, err := t.factory.Tenant().Count(ctx)
if err != nil {
klog.Errorf("failed to get tenant count: %v", err)
return nil, err
}
if total == 0 {
return &types.PageResponse{}, nil
}

objects, err := t.factory.Tenant().List(ctx, req.BuildPageNation()...)
if err != nil {
klog.Errorf("failed to get tenants: %v", err)
return nil, errors.ErrServerInternal
return nil, err
}

for _, object := range objects {
Expand Down
10 changes: 8 additions & 2 deletions pkg/controller/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,16 @@ func (u *user) Get(ctx context.Context, userId int64) (*types.User, error) {
func (u *user) List(ctx context.Context, req *types.PageRequest) (*types.PageResponse, error) {
var users []types.User

objects, total, err := u.factory.User().List(ctx, req.BuildPageNation()...)
total, err := u.factory.User().Count(ctx)
if err != nil {
klog.Errorf("failed to get user count: %v", err)
return nil, err
}

objects, err := u.factory.User().List(ctx, req.BuildPageNation()...)
if err != nil {
klog.Errorf("failed to get user list: %v", err)
return nil, errors.ErrServerInternal
return nil, err
}

for _, object := range objects {
Expand Down
26 changes: 15 additions & 11 deletions pkg/db/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (
)

type AuditInterface interface {
List(ctx context.Context, opts ...Options) ([]model.Audit, int64, error)
List(ctx context.Context, opts ...Options) ([]model.Audit, error)
Get(ctx context.Context, id int64) (*model.Audit, error)
Create(ctx context.Context, object *model.Audit) (*model.Audit, error)
BatchDelete(ctx context.Context, opts ...Options) (int64, error)
Count(ctx context.Context) (int64, error)
}

type audit struct {
Expand Down Expand Up @@ -63,23 +64,26 @@ func (a *audit) Get(ctx context.Context, aid int64) (*model.Audit, error) {
return audit, nil
}

func (a *audit) List(ctx context.Context, opts ...Options) ([]model.Audit, int64, error) {
var (
audits []model.Audit
total int64
)
func (a *audit) Count(ctx context.Context) (int64, error) {
var total int64
if err := a.db.WithContext(ctx).Model(&model.Audit{}).Count(&total).Error; err != nil {
return 0, err
}
return total, nil
}

func (a *audit) List(ctx context.Context, opts ...Options) ([]model.Audit, error) {
var audits []model.Audit

tx := a.db.WithContext(ctx)
for _, opt := range opts {
tx = opt(tx)
}
if err := tx.Find(&audits).Error; err != nil {
return nil, 0, err
}
if err := tx.Model(&model.Audit{}).Count(&total).Error; err != nil {
return nil, 0, err
return nil, err
}

return audits, total, nil
return audits, nil
}

func (a *audit) BatchDelete(ctx context.Context, opts ...Options) (int64, error) {
Expand Down
27 changes: 16 additions & 11 deletions pkg/db/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type ClusterInterface interface {
Update(ctx context.Context, cid int64, resourceVersion int64, updates map[string]interface{}) error
Delete(ctx context.Context, cluster *model.Cluster, fns ...func(*model.Cluster) error) error
Get(ctx context.Context, cid int64, opts ...Options) (*model.Cluster, error)
List(ctx context.Context, opts ...Options) ([]model.Cluster, int64, error)
List(ctx context.Context, opts ...Options) ([]model.Cluster, error)
Count(ctx context.Context) (int64, error)

// InternalUpdate 内部更新,不更新版本号
InternalUpdate(ctx context.Context, cid int64, updates map[string]interface{}) error
Expand Down Expand Up @@ -132,23 +133,27 @@ func (c *cluster) Get(ctx context.Context, cid int64, opts ...Options) (*model.C
return &object, nil
}

func (c *cluster) List(ctx context.Context, opts ...Options) ([]model.Cluster, int64, error) {
var (
cs []model.Cluster
total int64
)
func (c *cluster) Count(ctx context.Context) (int64, error) {
var total int64
if err := c.db.WithContext(ctx).Model(&model.Cluster{}).Count(&total).Error; err != nil {
return 0, err
}

return total, nil
}

func (c *cluster) List(ctx context.Context, opts ...Options) ([]model.Cluster, error) {
var cs []model.Cluster

tx := c.db.WithContext(ctx)
for _, opt := range opts {
tx = opt(tx)
}
if err := tx.Find(&cs).Error; err != nil {
return nil, 0, err
}
if err := tx.Model(&model.Cluster{}).Count(&total).Error; err != nil {
return nil, 0, err
return nil, err
}

return cs, total, nil
return cs, nil
}

func (c *cluster) GetClusterByName(ctx context.Context, name string) (*model.Cluster, error) {
Expand Down
Loading

0 comments on commit f9fb99e

Please sign in to comment.