Skip to content

Commit e2c720b

Browse files
committed
Perform sql_pop delete in bulk instead of one-at-a-time.
Prior to 5061046 these DELETEs had been done in large batches (of DEFAULT_PEEK_COUNT size), but that naive method of choosing rows to delete was unsafe. Here we continue to keep track of which rows must be deleted as we process them.
1 parent f0cd787 commit e2c720b

File tree

2 files changed

+31
-11
lines changed

2 files changed

+31
-11
lines changed

lib/pg_repack.sql.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ CREATE VIEW repack.tables AS
191191
'INSERT INTO repack.table_' || R.oid || ' VALUES ($1.*)' AS sql_insert,
192192
'DELETE FROM repack.table_' || R.oid || ' WHERE ' || repack.get_compare_pkey(PK.indexrelid, '$1') AS sql_delete,
193193
'UPDATE repack.table_' || R.oid || ' SET ' || repack.get_assign(R.oid, '$2') || ' WHERE ' || repack.get_compare_pkey(PK.indexrelid, '$1') AS sql_update,
194-
'DELETE FROM repack.log_' || R.oid || ' WHERE id = $1' AS sql_pop
194+
'DELETE FROM repack.log_' || R.oid || ' WHERE id IN (' AS sql_pop
195195
FROM pg_class R
196196
LEFT JOIN pg_class T ON R.reltoastrelid = T.oid
197197
LEFT JOIN repack.primary_keys PK

lib/repack.c

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ repack_trigger(PG_FUNCTION_ARGS)
190190
* @param sql_insert SQL to insert into temp table.
191191
* @param sql_delete SQL to delete from temp table.
192192
* @param sql_update SQL to update temp table.
193-
* @param sql_pop SQL to delete tuple from log table.
193+
* @param sql_pop SQL to bulk-delete tuples from log table.
194194
* @param count Max number of operations, or no count iff <=0.
195195
* @retval Number of performed operations.
196196
*/
@@ -203,18 +203,20 @@ repack_apply(PG_FUNCTION_ARGS)
203203
const char *sql_insert = PG_GETARG_CSTRING(1);
204204
const char *sql_delete = PG_GETARG_CSTRING(2);
205205
const char *sql_update = PG_GETARG_CSTRING(3);
206-
const char *sql_pop = PG_GETARG_CSTRING(4);
206+
/* sql_pop, the fourth arg, will be used in the loop below */
207207
int32 count = PG_GETARG_INT32(5);
208208

209209
SPIPlanPtr plan_peek = NULL;
210210
SPIPlanPtr plan_insert = NULL;
211211
SPIPlanPtr plan_delete = NULL;
212212
SPIPlanPtr plan_update = NULL;
213-
SPIPlanPtr plan_pop = NULL;
214213
uint32 n, i;
215214
Oid argtypes_peek[1] = { INT4OID };
216215
Datum values_peek[1];
217216
bool nulls_peek[1] = { 0 };
217+
StringInfoData sql_pop;
218+
219+
initStringInfo(&sql_pop);
218220

219221
/* authority check */
220222
must_be_superuser("repack_apply");
@@ -252,15 +254,22 @@ repack_apply(PG_FUNCTION_ARGS)
252254
argtypes[1] = SPI_gettypeid(desc, 2); /* pk */
253255
argtypes[2] = SPI_gettypeid(desc, 3); /* row */
254256

257+
resetStringInfo(&sql_pop);
258+
appendStringInfoString(&sql_pop, PG_GETARG_CSTRING(4));
259+
255260
for (i = 0; i < ntuples; i++, n++)
256261
{
257262
HeapTuple tuple;
263+
char *pkid;
258264

259265
tuple = tuptable->vals[i];
260266
values[0] = SPI_getbinval(tuple, desc, 1, &nulls[0]);
261267
values[1] = SPI_getbinval(tuple, desc, 2, &nulls[1]);
262268
values[2] = SPI_getbinval(tuple, desc, 3, &nulls[2]);
263269

270+
pkid = SPI_getvalue(tuple, desc, 1);
271+
Assert(pkid != NULL);
272+
264273
if (nulls[1])
265274
{
266275
/* INSERT */
@@ -282,15 +291,26 @@ repack_apply(PG_FUNCTION_ARGS)
282291
plan_update = repack_prepare(sql_update, 2, &argtypes[1]);
283292
execute_plan(SPI_OK_UPDATE, plan_update, &values[1], &nulls[1]);
284293
}
285-
/* Delete tuple in log.
286-
* XXX It would be a lot more efficient to perform
287-
* this DELETE in bulk, but be careful to only
288-
* delete log entries we have actually processed.
294+
295+
/* Add the primary key ID of each row from the log
296+
* table we have processed so far to this
297+
* DELETE ... IN (...) query string, so we
298+
* can delete all the rows we have processed at-once.
289299
*/
290-
if (plan_pop == NULL)
291-
plan_pop = repack_prepare(sql_pop, 1, argtypes);
292-
execute_plan(SPI_OK_DELETE, plan_pop, values, nulls);
300+
if (i == 0)
301+
appendStringInfoString(&sql_pop, pkid);
302+
else
303+
appendStringInfo(&sql_pop, ",%s", pkid);
304+
pfree(pkid);
293305
}
306+
/* i must be > 0 (and hence we must have some rows to delete)
307+
* since SPI_processed > 0
308+
*/
309+
Assert(i > 0);
310+
appendStringInfoString(&sql_pop, ");");
311+
312+
/* Bulk delete of processed rows from the log table */
313+
execute(SPI_OK_DELETE, sql_pop.data);
294314

295315
SPI_freetuptable(tuptable);
296316
}

0 commit comments

Comments
 (0)