2016-04-22 9 views
2

Ich Portierung einige Graph.pregel Algorithmen GraphFrame.aggregateMessages. Ich finde die GraphFrame APIs ein wenig umständlich. In den Graph APIs kann ich einen case class als meine Nachricht senden. Aber in dem GraphFrame APIs, aggregateMessages.sendToSrc und .sendToDst Arbeit entweder auf einen SQL-Ausdruck String oder auf einem Column. Ich finde das so stark wie es ein Schmerz in den Arsch ist.Shortcuts für die Erstellung von komplizierten Säulenstrukturen in Spark-

Angenommen, Sie haben:

case class Vote(yay: Boolean, voters: Long = 1L) 
case class Send(vote: Vote, from: Long) 

GraphX Verwendung und die pregel Funktion, kann ich ein sendMsg bauen, die Iterator[(VertexId,Send)] zurückgibt, die wie etwas sein könnte: Iterator((1L, Send(Vote(yay = true), from = 2L)))

Mit GraphFrames ich Column bauen das dient dem gleichen Zweck wie Iterator[(VertexId,Send)], idealerweise ohne meine bereits definierte case classes komplett aufzugeben (viel komplizierter als das obige Beispiel).

Welche Verknüpfungen sind da, um das zu tun?

Was ich so weit gekommen:

Es war ziemlich einfach, eine Instanz eines case class in eine entsprechende Struktur zu konvertieren. Diese meist kommt mich dorthin:

def ccToStruct(cc: Product) : Column = { 
    val values = cc.productIterator 
    var seq = Seq[Column]() 
    while (values.hasNext) { 
    val field = values.next() match { 
     case p: Some[Product @unchecked] if (p.get.productArity > 0) => ccToStruct(p.get) 
     case p: Product if (p.productArity > 0) => ccToStruct(p) 
     case x => lit(x) 
    } 
    seq = seq :+ field 
    } 
    struct(seq:_*) 
} 

Dies lässt mich tun:

ccToStruct(Send(Vote(true, 1L), 123L)) 
// res4: org.apache.spark.sql.Column = struct(struct(true,1),123) 

ich das Schema Flicken müsste ein wenig um es richtig machen arbeiten, aber bevor ich anfing, dass ich zu tun erkannte, dass dies ein völlig nutzloser Ansatz ist. Sie wollen nie wirklich einen case class Wert auf ein struct konvertieren - eine ziemlich nutzlos Nachricht ccToStruct(Send(Vote(true, 1L), 123L)) schaffen. Es entspricht dem Senden eines lit(Send(..))-Werts - außer dass lit() keine Fallklassen unterstützt.

Was wollen Sie stattdessen zu tun ist und lit Werte mit AM.dst("*") und AM.src("*") Spalten passen zu mischen, aber so entsprechend dem Schema des case class zu tun. (Ich dachte daran, Case-Klassen insgesamt aufzugeben, aber ich habe eine UDAF zu sum meine Nachrichten, und diese Logik war sehr einfach zu portieren, solange ich Fallklassen weiter verwenden.)

Ich glaube, die Antwort ist in der Lage zu sein schaffen eine Struktur wie folgt aus:

import org.graphframes.lib.AggregateMessages 
val AM = AggregateMessages 

val msg = Seq[Any](Seq[Any](true, 1L), AM.src("id")) 

und dann, dass die Verwendung struct() und das Schema meiner Fallklasse zu einem Column zu konvertieren.

Wenn niemand eine bessere Art und Weise hat, dies zu tun (und wahrscheinlich auch wenn jemand tut) Ich werde meine eigene Frage mit der Lösung beantworten später.

Antwort

0

Hier ist, was ich kam mit.

Für das, was ich tun will, was Column Objekte mit der Struktur der Fallklassen zu schaffen, ist aber mit der Fähigkeit, DataFrame.columns zu binden, entschied ich meine primäre Datenstruktur ein Seq[Any] sein sollte. Die Seq sollte der Struktur meiner Fallklasse entsprechen - die Seq ist im Grunde die Konstruktorargumente der Fallklasse. Wenn mein Fall Klasse:

case class Vote(yay: Boolean, voters: Long) 

Dann könnte ich erstellen die folgende Abstimmung artige Seq:

val voteSeq = Seq[Any](true, 1L) 

Aber der Grund, warum ich ein Seq[Any] verwenden müssen, ist, weil noch interessanter ist, kann ich erstellen :

val boundVote = Seq[Any](true, AM.edge("voters")) 

kam ich mit ein paar Funktionen auf, die verwendet werden können, die Seq zu einem Column zu konvertieren. Ich erstelle die Column mit der SQL-Funktion struct(). Sie könnten dies auch mit SQL-String-Ausdrücken tun. Aber ich entschied mich stattdessen mit Columns zu gehen. Ich wollte es so sauber wie möglich machen, und String SQL Ausdrücke werden unordentlich.

Wenn Sie nicht Ihre Spalten korrekt in Ihrer Struktur benennen, Sie structs erhalten, die wie folgt aussehen:

vote: struct (nullable = false) 
    |-- col1: boolean (nullable = false) 
    |-- col2: long (nullable = false) 

Das werde später ist saugen zu versuchen, wenn Sie versuchen, dass in einem Fall, Klasse zu konvertieren. Stattdessen haben Sie as für alle Spalten zu verwenden, so erhalten Sie:

vote: struct (nullable = false) 
    |-- yay: boolean (nullable = false) 
    |-- voters: long (nullable = false) 

Die Lösung ist eine StructType zu nehmen und dass die Feldnamen erstellen verwenden. Wie sich herausstellte, hatte ich bereits automatisch eine StructType aus einer Fallklasse abgeleitet - also das war der einfache Teil. Die erste Funktion macht das harte Teil - es sowohl rekursiv Wanderungen durch das Seq und das Schema und erzeugt Columns die letztlich in einem letzten verpackt erhalten: struct(colSeq:_*)

def seqToColumnSchema(anySeq: Seq[Any], schema: StructType) : Column = { 
    var colSeq = Seq[Column]() 
    anySeq.zip(schema.fields).foreach{ case (value, field) => { 
    colSeq = colSeq :+ (value match { 
     case c: Column => c as field.name 
     case p: Seq[Any] if (p.length > 0) => { 
     field.dataType match { 
      case s: StructType => seqToColumnSchema(p, s) as field.name 
      case a: ArrayType => array(p.map(v => lit(v)):_*) as field.name 
      case x => lit(x) as field.name 
     } 
     } 
     case x => lit(x) as field.name 
    }) 
    }} 
    struct(colSeq:_*) 
} 

Diese zweite Funktion ist nur ein Wrapper um die ersten, aber es können Sie tun:

seqToColumn[Vote](Seq(true, AM.edge("voters"))) 

Anstatt die StructType zu schaffen, müssen Sie nur den Namen der Fallklasse zu geben, haben innerhalb der [...]

import org.apache.spark.sql.catalyst.ScalaReflection  

def seqToColumn[T: TypeTag](anySeq: Seq[Any]) : Column = { 
    val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType] 
    seqToColumnSchema(anySeq, schema) 
} 

Das alles, nur so, dass ich das tun kann:

import org.graphframes.lib.AggregateMessages 
val AM = AggregateMessages 

case class Vote(yay: Boolean, voters: Long) 

val voteSeq = Seq[Any](true, AM.edge("voters")) 
val voteMsg = seqToColumn[Vote](voteSeq) 
// voteMsg: org.apache.spark.sql.Column = struct(true AS yay#18,edge[voters] AS voters#19) 

graphFrame.aggregateMessages.sendToDst(voteMsg).agg(voteSum(AM.msg) as "out").printSchema 
root 
|-- id: long (nullable = false) 
|-- out: struct (nullable = true) 
| |-- vote: struct (nullable = false) 
| | |-- yay: boolean (nullable = false) 
| | |-- voters: long (nullable = false) 
+0

Nope kein Fehler - Sie müssen nur angeben müssen 'Seq [Alles]' 'weil Seq (1,0, 4 l, 123) 'ist nicht das gleiche wie' Seq [Any] (1.0, 4L, 123) 'und nur der zweite quetscht deine Werte nicht in kompatible Typen. –