Skip to content

Commit 81c7cfc

Browse files
authored
Merge pull request openzipkin#1187 from openzipkin/dependency-link-replay
Allows dependency link jobs to be replayed
2 parents 7bbc94f + d40b21e commit 81c7cfc

File tree

8 files changed

+59
-42
lines changed

8 files changed

+59
-42
lines changed

zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraSpanStore.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@
5050
import zipkin.Span;
5151
import zipkin.internal.CorrectForClockSkew;
5252
import zipkin.internal.Dependencies;
53+
import zipkin.internal.DependencyLinker;
5354
import zipkin.internal.MergeById;
5455
import zipkin.internal.Nullable;
55-
import zipkin.internal.Pair;
5656
import zipkin.storage.QueryRequest;
5757
import zipkin.storage.guava.GuavaSpanStore;
5858

@@ -368,24 +368,14 @@ enum ConvertDependenciesResponse implements Function<ResultSet, List<DependencyL
368368
INSTANCE;
369369

370370
@Override public List<DependencyLink> apply(ResultSet rs) {
371-
// Combine the dependency links from startEpochDayMillis until endEpochDayMillis
372-
Map<Pair<String>, Long> links = new LinkedHashMap<>();
373-
371+
ImmutableList.Builder<DependencyLink> unmerged = ImmutableList.builder();
374372
for (Row row : rs) {
375373
ByteBuffer encodedDayOfDependencies = row.getBytes("dependencies");
376374
for (DependencyLink link : Dependencies.fromThrift(encodedDayOfDependencies).links) {
377-
Pair<String> parentChild = Pair.create(link.parent, link.child);
378-
long callCount = links.containsKey(parentChild) ? links.get(parentChild) : 0L;
379-
callCount += link.callCount;
380-
links.put(parentChild, callCount);
375+
unmerged.add(link);
381376
}
382377
}
383-
384-
List<DependencyLink> result = new ArrayList<>(links.size());
385-
for (Map.Entry<Pair<String>, Long> link : links.entrySet()) {
386-
result.add(DependencyLink.create(link.getKey()._1, link.getKey()._2, link.getValue()));
387-
}
388-
return result;
378+
return DependencyLinker.merge(unmerged.build());
389379
}
390380
}
391381

zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/ElasticsearchSpanStore.java

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,11 @@
4242
import org.elasticsearch.search.aggregations.bucket.nested.Nested;
4343
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
4444
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
45-
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
46-
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
4745
import zipkin.Codec;
4846
import zipkin.DependencyLink;
4947
import zipkin.Span;
5048
import zipkin.internal.CorrectForClockSkew;
49+
import zipkin.internal.DependencyLinker;
5150
import zipkin.internal.MergeById;
5251
import zipkin.internal.Nullable;
5352
import zipkin.internal.Util;
@@ -332,12 +331,6 @@ enum ConvertSpanNameResponse implements Function<SearchResponse, List<String>> {
332331
strings.toArray(new String[strings.size()]))
333332
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
334333
.setTypes(ElasticsearchConstants.DEPENDENCY_LINK)
335-
.addAggregation(AggregationBuilders.terms("parent_child_agg")
336-
.field("parent_child")
337-
.subAggregation(AggregationBuilders.topHits("hits_agg")
338-
.setSize(1))
339-
.subAggregation(AggregationBuilders.sum("callCount_agg")
340-
.field("callCount")))
341334
.setQuery(matchAllQuery());
342335

343336
return Futures.transform(ElasticFutures.toGuava(elasticRequest.execute()), ConvertDependenciesResponse.INSTANCE);
@@ -347,24 +340,17 @@ enum ConvertDependenciesResponse implements Function<SearchResponse, List<Depend
347340
INSTANCE;
348341

349342
@Override public List<DependencyLink> apply(SearchResponse response) {
350-
if (response.getAggregations() == null) {
351-
return Collections.emptyList();
352-
}
353-
Terms parentChildAgg = response.getAggregations().get("parent_child_agg");
354-
if (parentChildAgg == null) {
343+
if (response.getHits() == null) {
355344
return Collections.emptyList();
356345
}
357-
ImmutableList.Builder<DependencyLink> links = ImmutableList.builder();
358-
for (Terms.Bucket bucket : parentChildAgg.getBuckets()) {
359-
TopHits hitsAgg = bucket.getAggregations().get("hits_agg");
360-
Sum callCountAgg = bucket.getAggregations().get("callCount_agg");
361-
// We would have no bucket if there wasn't a hit, so this should always be non-empty.
362-
SearchHit hit = hitsAgg.getHits().getAt(0);
346+
347+
ImmutableList.Builder<DependencyLink> unmerged = ImmutableList.builder();
348+
for (SearchHit hit : response.getHits()) {
363349
DependencyLink link = Codec.JSON.readDependencyLink(hit.getSourceRef().toBytes());
364-
link = link.toBuilder().callCount((long) callCountAgg.getValue()).build();
365-
links.add(link);
350+
unmerged.add(link);
366351
}
367-
return links.build();
352+
353+
return DependencyLinker.merge(unmerged.build());
368354
}
369355
}
370356

zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/ElasticsearchStorage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,10 @@ public Client client() {
140140
request.add(lazyClient.get().prepareIndex(
141141
indexNameFormatter.indexNameForTimestamp(midnight),
142142
ElasticsearchConstants.DEPENDENCY_LINK)
143+
.setId(link.parent + "|" + link.child) // Unique constraint
143144
.setSource(
144145
"parent", link.parent,
145146
"child", link.child,
146-
"parent_child", link.parent + "|" + link.child, // For aggregating callCount
147147
"callCount", link.callCount));
148148
}
149149
request.execute().actionGet();

zipkin-storage/elasticsearch/src/test/java/zipkin/storage/elasticsearch/ElasticsearchTestGraph.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ enum ElasticsearchTestGraph {
2929

3030
@Override protected ElasticsearchStorage compute() {
3131
if (ex != null) throw ex;
32-
ElasticsearchStorage result = new ElasticsearchStorage.Builder().build();
32+
ElasticsearchStorage result = new ElasticsearchStorage.Builder().index("test_zipkin").build();
3333
CheckResult check = result.check();
3434
if (check.ok) return result;
3535
throw ex = new AssumptionViolatedException(check.exception.getMessage());

zipkin/src/main/java/zipkin/DependencyLink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ public static DependencyLink create(String parent, String child, long callCount)
4242
this.callCount = callCount;
4343
}
4444

45-
public Builder toBuilder(){
45+
public Builder toBuilder() {
4646
return new Builder(this);
4747
}
4848

49-
public static Builder builder(){
49+
public static Builder builder() {
5050
return new Builder();
5151
}
5252

zipkin/src/main/java/zipkin/internal/DependencyLinker.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@
2929
* <p>This implementation traverses the tree, and only creates links between {@link
3030
* DependencyLinkSpan.Kind#SERVER server} spans. One exception is at the bottom of the trace tree.
3131
* {@link DependencyLinkSpan.Kind#CLIENT client} spans that record their {@link
32-
* DependencyLinkSpan#peerService peer} are included, as this accounts for uninstrumented
33-
* services.
32+
* DependencyLinkSpan#peerService peer} are included, as this accounts for uninstrumented services.
3433
*/
3534
public final class DependencyLinker {
3635
private static final Logger logger = Logger.getLogger(DependencyLinker.class.getName());
@@ -120,4 +119,22 @@ public List<DependencyLink> link() {
120119
}
121120
return result;
122121
}
122+
123+
/** links are merged by mapping to parent/child and summing corresponding links */
124+
public static List<DependencyLink> merge(Iterable<DependencyLink> in) {
125+
Map<Pair<String>, Long> links = new LinkedHashMap<>();
126+
127+
for (DependencyLink link : in) {
128+
Pair<String> parentChild = Pair.create(link.parent, link.child);
129+
long callCount = links.containsKey(parentChild) ? links.get(parentChild) : 0L;
130+
callCount += link.callCount;
131+
links.put(parentChild, callCount);
132+
}
133+
134+
List<DependencyLink> result = new ArrayList<>(links.size());
135+
for (Map.Entry<Pair<String>, Long> link : links.entrySet()) {
136+
result.add(DependencyLink.create(link.getKey()._1, link.getKey()._2, link.getValue()));
137+
}
138+
return result;
139+
}
123140
}

zipkin/src/test/java/zipkin/internal/DependencyLinkerTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,4 +169,18 @@ public void cannotLinkSingleSpanWithoutBothServiceNames() {
169169
.isEmpty();
170170
}
171171
}
172+
173+
@Test
174+
public void merge() {
175+
List<DependencyLink> links = asList(
176+
DependencyLink.create("client", "server", 2L),
177+
DependencyLink.create("client", "server", 2L),
178+
DependencyLink.create("client", "client", 1L)
179+
);
180+
181+
assertThat(DependencyLinker.merge(links)).containsExactly(
182+
DependencyLink.create("client", "server", 4L),
183+
DependencyLink.create("client", "client", 1L)
184+
);
185+
}
172186
}

zipkin/src/test/java/zipkin/storage/DependenciesTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@ public void getDependencies() {
8484
.containsOnlyElementsOf(LINKS);
8585
}
8686

87+
/** It should be safe to run dependency link jobs twice */
88+
@Test
89+
public void replayOverwrites() {
90+
processDependencies(TRACE);
91+
processDependencies(TRACE);
92+
93+
assertThat(store().getDependencies(TODAY + 1000L, null))
94+
.containsOnlyElementsOf(LINKS);
95+
}
96+
8797
/** Edge-case when there are no spans, or instrumentation isn't logging annotations properly. */
8898
@Test
8999
public void empty() {

0 commit comments

Comments
 (0)