Skip to content

Commit 6046342

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents 11456b2 + d1281fa commit 6046342

File tree

14 files changed

+322
-104
lines changed

14 files changed

+322
-104
lines changed

contrib/pg_dtm/Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
MODULE_big = pg_dtm
2-
OBJS = pg_dtm.o libdtm.o
2+
OBJS = pg_dtm.o libdtm.o sockhub/libsockhub.a
3+
4+
sockhub/libsockhub.a:
5+
make -C sockhub
36

47
EXTENSION = pg_dtm
58
DATA = pg_dtm--1.0.sql

contrib/pg_dtm/dtmd/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ CC=gcc
22
CFLAGS=-g -Wall -Iinclude -D_LARGEFILE64_SOURCE # -DDEBUG
33
LIBUV_PREFIX=$(HOME)/libuv-build
44
LIBUV_CFLAGS=-I"$(LIBUV_PREFIX)/include" -L"$(LIBUV_PREFIX)/lib"
5-
LIBUV_LDFLAGS=-luv -pthread
5+
LIBUV_LDFLAGS=-luv -pthread -lrt
66

77
SYSTEM=$(shell uname -s)
88
ifeq ($(SYSTEM),Darwin)

contrib/pg_dtm/dtmd/include/util.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,19 @@ int falloc(int fd, off64_t size);
1818
char *destructive_concat(char *a, char *b);
1919

2020
#ifndef DEBUG
21-
#define shout(...)
21+
#define debug(...)
2222
#else
23-
#define shout(...) \
23+
#define debug(...) \
2424
do { \
2525
fprintf(stderr, __VA_ARGS__); \
2626
fflush(stderr); \
2727
} while (0)
28-
2928
#endif
3029

30+
#define shout(...) \
31+
do { \
32+
fprintf(stderr, __VA_ARGS__); \
33+
fflush(stderr); \
34+
} while (0)
35+
3136
#endif

contrib/pg_dtm/dtmd/src/clog.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ int clog_read(clog_t clog, xid_t xid) {
133133
bool clog_write(clog_t clog, xid_t xid, int status) {
134134
clogfile_t *file = clog_xid_to_file(clog, xid);
135135
if (!file) {
136-
shout("xid %016llx out of range, creating the file\n", xid);
136+
debug("xid %016llx out of range, creating the file\n", xid);
137137
clogfile_t newfile;
138138
if (!clogfile_open_by_id(&newfile, clog->datadir, XID_TO_FILEID(xid), true)) {
139139
shout(
@@ -149,7 +149,7 @@ bool clog_write(clog_t clog, xid_t xid, int status) {
149149
}
150150
file = clog_xid_to_file(clog, xid);
151151
if (!file) {
152-
shout("the file is absent despite out efforts\n");
152+
shout("the file is absent despite our efforts\n");
153153
return false;
154154
}
155155
bool ok = clogfile_set_status(file, xid, status);

contrib/pg_dtm/dtmd/src/clogfile.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ bool clogfile_open_by_id(clogfile_t *clogfile, char *datadir, int fileid, bool c
3333
shout("cannot create clog file '%s': %s\n", clogfile->path, strerror(errno));
3434
return false;
3535
}
36-
shout("created clog file '%s'\n", clogfile->path);
36+
debug("created clog file '%s'\n", clogfile->path);
3737
} else {
3838
fd = open(clogfile->path, O_RDWR);
3939
if (fd == -1) {
4040
shout("cannot open clog file '%s': %s\n", clogfile->path, strerror(errno));
4141
return false;
4242
}
43-
shout("opened clog file '%s'\n", clogfile->path);
43+
debug("opened clog file '%s'\n", clogfile->path);
4444
}
4545

4646
if (falloc(fd, BYTES_PER_FILE)) {

contrib/pg_dtm/dtmd/src/eventwrap.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ int eventwrap(
8787
onconnect_cb = onconnect;
8888
ondisconnect_cb = ondisconnect;
8989

90-
shout("libuv version: %s\n", uv_version_string());
90+
debug("libuv version: %s\n", uv_version_string());
9191

9292
loop = uv_default_loop();
9393

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 133 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
#include <unistd.h>
55
#include <assert.h>
66
#include <time.h>
7+
#include <signal.h>
8+
#include <errno.h>
9+
#include <sys/wait.h>
710

811
#include "clog.h"
912
#include "parser.h"
@@ -54,7 +57,7 @@ static void free_client_data(client_data_t *cd) {
5457
static int next_client_id = 0;
5558
static void onconnect(void *stream, void **clientdata) {
5659
*clientdata = create_client_data(next_client_id++);
57-
shout("[%d] connected\n", CLIENT_ID(*clientdata));
60+
debug("[%d] connected\n", CLIENT_ID(*clientdata));
5861
}
5962

6063
static void notify_listeners(Transaction *t, int status) {
@@ -88,7 +91,7 @@ static void notify_listeners(Transaction *t, int status) {
8891
}
8992

9093
static void ondisconnect(void *stream, void *clientdata) {
91-
shout("[%d] disconnected\n", CLIENT_ID(clientdata));
94+
debug("[%d] disconnected\n", CLIENT_ID(clientdata));
9295

9396
if (CLIENT_XID(clientdata) != INVALID_XID) {
9497
int i;
@@ -126,7 +129,7 @@ static void ondisconnect(void *stream, void *clientdata) {
126129
}
127130

128131
#ifdef DEBUG
129-
static void shout_cmd(void *clientdata, cmd_t *cmd) {
132+
static void debug_cmd(void *clientdata, cmd_t *cmd) {
130133
char *cmdname;
131134
switch (cmd->cmd) {
132135
case CMD_RESERVE : cmdname = "RESERVE"; break;
@@ -137,15 +140,15 @@ static void shout_cmd(void *clientdata, cmd_t *cmd) {
137140
case CMD_STATUS : cmdname = "STATUS"; break;
138141
default : cmdname = "unknown";
139142
}
140-
shout("[%d] %s", CLIENT_ID(clientdata), cmdname);
143+
debug("[%d] %s", CLIENT_ID(clientdata), cmdname);
141144
int i;
142145
for (i = 0; i < cmd->argc; i++) {
143-
shout(" %llu", cmd->argv[i]);
146+
debug(" %llu", cmd->argv[i]);
144147
}
145-
shout("\n");
148+
debug("\n");
146149
}
147150
#else
148-
#define shout_cmd(...)
151+
#define debug_cmd(...)
149152
#endif
150153

151154
#define CHECK(COND, CDATA, MSG) \
@@ -197,14 +200,14 @@ static char *onreserve(void *stream, void *clientdata, cmd_t *cmd) {
197200
int minsize = cmd->argv[1];
198201
xid_t maxxid = minxid + minsize - 1;
199202

200-
shout(
203+
debug(
201204
"[%d] RESERVE: asked for range %llu-%llu\n",
202205
CLIENT_ID(clientdata),
203206
minxid, maxxid
204207
);
205208

206209
if ((prev_gxid >= minxid) || (maxxid >= next_gxid)) {
207-
shout(
210+
debug(
208211
"[%d] RESERVE: local range %llu-%llu is not between global range %llu-%llu\n",
209212
CLIENT_ID(clientdata),
210213
minxid, maxxid,
@@ -215,7 +218,7 @@ static char *onreserve(void *stream, void *clientdata, cmd_t *cmd) {
215218
maxxid = max(maxxid, minxid + minsize - 1);
216219
next_gxid = maxxid + 1;
217220
}
218-
shout(
221+
debug(
219222
"[%d] RESERVE: allocating range %llu-%llu\n",
220223
CLIENT_ID(clientdata),
221224
minxid, maxxid
@@ -514,7 +517,7 @@ static char *onnoise(void *stream, void *clientdata, cmd_t *cmd) {
514517
// }
515518

516519
static char *oncmd(void *stream, void *clientdata, cmd_t *cmd) {
517-
shout_cmd(clientdata, cmd);
520+
debug_cmd(clientdata, cmd);
518521

519522
char *result = NULL;
520523
switch (cmd->cmd) {
@@ -547,12 +550,6 @@ static char *ondata(void *stream, void *clientdata, size_t len, char *data) {
547550
parser_t parser = CLIENT_PARSER(clientdata);
548551
char *response = NULL;
549552

550-
// shout(
551-
// "[%d] got some data[%lu] %s\n",
552-
// CLIENT_ID(clientdata),
553-
// len, data
554-
// );
555-
556553
// The idea is to feed each character through
557554
// the parser, which will return a cmd from
558555
// time to time.
@@ -585,16 +582,102 @@ static char *ondata(void *stream, void *clientdata, size_t len, char *data) {
585582
}
586583

587584
static void usage(char *prog) {
588-
printf("Usage: %s [-d DATADIR] [-a HOST] [-p PORT]\n", prog);
585+
printf(
586+
"Usage: %s [-d DATADIR] [-k] [-a HOST] [-p PORT] [-l LOGFILE]\n"
587+
" dtmd will try to kill the other one running at\n"
588+
" the same DATADIR.\n"
589+
" -l : Run as a daemon and write output to LOGFILE.\n"
590+
" -k : Just kill the other dtm and exit.\n",
591+
prog
592+
);
593+
}
594+
595+
// Reads a pid from the file at 'pidpath'.
596+
// Returns the pid, or 0 in case of error.
597+
int read_pid(char *pidpath) {
598+
FILE *f = fopen(pidpath, "r");
599+
if (f == NULL) {
600+
debug("failed to open pidfile for reading: %s\n", strerror(errno));
601+
return 0;
602+
}
603+
604+
int pid = 0;
605+
if (fscanf(f, "%d", &pid) != 1) {
606+
shout("failed to read pid from pidfile\n");
607+
pid = 0;
608+
}
609+
610+
if (fclose(f)) {
611+
shout("failed to close pidfile O_o: %s\n", strerror(errno));
612+
}
613+
return pid;
614+
}
615+
616+
// Returns the pid, or 0 in case of error.
617+
int write_pid(char *pidpath, int pid) {
618+
FILE *f = fopen(pidpath, "w");
619+
if (f == NULL) {
620+
shout("failed to open pidfile for writing: %s\n", strerror(errno));
621+
return 0;
622+
}
623+
624+
if (fprintf(f, "%d\n", pid) < 0) {
625+
shout("failed to write pid to pidfile\n");
626+
pid = 0;
627+
}
628+
629+
if (fclose(f)) {
630+
shout("failed to close pidfile O_o: %s\n", strerror(errno));
631+
}
632+
return pid;
633+
}
634+
635+
// If there is a pidfile in 'datadir',
636+
// sends TERM signal to the corresponding pid.
637+
void kill_the_elder(char *datadir) {
638+
char *pidpath = join_path(datadir, "dtmd.pid");
639+
int pid = read_pid(pidpath);
640+
free(pidpath);
641+
642+
if (pid > 1) {
643+
if (kill(pid, SIGTERM)) {
644+
switch(errno) {
645+
case EPERM:
646+
shout("was not allowed to kill pid=%d\n", pid);
647+
break;
648+
case ESRCH:
649+
shout("pid=%d not found for killing\n", pid);
650+
break;
651+
}
652+
}
653+
debug("SIGTERM sent to pid=%d\n" pid);
654+
debug("waiting for pid=%d to die\n" pid);
655+
waitpid(pid, NULL, 0);
656+
debug("pid=%d died\n" pid);
657+
} else {
658+
debug("no elder to kill\n" pid);
659+
}
660+
}
661+
662+
char *pidpath;
663+
void die(int signum) {
664+
shout("terminated\n");
665+
if (unlink(pidpath) == -1) {
666+
shout("could not remove pidfile: %s\n", strerror(errno));
667+
}
668+
exit(signum);
589669
}
590670

591671
int main(int argc, char **argv) {
592672
char *datadir = DEFAULT_DATADIR;
593673
char *listenhost = DEFAULT_LISTENHOST;
674+
char *logfilename = NULL;
675+
bool daemonize = false;
676+
bool assassin = false;
594677
int listenport = DEFAULT_LISTENPORT;
595678

596679
int opt;
597-
while ((opt = getopt(argc, argv, "hd:a:p:")) != -1) {
680+
while ((opt = getopt(argc, argv, "hd:a:p:l:k")) != -1) {
598681
switch (opt) {
599682
case 'd':
600683
datadir = optarg;
@@ -605,21 +688,51 @@ int main(int argc, char **argv) {
605688
case 'p':
606689
listenport = atoi(optarg);
607690
break;
691+
case 'l':
692+
logfilename = optarg;
693+
daemonize = true;
694+
break;
608695
case 'h':
609696
usage(argv[0]);
610697
return EXIT_SUCCESS;
698+
case 'k':
699+
assassin = true;
700+
break;
611701
default:
612702
usage(argv[0]);
613703
return EXIT_FAILURE;
614704
}
615705
}
616706

707+
kill_the_elder(datadir);
708+
if (assassin) {
709+
return EXIT_SUCCESS;
710+
}
711+
712+
if (logfilename) {
713+
freopen(logfilename, "a", stdout);
714+
freopen(logfilename, "a", stderr);
715+
}
716+
617717
clg = clog_open(datadir);
618718
if (!clg) {
619719
shout("could not open clog at '%s'\n", datadir);
620720
return EXIT_FAILURE;
621721
}
622722

723+
if (daemonize) {
724+
if (daemon(true, true) == -1) {
725+
shout("could not daemonize: %s\n", strerror(errno));
726+
return EXIT_FAILURE;
727+
}
728+
}
729+
730+
pidpath = join_path(datadir, "dtmd.pid");
731+
signal(SIGTERM, die);
732+
signal(SIGINT, die);
733+
734+
write_pid(pidpath, getpid());
735+
623736
prev_gxid = MIN_XID;
624737
next_gxid = MIN_XID;
625738
transactions_count = 0;
@@ -630,5 +743,6 @@ int main(int argc, char **argv) {
630743
);
631744

632745
clog_close(clg);
746+
633747
return retcode;
634748
}

0 commit comments

Comments
 (0)