Skip to content

Commit

Permalink
Merge branch '67-shortest-path-bounded' of https://github.com/cwida/duckpgq-extension into 67-shortest-path-bounded
Browse files Browse the repository at this point in the history
  • Loading branch information
SiberiaWolfP committed Feb 26, 2024
2 parents 81ca5f9 + aa8dfa4 commit 737ab85
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 145 deletions.
9 changes: 6 additions & 3 deletions duckpgq/src/duckpgq/functions/scalar/csr_creation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ static void CsrInitializeVertex(DuckPGQState &context, int32_t id,
csr->initialized_v = true;
context.csr_list[id] = std::move(csr);
} catch (std::bad_alloc const &) {
throw Exception(ExceptionType::INTERNAL, "Unable to initialize vector of size for csr vertex table "
throw Exception(ExceptionType::INTERNAL,
"Unable to initialize vector of size for csr vertex table "
"representation");
}

Expand All @@ -55,7 +56,8 @@ static void CsrInitializeEdge(DuckPGQState &context, int32_t id, int64_t v_size,
csr_entry->second->e.resize(e_size, 0);
csr_entry->second->edge_ids.resize(e_size, 0);
} catch (std::bad_alloc const &) {
throw Exception(ExceptionType::INTERNAL, "Unable to initialize vector of size for csr edge table "
throw Exception(ExceptionType::INTERNAL,
"Unable to initialize vector of size for csr edge table "
"representation");
}
for (auto i = 1; i < v_size + 2; i++) {
Expand All @@ -82,7 +84,8 @@ static void CsrInitializeWeight(DuckPGQState &context, int32_t id,
throw NotImplementedException("Unrecognized weight type detected.");
}
} catch (std::bad_alloc const &) {
throw Exception(ExceptionType::INTERNAL, "Unable to initialize vector of size for csr weight table "
throw Exception(ExceptionType::INTERNAL,
"Unable to initialize vector of size for csr weight table "
"representation");
}

Expand Down
3 changes: 3 additions & 0 deletions duckpgq/src/duckpgq/functions/scalar/iterativelength.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,12 @@ static void IterativeLengthFunction(DataChunk &args, ExpressionState &state,
while (started_searches < args.size()) {
int64_t search_num = started_searches++;
int64_t src_pos = vdata_src.sel->get_index(search_num);
int64_t dst_pos = vdata_dst.sel->get_index(search_num);
if (!vdata_src.validity.RowIsValid(src_pos)) {
result_validity.SetInvalid(search_num);
result_data[search_num] = (int64_t)-1; /* no path */
} else if (src_data[src_pos] == dst_data[dst_pos]) {
result_data[search_num] = (int64_t)0; /* source == destination, length is 0 */
} else {
result_data[search_num] = (int64_t)-1; /* initialize to no path */
visit1[src_data[src_pos]][lane] = true;
Expand Down
47 changes: 22 additions & 25 deletions duckpgq/src/duckpgq/functions/scalar/iterativelength_lowerbound.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
namespace duckdb {

static bool IterativeLengthLowerBound(int64_t v_size, int64_t *v, vector<int64_t> &e,
// vector<vector<unordered_set<int64_t>>> &parents_v,
vector<std::bitset<LANE_LIMIT>> &seen,
vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &next) {
vector<std::bitset<LANE_LIMIT>> &seen,
vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &next) {
bool change = false;
for (auto i = 0; i < v_size; i++) {
next[i] = 0;
Expand All @@ -23,11 +22,6 @@ static bool IterativeLengthLowerBound(int64_t v_size, int64_t *v, vector<int64_t
if (visit[i][lane]) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
// if (seen[n][lane] == false || parents_v[i][lane].find(n) == parents_v[i][lane].end()) {
// parents_v[n][lane] = parents_v[i][lane];
// parents_v[n][lane].insert(i);
// next[n][lane] = true;
// }
next[n][lane] = true;
}
}
Expand All @@ -38,12 +32,13 @@ static bool IterativeLengthLowerBound(int64_t v_size, int64_t *v, vector<int64_t
seen[i] = seen[i] | next[i];
change |= next[i].any();
}

return change;
}

static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState &state,
Vector &result) {
static void IterativeLengthLowerBoundFunction(DataChunk &args,
ExpressionState &state,
Vector &result) {
auto &func_expr = (BoundFunctionExpression &)state.expr;
auto &info = (IterativeLengthFunctionData &)*func_expr.bind_info;
auto duckpgq_state_entry = info.context.registered_state.find("duckpgq");
Expand Down Expand Up @@ -105,7 +100,8 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState &
vector<std::bitset<LANE_LIMIT>> seen(v_size);
vector<std::bitset<LANE_LIMIT>> visit1(v_size);
vector<std::bitset<LANE_LIMIT>> visit2(v_size);
vector<vector<unordered_set<int64_t>>> parents_v(v_size, std::vector<unordered_set<int64_t>>(LANE_LIMIT));
vector<vector<unordered_set<int64_t>>> parents_v(
v_size, std::vector<unordered_set<int64_t>>(LANE_LIMIT));

// maps lane to search number
short lane_to_num[LANE_LIMIT];
Expand All @@ -129,7 +125,7 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState &
while (started_searches < args.size()) {
int64_t search_num = started_searches++;
int64_t src_pos = vdata_src.sel->get_index(search_num);
if (!vdata_src.validity.RowIsValid(src_pos)) {
if (!vdata_src.validity.RowIsValid(src_pos)) { // NULL value
result_validity.SetInvalid(search_num);
result_data[search_num] = (int64_t)-1; /* no path */
} else {
Expand All @@ -144,16 +140,17 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState &

// make passes while a lane is still active
for (int64_t iter = 1; active && iter <= upper_bound; iter++) {
if (!IterativeLengthLowerBound(v_size, v, e, seen, (iter & 1) ? visit1 : visit2,
(iter & 1) ? visit2 : visit1)) {
if (!IterativeLengthLowerBound(v_size, v, e, seen,
(iter & 1) ? visit1 : visit2,
(iter & 1) ? visit2 : visit1)) {
break;
}
// detect lanes that finished
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = lane_to_num[lane];
if (search_num >= 0) { // active lane
int64_t dst_pos = vdata_dst.sel->get_index(search_num);
if (seen[dst_data[dst_pos]][lane]){
if (seen[dst_data[dst_pos]][lane]) {

// check if the path length is within bounds
// bound vector is either a constant or a flat vector
Expand All @@ -170,7 +167,6 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState &
lane_to_num[lane] = -1; // mark inactive
active--;
}

}
}
}
Expand All @@ -189,13 +185,14 @@ static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState &
duckpgq_state->csr_to_delete.insert(info.csr_id);
}

CreateScalarFunctionInfo DuckPGQFunctions::GetIterativeLengthLowerBoundFunction() {
auto fun = ScalarFunction("iterativelength_lowerbound",
{LogicalType::INTEGER, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT},
LogicalType::BIGINT, IterativeLengthLowerBoundFunction,
IterativeLengthFunctionData::IterativeLengthBind);
CreateScalarFunctionInfo
DuckPGQFunctions::GetIterativeLengthLowerBoundFunction() {
auto fun = ScalarFunction(
"iterativelength_lowerbound",
{LogicalType::INTEGER, LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT},
LogicalType::BIGINT, IterativeLengthLowerBoundFunction,
IterativeLengthFunctionData::IterativeLengthBind);
return CreateScalarFunctionInfo(fun);
}

Expand Down
3 changes: 2 additions & 1 deletion duckpgq/src/duckpgq/functions/scalar/reachability.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ static void ReachabilityFunction(DataChunk &args, ExpressionState &state,
break;
}
default:
throw Exception(ExceptionType::INTERNAL, "Unknown reachability mode encountered");
throw Exception(ExceptionType::INTERNAL,
"Unknown reachability mode encountered");
}
} else {
exit_early = BfsWithoutArray(exit_early, csr, input_size, seen, visit,
Expand Down
13 changes: 6 additions & 7 deletions duckpgq/src/duckpgq/functions/scalar/shortest_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,12 @@ static void ShortestPathFunction(DataChunk &args, ExpressionState &state,
}

CreateScalarFunctionInfo DuckPGQFunctions::GetShortestPathFunction() {
auto fun = ScalarFunction("shortestpath",
{LogicalType::INTEGER, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT},
LogicalType::LIST(LogicalType::BIGINT),
ShortestPathFunction,
IterativeLengthFunctionData::IterativeLengthBind);
auto fun = ScalarFunction(
"shortestpath",
{LogicalType::INTEGER, LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT},
LogicalType::LIST(LogicalType::BIGINT), ShortestPathFunction,
IterativeLengthFunctionData::IterativeLengthBind);
return CreateScalarFunctionInfo(fun);
}

Expand Down
66 changes: 32 additions & 34 deletions duckpgq/src/duckpgq/functions/scalar/shortest_path_lowerbound.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
namespace duckdb {

static bool IterativeLengthLowerBound(int64_t v_size, int64_t *V, vector<int64_t> &E,
vector<int64_t> &edge_ids,
// vector<vector<unordered_set<int64_t>>> &parents_v,
vector<vector<vector<int64_t>>> &paths_v,
vector<vector<vector<int64_t>>> &paths_e,
vector<std::bitset<LANE_LIMIT>> &seen,
vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &next) {
vector<int64_t> &edge_ids,
vector<vector<vector<int64_t>>> &paths_v,
vector<vector<vector<int64_t>>> &paths_e,
vector<std::bitset<LANE_LIMIT>> &seen,
vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &next) {
bool change = false;
// map<pair<int64_t, int64_t>, unordered_set<int64_t>> parents_v_cache;
map<pair<int64_t, int64_t>, vector<int64_t>> paths_v_cache;
Expand All @@ -36,7 +35,8 @@ static bool IterativeLengthLowerBound(int64_t v_size, int64_t *V, vector<int64_t

for (auto lane = 0; lane < LANE_LIMIT; lane++) {
if (visit[v][lane]) {
//! If the node has not been visited, then update the parent and edge
//! If the node has not been visited, then update the parent and
//! edge
if (seen[n][lane] == false) {
if (visit[n][lane]) {
// parents_v_cache[make_pair(n, lane)] = parents_v[v][lane];
Expand All @@ -54,20 +54,17 @@ static bool IterativeLengthLowerBound(int64_t v_size, int64_t *V, vector<int64_t
paths_e[n][lane].push_back(edge_id);
}
next[n][lane] = true;
}
}
}
}
}
}
}

// for (auto const& cache: parents_v_cache) {
// parents_v[cache.first.first][cache.first.second] = cache.second;
// }
for (auto const& cache: paths_v_cache) {
for (auto const &cache : paths_v_cache) {
paths_v[cache.first.first][cache.first.second] = cache.second;
}
for (auto const& cache: paths_e_cache) {
for (auto const &cache : paths_e_cache) {
paths_e[cache.first.first][cache.first.second] = cache.second;
}

Expand All @@ -78,8 +75,9 @@ static bool IterativeLengthLowerBound(int64_t v_size, int64_t *V, vector<int64_t
return change;
}

static void ShortestPathLowerBoundFunction(DataChunk &args, ExpressionState &state,
Vector &result) {
static void ShortestPathLowerBoundFunction(DataChunk &args,
ExpressionState &state,
Vector &result) {
auto &func_expr = (BoundFunctionExpression &)state.expr;
auto &info = (IterativeLengthFunctionData &)*func_expr.bind_info;
auto duckpgq_state_entry = info.context.registered_state.find("duckpgq");
Expand Down Expand Up @@ -129,10 +127,10 @@ static void ShortestPathLowerBoundFunction(DataChunk &args, ExpressionState &sta
vector<std::bitset<LANE_LIMIT>> visit1(v_size);
vector<std::bitset<LANE_LIMIT>> visit2(v_size);

// vector<vector<unordered_set<int64_t>>> parents_v(v_size, std::vector<unordered_set<int64_t>>(LANE_LIMIT));
vector<vector<vector<int64_t>>> paths_v(v_size, std::vector<vector<int64_t>>(LANE_LIMIT));
vector<vector<vector<int64_t>>> paths_e(v_size, std::vector<vector<int64_t>>(LANE_LIMIT));

vector<vector<vector<int64_t>>> paths_v(
v_size, std::vector<vector<int64_t>>(LANE_LIMIT));
vector<vector<vector<int64_t>>> paths_e(
v_size, std::vector<vector<int64_t>>(LANE_LIMIT));

// maps lane to search number
int16_t lane_to_num[LANE_LIMIT];
Expand Down Expand Up @@ -171,9 +169,9 @@ static void ShortestPathLowerBoundFunction(DataChunk &args, ExpressionState &sta
//! make passes while a lane is still active
for (int64_t iter = 1; active && iter <= upper_bound; iter++) {
//! Perform one step of bfs exploration
if (!IterativeLengthLowerBound(v_size, v, e, edge_ids, paths_v, paths_e, seen,
(iter & 1) ? visit1 : visit2,
(iter & 1) ? visit2 : visit1)) {
if (!IterativeLengthLowerBound(
v_size, v, e, edge_ids, paths_v, paths_e, seen,
(iter & 1) ? visit1 : visit2, (iter & 1) ? visit2 : visit1)) {
break;
}
// detect lanes that finished
Expand All @@ -192,23 +190,24 @@ static void ShortestPathLowerBoundFunction(DataChunk &args, ExpressionState &sta
auto it_v = paths_v[dst_data[dst_pos]][lane].begin(),
end_v = paths_v[dst_data[dst_pos]][lane].end();
auto it_e = paths_e[dst_data[dst_pos]][lane].begin(),
end_e = paths_e[dst_data[dst_pos]][lane].end();
end_e = paths_e[dst_data[dst_pos]][lane].end();
while (it_v != end_v && it_e != end_e) {
output_vector.push_back(*it_v);
output_vector.push_back(*it_e);
it_v++;
it_e++;
}
output_vector.push_back(dst_data[dst_pos]);
auto output = make_uniq<Vector>(LogicalType::LIST(LogicalType::BIGINT));
auto output =
make_uniq<Vector>(LogicalType::LIST(LogicalType::BIGINT));
for (auto val : output_vector) {
Value value_to_insert = val;
ListVector::PushBack(*output, value_to_insert);
}
result_data[search_num].length = ListVector::GetListSize(*output);
result_data[search_num].offset = total_len;
ListVector::Append(result, ListVector::GetEntry(*output),
ListVector::GetListSize(*output));
ListVector::GetListSize(*output));
total_len += result_data[search_num].length;
lane_to_num[lane] = -1; // mark inactive
}
Expand All @@ -222,21 +221,20 @@ static void ShortestPathLowerBoundFunction(DataChunk &args, ExpressionState &sta
int64_t search_num = lane_to_num[lane];
if (search_num >= 0) { // active lane
result_validity.SetInvalid(search_num);
lane_to_num[lane] = -1; // mark inactive
lane_to_num[lane] = -1; // mark inactive
}
}
}
duckpgq_state->csr_to_delete.insert(info.csr_id);
}

// CreateScalarFunctionInfo DuckPGQFunctions::GetShortestPathLowerBoundFunction() {
// auto fun = ScalarFunction("shortestpath_lowerbound",
// {LogicalType::INTEGER, LogicalType::BIGINT,
// LogicalType::BIGINT, LogicalType::BIGINT,
// LogicalType::BIGINT, LogicalType::BIGINT},
// LogicalType::LIST(LogicalType::BIGINT),
// ShortestPathLowerBoundFunction,
// IterativeLengthFunctionData::IterativeLengthBind);
// auto fun = ScalarFunction(
// "shortestpath_lowerbound",
// {LogicalType::INTEGER, LogicalType::BIGINT, LogicalType::BIGINT,
// LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT},
// LogicalType::LIST(LogicalType::BIGINT), ShortestPathLowerBoundFunction,
// IterativeLengthFunctionData::IterativeLengthBind);
// return CreateScalarFunctionInfo(fun);
// }

Expand Down
Loading

0 comments on commit 737ab85

Please sign in to comment.