0

Ich versuche, einige Analysen in ToDF TempTable zu speichern, aber eine die folgenden Fehler erhalten ": 215: Fehler: Wert toDF ist kein Mitglied von Double". Ich lese Daten einer Cassandra-Tabelle, und ich mache einige Berechnungen. Ich möchte diese Ergebnisse in einer temporären Tabelle speichern. Ich bin neu in Scala, jemand, kann mir bitte helfen? meinen CodeScala: Speichern Ergebnis in einer ToDf-Tabelle

case class Consumo(consumo:Double, consumo_mensal: Double, mes: org.joda.time.DateTime,ano: org.joda.time.DateTime, soma_pf: Double,empo_gasto: Double); 

object Analysegridata{ 

val conf = new SparkConf(true) 

.set("spark.cassandra.connection.host","127.0.0.1").setAppName("LiniarRegression") 
.set("spark.cassandra.connection.port", "9042") 
.set("spark.driver.allowMultipleContexts", "true") 
.set("spark.streaming.receiver.writeAheadLog.enable", "true"); 
val sc = new SparkContext(conf); 

val ssc = new StreamingContext(sc, Seconds(1)) 

val sqlContext = new org.apache.spark.sql.SQLContext(sc); 
val checkpointDirectory = "/var/lib/cassandra/data" 
ssc.checkpoint(checkpointDirectory) // set checkpoint directory 

// val context = StreamingContext.getOrCreate(checkpointDirectory)  
import sqlContext.implicits._ 
JavaSparkContext.fromSparkContext(sc); 

def rddconsumo(rddData: Double): Double = { 

val rddData: Double = { 
    implicit val data = conf 
    val grid = sc.cassandraTable("smartgrids", "analyzer").as((r:Double) => (r)).collect 

def goto(cs: Array[Double]): Double = { 

    var consumo = 0.0; 
    var totaldias = 0; 
    var soma_pf = 0.0; 
    var somamc = 0.0; 
    var tempo_gasto = 0.0; 
    var consumo_mensal = 0.0; 
    var i=0 
for (i <- 0 until grid.length) {  
    val minutos = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MINUTE"); 
    val horas = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol","HOUR_OF_DAY"); 
    val dia = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "DAY_OF_MONTH"); 
    val ano = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "YEAR"); 
    val mes = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MONTH"); 
    val potencia = sc.cassandraTable("smartgrids","analyzer_temp").select("n_pf1", "n_pf2", "n_pf3") 

def convert_minutos (minuto : Int) : Double ={ 
    minuto/60 
} 
    dia.foreach (i => { 

    def adSum(potencia: Array[Double]) = { 

    var i=0; 
     while (i < potencia.length) { 
     soma_pf += potencia(i); 
     i += 1; 
     soma_pf; 
     println("Potemcia =" + soma_pf) 
    } 
} 
    def tempo(minutos: Array[Int]) = { 
     var i=0; 
     while (i < minutos.length) { 
     somamc += convert_minutos(minutos(i)) 
     i += 1; 
     somamc 
    } 
    } 

    def tempogasto(horas: Array[Int]) = { 
     var i=0; 
     while (i < horas.length) { 
     tempo_gasto = horas(i) + somamc; 
     i += 1; 
     tempo_gasto; 
     println("Temo que o aparelho esteve ligado =" + tempo_gasto) 
    } 
} 

def consumof(dia: Array[Int]) = { 
    var i=0; 
    while (i < dia.length) { 
     consumo = soma_pf * tempo_gasto; 
     i += 1; 
     consumo; 
     println("Consumo diario =" + consumo)  
    } 
    } 
}) 

mes.foreach (i => { 

def totaltempo(dia: Array[Int]) = { 
    var i = 0; 
    while(i < dia.length){ 
     totaldias += dia(i); 
     i += 1; 
     totaldias; 
     println("Numero total de dias =" + totaldias) 
    } 
} 
def consumomensal(mes: Array[Int]) = { 
    var i = 0; 
    while(i < mes.length){ 
     consumo_mensal = consumo * totaldias; 
     i += 1; 
    consumo_mensal; 
    println("Consumo Mensal =" + consumo_mensal); 
    } 
} 
}) 

} 
    consumo; 
    totaldias; 
    consumo_mensal; 
    soma_pf; 
    tempo_gasto; 
    somamc 

} 

rddData 

    } 
    rddData.toDF().registerTempTable("rddData") 
} 
     ssc.start() 
     ssc.awaitTermination() 




error: value toDF is not a member of Double" 
+0

Bitte den gesamten Fehler-Stack posten! Das wäre hilfreich –

+0

: 260: Fehler: Wert toDF ist kein Mitglied von Double rddData.toDF(). RegisterTempTable ("rddData") –

Antwort

0

Es ist ziemlich unklar, was Sie versuchen, genau das zu tun (zu viel Code, versuchen, eine minimal Beispiel bietet), aber es gibt ein paar offensichtliche Probleme:

  1. rddData hat Typ Double: scheint wie es sollte RDD[Double] sein (die eine verteilte Sammlung von Double-Werten ist). Der Versuch, einen einzelnen Double Wert als eine Tabelle zu speichern, macht nicht viel Sinn, und tatsächlich - funktioniert nicht (toDF kann auf eine RDD aufgerufen werden, kein Typ, speziell nicht auf Double, wie der Compiler warnt).
  2. collect() Sie die Daten: Wenn Sie eine RDD laden möchten, verwandeln es einige Manipulation verwenden und es dann als Tabelle speichern - collect() sollte wahrscheinlich nicht auf dieser RDD aufgerufen werden. collect() sendet alle Daten (verteilt über den Cluster) auf den einzelnen "Treiber" -Maschinentyp (den, auf dem dieser Code ausgeführt wird) - nach dem Sie den Cluster nicht nutzen und die Datenstruktur RDD nicht erneut verwenden, t konvertieren Sie die Daten unter Verwendung von toDF in einen DataFrame.