@@ -421,9 +421,11 @@ bgw_main_concurrent_part(Datum main_arg)
421
421
/* Do the job */
422
422
do
423
423
{
424
- Oid types [2 ] = { OIDOID , INT4OID };
425
- Datum vals [2 ] = { part_slot -> relid , part_slot -> batch_size };
426
- bool nulls [2 ] = { false, false };
424
+ MemoryContext old_mcxt ;
425
+
426
+ Oid types [2 ] = { OIDOID , INT4OID };
427
+ Datum vals [2 ] = { part_slot -> relid , part_slot -> batch_size };
428
+ bool nulls [2 ] = { false, false };
427
429
428
430
/* Reset loop variables */
429
431
failed = false;
@@ -432,22 +434,25 @@ bgw_main_concurrent_part(Datum main_arg)
432
434
/* Start new transaction (syscache access etc.) */
433
435
StartTransactionCommand ();
434
436
437
+ /* We'll need this to recover from errors */
438
+ old_mcxt = CurrentMemoryContext ;
439
+
435
440
SPI_connect ();
436
441
PushActiveSnapshot (GetTransactionSnapshot ());
437
442
438
443
/* Prepare the query if needed */
439
444
if (sql == NULL )
440
445
{
441
- MemoryContext oldcontext ;
446
+ MemoryContext current_mcxt ;
442
447
443
448
/*
444
449
* Allocate as SQL query in top memory context because current
445
450
* context will be destroyed after transaction finishes
446
451
*/
447
- oldcontext = MemoryContextSwitchTo (TopMemoryContext );
452
+ current_mcxt = MemoryContextSwitchTo (TopMemoryContext );
448
453
sql = psprintf ("SELECT %s._partition_data_concurrent($1::oid, p_limit:=$2)" ,
449
454
get_namespace_name (get_pathman_schema ()));
450
- MemoryContextSwitchTo (oldcontext );
455
+ MemoryContextSwitchTo (current_mcxt );
451
456
}
452
457
453
458
/* Exec ret = _partition_data_concurrent() */
@@ -471,21 +476,33 @@ bgw_main_concurrent_part(Datum main_arg)
471
476
}
472
477
PG_CATCH ();
473
478
{
474
- ErrorData * error ;
475
-
476
- EmitErrorReport ();
479
+ ErrorData * error ;
480
+ char * sleep_time_str ;
477
481
482
+ /* Switch to the original context & copy edata */
483
+ MemoryContextSwitchTo (old_mcxt );
478
484
error = CopyErrorData ();
479
- elog (LOG , "%s: %s" , concurrent_part_bgw , error -> message );
480
485
FlushErrorState ();
486
+
487
+ /* Print messsage for this BGWorker to server log */
488
+ sleep_time_str = datum_to_cstring (Float8GetDatum (part_slot -> sleep_time ),
489
+ FLOAT8OID );
490
+ ereport (LOG ,
491
+ (errmsg ("%s: %s" , concurrent_part_bgw , error -> message ),
492
+ errdetail ("Attempt: %d/%d, sleep time: %s" ,
493
+ failures_count + 1 ,
494
+ PART_WORKER_MAX_ATTEMPTS ,
495
+ sleep_time_str )));
496
+ pfree (sleep_time_str ); /* free the time string */
497
+
481
498
FreeErrorData (error );
482
499
483
500
/*
484
501
* The most common exception we can catch here is a deadlock with
485
502
* concurrent user queries. Check that attempts count doesn't exceed
486
503
* some reasonable value
487
504
*/
488
- if (failures_count ++ > PART_WORKER_MAX_ATTEMPTS )
505
+ if (failures_count ++ >= PART_WORKER_MAX_ATTEMPTS )
489
506
{
490
507
/* Mark slot as FREE */
491
508
part_slot -> worker_status = WS_FREE ;
@@ -510,8 +527,11 @@ bgw_main_concurrent_part(Datum main_arg)
510
527
if (failed )
511
528
{
512
529
#ifdef USE_ASSERT_CHECKING
513
- elog (DEBUG2 , "%s: could not relocate batch, total: %lu [%u]" ,
514
- concurrent_part_bgw , part_slot -> total_rows , MyProcPid );
530
+ elog (DEBUG1 , "%s: could not relocate batch (%d/%d), total: %lu [%u]" ,
531
+ concurrent_part_bgw ,
532
+ failures_count , PART_WORKER_MAX_ATTEMPTS , /* current/max */
533
+ part_slot -> total_rows ,
534
+ MyProcPid );
515
535
#endif
516
536
517
537
/* Abort transaction and sleep for a second */
@@ -528,7 +548,7 @@ bgw_main_concurrent_part(Datum main_arg)
528
548
part_slot -> total_rows += rows ;
529
549
530
550
#ifdef USE_ASSERT_CHECKING
531
- elog (DEBUG2 , "%s: relocated %d rows, total: %lu [%u]" ,
551
+ elog (DEBUG1 , "%s: relocated %d rows, total: %lu [%u]" ,
532
552
concurrent_part_bgw , rows , part_slot -> total_rows , MyProcPid );
533
553
#endif
534
554
}
0 commit comments