2016-07-19 9 views
1

(Dies basiert auf versucht, eine Integer RDD zu einer TholdDropResult RDD zuordnen, aber wir müssen eine einzige SparkDoDrop initialisieren, um alle (10^8) TholdDropResults zu generieren, daher die Verwendung der einzigen Geschmack in Java von mapPartition von mapPartitionsWithIndex, dass die Art der Funktion bieten wir benötigen, methinks)Apache Spark Function2, bekommt keine Deklaration richtig

. Frage: ich erhalte eine Fehlermeldung mit org.apache.spark.api.java.function.Function2

ich bin um herauszufinden, wie nicht in der Lage arbeite mit dem "boolean" in eine new Function2

Wenn ich diesen Code versuchen, dann nach rechts, die new Function2 Erklärung zu sehen, die Mühe geben, mir zu sein scheint (hinzugefügt Builder-Stil Formatierung Antwort):

JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(
             new Function2<Integer, 
             Iterator<Integer>, 
             Iterator<TholdDropResult>>(){ 

     @Override 
     public Iterator<TholdDropResult> call(Integer partitionID, Iterator<Integer> integerIterator) throws Exception { 
      // 
      SparkDoDrop standin = makeNewSparkDoDrop(); 
      standin.initializeLI(); 
      List<TholdDropResult> rddToReturn = new ArrayList<>(); 
      while (integerIterator.hasNext()){ 
       rddToReturn.add(standin.call(integerIterator.next())); 
      } 
      return rddToReturn.iterator(); 

     }}); 
    dropResultsN.persist(StorageLevel.MEMORY_ONLY()); 

Hier ist der vollständige Fehler, wenn ich laufe gradle build:

JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>>(){ 
required: Function2<Integer,Iterator<Integer>,Iterator<R>>,boolean 
    found: <anonymous Function2<Integer,Iterator<Integer>,Iterator<TholdDropResult>>> 
    reason: cannot infer type-variable(s) R 
    (actual and formal argument lists differ in length) 
    where R,T,This are type-variables: 
    R extends Object declared in method <R>mapPartitionsWithIndex(Function2<Integer,Iterator<T>,Iterator<R>>,boolean) 
    T extends Object declared in class AbstractJavaRDDLike 
    This extends JavaRDDLike<T,This> declared in class AbstractJavaRDDLike 

Wenn ich versuche, wie so dort die Boolesche arg zu platzieren: new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, Boolean>() ich erhalte eine Fehlermeldung:

error: wrong number of type arguments; required 3 
      JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, Boolean>(){ 

Schließlich, wenn ich boolean statt Boolean verwenden erhalte ich einen anderen Fehler:

error: unexpected type 
      JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, boolean>(){ 
                                         ^
    required: reference 
    found: boolean 

error: wrong number of type arguments; required 3 
      JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, boolean>(){ 
+0

Und FWIW der Der Grund für diesen Ansatz ist, dass die Zeile 'standin.initializeLI();' ein ziemlich großes Objekt erzeugt, das wir nicht serialisieren wollen. Wir möchten, dass Spark dieses Objekt auf dem Executor erzeugt. – JimLohse

Antwort

1

Sie müssen die Nähe der Function2 mit einem zusätzlichen > vor dem Boolean:

JavaRDD<TholdDropResult> dropResultsN = 
    dataSetN.mapPartitionsWithIndex(new Function2<Integer, 
               Iterator<Integer>, 
               Iterator<TholdDropResult>>, Boolean> 

Die Signatur von mapPartitionsWithIndex sieht so aus:

<R> JavaRDD<R> mapPartitionsWithIndex(Function2<java.lang.Integer, 
               java.util.Iterator<T>, 
               java.util.Iterator<R>> f, 
               boolean preservesPartitioning) 

Die Function2 nimmt eine Integer und eine Iterator<T> und gibt eine Iterator<R> zurück. Der erwartete boolean ist ein Parameter, der nicht innerhalb der Function2 definiert ist.

+0

Ich werde +1 und akzeptiere, zum Teil um mich an die viel bessere Formatierung zu erinnern, die ich verwenden sollte. Zur gleichen Zeit hat das es nicht behoben. Ich bekomme _unexpected token_, einen Fehler auf der @Override und mehr, wenn ich 'dataSetN.mapPartitionsWithIndex (neue Funktion2 , Iterator >, boolean>() {' – JimLohse

+0

Ich würde eine ForeachPartition () aber ich brauche dies, um eine RDD von TholdDropResult zurückzugeben, ich glaube nicht, dass foreachPartition jede andere Funktion als VoidFunction erlaubt.hmmmm – JimLohse

+1

Ich habe es, siehe meine Antwort und DANKE für mich in die richtige Richtung, auch Hilfe von dieser Antwort auf [Apache Spark mapPartitionsWithIndex] (http://stackoverflow.com/a/28938316/3255525) – JimLohse

0

Dies funktioniert, nicht sicher, warum, aber die Function2 Aussondern hat der Trick (natürlich habe ich noch nicht kompiliert und ausgeführt :)

 Function2 makeLIThenDropResults = new Function2<Integer, 
                 Iterator<Integer>, 
                 Iterator<TholdDropResult>>() { 
      @Override 
      public Iterator<TholdDropResult> call(Integer partitionID, Iterator<Integer> integerIterator) throws Exception { 
       SparkDoDrop standin = makeNewSparkDoDrop(); 

       standin.initializeLI(); 
       List<TholdDropResult> rddToReturn = new ArrayList<>(); 
       while (integerIterator.hasNext()){ 
        rddToReturn.add(standin.call(integerIterator.next())); 
       } 
       return rddToReturn.iterator(); 
      } 
     }; 

     // now make the RDD of subset of N 
     // setup bogus arrays of size N for parallelize to lead to dropResultsN 
     JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(makeLIThenDropResults, true); 

(Hut Spitze zu this answer on Apache Spark mapPartitionsWithIndex)

+0

kompiliert und lief, Danke an all die Hilfe – JimLohse