diff --git a/libcusp/include/galois/graphs/DistributedLocalGraph.h b/libcusp/include/galois/graphs/DistributedLocalGraph.h index 91935ec55..1661651f4 100644 --- a/libcusp/include/galois/graphs/DistributedLocalGraph.h +++ b/libcusp/include/galois/graphs/DistributedLocalGraph.h @@ -754,7 +754,8 @@ class DistLocalGraph { */ inline void determineThreadRangesMaster() { // make sure this hasn't been called before - assert(masterRanges.size() == 0); + if (masterRanges.size() != 0) + masterRanges.clear(); // first check if we even need to do any work; if already calculated, // use already calculated vector @@ -780,7 +781,8 @@ class DistLocalGraph { */ inline void determineThreadRangesWithEdges() { // make sure not called before - assert(withEdgeRanges.size() == 0); + if (withEdgeRanges.size() != 0) + withEdgeRanges.clear(); // first check if we even need to do any work; if already calculated, // use already calculated vector @@ -802,7 +804,8 @@ class DistLocalGraph { * over the graph in different ways. */ void initializeSpecificRanges() { - assert(specificRanges.size() == 0); + if (specificRanges.size() != 0) + specificRanges.clear(); // TODO/FIXME assertion likely not safe if a host gets no nodes // make sure the thread ranges have already been calculated @@ -949,20 +952,27 @@ class DistLocalGraph { host, galois::runtime::evilPhase, std::move(b)); } + void updateRanges() { + determineThreadRanges(); + determineThreadRangesMaster(); + determineThreadRangesWithEdges(); + initializeSpecificRanges(); + } + // Assumptions: // 1. A vertex is added before any edges are added to it // 2. No support for deleting edges/vertices yet // 3. Only works for OEC void updateVariables(bool isVertex, uint64_t src, - std::optional> dsts = std::nullopt) { + std::optional> dsts = std::nullopt, + std::optional> dstData = std::nullopt) { if (isVertex) { if (globalToLocalMap.find(src) == globalToLocalMap.end()) { localToGlobalVector.push_back(src); globalToLocalMap[src] = localToGlobalVector.size() - 1; numNodes++; - } else { } numOwned++; } else { @@ -971,14 +981,24 @@ class DistLocalGraph { if (edge_begin(srcLID) == edge_end(srcLID)) { numNodesWithEdges++; } + uint32_t i = 0; for (auto token : dsts.value()) { if (globalToLocalMap.find(token) == globalToLocalMap.end()) { localToGlobalVector.push_back(token); globalToLocalMap[token] = localToGlobalVector.size() - 1; numNodes++; + numNodesWithEdges++; + std::vector data; + data.push_back(dstData.value()[i]); + graph->addVertices(data); } + i++; if (!isOwned(token)) { mirrorNodes[getHostID(token)].push_back(token); + } else { + if (edge_begin(getLID(token)) == edge_end(getLID(token))) { + numNodesWithEdges++; + } } } numEdges += dsts.value().size(); @@ -990,7 +1010,7 @@ class DistLocalGraph { uint64_t belongsTo = getHostID(token); if (belongsTo == id) { updateVariables(true, token); - // graph->addVertexTopologyOnly(); + graph->addVertexTopologyOnly(); } else { sendModifyRequest(belongsTo, ADD_VERTEX_TOPOLOGY_ONLY, token); } @@ -1025,18 +1045,17 @@ class DistLocalGraph { } void addEdges(uint64_t src, std::vector dsts, - std::vector data) { + std::vector data, std::vector dstData) { uint64_t belongsTo = getHostID(src); if (belongsTo == id) { - updateVariables(false, src, dsts); + updateVariables(false, src, dsts, dstData); std::vector lids; for (uint32_t i = 0; i < dsts.size(); i++) { lids.push_back(getLID(dsts[i])); } graph->addEdges(getLID(src), lids, data); - } else { - sendModifyRequest(belongsTo, ADD_EDGES, src, dsts, data); + sendModifyRequest(belongsTo, ADD_EDGES, src, dsts, data, dstData); } } diff --git a/libgalois/include/galois/runtime/GraphUpdateManager.h b/libgalois/include/galois/runtime/GraphUpdateManager.h index 612ade11d..019a5d371 100644 --- a/libgalois/include/galois/runtime/GraphUpdateManager.h +++ b/libgalois/include/galois/runtime/GraphUpdateManager.h @@ -58,6 +58,7 @@ class graphUpdateManager { stopCheck = true; while (!checkThread.joinable()) ; + graph->updateRanges(); checkThread.join(); return stopIngest; } @@ -85,7 +86,8 @@ class graphUpdateManager { dsts.push_back(edge.dst); std::vector data; data.push_back(edge); - graph->addEdges(edge.src, dsts, data); + std::vector dstData = fileParser->GetDstData(value.edges); + graph->addEdges(edge.src, dsts, data, dstData); } } } @@ -110,6 +112,7 @@ class graphUpdateManager { galois::runtime::getHostBarrier().wait(); std::this_thread::sleep_for( std::chrono::milliseconds(periodForCheck)); + graph->updateRanges(); lineNumber = 0; } } @@ -141,7 +144,9 @@ class graphUpdateManager { galois::runtime::gDeserialize(m->second, edge_dsts); std::vector edge_data; galois::runtime::gDeserialize(m->second, edge_data); - graph->addEdges(src_node, edge_dsts, edge_data); + std::vector dst_data; + galois::runtime::gDeserialize(m->second, dst_data); + graph->addEdges(src_node, edge_dsts, edge_data, dst_data); } } std::this_thread::sleep_for( diff --git a/libwmd/include/galois/wmd/schema.h b/libwmd/include/galois/wmd/schema.h index 0cddf7fad..166cc47ec 100644 --- a/libwmd/include/galois/wmd/schema.h +++ b/libwmd/include/galois/wmd/schema.h @@ -77,6 +77,7 @@ class FileParser { virtual ~FileParser() {} virtual ParsedGraphStructure ParseLine(char* line, uint64_t lineLength) = 0; + virtual std::vector GetDstData(std::vector& edges) = 0; static std::vector SplitLine(const char* line, uint64_t lineLength, char delim, uint64_t numTokens) { @@ -171,6 +172,13 @@ class WMDParser : public FileParser { return ParsedGraphStructure(edges); } } + std::vector GetDstData(std::vector& edges) override { + std::vector dstData; + for (auto& edge : edges) { + dstData.emplace_back(edge.dst, 0, edge.dst_type); + } + return dstData; + } private: uint64_t csvFields_;