-
Notifications
You must be signed in to change notification settings - Fork 902
feat: add agent acks to in-memory coordinator #12786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
1c287b2
4b14af0
7e2d1bb
3557c11
1347370
2896d6b
214d380
4aaa0e2
5660e03
298655b
88416d2
bfda37e
2ee3c51
cae734d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
When an agent receives a node, it responds with an ACK which is relayed to the client. After the client receives the ACK, it's allowed to begin pinging.
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,6 +102,7 @@ type Coordinatee interface { | |
} | ||
|
||
// Coordination is implemented in this file by remoteCoordination and
// inMemoryCoordination (both define AwaitAck). It embeds io.Closer and exposes
// two receive-only channels: one signalling an ack and one carrying errors.
type Coordination interface { | ||
// AwaitAck returns a channel to wait on for an ack. remoteCoordination closes
// its channel when an ack from its tunnel target is received;
// inMemoryCoordination returns a channel that is already closed.
AwaitAck() <-chan struct{} | ||
coadler marked this conversation as resolved.
Show resolved
Hide resolved
|
||
io.Closer | ||
// Error returns a receive-only channel of errors.
// NOTE(review): presumably errors hit while coordinating are delivered here;
// the implementations of Error are not visible in this diff — confirm.
Error() <-chan error | ||
} | ||
|
@@ -111,9 +112,14 @@ type remoteCoordination struct { | |
closed bool | ||
errChan chan error | ||
coordinatee Coordinatee | ||
tgt uuid.UUID | ||
logger slog.Logger | ||
protocol proto.DRPCTailnet_CoordinateClient | ||
respLoopDone chan struct{} | ||
|
||
ackOnce sync.Once | ||
// tgtAck is closed when an ack from tgt is received. | ||
tgtAck chan struct{} | ||
coadler marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
func (c *remoteCoordination) Close() (retErr error) { | ||
|
@@ -161,14 +167,54 @@ func (c *remoteCoordination) respLoop() { | |
c.sendErr(xerrors.Errorf("read: %w", err)) | ||
return | ||
} | ||
err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates()) | ||
if err != nil { | ||
c.sendErr(xerrors.Errorf("update peers: %w", err)) | ||
return | ||
|
||
if len(resp.GetPeerUpdates()) > 0 { | ||
err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates()) | ||
if err != nil { | ||
c.sendErr(xerrors.Errorf("update peers: %w", err)) | ||
return | ||
} | ||
|
||
// Only send acks from agents. | ||
if c.tgt == uuid.Nil { | ||
// Send an ack back for all received peers. This could | ||
// potentially be smarter to only send an ACK once per client, | ||
// but there's nothing currently stopping clients from reusing | ||
// IDs. | ||
for _, peer := range resp.GetPeerUpdates() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I worry this is too superficial --- here we are only acknowledging the fact that we received a peer update, not that it was programmed into wireguard, which is what is actually needed for the handshake to complete. I guess this is an OK start in that it cuts out any propagation delay from the … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, completely eliminating the race would require digging down into the configmaps which I wasn't keen to do unless necessary. In my testing with the in-memory coordinator I wasn't able to hit the 5s backoff anymore. I suspect pgcoord to actually fare better considering the extra round trip latency as compared to the in-memory coordinator. |
||
err := c.protocol.Send(&proto.CoordinateRequest{ | ||
TunnelAck: &proto.CoordinateRequest_Ack{Id: peer.Id}, | ||
}) | ||
if err != nil { | ||
c.sendErr(xerrors.Errorf("send: %w", err)) | ||
return | ||
} | ||
} | ||
} | ||
} | ||
|
||
// If we receive an ack, close the tgtAck channel to notify the waiting | ||
// client. | ||
if ack := resp.GetTunnelAck(); ack != nil { | ||
dstID, err := uuid.FromBytes(ack.Id) | ||
if err != nil { | ||
c.sendErr(xerrors.Errorf("parse ack id: %w", err)) | ||
return | ||
} | ||
|
||
if c.tgt == dstID { | ||
c.ackOnce.Do(func() { | ||
close(c.tgtAck) | ||
}) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// AwaitAck returns tgtAck as a receive-only channel. respLoop closes tgtAck,
// at most once via ackOnce, when it receives a tunnel ack whose ID equals
// c.tgt, so a receive on the returned channel unblocks after that ack.
func (c *remoteCoordination) AwaitAck() <-chan struct{} { | |
return c.tgtAck | ||
} | ||
|
||
// NewRemoteCoordination uses the provided protocol to coordinate the provided coordinatee (usually a | ||
// Conn). If the tunnelTarget is not uuid.Nil, then we add a tunnel to the peer (i.e. we are acting as | ||
// a client---agents should NOT set this!). | ||
|
@@ -179,9 +225,11 @@ func NewRemoteCoordination(logger slog.Logger, | |
c := &remoteCoordination{ | ||
errChan: make(chan error, 1), | ||
coordinatee: coordinatee, | ||
tgt: tunnelTarget, | ||
logger: logger, | ||
protocol: protocol, | ||
respLoopDone: make(chan struct{}), | ||
tgtAck: make(chan struct{}), | ||
} | ||
if tunnelTarget != uuid.Nil { | ||
c.Lock() | ||
|
@@ -327,6 +375,13 @@ func (c *inMemoryCoordination) respLoop() { | |
} | ||
} | ||
|
||
// AwaitAck returns a channel that is already closed, so a receive on it
// returns immediately. A fresh channel is allocated and closed on every call;
// the receiver is unused.
func (*inMemoryCoordination) AwaitAck() <-chan struct{} { | ||
// This is only used for tests, so just return a closed channel. | ||
ch := make(chan struct{}) | ||
close(ch) | ||
return ch | ||
} | ||
|
||
func (c *inMemoryCoordination) Close() error { | ||
c.Lock() | ||
defer c.Unlock() | ||
|
@@ -658,6 +713,31 @@ func (c *core) handleRequest(p *peer, req *proto.CoordinateRequest) error { | |
if req.Disconnect != nil { | ||
c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect") | ||
} | ||
if ack := req.TunnelAck; ack != nil { | ||
err := c.handleAckLocked(pr, ack) | ||
if err != nil { | ||
return xerrors.Errorf("handle ack: %w", err) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// handleAckLocked relays an ack sent by src to the peer identified by ack.Id.
// It parses ack.Id as a UUID, looks that peer up in c.peers and, if it is
// present, sends it a CoordinateResponse whose TunnelAck carries src's ID.
// An ack addressed to a peer that is not in c.peers is dropped without error.
//
// It returns an error only when ack.Id cannot be converted to a UUID.
//
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// core's lock — confirm against handleRequest, whose head is not visible here.
func (c *core) handleAckLocked(src *peer, ack *proto.CoordinateRequest_Ack) error { | ||
dstID, err := uuid.FromBytes(ack.Id) | ||
if err != nil { | ||
// this shouldn't happen unless there is a client error. Close the connection so the client | ||
// doesn't just happily continue thinking everything is fine. | ||
return xerrors.Errorf("unable to convert bytes to UUID: %w", err) | ||
} | ||
|
||
dst, ok := c.peers[dstID] | ||
coadler marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if ok { | ||
// NOTE(review): this is a plain channel send; whether it can block depends
// on how dst.resps is created and drained, which is not visible here.
dst.resps <- &proto.CoordinateResponse{ | ||
coadler marked this conversation as resolved.
Show resolved
Hide resolved
|
||
TunnelAck: &proto.CoordinateResponse_Ack{ | ||
Id: src.id[:], | ||
}, | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.