Skip to content

Commit

Permalink
Fix get all tags (#2461)
Browse files Browse the repository at this point in the history
* Fix get all tags.

* Fix empty props.

Co-authored-by: Yee <[email protected]>
  • Loading branch information
CPWstatic and yixinglu authored Apr 7, 2021
1 parent f71d502 commit 721ae51
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 51 deletions.
145 changes: 100 additions & 45 deletions src/graph/FetchVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ Status FetchVerticesExecutor::prepareTags() {
return Status::Error("tags shall never be empty");
}

if (tagNames.size() == 1 && *tagNames[0] == "*") {
if (sentence_->isAllTags()) {
auto tagsStatus = ectx()->schemaManager()->getAllTag(spaceId_);
if (!tagsStatus.ok()) {
return tagsStatus.status();
Expand Down Expand Up @@ -363,48 +363,10 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) {

std::unordered_map<TagID, std::shared_ptr<const meta::SchemaProviderIf>> tagSchemaMap;
std::set<TagID> tagIdSet;
for (auto &resp : all) {
if (!resp.__isset.vertices || resp.vertices.empty()) {
continue;
}
auto *vertexSchema = resp.get_vertex_schema();
if (vertexSchema != nullptr) {
std::transform(vertexSchema->cbegin(), vertexSchema->cend(),
std::inserter(tagSchemaMap, tagSchemaMap.begin()), [](auto &s) {
return std::make_pair(
s.first, std::make_shared<ResultSchemaProvider>(s.second));
});
}

for (auto &vdata : resp.vertices) {
if (!vdata.__isset.tag_data || vdata.tag_data.empty()) {
continue;
}
for (auto& tagData : vdata.tag_data) {
auto& data = tagData.data;
VertexID vid = vdata.vertex_id;
TagID tagId = tagData.tag_id;
if (tagSchemaMap.find(tagId) == tagSchemaMap.end()) {
auto ver = RowReader::getSchemaVer(data);
if (ver < 0) {
LOG(ERROR) << "Found schema version negative " << ver;
doError(Status::Error("Found schema version negative: %d", ver));
return;
}
auto schema = ectx()->schemaManager()->getTagSchema(spaceId_, tagId, ver);
if (schema == nullptr) {
VLOG(3) << "Schema not found for tag id: " << tagId;
// Ignore the bad data.
continue;
}
tagSchemaMap[tagId] = schema;
}
auto vschema = tagSchemaMap[tagId];
auto vreader = RowReader::getRowReader(data, vschema);
dataMap[vid].emplace(std::make_pair(tagId, std::move(vreader)));
tagIdSet.insert(tagId);
}
}
if (sentence_->isAllTags() && yieldClause_ == nullptr) {
processAll(all, dataMap, tagSchemaMap, tagIdSet);
} else {
process(all, dataMap, tagSchemaMap, tagIdSet);
}

if (yieldClause_ == nullptr) {
Expand Down Expand Up @@ -473,7 +435,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) {
auto tagIter = ds.find(tagId);
if (tagIter != ds.end()) {
auto vreader = tagIter->second.get();
auto vschema = vreader->getSchema().get();
auto vschema = tagSchemaMap[tagId].get();
return Collector::getProp(vschema, prop, vreader);
} else {
auto ts = ectx()->schemaManager()->getTagSchema(spaceId_, tagId);
Expand Down Expand Up @@ -543,7 +505,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) {
auto tagIter = ds.find(tagId);
if (tagIter != ds.end()) {
auto vreader = tagIter->second.get();
auto vschema = vreader->getSchema().get();
auto vschema = tagSchemaMap[tagId].get();
return Collector::getProp(vschema, prop, vreader);
} else {
auto ts = ectx()->schemaManager()->getTagSchema(spaceId_, tagId);
Expand Down Expand Up @@ -588,6 +550,99 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) {
finishExecution(std::move(rsWriter));
}

void FetchVerticesExecutor::process(
const std::vector<storage::cpp2::QueryResponse> &all,
std::unordered_map<VertexID, std::map<TagID, RowReader>> &dataMap,
std::unordered_map<TagID, std::shared_ptr<const meta::SchemaProviderIf>> &tagSchemaMap,
std::set<TagID> &tagIdSet) {
for (auto &resp : all) {
if (!resp.__isset.vertices || resp.vertices.empty()) {
continue;
}
auto *vertexSchema = resp.get_vertex_schema();
if (vertexSchema != nullptr) {
std::transform(vertexSchema->cbegin(), vertexSchema->cend(),
std::inserter(tagSchemaMap, tagSchemaMap.begin()), [](auto &s) {
return std::make_pair(
s.first, std::make_shared<ResultSchemaProvider>(s.second));
});
}

for (auto &vdata : resp.vertices) {
if (!vdata.__isset.tag_data || vdata.tag_data.empty()) {
continue;
}
for (auto& tagData : vdata.tag_data) {
auto& data = tagData.data;
VertexID vid = vdata.vertex_id;
TagID tagId = tagData.tag_id;
auto vschema = tagSchemaMap[tagId];
auto& readers = dataMap[vid];
if (vschema == nullptr) {
auto latestSchema = ectx()->schemaManager()->getTagSchema(spaceId_, tagId);
if (latestSchema == nullptr) {
VLOG(3) << "Schema not found for tag id: " << tagId;
// Ignore the bad data.
continue;
}
tagSchemaMap[tagId] = latestSchema;
vschema = latestSchema;
}
auto vreader = RowReader::getRowReader(data, vschema);
readers.emplace(std::make_pair(tagId, std::move(vreader)));
tagIdSet.insert(tagId);
}
}
}
}

void FetchVerticesExecutor::processAll(
const std::vector<storage::cpp2::QueryResponse> &all,
std::unordered_map<VertexID, std::map<TagID, RowReader>> &dataMap,
std::unordered_map<TagID, std::shared_ptr<const meta::SchemaProviderIf>> &tagSchemaMap,
std::set<TagID> &tagIdSet) {
for (auto &resp : all) {
if (!resp.__isset.vertices || resp.vertices.empty()) {
continue;
}

for (auto &vdata : resp.vertices) {
if (!vdata.__isset.tag_data || vdata.tag_data.empty()) {
continue;
}
for (auto& tagData : vdata.tag_data) {
auto& data = tagData.data;
VertexID vid = vdata.vertex_id;
TagID tagId = tagData.tag_id;
auto ver = RowReader::getSchemaVer(data);
if (ver < 0) {
LOG(ERROR) << "Found schema version negative " << ver;
doError(Status::Error("Found schema version negative: %d", ver));
return;
}
auto schema = ectx()->schemaManager()->getTagSchema(spaceId_, tagId, ver);
if (schema == nullptr) {
VLOG(3) << "Schema not found for tag id: " << tagId;
// Ignore the bad data.
continue;
}
if (tagSchemaMap.find(tagId) == tagSchemaMap.end()) {
auto latestSchema = ectx()->schemaManager()->getTagSchema(spaceId_, tagId);
if (latestSchema == nullptr) {
VLOG(3) << "Schema not found for tag id: " << tagId;
// Ignore the bad data.
continue;
}
tagSchemaMap[tagId] = latestSchema;
}
auto vreader = RowReader::getRowReader(data, schema);
dataMap[vid].emplace(std::make_pair(tagId, std::move(vreader)));
tagIdSet.insert(tagId);
}
}
}
}

void FetchVerticesExecutor::setupResponse(cpp2::ExecutionResponse &resp) {
if (resp_ == nullptr) {
resp_ = std::make_unique<cpp2::ExecutionResponse>();
Expand Down
12 changes: 12 additions & 0 deletions src/graph/FetchVerticesExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ class FetchVerticesExecutor final : public TraverseExecutor {
kRef,
};

void process(
const std::vector<storage::cpp2::QueryResponse> &all,
std::unordered_map<VertexID, std::map<TagID, RowReader>> &dataMap,
std::unordered_map<TagID, std::shared_ptr<const meta::SchemaProviderIf>> &tagSchemaMap,
std::set<TagID> &tagIdSet);

void processAll(
const std::vector<storage::cpp2::QueryResponse> &all,
std::unordered_map<VertexID, std::map<TagID, RowReader>> &dataMap,
std::unordered_map<TagID, std::shared_ptr<const meta::SchemaProviderIf>> &tagSchemaMap,
std::set<TagID> &tagIdSet);

private:
GraphSpaceID spaceId_{-1};
FromType fromType_{kInstantExpr};
Expand Down
30 changes: 24 additions & 6 deletions src/graph/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,38 +248,56 @@ OptVariantType Collector::getProp(const meta::SchemaProviderIf *schema,
switch (type) {
case SupportedType::BOOL: {
bool v;
reader->getBool(prop, v);
auto ret = reader->getBool(prop, v);
if (ret != ResultType::SUCCEEDED) {
return RowReader::getDefaultProp(type);
}
VLOG(3) << "get prop: " << prop << ", value: " << v;
return v;
}
case SupportedType::TIMESTAMP:
case SupportedType::INT: {
int64_t v;
reader->getInt(prop, v);
auto ret = reader->getInt(prop, v);
if (ret != ResultType::SUCCEEDED) {
return RowReader::getDefaultProp(type);
}
VLOG(3) << "get prop: " << prop << ", value: " << v;
return v;
}
case SupportedType::VID: {
VertexID v;
reader->getVid(prop, v);
auto ret = reader->getVid(prop, v);
if (ret != ResultType::SUCCEEDED) {
return RowReader::getDefaultProp(type);
}
VLOG(3) << "get prop: " << prop << ", value: " << v;
return v;
}
case SupportedType::FLOAT: {
float v;
reader->getFloat(prop, v);
auto ret = reader->getFloat(prop, v);
if (ret != ResultType::SUCCEEDED) {
return RowReader::getDefaultProp(type);
}
VLOG(3) << "get prop: " << prop << ", value: " << v;
return static_cast<double>(v);
}
case SupportedType::DOUBLE: {
double v;
reader->getDouble(prop, v);
auto ret = reader->getDouble(prop, v);
if (ret != ResultType::SUCCEEDED) {
return RowReader::getDefaultProp(type);
}
VLOG(3) << "get prop: " << prop << ", value: " << v;
return v;
}
case SupportedType::STRING: {
folly::StringPiece v;
reader->getString(prop, v);
auto ret = reader->getString(prop, v);
if (ret != ResultType::SUCCEEDED) {
return RowReader::getDefaultProp(type);
}
VLOG(3) << "get prop: " << prop << ", value: " << v;
return v.toString();
}
Expand Down
5 changes: 5 additions & 0 deletions src/parser/TraverseSentences.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ class FetchVerticesSentence final : public Sentence {

std::string toString() const override;

bool isAllTags() const {
const auto& tagNames = tags_->labels();
return tagNames.size() == 1 && *tagNames[0] == "*";
}

private:
std::unique_ptr<FetchLabels> tags_;
std::unique_ptr<VertexIDList> vidList_;
Expand Down

0 comments on commit 721ae51

Please sign in to comment.