Skip to content

Commit eeeb868

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents 1e3909c + c2f8668 commit eeeb868

File tree

13 files changed

+131
-81
lines changed

13 files changed

+131
-81
lines changed

contrib/pg_xtm/dtmd/include/transaction.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ typedef struct Transaction {
1515
xid_t xid;
1616

1717
int size; // number of paritcipants
18-
int max_size; // maximal number of participants
1918

2019
// for + against ≤ size
2120
int votes_for;

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -257,18 +257,11 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
257257
);
258258

259259
CHECK(
260-
cmd->argc == 1,
260+
cmd->argc == 0,
261261
clientdata,
262262
"BEGIN: wrong number of arguments"
263263
);
264264

265-
int size = cmd->argv[0];
266-
CHECK(
267-
size <= MAX_NODES,
268-
clientdata,
269-
"BEGIN: 'size' > MAX_NODES"
270-
);
271-
272265
CHECK(
273266
CLIENT_XID(clientdata) == INVALID_XID,
274267
clientdata,
@@ -280,7 +273,6 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
280273

281274
prev_gxid = t->xid = next_gxid++;
282275
t->snapshots_count = 0;
283-
t->max_size = size;
284276
t->size = 1;
285277

286278
CLIENT_SNAPSENT(clientdata) = 0;
@@ -438,11 +430,6 @@ static char *onsnapshot(void *stream, void *clientdata, cmd_t *cmd) {
438430
CLIENT_SNAPSENT(clientdata) = 0;
439431
CLIENT_XID(clientdata) = t->xid;
440432
t->size += 1;
441-
CHECK(
442-
t->size <= t->max_size,
443-
clientdata,
444-
"SNAPSHOT: too many participants"
445-
);
446433
}
447434

448435
CHECK(

contrib/pg_xtm/dtmd/src/transaction.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ void transaction_clear(Transaction *t) {
2828

2929
t->xid = INVALID_XID;
3030
t->size = 0;
31-
t->max_size = 0;
3231
t->votes_for = 0;
3332
t->votes_against = 0;
3433
t->snapshots_count = 0;

contrib/pg_xtm/libdtm.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,15 +281,15 @@ void DtmInitSnapshot(Snapshot snapshot)
281281
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
282282
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
283283
// otherwise.
284-
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot, TransactionId *gxmin)
284+
TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin)
285285
{
286286
bool ok;
287287
xid_t xid;
288288
xid_t number;
289289
DTMConn dtm = GetConnection();
290290

291291
// query
292-
if (!dtm_query(dtm, 'b', 1, nParticipants)) goto failure;
292+
if (!dtm_query(dtm, 'b', 0)) goto failure;
293293

294294
// response
295295
if (!dtm_read_bool(dtm, &ok)) goto failure;

contrib/pg_xtm/libdtm.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010

1111
void DtmInitSnapshot(Snapshot snapshot);
1212

13-
// Starts a new global transaction of nParticipants size. Returns the
13+
// Starts a new global transaction. Returns the
1414
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
1515
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
1616
// otherwise.
17-
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot, TransactionId *gxmin);
17+
TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin);
1818

1919
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
2020
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.

contrib/pg_xtm/pg_dtm--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use "CREATE EXTENSION pg_dtm" to load this file. \quit
33

4-
CREATE FUNCTION dtm_begin_transaction(n_participants integer) RETURNS integer
4+
CREATE FUNCTION dtm_begin_transaction() RETURNS integer
55
AS 'MODULE_PATHNAME','dtm_begin_transaction'
66
LANGUAGE C;
77

contrib/pg_xtm/pg_dtm.c

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ static bool DtmTransactionIdIsInProgress(TransactionId xid);
7474
static TransactionId DtmGetNextXid(void);
7575
static TransactionId DtmGetNewTransactionId(bool isSubXact);
7676
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
77+
static TransactionId DtmGetGlobalTransactionId(void);
7778

7879
static bool TransactionIdIsInSnapshot(TransactionId xid, Snapshot snapshot);
7980
static bool TransactionIdIsInDoubt(TransactionId xid);
@@ -92,7 +93,7 @@ static bool DtmGlobalXidAssigned;
9293
static int DtmLocalXidReserve;
9394
static int DtmCurcid;
9495
static Snapshot DtmLastSnapshot;
95-
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId, DtmGetOldestXmin, DtmTransactionIdIsInProgress };
96+
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId, DtmGetOldestXmin, DtmTransactionIdIsInProgress, DtmGetGlobalTransactionId };
9697

9798

9899
#define XTM_TRACE(fmt, ...)
@@ -323,6 +324,12 @@ static TransactionId DtmGetNextXid()
323324
return xid;
324325
}
325326

327+
TransactionId
328+
DtmGetGlobalTransactionId()
329+
{
330+
return DtmNextXid;
331+
}
332+
326333
TransactionId
327334
DtmGetNewTransactionId(bool isSubXact)
328335
{
@@ -667,8 +674,8 @@ static void DtmInitialize()
667674
static void
668675
DtmXactCallback(XactEvent event, void *arg)
669676
{
677+
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n", getpid(), event, DtmGlobalXidAssigned, DtmNextXid);
670678
if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_ABORT) {
671-
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n", getpid(), event, DtmGlobalXidAssigned, DtmNextXid);
672679
if (DtmGlobalXidAssigned) {
673680
DtmGlobalXidAssigned = false;
674681
} else if (TransactionIdIsValid(DtmNextXid)) {
@@ -780,10 +787,9 @@ dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
780787
Datum
781788
dtm_begin_transaction(PG_FUNCTION_ARGS)
782789
{
783-
int nParticipants = PG_GETARG_INT32(0);
784790
Assert(!TransactionIdIsValid(DtmNextXid));
785791

786-
DtmNextXid = DtmGlobalStartTransaction(nParticipants, &DtmSnapshot, &dtm->minXid);
792+
DtmNextXid = DtmGlobalStartTransaction(&DtmSnapshot, &dtm->minXid);
787793
Assert(TransactionIdIsValid(DtmNextXid));
788794
XTM_INFO("%d: Start global transaction %d, dtm->minXid=%d\n", getpid(), DtmNextXid, dtm->minXid);
789795

contrib/pg_xtm/sockhub/sockhub.c

Lines changed: 59 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,11 @@ static void reconnect(Shub* shub)
125125
} while (rc < 0 && errno == EINTR);
126126

127127
if (rc >= 0 || errno == EINPROGRESS) {
128-
if (rc >= 0) {
129-
}
130128
break;
131129
}
132130
}
133131
if (rc < 0) {
134-
if (errno != ENOENT && errno != ECONNREFUSED) {
132+
if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) {
135133
shub->params->error_handler("Connection can not be establish", SHUB_FATAL_ERROR);
136134
}
137135
if (max_attempts-- != 0) {
@@ -187,6 +185,7 @@ void ShubInitialize(Shub* shub, ShubParams* params)
187185
FD_ZERO(&shub->inset);
188186
FD_SET(shub->input, &shub->inset);
189187

188+
shub->output = -1;
190189
reconnect(shub);
191190

192191
shub->in_buffer = malloc(params->buffer_size);
@@ -207,52 +206,58 @@ void ShubLoop(Shub* shub)
207206
while (1) {
208207
fd_set events;
209208
struct timeval tm;
210-
int i, max_fd, rc;
211-
unsigned int n, size;
209+
int i, rc;
210+
int max_fd = shub->max_fd;
212211

213212
tm.tv_sec = shub->params->delay/1000;
214213
tm.tv_usec = shub->params->delay % 1000 * 1000;
215214

216215
events = shub->inset;
217-
rc = select(shub->max_fd+1, &events, NULL, NULL, shub->in_buffer_used == 0 ? NULL : &tm);
216+
rc = select(max_fd+1, &events, NULL, NULL, shub->in_buffer_used == 0 ? NULL : &tm);
218217
if (rc < 0) {
219218
if (errno != EINTR) {
220219
shub->params->error_handler("Select failed", SHUB_RECOVERABLE_ERROR);
221220
recovery(shub);
222221
}
223222
} else {
224223
if (rc > 0) {
225-
for (i = 0, max_fd = shub->max_fd; i <= max_fd; i++) {
224+
for (i = 0; i <= max_fd; i++) {
226225
if (FD_ISSET(i, &events)) {
227-
if (i == shub->input) {
226+
if (i == shub->input) { /* accept incomming connection */
228227
int s = accept(i, NULL, NULL);
229228
if (s < 0) {
230229
shub->params->error_handler("Failed to accept socket", SHUB_RECOVERABLE_ERROR);
231230
} else {
232-
if (s > max_fd) {
231+
if (s > shub->max_fd) {
233232
shub->max_fd = s;
234233
}
235234
FD_SET(s, &shub->inset);
236235
}
237-
} else if (i == shub->output) {
236+
} else if (i == shub->output) { /* receive response from server */
237+
/* try to read as much as possible */
238238
int available = recv(shub->output, shub->out_buffer + shub->out_buffer_used, buffer_size - shub->out_buffer_used, 0);
239239
int pos = 0;
240240
if (available <= 0) {
241241
shub->params->error_handler("Failed to read inet socket", SHUB_RECOVERABLE_ERROR);
242242
reconnect(shub);
243+
continue;
243244
}
244245
shub->out_buffer_used += available;
246+
247+
/* loop through all received responses */
245248
while (pos + sizeof(ShubMessageHdr) <= shub->out_buffer_used) {
246-
ShubMessageHdr* hdr = (ShubMessageHdr*)(shub->out_buffer + pos);
249+
ShubMessageHdr* hdr = (ShubMessageHdr*)&shub->out_buffer[pos];
247250
int chan = hdr->chan;
248-
n = pos + sizeof(ShubMessageHdr) + hdr->size <= shub->out_buffer_used ? hdr->size + sizeof(ShubMessageHdr) : shub->out_buffer_used - pos;
251+
unsigned int n = pos + sizeof(ShubMessageHdr) + hdr->size <= shub->out_buffer_used
252+
? hdr->size + sizeof(ShubMessageHdr)
253+
: shub->out_buffer_used - pos;
249254
if (!write_socket(chan, (char*)hdr, n)) {
250255
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
251256
close_socket(shub, chan);
252257
chan = -1;
253258
}
254-
/* read rest of message if it doesn't fit in buffer */
255259
if (n != hdr->size + sizeof(ShubMessageHdr)) {
260+
/* read rest of message if it doesn't fit in the buffer */
256261
int tail = hdr->size + sizeof(ShubMessageHdr) - n;
257262
do {
258263
n = tail < buffer_size ? tail : buffer_size;
@@ -274,56 +279,73 @@ void ShubLoop(Shub* shub)
274279
}
275280
pos += n;
276281
}
282+
/* Move partly fetched message header (if any) to the beginning of byffer */
277283
memcpy(shub->out_buffer, shub->out_buffer + pos, shub->out_buffer_used - pos);
278284
shub->out_buffer_used -= pos;
279-
} else {
285+
} else { /* receive request from client */
280286
ShubMessageHdr* hdr = (ShubMessageHdr*)&shub->in_buffer[shub->in_buffer_used];
281-
if (!read_socket(i, (char*)hdr, sizeof(ShubMessageHdr))) {
287+
int chan = i;
288+
if (!read_socket(chan, (char*)hdr, sizeof(ShubMessageHdr))) { /* fetch message header */
282289
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
283290
close_socket(shub, i);
284291
} else {
285-
size = hdr->size;
286-
hdr->chan = i;
292+
unsigned int size = hdr->size;
293+
hdr->chan = chan; /* remember socket descriptor from which this message was read */
287294
if (size + shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
288-
if (shub->in_buffer_used != 0) {
295+
/* message doesn't completely fit in buffer */
296+
if (shub->in_buffer_used != 0) { /* if buffer is not empty...*/
297+
/* ... then send it */
289298
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
290299
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
291300
reconnect(shub);
292301
}
302+
/* move received message header to the beginning of the buffer */
293303
memcpy(shub->in_buffer, shub->in_buffer + shub->in_buffer_used, sizeof(ShubMessageHdr));
294304
shub->in_buffer_used = 0;
295305
}
296306
}
297307
shub->in_buffer_used += sizeof(ShubMessageHdr);
298308

299-
while (1) {
309+
do {
300310
unsigned int n = size + shub->in_buffer_used > buffer_size ? buffer_size - shub->in_buffer_used : size;
301-
if (!read_socket(i, shub->in_buffer + shub->in_buffer_used, n)) {
311+
/* fetch message body */
312+
if (chan >= 0 && !read_socket(chan, shub->in_buffer + shub->in_buffer_used, n)) {
302313
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
303-
close_socket(shub, i);
304-
break;
305-
} else {
306-
if (n != size) {
307-
while (!write_socket(shub->output, shub->in_buffer, n)) {
308-
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
309-
reconnect(shub);
310-
}
311-
size -= n;
312-
shub->in_buffer_used = 0;
313-
} else {
314-
shub->in_buffer_used += n;
314+
close_socket(shub, chan);
315+
if (hdr != NULL) { /* if message header is not yet sent to the server... */
316+
/* ... then skip this message */
317+
shub->in_buffer_used = (char*)hdr - shub->in_buffer;
315318
break;
319+
} else { /* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
320+
chan = -1; /* do not try to read rest of body of this message */
316321
}
322+
}
323+
shub->in_buffer_used += n;
324+
size -= n;
325+
/* if there is no more free space in the buffer to receive new message header... */
326+
if (shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
327+
328+
/* ... then send buffer to the server */
329+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
330+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
331+
reconnect(shub);
332+
}
333+
hdr = NULL; /* message is partly sent to the server: can not skip it any more */
334+
shub->in_buffer_used = 0;
317335
}
318-
}
336+
} while (size != 0); /* repeat until all message body is received */
319337
}
320338
}
321339
}
322340
}
323-
} else if (shub->in_buffer_used != 0) {
324-
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
325-
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
326-
reconnect(shub);
341+
} else { /* timeout expired */
342+
if (shub->in_buffer_used != 0) { /* if buffer is not empty... */
343+
/* ...then send it */
344+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
345+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
346+
reconnect(shub);
347+
}
348+
shub->in_buffer_used = 0;
327349
}
328350
}
329351
}

0 commit comments

Comments
 (0)