Skip to content

Commit c709914

Browse files
- store annotations in the indexed text column individually, providing better search usability in the UI, and less disk space used,
- remove fetch size to statements. that's not needed against this datamodel. ref: openzipkin#1289
1 parent 14b64be commit c709914

File tree

4 files changed

+26
-27
lines changed

4 files changed

+26
-27
lines changed

zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ public ListenableFuture<Void> accept(List<Span> rawSpans) {
9999
if (timestamp != null) {
100100
// Contract for Repository.storeServiceSpanName is to store the span twice, once with
101101
// the span name and another with empty string.
102-
futures.add(storeServiceSpanName(serviceName, span.name, timestamp, span.duration,
103-
span.traceId));
102+
futures.add(storeServiceSpanName(serviceName, span.name, timestamp, span.duration, span.traceId));
104103
if (!span.name.isEmpty()) { // If span.name == "", this would be redundant
105104
futures.add(
106105
storeServiceSpanName(serviceName, "", timestamp, span.duration, span.traceId));

zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,7 @@ public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request)
214214
futureKeySetsToIntersect.add(traceIdToTimestamp);
215215
for (String annotationKey : annotationKeys) {
216216
futureKeySetsToIntersect
217-
.add(getTraceIdsByAnnotation(annotationKey, request.endTs, request.lookback,
218-
traceIndexFetchSize));
217+
.add(getTraceIdsByAnnotation(annotationKey, request.endTs, request.lookback, traceIndexFetchSize));
219218
}
220219
// We achieve the AND goal, by intersecting each of the key sets.
221220
traceIds =
@@ -225,8 +224,7 @@ public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request)
225224
return transform(traceIds, new AsyncFunction<Collection<BigInteger>, List<List<Span>>>() {
226225
@Override public ListenableFuture<List<List<Span>>> apply(Collection<BigInteger> traceIds) {
227226
traceIds = FluentIterable.from(traceIds).limit(request.limit).toSet();
228-
return transform(getSpansByTraceIds(ImmutableSet.copyOf(traceIds), maxTraceCols),
229-
AdjustTraces.INSTANCE);
227+
return transform(getSpansByTraceIds(ImmutableSet.copyOf(traceIds), maxTraceCols), AdjustTraces.INSTANCE);
230228
}
231229

232230
@Override public String toString() {
@@ -355,8 +353,6 @@ ListenableFuture<Collection<List<Span>>> getSpansByTraceIds(Set<BigInteger> trac
355353
.setSet("trace_id", traceIds)
356354
.setInt("limit_", limit);
357355

358-
bound.setFetchSize(Integer.MAX_VALUE);
359-
360356
return transform(session.executeAsync(bound),
361357
new Function<ResultSet, Collection<List<Span>>>() {
362358
@Override public Collection<List<Span>> apply(ResultSet input) {
@@ -481,8 +477,6 @@ ListenableFuture<Map<BigInteger, Long>> getTraceIdsByAnnotation(
481477
.setUUID("end_ts", UUIDs.endOf(endTsMillis))
482478
.setInt("limit_", limit);
483479

484-
bound.setFetchSize(Integer.MAX_VALUE);
485-
486480
return transform(session.executeAsync(bound),
487481
new Function<ResultSet, Map<BigInteger, Long>>() {
488482
@Override public Map<BigInteger, Long> apply(ResultSet input) {

zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraUtil.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.math.BigInteger;
2222
import java.util.Collection;
2323
import java.util.Collections;
24+
import java.util.HashSet;
2425
import java.util.LinkedHashSet;
2526
import java.util.List;
2627
import java.util.Map;
@@ -73,19 +74,20 @@ static Set<String> annotationKeys(Span span) {
7374
if (Constants.CORE_ANNOTATIONS.contains(a.value)) continue;
7475

7576
if (a.endpoint != null && !a.endpoint.serviceName.isEmpty()) {
76-
annotationKeys.add(a.endpoint.serviceName + ":" + a.value);
77+
annotationKeys.add(a.endpoint.serviceName);
78+
annotationKeys.add(a.value);
7779
}
7880
}
7981
for (BinaryAnnotation b : span.binaryAnnotations) {
80-
if (b.type == BinaryAnnotation.Type.STRING
81-
&& b.endpoint != null
82-
&& !b.endpoint.serviceName.isEmpty()
83-
&& b.value.length <= LONGEST_VALUE_TO_INDEX * 4) { // UTF_8 is up to 4bytes/char
84-
String value = new String(b.value, UTF_8);
85-
if (value.length() > LONGEST_VALUE_TO_INDEX) continue;
86-
87-
annotationKeys.add(b.endpoint.serviceName + ":" + b.key);
88-
annotationKeys.add(b.endpoint.serviceName + ":" + b.key + ":" + new String(b.value, UTF_8));
82+
if (b.endpoint != null && !b.endpoint.serviceName.isEmpty() && !Constants.CORE_ANNOTATIONS.contains(b.key)) {
83+
annotationKeys.add(b.endpoint.serviceName);
84+
if (b.type == BinaryAnnotation.Type.STRING) {
85+
String value = new String(b.value, UTF_8);
86+
value = value.substring(0, Math.min(value.length(), LONGEST_VALUE_TO_INDEX));
87+
annotationKeys.add(b.key + ":" + value);
88+
} else {
89+
annotationKeys.add(b.key);
90+
}
8991
}
9092
}
9193
return annotationKeys;
@@ -96,12 +98,13 @@ static List<String> annotationKeys(QueryRequest request) {
9698
return Collections.emptyList();
9799
}
98100
checkArgument(request.serviceName != null, "serviceName needed with annotation query");
99-
Set<String> annotationKeys = new LinkedHashSet<>();
101+
Set<String> annotationKeys = new HashSet<>();
102+
annotationKeys.add(request.serviceName);
100103
for (String a : request.annotations) { // doesn't include CORE_ANNOTATIONS
101-
annotationKeys.add(request.serviceName + ":" + a);
104+
annotationKeys.add(a);
102105
}
103106
for (Map.Entry<String, String> b : request.binaryAnnotations.entrySet()) {
104-
annotationKeys.add(request.serviceName + ":" + b.getKey() + ":" + b.getValue());
107+
annotationKeys.add(b.getKey() + ":" + b.getValue());
105108
}
106109
return sortedList(annotationKeys);
107110
}

zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/CassandraUtilTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void annotationKeys() {
5252
.serviceName("service")
5353
.addAnnotation(Constants.ERROR)
5454
.addBinaryAnnotation(TraceKeys.HTTP_METHOD, "GET").build()))
55-
.containsExactly("service:error", "service:http.method:GET");
55+
.containsExactly("error", "http.method:GET", "service");
5656
}
5757

5858
@Test
@@ -62,7 +62,7 @@ public void annotationKeys_dedupes() {
6262
.serviceName("service")
6363
.addAnnotation(Constants.ERROR)
6464
.addAnnotation(Constants.ERROR).build()))
65-
.containsExactly("service:error");
65+
.containsExactly( "error", "service");
6666
}
6767

6868
@Test
@@ -78,7 +78,7 @@ public void annotationKeys_skipsCoreAndAddressAnnotations() throws Exception {
7878
.containsOnly(Constants.SERVER_ADDR, Constants.CLIENT_ADDR);
7979

8080
assertThat(CassandraUtil.annotationKeys(span))
81-
.isEmpty();
81+
.containsExactly("web", "ca", "app", "sa");
8282
}
8383

8484
@Test
@@ -95,6 +95,9 @@ public void annotationKeys_skipsBinaryAnnotationsLongerThan256chars() throws Exc
9595
)).build();
9696

9797
assertThat(CassandraUtil.annotationKeys(span))
98-
.containsOnly("web:aws.arn", "web:aws.arn:" + arn);
98+
.containsOnly(
99+
"web",
100+
"aws.arn:" + arn,
101+
TraceKeys.HTTP_URL + ":" + url.substring(0, CassandraUtil.LONGEST_VALUE_TO_INDEX));
99102
}
100103
}

0 commit comments

Comments
 (0)