Skip to content

Commit 9b88d12

Browse files
Adrian Coleadriancole
Adrian Cole
authored andcommitted
Rewrites http collector to undertow
1 parent d49bb9d commit 9b88d12

File tree

2 files changed

+100
-58
lines changed

2 files changed

+100
-58
lines changed

zipkin-server/src/main/java/zipkin/server/ZipkinHttpCollector.java

Lines changed: 93 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -13,101 +13,136 @@
1313
*/
1414
package zipkin.server;
1515

16+
import io.undertow.io.Receiver;
17+
import io.undertow.server.HandlerWrapper;
18+
import io.undertow.server.HttpHandler;
19+
import io.undertow.server.HttpServerExchange;
20+
import io.undertow.util.HttpString;
1621
import java.io.ByteArrayInputStream;
1722
import java.io.ByteArrayOutputStream;
1823
import java.io.IOException;
1924
import java.util.zip.GZIPInputStream;
2025
import org.springframework.beans.factory.annotation.Autowired;
2126
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
22-
import org.springframework.http.ResponseEntity;
23-
import org.springframework.util.concurrent.ListenableFuture;
24-
import org.springframework.util.concurrent.SettableListenableFuture;
25-
import org.springframework.web.bind.annotation.RequestBody;
26-
import org.springframework.web.bind.annotation.RequestHeader;
27-
import org.springframework.web.bind.annotation.RequestMapping;
28-
import org.springframework.web.bind.annotation.RestController;
27+
import org.springframework.context.annotation.Configuration;
2928
import zipkin.SpanDecoder;
3029
import zipkin.collector.Collector;
3130
import zipkin.collector.CollectorMetrics;
3231
import zipkin.collector.CollectorSampler;
33-
import zipkin.internal.Nullable;
3432
import zipkin.internal.V2JsonSpanDecoder;
3533
import zipkin.storage.Callback;
3634
import zipkin.storage.StorageComponent;
3735

38-
import static org.springframework.web.bind.annotation.RequestMethod.POST;
36+
import static zipkin.SpanDecoder.JSON_DECODER;
37+
import static zipkin.SpanDecoder.THRIFT_DECODER;
3938

4039
/**
41-
* Implements the POST /api/v1/spans endpoint used by instrumentation.
40+
* Implements the POST /api/v1/spans and /api/v2/spans endpoints used by instrumentation.
4241
*/
43-
@RestController
42+
@Configuration
4443
@ConditionalOnProperty(name = "zipkin.collector.http.enabled", matchIfMissing = true)
45-
public class ZipkinHttpCollector {
46-
static final ResponseEntity<?> SUCCESS = ResponseEntity.accepted().build();
47-
static final String APPLICATION_THRIFT = "application/x-thrift";
48-
static final SpanDecoder JSON2_DECODER = new V2JsonSpanDecoder();
44+
class ZipkinHttpCollector implements HttpHandler, HandlerWrapper {
45+
46+
static final HttpString
47+
POST = HttpString.tryFromString("POST"),
48+
CONTENT_TYPE = HttpString.tryFromString("Content-Type"),
49+
CONTENT_ENCODING = HttpString.tryFromString("Content-Encoding");
4950

5051
final CollectorMetrics metrics;
5152
final Collector collector;
53+
final HttpCollector JSON_V2, JSON_V1, THRIFT;
54+
final Receiver.ErrorCallback errorCallback;
55+
private HttpHandler next;
5256

5357
@Autowired ZipkinHttpCollector(StorageComponent storage, CollectorSampler sampler,
54-
CollectorMetrics metrics) {
58+
CollectorMetrics metrics) {
5559
this.metrics = metrics.forTransport("http");
5660
this.collector = Collector.builder(getClass())
57-
.storage(storage).sampler(sampler).metrics(this.metrics).build();
61+
.storage(storage).sampler(sampler).metrics(this.metrics).build();
62+
this.JSON_V2 = new HttpCollector(new V2JsonSpanDecoder());
63+
this.JSON_V1 = new HttpCollector(JSON_DECODER);
64+
this.THRIFT = new HttpCollector(THRIFT_DECODER);
65+
this.errorCallback = new Receiver.ErrorCallback() {
66+
@Override public void error(HttpServerExchange exchange, IOException e) {
67+
ZipkinHttpCollector.this.metrics.incrementMessagesDropped();
68+
ZipkinHttpCollector.error(exchange, e);
69+
}
70+
};
5871
}
5972

60-
@RequestMapping(value = "/api/v2/spans", method = POST)
61-
public ListenableFuture<ResponseEntity<?>> uploadSpansJson2(
62-
@RequestHeader(value = "Content-Encoding", required = false) String encoding,
63-
@RequestBody byte[] body
64-
) {
65-
return validateAndStoreSpans(encoding, JSON2_DECODER, body);
66-
}
73+
@Override public void handleRequest(HttpServerExchange exchange) throws Exception {
74+
boolean v2 = exchange.getRelativePath().equals("/api/v2/spans");
75+
boolean v1 = !v2 && exchange.getRelativePath().equals("/api/v1/spans");
76+
if (!v2 && !v1) {
77+
next.handleRequest(exchange);
78+
return;
79+
}
6780

68-
@RequestMapping(value = "/api/v1/spans", method = POST)
69-
public ListenableFuture<ResponseEntity<?>> uploadSpansJson(
70-
@RequestHeader(value = "Content-Encoding", required = false) String encoding,
71-
@RequestBody byte[] body
72-
) {
73-
return validateAndStoreSpans(encoding, SpanDecoder.JSON_DECODER, body);
81+
if (!POST.equals(exchange.getRequestMethod())) {
82+
next.handleRequest(exchange);
83+
return;
84+
}
85+
86+
String contentTypeValue = exchange.getRequestHeaders().getFirst(CONTENT_TYPE);
87+
boolean json = contentTypeValue == null || contentTypeValue.startsWith("application/json");
88+
boolean thrift = !json && contentTypeValue.startsWith("application/x-thrift");
89+
if (!json && !thrift) {
90+
exchange.setStatusCode(400)
91+
.getResponseSender()
92+
.send("unsupported content type " + contentTypeValue + "\n");
93+
return;
94+
}
95+
96+
HttpCollector collector = v2 ? JSON_V2 : thrift ? THRIFT : JSON_V1;
97+
metrics.incrementMessages();
98+
exchange.getRequestReceiver().receiveFullBytes(collector, errorCallback);
7499
}
75100

76-
@RequestMapping(value = "/api/v1/spans", method = POST, consumes = APPLICATION_THRIFT)
77-
public ListenableFuture<ResponseEntity<?>> uploadSpansThrift(
78-
@RequestHeader(value = "Content-Encoding", required = false) String encoding,
79-
@RequestBody byte[] body
80-
) {
81-
return validateAndStoreSpans(encoding, SpanDecoder.THRIFT_DECODER, body);
101+
@Override public HttpHandler wrap(HttpHandler handler) {
102+
this.next = handler;
103+
return this;
82104
}
83105

84-
ListenableFuture<ResponseEntity<?>> validateAndStoreSpans(String encoding, SpanDecoder decoder,
85-
byte[] body) {
86-
SettableListenableFuture<ResponseEntity<?>> result = new SettableListenableFuture<>();
87-
metrics.incrementMessages();
88-
if (encoding != null && encoding.contains("gzip")) {
89-
try {
90-
body = gunzip(body);
91-
} catch (IOException e) {
92-
metrics.incrementMessagesDropped();
93-
result.set(ResponseEntity.badRequest().body("Cannot gunzip spans: " + e.getMessage() + "\n"));
94-
}
106+
final class HttpCollector implements Receiver.FullBytesCallback {
107+
final SpanDecoder decoder;
108+
109+
HttpCollector(SpanDecoder decoder) {
110+
this.decoder = decoder;
95111
}
96-
collector.acceptSpans(body, decoder, new Callback<Void>() {
97-
@Override public void onSuccess(@Nullable Void value) {
98-
result.set(SUCCESS);
99-
}
100112

101-
@Override public void onError(Throwable t) {
102-
String message = t.getMessage() == null ? t.getClass().getSimpleName() : t.getMessage();
103-
result.set(t.getMessage() == null || message.startsWith("Cannot store")
104-
? ResponseEntity.status(500).body(message + "\n")
105-
: ResponseEntity.status(400).body(message + "\n"));
113+
@Override public void handle(HttpServerExchange exchange, byte[] body) {
114+
String encoding = exchange.getRequestHeaders().getFirst(CONTENT_ENCODING);
115+
116+
if (encoding != null && encoding.contains("gzip")) {
117+
try {
118+
body = gunzip(body);
119+
} catch (IOException e) {
120+
metrics.incrementMessagesDropped();
121+
exchange.setStatusCode(400)
122+
.getResponseSender().send("Cannot gunzip spans: " + e.getMessage() + "\n");
123+
return;
124+
}
106125
}
107-
});
108-
return result;
126+
collector.acceptSpans(body, decoder, new Callback<Void>() {
127+
@Override public void onSuccess(Void value) {
128+
exchange.setStatusCode(202).getResponseSender().close();
129+
}
130+
131+
@Override public void onError(Throwable t) {
132+
error(exchange, t);
133+
}
134+
});
135+
}
136+
}
137+
138+
static void error(HttpServerExchange exchange, Throwable e) {
139+
String message = e.getMessage();
140+
int code = message == null || message.startsWith("Cannot store") ? 500 : 400;
141+
if (message == null) message = e.getClass().getSimpleName();
142+
exchange.setStatusCode(code).getResponseSender().send(message);
109143
}
110144

145+
// TODO: there's gotta be an N/IO way to gunzip
111146
private static final ThreadLocal<byte[]> GZIP_BUFFER = new ThreadLocal<byte[]>() {
112147
@Override protected byte[] initialValue() {
113148
return new byte[1024];

zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,19 @@ public class ZipkinServerConfiguration {
4747

4848
@Autowired(required = false)
4949
UndertowDeploymentInfoCustomizer httpRequestDurationCustomizer;
50+
@Autowired(required = false)
51+
ZipkinHttpCollector httpCollector;
5052

5153
@Bean public UndertowEmbeddedServletContainerFactory embeddedServletContainerFactory(
5254
@Value("${zipkin.query.allowed-origins:*}") String allowedOrigins
5355
) {
5456
UndertowEmbeddedServletContainerFactory factory = new UndertowEmbeddedServletContainerFactory();
5557
CorsHandler cors = new CorsHandler(allowedOrigins);
58+
if (httpCollector != null) {
59+
factory.addDeploymentInfoCustomizers(
60+
info -> info.addInitialHandlerChainWrapper(httpCollector)
61+
);
62+
}
5663
factory.addDeploymentInfoCustomizers(
5764
info -> info.addInitialHandlerChainWrapper(cors)
5865
);

0 commit comments

Comments
 (0)