add examples for api call algorithm and string id mapping #21

Merged: 3 commits, Dec 2, 2021
Changes from all commits
10 changes: 10 additions & 0 deletions example/src/main/resources/data.csv
@@ -0,0 +1,10 @@
src,dst,weight
1,2,1.0
1,3,2.0
1,4,3.0
1,5,1.5
2,3,2.6
2,4,2.0
3,4,3.0
3,5,4.0
2,5,1.0
10 changes: 10 additions & 0 deletions example/src/main/resources/string_data.csv
@@ -0,0 +1,10 @@
src,dst,weight
a,b,1.0
c,d,2.0
1,4,3.0
1,5,1.5
2,3,2.6
2,4,2.0
3,4,3.0
3,5,4.0
2,5,1.0
10 changes: 10 additions & 0 deletions example/src/main/resources/test.csv
@@ -0,0 +1,10 @@
1,1
2,1
3,1
4,4
5,4
6,4
7,4
8,4
9,9
10,9
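
Note: every example in this PR reads its input through a ReadData helper (ReadData.readCsvData, ReadData.readStringCsvData, ReadData.readLiveJournalData, ReadData.readNebulaData) that is not shown in this diff. Below is a minimal sketch of what the two CSV readers might look like, based on the resource files added above; the object and method names are taken from the call sites, but the bodies are an assumption, not the PR's actual code.

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadData {
  // hypothetical sketch: read the numerical-id edge list added above
  // (columns: src, dst, weight)
  def readCsvData(spark: SparkSession): DataFrame =
    spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("example/src/main/resources/data.csv")

  // hypothetical sketch: read the string-id edge list; same columns,
  // but src/dst may be non-numeric
  def readStringCsvData(spark: SparkSession): DataFrame =
    spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("example/src/main/resources/string_data.csv")
}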
ClusteringCoefficientExample.scala
@@ -1,9 +1,61 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
- * This source code is licensed under Apache 2.0 License,
- * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ * This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

-object ClusteringCoefficientExample {}
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.algorithm.config.CoefficientConfig
import com.vesoft.nebula.algorithm.lib.ClusteringCoefficientAlgo
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SparkSession}

object ClusteringCoefficientExample {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .getOrCreate()

    val csvDF = ReadData.readCsvData(spark)
    // val nebulaDF = ReadData.readNebulaData(spark)
    // val liveJournalDF = ReadData.readLiveJournalData(spark)

    localClusteringCoefficient(spark, csvDF)
    globalClusteringCoefficient(spark, csvDF)
  }

  /**
   * compute the local clustering coefficient for each vertex
   */
  def localClusteringCoefficient(spark: SparkSession, df: DataFrame): Unit = {
    val localClusteringCoefficientConfig = new CoefficientConfig("local")
    val localClusterCoeff =
      ClusteringCoefficientAlgo.apply(spark, df, localClusteringCoefficientConfig)
    localClusterCoeff.show()
    // drop vertices whose coefficient is 0.0, then persist the rest ordered by coefficient
    localClusterCoeff
      .filter(row => !row.get(1).toString.equals("0.0"))
      .orderBy(col("clustercoefficient"))
      .write
      .option("header", true)
      .csv("hdfs://127.0.0.1:9000/tmp/ccresult")
  }

  /**
   * compute the global clustering coefficient of the graph
   */
  def globalClusteringCoefficient(spark: SparkSession, df: DataFrame): Unit = {
    val globalClusteringCoefficientConfig = new CoefficientConfig("global")
    val globalClusterCoeff =
      ClusteringCoefficientAlgo.apply(spark, df, globalClusteringCoefficientConfig)
    globalClusterCoeff.show()
  }

}
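
A hypothetical way to run an example such as this one locally (the jar path is an assumption and depends on how the example module is built):

  spark-submit --master local \
    --class com.vesoft.nebula.algorithm.ClusteringCoefficientExample \
    example/target/example.jar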
DegreeStaticExample.scala
@@ -5,4 +5,32 @@

package com.vesoft.nebula.algorithm

-object DegreeStaticExample {}
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.algorithm.lib.DegreeStaticAlgo
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object DegreeStaticExample {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .getOrCreate()

    // val csvDF = ReadData.readCsvData(spark)
    // val nebulaDF = ReadData.readNebulaData(spark)
    val journalDF = ReadData.readLiveJournalData(spark)

    degree(spark, journalDF)
  }

  def degree(spark: SparkSession, df: DataFrame): Unit = {
    val degree = DegreeStaticAlgo.apply(spark, df)
    degree.show()
  }
}
GraphTriangleCountExample.scala
@@ -5,4 +5,36 @@

package com.vesoft.nebula.algorithm

-object GraphTriangleCountExample {}
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.algorithm.lib.GraphTriangleCountAlgo
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object GraphTriangleCountExample {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .getOrCreate()

    // val csvDF = ReadData.readCsvData(spark)
    // val nebulaDF = ReadData.readNebulaData(spark)
    val journalDF = ReadData.readLiveJournalData(spark)

    graphTriangleCount(spark, journalDF)
  }

  def graphTriangleCount(spark: SparkSession, df: DataFrame): Unit = {
    val graphTriangleCount = GraphTriangleCountAlgo.apply(spark, df)
    graphTriangleCount.show()
  }
}
LouvainExample.scala
@@ -0,0 +1,37 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.algorithm.config.LouvainConfig
import com.vesoft.nebula.algorithm.lib.LouvainAlgo
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object LouvainExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .getOrCreate()

    // val csvDF = ReadData.readCsvData(spark)
    // val nebulaDF = ReadData.readNebulaData(spark)
    val journalDF = ReadData.readLiveJournalData(spark)

    louvain(spark, journalDF)
  }

  def louvain(spark: SparkSession, df: DataFrame): Unit = {
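    // assumption: LouvainConfig's arguments are (maxIter, internalIter, tol);
    // the values below are illustrative, not tuned.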
    val louvainConfig = LouvainConfig(10, 5, 0.5)
    val louvain = LouvainAlgo.apply(spark, df, louvainConfig, false)
    louvain.show()
  }
}
LpaExample.scala
@@ -5,4 +5,33 @@

package com.vesoft.nebula.algorithm

-object LpaExample {}
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.algorithm.config.LPAConfig
import com.vesoft.nebula.algorithm.lib.LabelPropagationAlgo
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object LpaExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .getOrCreate()

    // val csvDF = ReadData.readCsvData(spark)
    // val nebulaDF = ReadData.readNebulaData(spark)
    val journalDF = ReadData.readLiveJournalData(spark)

    lpa(spark, journalDF)
  }

  def lpa(spark: SparkSession, df: DataFrame): Unit = {
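    // assumption: LPAConfig's argument is the maximum iteration count;
    // Int.MaxValue leaves it effectively unbounded.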
    val lpaConfig = LPAConfig(Int.MaxValue)
    val lpa = LabelPropagationAlgo.apply(spark, df, lpaConfig, false)
    lpa.show()
  }
}
PageRankExample.scala
@@ -5,4 +5,105 @@

package com.vesoft.nebula.algorithm

-object PageRankExample {}
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.algorithm.config.PRConfig
import com.vesoft.nebula.algorithm.lib.PageRankAlgo
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}
import org.apache.spark.sql.{DataFrame, SparkSession}

object PageRankExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local")
      .config(sparkConf)
      .getOrCreate()

    // the edge data has numerical vids
    val csvDF = ReadData.readCsvData(spark)
    pagerank(spark, csvDF)

    // the edge data has string vids
    val stringCsvDF = ReadData.readStringCsvData(spark)
    pagerankWithIdMapping(spark, stringCsvDF)
  }

  /**
   * the src id and dst id are numerical values
   */
  def pagerank(spark: SparkSession, df: DataFrame): Unit = {
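    // assumption: PRConfig's two arguments are the max iteration count and
    // the reset probability passed through to GraphX's PageRank.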
    val pageRankConfig = PRConfig(3, 0.85)
    val pr = PageRankAlgo.apply(spark, df, pageRankConfig, false)
    pr.show()
  }

  /**
   * convert src id and dst id to numerical values before running the
   * algorithm, then map the result back to the original string ids
   */
  def pagerankWithIdMapping(spark: SparkSession, df: DataFrame): Unit = {
    val encodedDF = convertStringId2LongId(df)
    val pageRankConfig = PRConfig(3, 0.85)
    val pr = PageRankAlgo.apply(spark, encodedDF, pageRankConfig, false)
    val decodedPr = reconvertLongId2StringId(spark, pr)
    decodedPr.show()
  }

  /**
   * if your edge data has String-typed src_id and dst_id, you need to convert
   * the String ids to Long ids first.
   *
   * in this example the edge dataframe has columns: src, dst, weight
   */
  def convertStringId2LongId(dataframe: DataFrame): DataFrame = {
    // get all vertex ids from the edge dataframe
    val srcIdDF: DataFrame = dataframe.select("src").withColumnRenamed("src", "id")
    val dstIdDF: DataFrame = dataframe.select("dst").withColumnRenamed("dst", "id")
    val idDF = srcIdDF.union(dstIdDF).distinct()
    idDF.show()

    // encode each id to a Long using dense_rank; encodeId has two columns: id, encodedId.
    // save the mapping so the algorithm's result can be converted back later.
    val encodeId = idDF.withColumn("encodedId", dense_rank().over(Window.orderBy("id")))
    encodeId.write.option("header", true).csv("file:///tmp/encodeId.csv")
    encodeId.show()
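    // worked example: for string_data.csv above, the distinct ids sort
    // lexicographically (digits before letters), so the expected mapping is:
    //   1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4, 5 -> 5, a -> 6, b -> 7, c -> 8, d -> 9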

    // replace the edge data's src and dst with their encoded ids
    val srcJoinDF = dataframe
      .join(encodeId)
      .where(col("src") === col("id"))
      .drop("src")
      .drop("id")
      .withColumnRenamed("encodedId", "src")
    srcJoinDF.cache()
    val dstJoinDF = srcJoinDF
      .join(encodeId)
      .where(col("dst") === col("id"))
      .drop("dst")
      .drop("id")
      .withColumnRenamed("encodedId", "dst")
    dstJoinDF.show()

    // make sure the first two columns of the edge dataframe are the src and dst ids
    dstJoinDF.select("src", "dst", "weight")
  }
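  // note: dense_rank over a Window with no partitionBy collects all ids into a
  // single partition, which is fine for this small example but does not scale
  // to large id sets; likewise, join() followed by where() relies on Spark to
  // plan an equi-join, so passing the condition to join() directly would be
  // the more explicit choice.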

  /**
   * re-convert the algorithm's result back to the original String ids
   * @return dataframe with columns: id, {algo_name}
   */
  def reconvertLongId2StringId(spark: SparkSession, dataframe: DataFrame): DataFrame = {
    // the saved String id -> Long id mapping
    val encodeId = spark.read.option("header", true).csv("file:///tmp/encodeId.csv")

    encodeId
      .join(dataframe)
      .where(col("encodedId") === col("_id"))
      .drop("encodedId")
      .drop("_id")
  }
}