From c07313e3f4e56530802b99e820094ca46975c065 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Tue, 28 Nov 2023 13:55:54 +0000 Subject: [PATCH 1/3] fix: insert replica when removed by cleanup --- enterprise/replicasync/replicasync.go | 21 ++++++++++++++++++++- enterprise/replicasync/replicasync_test.go | 19 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/enterprise/replicasync/replicasync.go b/enterprise/replicasync/replicasync.go index ddc9774387257..0414de3dc4ce5 100644 --- a/enterprise/replicasync/replicasync.go +++ b/enterprise/replicasync/replicasync.go @@ -331,7 +331,26 @@ func (m *Manager) syncReplicas(ctx context.Context) error { Primary: m.self.Primary, }) if err != nil { - return xerrors.Errorf("update replica: %w", err) + if !errors.Is(err, sql.ErrNoRows) { + return xerrors.Errorf("update replica: %w", err) + } + // self replica has been cleaned up, we must reinsert + // nolint:gocritic // Updating a replica is a system function. + replica, err = m.db.InsertReplica(dbauthz.AsSystemRestricted(ctx), database.InsertReplicaParams{ + ID: m.self.ID, + CreatedAt: dbtime.Now(), + UpdatedAt: dbtime.Now(), + StartedAt: m.self.StartedAt, + RelayAddress: m.self.RelayAddress, + RegionID: m.self.RegionID, + Hostname: m.self.Hostname, + Version: m.self.Version, + DatabaseLatency: int32(databaseLatency.Microseconds()), + Primary: m.self.Primary, + }) + if err != nil { + return xerrors.Errorf("update replica: %w", err) + } } if m.self.Error != replica.Error { // Publish an update occurred! diff --git a/enterprise/replicasync/replicasync_test.go b/enterprise/replicasync/replicasync_test.go index 7b305051d74a2..600eb839e28bf 100644 --- a/enterprise/replicasync/replicasync_test.go +++ b/enterprise/replicasync/replicasync_test.go @@ -255,6 +255,25 @@ func TestReplica(t *testing.T) { } wg.Wait() }) + t.Run("UpsertAfterDelete", func(t *testing.T) { + t.Parallel() + db, pubsub := dbtestutil.NewDB(t) + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, &replicasync.Options{ + RelayAddress: "google.com", + CleanupInterval: time.Millisecond, + UpdateInterval: time.Millisecond, + }) + require.NoError(t, err) + defer server.Close() + err = db.DeleteReplicasUpdatedBefore(ctx, dbtime.Now()) + require.NoError(t, err) + deleteTime := dbtime.Now() + require.Eventually(t, func() bool { + return server.Self().UpdatedAt.After(deleteTime) + }, testutil.WaitShort, testutil.IntervalFast) + }) } type derpyHandler struct { From 71076ee549440c9f0884ea770acb08ae3ca1ea60 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Tue, 28 Nov 2023 14:00:02 +0000 Subject: [PATCH 2/3] update error logs --- enterprise/replicasync/replicasync.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/enterprise/replicasync/replicasync.go b/enterprise/replicasync/replicasync.go index 0414de3dc4ce5..2acbea07ef96a 100644 --- a/enterprise/replicasync/replicasync.go +++ b/enterprise/replicasync/replicasync.go @@ -171,7 +171,7 @@ func (m *Manager) loop(ctx context.Context) { } err := m.syncReplicas(ctx) if err != nil && !errors.Is(err, context.Canceled) { - m.logger.Warn(ctx, "run replica update loop", slog.Error(err)) + m.logger.Error(ctx, "run replica update loop", slog.Error(err)) } } } @@ -192,7 +192,7 @@ func (m *Manager) subscribe(ctx context.Context) error { update = func() { err := m.syncReplicas(ctx) if err != nil && !errors.Is(err, context.Canceled) { - m.logger.Warn(ctx, "run replica from subscribe", slog.Error(err)) + m.logger.Error(ctx, "run replica from subscribe", slog.Error(err)) } updateMutex.Lock() if needsUpdate { From 8e891847e0e953628d839238feb8c0df9b3e7326 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Tue, 28 Nov 2023 19:05:41 +0000 Subject: [PATCH 3/3] Revert "update error logs" This reverts commit 71076ee549440c9f0884ea770acb08ae3ca1ea60. --- enterprise/replicasync/replicasync.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/enterprise/replicasync/replicasync.go b/enterprise/replicasync/replicasync.go index 2acbea07ef96a..0414de3dc4ce5 100644 --- a/enterprise/replicasync/replicasync.go +++ b/enterprise/replicasync/replicasync.go @@ -171,7 +171,7 @@ func (m *Manager) loop(ctx context.Context) { } err := m.syncReplicas(ctx) if err != nil && !errors.Is(err, context.Canceled) { - m.logger.Error(ctx, "run replica update loop", slog.Error(err)) + m.logger.Warn(ctx, "run replica update loop", slog.Error(err)) } } } @@ -192,7 +192,7 @@ func (m *Manager) subscribe(ctx context.Context) error { update = func() { err := m.syncReplicas(ctx) if err != nil && !errors.Is(err, context.Canceled) { - m.logger.Error(ctx, "run replica from subscribe", slog.Error(err)) + m.logger.Warn(ctx, "run replica from subscribe", slog.Error(err)) } updateMutex.Lock() if needsUpdate {