graphframes.lib package

Collection of utilities usable with message aggregation

class graphframes.lib.AggregateMessages
Collection of utilities usable with graphframes.GraphFrame.aggregateMessages().

dst
Reference for destination column, used for specifying messages.

edge
Reference for edge column, used for specifying messages.

static getCachedDataFrame(df)
Create a new cached copy of a DataFrame.
This utility method is useful for iterative DataFrame-based algorithms. See the Scala documentation for more details.
- WARNING: This is NOT the same as DataFrame.cache().
- The original DataFrame will NOT be cached.
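As we read the Scala documentation, the copy is built by caching the original's rows and creating a fresh DataFrame from them. The point is that an iterative algorithm restarts each round from a short query plan instead of one that keeps growing. The plain-Python toy below models only that idea. `LazyFrame` and `get_cached_copy` are hypothetical stand-ins, not Spark or graphframes APIs.

```python
class LazyFrame:
    """Toy stand-in for a lazy DataFrame: base rows plus a chain of pending steps."""

    def __init__(self, base_rows, steps=()):
        self.base_rows = list(base_rows)
        self.steps = tuple(steps)  # the "query plan"; grows with each transformation

    def map(self, fn):
        # Transformations are lazy: they return a new frame with a longer plan.
        return LazyFrame(self.base_rows, self.steps + (fn,))

    def collect(self):
        # Evaluation replays every pending step, starting from the base rows.
        rows = self.base_rows
        for fn in self.steps:
            rows = [fn(r) for r in rows]
        return rows


def get_cached_copy(df):
    """Hypothetical model of getCachedDataFrame: materialize `df` once and return a
    NEW frame whose plan starts at those rows. `df` itself is left unchanged."""
    return LazyFrame(df.collect())


pending = LazyFrame([1, 2, 3]).map(lambda x: x * 10)
snapshot = get_cached_copy(pending)
print(len(pending.steps), len(snapshot.steps))  # 1 0

# In an iterative loop, re-basing each round keeps the plan from growing.
frame = LazyFrame([1, 2, 3])
for _ in range(100):
    frame = get_cached_copy(frame.map(lambda x: x + 1))
print(frame.collect(), len(frame.steps))  # [101, 102, 103] 0
```

The original object keeps its own plan and is not marked as cached. This mirrors the warning that `getCachedDataFrame` differs from `DataFrame.cache()`.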

msg
Reference for message column, used for specifying aggregation function.

src
Reference for source column, used for specifying messages.
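The plain-Python sketch below roughly models what these references let you express, using a sum aggregation. For each edge triplet it builds a message from source, edge, or destination attributes, routes it to an endpoint, and sums messages per vertex. The function `aggregate_messages` and its parameters are hypothetical. With graphframes you would instead pass column expressions built from `AggregateMessages.src`, `AggregateMessages.dst`, `AggregateMessages.edge` and `AggregateMessages.msg` to `graphframes.GraphFrame.aggregateMessages()`.

```python
from collections import defaultdict


def aggregate_messages(vertices, edges, send_to_src=None, send_to_dst=None):
    """Plain-Python model of aggregateMessages with a sum aggregation.

    vertices: dict of vertex ID -> attribute dict.
    edges: list of edge attribute dicts, each with "src" and "dst" vertex IDs.
    send_to_src / send_to_dst: functions (src_attrs, edge_attrs, dst_attrs) -> message.
    """
    totals = defaultdict(int)
    for e in edges:
        s, d = vertices[e["src"]], vertices[e["dst"]]
        if send_to_src is not None:
            totals[e["src"]] += send_to_src(s, e, d)
        if send_to_dst is not None:
            totals[e["dst"]] += send_to_dst(s, e, d)
    # In this model, vertices that received no message are absent from the result.
    return dict(totals)


people = {"a": {"age": 34}, "b": {"age": 36}, "c": {"age": 30}, "d": {"age": 29}}
follows = [{"src": "a", "dst": "b"}, {"src": "b", "dst": "c"}, {"src": "c", "dst": "b"}]
summed = aggregate_messages(
    people,
    follows,
    send_to_src=lambda s, e, d: d["age"],  # plays the role of AggregateMessages.dst["age"]
    send_to_dst=lambda s, e, d: s["age"],  # plays the role of AggregateMessages.src["age"]
)
print(summed)  # {'a': 36, 'b': 94, 'c': 72}
```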

class graphframes.lib.Pregel(graph)
Implements a Pregel-like bulk-synchronous message-passing API based on DataFrame operations.
See Malewicz et al., "Pregel: a system for large-scale graph processing", for a detailed description of the Pregel algorithm.
You can construct a Pregel instance using either this constructor or graphframes.GraphFrame.pregel, then use the builder pattern to describe the operations, and then call run() to start a run. run() returns a DataFrame of vertices from the last iteration.

When a run starts, it expands the vertices DataFrame using column expressions defined by withVertexColumn(). Those additional vertex properties can be changed during Pregel iterations. In each Pregel iteration, there are three phases:
- Given each edge triplet, generate messages and specify the target vertices to send them to, described by sendMsgToDst() and sendMsgToSrc().
- Aggregate messages by target vertex IDs, described by aggMsgs().
- Update additional vertex properties based on aggregated messages and states from the previous iteration, described by withVertexColumn().

The API docs of each method list which columns you can reference in that phase.
You can control the number of iterations with setMaxIter(); see the API docs for advanced controls.

Parameters: graph – a graphframes.GraphFrame object holding a graph with vertices and edges stored as DataFrames.

Example (assumes an active SparkSession named spark):
>>> from graphframes import GraphFrame
>>> from pyspark.sql.functions import coalesce, col, lit, sum, when
>>> from graphframes.lib import Pregel
>>> edges = spark.createDataFrame([[0, 1],
...                                [1, 2],
...                                [2, 4],
...                                [2, 0],
...                                [3, 4], # 3 has no in-links
...                                [4, 0],
...                                [4, 2]], ["src", "dst"])
>>> edges = edges.cache()
>>> vertices = spark.createDataFrame([[0], [1], [2], [3], [4]], ["id"])
>>> numVertices = vertices.count()
>>> vertices = GraphFrame(vertices, edges).outDegrees
>>> vertices = vertices.cache()
>>> graph = GraphFrame(vertices, edges)
>>> alpha = 0.15
>>> ranks = graph.pregel \
...     .setMaxIter(5) \
...     .withVertexColumn("rank", lit(1.0 / numVertices), \
...         coalesce(Pregel.msg(), lit(0.0)) * lit(1.0 - alpha) + lit(alpha / numVertices)) \
...     .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree")) \
...     .aggMsgs(sum(Pregel.msg())) \
...     .run()
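The plain-Python sketch below makes the three phases concrete by replaying the PageRank doctest above on the same edges, without Spark. It initializes `rank` to 1/N and sends `rank / outDegree` along each edge. It then sums messages per destination and applies `coalesce(msg, 0) * (1 - alpha) + alpha / N`. `pregel_pagerank` is a hypothetical illustration of the data flow, not the graphframes implementation. It assumes the vertex IDs are 0..N-1, as in the doctest.

```python
from collections import defaultdict

# Same edge list as the doctest: (src, dst) pairs; vertex 3 has no in-links.
EDGES = [(0, 1), (1, 2), (2, 4), (2, 0), (3, 4), (4, 0), (4, 2)]


def pregel_pagerank(edges, num_vertices, alpha=0.15, max_iter=5):
    """Plain-Python model of the doctest's Pregel PageRank (not the graphframes code)."""
    out_degree = defaultdict(int)  # plays the role of the outDegree vertex column
    for s, _ in edges:
        out_degree[s] += 1
    # withVertexColumn initial expression: lit(1.0 / numVertices)
    rank = {v: 1.0 / num_vertices for v in range(num_vertices)}
    for _ in range(max_iter):  # setMaxIter(max_iter)
        # Phase 1, sendMsgToDst: src("rank") / src("outDegree") on every triplet.
        messages = [(d, rank[s] / out_degree[s]) for s, d in edges]
        # Phase 2, aggMsgs: sum(msg) grouped by target vertex ID.
        agg = defaultdict(float)
        for target, m in messages:
            agg[target] += m
        # Phase 3, update expression: msg is None (null) if nothing arrived.
        new_rank = {}
        for v in rank:
            msg = agg.get(v)
            new_rank[v] = (msg if msg is not None else 0.0) * (1.0 - alpha) + alpha / num_vertices
        rank = new_rank  # all vertices switch to the new values together
    return rank


ranks = pregel_pagerank(EDGES, num_vertices=5)
print(ranks)
```

Vertex 3 never receives a message, so its rank settles at alpha / N. Every vertex here has at least one out-edge, so the ranks keep summing to 1.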

aggMsgs(aggExpr)
Defines how messages are aggregated after being grouped by target vertex IDs.
Parameters: aggExpr – the message aggregation expression, such as sum(Pregel.msg()). You can reference the message column by msg() and the vertex ID by col("id"), although the latter is usually not needed.
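The grouping step can be pictured in plain Python. Messages arrive as `(target_id, value)` pairs, and null messages (modeled as `None`) are dropped. The aggregate (a sum by default, or any other reducing function) is then computed per target ID. `aggregate_by_target` is a hypothetical helper used only for illustration.

```python
from collections import defaultdict


def aggregate_by_target(messages, agg=sum):
    """Group (target_id, value) pairs by target and apply `agg` to each group."""
    groups = defaultdict(list)
    for target, value in messages:
        if value is not None:  # null messages are not included in aggregation
            groups[target].append(value)
    return {target: agg(values) for target, values in groups.items()}


msgs = [(1, 0.5), (2, 0.25), (1, 0.25), (3, None)]
print(aggregate_by_target(msgs))       # {1: 0.75, 2: 0.25}
print(aggregate_by_target(msgs, max))  # {1: 0.5, 2: 0.25}
```

Vertex 3 received only a null message, so it has no aggregated value. Its `msg()` is then null in the update expression passed to `withVertexColumn()`.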

static dst(colName)
References a destination vertex column when generating messages to send.
See sendMsgToSrc() and sendMsgToDst().
Parameters: colName – the vertex column name.

static edge(colName)
References an edge column when generating messages to send.
See sendMsgToSrc() and sendMsgToDst().
Parameters: colName – the edge column name.
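`src()`, `dst()` and `edge()` resolve to fields nested under the `src`, `dst` and `edge` columns of each triplet. A message expression therefore behaves like a function of one nested row. The plain-Python sketch below models a triplet as nested dicts and builds a shortest-path-style message: the source distance plus the edge weight. The column names `dist` and `weight` are hypothetical. With the real API the same value would be written `Pregel.src("dist") + Pregel.edge("weight")`.

```python
# One (src, edge, dst) triplet with vertex/edge properties nested under each key.
triplet = {
    "src": {"id": 0, "dist": 2.0},
    "edge": {"src": 0, "dst": 1, "weight": 3.5},
    "dst": {"id": 1, "dist": float("inf")},
}


def src(name):   # models Pregel.src(name): a field nested under the "src" column
    return lambda t: t["src"][name]


def dst(name):   # models Pregel.dst(name): a field nested under the "dst" column
    return lambda t: t["dst"][name]


def edge(name):  # models Pregel.edge(name): a field nested under the "edge" column
    return lambda t: t["edge"][name]


# Message for the destination: source distance plus edge weight,
# sent only if it improves on the destination's current distance (else None/null).
candidate = src("dist")(triplet) + edge("weight")(triplet)
message = candidate if candidate < dst("dist")(triplet) else None
print(message)  # 5.5
```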

static msg()
References the message column when aggregating messages and updating additional vertex columns.
See aggMsgs() and withVertexColumn().

run()
Runs the defined Pregel algorithm.
Returns: the result vertex DataFrame from the final iteration, including both original and additional columns.

sendMsgToDst(msgExpr)
Defines a message to send to the destination vertex of each edge triplet.
You can call it multiple times to send more than one message.
See method sendMsgToSrc().
Parameters: msgExpr – the message expression to send to the destination vertex given a (src, edge, dst) triplet. Source/destination vertex properties and edge properties are nested under columns src, dst, and edge, respectively. You can reference them using src(), dst(), and edge(). Null messages are not included in message aggregation.
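Calling `sendMsgToDst()` more than once registers several message expressions, each evaluated on every triplet. The plain-Python model below collects `(target, message)` pairs from a list of such expressions and discards `None`, mirroring the rule that null messages are not aggregated. `collect_dst_messages` and the `active`/`value` columns are hypothetical.

```python
def collect_dst_messages(triplets, msg_exprs):
    """Evaluate every message expression on every triplet; keep non-null results."""
    out = []
    for t in triplets:
        for expr in msg_exprs:  # one entry per sendMsgToDst() call
            m = expr(t)
            if m is not None:   # null messages are not aggregated
                out.append((t["dst"]["id"], m))
    return out


triplets = [
    {"src": {"id": 0, "active": True, "value": 4}, "edge": {}, "dst": {"id": 1}},
    {"src": {"id": 2, "active": False, "value": 9}, "edge": {}, "dst": {"id": 1}},
]
msg_exprs = [
    lambda t: t["src"]["value"],                  # always produces a message
    lambda t: 1 if t["src"]["active"] else None,  # null for inactive sources
]
print(collect_dst_messages(triplets, msg_exprs))  # [(1, 4), (1, 1), (1, 9)]
```

In Spark SQL, an expression like the second one would typically be a `when(...)` without `otherwise(...)`, which yields null for rows that do not match.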

sendMsgToSrc(msgExpr)
Defines a message to send to the source vertex of each edge triplet.
You can call it multiple times to send more than one message.
See method sendMsgToDst().
Parameters: msgExpr – the expression of the message to send to the source vertex given a (src, edge, dst) triplet. Source/destination vertex properties and edge properties are nested under columns src, dst, and edge, respectively. You can reference them using src(), dst(), and edge(). Null messages are not included in message aggregation.
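Sending messages in both directions lets a directed edge list behave as an undirected graph. The plain-Python sketch below illustrates this by propagating the minimum label to both endpoints of every edge. It is in the spirit of pairing `sendMsgToSrc(Pregel.dst("label"))` with `sendMsgToDst(Pregel.src("label"))` and a `min` aggregation over `Pregel.msg()`. The `label` column and `propagate_min_labels` are hypothetical. This is a simple label-propagation sketch, not graphframes' connected-components implementation.

```python
def propagate_min_labels(vertex_ids, edges, max_iter=10):
    """Plain-Python label propagation sending messages in both edge directions."""
    label = {v: v for v in vertex_ids}  # initial expression: the vertex's own ID
    for _ in range(max_iter):
        inbox = {}  # aggregated (minimum) message per target vertex
        for s, d in edges:
            # sendMsgToDst(src label) and sendMsgToSrc(dst label)
            for target, m in ((d, label[s]), (s, label[d])):
                inbox[target] = min(m, inbox.get(target, m))
        # Update: keep the smaller of the current label and the aggregated message.
        label = {v: min(label[v], inbox.get(v, label[v])) for v in vertex_ids}
    return label


labels = propagate_min_labels([0, 1, 2, 3, 4, 5], [(1, 0), (2, 1), (4, 3)])
print(labels)  # {0: 0, 1: 0, 2: 0, 3: 3, 4: 3, 5: 5}
```

With only `sendMsgToDst()`, vertex 0 would never learn about vertices 1 and 2, because all of its edges point into it.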

setCheckpointInterval(value)
Sets the number of iterations between two checkpoints (default: 2).
This is an advanced control to balance query plan optimization and checkpoint data I/O cost. In most cases, you should keep the default value.
Checkpointing is disabled if this is set to 0.
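The sketch below reads the description above as "checkpoint after every iteration whose number is a multiple of the interval". That schedule is an assumption, not something this page states. Under it, the checkpointed iterations for a given `setMaxIter()` value can be computed in plain Python. `checkpoint_iterations` is a hypothetical helper, not part of graphframes.

```python
def checkpoint_iterations(max_iter, interval=2):
    """Iterations (1-based) after which a checkpoint would be taken.

    An interval of 0 disables checkpointing; this sketch also treats negative
    values as disabled (an assumption).
    """
    if interval <= 0:
        return []
    return [i for i in range(1, max_iter + 1) if i % interval == 0]


print(checkpoint_iterations(5))     # [2, 4]  (default interval of 2)
print(checkpoint_iterations(5, 3))  # [3]
print(checkpoint_iterations(5, 0))  # []      (checkpointing disabled)
```

A smaller interval truncates query plans more often at the cost of more checkpoint I/O, which is the trade-off the description mentions.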

static src(colName)
References a source vertex column when generating messages to send.
See sendMsgToSrc() and sendMsgToDst().
Parameters: colName – the vertex column name.

withVertexColumn(colName, initialExpr, updateAfterAggMsgsExpr)
Defines an additional vertex column at the start of a run and how to update it in each iteration.
You can call it multiple times to add more than one additional vertex column.
Parameters:
- colName – the name of the additional vertex column. It cannot be an existing vertex column in the graph.
- initialExpr – the expression to initialize the additional vertex column. You can reference all original vertex columns in this expression.
- updateAfterAggMsgsExpr – the expression to update the additional vertex column after message aggregation. You can reference all original vertex columns, additional vertex columns, and the aggregated message column using msg(). If the vertex received no messages, the message column is null.
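The plain-Python sketch below models two additional columns on a single vertex. `initialExpr` sees only the original columns. `updateAfterAggMsgsExpr` also sees the additional columns and the aggregated message, which is `None` when nothing arrived, hence the `coalesce`-style default. The column names `value`, `total` and `hits` are hypothetical. The model assumes that both update expressions read the previous iteration's values.

```python
def coalesce(value, default):
    """Model of SQL coalesce for a single nullable value."""
    return value if value is not None else default


vertex = {"id": 7, "value": 3}  # original vertex columns

# Initial expressions may reference only original columns:
#   "total" starts from the original "value" column, "hits" starts at 0.
state = {**vertex, "total": vertex["value"], "hits": 0}


def update_after_agg_msgs(row, msg):
    """Update expressions may reference original columns, additional columns and msg."""
    # Both expressions are computed from the previous iteration's row.
    return {
        **row,
        "total": row["total"] + coalesce(msg, 0),
        "hits": row["hits"] + (0 if msg is None else 1),
    }


# Aggregated message seen by this vertex in three iterations; None = no messages arrived.
for aggregated_msg in [5, None, 2]:
    state = update_after_agg_msgs(state, aggregated_msg)

print(state)  # {'id': 7, 'value': 3, 'total': 10, 'hits': 2}
```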