-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsender.go
132 lines (115 loc) · 3.41 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Package sender contains an actor that receives messages from both the backend manager
// and postgres, and it forwards those messages to the client application
package sender
import (
"log"
"time"
"github.com/coder543/roundabout/frontend"
"github.com/coder543/roundabout/misc"
"github.com/jackc/pgproto3/v2"
)
var silentChannel = make(<-chan pgproto3.BackendMessage, 1)
func Launch(
closeBackend func(),
detachDB func(txStatus byte) bool,
send func(msg pgproto3.BackendMessage) error,
closed <-chan struct{},
newDB <-chan frontend.AttachChannels,
) chan<- pgproto3.BackendMessage {
out := make(chan pgproto3.BackendMessage, 1)
go sender(
closeBackend,
detachDB,
send,
closed,
newDB,
out,
)
return out
}
func sender(
closeBackend func(),
detachDB func(txStatus byte) bool,
send func(msg pgproto3.BackendMessage) error,
closed <-chan struct{},
newDB <-chan frontend.AttachChannels,
out <-chan pgproto3.BackendMessage,
) {
defer misc.Recover()
defer closeBackend()
dbRec := silentChannel
var dbSync *misc.Cond
defer func() {
if dbSync != nil {
dbSync.SignalLocked()
}
}()
for {
var bmsg pgproto3.BackendMessage
var ok bool
select {
case <-closed: // this backend is shutting down, we're either disconnecting the client or vice versa
return
case newRec := <-newDB: // switch postgres connections
if dbSync != nil {
log.Println("received unexpected new database connection")
return // kill this client connection, since we don't understand the current state
}
dbSync = newRec.OutSync
dbRec = newRec.Out
continue
case bmsg, ok = <-dbRec: // the currently attached postgres is sending a message
// if the dbRec channel is closed unexpectedly, we should kill this backend
if !ok {
return
}
case bmsg, ok = <-out: // roundabout is sending a message directly to the client
// sometimes we need to send a message to the client first, in which
// case the bmsg channel will be closed after the final message
if !ok {
return
}
}
// if this is an RFQ, we want to detach from the database
rfq, wantToDetach := bmsg.(*pgproto3.ReadyForQuery)
if wantToDetach {
// make a copy of the RFQ so that we can safely detach the database now
rfqCopy := *rfq
bmsg = &rfqCopy
if dbSync != nil {
// There may exist a pathological case where Postgres could send a message
// immediately after a client being initialized receives the post-preamble RFQ.
// In this specific case, the frontend.receiver would be waiting to send the
// message through the channel, and dbSync.L would be locked. SignalLocked()
// might create a deadlock if that happened, so instead we could spawn this
// into a separate goroutine here.
//
// The code as written should prevent this deadlock, but if a deadlock appears...
// this is the first place to check.
dbSync.SignalLocked()
}
// the backend manager decides whether detaching is successful or not
if detachDB(rfq.TxStatus) {
dbRec = silentChannel
dbSync = nil
}
}
// log.Println("b-msg-1", misc.Marshal(bmsg))
err := send(bmsg)
if err != nil {
// retry loop on temporary failure
for i := 0; i < 10 && err != nil && misc.IsTemporary(err); i++ {
time.Sleep(25 * time.Millisecond)
err = send(bmsg)
}
// did retry fail?
if err != nil {
log.Println("b-send", err)
return
}
}
if dbSync != nil {
dbSync.SignalLocked()
}
}
}