Skip to content

Commit d3a99db

Browse files
committed
Merge branch 'kotsachin-master'
2 parents 13e3591 + 8fc8b65 commit d3a99db

File tree

4 files changed

+114
-31
lines changed

4 files changed

+114
-31
lines changed

bin/pg_repack.c

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ static void repack_one_table(repack_table *table, const char *order_by);
204204
static bool repack_table_indexes(PGresult *index_details);
205205
static bool repack_all_indexes(char *errbuf, size_t errsize);
206206
static void repack_cleanup(bool fatal, const repack_table *table);
207+
static void repack_cleanup_callback(bool fatal, void *userdata);
207208
static bool rebuild_indexes(const repack_table *table);
208209

209210
static char *getstr(PGresult *res, int row, int col);
@@ -235,13 +236,20 @@ static bool only_indexes = false;
235236
static int wait_timeout = 60; /* in seconds */
236237
static int jobs = 0; /* number of concurrent worker conns. */
237238
static bool dryrun = false;
239+
static unsigned int temp_obj_num = 0; /* temporary objects counter */
238240

239241
/* buffer should have at least 11 bytes */
240242
static char *
241243
utoa(unsigned int value, char *buffer)
242244
{
243245
sprintf(buffer, "%u", value);
244-
return buffer;
246+
/* XXX: originally, we would just return buffer here without
247+
* the pgut_strdup(). But repack_cleanup_callback() seems to
248+
* depend on getting back a freshly strdup'd copy of buffer,
249+
* not sure why. So now we are leaking a tiny bit of memory
250+
* with each utoa() call.
251+
*/
252+
return pgut_strdup(buffer);
245253
}
246254

247255
static pgut_option options[] =
@@ -898,6 +906,13 @@ rebuild_indexes(const repack_table *table)
898906

899907
ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout);
900908
#endif
909+
/* XXX: the errno != EINTR check means we won't bail
910+
* out on SIGINT. We should probably just remove this
911+
* check, though it seems we also need to fix up
912+
* the on_interrupt handling for workers' index
913+
* builds (those PGconns don't seem to have c->cancel
914+
* set, so we don't cancel the in-progress builds).
915+
*/
901916
if (ret < 0 && errno != EINTR)
902917
elog(ERROR, "poll() failed: %d, %d", ret, errno);
903918

@@ -990,7 +1005,7 @@ static void
9901005
repack_one_table(repack_table *table, const char *orderby)
9911006
{
9921007
PGresult *res = NULL;
993-
const char *params[2];
1008+
const char *params[3];
9941009
int num;
9951010
char *vxid = NULL;
9961011
char buffer[12];
@@ -1041,6 +1056,9 @@ repack_one_table(repack_table *table, const char *orderby)
10411056

10421057
if (dryrun)
10431058
return;
1059+
1060+
/* push repack_cleanup_callback() on stack to clean temporary objects */
1061+
pgut_atexit_push(repack_cleanup_callback, &table->target_oid);
10441062

10451063
/*
10461064
* 1. Setup advisory lock and trigger on main table.
@@ -1146,8 +1164,11 @@ repack_one_table(repack_table *table, const char *orderby)
11461164
CLEARPGRES(res);
11471165

11481166
command(table->create_pktype, 0, NULL);
1167+
temp_obj_num++;
11491168
command(table->create_log, 0, NULL);
1169+
temp_obj_num++;
11501170
command(table->create_trigger, 0, NULL);
1171+
temp_obj_num++;
11511172
command(table->enable_trigger, 0, NULL);
11521173
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid);
11531174
command(sql.data, 0, NULL);
@@ -1283,6 +1304,7 @@ repack_one_table(repack_table *table, const char *orderby)
12831304
goto cleanup;
12841305

12851306
command(table->create_table, 0, NULL);
1307+
temp_obj_num++;
12861308
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
12871309
if (table->drop_columns)
12881310
command(table->drop_columns, 0, NULL);
@@ -1375,8 +1397,10 @@ repack_one_table(repack_table *table, const char *orderby)
13751397
elog(DEBUG2, "---- drop ----");
13761398

13771399
command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
1378-
command("SELECT repack.repack_drop($1)", 1, params);
1400+
params[1] = utoa(temp_obj_num, buffer);
1401+
command("SELECT repack.repack_drop($1, $2)", 2, params);
13791402
command("COMMIT", 0, NULL);
1403+
temp_obj_num = 0; /* reset temporary object counter after cleanup */
13801404

13811405
/*
13821406
* 7. Analyze.
@@ -1395,7 +1419,7 @@ repack_one_table(repack_table *table, const char *orderby)
13951419

13961420
/* Release advisory lock on table. */
13971421
params[0] = REPACK_LOCK_PREFIX_STR;
1398-
params[1] = buffer;
1422+
params[1] = utoa(table->target_oid, buffer);
13991423

14001424
res = pgut_execute(connection, "SELECT pg_advisory_unlock($1, CAST(-2147483648 + $2::bigint AS integer))",
14011425
2, params);
@@ -1675,6 +1699,31 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta
16751699
return ret;
16761700
}
16771701

1702+
/* This function calls to repack_drop() to clean temporary objects on error
1703+
* in creation of temporary objects.
1704+
*/
1705+
void
1706+
repack_cleanup_callback(bool fatal, void *userdata)
1707+
{
1708+
Oid target_table = *(Oid *) userdata;
1709+
const char *params[2];
1710+
char buffer[12];
1711+
1712+
if(fatal)
1713+
{
1714+
params[0] = utoa(target_table, buffer);
1715+
params[1] = utoa(temp_obj_num, buffer);
1716+
1717+
/* testing PQstatus() of connection and conn2, as we do
1718+
* in repack_cleanup(), doesn't seem to work here,
1719+
* so just use an unconditional reconnect().
1720+
*/
1721+
reconnect(ERROR);
1722+
command("SELECT repack.repack_drop($1, $2)", 2, params);
1723+
temp_obj_num = 0; /* reset temporary object counter after cleanup */
1724+
}
1725+
}
1726+
16781727
/*
16791728
* The userdata pointing a table being re-organized. We need to cleanup temp
16801729
* objects before the program exits.
@@ -1689,7 +1738,7 @@ repack_cleanup(bool fatal, const repack_table *table)
16891738
else
16901739
{
16911740
char buffer[12];
1692-
const char *params[1];
1741+
const char *params[2];
16931742

16941743
/* Try reconnection if not available. */
16951744
if (PQstatus(connection) != CONNECTION_OK ||
@@ -1698,7 +1747,9 @@ repack_cleanup(bool fatal, const repack_table *table)
16981747

16991748
/* do cleanup */
17001749
params[0] = utoa(table->target_oid, buffer);
1701-
command("SELECT repack.repack_drop($1)", 1, params);
1750+
params[1] = utoa(temp_obj_num, buffer);
1751+
command("SELECT repack.repack_drop($1, $2)", 2, params);
1752+
temp_obj_num = 0; /* reset temporary object counter after cleanup */
17021753
}
17031754
}
17041755

bin/pgut/pgut.c

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ static void on_before_exec(pgutConn *conn);
6464
static void on_after_exec(pgutConn *conn);
6565
static void on_interrupt(void);
6666
static void on_cleanup(void);
67-
static void exit_or_abort(int exitcode);
67+
static void exit_or_abort(int exitcode, int elevel);
6868

6969
void
7070
pgut_init(int argc, char **argv)
@@ -872,7 +872,10 @@ pgut_errfinish(int dummy, ...)
872872
edata->detail.data);
873873

874874
if (pgut_abort_level <= edata->elevel && edata->elevel <= PANIC)
875-
exit_or_abort(edata->code);
875+
{
876+
in_cleanup = true; /* need to be set for cleaning temporary objects on error */
877+
exit_or_abort(edata->code, edata->elevel);
878+
}
876879
}
877880

878881
#ifndef PGUT_OVERRIDE_ELOG
@@ -1180,7 +1183,9 @@ call_atexit_callbacks(bool fatal)
11801183
pgut_atexit_item *item;
11811184

11821185
for (item = pgut_atexit_stack; item; item = item->next)
1186+
{
11831187
item->callback(fatal, item->userdata);
1188+
}
11841189
}
11851190

11861191
static void
@@ -1193,12 +1198,19 @@ on_cleanup(void)
11931198
}
11941199

11951200
static void
1196-
exit_or_abort(int exitcode)
1201+
exit_or_abort(int exitcode, int elevel)
11971202
{
1198-
if (in_cleanup)
1203+
1204+
if (in_cleanup && FATAL > elevel)
11991205
{
12001206
/* oops, error in cleanup*/
12011207
call_atexit_callbacks(true);
1208+
exit(exitcode);
1209+
}
1210+
else if (FATAL <= elevel <= PANIC)
1211+
{
1212+
/* on FATAL or PANIC */
1213+
call_atexit_callbacks(true);
12021214
abort();
12031215
}
12041216
else

lib/pg_repack.sql.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ CREATE FUNCTION repack.repack_swap(oid) RETURNS void AS
247247
'MODULE_PATHNAME', 'repack_swap'
248248
LANGUAGE C VOLATILE STRICT;
249249

250-
CREATE FUNCTION repack.repack_drop(oid) RETURNS void AS
250+
CREATE FUNCTION repack.repack_drop(oid, int) RETURNS void AS
251251
'MODULE_PATHNAME', 'repack_drop'
252252
LANGUAGE C VOLATILE STRICT;
253253

lib/repack.c

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,7 @@ Datum
928928
repack_drop(PG_FUNCTION_ARGS)
929929
{
930930
Oid oid = PG_GETARG_OID(0);
931+
int numobj = PG_GETARG_INT32(1);
931932
const char *relname = get_quoted_relname(oid);
932933
const char *nspname = get_quoted_nspname(oid);
933934

@@ -943,14 +944,41 @@ repack_drop(PG_FUNCTION_ARGS)
943944
/* connect to SPI manager */
944945
repack_init();
945946

947+
/* drop log table: must be done before dropping the pk type,
948+
* since the log table is dependent on the pk type. (That's
949+
* why we check numobj > 1 here.)
950+
*/
951+
if (numobj > 1)
952+
{
953+
execute_with_format(
954+
SPI_OK_UTILITY,
955+
"DROP TABLE IF EXISTS repack.log_%u CASCADE",
956+
oid);
957+
--numobj;
958+
}
959+
960+
/* drop type for pk type */
961+
if (numobj > 0)
962+
{
963+
execute_with_format(
964+
SPI_OK_UTILITY,
965+
"DROP TYPE IF EXISTS repack.pk_%u",
966+
oid);
967+
--numobj;
968+
}
969+
946970
/*
947971
* drop repack trigger: We have already dropped the trigger in normal
948972
* cases, but it can be left on error.
949973
*/
950-
execute_with_format(
951-
SPI_OK_UTILITY,
952-
"DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE",
953-
nspname, relname);
974+
if (numobj > 0)
975+
{
976+
execute_with_format(
977+
SPI_OK_UTILITY,
978+
"DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE",
979+
nspname, relname);
980+
--numobj;
981+
}
954982

955983
#if PG_VERSION_NUM < 80400
956984
/* delete autovacuum settings */
@@ -965,23 +993,15 @@ repack_drop(PG_FUNCTION_ARGS)
965993
oid, oid);
966994
#endif
967995

968-
/* drop log table */
969-
execute_with_format(
970-
SPI_OK_UTILITY,
971-
"DROP TABLE IF EXISTS repack.log_%u CASCADE",
972-
oid);
973-
974996
/* drop temp table */
975-
execute_with_format(
976-
SPI_OK_UTILITY,
977-
"DROP TABLE IF EXISTS repack.table_%u CASCADE",
978-
oid);
979-
980-
/* drop type for log table */
981-
execute_with_format(
982-
SPI_OK_UTILITY,
983-
"DROP TYPE IF EXISTS repack.pk_%u CASCADE",
984-
oid);
997+
if (numobj > 0)
998+
{
999+
execute_with_format(
1000+
SPI_OK_UTILITY,
1001+
"DROP TABLE IF EXISTS repack.table_%u CASCADE",
1002+
oid);
1003+
--numobj;
1004+
}
9851005

9861006
SPI_finish();
9871007

0 commit comments

Comments
 (0)