@@ -24,16 +24,17 @@ type connIO struct {
24
24
// coordCtx is the parent context, that is, the context of the Coordinator
25
25
coordCtx context.Context
26
26
// peerCtx is the context of the connection to our peer
27
- peerCtx context.Context
28
- cancel context.CancelFunc
29
- logger slog.Logger
30
- requests <- chan * proto.CoordinateRequest
31
- responses chan <- * proto.CoordinateResponse
32
- bindings chan <- binding
33
- tunnels chan <- tunnel
34
- auth agpl.TunnelAuth
35
- mu sync.Mutex
36
- closed bool
27
+ peerCtx context.Context
28
+ cancel context.CancelFunc
29
+ logger slog.Logger
30
+ requests <- chan * proto.CoordinateRequest
31
+ responses chan <- * proto.CoordinateResponse
32
+ bindings chan <- binding
33
+ tunnels chan <- tunnel
34
+ auth agpl.TunnelAuth
35
+ mu sync.Mutex
36
+ closed bool
37
+ disconnected bool
37
38
38
39
name string
39
40
start int64
@@ -76,20 +77,29 @@ func newConnIO(coordContext context.Context,
76
77
77
78
func (c * connIO ) recvLoop () {
78
79
defer func () {
79
- // withdraw bindings & tunnels when we exit. We need to use the parent context here, since
80
+ // withdraw bindings & tunnels when we exit. We need to use the coordinator context here, since
80
81
// our own context might be canceled, but we still need to withdraw.
81
82
b := binding {
82
83
bKey : bKey (c .UniqueID ()),
84
+ kind : proto .CoordinateResponse_PeerUpdate_LOST ,
85
+ }
86
+ if c .disconnected {
87
+ b .kind = proto .CoordinateResponse_PeerUpdate_DISCONNECTED
83
88
}
84
89
if err := sendCtx (c .coordCtx , c .bindings , b ); err != nil {
85
90
c .logger .Debug (c .coordCtx , "parent context expired while withdrawing bindings" , slog .Error (err ))
86
91
}
87
- t := tunnel {
88
- tKey : tKey {src : c .UniqueID ()},
89
- active : false ,
90
- }
91
- if err := sendCtx (c .coordCtx , c .tunnels , t ); err != nil {
92
- c .logger .Debug (c .coordCtx , "parent context expired while withdrawing tunnels" , slog .Error (err ))
92
+ // only remove tunnels on graceful disconnect. If we remove tunnels for lost peers, then
93
+ // this will look like a disconnect from the peer perspective, since we query for active peers
94
+ // by using the tunnel as a join in the database
95
+ if c .disconnected {
96
+ t := tunnel {
97
+ tKey : tKey {src : c .UniqueID ()},
98
+ active : false ,
99
+ }
100
+ if err := sendCtx (c .coordCtx , c .tunnels , t ); err != nil {
101
+ c .logger .Debug (c .coordCtx , "parent context expired while withdrawing tunnels" , slog .Error (err ))
102
+ }
93
103
}
94
104
}()
95
105
defer c .Close ()
@@ -111,13 +121,16 @@ func (c *connIO) recvLoop() {
111
121
}
112
122
}
113
123
124
+ var errDisconnect = xerrors .New ("graceful disconnect" )
125
+
114
126
func (c * connIO ) handleRequest (req * proto.CoordinateRequest ) error {
115
127
c .logger .Debug (c .peerCtx , "got request" )
116
128
if req .UpdateSelf != nil {
117
129
c .logger .Debug (c .peerCtx , "got node update" , slog .F ("node" , req .UpdateSelf ))
118
130
b := binding {
119
131
bKey : bKey (c .UniqueID ()),
120
132
node : req .UpdateSelf .Node ,
133
+ kind : proto .CoordinateResponse_PeerUpdate_NODE ,
121
134
}
122
135
if err := sendCtx (c .coordCtx , c .bindings , b ); err != nil {
123
136
c .logger .Debug (c .peerCtx , "failed to send binding" , slog .Error (err ))
@@ -169,7 +182,11 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
169
182
return err
170
183
}
171
184
}
172
- // TODO: (spikecurtis) support Disconnect
185
+ if req .Disconnect != nil {
186
+ c .logger .Debug (c .peerCtx , "graceful disconnect" )
187
+ c .disconnected = true
188
+ return errDisconnect
189
+ }
173
190
return nil
174
191
}
175
192
0 commit comments