Skip to content

Commit 4ebc2d7

Browse files
author
Adrian Cole
committed
Refactors storage query from using JOIN to using EXISTS for conditions
As suggested by @asvanberg, this could improve read performance dramatically. Cleanups for this commit will wait until this is verified.
1 parent 88f1f29 commit 4ebc2d7

File tree

2 files changed

+54
-40
lines changed

2 files changed

+54
-40
lines changed

zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLSpanStore.java

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,12 @@
2828
import org.jooq.Condition;
2929
import org.jooq.Cursor;
3030
import org.jooq.DSLContext;
31+
import org.jooq.Field;
3132
import org.jooq.Record;
3233
import org.jooq.Row3;
3334
import org.jooq.SelectConditionStep;
34-
import org.jooq.SelectField;
3535
import org.jooq.SelectOffsetStep;
3636
import org.jooq.TableField;
37-
import org.jooq.TableOnConditionStep;
3837
import zipkin.Annotation;
3938
import zipkin.BinaryAnnotation;
4039
import zipkin.BinaryAnnotation.Type;
@@ -46,10 +45,9 @@
4645
import zipkin.internal.v2.Span;
4746
import zipkin.storage.QueryRequest;
4847
import zipkin.storage.SpanStore;
49-
import zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations;
5048

51-
import static java.util.Collections.emptyList;
5249
import static java.util.stream.Collectors.groupingBy;
50+
import static org.jooq.impl.DSL.exists;
5351
import static org.jooq.impl.DSL.row;
5452
import static zipkin.BinaryAnnotation.Type.STRING;
5553
import static zipkin.Constants.CLIENT_ADDR;
@@ -91,54 +89,69 @@ SelectOffsetStep<? extends Record> toTraceIdQuery(DSLContext context, QueryReque
9189
long endTs = (request.endTs > 0 && request.endTs != Long.MAX_VALUE) ? request.endTs * 1000
9290
: System.currentTimeMillis() * 1000;
9391

94-
TableOnConditionStep<?> table = ZIPKIN_SPANS.join(ZIPKIN_ANNOTATIONS)
95-
.on(schema.joinCondition(ZIPKIN_ANNOTATIONS));
96-
97-
int i = 0;
92+
Field<Long> max = ZIPKIN_SPANS.START_TS.max().as("max");
93+
List<Field<?>> distinctFields = new ArrayList<>(schema.spanIdFields);
94+
distinctFields.add(max);
95+
List<Field<?>> spanFields = new ArrayList<>(distinctFields);
96+
spanFields.add(ZIPKIN_SPANS.ID);
97+
spanFields.add(ZIPKIN_SPANS.START_TS);
98+
SelectConditionStep<Record> spanQuery = context
99+
.select(distinctFields)
100+
.from(ZIPKIN_SPANS)
101+
.where(ZIPKIN_SPANS.START_TS.between(endTs - request.lookback * 1000, endTs));
102+
103+
Map<String, String> annotationQuery = new LinkedHashMap<>();
98104
for (String key : request.annotations) {
99-
ZipkinAnnotations aTable = ZIPKIN_ANNOTATIONS.as("a" + i++);
100-
table = maybeOnService(table.join(aTable)
101-
.on(schema.joinCondition(aTable))
102-
.and(aTable.A_KEY.eq(key)), aTable, request.serviceName);
105+
annotationQuery.put(key, "");
103106
}
104-
105107
for (Map.Entry<String, String> kv : request.binaryAnnotations.entrySet()) {
106-
ZipkinAnnotations aTable = ZIPKIN_ANNOTATIONS.as("a" + i++);
107-
table = maybeOnService(table.join(aTable)
108-
.on(schema.joinCondition(aTable))
109-
.and(aTable.A_TYPE.eq(STRING.value))
110-
.and(aTable.A_KEY.eq(kv.getKey()))
111-
.and(aTable.A_VALUE.eq(kv.getValue().getBytes(UTF_8))), aTable, request.serviceName);
108+
annotationQuery.put(kv.getKey(), kv.getValue());
109+
}
110+
111+
if (annotationQuery.isEmpty()) {
112+
spanQuery.and(exists(constraintsWithoutAnnotations(context, request, spanFields)));
112113
}
113114

114-
List<SelectField<?>> distinctFields = new ArrayList<>(schema.spanIdFields);
115-
distinctFields.add(ZIPKIN_SPANS.START_TS.max());
116-
SelectConditionStep<Record> dsl = context.selectDistinct(distinctFields)
117-
.from(table)
118-
.where(ZIPKIN_SPANS.START_TS.between(endTs - request.lookback * 1000, endTs));
115+
// The annotation query needs to be run per key/value pair
116+
for (Map.Entry<String, String> kv : annotationQuery.entrySet()) {
117+
SelectConditionStep and = constraintsWithoutAnnotations(context, request, spanFields);
118+
119+
if (kv.getValue().isEmpty()) { // timeline annotation value or tag key
120+
and.and(ZIPKIN_ANNOTATIONS.A_KEY.eq(kv.getKey()));
121+
} else { // tag key + value
122+
and.and(ZIPKIN_ANNOTATIONS.A_TYPE.eq(STRING.value))
123+
.and(ZIPKIN_ANNOTATIONS.A_KEY.eq(kv.getKey()))
124+
.and(ZIPKIN_ANNOTATIONS.A_VALUE.eq(kv.getValue().getBytes(UTF_8)));
125+
}
126+
127+
spanQuery.and(exists(and));
128+
}
129+
130+
return spanQuery.groupBy(schema.spanIdFields)
131+
.orderBy(max.desc()).limit(request.limit);
132+
}
133+
134+
private SelectConditionStep constraintsWithoutAnnotations(DSLContext context,
135+
QueryRequest request, List<Field<?>> spanFields) {
136+
SelectConditionStep and = context.selectOne()
137+
.from(ZIPKIN_ANNOTATIONS)
138+
.where(schema.joinCondition(ZIPKIN_ANNOTATIONS));
119139

120140
if (request.serviceName != null) {
121-
dsl.and(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME.eq(request.serviceName));
141+
and.and(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME.eq(request.serviceName));
122142
}
123143

124144
if (request.spanName != null) {
125-
dsl.and(ZIPKIN_SPANS.NAME.eq(request.spanName));
145+
spanFields.add(ZIPKIN_SPANS.NAME);
146+
and.and(ZIPKIN_SPANS.NAME.eq(request.spanName));
126147
}
127148

128149
if (request.minDuration != null && request.maxDuration != null) {
129-
dsl.and(ZIPKIN_SPANS.DURATION.between(request.minDuration, request.maxDuration));
150+
and.and(ZIPKIN_SPANS.DURATION.between(request.minDuration, request.maxDuration));
130151
} else if (request.minDuration != null) {
131-
dsl.and(ZIPKIN_SPANS.DURATION.greaterOrEqual(request.minDuration));
152+
and.and(ZIPKIN_SPANS.DURATION.greaterOrEqual(request.minDuration));
132153
}
133-
return dsl
134-
.groupBy(schema.spanIdFields)
135-
.orderBy(ZIPKIN_SPANS.START_TS.max().desc()).limit(request.limit);
136-
}
137-
138-
static TableOnConditionStep<?> maybeOnService(TableOnConditionStep<Record> table,
139-
ZipkinAnnotations aTable, String serviceName) {
140-
if (serviceName == null) return table;
141-
return table.and(aTable.ENDPOINT_SERVICE_NAME.eq(serviceName));
154+
return and;
142155
}
143156

144157
List<List<zipkin.Span>> getTraces(@Nullable QueryRequest request, @Nullable Long traceIdHigh,
@@ -215,7 +228,8 @@ List<List<zipkin.Span>> getTraces(@Nullable QueryRequest request, @Nullable Long
215228
return GroupByTraceId.apply(allSpans, strictTraceId, !raw);
216229
}
217230

218-
static <T> T maybeGet(Record record, TableField<Record, T> field, T defaultValue) {
231+
@Nullable
232+
static <T> T maybeGet(Record record, TableField<Record, T> field, @Nullable T defaultValue) {
219233
if (record.fieldsRow().indexOf(field) < 0) {
220234
return defaultValue;
221235
} else {
@@ -264,7 +278,6 @@ public List<String> getServiceNames() {
264278

265279
@Override
266280
public List<String> getSpanNames(String serviceName) {
267-
if (serviceName == null) return emptyList();
268281
serviceName = serviceName.toLowerCase(); // service names are always lowercase!
269282
try (Connection conn = datasource.getConnection()) {
270283
return context.get(conn)

zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/Schema.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.Arrays;
1818
import java.util.List;
1919
import java.util.Set;
20+
import javax.annotation.Nullable;
2021
import javax.sql.DataSource;
2122
import org.jooq.Condition;
2223
import org.jooq.Field;
@@ -106,7 +107,7 @@ Condition spanTraceIdCondition(SelectOffsetStep<? extends Record> traceIdQuery)
106107
}
107108
}
108109

109-
Condition spanTraceIdCondition(Long traceIdHigh, long traceIdLow) {
110+
Condition spanTraceIdCondition(@Nullable Long traceIdHigh, long traceIdLow) {
110111
return traceIdHigh != null && hasTraceIdHigh
111112
? row(ZIPKIN_SPANS.TRACE_ID_HIGH, ZIPKIN_SPANS.TRACE_ID).eq(traceIdHigh, traceIdLow)
112113
: ZIPKIN_SPANS.TRACE_ID.eq(traceIdLow);

0 commit comments

Comments
 (0)