Skip to content

Commit 0a5f57d

Browse files
committed
Bugfix on restart node problem.
OnNodeDisconnect callback added. DMQ receive error about instance disconnection and call the routine. It is removes entry from hash table ExchShmem->htab referenced to disconnected node, make detach_receiver() and destination_drop(). We do not needed in receiver_start callback because check (and establish, if needed) connections before each query.
1 parent 47198b5 commit 0a5f57d

File tree

6 files changed

+60
-7
lines changed

6 files changed

+60
-7
lines changed

contrib/pg_exchange/dmq.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1685,7 +1685,7 @@ dmq_get_destination_status(DmqDestinationId dest_id)
16851685
}
16861686

16871687
void
1688-
dmq_destination_drop(char *receiver_name)
1688+
dmq_destination_drop(const char *receiver_name)
16891689
{
16901690
DmqDestinationId dest_id;
16911691
pid_t sender_pid;

contrib/pg_exchange/dmq.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ extern void dmq_init(const char *library_name);
2424
extern DmqDestinationId dmq_destination_add(char *connstr, char *sender_name,
2525
char *receiver_name, int ping_period);
2626
extern DmqConnState dmq_get_destination_status(DmqDestinationId dest_id);
27-
extern void dmq_destination_drop(char *receiver_name);
27+
extern void dmq_destination_drop(const char *receiver_name);
2828

2929
extern DmqSenderId dmq_attach_receiver(const char *sender_name, int mask_pos);
3030
extern void dmq_detach_receiver(const char *sender_name);
@@ -43,7 +43,7 @@ extern bool dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask);
4343
extern void dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg);
4444
extern void dmq_push_buffer(DmqDestinationId dest_id, char *stream_name, const void *buffer, size_t len);
4545

46-
typedef void (*dmq_receiver_hook_type) (char *);
46+
typedef void (*dmq_receiver_hook_type) (const char *);
4747
extern dmq_receiver_hook_type dmq_receiver_start_hook;
4848
extern dmq_receiver_hook_type dmq_receiver_stop_hook;
4949

contrib/pg_exchange/pg_exchange.c

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "nodeDummyscan.h"
2424
#include "nodes/nodes.h"
2525
#include "utils/builtins.h"
26+
#include "utils/hsearch.h"
2627
#include "utils/guc.h"
2728
#include "utils/memutils.h"
2829
#include "utils/plancache.h"
@@ -37,10 +38,12 @@ PG_FUNCTION_INFO_V1(pg_exec_plan);
3738

3839
#define DMQ_CONNSTR_MAX_LEN 150
3940

41+
dmq_receiver_hook_type old_dmq_receiver_stop_hook;
4042

4143
void _PG_init(void);
4244
static void deserialize_plan(char **squery, char **splan, char **sparams);
4345
static void exec_plan(char *squery, PlannedStmt *pstmt, ParamListInfo paramLI, const char *serverName);
46+
static void OnNodeDisconnect(const char *node_name);
4447

4548
static Size
4649
shmem_size(void)
@@ -66,6 +69,9 @@ _PG_init(void)
6669

6770
RequestAddinShmemSpace(shmem_size());
6871
RequestNamedLWLockTranche("pg_exchange", 1);
72+
73+
old_dmq_receiver_stop_hook = dmq_receiver_stop_hook;
74+
dmq_receiver_stop_hook = OnNodeDisconnect;
6975
}
7076

7177
Datum
@@ -198,3 +204,37 @@ exec_plan(char *squery, PlannedStmt *pstmt, ParamListInfo paramLI, const char *s
198204
receiver->rDestroy(receiver);
199205
ReleaseCachedPlan(cplan, false);
200206
}
207+
208+
static void
209+
OnNodeDisconnect(const char *node_name)
210+
{
211+
HASH_SEQ_STATUS status;
212+
DMQDestinations *dest;
213+
Oid serverid = InvalidOid;
214+
215+
elog(LOG, "Node %s: disconnected", node_name);
216+
217+
218+
LWLockAcquire(ExchShmem->lock, LW_EXCLUSIVE);
219+
220+
hash_seq_init(&status, ExchShmem->htab);
221+
222+
while ((dest = hash_seq_search(&status)) != NULL)
223+
{
224+
if (!(strcmp(dest->node, node_name) == 0))
225+
continue;
226+
227+
serverid = dest->serverid;
228+
dmq_detach_receiver(node_name);
229+
dmq_destination_drop(node_name);
230+
break;
231+
}
232+
hash_seq_term(&status);
233+
234+
if (OidIsValid(serverid))
235+
hash_search(ExchShmem->htab, &serverid, HASH_REMOVE, NULL);
236+
else
237+
elog(LOG, "Record on disconnected server %u with name %s not found.",
238+
serverid, node_name);
239+
LWLockRelease(ExchShmem->lock);
240+
}

contrib/pg_exchange/stream.c

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,14 +227,12 @@ ISendTuple(DmqDestinationId dest_id, char *stream, TupleTableSlot *slot,
227227
static void
228228
wait_for_delivery(OStream *ostream)
229229
{
230-
int attempts;
231-
232-
for (attempts = 0; attempts < 50; attempts++)
230+
for (;;)
233231
{
234232
int waits;
235233

236234
pg_usleep(10);
237-
attempts++;
235+
238236
for (waits = 0; waits < 100000; waits++)
239237
{
240238
if (checkDelivery(ostream))

contrib/pg_execplan/tests/test.sql

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
SELECT * FROM pt;
2+
SELECT * FROM rt;
3+
SELECT * FROM st;
4+
SELECT count(*) FROM pt;
5+
SELECT count(*) FROM pt,rt;
6+
SELECT count(*) FROM pt,rt,st;
7+
SELECT count(*) FROM pt,rt WHERE pt.id=rt.id;
8+
SELECT count(*) FROM pt,rt,st WHERE pt.id=rt.id and rt.id=st.id;
9+
SELECT count(*) FROM pt,rt,st WHERE pt.id=rt.id and rt.id=st.payload;
10+
SELECT count(*) FROM pt,rt,st WHERE pt.id=rt.payload and rt.id=st.payload;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export PGDATABASE=test_base
2+
psql -p 5432 -c "SELECT * FROM pt;"
3+
pg_ctl -D PGDATA_n1 stop
4+
pg_ctl -w -c -o "-p 5433" -D PGDATA_n1 -l n1.log start
5+
psql -p 5432 -c "SELECT * FROM pt;"

0 commit comments

Comments
 (0)