@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
232
232
static void WalSndDone (WalSndSendDataCallback send_data );
233
233
static XLogRecPtr GetStandbyFlushRecPtr (void );
234
234
static void IdentifySystem (void );
235
+ static void ReadReplicationSlot (ReadReplicationSlotCmd * cmd );
235
236
static void CreateReplicationSlot (CreateReplicationSlotCmd * cmd );
236
237
static void DropReplicationSlot (DropReplicationSlotCmd * cmd );
237
238
static void StartReplication (StartReplicationCmd * cmd );
@@ -457,6 +458,104 @@ IdentifySystem(void)
457
458
end_tup_output (tstate );
458
459
}
459
460
461
+ /* Handle READ_REPLICATION_SLOT command */
462
+ static void
463
+ ReadReplicationSlot (ReadReplicationSlotCmd * cmd )
464
+ {
465
+ #define READ_REPLICATION_SLOT_COLS 3
466
+ ReplicationSlot * slot ;
467
+ DestReceiver * dest ;
468
+ TupOutputState * tstate ;
469
+ TupleDesc tupdesc ;
470
+ Datum values [READ_REPLICATION_SLOT_COLS ];
471
+ bool nulls [READ_REPLICATION_SLOT_COLS ];
472
+
473
+ tupdesc = CreateTemplateTupleDesc (READ_REPLICATION_SLOT_COLS );
474
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber ) 1 , "slot_type" ,
475
+ TEXTOID , -1 , 0 );
476
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber ) 2 , "restart_lsn" ,
477
+ TEXTOID , -1 , 0 );
478
+ /* TimeLineID is unsigned, so int4 is not wide enough. */
479
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber ) 3 , "restart_tli" ,
480
+ INT8OID , -1 , 0 );
481
+
482
+ MemSet (values , 0 , READ_REPLICATION_SLOT_COLS * sizeof (Datum ));
483
+ MemSet (nulls , true, READ_REPLICATION_SLOT_COLS * sizeof (bool ));
484
+
485
+ LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
486
+ slot = SearchNamedReplicationSlot (cmd -> slotname , false);
487
+ if (slot == NULL || !slot -> in_use )
488
+ {
489
+ LWLockRelease (ReplicationSlotControlLock );
490
+ }
491
+ else
492
+ {
493
+ ReplicationSlot slot_contents ;
494
+ int i = 0 ;
495
+
496
+ /* Copy slot contents while holding spinlock */
497
+ SpinLockAcquire (& slot -> mutex );
498
+ slot_contents = * slot ;
499
+ SpinLockRelease (& slot -> mutex );
500
+ LWLockRelease (ReplicationSlotControlLock );
501
+
502
+ if (OidIsValid (slot_contents .data .database ))
503
+ ereport (ERROR ,
504
+ errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
505
+ errmsg ("cannot use \"%s\" with logical replication slot \"%s\"" ,
506
+ "READ_REPLICATION_SLOT" ,
507
+ NameStr (slot_contents .data .name )));
508
+
509
+ /* slot type */
510
+ values [i ] = CStringGetTextDatum ("physical" );
511
+ nulls [i ] = false;
512
+ i ++ ;
513
+
514
+ /* start LSN */
515
+ if (!XLogRecPtrIsInvalid (slot_contents .data .restart_lsn ))
516
+ {
517
+ char xloc [64 ];
518
+
519
+ snprintf (xloc , sizeof (xloc ), "%X/%X" ,
520
+ LSN_FORMAT_ARGS (slot_contents .data .restart_lsn ));
521
+ values [i ] = CStringGetTextDatum (xloc );
522
+ nulls [i ] = false;
523
+ }
524
+ i ++ ;
525
+
526
+ /* timeline this WAL was produced on */
527
+ if (!XLogRecPtrIsInvalid (slot_contents .data .restart_lsn ))
528
+ {
529
+ TimeLineID slots_position_timeline ;
530
+ TimeLineID current_timeline ;
531
+ List * timeline_history = NIL ;
532
+
533
+ /*
534
+ * While in recovery, use as timeline the currently-replaying one
535
+ * to get the LSN position's history.
536
+ */
537
+ if (RecoveryInProgress ())
538
+ (void ) GetXLogReplayRecPtr (& current_timeline );
539
+ else
540
+ current_timeline = ThisTimeLineID ;
541
+
542
+ timeline_history = readTimeLineHistory (current_timeline );
543
+ slots_position_timeline = tliOfPointInHistory (slot_contents .data .restart_lsn ,
544
+ timeline_history );
545
+ values [i ] = Int64GetDatum ((int64 ) slots_position_timeline );
546
+ nulls [i ] = false;
547
+ }
548
+ i ++ ;
549
+
550
+ Assert (i == READ_REPLICATION_SLOT_COLS );
551
+ }
552
+
553
+ dest = CreateDestReceiver (DestRemoteSimple );
554
+ tstate = begin_tup_output_tupdesc (dest , tupdesc , & TTSOpsVirtual );
555
+ do_tup_output (tstate , values , nulls );
556
+ end_tup_output (tstate );
557
+ }
558
+
460
559
461
560
/*
462
561
* Handle TIMELINE_HISTORY command.
@@ -1622,6 +1721,13 @@ exec_replication_command(const char *cmd_string)
1622
1721
EndReplicationCommand (cmdtag );
1623
1722
break ;
1624
1723
1724
+ case T_ReadReplicationSlotCmd :
1725
+ cmdtag = "READ_REPLICATION_SLOT" ;
1726
+ set_ps_display (cmdtag );
1727
+ ReadReplicationSlot ((ReadReplicationSlotCmd * ) cmd_node );
1728
+ EndReplicationCommand (cmdtag );
1729
+ break ;
1730
+
1625
1731
case T_BaseBackupCmd :
1626
1732
cmdtag = "BASE_BACKUP" ;
1627
1733
set_ps_display (cmdtag );
0 commit comments