Skip to content

Commit 9b4d973

Browse files
committed
Fix syslogger to not lose log coherency under high load.
The original coding of the syslogger had an arbitrary limit of 20 large messages concurrently in progress, after which it would just punt and dump message fragments to the output file separately. Our ambitions are a bit higher than that now, so allow the data structure to expand as necessary. Reported and patched by Andrew Dunstan; some editing by Tom
1 parent 49281db commit 9b4d973

File tree

1 file changed

+71
-61
lines changed

1 file changed

+71
-61
lines changed

src/backend/postmaster/syslogger.c

Lines changed: 71 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "lib/stringinfo.h"
3535
#include "libpq/pqsignal.h"
3636
#include "miscadmin.h"
37+
#include "nodes/pg_list.h"
3738
#include "pgtime.h"
3839
#include "postmaster/fork_process.h"
3940
#include "postmaster/postmaster.h"
@@ -92,11 +93,14 @@ static char *last_file_name = NULL;
9293
static char *last_csv_file_name = NULL;
9394

9495
/*
95-
* Buffers for saving partial messages from different backends. We don't expect
96-
* that there will be very many outstanding at one time, so 20 seems plenty of
97-
* leeway. If this array gets full we won't lose messages, but we will lose
98-
* the protocol protection against them being partially written or interleaved.
96+
* Buffers for saving partial messages from different backends.
9997
*
98+
* Keep NBUFFER_LISTS lists of these, with the entry for a given source pid
99+
* being in the list numbered (pid % NBUFFER_LISTS), so as to cut down on
100+
* the number of entries we have to examine for any one incoming message.
101+
* There must never be more than one entry for the same source pid.
102+
*
103+
* An inactive buffer is not removed from its list, just held for re-use.
100104
* An inactive buffer has pid == 0 and undefined contents of data.
101105
*/
102106
typedef struct
@@ -105,8 +109,8 @@ typedef struct
105109
StringInfoData data; /* accumulated data, as a StringInfo */
106110
} save_buffer;
107111

108-
#define CHUNK_SLOTS 20
109-
static save_buffer saved_chunks[CHUNK_SLOTS];
112+
#define NBUFFER_LISTS 256
113+
static List *buffer_lists[NBUFFER_LISTS];
110114

111115
/* These must be exported for EXEC_BACKEND case ... annoying */
112116
#ifndef WIN32
@@ -597,7 +601,7 @@ SysLogger_Start(void)
597601
* Now we are done with the write end of the pipe.
598602
* CloseHandle() must not be called because the preceding
599603
* close() closes the underlying handle.
600-
*/
604+
*/
601605
syslogPipe[1] = 0;
602606
#endif
603607
redirection_done = true;
@@ -739,6 +743,12 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
739743
(p.is_last == 't' || p.is_last == 'f' ||
740744
p.is_last == 'T' || p.is_last == 'F'))
741745
{
746+
List *buffer_list;
747+
ListCell *cell;
748+
save_buffer *existing_slot = NULL,
749+
*free_slot = NULL;
750+
StringInfo str;
751+
742752
chunklen = PIPE_HEADER_SIZE + p.len;
743753

744754
/* Fall out of loop if we don't have the whole chunk yet */
@@ -748,80 +758,70 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
748758
dest = (p.is_last == 'T' || p.is_last == 'F') ?
749759
LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR;
750760

751-
if (p.is_last == 'f' || p.is_last == 'F')
761+
/* Locate any existing buffer for this source pid */
762+
buffer_list = buffer_lists[p.pid % NBUFFER_LISTS];
763+
foreach(cell, buffer_list)
752764
{
753-
/*
754-
* Save a complete non-final chunk in the per-pid buffer if
755-
* possible - if not just write it out.
756-
*/
757-
int free_slot = -1,
758-
existing_slot = -1;
759-
int i;
760-
StringInfo str;
765+
save_buffer *buf = (save_buffer *) lfirst(cell);
761766

762-
for (i = 0; i < CHUNK_SLOTS; i++)
767+
if (buf->pid == p.pid)
763768
{
764-
if (saved_chunks[i].pid == p.pid)
765-
{
766-
existing_slot = i;
767-
break;
768-
}
769-
if (free_slot < 0 && saved_chunks[i].pid == 0)
770-
free_slot = i;
769+
existing_slot = buf;
770+
break;
771771
}
772-
if (existing_slot >= 0)
772+
if (buf->pid == 0 && free_slot == NULL)
773+
free_slot = buf;
774+
}
775+
776+
if (p.is_last == 'f' || p.is_last == 'F')
777+
{
778+
/*
779+
* Save a complete non-final chunk in a per-pid buffer
780+
*/
781+
if (existing_slot != NULL)
773782
{
774-
str = &(saved_chunks[existing_slot].data);
783+
/* Add chunk to data from preceding chunks */
784+
str = &(existing_slot->data);
775785
appendBinaryStringInfo(str,
776786
cursor + PIPE_HEADER_SIZE,
777787
p.len);
778788
}
779-
else if (free_slot >= 0)
789+
else
780790
{
781-
saved_chunks[free_slot].pid = p.pid;
782-
str = &(saved_chunks[free_slot].data);
791+
/* First chunk of message, save in a new buffer */
792+
if (free_slot == NULL)
793+
{
794+
/*
795+
* Need a free slot, but there isn't one in the list,
796+
* so create a new one and extend the list with it.
797+
*/
798+
free_slot = palloc(sizeof(save_buffer));
799+
buffer_list = lappend(buffer_list, free_slot);
800+
buffer_lists[p.pid % NBUFFER_LISTS] = buffer_list;
801+
}
802+
free_slot->pid = p.pid;
803+
str = &(free_slot->data);
783804
initStringInfo(str);
784805
appendBinaryStringInfo(str,
785806
cursor + PIPE_HEADER_SIZE,
786807
p.len);
787808
}
788-
else
789-
{
790-
/*
791-
* If there is no free slot we'll just have to take our
792-
* chances and write out a partial message and hope that
793-
* it's not followed by something from another pid.
794-
*/
795-
write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len,
796-
dest);
797-
}
798809
}
799810
else
800811
{
801812
/*
802813
* Final chunk --- add it to anything saved for that pid, and
803814
* either way write the whole thing out.
804815
*/
805-
int existing_slot = -1;
806-
int i;
807-
StringInfo str;
808-
809-
for (i = 0; i < CHUNK_SLOTS; i++)
810-
{
811-
if (saved_chunks[i].pid == p.pid)
812-
{
813-
existing_slot = i;
814-
break;
815-
}
816-
}
817-
if (existing_slot >= 0)
816+
if (existing_slot != NULL)
818817
{
819-
str = &(saved_chunks[existing_slot].data);
818+
str = &(existing_slot->data);
820819
appendBinaryStringInfo(str,
821820
cursor + PIPE_HEADER_SIZE,
822821
p.len);
823822
write_syslogger_file(str->data, str->len, dest);
824-
saved_chunks[existing_slot].pid = 0;
823+
/* Mark the buffer unused, and reclaim string storage */
824+
existing_slot->pid = 0;
825825
pfree(str->data);
826826
}
827827
else
@@ -877,17 +877,27 @@ static void
877877
flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
878878
{
879879
int i;
880-
StringInfo str;
881880

882881
/* Dump any incomplete protocol messages */
883-
for (i = 0; i < CHUNK_SLOTS; i++)
882+
for (i = 0; i < NBUFFER_LISTS; i++)
884883
{
885-
if (saved_chunks[i].pid != 0)
884+
List *list = buffer_lists[i];
885+
ListCell *cell;
886+
887+
foreach(cell, list)
886888
{
887-
str = &(saved_chunks[i].data);
888-
write_syslogger_file(str->data, str->len, LOG_DESTINATION_STDERR);
889-
saved_chunks[i].pid = 0;
890-
pfree(str->data);
889+
save_buffer *buf = (save_buffer *) lfirst(cell);
890+
891+
if (buf->pid != 0)
892+
{
893+
StringInfo str = &(buf->data);
894+
895+
write_syslogger_file(str->data, str->len,
896+
LOG_DESTINATION_STDERR);
897+
/* Mark the buffer unused, and reclaim string storage */
898+
buf->pid = 0;
899+
pfree(str->data);
900+
}
891901
}
892902
}
893903

0 commit comments

Comments
 (0)