2016-03-28 8 views
1

Wie kann ich ein DataFrame aus einem JavaRDD enthält Ganzzahlen erstellen. Ich habe etwas wie unten getan, aber nicht funktioniert.Wie erstelle ich einen Spark-Datenframe von Integer RDD

List<Integer> input = Arrays.asList(101, 103, 105); 
JavaRDD<Integer> inputRDD = sc.parallelize(input); 
DataFrame dataframe = sqlcontext.createDataFrame(inputRDD, Integer.class); 

Ich habe ClassCastException sagen org.apache.spark.sql.types.IntegerType$ cannot be cast to org.apache.spark.sql.types.StructType

Wie kann ich das erreichen?

Antwort

1

Offenbar (wenn auch nicht intuitiv), ist diese createDataFrame Überlastung kann für „Bean“ Typen nur arbeiten, was bedeutet, dass Typen entsprechen nicht zu einem Einbau-Spark-Typ SQL.

Sie können, dass in dem Quellcode sehen, die Klasse, die Sie mit einem Spark-SQL-Typ in JavaTypeInference.inferDataType angepasst passieren, und das Ergebnis in ein StructType gegossen (dataType.asInstanceOf[StructType] in SQLContext.getSchema sehen - aber die in „primitivem“ Typ gebaut (wie IntegerType) sind NICHTStructType s ... Sieht aus wie ein Bug oder nicht dokumentierte Verhalten zu mir ....

Abhilfen:

  1. Ihre Integer s mit einem "bean" Klasse Wrap (das ist hässlich, ich weiß):

    public static class MyBean { 
        final int value; 
    
        MyBean(int value) { 
         this.value = value; 
        } 
    
        public int getValue() { 
         return value; 
        } 
    } 
    
    List<MyBean> input = Arrays.asList(new MyBean(101), new MyBean(103), new MyBean(105)); 
    JavaRDD<MyBean> inputRDD = sc.parallelize(input); 
    DataFrame dataframe = sqlcontext.createDataFrame(inputRDD, MyBean.class); 
    
    dataframe.show(); // this works... 
    
  2. Convert sich RDD<Row>:

    // convert to Rows: 
    JavaRDD<Row> rowRdd = inputRDD.map(new Function<Integer, Row>() { 
        @Override 
        public Row call(Integer v1) throws Exception { 
         return RowFactory.create(v1); 
        } 
    }); 
    
    // create schema (this looks nicer in Scala...): 
    StructType schema = new StructType(new StructField[]{new StructField("number", IntegerType$.MODULE$, false, Metadata.empty())}); 
    
    DataFrame dataframe = sqlcontext.createDataFrame(rowRdd, schema); 
    dataframe.show(); // this works... 
    
0

Jetzt Spark 2.2 können Sie das tun, um ein Dataset zu erstellen.

Dataset<Integer> dataSet = sqlContext().createDataset(javardd.rdd(), Encoders.INT());