18
18
import java .util .ArrayList ;
19
19
import java .util .Arrays ;
20
20
import java .util .Collections ;
21
+ import java .util .Date ;
21
22
import java .util .Iterator ;
22
23
import java .util .LinkedHashMap ;
23
24
import java .util .List ;
58
59
import static zipkin .Constants .SERVER_ADDR ;
59
60
import static zipkin .Constants .SERVER_RECV ;
60
61
import static zipkin .internal .Util .UTF_8 ;
62
+ import static zipkin .internal .Util .getDays ;
61
63
import static zipkin .storage .mysql .internal .generated .tables .ZipkinAnnotations .ZIPKIN_ANNOTATIONS ;
64
+ import static zipkin .storage .mysql .internal .generated .tables .ZipkinDependencies .ZIPKIN_DEPENDENCIES ;
62
65
import static zipkin .storage .mysql .internal .generated .tables .ZipkinSpans .ZIPKIN_SPANS ;
63
66
64
67
final class MySQLSpanStore implements SpanStore {
@@ -74,11 +77,14 @@ final class MySQLSpanStore implements SpanStore {
74
77
private final DataSource datasource ;
75
78
private final DSLContexts context ;
76
79
private final Lazy <Boolean > hasIpv6 ;
80
+ private final Lazy <Boolean > hasPreAggregatedDependencies ;
77
81
78
- MySQLSpanStore (DataSource datasource , DSLContexts context , Lazy <Boolean > hasIpv6 ) {
82
+ MySQLSpanStore (DataSource datasource , DSLContexts context , Lazy <Boolean > hasIpv6 ,
83
+ Lazy <Boolean > hasPreAggregatedDependencies ) {
79
84
this .datasource = datasource ;
80
85
this .context = context ;
81
86
this .hasIpv6 = hasIpv6 ;
87
+ this .hasPreAggregatedDependencies = hasPreAggregatedDependencies ;
82
88
}
83
89
84
90
private Endpoint endpoint (Record a ) {
@@ -273,39 +279,56 @@ public List<String> getSpanNames(String serviceName) {
273
279
274
280
@ Override
275
281
public List <DependencyLink > getDependencies (long endTs , @ Nullable Long lookback ) {
276
- endTs = endTs * 1000 ;
277
282
try (Connection conn = datasource .getConnection ()) {
278
- // Lazy fetching the cursor prevents us from buffering the whole dataset in memory.
279
- Cursor <Record5 <Long , Long , Long , String , String >> cursor = context .get (conn )
280
- .selectDistinct (ZIPKIN_SPANS .TRACE_ID , ZIPKIN_SPANS .PARENT_ID , ZIPKIN_SPANS .ID ,
281
- ZIPKIN_ANNOTATIONS .A_KEY , ZIPKIN_ANNOTATIONS .ENDPOINT_SERVICE_NAME )
282
- // left joining allows us to keep a mapping of all span ids, not just ones that have
283
- // special annotations. We need all span ids to reconstruct the trace tree. We need
284
- // the whole trace tree so that we can accurately skip local spans.
285
- .from (ZIPKIN_SPANS .leftJoin (ZIPKIN_ANNOTATIONS )
286
- .on (ZIPKIN_SPANS .TRACE_ID .eq (ZIPKIN_ANNOTATIONS .TRACE_ID ).and (
287
- ZIPKIN_SPANS .ID .eq (ZIPKIN_ANNOTATIONS .SPAN_ID )))
288
- .and (ZIPKIN_ANNOTATIONS .A_KEY .in (CLIENT_ADDR , SERVER_RECV , SERVER_ADDR )))
289
- .where (lookback == null ?
290
- ZIPKIN_SPANS .START_TS .lessOrEqual (endTs ) :
291
- ZIPKIN_SPANS .START_TS .between (endTs - lookback * 1000 , endTs ))
292
- // Grouping so that later code knows when a span or trace is finished.
293
- .groupBy (ZIPKIN_SPANS .TRACE_ID , ZIPKIN_SPANS .ID , ZIPKIN_ANNOTATIONS .A_KEY ).fetchLazy ();
294
-
295
- Iterator <Iterator <DependencyLinkSpan >> traces =
296
- new DependencyLinkSpanIterator .ByTraceId (cursor .iterator ());
297
-
298
- if (!traces .hasNext ()) return Collections .emptyList ();
299
-
300
- DependencyLinker linker = new DependencyLinker ();
301
-
302
- while (traces .hasNext ()) {
303
- linker .putTrace (traces .next ());
283
+ if (hasPreAggregatedDependencies .get ()) {
284
+ List <Date > days = getDays (endTs , lookback );
285
+ List <DependencyLink > unmerged = context .get (conn )
286
+ .selectFrom (ZIPKIN_DEPENDENCIES )
287
+ .where (ZIPKIN_DEPENDENCIES .DAY .in (days ))
288
+ .fetch ((Record l ) -> DependencyLink .create (
289
+ l .get (ZIPKIN_DEPENDENCIES .PARENT ),
290
+ l .get (ZIPKIN_DEPENDENCIES .CHILD ),
291
+ l .get (ZIPKIN_DEPENDENCIES .CALL_COUNT ))
292
+ );
293
+ return DependencyLinker .merge (unmerged );
294
+ } else {
295
+ return aggregateDependencies (endTs , lookback , conn );
304
296
}
305
-
306
- return linker .link ();
307
297
} catch (SQLException e ) {
308
298
throw new RuntimeException ("Error querying dependencies for endTs " + endTs + " and lookback " + lookback + ": " + e .getMessage ());
309
299
}
310
300
}
301
+
302
+ List <DependencyLink > aggregateDependencies (long endTs , @ Nullable Long lookback , Connection conn ) {
303
+ endTs = endTs * 1000 ;
304
+ // Lazy fetching the cursor prevents us from buffering the whole dataset in memory.
305
+ Cursor <Record5 <Long , Long , Long , String , String >> cursor = context .get (conn )
306
+ .selectDistinct (ZIPKIN_SPANS .TRACE_ID , ZIPKIN_SPANS .PARENT_ID , ZIPKIN_SPANS .ID ,
307
+ ZIPKIN_ANNOTATIONS .A_KEY , ZIPKIN_ANNOTATIONS .ENDPOINT_SERVICE_NAME )
308
+ // left joining allows us to keep a mapping of all span ids, not just ones that have
309
+ // special annotations. We need all span ids to reconstruct the trace tree. We need
310
+ // the whole trace tree so that we can accurately skip local spans.
311
+ .from (ZIPKIN_SPANS .leftJoin (ZIPKIN_ANNOTATIONS )
312
+ .on (ZIPKIN_SPANS .TRACE_ID .eq (ZIPKIN_ANNOTATIONS .TRACE_ID ).and (
313
+ ZIPKIN_SPANS .ID .eq (ZIPKIN_ANNOTATIONS .SPAN_ID )))
314
+ .and (ZIPKIN_ANNOTATIONS .A_KEY .in (CLIENT_ADDR , SERVER_RECV , SERVER_ADDR )))
315
+ .where (lookback == null ?
316
+ ZIPKIN_SPANS .START_TS .lessOrEqual (endTs ) :
317
+ ZIPKIN_SPANS .START_TS .between (endTs - lookback * 1000 , endTs ))
318
+ // Grouping so that later code knows when a span or trace is finished.
319
+ .groupBy (ZIPKIN_SPANS .TRACE_ID , ZIPKIN_SPANS .ID , ZIPKIN_ANNOTATIONS .A_KEY ).fetchLazy ();
320
+
321
+ Iterator <Iterator <DependencyLinkSpan >> traces =
322
+ new DependencyLinkSpanIterator .ByTraceId (cursor .iterator ());
323
+
324
+ if (!traces .hasNext ()) return Collections .emptyList ();
325
+
326
+ DependencyLinker linker = new DependencyLinker ();
327
+
328
+ while (traces .hasNext ()) {
329
+ linker .putTrace (traces .next ());
330
+ }
331
+
332
+ return linker .link ();
333
+ }
311
334
}
0 commit comments