2016-05-04 9 views
0

Kann mir jemand sagen, wie Pig UDF-Objekte instanziiert? Ich habe Pig benutzt, um eine Pipeline aufzubauen, um einige Daten zu verarbeiten. Ich habe die Pipeline im Multi-Node-Cluster Hadoop implementiert. Und ich möchte alle Zwischenergebnisse speichern, die nach jedem Schritt in der Pipeline erzeugt werden. Also schrieb ich eine UDF in Java, die eine HTTP-Verbindung bei der Initialisierung öffnet und Daten in exec überträgt. Außerdem werde ich die Verbindung in finalize des Objekts schließen.Wie instanziiert Pig UDF-Objekte

Mein Skript wie folgt vereinfacht werden kann:

REGISTER MyPackage.jar; 
DEFINE InterStore test.InterStore('localhost', '58888'); 
DEFINE Clean  test.Clean(); 

raw = LOAD 'mydata'; 
cleaned = FILTER (FOREACH raw GENERATE FLATTEN(Clean(*))) BY NOT ($0 MATCHES ''); 
cleaned = FOREACH cleaned GENERATE FLATTEN(InterStore(*)); 
named = FOREACH cleaned GENERATE $1 AS LocationID, $2 AS AccessCount; 
named = FOREACH named GENERATE FLATTEN(InterStore(*)) AS (LocationID, AccessCount); 
grp = GROUP named BY LocationID; 
grp = FOREACH grp GENERATE FLATTEN(InterStore(*)) AS (group, named:{(LocationID, AccessCount)}); 
sum = FOREACH grp GENERATE group AS LocationID, SUM(named.AccessCount) AS TotalAccesses; 
sum = FOREACH sum GENERATE FLATTEN(InterStore(*)) AS (LocationID, TotalAccesses); 
ordered = ORDER sum BY TotalAccesses DESC; 
STORE ordered INTO 'result'; 

und der Code für Inters kann wie unten vereinfacht werden:

class InterStore extends EvalFunc<Tuple>{ 
    HttpURLConnection con; //Avoid redundant connection establishment in exec 
    public InterStore(String ip, String port) throws IOException 
    { 
    URL url = new URL("http://" + ip + ':' + port); 
    con = (HttpURLConnection)url.openConnection(); 
    con.setRequestMethod("PUT"); 
    con.setDoOutput(true); 
    con.setDoInput(true); 
    } 
    public Tuple exec(Tuple input) throws IOException 
    { 
    con.getOutputStream().write((input.toDelimitedString(",")+'\n').getBytes()); 
    return input; 
    } 
    @Override 
    protected void finalize() throws Throwable 
    { 
    con.getOutputStream().close(); 
    int respcode = con.getResponseCode(); 
    BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream())); 
    System.out.printf("Resp Code:%d, %s\n", respcode, in.readLine()); 
    in.close(); 
    } 
} 

aber ich, dass die HTTP-Verbindung nicht gefunden Daten übertragen kann erfolgreich es tut im lokalen Modus. Wie geht man damit um?

+0

Hallo, wenn Sie das Skript teilen können, wo Sie versuchten Verwenden Sie Ihre UDF, die helfen würde. – kecso

+0

Ich habe ein Codebeispiel hinzugefügt. Thx ~ – Trams

Antwort

0

Gibt es einen Dienst, der 'localhost', '58888' abhört?

Beachten Sie, dass der lokale Host sind, unterscheidet sich von den einzelnen Ausführungsknoten, können Sie dies tun:

%default LHOST `localhost` 

und verwenden diese Variable als Parameter

DEFINE InterStore test.InterStore('$LHOST', '58888'); 

Im Allgemeinen würde ich ein paar Ausdrucke tun in der UDF und überprüfen Sie die Parameter übergeben, und testen Sie die Verbindung (wie Ping und überprüfen, ob der Port vom Hadoop-Knoten zugänglich ist)