2016-07-09 15 views
0

So habe ich Hadoop 2.7.1 auf einem 3-Rechner-Cluster installiert. Ich versuche, einen MapReduce-Job mit invertiertem Index mithilfe von MRJob und Hadoop Streaming auszuführen.MRJob gleichen Schlüssel wird an verschiedene Reduzierungen gesendet

Hier ist meine Konfiguration:

MRJob.SORT_VALUES = True 

def steps(self): 
    JOBCONF_STEP1 = { 
     "mapred.map.tasks":20, 
     "mapred.reduce.tasks":10 
    } 
    return [MRStep(jobconf=JOBCONF_STEP1, 
       mapper=self.mapper, 
       reducer=self.reducer) 
      ] 

Allerdings habe ich in meiner Ausgabe bemerkt habe, dass ich oft den gleichen Schlüssel zu zwei verschiedenen Reduzierungen gehen. Dies führt zu einer Ausgabe, die wie folgt aussieht:

Key | Output 
Z | 2 
X | 1,2 
X | 3 
Z | 1 

Das bedeutet, dass eine Reduzierung die X-Taste wird immer und die Werte 1 und 2, während ein anderer Minderer wird auch die X-Taste und den Wert bekommt 3. Aber ich will nur ein Reduzierer, um die X-Taste und alle zugehörigen Werte zu erhalten.

So ist die gewünschte Ausgabe ist:

Key | Output 
X | 1,2,3 
Z | 1,2 

Wie kann ich dieses Problem beheben?

Hier ist mein MRJob Code

%%writefile invertedIndex.py 

import json 
import mrjob 
from mrjob.job import MRJob 
from mrjob.step import MRStep 
class MRinvertedIndex(MRJob): 

    MRJob.SORT_VALUES = True 

    def steps(self): 
     JOBCONF_STEP1 = { 
      "mapred.map.tasks":20, 
      "mapred.reduce.tasks":10 
     } 
     return [MRStep(jobconf=JOBCONF_STEP1, 
        mapper=self.mapper, 
        reducer=self.reducer) 
       ] 

    def mapper(self,_,line): 
     key, stripe = line.split("\t") 
     stripe = json.loads(stripe) 
     for w in stripe: 
      yield w, key 

    def reducer(self,key,values): 
     d = [v for v in values] 
     yield key,d 

    if __name__ == '__main__': 
     MRinvertedIndex.run() enter code here 

Antwort

0

figured it out. Das Problem war, dass MRJob standardmäßig folgende einstellte:

'stream.num.map.output.key.fields': '1' 

Ich weiß nicht, wie 2 muß sein die Standardeinstellung für diese:

'stream.num.map.output.key.fields': '2' 

ich das Problem, indem explizit Einstellung in jobconf aufgelöst Einstellung, aber zumindest löste ich mein Problem