Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

READ_COMMITTED_NO_WRITE #8

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ JanusGraph, coupled with the FoundationDB storage adapter provides the following
|FDB Storage Adapter|JanusGraph|FoundationDB|
|-:|-:|-:|
|0.1.0|0.3.0|5.2.5|
|0.2.0|0.4.0|6.2.7|


# Getting started

Expand Down
9 changes: 5 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
<properties>
<maven.compiler.plugin.version>3.6.2</maven.compiler.plugin.version>
<jdk.version>1.8</jdk.version>
<janusgraph.version>0.3.0</janusgraph.version>
<foundationdb.version>5.2.5</foundationdb.version>
<janusgraph.version>0.4.0</janusgraph.version>
<foundationdb.version>6.2.10</foundationdb.version>
<dependency.plugin.version>2.10</dependency.plugin.version>
<test.skip.default>false</test.skip.default>
<test.skip.tp>true</test.skip.tp>
Expand Down Expand Up @@ -66,10 +66,11 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.apple</groupId>
<artifactId>foundationdb</artifactId>
<groupId>org.foundationdb</groupId>
<artifactId>fdb-java</artifactId>
<version>${foundationdb.version}</version>
</dependency>

<dependency>
<groupId>com.palantir.docker.compose</groupId>
<artifactId>docker-compose-rule-junit4</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,44 +121,45 @@ public RecordIterator<KeyValueEntry> getSlice(KVQuery query, StoreTransaction tx
final StaticBuffer keyStart = query.getStart();
final StaticBuffer keyEnd = query.getEnd();
final KeySelector selector = query.getKeySelector();
final List<KeyValueEntry> result = new ArrayList<>();
final byte[] foundKey = db.pack(keyStart.as(ENTRY_FACTORY));
final byte[] endKey = db.pack(keyEnd.as(ENTRY_FACTORY));

try {
final List<KeyValue> results = tx.getRange(foundKey, endKey, query.getLimit());

for (final KeyValue keyValue : results) {
StaticBuffer key = getBuffer(db.unpack(keyValue.getKey()).getBytes(0));
if (selector.include(key))
result.add(new KeyValueEntry(key, getBuffer(keyValue.getValue())));
}
final Iterator<KeyValue> results = tx.getRange(foundKey, endKey, query.getLimit());
return new FoundationDBRecordIterator(results, selector);
} catch (Exception e) {
throw new PermanentBackendException(e);
}

log.trace("db={}, op=getSlice, tx={}, resultcount={}", name, txh, result.size());

return new FoundationDBRecordIterator(result);
}



private class FoundationDBRecordIterator implements RecordIterator<KeyValueEntry> {
private final Iterator<KeyValueEntry> entries;
private final Iterator<KeyValue> entries;
private final KeySelector selector;
private KeyValueEntry keyValueEntry;

public FoundationDBRecordIterator(final List<KeyValueEntry> result) {
this.entries = result.iterator();
public FoundationDBRecordIterator(final Iterator<KeyValue> result, final KeySelector keySelector) {
this.entries = result;
this.selector = keySelector;
}

@Override
public boolean hasNext() {
return entries.hasNext();
while (entries.hasNext()) {
KeyValue keyValue = entries.next();
StaticBuffer key = getBuffer(db.unpack(keyValue.getKey()).getBytes(0));
if (selector.include(key)) {
this.keyValueEntry = new KeyValueEntry(key, getBuffer(keyValue.getValue()));
return true;
}
}
return false;
}

@Override
public KeyValueEntry next() {
return entries.next();
return this.keyValueEntry;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repeated calls of next() without calling hasNext() in between will return the same value. Additionally, calling hasNext() multiple times will skip entries.

}

@Override
Expand All @@ -172,39 +173,23 @@ public void remove() {
}

@Override
public Map<KVQuery,RecordIterator<KeyValueEntry>> getSlices(List<KVQuery> queries, StoreTransaction txh) throws BackendException {
public Map<KVQuery, RecordIterator<KeyValueEntry>> getSlices(List<KVQuery> queries, StoreTransaction txh) throws BackendException {
log.trace("beginning db={}, op=getSlice, tx={}", name, txh);
FoundationDBTx tx = getTransaction(txh);
final Map<KVQuery, RecordIterator<KeyValueEntry>> resultMap = new ConcurrentHashMap<>();
final List<CompletableFuture<Void>> futures = new ArrayList<>();

try {
final List<Object[]> preppedQueries = new LinkedList<>();
for (final KVQuery query : queries) {
final StaticBuffer keyStart = query.getStart();
final StaticBuffer keyEnd = query.getEnd();
final KeySelector selector = query.getKeySelector();
final byte[] foundKey = db.pack(keyStart.as(ENTRY_FACTORY));
final byte[] endKey = db.pack(keyEnd.as(ENTRY_FACTORY));
preppedQueries.add(new Object[]{query, foundKey, endKey});
}
final Map<KVQuery, List<KeyValue>> result = tx.getMultiRange(preppedQueries);

for (Map.Entry<KVQuery, List<KeyValue>> entry : result.entrySet()) {
final List<KeyValueEntry> results = new ArrayList<>();
for (final KeyValue keyValue : entry.getValue()) {
final StaticBuffer key = getBuffer(db.unpack(keyValue.getKey()).getBytes(0));
if (entry.getKey().getKeySelector().include(key))
results.add(new KeyValueEntry(key, getBuffer(keyValue.getValue())));
}
resultMap.put(entry.getKey(), new FoundationDBRecordIterator(results));

resultMap.put(query, new FoundationDBRecordIterator(tx.getRange(foundKey, endKey, query.getLimit()), selector));
}
return resultMap;
} catch (Exception e) {
throw new PermanentBackendException(e);
}

return resultMap;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.apple.foundationdb.Database;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.ReadTransaction;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.async.AsyncIterable;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.PermanentBackendException;
Expand All @@ -12,10 +14,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -90,6 +89,7 @@ public synchronized void rollback() throws BackendException {
tx.close();
tx = null;
} catch (Exception e) {
log.error("failed to rollback", e);
throw new PermanentBackendException(e);
} finally {
if (tx != null)
Expand Down Expand Up @@ -118,12 +118,14 @@ public synchronized void commit() throws BackendException {
failing = false;
break;
} catch (IllegalStateException | ExecutionException e) {
log.warn("failed to commit transaction", e);
if (isolationLevel.equals(IsolationLevel.SERIALIZABLE) ||
isolationLevel.equals(IsolationLevel.READ_COMMITTED_NO_WRITE)) {
break;
}
restart();
} catch (Exception e) {
log.error("failed to commit", e);
throw new PermanentBackendException(e);
}
}
Expand Down Expand Up @@ -151,12 +153,15 @@ public byte[] get(final byte[] key) throws PermanentBackendException {
byte[] value = null;
for (int i = 0; i < maxRuns; i++) {
try {
value = this.tx.get(key).get();
ReadTransaction transaction = getTransaction(this.isolationLevel, this.tx);
value = transaction.get(key).get();
failing = false;
break;
} catch (ExecutionException e) {
log.warn("failed to get ", e);
this.restart();
} catch (Exception e) {
log.error("failed to get key {}", key, e);
throw new PermanentBackendException(e);
}
}
Expand All @@ -166,28 +171,28 @@ public byte[] get(final byte[] key) throws PermanentBackendException {
return value;
}

public List<KeyValue> getRange(final byte[] startKey, final byte[] endKey,
final int limit) throws PermanentBackendException {
boolean failing = true;
List<KeyValue> result = Collections.emptyList();
for (int i = 0; i < maxRuns; i++) {
final int startTxId = txCtr.get();
try {
result = tx.getRange(new Range(startKey, endKey), limit).asList().get();
if (result == null) return Collections.emptyList();
failing = false;
break;
} catch (ExecutionException e) {
if (txCtr.get() == startTxId)
this.restart();
} catch (Exception e) {
throw new PermanentBackendException(e);
public Iterator<KeyValue> getRange(final byte[] startKey, final byte[] endKey,
final int limit) throws PermanentBackendException {
try {
ReadTransaction transaction = getTransaction(isolationLevel, this.tx);
AsyncIterable<KeyValue> result = transaction.getRange(new Range(startKey, endKey), limit);
if (result == null) {
return Collections.emptyIterator();
}
return result.iterator();
} catch (Exception e) {
log.error("raising backend exception for startKey {} endKey {} limit", startKey, endKey, limit, e);
throw new PermanentBackendException(e);
}
if (failing) {
throw new PermanentBackendException("Max transaction reset count exceeded");
}

private <T> T getTransaction(IsolationLevel isolationLevel, Transaction tx) {
if(IsolationLevel.READ_COMMITTED_NO_WRITE.equals(isolationLevel)
|| IsolationLevel.READ_COMMITTED_WITH_WRITE.equals(isolationLevel)) {
return (T)tx.snapshot();
} else {
return (T)tx;
}
return result;
}

public synchronized Map<KVQuery, List<KeyValue>> getMultiRange(final List<Object[]> queries)
Expand Down Expand Up @@ -229,6 +234,7 @@ public synchronized Map<KVQuery, List<KeyValue>> getMultiRange(final List<Objec
} catch (IllegalStateException is) {
// illegal state can arise from tx being closed while tx is inflight
} catch (Exception e) {
log.error("failed to get multi range for queries {}", queries, e);
throw new PermanentBackendException(e);
}
}
Expand Down