Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JavaBinUpdateRequestCodec: Dead code removal #3207

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.ModifiableSolrParams;
Expand All @@ -38,8 +33,6 @@
import org.apache.solr.common.util.DataInputInputStream;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Provides methods for marshalling an UpdateRequest to a NamedList which can be serialized in the
Expand All @@ -49,15 +42,6 @@
* @since solr 1.4
*/
public class JavaBinUpdateRequestCodec {
private boolean readStringAsCharSeq = false;

public JavaBinUpdateRequestCodec setReadStringAsCharSeq(boolean flag) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused; always false

this.readStringAsCharSeq = flag;
return this;
}

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final AtomicBoolean WARNED_ABOUT_INDEX_TIME_BOOSTS = new AtomicBoolean();

/**
* Converts an UpdateRequest to a NamedList which can be serialized to the given OutputStream in
Expand All @@ -69,32 +53,32 @@ public JavaBinUpdateRequestCodec setReadStringAsCharSeq(boolean flag) {
*/
public void marshal(UpdateRequest updateRequest, OutputStream os) throws IOException {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simply moving code around for clarity in this method

NamedList<Object> nl = new NamedList<>();

NamedList<Object> params = updateRequest.getParams().toNamedList();
if (updateRequest.getCommitWithin() != -1) {
params.add("commitWithin", updateRequest.getCommitWithin());
}
Iterator<SolrInputDocument> docIter = null;

if (updateRequest.getDocIterator() != null) {
docIter = updateRequest.getDocIterator();
}

Map<SolrInputDocument, Map<String, Object>> docMap = updateRequest.getDocumentsMap();

nl.add("params", params); // 0: params

if (updateRequest.getDeleteByIdMap() != null) {
nl.add("delByIdMap", updateRequest.getDeleteByIdMap());
}

nl.add("delByQ", updateRequest.getDeleteQuery());

Map<SolrInputDocument, Map<String, Object>> docMap = updateRequest.getDocumentsMap();
if (docMap != null) {
nl.add("docsMap", docMap.entrySet().iterator());
nl.add("docsMap", docMap.entrySet().iterator()); // of Map.Entry
} else {
Iterator<SolrInputDocument> docIter;
if (updateRequest.getDocuments() != null) {
docIter = updateRequest.getDocuments().iterator();
} else {
docIter = updateRequest.getDocIterator();
}
nl.add("docs", docIter);
}

try (JavaBinCodec codec = new JavaBinCodec()) {
codec.marshal(nl, os);
}
Expand All @@ -114,65 +98,48 @@ public void marshal(UpdateRequest updateRequest, OutputStream os) throws IOExcep
@SuppressWarnings({"unchecked"})
public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler)
throws IOException {
final UpdateRequest updateRequest = new UpdateRequest();
List<List<NamedList<?>>> doclist;
List<Entry<SolrInputDocument, Map<Object, Object>>> docMap;
List<String> delById;
Map<String, Map<String, Object>> delByIdMap;
List<String> delByQ;
final NamedList<?>[] namedList = new NamedList<?>[1];
try (JavaBinCodec codec = new StreamingCodec(namedList, updateRequest, handler)) {
codec.unmarshal(is);
final NamedList<Object> namedList;

// process documents:

// reads documents, sending to handler. Other data is in NamedList
try (var codec = new StreamingCodec(handler)) {
namedList = codec.unmarshal(is);
Comment on lines +108 to +109
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

big improvement, simplifying the request/response contract here with the internal StreamingCodec. No need to have the array of NamedList or pass that in or pass an UpdateRequest. Now a simple NamedList response.

}

// NOTE: if the update request contains only delete commands the params
// must be loaded now
if (updateRequest.getParams().iterator().hasNext() == false) { // no params
NamedList<?> params = (NamedList<?>) namedList[0].get("params");
// process deletes:

final UpdateRequest updateRequest = new UpdateRequest();
{
NamedList<?> params = (NamedList<?>) namedList.get("params");
if (params != null) {
updateRequest.setParams(new ModifiableSolrParams(params.toSolrParams()));
updateRequest.setParams(ModifiableSolrParams.of(params.toSolrParams()));
}
}
delById = (List<String>) namedList[0].get("delById");
delByIdMap = (Map<String, Map<String, Object>>) namedList[0].get("delByIdMap");
delByQ = (List<String>) namedList[0].get("delByQ");
doclist = (List) namedList[0].get("docs");
Object docsMapObj = namedList[0].get("docsMap");

if (docsMapObj instanceof Map) { // SOLR-5762
docMap = new ArrayList<>(((Map) docsMapObj).entrySet());
} else {
docMap = (List<Entry<SolrInputDocument, Map<Object, Object>>>) docsMapObj;
}

// we don't add any docs, because they were already processed
// deletes are handled later, and must be passed back on the UpdateRequest
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the code improvements made this comment needless


if (delById != null) {
for (String s : delById) {
updateRequest.deleteById(s);
}
for (String s : (List<String>) namedList.getOrDefault("delById", List.of())) {
updateRequest.deleteById(s);
}
if (delByIdMap != null) {
for (Map.Entry<String, Map<String, Object>> entry : delByIdMap.entrySet()) {
Map<String, Object> params = entry.getValue();
if (params != null) {
Long version = (Long) params.get(UpdateRequest.VER);
if (params.containsKey(ShardParams._ROUTE_)) {
updateRequest.deleteById(
entry.getKey(), (String) params.get(ShardParams._ROUTE_), version);
} else {
updateRequest.deleteById(entry.getKey(), version);
}

for (var entry :
((Map<String, Map<String, Object>>) namedList.getOrDefault("delByIdMap", Map.of()))
.entrySet()) {
Map<String, Object> params = entry.getValue();
if (params != null) {
Long version = (Long) params.get(UpdateRequest.VER);
if (params.containsKey(ShardParams._ROUTE_)) {
updateRequest.deleteById(
entry.getKey(), (String) params.get(ShardParams._ROUTE_), version);
} else {
updateRequest.deleteById(entry.getKey());
updateRequest.deleteById(entry.getKey(), version);
}
} else {
updateRequest.deleteById(entry.getKey());
}
}
if (delByQ != null) {
for (String s : delByQ) {
updateRequest.deleteByQuery(s);
}

for (String s : (List<String>) namedList.getOrDefault("delByQ", List.of())) {
updateRequest.deleteByQuery(s);
}

return updateRequest;
Expand All @@ -194,23 +161,23 @@ public Object getFieldValue(String name) {
}
}

class StreamingCodec extends JavaBinCodec {
static class StreamingCodec extends JavaBinCodec {

// TODO This could probably be an AtomicReference<NamedList<?>>
private final NamedList<?>[] namedList;
private final UpdateRequest updateRequest;
private NamedList<Object> resultNamedList;
private final StreamingUpdateHandler handler;
// NOTE: this only works because this is an anonymous inner class
// which will only ever be used on a single stream -- if this class
// is ever refactored, this will not work.
private boolean seenOuterMostDocIterator;
private boolean seenOuterMostDocIterator = false;

public StreamingCodec(
NamedList<?>[] namedList, UpdateRequest updateRequest, StreamingUpdateHandler handler) {
this.namedList = namedList;
this.updateRequest = updateRequest;
StreamingCodec(StreamingUpdateHandler handler) {
this.handler = handler;
seenOuterMostDocIterator = false;
}

@Override
public NamedList<Object> unmarshal(InputStream is) throws IOException {
super.unmarshal(is);
return resultNamedList;
}

@Override
Expand All @@ -222,8 +189,8 @@ protected SolrInputDocument createSolrInputDocument(int sz) {
public NamedList<Object> readNamedList(DataInputInputStream dis) throws IOException {
int sz = readSize(dis);
NamedList<Object> nl = new NamedList<>();
if (namedList[0] == null) {
namedList[0] = nl;
if (this.resultNamedList == null) {
this.resultNamedList = nl;
}
for (int i = 0; i < sz; i++) {
String name = (String) readVal(dis);
Expand All @@ -233,42 +200,6 @@ public NamedList<Object> readNamedList(DataInputInputStream dis) throws IOExcept
return nl;
}

private SolrInputDocument listToSolrInputDocument(List<NamedList<?>> namedList) {
SolrInputDocument doc = new SolrInputDocument();
for (int i = 0; i < namedList.size(); i++) {
NamedList<?> nl = namedList.get(i);
if (i == 0) {
Float boost = (Float) nl.getVal(0);
if (boost != null && boost.floatValue() != 1f) {
String message =
"Ignoring document boost: "
+ boost
+ " as index-time boosts are not supported anymore";
if (WARNED_ABOUT_INDEX_TIME_BOOSTS.compareAndSet(false, true)) {
log.warn(message);
} else {
log.debug(message);
}
}
} else {
Float boost = (Float) nl.getVal(2);
if (boost != null && boost.floatValue() != 1f) {
String message =
"Ignoring field boost: "
+ boost
+ " as index-time boosts are not supported anymore";
if (WARNED_ABOUT_INDEX_TIME_BOOSTS.compareAndSet(false, true)) {
log.warn(message);
} else {
log.debug(message);
}
}
doc.addField((String) nl.getVal(0), nl.getVal(1));
}
}
return doc;
}

@Override
public List<Object> readIterator(DataInputInputStream fis) throws IOException {
// default behavior for reading any regular Iterator in the stream
Expand All @@ -277,67 +208,52 @@ public List<Object> readIterator(DataInputInputStream fis) throws IOException {
// special treatment for first outermost Iterator
// (the list of documents)
seenOuterMostDocIterator = true;
return readOuterMostDocIterator(fis);
readDocs(fis);
return List.of(); // bogus; already processed
}

private List<Object> readOuterMostDocIterator(DataInputInputStream fis) throws IOException {
if (namedList[0] == null) namedList[0] = new NamedList<>();
NamedList<?> params = (NamedList<?>) namedList[0].get("params");
if (params == null) params = new NamedList<>();
updateRequest.setParams(new ModifiableSolrParams(params.toSolrParams()));
if (handler == null) return super.readIterator(fis);
Integer commitWithin = null;
Boolean overwrite = null;
Object o = null;
super.readStringAsCharSeq = JavaBinUpdateRequestCodec.this.readStringAsCharSeq;
try {
while (true) {
if (o == null) {
o = readVal(fis);
}

if (o == END_OBJ) {
break;
}
private void readDocs(DataInputInputStream fis) throws IOException {
if (resultNamedList == null) resultNamedList = new NamedList<>();

SolrInputDocument sdoc = null;
if (o instanceof List) {
@SuppressWarnings("unchecked")
List<NamedList<?>> list = (List<NamedList<?>>) o;
sdoc = listToSolrInputDocument(list);
Comment on lines -304 to -307
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

} else if (o instanceof NamedList) {
UpdateRequest req = new UpdateRequest();
req.setParams(new ModifiableSolrParams(((NamedList) o).toSolrParams()));
handler.update(null, req, null, null);
Comment on lines -308 to -311
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

} else if (o instanceof Map.Entry) {
@SuppressWarnings("unchecked")
Map.Entry<SolrInputDocument, Map<?, ?>> entry =
(Map.Entry<SolrInputDocument, Map<?, ?>>) o;
sdoc = entry.getKey();
Map<?, ?> p = entry.getValue();
if (p != null) {
commitWithin = (Integer) p.get(UpdateRequest.COMMIT_WITHIN);
overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE);
}
} else if (o instanceof SolrInputDocument) {
sdoc = (SolrInputDocument) o;
} else if (o instanceof Map) {
sdoc = convertMapToSolrInputDoc((Map) o);
}
UpdateRequest updateRequest = new UpdateRequest();
NamedList<?> params = (NamedList<?>) resultNamedList.get("params"); // always precedes docs
if (params != null) {
updateRequest.setParams(ModifiableSolrParams.of(params.toSolrParams()));
}

// peek at the next object to see if we're at the end
o = readVal(fis);
if (o == END_OBJ) {
// indicate that we've hit the last doc in the batch, used to enable optimizations when
// doing replication
updateRequest.lastDocInBatch();
Object o = readVal(fis);
while (o != END_OBJ) {
Integer commitWithin = null;
Boolean overwrite = null;

SolrInputDocument sdoc;
if (o instanceof Map.Entry) { // doc + options. UpdateRequest "docsMap"
@SuppressWarnings("unchecked")
Map.Entry<SolrInputDocument, Map<?, ?>> entry =
(Map.Entry<SolrInputDocument, Map<?, ?>>) o;
sdoc = entry.getKey();
Map<?, ?> p = entry.getValue();
if (p != null) {
commitWithin = (Integer) p.get(UpdateRequest.COMMIT_WITHIN);
overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE);
}
} else if (o instanceof SolrInputDocument d) { // doc. UpdateRequest "docs""
sdoc = d;
} else if (o instanceof Map<?, ?> m) { // doc. To imitate JSON style. SOLR-13731
sdoc = convertMapToSolrInputDoc(m);
} else {
throw new IllegalStateException("Unexpected data type: " + o.getClass());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not being silent about the unexpected! I'll change this to SolrException(ErrorCode.BAD_REQUEST

}

handler.update(sdoc, updateRequest, commitWithin, overwrite);
// peek at the next object to see if we're at the end
o = readVal(fis);
if (o == END_OBJ) {
// indicate that we've hit the last doc in the batch, used to enable optimizations when
// doing replication
updateRequest.lastDocInBatch();
}
return Collections.emptyList();
} finally {
super.readStringAsCharSeq = false;

handler.update(sdoc, updateRequest, commitWithin, overwrite);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,14 @@ public void testBackCompat4_5() throws IOException {

InputStream is = getClass().getResourceAsStream("/solrj/updateReq_4_5.bin");
assertNotNull("updateReq_4_5.bin was not found", is);
List<SolrInputDocument> unmarshalledDocs = new ArrayList<>();
UpdateRequest updateUnmarshalled =
new JavaBinUpdateRequestCodec()
.unmarshal(
is,
(document, req, commitWithin, override) -> {
if (commitWithin == null) {
req.add(document);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test needed an update because JavaBinUpdateRequestCodec no longer uses the same UpdateRequest between the add-docs and delete-docs parts, which doesn't matter. Arguably, the callback shouldn't even take an UpdateRequest anyway.

unmarshalledDocs.add(document);
}
System.err.println(
"Doc"
Expand All @@ -263,6 +264,7 @@ public void testBackCompat4_5() throws IOException {
+ " , override:"
+ override);
});
updateUnmarshalled.add(unmarshalledDocs);

System.err.println(updateUnmarshalled.getDocumentsMap());
System.err.println(updateUnmarshalled.getDocuments());
Expand Down
Loading