|
13 | 13 | */
|
14 | 14 | package zipkin.server;
|
15 | 15 |
|
| 16 | +import io.undertow.io.Receiver; |
| 17 | +import io.undertow.server.HandlerWrapper; |
| 18 | +import io.undertow.server.HttpHandler; |
| 19 | +import io.undertow.server.HttpServerExchange; |
| 20 | +import io.undertow.util.HttpString; |
16 | 21 | import java.io.ByteArrayInputStream;
|
17 | 22 | import java.io.ByteArrayOutputStream;
|
18 | 23 | import java.io.IOException;
|
19 | 24 | import java.util.zip.GZIPInputStream;
|
20 | 25 | import org.springframework.beans.factory.annotation.Autowired;
|
21 | 26 | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
22 |
| -import org.springframework.http.ResponseEntity; |
23 |
| -import org.springframework.util.concurrent.ListenableFuture; |
24 |
| -import org.springframework.util.concurrent.SettableListenableFuture; |
25 |
| -import org.springframework.web.bind.annotation.RequestBody; |
26 |
| -import org.springframework.web.bind.annotation.RequestHeader; |
27 |
| -import org.springframework.web.bind.annotation.RequestMapping; |
28 |
| -import org.springframework.web.bind.annotation.RestController; |
| 27 | +import org.springframework.context.annotation.Configuration; |
29 | 28 | import zipkin.SpanDecoder;
|
30 | 29 | import zipkin.collector.Collector;
|
31 | 30 | import zipkin.collector.CollectorMetrics;
|
32 | 31 | import zipkin.collector.CollectorSampler;
|
33 |
| -import zipkin.internal.Nullable; |
34 | 32 | import zipkin.internal.V2JsonSpanDecoder;
|
35 | 33 | import zipkin.storage.Callback;
|
36 | 34 | import zipkin.storage.StorageComponent;
|
37 | 35 |
|
38 |
| -import static org.springframework.web.bind.annotation.RequestMethod.POST; |
| 36 | +import static zipkin.SpanDecoder.JSON_DECODER; |
| 37 | +import static zipkin.SpanDecoder.THRIFT_DECODER; |
39 | 38 |
|
40 | 39 | /**
|
41 |
| - * Implements the POST /api/v1/spans endpoint used by instrumentation. |
| 40 | + * Implements the POST /api/v1/spans and /api/v2/spans endpoints used by instrumentation. |
42 | 41 | */
|
43 |
| -@RestController |
| 42 | +@Configuration |
44 | 43 | @ConditionalOnProperty(name = "zipkin.collector.http.enabled", matchIfMissing = true)
|
45 |
| -public class ZipkinHttpCollector { |
46 |
| - static final ResponseEntity<?> SUCCESS = ResponseEntity.accepted().build(); |
47 |
| - static final String APPLICATION_THRIFT = "application/x-thrift"; |
48 |
| - static final SpanDecoder JSON2_DECODER = new V2JsonSpanDecoder(); |
| 44 | +class ZipkinHttpCollector implements HttpHandler, HandlerWrapper { |
| 45 | + |
| 46 | + static final HttpString |
| 47 | + POST = HttpString.tryFromString("POST"), |
| 48 | + CONTENT_TYPE = HttpString.tryFromString("Content-Type"), |
| 49 | + CONTENT_ENCODING = HttpString.tryFromString("Content-Encoding"); |
49 | 50 |
|
50 | 51 | final CollectorMetrics metrics;
|
51 | 52 | final Collector collector;
|
| 53 | + final HttpCollector JSON_V2, JSON_V1, THRIFT; |
| 54 | + final Receiver.ErrorCallback errorCallback; |
| 55 | + private HttpHandler next; |
52 | 56 |
|
53 | 57 | @Autowired ZipkinHttpCollector(StorageComponent storage, CollectorSampler sampler,
|
54 |
| - CollectorMetrics metrics) { |
| 58 | + CollectorMetrics metrics) { |
55 | 59 | this.metrics = metrics.forTransport("http");
|
56 | 60 | this.collector = Collector.builder(getClass())
|
57 |
| - .storage(storage).sampler(sampler).metrics(this.metrics).build(); |
| 61 | + .storage(storage).sampler(sampler).metrics(this.metrics).build(); |
| 62 | + this.JSON_V2 = new HttpCollector(new V2JsonSpanDecoder()); |
| 63 | + this.JSON_V1 = new HttpCollector(JSON_DECODER); |
| 64 | + this.THRIFT = new HttpCollector(THRIFT_DECODER); |
| 65 | + this.errorCallback = new Receiver.ErrorCallback() { |
| 66 | + @Override public void error(HttpServerExchange exchange, IOException e) { |
| 67 | + ZipkinHttpCollector.this.metrics.incrementMessagesDropped(); |
| 68 | + ZipkinHttpCollector.error(exchange, e); |
| 69 | + } |
| 70 | + }; |
58 | 71 | }
|
59 | 72 |
|
60 |
| - @RequestMapping(value = "/api/v2/spans", method = POST) |
61 |
| - public ListenableFuture<ResponseEntity<?>> uploadSpansJson2( |
62 |
| - @RequestHeader(value = "Content-Encoding", required = false) String encoding, |
63 |
| - @RequestBody byte[] body |
64 |
| - ) { |
65 |
| - return validateAndStoreSpans(encoding, JSON2_DECODER, body); |
66 |
| - } |
| 73 | + @Override public void handleRequest(HttpServerExchange exchange) throws Exception { |
| 74 | + boolean v2 = exchange.getRelativePath().equals("/api/v2/spans"); |
| 75 | + boolean v1 = !v2 && exchange.getRelativePath().equals("/api/v1/spans"); |
| 76 | + if (!v2 && !v1) { |
| 77 | + next.handleRequest(exchange); |
| 78 | + return; |
| 79 | + } |
67 | 80 |
|
68 |
| - @RequestMapping(value = "/api/v1/spans", method = POST) |
69 |
| - public ListenableFuture<ResponseEntity<?>> uploadSpansJson( |
70 |
| - @RequestHeader(value = "Content-Encoding", required = false) String encoding, |
71 |
| - @RequestBody byte[] body |
72 |
| - ) { |
73 |
| - return validateAndStoreSpans(encoding, SpanDecoder.JSON_DECODER, body); |
| 81 | + if (!POST.equals(exchange.getRequestMethod())) { |
| 82 | + next.handleRequest(exchange); |
| 83 | + return; |
| 84 | + } |
| 85 | + |
| 86 | + String contentTypeValue = exchange.getRequestHeaders().getFirst(CONTENT_TYPE); |
| 87 | + boolean json = contentTypeValue == null || contentTypeValue.startsWith("application/json"); |
| 88 | + boolean thrift = !json && contentTypeValue.startsWith("application/x-thrift"); |
| 89 | + if (!json && !thrift) { |
| 90 | + exchange.setStatusCode(400) |
| 91 | + .getResponseSender() |
| 92 | + .send("unsupported content type " + contentTypeValue + "\n"); |
| 93 | + return; |
| 94 | + } |
| 95 | + |
| 96 | + HttpCollector collector = v2 ? JSON_V2 : thrift ? THRIFT : JSON_V1; |
| 97 | + metrics.incrementMessages(); |
| 98 | + exchange.getRequestReceiver().receiveFullBytes(collector, errorCallback); |
74 | 99 | }
|
75 | 100 |
|
76 |
| - @RequestMapping(value = "/api/v1/spans", method = POST, consumes = APPLICATION_THRIFT) |
77 |
| - public ListenableFuture<ResponseEntity<?>> uploadSpansThrift( |
78 |
| - @RequestHeader(value = "Content-Encoding", required = false) String encoding, |
79 |
| - @RequestBody byte[] body |
80 |
| - ) { |
81 |
| - return validateAndStoreSpans(encoding, SpanDecoder.THRIFT_DECODER, body); |
| 101 | + @Override public HttpHandler wrap(HttpHandler handler) { |
| 102 | + this.next = handler; |
| 103 | + return this; |
82 | 104 | }
|
83 | 105 |
|
84 |
| - ListenableFuture<ResponseEntity<?>> validateAndStoreSpans(String encoding, SpanDecoder decoder, |
85 |
| - byte[] body) { |
86 |
| - SettableListenableFuture<ResponseEntity<?>> result = new SettableListenableFuture<>(); |
87 |
| - metrics.incrementMessages(); |
88 |
| - if (encoding != null && encoding.contains("gzip")) { |
89 |
| - try { |
90 |
| - body = gunzip(body); |
91 |
| - } catch (IOException e) { |
92 |
| - metrics.incrementMessagesDropped(); |
93 |
| - result.set(ResponseEntity.badRequest().body("Cannot gunzip spans: " + e.getMessage() + "\n")); |
94 |
| - } |
| 106 | + final class HttpCollector implements Receiver.FullBytesCallback { |
| 107 | + final SpanDecoder decoder; |
| 108 | + |
| 109 | + HttpCollector(SpanDecoder decoder) { |
| 110 | + this.decoder = decoder; |
95 | 111 | }
|
96 |
| - collector.acceptSpans(body, decoder, new Callback<Void>() { |
97 |
| - @Override public void onSuccess(@Nullable Void value) { |
98 |
| - result.set(SUCCESS); |
99 |
| - } |
100 | 112 |
|
101 |
| - @Override public void onError(Throwable t) { |
102 |
| - String message = t.getMessage() == null ? t.getClass().getSimpleName() : t.getMessage(); |
103 |
| - result.set(t.getMessage() == null || message.startsWith("Cannot store") |
104 |
| - ? ResponseEntity.status(500).body(message + "\n") |
105 |
| - : ResponseEntity.status(400).body(message + "\n")); |
| 113 | + @Override public void handle(HttpServerExchange exchange, byte[] body) { |
| 114 | + String encoding = exchange.getRequestHeaders().getFirst(CONTENT_ENCODING); |
| 115 | + |
| 116 | + if (encoding != null && encoding.contains("gzip")) { |
| 117 | + try { |
| 118 | + body = gunzip(body); |
| 119 | + } catch (IOException e) { |
| 120 | + metrics.incrementMessagesDropped(); |
| 121 | + exchange.setStatusCode(400) |
| 122 | + .getResponseSender().send("Cannot gunzip spans: " + e.getMessage() + "\n"); |
| 123 | + return; |
| 124 | + } |
106 | 125 | }
|
107 |
| - }); |
108 |
| - return result; |
| 126 | + collector.acceptSpans(body, decoder, new Callback<Void>() { |
| 127 | + @Override public void onSuccess(Void value) { |
| 128 | + exchange.setStatusCode(202).getResponseSender().close(); |
| 129 | + } |
| 130 | + |
| 131 | + @Override public void onError(Throwable t) { |
| 132 | + error(exchange, t); |
| 133 | + } |
| 134 | + }); |
| 135 | + } |
| 136 | + } |
| 137 | + |
| 138 | + static void error(HttpServerExchange exchange, Throwable e) { |
| 139 | + String message = e.getMessage(); |
| 140 | + int code = message == null || message.startsWith("Cannot store") ? 500 : 400; |
| 141 | + if (message == null) message = e.getClass().getSimpleName(); |
| 142 | + exchange.setStatusCode(code).getResponseSender().send(message); |
109 | 143 | }
|
110 | 144 |
|
| 145 | + // TODO: there's gotta be an N/IO way to gunzip |
111 | 146 | private static final ThreadLocal<byte[]> GZIP_BUFFER = new ThreadLocal<byte[]>() {
|
112 | 147 | @Override protected byte[] initialValue() {
|
113 | 148 | return new byte[1024];
|
|
0 commit comments