Skip to content

Commit 5317b56

Browse files
committed
Start socklib as background worker
1 parent bc7a35e commit 5317b56

File tree

6 files changed

+136
-56
lines changed

6 files changed

+136
-56
lines changed

contrib/pg_dtm/Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
MODULE_big = pg_dtm
2-
OBJS = pg_dtm.o libdtm.o
2+
OBJS = pg_dtm.o libdtm.o sockhub/libsockhub.a
3+
4+
sockhub/libsockhub.a:
5+
make -C sockhub
36

47
EXTENSION = pg_dtm
58
DATA = pg_dtm--1.0.sql

contrib/pg_dtm/libdtm.c

Lines changed: 74 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ typedef struct DTMConnData
2222
int sock;
2323
} DTMConnData;
2424

25+
static char *dtmhost = NULL;
26+
static int dtmport = 0;
27+
static char* dtm_unix_sock_dir;
28+
2529
typedef unsigned long long xid_t;
2630

2731
// Returns true if the write was successful.
@@ -151,51 +155,73 @@ static bool dtm_read_status(DTMConn dtm, XidStatus *s)
151155
// Connects to the specified DTM.
152156
static DTMConn DtmConnect(char *host, int port)
153157
{
154-
struct addrinfo *addrs = NULL;
155-
struct addrinfo hint;
156-
char portstr[6];
157-
struct addrinfo *a;
158-
159-
memset(&hint, 0, sizeof(hint));
160-
hint.ai_socktype = SOCK_STREAM;
161-
hint.ai_family = AF_INET;
162-
snprintf(portstr, 6, "%d", port);
163-
hint.ai_protocol = getprotobyname("tcp")->p_proto;
164-
if (getaddrinfo(host, portstr, &hint, &addrs))
165-
{
166-
perror("resolve address");
167-
return NULL;
168-
}
169-
170-
for (a = addrs; a != NULL; a = a->ai_next)
171-
{
172-
DTMConn dtm;
173-
int one = 1;
174-
int sock = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
175-
if (sock == -1)
176-
{
177-
perror("failed to create a socket");
178-
continue;
179-
}
180-
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
181-
182-
if (connect(sock, a->ai_addr, a->ai_addrlen) == -1)
183-
{
184-
perror("failed to connect to an address");
185-
close(sock);
186-
continue;
187-
}
188-
189-
// success
190-
freeaddrinfo(addrs);
191-
dtm = malloc(sizeof(DTMConnData));
192-
dtm->sock = sock;
193-
return dtm;
194-
}
195-
196-
freeaddrinfo(addrs);
197-
fprintf(stderr, "could not connect\n");
198-
return NULL;
158+
DTMConn dtm;
159+
int sd;
160+
161+
if (strcmp(host, "localhost") == 0) {
162+
struct sockaddr sock;
163+
int len = offsetof(struct sockaddr, sa_data) + snprintf(sock.sa_data, sizeof(sock.sa_data), "%s/p%u", dtm_unix_sock_dir, port);
164+
sock.sa_family = AF_UNIX;
165+
166+
sd = socket(AF_UNIX, SOCK_STREAM, 0);
167+
if (sd == -1)
168+
{
169+
perror("failed to create a unix socket");
170+
}
171+
if (connect(sd, &sock, len) == -1)
172+
{
173+
perror("failed to connect to an address");
174+
close(sd);
175+
return NULL;
176+
}
177+
dtm = malloc(sizeof(DTMConnData));
178+
dtm->sock = sd;
179+
return dtm;
180+
} else {
181+
struct addrinfo *addrs = NULL;
182+
struct addrinfo hint;
183+
char portstr[6];
184+
struct addrinfo *a;
185+
186+
memset(&hint, 0, sizeof(hint));
187+
hint.ai_socktype = SOCK_STREAM;
188+
hint.ai_family = AF_INET;
189+
snprintf(portstr, 6, "%d", port);
190+
hint.ai_protocol = getprotobyname("tcp")->p_proto;
191+
if (getaddrinfo(host, portstr, &hint, &addrs))
192+
{
193+
perror("resolve address");
194+
return NULL;
195+
}
196+
197+
for (a = addrs; a != NULL; a = a->ai_next)
198+
{
199+
int one = 1;
200+
sd = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
201+
if (sd == -1)
202+
{
203+
perror("failed to create a socket");
204+
continue;
205+
}
206+
setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
207+
208+
if (connect(sd, a->ai_addr, a->ai_addrlen) == -1)
209+
{
210+
perror("failed to connect to an address");
211+
close(sd);
212+
continue;
213+
}
214+
215+
// success
216+
freeaddrinfo(addrs);
217+
dtm = malloc(sizeof(DTMConnData));
218+
dtm->sock = sd;
219+
return dtm;
220+
}
221+
freeaddrinfo(addrs);
222+
}
223+
fprintf(stderr, "could not connect\n");
224+
return NULL;
199225
}
200226

201227
/*
@@ -231,16 +257,14 @@ static bool dtm_query(DTMConn dtm, char cmd, int argc, ...)
231257
return true;
232258
}
233259

234-
static char *dtmhost = NULL;
235-
static int dtmport = 0;
236-
237-
void TuneToDtm(char *host, int port) {
260+
void DtmGlobalConfig(char *host, int port, char* sock_dir) {
238261
if (dtmhost) {
239262
free(dtmhost);
240263
dtmhost = NULL;
241264
}
242265
dtmhost = strdup(host);
243266
dtmport = port;
267+
dtm_unix_sock_dir = sock_dir;
244268
}
245269

246270
static DTMConn GetConnection()
@@ -255,7 +279,7 @@ static DTMConn GetConnection()
255279
elog(ERROR, "Failed to connect to DTMD %s:%d", dtmhost, dtmport);
256280
}
257281
} else {
258-
elog(ERROR, "DTMD address not specified");
282+
/* elog(ERROR, "DTMD address not specified"); */
259283
}
260284
}
261285
return dtm;

contrib/pg_dtm/libdtm.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
// Sets up the host and port for DTM connection.
1212
// The defaults are "127.0.0.1" and 5431.
13-
void TuneToDtm(char *host, int port);
13+
void DtmGlobalConfig(char *host, int port, char* sock_dir);
1414

1515
void DtmInitSnapshot(Snapshot snapshot);
1616

contrib/pg_dtm/pg_dtm.c

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include "postgres.h"
1313
#include "fmgr.h"
1414
#include "miscadmin.h"
15+
#include "postmaster/postmaster.h"
16+
#include "postmaster/bgworker.h"
1517
#include "storage/s_lock.h"
1618
#include "storage/spin.h"
1719
#include "storage/lmgr.h"
@@ -39,6 +41,7 @@
3941
#include "storage/pmsignal.h"
4042
#include "storage/proc.h"
4143
#include "utils/syscache.h"
44+
#include "sockhub/sockhub.h"
4245

4346
#include "libdtm.h"
4447

@@ -74,6 +77,7 @@ static bool TransactionIdIsInSnapshot(TransactionId xid, Snapshot snapshot);
7477
static bool TransactionIdIsInDoubt(TransactionId xid);
7578

7679
static void DtmShmemStartup(void);
80+
static void DtmBackgroundWorker(Datum arg);
7781

7882
static shmem_startup_hook_type prev_shmem_startup_hook;
7983
static HTAB* xid_in_doubt;
@@ -91,7 +95,17 @@ static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionSt
9195

9296
static char *DtmHost;
9397
static int DtmPort;
94-
98+
static int DtmBufferSize;
99+
100+
static BackgroundWorker DtmWorker = {
101+
"DtmWorker",
102+
0, /* do not need connection to the database */
103+
BgWorkerStart_PostmasterStart,
104+
1, /* restrart in one second (is it possible to restort immediately?) */
105+
DtmBackgroundWorker
106+
};
107+
108+
95109

96110
#define XTM_TRACE(fmt, ...)
97111
//#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
@@ -623,6 +637,9 @@ static void DtmInitialize()
623637
dtm->nReservedXids = 0;
624638
dtm->minXid = InvalidTransactionId;
625639
RegisterXactCallback(DtmXactCallback, NULL);
640+
if (DtmBufferSize != 0) {
641+
RegisterBackgroundWorker(&DtmWorker);
642+
}
626643
}
627644
LWLockRelease(AddinShmemInitLock);
628645

@@ -716,6 +733,21 @@ _PG_init(void)
716733
NULL
717734
);
718735

736+
DefineCustomIntVariable(
737+
"dtm.buffer_size",
738+
"Size of sockhub buffer for connection to DTM daemon, if 0, then direct connection will be used",
739+
NULL,
740+
&DtmBufferSize,
741+
0,
742+
0,
743+
INT_MAX,
744+
PGC_POSTMASTER,
745+
0,
746+
NULL,
747+
NULL,
748+
NULL
749+
);
750+
719751
DefineCustomStringVariable(
720752
"dtm.host",
721753
"The host where DTM daemon resides",
@@ -744,7 +776,7 @@ _PG_init(void)
744776
NULL
745777
);
746778

747-
TuneToDtm(DtmHost, DtmPort);
779+
DtmGlobalConfig(DtmHost, DtmPort, Unix_socket_directories);
748780

749781
/*
750782
* Install hooks.
@@ -835,3 +867,24 @@ Datum dtm_join_transaction(PG_FUNCTION_ARGS)
835867
PG_RETURN_VOID();
836868
}
837869

870+
void DtmBackgroundWorker(Datum arg)
871+
{
872+
Shub shub;
873+
ShubParams params;
874+
char unix_sock_path[MAXPGPATH];
875+
876+
snprintf(unix_sock_path, sizeof(unix_sock_path), "%s/p%d", Unix_socket_directories, DtmPort);
877+
878+
ShubInitParams(&params);
879+
880+
params.host = DtmHost;
881+
params.port = DtmPort;
882+
params.file = unix_sock_path;
883+
params.buffer_size = DtmBufferSize;
884+
885+
DtmGlobalConfig("localhost", DtmPort, Unix_socket_directories);
886+
887+
ShubInitialize(&shub, &params);
888+
889+
ShubLoop(&shub);
890+
}

contrib/pg_dtm/sockhub/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC = gcc
2-
CFLAGS = -c -I. -Wall -O2 -g
2+
CFLAGS = -c -I. -Wall -O2 -g -fPIC
33
LD = $(CC)
44
LDFLAGS = -g
55
AR = ar

contrib/pg_dtm/tests/transfers.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
go run transfers.go \
33
-d 'dbname=postgres port=5432' \
44
-d 'dbname=postgres port=5433' \
5-
-d 'dbname=postgres port=5434' \
5+
-m \
66
-g

0 commit comments

Comments
 (0)