Skip to content

Commit

Permalink
Push vFilter down. (#4260)
Browse files Browse the repository at this point in the history
* Push vFilter down.

* Rebase.

Co-authored-by: Sophie <[email protected]>
  • Loading branch information
Shylock-Hg and Sophie-Xie authored Jun 7, 2022
1 parent 32f449f commit 591a8c5
Show file tree
Hide file tree
Showing 12 changed files with 326 additions and 33 deletions.
17 changes: 16 additions & 1 deletion src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ folly::Future<Status> TraverseExecutor::getNeighbors() {
finalStep ? traverse_->random() : false,
finalStep ? traverse_->orderBy() : std::vector<storage::cpp2::OrderBy>(),
finalStep ? traverse_->limit(qctx()) : -1,
(currentStep_ == 1 && zeroStep()) ? nullptr : traverse_->filter())
selectFilter())
.via(runner())
.thenValue([this, getNbrTime](StorageRpcResponse<GetNeighborsResponse>&& resp) mutable {
SCOPED_TIMER(&execTime_);
Expand All @@ -108,6 +108,21 @@ folly::Future<Status> TraverseExecutor::getNeighbors() {
});
}

Expression* TraverseExecutor::selectFilter() {
Expression* filter = nullptr;
if (!(currentStep_ == 1 && zeroStep())) {
filter = const_cast<Expression*>(traverse_->filter());
}
if (currentStep_ == 1) {
if (filter == nullptr) {
filter = traverse_->firstStepFilter();
} else if (traverse_->firstStepFilter() != nullptr) {
filter = LogicalExpression::makeAnd(&objPool_, filter, traverse_->firstStepFilter());
}
}
return filter;
}

void TraverseExecutor::addStats(RpcResponse& resp, int64_t getNbrTimeInUSec) {
auto& hostLatency = resp.hostLatency();
std::stringstream ss;
Expand Down
3 changes: 3 additions & 0 deletions src/graph/executor/query/TraverseExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ class TraverseExecutor final : public StorageAccessExecutor {
std::unordered_map<Value, Paths>& zeroSteps,
size_t& count);

Expression* selectFilter();

private:
ObjectPool objPool_;
DataSet reqDs_;
const Traverse* traverse_{nullptr};
MatchStepRange* range_{nullptr};
Expand Down
1 change: 1 addition & 0 deletions src/graph/optimizer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ nebula_add_library(
rule/PushFilterDownAggregateRule.cpp
rule/PushFilterDownProjectRule.cpp
rule/PushFilterDownLeftJoinRule.cpp
rule/PushFilterDownNodeRule.cpp
rule/PushFilterDownScanVerticesRule.cpp
rule/PushVFilterDownScanVerticesRule.cpp
rule/OptimizeEdgeIndexScanByFilterRule.cpp
Expand Down
125 changes: 125 additions & 0 deletions src/graph/optimizer/rule/PushFilterDownNodeRule.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/optimizer/rule/PushFilterDownNodeRule.h"

#include "common/expression/Expression.h"
#include "common/expression/LogicalExpression.h"
#include "graph/optimizer/OptContext.h"
#include "graph/optimizer/OptGroup.h"
#include "graph/planner/plan/PlanNode.h"
#include "graph/planner/plan/Query.h"
#include "graph/util/ExpressionUtils.h"
#include "graph/visitor/ExtractFilterExprVisitor.h"

using nebula::Expression;
using nebula::graph::AppendVertices;
using nebula::graph::Explore;
using nebula::graph::PlanNode;
using nebula::graph::QueryContext;
using nebula::graph::Traverse;

namespace nebula {
namespace opt {

/*static*/ const std::initializer_list<graph::PlanNode::Kind> PushFilterDownNodeRule::kNodes{
graph::PlanNode::Kind::kTraverse,
graph::PlanNode::Kind::kAppendVertices,
};

std::unique_ptr<OptRule> PushFilterDownNodeRule::kInstance =
std::unique_ptr<PushFilterDownNodeRule>(new PushFilterDownNodeRule());

PushFilterDownNodeRule::PushFilterDownNodeRule() {
RuleSet::QueryRules().addRule(this);
}

const Pattern &PushFilterDownNodeRule::pattern() const {
static Pattern pattern = Pattern::create(kNodes);
return pattern;
}

StatusOr<OptRule::TransformResult> PushFilterDownNodeRule::transform(
OptContext *ctx, const MatchedResult &matched) const {
auto qctx = ctx->qctx();
auto pool = qctx->objPool();
auto *groupNode = matched.node;
auto *node = groupNode->node();
Expression *vFilter = nullptr;
if (node->kind() == graph::PlanNode::Kind::kTraverse) {
auto *traverse = static_cast<const Traverse *>(node);
vFilter = traverse->vFilter()->clone();
} else if (node->kind() == graph::PlanNode::Kind::kAppendVertices) {
auto *append = static_cast<const AppendVertices *>(node);
vFilter = append->vFilter()->clone();
} else {
DLOG(FATAL) << "Unsupported node kind: " << node->kind();
return TransformResult::noTransform();
}
auto visitor = graph::ExtractFilterExprVisitor::makePushGetVertices(pool);
vFilter->accept(&visitor);
if (!visitor.ok()) {
return TransformResult::noTransform();
}
auto remainedExpr = std::move(visitor).remainedExpr();
auto *explore = static_cast<const Explore *>(node);
auto *newExplore = static_cast<Explore *>(node->clone());
newExplore->setOutputVar(explore->outputVar());
auto newExploreGroupNode = OptGroupNode::create(ctx, newExplore, groupNode->group());
if (explore->filter() != nullptr) {
vFilter = LogicalExpression::makeAnd(pool, vFilter, newExplore->filter());
}
if (newExplore->kind() == graph::PlanNode::Kind::kTraverse) {
auto *traverse = static_cast<Traverse *>(newExplore);
traverse->setVertexFilter(remainedExpr);
if (traverse->firstStepFilter() != nullptr) {
vFilter = LogicalExpression::makeAnd(pool, vFilter, traverse->firstStepFilter());
}
traverse->setFirstStepFilter(vFilter);
} else if (newExplore->kind() == graph::PlanNode::Kind::kAppendVertices) {
auto *append = static_cast<AppendVertices *>(newExplore);
append->setVertexFilter(remainedExpr);
append->setFilter(vFilter);
} else {
DLOG(FATAL) << "Unsupported node kind: " << newExplore->kind();
return TransformResult::noTransform();
}

newExploreGroupNode->setDeps(groupNode->dependencies());

TransformResult result;
result.eraseAll = true;
result.newGroupNodes.emplace_back(newExploreGroupNode);
return result;
}

bool PushFilterDownNodeRule::match(OptContext *octx, const MatchedResult &matched) const {
if (!OptRule::match(octx, matched)) {
return false;
}
const auto *node = matched.node->node();
if (node->kind() == graph::PlanNode::Kind::kTraverse) {
const auto *traverse = static_cast<const graph::Traverse *>(node);
if (traverse->vFilter() == nullptr) {
return false;
}
} else if (node->kind() == graph::PlanNode::Kind::kAppendVertices) {
const auto *append = static_cast<const graph::AppendVertices *>(node);
if (append->vFilter() == nullptr) {
return false;
}
} else {
DLOG(FATAL) << "Unexpected node kind: " << node->kind();
return false;
}
return true;
}

std::string PushFilterDownNodeRule::toString() const {
return "PushFilterDownNodeRule";
}

} // namespace opt
} // namespace nebula
35 changes: 35 additions & 0 deletions src/graph/optimizer/rule/PushFilterDownNodeRule.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2021 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#pragma once

#include <memory>

#include "graph/optimizer/OptRule.h"

namespace nebula {
namespace opt {

// Push down the vertex filter of Traverse/AppendVertices

class PushFilterDownNodeRule final : public OptRule {
public:
const Pattern &pattern() const override;

StatusOr<TransformResult> transform(OptContext *ctx, const MatchedResult &matched) const override;

std::string toString() const override;

bool match(OptContext *ctx, const MatchedResult &matched) const override;

private:
PushFilterDownNodeRule();

static std::unique_ptr<OptRule> kInstance;

static const std::initializer_list<graph::PlanNode::Kind> kNodes;
};

} // namespace opt
} // namespace nebula
52 changes: 34 additions & 18 deletions src/graph/optimizer/rule/PushVFilterDownScanVerticesRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,17 @@ bool PushVFilterDownScanVerticesRule::match(OptContext *ctx, const MatchedResult
if (propExpr->prop() != kVid) {
return false;
}
if (appendVertices->vFilter() == nullptr) {
if (appendVertices->vFilter() == nullptr && appendVertices->filter() == nullptr) {
return false;
}
auto tagPropExprs = graph::ExpressionUtils::collectAll(appendVertices->vFilter(),
{Expression::Kind::kTagProperty});
for (const auto &tagPropExpr : tagPropExprs) {
auto tagProp = static_cast<const PropertyExpression *>(tagPropExpr);
if (tagProp->sym() == "*") {
return false;
if (appendVertices->vFilter() != nullptr) {
auto tagPropExprs = graph::ExpressionUtils::collectAll(appendVertices->vFilter(),
{Expression::Kind::kTagProperty});
for (const auto &tagPropExpr : tagPropExprs) {
auto tagProp = static_cast<const PropertyExpression *>(tagPropExpr);
if (tagProp->sym() == "*") {
return false;
}
}
}
return true;
Expand All @@ -78,26 +80,40 @@ StatusOr<OptRule::TransformResult> PushVFilterDownScanVerticesRule::transform(
auto sv = static_cast<const ScanVertices *>(svGroupNode->node());
auto qctx = ctx->qctx();
auto pool = qctx->objPool();
auto condition = appendVertices->vFilter()->clone();
auto newAppendVertices = appendVertices->clone();
Expression *condition = nullptr;
if (appendVertices->vFilter() != nullptr) {
condition = appendVertices->vFilter()->clone();

auto visitor = graph::ExtractFilterExprVisitor::makePushGetVertices(pool);
condition->accept(&visitor);
if (!visitor.ok()) {
return TransformResult::noTransform();
}

auto visitor = graph::ExtractFilterExprVisitor::makePushGetVertices(pool);
condition->accept(&visitor);
if (!visitor.ok()) {
return TransformResult::noTransform();
auto remainedExpr = std::move(visitor).remainedExpr();
newAppendVertices->setVertexFilter(remainedExpr);
}

auto remainedExpr = std::move(visitor).remainedExpr();
OptGroupNode *newAppendVerticesGroupNode = nullptr;
auto newAppendVertices = appendVertices->clone();
newAppendVertices->setVertexFilter(remainedExpr);
newAppendVertices->setOutputVar(appendVertices->outputVar());
newAppendVerticesGroupNode =
OptGroupNode::create(ctx, newAppendVertices, appendVerticesGroupNode->group());

auto newSVFilter = condition;
if (newAppendVertices->filter() != nullptr) {
if (newSVFilter == nullptr) {
newSVFilter = newAppendVertices->filter();
} else {
newSVFilter = LogicalExpression::makeAnd(pool, newAppendVertices->filter(), newSVFilter);
}
newAppendVertices->setFilter(nullptr);
}
if (sv->filter() != nullptr) {
auto logicExpr = LogicalExpression::makeAnd(pool, condition, sv->filter()->clone());
newSVFilter = logicExpr;
if (newSVFilter == nullptr) {
newSVFilter = sv->filter()->clone();
} else {
newSVFilter = LogicalExpression::makeAnd(pool, newSVFilter, sv->filter()->clone());
}
}

auto newSV = static_cast<ScanVertices *>(sv->clone());
Expand Down
10 changes: 8 additions & 2 deletions src/graph/planner/match/MatchSolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,20 @@ Expression* MatchSolver::makeIndexFilter(const std::string& label,
return TagPropertyExpression::make(qctx->objPool(), label, prop);
};

auto root = LogicalExpression::makeAnd(qctx->objPool());
std::vector<Expression*> operands;
operands.reserve(map->size());
for (const auto& item : map->items()) {
operands.emplace_back(RelationalExpression::makeEQ(
qctx->objPool(), makePropExpr(item.first), item.second->clone()));
}
root->setOperands(std::move(operands));
Expression* root = nullptr;
DCHECK(!operands.empty());
if (operands.size() == 1) {
root = operands.front();
} else {
root = LogicalExpression::makeAnd(qctx->objPool());
static_cast<LogicalExpression*>(root)->setOperands(std::move(operands));
}
return root;
}

Expand Down
6 changes: 6 additions & 0 deletions src/graph/planner/plan/Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,9 @@ void Traverse::cloneMembers(const Traverse& g) {
setEdgeFilter(g.eFilter_->clone());
}
setTrackPrevPath(g.trackPrevPath_);
if (g.firstStepFilter_ != nullptr) {
setFirstStepFilter(g.firstStepFilter_->clone());
}
}

std::unique_ptr<PlanNodeDescription> Traverse::explain() const {
Expand All @@ -746,6 +749,9 @@ std::unique_ptr<PlanNodeDescription> Traverse::explain() const {
addDescription("vertex filter", vFilter_ != nullptr ? vFilter_->toString() : "", desc.get());
addDescription("edge filter", eFilter_ != nullptr ? eFilter_->toString() : "", desc.get());
addDescription("if_track_previous_path", folly::toJson(util::toJson(trackPrevPath_)), desc.get());
addDescription("first step filter",
firstStepFilter_ != nullptr ? firstStepFilter_->toString() : "",
desc.get());
return desc;
}

Expand Down
10 changes: 10 additions & 0 deletions src/graph/planner/plan/Query.h
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,14 @@ class Traverse final : public GetNeighbors {
trackPrevPath_ = track;
}

Expression* firstStepFilter() const {
return firstStepFilter_;
}

void setFirstStepFilter(Expression* filter) {
firstStepFilter_ = filter;
}

private:
Traverse(QueryContext* qctx, PlanNode* input, GraphSpaceID space)
: GetNeighbors(qctx, Kind::kTraverse, input, space) {
Expand All @@ -1500,6 +1508,8 @@ class Traverse final : public GetNeighbors {
Expression* vFilter_{nullptr};
Expression* eFilter_{nullptr};
bool trackPrevPath_{true};
// Push down filter in first step
Expression* firstStepFilter_{nullptr};
};

// Append vertices to a path.
Expand Down
6 changes: 5 additions & 1 deletion src/graph/visitor/ExtractFilterExprVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ void ExtractFilterExprVisitor::visit(VersionedVariableExpression *) {
canBePushed_ = false;
}

void ExtractFilterExprVisitor::visit(TagPropertyExpression *) {
void ExtractFilterExprVisitor::visit(TagPropertyExpression *expr) {
if (expr->sym() == "*") { // Storage don't support '*' for tag
canBePushed_ = false;
return;
}
switch (pushType_) {
case PushType::kGetNeighbors:
canBePushed_ = false;
Expand Down
Loading

0 comments on commit 591a8c5

Please sign in to comment.