diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md
index e8d07a7043..1affcaa30b 100644
--- a/docs/configs/janusgraph-cfg.md
+++ b/docs/configs/janusgraph-cfg.md
@@ -589,7 +589,7 @@ HBase storage options
 | storage.hbase.short-cf-names | Whether to shorten the names of JanusGraph's column families to one-character mnemonics to conserve storage space | Boolean | true | FIXED |
 | storage.hbase.skip-schema-check | Assume that JanusGraph's HBase table and column families already exist. When this is true, JanusGraph will not check for the existence of its table/CFs, nor will it attempt to create them under any circumstances. This is useful when running JanusGraph without HBase admin privileges. | Boolean | false | MASKABLE |
 | storage.hbase.snapshot-name | The name of an existing HBase snapshot to be used by HBaseSnapshotInputFormat | String | janusgraph-snapshot | LOCAL |
-| storage.hbase.snapshot-restore-dir | The temporary directory to be used by HBaseSnapshotInputFormat to restore a snapshot. This directory should be on the same File System as the HBase root dir. | String | /tmp | LOCAL |
+| storage.hbase.snapshot-restore-dir | The temporary directory to be used by HBaseSnapshotInputFormat to restore a snapshot. This directory should be on the same File System as the HBase root dir. | String | (the JVM's java.io.tmpdir, e.g. /tmp) | LOCAL |
 | storage.hbase.table | The name of the table JanusGraph will use. When storage.hbase.skip-schema-check is false, JanusGraph will automatically create this table if it does not already exist. If this configuration option is not provided but graph.graphname is, the table will be set to that value. | String | janusgraph | LOCAL |

### storage.lock
diff --git a/janusgraph-all/pom.xml b/janusgraph-all/pom.xml
index c07884c9f5..a909158b0e 100644
--- a/janusgraph-all/pom.xml
+++ b/janusgraph-all/pom.xml
@@ -82,6 +82,44 @@
             <artifactId>janusgraph-bigtable</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.janusgraph</groupId>
+            <artifactId>janusgraph-tablestore</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-transport-native-epoll</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.reload4j</groupId>
+                    <artifactId>reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpasyncclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>2.5.10</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-transport-native-epoll</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.reload4j</groupId>
+                    <artifactId>reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby.jcodings</groupId>
+                    <artifactId>jcodings</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.janusgraph</groupId>
             <artifactId>janusgraph-hadoop</artifactId>
diff --git a/janusgraph-core/src/main/java/org/janusgraph/core/util/ReflectiveConfigOptionLoader.java b/janusgraph-core/src/main/java/org/janusgraph/core/util/ReflectiveConfigOptionLoader.java
index 3d2365ae2a..11d106544d 100644
--- a/janusgraph-core/src/main/java/org/janusgraph/core/util/ReflectiveConfigOptionLoader.java
+++ b/janusgraph-core/src/main/java/org/janusgraph/core/util/ReflectiveConfigOptionLoader.java
@@ -122,6 +122,7 @@ public void loadStandard(Class<?> caller) {
          */
         List<String> classnames = Collections.unmodifiableList(Arrays.asList(
             "org.janusgraph.diskstorage.hbase.HBaseStoreManager",
+            "org.janusgraph.diskstorage.tablestore.TableStoreStoreManager",
             "org.janusgraph.diskstorage.cql.CQLConfigOptions",
             "org.janusgraph.diskstorage.es.ElasticSearchIndex",
             "org.janusgraph.diskstorage.solr.SolrIndex",
diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java
index d805dc389c..25971e530c 100644
--- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java
+++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/Backend.java
@@
-171,6 +171,7 @@ public class Backend implements LockerProvider, AutoCloseable { put(StandardStoreManager.BDB_JE, STORAGE_DIRECTORY); put(StandardStoreManager.CQL, STORAGE_HOSTS); put(StandardStoreManager.HBASE, STORAGE_HOSTS); + put(StandardStoreManager.TABLESTORE,STORAGE_HOSTS); //put(StandardStorageBackend.IN_MEMORY, null); put(StandardStoreManager.SCYLLA, STORAGE_HOSTS); }}); diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/StandardStoreManager.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/StandardStoreManager.java index 186ed0aa50..846959b9b5 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/StandardStoreManager.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/StandardStoreManager.java @@ -29,6 +29,7 @@ public enum StandardStoreManager { BDB_JE("org.janusgraph.diskstorage.berkeleyje.BerkeleyJEStoreManager", "berkeleyje"), CQL("org.janusgraph.diskstorage.cql.CQLStoreManager", "cql"), HBASE("org.janusgraph.diskstorage.hbase.HBaseStoreManager", "hbase"), + TABLESTORE("org.janusgraph.diskstorage.tablestore.TableStoreStoreManager", "tablestore"), IN_MEMORY("org.janusgraph.diskstorage.inmemory.InMemoryStoreManager", "inmemory"), SCYLLA("org.janusgraph.diskstorage.cql.ScyllaStoreManager", "scylla"); diff --git a/janusgraph-dist/docker/Dockerfile b/janusgraph-dist/docker/Dockerfile index aaf7485e60..b992da36d5 100644 --- a/janusgraph-dist/docker/Dockerfile +++ b/janusgraph-dist/docker/Dockerfile @@ -18,7 +18,7 @@ FROM debian:buster-slim as builder ARG TARGETARCH ARG JANUS_VERSION=1.0.0-SNAPSHOT -ARG BUILD_PATH=janusgraph-java-11-1.0.0-SNAPSHOT +ARG BUILD_PATH=janusgraph-full-1.1.0-SNAPSHOT ARG YQ_VERSION=3.4.1 ENV JANUS_VERSION=${JANUS_VERSION} \ diff --git a/janusgraph-dist/pom.xml b/janusgraph-dist/pom.xml index 2f4ce6d41f..0970db8ebb 100644 --- a/janusgraph-dist/pom.xml +++ b/janusgraph-dist/pom.xml @@ -252,52 +252,6 @@ - - org.codehaus.mojo - exec-maven-plugin - false - - - generate-janusgraph-config-table - generate-resources - - java - - - org.janusgraph.util.system.ConfigurationPrinter - - org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.ROOT_NS - ${doc.dir}/configs/janusgraph-cfg.md - - - - - run-cfilter - generate-resources - - exec - - - java - - - -classpath - - - org.janusgraph.util.system.ConfigurationFileFilter - - ${assembly.cfilter.in.dir} - - ${assembly.cfilter.out.dir} - - - - - generate-janusgraph-gremlin-imports - none - - - maven-source-plugin diff --git a/janusgraph-tablestore/docker/Dockerfile b/janusgraph-tablestore/docker/Dockerfile new file mode 100644 index 0000000000..75ef6326a3 --- /dev/null +++ b/janusgraph-tablestore/docker/Dockerfile @@ -0,0 +1,59 @@ +# Copyright 2021 JanusGraph Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
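+# Overview (an editor's reading of this patch, not authoritative): this image
+# appears intended as a single-node HBase for the janusgraph-tablestore tests.
+# The builder stage downloads and trims an HBase distribution; the runtime
+# stage starts an HBase master with embedded zookeeper via entrypoint.sh,
+# which also forwards port 2182 to zookeeper's port 2181 through socat.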
+ +FROM debian:bookworm-20240211 as builder + +ARG HBASE_VERSION=2.5.0 +ARG HBASE_DIST="https://archive.apache.org/dist/hbase" + +RUN apt-get update && \ + DEBIAN_FRONTEND=noninteractive apt-get install -y curl + +RUN curl -SL ${HBASE_DIST}/${HBASE_VERSION}/hbase-${HBASE_VERSION}-bin.tar.gz | tar -x -z && mv hbase-${HBASE_VERSION} /opt/hbase +WORKDIR /opt/hbase +RUN rm -rf ./docs ./LEGAL ./*.txt ./*/*.cmd + +COPY ./hbase-site.xml /opt/hbase/conf/hbase-site.xml +COPY ./hbase-policy.xml /opt/hbase/conf/hbase-policy.xml + +FROM openjdk:8-jre-buster + +ARG HBASE_UID=1000 +ARG HBASE_GID=1000 + +ENV LOGS_DIR=/data/logs + +COPY --from=builder /opt/hbase/ /opt/hbase/ + +RUN ln -sf /opt/hbase/bin/* /usr/bin/ +RUN mkdir -p $LOGS_DIR /data/hbase /data/wal /data/run /opt/hbase/logs +RUN apt-get update && \ + DEBIAN_FRONTEND=noninteractive apt-get install -y socat +WORKDIR /opt/hbase +COPY ./entrypoint.sh /entrypoint.sh + +EXPOSE 2181 +EXPOSE 16000 +EXPOSE 16010 +EXPOSE 16020 +EXPOSE 16030 + +ENTRYPOINT ["/entrypoint.sh"] + +RUN groupadd -g ${HBASE_GID} hbase && \ + useradd -s /bin/bash hbase -u ${HBASE_UID} -g ${HBASE_GID} && \ + chmod +x /entrypoint.sh && \ + chown -R ${HBASE_UID}:${HBASE_GID} /data && \ + chown -R ${HBASE_UID}:${HBASE_GID} /opt/hbase/logs +USER hbase diff --git a/janusgraph-tablestore/docker/entrypoint.sh b/janusgraph-tablestore/docker/entrypoint.sh new file mode 100644 index 0000000000..db99806ae8 --- /dev/null +++ b/janusgraph-tablestore/docker/entrypoint.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# +# Copyright 2022 JanusGraph Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# re export zookeeper +socat tcp-listen:2182,fork,reuseaddr tcp-connect:localhost:2181 & + +/usr/bin/hbase master start \ No newline at end of file diff --git a/janusgraph-tablestore/docker/hbase-policy.xml b/janusgraph-tablestore/docker/hbase-policy.xml new file mode 100644 index 0000000000..e45f23c962 --- /dev/null +++ b/janusgraph-tablestore/docker/hbase-policy.xml @@ -0,0 +1,53 @@ + + + + + + + security.client.protocol.acl + * + ACL for HRegionInterface protocol implementations (ie. + clients talking to HRegionServers) + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed. + + + + security.admin.protocol.acl + * + ACL for HMasterInterface protocol implementation (ie. + clients talking to HMaster for admin operations). + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed. + + + + security.masterregion.protocol.acl + * + ACL for HMasterRegionInterface protocol implementations + (for HRegionServers communicating with HMaster) + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+ A special value of "*" means all users are allowed. + + diff --git a/janusgraph-tablestore/docker/hbase-site.xml b/janusgraph-tablestore/docker/hbase-site.xml new file mode 100644 index 0000000000..03b2f28d8a --- /dev/null +++ b/janusgraph-tablestore/docker/hbase-site.xml @@ -0,0 +1,45 @@ + + + + + hbase.zookeeper.quorum + localhost + + + hbase.rootdir + file:////data/hbase + + + hbase.cluster.distributed + false + + + hbase.wal.dir + file:////data/wal + + + hbase.unsafe.stream.capability.enforce + false + + + + hbase.master.info.bindAddress + 0.0.0.0 + + + + hbase.regionserver.info.bindAddress + 0.0.0.0 + + + + hbase.master.ipc.address + 0.0.0.0 + + + + hbase.regionserver.ipc.address + 0.0.0.0 + + + diff --git a/janusgraph-tablestore/pom.xml b/janusgraph-tablestore/pom.xml new file mode 100644 index 0000000000..7288882cfb --- /dev/null +++ b/janusgraph-tablestore/pom.xml @@ -0,0 +1,195 @@ + + 4.0.0 + + org.janusgraph + janusgraph + 1.1.0-SNAPSHOT + ../pom.xml + + janusgraph-tablestore + JanusGraph-Tablestore: Alibaba Cloud Tablestore driver + https://janusgraph.org + + + 2.0 + 1000 + 1000 + ${basedir}/.. + ${basedir}/target + + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + runtime + true + + + ch.qos.logback + logback-classic + runtime + true + + + + + org.janusgraph + janusgraph-core + ${project.version} + + + org.janusgraph + janusgraph-hadoop + ${project.version} + true + + + org.apache.hbase + hbase-shaded-client + ${hbase2.version} + + + log4j + log4j + + + org.apache.htrace + htrace-core4 + + + + + org.apache.hbase.thirdparty + hbase-noop-htrace + ${htrace.version} + + + org.apache.hbase + hbase-shaded-mapreduce + ${hbase2.version} + + + ch.qos.reload4j + reload4j + + + + + org.janusgraph + janusgraph-backend-testutils + ${project.version} + test + + + org.janusgraph + janusgraph-hadoop + ${project.version} + tests + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + + + + + + + + + net.java.dev.jna + jna + ${jna.version} + test + + + + com.aliyun.openservices + tablestore-hbase-client + 2.0.12 + + + io.netty + netty-transport-native-epoll + + + ch.qos.reload4j + reload4j + + + org.slf4j + slf4j-api + + + org.apache.hbase + hbase-client + + + org.apache.httpcomponents + httpasyncclient + + + + + + + + + ${basedir}/target + + + ${basedir}/src/main/resources + true + + + + + maven-surefire-plugin + + + ${hbase.docker.version} + ${hbase.docker.uid} + ${hbase.docker.gid} + ${test.hbase.targetdir} + + + + + default-test + none + + + hbase-test + + test + + test + + + + + maven-jar-plugin + + + pack-test-jar + + prepare-package + + test-jar + + + + + + + diff --git a/janusgraph-tablestore/src/main/java/org/janusgraph/diskstorage/tablestore/TableStoreKeyColumnValueStore.java b/janusgraph-tablestore/src/main/java/org/janusgraph/diskstorage/tablestore/TableStoreKeyColumnValueStore.java new file mode 100644 index 0000000000..9c837d08a8 --- /dev/null +++ b/janusgraph-tablestore/src/main/java/org/janusgraph/diskstorage/tablestore/TableStoreKeyColumnValueStore.java @@ -0,0 +1,390 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.tablestore; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.ColumnPaginationFilter; +import org.apache.hadoop.hbase.filter.ColumnRangeFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.Entry; +import org.janusgraph.diskstorage.EntryList; +import org.janusgraph.diskstorage.EntryMetaData; +import org.janusgraph.diskstorage.PermanentBackendException; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.TemporaryBackendException; +import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; +import org.janusgraph.diskstorage.keycolumnvalue.KCVSUtil; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; +import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator; +import org.janusgraph.diskstorage.keycolumnvalue.KeyRangeQuery; +import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.KeySlicesIterator; +import org.janusgraph.diskstorage.keycolumnvalue.MultiSlicesQuery; +import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.util.RecordIterator; +import org.janusgraph.diskstorage.util.StaticArrayBuffer; +import org.janusgraph.diskstorage.util.StaticArrayEntry; +import org.janusgraph.diskstorage.util.StaticArrayEntryList; +import org.janusgraph.util.system.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import javax.annotation.Nullable; + +import static org.janusgraph.diskstorage.tablestore.TableStoreStoreManager.defaultColumnFamilyNameBytes; + +/** + * Here are some areas that might need work: + *
+ * - batching? (consider HTable#batch, HTable#setAutoFlush(false))
+ * - tuning HTable#setWriteBufferSize (?)
+ * - writing a server-side filter to replace ColumnCountGetFilter, which drops
+ *   all columns on the row where it reaches its limit. This requires getSlice,
+ *   currently, to impose its limit on the client side. That obviously won't
+ *   scale.
+ * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this)
+ * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache
+ *
+ * There may be other problem areas. These are just the ones of which I'm aware. + */ +public class TableStoreKeyColumnValueStore implements KeyColumnValueStore { + + private static final Logger logger = LoggerFactory.getLogger(TableStoreKeyColumnValueStore.class); + + private final TableName tableName; + private final TableStoreStoreManager storeManager; + + private final String storeName; + + private final HBaseGetter entryGetter; + + private final Connection cnx; + + private final byte[] columnFamilyBytes = defaultColumnFamilyNameBytes; + + TableStoreKeyColumnValueStore(TableStoreStoreManager storeManager, Connection cnx, TableName tableName, String storeName) { + this.storeManager = storeManager; + this.cnx = cnx; + this.tableName = tableName; + this.storeName = storeName; + this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName)); + } + + @Override + public void close() throws BackendException { + } + + @Override + public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException { + Map result = getHelper(Collections.singletonList(query.getKey()), getFilter(query)); + return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST); + } + + @Override + public Map getSlice(List keys, SliceQuery query, StoreTransaction txh) throws BackendException { + return getHelper(keys, getFilter(query)); + } + + @Override + public void mutate(StaticBuffer key, List additions, List deletions, StoreTransaction txh) throws BackendException { + Map mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions)); + mutateMany(mutations, txh); + } + + @Override + public void acquireLock(StaticBuffer key, + StaticBuffer column, + StaticBuffer expectedValue, + StoreTransaction txh) { + throw new UnsupportedOperationException(); + } + + @Override + public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException { + return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY), + query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY), + new FilterList(FilterList.Operator.MUST_PASS_ALL), + query); + } + + @Override + public String getName() { + return storeName; + } + + @Override + public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException { + return executeKeySliceQuery(new FilterList(FilterList.Operator.MUST_PASS_ALL), query); + } + + @Override + public KeySlicesIterator getKeys(MultiSlicesQuery queries, StoreTransaction txh) { + throw new UnsupportedOperationException(); + } + + public static Filter getFilter(SliceQuery query) { + byte[] colStartBytes = query.getSliceStart().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null; + byte[] colEndBytes = query.getSliceEnd().length() > 0 ? 
query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null; + + Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false); + + if (query.hasLimit()) { + filter = new FilterList(FilterList.Operator.MUST_PASS_ALL, + filter, + new ColumnPaginationFilter(query.getLimit(), colStartBytes)); + } + + logger.debug("Generated HBase Filter {}", filter); + + return filter; + } + + private Map getHelper(List keys, Filter getFilter) throws BackendException { + List requests = new ArrayList<>(keys.size()); + { + for (StaticBuffer key : keys) { + Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter); + try { + g.setTimeRange(0, Long.MAX_VALUE); + } catch (IOException e) { + throw new PermanentBackendException(e); + } + requests.add(g); + } + } + + final Map resultMap = new HashMap<>(keys.size()); + + try { + Table table = null; + final Result[] results; + + try { + table = cnx.getTable(tableName); + results = table.get(requests); + } finally { + IOUtils.closeQuietly(table); + } + + if (results == null) + return KCVSUtil.emptyResults(keys); + + assert results.length==keys.size(); + + for (int i = 0; i < results.length; i++) { + final Result result = results[i]; + NavigableMap>> f = result.getMap(); + + if (f == null) { // no result for this key + resultMap.put(keys.get(i), EntryList.EMPTY_LIST); + continue; + } + + // actual key with + NavigableMap> r = f.get(columnFamilyBytes); + resultMap.put(keys.get(i), (r == null) + ? EntryList.EMPTY_LIST + : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter)); + } + + return resultMap; + } catch (InterruptedIOException e) { + // added to support traversal interruption + Thread.currentThread().interrupt(); + throw new PermanentBackendException(e); + } catch (IOException e) { + throw new TemporaryBackendException(e); + } + } + + private void mutateMany(Map mutations, StoreTransaction txh) throws BackendException { + storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh); + } + + private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException { + return executeKeySliceQuery(null, null, filters, columnSlice); + } + + private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey, + @Nullable byte[] endKey, + FilterList filters, + @Nullable SliceQuery columnSlice) throws BackendException { + Scan scan = new Scan().addFamily(columnFamilyBytes); + + try { + scan.setTimeRange(0, Long.MAX_VALUE); + } catch (IOException e) { + throw new PermanentBackendException(e); + } + + if (startKey != null) + scan.withStartRow(startKey); + + if (endKey != null) + scan.withStopRow(endKey); + + if (columnSlice != null) { + filters.addFilter(getFilter(columnSlice)); + } + + Table table = null; + + try { + table = cnx.getTable(tableName); + return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes); + } catch (IOException e) { + IOUtils.closeQuietly(table); + throw new PermanentBackendException(e); + } + } + + private class RowIterator implements KeyIterator { + private final Closeable table; + private final Iterator rows; + private final byte[] columnFamilyBytes; + + private Result currentRow; + private boolean isClosed; + + public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) { + this.table = table; + this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length); + this.rows = Iterators.filter(rows.iterator(), result -> null != result && null != result.getRow()); + 
} + + @Override + public RecordIterator getEntries() { + ensureOpen(); + + return new RecordIterator() { + private final Iterator>> kv; + { + final Map>> map = currentRow.getMap(); + Preconditions.checkNotNull(map); + kv = map.get(columnFamilyBytes).entrySet().iterator(); + } + + @Override + public boolean hasNext() { + ensureOpen(); + return kv.hasNext(); + } + + @Override + public Entry next() { + ensureOpen(); + return StaticArrayEntry.ofBytes(kv.next(), entryGetter); + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public boolean hasNext() { + ensureOpen(); + return rows.hasNext(); + } + + @Override + public StaticBuffer next() { + ensureOpen(); + + currentRow = rows.next(); + return StaticArrayBuffer.of(currentRow.getRow()); + } + + @Override + public void close() { + IOUtils.closeQuietly(table); + isClosed = true; + logger.debug("RowIterator closed table {}", table); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private void ensureOpen() { + if (isClosed) + throw new IllegalStateException("Iterator has been closed."); + } + } + + private static class HBaseGetter implements StaticArrayEntry.GetColVal>, byte[]> { + + private final EntryMetaData[] schema; + + private HBaseGetter(EntryMetaData[] schema) { + this.schema = schema; + } + + @Override + public byte[] getColumn(Map.Entry> element) { + return element.getKey(); + } + + @Override + public byte[] getValue(Map.Entry> element) { + return element.getValue().lastEntry().getValue(); + } + + @Override + public EntryMetaData[] getMetaSchema(Map.Entry> element) { + return schema; + } + + @Override + public Object getMetaData(Map.Entry> element, EntryMetaData meta) { + if (meta == EntryMetaData.TIMESTAMP) { + return element.getValue().lastEntry().getKey(); + } + throw new UnsupportedOperationException("Unsupported meta data: " + meta); + } + } +} diff --git a/janusgraph-tablestore/src/main/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreManager.java b/janusgraph-tablestore/src/main/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreManager.java new file mode 100644 index 0000000000..df103e4572 --- /dev/null +++ b/janusgraph-tablestore/src/main/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreManager.java @@ -0,0 +1,947 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package org.janusgraph.diskstorage.tablestore; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.janusgraph.core.JanusGraphException; +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.BaseTransactionConfig; +import org.janusgraph.diskstorage.Entry; +import org.janusgraph.diskstorage.EntryMetaData; +import org.janusgraph.diskstorage.PermanentBackendException; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.StoreMetaData; +import org.janusgraph.diskstorage.TemporaryBackendException; +import org.janusgraph.diskstorage.common.DistributedStoreManager; +import org.janusgraph.diskstorage.configuration.ConfigElement; +import org.janusgraph.diskstorage.configuration.ConfigNamespace; +import org.janusgraph.diskstorage.configuration.ConfigOption; +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import org.janusgraph.diskstorage.keycolumnvalue.KeyRange; +import org.janusgraph.diskstorage.keycolumnvalue.StandardStoreFeatures; +import org.janusgraph.diskstorage.keycolumnvalue.StoreFeatures; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.util.BufferUtil; +import org.janusgraph.diskstorage.util.StaticArrayBuffer; +import org.janusgraph.diskstorage.util.time.TimestampProviders; +import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; +import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions; +import org.janusgraph.util.system.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import static org.janusgraph.diskstorage.Backend.EDGESTORE_NAME; +import static org.janusgraph.diskstorage.Backend.INDEXSTORE_NAME; +import static org.janusgraph.diskstorage.Backend.LOCK_STORE_SUFFIX; +import static 
org.janusgraph.diskstorage.Backend.SYSTEM_MGMT_LOG_NAME; +import static org.janusgraph.diskstorage.Backend.SYSTEM_TX_LOG_NAME; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.GRAPH_NAME; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.IDS_STORE_NAME; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME; + +/** + * Storage Manager for HBase + * + * @author Dan LaRocque <dalaro@hopcount.org> + */ +@PreInitializeConfigOptions +public class TableStoreStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager { + + private static final Logger logger = LoggerFactory.getLogger(TableStoreStoreManager.class); + + public static final ConfigNamespace HBASE_NS = + new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "hbase", "HBase storage options"); + + public static final ConfigOption SHORT_CF_NAMES = + new ConfigOption<>(HBASE_NS, "short-cf-names", + "Whether to shorten the names of JanusGraph's column families to one-character mnemonics " + + "to conserve storage space", ConfigOption.Type.FIXED, true); + + public static final String COMPRESSION_DEFAULT = "-DEFAULT-"; + + public static final ConfigOption COMPRESSION = + new ConfigOption<>(HBASE_NS, "compression-algorithm", + "An HBase Compression.Algorithm enum string which will be applied to newly created column families. " + + "The compression algorithm must be installed and available on the HBase cluster. JanusGraph cannot install " + + "and configure new compression algorithms on the HBase cluster by itself.", + ConfigOption.Type.MASKABLE, "GZ"); + + public static final ConfigOption SKIP_SCHEMA_CHECK = + new ConfigOption<>(HBASE_NS, "skip-schema-check", + "Assume that JanusGraph's HBase table and column families already exist. " + + "When this is true, JanusGraph will not check for the existence of its table/CFs, " + + "nor will it attempt to create them under any circumstances. This is useful " + + "when running JanusGraph without HBase admin privileges.", + ConfigOption.Type.MASKABLE, false); + + public static final ConfigOption HBASE_TABLE = + new ConfigOption<>(HBASE_NS, "table", + "The name of the table JanusGraph will use. When " + ConfigElement.getPath(SKIP_SCHEMA_CHECK) + + " is false, JanusGraph will automatically create this table if it does not already exist." + + " If this configuration option is not provided but graph.graphname is, the table will be set" + + " to that value.", + ConfigOption.Type.LOCAL, "janusgraph"); + + public static final ConfigOption HBASE_SNAPSHOT = + new ConfigOption<>(HBASE_NS, "snapshot-name", + "The name of an existing HBase snapshot to be used by HBaseSnapshotInputFormat", + ConfigOption.Type.LOCAL, "janusgraph-snapshot"); + + public static final ConfigOption HBASE_SNAPSHOT_RESTORE_DIR = + new ConfigOption<>(HBASE_NS, "snapshot-restore-dir", + "The temporary directory to be used by HBaseSnapshotInputFormat to restore a snapshot." + + " This directory should be on the same File System as the HBase root dir.", + ConfigOption.Type.LOCAL, System.getProperty("java.io.tmpdir")); + + /** + * Related bug fixed in 0.98.0, 0.94.7, 0.95.0: + * + * https://issues.apache.org/jira/browse/HBASE-8170 + */ + public static final int MIN_REGION_COUNT = 3; + + /** + * The total number of HBase regions to create with JanusGraph's table. This + * setting only effects table creation; this normally happens just once when + * JanusGraph connects to an HBase backend for the first time. 
+ */ + public static final ConfigOption REGION_COUNT = + new ConfigOption(HBASE_NS, "region-count", + "The number of initial regions set when creating JanusGraph's HBase table", + ConfigOption.Type.MASKABLE, Integer.class, input -> null != input && MIN_REGION_COUNT <= input); + + /** + * This setting is used only when {@link #REGION_COUNT} is unset. + *
+ * If JanusGraph's HBase table does not exist, then it will be created with total
+ * region count = (number of servers reported by ClusterStatus) * (this value).
+ *
+ * The Apache HBase manual suggests an order-of-magnitude range of potential
+ * values for this setting:
+ *
+ * - 2.5.2.7. Managed Splitting: "What's the optimal number of pre-split
+ *   regions to create? Mileage will vary depending upon your application.
+ *   You could start low with 10 pre-split regions / server and watch as data
+ *   grows over time. It's better to err on the side of too little regions and
+ *   rolling split later."
+ * - 9.7 Regions: "In general, HBase is designed to run with a small (20-200)
+ *   number of relatively large (5-20Gb) regions per server... Typically you
+ *   want to keep your region count low on HBase for numerous reasons. Usually
+ *   right around 100 regions per RegionServer has yielded the best results."
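+ *
+ * For example: with regions-per-server = 2 and five region servers reported
+ * by ClusterStatus, JanusGraph would create the table with 2 * 5 = 10
+ * initial regions.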
+ * + * These considerations may differ for other HBase implementations (e.g. MapR). + */ + public static final ConfigOption REGIONS_PER_SERVER = + new ConfigOption<>(HBASE_NS, "regions-per-server", + "The number of regions per regionserver to set when creating JanusGraph's HBase table", + ConfigOption.Type.MASKABLE, Integer.class); + + public static final int PORT_DEFAULT = 2181; // Not used. Just for the parent constructor. + + public static final TimestampProviders PREFERRED_TIMESTAMPS = TimestampProviders.MILLI; + + public static final ConfigNamespace HBASE_CONFIGURATION_NAMESPACE = + new ConfigNamespace(HBASE_NS, "ext", "Overrides for hbase-{site,default}.xml options", true); + + public static final byte[] defaultColumnFamilyNameBytes = Bytes.toBytes("s"); + + private static final StaticBuffer FOUR_ZERO_BYTES = BufferUtil.zeroBuffer(4); + + // Immutable instance fields + private final BiMap shortCfNameMap; + private final String tableNamePrefix; + private final String compression; + private final int regionCount; + private final int regionsPerServer; + private final Connection cnx; + private final boolean shortCfNames; + private final boolean skipSchemaCheck; + // Cached return value of getDeployment() as requesting it can be expensive. + private Deployment deployment = null; + + private final org.apache.hadoop.conf.Configuration hconf; + + private static final ConcurrentHashMap openManagers = new ConcurrentHashMap<>(); + + // Mutable instance state + private final ConcurrentMap openStores; + + public TableStoreStoreManager(org.janusgraph.diskstorage.configuration.Configuration config) throws BackendException { + super(config, PORT_DEFAULT); + + shortCfNameMap = createShortCfMap(config); + + Preconditions.checkArgument(null != shortCfNameMap); + Collection shorts = shortCfNameMap.values(); + Preconditions.checkArgument(Sets.newHashSet(shorts).size() == shorts.size()); + + this.tableNamePrefix = determineTableNamePrefix(config); + if(tableNamePrefix.contains("_")) { + throw new PermanentBackendException(String.format("tableNamePrefix %s contains invalid characters '_'",tableNamePrefix)); + } + this.compression = config.get(COMPRESSION); + this.regionCount = config.has(REGION_COUNT) ? config.get(REGION_COUNT) : -1; + this.regionsPerServer = config.has(REGIONS_PER_SERVER) ? config.get(REGIONS_PER_SERVER) : -1; + this.skipSchemaCheck = config.get(SKIP_SCHEMA_CHECK); + + /* + * Specifying both region count options is permitted but may be + * indicative of a misunderstanding, so issue a warning. + */ + if (config.has(REGIONS_PER_SERVER) && config.has(REGION_COUNT)) { + logger.warn("Both {} and {} are set in JanusGraph's configuration, but " + + "the former takes precedence and the latter will be ignored.", + REGION_COUNT, REGIONS_PER_SERVER); + } + + /* This static factory calls HBaseConfiguration.addHbaseResources(), + * which in turn applies the contents of hbase-default.xml and then + * applies the contents of hbase-site.xml. 
+ */ + hconf = HBaseConfiguration.create(); + + // Copy a subset of our commons config into a Hadoop config + int keysLoaded=0; + Map configSub = config.getSubset(HBASE_CONFIGURATION_NAMESPACE); + for (Map.Entry entry : configSub.entrySet()) { + logger.info("HBase configuration: setting {}={}", entry.getKey(), entry.getValue()); + if (entry.getValue()==null) continue; + hconf.set(entry.getKey(), entry.getValue().toString()); + keysLoaded++; + } + + logger.debug("HBase configuration: set a total of {} configuration values", keysLoaded); + + // Special case for STORAGE_HOSTS + if (config.has(GraphDatabaseConfiguration.STORAGE_HOSTS)) { + String zkQuorumKey = "hbase.zookeeper.quorum"; + String csHostList = Joiner.on(",").join(config.get(GraphDatabaseConfiguration.STORAGE_HOSTS)); + hconf.set(zkQuorumKey, csHostList); + logger.info("Copied host list from {} to {}: {}", GraphDatabaseConfiguration.STORAGE_HOSTS, zkQuorumKey, csHostList); + } + + // Special case for STORAGE_PORT + if (config.has(GraphDatabaseConfiguration.STORAGE_PORT)) { + String zkPortKey = "hbase.zookeeper.property.clientPort"; + Integer zkPort = config.get(GraphDatabaseConfiguration.STORAGE_PORT); + hconf.set(zkPortKey, zkPort.toString()); + logger.info("Copied Zookeeper Port from {} to {}: {}", GraphDatabaseConfiguration.STORAGE_PORT, zkPortKey, zkPort); + } + + this.shortCfNames = config.get(SHORT_CF_NAMES); + + try { + this.cnx = ConnectionFactory.createConnection(hconf); + } catch (IOException e) { + throw new PermanentBackendException(e); + } + + if (logger.isTraceEnabled()) { + openManagers.put(this, new Throwable("Manager Opened")); + dumpOpenManagers(); + } + + logger.debug("Dumping HBase config key=value pairs"); + for (Map.Entry entry : hconf) { + logger.debug("[HBaseConfig] " + entry.getKey() + "=" + entry.getValue()); + } + logger.debug("End of HBase config key=value pairs"); + + openStores = new ConcurrentHashMap<>(); + } + + public static BiMap createShortCfMap(Configuration config) { + return ImmutableBiMap.builder() + .put(INDEXSTORE_NAME, "g") + .put(INDEXSTORE_NAME + LOCK_STORE_SUFFIX, "h") + .put(config.get(IDS_STORE_NAME), "i") + .put(EDGESTORE_NAME, "e") + .put(EDGESTORE_NAME + LOCK_STORE_SUFFIX, "f") + .put(SYSTEM_PROPERTIES_STORE_NAME, "s") + .put(SYSTEM_PROPERTIES_STORE_NAME + LOCK_STORE_SUFFIX, "t") + .put(SYSTEM_MGMT_LOG_NAME, "m") + .put(SYSTEM_TX_LOG_NAME, "l") + .build(); + } + + @Override + public Deployment getDeployment() { + if (null != deployment) { + return deployment; + } + + List local; + try { + local = getLocalKeyPartition(); + deployment = null != local && !local.isEmpty() ? 
Deployment.LOCAL : Deployment.REMOTE; + } catch (BackendException e) { + throw new RuntimeException(e); + } + return deployment; + } + + @Override + public String toString() { + return getClass().getSimpleName() + tableNamePrefix; + } + + public void dumpOpenManagers() { + int estimatedSize = openManagers.size(); + logger.trace("---- Begin open HBase store manager list ({} managers) ----", estimatedSize); + for (TableStoreStoreManager m : openManagers.keySet()) { + logger.trace("Manager {} opened at:", m, openManagers.get(m)); + } + logger.trace("---- End open HBase store manager list ({} managers) ----", estimatedSize); + } + + @Override + public void close() { + openStores.clear(); + if (logger.isTraceEnabled()) + openManagers.remove(this); + IOUtils.closeQuietly(cnx); + } + + @Override + public StoreFeatures getFeatures() { + + Configuration c = GraphDatabaseConfiguration.buildGraphConfiguration(); + + StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder() + .orderedScan(true).unorderedScan(true).batchMutation(true) + .multiQuery(true).distributed(true).keyOrdered(true).storeTTL(true) + .cellTTL(false).timestamps(true).preferredTimestamps(PREFERRED_TIMESTAMPS) + .optimisticLocking(true).keyConsistent(c).localKeyPartition(false); + +// try { +// fb.localKeyPartition(getDeployment() == Deployment.LOCAL); +// } catch (Exception e) { +// logger.warn("Unexpected exception during getDeployment()", e); +// } + + return fb.build(); + } + + @Override + public void mutateMany(Map> mutations, StoreTransaction txh) throws BackendException { + Long putTimestamp = null; + Long delTimestamp = null; + MaskedTimestamp commitTime = null; + if (assignTimestamp) { + commitTime = new MaskedTimestamp(txh); + putTimestamp = commitTime.getAdditionTime(times); + delTimestamp = commitTime.getDeletionTime(times); + } + // In case of an addition and deletion with identical timestamps, the + // deletion tombstone wins. + // https://hbase.apache.org/book/versions.html#d244e4250 + final Map, Delete>>> commandsPerTable = + convertToCommands(mutations, putTimestamp, delTimestamp); + + for (Map.Entry, Delete>>> entry : commandsPerTable.entrySet()) { + TableName tableName = entry.getKey(); + Map, Delete>> commandsPerKey = entry.getValue(); + + final List batch = new ArrayList<>(commandsPerKey.size()); // actual batch operation + + // convert sorted commands into representation required for 'batch' operation + for (Pair, Delete> commands : commandsPerKey.values()) { + if (commands.getFirst() != null && !commands.getFirst().isEmpty()) + batch.addAll(commands.getFirst()); + + if (commands.getSecond() != null) + batch.add(commands.getSecond()); + } + + try { + Table table = null; + + try { + table = cnx.getTable(tableName); + table.batch(batch, new Object[batch.size()]); + } finally { + IOUtils.closeQuietly(table); + } + } catch (IOException | InterruptedException e) { + throw new TemporaryBackendException(e); + } + } + + if (commitTime != null) { + sleepAfterWrite(commitTime); + } + } + + @Override + public KeyColumnValueStore openDatabase(String storeName, StoreMetaData.Container metaData) throws BackendException { + // HBase does not support retrieving cell-level TTL by the client. 
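+        // Hence refuse to open a store that is configured to return per-entry TTL metadata (STORE_META_TTL).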
+ Preconditions.checkArgument(!storageConfig.has(GraphDatabaseConfiguration.STORE_META_TTL, storeName) + || !storageConfig.get(GraphDatabaseConfiguration.STORE_META_TTL, storeName)); + + TableStoreKeyColumnValueStore store = openStores.get(storeName); + + if (store == null) { + final String tableNameSuffix = getTableNameSuffixForStoreName(storeName); + TableName tableName = getTableName(tableNameSuffix); + TableStoreKeyColumnValueStore newStore = new TableStoreKeyColumnValueStore(this, cnx, tableName, storeName); + + store = openStores.putIfAbsent(storeName, newStore); // nothing bad happens if we loose to other thread + + if (store == null) { + if (!skipSchemaCheck) { + int tableTTLInSeconds = -1; + if (metaData.contains(StoreMetaData.TTL)) { + tableTTLInSeconds = metaData.get(StoreMetaData.TTL); + } + ensureTableExists(tableName, tableTTLInSeconds); + } + + store = newStore; + } + } + + return store; + } + + @Override + public StoreTransaction beginTransaction(final BaseTransactionConfig config) { + return new TableStoreTransaction(config); + } + + @Override + public String getName() { + return getClass().getSimpleName() + tableNamePrefix; + } + + /** + * Deletes the specified table with all its columns. + * ATTENTION: Invoking this method will delete the table if it exists and therefore causes data loss. + */ + @Override + public void clearStorage() throws BackendException { + String regex = "^" + tableNamePrefix+ "_.+$"; + try (Admin adm = getAdminInterface()) { + adm.deleteTables(regex); + } catch (IOException e) + { + throw new TemporaryBackendException(e); + } + } + + + @Override + public boolean exists() throws BackendException { + String regex = "^" + tableNamePrefix+ "_.+$"; + try (Admin adm = getAdminInterface()) { + TableName[] tableNames = adm.listTableNames(regex); + return null != tableNames && tableNames.length > 0; + } catch (IOException e) + { + throw new TemporaryBackendException(e); + } + } + + @Override + public List getLocalKeyPartition() throws BackendException { + throw new UnsupportedOperationException(); +// List result = new LinkedList<>(); +// try { +// if (skipSchemaCheck) { +// logger.debug("Skipping schema check"); +// if (!exists()) throw new PermanentBackendException("TablePrefix " + tableNamePrefix + " doesn't exist!"); +// } else { +// logger.debug("Performing schema check"); +// +// } +// Map normed = normalizeKeyBounds(getRegionLocations(tableName)); +// +// for (Map.Entry e : normed.entrySet()) { +// if (NetworkUtil.isLocalConnection(e.getValue().getHostname())) { +// result.add(e.getKey()); +// logger.debug("Found local key/row partition {} on host {}", e.getKey(), e.getValue()); +// } else { +// logger.debug("Discarding remote {}", e.getValue()); +// } +// } +// } catch (MasterNotRunningException e) { +// logger.warn("Unexpected MasterNotRunningException", e); +// } catch (ZooKeeperConnectionException e) { +// logger.warn("Unexpected ZooKeeperConnectionException", e); +// } catch (IOException e) { +// logger.warn("Unexpected IOException", e); +// } +// return result; + } + + private List getRegionLocations(TableName tableName) + throws IOException + { + return cnx.getRegionLocator(tableName).getAllRegionLocations(); + } + + /** + * Given a map produced by {@link Connection#getRegionLocator(TableName)}, transform + * each key from an {@link RegionInfo} to a {@link KeyRange} expressing the + * region's start and end key bounds using JanusGraph-partitioning-friendly + * conventions (start inclusive, end exclusive, zero bytes appended where + * necessary 
to make all keys at least 4 bytes long). + *
+ * This method iterates over the entries in its map parameter and performs
+ * the following conditional conversions on its keys. "Require" below means
+ * either a {@link Preconditions} invocation or an assertion. HRegionInfo
+ * sometimes returns start and end keys of zero length; this method replaces
+ * zero length keys with null before doing any of the checks described
+ * below. The parameter map and the values it contains are only read and
+ * never modified.
+ *
+ * - If an entry's HRegionInfo has null start and end keys, then first
+ *   require that the parameter map is a singleton, and then return a
+ *   single-entry map whose {@code KeyRange} has start and end buffers that
+ *   are both four bytes of zeros.
+ * - If the entry has a null end key (but non-null start key), put an
+ *   equivalent entry in the result map with a start key identical to the
+ *   input, except that zeros are appended to values less than 4 bytes long,
+ *   and an end key that is four bytes of zeros.
+ * - If the entry has a null start key (but non-null end key), put an
+ *   equivalent entry in the result map where the start key is four bytes of
+ *   zeros, and the end key has zeros appended, if necessary, to make it at
+ *   least 4 bytes long, after which one is added to the padded value in
+ *   unsigned 32-bit arithmetic with overflow allowed.
+ * - Any entry which matches none of the above criteria results in an
+ *   equivalent entry in the returned map, except that zeros are appended to
+ *   both keys to make each at least 4 bytes long, and the end key is then
+ *   incremented as described in the last bullet point.
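+ *
+ * For example: a region whose HRegionInfo reports start key {0x0A} and a
+ * zero-length end key normalizes to the KeyRange [0x0A 0x00 0x00 0x00,
+ * 0x00 0x00 0x00 0x00): the start key is zero-padded to four bytes and the
+ * null end key is replaced by four bytes of zeros.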
+ * + * After iterating over the parameter map, this method checks that it either + * saw no entries with null keys, one entry with a null start key and a + * different entry with a null end key, or one entry with both start and end + * keys null. If any null keys are observed besides these three cases, the + * method will die with a precondition failure. + * + * @param locations A list of HRegionInfo + * @return JanusGraph-friendly expression of each region's rowkey boundaries + */ + private Map normalizeKeyBounds(List locations) { + + HRegionLocation nullStart = null; + HRegionLocation nullEnd = null; + + ImmutableMap.Builder b = ImmutableMap.builder(); + + for (HRegionLocation location : locations) { + RegionInfo regionInfo = location.getRegion(); + ServerName serverName = location.getServerName(); + byte[] startKey = regionInfo.getStartKey(); + byte[] endKey = regionInfo.getEndKey(); + + if (0 == startKey.length) { + startKey = null; + logger.trace("Converted zero-length HBase startKey byte array to null"); + } + + if (0 == endKey.length) { + endKey = null; + logger.trace("Converted zero-length HBase endKey byte array to null"); + } + + if (null == startKey && null == endKey) { + Preconditions.checkState(1 == locations.size()); + logger.debug("HBase table {} has a single region {}", tableNamePrefix, regionInfo); + // Choose arbitrary shared value = startKey = endKey + return b.put(new KeyRange(FOUR_ZERO_BYTES, FOUR_ZERO_BYTES), serverName).build(); + } else if (null == startKey) { + logger.debug("Found HRegionInfo with null startKey on server {}: {}", serverName, regionInfo); + Preconditions.checkState(null == nullStart); + nullStart = location; + // I thought endBuf would be inclusive from the HBase javadoc, but in practice it is exclusive + StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey)); + // Replace null start key with zeroes + b.put(new KeyRange(FOUR_ZERO_BYTES, endBuf), serverName); + } else if (null == endKey) { + logger.debug("Found HRegionInfo with null endKey on server {}: {}", serverName, regionInfo); + Preconditions.checkState(null == nullEnd); + nullEnd = location; + // Replace null end key with zeroes + b.put(new KeyRange(StaticArrayBuffer.of(zeroExtend(startKey)), FOUR_ZERO_BYTES), serverName); + } else { + // Convert HBase's inclusive end keys into exclusive JanusGraph end keys + StaticBuffer startBuf = StaticArrayBuffer.of(zeroExtend(startKey)); + StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey)); + + KeyRange kr = new KeyRange(startBuf, endBuf); + b.put(kr, serverName); + logger.debug("Found HRegionInfo with non-null end and start keys on server {}: {}", serverName, regionInfo); + } + } + + // Require either no null key bounds or a pair of them + Preconditions.checkState((null == nullStart) == (null == nullEnd)); + + // Check that every key in the result is at least 4 bytes long + Map result = b.build(); + for (KeyRange kr : result.keySet()) { + Preconditions.checkState(4 <= kr.getStart().length()); + Preconditions.checkState(4 <= kr.getEnd().length()); + } + + return result; + } + + /** + * If the parameter is shorter than 4 bytes, then create and return a new 4 + * byte array with the input array's bytes followed by zero bytes. Otherwise + * return the parameter. 
+ * + * @param dataToPad non-null but possibly zero-length byte array + * @return either the parameter or a new array + */ + private byte[] zeroExtend(byte[] dataToPad) { + assert null != dataToPad; + + final int targetLength = 4; + + if (targetLength <= dataToPad.length) + return dataToPad; + + byte[] padded = new byte[targetLength]; + + System.arraycopy(dataToPad, 0, padded, 0, dataToPad.length); + + for (int i = dataToPad.length; i < padded.length; i++) + padded[i] = (byte)0; + + return padded; + } + + public static String shortenCfName(BiMap shortCfNameMap, String longName) throws PermanentBackendException { + final String s; + if (shortCfNameMap.containsKey(longName)) { + s = shortCfNameMap.get(longName); + Preconditions.checkNotNull(s); + logger.debug("Substituted default CF name \"{}\" with short form \"{}\" to reduce HBase KeyValue size", longName, s); + } else { + if (shortCfNameMap.containsValue(longName)) { + String fmt = "Must use CF long-form name \"%s\" instead of the short-form name \"%s\" when configured with %s=true"; + String msg = String.format(fmt, shortCfNameMap.inverse().get(longName), longName, SHORT_CF_NAMES.getName()); + throw new PermanentBackendException(msg); + } + s = longName; + logger.debug("Kept default CF name \"{}\" because it has no associated short form", s); + } + return s; + } + + private TableDescriptor ensureTableExists(TableName tableName, int ttlInSeconds) throws BackendException { + Admin adm = null; + + TableDescriptor desc; + + try { // Create our table, if necessary + adm = getAdminInterface(); + + if (adm.tableExists(tableName)) { + desc = adm.getDescriptor(tableName); + } else { + desc = createTable(tableName, ttlInSeconds, adm); + } + } catch (IOException e) { + throw new TemporaryBackendException(e); + } finally { + IOUtils.closeQuietly(adm); + } + + return desc; + } + + private TableDescriptor createTable(TableName tableName, int ttlInSeconds, Admin adm) throws IOException { + + TableDescriptorBuilder desc = TableDescriptorBuilder.newBuilder(tableName); + ColumnFamilyDescriptorBuilder columnDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(defaultColumnFamilyNameBytes); + setCFOptions(columnDescriptor, ttlInSeconds); + desc.setColumnFamily(columnDescriptor.build()); + TableDescriptor td = desc.build(); + + int count; // total regions to create + String src; + + if (MIN_REGION_COUNT <= (count = regionCount)) { + src = "region count configuration"; + } else if (0 < regionsPerServer && + MIN_REGION_COUNT <= (count = regionsPerServer * getEstimatedRegionServerCount(adm))) { + src = "ClusterStatus server count"; + } else { + count = -1; + src = "default"; + } + + if (MIN_REGION_COUNT < count) { + adm.createTable(td, getStartKey(count), getEndKey(count), count); + logger.debug("Created table {} with region count {} from {}", tableName, count, src); + } else { + adm.createTable(td); + logger.debug("Created table {} with default start key, end key, and region count", tableName); + } + + return td; + } + + private int getEstimatedRegionServerCount(Admin adm) + { + int serverCount = -1; + try { + serverCount = adm.getRegionServers().size(); + logger.debug("Read {} servers from HBase ClusterStatus", serverCount); + } catch (IOException e) { + logger.debug("Unable to retrieve HBase cluster status", e); + } + return serverCount; + } + + /** + * This method generates the second argument to + * {@link Admin#createTable(TableDescriptor, byte[], byte[], int)} + *
+ * From the {@code createTable} javadoc: + * "The start key specified will become the end key of the first region of + * the table, and the end key specified will become the start key of the + * last region of the table (the first region has a null start key and + * the last region has a null end key)" + *
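+ * For example, with a region count of 4 the generated "startKey" is
+ * 0x3FFFFFFF, i.e. (2^32 - 1) / 4, and the generated "endKey" is 0xBFFFFFFD,
+ * i.e. three times that value: the end key of the first region and the
+ * start key of the fourth (last) region.
+ *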
+ * To summarize, the {@code createTable} argument called "startKey" is + * actually the end key of the first region. + */ + private byte[] getStartKey(int regionCount) { + ByteBuffer regionWidth = ByteBuffer.allocate(4); + regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount)).flip(); + return StaticArrayBuffer.of(regionWidth).getBytes(0, 4); + } + + /** + * Companion to {@link #getStartKey(int)}. See its javadoc for details. + */ + private byte[] getEndKey(int regionCount) { + ByteBuffer regionWidth = ByteBuffer.allocate(4); + regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount * (regionCount - 1))).flip(); + return StaticArrayBuffer.of(regionWidth).getBytes(0, 4); + } + + private void setCFOptions(ColumnFamilyDescriptorBuilder columnDescriptor, int ttlInSeconds) { + if (null != compression && !compression.equals(COMPRESSION_DEFAULT)) + columnDescriptor.setCompressionType(Compression.Algorithm.valueOf(compression)); + + if (ttlInSeconds > 0) + columnDescriptor.setTimeToLive(ttlInSeconds); + } + + /** + * Convert JanusGraph internal Mutation representation into HBase native commands. + * + * @param mutations Mutations to convert into HBase commands. + * @param putTimestamp The timestamp to use for Put commands. + * @param delTimestamp The timestamp to use for Delete commands. + * @return Commands sorted by key converted from JanusGraph internal representation. + * @throws org.janusgraph.diskstorage.PermanentBackendException + */ + @VisibleForTesting + Map, Delete>>> convertToCommands(Map> mutations, + final Long putTimestamp, + final Long delTimestamp) throws PermanentBackendException { + // A map of rowkey to commands (list of Puts, Delete) + + Map, Delete>>> commandsPerTable = new HashMap<>(); + for (Map.Entry> entry : mutations.entrySet()) { + final Map, Delete>> commandsPerKey = new HashMap<>(); + String tableNameSuffix = getTableNameSuffixForStoreName(entry.getKey()); + TableName tableName = getTableName(tableNameSuffix); + for (Map.Entry m : entry.getValue().entrySet()) { + final byte[] key = m.getKey().as(StaticBuffer.ARRAY_FACTORY); + KCVMutation mutation = m.getValue(); + + Pair, Delete> commands = commandsPerKey.get(m.getKey()); + + // The first time we go through the list of input , + // create the holder for a particular rowkey + if (commands == null) { + commands = new Pair<>(); + // List of all the Puts for this rowkey, including the ones without TTL and with TTL. + final List putList = new ArrayList<>(); + commands.setFirst(putList); + commandsPerKey.put(m.getKey(), commands); + } + + if (mutation.hasDeletions()) { + if (commands.getSecond() == null) { + Delete d = new Delete(key); + if (delTimestamp != null) { + d.setTimestamp(delTimestamp); + } + commands.setSecond(d); + } + + for (StaticBuffer b : mutation.getDeletions()) { + // commands.getSecond() is a Delete for this rowkey. + addColumnToDelete(commands.getSecond(), b.as(StaticBuffer.ARRAY_FACTORY), delTimestamp); + } + } + + if (mutation.hasAdditions()) { + // All the entries (column cells) with the rowkey use this one Put, except the ones with TTL. + final Put putColumnsWithoutTtl = putTimestamp != null ? new Put(key, putTimestamp) : new Put(key); + // At the end of this loop, there will be one Put entry in the commands.getFirst() list that + // contains all additions without TTL set, and possible multiple Put entries for columns + // that have TTL set. 
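+                // For example: additions {a (no TTL), b (no TTL), c (TTL 60s)} on one rowkey
+                // produce two Puts: one holding columns a and b, and a separate Put for c
+                // whose TTL is set to 60,000 ms, plus at most one Delete for the key.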
+                    for (Entry e : mutation.getAdditions()) {
+
+                        // Deal with TTL within the entry (column cell) first.
+                        // HBase cell-level TTL is actually set at the Mutation/Put level.
+                        // Therefore we need to construct a new Put for each entry (column cell) with TTL.
+                        // We cannot combine them because column cells within the same rowkey may:
+                        // 1. have no TTL
+                        // 2. have TTL
+                        // 3. have different TTLs
+                        final Integer ttl = (Integer) e.getMetaData().get(EntryMetaData.TTL);
+                        if (null != ttl && ttl > 0) {
+                            // Create a new Put
+                            Put putColumnWithTtl = putTimestamp != null ? new Put(key, putTimestamp) : new Put(key);
+                            addColumnToPut(putColumnWithTtl, putTimestamp, e);
+                            // Convert ttl from seconds (JanusGraph TTL) to milliseconds (HBase TTL)
+                            // @see JanusGraphManagement#setTTL(JanusGraphSchemaType, Duration)
+                            // HBase supports cell-level TTL for versions 0.98.6 and above.
+                            putColumnWithTtl.setTTL(TimeUnit.SECONDS.toMillis((long) ttl));
+                            // commands.getFirst() is the list of Puts for this rowkey. Add this
+                            // Put column with TTL to the list.
+                            commands.getFirst().add(putColumnWithTtl);
+                        } else {
+                            addColumnToPut(putColumnsWithoutTtl, putTimestamp, e);
+                        }
+                    }
+                    // If there were any mutations without TTL set, add them to commands.getFirst()
+                    if (!putColumnsWithoutTtl.isEmpty()) {
+                        commands.getFirst().add(putColumnsWithoutTtl);
+                    }
+                }
+            }
+            commandsPerTable.put(tableName, commandsPerKey);
+        }
+
+        return commandsPerTable;
+    }
+
+    private void addColumnToDelete(Delete d, byte[] qualifier, Long delTimestamp) {
+        if (delTimestamp != null) {
+            d.addColumns(defaultColumnFamilyNameBytes, qualifier, delTimestamp);
+        } else {
+            d.addColumns(defaultColumnFamilyNameBytes, qualifier);
+        }
+    }
+
+    private void addColumnToPut(Put p, Long putTimestamp, Entry e) {
+        final byte[] qualifier = e.getColumnAs(StaticBuffer.ARRAY_FACTORY);
+        final byte[] value = e.getValueAs(StaticBuffer.ARRAY_FACTORY);
+        if (putTimestamp != null) {
+            p.addColumn(defaultColumnFamilyNameBytes, qualifier, putTimestamp, value);
+        } else {
+            p.addColumn(defaultColumnFamilyNameBytes, qualifier, value);
+        }
+    }
+
+    private TableName getTableName(String tableNameSuffix) {
+        String tableNameStr = tableNamePrefix + "_" + tableNameSuffix;
+        return TableName.valueOf(tableNameStr);
+    }
+
+    private String getTableNameSuffixForStoreName(String storeName) throws PermanentBackendException {
+        return shortCfNames ? shortenCfName(shortCfNameMap, storeName) : storeName;
+    }
+
+    private Admin getAdminInterface() {
+        try {
+            return cnx.getAdmin();
+        } catch (IOException e) {
+            throw new JanusGraphException(e);
+        }
+    }
+
+    private String determineTableNamePrefix(Configuration config) {
+        if ((!config.has(HBASE_TABLE)) && (config.has(GRAPH_NAME))) {
+            return config.get(GRAPH_NAME);
+        }
+        return config.get(HBASE_TABLE);
+    }
+
+    @VisibleForTesting
+    protected org.apache.hadoop.conf.Configuration getHBaseConf() {
+        return hconf;
+    }
+
+//    @Override
+//    public Object getHadoopManager() {
+//        return new HBaseHadoopStoreManager();
+//    }
+}
+
diff --git a/janusgraph-tablestore/src/main/java/org/janusgraph/diskstorage/tablestore/TableStoreTransaction.java b/janusgraph-tablestore/src/main/java/org/janusgraph/diskstorage/tablestore/TableStoreTransaction.java
new file mode 100644
index 0000000000..711f8828d7
--- /dev/null
+++ b/janusgraph-tablestore/src/main/java/org/janusgraph/diskstorage/tablestore/TableStoreTransaction.java
@@ -0,0 +1,33 @@
+// Copyright 2017 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.tablestore;
+
+import org.janusgraph.diskstorage.BaseTransactionConfig;
+import org.janusgraph.diskstorage.common.AbstractStoreTransaction;
+
+/**
+ * This class overrides and adds nothing compared with
+ * {@link org.janusgraph.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction}; however, it creates a transaction type specific
+ * to TableStore, which lets us check for user errors like passing a Cassandra
+ * transaction into a TableStore method.
+ *
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ */
+public class TableStoreTransaction extends AbstractStoreTransaction {
+
+    public TableStoreTransaction(final BaseTransactionConfig config) {
+        super(config);
+    }
+}
diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/TableStoreContainer.java b/janusgraph-tablestore/src/test/java/org/janusgraph/TableStoreContainer.java
new file mode 100644
index 0000000000..5b64eb00e6
--- /dev/null
+++ b/janusgraph-tablestore/src/test/java/org/janusgraph/TableStoreContainer.java
@@ -0,0 +1,205 @@
+// Copyright 2021 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
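Editor's note: the command map returned by convertToCommands above is keyed first by table and then by rowkey. For orientation, here is a minimal sketch of how such a map could be drained against an HBase-compatible endpoint; the CommandFlusher class and its Connection parameter are hypothetical illustrations, not part of this patch, and error handling is elided.

```java
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Pair;
import org.janusgraph.diskstorage.StaticBuffer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of this patch: applies the output of convertToCommands.
final class CommandFlusher {
    static void flush(Connection cnx,
                      Map<TableName, Map<StaticBuffer, Pair<List<Put>, Delete>>> commandsPerTable)
        throws Exception {
        for (Map.Entry<TableName, Map<StaticBuffer, Pair<List<Put>, Delete>>> byTable
            : commandsPerTable.entrySet()) {
            // Collect every Put and Delete for this table into one batch.
            List<Row> batch = new ArrayList<>();
            for (Pair<List<Put>, Delete> commands : byTable.getValue().values()) {
                if (commands.getFirst() != null) {
                    batch.addAll(commands.getFirst()); // additions, one Put per distinct TTL
                }
                if (commands.getSecond() != null) {
                    batch.add(commands.getSecond());   // the single Delete for the rowkey
                }
            }
            try (Table table = cnx.getTable(byTable.getKey())) {
                table.batch(batch, new Object[batch.size()]); // results are ignored in this sketch
            }
        }
    }
}
```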
+ +package org.janusgraph; + +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.janusgraph.core.JanusGraphException; +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.TemporaryBackendException; +import org.janusgraph.diskstorage.configuration.BasicConfiguration; +import org.janusgraph.diskstorage.configuration.ModifiableConfiguration; +import org.janusgraph.diskstorage.configuration.WriteConfiguration; +import org.janusgraph.diskstorage.configuration.backend.CommonsConfiguration; +import org.janusgraph.diskstorage.tablestore.TableStoreStoreManager; +import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; +import org.janusgraph.graphdb.database.idassigner.placement.SimpleBulkPlacementStrategy; +import org.janusgraph.util.system.ConfigurationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.ROOT_NS; + +public class TableStoreContainer extends GenericContainer { + private static final Logger logger = LoggerFactory.getLogger(TableStoreContainer.class); + + public static final String HBASE_TARGET_DIR = "test.hbase.targetdir"; + public static final String HBASE_DOCKER_PATH = "janusgraph-tablestore/docker"; + private static final String DEFAULT_VERSION = "2.6.0"; + private static final String DEFAULT_UID = "1000"; + private static final String DEFAULT_GID = "1000"; + + private static String getVersion() { + String property = System.getProperty("hbase.docker.version"); + if (StringUtils.isNotEmpty(property)) + return property; + return DEFAULT_VERSION; + } + + private static Path getPath() { + try { + Path path = Paths.get(".").toRealPath(); + if (path.getParent().endsWith("janusgraph")) { + path = Paths.get(path.toString(), "..").toRealPath(); + } + return Paths.get(path.toString(), getRelativePath()); + } catch (IOException ex) { + throw new JanusGraphException(ex); + } + } + + private static String getRelativePath() { + String property = System.getProperty("hbase.docker.path"); + if (StringUtils.isNotEmpty(property)) + return property; + return HBASE_DOCKER_PATH; + } + + private static String getUid() { + String property = System.getProperty("hbase.docker.uid"); + if (StringUtils.isNotEmpty(property)) + return property; + return DEFAULT_UID; + } + + private static String getGid() { + String property = System.getProperty("hbase.docker.gid"); + if (StringUtils.isNotEmpty(property)) + return property; + return DEFAULT_GID; + } + + private static String getTargetDir() { + String property = System.getProperty(HBASE_TARGET_DIR); + if (StringUtils.isNotEmpty(property)) + return property; + return Paths.get(System.getProperty("user.dir"), "target").toString(); + } + + public TableStoreContainer() { + this(false); + } + + public TableStoreContainer(boolean 
mountRoot) {
+        super(new ImageFromDockerfile()
+            .withFileFromPath(".", getPath())
+            .withBuildArg("HBASE_VERSION", getVersion())
+            .withBuildArg("HBASE_UID", getUid())
+            .withBuildArg("HBASE_GID", getGid()));
+        addFixedExposedPort(2181, 2182);
+        addFixedExposedPort(16000, 16000);
+        addFixedExposedPort(16010, 16010);
+        addFixedExposedPort(16020, 16020);
+        addFixedExposedPort(16030, 16030);
+
+        if (mountRoot) {
+            try {
+                Files.createDirectories(getHBaseRootDir());
+            } catch (IOException e) {
+                logger.warn("failed to create folder", e);
+                throw new JanusGraphException(e);
+            }
+            addFileSystemBind(getHBaseRootDir().toString(), "/data/hbase", BindMode.READ_WRITE);
+        }
+
+        withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd.withHostName("localhost"));
+        waitingFor(Wait.forLogMessage(".*Master has completed initialization.*", 1));
+    }
+
+    public Path getHBaseRootDir() {
+        return Paths.get(getTargetDir(), "hbase-root");
+    }
+
+    private Connection createConnection() throws IOException {
+        Configuration entries = HBaseConfiguration.create();
+        entries.set("hbase.zookeeper.quorum", "localhost");
+        return ConnectionFactory.createConnection(entries);
+    }
+
+    /**
+     * Create a snapshot for a table.
+     *
+     * @param snapshotName the name of the snapshot to create
+     * @param table        the name of the table to snapshot
+     * @throws BackendException
+     */
+    public synchronized void createSnapshot(String snapshotName, String table)
+        throws BackendException {
+        try (Connection hc = createConnection(); Admin admin = hc.getAdmin()) {
+            admin.snapshot(snapshotName, TableName.valueOf(table));
+        } catch (Exception e) {
+            logger.warn("Create HBase snapshot failed", e);
+            throw new TemporaryBackendException("Create HBase snapshot failed", e);
+        }
+    }
+
+    /**
+     * Delete a snapshot.
+     *
+     * @param snapshotName the name of the snapshot to delete
+     * @throws IOException
+     */
+    public synchronized void deleteSnapshot(String snapshotName) throws IOException {
+        try (Connection hc = createConnection(); Admin admin = hc.getAdmin()) {
+            admin.deleteSnapshot(snapshotName);
+        }
+    }
+
+    public WriteConfiguration getWriteConfiguration() {
+        return getModifiableConfiguration().getConfiguration();
+    }
+
+    public ModifiableConfiguration getModifiableConfiguration() {
+        return getNamedConfiguration(null, null);
+    }
+
+    public ModifiableConfiguration getNamedConfiguration(String tableName, String graphName) {
+        ModifiableConfiguration config;
+        try {
+            PropertiesConfiguration cc = ConfigurationUtil.loadPropertiesConfig("target/test-classes/tablestore.properties");
+            CommonsConfiguration commonsConfiguration = new CommonsConfiguration(cc);
+            config = new ModifiableConfiguration(ROOT_NS, commonsConfiguration, BasicConfiguration.Restriction.NONE);
+            config.set(GraphDatabaseConfiguration.STORAGE_BACKEND, "tablestore");
+            if (StringUtils.isNotEmpty(tableName)) config.set(TableStoreStoreManager.HBASE_TABLE, tableName);
+            if (StringUtils.isNotEmpty(graphName)) config.set(GraphDatabaseConfiguration.GRAPH_NAME, graphName);
+            config.set(GraphDatabaseConfiguration.TIMESTAMP_PROVIDER, TableStoreStoreManager.PREFERRED_TIMESTAMPS);
+            config.set(GraphDatabaseConfiguration.STORAGE_HOSTS, new String[]{"localhost"});
+            config.set(SimpleBulkPlacementStrategy.CONCURRENT_PARTITIONS, 1);
+            config.set(GraphDatabaseConfiguration.DROP_ON_CLEAR, false);
+            return config;
+        } catch (Exception e) {
+            logger.error("Failed to load configuration", e);
+        }
+        return null;
+    }
+}
diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/TableStoreGraphComputerProvider.java b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/TableStoreGraphComputerProvider.java
new file mode 100644
index 0000000000..0f5489e36c
--- /dev/null
+++ b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/TableStoreGraphComputerProvider.java
@@ -0,0 +1,34 @@
+// Copyright 2017 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.blueprints;
+
+import org.apache.tinkerpop.gremlin.GraphProvider;
+import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
+import org.janusgraph.graphdb.olap.computer.FulgoraGraphComputer;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+@GraphProvider.Descriptor(computer = FulgoraGraphComputer.class)
+public class TableStoreGraphComputerProvider extends AbstractJanusGraphComputerProvider {
+
+    @Override
+    public ModifiableConfiguration getJanusGraphConfiguration(String graphName, Class test, String testMethodName) {
+        ModifiableConfiguration config = super.getJanusGraphConfiguration(graphName, test, testMethodName);
+        config.setAll(TableStoreGraphProvider.HBASE_CONTAINER.getNamedConfiguration(graphName, "").getAll());
+        return config;
+    }
+
+}
diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/TableStoreGraphProvider.java b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/TableStoreGraphProvider.java
new file mode 100644
index 0000000000..f43eef0e89
--- /dev/null
+++ b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/TableStoreGraphProvider.java
@@ -0,0 +1,66 @@
+// Copyright 2017 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
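The providers in this test package wire the container's configuration into the TinkerPop suites programmatically. For orientation, a hand-written equivalent of opening a graph against this backend could look like the following sketch; the host name is a placeholder and the snippet is illustrative, not code from the patch.

```java
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class OpenTableStoreGraph {
    public static void main(String[] args) {
        // "tablestore" is the same storage.backend value the test configuration above sets.
        JanusGraph graph = JanusGraphFactory.build()
            .set("storage.backend", "tablestore")
            .set("storage.hostname", "127.0.0.1") // placeholder ZooKeeper quorum host
            .open();
        try {
            graph.traversal().addV("demo").iterate();
            graph.tx().commit();
        } finally {
            graph.close();
        }
    }
}
```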
+
+package org.janusgraph.blueprints;
+
+import org.janusgraph.TableStoreContainer;
+import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public class TableStoreGraphProvider extends AbstractJanusGraphProvider {
+    private static final Logger logger = LoggerFactory.getLogger(TableStoreGraphProvider.class);
+    public static final TableStoreContainer HBASE_CONTAINER;
+
+    static {
+        waitForBindablePort(2181);
+        waitForBindablePort(16000);
+        HBASE_CONTAINER = new TableStoreContainer();
+        HBASE_CONTAINER.start();
+    }
+
+    private static void waitForBindablePort(int port) {
+        boolean canBindPort = false;
+        do {
+            try (ServerSocket serverSocket = new ServerSocket(port)) {
+                assertNotNull(serverSocket);
+                assertEquals(port, serverSocket.getLocalPort());
+                canBindPort = true;
+                continue;
+            } catch (IOException e) {
+                logger.warn("can't bind port", e);
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                logger.warn("can't sleep", e);
+            }
+        } while (!canBindPort);
+    }
+
+    @Override
+    public ModifiableConfiguration getJanusGraphConfiguration(String graphName, Class test, String testMethodName) {
+        return HBASE_CONTAINER.getNamedConfiguration(graphName, "");
+    }
+}
diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/TableStoreMultiQueryGraphProvider.java b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/TableStoreMultiQueryGraphProvider.java
new file mode 100644
index 0000000000..fb26a37f8d
--- /dev/null
+++ b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/TableStoreMultiQueryGraphProvider.java
@@ -0,0 +1,29 @@
+// Copyright 2017 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package org.janusgraph.blueprints; + +import org.janusgraph.diskstorage.configuration.ModifiableConfiguration; +import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; + +/** + * @author Ted Wilmes (twilmes@gmail.com) + */ +public class TableStoreMultiQueryGraphProvider extends AbstractJanusGraphProvider { + @Override + public ModifiableConfiguration getJanusGraphConfiguration(String graphName, Class test, String testMethodName) { + return TableStoreGraphProvider.HBASE_CONTAINER.getNamedConfiguration(graphName, "") + .set(GraphDatabaseConfiguration.USE_MULTIQUERY, true); + } +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/process/TableStoreComputerTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/process/TableStoreComputerTest.java new file mode 100644 index 0000000000..0f02c96308 --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/process/TableStoreComputerTest.java @@ -0,0 +1,32 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.blueprints.process; + +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite; +import org.janusgraph.blueprints.TableStoreGraphComputerProvider; +import org.janusgraph.core.JanusGraph; +import org.junit.runner.RunWith; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +@RunWith(ProcessComputerSuite.class) +@GraphProviderClass(provider = TableStoreGraphComputerProvider.class, graph = JanusGraph.class) +public class TableStoreComputerTest { + + // TP3 ignores @BeforeAll + +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/process/TableStoreMultiQueryProcessTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/process/TableStoreMultiQueryProcessTest.java new file mode 100644 index 0000000000..75af830e22 --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/process/TableStoreMultiQueryProcessTest.java @@ -0,0 +1,29 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
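TableStoreMultiQueryGraphProvider above turns on USE_MULTIQUERY through the typed constant. As a rough application-level sketch of the same switch, assuming "query.batch" is the property path behind that constant (an assumption; the typed constant used above is the authoritative spelling):

```java
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class MultiQueryDemo {
    public static void main(String[] args) {
        JanusGraph graph = JanusGraphFactory.build()
            .set("storage.backend", "tablestore")
            .set("storage.hostname", "127.0.0.1") // placeholder host
            .set("query.batch", true)             // assumed property path for USE_MULTIQUERY
            .open();
        GraphTraversalSource g = graph.traversal();
        // With multi-query enabled, steps like out() and values() can be answered with
        // batched backend requests across all current traversers instead of one
        // request per vertex.
        g.V().hasLabel("person").out("knows").values("name").toList();
        graph.close();
    }
}
```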
+
+package org.janusgraph.blueprints.process;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.process.ProcessStandardSuite;
+import org.janusgraph.blueprints.TableStoreMultiQueryGraphProvider;
+import org.janusgraph.core.JanusGraph;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Ted Wilmes (twilmes@gmail.com)
+ */
+@RunWith(ProcessStandardSuite.class)
+@GraphProviderClass(provider = TableStoreMultiQueryGraphProvider.class, graph = JanusGraph.class)
+public class TableStoreMultiQueryProcessTest {
+}
diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/process/TableStoreProcessTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/process/TableStoreProcessTest.java
new file mode 100644
index 0000000000..65ed63332b
--- /dev/null
+++ b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/process/TableStoreProcessTest.java
@@ -0,0 +1,29 @@
+// Copyright 2017 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.blueprints.process;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.process.ProcessStandardSuite;
+import org.janusgraph.blueprints.TableStoreGraphProvider;
+import org.janusgraph.core.JanusGraph;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+@RunWith(ProcessStandardSuite.class)
+@GraphProviderClass(provider = TableStoreGraphProvider.class, graph = JanusGraph.class)
+public class TableStoreProcessTest {
+}
diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/structure/TableStoreStructureTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/structure/TableStoreStructureTest.java
new file mode 100644
index 0000000000..9b8fb49f80
--- /dev/null
+++ b/janusgraph-tablestore/src/test/java/org/janusgraph/blueprints/structure/TableStoreStructureTest.java
@@ -0,0 +1,29 @@
+// Copyright 2017 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package org.janusgraph.blueprints.structure; + +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.structure.StructureStandardSuite; +import org.janusgraph.blueprints.TableStoreGraphProvider; +import org.janusgraph.core.JanusGraph; +import org.junit.runner.RunWith; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +@RunWith(StructureStandardSuite.class) +@GraphProviderClass(provider = TableStoreGraphProvider.class, graph = JanusGraph.class) +public class TableStoreStructureTest { +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreDistributedStoreManagerTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreDistributedStoreManagerTest.java new file mode 100644 index 0000000000..0bab08573f --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreDistributedStoreManagerTest.java @@ -0,0 +1,41 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.tablestore; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.DistributedStoreManagerTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class TableStoreDistributedStoreManagerTest extends DistributedStoreManagerTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @BeforeEach + public void setUp() throws BackendException { + manager = new TableStoreStoreManager(tableStoreContainer.getModifiableConfiguration()); + store = manager.openDatabase("distributedStoreTest"); + } + + @AfterEach + public void tearDown() throws BackendException { + store.close(); + manager.close(); + } +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreIDAuthorityTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreIDAuthorityTest.java new file mode 100644 index 0000000000..8ff450d019 --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreIDAuthorityTest.java @@ -0,0 +1,33 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.tablestore; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.IDAuthorityTest; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + + +@Testcontainers +public class TableStoreIDAuthorityTest extends IDAuthorityTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new TableStoreStoreManager(tableStoreContainer.getModifiableConfiguration()); + } +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreLockStoreTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreLockStoreTest.java new file mode 100644 index 0000000000..5a683e39cf --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreLockStoreTest.java @@ -0,0 +1,33 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.tablestore; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.LockKeyColumnValueStoreTest; +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class TableStoreLockStoreTest extends LockKeyColumnValueStoreTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + public KeyColumnValueStoreManager openStorageManager(int idx, Configuration configuration) throws BackendException { + return new TableStoreStoreManager(tableStoreContainer.getModifiableConfiguration()); + } +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreLogTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreLogTest.java new file mode 100644 index 0000000000..4c8e014651 --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreLogTest.java @@ -0,0 +1,34 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.tablestore; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import org.janusgraph.diskstorage.log.KCVSLogTest; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class TableStoreLogTest extends KCVSLogTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @Override + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new TableStoreStoreManager(tableStoreContainer.getModifiableConfiguration()); + } + +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreMultiWriteStoreTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreMultiWriteStoreTest.java new file mode 100644 index 0000000000..90ad5a9b4d --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreMultiWriteStoreTest.java @@ -0,0 +1,32 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package org.janusgraph.diskstorage.tablestore;
+
+import org.janusgraph.TableStoreContainer;
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.MultiWriteKeyColumnValueStoreTest;
+import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Testcontainers
+public class TableStoreMultiWriteStoreTest extends MultiWriteKeyColumnValueStoreTest {
+    @Container
+    public static final TableStoreContainer tableStoreContainer = new TableStoreContainer();
+
+    public KeyColumnValueStoreManager openStorageManager() throws BackendException {
+        return new TableStoreStoreManager(tableStoreContainer.getModifiableConfiguration());
+    }
+}
diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreManagerConfigTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreManagerConfigTest.java
new file mode 100644
index 0000000000..0e5059ccac
--- /dev/null
+++ b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreManagerConfigTest.java
@@ -0,0 +1,149 @@
+// Copyright 2017 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
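The store-manager tests further below build convertToCommands input by hand; condensed, that input shape is a map from store name to per-rowkey KCVMutations. A brief sketch using classes from this patch (the store name, keys, and values are arbitrary):

```java
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.KeyColumnValueStoreUtil;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation;
import org.janusgraph.diskstorage.util.StaticArrayEntry;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MutationShapeDemo {
    public static Map<String, Map<StaticBuffer, KCVMutation>> build() {
        StaticBuffer rowkey = KeyColumnValueStoreUtil.longToByteBuffer(42);
        // One addition (column 1 -> value 100) and one deletion (column 2) on the same row.
        Entry addition = StaticArrayEntry.of(
            KeyColumnValueStoreUtil.longToByteBuffer(1),
            KeyColumnValueStoreUtil.longToByteBuffer(100));
        KCVMutation mutation = new KCVMutation(
            Collections.singletonList(addition),
            Collections.singletonList(KeyColumnValueStoreUtil.longToByteBuffer(2)));
        Map<String, Map<StaticBuffer, KCVMutation>> mutations = new HashMap<>();
        mutations.put("store1", Collections.singletonMap(rowkey, mutation));
        return mutations;
    }
}
```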
+
+package org.janusgraph.diskstorage.tablestore;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.appender.WriterAppender;
+import org.apache.logging.log4j.core.filter.LevelMatchFilter;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.janusgraph.TableStoreContainer;
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.PermanentBackendException;
+import org.janusgraph.diskstorage.common.DistributedStoreManager;
+import org.janusgraph.diskstorage.configuration.BasicConfiguration;
+import org.janusgraph.diskstorage.configuration.ConfigElement;
+import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
+import org.janusgraph.diskstorage.configuration.WriteConfiguration;
+import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
+import org.janusgraph.diskstorage.util.time.TimestampProviders;
+import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
+import org.janusgraph.graphdb.configuration.builder.GraphDatabaseConfigurationBuilder;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.StringWriter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Testcontainers
+public class TableStoreStoreManagerConfigTest {
+    @Container
+    public static final TableStoreContainer tableStoreContainer = new TableStoreContainer();
+
+    @Test
+    public void testShortCfNames() throws Exception {
+        org.apache.logging.log4j.core.Logger log = (org.apache.logging.log4j.core.Logger) LogManager.getLogger(TableStoreStoreManager.class);
+        StringWriter writer = new StringWriter();
+        Appender appender = WriterAppender.createAppender(PatternLayout.newBuilder().withPattern("%p: %m%n").build(), LevelMatchFilter.newBuilder().setLevel(Level.WARN).build(), writer, "test", false, false);
+        appender.start();
+        log.addAppender(appender);
+
+        // Open the TableStoreStoreManager and store with default SHORT_CF_NAMES true.
+        WriteConfiguration config = tableStoreContainer.getWriteConfiguration();
+        TableStoreStoreManager manager = new TableStoreStoreManager(new BasicConfiguration(GraphDatabaseConfiguration.ROOT_NS,
+            config, BasicConfiguration.Restriction.NONE));
+        KeyColumnValueStore store = manager.openDatabase(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME);
+
+        store.close();
+        manager.close();
+
+        // Open the TableStoreStoreManager and store with SHORT_CF_NAMES false.
+        config.set(ConfigElement.getPath(TableStoreStoreManager.SHORT_CF_NAMES), false);
+        manager = new TableStoreStoreManager(new BasicConfiguration(GraphDatabaseConfiguration.ROOT_NS,
+            config, BasicConfiguration.Restriction.NONE));
+        writer.getBuffer().setLength(0);
+        store = manager.openDatabase(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME);
+
+        // Verify we get a WARN.
+        assertTrue(writer.toString().startsWith("WARN: Configuration"), writer.toString());
+        log.removeAppender(appender);
+
+        store.close();
+        manager.close();
+    }
+
+    @Test
+    // Test that the TableStore preferred timestamp provider MILLI is set by default
+    public void testTableStoreTimestampProvider() {
+        // Get an empty configuration
+        // GraphDatabaseConfiguration.buildGraphConfiguration() only builds an empty one.
+        ModifiableConfiguration config = GraphDatabaseConfiguration.buildGraphConfiguration();
+        // Set backend to tablestore
+        config.set(GraphDatabaseConfiguration.STORAGE_BACKEND, "tablestore");
+        // Instantiate a GraphDatabaseConfiguration based on the above
+        GraphDatabaseConfiguration graphConfig = new GraphDatabaseConfigurationBuilder().build(config.getConfiguration());
+        // Check the TIMESTAMP_PROVIDER has been set to the TableStore preferred MILLI
+        TimestampProviders provider = graphConfig.getConfiguration().get(GraphDatabaseConfiguration.TIMESTAMP_PROVIDER);
+        assertEquals(TableStoreStoreManager.PREFERRED_TIMESTAMPS, provider);
+    }
+
+    @Test
+    public void testTableStoreStoragePort() throws BackendException {
+        WriteConfiguration config = tableStoreContainer.getWriteConfiguration();
+        config.set(ConfigElement.getPath(GraphDatabaseConfiguration.STORAGE_PORT), 2000);
+        TableStoreStoreManager manager = new TableStoreStoreManager(new BasicConfiguration(GraphDatabaseConfiguration.ROOT_NS,
+            config, BasicConfiguration.Restriction.NONE));
+        // Check the native property in the TableStore conf.
+        String port = manager.getHBaseConf().get("hbase.zookeeper.property.clientPort");
+        assertEquals("2000", port);
+    }
+
+    @Test
+    // Test the TableStore skip-schema-check config
+    public void testTableStoreSkipSchemaCheck() throws Exception {
+        org.apache.logging.log4j.core.Logger log = (org.apache.logging.log4j.core.Logger) LogManager.getLogger(TableStoreStoreManager.class);
+        Level savedLevel = log.getLevel();
+        log.setLevel(Level.DEBUG);
+        StringWriter writer = new StringWriter();
+        Appender appender = WriterAppender.createAppender(PatternLayout.newBuilder().withPattern("%p: %m%n").build(), LevelMatchFilter.newBuilder().setLevel(Level.DEBUG).build(), writer, "test", false, false);
+        appender.start();
+        log.addAppender(appender);
+
+        // Open the TableStoreStoreManager with default skip-schema-check false.
+        WriteConfiguration config = tableStoreContainer.getWriteConfiguration();
+        TableStoreStoreManager manager = new TableStoreStoreManager(new BasicConfiguration(GraphDatabaseConfiguration.ROOT_NS,
+            config, BasicConfiguration.Restriction.NONE));
+        assertEquals(DistributedStoreManager.Deployment.REMOTE, manager.getDeployment());
+        // Verify we get "Performing schema check".
+        assertTrue(writer.toString().contains("Performing schema check"), writer.toString());
+        manager.close();
+
+        // Open the TableStoreStoreManager with skip-schema-check true.
+        config.set(ConfigElement.getPath(TableStoreStoreManager.SKIP_SCHEMA_CHECK), true);
+        manager = new TableStoreStoreManager(new BasicConfiguration(GraphDatabaseConfiguration.ROOT_NS,
+            config, BasicConfiguration.Restriction.NONE));
+        writer.getBuffer().setLength(0);
+
+        assertEquals(DistributedStoreManager.Deployment.REMOTE, manager.getDeployment());
+        // Verify we get "Skipping schema check".
+        assertTrue(writer.toString().contains("Skipping schema check"), writer.toString());
+
+        log.removeAppender(appender);
+        log.setLevel(savedLevel);
+        // Test when the tablestore table does not exist with skip-schema-check true.
+        config.set(ConfigElement.getPath(TableStoreStoreManager.HBASE_TABLE), "unknown_table");
+        TableStoreStoreManager skipSchemaManager = new TableStoreStoreManager(new BasicConfiguration(GraphDatabaseConfiguration.ROOT_NS,
+            config, BasicConfiguration.Restriction.NONE));
+        Exception ex = assertThrows(PermanentBackendException.class, () -> skipSchemaManager.getLocalKeyPartition());
+        assertEquals("Table unknown_table doesn't exist in TableStore!", ex.getMessage());
+
+        manager.close();
+    }
+}
diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreManagerMutationTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreManagerMutationTest.java
new file mode 100644
index 0000000000..f3ffc26fea
--- /dev/null
+++ b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreManagerMutationTest.java
@@ -0,0 +1,173 @@
+// Copyright 2017 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.tablestore;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Pair;
+import org.janusgraph.TableStoreContainer;
+import org.janusgraph.diskstorage.Entry;
+import org.janusgraph.diskstorage.EntryMetaData;
+import org.janusgraph.diskstorage.KeyColumnValueStoreUtil;
+import org.janusgraph.diskstorage.StaticBuffer;
+import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation;
+import org.janusgraph.diskstorage.util.StaticArrayBuffer;
+import org.janusgraph.diskstorage.util.StaticArrayEntry;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Testcontainers
+public class TableStoreStoreManagerMutationTest {
+    @Container
+    public static final TableStoreContainer tableStoreContainer = new TableStoreContainer();
+
+    @Test
+    public void testKCVMutationToPuts() throws Exception {
+        final Map<String, Map<StaticBuffer, KCVMutation>> storeMutationMap = new HashMap<>();
+        final Map<StaticBuffer, KCVMutation> rowkeyMutationMap = new HashMap<>();
+        final List<Long> expectedColumnsWithTTL = new ArrayList<>();
+        final List<Long> expectedColumnsWithoutTTL = new ArrayList<>();
+        final List<Long> expectedColumnDelete = new ArrayList<>();
+        StaticArrayEntry e = null;
+        StaticBuffer rowkey, col, val;
+        // 2 rows
+        for (int row = 0; row < 2; row++) {
+
+            rowkey = KeyColumnValueStoreUtil.longToByteBuffer(row);
+
+            List<Entry> additions = new ArrayList<>();
+            List<StaticBuffer> deletions = new ArrayList<>();
+
+            // 100 columns each row
+            int i;
+            for (i = 0; i < 100; i++) {
+                col = KeyColumnValueStoreUtil.longToByteBuffer(i);
+                val = KeyColumnValueStoreUtil.longToByteBuffer(i + 100);
+                e = (StaticArrayEntry) StaticArrayEntry.of(col, val);
+                // Set half of the columns with TTL, also vary the TTL values
+                if (i % 2 == 0) {
+                    e.setMetaData(EntryMetaData.TTL, i % 10 + 1);
+                    // Collect the columns with TTL. Only do this for one row
+                    if (row == 1) {
+                        expectedColumnsWithTTL.add((long) i);
+                    }
+                } else {
+                    // Collect the columns without TTL. Only do this for one row
+                    if (row == 1) {
+                        expectedColumnsWithoutTTL.add((long) i);
+                    }
+                }
+                additions.add(e);
+            }
+            // Add one deletion to the row
+            if (row == 1) {
+                expectedColumnDelete.add((long) (i - 1));
+            }
+            deletions.add(e);
+            rowkeyMutationMap.put(rowkey, new KCVMutation(additions, deletions));
+        }
+        storeMutationMap.put("store1", rowkeyMutationMap);
+        TableStoreStoreManager manager = new TableStoreStoreManager(tableStoreContainer.getModifiableConfiguration());
+        final Map<StaticBuffer, Pair<List<Put>, Delete>> commandsPerRowKey
+            = manager.convertToCommands(storeMutationMap, 0L, 0L).values().stream().findFirst().orElse(null);
+
+        // 2 rows
+        assertEquals(2, commandsPerRowKey.size());
+
+        // Verify puts
+        final List<Long> putColumnsWithTTL = new ArrayList<>();
+        final List<Long> putColumnsWithoutTTL = new ArrayList<>();
+        Pair<List<Put>, Delete> commands = commandsPerRowKey.values().iterator().next();
+        long colName;
+        for (Put p : commands.getFirst()) {
+            // In Put, Long.MAX_VALUE means no TTL
+            for (Map.Entry<byte[], List<Cell>> me : p.getFamilyCellMap().entrySet()) {
+                for (Cell c : me.getValue()) {
+                    colName = KeyColumnValueStoreUtil.bufferToLong(new StaticArrayBuffer(CellUtil.cloneQualifier(c)));
+                    if (p.getTTL() < Long.MAX_VALUE) {
+                        putColumnsWithTTL.add(colName);
+                    } else {
+                        putColumnsWithoutTTL.add(colName);
+                    }
+                }
+            }
+        }
+        Collections.sort(putColumnsWithoutTTL);
+        Collections.sort(putColumnsWithTTL);
+        assertArrayEquals(expectedColumnsWithoutTTL.toArray(), putColumnsWithoutTTL.toArray());
+        assertArrayEquals(expectedColumnsWithTTL.toArray(), putColumnsWithTTL.toArray());
+
+        // Verify deletes
+        final List<Long> deleteColumns = new ArrayList<>();
+        Delete d = commands.getSecond();
+        for (Map.Entry<byte[], List<Cell>> me : d.getFamilyCellMap().entrySet()) {
+            for (Cell c : me.getValue()) {
+                colName = KeyColumnValueStoreUtil.bufferToLong(new StaticArrayBuffer(CellUtil.cloneQualifier(c)));
+                deleteColumns.add(colName);
+            }
+        }
+        Collections.sort(deleteColumns);
+        assertArrayEquals(expectedColumnDelete.toArray(), deleteColumns.toArray());
+    }
+
+    @Test
+    public void testMutationToPutsTTL() throws Exception {
+        final Map<String, Map<StaticBuffer, KCVMutation>> storeMutationMap = new HashMap<>();
+        final Map<StaticBuffer, KCVMutation> rowkeyMutationMap = new HashMap<>();
+        final List<Long> expectedColumnsWithTTL = new ArrayList<>();
+        final List<Long> putColumnsWithTTL = new ArrayList<>();
+        List<Entry> additions = new ArrayList<>();
+        List<StaticBuffer> deletions = new ArrayList<>();
+
+        StaticBuffer rowkey = KeyColumnValueStoreUtil.longToByteBuffer(0);
+        StaticBuffer col = KeyColumnValueStoreUtil.longToByteBuffer(1);
+        StaticBuffer val = KeyColumnValueStoreUtil.longToByteBuffer(2);
+
+        StaticArrayEntry e = (StaticArrayEntry) StaticArrayEntry.of(col, val);
+        // Test a TTL of Integer.MAX_VALUE / 1000 + 1 seconds: converting this value
+        // from seconds to milliseconds overflows an int, so the conversion must use long arithmetic.
+        e.setMetaData(EntryMetaData.TTL, Integer.MAX_VALUE / 1000 + 1);
+
+        Integer ttl = (Integer) e.getMetaData().get(EntryMetaData.TTL);
+        expectedColumnsWithTTL.add(TimeUnit.SECONDS.toMillis((long) ttl)); // convert seconds to milliseconds as a long
+
+        additions.add(e);
+        deletions.add(e);
+        rowkeyMutationMap.put(rowkey, new KCVMutation(additions, deletions));
+        storeMutationMap.put("store1", rowkeyMutationMap);
+        TableStoreStoreManager manager = new TableStoreStoreManager(tableStoreContainer.getModifiableConfiguration());
+        final Map<StaticBuffer, Pair<List<Put>, Delete>> commandsPerRowKey
+            = manager.convertToCommands(storeMutationMap, 0L, 0L).values().stream().findFirst().orElse(null);
+        Pair<List<Put>, Delete> commands = commandsPerRowKey.values().iterator().next();
+
+        // Verify the Put TTL
+        Put put = commands.getFirst().get(0);
+        putColumnsWithTTL.add(put.getTTL());
+        assertArrayEquals(expectedColumnsWithTTL.toArray(), putColumnsWithTTL.toArray());
+    }
+}
diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreTest.java
new file mode 100644
index 0000000000..6cc78e3343
--- /dev/null
+++ b/janusgraph-tablestore/src/test/java/org/janusgraph/diskstorage/tablestore/TableStoreStoreTest.java
@@ -0,0 +1,72 @@
+// Copyright 2017 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.tablestore;
+
+import org.janusgraph.TableStoreContainer;
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.KeyColumnValueStoreTest;
+import org.janusgraph.diskstorage.configuration.BasicConfiguration;
+import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
+import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Testcontainers
+public class TableStoreStoreTest extends KeyColumnValueStoreTest {
+    @Container
+    public static final TableStoreContainer tableStoreContainer = new TableStoreContainer();
+
+    public TableStoreStoreManager openStorageManager(ModifiableConfiguration config) throws BackendException {
+        return new TableStoreStoreManager(new BasicConfiguration(GraphDatabaseConfiguration.ROOT_NS, config.getConfiguration(), BasicConfiguration.Restriction.NONE));
+    }
+
+    public TableStoreStoreManager openStorageManager() throws BackendException {
+        return openStorageManager("");
+    }
+
+    public TableStoreStoreManager openStorageManager(String tableName) throws BackendException {
+        return openStorageManager(tableName, "");
+    }
+
+    public TableStoreStoreManager openStorageManager(String tableName, String graphName) throws BackendException {
+        return new TableStoreStoreManager(tableStoreContainer.getNamedConfiguration(tableName, graphName));
+    }
+
+    @Test
+    public void testGetKeysWithKeyRange(TestInfo testInfo) throws Exception {
+        super.testGetKeysWithKeyRange(testInfo);
+    }
+
+    @Override
+    public TableStoreStoreManager openStorageManagerForClearStorageTest() throws Exception {
+        return
openStorageManager(tableStoreContainer.getModifiableConfiguration().set(GraphDatabaseConfiguration.DROP_ON_CLEAR, true)); + } + + @Test + public void tableShouldEqualSuppliedTableName() throws BackendException { + final TableStoreStoreManager mgr = openStorageManager("randomTableName"); + assertEquals("randomTableName", mgr.getName()); + } + + @Test + public void tableShouldEqualGraphName() throws BackendException { + final TableStoreStoreManager mgr = openStorageManager("", "randomGraphName"); + assertEquals("randomGraphName", mgr.getName()); + } +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreCustomIdTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreCustomIdTest.java new file mode 100644 index 0000000000..55826129e0 --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreCustomIdTest.java @@ -0,0 +1,32 @@ +// Copyright 2022 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb.tablestore; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.configuration.ModifiableConfiguration; +import org.janusgraph.graphdb.JanusGraphCustomIdTest; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class TableStoreCustomIdTest extends JanusGraphCustomIdTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @Override + public ModifiableConfiguration getModifiableConfiguration() { + return tableStoreContainer.getModifiableConfiguration(); + } +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreGraphConcurrentTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreGraphConcurrentTest.java new file mode 100644 index 0000000000..9d96e3d7d2 --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreGraphConcurrentTest.java @@ -0,0 +1,32 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
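TableStoreCustomIdTest above inherits the suite for user-supplied vertex ids. In application terms the feature looks roughly like the sketch below; the two graph.* option names follow general JanusGraph configuration and are assumptions on the editor's part, not something this patch defines.

```java
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class CustomIdDemo {
    public static void main(String[] args) {
        JanusGraph graph = JanusGraphFactory.build()
            .set("storage.backend", "tablestore")
            .set("storage.hostname", "127.0.0.1")      // placeholder host
            .set("graph.set-vertex-id", true)          // assumed option: callers supply vertex ids
            .set("graph.allow-custom-vid-types", true) // assumed option: accept string ids
            .open();
        // With custom ids enabled, every addVertex call must pass T.id explicitly.
        Vertex v = graph.addVertex(T.id, "sensor-0001");
        graph.tx().commit();
        graph.close();
    }
}
```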
+ +package org.janusgraph.graphdb.tablestore; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.configuration.WriteConfiguration; +import org.janusgraph.graphdb.JanusGraphConcurrentTest; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class TableStoreGraphConcurrentTest extends JanusGraphConcurrentTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @Override + public WriteConfiguration getConfiguration() { + return tableStoreContainer.getWriteConfiguration(); + } +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreGraphPerformanceMemoryTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreGraphPerformanceMemoryTest.java new file mode 100644 index 0000000000..b959f08a6f --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreGraphPerformanceMemoryTest.java @@ -0,0 +1,33 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb.tablestore; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.configuration.WriteConfiguration; +import org.janusgraph.graphdb.JanusGraphPerformanceMemoryTest; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class TableStoreGraphPerformanceMemoryTest extends JanusGraphPerformanceMemoryTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @Override + public WriteConfiguration getConfiguration() { + return tableStoreContainer.getWriteConfiguration(); + } + +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreGraphTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreGraphTest.java new file mode 100644 index 0000000000..f4eec5c859 --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreGraphTest.java @@ -0,0 +1,84 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ + package org.janusgraph.graphdb.tablestore; + + import org.janusgraph.TableStoreContainer; + import org.janusgraph.diskstorage.configuration.WriteConfiguration; + import org.janusgraph.graphdb.JanusGraphTest; + import org.junit.jupiter.api.Disabled; + import org.junit.jupiter.api.Test; + import org.junit.jupiter.params.ParameterizedTest; + import org.junit.jupiter.params.provider.Arguments; + import org.junit.jupiter.params.provider.MethodSource; + import org.testcontainers.junit.jupiter.Container; + import org.testcontainers.junit.jupiter.Testcontainers; + + import java.util.Arrays; + import java.util.stream.Stream; + + import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.ASSIGN_TIMESTAMP; + import static org.junit.jupiter.params.provider.Arguments.arguments; + + @Testcontainers + public class TableStoreGraphTest extends JanusGraphTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @Override + public WriteConfiguration getConfiguration() { + return tableStoreContainer.getWriteConfiguration(); + } + + @Override @Test @Disabled("TableStore does not support retrieving cell TTL by client") + public void testVertexTTLImplicitKey() { } + + @Override @Test @Disabled("TableStore does not support retrieving cell TTL by client") + public void testEdgeTTLImplicitKey() { } + + protected static Stream<Arguments> generateConsistencyConfigs() { + return Arrays.stream(new Arguments[]{ + arguments(true), + arguments(false) + }); + } + + @Override + @Test + @Disabled + public void testConsistencyEnforcement() { + // disable original test in favour of parameterized test + } + + @ParameterizedTest + @MethodSource("generateConsistencyConfigs") + public void testConsistencyEnforcement(boolean assignTimestamp) { + clopen(option(ASSIGN_TIMESTAMP), assignTimestamp); + super.testConsistencyEnforcement(); + } + + @Override + @Test + @Disabled + public void testConcurrentConsistencyEnforcement() { + // disable original test in favour of parameterized test + } + + @ParameterizedTest + @MethodSource("generateConsistencyConfigs") + public void testConcurrentConsistencyEnforcement(boolean assignTimestamp) throws Exception { + clopen(option(ASSIGN_TIMESTAMP), assignTimestamp); + super.testConcurrentConsistencyEnforcement(); + } + } diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreLuceneCustomIdTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreLuceneCustomIdTest.java new file mode 100644 index 0000000000..7599ef2f77 --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreLuceneCustomIdTest.java @@ -0,0 +1,43 @@ +// Copyright 2022 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package org.janusgraph.graphdb.tablestore; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.StorageSetup; +import org.janusgraph.diskstorage.configuration.ModifiableConfiguration; +import org.janusgraph.graphdb.JanusGraphCustomIdIndexTest; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_BACKEND; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_DIRECTORY; + + +@Testcontainers +public class TableStoreLuceneCustomIdTest extends JanusGraphCustomIdIndexTest { + + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @Override + protected ModifiableConfiguration getModifiableConfiguration() { + ModifiableConfiguration config = tableStoreContainer.getModifiableConfiguration(); + for (String indexBackend : getIndexBackends()) { + config.set(INDEX_BACKEND, "lucene", indexBackend); + config.set(INDEX_DIRECTORY, StorageSetup.getHomeDir("lucene"), indexBackend); + } + return config; + } +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreOLAPTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreOLAPTest.java new file mode 100644 index 0000000000..71879d0baf --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreOLAPTest.java @@ -0,0 +1,33 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb.tablestore; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.configuration.WriteConfiguration; +import org.janusgraph.olap.OLAPTest; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class TableStoreOLAPTest extends OLAPTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @Override + public WriteConfiguration getConfiguration() { + return tableStoreContainer.getWriteConfiguration(); + } + +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreOperationCountingTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreOperationCountingTest.java new file mode 100644 index 0000000000..b50e87bf8c --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStoreOperationCountingTest.java @@ -0,0 +1,45 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb.tablestore; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.configuration.WriteConfiguration; +import org.janusgraph.graphdb.JanusGraphOperationCountingTest; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +@Testcontainers +public class TableStoreOperationCountingTest extends JanusGraphOperationCountingTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @Override + public WriteConfiguration getBaseConfiguration() { + return tableStoreContainer.getWriteConfiguration(); + } + + @Override + @Disabled + @Test + public void testCacheConcurrency() { + // Don't run this test. + } + +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStorePartitionGraphTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStorePartitionGraphTest.java new file mode 100644 index 0000000000..d3300a281e --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/graphdb/tablestore/TableStorePartitionGraphTest.java @@ -0,0 +1,36 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package org.janusgraph.graphdb.tablestore; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.configuration.WriteConfiguration; +import org.janusgraph.graphdb.JanusGraphPartitionGraphTest; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +@Testcontainers +public class TableStorePartitionGraphTest extends JanusGraphPartitionGraphTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @Override + public WriteConfiguration getBaseConfiguration() { + return tableStoreContainer.getWriteConfiguration(); + } + +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreCustomIdSparkTest.java b/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreCustomIdSparkTest.java new file mode 100644 index 0000000000..41944d4a4b --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreCustomIdSparkTest.java @@ -0,0 +1,50 @@ +// Copyright 2022 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.hadoop; + +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.configuration2.ex.ConfigurationException; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.configuration.ModifiableConfiguration; +import org.janusgraph.util.system.ConfigurationUtil; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +@Testcontainers +public class TableStoreCustomIdSparkTest extends JanusGraphCustomIdSparkTest { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @Override + protected Graph getSparkGraph() throws IOException, ConfigurationException { + final PropertiesConfiguration config = ConfigurationUtil.loadPropertiesConfig("target/test-classes/hbase-read.properties", false); + Path baseOutDir = Paths.get((String) config.getProperty("gremlin.hadoop.outputLocation")); + baseOutDir.toFile().mkdirs(); + String outDir = Files.createTempDirectory(baseOutDir, null).toAbsolutePath().toString(); + config.setProperty("gremlin.hadoop.outputLocation", outDir); + return GraphFactory.open(config); + } + + protected ModifiableConfiguration getModifiableConfiguration() { + return tableStoreContainer.getModifiableConfiguration(); + } +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreIndexManagementIT.java b/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreIndexManagementIT.java new file mode 100644 index 0000000000..09deaf778c --- /dev/null +++ 
b/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreIndexManagementIT.java @@ -0,0 +1,32 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.hadoop; + +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.configuration.WriteConfiguration; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class TableStoreIndexManagementIT extends AbstractIndexManagementIT { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(); + + @Override + public WriteConfiguration getConfiguration() { + return tableStoreContainer.getWriteConfiguration(); + } + +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreInputFormatIT.java b/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreInputFormatIT.java new file mode 100644 index 0000000000..b3a3f110e9 --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreInputFormatIT.java @@ -0,0 +1,55 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package org.janusgraph.hadoop; + +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.configuration2.ex.ConfigurationException; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; +import org.janusgraph.TableStoreContainer; +import org.janusgraph.diskstorage.configuration.WriteConfiguration; +import org.janusgraph.util.system.ConfigurationUtil; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +@Testcontainers +public class TableStoreInputFormatIT extends AbstractInputFormatIT { + @Container + private static final TableStoreContainer tableStore = new TableStoreContainer(); + + private PropertiesConfiguration getGraphConfiguration(final String filename) throws ConfigurationException, IOException { + final PropertiesConfiguration config = ConfigurationUtil.loadPropertiesConfig(filename, false); + Path baseOutDir = Paths.get((String) config.getProperty("gremlin.hadoop.outputLocation")); + baseOutDir.toFile().mkdirs(); + String outDir = Files.createTempDirectory(baseOutDir, null).toAbsolutePath().toString(); + config.setProperty("gremlin.hadoop.outputLocation", outDir); + return config; + } + + @Override + public WriteConfiguration getConfiguration() { + return tableStore.getWriteConfiguration(); + } + + @Override + protected Graph getGraph() throws ConfigurationException, IOException { + return GraphFactory.open(getGraphConfiguration("target/test-classes/hbase-read.properties")); + } +} diff --git a/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreSnapshotInputFormatIT.java b/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreSnapshotInputFormatIT.java new file mode 100644 index 0000000000..bf24121a9a --- /dev/null +++ b/janusgraph-tablestore/src/test/java/org/janusgraph/hadoop/TableStoreSnapshotInputFormatIT.java @@ -0,0 +1,316 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ + package org.janusgraph.hadoop; + + import org.apache.commons.configuration2.PropertiesConfiguration; + import org.apache.commons.configuration2.ex.ConfigurationException; + import org.apache.tinkerpop.gremlin.process.computer.Computer; + import org.apache.tinkerpop.gremlin.process.traversal.P; + import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; + import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; + import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; + import org.apache.tinkerpop.gremlin.structure.Direction; + import org.apache.tinkerpop.gremlin.structure.Graph; + import org.apache.tinkerpop.gremlin.structure.Vertex; + import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; + import org.janusgraph.TableStoreContainer; + import org.janusgraph.core.Cardinality; + import org.janusgraph.core.JanusGraphVertex; + import org.janusgraph.diskstorage.configuration.WriteConfiguration; + import org.janusgraph.example.GraphOfTheGodsFactory; + import org.janusgraph.util.system.ConfigurationUtil; + import org.junit.jupiter.api.AfterEach; + import org.junit.jupiter.api.Test; + import org.testcontainers.junit.jupiter.Container; + import org.testcontainers.junit.jupiter.Testcontainers; + + import java.io.IOException; + import java.nio.file.Files; + import java.nio.file.Path; + import java.nio.file.Paths; + import java.util.Collections; + import java.util.HashSet; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.Set; + + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertNotNull; + import static org.junit.jupiter.api.Assertions.assertTrue; + + /** + * This test suite contains the same tests as in TableStoreInputFormatIT and AbstractInputFormatIT, but + * takes snapshots of the graph table during the tests. The snapshots are read back through + * HBaseSnapshotInputFormat, as configured in hbase-read-snapshot.properties. + */ + @Testcontainers + public class TableStoreSnapshotInputFormatIT extends AbstractInputFormatIT { + @Container + public static final TableStoreContainer tableStoreContainer = new TableStoreContainer(true); + + // Used by this test only.
Must be consistent with hbase-read-snapshot.properties + private final String table = "janusgraph"; + private final String snapshotName = "janusgraph-snapshot"; + + + @AfterEach + @Override + public void tearDown() throws Exception { + super.tearDown(); + tableStoreContainer.deleteSnapshot(snapshotName); + } + + @Test + @Override + public void testReadGraphOfTheGods() throws Exception { + GraphOfTheGodsFactory.load(graph, null, true); + assertEquals(12L, (long) graph.traversal().V().count().next()); + // Take a snapshot of the graph table + tableStoreContainer.createSnapshot(snapshotName, table); + + Graph g = getGraph(); + GraphTraversalSource t = g.traversal().withComputer(SparkGraphComputer.class); + assertEquals(12L, (long) t.V().count().next()); + } + + @Test + @Override + public void testReadWideVertexWithManyProperties() throws Exception { + int numProps = 1 << 16; + + long numV = 1; + mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make(); + mgmt.commit(); + finishSchema(); + + for (int j = 0; j < numV; j++) { + Vertex v = graph.addVertex(); + for (int i = 0; i < numProps; i++) { + v.property("p", i); + } + } + graph.tx().commit(); + + assertEquals(numV, (long) graph.traversal().V().count().next()); + Map<Object, Object> propertiesOnVertex = graph.traversal().V().valueMap().next(); + List<?> valuesOnP = (List<?>) propertiesOnVertex.values().iterator().next(); + assertEquals(numProps, valuesOnP.size()); + for (int i = 0; i < numProps; i++) { + assertEquals(Integer.toString(i), valuesOnP.get(i).toString()); + } + // Take a snapshot of the graph table + tableStoreContainer.createSnapshot(snapshotName, table); + + Graph g = getGraph(); + GraphTraversalSource t = g.traversal().withComputer(SparkGraphComputer.class); + assertEquals(numV, (long) t.V().count().next()); + propertiesOnVertex = t.V().valueMap().next(); + final Set<?> observedValuesOnP = Collections.unmodifiableSet(new HashSet<>((List<?>) propertiesOnVertex.values().iterator().next())); + assertEquals(numProps, observedValuesOnP.size()); + // order may not be preserved in multi-value properties + assertEquals(Collections.unmodifiableSet(new HashSet<>(valuesOnP)), observedValuesOnP, "Unexpected values"); + } + + @Test + @Override + public void testReadSelfEdge() throws Exception { + GraphOfTheGodsFactory.load(graph, null, true); + assertEquals(12L, (long) graph.traversal().V().count().next()); + + // Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no + // schema changes + JanusGraphVertex sky = + graph.query().has("name", "sky").vertices().iterator().next(); + assertNotNull(sky); + assertEquals("sky", sky.value("name")); + assertEquals(1L, sky.query().direction(Direction.IN).edgeCount()); + assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount()); + assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount()); + sky.addEdge("lives", sky, "reason", "testReadSelfEdge"); + assertEquals(2L, sky.query().direction(Direction.IN).edgeCount()); + assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount()); + assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount()); + graph.tx().commit(); + // Take a snapshot of the graph table + tableStoreContainer.createSnapshot(snapshotName, table); + + // Read the new edge using the inputformat + Graph g = getGraph(); + GraphTraversalSource t = g.traversal().withComputer(SparkGraphComputer.class); + Iterator<Object> edgeIdIter = t.V().has("name", "sky").bothE().id(); + assertNotNull(edgeIdIter); + assertTrue(edgeIdIter.hasNext());
+ Set<Object> edges = new HashSet<>(); + edgeIdIter.forEachRemaining(edges::add); + assertEquals(2, edges.size()); + } + + @Test + @Override + public void testReadMultipleSelfEdges() throws Exception { + GraphOfTheGodsFactory.load(graph, null, true); + assertEquals(12L, (long) graph.traversal().V().count().next()); + + // Similarly to testReadSelfEdge(), add multiple self-loop edges on sky with edge label "lives" + JanusGraphVertex sky = graph.query().has("name", "sky").vertices().iterator().next(); + assertNotNull(sky); + assertEquals("sky", sky.value("name")); + assertEquals(1L, sky.query().direction(Direction.IN).edgeCount()); + assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount()); + assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount()); + sky.addEdge("lives", sky, "reason", "testReadMultipleSelfEdges1"); + sky.addEdge("lives", sky, "reason", "testReadMultipleSelfEdges2"); + sky.addEdge("lives", sky, "reason", "testReadMultipleSelfEdges3"); + assertEquals(4L, sky.query().direction(Direction.IN).edgeCount()); + assertEquals(3L, sky.query().direction(Direction.OUT).edgeCount()); + assertEquals(7L, sky.query().direction(Direction.BOTH).edgeCount()); + graph.tx().commit(); + // Take a snapshot of the graph table + tableStoreContainer.createSnapshot(snapshotName, table); + + // Read all the new edges using the inputformat + Graph g = getGraph(); + GraphTraversalSource t = g.traversal().withComputer(SparkGraphComputer.class); + Iterator<Object> edgeIdIterator = t.V().has("name", "sky").bothE().id(); + assertNotNull(edgeIdIterator); + assertTrue(edgeIdIterator.hasNext()); + Set<Object> edges = new HashSet<>(); + edgeIdIterator.forEachRemaining(edges::add); + assertEquals(4, edges.size()); + } + + @Test + @Override + public void testGeoshapeGetValues() throws Exception { + GraphOfTheGodsFactory.load(graph, null, true); + // Take a snapshot of the graph table + tableStoreContainer.createSnapshot(snapshotName, table); + + // Read geoshape using the inputformat + Graph g = getGraph(); + GraphTraversalSource t = g.traversal().withComputer(SparkGraphComputer.class); + Iterator<Object> geoIter = t.E().values("place"); + assertNotNull(geoIter); + assertTrue(geoIter.hasNext()); + Set<Object> geos = new HashSet<>(); + geoIter.forEachRemaining(geos::add); + assertEquals(3, geos.size()); + } + + @Test + @Override + public void testReadGraphOfTheGodsWithEdgeFiltering() throws Exception { + GraphOfTheGodsFactory.load(graph, null, true); + assertEquals(17L, (long) graph.traversal().E().count().next()); + // Take a snapshot of the graph table + tableStoreContainer.createSnapshot(snapshotName, table); + + // Read graph filtering out "battled" edges. + Graph g = getGraph(); + Computer computer = Computer.compute(SparkGraphComputer.class) + .edges(__.bothE().hasLabel(P.neq("battled"))); + GraphTraversalSource t = g.traversal().withComputer(computer); + assertEquals(14L, (long) t.E().count().next()); + } + + @Test + @Override + public void testGraphWithIsolatedVertices() throws Exception { + String key = "vertexKey"; + graph.addVertex(key); + graph.tx().commit(); + // Take a snapshot of the graph table + tableStoreContainer.createSnapshot(snapshotName, table); + + // Read graph using the inputformat.
+ Graph g = getGraph(); + GraphTraversalSource t = g.traversal().withComputer(SparkGraphComputer.class); + assertEquals(1L, (long) t.V().count().next()); + } + + @Test + @Override + public void testSchemaVerticesAreSkipped() throws Exception { + mgmt.makePropertyKey("p").dataType(Integer.class).make(); + mgmt.makeVertexLabel("v").make(); + mgmt.makeEdgeLabel("e").make(); + finishSchema(); + // Take a snapshot of the graph table + tableStoreContainer.createSnapshot(snapshotName, table); + + // Read graph using the inputformat. + Graph g = getGraph(); + GraphTraversalSource t = g.traversal().withComputer(SparkGraphComputer.class); + assertEquals(0L, (long) t.V().count().next()); + } + + @Test + @Override + public void testReadWithMetaProperties() throws Exception { + GraphOfTheGodsFactory.load(graph, null, true); + GraphTraversalSource t = graph.traversal(); + + assertEquals(0L, (long) t.V().has("name", "sky").properties("property").count().next()); + + mgmt.makePropertyKey("prop").cardinality(Cardinality.SINGLE).dataType(String.class).make(); + mgmt.makePropertyKey("meta_property").cardinality(Cardinality.SINGLE).dataType(String.class).make(); + mgmt.commit(); + finishSchema(); + + t.V().has("name", "sky") + .property("prop", "value") + .iterate(); + graph.tx().commit(); + assertEquals(1L, (long) t.V().has("name", "sky").properties("prop").count().next()); + assertEquals(0L, (long) t.V().has("name", "sky").properties("prop") + .properties("meta_property").count().next()); + + t.V() + .has("name", "sky") + .properties("prop") + .property("meta_property", "meta_value") + .iterate(); + graph.tx().commit(); + assertEquals(1L, (long) t.V().has("name", "sky").properties("prop") + .properties("meta_property").count().next()); + + // Take a snapshot of the graph table + tableStoreContainer.createSnapshot(snapshotName, table); + + Graph g = getGraph(); + t = g.traversal().withComputer(SparkGraphComputer.class); + assertEquals(1L, (long) t.V().has("name", "sky").properties("prop").count().next()); + assertEquals(1L, (long) t.V().has("name", "sky").properties("prop") + .properties("meta_property").count().next()); + } + + protected Graph getGraph() throws IOException, ConfigurationException { + final PropertiesConfiguration config = ConfigurationUtil.loadPropertiesConfig("target/test-classes/hbase-read-snapshot.properties", false); + Path baseOutDir = Paths.get((String) config.getProperty("gremlin.hadoop.outputLocation")); + baseOutDir.toFile().mkdirs(); + String outDir = Files.createTempDirectory(baseOutDir, null).toAbsolutePath().toString(); + config.setProperty("gremlin.hadoop.outputLocation", outDir); + // Set the hbase.rootdir property. This is needed by HBaseSnapshotInputFormat. + config.setProperty("janusgraphmr.ioformat.conf.storage.hbase.ext.hbase.rootdir", + tableStoreContainer.getHBaseRootDir().toString()); + return GraphFactory.open(config); + } + + @Override + public WriteConfiguration getConfiguration() { + return tableStoreContainer.getWriteConfiguration(); + } +} diff --git a/janusgraph-tablestore/src/test/resources/hbase-read-snapshot.properties b/janusgraph-tablestore/src/test/resources/hbase-read-snapshot.properties new file mode 100644 index 0000000000..5dbcfb412f --- /dev/null +++ b/janusgraph-tablestore/src/test/resources/hbase-read-snapshot.properties @@ -0,0 +1,42 @@ +# Copyright 2019 JanusGraph Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# Hadoop Graph Configuration +# +gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph +gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.hbase.HBaseSnapshotInputFormat +gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat + +gremlin.hadoop.jarsInDistributedCache=true +gremlin.hadoop.inputLocation=none +gremlin.hadoop.outputLocation=target/output + +# +# JanusGraph HBase InputFormat configuration +# +janusgraphmr.ioformat.conf.storage.backend=hbase +janusgraphmr.ioformat.conf.storage.hostname=localhost +janusgraphmr.ioformat.conf.storage.hbase.snapshot-name=janusgraph-snapshot +janusgraphmr.ioformat.conf.storage.hbase.snapshot-restore-dir=target/output + +# +# SparkGraphComputer Configuration +# +spark.master=local[4] +spark.driver.host=127.0.0.1 +spark.executor.memory=1g +spark.serializer=org.apache.spark.serializer.KryoSerializer +spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator +spark.hadoopRDD.ignoreEmptySplits=false diff --git a/janusgraph-tablestore/src/test/resources/hbase-read.properties b/janusgraph-tablestore/src/test/resources/hbase-read.properties new file mode 100644 index 0000000000..4ff1131c97 --- /dev/null +++ b/janusgraph-tablestore/src/test/resources/hbase-read.properties @@ -0,0 +1,42 @@ +# Copyright 2019 JanusGraph Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + # + # Hadoop Graph Configuration + # + gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph + gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.hbase.HBaseInputFormat + gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat + + gremlin.hadoop.jarsInDistributedCache=true + gremlin.hadoop.inputLocation=none + gremlin.hadoop.outputLocation=target/output + gremlin.spark.persistContext=true + + # + # JanusGraph HBase InputFormat configuration + # + janusgraphmr.ioformat.conf.storage.backend=hbase + janusgraphmr.ioformat.conf.storage.hostname=localhost + janusgraphmr.ioformat.conf.storage.hbase.table=janusgraph + + # + # SparkGraphComputer Configuration + # + spark.master=local[4] + spark.driver.host=127.0.0.1 + spark.executor.memory=1g + spark.serializer=org.apache.spark.serializer.KryoSerializer + spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator + spark.hadoopRDD.ignoreEmptySplits=false diff --git a/janusgraph-tablestore/src/test/resources/log4j2-test.xml b/janusgraph-tablestore/src/test/resources/log4j2-test.xml new file mode 100644 index 0000000000..8f0c992a9f --- /dev/null +++ b/janusgraph-tablestore/src/test/resources/log4j2-test.xml @@ -0,0 +1,15 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<Configuration status="warn"> + <Appenders> + <Console name="console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss} %-5level %class.%method{36} - %msg%n"/> + </Console> + </Appenders> + + <Loggers> + <Root level="info"> + <AppenderRef ref="console"/> + </Root> + </Loggers> +</Configuration> \ No newline at end of file diff --git a/pom.xml b/pom.xml index d895d2de19..9b8a6c8e93 100644 --- a/pom.xml +++ b/pom.xml @@ -146,6 +146,7 @@ <module>janusgraph-examples</module> <module>janusgraph-mixed-index-utils</module> <module>janusgraph-scylla</module> +<module>janusgraph-tablestore</module> @@ -1459,6 +1460,11 @@ <artifactId>cdi-api</artifactId> <version>2.0</version> </dependency> +<dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>2.5.10</version> +</dependency>
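For reference, the read-side property files above are consumed the same way the integration tests in this patch do it: load the properties into a HadoopGraph via GraphFactory and run OLAP traversals through SparkGraphComputer. The sketch below mirrors the getGraph() helpers in TableStoreInputFormatIT and TableStoreSnapshotInputFormatIT; the class name ReadPropertiesExample, the main-method wrapper, and the hard-coded classpath location are illustrative only and not part of this patch.

    import org.apache.commons.configuration2.PropertiesConfiguration;
    import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
    import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
    import org.apache.tinkerpop.gremlin.structure.Graph;
    import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
    import org.janusgraph.util.system.ConfigurationUtil;

    public class ReadPropertiesExample {
        public static void main(String[] args) throws Exception {
            // Load the HadoopGraph read configuration shown above (path is illustrative).
            PropertiesConfiguration config =
                ConfigurationUtil.loadPropertiesConfig("target/test-classes/hbase-read.properties", false);
            // gremlin.graph=...HadoopGraph in the properties tells GraphFactory what to open.
            Graph g = GraphFactory.open(config);
            // OLAP reads run through SparkGraphComputer, exactly as the ITs above do;
            // the full-table scan itself is driven by the configured HBaseInputFormat.
            GraphTraversalSource t = g.traversal().withComputer(SparkGraphComputer.class);
            System.out.println(t.V().count().next());
        }
    }

The snapshot variant differs only in its properties file: hbase-read-snapshot.properties swaps in HBaseSnapshotInputFormat plus the snapshot-name and snapshot-restore-dir keys, which is why TableStoreSnapshotInputFormatIT additionally sets hbase.rootdir before opening the graph.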