diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java index 3e72746c4ba..782f441eb19 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java @@ -22,14 +22,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputField; import org.apache.solr.common.params.ModifiableSolrParams; @@ -38,8 +35,6 @@ import org.apache.solr.common.util.DataInputInputStream; import org.apache.solr.common.util.JavaBinCodec; import org.apache.solr.common.util.NamedList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Provides methods for marshalling an UpdateRequest to a NamedList which can be serialized in the @@ -49,15 +44,6 @@ * @since solr 1.4 */ public class JavaBinUpdateRequestCodec { - private boolean readStringAsCharSeq = false; - - public JavaBinUpdateRequestCodec setReadStringAsCharSeq(boolean flag) { - this.readStringAsCharSeq = flag; - return this; - } - - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final AtomicBoolean WARNED_ABOUT_INDEX_TIME_BOOSTS = new AtomicBoolean(); /** * Converts an UpdateRequest to a NamedList which can be serialized to the given OutputStream in @@ -69,32 +55,32 @@ public JavaBinUpdateRequestCodec setReadStringAsCharSeq(boolean flag) { */ public void marshal(UpdateRequest updateRequest, OutputStream os) throws IOException { NamedList nl = new NamedList<>(); + NamedList params = updateRequest.getParams().toNamedList(); if (updateRequest.getCommitWithin() != -1) { params.add("commitWithin", updateRequest.getCommitWithin()); } - Iterator docIter = null; - - if (updateRequest.getDocIterator() != null) { - docIter = updateRequest.getDocIterator(); - } - - Map> docMap = updateRequest.getDocumentsMap(); - nl.add("params", params); // 0: params + if (updateRequest.getDeleteByIdMap() != null) { nl.add("delByIdMap", updateRequest.getDeleteByIdMap()); } + nl.add("delByQ", updateRequest.getDeleteQuery()); + Map> docMap = updateRequest.getDocumentsMap(); if (docMap != null) { - nl.add("docsMap", docMap.entrySet().iterator()); + nl.add("docsMap", docMap.entrySet().iterator()); // of Map.Entry } else { + Iterator docIter; if (updateRequest.getDocuments() != null) { docIter = updateRequest.getDocuments().iterator(); + } else { + docIter = updateRequest.getDocIterator(); } nl.add("docs", docIter); } + try (JavaBinCodec codec = new JavaBinCodec()) { codec.marshal(nl, os); } @@ -114,65 +100,48 @@ public void marshal(UpdateRequest updateRequest, OutputStream os) throws IOExcep @SuppressWarnings({"unchecked"}) public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException { - final UpdateRequest updateRequest = new UpdateRequest(); - List>> doclist; - List>> docMap; - List delById; - Map> delByIdMap; - List delByQ; - final NamedList[] namedList = new NamedList[1]; - try (JavaBinCodec codec = new StreamingCodec(namedList, updateRequest, handler)) { - codec.unmarshal(is); + final NamedList namedList; + + // process documents: + + // reads documents, sending to handler. Other data is in NamedList + try (var codec = new StreamingCodec(handler)) { + namedList = codec.unmarshal(is); } - // NOTE: if the update request contains only delete commands the params - // must be loaded now - if (updateRequest.getParams().iterator().hasNext() == false) { // no params - NamedList params = (NamedList) namedList[0].get("params"); + // process deletes: + + final UpdateRequest updateRequest = new UpdateRequest(); + { + NamedList params = (NamedList) namedList.get("params"); if (params != null) { - updateRequest.setParams(new ModifiableSolrParams(params.toSolrParams())); + updateRequest.setParams(ModifiableSolrParams.of(params.toSolrParams())); } } - delById = (List) namedList[0].get("delById"); - delByIdMap = (Map>) namedList[0].get("delByIdMap"); - delByQ = (List) namedList[0].get("delByQ"); - doclist = (List) namedList[0].get("docs"); - Object docsMapObj = namedList[0].get("docsMap"); - - if (docsMapObj instanceof Map) { // SOLR-5762 - docMap = new ArrayList<>(((Map) docsMapObj).entrySet()); - } else { - docMap = (List>>) docsMapObj; - } - // we don't add any docs, because they were already processed - // deletes are handled later, and must be passed back on the UpdateRequest - - if (delById != null) { - for (String s : delById) { - updateRequest.deleteById(s); - } + for (String s : (List) namedList.getOrDefault("delById", List.of())) { + updateRequest.deleteById(s); } - if (delByIdMap != null) { - for (Map.Entry> entry : delByIdMap.entrySet()) { - Map params = entry.getValue(); - if (params != null) { - Long version = (Long) params.get(UpdateRequest.VER); - if (params.containsKey(ShardParams._ROUTE_)) { - updateRequest.deleteById( - entry.getKey(), (String) params.get(ShardParams._ROUTE_), version); - } else { - updateRequest.deleteById(entry.getKey(), version); - } + + for (var entry : + ((Map>) namedList.getOrDefault("delByIdMap", Map.of())) + .entrySet()) { + Map params = entry.getValue(); + if (params != null) { + Long version = (Long) params.get(UpdateRequest.VER); + if (params.containsKey(ShardParams._ROUTE_)) { + updateRequest.deleteById( + entry.getKey(), (String) params.get(ShardParams._ROUTE_), version); } else { - updateRequest.deleteById(entry.getKey()); + updateRequest.deleteById(entry.getKey(), version); } + } else { + updateRequest.deleteById(entry.getKey()); } } - if (delByQ != null) { - for (String s : delByQ) { - updateRequest.deleteByQuery(s); - } + + for (String s : (List) namedList.getOrDefault("delByQ", List.of())) { + updateRequest.deleteByQuery(s); } return updateRequest; @@ -194,23 +163,23 @@ public Object getFieldValue(String name) { } } - class StreamingCodec extends JavaBinCodec { + static class StreamingCodec extends JavaBinCodec { - // TODO This could probably be an AtomicReference> - private final NamedList[] namedList; - private final UpdateRequest updateRequest; + private NamedList resultNamedList; private final StreamingUpdateHandler handler; // NOTE: this only works because this is an anonymous inner class // which will only ever be used on a single stream -- if this class // is ever refactored, this will not work. - private boolean seenOuterMostDocIterator; + private boolean seenOuterMostDocIterator = false; - public StreamingCodec( - NamedList[] namedList, UpdateRequest updateRequest, StreamingUpdateHandler handler) { - this.namedList = namedList; - this.updateRequest = updateRequest; + StreamingCodec(StreamingUpdateHandler handler) { this.handler = handler; - seenOuterMostDocIterator = false; + } + + @Override + public NamedList unmarshal(InputStream is) throws IOException { + super.unmarshal(is); + return resultNamedList; } @Override @@ -222,8 +191,8 @@ protected SolrInputDocument createSolrInputDocument(int sz) { public NamedList readNamedList(DataInputInputStream dis) throws IOException { int sz = readSize(dis); NamedList nl = new NamedList<>(); - if (namedList[0] == null) { - namedList[0] = nl; + if (this.resultNamedList == null) { + this.resultNamedList = nl; } for (int i = 0; i < sz; i++) { String name = (String) readVal(dis); @@ -233,42 +202,6 @@ public NamedList readNamedList(DataInputInputStream dis) throws IOExcept return nl; } - private SolrInputDocument listToSolrInputDocument(List> namedList) { - SolrInputDocument doc = new SolrInputDocument(); - for (int i = 0; i < namedList.size(); i++) { - NamedList nl = namedList.get(i); - if (i == 0) { - Float boost = (Float) nl.getVal(0); - if (boost != null && boost.floatValue() != 1f) { - String message = - "Ignoring document boost: " - + boost - + " as index-time boosts are not supported anymore"; - if (WARNED_ABOUT_INDEX_TIME_BOOSTS.compareAndSet(false, true)) { - log.warn(message); - } else { - log.debug(message); - } - } - } else { - Float boost = (Float) nl.getVal(2); - if (boost != null && boost.floatValue() != 1f) { - String message = - "Ignoring field boost: " - + boost - + " as index-time boosts are not supported anymore"; - if (WARNED_ABOUT_INDEX_TIME_BOOSTS.compareAndSet(false, true)) { - log.warn(message); - } else { - log.debug(message); - } - } - doc.addField((String) nl.getVal(0), nl.getVal(1)); - } - } - return doc; - } - @Override public List readIterator(DataInputInputStream fis) throws IOException { // default behavior for reading any regular Iterator in the stream @@ -277,67 +210,52 @@ public List readIterator(DataInputInputStream fis) throws IOException { // special treatment for first outermost Iterator // (the list of documents) seenOuterMostDocIterator = true; - return readOuterMostDocIterator(fis); + readDocs(fis); + return List.of(); // bogus; already processed } - private List readOuterMostDocIterator(DataInputInputStream fis) throws IOException { - if (namedList[0] == null) namedList[0] = new NamedList<>(); - NamedList params = (NamedList) namedList[0].get("params"); - if (params == null) params = new NamedList<>(); - updateRequest.setParams(new ModifiableSolrParams(params.toSolrParams())); - if (handler == null) return super.readIterator(fis); - Integer commitWithin = null; - Boolean overwrite = null; - Object o = null; - super.readStringAsCharSeq = JavaBinUpdateRequestCodec.this.readStringAsCharSeq; - try { - while (true) { - if (o == null) { - o = readVal(fis); - } - - if (o == END_OBJ) { - break; - } + private void readDocs(DataInputInputStream fis) throws IOException { + if (resultNamedList == null) resultNamedList = new NamedList<>(); - SolrInputDocument sdoc = null; - if (o instanceof List) { - @SuppressWarnings("unchecked") - List> list = (List>) o; - sdoc = listToSolrInputDocument(list); - } else if (o instanceof NamedList) { - UpdateRequest req = new UpdateRequest(); - req.setParams(new ModifiableSolrParams(((NamedList) o).toSolrParams())); - handler.update(null, req, null, null); - } else if (o instanceof Map.Entry) { - @SuppressWarnings("unchecked") - Map.Entry> entry = - (Map.Entry>) o; - sdoc = entry.getKey(); - Map p = entry.getValue(); - if (p != null) { - commitWithin = (Integer) p.get(UpdateRequest.COMMIT_WITHIN); - overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE); - } - } else if (o instanceof SolrInputDocument) { - sdoc = (SolrInputDocument) o; - } else if (o instanceof Map) { - sdoc = convertMapToSolrInputDoc((Map) o); - } + UpdateRequest updateRequest = new UpdateRequest(); + NamedList params = (NamedList) resultNamedList.get("params"); // always precedes docs + if (params != null) { + updateRequest.setParams(ModifiableSolrParams.of(params.toSolrParams())); + } - // peek at the next object to see if we're at the end - o = readVal(fis); - if (o == END_OBJ) { - // indicate that we've hit the last doc in the batch, used to enable optimizations when - // doing replication - updateRequest.lastDocInBatch(); + Object o = readVal(fis); + while (o != END_OBJ) { + Integer commitWithin = null; + Boolean overwrite = null; + + SolrInputDocument sdoc; + if (o instanceof Map.Entry) { // doc + options. UpdateRequest "docsMap" + @SuppressWarnings("unchecked") + Map.Entry> entry = + (Map.Entry>) o; + sdoc = entry.getKey(); + Map p = entry.getValue(); + if (p != null) { + commitWithin = (Integer) p.get(UpdateRequest.COMMIT_WITHIN); + overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE); } + } else if (o instanceof SolrInputDocument d) { // doc. UpdateRequest "docs"" + sdoc = d; + } else if (o instanceof Map m) { // doc. To imitate JSON style. SOLR-13731 + sdoc = convertMapToSolrInputDoc(m); + } else { + throw new SolrException(ErrorCode.BAD_REQUEST, "Unexpected data type: " + o.getClass()); + } - handler.update(sdoc, updateRequest, commitWithin, overwrite); + // peek at the next object to see if we're at the end + o = readVal(fis); + if (o == END_OBJ) { + // indicate that we've hit the last doc in the batch, used to enable optimizations when + // doing replication + updateRequest.lastDocInBatch(); } - return Collections.emptyList(); - } finally { - super.readStringAsCharSeq = false; + + handler.update(sdoc, updateRequest, commitWithin, overwrite); } } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java index ef24375ca16..e7ab61ddff7 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java @@ -247,13 +247,14 @@ public void testBackCompat4_5() throws IOException { InputStream is = getClass().getResourceAsStream("/solrj/updateReq_4_5.bin"); assertNotNull("updateReq_4_5.bin was not found", is); + List unmarshalledDocs = new ArrayList<>(); UpdateRequest updateUnmarshalled = new JavaBinUpdateRequestCodec() .unmarshal( is, (document, req, commitWithin, override) -> { if (commitWithin == null) { - req.add(document); + unmarshalledDocs.add(document); } System.err.println( "Doc" @@ -263,6 +264,7 @@ public void testBackCompat4_5() throws IOException { + " , override:" + override); }); + updateUnmarshalled.add(unmarshalledDocs); System.err.println(updateUnmarshalled.getDocumentsMap()); System.err.println(updateUnmarshalled.getDocuments());