2015-10-14 7 views
5

Programmierung mit pyspark auf einem Spark Cluster, die Daten sind groß und in Stücken so kann nicht in den Speicher geladen werden oder überprüfen Sie die Gesundheit der Daten leichtpyspark: TypeError: IntegerType kann kein Objekt vom Typ <type 'unicode'>

im Grunde sieht es aus wie

af.b Current%20events 1 996 
af.b Kategorie:Musiek 1 4468 
af.b Spesiaal:RecentChangesLinked/Gebruikerbespreking:Freakazoid 1 5209 
af.b Spesiaal:RecentChangesLinked/Sir_Arthur_Conan_Doyle 1 5214 

wikipedia Daten:

ich es von aws S3 gelesen und dann versuchen, Funken Datenrahmen mit dem folgenden python-Code in pyspark Interpreter zu konstruieren:

parts = data.map(lambda l: l.split()) 
wikis = parts.map(lambda p: (p[0], p[1],p[2],p[3])) 


fields = [StructField("project", StringType(), True), 
StructField("title", StringType(), True), 
StructField("count", IntegerType(), True), 
StructField("byte_size", StringType(), True)] 

schema = StructType(fields) 

df = sqlContext.createDataFrame(wikis, schema) 

alle sehen gut aus, gibt nur createDataFrame mich

Traceback (most recent call last): 
File "<stdin>", line 1, in <module> 
File "/usr/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame 
rdd, schema = self._createFromRDD(data, schema, samplingRatio) 
File "/usr/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD 
_verify_type(row, schema) 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1152, in _verify_type 
_verify_type(v, f.dataType) 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1136, in _verify_type 
raise TypeError("%s can not accept object in type %s" % (dataType, type(obj))) 
TypeError: IntegerType can not accept object in type <type 'unicode'> 

Fehler, warum ich nicht die dritte Spalte kann, die auf IntegerType zählen werden sollte? Wie kann ich das lösen?

+1

könnte es sein, weil Ihr Byte_size Structfield vom Typ StringType ist und sollte IntegerType? – ccheneson

+0

@ccheneson thx für den Kommentar –

Antwort

5

Wie von ccheneson festgestellt, übergeben Sie falsche Typen.

Sie data sieht wie folgt aus der Annahme:

data = sc.parallelize(["af.b Current%20events 1 996"]) 

Nach der ersten Karte, die Sie RDD[List[String]] erhalten:

parts = data.map(lambda l: l.split()) 
parts.first() 
## ['af.b', 'Current%20events', '1', '996'] 

Die zweite Karte wandelt es (String, String, String, String) tuple:

wikis = parts.map(lambda p: (p[0], p[1], p[2],p[3])) 
wikis.first() 
## ('af.b', 'Current%20events', '1', '996') 

Ihre schema besagt, dass 3r d eine ganze Zahl Spalten:

[f.dataType for f in schema.fields] 
## [StringType, StringType, IntegerType, StringType] 

Schema am meisten verwendet wird, eine vollständige Tabellensuche zu vermeiden Typen abzuleiten, und führt keine Art Gießen.

können Sie entweder werfen Sie Ihre Daten während der letzten Karte:

wikis = parts.map(lambda p: (p[0], p[1], int(p[2]), p[3])) 

Oder definieren count als StringType und Gusssäule

fields[2] = StructField("count", StringType(), True) 
schema = StructType(fields) 

wikis.toDF(schema).withColumn("cnt", col("count").cast("integer")).drop("count") 

Auf einer seitlichen Anmerkung count ist reserviertes Wort in SQL und shouldn‘ t als Spaltenname verwendet werden. In Spark wird es in einigen Kontexten wie erwartet funktionieren und in einem anderen fehlschlagen.

0

Mit Apache 2.0 können Sie Funke auf das Schema Ihrer Daten schließen lassen. Alles in allem müssen Sie Ihre Parser-Funktion wie oben dargelegt einbetten:

"Wenn das Schema None ist, versucht es, das Schema (Spaltennamen und -typen) aus Daten abzuleiten, die eine RDD von Row oder sein sollten namedtuple oder dict. "