Skip to content

Commit 615d74b

Browse files
Sarah Brownadriancole
Sarah Brown
authored andcommitted
Makes serviceName optional, supporting endTs and lookback
Features such as dependency linking and cross-service UI are expensive to implement as fan-outs. This makes serviceName optional to support this use case, initially applying to endTs and lookback.
1 parent 16d7935 commit 615d74b

File tree

9 files changed

+222
-141
lines changed

9 files changed

+222
-141
lines changed

zipkin-anormdb/src/main/scala/com/twitter/zipkin/storage/anormdb/AnormSpanStore.scala

Lines changed: 68 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -190,37 +190,43 @@ class AnormSpanStore(val db: DB,
190190
}
191191

192192
// TODO: rewrite or delete anorm, as its implementation unnecessarily uses sliced queries
193-
val sliceQueryHeader =
193+
val allTracesHeader =
194194
"""SELECT t1.trace_id, start_ts
195195
|FROM zipkin_spans t1
196196
|JOIN zipkin_annotations t2 ON t1.trace_id = t2.trace_id AND t1.id = t2.span_id
197-
|WHERE t2.endpoint_service_name = {service_name}
198197
""".stripMargin
199198

199+
val sliceQueryHeader = s"${allTracesHeader} WHERE t2.endpoint_service_name = {service_name}"
200+
200201
val sliceQueryFooter =
201202
"""AND start_ts BETWEEN {start_ts} AND {end_ts}
202203
|GROUP BY t1.trace_id
203204
|ORDER BY start_ts DESC
204205
|LIMIT {limit}
205206
""".stripMargin
206207

207-
override def getTraceIdsByName(serviceName: String, spanName: Option[String],
208+
override def getTraceIdsByName(serviceName: Option[String], spanName: Option[String],
208209
endTs: Long, lookback: Long, limit: Int): Future[Seq[IndexedTraceId]] = db.inNewThreadWithRecoverableRetry {
209-
210210
if (endTs <= 0 || limit <= 0) {
211211
Seq.empty
212212
}
213213
else {
214214
implicit val (conn, borrowTime) = borrowConn()
215215
try {
216-
val result: List[(Long, Long)] = SQL(sliceQueryHeader +
217-
"AND (name = {name} OR {name} = '')" + sliceQueryFooter)
218-
.on("service_name" -> serviceName)
219-
.on("name" -> spanName.getOrElse(""))
220-
.on("start_ts" -> (endTs - lookback) * 1000)
221-
.on("end_ts" -> endTs * 1000)
222-
.on("limit" -> limit)
223-
.as((long("trace_id") ~ long("start_ts") map flatten) *)
216+
val query: SimpleSql[Row] = serviceName match {
217+
case Some(name) => SQL(sliceQueryHeader +
218+
"AND (name = {name} OR {name} = '')" + sliceQueryFooter)
219+
.on("service_name" -> name)
220+
case None => SQL(allTracesHeader +
221+
"AND (name = {name} OR {name} = '')" + sliceQueryFooter)
222+
}
223+
val result: List[(Long, Long)] =
224+
query.on("name" -> spanName.getOrElse(""))
225+
.on("start_ts" -> (endTs - lookback) * 1000)
226+
.on("end_ts" -> endTs * 1000)
227+
.on("limit" -> limit)
228+
.as((long("trace_id") ~ long("start_ts") map flatten) *)
229+
224230
result map { case (tId, ts) =>
225231
IndexedTraceId(traceId = tId, timestamp = ts)
226232
}
@@ -230,20 +236,28 @@ class AnormSpanStore(val db: DB,
230236
}
231237
}
232238

233-
override def getTraceIdsByAnnotation(serviceName: String, annotation: String, value: Option[ByteBuffer],
239+
override def getTraceIdsByAnnotation(serviceName: Option[String], annotation: String, value: Option[ByteBuffer],
234240
endTs: Long, lookback: Long, limit: Int): Future[Seq[IndexedTraceId]] = db.inNewThreadWithRecoverableRetry {
235241
implicit val (conn, borrowTime) = borrowConn()
236242
try {
237243
val result:List[(Long, Long)] = value match {
238244
// Binary annotations
239245
case Some(bytes) => {
240-
SQL(sliceQueryHeader +
241-
"""AND t2.a_key = {annotation}
242-
|AND t2.a_value = {value}
243-
|AND t2.a_type != -1
244-
""".stripMargin + sliceQueryFooter)
245-
.on("service_name" -> serviceName)
246-
.on("annotation" -> annotation)
246+
val query:SimpleSql[Row] = serviceName match {
247+
case Some(name) => SQL(sliceQueryHeader +
248+
"""AND t2.a_key = {annotation}
249+
|AND t2.a_value = {value}
250+
|AND t2.a_type != -1
251+
""".stripMargin + sliceQueryFooter)
252+
.on("service_name" -> name)
253+
case None => SQL(allTracesHeader +
254+
"""AND t2.a_key = {annotation}
255+
|AND t2.a_value = {value}
256+
|AND t2.a_type != -1
257+
""".stripMargin + sliceQueryFooter)
258+
}
259+
260+
query.on("annotation" -> annotation)
247261
.on("value" -> Util.getArrayFromBuffer(bytes))
248262
.on("start_ts" -> (endTs - lookback) * 1000)
249263
.on("end_ts" -> endTs * 1000)
@@ -252,12 +266,19 @@ class AnormSpanStore(val db: DB,
252266
}
253267
// Normal annotations
254268
case None => {
255-
SQL(sliceQueryHeader +
256-
"""AND t2.a_key = {annotation}
257-
|AND t2.a_type = -1
258-
""".stripMargin + sliceQueryFooter)
259-
.on("service_name" -> serviceName)
260-
.on("annotation" -> annotation)
269+
val query:SimpleSql[Row] = serviceName match {
270+
case Some(name) => SQL(sliceQueryHeader +
271+
"""AND t2.a_key = {annotation}
272+
|AND t2.a_type = -1
273+
""".stripMargin + sliceQueryFooter)
274+
.on("service_name" -> name)
275+
case None => SQL(allTracesHeader +
276+
"""AND t2.a_key = {annotation}
277+
|AND t2.a_type = -1
278+
""".stripMargin + sliceQueryFooter)
279+
}
280+
281+
query.on("annotation" -> annotation)
261282
.on("start_ts" -> (endTs - lookback) * 1000)
262283
.on("end_ts" -> endTs * 1000)
263284
.on("limit" -> limit)
@@ -274,7 +295,7 @@ class AnormSpanStore(val db: DB,
274295
}
275296

276297
override protected def getTraceIdsByDuration(
277-
serviceName: String,
298+
serviceName: Option[String],
278299
spanName: Option[String],
279300
minDuration: Long,
280301
maxDuration: Option[Long],
@@ -284,18 +305,26 @@ class AnormSpanStore(val db: DB,
284305
): Future[Seq[IndexedTraceId]] = db.inNewThreadWithRecoverableRetry {
285306
implicit val (conn, borrowTime) = borrowConn()
286307
try {
287-
val result: List[(Long, Long)] = SQL(sliceQueryHeader +
288-
"""AND (name = {name} OR {name} = '')
289-
|AND duration BETWEEN {min_duration} AND {max_duration}
290-
""".stripMargin + sliceQueryFooter)
291-
.on("name" -> spanName.getOrElse(""))
292-
.on("service_name" -> serviceName)
293-
.on("min_duration" -> minDuration)
294-
.on("max_duration" -> maxDuration.getOrElse(Long.MaxValue))
295-
.on("start_ts" -> (endTs - lookback) * 1000)
296-
.on("end_ts" -> endTs * 1000)
297-
.on("limit" -> limit)
298-
.as((long("trace_id") ~ long("start_ts") map flatten) *)
308+
val query: SimpleSql[Row] = serviceName match {
309+
case Some(name) => SQL(sliceQueryHeader +
310+
"""AND (name = {name} OR {name} = '')
311+
|AND duration BETWEEN {min_duration} AND {max_duration}
312+
""".stripMargin + sliceQueryFooter)
313+
.on("service_name" -> name)
314+
case None => SQL(allTracesHeader +
315+
"""AND (name = {name} OR {name} = '')
316+
|AND duration BETWEEN {min_duration} AND {max_duration}
317+
""".stripMargin + sliceQueryFooter)
318+
}
319+
val result: List[(Long, Long)] =
320+
query.on("name" -> spanName.getOrElse(""))
321+
.on("min_duration" -> minDuration)
322+
.on("max_duration" -> maxDuration.getOrElse(Long.MaxValue))
323+
.on("start_ts" -> (endTs - lookback) * 1000)
324+
.on("end_ts" -> endTs * 1000)
325+
.on("limit" -> limit)
326+
.as((long("trace_id") ~ long("start_ts") map flatten) *)
327+
299328
result map { case (tId, ts) =>
300329
IndexedTraceId(traceId = tId, timestamp = ts)
301330
}

zipkin-cassandra-core/src/main/java/org/twitter/zipkin/storage/cassandra/Repository.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.google.common.io.CharStreams;
1818
import com.google.common.util.concurrent.Futures;
1919
import com.google.common.util.concurrent.ListenableFuture;
20+
2021
import java.io.IOException;
2122
import java.io.InputStreamReader;
2223
import java.io.Reader;
@@ -169,7 +170,7 @@ public Repository(String keyspace, Cluster cluster, Boolean ensureSchema) {
169170
selectTraceIdsByServiceName = session.prepare(
170171
QueryBuilder.select("ts", "trace_id")
171172
.from("service_name_index")
172-
.where(QueryBuilder.eq("service_name", QueryBuilder.bindMarker("service_name")))
173+
.where(QueryBuilder.in("service_name", QueryBuilder.bindMarker("service_name")))
173174
.and(QueryBuilder.in("bucket", QueryBuilder.bindMarker("bucket")))
174175
.and(QueryBuilder.gte("ts", QueryBuilder.bindMarker("start_ts")))
175176
.and(QueryBuilder.lte("ts", QueryBuilder.bindMarker("end_ts")))
@@ -517,13 +518,11 @@ private String debugInsertSpanName(String serviceName, String spanName, int ttl)
517518
.replace(":ttl_", String.valueOf(ttl));
518519
}
519520

520-
public ListenableFuture<Map<Long,Long>> getTraceIdsByServiceName(String serviceName, long endTs, long lookback, int limit) {
521-
Preconditions.checkNotNull(serviceName);
522-
Preconditions.checkArgument(!serviceName.isEmpty());
521+
public ListenableFuture<Map<Long,Long>> getTraceIdsByServiceName(List<String> serviceNames, long endTs, long lookback, int limit) {
523522
long startTs = endTs - lookback;
524523
try {
525524
BoundStatement bound = selectTraceIdsByServiceName.bind()
526-
.setString("service_name", serviceName)
525+
.setList("service_name", serviceNames)
527526
.setList("bucket", ALL_BUCKETS)
528527
.setBytesUnsafe("start_ts", serializeTs(startTs))
529528
.setBytesUnsafe("end_ts", serializeTs(endTs))
@@ -532,7 +531,7 @@ public ListenableFuture<Map<Long,Long>> getTraceIdsByServiceName(String serviceN
532531
bound.setFetchSize(Integer.MAX_VALUE);
533532

534533
if (LOG.isDebugEnabled()) {
535-
LOG.debug(debugSelectTraceIdsByServiceName(serviceName, startTs, endTs, limit));
534+
LOG.debug(debugSelectTraceIdsByServiceName(serviceNames, startTs, endTs, limit));
536535
}
537536

538537
return Futures.transform(
@@ -546,14 +545,14 @@ public ListenableFuture<Map<Long,Long>> getTraceIdsByServiceName(String serviceN
546545
}
547546
);
548547
} catch (RuntimeException ex) {
549-
LOG.error("failed " + debugSelectTraceIdsByServiceName(serviceName, startTs, endTs, limit), ex);
548+
LOG.error("failed " + debugSelectTraceIdsByServiceName(serviceNames, startTs, endTs, limit), ex);
550549
return Futures.immediateFailedFuture(ex);
551550
}
552551
}
553552

554-
private String debugSelectTraceIdsByServiceName(String serviceName, long startTs, long endTs, int limit) {
553+
private String debugSelectTraceIdsByServiceName(List<String> serviceNames, long startTs, long endTs, int limit) {
555554
return selectTraceIdsByServiceName.getQueryString()
556-
.replace(":service_name", serviceName)
555+
.replace(":service_name", String.valueOf(serviceNames))
557556
.replace(":start_ts", new Date(startTs / 1000).toString())
558557
.replace(":end_ts", new Date(endTs / 1000).toString())
559558
.replace(":limit_", String.valueOf(limit));

zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraSpanStore.scala

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import com.twitter.zipkin.conversions.thrift._
2626
import com.twitter.zipkin.storage.{CollectAnnotationQueries, IndexedTraceId, SpanStore}
2727
import com.twitter.zipkin.thriftscala.{Span => ThriftSpan}
2828
import com.twitter.zipkin.util.{FutureUtil, Util}
29+
import java.util
2930
import org.twitter.zipkin.storage.cassandra.Repository
3031

3132
import scala.collection.JavaConverters._
@@ -269,20 +270,27 @@ abstract class CassandraSpanStore(
269270
}
270271

271272
override def getTraceIdsByName(
272-
serviceName: String,
273+
serviceName: Option[String],
273274
spanName: Option[String],
274275
endTs: Long,
275276
lookback: Long,
276277
limit: Int
277278
): Future[Seq[IndexedTraceId]] = {
278279
QueryGetTraceIdsByNameCounter.incr()
279280

280-
val traceIdsFuture = FutureUtil.toFuture(spanName match {
281+
val traceIdsFuture = (serviceName, spanName) match {
281282
// if we have a span name, look up in the service + span name index
282283
// if not, look up by service name only
283-
case Some(x :String) => repository.getTraceIdsBySpanName(serviceName, x, endTs * 1000, lookback * 1000, limit)
284-
case None => repository.getTraceIdsByServiceName(serviceName, endTs * 1000, lookback * 1000, limit)
285-
})
284+
case (Some(x: String), Some(y: String)) =>
285+
FutureUtil.toFuture(repository.getTraceIdsBySpanName(x, y, endTs * 1000, lookback * 1000, limit))
286+
case (Some(x: String), None) =>
287+
FutureUtil.toFuture(repository.getTraceIdsByServiceName(Seq(x).asJava, endTs * 1000, lookback * 1000, limit))
288+
case (None, Some(y: String)) =>
289+
Future.exception(new UnsupportedOperationException)
290+
case (None, None) => FutureUtil.toFuture(repository.getServiceNames).flatMap { names =>
291+
FutureUtil.toFuture(repository.getTraceIdsByServiceName(new util.ArrayList(names), endTs * 1000, lookback * 1000, limit))
292+
}
293+
}
286294

287295
traceIdsFuture.map { traceIds =>
288296
traceIds.asScala
@@ -292,27 +300,29 @@ abstract class CassandraSpanStore(
292300
}
293301

294302
override def getTraceIdsByAnnotation(
295-
serviceName: String,
303+
serviceName: Option[String],
296304
annotation: String,
297305
value: Option[ByteBuffer],
298306
endTs: Long,
299307
lookback: Long,
300308
limit: Int
301309
): Future[Seq[IndexedTraceId]] = {
302310
QueryGetTraceIdsByAnnotationCounter.incr()
303-
304-
FutureUtil.toFuture(
305-
repository
306-
.getTraceIdsByAnnotation(annotationKey(serviceName, annotation, value), endTs * 1000, lookback * 1000, limit))
307-
.map { traceIds =>
308-
traceIds.asScala
309-
.map { case (traceId, ts) => IndexedTraceId(traceId, timestamp = ts) }
310-
.toSeq
311-
}
311+
serviceName match {
312+
case Some(name) => FutureUtil.toFuture(
313+
repository
314+
.getTraceIdsByAnnotation(annotationKey(name, annotation, value), endTs * 1000, lookback * 1000, limit))
315+
.map { traceIds =>
316+
traceIds.asScala
317+
.map { case (traceId, ts) => IndexedTraceId(traceId, timestamp = ts) }
318+
.toSeq
319+
}
320+
case None => Future.exception(new UnsupportedOperationException)
321+
}
312322
}
313323

314324
override protected def getTraceIdsByDuration(
315-
serviceName: String,
325+
serviceName: Option[String],
316326
spanName: Option[String],
317327
minDuration: Long,
318328
maxDuration: Option[Long],
@@ -322,15 +332,17 @@ abstract class CassandraSpanStore(
322332
): Future[Seq[IndexedTraceId]] = {
323333
QueryGetTraceIdsByDurationCounter.incr()
324334

325-
Future.exception(new UnsupportedOperationException)
326-
FutureUtil.toFuture(
327-
repository
328-
.getTraceIdsByDuration(serviceName, spanName getOrElse "", minDuration, maxDuration getOrElse Long.MaxValue,
329-
endTs * 1000, (endTs - lookback) * 1000, limit, indexTtl.inSeconds))
330-
.map { traceIds =>
331-
traceIds.asScala
332-
.map { case (traceId, ts) => IndexedTraceId(traceId, timestamp = ts) }
333-
.toSeq
335+
serviceName match {
336+
case Some(name) => FutureUtil.toFuture(
337+
repository
338+
.getTraceIdsByDuration(name, spanName getOrElse "", minDuration, maxDuration getOrElse Long.MaxValue,
339+
endTs * 1000, (endTs - lookback) * 1000, limit, indexTtl.inSeconds))
340+
.map { traceIds =>
341+
traceIds.asScala
342+
.map { case (traceId, ts) => IndexedTraceId(traceId, timestamp = ts) }
343+
.toSeq
344+
}
345+
case None => Future.exception(new UnsupportedOperationException)
334346
}
335347
}
336348
}

zipkin-common/src/main/scala/com/twitter/zipkin/storage/CollectAnnotationQueries.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ trait CollectAnnotationQueries {
1919
* Only return maximum of limit trace ids from before the endTs.
2020
*/
2121
protected def getTraceIdsByName(
22-
serviceName: String,
22+
serviceName: Option[String],
2323
spanName: Option[String],
2424
endTs: Long,
2525
lookback: Long,
@@ -32,7 +32,7 @@ trait CollectAnnotationQueries {
3232
* Only return maximum of limit trace ids from before the endTs.
3333
*/
3434
protected def getTraceIdsByAnnotation(
35-
serviceName: String,
35+
serviceName: Option[String],
3636
annotation: String,
3737
value: Option[ByteBuffer],
3838
endTs: Long,
@@ -42,7 +42,7 @@ trait CollectAnnotationQueries {
4242

4343
/** Only return traces where [[Span.duration]] is between minDuration and maxDuration */
4444
protected def getTraceIdsByDuration(
45-
serviceName: String,
45+
serviceName: Option[String],
4646
spanName: Option[String],
4747
minDuration: Long,
4848
maxDuration: Option[Long],

0 commit comments

Comments
 (0)