@@ -52,10 +52,11 @@ static void *get_shared_state(void)
52
52
return shared .state ;
53
53
}
54
54
55
- static void try_next_peer (void )
55
+ static void select_next_peer (void )
56
56
{
57
- while (! wcfg . peers [ * shared . leader ]. up )
57
+ do {
58
58
* shared .leader = (* shared .leader + 1 ) % RAFTABLE_PEERS_MAX ;
59
+ } while (!wcfg .peers [* shared .leader ].up );
59
60
}
60
61
61
62
static void disconnect_leader (void )
@@ -64,7 +65,7 @@ static void disconnect_leader(void)
64
65
{
65
66
close (leadersock );
66
67
}
67
- try_next_peer ();
68
+ select_next_peer ();
68
69
leadersock = -1 ;
69
70
}
70
71
@@ -74,6 +75,9 @@ static bool connect_leader(void)
74
75
struct addrinfo hint ;
75
76
char portstr [6 ];
76
77
struct addrinfo * a ;
78
+ int rc ;
79
+
80
+ if (* shared .leader == NOBODY ) select_next_peer ();
77
81
78
82
HostPort * leaderhp = wcfg .peers + * shared .leader ;
79
83
@@ -83,10 +87,12 @@ static bool connect_leader(void)
83
87
snprintf (portstr , 6 , "%d" , leaderhp -> port );
84
88
hint .ai_protocol = getprotobyname ("tcp" )-> p_proto ;
85
89
86
- if (getaddrinfo (leaderhp -> host , portstr , & hint , & addrs ))
90
+ if (( rc = getaddrinfo (leaderhp -> host , portstr , & hint , & addrs ) ))
87
91
{
88
92
disconnect_leader ();
89
- perror ("failed to resolve address" );
93
+ fprintf (stderr , "failed to resolve address '%s:%d': %s" ,
94
+ leaderhp -> host , leaderhp -> port ,
95
+ gai_strerror (rc ));
90
96
return false;
91
97
}
92
98
@@ -168,11 +174,14 @@ void raftable_set(char *key, char *value)
168
174
size += vallen ;
169
175
ru = palloc (size );
170
176
177
+ ru -> expector = wcfg .id ;
178
+ ru -> fieldnum = 1 ;
179
+
171
180
RaftableField * f = (RaftableField * )ru -> data ;
172
181
f -> keylen = keylen ;
173
182
f -> vallen = vallen ;
174
183
memcpy (f -> data , key , keylen );
175
- memcpy (f -> data + keylen , key , vallen );
184
+ memcpy (f -> data + keylen , value , vallen );
176
185
177
186
bool ok = false;
178
187
while (!ok )
@@ -201,6 +210,18 @@ void raftable_set(char *key, char *value)
201
210
}
202
211
sent += newbytes ;
203
212
}
213
+
214
+ if (ok )
215
+ {
216
+ int status ;
217
+ int recved = read (s , & status , sizeof (status ));
218
+ if (recved != sizeof (status ))
219
+ {
220
+ disconnect_leader ();
221
+ fprintf (stderr , "failed to recv the update status from the leader\n" );
222
+ ok = false;
223
+ }
224
+ }
204
225
}
205
226
206
227
pfree (ru );
@@ -358,6 +379,7 @@ _PG_init(void)
358
379
);
359
380
parse_peers (wcfg .peers , peerstr );
360
381
382
+ request_shmem ();
361
383
worker_register (& wcfg );
362
384
363
385
PreviousShmemStartupHook = shmem_startup_hook ;
0 commit comments