Skip to content

Commit c29914e

Browse files
committed
feat: Support for binary data streaming
1 parent f716d71 commit c29914e

File tree

3 files changed

+82
-8
lines changed

3 files changed

+82
-8
lines changed

lib/client/socket.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ module.exports = ({ url, token, filters, config, identifyingMetadata }) => {
3030

3131
logger.info({ url }, 'broker client is connecting to broker server');
3232

33-
const response = relay.response(filters, config);
33+
const response = relay.response(filters, config, io);
34+
const streamingResponse = relay.streamingResponse;
3435

3536
// RS note: this bind doesn't feel right, it feels like a sloppy way of
3637
// getting the filters into the request function.
38+
io.on('chunk', streamingResponse(token));
3739
io.on('request', response());
3840
io.on('error', ({ type, description }) => {
3941
if (type === 'TransportError') {
@@ -53,7 +55,7 @@ module.exports = ({ url, token, filters, config, identifyingMetadata }) => {
5355
io.on('close', () => {
5456
logger.warn({ url, token }, 'websocket connection to the broker server was closed');
5557
});
56-
58+
5759
// only required if we're manually opening the connection
5860
// io.open();
5961

lib/relay.js

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,41 @@ const replace = require('./replace-vars');
88
const tryJSONParse = require('./try-json-parse');
99
const logger = require('./log');
1010
const version = require('./version');
11+
const stream = require('stream');
1112

1213
module.exports = {
1314
request: requestHandler,
1415
response: responseHandler,
16+
streamingResponse: streamResponseHandler,
1517
};
1618

19+
const binaryRequest = [
20+
'application/vnd.docker.image.rootfs.diff.tar.gzip',
21+
];
22+
23+
const streams = new Map();
24+
25+
// Stream processing is only available on
26+
// the server side i.e. the client sends
27+
// binary chunks to the server.
28+
function streamResponseHandler(token) {
29+
return (streamingID, chunk, finished) => {
30+
var streamBuffer = streams.get(streamingID);
31+
32+
if (streamBuffer) {
33+
if (chunk) {
34+
streamBuffer.write(chunk);
35+
}
36+
if (finished) {
37+
streamBuffer.end();
38+
streams.delete(streamingID);
39+
}
40+
} else {
41+
logger.warn({ streamingID, token }, 'discarding binary chunk');
42+
}
43+
};
44+
}
45+
1746
// 1. Request coming in over HTTP conn (logged)
1847
// 2. Filter for rule match (log and block if no match)
1948
// 3. Relay over websocket conn (logged)
@@ -43,6 +72,29 @@ function requestHandler(filterRules) {
4372

4473
req.url = result.url;
4574
logContext.ioUrl = result.url;
75+
76+
// check if this is a streaming request for binary data
77+
if (req.headers && binaryRequest.indexOf(req.headers['accept']) !== -1) {
78+
const streamingID = uuid();
79+
const streamBuffer = new stream.PassThrough();
80+
81+
logContext.streamingID = streamingID;
82+
logger.debug(logContext, 'sending binary request over websocket connection');
83+
84+
streams.set(streamingID, streamBuffer);
85+
streamBuffer.pipe(res);
86+
87+
res.locals.io.send('request', {
88+
url: req.url,
89+
method: req.method,
90+
body: req.body,
91+
headers: req.headers,
92+
streamingID,
93+
});
94+
95+
return;
96+
}
97+
4698
logger.debug(logContext, 'sending request over websocket connection');
4799

48100
// relay the http request over the websocket, handle websocket response
@@ -51,6 +103,7 @@ function requestHandler(filterRules) {
51103
method: req.method,
52104
body: req.body,
53105
headers: req.headers,
106+
streamingID : '',
54107
}, ioResponse => {
55108
logContext.ioStatus = ioResponse.status;
56109
logContext.ioHeaders = ioResponse.headers;
@@ -84,15 +137,16 @@ function requestHandler(filterRules) {
84137
// 3. Relay over HTTP conn (logged)
85138
// 4. Get response over HTTP conn (logged)
86139
// 5. Send response over websocket conn
87-
function responseHandler(filterRules, config) {
140+
function responseHandler(filterRules, config, io) {
88141
const filters = Filters(filterRules);
89142

90-
return (brokerToken) => ({ url, headers = {}, method, body = null } = {}, emit) => {
143+
return (brokerToken) => ({ url, headers = {}, method, body = null, streamingID = '' } = {}, emit) => {
91144
const logContext = {
92145
url,
93146
method,
94147
headers,
95148
requestId: headers['snyk-request-id'] || uuid(),
149+
streamingID,
96150
};
97151

98152
logger.debug(logContext, 'received request over webscoket connection');
@@ -197,15 +251,31 @@ function responseHandler(filterRules, config) {
197251

198252
logger.debug(logContext, 'sending websocket request over HTTP connection');
199253

200-
request({
254+
var req = {
201255
url: result.url,
202256
headers: headers,
203257
method,
204258
body,
205259
agentOptions: {
206260
ca: config.caCert, // Optional CA cert
207261
},
208-
}, (error, response, body) => {
262+
};
263+
264+
// check if this is a streaming request for binary data
265+
if (streamingID) {
266+
logger.debug(logContext, 'serving binary stream request');
267+
268+
req.encoding = null; // no encoding for binary data
269+
270+
request(req).on('data', chunk => {
271+
io.send('chunk', streamingID, chunk, false);
272+
}).on('end', () => {
273+
io.send('chunk', streamingID, '', true);
274+
});
275+
return;
276+
}
277+
278+
request(req, (error, response, body) => {
209279
if (error) {
210280
logContext.error = error;
211281
logger.error(logContext, 'error while sending websocket request over HTTP connection');

lib/server/socket.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ module.exports = ({ server, filters, config }) => {
88
io.plugin('emitter', Emitter);
99

1010
const connections = new Map();
11-
const response = relay.response(filters, config);
11+
const response = relay.response(filters, config, io);
12+
const streamingResponse = relay.streamingResponse;
1213

1314
io.on('error', error => logger.error({ error }, 'Primus/engine.io server error'));
1415

@@ -57,9 +58,10 @@ module.exports = ({ server, filters, config }) => {
5758
clientPool.unshift({ socket, metadata: clientData.metadata });
5859
connections.set(token, clientPool);
5960

61+
socket.on('chunk', streamingResponse(token));
6062
socket.on('request', response(token));
6163
});
62-
64+
6365
['close', 'end', 'disconnect'].forEach(e => socket.on(e, () => close(e)));
6466
socket.on('error', (error) => {
6567
logger.warn({ error }, 'error on websocket connection');

0 commit comments

Comments
 (0)