Skip to content

Commit 555960a

Browse files
committed
Create explain_dr.c and move DestReceiver-related code there.
explain.c has grown rather large, and the code that deals with the DestReceiver that supports the SERIALIZE option is pretty easily severable from the rest of explain.c; hence, move it to a separate file. Reviewed-by: Peter Geoghegan <pg@bowt.ie> Discussion: http://postgr.es/m/CA+TgmoYutMw1Jgo8BWUmB3TqnOhsEAJiYO=rOQufF4gPLWmkLQ@mail.gmail.com
1 parent 9173e8b commit 555960a

File tree

5 files changed

+341
-298
lines changed

5 files changed

+341
-298
lines changed

src/backend/commands/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ OBJS = \
3434
dropcmds.o \
3535
event_trigger.o \
3636
explain.o \
37+
explain_dr.o \
3738
explain_format.o \
3839
extension.o \
3940
foreigncmds.o \

src/backend/commands/explain.c

Lines changed: 1 addition & 298 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "catalog/pg_type.h"
1818
#include "commands/createas.h"
1919
#include "commands/defrem.h"
20+
#include "commands/explain_dr.h"
2021
#include "commands/explain_format.h"
2122
#include "commands/prepare.h"
2223
#include "foreign/fdwapi.h"
@@ -50,14 +51,6 @@ ExplainOneQuery_hook_type ExplainOneQuery_hook = NULL;
5051
explain_get_index_name_hook_type explain_get_index_name_hook = NULL;
5152

5253

53-
/* Instrumentation data for SERIALIZE option */
54-
typedef struct SerializeMetrics
55-
{
56-
uint64 bytesSent; /* # of bytes serialized */
57-
instr_time timeSpent; /* time spent serializing */
58-
BufferUsage bufferUsage; /* buffers accessed during serialization */
59-
} SerializeMetrics;
60-
6154
/*
6255
* Various places within need to convert bytes to kilobytes. Round these up
6356
* to the next whole kilobyte.
@@ -161,7 +154,6 @@ static ExplainWorkersState *ExplainCreateWorkersState(int num_workers);
161154
static void ExplainOpenWorker(int n, ExplainState *es);
162155
static void ExplainCloseWorker(int n, ExplainState *es);
163156
static void ExplainFlushWorkersState(ExplainState *es);
164-
static SerializeMetrics GetSerializationMetrics(DestReceiver *dest);
165157

166158

167159

@@ -4939,292 +4931,3 @@ ExplainFlushWorkersState(ExplainState *es)
49394931
pfree(wstate->worker_state_save);
49404932
pfree(wstate);
49414933
}
4942-
4943-
/*
4944-
* DestReceiver functions for SERIALIZE option
4945-
*
4946-
* A DestReceiver for query tuples, that serializes passed rows into RowData
4947-
* messages while measuring the resources expended and total serialized size,
4948-
* while never sending the data to the client. This allows measuring the
4949-
* overhead of deTOASTing and datatype out/sendfuncs, which are not otherwise
4950-
* exercisable without actually hitting the network.
4951-
*/
4952-
typedef struct SerializeDestReceiver
4953-
{
4954-
DestReceiver pub;
4955-
ExplainState *es; /* this EXPLAIN statement's ExplainState */
4956-
int8 format; /* text or binary, like pq wire protocol */
4957-
TupleDesc attrinfo; /* the output tuple desc */
4958-
int nattrs; /* current number of columns */
4959-
FmgrInfo *finfos; /* precomputed call info for output fns */
4960-
MemoryContext tmpcontext; /* per-row temporary memory context */
4961-
StringInfoData buf; /* buffer to hold the constructed message */
4962-
SerializeMetrics metrics; /* collected metrics */
4963-
} SerializeDestReceiver;
4964-
4965-
/*
4966-
* Get the function lookup info that we'll need for output.
4967-
*
4968-
* This is a subset of what printtup_prepare_info() does. We don't need to
4969-
* cope with format choices varying across columns, so it's slightly simpler.
4970-
*/
4971-
static void
4972-
serialize_prepare_info(SerializeDestReceiver *receiver,
4973-
TupleDesc typeinfo, int nattrs)
4974-
{
4975-
/* get rid of any old data */
4976-
if (receiver->finfos)
4977-
pfree(receiver->finfos);
4978-
receiver->finfos = NULL;
4979-
4980-
receiver->attrinfo = typeinfo;
4981-
receiver->nattrs = nattrs;
4982-
if (nattrs <= 0)
4983-
return;
4984-
4985-
receiver->finfos = (FmgrInfo *) palloc0(nattrs * sizeof(FmgrInfo));
4986-
4987-
for (int i = 0; i < nattrs; i++)
4988-
{
4989-
FmgrInfo *finfo = receiver->finfos + i;
4990-
Form_pg_attribute attr = TupleDescAttr(typeinfo, i);
4991-
Oid typoutput;
4992-
Oid typsend;
4993-
bool typisvarlena;
4994-
4995-
if (receiver->format == 0)
4996-
{
4997-
/* wire protocol format text */
4998-
getTypeOutputInfo(attr->atttypid,
4999-
&typoutput,
5000-
&typisvarlena);
5001-
fmgr_info(typoutput, finfo);
5002-
}
5003-
else if (receiver->format == 1)
5004-
{
5005-
/* wire protocol format binary */
5006-
getTypeBinaryOutputInfo(attr->atttypid,
5007-
&typsend,
5008-
&typisvarlena);
5009-
fmgr_info(typsend, finfo);
5010-
}
5011-
else
5012-
ereport(ERROR,
5013-
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
5014-
errmsg("unsupported format code: %d", receiver->format)));
5015-
}
5016-
}
5017-
5018-
/*
5019-
* serializeAnalyzeReceive - collect tuples for EXPLAIN (SERIALIZE)
5020-
*
5021-
* This should match printtup() in printtup.c as closely as possible,
5022-
* except for the addition of measurement code.
5023-
*/
5024-
static bool
5025-
serializeAnalyzeReceive(TupleTableSlot *slot, DestReceiver *self)
5026-
{
5027-
TupleDesc typeinfo = slot->tts_tupleDescriptor;
5028-
SerializeDestReceiver *myState = (SerializeDestReceiver *) self;
5029-
MemoryContext oldcontext;
5030-
StringInfo buf = &myState->buf;
5031-
int natts = typeinfo->natts;
5032-
instr_time start,
5033-
end;
5034-
BufferUsage instr_start;
5035-
5036-
/* only measure time, buffers if requested */
5037-
if (myState->es->timing)
5038-
INSTR_TIME_SET_CURRENT(start);
5039-
if (myState->es->buffers)
5040-
instr_start = pgBufferUsage;
5041-
5042-
/* Set or update my derived attribute info, if needed */
5043-
if (myState->attrinfo != typeinfo || myState->nattrs != natts)
5044-
serialize_prepare_info(myState, typeinfo, natts);
5045-
5046-
/* Make sure the tuple is fully deconstructed */
5047-
slot_getallattrs(slot);
5048-
5049-
/* Switch into per-row context so we can recover memory below */
5050-
oldcontext = MemoryContextSwitchTo(myState->tmpcontext);
5051-
5052-
/*
5053-
* Prepare a DataRow message (note buffer is in per-query context)
5054-
*
5055-
* Note that we fill a StringInfo buffer the same as printtup() does, so
5056-
* as to capture the costs of manipulating the strings accurately.
5057-
*/
5058-
pq_beginmessage_reuse(buf, PqMsg_DataRow);
5059-
5060-
pq_sendint16(buf, natts);
5061-
5062-
/*
5063-
* send the attributes of this tuple
5064-
*/
5065-
for (int i = 0; i < natts; i++)
5066-
{
5067-
FmgrInfo *finfo = myState->finfos + i;
5068-
Datum attr = slot->tts_values[i];
5069-
5070-
if (slot->tts_isnull[i])
5071-
{
5072-
pq_sendint32(buf, -1);
5073-
continue;
5074-
}
5075-
5076-
if (myState->format == 0)
5077-
{
5078-
/* Text output */
5079-
char *outputstr;
5080-
5081-
outputstr = OutputFunctionCall(finfo, attr);
5082-
pq_sendcountedtext(buf, outputstr, strlen(outputstr));
5083-
}
5084-
else
5085-
{
5086-
/* Binary output */
5087-
bytea *outputbytes;
5088-
5089-
outputbytes = SendFunctionCall(finfo, attr);
5090-
pq_sendint32(buf, VARSIZE(outputbytes) - VARHDRSZ);
5091-
pq_sendbytes(buf, VARDATA(outputbytes),
5092-
VARSIZE(outputbytes) - VARHDRSZ);
5093-
}
5094-
}
5095-
5096-
/*
5097-
* We mustn't call pq_endmessage_reuse(), since that would actually send
5098-
* the data to the client. Just count the data, instead. We can leave
5099-
* the buffer alone; it'll be reset on the next iteration (as would also
5100-
* happen in printtup()).
5101-
*/
5102-
myState->metrics.bytesSent += buf->len;
5103-
5104-
/* Return to caller's context, and flush row's temporary memory */
5105-
MemoryContextSwitchTo(oldcontext);
5106-
MemoryContextReset(myState->tmpcontext);
5107-
5108-
/* Update timing data */
5109-
if (myState->es->timing)
5110-
{
5111-
INSTR_TIME_SET_CURRENT(end);
5112-
INSTR_TIME_ACCUM_DIFF(myState->metrics.timeSpent, end, start);
5113-
}
5114-
5115-
/* Update buffer metrics */
5116-
if (myState->es->buffers)
5117-
BufferUsageAccumDiff(&myState->metrics.bufferUsage,
5118-
&pgBufferUsage,
5119-
&instr_start);
5120-
5121-
return true;
5122-
}
5123-
5124-
/*
5125-
* serializeAnalyzeStartup - start up the serializeAnalyze receiver
5126-
*/
5127-
static void
5128-
serializeAnalyzeStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
5129-
{
5130-
SerializeDestReceiver *receiver = (SerializeDestReceiver *) self;
5131-
5132-
Assert(receiver->es != NULL);
5133-
5134-
switch (receiver->es->serialize)
5135-
{
5136-
case EXPLAIN_SERIALIZE_NONE:
5137-
Assert(false);
5138-
break;
5139-
case EXPLAIN_SERIALIZE_TEXT:
5140-
receiver->format = 0; /* wire protocol format text */
5141-
break;
5142-
case EXPLAIN_SERIALIZE_BINARY:
5143-
receiver->format = 1; /* wire protocol format binary */
5144-
break;
5145-
}
5146-
5147-
/* Create per-row temporary memory context */
5148-
receiver->tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
5149-
"SerializeTupleReceive",
5150-
ALLOCSET_DEFAULT_SIZES);
5151-
5152-
/* The output buffer is re-used across rows, as in printtup.c */
5153-
initStringInfo(&receiver->buf);
5154-
5155-
/* Initialize results counters */
5156-
memset(&receiver->metrics, 0, sizeof(SerializeMetrics));
5157-
INSTR_TIME_SET_ZERO(receiver->metrics.timeSpent);
5158-
}
5159-
5160-
/*
5161-
* serializeAnalyzeShutdown - shut down the serializeAnalyze receiver
5162-
*/
5163-
static void
5164-
serializeAnalyzeShutdown(DestReceiver *self)
5165-
{
5166-
SerializeDestReceiver *receiver = (SerializeDestReceiver *) self;
5167-
5168-
if (receiver->finfos)
5169-
pfree(receiver->finfos);
5170-
receiver->finfos = NULL;
5171-
5172-
if (receiver->buf.data)
5173-
pfree(receiver->buf.data);
5174-
receiver->buf.data = NULL;
5175-
5176-
if (receiver->tmpcontext)
5177-
MemoryContextDelete(receiver->tmpcontext);
5178-
receiver->tmpcontext = NULL;
5179-
}
5180-
5181-
/*
5182-
* serializeAnalyzeDestroy - destroy the serializeAnalyze receiver
5183-
*/
5184-
static void
5185-
serializeAnalyzeDestroy(DestReceiver *self)
5186-
{
5187-
pfree(self);
5188-
}
5189-
5190-
/*
5191-
* Build a DestReceiver for EXPLAIN (SERIALIZE) instrumentation.
5192-
*/
5193-
DestReceiver *
5194-
CreateExplainSerializeDestReceiver(ExplainState *es)
5195-
{
5196-
SerializeDestReceiver *self;
5197-
5198-
self = (SerializeDestReceiver *) palloc0(sizeof(SerializeDestReceiver));
5199-
5200-
self->pub.receiveSlot = serializeAnalyzeReceive;
5201-
self->pub.rStartup = serializeAnalyzeStartup;
5202-
self->pub.rShutdown = serializeAnalyzeShutdown;
5203-
self->pub.rDestroy = serializeAnalyzeDestroy;
5204-
self->pub.mydest = DestExplainSerialize;
5205-
5206-
self->es = es;
5207-
5208-
return (DestReceiver *) self;
5209-
}
5210-
5211-
/*
5212-
* GetSerializationMetrics - collect metrics
5213-
*
5214-
* We have to be careful here since the receiver could be an IntoRel
5215-
* receiver if the subject statement is CREATE TABLE AS. In that
5216-
* case, return all-zeroes stats.
5217-
*/
5218-
static SerializeMetrics
5219-
GetSerializationMetrics(DestReceiver *dest)
5220-
{
5221-
SerializeMetrics empty;
5222-
5223-
if (dest->mydest == DestExplainSerialize)
5224-
return ((SerializeDestReceiver *) dest)->metrics;
5225-
5226-
memset(&empty, 0, sizeof(SerializeMetrics));
5227-
INSTR_TIME_SET_ZERO(empty.timeSpent);
5228-
5229-
return empty;
5230-
}

0 commit comments

Comments
 (0)