Skip to content

Commit 3c3b24e

Browse files
committed
feat: Streaming connections can now be defined in the accept.json file, adding tests, headers are now transferred, proper error handling, started adding text to the readme
1 parent c29914e commit 3c3b24e

File tree

9 files changed

+193
-40
lines changed

9 files changed

+193
-40
lines changed

README.npm.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,22 @@ The `private` rules should be determined by what you want to allow through the s
117117

118118
This `public` rule will ensure everything is forwarded to your clients, and will allow your client to handle blocking out messages.
119119

120+
Sometimes large amounts of data need to be transferred in a response to a request. In these instances it might make sense to stream
121+
the data for these requests:
122+
123+
```json
124+
"public": [{
125+
"//": "accept any requests to our connected clients",
126+
"method": "any",
127+
"path": "/blobs/*",
128+
"stream": true
129+
}, {
130+
"//": "accept any requests to our connected clients",
131+
"method": "any",
132+
"path": "/*"
133+
}],
134+
```
135+
120136
## Development & how to test
121137

122138
The project's source code is written in full ES6 (with commonjs modules). This requires the source to be developed with node@6. However, during the release process, the code is transpiled to ES5 via babel and is released with node LTS in mind, node@4 and upwards.
@@ -284,4 +300,3 @@ Public filters are for requests that are received on your broker client and are in
284300
* [License: Apache License, Version 2.0](LICENSE)
285301
* [Contributing](.github/CONTRIBUTING.md)
286302
* [Security](SECURITY.md)
287-

lib/filters/index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ module.exports = ruleSource => {
3232
// array of entries with
3333
const tests = rules.map(entry => {
3434
const keys = [];
35-
let { method, origin, path, valid } = entry;
35+
let { method, origin, path, valid, stream } = entry;
3636
method = (method || 'get').toLowerCase();
3737
valid = valid || [];
3838

@@ -135,6 +135,7 @@ module.exports = ruleSource => {
135135
return {
136136
url: origin + url + querystring,
137137
auth: entry.auth && authHeader(entry.auth),
138+
stream
138139
};
139140
};
140141
});

lib/relay.js

Lines changed: 71 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,36 +9,44 @@ const tryJSONParse = require('./try-json-parse');
99
const logger = require('./log');
1010
const version = require('./version');
1111
const stream = require('stream');
12+
const NodeCache = require('node-cache');
1213

1314
module.exports = {
1415
request: requestHandler,
1516
response: responseHandler,
1617
streamingResponse: streamResponseHandler,
1718
};
1819

19-
const binaryRequest = [
20-
'application/vnd.docker.image.rootfs.diff.tar.gzip',
21-
];
20+
const streams = new NodeCache({
21+
stdTTL: 18000, // 5 hours
22+
checkperiod: 3600, // 1 hour
23+
useClones: false
24+
});
2225

23-
const streams = new Map();
24-
25-
// Stream processing is only available on
26-
// the server side i.e. the client sends
27-
// binary chunks to the server.
2826
function streamResponseHandler(token) {
29-
return (streamingID, chunk, finished) => {
30-
var streamBuffer = streams.get(streamingID);
27+
return (streamingID, chunk, finished, ioResponse) => {
28+
const stream = streams.get(streamingID);
3129

32-
if (streamBuffer) {
33-
if (chunk) {
34-
streamBuffer.write(chunk);
35-
}
36-
if (finished) {
37-
streamBuffer.end();
38-
streams.delete(streamingID);
30+
if (stream) {
31+
const { streamBuffer, response } = stream;
32+
33+
if (streamBuffer) {
34+
if (ioResponse) {
35+
response.status(ioResponse.status)
36+
.set(ioResponse.headers);
37+
}
38+
if (chunk) {
39+
streamBuffer.write(chunk);
40+
}
41+
if (finished) {
42+
streamBuffer.end();
43+
streams.del(streamingID);
44+
}
45+
} else {
46+
logger.warn({ streamingID, token }, 'discarding binary chunk');
3947
}
4048
} else {
41-
logger.warn({ streamingID, token }, 'discarding binary chunk');
49+
logger.warn({ streamingID, token }, 'trying to write into a closed stream');
4250
}
4351
};
4452
}
@@ -74,14 +82,16 @@ function requestHandler(filterRules) {
7482
logContext.ioUrl = result.url;
7583

7684
// check if this is a streaming request for binary data
77-
if (req.headers && binaryRequest.indexOf(req.headers['accept']) !== -1) {
85+
if (result.stream) {
7886
const streamingID = uuid();
7987
const streamBuffer = new stream.PassThrough();
80-
8188
logContext.streamingID = streamingID;
82-
logger.debug(logContext, 'sending binary request over websocket connection');
89+
logger.debug(logContext, 'sending stream request over websocket connection');
8390

84-
streams.set(streamingID, streamBuffer);
91+
streams.set(streamingID, {
92+
response : res,
93+
streamBuffer
94+
});
8595
streamBuffer.pipe(res);
8696

8797
res.locals.io.send('request', {
@@ -90,6 +100,12 @@ function requestHandler(filterRules) {
90100
body: req.body,
91101
headers: req.headers,
92102
streamingID,
103+
}, () => {
104+
// Streaming requests should not be handled using the emit function
105+
// but rather by sending 'chunk' messages
106+
const msg = 'Broker client does not support streaming requests';
107+
logger.error(logContext, msg);
108+
return res.status(501).send({message: msg});
93109
});
94110

95111
return;
@@ -263,41 +279,60 @@ function responseHandler(filterRules, config, io) {
263279

264280
// check if this is a streaming request for binary data
265281
if (streamingID) {
266-
logger.debug(logContext, 'serving binary stream request');
282+
logger.debug(logContext, 'serving stream request');
267283

268284
req.encoding = null; // no encoding for binary data
269285

270-
request(req).on('data', chunk => {
286+
request(req).on('response', response => {
287+
const status = (response && response.statusCode) || 500;
288+
logResponse(logContext, status, response.headers);
289+
io.send('chunk', streamingID, '', false, {
290+
status,
291+
headers : response.headers
292+
});
293+
}).on('data', chunk => {
271294
io.send('chunk', streamingID, chunk, false);
272295
}).on('end', () => {
273296
io.send('chunk', streamingID, '', true);
297+
}).on('error', error => {
298+
logError(logContext, error);
299+
io.send('chunk', streamingID, error.message, true, {
300+
status : 500
301+
});
274302
});
275303
return;
276304
}
277305

278306
request(req, (error, response, body) => {
279307
if (error) {
280-
logContext.error = error;
281-
logger.error(logContext, 'error while sending websocket request over HTTP connection');
308+
logError(logContext, error);
282309
return emit({
283310
status: 500,
284311
body: error.message
285312
});
286313
}
287314

288315
const status = (response && response.statusCode) || 500;
289-
logContext.httpStatus = status;
290-
logContext.httpHeaders = response.headers;
291-
292-
const logMsg = 'sending response back to websocket connection';
293-
if (status <= 200) {
294-
logger.debug(logContext, logMsg);
295-
} else {
296-
logger.info(logContext, logMsg);
297-
}
298-
316+
logResponse(logContext, status, response.headers);
299317
emit({ status, body, headers: response.headers });
300318
});
301319
});
302320
};
303321
}
322+
323+
function logResponse(logContext, status, headers) {
324+
logContext.httpStatus = status;
325+
logContext.httpHeaders = headers;
326+
327+
const logMsg = 'sending response back to websocket connection';
328+
if (status <= 200) {
329+
logger.debug(logContext, logMsg);
330+
} else {
331+
logger.info(logContext, logMsg);
332+
}
333+
}
334+
335+
function logError(logContext, error) {
336+
logContext.error = error;
337+
logger.error(logContext, 'error while sending websocket request over HTTP connection');
338+
}

package-lock.json

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
"lodash.escaperegexp": "^4.1.2",
4747
"lodash.mapvalues": "^4.6.0",
4848
"minimatch": "^3.0.4",
49+
"node-cache": "^4.2.1",
4950
"path-to-regexp": "^1.5.3",
5051
"primus": "^6.0.1",
5152
"primus-emitter": "^3.1.1",

test/fixtures/client/filters.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@
7272
]
7373
},
7474

75+
{
76+
"path": "/test-blob/*",
77+
"method": "GET",
78+
"origin": "http://localhost:${originPort}"
79+
},
7580

7681
{
7782
"path": "/repos/:repo/:owner/contents/folder*/package.json",

test/fixtures/server/filters.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,21 @@
2828
"path": "/echo-query/:param?",
2929
"method": "GET",
3030
"origin": "http://localhost:${originPort}"
31+
},
32+
33+
{
34+
"path": "/test-blob/*",
35+
"method": "GET",
36+
"origin": "http://localhost:${originPort}",
3137
}
3238

3339
],
3440
"public": [
41+
{
42+
"path": "/test-blob/*",
43+
"method": "GET",
44+
"stream": true
45+
},
3546
{
3647
"path": "/*",
3748
"method": "GET"

test/functional/server-client.test.js

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ test('proxy requests originating from behind the broker server', t => {
4444
server.io.on('connection', socket => {
4545
socket.on('identify', clientData => {
4646
const token = clientData.token;
47-
t.plan(22);
47+
t.plan(24);
4848

4949
t.test('identification', t => {
5050
const filters = require(`${clientRootPath}/${ACCEPT}`);
@@ -288,6 +288,49 @@ test('proxy requests originating from behind the broker server', t => {
288288
});
289289
});
290290

291+
t.test('successfully stream data', t => {
292+
const url = `http://localhost:${serverPort}/broker/${token}/test-blob/1`;
293+
request({
294+
url,
295+
method: 'get',
296+
encoding: null
297+
}, (err, res, body) => {
298+
299+
// No encoding is only possible when streaming
300+
// data as we otherwise encode the data
301+
// when making the request on the client.
302+
303+
t.equal(res.statusCode, 299, '299 statusCode');
304+
t.equal(res.headers['test-orig-url'],
305+
'/test-blob/1', 'orig URL');
306+
307+
// Check that the server response with the correct data
308+
309+
const buf = new Buffer(500);
310+
for (var i=0; i<500; i++) {
311+
buf.writeUInt8(i & 0xFF, i);
312+
}
313+
t.deepEqual(body, buf);
314+
315+
t.end();
316+
});
317+
});
318+
319+
t.test('fail to stream data', t => {
320+
const url = `http://localhost:${serverPort}/broker/${token}/test-blob/2`;
321+
request({
322+
url,
323+
method: 'get',
324+
encoding: null
325+
}, (err, res, body) => {
326+
t.equal(res.statusCode, 500, '500 statusCode');
327+
t.equal(res.headers['test-orig-url'],
328+
'/test-blob/2', 'orig URL');
329+
t.equal(String(body), 'Test Error');
330+
t.end();
331+
});
332+
});
333+
291334
t.test('clean up', t => {
292335
client.close();
293336
setTimeout(() => {

test/utils.js

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,27 @@ const { app: echoServer, server } = webserver({
1515
httpsCert: process.env.TEST_CERT, // Optional
1616
});
1717

18+
echoServer.get(
19+
'/test-blob/1',
20+
(req, res) => {
21+
res.setHeader('test-orig-url', req.originalUrl);
22+
res.status(299);
23+
24+
const buf = new Buffer(500);
25+
for (var i=0; i<500; i++) {
26+
buf.writeUInt8(i & 0xFF, i);
27+
}
28+
res.send(buf);
29+
});
30+
31+
echoServer.get(
32+
'/test-blob/2',
33+
(req, res) => {
34+
res.setHeader('test-orig-url', req.originalUrl);
35+
res.status(500);
36+
res.send('Test Error');
37+
});
38+
1839
echoServer.get('/basic-auth', (req, res) => {
1940
res.send(req.headers.authorization);
2041
});
@@ -44,7 +65,7 @@ echoServer.get(
4465
(req, res) => {
4566
res.send(req.originalUrl);
4667
});
47-
68+
4869
echoServer.get('/repos/owner/repo/contents/folder/package.json',
4970
(req, res) => {
5071
res.json({headers: req.headers, query: req.query, url: req.url});

0 commit comments

Comments
 (0)