Ich Portierung einige Graph.pregel
Algorithmen GraphFrame.aggregateMessages
. Ich finde die GraphFrame
APIs ein wenig umständlich. In den Graph
APIs kann ich einen case class
als meine Nachricht senden. Aber in dem GraphFrame
APIs, aggregateMessages.sendToSrc
und .sendToDst
Arbeit entweder auf einen SQL-Ausdruck String
oder auf einem Column
. Ich finde das so stark wie es ein Schmerz in den Arsch ist.Shortcuts für die Erstellung von komplizierten Säulenstrukturen in Spark-
Angenommen, Sie haben:
case class Vote(yay: Boolean, voters: Long = 1L)
case class Send(vote: Vote, from: Long)
GraphX
Verwendung und die pregel
Funktion, kann ich ein sendMsg
bauen, die Iterator[(VertexId,Send)]
zurückgibt, die wie etwas sein könnte: Iterator((1L, Send(Vote(yay = true), from = 2L)))
Mit GraphFrames
ich Column
bauen das dient dem gleichen Zweck wie Iterator[(VertexId,Send)]
, idealerweise ohne meine bereits definierte case classes
komplett aufzugeben (viel komplizierter als das obige Beispiel).
Welche Verknüpfungen sind da, um das zu tun?
Was ich so weit gekommen:
Es war ziemlich einfach, eine Instanz eines case class
in eine entsprechende Struktur zu konvertieren. Diese meist kommt mich dorthin:
def ccToStruct(cc: Product) : Column = {
val values = cc.productIterator
var seq = Seq[Column]()
while (values.hasNext) {
val field = values.next() match {
case p: Some[Product @unchecked] if (p.get.productArity > 0) => ccToStruct(p.get)
case p: Product if (p.productArity > 0) => ccToStruct(p)
case x => lit(x)
}
seq = seq :+ field
}
struct(seq:_*)
}
Dies lässt mich tun:
ccToStruct(Send(Vote(true, 1L), 123L))
// res4: org.apache.spark.sql.Column = struct(struct(true,1),123)
ich das Schema Flicken müsste ein wenig um es richtig machen arbeiten, aber bevor ich anfing, dass ich zu tun erkannte, dass dies ein völlig nutzloser Ansatz ist. Sie wollen nie wirklich einen case class
Wert auf ein struct
konvertieren - eine ziemlich nutzlos Nachricht ccToStruct(Send(Vote(true, 1L), 123L))
schaffen. Es entspricht dem Senden eines lit(Send(..))
-Werts - außer dass lit()
keine Fallklassen unterstützt.
Was wollen Sie stattdessen zu tun ist und lit
Werte mit AM.dst("*")
und AM.src("*")
Spalten passen zu mischen, aber so entsprechend dem Schema des case class
zu tun. (Ich dachte daran, Case-Klassen insgesamt aufzugeben, aber ich habe eine UDAF
zu sum
meine Nachrichten, und diese Logik war sehr einfach zu portieren, solange ich Fallklassen weiter verwenden.)
Ich glaube, die Antwort ist in der Lage zu sein schaffen eine Struktur wie folgt aus:
import org.graphframes.lib.AggregateMessages
val AM = AggregateMessages
val msg = Seq[Any](Seq[Any](true, 1L), AM.src("id"))
und dann, dass die Verwendung struct()
und das Schema meiner Fallklasse zu einem Column
zu konvertieren.
Wenn niemand eine bessere Art und Weise hat, dies zu tun (und wahrscheinlich auch wenn jemand tut) Ich werde meine eigene Frage mit der Lösung beantworten später.
Nope kein Fehler - Sie müssen nur angeben müssen 'Seq [Alles]' 'weil Seq (1,0, 4 l, 123) 'ist nicht das gleiche wie' Seq [Any] (1.0, 4L, 123) 'und nur der zweite quetscht deine Werte nicht in kompatible Typen. –