feat: task can report step and final result
This commit is contained in:
parent
9be39b8cd4
commit
6030610c04
7 changed files with 219 additions and 36 deletions
|
@ -31,3 +31,23 @@ type Client interface {
|
||||||
// UpdateStep updates the build step.
|
// UpdateStep updates the build step.
|
||||||
UpdateStep(ctx context.Context, args *runnerv1.UpdateStepRequest) error
|
UpdateStep(ctx context.Context, args *runnerv1.UpdateStepRequest) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type contextKey string
|
||||||
|
|
||||||
|
const clientContextKey = contextKey("gitea.rpc.client")
|
||||||
|
|
||||||
|
// FromContext returns the client from the context.
|
||||||
|
func FromContext(ctx context.Context) Client {
|
||||||
|
val := ctx.Value(clientContextKey)
|
||||||
|
if val != nil {
|
||||||
|
if c, ok := val.(Client); ok {
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithClient returns a new context with the given client.
|
||||||
|
func WithClient(ctx context.Context, c Client) context.Context {
|
||||||
|
return context.WithValue(ctx, clientContextKey, c)
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package cmd
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"gitea.com/gitea/act_runner/engine"
|
"gitea.com/gitea/act_runner/engine"
|
||||||
"gitea.com/gitea/act_runner/runtime"
|
"gitea.com/gitea/act_runner/runtime"
|
||||||
|
@ -30,7 +31,7 @@ func initLogging(cfg Config) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func Execute(ctx context.Context) {
|
func Execute(ctx context.Context) {
|
||||||
task := runtime.NewTask()
|
task := runtime.NewTask(0)
|
||||||
|
|
||||||
// ./act_runner
|
// ./act_runner
|
||||||
rootCmd := &cobra.Command{
|
rootCmd := &cobra.Command{
|
||||||
|
@ -74,7 +75,8 @@ func runRoot(ctx context.Context, task *runtime.Task) func(cmd *cobra.Command, a
|
||||||
log.WithError(err).Fatalln("failed to connect docker daemon engine")
|
log.WithError(err).Fatalln("failed to connect docker daemon engine")
|
||||||
}
|
}
|
||||||
|
|
||||||
task.JobID = jobID
|
task.BuildID, _ = strconv.ParseInt(jobID, 10, 64)
|
||||||
return task.Run(ctx)
|
task.Run(ctx)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
8
go.sum
8
go.sum
|
@ -25,14 +25,6 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
|
||||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||||
gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee h1:T0wftx4RaYqbTH4t0A7bXGXxemZloCrjReA7xJvIVdY=
|
gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee h1:T0wftx4RaYqbTH4t0A7bXGXxemZloCrjReA7xJvIVdY=
|
||||||
gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee/go.mod h1:G37Vfz4J6kJ5NbcPI5xQUkeWPVkUCP5J+MFkaWU9jNY=
|
gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee/go.mod h1:G37Vfz4J6kJ5NbcPI5xQUkeWPVkUCP5J+MFkaWU9jNY=
|
||||||
gitea.com/gitea/proto-go v0.0.0-20220817054638-17fb0016dd41 h1:FIGF6szYd3lBIwvbeedfU5Lc7uG1Xpzi7bkktS6Vdvg=
|
|
||||||
gitea.com/gitea/proto-go v0.0.0-20220817054638-17fb0016dd41/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y=
|
|
||||||
gitea.com/gitea/proto-go v0.0.0-20220828011358-d0a015a5b095 h1:Ng3GDJLYpsG3lYdaqDzeZFkRm5ShA2V+LWJSHRD0IQ0=
|
|
||||||
gitea.com/gitea/proto-go v0.0.0-20220828011358-d0a015a5b095/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y=
|
|
||||||
gitea.com/gitea/proto-go v0.0.0-20220828031749-616e40329b57 h1:eVM6m3h5KpmJM2+LEqroENFaMs2kAo8QNIPyMgho9jg=
|
|
||||||
gitea.com/gitea/proto-go v0.0.0-20220828031749-616e40329b57/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y=
|
|
||||||
gitea.com/gitea/proto-go v0.0.0-20220901061207-b88901a1b9bc h1:kTVjwKxXma2yAdgXz8T1tiJihtWFK8jGLqArX2NownM=
|
|
||||||
gitea.com/gitea/proto-go v0.0.0-20220901061207-b88901a1b9bc/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y=
|
|
||||||
gitea.com/gitea/proto-go v0.0.0-20220901135226-82a982903134 h1:5ofH0FGEkIj/P9a6oFDgkdmGSWow1yD1uubiftMA2Kw=
|
gitea.com/gitea/proto-go v0.0.0-20220901135226-82a982903134 h1:5ofH0FGEkIj/P9a6oFDgkdmGSWow1yD1uubiftMA2Kw=
|
||||||
gitea.com/gitea/proto-go v0.0.0-20220901135226-82a982903134/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y=
|
gitea.com/gitea/proto-go v0.0.0-20220901135226-82a982903134/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y=
|
||||||
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||||
|
|
|
@ -93,5 +93,12 @@ func (p *Poller) poll(ctx context.Context, thread int) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME: for testing task
|
||||||
|
// stage.Id = 111
|
||||||
|
// stage.BuildId = 1222
|
||||||
|
|
||||||
|
// set client to context, so that the stage can use it to
|
||||||
|
ctx = client.WithClient(ctx, p.Client)
|
||||||
|
|
||||||
return p.Dispatch(ctx, stage)
|
return p.Dispatch(ctx, stage)
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,10 +29,7 @@ func (s *Runner) Run(ctx context.Context, stage *runnerv1.Stage) error {
|
||||||
WithField("runner.BuildID", stage.BuildId)
|
WithField("runner.BuildID", stage.BuildId)
|
||||||
|
|
||||||
l.Info("start running pipeline")
|
l.Info("start running pipeline")
|
||||||
// TODO: docker runner with stage data
|
|
||||||
// task.Run is blocking, so we need to use goroutine to run it in background
|
|
||||||
// return task metadata and status to the server
|
|
||||||
task := NewTask()
|
|
||||||
|
|
||||||
return task.Run(ctx)
|
// TODO: use ctx to transfer usage information
|
||||||
|
return startTask(stage.BuildId, ctx)
|
||||||
}
|
}
|
||||||
|
|
173
runtime/task.go
173
runtime/task.go
|
@ -5,12 +5,18 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.com/gitea/act_runner/client"
|
||||||
"github.com/nektos/act/pkg/artifacts"
|
"github.com/nektos/act/pkg/artifacts"
|
||||||
"github.com/nektos/act/pkg/common"
|
"github.com/nektos/act/pkg/common"
|
||||||
"github.com/nektos/act/pkg/model"
|
"github.com/nektos/act/pkg/model"
|
||||||
"github.com/nektos/act/pkg/runner"
|
"github.com/nektos/act/pkg/runner"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
runnerv1 "gitea.com/gitea/proto-go/runner/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TaskInput struct {
|
type TaskInput struct {
|
||||||
|
@ -54,6 +60,7 @@ type TaskInput struct {
|
||||||
|
|
||||||
type taskLogHook struct {
|
type taskLogHook struct {
|
||||||
entries []*log.Entry
|
entries []*log.Entry
|
||||||
|
lock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *taskLogHook) Levels() []log.Level {
|
func (h *taskLogHook) Levels() []log.Level {
|
||||||
|
@ -62,27 +69,69 @@ func (h *taskLogHook) Levels() []log.Level {
|
||||||
|
|
||||||
func (h *taskLogHook) Fire(entry *log.Entry) error {
|
func (h *taskLogHook) Fire(entry *log.Entry) error {
|
||||||
if flag, ok := entry.Data["raw_output"]; ok {
|
if flag, ok := entry.Data["raw_output"]; ok {
|
||||||
|
h.lock.Lock()
|
||||||
if flagVal, ok := flag.(bool); flagVal && ok {
|
if flagVal, ok := flag.(bool); flagVal && ok {
|
||||||
log.Infof("task log: %s", entry.Message)
|
log.Infof("task log: %s", entry.Message)
|
||||||
h.entries = append(h.entries, entry)
|
h.entries = append(h.entries, entry)
|
||||||
}
|
}
|
||||||
|
h.lock.Unlock()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type Task struct {
|
func (h *taskLogHook) swapLogs() []*log.Entry {
|
||||||
JobID string
|
if len(h.entries) == 0 {
|
||||||
Input *TaskInput
|
return nil
|
||||||
LogHook *taskLogHook
|
}
|
||||||
|
h.lock.Lock()
|
||||||
|
entries := h.entries
|
||||||
|
h.entries = nil
|
||||||
|
h.lock.Unlock()
|
||||||
|
return entries
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTask() *Task {
|
type TaskState int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// TaskStateUnknown is the default state
|
||||||
|
TaskStateUnknown TaskState = iota
|
||||||
|
// TaskStatePending is the pending state
|
||||||
|
// pending means task is received, parsing actions and preparing to run
|
||||||
|
TaskStatePending
|
||||||
|
// TaskStateRunning is the state when the task is running
|
||||||
|
// running means task is running
|
||||||
|
TaskStateRunning
|
||||||
|
// TaskStateSuccess is the state when the task is successful
|
||||||
|
// success means task is successful without any error
|
||||||
|
TaskStateSuccess
|
||||||
|
// TaskStateFailure is the state when the task is failed
|
||||||
|
// failure means task is failed with error
|
||||||
|
TaskStateFailure
|
||||||
|
)
|
||||||
|
|
||||||
|
type Task struct {
|
||||||
|
BuildID int64
|
||||||
|
Input *TaskInput
|
||||||
|
|
||||||
|
logHook *taskLogHook
|
||||||
|
state TaskState
|
||||||
|
client client.Client
|
||||||
|
log *logrus.Entry
|
||||||
|
}
|
||||||
|
|
||||||
|
// newTask creates a new task
|
||||||
|
func NewTask(buildID int64) *Task {
|
||||||
task := &Task{
|
task := &Task{
|
||||||
Input: &TaskInput{
|
Input: &TaskInput{
|
||||||
reuseContainers: true,
|
reuseContainers: true,
|
||||||
ForgeInstance: "gitea",
|
ForgeInstance: "gitea",
|
||||||
},
|
},
|
||||||
LogHook: &taskLogHook{},
|
BuildID: buildID,
|
||||||
|
|
||||||
|
state: TaskStatePending,
|
||||||
|
client: nil,
|
||||||
|
log: logrus.WithField("buildID", buildID),
|
||||||
|
logHook: &taskLogHook{},
|
||||||
}
|
}
|
||||||
task.Input.repoDirectory, _ = os.Getwd()
|
task.Input.repoDirectory, _ = os.Getwd()
|
||||||
return task
|
return task
|
||||||
|
@ -109,14 +158,86 @@ func demoPlatforms() map[string]string {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) Run(ctx context.Context) error {
|
// reportFailure reports the failure of the task
|
||||||
|
func (t *Task) reportFailure(ctx context.Context, err error) {
|
||||||
|
t.state = TaskStateFailure
|
||||||
|
finishTask(t.BuildID)
|
||||||
|
|
||||||
|
t.log.Errorf("task failed: %v", err)
|
||||||
|
|
||||||
|
if t.client == nil {
|
||||||
|
// TODO: fill the step request
|
||||||
|
stepRequest := &runnerv1.UpdateStepRequest{}
|
||||||
|
t.client.UpdateStep(ctx, stepRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Task) startReporting(interval int64, ctx context.Context) {
|
||||||
|
for {
|
||||||
|
time.Sleep(time.Duration(interval) * time.Second)
|
||||||
|
if t.state == TaskStateSuccess || t.state == TaskStateFailure {
|
||||||
|
t.log.Debugf("task reporting stopped")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
t.reportStep(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// reportStep reports the step of the task
|
||||||
|
func (t *Task) reportStep(ctx context.Context) {
|
||||||
|
if t.client == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logValues := t.logHook.swapLogs()
|
||||||
|
if len(logValues) == 0 {
|
||||||
|
t.log.Debugf("no log to report")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.log.Infof("reporting %d logs", len(logValues))
|
||||||
|
|
||||||
|
// TODO: fill the step request
|
||||||
|
stepRequest := &runnerv1.UpdateStepRequest{}
|
||||||
|
t.client.UpdateStep(ctx, stepRequest)
|
||||||
|
}
|
||||||
|
|
||||||
|
// reportSuccess reports the success of the task
|
||||||
|
func (t *Task) reportSuccess(ctx context.Context) {
|
||||||
|
t.state = TaskStateSuccess
|
||||||
|
finishTask(t.BuildID)
|
||||||
|
|
||||||
|
t.log.Infof("task success")
|
||||||
|
|
||||||
|
if t.client == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: fill the step request
|
||||||
|
stepRequest := &runnerv1.UpdateStepRequest{}
|
||||||
|
t.client.UpdateStep(ctx, stepRequest)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Task) Run(ctx context.Context) {
|
||||||
|
// get client for context, use for reporting
|
||||||
|
t.client = client.FromContext(ctx)
|
||||||
|
if t.client == nil {
|
||||||
|
t.log.Warnf("no client found in context")
|
||||||
|
} else {
|
||||||
|
t.log.Infof("client found in context")
|
||||||
|
}
|
||||||
|
|
||||||
workflowsPath, err := getWorkflowsPath(t.Input.repoDirectory)
|
workflowsPath, err := getWorkflowsPath(t.Input.repoDirectory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
t.reportFailure(ctx, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
t.log.Debugf("workflows path: %s", workflowsPath)
|
||||||
|
|
||||||
planner, err := model.NewWorkflowPlanner(workflowsPath, false)
|
planner, err := model.NewWorkflowPlanner(workflowsPath, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
t.reportFailure(ctx, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var eventName string
|
var eventName string
|
||||||
|
@ -124,7 +245,7 @@ func (t *Task) Run(ctx context.Context) error {
|
||||||
if len(events) > 0 {
|
if len(events) > 0 {
|
||||||
// set default event type to first event
|
// set default event type to first event
|
||||||
// this way user dont have to specify the event.
|
// this way user dont have to specify the event.
|
||||||
log.Debugf("Using detected workflow event: %s", events[0])
|
t.log.Debugf("Using detected workflow event: %s", events[0])
|
||||||
eventName = events[0]
|
eventName = events[0]
|
||||||
} else {
|
} else {
|
||||||
if plan := planner.PlanEvent("push"); plan != nil {
|
if plan := planner.PlanEvent("push"); plan != nil {
|
||||||
|
@ -134,18 +255,22 @@ func (t *Task) Run(ctx context.Context) error {
|
||||||
|
|
||||||
// build the plan for this run
|
// build the plan for this run
|
||||||
var plan *model.Plan
|
var plan *model.Plan
|
||||||
jobID := t.JobID
|
jobID := ""
|
||||||
|
if t.BuildID > 0 {
|
||||||
|
jobID = fmt.Sprintf("%d", t.BuildID)
|
||||||
|
}
|
||||||
if jobID != "" {
|
if jobID != "" {
|
||||||
log.Debugf("Planning job: %s", jobID)
|
t.log.Infof("Planning job: %s", jobID)
|
||||||
plan = planner.PlanJob(jobID)
|
plan = planner.PlanJob(jobID)
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Planning event: %s", eventName)
|
t.log.Infof("Planning event: %s", eventName)
|
||||||
plan = planner.PlanEvent(eventName)
|
plan = planner.PlanEvent(eventName)
|
||||||
}
|
}
|
||||||
|
|
||||||
curDir, err := os.Getwd()
|
curDir, err := os.Getwd()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
t.reportFailure(ctx, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// run the plan
|
// run the plan
|
||||||
|
@ -182,21 +307,29 @@ func (t *Task) Run(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
r, err := runner.New(config)
|
r, err := runner.New(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("new config failed: %v", err)
|
t.reportFailure(ctx, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort)
|
cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort)
|
||||||
|
t.log.Debugf("artifacts server started at %s:%s", input.artifactServerPath, input.artifactServerPort)
|
||||||
|
|
||||||
executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error {
|
executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error {
|
||||||
cancel()
|
cancel()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
ctx = common.WithLoggerHook(ctx, t.LogHook)
|
t.log.Infof("workflow prepared")
|
||||||
|
|
||||||
|
// add logger recorders
|
||||||
|
ctx = common.WithLoggerHook(ctx, t.logHook)
|
||||||
|
|
||||||
|
go t.startReporting(1, ctx)
|
||||||
|
|
||||||
if err := executor(ctx); err != nil {
|
if err := executor(ctx); err != nil {
|
||||||
log.Warnf("workflow execution failed:%v, logs: %d", err, len(t.LogHook.entries))
|
t.reportFailure(ctx, err)
|
||||||
return err
|
return
|
||||||
}
|
}
|
||||||
log.Infof("workflow completed, logs: %d", len(t.LogHook.entries))
|
|
||||||
return nil
|
t.reportSuccess(ctx)
|
||||||
}
|
}
|
||||||
|
|
32
runtime/taskmap.go
Normal file
32
runtime/taskmap.go
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
package runtime
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
var globalTaskMap sync.Map
|
||||||
|
|
||||||
|
// startTask adds the task to global map
|
||||||
|
func startTask(buildID int64, ctx context.Context) error {
|
||||||
|
_, exist := globalTaskMap.Load(buildID)
|
||||||
|
if exist {
|
||||||
|
return fmt.Errorf("task %d already exists", buildID)
|
||||||
|
}
|
||||||
|
|
||||||
|
task := NewTask(buildID)
|
||||||
|
|
||||||
|
// set task ve to global map
|
||||||
|
// when task is done or canceled, it will be removed from the map
|
||||||
|
globalTaskMap.Store(buildID, task)
|
||||||
|
|
||||||
|
go task.Run(ctx)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// finishTask removes the task from global map
|
||||||
|
func finishTask(buildID int64) {
|
||||||
|
globalTaskMap.Delete(buildID)
|
||||||
|
}
|
Loading…
Reference in a new issue