Skip to content

fix: avoid terraform state concurrent access, remove global mutex #5273

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
notifyCtx, notifyStop := signal.NotifyContext(ctx, InterruptSignals...)
defer notifyStop()

// Ensure we have a unique cache directory for this process.
cacheDir := filepath.Join(cfg.CacheDirectory.Value, uuid.NewString())
err = os.MkdirAll(cacheDir, 0o700)
if err != nil {
return xerrors.Errorf("create cache directory: %w", err)
}
defer os.RemoveAll(cacheDir)

// Clean up idle connections at the end, e.g.
// embedded-postgres can leave an idle connection
// which is caught by goleak.
Expand Down Expand Up @@ -355,7 +363,7 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
Database: databasefake.New(),
DERPMap: derpMap,
Pubsub: database.NewPubsubInMemory(),
CacheDir: cfg.CacheDirectory.Value,
CacheDir: cacheDir,
GoogleTokenValidator: googleTokenValidator,
GitAuthConfigs: gitAuthConfigs,
RealIPConfig: realIPConfig,
Expand Down Expand Up @@ -632,7 +640,8 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
}()
provisionerdMetrics := provisionerd.NewMetrics(options.PrometheusRegistry)
for i := 0; i < cfg.Provisioner.Daemons.Value; i++ {
daemon, err := newProvisionerDaemon(ctx, coderAPI, provisionerdMetrics, logger, cfg, errCh, false)
daemonCacheDir := filepath.Join(cacheDir, fmt.Sprintf("provisioner-%d", i))
daemon, err := newProvisionerDaemon(ctx, coderAPI, provisionerdMetrics, logger, cfg, daemonCacheDir, errCh, false)
if err != nil {
return xerrors.Errorf("create provisioner daemon: %w", err)
}
Expand Down Expand Up @@ -902,6 +911,7 @@ func newProvisionerDaemon(
metrics provisionerd.Metrics,
logger slog.Logger,
cfg *codersdk.DeploymentConfig,
cacheDir string,
errCh chan error,
dev bool,
) (srv *provisionerd.Server, err error) {
Expand All @@ -912,9 +922,9 @@ func newProvisionerDaemon(
}
}()

err = os.MkdirAll(cfg.CacheDirectory.Value, 0o700)
err = os.MkdirAll(cacheDir, 0o700)
if err != nil {
return nil, xerrors.Errorf("mkdir %q: %w", cfg.CacheDirectory.Value, err)
return nil, xerrors.Errorf("mkdir %q: %w", cacheDir, err)
}

terraformClient, terraformServer := provisionersdk.MemTransportPipe()
Expand All @@ -930,7 +940,7 @@ func newProvisionerDaemon(
ServeOptions: &provisionersdk.ServeOptions{
Listener: terraformServer,
},
CachePath: cfg.CacheDirectory.Value,
CachePath: cacheDir,
Logger: logger,
})
if err != nil && !xerrors.Is(err, context.Canceled) {
Expand Down
75 changes: 34 additions & 41 deletions provisioner/terraform/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,15 @@ import (
"github.com/coder/coder/provisionersdk/proto"
)

// initMut is a global mutex that protects the Terraform cache directory from
// concurrent usage by path. Only `terraform init` commands are guarded by this
// mutex.
//
// When cache path is set, we must protect against multiple calls to
// `terraform init`.
//
// From the Terraform documentation:
//
// Note: The plugin cache directory is not guaranteed to be concurrency
// safe. The provider installer's behavior in environments with multiple
// terraform init calls is undefined.
var initMut = &sync.Mutex{}

type executor struct {
mut *sync.Mutex
binaryPath string
cachePath string
workdir string
// cachePath and workdir must not be used by multiple processes at once.
cachePath string
workdir string
}

func (e executor) basicEnv() []string {
func (e *executor) basicEnv() []string {
// Required for "terraform init" to find "git" to
// clone Terraform modules.
env := safeEnviron()
Expand All @@ -55,7 +43,8 @@ func (e executor) basicEnv() []string {
return env
}

func (e executor) execWriteOutput(ctx, killCtx context.Context, args, env []string, stdOutWriter, stdErrWriter io.WriteCloser) (err error) {
// execWriteOutput must only be called while the lock is held.
func (e *executor) execWriteOutput(ctx, killCtx context.Context, args, env []string, stdOutWriter, stdErrWriter io.WriteCloser) (err error) {
defer func() {
closeErr := stdOutWriter.Close()
if err == nil && closeErr != nil {
Expand Down Expand Up @@ -98,7 +87,8 @@ func (e executor) execWriteOutput(ctx, killCtx context.Context, args, env []stri
return cmd.Wait()
}

func (e executor) execParseJSON(ctx, killCtx context.Context, args, env []string, v interface{}) error {
// execParseJSON must only be called while the lock is held.
func (e *executor) execParseJSON(ctx, killCtx context.Context, args, env []string, v interface{}) error {
if ctx.Err() != nil {
return ctx.Err()
}
Expand Down Expand Up @@ -133,7 +123,7 @@ func (e executor) execParseJSON(ctx, killCtx context.Context, args, env []string
return nil
}

func (e executor) checkMinVersion(ctx context.Context) error {
func (e *executor) checkMinVersion(ctx context.Context) error {
v, err := e.version(ctx)
if err != nil {
return err
Expand All @@ -147,7 +137,8 @@ func (e executor) checkMinVersion(ctx context.Context) error {
return nil
}

func (e executor) version(ctx context.Context) (*version.Version, error) {
// version doesn't need the lock because it doesn't read or write to any state.
func (e *executor) version(ctx context.Context) (*version.Version, error) {
return versionFromBinaryPath(ctx, e.binaryPath)
}

Expand Down Expand Up @@ -177,7 +168,10 @@ func versionFromBinaryPath(ctx context.Context, binaryPath string) (*version.Ver
return version.NewVersion(vj.Version)
}

func (e executor) init(ctx, killCtx context.Context, logr logSink) error {
func (e *executor) init(ctx, killCtx context.Context, logr logSink) error {
e.mut.Lock()
defer e.mut.Unlock()

outWriter, doneOut := logWriter(logr, proto.LogLevel_DEBUG)
errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR)
defer func() {
Expand All @@ -193,23 +187,14 @@ func (e executor) init(ctx, killCtx context.Context, logr logSink) error {
"-input=false",
}

// When cache path is set, we must protect against multiple calls
// to `terraform init`.
//
// From the Terraform documentation:
// Note: The plugin cache directory is not guaranteed to be
// concurrency safe. The provider installer's behavior in
// environments with multiple terraform init calls is undefined.
if e.cachePath != "" {
initMut.Lock()
defer initMut.Unlock()
}

return e.execWriteOutput(ctx, killCtx, args, e.basicEnv(), outWriter, errWriter)
}

// revive:disable-next-line:flag-parameter
func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool) (*proto.Provision_Response, error) {
func (e *executor) plan(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool) (*proto.Provision_Response, error) {
e.mut.Lock()
defer e.mut.Unlock()

planfilePath := filepath.Join(e.workdir, "terraform.tfplan")
args := []string{
"plan",
Expand Down Expand Up @@ -257,7 +242,8 @@ func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr lo
}, nil
}

func (e executor) planResources(ctx, killCtx context.Context, planfilePath string) ([]*proto.Resource, error) {
// planResources must only be called while the lock is held.
func (e *executor) planResources(ctx, killCtx context.Context, planfilePath string) ([]*proto.Resource, error) {
plan, err := e.showPlan(ctx, killCtx, planfilePath)
if err != nil {
return nil, xerrors.Errorf("show terraform plan file: %w", err)
Expand All @@ -270,14 +256,16 @@ func (e executor) planResources(ctx, killCtx context.Context, planfilePath strin
return ConvertResources(plan.PlannedValues.RootModule, rawGraph)
}

func (e executor) showPlan(ctx, killCtx context.Context, planfilePath string) (*tfjson.Plan, error) {
// showPlan must only be called while the lock is held.
func (e *executor) showPlan(ctx, killCtx context.Context, planfilePath string) (*tfjson.Plan, error) {
args := []string{"show", "-json", "-no-color", planfilePath}
p := new(tfjson.Plan)
err := e.execParseJSON(ctx, killCtx, args, e.basicEnv(), p)
return p, err
}

func (e executor) graph(ctx, killCtx context.Context) (string, error) {
// graph must only be called while the lock is held.
func (e *executor) graph(ctx, killCtx context.Context) (string, error) {
if ctx.Err() != nil {
return "", ctx.Err()
}
Expand All @@ -302,9 +290,12 @@ func (e executor) graph(ctx, killCtx context.Context) (string, error) {
}

// revive:disable-next-line:flag-parameter
func (e executor) apply(
func (e *executor) apply(
ctx, killCtx context.Context, plan []byte, env []string, logr logSink,
) (*proto.Provision_Response, error) {
e.mut.Lock()
defer e.mut.Unlock()

planFile, err := ioutil.TempFile("", "coder-terrafrom-plan")
if err != nil {
return nil, xerrors.Errorf("create plan file: %w", err)
Expand Down Expand Up @@ -356,7 +347,8 @@ func (e executor) apply(
}, nil
}

func (e executor) stateResources(ctx, killCtx context.Context) ([]*proto.Resource, error) {
// stateResources must only be called while the lock is held.
func (e *executor) stateResources(ctx, killCtx context.Context) ([]*proto.Resource, error) {
state, err := e.state(ctx, killCtx)
if err != nil {
return nil, err
Expand All @@ -375,7 +367,8 @@ func (e executor) stateResources(ctx, killCtx context.Context) ([]*proto.Resourc
return resources, nil
}

func (e executor) state(ctx, killCtx context.Context) (*tfjson.State, error) {
// state must only be called while the lock is held.
func (e *executor) state(ctx, killCtx context.Context) (*tfjson.State, error) {
args := []string{"show", "-json", "-no-color"}
state := &tfjson.State{}
err := e.execParseJSON(ctx, killCtx, args, e.basicEnv(), state)
Expand Down
6 changes: 3 additions & 3 deletions provisioner/terraform/provision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func readProvisionLog(t *testing.T, response proto.DRPCProvisioner_ProvisionClie

if log := msg.GetLog(); log != nil {
t.Log(log.Level.String(), log.Output)
logBuf.WriteString(log.Output)
_, _ = logBuf.WriteString(log.Output)
}
if c = msg.GetComplete(); c != nil {
require.Empty(t, c.Error)
Expand Down Expand Up @@ -190,8 +190,6 @@ func TestProvision_Cancel(t *testing.T) {
func TestProvision(t *testing.T) {
t.Parallel()

ctx, api := setupProvisioner(t, nil)

testCases := []struct {
Name string
Files map[string]string
Expand Down Expand Up @@ -329,6 +327,8 @@ func TestProvision(t *testing.T) {
t.Run(testCase.Name, func(t *testing.T) {
t.Parallel()

ctx, api := setupProvisioner(t, nil)

directory := t.TempDir()
for path, content := range testCase.Files {
err := os.WriteFile(filepath.Join(directory, path), []byte(content), 0o600)
Expand Down
13 changes: 9 additions & 4 deletions provisioner/terraform/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package terraform
import (
"context"
"path/filepath"
"sync"
"time"

"github.com/cli/safeexec"
Expand All @@ -22,8 +23,9 @@ type ServeOptions struct {
// BinaryPath specifies the "terraform" binary to use.
// If omitted, the binary is looked up in $PATH.
BinaryPath string
CachePath string
Logger slog.Logger
// CachePath must not be used by multiple processes at once.
CachePath string
Logger slog.Logger

// ExitTimeout defines how long we will wait for a running Terraform
// command to exit (cleanly) if the provision was stopped. This only
Expand Down Expand Up @@ -91,6 +93,7 @@ func Serve(ctx context.Context, options *ServeOptions) error {
options.ExitTimeout = defaultExitTimeout
}
return provisionersdk.Serve(ctx, &server{
execMut: &sync.Mutex{},
binaryPath: options.BinaryPath,
cachePath: options.CachePath,
logger: options.Logger,
Expand All @@ -99,14 +102,16 @@ func Serve(ctx context.Context, options *ServeOptions) error {
}

type server struct {
execMut *sync.Mutex
binaryPath string
cachePath string
logger slog.Logger
exitTimeout time.Duration
}

func (s *server) executor(workdir string) executor {
return executor{
func (s *server) executor(workdir string) *executor {
return &executor{
mut: s.execMut,
binaryPath: s.binaryPath,
cachePath: s.cachePath,
workdir: workdir,
Expand Down
3 changes: 2 additions & 1 deletion provisionerd/provisionerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type Options struct {
JobPollJitter time.Duration
JobPollDebounce time.Duration
Provisioners Provisioners
WorkDirectory string
// WorkDirectory must not be used by multiple processes at once.
WorkDirectory string
}

// New creates and starts a provisioner daemon.
Expand Down