File tree Expand file tree Collapse file tree 3 files changed +17
-2
lines changed Expand file tree Collapse file tree 3 files changed +17
-2
lines changed Original file line number Diff line number Diff line change @@ -129,6 +129,7 @@ static shmem_startup_hook_type PreviousShmemStartupHook;
129
129
dmq_receiver_hook_type dmq_receiver_start_hook ;
130
130
dmq_receiver_hook_type dmq_receiver_stop_hook ;
131
131
132
+ int dmq_heartbeat_timeout ;
132
133
void dmq_sender_main (Datum main_arg );
133
134
134
135
PG_FUNCTION_INFO_V1 (dmq_receiver_loop );
@@ -1138,9 +1139,9 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
1138
1139
// XXX: is it enough?
1139
1140
CHECK_FOR_INTERRUPTS ();
1140
1141
1141
- if (dmq_now () - last_message_at > 2000 )
1142
+ if (dmq_now () - last_message_at > dmq_heartbeat_timeout )
1142
1143
{
1143
- mtm_log (ERROR , "[DMQ] exit receiver due to heatbeat timeout" );
1144
+ mtm_log (ERROR , "[DMQ] exit receiver due to heartbeat timeout" );
1144
1145
}
1145
1146
1146
1147
}
Original file line number Diff line number Diff line change @@ -19,6 +19,8 @@ typedef int8 DmqSenderId;
19
19
#define DMQ_MAX_DESTINATIONS 127
20
20
#define DMQ_MAX_RECEIVERS 100
21
21
22
+ extern int dmq_heartbeat_timeout ;
23
+
22
24
extern void dmq_init (const char * library_name );
23
25
24
26
extern DmqDestinationId dmq_destination_add (char * connstr ,
Original file line number Diff line number Diff line change @@ -80,6 +80,18 @@ _PG_init(void)
80
80
NULL ,
81
81
NULL );
82
82
83
+ DefineCustomIntVariable ("dmq_heartbeat_timeout" ,
84
+ "Max timeout between heartbeat messages" ,
85
+ NULL ,
86
+ & dmq_heartbeat_timeout ,
87
+ 20000 ,
88
+ 1 , INT_MAX ,
89
+ PGC_USERSET ,
90
+ GUC_UNIT_MS ,
91
+ NULL ,
92
+ NULL ,
93
+ NULL );
94
+
83
95
EXCHANGE_Init_methods ();
84
96
DUMMYSCAN_Init_methods ();
85
97
EXEC_Hooks_init ();
You can’t perform that action at this time.
0 commit comments