2016-07-19 17 views
0

PySpark newb hier und ich versuche, eine unhandliche RDD von historischen Daten zu organisieren. Ich lese die Daten in (von WASB) und muss seine Struktur verstehen. Ich habe eine allgemeine Vorstellung vom Schema, aber da dies ein großer Auszug ist, kann ich sehen, dass nicht alle Datensätze konsistent sind.Schema kann nicht mit PySpark angewendet werden - Inkonsistente Felder

Worum ich mich bemühe, bezieht sich auf RDD-Elemente nach Position, damit ich versuchen kann, etwas Sinn aus den Daten zu extrahieren. Ich kann mich wegen der Inkonsistenz jetzt nicht auf ein Schema festlegen - das bedeutet, dass Datenrahmen keine Option sind und ich den flexiblen Abfragestil eines DF verliere.

Kurze Zusammenfassung der Umwelt und die Daten, die in Frage:..
Azure HDInsight Cluster, Daten in WASB
HDFS v 2.7
YARN v 2.7
Funken v 1.6 (HA Config mit 8 Arbeitern Knoten (16 Kerne x 112 GB RAM jedes)
Jupyter - PySpark
Daten:. seltsam abgegrenzt "CSV" mit ca. 765mm Aufzeichnungen

lesen die Daten

Split auf dem funky Begrenzer
ops = ops.map(lambda s: s.split(u"\ufffd")).cache()

Zeige 5 Aufzeichnungen RDD
ops.take(5)

[ [u'ALER RECOMMENDED BRAKE FLUID EXCHANGE - $139.88 ~', u'~PERFORMED DEALER RECOMMENDED BRAKE FLUID EXCHANGE USING A SPECIAL BRAKE FLUID EXCHANGE MACHINE,A PRESSURIZED SUPPLY OF MOC BRAKE FLUID', u'HIST_LD', u'2016-03-08 16:02:53', u'ARCHIVE'] ,[u'04638', u'0734140', u'2011-10-19', u'345267460', u'2', u'TIR', u'', u'0', u'685745051', u'TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATION', u'HIST_LD', u'2016-03-08 16:01:39', u'ARCHIVE'] ,[u'04638', u'0734140', u'2011-10-19', u'345267460', u'1', u'PRIME ITEM', u'', u'0', u'0', u'TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATION ~', u'~TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATIONS AND DOCUMENT PSI ~', u'~ ~', u'~20450 SET AT 36 PSI.', u'HIST_LD', u'2016-03-08 16:01:39', u'ARCHIVE'] ,[u'12093', u'0399468', u'2011-10-19', u'345268559', u'2', u'201', u'', u'1.5', u'0', u'REPLACED GAS CAP AND SANDED FILLER NECK', u'HIST_LD', u'2016-03-08 16:07:15', u'ARCHIVE'] ,[u'12093', u'0399468', u'2011-10-19', u'345268559', u'1', u'PRIME ITEM', u'', u'0', u'0', u'REPLACED GAS CAP AND SANDED FILLER NECK ~', u'~REPLACE GAS CAP AND SAND FILLER NECK', u'HIST_LD', u'2016-03-08 16:07:15', u'ARCHIVE'] ]

Ich sehe, dass die dritte Spalte ein Datum sein könnte, Wie extrahiere ich diesen Wert aus jeder Zeile in der RD D?
(Pseudo-Code Probe für das Jahr 2013 Daten sucht hier):
ops.filter(lambda x[2]: year(x[2])==2013)

Ich finde begrenzte Dokumentation darüber, wie diese online zu tun - vor allem, da es inkonsequent strukturierte Daten ohne entscheidendes Schema zu Gerangel gehört. Bottom Line ist, was sollte diese Linie von "Pseudo-Code" eigentlich sein?

Mein ultimatives Ziel ist es, Daten von 2013-2015 zu analysieren, diese in ihre eigenen Datenrahmen zu partitionieren und sie in Hive zu schreiben. Danke für die Hilfe!

Antwort

0

beste Option, die ich im Moment denke, ist zu

  1. Split die Zeilen in der RDD basierend auf dem Begrenzer

rddNew = rddOriginal.map(lambda s: s.split(u"\ufffd")).cache()

  1. Durchsuchen Sie das Element, von dem Sie vermuten, dass es das fragliche Datum ist, und erstellen Sie eine neue RDD für die Zeit "Partition" mithilfe von RegEx-Fertigkeiten aus der Bibliothek "re"

import re rdd2013 = opcode.filter(lambda x : re.search('^2013', x[2]))

0

So ist dieses ein ein Weg, das Problem zu lösen:

from datetime import datetime 

def only_pass(maybe_date): 
    try: 
     datetime.strptime(maybe_date,"%Y-%m-%d").date() 
     return 1 
    except Exception as err:  
     return 0 

only_rows_with_dates = rdd.filter(lambda row: only_pass(row[2]) == 1) 
  • nicht hier nichts ausrichten kann:/hoffen, dass Sie erhalten den Punkt.
+0

Danke für das Feedback - an diesem Punkt muß ich für eine nicht Stress über die Überprüfung „vielleicht Date.“ Ich kann als Zeichenfolge analysieren und PySpark SQL verwenden, um bei Bedarf zu konvertieren. Ich denke, ich habe im Moment eine gute Boot-Strappy-Lösung. –