|
4 | 4 | "context"
|
5 | 5 | "database/sql"
|
6 | 6 | "testing"
|
7 |
| - "time" |
8 | 7 |
|
9 | 8 | "github.com/prometheus/client_golang/prometheus"
|
10 | 9 | "github.com/stretchr/testify/assert"
|
@@ -136,63 +135,54 @@ func TestPGPubsubDriver(t *testing.T) {
|
136 | 135 | require.NoError(t, err)
|
137 | 136 | defer closePg()
|
138 | 137 |
|
139 |
| - // wrap the pg driver with one we can control |
140 |
| - d, err := dbtestutil.Register() |
| 138 | + // use a separate subber and pubber so we can keep track of listener connections |
| 139 | + db, err := sql.Open("postgres", connectionURL) |
141 | 140 | require.NoError(t, err)
|
142 |
| - |
143 |
| - db, err := sql.Open(d.Name(), connectionURL) |
| 141 | + pubber, err := pubsub.New(ctx, logger, db, connectionURL) |
144 | 142 | require.NoError(t, err)
|
145 |
| - defer db.Close() |
146 | 143 |
|
147 |
| - ps, err := pubsub.New(ctx, logger, db, connectionURL) |
| 144 | + // use a connector that sends us the connections for the subber |
| 145 | + subDriver := dbtestutil.NewDriver() |
| 146 | + tconn, err := subDriver.Connector(connectionURL) |
148 | 147 | require.NoError(t, err)
|
149 |
| - defer ps.Close() |
| 148 | + tcdb := sql.OpenDB(tconn) |
| 149 | + subber, err := pubsub.New(ctx, logger, tcdb, connectionURL) |
| 150 | + require.NoError(t, err) |
| 151 | + defer subber.Close() |
150 | 152 |
|
151 | 153 | // test that we can publish and subscribe
|
152 |
| - gotChan := make(chan struct{}) |
| 154 | + gotChan := make(chan struct{}, 1) |
153 | 155 | defer close(gotChan)
|
154 |
| - subCancel, err := ps.Subscribe("test", func(_ context.Context, _ []byte) { |
| 156 | + subCancel, err := subber.Subscribe("test", func(_ context.Context, _ []byte) { |
155 | 157 | gotChan <- struct{}{}
|
156 | 158 | })
|
157 | 159 | require.NoError(t, err)
|
158 | 160 | defer subCancel()
|
159 | 161 |
|
160 |
| - err = ps.Publish("test", []byte("hello")) |
| 162 | + t.Log("publishing message") |
| 163 | + // send a message |
| 164 | + err = pubber.Publish("test", []byte("hello")) |
161 | 165 | require.NoError(t, err)
|
162 | 166 |
|
163 |
| - select { |
164 |
| - case <-gotChan: |
165 |
| - case <-ctx.Done(): |
166 |
| - t.Fatal("timeout waiting for message") |
167 |
| - } |
| 167 | + // wait for the message |
| 168 | + _ = testutil.RequireRecvCtx(ctx, t, gotChan) |
168 | 169 |
|
169 |
| - reconnectChan := make(chan struct{}) |
170 |
| - go func() { |
171 |
| - d.WaitForConnection() |
172 |
| - // wait a bit to make sure the pubsub has reestablished it's connection |
173 |
| - // if we don't wait, the publish may be dropped because the pubsub hasn't initialized yet. |
174 |
| - time.Sleep(1 * time.Second) |
175 |
| - reconnectChan <- struct{}{} |
176 |
| - }() |
| 170 | + // read out first connection |
| 171 | + firstConn := testutil.RequireRecvCtx(ctx, t, subDriver.Connections) |
177 | 172 |
|
178 | 173 | // drop the underlying connection being used by the pubsub
|
179 | 174 | // the pq.Listener should reconnect and repopulate it's listeners
|
180 | 175 | // so old subscriptions should still work
|
181 |
| - d.DropConnections() |
| 176 | + err = firstConn.Close() |
| 177 | + require.NoError(t, err) |
182 | 178 |
|
183 |
| - select { |
184 |
| - case <-reconnectChan: |
185 |
| - case <-ctx.Done(): |
186 |
| - t.Fatal("timeout waiting for reconnect") |
187 |
| - } |
| 179 | + // wait for the reconnect |
| 180 | + _ = testutil.RequireRecvCtx(ctx, t, subDriver.Connections) |
188 | 181 |
|
189 | 182 | // ensure our old subscription still fires
|
190 |
| - err = ps.Publish("test", []byte("hello-again")) |
| 183 | + err = pubber.Publish("test", []byte("hello-again")) |
191 | 184 | require.NoError(t, err)
|
192 | 185 |
|
193 |
| - select { |
194 |
| - case <-gotChan: |
195 |
| - case <-ctx.Done(): |
196 |
| - t.Fatal("timeout waiting for message after reconnect") |
197 |
| - } |
| 186 | + // wait for the message on the old subscription |
| 187 | + _ = testutil.RequireRecvCtx(ctx, t, gotChan) |
198 | 188 | }
|
0 commit comments