|
28 | 28 | import org.jooq.Condition;
|
29 | 29 | import org.jooq.Cursor;
|
30 | 30 | import org.jooq.DSLContext;
|
| 31 | +import org.jooq.Field; |
31 | 32 | import org.jooq.Record;
|
32 | 33 | import org.jooq.Row3;
|
33 | 34 | import org.jooq.SelectConditionStep;
|
34 |
| -import org.jooq.SelectField; |
35 | 35 | import org.jooq.SelectOffsetStep;
|
36 | 36 | import org.jooq.TableField;
|
37 |
| -import org.jooq.TableOnConditionStep; |
38 | 37 | import zipkin.Annotation;
|
39 | 38 | import zipkin.BinaryAnnotation;
|
40 | 39 | import zipkin.BinaryAnnotation.Type;
|
|
46 | 45 | import zipkin.internal.v2.Span;
|
47 | 46 | import zipkin.storage.QueryRequest;
|
48 | 47 | import zipkin.storage.SpanStore;
|
49 |
| -import zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations; |
50 | 48 |
|
51 |
| -import static java.util.Collections.emptyList; |
52 | 49 | import static java.util.stream.Collectors.groupingBy;
|
| 50 | +import static org.jooq.impl.DSL.exists; |
53 | 51 | import static org.jooq.impl.DSL.row;
|
54 | 52 | import static zipkin.BinaryAnnotation.Type.STRING;
|
55 | 53 | import static zipkin.Constants.CLIENT_ADDR;
|
@@ -91,54 +89,69 @@ SelectOffsetStep<? extends Record> toTraceIdQuery(DSLContext context, QueryReque
|
91 | 89 | long endTs = (request.endTs > 0 && request.endTs != Long.MAX_VALUE) ? request.endTs * 1000
|
92 | 90 | : System.currentTimeMillis() * 1000;
|
93 | 91 |
|
94 |
| - TableOnConditionStep<?> table = ZIPKIN_SPANS.join(ZIPKIN_ANNOTATIONS) |
95 |
| - .on(schema.joinCondition(ZIPKIN_ANNOTATIONS)); |
96 |
| - |
97 |
| - int i = 0; |
| 92 | + Field<Long> max = ZIPKIN_SPANS.START_TS.max().as("max"); |
| 93 | + List<Field<?>> distinctFields = new ArrayList<>(schema.spanIdFields); |
| 94 | + distinctFields.add(max); |
| 95 | + List<Field<?>> spanFields = new ArrayList<>(distinctFields); |
| 96 | + spanFields.add(ZIPKIN_SPANS.ID); |
| 97 | + spanFields.add(ZIPKIN_SPANS.START_TS); |
| 98 | + SelectConditionStep<Record> spanQuery = context |
| 99 | + .select(distinctFields) |
| 100 | + .from(ZIPKIN_SPANS) |
| 101 | + .where(ZIPKIN_SPANS.START_TS.between(endTs - request.lookback * 1000, endTs)); |
| 102 | + |
| 103 | + Map<String, String> annotationQuery = new LinkedHashMap<>(); |
98 | 104 | for (String key : request.annotations) {
|
99 |
| - ZipkinAnnotations aTable = ZIPKIN_ANNOTATIONS.as("a" + i++); |
100 |
| - table = maybeOnService(table.join(aTable) |
101 |
| - .on(schema.joinCondition(aTable)) |
102 |
| - .and(aTable.A_KEY.eq(key)), aTable, request.serviceName); |
| 105 | + annotationQuery.put(key, ""); |
103 | 106 | }
|
104 |
| - |
105 | 107 | for (Map.Entry<String, String> kv : request.binaryAnnotations.entrySet()) {
|
106 |
| - ZipkinAnnotations aTable = ZIPKIN_ANNOTATIONS.as("a" + i++); |
107 |
| - table = maybeOnService(table.join(aTable) |
108 |
| - .on(schema.joinCondition(aTable)) |
109 |
| - .and(aTable.A_TYPE.eq(STRING.value)) |
110 |
| - .and(aTable.A_KEY.eq(kv.getKey())) |
111 |
| - .and(aTable.A_VALUE.eq(kv.getValue().getBytes(UTF_8))), aTable, request.serviceName); |
| 108 | + annotationQuery.put(kv.getKey(), kv.getValue()); |
| 109 | + } |
| 110 | + |
| 111 | + if (annotationQuery.isEmpty()) { |
| 112 | + spanQuery.and(exists(constraintsWithoutAnnotations(context, request, spanFields))); |
112 | 113 | }
|
113 | 114 |
|
114 |
| - List<SelectField<?>> distinctFields = new ArrayList<>(schema.spanIdFields); |
115 |
| - distinctFields.add(ZIPKIN_SPANS.START_TS.max()); |
116 |
| - SelectConditionStep<Record> dsl = context.selectDistinct(distinctFields) |
117 |
| - .from(table) |
118 |
| - .where(ZIPKIN_SPANS.START_TS.between(endTs - request.lookback * 1000, endTs)); |
| 115 | + // The annotation query needs to be run per key/value pair |
| 116 | + for (Map.Entry<String, String> kv : annotationQuery.entrySet()) { |
| 117 | + SelectConditionStep and = constraintsWithoutAnnotations(context, request, spanFields); |
| 118 | + |
| 119 | + if (kv.getValue().isEmpty()) { // timeline annotation value or tag key |
| 120 | + and.and(ZIPKIN_ANNOTATIONS.A_KEY.eq(kv.getKey())); |
| 121 | + } else { // tag key + value |
| 122 | + and.and(ZIPKIN_ANNOTATIONS.A_TYPE.eq(STRING.value)) |
| 123 | + .and(ZIPKIN_ANNOTATIONS.A_KEY.eq(kv.getKey())) |
| 124 | + .and(ZIPKIN_ANNOTATIONS.A_VALUE.eq(kv.getValue().getBytes(UTF_8))); |
| 125 | + } |
| 126 | + |
| 127 | + spanQuery.and(exists(and)); |
| 128 | + } |
| 129 | + |
| 130 | + return spanQuery.groupBy(schema.spanIdFields) |
| 131 | + .orderBy(max.desc()).limit(request.limit); |
| 132 | + } |
| 133 | + |
| 134 | + private SelectConditionStep constraintsWithoutAnnotations(DSLContext context, |
| 135 | + QueryRequest request, List<Field<?>> spanFields) { |
| 136 | + SelectConditionStep and = context.selectOne() |
| 137 | + .from(ZIPKIN_ANNOTATIONS) |
| 138 | + .where(schema.joinCondition(ZIPKIN_ANNOTATIONS)); |
119 | 139 |
|
120 | 140 | if (request.serviceName != null) {
|
121 |
| - dsl.and(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME.eq(request.serviceName)); |
| 141 | + and.and(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME.eq(request.serviceName)); |
122 | 142 | }
|
123 | 143 |
|
124 | 144 | if (request.spanName != null) {
|
125 |
| - dsl.and(ZIPKIN_SPANS.NAME.eq(request.spanName)); |
| 145 | + spanFields.add(ZIPKIN_SPANS.NAME); |
| 146 | + and.and(ZIPKIN_SPANS.NAME.eq(request.spanName)); |
126 | 147 | }
|
127 | 148 |
|
128 | 149 | if (request.minDuration != null && request.maxDuration != null) {
|
129 |
| - dsl.and(ZIPKIN_SPANS.DURATION.between(request.minDuration, request.maxDuration)); |
| 150 | + and.and(ZIPKIN_SPANS.DURATION.between(request.minDuration, request.maxDuration)); |
130 | 151 | } else if (request.minDuration != null) {
|
131 |
| - dsl.and(ZIPKIN_SPANS.DURATION.greaterOrEqual(request.minDuration)); |
| 152 | + and.and(ZIPKIN_SPANS.DURATION.greaterOrEqual(request.minDuration)); |
132 | 153 | }
|
133 |
| - return dsl |
134 |
| - .groupBy(schema.spanIdFields) |
135 |
| - .orderBy(ZIPKIN_SPANS.START_TS.max().desc()).limit(request.limit); |
136 |
| - } |
137 |
| - |
138 |
| - static TableOnConditionStep<?> maybeOnService(TableOnConditionStep<Record> table, |
139 |
| - ZipkinAnnotations aTable, String serviceName) { |
140 |
| - if (serviceName == null) return table; |
141 |
| - return table.and(aTable.ENDPOINT_SERVICE_NAME.eq(serviceName)); |
| 154 | + return and; |
142 | 155 | }
|
143 | 156 |
|
144 | 157 | List<List<zipkin.Span>> getTraces(@Nullable QueryRequest request, @Nullable Long traceIdHigh,
|
@@ -215,7 +228,8 @@ List<List<zipkin.Span>> getTraces(@Nullable QueryRequest request, @Nullable Long
|
215 | 228 | return GroupByTraceId.apply(allSpans, strictTraceId, !raw);
|
216 | 229 | }
|
217 | 230 |
|
218 |
| - static <T> T maybeGet(Record record, TableField<Record, T> field, T defaultValue) { |
| 231 | + @Nullable |
| 232 | + static <T> T maybeGet(Record record, TableField<Record, T> field, @Nullable T defaultValue) { |
219 | 233 | if (record.fieldsRow().indexOf(field) < 0) {
|
220 | 234 | return defaultValue;
|
221 | 235 | } else {
|
@@ -264,7 +278,6 @@ public List<String> getServiceNames() {
|
264 | 278 |
|
265 | 279 | @Override
|
266 | 280 | public List<String> getSpanNames(String serviceName) {
|
267 |
| - if (serviceName == null) return emptyList(); |
268 | 281 | serviceName = serviceName.toLowerCase(); // service names are always lowercase!
|
269 | 282 | try (Connection conn = datasource.getConnection()) {
|
270 | 283 | return context.get(conn)
|
|
0 commit comments