chore(piepline): add runtime package.
Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
parent
bca586ffd0
commit
7d55fd57c9
7 changed files with 99 additions and 211 deletions
|
@ -2,10 +2,15 @@ package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
v1 "gitea.com/gitea/proto/gen/proto/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A Client manages communication with the runner.
|
// A Client manages communication with the runner.
|
||||||
type Client interface {
|
type Client interface {
|
||||||
// Ping sends a ping message to the server to test connectivity.
|
// Ping sends a ping message to the server to test connectivity.
|
||||||
Ping(ctx context.Context, machine string) error
|
Ping(ctx context.Context, machine string) error
|
||||||
|
|
||||||
|
// Request requests the next available build stage for execution.
|
||||||
|
Request(ctx context.Context) (*v1.Stage, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,3 +83,22 @@ func (p *HTTPClient) Ping(ctx context.Context, machine string) error {
|
||||||
_, err := client.Ping(ctx, req)
|
_, err := client.Ping(ctx, req)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ping sends a ping message to the server to test connectivity.
|
||||||
|
func (p *HTTPClient) Request(ctx context.Context) (*v1.Stage, error) {
|
||||||
|
client := v1connect.NewRunnerServiceClient(
|
||||||
|
p.Client,
|
||||||
|
p.Endpoint,
|
||||||
|
p.opts...,
|
||||||
|
)
|
||||||
|
req := connect.NewRequest(&v1.ConnectRequest{})
|
||||||
|
|
||||||
|
req.Header().Set("X-Gitea-Token", p.Secret)
|
||||||
|
|
||||||
|
res, err := client.Connect(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Msg.Stage, err
|
||||||
|
}
|
||||||
|
|
214
cmd/damon.go
214
cmd/damon.go
|
@ -7,126 +7,14 @@ import (
|
||||||
"gitea.com/gitea/act_runner/client"
|
"gitea.com/gitea/act_runner/client"
|
||||||
"gitea.com/gitea/act_runner/engine"
|
"gitea.com/gitea/act_runner/engine"
|
||||||
"gitea.com/gitea/act_runner/poller"
|
"gitea.com/gitea/act_runner/poller"
|
||||||
"golang.org/x/sync/errgroup"
|
"gitea.com/gitea/act_runner/runtime"
|
||||||
|
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// type Message struct {
|
|
||||||
// Version int //
|
|
||||||
// Type int // message type, 1 register 2 error
|
|
||||||
// RunnerUUID string // runner uuid
|
|
||||||
// BuildUUID string // build uuid
|
|
||||||
// ErrCode int // error code
|
|
||||||
// ErrContent string // errors message
|
|
||||||
// EventName string
|
|
||||||
// EventPayload string
|
|
||||||
// JobID string // only run the special job, empty means run all the jobs
|
|
||||||
// }
|
|
||||||
|
|
||||||
// const (
|
|
||||||
// MsgTypeRegister = iota + 1 // register
|
|
||||||
// MsgTypeError // error
|
|
||||||
// MsgTypeRequestBuild // request build task
|
|
||||||
// MsgTypeIdle // no task
|
|
||||||
// MsgTypeBuildResult // build result
|
|
||||||
// )
|
|
||||||
|
|
||||||
// func handleVersion1(ctx context.Context, conn *websocket.Conn, message []byte, msg *Message) error {
|
|
||||||
// switch msg.Type {
|
|
||||||
// case MsgTypeRegister:
|
|
||||||
// log.Info().Msgf("received registered success: %s", message)
|
|
||||||
// return conn.WriteJSON(&Message{
|
|
||||||
// Version: 1,
|
|
||||||
// Type: MsgTypeRequestBuild,
|
|
||||||
// RunnerUUID: msg.RunnerUUID,
|
|
||||||
// })
|
|
||||||
// case MsgTypeError:
|
|
||||||
// log.Info().Msgf("received error msessage: %s", message)
|
|
||||||
// return conn.WriteJSON(&Message{
|
|
||||||
// Version: 1,
|
|
||||||
// Type: MsgTypeRequestBuild,
|
|
||||||
// RunnerUUID: msg.RunnerUUID,
|
|
||||||
// })
|
|
||||||
// case MsgTypeIdle:
|
|
||||||
// log.Info().Msgf("received no task")
|
|
||||||
// return conn.WriteJSON(&Message{
|
|
||||||
// Version: 1,
|
|
||||||
// Type: MsgTypeRequestBuild,
|
|
||||||
// RunnerUUID: msg.RunnerUUID,
|
|
||||||
// })
|
|
||||||
// case MsgTypeRequestBuild:
|
|
||||||
// switch msg.EventName {
|
|
||||||
// case "push":
|
|
||||||
// input := Input{
|
|
||||||
// forgeInstance: "github.com",
|
|
||||||
// reuseContainers: true,
|
|
||||||
// }
|
|
||||||
|
|
||||||
// ctx, cancel := context.WithTimeout(ctx, time.Hour)
|
|
||||||
// defer cancel()
|
|
||||||
|
|
||||||
// done := make(chan error)
|
|
||||||
// go func(chan error) {
|
|
||||||
// done <- runTask(ctx, &input, "")
|
|
||||||
// }(done)
|
|
||||||
|
|
||||||
// c := time.NewTicker(time.Second)
|
|
||||||
// defer c.Stop()
|
|
||||||
|
|
||||||
// for {
|
|
||||||
// select {
|
|
||||||
// case <-ctx.Done():
|
|
||||||
// cancel()
|
|
||||||
// log.Info().Msgf("cancel task")
|
|
||||||
// return nil
|
|
||||||
// case err := <-done:
|
|
||||||
// if err != nil {
|
|
||||||
// log.Error().Msgf("runTask failed: %v", err)
|
|
||||||
// return conn.WriteJSON(&Message{
|
|
||||||
// Version: 1,
|
|
||||||
// Type: MsgTypeBuildResult,
|
|
||||||
// RunnerUUID: msg.RunnerUUID,
|
|
||||||
// BuildUUID: msg.BuildUUID,
|
|
||||||
// ErrCode: 1,
|
|
||||||
// ErrContent: err.Error(),
|
|
||||||
// })
|
|
||||||
// }
|
|
||||||
// log.Error().Msgf("runTask success")
|
|
||||||
// return conn.WriteJSON(&Message{
|
|
||||||
// Version: 1,
|
|
||||||
// Type: MsgTypeBuildResult,
|
|
||||||
// RunnerUUID: msg.RunnerUUID,
|
|
||||||
// BuildUUID: msg.BuildUUID,
|
|
||||||
// })
|
|
||||||
// case <-c.C:
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// default:
|
|
||||||
// return fmt.Errorf("unknow event %s with payload %s", msg.EventName, msg.EventPayload)
|
|
||||||
// }
|
|
||||||
// default:
|
|
||||||
// return fmt.Errorf("received a message with an unsupported type: %#v", msg)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // TODO: handle the message
|
|
||||||
// func handleMessage(ctx context.Context, conn *websocket.Conn, message []byte) error {
|
|
||||||
// var msg Message
|
|
||||||
// if err := json.Unmarshal(message, &msg); err != nil {
|
|
||||||
// return fmt.Errorf("unmarshal received message faild: %v", err)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// switch msg.Version {
|
|
||||||
// case 1:
|
|
||||||
// return handleVersion1(ctx, conn, message, &msg)
|
|
||||||
// default:
|
|
||||||
// return fmt.Errorf("recevied a message with an unsupported version, consider upgrade your runner")
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error {
|
func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error {
|
||||||
return func(cmd *cobra.Command, args []string) error {
|
return func(cmd *cobra.Command, args []string) error {
|
||||||
log.Infoln("Starting runner daemon")
|
log.Infoln("Starting runner daemon")
|
||||||
|
@ -202,7 +90,13 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args
|
||||||
|
|
||||||
var g errgroup.Group
|
var g errgroup.Group
|
||||||
|
|
||||||
poller := poller.New(cli)
|
runner := &runtime.Runner{
|
||||||
|
Client: cli,
|
||||||
|
Machine: cfg.Runner.Name,
|
||||||
|
Environ: cfg.Runner.Environ,
|
||||||
|
}
|
||||||
|
|
||||||
|
poller := poller.New(cli, runner.Run)
|
||||||
|
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
log.WithField("capacity", cfg.Runner.Capacity).
|
log.WithField("capacity", cfg.Runner.Capacity).
|
||||||
|
@ -221,95 +115,5 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args
|
||||||
Errorln("shutting down the server")
|
Errorln("shutting down the server")
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
// var conn *websocket.Conn
|
|
||||||
// var err error
|
|
||||||
// ticker := time.NewTicker(time.Second)
|
|
||||||
// defer ticker.Stop()
|
|
||||||
// var failedCnt int
|
|
||||||
// for {
|
|
||||||
// select {
|
|
||||||
// case <-ctx.Done():
|
|
||||||
// log.Info().Msgf("cancel task")
|
|
||||||
// if conn != nil {
|
|
||||||
// err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
|
||||||
// if err != nil {
|
|
||||||
// log.Error().Msgf("write close: %v", err)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// if errors.Is(ctx.Err(), context.Canceled) {
|
|
||||||
// return nil
|
|
||||||
// }
|
|
||||||
// return ctx.Err()
|
|
||||||
// case <-ticker.C:
|
|
||||||
// if conn == nil {
|
|
||||||
// log.Trace().Msgf("trying connect %v", "ws://localhost:3000/api/actions")
|
|
||||||
// conn, _, err = websocket.DefaultDialer.DialContext(ctx, "ws://localhost:3000/api/actions", nil)
|
|
||||||
// if err != nil {
|
|
||||||
// log.Error().Msgf("dial: %v", err)
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // register the client
|
|
||||||
// msg := Message{
|
|
||||||
// Version: 1,
|
|
||||||
// Type: MsgTypeRegister,
|
|
||||||
// RunnerUUID: "111111",
|
|
||||||
// }
|
|
||||||
// bs, err := json.Marshal(&msg)
|
|
||||||
// if err != nil {
|
|
||||||
// log.Error().Msgf("Marshal: %v", err)
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if err = conn.WriteMessage(websocket.TextMessage, bs); err != nil {
|
|
||||||
// log.Error().Msgf("register failed: %v", err)
|
|
||||||
// conn.Close()
|
|
||||||
// conn = nil
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// const timeout = time.Second * 10
|
|
||||||
|
|
||||||
// for {
|
|
||||||
// select {
|
|
||||||
// case <-ctx.Done():
|
|
||||||
// log.Info().Msg("cancel task")
|
|
||||||
// return nil
|
|
||||||
// default:
|
|
||||||
// }
|
|
||||||
|
|
||||||
// _ = conn.SetReadDeadline(time.Now().Add(timeout))
|
|
||||||
// conn.SetPongHandler(func(string) error {
|
|
||||||
// return conn.SetReadDeadline(time.Now().Add(timeout))
|
|
||||||
// })
|
|
||||||
|
|
||||||
// _, message, err := conn.ReadMessage()
|
|
||||||
// if err != nil {
|
|
||||||
// if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) ||
|
|
||||||
// websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
|
||||||
// log.Trace().Msgf("closed from remote")
|
|
||||||
// conn.Close()
|
|
||||||
// conn = nil
|
|
||||||
// } else if !strings.Contains(err.Error(), "i/o timeout") {
|
|
||||||
// log.Error().Msgf("read message failed: %#v", err)
|
|
||||||
// }
|
|
||||||
// failedCnt++
|
|
||||||
// if failedCnt > 60 {
|
|
||||||
// if conn != nil {
|
|
||||||
// conn.Close()
|
|
||||||
// conn = nil
|
|
||||||
// }
|
|
||||||
// failedCnt = 0
|
|
||||||
// }
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if err := handleMessage(ctx, conn, message); err != nil {
|
|
||||||
// log.Error().Msgf(err.Error())
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -3,7 +3,7 @@ module gitea.com/gitea/act_runner
|
||||||
go 1.18
|
go 1.18
|
||||||
|
|
||||||
require (
|
require (
|
||||||
gitea.com/gitea/proto v0.0.0-20220813120843-ce4b5dd68c1f
|
gitea.com/gitea/proto v0.0.0-20220814042910-32799131d693
|
||||||
github.com/bufbuild/connect-go v0.3.0
|
github.com/bufbuild/connect-go v0.3.0
|
||||||
github.com/docker/docker v20.10.17+incompatible
|
github.com/docker/docker v20.10.17+incompatible
|
||||||
github.com/joho/godotenv v1.4.0
|
github.com/joho/godotenv v1.4.0
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -25,8 +25,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
|
||||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||||
gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee h1:T0wftx4RaYqbTH4t0A7bXGXxemZloCrjReA7xJvIVdY=
|
gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee h1:T0wftx4RaYqbTH4t0A7bXGXxemZloCrjReA7xJvIVdY=
|
||||||
gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee/go.mod h1:G37Vfz4J6kJ5NbcPI5xQUkeWPVkUCP5J+MFkaWU9jNY=
|
gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee/go.mod h1:G37Vfz4J6kJ5NbcPI5xQUkeWPVkUCP5J+MFkaWU9jNY=
|
||||||
gitea.com/gitea/proto v0.0.0-20220813120843-ce4b5dd68c1f h1:o1fHWLbdhicM5Q6EXvpBLrpu/fVqlobYw1sU1YKSreM=
|
gitea.com/gitea/proto v0.0.0-20220814042910-32799131d693 h1:innZDNfMfvOBnvnfuFqCpk2XesFUKyaeFiEbNSsaxsA=
|
||||||
gitea.com/gitea/proto v0.0.0-20220813120843-ce4b5dd68c1f/go.mod h1:LWD9G0VCMxaDY4I+J3vSqJF5OYNum33pQtRpx43516s=
|
gitea.com/gitea/proto v0.0.0-20220814042910-32799131d693/go.mod h1:LWD9G0VCMxaDY4I+J3vSqJF5OYNum33pQtRpx43516s=
|
||||||
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||||
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
|
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
|
||||||
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
|
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
|
||||||
|
|
|
@ -2,22 +2,26 @@ package poller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
v1 "gitea.com/gitea/proto/gen/proto/v1"
|
||||||
"gitea.com/gitea/act_runner/client"
|
"gitea.com/gitea/act_runner/client"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func New(cli client.Client) *Poller {
|
func New(cli client.Client, dispatch func(context.Context, *v1.Stage) error) *Poller {
|
||||||
return &Poller{
|
return &Poller{
|
||||||
Client: cli,
|
Client: cli,
|
||||||
|
Dispatch: dispatch,
|
||||||
routineGroup: newRoutineGroup(),
|
routineGroup: newRoutineGroup(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Poller struct {
|
type Poller struct {
|
||||||
Client client.Client
|
Client client.Client
|
||||||
|
Dispatch func(context.Context, *v1.Stage) error
|
||||||
|
|
||||||
routineGroup *routineGroup
|
routineGroup *routineGroup
|
||||||
}
|
}
|
||||||
|
@ -53,5 +57,21 @@ func (p *Poller) poll(ctx context.Context, thread int) error {
|
||||||
// TODO: fetch the job from remote server
|
// TODO: fetch the job from remote server
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
// request a new build stage for execution from the central
|
||||||
|
// build server.
|
||||||
|
stage, err := p.Client.Request(ctx)
|
||||||
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||||
|
log.WithError(err).Trace("poller: no stage returned")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("poller: cannot request stage")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if stage == nil || stage.BuildUuid == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.Dispatch(ctx, stage)
|
||||||
|
}
|
||||||
|
|
40
runtime/runtime.go
Normal file
40
runtime/runtime.go
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
package runtime
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
v1 "gitea.com/gitea/proto/gen/proto/v1"
|
||||||
|
"gitea.com/gitea/act_runner/client"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Runner runs the pipeline.
|
||||||
|
type Runner struct {
|
||||||
|
Machine string
|
||||||
|
Environ map[string]string
|
||||||
|
Client client.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run runs the pipeline stage.
|
||||||
|
func (s *Runner) Run(ctx context.Context, stage *v1.Stage) error {
|
||||||
|
l := logrus.
|
||||||
|
WithField("stage.build_uuid", stage.BuildUuid).
|
||||||
|
WithField("stage.runner_uuid", stage.RunnerUuid)
|
||||||
|
|
||||||
|
l.Info("stage received")
|
||||||
|
// TODO: Update stage structure
|
||||||
|
|
||||||
|
return s.run(ctx, stage)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Runner) run(ctx context.Context, stage *v1.Stage) error {
|
||||||
|
l := logrus.
|
||||||
|
WithField("stage.build_uuid", stage.BuildUuid).
|
||||||
|
WithField("stage.runner_uuid", stage.RunnerUuid)
|
||||||
|
|
||||||
|
l.Info("start running pipeline")
|
||||||
|
// TODO: docker runner with stage data
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in a new issue