Breadth-first search (BFS)
This method returns a DataFrame of valid shortest paths from vertices matching fromExpr to vertices matching toExpr. If multiple paths are valid and have the same length, the DataFrame will return one Row for each path. If no paths are valid, the DataFrame will be empty.
Note: "Shortest" means globally shortest path. I.e., if the shortest path from any vertex matching fromExpr to any vertex matching toExpr has length 5 (edges), then all paths returned by BFS will have length 5.
The returned DataFrame will have the following columns:
 - from: start vertex of path
 - e[i]: edge i in the path, indexed from 0
 - v[i]: intermediate vertex i in the path, indexed from 1
 - to: end vertex of path
Each of these columns is a StructType whose fields are the same as the columns of GraphFrame.vertices or GraphFrame.edges.
For example, suppose we have a graph g. Say the vertices DataFrame of g has columns "id" and "job", and the edges DataFrame of g has columns "src", "dst", and "relation".
    import org.apache.spark.sql.functions.col
    // Search from vertex "Joe" to find the closest vertices with attribute job = CEO.
    g.bfs.fromExpr(col("id") === "Joe").toExpr(col("job") === "CEO").run()
If we found a path of 3 edges, each row would have columns:
from | e0 | v1 | e1 | v2 | e2 | to
In the above row, each vertex column (from, v1, v2, to) would have fields "id" and "job" (just like g.vertices). Each edge column (e0, e1, e2) would have fields "src", "dst", and "relation".
If there are ties, then each of the equal paths will be returned as a separate Row.
If one or more vertices match both the from and to conditions, then there is a 0-hop path. The returned DataFrame will have the "from" and "to" columns (as above); however, the "from" and "to" columns will be exactly the same. There will be one row for each vertex in GraphFrame.vertices matching both fromExpr and toExpr.
Parameters:
 - fromExpr: Spark SQL expression specifying valid starting vertices for the BFS. This condition will be matched against each vertex's id or attributes. To start from a specific vertex, this could be "id = [start vertex id]". To start from multiple valid vertices, this can operate on vertex attributes.
 - toExpr: Spark SQL expression specifying valid target vertices for the BFS. This condition will be matched against each vertex's id or attributes.
 - maxPathLength: Limit on the length of paths. If no valid paths of length <= maxPathLength are found, then the BFS is terminated. (default = 10)
 - edgeFilter: Spark SQL expression specifying edges which may be used in the search. This allows the user to disallow crossing certain edges. Such filters could be applied post hoc, after BFS is run, but specifying the filter here is more efficient.
Returns: a DataFrame of valid shortest paths found in the BFS.
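To make the level-by-level semantics above concrete, here is a minimal local sketch in plain Scala, without Spark. It is not the GraphFrames implementation. The object LocalBfs, String vertex IDs, and the use of Scala predicates in place of the fromExpr, toExpr and edgeFilter SQL expressions are all assumptions for illustration. The sketch follows edges from src to dst, stops at the first level where any path reaches a target, and (also an assumption) never lets a path revisit a vertex.

```scala
object LocalBfs {
  // Hypothetical local model: String vertex IDs, directed (src, dst) edges,
  // and Scala predicates standing in for fromExpr, toExpr and edgeFilter.
  type Edge = (String, String)

  /** All shortest paths (as vertex lists) from a `from` vertex to a `to` vertex. */
  def shortestPaths(
      vertices: Seq[String],
      edges: Seq[Edge],
      from: String => Boolean,
      to: String => Boolean,
      maxPathLength: Int = 10,
      edgeFilter: Edge => Boolean = _ => true
  ): List[List[String]] = {
    // 0-hop paths: vertices matching both conditions.
    val zeroHop = vertices.filter(v => from(v) && to(v)).map(v => List(v)).toList
    if (zeroHop.nonEmpty) return zeroHop

    // Outgoing adjacency, keeping only edges that pass the filter.
    val out: Map[String, Seq[String]] =
      edges.filter(edgeFilter).groupBy(_._1).map { case (s, es) => s -> es.map(_._2) }

    // Each path is stored reversed (head = current endpoint) so extending it is cheap.
    var frontier: List[List[String]] = vertices.filter(from).map(v => List(v)).toList
    var length = 0
    while (frontier.nonEmpty && length < maxPathLength) {
      length += 1
      frontier = for {
        path <- frontier
        next <- out.getOrElse(path.head, Seq.empty).toList
        if !path.contains(next) // assumption: paths never revisit a vertex
      } yield next :: path
      // Stop at the first level where any path reaches a target vertex.
      val hits = frontier.filter(p => to(p.head))
      if (hits.nonEmpty) return hits.map(_.reverse)
    }
    Nil // no valid path of length <= maxPathLength
  }
}
```

On the diamond a->b, a->c, b->d, c->d, searching from "a" to "d" yields the two tied paths a,b,d and a,c,d, matching the "one Row for each path" rule.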
Connected components algorithm.
Computes the connected component membership of each vertex and returns a DataFrame of vertex information with each vertex assigned a component ID.
The resulting DataFrame contains all the vertex information and one additional column (LongType): unique ID for this component.
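As a local illustration of what "component membership" means, the following sketch uses union-find over the edges with direction ignored. This is a swapped-in technique for illustration, not the algorithm GraphFrames runs on Spark. The object name and the convention of labeling each component by its smallest vertex ID are assumptions; the IDs the library actually emits may differ.

```scala
import scala.collection.mutable

object LocalConnectedComponents {
  /** Component ID for every vertex, ignoring edge direction.
    * Assumption: a component is labeled by its smallest vertex ID. */
  def components(vertices: Seq[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    // Union-find forest; every vertex starts as its own root.
    val parent = mutable.Map[Long, Long]()
    vertices.foreach(v => parent(v) = v)

    // Follow parents to the root, compressing the path along the way.
    def find(v: Long): Long = {
      val p = parent(v)
      if (p == v) v
      else {
        val root = find(p)
        parent(v) = root
        root
      }
    }

    // Merge the endpoint sets, keeping the smaller root so it stays the set minimum.
    for ((a, b) <- edges) {
      val ra = find(a)
      val rb = find(b)
      if (ra < rb) parent(rb) = ra
      else if (rb < ra) parent(ra) = rb
    }
    vertices.map(v => v -> find(v)).toMap
  }
}
```

The sketch assumes every edge endpoint also appears in vertices; an unknown endpoint would make parent(v) throw.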
Run static Label Propagation for detecting communities in networks.
Each node in the network is initially assigned to its own community. At every iteration, nodes send their community affiliation to all neighbors and update their state to the mode community affiliation of incoming messages.
LPA is a standard community detection algorithm for graphs. It is very inexpensive computationally, although (1) convergence is not guaranteed and (2) one can end up with trivial solutions (all nodes are identified into a single community).
The resulting DataFrame contains all the original vertex information and one additional column (LongType): label of community affiliation.
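The update rule described above can be sketched locally in a few lines of plain Scala. This sketch assumes synchronous rounds (every vertex reads the previous round's labels), treats edges as undirected so messages reach all neighbors, and breaks ties in the mode by taking the smallest label. The tie-breaking rule and the object LocalLpa are assumptions for illustration, not the library's documented behavior.

```scala
object LocalLpa {
  /** Runs `maxIter` synchronous rounds of label propagation.
    * Each vertex starts with its own ID as its label. */
  def run(vertices: Seq[Long], edges: Seq[(Long, Long)], maxIter: Int): Map[Long, Long] = {
    // Neighbors in both directions, since every vertex messages all neighbors.
    val nbrs: Map[Long, Seq[Long]] =
      (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }
    var labels: Map[Long, Long] = vertices.map(v => v -> v).toMap
    for (_ <- 0 until maxIter) {
      // `labels` inside the closure is still the previous round's map.
      labels = labels.map { case (v, current) =>
        val incoming = nbrs.getOrElse(v, Seq.empty).map(labels)
        if (incoming.isEmpty) v -> current
        else {
          // Mode of incoming labels; ties go to the smallest label (assumption).
          val counts = incoming.groupBy(identity).map { case (l, ls) => l -> ls.size }
          val best = counts.values.max
          v -> counts.collect { case (l, c) if c == best => l }.min
        }
      }
    }
    labels
  }
}
```

On two disjoint triangles the labels settle after two rounds, one label per triangle. On a single edge 1-2, however, the two labels swap every round, which illustrates why convergence is not guaranteed.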
PageRank algorithm implementation.
There are two implementations of PageRank.
The first implementation uses the standalone GraphFrame interface and runs PageRank for a fixed number of iterations. This can be run by setting maxIter.
    // n, alpha, maxIter, inNbrs and outDeg are described below.
    var PR = Array.fill(n)(1.0)
    var oldPR = Array.fill(n)(1.0)
    for (iter <- 0 until maxIter) {
      val tmp = oldPR; oldPR = PR; PR = tmp // swap(oldPR, PR)
      for (i <- 0 until n) {
        PR(i) = alpha + (1 - alpha) * inNbrs(i).map(j => oldPR(j) / outDeg(j)).sum
      }
    }
The second implementation uses the org.apache.spark.graphx.Pregel interface and runs PageRank until convergence. This can be run by setting tol.
    var PR = Array.fill(n)(1.0)
    var oldPR = Array.fill(n)(0.0)
    while (PR.indices.map(i => math.abs(PR(i) - oldPR(i))).max > tol) {
      val tmp = oldPR; oldPR = PR; PR = tmp // swap(oldPR, PR)
      for (i <- 0 until n if math.abs(PR(i) - oldPR(i)) > tol) {
        PR(i) = alpha + (1 - alpha) * inNbrs(i).map(j => oldPR(j) / outDeg(j)).sum
      }
    }
Here alpha is the random reset probability (typically 0.15), inNbrs[i] is the set of neighbors which link to i, and outDeg[j] is the out degree of vertex j.
Note that this is not the "normalized" PageRank and as a consequence pages that have no inlinks will have a PageRank of alpha. In particular, the pageranks may have some values greater than 1.
The resulting vertices DataFrame contains one additional column (DoubleType): the pagerank of this vertex.
The resulting edges DataFrame contains one additional column (DoubleType): the normalized weight of this edge after running PageRank.
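For experimentation, the fixed-iteration pseudocode earlier in this section can be turned into a small runnable local function. The object LocalPageRank, the encoding of vertices as 0 until n, and edges as (src, dst) Int pairs are assumptions for illustration. This is not the Spark implementation.

```scala
object LocalPageRank {
  /** Fixed-iteration, unnormalized PageRank:
    * PR(i) = alpha + (1 - alpha) * sum over in-neighbors j of PR(j) / outDeg(j). */
  def run(n: Int, edges: Seq[(Int, Int)], alpha: Double = 0.15, maxIter: Int = 20): Array[Double] = {
    val outDeg = Array.fill(n)(0)
    edges.foreach { case (src, _) => outDeg(src) += 1 }
    // In-neighbors of i: every j with an edge j -> i (so outDeg(j) >= 1).
    val inNbrs = Array.tabulate(n)(i => edges.collect { case (j, d) if d == i => j })
    var pr = Array.fill(n)(1.0)
    var oldPr = Array.fill(n)(1.0)
    for (_ <- 0 until maxIter) {
      val tmp = oldPr; oldPr = pr; pr = tmp // swap buffers
      for (i <- 0 until n)
        pr(i) = alpha + (1 - alpha) * inNbrs(i).map(j => oldPr(j) / outDeg(j)).sum
    }
    pr
  }
}
```

On the single edge 0 -> 1, vertex 0 has no inlinks and settles at exactly alpha = 0.15, while vertex 1 settles at 0.15 + 0.85 * 0.15 = 0.2775. This matches the note that the result is not normalized.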
Parallel Personalized PageRank algorithm implementation.
This implementation uses the standalone GraphFrame interface and runs personalized PageRank in parallel for a fixed number of iterations. This can be run by setting maxIter. The source vertex Ids are set in sourceIds.
A simple local implementation of this algorithm is as follows.
    var PR = Array.tabulate(n)(i => if (sourceIds.contains(i)) alpha else 0.0)
    var oldPR = Array.fill(n)(1.0)
    for (iter <- 0 until maxIter) {
      val tmp = oldPR; oldPR = PR; PR = tmp // swap(oldPR, PR)
      for (i <- 0 until n) {
        PR(i) = (1 - alpha) * inNbrs(i).map(j => oldPR(j) / outDeg(j)).sum
        if (sourceIds.contains(i)) PR(i) += alpha
      }
    }
Here alpha is the random reset probability (typically 0.15), inNbrs[i] is the set of neighbors which link to i, and outDeg[j] is the out degree of vertex j.
Note that this is not the "normalized" PageRank and as a consequence pages that have no inlinks will have a PageRank of alpha. In particular, the pageranks may have some values greater than 1.
The resulting vertices DataFrame contains one additional column (VectorType): the pageranks of this vertex from all input source vertices.
The resulting edges DataFrame contains one additional column (DoubleType): the normalized weight of this edge after running PageRank.
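The local pseudocode in this section can be made runnable as follows. The reset mass alpha is added only at source vertices. The object name and the Int vertex encoding are assumptions for illustration. Because the output column described above holds one value per source vertex, a local analog would call run once per source with a singleton sourceIds. That mapping is an assumption of this sketch, not a statement about how the parallel implementation is organized.

```scala
object LocalPersonalizedPageRank {
  /** Personalized PageRank for one set of source vertices (vertices are 0 until n). */
  def run(n: Int, edges: Seq[(Int, Int)], sourceIds: Set[Int],
          alpha: Double = 0.15, maxIter: Int = 20): Array[Double] = {
    val outDeg = Array.fill(n)(0)
    edges.foreach { case (src, _) => outDeg(src) += 1 }
    val inNbrs = Array.tabulate(n)(i => edges.collect { case (j, d) if d == i => j })
    // Initial mass sits only on the sources.
    var pr = Array.tabulate(n)(i => if (sourceIds.contains(i)) alpha else 0.0)
    var oldPr = Array.fill(n)(0.0)
    for (_ <- 0 until maxIter) {
      val tmp = oldPr; oldPr = pr; pr = tmp // swap buffers; pr is fully overwritten below
      for (i <- 0 until n) {
        pr(i) = (1 - alpha) * inNbrs(i).map(j => oldPr(j) / outDeg(j)).sum
        if (sourceIds.contains(i)) pr(i) += alpha // reset only to the sources
      }
    }
    pr
  }
}
```

For the two-vertex cycle 0 <-> 1 with source 0, the fixed point solves p0 = 0.15 + 0.85 * p1 and p1 = 0.85 * p0, giving p0 = 0.15 / 0.2775 (about 0.54) and p1 = 0.85 * p0.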
Implement SVD++ based on "Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model", available at https://movie-datamining.googlecode.com/svn/trunk/kdd08koren.pdf.
Note: The status of this algorithm is EXPERIMENTAL. Its API and implementation may be changed in the future.
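The prediction rule is $\hat{r}_{ui} = \mu + b_u + b_i + q_i^{T}\left(p_u + |N(u)|^{-1/2} \sum_{j \in N(u)} y_j\right)$, where $\mu$ is the global mean rating, $b_u$ and $b_i$ are the user and item biases, $p_u$ and $q_i$ are the user and item factor vectors, $N(u)$ is the set of items rated by user u, and $y_j$ are the implicit-feedback factors of those items. See the details on page 6 of the article.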
Configuration parameters: see the description of each parameter in the article.
Returns a DataFrame with vertex attributes containing the trained model. See the object (static) members for the names of the output columns.
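To show how the prediction rule combines its terms, here is a small local evaluation of it for one (user, item) pair. It is a sketch of the formula only, not of training. The object SvdppPredict and its parameter names are hypothetical, and the factors are plain arrays of equal length.

```scala
object SvdppPredict {
  /** mu + b_u + b_i + q_i . (p_u + |N(u)|^(-1/2) * sum of y_j over j in N(u)) */
  def predict(mu: Double, bu: Double, bi: Double,
              qi: Array[Double], pu: Array[Double], ys: Seq[Array[Double]]): Double = {
    val k = qi.length
    // Sum of the implicit factors y_j of the items in N(u).
    val ySum = Array.fill(k)(0.0)
    ys.foreach(y => for (f <- 0 until k) ySum(f) += y(f))
    // |N(u)|^(-1/2), taken as 0 when the user has no rated items.
    val norm = if (ys.isEmpty) 0.0 else 1.0 / math.sqrt(ys.size.toDouble)
    val userVec = Array.tabulate(k)(f => pu(f) + norm * ySum(f))
    mu + bu + bi + (0 until k).map(f => qi(f) * userVec(f)).sum
  }
}
```

With four rated items whose y vectors sum to (4, 4), the normalization factor is 1/2. So p_u = (0.5, 1) becomes (2.5, 3) before the dot product with q_i.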
Computes shortest paths from every vertex to the given set of landmark vertices.
Note that this takes edge direction into account.
The returned DataFrame contains all the original vertex information as well as one additional column (MapType[vertex ID type, IntegerType]): for each vertex v, a map containing the shortest-path distance to each reachable landmark vertex.
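A local way to see the direction-aware semantics is to run one breadth-first search per landmark over the reversed edges. Every vertex reached this way can reach the landmark along forward edges. This sketch treats every edge as length 1. The object name and String vertex IDs are assumptions, and including a landmark's own distance 0 is a choice made by this sketch.

```scala
import scala.collection.mutable

object LocalLandmarkDistances {
  /** For every vertex v, maps each landmark reachable from v (following edge
    * direction) to the number of edges on a shortest path from v to it. */
  def distances(
      vertices: Seq[String],
      edges: Seq[(String, String)],
      landmarks: Seq[String]
  ): Map[String, Map[String, Int]] = {
    // Reverse adjacency (dst -> srcs): BFS from a landmark over it finds
    // every vertex that reaches the landmark along forward edges.
    val rev: Map[String, Seq[String]] =
      edges.groupBy(_._2).map { case (d, es) => d -> es.map(_._1) }
    val result = mutable.Map[String, Map[String, Int]]()
    vertices.foreach(v => result(v) = Map.empty)
    for (lm <- landmarks) {
      val dist = mutable.Map(lm -> 0)
      val queue = mutable.Queue(lm)
      while (queue.nonEmpty) {
        val u = queue.dequeue()
        for (w <- rev.getOrElse(u, Seq.empty) if !dist.contains(w)) {
          dist(w) = dist(u) + 1
          queue.enqueue(w)
        }
      }
      // Record this landmark's distance in every vertex that reached it.
      dist.foreach { case (v, d) => result(v) = result.getOrElse(v, Map.empty[String, Int]) + (lm -> d) }
    }
    result.toMap
  }
}
```

On the chain a -> b -> c with landmarks a and c, vertex a reaches both landmarks, but b and c cannot reach a. So only a's map contains a.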
Compute the strongly connected component (SCC) of each vertex and return a DataFrame with each vertex assigned to the SCC containing that vertex.
The resulting DataFrame contains all the original vertex information and one additional column (LongType): unique ID for this component.
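For intuition about strongly connected components, the following is a local sketch using Kosaraju's two-pass depth-first search. It is a swapped-in textbook algorithm for illustration, not necessarily what the library runs. Labeling each SCC by its smallest vertex ID is an assumption of this sketch, and the recursion is only suitable for small graphs.

```scala
import scala.collection.mutable

object LocalScc {
  /** Strongly connected components via Kosaraju's algorithm.
    * Assumption: each SCC is labeled by its smallest vertex ID. */
  def scc(vertices: Seq[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    val out = edges.groupBy(_._1).map { case (s, es) => s -> es.map(_._2) }
    val rev = edges.groupBy(_._2).map { case (d, es) => d -> es.map(_._1) }

    // Pass 1: DFS on the original graph, recording vertices by finish time.
    val visited = mutable.Set[Long]()
    val order = mutable.ArrayBuffer[Long]()
    def dfs1(v: Long): Unit =
      if (visited.add(v)) {
        out.getOrElse(v, Seq.empty).foreach(dfs1)
        order += v
      }
    vertices.foreach(dfs1)

    // Pass 2: DFS on the reversed graph in decreasing finish time; each tree is one SCC.
    val root = mutable.Map[Long, Long]()
    def dfs2(v: Long, r: Long): Unit =
      if (!root.contains(v)) {
        root(v) = r
        rev.getOrElse(v, Seq.empty).foreach(dfs2(_, r))
      }
    order.reverseIterator.foreach(v => dfs2(v, v))

    // Relabel each SCC with the smallest vertex ID it contains.
    val minId = vertices.groupBy(root).map { case (r, vs) => r -> vs.min }
    vertices.map(v => v -> minId(root(v))).toMap
  }
}
```

With the cycle 1 -> 2 -> 3 -> 1, the edge 3 -> 4, the cycle 4 <-> 5, and an isolated vertex 6, the sketch yields the SCCs {1, 2, 3}, {4, 5} and {6}. The edge 3 -> 4 does not merge the first two, because it runs in only one direction.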
Computes the number of triangles passing through each vertex.
This algorithm ignores edge direction; i.e., all edges are treated as undirected. In a multigraph, duplicate edges will be counted only once.
Note that this provides the same algorithm as GraphX, but GraphX assumes the user provides a graph in the correct format. In Spark 2.0, GraphX can automatically canonicalize the graph to put it in this format.
The returned DataFrame contains all the original vertex information and one additional column (LongType): the count of triangles.
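The canonicalization described above (direction ignored, duplicate edges counted once) can be sketched locally. Each edge is normalized to an ordered pair (min, max), and each triangle is counted exactly once by requiring a < b < c. The object name is hypothetical, and dropping self-loops is an additional assumption of this sketch.

```scala
import scala.collection.mutable

object LocalTriangleCount {
  /** Triangles through each vertex, ignoring edge direction and duplicate edges.
    * Assumption: self-loops are dropped. */
  def count(vertices: Seq[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    // Canonicalize: undirected, no self-loops, each vertex pair once.
    val undirected = edges.collect { case (a, b) if a != b => (math.min(a, b), math.max(a, b)) }.distinct
    val nbrs: Map[Long, Set[Long]] =
      (undirected ++ undirected.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2).toSet }
    val counts = mutable.Map[Long, Long]().withDefaultValue(0L)
    // For each edge (a, b) with a < b, a common neighbor c > b closes a triangle a < b < c.
    for ((a, b) <- undirected; c <- nbrs(a).intersect(nbrs(b)) if c > b)
      Seq(a, b, c).foreach(v => counts(v) += 1)
    vertices.map(v => v -> counts(v)).toMap
  }
}
```

Adding both 3 -> 1 and 1 -> 3 does not change the result, because the two collapse to the same undirected pair.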
This is a primitive for implementing graph algorithms. This method aggregates messages from the neighboring edges and vertices of each vertex.
For each triplet (source vertex, edge, destination vertex) in GraphFrame.triplets, this can send a message to the source and/or destination vertices.
 - AggregateMessages.sendToSrc() sends a message to the source vertex of each triplet.
 - AggregateMessages.sendToDst() sends a message to the destination vertex of each triplet.
 - AggregateMessages.agg specifies an aggregation function for aggregating the messages sent to each vertex. It also runs the aggregation, computing a DataFrame with one row for each vertex which receives > 0 messages. The DataFrame has 2 columns: the vertex ID and the aggregate of the messages sent to that vertex (the Column specified in AggregateMessages.agg()).
When specifying the messages and aggregation function, the user may reference columns using:
Note: If you use this operation to write an iterative algorithm, you may want to use getCachedDataFrame() as a workaround for caching issues.
We can use this function to compute the in-degree of each vertex.
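As a plain-Scala analog of this send-then-aggregate pattern, the following sketch lets every edge send the message 1 to its destination and sums the messages per receiving vertex. It does not use Spark, and the object LocalAggregateMessages is hypothetical. In GraphFrames itself, the same computation is usually written along the lines of g.aggregateMessages.sendToDst(lit(1)).agg(sum(AggregateMessages.msg).as("inDegree")), with lit and sum from org.apache.spark.sql.functions. Check the exact signatures against your GraphFrames version.

```scala
object LocalAggregateMessages {
  /** In-degree of each vertex that receives at least one message. */
  def inDegrees(edges: Seq[(String, String)]): Map[String, Int] =
    edges
      .map { case (_, dst) => dst -> 1 }                // like sendToDst(lit(1))
      .groupBy(_._1)                                     // group messages by receiving vertex
      .map { case (v, msgs) => v -> msgs.map(_._2).sum } // like agg(sum(msg))
}
```

As with agg, a vertex that receives no messages (here, one with no incoming edges) is absent from the result rather than mapped to 0.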