Kann mir jemand sagen, wie Pig UDF-Objekte instanziiert? Ich habe Pig benutzt, um eine Pipeline aufzubauen, um einige Daten zu verarbeiten. Ich habe die Pipeline im Multi-Node-Cluster Hadoop
implementiert. Und ich möchte alle Zwischenergebnisse speichern, die nach jedem Schritt in der Pipeline erzeugt werden. Also schrieb ich eine UDF in Java, die eine HTTP-Verbindung bei der Initialisierung öffnet und Daten in exec
überträgt. Außerdem werde ich die Verbindung in finalize
des Objekts schließen.Wie instanziiert Pig UDF-Objekte
Mein Skript wie folgt vereinfacht werden kann:
REGISTER MyPackage.jar;
DEFINE InterStore test.InterStore('localhost', '58888');
DEFINE Clean test.Clean();
raw = LOAD 'mydata';
cleaned = FILTER (FOREACH raw GENERATE FLATTEN(Clean(*))) BY NOT ($0 MATCHES '');
cleaned = FOREACH cleaned GENERATE FLATTEN(InterStore(*));
named = FOREACH cleaned GENERATE $1 AS LocationID, $2 AS AccessCount;
named = FOREACH named GENERATE FLATTEN(InterStore(*)) AS (LocationID, AccessCount);
grp = GROUP named BY LocationID;
grp = FOREACH grp GENERATE FLATTEN(InterStore(*)) AS (group, named:{(LocationID, AccessCount)});
sum = FOREACH grp GENERATE group AS LocationID, SUM(named.AccessCount) AS TotalAccesses;
sum = FOREACH sum GENERATE FLATTEN(InterStore(*)) AS (LocationID, TotalAccesses);
ordered = ORDER sum BY TotalAccesses DESC;
STORE ordered INTO 'result';
und der Code für Inters kann wie unten vereinfacht werden:
class InterStore extends EvalFunc<Tuple>{
HttpURLConnection con; //Avoid redundant connection establishment in exec
public InterStore(String ip, String port) throws IOException
{
URL url = new URL("http://" + ip + ':' + port);
con = (HttpURLConnection)url.openConnection();
con.setRequestMethod("PUT");
con.setDoOutput(true);
con.setDoInput(true);
}
public Tuple exec(Tuple input) throws IOException
{
con.getOutputStream().write((input.toDelimitedString(",")+'\n').getBytes());
return input;
}
@Override
protected void finalize() throws Throwable
{
con.getOutputStream().close();
int respcode = con.getResponseCode();
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
System.out.printf("Resp Code:%d, %s\n", respcode, in.readLine());
in.close();
}
}
aber ich, dass die HTTP-Verbindung nicht gefunden Daten übertragen kann erfolgreich es tut im lokalen Modus. Wie geht man damit um?
Hallo, wenn Sie das Skript teilen können, wo Sie versuchten Verwenden Sie Ihre UDF, die helfen würde. – kecso
Ich habe ein Codebeispiel hinzugefügt. Thx ~ – Trams