2016-04-06 8 views
1

Ich versuche eine XML-Datei vorzuverarbeiten, um bestimmte Knoten zu extrahieren, bevor sie in MapReduce eingefügt werden. Ich habe den folgenden Code:Python mrjob mapreduce wie man die Eingabedatei vorverarbeitet

from mrjob.compat import jobconf_from_env 
from mrjob.job import MRJob 
from mrjob.util import cmd_line, bash_wrap 

class MRCountLinesByFile(MRJob): 
    def configure_options(self): 
     super(MRCountLinesByFile, self).configure_options() 
     self.add_file_option('--filter') 

    def mapper_cmd(self): 
     cmd = cmd_line([self.options.filter, jobconf_from_env('mapreduce.map.input.file']) 
     return cmd 



if __name__ == '__main__': 
    MRCountLinesByFile.run() 

Und auf der Kommandozeile, I-Typ:

python3 test_job_conf.py --filter ./filter.py -r local < test.txt 

test.txt ist eine normale XML-Datei wie here. Während filter.py ist ein Skript, um alle Titelinformationen zu finden.

Allerdings erhalte ich folgende Fehler:

Creating temp directory /tmp/test_job_conf.vagrant.20160406.042648.689625 
Running step 1 of 1... 
Traceback (most recent call last): 
    File "./filter.py", line 8, in <module> 
    with open(filename) as f: 
FileNotFoundError: [Errno 2] No such file or directory: 'None' 
Step 1 of 1 failed: Command '['./filter.py', 'None']' returned non-zero exit status 1 

Es sieht aus wie mapreduce.map.input.fileNone in diesem Fall machen. Wie kann ich die mapper_cmd Funktion bitten, die Datei zu lesen, die mrjob gerade liest?

Antwort

0

Nach meinem Verständnis geht in Ihrem selbst. Add_file_option sollte den Pfad zu Ihrer Datei haben.

Ich verstehe nicht ganz Ihr Szenario richtig, aber hier ist mein Verständnis. Sie verwenden die Option configure, um sicherzustellen, dass eine bestimmte Datei an alle Mapper zur Verarbeitung gesendet wird, wenn Sie beispielsweise eine zusätzliche Suche nach Daten in einer anderen Datei als der Quelle durchführen möchten. Diese zusätzliche Lookup-Datei wird von self.add_file_option zur Verfügung gestellt ('- items', help = 'Path to u.item').

Um etwas vor einem Reducer oder einer Mapper-Phase vorzuverarbeiten, verwenden Sie den Reducer_init oder den Mapper_init. Diese Init- oder die Verarbeitungsschritte müssen auch in Ihrer Schrittfunktion erwähnt werden, wie zum Beispiel unten gezeigt.

def steps(self): 
     return [ 
      MRStep(mapper=self.mapper_get_name, 
        reducer_init=self.reducer_init, 
        reducer=self.reducer_count_name), 
      MRStep(reducer = self.reducer_find_maxname) 
     ] 

Innerhalb Ihrer Init-Funktion führen Sie die eigentliche Vorverarbeitung durch, bevor Sie sie an den Mapper oder Reducer senden. Sagen Sie zum Beispiel öffnen Sie eine Datei xyz und kopieren Sie die Werte in das erste Feld in einem anderen Feld, das ich in meinem Reduzierer verwenden würde und die gleiche ausgeben.

def reducer_init(self): 
     self.movieNames = {}  
     with open("xyz") as f: 
      for line in f: 
       fields = line.split('|') 
       self.myNames[fields[0]] = fields[1] 

Hoffe, das hilft !!