9
9
"os"
10
10
11
11
zmq "github.com/alecthomas/gozmq"
12
+ "github.com/pkg/errors"
12
13
)
13
14
14
15
var logger * log.Logger
@@ -37,17 +38,16 @@ type SocketGroup struct {
37
38
}
38
39
39
40
// PrepareSockets sets up the ZMQ sockets through which the kernel will communicate.
40
- func PrepareSockets (connInfo ConnectionInfo ) (sg SocketGroup ) {
41
+ func PrepareSockets (connInfo ConnectionInfo ) (SocketGroup , error ) {
41
42
42
- // TODO handle errors.
43
- context , _ := zmq .NewContext ()
44
- sg .ShellSocket , _ = context .NewSocket (zmq .ROUTER )
45
- sg .ControlSocket , _ = context .NewSocket (zmq .ROUTER )
46
- sg .StdinSocket , _ = context .NewSocket (zmq .ROUTER )
47
- sg .IOPubSocket , _ = context .NewSocket (zmq .PUB )
43
+ // Initialize the Socket Group.
44
+ context , sg , err := createSockets ()
45
+ if err != nil {
46
+ return sg , errors .Wrap (err , "Could not initialize context and Socket Group" )
47
+ }
48
48
49
+ // Bind the sockets.
49
50
address := fmt .Sprintf ("%v://%v:%%v" , connInfo .Transport , connInfo .IP )
50
-
51
51
sg .ShellSocket .Bind (fmt .Sprintf (address , connInfo .ShellPort ))
52
52
sg .ControlSocket .Bind (fmt .Sprintf (address , connInfo .ControlPort ))
53
53
sg .StdinSocket .Bind (fmt .Sprintf (address , connInfo .StdinPort ))
@@ -57,11 +57,46 @@ func PrepareSockets(connInfo ConnectionInfo) (sg SocketGroup) {
57
57
sg .Key = []byte (connInfo .Key )
58
58
59
59
// Start the heartbeat device
60
- HBSocket , _ := context .NewSocket (zmq .REP )
60
+ HBSocket , err := context .NewSocket (zmq .REP )
61
+ if err != nil {
62
+ return sg , errors .Wrap (err , "Could not get the Heartbeat device socket" )
63
+ }
61
64
HBSocket .Bind (fmt .Sprintf (address , connInfo .HBPort ))
62
65
go zmq .Device (zmq .FORWARDER , HBSocket , HBSocket )
63
66
64
- return
67
+ return sg , nil
68
+ }
69
+
70
+ // createSockets initializes the sockets for the socket group based on values from zmq.
71
+ func createSockets () (* zmq.Context , SocketGroup , error ) {
72
+
73
+ context , err := zmq .NewContext ()
74
+ if err != nil {
75
+ return context , SocketGroup {}, errors .Wrap (err , "Could not create zmq Context" )
76
+ }
77
+
78
+ var sg SocketGroup
79
+ sg .ShellSocket , err = context .NewSocket (zmq .ROUTER )
80
+ if err != nil {
81
+ return context , sg , errors .Wrap (err , "Could not get Shell Socket" )
82
+ }
83
+
84
+ sg .ControlSocket , err = context .NewSocket (zmq .ROUTER )
85
+ if err != nil {
86
+ return context , sg , errors .Wrap (err , "Could not get Control Socket" )
87
+ }
88
+
89
+ sg .StdinSocket , err = context .NewSocket (zmq .ROUTER )
90
+ if err != nil {
91
+ return context , sg , errors .Wrap (err , "Could not get Stdin Socket" )
92
+ }
93
+
94
+ sg .IOPubSocket , err = context .NewSocket (zmq .PUB )
95
+ if err != nil {
96
+ return context , sg , errors .Wrap (err , "Could not get IOPub Socket" )
97
+ }
98
+
99
+ return context , sg , nil
65
100
}
66
101
67
102
// HandleShellMsg responds to a message on the shell ROUTER socket.
@@ -126,14 +161,16 @@ func RunKernel(connectionFile string, logwriter io.Writer) {
126
161
if err != nil {
127
162
log .Fatalln (err )
128
163
}
129
- err = json .Unmarshal (bs , & connInfo )
130
- if err != nil {
164
+ if err = json .Unmarshal (bs , & connInfo ); err != nil {
131
165
log .Fatalln (err )
132
166
}
133
167
logger .Printf ("%+v\n " , connInfo )
134
168
135
169
// Set up the ZMQ sockets through which the kernel will communicate
136
- sockets := PrepareSockets (connInfo )
170
+ sockets , err := PrepareSockets (connInfo )
171
+ if err != nil {
172
+ log .Fatalln (err )
173
+ }
137
174
138
175
pi := zmq.PollItems {
139
176
zmq.PollItem {Socket : sockets .ShellSocket , Events : zmq .POLLIN },
@@ -144,8 +181,7 @@ func RunKernel(connectionFile string, logwriter io.Writer) {
144
181
var msgparts [][]byte
145
182
// Message receiving loop:
146
183
for {
147
- _ , err = zmq .Poll (pi , - 1 )
148
- if err != nil {
184
+ if _ , err = zmq .Poll (pi , - 1 ); err != nil {
149
185
log .Fatalln (err )
150
186
}
151
187
switch {
0 commit comments