Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

长时间运行的任务探查接口 #20

Open
flycash opened this issue Jul 19, 2024 · 10 comments
Open

长时间运行的任务探查接口 #20

flycash opened this issue Jul 19, 2024 · 10 comments

Comments

@flycash
Copy link
Contributor

flycash commented Jul 19, 2024

目前我们支持了 HTTP 接口作为远程任务。

现在我们需要考虑这么一种场景:我的任务需要长时间运行,并且我作为用户,我希望知道我的运行进度。

因此我们改进调度 HTTP 任务的方式,为此我们有一个假定,即用户提供的 HTTP 任务接口,是符合我们预期的。

因此,对于一个 HTTP 任务来说:

  • 用户在配置里面会提供一个超时配置,这个配置是整个任务超时的配置,而不是我们发起 HTTP 调用时候的超时,这里称为 taskTimeout;
  • 我们会有一个默认的 HTTP 调用超时配置,这个是指发出 HTTP 请求的超时配置
  • 因此,当我们触发一个 HTTP 任务的时候:
    • 如果此时 HTTP 响应返回了 SUCCESS/FAILED 的状态,则这个任务已经结束了
    • 如果此时 HTTP 响应返回了 RUNNING 的状态,则这个任务还在运行中;
  • 如果触发的时候,我们收到了 RUNNING 的状态,那么后续会以固定间隔去查询任务的运行结果,这个间隔作为任务配置的一部分。在查询的时候,我们会把 execution id 传递过去(也就是你的 history 的 id)。
    • 如果依旧返回 RUNNING,那么后面还会继续询问,并且在响应里面带上 0-100 之间的数字表达进度(用户自己算,我们不管)
    • 如果返回了 SUCCESS/FAILED,就代表结束了
  • 如果在查询的时候,发现任务的运行时间已经超过了 taskTimeout,那么要给 HTTP 任务再发一个请求,告诉 HTTP 任务超时了,要求它退出(也就是强制结束运行)。那么 HTTP 此时应该返回退出成功或者失败。但是不管返回什么,我们后面都不会再次询问了。

一些设计要点:

  • 你可以强制用户的任务配置,POST 方法就是触发新任务,GET 就是查询任务的进度。例如:POST https://user/xxx/hello,就是触发任务,GET https://user/xxx/hello/execution_id 就是查询任务状态。你有别的设计也可以,例如说用户检测是否有 execution id 来判定是不是新触发,而你可以决定 execution id 放哪里,可以是路径,可以是头部,也可以是放 body。
  • 要注意,后续我们会考虑提供类似 MQ 之类的机制来允许用户回调我们的系统,也就是他们主动上报结果,这个时候,只是形态不同,但是内核是一样的。也就是说,我们掉 HTTP 接口,得知运行情况之后执行的代码,和他们主动上报(不管是通过MQ还是回调接口)之后运行的代码,应该是同一套。
  • 要注意部分失败和超时问题,重点考虑任何一个步骤,我们收到了超时响应,应该怎么办。
@flycash
Copy link
Contributor Author

flycash commented Jul 19, 2024

不需要写文档,你直接写你代码和接口,我大概就能看明白。看不明白的时候,我就会问你的。

@junwense
Copy link
Contributor

我理解是不是这样,提供一个一个查询接口,然后这个接口检查任务运行状态【这个任务可以是http任务】,这个时候我们可以要求任务的流程提供者提供一个接口,返回任务执行的状态,假设是running,successed,fail,unknow,然后根据结果做一些事情,这里用hid做幂等就行。

@flycash
Copy link
Contributor Author

flycash commented Jul 19, 2024

我理解是不是这样,提供一个一个查询接口,然后这个接口检查任务运行状态【这个任务可以是http任务】,这个时候我们可以要求任务的流程提供者提供一个接口,返回任务执行的状态,假设是running,successed,fail,unknow,然后根据结果做一些事情,这里用hid做幂等就行。

是的。

@flycash
Copy link
Contributor Author

flycash commented Jul 19, 2024

目前我们可以简化一下设计,也就是认为调度任务的 HTTP 接口,也可以用来查询任务进度。后面我们可以考虑允许用户提供不同的接口,或者集中提供一个接口查询所有的在它这里调度的任务的进度

@flycash
Copy link
Contributor Author

flycash commented Jul 19, 2024

@Jared-lu

@Jared-lu
Copy link
Contributor

在部分失败和超时响应的处理上,要考虑用户节点是不是挂了?如果挂了,那当前这个任务是直接终止,等待下一次调度呢,还是让另外的节点立刻去执行?

@flycash
Copy link
Contributor Author

flycash commented Jul 20, 2024

暂时还不需要考虑这个问题。等后面我们统一设计容错机制。暂时只需要一个探活,后续我们可以要求节点上报更多数据,或者通过第三方监控来汇总性能数据,执行容错。

@junwense
Copy link
Contributor

写了个基本的demo,我不太能理解,现在的task,executor,这2个实体具体的职责范围,可能要明确讨论下,

package executor

import (
	"context"
	"errors"
	"github.com/ecodeclub/ecron/internal/storage"
	"github.com/ecodeclub/ecron/internal/task"
	"log/slog"
	"time"
)

// ExecuteTask
// http服务抽象
// 这里需要改造目前的task设计,要抽取出一个类似于taskCfg的存储结构
// 然后这里根据cid和tid,会创建一个新的task,并且生成eid,如果创建失败,则没有eid
// 但是这样设计会有一个问题,即用户创建的是周期执行任务,eid一个会有问题,,所有也可以考虑不提供返回eid
// 提供接口让用户根据此task自己查询eid
func ExecuteTask(cid int64, tid int64, exp map[string]string) int64 {
	panic("")
}

// QueryJobByEid
// http服务抽象
// 根据eid目前任务状态,返回一个json
func QueryJobByEid(eid int64) string {
	panic("")
}

type CommonStateExecutor struct {
	l            *HttpExecutor
	executionDAO storage.ExecutionDAO
	logger       *slog.Logger
}

func (s *CommonStateExecutor) Name() string {
	return "有状态的" + s.l.Name()
}

// Run 考虑把eid传入ctx,发给job的真正执行方
func (s *CommonStateExecutor) Run(ctx context.Context, t task.Task) error {
	// 这里不把eid暴露出去,则只能通过提供接口查询记录表里面的数据获取eid
	s.saveExecutors(t.ID, t.Eid, task.ExecStatusStarted)
	var eid = t.Eid
	var err error = nil
	ch := make(chan struct{}, 1)
	ticker := time.NewTicker(time.Second * 3)
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		go func() {
			err = s.l.Run(ctx, t)
		}()
		for {
			i := 0
			select {
			// 这里是调用http接口获取状态
			case <-ticker.C:
				// 这段代码考虑通过封装,暴露成http接口,可以主动上报错误,但是比较麻烦的是ch这个怎么处理
				state, rates := getTargetState()
				switch state {
				case TargetStateRunning:
					i = 0
					s.updateExecutorRate(t.ID, t.Eid, rates)
				case TargetStateFailed:
					ch <- struct{}{}
					cancel()
					return
				case TargetStateSucceed:
					ch <- struct{}{}
					cancel()
					return
				case TargetStateTimeout:
					i++
					if i == 3 {
						ch <- struct{}{}
						cancel()
						return
					}
				default:
					ch <- struct{}{}
					cancel()
					return
				}
			case <-ctx.Done():
				return
			}
		}

	}()

	select {
	case <-ch:
	case <-ctx.Done():
		err = ctx.Err()
		close(ch)
		_ = cancelTarget()
	}
	defer func() {
		switch {
		case errors.Is(err, context.DeadlineExceeded):
			s.updateExecutor(t.ID, eid, task.ExecStatusDeadlineExceeded)
		case errors.Is(err, context.Canceled):
			s.updateExecutor(t.ID, eid, task.ExecStatusCancelled)
		case err == nil:
			s.updateExecutor(t.ID, eid, task.ExecStatusSuccess)
		default:
			s.updateExecutor(t.ID, eid, task.ExecStatusFailed)
		}
	}()

	return err
}

type TargetState uint8

const (
	TargetStateUnknown TargetState = iota
	TargetStateRunning
	TargetStateSucceed
	TargetStateFailed
	TargetStateTimeout
)

// todo 抽取成方法放在task上
func getTargetState() (TargetState, uint8) {
	return TargetStateSucceed, 0
}

// todo 抽取成方法放在task上
func cancelTarget() error {
	panic("")
}

func (s *CommonStateExecutor) GetState(ctx context.Context, tid int64, eid int64) (ExecStatus, error) {
	panic(" 从 executionDAO 里查当前任务执行状态")
}
func (s *CommonStateExecutor) saveExecutors(tid int64, eid int64, status task.ExecStatus) {
	panic("记录执行记录,返回eid")
}
func (s *CommonStateExecutor) updateExecutor(tid int64, eid int64, status task.ExecStatus) {
	panic("更新结果")
}
func (s *CommonStateExecutor) updateExecutorRate(tid int64, eid int64, rate uint8) {
	panic("更新结果")
}

@flycash
Copy link
Contributor Author

flycash commented Jul 22, 2024

很简单,Scheduler 就是你的老板,你就是 Executor。你的老板只负责安排任务给你,然后你去执行。在任务这里,根据任务的形态引入了不同的实现,比如说 http 任务代表的是任务本身就是一个 http 接口,调度执行就是发一个 http 请求。

Scheduler 只是居中指挥而已。比如说抢占一个任务,挑选合适的执行节点,安排进度追踪这种

@junwense
Copy link
Contributor

junwense commented Jul 23, 2024

很简单,Scheduler 就是你的老板,你就是 Executor。你的老板只负责安排任务给你,然后你去执行。在任务这里,根据任务的形态引入了不同的实现,比如说 http 任务代表的是任务本身就是一个 http 接口,调度执行就是发一个 http 请求。

Scheduler 只是居中指挥而已。比如说抢占一个任务,挑选合适的执行节点,安排进度追踪这种

Scheduler作为调度者,他的职责我觉得是没有异议的
executor作为执行器和task作为任务这里就有比较奇怪的争议:
比如

  1. 按照以下范围设计:用法可能就是有固定几个executor,然后用户要创建不同复杂的task注册
    executor作为执行器,只负责http请求,grpc请求,这些事情
    task作为任务,任务可以是普通任务,可观测的任务,可以重复执行的任务,固定定时执行的任务,执行完成休息一段时间再执行的任务(也就是现在的task设计)
    这样,就会导致任务的形态很多变,还可能引入taskCfg这种抽象

  2. 按照另一个纬度设计:用法可能是只要指定一个简单的task配置,然后写一个复杂的executor,可以组合不同executor
    executor有很多个,http请求的executor里面也可以包含各种实现,比如可观测的http请求,固定执行http的请求,这种设计就要求把task的调度逻辑封装在executor中,这个和现在设计是存在冲突的,
    task作为任务,只有配置信息,不负责流程。

最后附下个人先入为主的观点:
scheuler作为调度者
taskcfg作为任务的配置:
包含2部分,1.是任务的调度模式、任务的资源需求,2.任务的调用执行信息
executor作为任务具体执行者,这里为什么我纠结,其实很多task的执行流程是一样的,或者说是有共性,比方说http调用,http调用的可观测叠加,grpc调用,grpc的可观测,甚至可以说shell调用,那么就有一个问题,这些应该是通过修饰器组合而成的,这个时候就要明确下哪些是可以做成修饰器,哪些要做出executor的固定环节,目前我觉得调度的流程应该是不能修饰的,调度流程和执行流程应该是分开的2个部分

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants