diff --git a/README.md b/README.md index f08f71c..05f51eb 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ JanusGraph, coupled with the FoundationDB storage adapter provides the following |FDB Storage Adapter|JanusGraph|FoundationDB| |-:|-:|-:| |0.1.0|0.3.0|5.2.5| +|0.2.0|0.4.0|6.2.7| + # Getting started diff --git a/pom.xml b/pom.xml index f0ef473..b9d4c3f 100644 --- a/pom.xml +++ b/pom.xml @@ -13,8 +13,8 @@ 3.6.2 1.8 - 0.3.0 - 5.2.5 + 0.4.0 + 6.2.10 2.10 false true @@ -66,10 +66,11 @@ test - com.apple - foundationdb + org.foundationdb + fdb-java ${foundationdb.version} + com.palantir.docker.compose docker-compose-rule-junit4 diff --git a/src/main/java/com/experoinc/janusgraph/diskstorage/foundationdb/FoundationDBKeyValueStore.java b/src/main/java/com/experoinc/janusgraph/diskstorage/foundationdb/FoundationDBKeyValueStore.java index 5f7ee85..733058b 100644 --- a/src/main/java/com/experoinc/janusgraph/diskstorage/foundationdb/FoundationDBKeyValueStore.java +++ b/src/main/java/com/experoinc/janusgraph/diskstorage/foundationdb/FoundationDBKeyValueStore.java @@ -121,44 +121,45 @@ public RecordIterator getSlice(KVQuery query, StoreTransaction tx final StaticBuffer keyStart = query.getStart(); final StaticBuffer keyEnd = query.getEnd(); final KeySelector selector = query.getKeySelector(); - final List result = new ArrayList<>(); final byte[] foundKey = db.pack(keyStart.as(ENTRY_FACTORY)); final byte[] endKey = db.pack(keyEnd.as(ENTRY_FACTORY)); try { - final List results = tx.getRange(foundKey, endKey, query.getLimit()); - - for (final KeyValue keyValue : results) { - StaticBuffer key = getBuffer(db.unpack(keyValue.getKey()).getBytes(0)); - if (selector.include(key)) - result.add(new KeyValueEntry(key, getBuffer(keyValue.getValue()))); - } + final Iterator results = tx.getRange(foundKey, endKey, query.getLimit()); + return new FoundationDBRecordIterator(results, selector); } catch (Exception e) { throw new PermanentBackendException(e); } - - log.trace("db={}, op=getSlice, tx={}, resultcount={}", name, txh, result.size()); - - return new FoundationDBRecordIterator(result); } private class FoundationDBRecordIterator implements RecordIterator { - private final Iterator entries; + private final Iterator entries; + private final KeySelector selector; + private KeyValueEntry keyValueEntry; - public FoundationDBRecordIterator(final List result) { - this.entries = result.iterator(); + public FoundationDBRecordIterator(final Iterator result, final KeySelector keySelector) { + this.entries = result; + this.selector = keySelector; } @Override public boolean hasNext() { - return entries.hasNext(); + while (entries.hasNext()) { + KeyValue keyValue = entries.next(); + StaticBuffer key = getBuffer(db.unpack(keyValue.getKey()).getBytes(0)); + if (selector.include(key)) { + this.keyValueEntry = new KeyValueEntry(key, getBuffer(keyValue.getValue())); + return true; + } + } + return false; } @Override public KeyValueEntry next() { - return entries.next(); + return this.keyValueEntry; } @Override @@ -172,39 +173,23 @@ public void remove() { } @Override - public Map> getSlices(List queries, StoreTransaction txh) throws BackendException { + public Map> getSlices(List queries, StoreTransaction txh) throws BackendException { log.trace("beginning db={}, op=getSlice, tx={}", name, txh); FoundationDBTx tx = getTransaction(txh); final Map> resultMap = new ConcurrentHashMap<>(); - final List> futures = new ArrayList<>(); - try { - final List preppedQueries = new LinkedList<>(); for (final KVQuery query : queries) { final StaticBuffer keyStart = query.getStart(); final StaticBuffer keyEnd = query.getEnd(); final KeySelector selector = query.getKeySelector(); final byte[] foundKey = db.pack(keyStart.as(ENTRY_FACTORY)); final byte[] endKey = db.pack(keyEnd.as(ENTRY_FACTORY)); - preppedQueries.add(new Object[]{query, foundKey, endKey}); - } - final Map> result = tx.getMultiRange(preppedQueries); - - for (Map.Entry> entry : result.entrySet()) { - final List results = new ArrayList<>(); - for (final KeyValue keyValue : entry.getValue()) { - final StaticBuffer key = getBuffer(db.unpack(keyValue.getKey()).getBytes(0)); - if (entry.getKey().getKeySelector().include(key)) - results.add(new KeyValueEntry(key, getBuffer(keyValue.getValue()))); - } - resultMap.put(entry.getKey(), new FoundationDBRecordIterator(results)); - + resultMap.put(query, new FoundationDBRecordIterator(tx.getRange(foundKey, endKey, query.getLimit()), selector)); } + return resultMap; } catch (Exception e) { throw new PermanentBackendException(e); } - - return resultMap; } @Override diff --git a/src/main/java/com/experoinc/janusgraph/diskstorage/foundationdb/FoundationDBTx.java b/src/main/java/com/experoinc/janusgraph/diskstorage/foundationdb/FoundationDBTx.java index 785a9b5..0c5164e 100644 --- a/src/main/java/com/experoinc/janusgraph/diskstorage/foundationdb/FoundationDBTx.java +++ b/src/main/java/com/experoinc/janusgraph/diskstorage/foundationdb/FoundationDBTx.java @@ -2,8 +2,10 @@ import com.apple.foundationdb.Database; import com.apple.foundationdb.KeyValue; -import com.apple.foundationdb.Range; +import com.apple.foundationdb.ReadTransaction; import com.apple.foundationdb.Transaction; +import com.apple.foundationdb.Range; +import com.apple.foundationdb.async.AsyncIterable; import org.janusgraph.diskstorage.BackendException; import org.janusgraph.diskstorage.BaseTransactionConfig; import org.janusgraph.diskstorage.PermanentBackendException; @@ -12,10 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -90,6 +89,7 @@ public synchronized void rollback() throws BackendException { tx.close(); tx = null; } catch (Exception e) { + log.error("failed to rollback", e); throw new PermanentBackendException(e); } finally { if (tx != null) @@ -118,12 +118,14 @@ public synchronized void commit() throws BackendException { failing = false; break; } catch (IllegalStateException | ExecutionException e) { + log.warn("failed to commit transaction", e); if (isolationLevel.equals(IsolationLevel.SERIALIZABLE) || isolationLevel.equals(IsolationLevel.READ_COMMITTED_NO_WRITE)) { break; } restart(); } catch (Exception e) { + log.error("failed to commit", e); throw new PermanentBackendException(e); } } @@ -151,12 +153,15 @@ public byte[] get(final byte[] key) throws PermanentBackendException { byte[] value = null; for (int i = 0; i < maxRuns; i++) { try { - value = this.tx.get(key).get(); + ReadTransaction transaction = getTransaction(this.isolationLevel, this.tx); + value = transaction.get(key).get(); failing = false; break; } catch (ExecutionException e) { + log.warn("failed to get ", e); this.restart(); } catch (Exception e) { + log.error("failed to get key {}", key, e); throw new PermanentBackendException(e); } } @@ -166,28 +171,28 @@ public byte[] get(final byte[] key) throws PermanentBackendException { return value; } - public List getRange(final byte[] startKey, final byte[] endKey, - final int limit) throws PermanentBackendException { - boolean failing = true; - List result = Collections.emptyList(); - for (int i = 0; i < maxRuns; i++) { - final int startTxId = txCtr.get(); - try { - result = tx.getRange(new Range(startKey, endKey), limit).asList().get(); - if (result == null) return Collections.emptyList(); - failing = false; - break; - } catch (ExecutionException e) { - if (txCtr.get() == startTxId) - this.restart(); - } catch (Exception e) { - throw new PermanentBackendException(e); + public Iterator getRange(final byte[] startKey, final byte[] endKey, + final int limit) throws PermanentBackendException { + try { + ReadTransaction transaction = getTransaction(isolationLevel, this.tx); + AsyncIterable result = transaction.getRange(new Range(startKey, endKey), limit); + if (result == null) { + return Collections.emptyIterator(); } + return result.iterator(); + } catch (Exception e) { + log.error("raising backend exception for startKey {} endKey {} limit", startKey, endKey, limit, e); + throw new PermanentBackendException(e); } - if (failing) { - throw new PermanentBackendException("Max transaction reset count exceeded"); + } + + private T getTransaction(IsolationLevel isolationLevel, Transaction tx) { + if(IsolationLevel.READ_COMMITTED_NO_WRITE.equals(isolationLevel) + || IsolationLevel.READ_COMMITTED_WITH_WRITE.equals(isolationLevel)) { + return (T)tx.snapshot(); + } else { + return (T)tx; } - return result; } public synchronized Map> getMultiRange(final List queries) @@ -229,6 +234,7 @@ public synchronized Map> getMultiRange(final List