Skip to content

Commit b15877e

Browse files
committed
Add mtm.get_cluster_info function
1 parent 8c66044 commit b15877e

File tree

5 files changed

+88
-29
lines changed

5 files changed

+88
-29
lines changed

contrib/mmts/bgwpool.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ typedef struct
1616
int id;
1717
} BgwPoolExecutorCtx;
1818

19-
size_t n_snapshots;
20-
size_t n_active;
21-
2219
static void BgwPoolMainLoop(Datum arg)
2320
{
2421
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)arg;
@@ -36,7 +33,8 @@ static void BgwPoolMainLoop(Datum arg)
3633
size = *(int*)&pool->queue[pool->head];
3734
Assert(size < pool->size);
3835
work = malloc(size);
39-
pool->active -= 1;
36+
pool->pending -= 1;
37+
pool->active += 1;
4038
if (pool->head + size + 4 > pool->size) {
4139
memcpy(work, pool->queue, size);
4240
pool->head = INTALIGN(size);
@@ -54,6 +52,9 @@ static void BgwPoolMainLoop(Datum arg)
5452
SpinLockRelease(&pool->lock);
5553
pool->executor(id, work, size);
5654
free(work);
55+
SpinLockAcquire(&pool->lock);
56+
pool->active -= 1;
57+
SpinLockRelease(&pool->lock);
5758
}
5859
}
5960

@@ -71,6 +72,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, si
7172
pool->tail = 0;
7273
pool->size = queueSize;
7374
pool->active = 0;
75+
pool->pending = 0;
7476
strcpy(pool->dbname, dbname);
7577
}
7678

@@ -126,9 +128,7 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
126128
PGSemaphoreLock(&pool->overflow);
127129
SpinLockAcquire(&pool->lock);
128130
} else {
129-
pool->active += 1;
130-
n_snapshots += 1;
131-
n_active += pool->active;
131+
pool->pending += 1;
132132
*(int*)&pool->queue[pool->tail] = size;
133133
if (pool->size - pool->tail >= size + 4) {
134134
memcpy(&pool->queue[pool->tail+4], work, size);

contrib/mmts/bgwpool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ typedef struct
2020
size_t tail;
2121
size_t size;
2222
size_t active;
23+
size_t pending;
2324
bool producerBlocked;
2425
char dbname[MAX_DBNAME_LEN];
2526
char* queue;

contrib/mmts/multimaster--1.0.sql

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,22 @@ AS 'MODULE_PATHNAME','mtm_get_snapshot'
2424
LANGUAGE C;
2525

2626

27-
CREATE TYPE mtm.node_state AS (id integer, disabled bool, disconnected bool, catchUp bool, slotLag bigint, avgTransDelay bigint, lastStatusChange timestamp, connStr text);
27+
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "connStr" text);
2828

2929
CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
3030
AS 'MODULE_PATHNAME','mtm_get_nodes_state'
3131
LANGUAGE C;
3232

33-
CREATE TYPE mtm.cluster_state AS (status text, disabledNodeMask bigint, disconnectedNodeMask bigint, catchUpNodeMask bigint, nNodes integer, nActiveQueries integer, queueSize bigint, transCount bigint, timeShift bigint, recoverySlot integer);
33+
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "nNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer);
3434

3535
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
3636
AS 'MODULE_PATHNAME','mtm_get_cluster_state'
3737
LANGUAGE C;
3838

39+
CREATE FUNCTION mtm.get_cluster_info() RETURNS SETOF mtm.cluster_state
40+
AS 'MODULE_PATHNAME','mtm_get_cluster_info'
41+
LANGUAGE C;
42+
3943
CREATE FUNCTION mtm.make_table_local(relation regclass) RETURNS void
4044
AS 'MODULE_PATHNAME','mtm_make_table_local'
4145
LANGUAGE C;

contrib/mmts/multimaster.c

Lines changed: 71 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
108108
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
109109
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
110110
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
111+
PG_FUNCTION_INFO_V1(mtm_get_cluster_info);
111112
PG_FUNCTION_INFO_V1(mtm_make_table_local);
112113
PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
113114

@@ -1609,7 +1610,7 @@ _PG_init(void)
16091610
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",
16101611
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
16111612
&Mtm2PCMinTimeout,
1612-
10000,
1613+
100000, /* 100 seconds */
16131614
0,
16141615
INT_MAX,
16151616
PGC_BACKEND,
@@ -1624,7 +1625,7 @@ _PG_init(void)
16241625
"Percent of prepare time for maximal time of second phase of two-pahse commit",
16251626
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
16261627
&Mtm2PCPrepareRatio,
1627-
100,
1628+
1000, /* 10 times */
16281629
0,
16291630
INT_MAX,
16301631
PGC_BACKEND,
@@ -2178,10 +2179,9 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
21782179
typedef struct
21792180
{
21802181
int nodeId;
2181-
char* connStrPtr;
21822182
TupleDesc desc;
2183-
Datum values[8];
2184-
bool nulls[8];
2183+
Datum values[Natts_mtm_nodes_state];
2184+
bool nulls[Natts_mtm_nodes_state];
21852185
} MtmGetNodeStateCtx;
21862186

21872187
Datum
@@ -2190,7 +2190,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
21902190
FuncCallContext* funcctx;
21912191
MtmGetNodeStateCtx* usrfctx;
21922192
MemoryContext oldcontext;
2193-
char* p;
21942193
int64 lag;
21952194
bool is_first_call = SRF_IS_FIRSTCALL();
21962195

@@ -2200,7 +2199,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22002199
usrfctx = (MtmGetNodeStateCtx*)palloc(sizeof(MtmGetNodeStateCtx));
22012200
get_call_result_type(fcinfo, NULL, &usrfctx->desc);
22022201
usrfctx->nodeId = 1;
2203-
usrfctx->connStrPtr = pstrdup(MtmConnStrs);
22042202
memset(usrfctx->nulls, false, sizeof(usrfctx->nulls));
22052203
funcctx->user_fctx = usrfctx;
22062204
MemoryContextSwitchTo(oldcontext);
@@ -2219,23 +2217,19 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22192217
usrfctx->nulls[4] = lag < 0;
22202218
usrfctx->values[5] = Int64GetDatum(Mtm->transCount ? Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount : 0);
22212219
usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime));
2222-
p = strchr(usrfctx->connStrPtr, ',');
2223-
if (p != NULL) {
2224-
*p++ = '\0';
2225-
}
2226-
usrfctx->values[7] = CStringGetTextDatum(usrfctx->connStrPtr);
2227-
usrfctx->connStrPtr = p;
2220+
usrfctx->values[7] = CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
22282221
usrfctx->nodeId += 1;
22292222

22302223
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(heap_form_tuple(usrfctx->desc, usrfctx->values, usrfctx->nulls)));
22312224
}
22322225

2226+
22332227
Datum
22342228
mtm_get_cluster_state(PG_FUNCTION_ARGS)
22352229
{
22362230
TupleDesc desc;
2237-
Datum values[10];
2238-
bool nulls[10] = {false};
2231+
Datum values[Natts_mtm_cluster_state];
2232+
bool nulls[Natts_mtm_cluster_state] = {false};
22392233
get_call_result_type(fcinfo, NULL, &desc);
22402234

22412235
values[0] = CStringGetTextDatum(MtmNodeStatusMnem[Mtm->status]);
@@ -2244,16 +2238,73 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
22442238
values[3] = Int64GetDatum(Mtm->nodeLockerMask);
22452239
values[4] = Int32GetDatum(Mtm->nNodes);
22462240
values[5] = Int32GetDatum((int)Mtm->pool.active);
2247-
values[6] = Int64GetDatum(BgwPoolGetQueueSize(&Mtm->pool));
2248-
values[7] = Int64GetDatum(Mtm->transCount);
2249-
values[8] = Int64GetDatum(Mtm->timeShift);
2250-
values[9] = Int32GetDatum(Mtm->recoverySlot);
2251-
nulls[9] = Mtm->recoverySlot == 0;
2241+
values[6] = Int32GetDatum((int)Mtm->pool.pending);
2242+
values[7] = Int64GetDatum(BgwPoolGetQueueSize(&Mtm->pool));
2243+
values[8] = Int64GetDatum(Mtm->transCount);
2244+
values[9] = Int64GetDatum(Mtm->timeShift);
2245+
values[10] = Int32GetDatum(Mtm->recoverySlot);
22522246

22532247
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));
22542248
}
22552249

22562250

2251+
typedef struct
2252+
{
2253+
int nodeId;
2254+
} MtmGetClusterInfoCtx;
2255+
2256+
2257+
Datum
2258+
mtm_get_cluster_info(PG_FUNCTION_ARGS)
2259+
{
2260+
2261+
FuncCallContext* funcctx;
2262+
MtmGetClusterInfoCtx* usrfctx;
2263+
MemoryContext oldcontext;
2264+
TupleDesc desc;
2265+
bool is_first_call = SRF_IS_FIRSTCALL();
2266+
int i;
2267+
PGconn* conn;
2268+
PGresult *result;
2269+
char* values[Natts_mtm_cluster_state];
2270+
HeapTuple tuple;
2271+
2272+
if (is_first_call) {
2273+
funcctx = SRF_FIRSTCALL_INIT();
2274+
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
2275+
usrfctx = (MtmGetClusterInfoCtx*)palloc(sizeof(MtmGetNodeStateCtx));
2276+
get_call_result_type(fcinfo, NULL, &desc);
2277+
funcctx->attinmeta = TupleDescGetAttInMetadata(desc);
2278+
usrfctx->nodeId = 1;
2279+
funcctx->user_fctx = usrfctx;
2280+
MemoryContextSwitchTo(oldcontext);
2281+
}
2282+
funcctx = SRF_PERCALL_SETUP();
2283+
usrfctx = (MtmGetClusterInfoCtx*)funcctx->user_fctx;
2284+
if (usrfctx->nodeId > MtmNodes) {
2285+
SRF_RETURN_DONE(funcctx);
2286+
}
2287+
conn = PQconnectdb(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
2288+
if (PQstatus(conn) != CONNECTION_OK) {
2289+
elog(ERROR, "Failed to establish connection '%s' to node %d", Mtm->nodes[usrfctx->nodeId-1].con.connStr, usrfctx->nodeId);
2290+
}
2291+
result = PQexec(conn, "select * from mtm.get_cluster_state()");
2292+
2293+
if (PQresultStatus(result) != PGRES_TUPLES_OK || PQntuples(result) != 1) {
2294+
elog(ERROR, "Failed to receive data from %d", usrfctx->nodeId);
2295+
}
2296+
2297+
for (i = 0; i < Natts_mtm_cluster_state; i++) {
2298+
values[i] = PQgetvalue(result, 0, i);
2299+
}
2300+
tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);
2301+
PQclear(result);
2302+
PQfinish(conn);
2303+
usrfctx->nodeId += 1;
2304+
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
2305+
}
2306+
2307+
22572308
Datum mtm_make_table_local(PG_FUNCTION_ARGS)
22582309
{
22592310
Oid reloid = PG_GETARG_OID(1);

contrib/mmts/multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
#define Anum_mtm_local_tables_rel_schema 1
5757
#define Anum_mtm_local_tables_rel_name 2
5858

59+
#define Natts_mtm_cluster_state 11
60+
#define Natts_mtm_nodes_state 8
61+
5962
typedef uint64 csn_t; /* commit serial number */
6063
#define INVALID_CSN ((csn_t)-1)
6164

0 commit comments

Comments
 (0)