Skip to content

Commit e00fd69

Browse files
committed
Transitional commit.
1 parent 3b9ebfd commit e00fd69

File tree

5 files changed

+111
-16
lines changed

5 files changed

+111
-16
lines changed

contrib/pg_exchange/dmq.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ typedef int8 DmqSenderId;
2121

2222
extern void dmq_init(const char *library_name);
2323

24-
extern DmqDestinationId dmq_destination_add(char *connstr, char *sender_name,
25-
char *receiver_name, int ping_period);
24+
extern DmqDestinationId dmq_destination_add(char *connstr,
25+
char *sender_name,
26+
char *receiver_name,
27+
int ping_period);
2628
extern DmqConnState dmq_get_destination_status(DmqDestinationId dest_id);
2729
extern void dmq_destination_drop(const char *receiver_name);
2830

@@ -37,11 +39,13 @@ extern const char *dmq_sender_name(DmqSenderId id);
3739
extern DmqDestinationId dmq_remote_id(const char *name);
3840

3941
extern const char *
40-
dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask, bool waitMsg);
42+
dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask,
43+
bool waitMsg);
4144
extern bool dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask);
4245

4346
extern void dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg);
44-
extern void dmq_push_buffer(DmqDestinationId dest_id, char *stream_name, const void *buffer, size_t len);
47+
extern void dmq_push_buffer(DmqDestinationId dest_id, char *stream_name,
48+
const void *buffer, size_t len);
4549

4650
typedef void (*dmq_receiver_hook_type) (const char *);
4751
extern dmq_receiver_hook_type dmq_receiver_start_hook;

contrib/pg_exchange/nodeDistPlanExec.c

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ static CustomScanMethods distplanexec_plan_methods;
4444
static CustomExecMethods distplanexec_exec_methods;
4545

4646
char destsName[10] = "DMQ_DESTS";
47+
char *network_interface;
4748

4849

4950
static Node *CreateDistPlanExecState(CustomScan *node);
@@ -257,7 +258,7 @@ EstablishDMQConnections(const lcontext *context, const char *serverName,
257258
sprintf(connstr, "host=%s port=%d "
258259
"fallback_application_name=%s",
259260
host, port, senderName);
260-
261+
elog(LOG, "CONN STR: %s", connstr);
261262
sub->dest_id = dmq_destination_add(connstr, senderName, receiverName, 10);
262263
memcpy(sub->node, receiverName, strlen(receiverName) + 1);
263264
}
@@ -391,12 +392,9 @@ ExplainDistPlanExec(CustomScanState *node, List *ancestors, ExplainState *es)
391392
}
392393

393394
static struct Plan *
394-
CreateDistExecPlan(PlannerInfo *root,
395-
RelOptInfo *rel,
396-
struct CustomPath *best_path,
397-
List *tlist,
398-
List *clauses,
399-
List *custom_plans)
395+
CreateDistExecPlan(PlannerInfo *root, RelOptInfo *rel,
396+
struct CustomPath *best_path,
397+
List *tlist, List *clauses, List *custom_plans)
400398
{
401399
CustomScan *distExecNode;
402400

@@ -572,7 +570,7 @@ localize_plan(Plan *node, lcontext *context)
572570
if (IsExchangePlanNode(node))
573571
{
574572
List *private = ((CustomScan *) node)->custom_private;
575-
elog(LOG, "LOCALIZE: exchange");
573+
576574
if (lnext(lnext(list_head(private))))
577575
context->indexinfo = (IndexOptInfo *) lthird(private);
578576
}
@@ -582,7 +580,6 @@ elog(LOG, "LOCALIZE: exchange");
582580
if (context->foreign_scans != NIL)
583581
{
584582
CustomScan *css = (CustomScan *) node;
585-
// Index scanrelid = ((Scan *) cstmSubPlan1(node))->scanrelid;
586583

587584
Assert(list_length(context->foreign_scans) == 1);
588585
css->custom_plans = list_delete_ptr(css->custom_plans,
@@ -722,10 +719,48 @@ FSExtractServerName(Oid fsid, char **host, int *port)
722719
*host = hostname;
723720
}
724721

722+
#include <unistd.h>
723+
#include <sys/types.h>
724+
#include <sys/socket.h>
725+
#include <sys/ioctl.h>
726+
#include <netinet/in.h>
727+
#include <net/if.h>
728+
#include <arpa/inet.h>
729+
#include "common/ip.h"
730+
725731
void
726732
GetMyServerName(char **host, int *port)
727733
{
728-
*host = pstrdup(LOCALHOST);
734+
int fd;
735+
struct ifreq ifr;
736+
struct addrinfo hintp;
737+
struct addrinfo *result;
738+
char *sipaddr;
739+
struct sockaddr_storage saddr;
740+
int res;
741+
742+
fd = socket(AF_INET, SOCK_DGRAM, 0);
743+
744+
/* I want to get an IPv4 IP address */
745+
ifr.ifr_addr.sa_family = AF_INET;
746+
747+
/* I want IP address attached to "eth0" */
748+
strncpy(ifr.ifr_name, network_interface, IFNAMSIZ-1);
749+
ioctl(fd, SIOCGIFADDR, &ifr);
750+
close(fd);
751+
752+
MemSet(&hintp, 0, sizeof(hintp));
753+
hintp.ai_family = AF_INET;
754+
hintp.ai_flags = AI_ALL;
755+
sipaddr = inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr);
756+
if ((res = pg_getaddrinfo_all(sipaddr, NULL, &hintp, &result)) != 0)
757+
elog(FATAL, "Cannot resolve network address %s, error=%d.", sipaddr, res);
758+
memcpy(&saddr, result->ai_addr, result->ai_addrlen);
759+
*host = (char *) palloc0(NI_MAXHOST);
760+
if (pg_getnameinfo_all(&saddr, result->ai_addrlen, *host, NI_MAXHOST,
761+
NULL, 0, 0) != 0)
762+
elog(FATAL, "Cannot resolve network name");
763+
729764
*port = PostPortNumber;
730765
}
731766

@@ -755,8 +790,9 @@ dmq_init_barrier(DMQDestCont *dmq_data, PlanState *child)
755790
/* Wait for dmq connection establishing */
756791
for (i = 0; i < dmq_data->nservers; i++)
757792
while (dmq_get_destination_status(dmq_data->dests[i].dest_id) != Active);
758-
793+
elog(LOG, "DMQ INIT BARRIER");
759794
init_exchange_channel(child, (void *) dmq_data);
795+
elog(LOG, "END DMQ INIT BARRIER");
760796
}
761797

762798
static bool
@@ -809,7 +845,7 @@ init_exchange_channel(PlanState *node, void *context)
809845
}
810846
else
811847
state->indexes[i] = j;
812-
848+
elog(LOG, "SendByteMessage: j=%d, dest_id=%d, stream=%s", j, dmq_data->dests[j].dest_id, state->stream);
813849
SendByteMessage(dmq_data->dests[j].dest_id, state->stream, ib);
814850
}
815851
return false;

contrib/pg_exchange/nodeDistPlanExec.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ typedef struct
2727
} lcontext;
2828

2929

30+
extern char *network_interface;
3031
extern char destsName[10];
3132
#define DISTEXECPATHNAME "DistExecPath"
3233

contrib/pg_exchange/pg_exchange.c

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,25 @@ shmem_size(void)
5555
sizeof(DMQDestinations)));
5656
return MAXALIGN(size);
5757
}
58+
#include "common/ip.h"
59+
#include "arpa/inet.h"
60+
#include "sys/socket.h"
61+
#include <netinet/in.h>
62+
#include <netdb.h>
63+
#include <sys/un.h>
64+
#include "libpq/pqcomm.h"
5865

5966
/*
6067
* Module load/unload callback
6168
*/
6269
void
6370
_PG_init(void)
6471
{
72+
DefineCustomStringVariable("network_interface",
73+
"Set network interface for EXCHANGE communications",
74+
NULL, &network_interface, "lo",
75+
PGC_SUSET, GUC_NOT_IN_SAMPLE, NULL, NULL, NULL);
76+
6577
EXCHANGE_Init_methods();
6678
DUMMYSCAN_Init_methods();
6779
EXEC_Hooks_init();
@@ -72,6 +84,47 @@ _PG_init(void)
7284

7385
old_dmq_receiver_stop_hook = dmq_receiver_stop_hook;
7486
dmq_receiver_stop_hook = OnNodeDisconnect;
87+
/* {
88+
char host[1024];
89+
FILE *f = fopen("/home/andrey/PostgresPro/pgcluster/hosts.txt", "rt");
90+
struct addrinfo hintp;
91+
struct addrinfo *result;
92+
MemSet(&hintp, 0, sizeof(hintp));
93+
// hintp.ai_socktype = SOCK_STREAM;
94+
hintp.ai_family = AF_UNSPEC;
95+
hintp.ai_flags = AI_ALL;
96+
97+
Assert(f != NULL);
98+
while (!feof(f))
99+
{
100+
struct addrinfo *next;
101+
int i=0;
102+
int res1;
103+
104+
fscanf(f, "%s", host);
105+
res1 = pg_getaddrinfo_all(host, NULL, &hintp, &result);
106+
next = result;
107+
108+
while (next != NULL)
109+
{
110+
SockAddr a1;
111+
int res2;
112+
char node[NI_MAXHOST];
113+
char service[NI_MAXSERV];
114+
char *res;
115+
116+
res = inet_ntoa(((struct sockaddr_in *)next->ai_addr)->sin_addr);
117+
memcpy(&a1.addr, next->ai_addr, next->ai_addrlen);
118+
res2 = pg_getnameinfo_all(&a1.addr, next->ai_addrlen, node, NI_MAXHOST,
119+
service, NI_MAXSERV, 0);
120+
elog(LOG, "[%d] srchost: %s, res: [%d, %d] IP: %s, host: %s, service: %s.",
121+
i++, host, res1, res2, res, node, service);
122+
next = next->ai_next;
123+
}
124+
pg_freeaddrinfo_all(hintp.ai_family, result);
125+
}
126+
fclose(f);
127+
} */
75128
}
76129

77130
Datum

contrib/pg_exchange/stream.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ RecvTuple(TupleDesc tupdesc, char *streamName, int *status)
325325
*status = 3;
326326
break;
327327
}
328+
328329
pfree(buf);
329330
return (TupleTableSlot *) NULL;
330331
}

0 commit comments

Comments
 (0)