Skip to content

Commit d438abf

Browse files
authored
Merge pull request openzipkin#1196 from openzipkin/mysql-dependencies
Conditionally supports pre-aggregated dependencies in MySQL
2 parents 723a422 + a80c90a commit d438abf

File tree

14 files changed

+323
-55
lines changed

14 files changed

+323
-55
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
<cassandra-driver-core.version>3.0.2</cassandra-driver-core.version>
5454
<moshi.version>1.2.0</moshi.version>
5555
<okio.version>1.9.0</okio.version>
56-
<jooq.version>3.8.3</jooq.version>
56+
<jooq.version>3.8.4</jooq.version>
5757
<spring-boot.version>1.3.6.RELEASE</spring-boot.version>
5858
<!-- MySQL connector is GPL, even if it has an OSS exception.
5959
https://www.mysql.com/about/legal/licensing/foss-exception/

zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraSpanStore.java

+2-12
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
6464
import static com.google.common.util.concurrent.Futures.immediateFuture;
6565
import static com.google.common.util.concurrent.Futures.transform;
66+
import static zipkin.internal.Util.getDays;
6667
import static zipkin.internal.Util.midnightUTC;
6768

6869
public final class CassandraSpanStore implements GuavaSpanStore {
@@ -351,10 +352,7 @@ enum AdjustTraces implements Function<Collection<List<Span>>, List<List<Span>>>
351352

352353
@Override public ListenableFuture<List<DependencyLink>> getDependencies(long endTs,
353354
@Nullable Long lookback) {
354-
long endEpochDayMillis = midnightUTC(endTs);
355-
long startEpochDayMillis = midnightUTC(endTs - (lookback != null ? lookback : endTs));
356-
357-
List<Date> days = getDays(startEpochDayMillis, endEpochDayMillis);
355+
List<Date> days = getDays(endTs, lookback);
358356
try {
359357
BoundStatement bound = CassandraUtil.bindWithName(selectDependencies, "select-dependencies")
360358
.setList("days", days);
@@ -585,12 +583,4 @@ class DurationRow {
585583
return String.format("trace_id=%d, duration=%d, timestamp=%d", trace_id, duration, timestamp);
586584
}
587585
}
588-
589-
private static List<Date> getDays(long from, long to) {
590-
List<Date> days = new ArrayList<>();
591-
for (long time = from; time <= to; time += TimeUnit.DAYS.toMillis(1)) {
592-
days.add(new Date(time));
593-
}
594-
return days;
595-
}
596586
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
* Copyright 2015-2016 The OpenZipkin Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License
10+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11+
* or implied. See the License for the specific language governing permissions and limitations under
12+
* the License.
13+
*/
14+
package zipkin.storage.mysql;
15+
16+
import java.sql.Connection;
17+
import java.sql.SQLException;
18+
import java.util.logging.Level;
19+
import java.util.logging.Logger;
20+
import javax.sql.DataSource;
21+
import org.jooq.DSLContext;
22+
import org.jooq.exception.DataAccessException;
23+
import zipkin.internal.Lazy;
24+
25+
import static org.jooq.impl.DSL.count;
26+
import static zipkin.storage.mysql.internal.generated.tables.ZipkinDependencies.ZIPKIN_DEPENDENCIES;
27+
28+
/**
29+
* Returns true when the zipkin_dependencies table exists and has data in it, implying the spark job
30+
* has been run.
31+
*/
32+
final class HasPreAggregatedDependencies extends Lazy<Boolean> {
33+
private static final Logger LOG = Logger.getLogger(HasPreAggregatedDependencies.class.getName());
34+
35+
final DataSource datasource;
36+
final DSLContexts context;
37+
38+
HasPreAggregatedDependencies(DataSource datasource, DSLContexts context) {
39+
this.datasource = datasource;
40+
this.context = context;
41+
}
42+
43+
@Override protected Boolean compute() {
44+
try (Connection conn = datasource.getConnection()) {
45+
DSLContext dsl = context.get(conn);
46+
return dsl.select(count()).from(ZIPKIN_DEPENDENCIES).fetchAny().value1() > 0;
47+
} catch (DataAccessException e) {
48+
if (e.sqlState().equals("42S02")) {
49+
LOG.warning("zipkin_dependencies doesn't exist, so pre-aggregated dependencies are not " +
50+
"supported. Execute mysql.sql located in this jar to add the table");
51+
return false;
52+
}
53+
problemReading(e);
54+
} catch (SQLException | RuntimeException e) {
55+
problemReading(e);
56+
}
57+
return false;
58+
}
59+
60+
static void problemReading(Exception e) {
61+
LOG.log(Level.WARNING, "problem reading zipkin_dependencies", e);
62+
}
63+
}

zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLSpanStore.java

+53-30
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.ArrayList;
1919
import java.util.Arrays;
2020
import java.util.Collections;
21+
import java.util.Date;
2122
import java.util.Iterator;
2223
import java.util.LinkedHashMap;
2324
import java.util.List;
@@ -58,7 +59,9 @@
5859
import static zipkin.Constants.SERVER_ADDR;
5960
import static zipkin.Constants.SERVER_RECV;
6061
import static zipkin.internal.Util.UTF_8;
62+
import static zipkin.internal.Util.getDays;
6163
import static zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations.ZIPKIN_ANNOTATIONS;
64+
import static zipkin.storage.mysql.internal.generated.tables.ZipkinDependencies.ZIPKIN_DEPENDENCIES;
6265
import static zipkin.storage.mysql.internal.generated.tables.ZipkinSpans.ZIPKIN_SPANS;
6366

6467
final class MySQLSpanStore implements SpanStore {
@@ -74,11 +77,14 @@ final class MySQLSpanStore implements SpanStore {
7477
private final DataSource datasource;
7578
private final DSLContexts context;
7679
private final Lazy<Boolean> hasIpv6;
80+
private final Lazy<Boolean> hasPreAggregatedDependencies;
7781

78-
MySQLSpanStore(DataSource datasource, DSLContexts context, Lazy<Boolean> hasIpv6) {
82+
MySQLSpanStore(DataSource datasource, DSLContexts context, Lazy<Boolean> hasIpv6,
83+
Lazy<Boolean> hasPreAggregatedDependencies) {
7984
this.datasource = datasource;
8085
this.context = context;
8186
this.hasIpv6 = hasIpv6;
87+
this.hasPreAggregatedDependencies = hasPreAggregatedDependencies;
8288
}
8389

8490
private Endpoint endpoint(Record a) {
@@ -273,39 +279,56 @@ public List<String> getSpanNames(String serviceName) {
273279

274280
@Override
275281
public List<DependencyLink> getDependencies(long endTs, @Nullable Long lookback) {
276-
endTs = endTs * 1000;
277282
try (Connection conn = datasource.getConnection()) {
278-
// Lazy fetching the cursor prevents us from buffering the whole dataset in memory.
279-
Cursor<Record5<Long, Long, Long, String, String>> cursor = context.get(conn)
280-
.selectDistinct(ZIPKIN_SPANS.TRACE_ID, ZIPKIN_SPANS.PARENT_ID, ZIPKIN_SPANS.ID,
281-
ZIPKIN_ANNOTATIONS.A_KEY, ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME)
282-
// left joining allows us to keep a mapping of all span ids, not just ones that have
283-
// special annotations. We need all span ids to reconstruct the trace tree. We need
284-
// the whole trace tree so that we can accurately skip local spans.
285-
.from(ZIPKIN_SPANS.leftJoin(ZIPKIN_ANNOTATIONS)
286-
.on(ZIPKIN_SPANS.TRACE_ID.eq(ZIPKIN_ANNOTATIONS.TRACE_ID).and(
287-
ZIPKIN_SPANS.ID.eq(ZIPKIN_ANNOTATIONS.SPAN_ID)))
288-
.and(ZIPKIN_ANNOTATIONS.A_KEY.in(CLIENT_ADDR, SERVER_RECV, SERVER_ADDR)))
289-
.where(lookback == null ?
290-
ZIPKIN_SPANS.START_TS.lessOrEqual(endTs) :
291-
ZIPKIN_SPANS.START_TS.between(endTs - lookback * 1000, endTs))
292-
// Grouping so that later code knows when a span or trace is finished.
293-
.groupBy(ZIPKIN_SPANS.TRACE_ID, ZIPKIN_SPANS.ID, ZIPKIN_ANNOTATIONS.A_KEY).fetchLazy();
294-
295-
Iterator<Iterator<DependencyLinkSpan>> traces =
296-
new DependencyLinkSpanIterator.ByTraceId(cursor.iterator());
297-
298-
if (!traces.hasNext()) return Collections.emptyList();
299-
300-
DependencyLinker linker = new DependencyLinker();
301-
302-
while (traces.hasNext()) {
303-
linker.putTrace(traces.next());
283+
if (hasPreAggregatedDependencies.get()) {
284+
List<Date> days = getDays(endTs, lookback);
285+
List<DependencyLink> unmerged = context.get(conn)
286+
.selectFrom(ZIPKIN_DEPENDENCIES)
287+
.where(ZIPKIN_DEPENDENCIES.DAY.in(days))
288+
.fetch((Record l) -> DependencyLink.create(
289+
l.get(ZIPKIN_DEPENDENCIES.PARENT),
290+
l.get(ZIPKIN_DEPENDENCIES.CHILD),
291+
l.get(ZIPKIN_DEPENDENCIES.CALL_COUNT))
292+
);
293+
return DependencyLinker.merge(unmerged);
294+
} else {
295+
return aggregateDependencies(endTs, lookback, conn);
304296
}
305-
306-
return linker.link();
307297
} catch (SQLException e) {
308298
throw new RuntimeException("Error querying dependencies for endTs " + endTs + " and lookback " + lookback + ": " + e.getMessage());
309299
}
310300
}
301+
302+
List<DependencyLink> aggregateDependencies(long endTs, @Nullable Long lookback, Connection conn) {
303+
endTs = endTs * 1000;
304+
// Lazy fetching the cursor prevents us from buffering the whole dataset in memory.
305+
Cursor<Record5<Long, Long, Long, String, String>> cursor = context.get(conn)
306+
.selectDistinct(ZIPKIN_SPANS.TRACE_ID, ZIPKIN_SPANS.PARENT_ID, ZIPKIN_SPANS.ID,
307+
ZIPKIN_ANNOTATIONS.A_KEY, ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME)
308+
// left joining allows us to keep a mapping of all span ids, not just ones that have
309+
// special annotations. We need all span ids to reconstruct the trace tree. We need
310+
// the whole trace tree so that we can accurately skip local spans.
311+
.from(ZIPKIN_SPANS.leftJoin(ZIPKIN_ANNOTATIONS)
312+
.on(ZIPKIN_SPANS.TRACE_ID.eq(ZIPKIN_ANNOTATIONS.TRACE_ID).and(
313+
ZIPKIN_SPANS.ID.eq(ZIPKIN_ANNOTATIONS.SPAN_ID)))
314+
.and(ZIPKIN_ANNOTATIONS.A_KEY.in(CLIENT_ADDR, SERVER_RECV, SERVER_ADDR)))
315+
.where(lookback == null ?
316+
ZIPKIN_SPANS.START_TS.lessOrEqual(endTs) :
317+
ZIPKIN_SPANS.START_TS.between(endTs - lookback * 1000, endTs))
318+
// Grouping so that later code knows when a span or trace is finished.
319+
.groupBy(ZIPKIN_SPANS.TRACE_ID, ZIPKIN_SPANS.ID, ZIPKIN_ANNOTATIONS.A_KEY).fetchLazy();
320+
321+
Iterator<Iterator<DependencyLinkSpan>> traces =
322+
new DependencyLinkSpanIterator.ByTraceId(cursor.iterator());
323+
324+
if (!traces.hasNext()) return Collections.emptyList();
325+
326+
DependencyLinker linker = new DependencyLinker();
327+
328+
while (traces.hasNext()) {
329+
linker.putTrace(traces.next());
330+
}
331+
332+
return linker.link();
333+
}
311334
}

zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLStorage.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,17 @@
2424
import zipkin.storage.AsyncSpanConsumer;
2525
import zipkin.storage.AsyncSpanStore;
2626
import zipkin.storage.SpanStore;
27+
import zipkin.storage.StorageAdapters;
2728
import zipkin.storage.StorageComponent;
2829

2930
import static zipkin.internal.Util.checkNotNull;
3031
import static zipkin.storage.StorageAdapters.blockingToAsync;
3132
import static zipkin.storage.mysql.internal.generated.DefaultCatalog.DEFAULT_CATALOG;
3233
import static zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations.ZIPKIN_ANNOTATIONS;
34+
import static zipkin.storage.mysql.internal.generated.tables.ZipkinDependencies.ZIPKIN_DEPENDENCIES;
3335
import static zipkin.storage.mysql.internal.generated.tables.ZipkinSpans.ZIPKIN_SPANS;
3436

3537
public final class MySQLStorage implements StorageComponent {
36-
3738
public static Builder builder() {
3839
return new Builder();
3940
}
@@ -80,25 +81,33 @@ public MySQLStorage build() {
8081
private final Executor executor;
8182
private final DSLContexts context;
8283
final Lazy<Boolean> hasIpv6;
84+
final Lazy<Boolean> hasPreAggregatedDependencies;
8385
private final SpanStore spanStore;
8486
private final AsyncSpanStore asyncSpanStore;
87+
private final MySQLSpanConsumer spanConsumer;
8588
private final AsyncSpanConsumer asyncSpanConsumer;
8689

8790
MySQLStorage(MySQLStorage.Builder builder) {
8891
this.datasource = checkNotNull(builder.datasource, "datasource");
8992
this.executor = checkNotNull(builder.executor, "executor");
9093
this.context = new DSLContexts(builder.settings, builder.listenerProvider);
9194
this.hasIpv6 = new HasIpv6(datasource, context);
92-
this.spanStore = new MySQLSpanStore(datasource, context, hasIpv6);
95+
this.hasPreAggregatedDependencies = new HasPreAggregatedDependencies(datasource, context);
96+
this.spanStore = new MySQLSpanStore(datasource, context, hasIpv6, hasPreAggregatedDependencies);
9397
this.asyncSpanStore = blockingToAsync(spanStore, executor);
94-
this.asyncSpanConsumer = blockingToAsync(new MySQLSpanConsumer(datasource, context, hasIpv6), executor);
98+
this.spanConsumer = new MySQLSpanConsumer(datasource, context, hasIpv6);
99+
this.asyncSpanConsumer = blockingToAsync(spanConsumer, executor);
95100
}
96101

97102
/** Returns the session in use by this storage component. */
98103
public DataSource datasource() {
99104
return datasource;
100105
}
101106

107+
public StorageAdapters.SpanConsumer spanConsumer() {
108+
return spanConsumer;
109+
}
110+
102111
@Override public SpanStore spanStore() {
103112
return spanStore;
104113
}
@@ -131,6 +140,7 @@ void clear() {
131140
try (Connection conn = datasource.getConnection()) {
132141
context.get(conn).truncate(ZIPKIN_SPANS).execute();
133142
context.get(conn).truncate(ZIPKIN_ANNOTATIONS).execute();
143+
context.get(conn).truncate(ZIPKIN_DEPENDENCIES).execute();
134144
} catch (SQLException | RuntimeException e) {
135145
throw new AssertionError(e);
136146
}

zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/DefaultCatalog.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@
3333
@Generated(
3434
value = {
3535
"http://www.jooq.org",
36-
"jOOQ version:3.8.3"
36+
"jOOQ version:3.8.4"
3737
},
3838
comments = "This class is generated by jOOQ"
3939
)
4040
@SuppressWarnings({ "all", "unchecked", "rawtypes" })
4141
public class DefaultCatalog extends CatalogImpl {
4242

43-
private static final long serialVersionUID = -1715313401;
43+
private static final long serialVersionUID = 276513288;
4444

4545
/**
4646
* The reference instance of <code></code>

zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/Tables.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import javax.annotation.Generated;
2121

2222
import zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations;
23+
import zipkin.storage.mysql.internal.generated.tables.ZipkinDependencies;
2324
import zipkin.storage.mysql.internal.generated.tables.ZipkinSpans;
2425

2526

@@ -29,7 +30,7 @@
2930
@Generated(
3031
value = {
3132
"http://www.jooq.org",
32-
"jOOQ version:3.8.3"
33+
"jOOQ version:3.8.4"
3334
},
3435
comments = "This class is generated by jOOQ"
3536
)
@@ -41,6 +42,11 @@ public class Tables {
4142
*/
4243
public static final ZipkinAnnotations ZIPKIN_ANNOTATIONS = zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations.ZIPKIN_ANNOTATIONS;
4344

45+
/**
46+
* The table <code>zipkin.zipkin_dependencies</code>.
47+
*/
48+
public static final ZipkinDependencies ZIPKIN_DEPENDENCIES = zipkin.storage.mysql.internal.generated.tables.ZipkinDependencies.ZIPKIN_DEPENDENCIES;
49+
4450
/**
4551
* The table <code>zipkin.zipkin_spans</code>.
4652
*/

zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/Zipkin.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.jooq.impl.SchemaImpl;
2929

3030
import zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations;
31+
import zipkin.storage.mysql.internal.generated.tables.ZipkinDependencies;
3132
import zipkin.storage.mysql.internal.generated.tables.ZipkinSpans;
3233

3334

@@ -37,14 +38,14 @@
3738
@Generated(
3839
value = {
3940
"http://www.jooq.org",
40-
"jOOQ version:3.8.3"
41+
"jOOQ version:3.8.4"
4142
},
4243
comments = "This class is generated by jOOQ"
4344
)
4445
@SuppressWarnings({ "all", "unchecked", "rawtypes" })
4546
public class Zipkin extends SchemaImpl {
4647

47-
private static final long serialVersionUID = 1459649196;
48+
private static final long serialVersionUID = -496810941;
4849

4950
/**
5051
* The reference instance of <code>zipkin</code>
@@ -56,6 +57,11 @@ public class Zipkin extends SchemaImpl {
5657
*/
5758
public final ZipkinAnnotations ZIPKIN_ANNOTATIONS = zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations.ZIPKIN_ANNOTATIONS;
5859

60+
/**
61+
* The table <code>zipkin.zipkin_dependencies</code>.
62+
*/
63+
public final ZipkinDependencies ZIPKIN_DEPENDENCIES = zipkin.storage.mysql.internal.generated.tables.ZipkinDependencies.ZIPKIN_DEPENDENCIES;
64+
5965
/**
6066
* The table <code>zipkin.zipkin_spans</code>.
6167
*/
@@ -87,6 +93,7 @@ public final List<Table<?>> getTables() {
8793
private final List<Table<?>> getTables0() {
8894
return Arrays.<Table<?>>asList(
8995
ZipkinAnnotations.ZIPKIN_ANNOTATIONS,
96+
ZipkinDependencies.ZIPKIN_DEPENDENCIES,
9097
ZipkinSpans.ZIPKIN_SPANS);
9198
}
9299
}

zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/tables/ZipkinAnnotations.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@
3535
@Generated(
3636
value = {
3737
"http://www.jooq.org",
38-
"jOOQ version:3.8.3"
38+
"jOOQ version:3.8.4"
3939
},
4040
comments = "This class is generated by jOOQ"
4141
)
4242
@SuppressWarnings({ "all", "unchecked", "rawtypes" })
4343
public class ZipkinAnnotations extends TableImpl<Record> {
4444

45-
private static final long serialVersionUID = 234778620;
45+
private static final long serialVersionUID = 1032479933;
4646

4747
/**
4848
* The reference instance of <code>zipkin.zipkin_annotations</code>

0 commit comments

Comments
 (0)