Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor provisioners to use new protocol
Signed-off-by: Spike Curtis <spike@coder.com>
  • Loading branch information
spikecurtis committed Aug 23, 2023
commit a29592ab53d720541d67f62a7229b95165ec1964
293 changes: 154 additions & 139 deletions provisioner/echo/serve.go

Large diffs are not rendered by default.

306 changes: 173 additions & 133 deletions provisioner/echo/serve_test.go

Large diffs are not rendered by default.

122 changes: 45 additions & 77 deletions provisioner/terraform/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

type executor struct {
logger slog.Logger
server *server
mut *sync.Mutex
binaryPath string
Expand All @@ -50,8 +51,10 @@ func (e *executor) execWriteOutput(ctx, killCtx context.Context, args, env []str
ctx, span := e.server.startTrace(ctx, fmt.Sprintf("exec - terraform %s", args[0]))
defer span.End()
span.SetAttributes(attribute.StringSlice("args", args))
e.logger.Debug(ctx, "starting command", slog.F("args", args))

defer func() {
e.logger.Debug(ctx, "closing writers", slog.Error(err))
closeErr := stdOutWriter.Close()
if err == nil && closeErr != nil {
err = closeErr
Expand All @@ -62,6 +65,7 @@ func (e *executor) execWriteOutput(ctx, killCtx context.Context, args, env []str
}
}()
if ctx.Err() != nil {
e.logger.Debug(ctx, "context canceled before command started", slog.F("args", args))
return ctx.Err()
}

Expand Down Expand Up @@ -90,11 +94,14 @@ func (e *executor) execWriteOutput(ctx, killCtx context.Context, args, env []str
)
err = cmd.Start()
if err != nil {
e.logger.Debug(ctx, "failed to start command", slog.F("args", args))
return err
}
interruptCommandOnCancel(ctx, killCtx, cmd)
interruptCommandOnCancel(ctx, killCtx, e.logger, cmd)

return cmd.Wait()
err = cmd.Wait()
e.logger.Debug(ctx, "command done", slog.F("args", args), slog.Error(err))
return err
}

// execParseJSON must only be called while the lock is held.
Expand All @@ -120,7 +127,7 @@ func (e *executor) execParseJSON(ctx, killCtx context.Context, args, env []strin
if err != nil {
return err
}
interruptCommandOnCancel(ctx, killCtx, cmd)
interruptCommandOnCancel(ctx, killCtx, e.logger, cmd)

err = cmd.Wait()
if err != nil {
Expand Down Expand Up @@ -207,15 +214,23 @@ func (e *executor) init(ctx, killCtx context.Context, logr logSink) error {
return e.execWriteOutput(ctx, killCtx, args, e.basicEnv(), outWriter, errWriter)
}

func getPlanFilePath(workdir string) string {
return filepath.Join(workdir, "terraform.tfplan")
}

func getStateFilePath(workdir string) string {
return filepath.Join(workdir, "terraform.tfstate")
}

// revive:disable-next-line:flag-parameter
func (e *executor) plan(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool) (*proto.Provision_Response, error) {
func (e *executor) plan(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool) (*proto.PlanComplete, error) {
ctx, span := e.server.startTrace(ctx, tracing.FuncName())
defer span.End()

e.mut.Lock()
defer e.mut.Unlock()

planfilePath := filepath.Join(e.workdir, "terraform.tfplan")
planfilePath := getPlanFilePath(e.workdir)
args := []string{
"plan",
"-no-color",
Expand Down Expand Up @@ -248,19 +263,10 @@ func (e *executor) plan(ctx, killCtx context.Context, env, vars []string, logr l
if err != nil {
return nil, err
}
planFileByt, err := os.ReadFile(planfilePath)
if err != nil {
return nil, err
}
return &proto.Provision_Response{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
Parameters: state.Parameters,
Resources: state.Resources,
GitAuthProviders: state.GitAuthProviders,
Plan: planFileByt,
},
},
return &proto.PlanComplete{
Parameters: state.Parameters,
Resources: state.Resources,
GitAuthProviders: state.GitAuthProviders,
}, nil
}

Expand Down Expand Up @@ -344,7 +350,7 @@ func (e *executor) graph(ctx, killCtx context.Context) (string, error) {
if err != nil {
return "", err
}
interruptCommandOnCancel(ctx, killCtx, cmd)
interruptCommandOnCancel(ctx, killCtx, e.logger, cmd)

err = cmd.Wait()
if err != nil {
Expand All @@ -355,33 +361,22 @@ func (e *executor) graph(ctx, killCtx context.Context) (string, error) {

func (e *executor) apply(
ctx, killCtx context.Context,
plan []byte,
env []string,
logr logSink,
) (*proto.Provision_Response, error) {
) (*proto.ApplyComplete, error) {
ctx, span := e.server.startTrace(ctx, tracing.FuncName())
defer span.End()

e.mut.Lock()
defer e.mut.Unlock()

planFile, err := os.CreateTemp("", "coder-terrafrom-plan")
if err != nil {
return nil, xerrors.Errorf("create plan file: %w", err)
}
_, err = planFile.Write(plan)
if err != nil {
return nil, xerrors.Errorf("write plan file: %w", err)
}
defer os.Remove(planFile.Name())

args := []string{
"apply",
"-no-color",
"-auto-approve",
"-input=false",
"-json",
planFile.Name(),
getPlanFilePath(e.workdir),
}

outWriter, doneOut := provisionLogWriter(logr)
Expand All @@ -393,7 +388,7 @@ func (e *executor) apply(
<-doneErr
}()

err = e.execWriteOutput(ctx, killCtx, args, env, outWriter, errWriter)
err := e.execWriteOutput(ctx, killCtx, args, env, outWriter, errWriter)
if err != nil {
return nil, xerrors.Errorf("terraform apply: %w", err)
}
Expand All @@ -406,15 +401,11 @@ func (e *executor) apply(
if err != nil {
return nil, xerrors.Errorf("read statefile %q: %w", statefilePath, err)
}
return &proto.Provision_Response{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
Parameters: state.Parameters,
Resources: state.Resources,
GitAuthProviders: state.GitAuthProviders,
State: stateContent,
},
},
return &proto.ApplyComplete{
Parameters: state.Parameters,
Resources: state.Resources,
GitAuthProviders: state.GitAuthProviders,
State: stateContent,
}, nil
}

Expand Down Expand Up @@ -459,48 +450,28 @@ func (e *executor) state(ctx, killCtx context.Context) (*tfjson.State, error) {
return state, nil
}

func interruptCommandOnCancel(ctx, killCtx context.Context, cmd *exec.Cmd) {
func interruptCommandOnCancel(ctx, killCtx context.Context, logger slog.Logger, cmd *exec.Cmd) {
go func() {
select {
case <-ctx.Done():
var err error
switch runtime.GOOS {
case "windows":
// Interrupts aren't supported by Windows.
_ = cmd.Process.Kill()
err = cmd.Process.Kill()
default:
_ = cmd.Process.Signal(os.Interrupt)
err = cmd.Process.Signal(os.Interrupt)
}
logger.Debug(ctx, "interrupted command", slog.F("args", cmd.Args), slog.Error(err))

case <-killCtx.Done():
logger.Debug(ctx, "kill context ended", slog.F("args", cmd.Args))
}
}()
}

type logSink interface {
Log(*proto.Log)
}

type streamLogSink struct {
// Any errors writing to the stream will be logged to logger.
logger slog.Logger
stream proto.DRPCProvisioner_ProvisionStream
}

var _ logSink = streamLogSink{}

func (s streamLogSink) Log(l *proto.Log) {
err := s.stream.Send(&proto.Provision_Response{
Type: &proto.Provision_Response_Log{
Log: l,
},
})
if err != nil {
s.logger.Warn(context.Background(), "write log to stream",
slog.F("level", l.Level.String()),
slog.F("message", l.Output),
slog.Error(err),
)
}
ProvisionLog(l proto.LogLevel, o string)
}

// logWriter creates a WriteCloser that will log each line of text at the given level. The WriteCloser must be closed
Expand All @@ -524,7 +495,7 @@ func readAndLog(sink logSink, r io.Reader, done chan<- any, level proto.LogLevel
continue
}

sink.Log(&proto.Log{Level: level, Output: scanner.Text()})
sink.ProvisionLog(level, scanner.Text())
continue
}

Expand All @@ -541,7 +512,7 @@ func readAndLog(sink logSink, r io.Reader, done chan<- any, level proto.LogLevel
if logLevel == proto.LogLevel_INFO {
logLevel = proto.LogLevel_DEBUG
}
sink.Log(&proto.Log{Level: logLevel, Output: log.Message})
sink.ProvisionLog(logLevel, log.Message)
}
}

Expand Down Expand Up @@ -586,15 +557,15 @@ func provisionReadAndLog(sink logSink, r io.Reader, done chan<- any) {
}

logLevel := convertTerraformLogLevel(log.Level, sink)
sink.Log(&proto.Log{Level: logLevel, Output: log.Message})
sink.ProvisionLog(logLevel, log.Message)

// If the diagnostic is provided, let's provide a bit more info!
if log.Diagnostic == nil {
continue
}
logLevel = convertTerraformLogLevel(string(log.Diagnostic.Severity), sink)
for _, diagLine := range strings.Split(FormatDiagnostic(log.Diagnostic), "\n") {
sink.Log(&proto.Log{Level: logLevel, Output: diagLine})
sink.ProvisionLog(logLevel, diagLine)
}
}
}
Expand All @@ -612,10 +583,7 @@ func convertTerraformLogLevel(logLevel string, sink logSink) proto.LogLevel {
case "error":
return proto.LogLevel_ERROR
default:
sink.Log(&proto.Log{
Level: proto.LogLevel_WARN,
Output: fmt.Sprintf("unable to convert log level %s", logLevel),
})
sink.ProvisionLog(proto.LogLevel_WARN, fmt.Sprintf("unable to convert log level %s", logLevel))
return proto.LogLevel_INFO
}
}
Expand Down
4 changes: 2 additions & 2 deletions provisioner/terraform/executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type mockLogger struct {

var _ logSink = &mockLogger{}

func (m *mockLogger) Log(l *proto.Log) {
m.logs = append(m.logs, l)
func (m *mockLogger) ProvisionLog(l proto.LogLevel, o string) {
m.logs = append(m.logs, &proto.Log{Level: l, Output: o})
}

func TestLogWriter_Mainline(t *testing.T) {
Expand Down
22 changes: 10 additions & 12 deletions provisioner/terraform/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ import (
"golang.org/x/xerrors"

"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/provisionersdk"
"github.com/coder/coder/v2/provisionersdk/proto"
)

// Parse extracts Terraform variables from source-code.
func (s *server) Parse(request *proto.Parse_Request, stream proto.DRPCProvisioner_ParseStream) error {
_, span := s.startTrace(stream.Context(), tracing.FuncName())
func (s *server) Parse(sess *provisionersdk.Session, _ *proto.ParseRequest, _ <-chan struct{}) *proto.ParseComplete {
ctx := sess.Context()
_, span := s.startTrace(ctx, tracing.FuncName())
defer span.End()

// Load the module and print any parse errors.
module, diags := tfconfig.LoadModule(request.Directory)
module, diags := tfconfig.LoadModule(sess.WorkDirectory)
if diags.HasErrors() {
return xerrors.Errorf("load module: %s", formatDiagnostics(request.Directory, diags))
return provisionersdk.ParseErrorf("load module: %s", formatDiagnostics(sess.WorkDirectory, diags))
}

// Sort variables by (filename, line) to make the ordering consistent
Expand All @@ -40,17 +42,13 @@ func (s *server) Parse(request *proto.Parse_Request, stream proto.DRPCProvisione
for _, v := range variables {
mv, err := convertTerraformVariable(v)
if err != nil {
return xerrors.Errorf("can't convert the Terraform variable to a managed one: %w", err)
return provisionersdk.ParseErrorf("can't convert the Terraform variable to a managed one: %s", err)
}
templateVariables = append(templateVariables, mv)
}
return stream.Send(&proto.Parse_Response{
Type: &proto.Parse_Response_Complete{
Complete: &proto.Parse_Complete{
TemplateVariables: templateVariables,
},
},
})
return &proto.ParseComplete{
TemplateVariables: templateVariables,
}
}

// Converts a Terraform variable to a template-wide variable, processed by Coder.
Expand Down
Loading