@@ -8,12 +8,41 @@ const replace = require('./replace-vars');
8
8
const tryJSONParse = require ( './try-json-parse' ) ;
9
9
const logger = require ( './log' ) ;
10
10
const version = require ( './version' ) ;
11
+ const stream = require ( 'stream' ) ;
11
12
12
13
module . exports = {
13
14
request : requestHandler ,
14
15
response : responseHandler ,
16
+ streamingResponse : streamResponseHandler ,
15
17
} ;
16
18
19
+ const binaryRequest = [
20
+ 'application/vnd.docker.image.rootfs.diff.tar.gzip' ,
21
+ ] ;
22
+
23
+ const streams = new Map ( ) ;
24
+
25
+ // Stream processing is only available on
26
+ // the server side i.e. the client sends
27
+ // binary chunks to the server.
28
+ function streamResponseHandler ( token ) {
29
+ return ( streamingID , chunk , finished ) => {
30
+ var streamBuffer = streams . get ( streamingID ) ;
31
+
32
+ if ( streamBuffer ) {
33
+ if ( chunk ) {
34
+ streamBuffer . write ( chunk ) ;
35
+ }
36
+ if ( finished ) {
37
+ streamBuffer . end ( ) ;
38
+ streams . delete ( streamingID ) ;
39
+ }
40
+ } else {
41
+ logger . warn ( { streamingID, token } , 'discarding binary chunk' ) ;
42
+ }
43
+ } ;
44
+ }
45
+
17
46
// 1. Request coming in over HTTP conn (logged)
18
47
// 2. Filter for rule match (log and block if no match)
19
48
// 3. Relay over websocket conn (logged)
@@ -43,6 +72,29 @@ function requestHandler(filterRules) {
43
72
44
73
req . url = result . url ;
45
74
logContext . ioUrl = result . url ;
75
+
76
+ // check if this is a streaming request for binary data
77
+ if ( req . headers && binaryRequest . indexOf ( req . headers [ 'accept' ] ) !== - 1 ) {
78
+ const streamingID = uuid ( ) ;
79
+ const streamBuffer = new stream . PassThrough ( ) ;
80
+
81
+ logContext . streamingID = streamingID ;
82
+ logger . debug ( logContext , 'sending binary request over websocket connection' ) ;
83
+
84
+ streams . set ( streamingID , streamBuffer ) ;
85
+ streamBuffer . pipe ( res ) ;
86
+
87
+ res . locals . io . send ( 'request' , {
88
+ url : req . url ,
89
+ method : req . method ,
90
+ body : req . body ,
91
+ headers : req . headers ,
92
+ streamingID,
93
+ } ) ;
94
+
95
+ return ;
96
+ }
97
+
46
98
logger . debug ( logContext , 'sending request over websocket connection' ) ;
47
99
48
100
// relay the http request over the websocket, handle websocket response
@@ -51,6 +103,7 @@ function requestHandler(filterRules) {
51
103
method : req . method ,
52
104
body : req . body ,
53
105
headers : req . headers ,
106
+ streamingID : '' ,
54
107
} , ioResponse => {
55
108
logContext . ioStatus = ioResponse . status ;
56
109
logContext . ioHeaders = ioResponse . headers ;
@@ -84,15 +137,16 @@ function requestHandler(filterRules) {
84
137
// 3. Relay over HTTP conn (logged)
85
138
// 4. Get response over HTTP conn (logged)
86
139
// 5. Send response over websocket conn
87
- function responseHandler ( filterRules , config ) {
140
+ function responseHandler ( filterRules , config , io ) {
88
141
const filters = Filters ( filterRules ) ;
89
142
90
- return ( brokerToken ) => ( { url, headers = { } , method, body = null } = { } , emit ) => {
143
+ return ( brokerToken ) => ( { url, headers = { } , method, body = null , streamingID = '' } = { } , emit ) => {
91
144
const logContext = {
92
145
url,
93
146
method,
94
147
headers,
95
148
requestId : headers [ 'snyk-request-id' ] || uuid ( ) ,
149
+ streamingID,
96
150
} ;
97
151
98
152
logger . debug ( logContext , 'received request over webscoket connection' ) ;
@@ -197,15 +251,31 @@ function responseHandler(filterRules, config) {
197
251
198
252
logger . debug ( logContext , 'sending websocket request over HTTP connection' ) ;
199
253
200
- request ( {
254
+ var req = {
201
255
url : result . url ,
202
256
headers : headers ,
203
257
method,
204
258
body,
205
259
agentOptions : {
206
260
ca : config . caCert , // Optional CA cert
207
261
} ,
208
- } , ( error , response , body ) => {
262
+ } ;
263
+
264
+ // check if this is a streaming request for binary data
265
+ if ( streamingID ) {
266
+ logger . debug ( logContext , 'serving binary stream request' ) ;
267
+
268
+ req . encoding = null ; // no encoding for binary data
269
+
270
+ request ( req ) . on ( 'data' , chunk => {
271
+ io . send ( 'chunk' , streamingID , chunk , false ) ;
272
+ } ) . on ( 'end' , ( ) => {
273
+ io . send ( 'chunk' , streamingID , '' , true ) ;
274
+ } ) ;
275
+ return ;
276
+ }
277
+
278
+ request ( req , ( error , response , body ) => {
209
279
if ( error ) {
210
280
logContext . error = error ;
211
281
logger . error ( logContext , 'error while sending websocket request over HTTP connection' ) ;
0 commit comments