class Pregel extends AnyRef

Implements a Pregel-like bulk-synchronous message-passing API based on DataFrame operations.

See Malewicz et al., "Pregel: a system for large-scale graph processing", for a detailed description of the Pregel algorithm.

You can construct a Pregel instance using either this constructor or org.graphframes.GraphFrame#pregel, then use the builder pattern to describe the operations, and finally call run to start a run. The run method returns a DataFrame of vertices from the last iteration.

When a run starts, it expands the vertices DataFrame with the column expressions defined by withVertexColumn. These additional vertex properties can change during Pregel iterations. Each Pregel iteration has three phases:

  • For each edge triplet, generate messages and specify the target vertices to send them to, as described by sendMsgToDst and sendMsgToSrc.
  • Aggregate messages by target vertex ID, as described by aggMsgs.
  • Update the additional vertex properties based on the aggregated messages and the states from the previous iteration, as described by withVertexColumn.
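
As a rough illustration of these three phases, the following plain-Scala sketch models a single superstep over in-memory collections. It is a hypothetical simplification, not how GraphFrames runs Pregel (GraphFrames uses DataFrame joins and aggregations): the names Edge and superstep are invented here, and only messages to destination vertices are modeled; messages to source vertices would be handled symmetrically.

```scala
// Hypothetical in-memory model of one Pregel superstep (not GraphFrames' implementation).
final case class Edge(src: Long, dst: Long)

def superstep[V, M](
    vertices: Map[Long, V],
    edges: Seq[Edge],
    sendToDst: (V, V) => Option[M], // phase 1: message from (src state, dst state); None = no message
    aggregate: (M, M) => M,         // phase 2: combine the messages addressed to one vertex
    update: (V, Option[M]) => V     // phase 3: new state from old state and aggregated message
): Map[Long, V] = {
  // Phase 1: emit (target ID, message) pairs for every edge triplet.
  val msgs: Seq[(Long, M)] =
    edges.flatMap(e => sendToDst(vertices(e.src), vertices(e.dst)).map(m => e.dst -> m))
  // Phase 2: group by target vertex ID and aggregate.
  val agg: Map[Long, M] =
    msgs.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).reduce(aggregate) }
  // Phase 3: update every vertex; vertices without messages get None.
  vertices.map { case (id, v) => id -> update(v, agg.get(id)) }
}

// Usage: propagate the minimum label along the directed path 1 -> 2 -> 3.
val sendLabel: (Long, Long) => Option[Long] = (srcLabel, _) => Some(srcLabel)
val minOf: (Long, Long) => Long = (a, b) => math.min(a, b)
val keepMin: (Long, Option[Long]) => Long = (label, msg) => msg.fold(label)(m => math.min(label, m))

val edges = Seq(Edge(1L, 2L), Edge(2L, 3L))
val step0 = Map(1L -> 1L, 2L -> 5L, 3L -> 7L)
val step1 = superstep(step0, edges, sendLabel, minOf, keepMin)
val step2 = superstep(step1, edges, sendLabel, minOf, keepMin)
```

Representing the aggregated message as an Option mirrors the null message column that a vertex without incoming messages sees in the DataFrame API.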

The API docs of each method below list which columns you can reference in each phase.

You can control the number of iterations with setMaxIter; see the API docs for advanced controls such as setCheckpointInterval.

Example code for PageRank:

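import org.apache.spark.sql.functions.{coalesce, lit, sum}
import org.graphframes.GraphFrame
import org.graphframes.lib.Pregel

val edges = ...  // edge DataFrame with "src" and "dst" columns
// One row per vertex with at least one out-edge: columns "id" and "outDegree".
val vertices = GraphFrame.fromEdges(edges).outDegrees.cache()
val numVertices = vertices.count()
val graph = GraphFrame(vertices, edges)
val alpha = 0.15  // reset (teleport) probability
val ranks = graph.pregel
  // Initial rank 1/N; each iteration: rank = (1 - alpha) * (summed contributions, or 0) + alpha / N.
  .withVertexColumn("rank", lit(1.0 / numVertices),
    coalesce(Pregel.msg, lit(0.0)) * (1.0 - alpha) + alpha / numVertices)
  // Each vertex splits its rank evenly across its out-edges.
  .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree"))
  // Sum the contributions arriving at each vertex.
  .aggMsgs(sum(Pregel.msg))
  .run()
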
See also

org.graphframes.GraphFrame#pregel

Malewicz et al., Pregel: a system for large-scale graph processing.

Instance Constructors

  1. new Pregel(graph: GraphFrame)

    graph

    The graph that Pregel will run on.

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. def +(other: String): String
    Implicit
    This member is added by an implicit conversion from Pregel to any2stringadd[Pregel] performed by method any2stringadd in scala.Predef.
    Definition Classes
    any2stringadd
  4. def ->[B](y: B): (Pregel, B)
    Implicit
    This member is added by an implicit conversion from Pregel to ArrowAssoc[Pregel] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
    Annotations
    @inline()
  5. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  6. def aggMsgs(aggExpr: Column): Pregel.this.type

    Defines how messages are aggregated after they are grouped by target vertex ID.

    aggExpr

    the message aggregation expression, such as sum(Pregel.msg). You can reference the message column with Pregel.msg and the vertex ID with GraphFrame.ID, although the latter is rarely needed.
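
    A minimal sketch of a non-sum aggregation, assuming a GraphFrames release that provides GraphFrame#pregel and org.graphframes.lib.Pregel plus a local Spark session: each vertex records the largest ID among its in-neighbors via max(Pregel.msg). The graph and the column name maxInNbr are illustrative, and setCheckpointInterval(0) turns checkpointing off so that no checkpoint directory has to be configured.

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.graphframes.GraphFrame
    import org.graphframes.lib.Pregel

    val spark = SparkSession.builder().master("local[1]").appName("pregel-aggMsgs").getOrCreate()
    import spark.implicits._

    // Illustrative graph: 1 -> 3, 2 -> 3, 3 -> 1.
    val vertices = Seq(1L, 2L, 3L).toDF("id")
    val edges = Seq((1L, 3L), (2L, 3L), (3L, 1L)).toDF("src", "dst")

    val result = GraphFrame(vertices, edges).pregel
      .setMaxIter(1)            // one superstep is enough here
      .setCheckpointInterval(0) // disable checkpointing for this small local run
      // Initially null; afterwards the aggregated message (still null if nothing arrived).
      .withVertexColumn("maxInNbr", lit(null).cast("long"), Pregel.msg)
      .sendMsgToDst(Pregel.src("id")) // every vertex sends its own ID along its out-edges
      .aggMsgs(max(Pregel.msg))       // keep the largest ID per target vertex
      .run()

    // Collect as id -> Option(maxInNbr) for inspection.
    val maxInNbr: Map[Long, Option[Long]] = result.select("id", "maxInNbr").collect()
      .map(r => r.getLong(0) -> (if (r.isNullAt(1)) None else Some(r.getLong(1))))
      .toMap
    ```

    Vertex 2 has no in-edges, so it receives no messages and its maxInNbr stays null.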

  7. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  8. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... ) @native()
  9. def ensuring(cond: (Pregel) ⇒ Boolean, msg: ⇒ Any): Pregel
    Implicit
    This member is added by an implicit conversion from Pregel to Ensuring[Pregel] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  10. def ensuring(cond: (Pregel) ⇒ Boolean): Pregel
    Implicit
    This member is added by an implicit conversion from Pregel to Ensuring[Pregel] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  11. def ensuring(cond: Boolean, msg: ⇒ Any): Pregel
    Implicit
    This member is added by an implicit conversion from Pregel to Ensuring[Pregel] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  12. def ensuring(cond: Boolean): Pregel
    Implicit
    This member is added by an implicit conversion from Pregel to Ensuring[Pregel] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  13. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  14. def equals(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  15. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  16. final def getClass(): Class[_]
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  17. val graph: GraphFrame
  18. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  19. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  20. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  21. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  22. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  23. def run(): DataFrame

    Runs the defined Pregel algorithm.

    returns

    the result vertex DataFrame from the final iteration, including both the original and the additional columns.
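
    To see what run returns, this sketch counts incoming edges in a single superstep and then inspects the result's columns, which should be the original id and name columns plus the additional inDegree column. It assumes GraphFrames with Pregel support on a local Spark session; the data and the column name inDegree are illustrative.

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.graphframes.GraphFrame
    import org.graphframes.lib.Pregel

    val spark = SparkSession.builder().master("local[1]").appName("pregel-run").getOrCreate()
    import spark.implicits._

    val vertices = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "name")
    val edges = Seq((1L, 2L), (1L, 3L), (2L, 3L)).toDF("src", "dst")

    val result = GraphFrame(vertices, edges).pregel
      .setMaxIter(1)
      .setCheckpointInterval(0)  // no checkpoint directory needed for this local run
      .withVertexColumn("inDegree", lit(0L), coalesce(Pregel.msg, lit(0L)))
      .sendMsgToDst(lit(1L))     // one message per incoming edge
      .aggMsgs(sum(Pregel.msg))  // count them per target vertex
      .run()

    val columns = result.columns.toSet // original columns (id, name) plus inDegree
    val inDegree: Map[Long, Long] =
      result.select("id", "inDegree").collect().map(r => r.getLong(0) -> r.getLong(1)).toMap
    ```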

  24. def sendMsgToDst(msgExpr: Column): Pregel.this.type

    Defines a message to send to the destination vertex of each edge triplet.

    You can call it multiple times to send more than one message.

    msgExpr

    the message expression to send to the destination vertex given a (src, edge, dst) triplet. Source/destination vertex properties and edge properties are nested under the columns src, dst, and edge, respectively. You can reference them using Pregel.src, Pregel.dst, and Pregel.edge. Null messages are not included in message aggregation.

    See also

    sendMsgToSrc
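
    A sketch of a conditional message built from an edge property, assuming GraphFrames with Pregel support on a local Spark session (illustrative data; the columns weight and heavyInWeight are hypothetical): each vertex sums the weights of its incoming edges heavier than 1.0. Messages for lighter edges evaluate to null and are therefore left out of the aggregation.

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.graphframes.GraphFrame
    import org.graphframes.lib.Pregel

    val spark = SparkSession.builder().master("local[1]").appName("pregel-sendMsgToDst").getOrCreate()
    import spark.implicits._

    val vertices = Seq(1L, 2L, 3L).toDF("id")
    // Illustrative weighted edges: 1 -> 2 (0.5), 1 -> 3 (2.0), 2 -> 3 (3.0).
    val edges = Seq((1L, 2L, 0.5), (1L, 3L, 2.0), (2L, 3L, 3.0)).toDF("src", "dst", "weight")

    val result = GraphFrame(vertices, edges).pregel
      .setMaxIter(1)
      .setCheckpointInterval(0)
      .withVertexColumn("heavyInWeight", lit(0.0), coalesce(Pregel.msg, lit(0.0)))
      // when(...) without otherwise(...) yields null for light edges; null messages are dropped.
      .sendMsgToDst(when(Pregel.edge("weight") > 1.0, Pregel.edge("weight")))
      .aggMsgs(sum(Pregel.msg))
      .run()

    val heavyInWeight: Map[Long, Double] =
      result.select("id", "heavyInWeight").collect().map(r => r.getLong(0) -> r.getDouble(1)).toMap
    ```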

  25. def sendMsgToSrc(msgExpr: Column): Pregel.this.type

    Defines a message to send to the source vertex of each edge triplet.

    You can call it multiple times to send more than one message.

    msgExpr

    the message expression to send to the source vertex given a (src, edge, dst) triplet. Source/destination vertex properties and edge properties are nested under the columns src, dst, and edge, respectively. You can reference them using Pregel.src, Pregel.dst, and Pregel.edge. Null messages are not included in message aggregation.

    See also

    sendMsgToDst
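
    Calling both sendMsgToDst and sendMsgToSrc lets values flow against the edge direction as well. The sketch below propagates the minimum vertex ID to label connected components, assuming GraphFrames with Pregel support on a local Spark session; the toy data and the column name component are illustrative. GraphFrames also provides a dedicated connectedComponents method; this only illustrates the message API.

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.graphframes.GraphFrame
    import org.graphframes.lib.Pregel

    val spark = SparkSession.builder().master("local[1]").appName("pregel-sendMsgToSrc").getOrCreate()
    import spark.implicits._

    val vertices = Seq(1L, 2L, 3L, 4L, 5L).toDF("id")
    val edges = Seq((1L, 2L), (2L, 3L), (4L, 5L)).toDF("src", "dst")

    val result = GraphFrame(vertices, edges).pregel
      .setMaxIter(5)
      .setCheckpointInterval(0)
      // Each vertex starts with its own ID and keeps the smallest label it has seen.
      .withVertexColumn("component", col("id"),
        least(col("component"), coalesce(Pregel.msg, col("component"))))
      // Sending in both directions makes the propagation ignore edge direction.
      .sendMsgToDst(Pregel.src("component"))
      .sendMsgToSrc(Pregel.dst("component"))
      .aggMsgs(min(Pregel.msg))
      .run()

    val component: Map[Long, Long] =
      result.select("id", "component").collect().map(r => r.getLong(0) -> r.getLong(1)).toMap
    ```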

  26. def setCheckpointInterval(value: Int): Pregel.this.type

    Sets the number of iterations between two checkpoints (default: 2).

    This is an advanced control to balance query plan optimization and checkpoint data I/O cost. In most cases, you should keep the default value.

    Checkpointing is disabled if this is set to 0.

  27. def setMaxIter(value: Int): Pregel.this.type

    Sets the max number of iterations (default: 10).
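
    The sketch below combines setMaxIter and setCheckpointInterval. It assumes GraphFrames with Pregel support on a local Spark session, and it assumes, as for Spark's Dataset.checkpoint, that a checkpoint directory must be set when checkpointing is enabled; a temporary directory stands in for a real one. The steps column is a hypothetical counter that only makes the number of executed iterations visible; with no other stopping condition configured, it should end at the maxIter value.

    ```scala
    import java.nio.file.Files
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.graphframes.GraphFrame
    import org.graphframes.lib.Pregel

    val spark = SparkSession.builder().master("local[1]").appName("pregel-controls").getOrCreate()
    import spark.implicits._
    // Checkpoints go to Spark's checkpoint directory; a temp directory is used for illustration.
    spark.sparkContext.setCheckpointDir(Files.createTempDirectory("pregel-checkpoints").toString)

    val vertices = Seq(1L, 2L, 3L).toDF("id")
    val edges = Seq((1L, 2L), (2L, 3L), (3L, 1L)).toDF("src", "dst")

    val result = GraphFrame(vertices, edges).pregel
      .setMaxIter(4)            // at most 4 supersteps
      .setCheckpointInterval(2) // checkpoint every 2 iterations (same as the default)
      // A counter that grows by one per superstep, making the iteration count visible.
      .withVertexColumn("steps", lit(0), col("steps") + 1)
      .sendMsgToDst(lit(1))     // placeholder message; the counter does not use it
      .aggMsgs(sum(Pregel.msg))
      .run()

    val steps: Map[Long, Int] =
      result.select("id", "steps").collect().map(r => r.getLong(0) -> r.getInt(1)).toMap
    ```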

  28. final def synchronized[T0](arg0: ⇒ T0): T0
    Definition Classes
    AnyRef
  29. def toString(): String
    Definition Classes
    AnyRef → Any
  30. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  31. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  32. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... ) @native()
  33. def withVertexColumn(colName: String, initialExpr: Column, updateAfterAggMsgsExpr: Column): Pregel.this.type

    Defines an additional vertex column at the start of run and how to update it in each iteration.

    You can call it multiple times to add more than one additional vertex column.

    colName

    the name of the additional vertex column. It must not be the name of an existing vertex column in the graph.

    initialExpr

    the expression to initialize the additional vertex column. You can reference all original vertex columns in this expression.

    updateAfterAggMsgsExpr

    the expression to update the additional vertex column after message aggregation. You can reference all original vertex columns, the additional vertex columns, and the aggregated message column (via Pregel.msg). If the vertex received no messages, the message column is null.
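
    A sketch with two additional vertex columns, assuming GraphFrames with Pregel support on a local Spark session (toy data; the column names dist and gotMsg are illustrative): dist computes the hop distance from vertex 1, and gotMsg records whether the vertex received a message in the latest iteration. Because vertices without messages see a null message column, the update for dist uses Spark's least, which skips nulls; vertex 5, which has no edges, keeps a null distance.

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.graphframes.GraphFrame
    import org.graphframes.lib.Pregel

    val spark = SparkSession.builder().master("local[1]").appName("pregel-withVertexColumn").getOrCreate()
    import spark.implicits._

    val vertices = Seq(1L, 2L, 3L, 4L, 5L).toDF("id") // vertex 5 has no edges
    val edges = Seq((1L, 2L), (2L, 3L), (3L, 4L)).toDF("src", "dst")

    val result = GraphFrame(vertices, edges).pregel
      .setMaxIter(3)
      .setCheckpointInterval(0)
      // Hop distance from vertex 1: 0 for the source, null (unreached) elsewhere.
      // least() skips nulls, so a vertex without a message keeps its previous distance.
      .withVertexColumn("dist", when(col("id") === 1L, lit(0)), least(col("dist"), Pregel.msg))
      // A second additional column: did a message arrive in the latest superstep?
      .withVertexColumn("gotMsg", lit(false), Pregel.msg.isNotNull)
      // Unreached sources have dist = null, so dist + 1 is null and the message is dropped.
      .sendMsgToDst(Pregel.src("dist") + 1)
      .aggMsgs(min(Pregel.msg))
      .run()

    val rows = result.select("id", "dist", "gotMsg").collect()
    val dist: Map[Long, Option[Int]] =
      rows.map(r => r.getLong(0) -> (if (r.isNullAt(1)) None else Some(r.getInt(1)))).toMap
    val gotMsg: Map[Long, Boolean] = rows.map(r => r.getLong(0) -> r.getBoolean(2)).toMap
    ```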

  34. def →[B](y: B): (Pregel, B)
    Implicit
    This member is added by an implicit conversion from Pregel to ArrowAssoc[Pregel] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc

Deprecated Value Members

  1. def formatted(fmtstr: String): String
    Implicit
    This member is added by an implicit conversion from Pregel to StringFormat[Pregel] performed by method StringFormat in scala.Predef.
    Definition Classes
    StringFormat
    Annotations
    @deprecated @inline()
    Deprecated

    (Since version 2.12.16) Use formatString.format(value) instead of value.formatted(formatString), or use the f"" string interpolator. In Java 15 and later, formatted resolves to the new method in String which has reversed parameters.
