Skip to content

Commit dc9de8b

Browse files
authored
Starts preparing v2 Codec package (openzipkin#1696)
All current Zipkin codec operations include the following: * encoding spans one-by-one as reported by instrumentation * encoding a list of encoded spans ready to send into a bytes message * decoding a list of spans off a transport message Zipkin v1 includes legacy encoding, which is a bit different. For example, old variants of Kafka accepted single-element messages instead of a list. We don't need to carry-forward this anymore. Moreover, interfaces of reporter vs collector were split across jars in Zipkin v1 due to the former being defined late. This introduces the following key interfaces and cleans up some code around codec. * `byte[] Encoder.encode(S span)` * `M MessageEncoder.encode(List<byte[]> encodedSpans)` * `List<S> Decoder.decodeList(byte[] message)` All of these are temporarily placed in an internal package until other code around it stabilizes.
1 parent 88e0ef1 commit dc9de8b

File tree

32 files changed

+646
-371
lines changed

32 files changed

+646
-371
lines changed

benchmarks/src/main/java/zipkin/benchmarks/CodecBenchmarks.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Collections;
2121
import java.util.List;
2222
import java.util.concurrent.TimeUnit;
23+
import java.util.stream.Collectors;
2324
import org.apache.thrift.TDeserializer;
2425
import org.apache.thrift.TException;
2526
import org.apache.thrift.TSerializer;
@@ -42,7 +43,9 @@
4243
import zipkin.Endpoint;
4344
import zipkin.Span;
4445
import zipkin.internal.Span2;
45-
import zipkin.internal.Span2Codec;
46+
import zipkin.internal.v2.codec.MessageEncoder;
47+
import zipkin.internal.v2.codec.Decoder;
48+
import zipkin.internal.v2.codec.Encoder;
4649

4750
/**
4851
* This compares the speed of the bundled java codec with the approach used in the scala
@@ -156,29 +159,19 @@ public byte[] writeClientSpan_thrift_libthrift() throws TException {
156159
return serialize(clientSpanLibThrift);
157160
}
158161

159-
static final byte[] span2Json = read("/span2.json");
160-
static final Span2 span2 = Span2Codec.JSON.readSpan(span2Json);
161-
static final List<Span2> tenClientSpan2s = Collections.nCopies(10, span2);
162-
static final byte[] tenClientSpan2sJson = Span2Codec.JSON.writeSpans(tenClientSpan2s);
163-
164-
@Benchmark
165-
public Span2 readClientSpan_json_span2() {
166-
return Span2Codec.JSON.readSpan(span2Json);
167-
}
162+
static final Span2 span2 = Decoder.JSON.decodeList(read("/span2.json")).get(0);
163+
static final byte[] tenClientSpan2sJson = MessageEncoder.JSON_BYTES.encode(
164+
Collections.nCopies(10, span2).stream().map(Encoder.JSON::encode).collect(Collectors.toList())
165+
);
168166

169167
@Benchmark
170168
public List<Span2> readTenClientSpans_json_span2() {
171-
return Span2Codec.JSON.readSpans(tenClientSpan2sJson);
169+
return Decoder.JSON.decodeList(tenClientSpan2sJson);
172170
}
173171

174172
@Benchmark
175173
public byte[] writeClientSpan_json_span2() {
176-
return Span2Codec.JSON.writeSpan(span2);
177-
}
178-
179-
@Benchmark
180-
public byte[] writeTenClientSpans_json_span2() {
181-
return Span2Codec.JSON.writeSpans(tenClientSpan2s);
174+
return Encoder.JSON.encode(span2);
182175
}
183176

184177
static final byte[] rpcSpanJson = read("/span-rpc.json");
@@ -254,7 +247,7 @@ public byte[] writeRpcV6Span_thrift_libthrift() throws TException {
254247
// Convenience main entry-point
255248
public static void main(String[] args) throws RunnerException {
256249
Options opt = new OptionsBuilder()
257-
.include(".*" + CodecBenchmarks.class.getSimpleName())
250+
.include("CodecBenchmarks.readTenClientSpans_json_span2")
258251
.build();
259252

260253
new Runner(opt).run();

benchmarks/src/main/resources/span2.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
{
1+
[{
22
"traceId": "86154a4ba6e91385",
33
"parentId": "86154a4ba6e91385",
44
"id": "4d1e00c0db9010db",
@@ -29,4 +29,4 @@
2929
"http.path": "/api",
3030
"clnt/finagle.version": "6.45.0"
3131
}
32-
}
32+
}]

zipkin-collector/kafka/src/test/java/zipkin/collector/kafka/KafkaCollectorTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
import zipkin.collector.InMemoryCollectorMetrics;
3232
import zipkin.collector.kafka.KafkaCollector.Builder;
3333
import zipkin.internal.ApplyTimestampAndDuration;
34-
import zipkin.internal.Span2Codec;
3534
import zipkin.internal.Span2Converter;
35+
import zipkin.internal.v2.codec.MessageEncoder;
36+
import zipkin.internal.v2.codec.Encoder;
3637
import zipkin.storage.AsyncSpanConsumer;
3738
import zipkin.storage.AsyncSpanStore;
3839
import zipkin.storage.SpanStore;
3940
import zipkin.storage.StorageComponent;
4041

42+
import static java.util.Arrays.asList;
4143
import static org.assertj.core.api.Assertions.assertThat;
4244
import static zipkin.TestObjects.LOTS_OF_SPANS;
4345
import static zipkin.TestObjects.TRACE;
@@ -145,19 +147,19 @@ public void messageWithMultipleSpans_json2() throws Exception {
145147
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
146148
);
147149

148-
byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
149-
Span2Converter.fromSpan(spans.get(0)).get(0),
150-
Span2Converter.fromSpan(spans.get(1)).get(0)
150+
byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
151+
Encoder.JSON.encode(Span2Converter.fromSpan(spans.get(0)).get(0)),
152+
Encoder.JSON.encode(Span2Converter.fromSpan(spans.get(1)).get(0))
151153
));
152154

153-
producer.send(new KeyedMessage<>(builder.topic, bytes));
155+
producer.send(new KeyedMessage<>(builder.topic, message));
154156

155157
try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
156158
assertThat(recvdSpans.take()).containsAll(spans);
157159
}
158160

159161
assertThat(kafkaMetrics.messages()).isEqualTo(1);
160-
assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
162+
assertThat(kafkaMetrics.bytes()).isEqualTo(message.length);
161163
assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
162164
}
163165

zipkin-collector/kafka10/src/test/java/zipkin/collector/kafka10/KafkaCollectorTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@
3838
import zipkin.collector.InMemoryCollectorMetrics;
3939
import zipkin.collector.kafka10.KafkaCollector.Builder;
4040
import zipkin.internal.ApplyTimestampAndDuration;
41-
import zipkin.internal.Span2Codec;
4241
import zipkin.internal.Span2Converter;
42+
import zipkin.internal.v2.codec.MessageEncoder;
43+
import zipkin.internal.v2.codec.Encoder;
4344
import zipkin.storage.AsyncSpanConsumer;
4445
import zipkin.storage.AsyncSpanStore;
4546
import zipkin.storage.SpanStore;
4647
import zipkin.storage.StorageComponent;
4748

49+
import static java.util.Arrays.asList;
4850
import static org.assertj.core.api.Assertions.assertThat;
4951
import static zipkin.TestObjects.LOTS_OF_SPANS;
5052
import static zipkin.TestObjects.TRACE;
@@ -199,20 +201,20 @@ public void messageWithMultipleSpans_json2() throws Exception {
199201
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
200202
);
201203

202-
byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
203-
Span2Converter.fromSpan(spans.get(0)).get(0),
204-
Span2Converter.fromSpan(spans.get(1)).get(0)
204+
byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
205+
Encoder.JSON.encode(Span2Converter.fromSpan(spans.get(0)).get(0)),
206+
Encoder.JSON.encode(Span2Converter.fromSpan(spans.get(1)).get(0))
205207
));
206208

207-
produceSpans(bytes, builder.topic);
209+
produceSpans(message, builder.topic);
208210

209211
try (KafkaCollector collector = builder.build()) {
210212
collector.start();
211213
assertThat(receivedSpans.take()).containsAll(spans);
212214
}
213215

214216
assertThat(kafkaMetrics.messages()).isEqualTo(1);
215-
assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
217+
assertThat(kafkaMetrics.bytes()).isEqualTo(message.length);
216218
assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
217219
}
218220

zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import zipkin.SpanDecoder;
2929
import zipkin.collector.Collector;
3030
import zipkin.collector.CollectorMetrics;
31-
import zipkin.internal.Span2JsonDecoder;
31+
import zipkin.internal.Span2JsonSpanDecoder;
3232
import zipkin.storage.Callback;
3333
import zipkin.storage.QueryRequest;
3434
import zipkin.storage.SpanStore;
@@ -37,7 +37,7 @@
3737
import static zipkin.internal.Util.lowerHexToUnsignedLong;
3838

3939
final class ZipkinDispatcher extends Dispatcher {
40-
static final SpanDecoder JSON2_DECODER = new Span2JsonDecoder();
40+
static final SpanDecoder JSON2_DECODER = new Span2JsonSpanDecoder();
4141

4242
private final SpanStore store;
4343
private final Collector consumer;

zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@
3131
import zipkin.Codec;
3232
import zipkin.Span;
3333
import zipkin.internal.ApplyTimestampAndDuration;
34-
import zipkin.internal.Span2Codec;
3534
import zipkin.internal.Span2Converter;
35+
import zipkin.internal.v2.codec.MessageEncoder;
36+
import zipkin.internal.v2.codec.Encoder;
3637

3738
import static java.lang.String.format;
3839
import static java.util.Arrays.asList;
@@ -68,15 +69,15 @@ public void getTraces_storedViaPostVersion2() throws IOException {
6869
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
6970
);
7071

71-
byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
72-
Span2Converter.fromSpan(spans.get(0)).get(0),
73-
Span2Converter.fromSpan(spans.get(1)).get(0)
72+
byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
73+
Encoder.JSON.encode(Span2Converter.fromSpan(spans.get(0)).get(0)),
74+
Encoder.JSON.encode(Span2Converter.fromSpan(spans.get(1)).get(0))
7475
));
7576

7677
// write the span to the zipkin using http api v2
7778
Response response = client.newCall(new Request.Builder()
7879
.url(zipkin.httpUrl() + "/api/v2/spans")
79-
.post(RequestBody.create(MediaType.parse("application/json"), bytes)).build()
80+
.post(RequestBody.create(MediaType.parse("application/json"), message)).build()
8081
).execute();
8182
assertThat(response.code()).isEqualTo(202);
8283

zipkin-server/src/main/java/zipkin/server/ZipkinHttpCollector.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,12 @@
2727
import org.springframework.web.bind.annotation.RequestHeader;
2828
import org.springframework.web.bind.annotation.RequestMapping;
2929
import org.springframework.web.bind.annotation.RestController;
30-
import zipkin.Span;
3130
import zipkin.SpanDecoder;
3231
import zipkin.collector.Collector;
3332
import zipkin.collector.CollectorMetrics;
3433
import zipkin.collector.CollectorSampler;
3534
import zipkin.internal.Nullable;
36-
import zipkin.internal.Span2JsonDecoder;
35+
import zipkin.internal.Span2JsonSpanDecoder;
3736
import zipkin.storage.Callback;
3837
import zipkin.storage.StorageComponent;
3938

@@ -48,7 +47,7 @@
4847
public class ZipkinHttpCollector {
4948
static final ResponseEntity<?> SUCCESS = ResponseEntity.accepted().build();
5049
static final String APPLICATION_THRIFT = "application/x-thrift";
51-
static final SpanDecoder JSON2_DECODER = new Span2JsonDecoder();
50+
static final SpanDecoder JSON2_DECODER = new Span2JsonSpanDecoder();
5251

5352
final CollectorMetrics metrics;
5453
final Collector collector;

zipkin-server/src/test/java/zipkin/server/ZipkinServerIntegrationTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
*/
1414
package zipkin.server;
1515

16-
import java.util.Collections;
1716
import okio.Buffer;
1817
import okio.GzipSink;
1918
import org.junit.Before;
@@ -33,8 +32,9 @@
3332
import zipkin.Codec;
3433
import zipkin.Span;
3534
import zipkin.internal.ApplyTimestampAndDuration;
36-
import zipkin.internal.Span2Codec;
3735
import zipkin.internal.Span2Converter;
36+
import zipkin.internal.v2.codec.MessageEncoder;
37+
import zipkin.internal.v2.codec.Encoder;
3838
import zipkin.storage.InMemoryStorage;
3939

4040
import static java.lang.String.format;
@@ -85,11 +85,11 @@ public void writeSpans_noContentTypeIsJson() throws Exception {
8585
public void writeSpans_version2() throws Exception {
8686
Span span = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]);
8787

88-
byte[] bytes = Span2Codec.JSON.writeSpans(Collections.singletonList(
89-
Span2Converter.fromSpan(span).get(0)
88+
byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
89+
Encoder.JSON.encode(Span2Converter.fromSpan(span).get(0))
9090
));
9191

92-
performAsync(post("/api/v2/spans").content(bytes))
92+
performAsync(post("/api/v2/spans").content(message))
9393
.andExpect(status().isAccepted());
9494

9595
// sleep as the the storage operation is async

zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import zipkin.Span;
2727
import zipkin.internal.Nullable;
2828
import zipkin.internal.Span2;
29-
import zipkin.internal.Span2Codec;
3029
import zipkin.internal.Span2Converter;
30+
import zipkin.internal.v2.codec.Encoder;
3131
import zipkin.storage.AsyncSpanConsumer;
3232
import zipkin.storage.Callback;
3333

@@ -150,9 +150,9 @@ static byte[] prefixWithTimestampMillisAndQuery(Span2 span, @Nullable Long times
150150
if (LOG.isLoggable(Level.FINE)) {
151151
LOG.log(Level.FINE, "Error indexing query for span: " + span, e);
152152
}
153-
return Span2Codec.JSON.writeSpan(span);
153+
return Encoder.JSON.encode(span);
154154
}
155-
byte[] document = Span2Codec.JSON.writeSpan(span);
155+
byte[] document = Encoder.JSON.encode(span);
156156
if (query.rangeEquals(0L, ByteString.of(new byte[] {'{', '}'}))) {
157157
return document;
158158
}

zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@
2929
import zipkin.TestObjects;
3030
import zipkin.internal.CallbackCaptor;
3131
import zipkin.internal.Span2;
32-
import zipkin.internal.Span2Codec;
3332
import zipkin.internal.Span2Converter;
33+
import zipkin.internal.v2.codec.MessageEncoder;
34+
import zipkin.internal.v2.codec.Decoder;
3435

3536
import static java.util.Arrays.asList;
3637
import static org.assertj.core.api.Assertions.assertThat;
@@ -137,9 +138,12 @@ public void close() throws IOException {
137138
.timestamp(TODAY * 1000).build();
138139
Span2 span2 = Span2Converter.fromSpan(span).get(0);
139140

140-
byte[] document = prefixWithTimestampMillisAndQuery(span2, span.timestamp);
141-
assertThat(Span2Codec.JSON.readSpan(document))
142-
.isEqualTo(span2); // ignores timestamp_millis field
141+
byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
142+
prefixWithTimestampMillisAndQuery(span2, span.timestamp)
143+
));
144+
145+
assertThat(Decoder.JSON.decodeList(message))
146+
.containsOnly(span2); // ignores timestamp_millis field
143147
}
144148

145149
@Test public void doesntWriteSpanId() throws Exception {

zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/JsonAdaptersTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import zipkin.TestObjects;
2626
import zipkin.internal.ApplyTimestampAndDuration;
2727
import zipkin.internal.Span2;
28-
import zipkin.internal.Span2Codec;
2928
import zipkin.internal.Span2Converter;
3029
import zipkin.internal.Util;
30+
import zipkin.internal.v2.codec.Encoder;
3131

3232
import static org.assertj.core.api.Assertions.assertThat;
3333
import static org.assertj.core.api.Assertions.entry;
@@ -137,7 +137,7 @@ public void span_roundTrip() throws IOException {
137137
Span span = ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[0]);
138138
Span2 span2 = Span2Converter.fromSpan(span).get(0);
139139
Buffer bytes = new Buffer();
140-
bytes.write(Span2Codec.JSON.writeSpan(span2));
140+
bytes.write(Encoder.JSON.encode(span2));
141141
assertThat(SPAN_ADAPTER.fromJson(bytes))
142142
.isEqualTo(span);
143143
}
@@ -162,7 +162,7 @@ public void span_specialCharsInJson() throws IOException {
162162
.build();
163163

164164
Buffer bytes = new Buffer();
165-
bytes.write(Span2Codec.JSON.writeSpan(worstSpanInTheWorld));
165+
bytes.write(Encoder.JSON.encode(worstSpanInTheWorld));
166166
assertThat(SPAN_ADAPTER.fromJson(bytes))
167167
.isEqualTo(Span2Converter.toSpan(worstSpanInTheWorld));
168168
}

zipkin/src/main/java/zipkin/Annotation.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package zipkin;
1515

16+
import java.io.Serializable;
1617
import zipkin.internal.JsonCodec;
1718
import zipkin.internal.Nullable;
1819

@@ -25,7 +26,8 @@
2526
*
2627
* <p>Unlike log statements, annotations are often codes: Ex. {@link Constants#SERVER_RECV "sr"}.
2728
*/
28-
public final class Annotation implements Comparable<Annotation> {
29+
public final class Annotation implements Comparable<Annotation>, Serializable { // for Spark jobs
30+
private static final long serialVersionUID = 0L;
2931

3032
public static Annotation create(long timestamp, String value, @Nullable Endpoint endpoint) {
3133
return new Annotation(timestamp, value, endpoint);

zipkin/src/main/java/zipkin/Endpoint.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package zipkin;
1515

16+
import java.io.Serializable;
1617
import java.net.InetAddress;
1718
import java.nio.ByteBuffer;
1819
import java.util.Arrays;
@@ -34,7 +35,8 @@
3435
* exception allows zipkin to display network context of uninstrumented services, or clients such as
3536
* web browsers.
3637
*/
37-
public final class Endpoint {
38+
public final class Endpoint implements Serializable { // for Spark jobs
39+
private static final long serialVersionUID = 0L;
3840

3941
/**
4042
* @deprecated as leads to null pointer exceptions on port. Use {@link #builder()} instead.

0 commit comments

Comments
 (0)