From c5edbbd004418890b593dd9217160b02f5f2143d Mon Sep 17 00:00:00 2001 From: Gitea Date: Tue, 23 Aug 2022 20:34:47 +0800 Subject: [PATCH] feat: move main task logic to runtime package --- cmd/{damon.go => daemon.go} | 39 ++----- cmd/root.go | 208 +++++------------------------------- engine/engine.go | 43 ++++++++ runtime/task.go | 202 ++++++++++++++++++++++++++++++++++ 4 files changed, 277 insertions(+), 215 deletions(-) rename cmd/{damon.go => daemon.go} (69%) create mode 100644 engine/engine.go create mode 100644 runtime/task.go diff --git a/cmd/damon.go b/cmd/daemon.go similarity index 69% rename from cmd/damon.go rename to cmd/daemon.go index 9971373..8c0fbfd 100644 --- a/cmd/damon.go +++ b/cmd/daemon.go @@ -15,11 +15,11 @@ import ( "golang.org/x/sync/errgroup" ) -func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error { +func runDaemon(ctx context.Context, task *runtime.Task) func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, args []string) error { log.Infoln("Starting runner daemon") - _ = godotenv.Load(input.envFile) + _ = godotenv.Load(task.Input.EnvFile) cfg, err := fromEnviron() if err != nil { log.WithError(err). @@ -28,36 +28,10 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args initLogging(cfg) - engine, err := engine.New() - if err != nil { - log.WithError(err). - Fatalln("cannot load the docker engine") - } - - count := 0 - for { - err := engine.Ping(ctx) - if err == context.Canceled { - break - } - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - if err != nil { - log.WithError(err). - Errorln("cannot ping the docker daemon") - count++ - if count == 5 { - log.WithError(err). - Fatalf("retry count reached: %d", count) - } - time.Sleep(time.Second) - } else { - log.Infoln("successfully pinged the docker daemon") - break - } + // try to connect to docker daemon + // if failed, exit with error + if err := engine.Start(ctx); err != nil { + log.WithError(err).Fatalln("failed to connect docker daemon engine") } cli := client.New( @@ -81,6 +55,7 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args if err != nil { log.WithError(err). Errorln("cannot ping the remote server") + // TODO: if ping failed, retry or exit time.Sleep(time.Second) } else { log.Infoln("successfully pinged the remote server") diff --git a/cmd/root.go b/cmd/root.go index 0ba2198..186b1bb 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -2,66 +2,17 @@ package cmd import ( "context" - "fmt" "os" - "path/filepath" + "gitea.com/gitea/act_runner/engine" + "gitea.com/gitea/act_runner/runtime" "github.com/mattn/go-isatty" - "github.com/nektos/act/pkg/artifacts" - "github.com/nektos/act/pkg/common" - "github.com/nektos/act/pkg/model" - "github.com/nektos/act/pkg/runner" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) const version = "0.1" -type Input struct { - envFile string - actor string - // workdir string - // workflowsPath string - // autodetectEvent bool - // eventPath string - reuseContainers bool - bindWorkdir bool - // secrets []string - // envs []string - // platforms []string - // dryrun bool - forcePull bool - forceRebuild bool - // noOutput bool - // envfile string - // secretfile string - insecureSecrets bool - // defaultBranch string - privileged bool - usernsMode string - containerArchitecture string - containerDaemonSocket string - // noWorkflowRecurse bool - useGitIgnore bool - forgeInstance string - containerCapAdd []string - containerCapDrop []string - autoRemove bool - artifactServerPath string - artifactServerPort string - jsonLogger bool - noSkipCheckout bool - // remoteName string -} - -func (i *Input) newPlatforms() map[string]string { - return map[string]string{ - "ubuntu-latest": "node:16-buster-slim", - "ubuntu-20.04": "node:16-buster-slim", - "ubuntu-18.04": "node:16-buster-slim", - } -} - // initLogging setup the global logrus logger. func initLogging(cfg Config) { isTerm := isatty.IsTerminal(os.Stdout.Fd()) @@ -79,161 +30,52 @@ func initLogging(cfg Config) { } func Execute(ctx context.Context) { - input := Input{ - reuseContainers: true, - forgeInstance: "gitea.com", - } + task := runtime.NewTask() + + // ./act_runner rootCmd := &cobra.Command{ Use: "act [event name to run]\nIf no event name passed, will default to \"on: push\"", Short: "Run GitHub actions locally by specifying the event name (e.g. `push`) or an action name directly.", Args: cobra.MaximumNArgs(1), - RunE: runCommand(ctx, &input), + RunE: runRoot(ctx, task), Version: version, SilenceUsage: true, } - rootCmd.AddCommand(&cobra.Command{ + rootCmd.Flags().BoolP("run", "r", false, "run workflows") + rootCmd.Flags().StringP("job", "j", "", "run job") + rootCmd.PersistentFlags().StringVarP(&task.Input.ForgeInstance, "forge-instance", "", "github.com", "Forge instance to use.") + rootCmd.PersistentFlags().StringVarP(&task.Input.EnvFile, "env-file", "", ".env", "Read in a file of environment variables.") + + // ./act_runner daemon + daemonCmd := &cobra.Command{ Aliases: []string{"daemon"}, Use: "daemon [event name to run]\nIf no event name passed, will default to \"on: push\"", Short: "Run GitHub actions locally by specifying the event name (e.g. `push`) or an action name directly.", Args: cobra.MaximumNArgs(1), - RunE: runDaemon(ctx, &input), - }) - rootCmd.Flags().BoolP("run", "r", false, "run workflows") - rootCmd.Flags().StringP("job", "j", "", "run job") - rootCmd.PersistentFlags().StringVarP(&input.forgeInstance, "forge-instance", "", "github.com", "Forge instance to use.") - rootCmd.PersistentFlags().StringVarP(&input.envFile, "env-file", "", ".env", "Read in a file of environment variables.") + RunE: runDaemon(ctx, task), + } + rootCmd.AddCommand(daemonCmd) if err := rootCmd.Execute(); err != nil { os.Exit(1) } } -// getWorkflowsPath return the workflows directory, it will try .gitea first and then fallback to .github -func getWorkflowsPath() (string, error) { - dir, err := os.Getwd() - if err != nil { - return "", err - } - p := filepath.Join(dir, ".gitea/workflows") - _, err = os.Stat(p) - if err != nil { - if !os.IsNotExist(err) { - return "", err - } - return filepath.Join(dir, ".github/workflows"), nil - } - return p, nil -} - -func runTask(ctx context.Context, input *Input, jobID string) error { - workflowsPath, err := getWorkflowsPath() - if err != nil { - return err - } - planner, err := model.NewWorkflowPlanner(workflowsPath, false) - if err != nil { - return err - } - - var eventName string - events := planner.GetEvents() - if len(events) > 0 { - // set default event type to first event - // this way user dont have to specify the event. - log.Debugf("Using detected workflow event: %s", events[0]) - eventName = events[0] - } else { - if plan := planner.PlanEvent("push"); plan != nil { - eventName = "push" - } - } - - // build the plan for this run - var plan *model.Plan - if jobID != "" { - log.Debugf("Planning job: %s", jobID) - plan = planner.PlanJob(jobID) - } else { - log.Debugf("Planning event: %s", eventName) - plan = planner.PlanEvent(eventName) - } - - curDir, err := os.Getwd() - if err != nil { - return err - } - - // run the plan - config := &runner.Config{ - Actor: input.actor, - EventName: eventName, - EventPath: "", - DefaultBranch: "", - ForcePull: input.forcePull, - ForceRebuild: input.forceRebuild, - ReuseContainers: input.reuseContainers, - Workdir: curDir, - BindWorkdir: input.bindWorkdir, - LogOutput: true, - JSONLogger: input.jsonLogger, - // Env: envs, - // Secrets: secrets, - InsecureSecrets: input.insecureSecrets, - Platforms: input.newPlatforms(), - Privileged: input.privileged, - UsernsMode: input.usernsMode, - ContainerArchitecture: input.containerArchitecture, - ContainerDaemonSocket: input.containerDaemonSocket, - UseGitIgnore: input.useGitIgnore, - GitHubInstance: input.forgeInstance, - ContainerCapAdd: input.containerCapAdd, - ContainerCapDrop: input.containerCapDrop, - AutoRemove: input.autoRemove, - ArtifactServerPath: input.artifactServerPath, - ArtifactServerPort: input.artifactServerPort, - NoSkipCheckout: input.noSkipCheckout, - // RemoteName: input.remoteName, - } - r, err := runner.New(config) - if err != nil { - return fmt.Errorf("New config failed: %v", err) - } - - cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort) - - executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error { - cancel() - return nil - }) - - outputHook := new(taskLogHook) - ctx = common.WithLoggerHook(ctx, outputHook) - return executor(ctx) -} - -type taskLogHook struct{} - -func (h *taskLogHook) Levels() []log.Level { - return log.AllLevels -} - -func (h *taskLogHook) Fire(entry *log.Entry) error { - if flag, ok := entry.Data["raw_output"]; ok { - if flagVal, ok := flag.(bool); flagVal && ok { - log.Infof("task log: %s", entry.Message) - } - } - return nil -} - -func runCommand(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error { +func runRoot(ctx context.Context, task *runtime.Task) func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, args []string) error { jobID, err := cmd.Flags().GetString("job") if err != nil { return err } - return runTask(ctx, input, jobID) + // try to connect to docker daemon + // if failed, exit with error + if err := engine.Start(ctx); err != nil { + log.WithError(err).Fatalln("failed to connect docker daemon engine") + } + + task.JobID = jobID + return task.Run(ctx) } } diff --git a/engine/engine.go b/engine/engine.go new file mode 100644 index 0000000..6a37b4b --- /dev/null +++ b/engine/engine.go @@ -0,0 +1,43 @@ +package engine + +import ( + "context" + "fmt" + "time" + + log "github.com/sirupsen/logrus" +) + +// Start start docker engine api loop +func Start(ctx context.Context) error { + engine, err := New() + if err != nil { + return err + } + + count := 0 + for { + err := engine.Ping(ctx) + if err == context.Canceled { + break + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err != nil { + log.WithError(err). + Errorln("cannot ping the docker daemon") + count++ + if count == 5 { + return fmt.Errorf("retry connect to docker daemon failed: %d times", count) + } + time.Sleep(time.Second) + } else { + log.Infoln("successfully ping the docker daemon") + break + } + } + return nil +} diff --git a/runtime/task.go b/runtime/task.go new file mode 100644 index 0000000..1f632f1 --- /dev/null +++ b/runtime/task.go @@ -0,0 +1,202 @@ +package runtime + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "github.com/nektos/act/pkg/artifacts" + "github.com/nektos/act/pkg/common" + "github.com/nektos/act/pkg/model" + "github.com/nektos/act/pkg/runner" + log "github.com/sirupsen/logrus" +) + +type TaskInput struct { + repoDirectory string + actor string + // workdir string + // workflowsPath string + // autodetectEvent bool + // eventPath string + reuseContainers bool + bindWorkdir bool + // secrets []string + // envs []string + // platforms []string + // dryrun bool + forcePull bool + forceRebuild bool + // noOutput bool + // envfile string + // secretfile string + insecureSecrets bool + // defaultBranch string + privileged bool + usernsMode string + containerArchitecture string + containerDaemonSocket string + // noWorkflowRecurse bool + useGitIgnore bool + containerCapAdd []string + containerCapDrop []string + autoRemove bool + artifactServerPath string + artifactServerPort string + jsonLogger bool + noSkipCheckout bool + // remoteName string + + ForgeInstance string + EnvFile string +} + +type taskLogHook struct { + entries []*log.Entry +} + +func (h *taskLogHook) Levels() []log.Level { + return log.AllLevels +} + +func (h *taskLogHook) Fire(entry *log.Entry) error { + if flag, ok := entry.Data["raw_output"]; ok { + if flagVal, ok := flag.(bool); flagVal && ok { + log.Infof("task log: %s", entry.Message) + h.entries = append(h.entries, entry) + } + } + return nil +} + +type Task struct { + JobID string + Input *TaskInput + LogHook *taskLogHook +} + +func NewTask() *Task { + task := &Task{ + Input: &TaskInput{ + reuseContainers: true, + ForgeInstance: "gitea", + }, + LogHook: &taskLogHook{}, + } + task.Input.repoDirectory, _ = os.Getwd() + return task +} + +// getWorkflowsPath return the workflows directory, it will try .gitea first and then fallback to .github +func getWorkflowsPath(dir string) (string, error) { + p := filepath.Join(dir, ".gitea/workflows") + _, err := os.Stat(p) + if err != nil { + if !os.IsNotExist(err) { + return "", err + } + return filepath.Join(dir, ".github/workflows"), nil + } + return p, nil +} + +func demoPlatforms() map[string]string { + return map[string]string{ + "ubuntu-latest": "node:16-buster-slim", + "ubuntu-20.04": "node:16-buster-slim", + "ubuntu-18.04": "node:16-buster-slim", + } +} + +func (t *Task) Run(ctx context.Context) error { + workflowsPath, err := getWorkflowsPath(t.Input.repoDirectory) + if err != nil { + return err + } + planner, err := model.NewWorkflowPlanner(workflowsPath, false) + if err != nil { + return err + } + + var eventName string + events := planner.GetEvents() + if len(events) > 0 { + // set default event type to first event + // this way user dont have to specify the event. + log.Debugf("Using detected workflow event: %s", events[0]) + eventName = events[0] + } else { + if plan := planner.PlanEvent("push"); plan != nil { + eventName = "push" + } + } + + // build the plan for this run + var plan *model.Plan + var jobID = t.JobID + if jobID != "" { + log.Debugf("Planning job: %s", jobID) + plan = planner.PlanJob(jobID) + } else { + log.Debugf("Planning event: %s", eventName) + plan = planner.PlanEvent(eventName) + } + + curDir, err := os.Getwd() + if err != nil { + return err + } + + // run the plan + input := t.Input + config := &runner.Config{ + Actor: input.actor, + EventName: eventName, + EventPath: "", + DefaultBranch: "", + ForcePull: input.forcePull, + ForceRebuild: input.forceRebuild, + ReuseContainers: input.reuseContainers, + Workdir: curDir, + BindWorkdir: input.bindWorkdir, + LogOutput: true, + JSONLogger: input.jsonLogger, + // Env: envs, + // Secrets: secrets, + InsecureSecrets: input.insecureSecrets, + Platforms: demoPlatforms(), + Privileged: input.privileged, + UsernsMode: input.usernsMode, + ContainerArchitecture: input.containerArchitecture, + ContainerDaemonSocket: input.containerDaemonSocket, + UseGitIgnore: input.useGitIgnore, + GitHubInstance: input.ForgeInstance, + ContainerCapAdd: input.containerCapAdd, + ContainerCapDrop: input.containerCapDrop, + AutoRemove: input.autoRemove, + ArtifactServerPath: input.artifactServerPath, + ArtifactServerPort: input.artifactServerPort, + NoSkipCheckout: input.noSkipCheckout, + // RemoteName: input.remoteName, + } + r, err := runner.New(config) + if err != nil { + return fmt.Errorf("new config failed: %v", err) + } + + cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort) + + executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error { + cancel() + return nil + }) + + ctx = common.WithLoggerHook(ctx, t.LogHook) + if err := executor(ctx); err != nil { + log.Warnf("workflow execution failed:%v, logs: %d", err, len(t.LogHook.entries)) + return err + } + log.Infof("workflow completed, logs: %d", len(t.LogHook.entries)) + return nil +}