Skip to content

Commit cf53bc5

Browse files
committed
bulk: Fields defined in the _default_ mapping of an index template should be picked up when an index alias filter is parsed if a new index is introduced when a document is indexed into an index that doesn't exist yet via the bulk api.
Closes elastic#10609
1 parent 4b1d6db commit cf53bc5

File tree

2 files changed

+56
-21
lines changed

2 files changed

+56
-21
lines changed

src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121

2222
import com.google.common.collect.Lists;
2323
import com.google.common.collect.Maps;
24-
import com.google.common.collect.Sets;
25-
2624
import org.elasticsearch.ElasticsearchException;
2725
import org.elasticsearch.ElasticsearchParseException;
2826
import org.elasticsearch.ExceptionsHelper;
@@ -60,11 +58,7 @@
6058
import org.elasticsearch.threadpool.ThreadPool;
6159
import org.elasticsearch.transport.TransportService;
6260

63-
import java.util.HashMap;
64-
import java.util.List;
65-
import java.util.Locale;
66-
import java.util.Map;
67-
import java.util.Set;
61+
import java.util.*;
6862
import java.util.concurrent.atomic.AtomicInteger;
6963

7064
/**
@@ -105,22 +99,33 @@ protected void doExecute(final BulkRequest bulkRequest, final ActionListener<Bul
10599
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
106100

107101
if (autoCreateIndex.needToCheck()) {
108-
final Set<String> indices = Sets.newHashSet();
102+
// Keep track of all unique indices and all unique types per index for the create index requests:
103+
final Map<String, Set<String>> indicesAndTypes = new HashMap<>();
109104
for (ActionRequest request : bulkRequest.requests) {
110105
if (request instanceof DocumentRequest) {
111106
DocumentRequest req = (DocumentRequest) request;
112-
if (!indices.contains(req.index())) {
113-
indices.add(req.index());
107+
Set<String> types = indicesAndTypes.get(req.index());
108+
if (types == null) {
109+
indicesAndTypes.put(req.index(), types = new HashSet<>());
114110
}
111+
types.add(req.type());
115112
} else {
116113
throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName());
117114
}
118115
}
119-
final AtomicInteger counter = new AtomicInteger(indices.size());
116+
final AtomicInteger counter = new AtomicInteger(indicesAndTypes.size());
120117
ClusterState state = clusterService.state();
121-
for (final String index : indices) {
118+
for (Map.Entry<String, Set<String>> entry : indicesAndTypes.entrySet()) {
119+
final String index = entry.getKey();
122120
if (autoCreateIndex.shouldAutoCreate(index, state)) {
123-
createIndexAction.execute(new CreateIndexRequest(bulkRequest).index(index).cause("auto(bulk api)").masterNodeTimeout(bulkRequest.timeout()), new ActionListener<CreateIndexResponse>() {
121+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(bulkRequest);
122+
createIndexRequest.index(index);
123+
for (String type : entry.getValue()) {
124+
createIndexRequest.mapping(type);
125+
}
126+
createIndexRequest.cause("auto(bulk api)");
127+
createIndexRequest.masterNodeTimeout(bulkRequest.timeout());
128+
createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
124129
@Override
125130
public void onResponse(CreateIndexResponse result) {
126131
if (counter.decrementAndGet() == 0) {

src/test/java/org/elasticsearch/indices/template/SimpleIndexTemplateTests.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
3030
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
3131
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
32+
import org.elasticsearch.action.bulk.BulkResponse;
33+
import org.elasticsearch.action.index.IndexRequest;
3234
import org.elasticsearch.action.search.SearchResponse;
3335
import org.elasticsearch.cluster.metadata.AliasMetaData;
3436
import org.elasticsearch.common.Priority;
@@ -604,38 +606,66 @@ public void testMultipleAliasesPrecedence() throws Exception {
604606
public void testStrictAliasParsingInIndicesCreatedViaTemplates() throws Exception {
605607
// Indexing into a should succeed, because the field mapping for field 'field' is defined in the test mapping.
606608
client().admin().indices().preparePutTemplate("template1")
607-
.setTemplate("a")
609+
.setTemplate("a*")
608610
.setOrder(0)
609611
.addMapping("test", "field", "type=string")
610612
.addAlias(new Alias("alias1").filter(termFilter("field", "value"))).get();
611613
// Indexing into b should succeed, because the field mapping for field 'field' is defined in the _default_ mapping and the test type exists.
612614
client().admin().indices().preparePutTemplate("template2")
613-
.setTemplate("b")
615+
.setTemplate("b*")
614616
.setOrder(0)
615617
.addMapping("_default_", "field", "type=string")
616618
.addMapping("test")
617619
.addAlias(new Alias("alias2").filter(termFilter("field", "value"))).get();
618620
// Indexing into c should succeed, because the field mapping for field 'field' is defined in the _default_ mapping.
619621
client().admin().indices().preparePutTemplate("template3")
620-
.setTemplate("c")
622+
.setTemplate("c*")
621623
.setOrder(0)
622624
.addMapping("_default_", "field", "type=string")
623625
.addAlias(new Alias("alias3").filter(termFilter("field", "value"))).get();
624626
// Indexing into d index should fail, since there is field with name 'field' in the mapping
625627
client().admin().indices().preparePutTemplate("template4")
626-
.setTemplate("d")
628+
.setTemplate("d*")
627629
.setOrder(0)
628630
.addAlias(new Alias("alias4").filter(termFilter("field", "value"))).get();
629631

630-
client().prepareIndex("a", "test", "test").setSource("{}").get();
631-
client().prepareIndex("b", "test", "test").setSource("{}").get();
632-
client().prepareIndex("c", "test", "test").setSource("{}").get();
632+
client().prepareIndex("a1", "test", "test").setSource("{}").get();
633+
BulkResponse response = client().prepareBulk().add(new IndexRequest("a2", "test", "test").source("{}")).get();
634+
assertThat(response.hasFailures(), is(false));
635+
assertThat(response.getItems()[0].isFailed(), equalTo(false));
636+
assertThat(response.getItems()[0].getIndex(), equalTo("a2"));
637+
assertThat(response.getItems()[0].getType(), equalTo("test"));
638+
assertThat(response.getItems()[0].getId(), equalTo("test"));
639+
assertThat(response.getItems()[0].getVersion(), equalTo(1l));
640+
641+
client().prepareIndex("b1", "test", "test").setSource("{}").get();
642+
response = client().prepareBulk().add(new IndexRequest("b2", "test", "test").source("{}")).get();
643+
assertThat(response.hasFailures(), is(false));
644+
assertThat(response.getItems()[0].isFailed(), equalTo(false));
645+
assertThat(response.getItems()[0].getIndex(), equalTo("b2"));
646+
assertThat(response.getItems()[0].getType(), equalTo("test"));
647+
assertThat(response.getItems()[0].getId(), equalTo("test"));
648+
assertThat(response.getItems()[0].getVersion(), equalTo(1l));
649+
650+
client().prepareIndex("c1", "test", "test").setSource("{}").get();
651+
response = client().prepareBulk().add(new IndexRequest("c2", "test", "test").source("{}")).get();
652+
assertThat(response.hasFailures(), is(false));
653+
assertThat(response.getItems()[0].isFailed(), equalTo(false));
654+
assertThat(response.getItems()[0].getIndex(), equalTo("c2"));
655+
assertThat(response.getItems()[0].getType(), equalTo("test"));
656+
assertThat(response.getItems()[0].getId(), equalTo("test"));
657+
assertThat(response.getItems()[0].getVersion(), equalTo(1l));
658+
633659
try {
634-
client().prepareIndex("d", "test", "test").setSource("{}").get();
660+
client().prepareIndex("d1", "test", "test").setSource("{}").get();
635661
fail();
636662
} catch (Exception e) {
637663
assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchIllegalArgumentException.class));
638664
assertThat(e.getMessage(), containsString("failed to parse filter for alias [alias4]"));
639665
}
666+
response = client().prepareBulk().add(new IndexRequest("d2", "test", "test").source("{}")).get();
667+
assertThat(response.hasFailures(), is(true));
668+
assertThat(response.getItems()[0].isFailed(), equalTo(true));
669+
assertThat(response.getItems()[0].getFailureMessage(), containsString("failed to parse filter for alias [alias4]"));
640670
}
641671
}

0 commit comments

Comments
 (0)