2016-05-24 7 views
1

Ich versuche, die Tweets von Sample Stream in einer Datenbank zu speichern und speichern Sie die RAW JSON zur gleichen Zeit. Ich verwende Twitter4jStatusClient nach dem example in Hbc Github-Repository. Da ich nur eine Teilmenge von Informationen in Echtzeit in der Datenbank abspeichere, hoffe ich, auch den rohen JSON des Tweets zu speichern, damit ich zusätzliche Informationen abrufen kann, wenn ich sie brauche. Die Verwendung von Twitter4jStatusClient bedeutet jedoch, dass der Listener in einem anderen Thread ausgeführt wird, und in here heißt es, dass, um das Json-Objekt zu erhalten, es von demselben Thread ausgeführt werden muss, der das Json-Objekt abgerufen hat. Gibt es eine Möglichkeit, den JSON-String zu speichern, wenn Twitter4JStatusClient verwendet wird? Ich entschied mich, diese example nicht zu verwenden, weil ich nur bestimmte Aktionen ausführen und die JSON-Zeichenfolge speichern wollte, wenn es ein Status ist. Vielen Dank! Rette JSON-Zeichenketten mit hbc Twitter4jStatusClient

// Create an appropriately sized blocking queue 
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000); 

    // Define our endpoint: By default, delimited=length is set (we need this for our processor) 
    // and stall warnings are on. 
    StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint(); 
    // Specify the language filter for the endpoint 
    endpoint.addQueryParameter(Constants.LANGUAGE_PARAM, Joiner.on(',').join(Lists.newArrayList("en"))); 
    endpoint.stallWarnings(false); 

    Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret); 

    // Create a new BasicClient. By default gzip is enabled. 
    BasicClient client = new ClientBuilder() 
      .name("sampleStreamClient") 
      .hosts(Constants.STREAM_HOST) 
      .endpoint(endpoint) 
      .authentication(auth) 
      .processor(new StringDelimitedProcessor(queue)) 
      .build(); 

    // Create an executor service which will spawn threads to do the actual work of parsing the incoming messages and 
    // calling the listeners on each message 
    int numProcessingThreads = 4; 
    ExecutorService service = Executors.newFixedThreadPool(numProcessingThreads); 


    StatusListener listener = new SampleStreamStatusListener(jsonInserter); 

    // Wrap our BasicClient with the twitter4j client 
    t4jClient = new Twitter4jStatusClient(
      client, queue, Lists.newArrayList(listener), service); 

Antwort

0

Ich hatte ein ähnliches Problem mit Twitter4jStatusClient, hier sind ein paar Ideen

Eine Zwischen Warteschlange

Sie könnten einen separaten Thread-Pool haben, der die rohen Nachrichten aus Ihrem queue Variable liest, speichert sie irgendwo und bringt sie in eine neue Warteschlange, die wir hbcQueue aufrufen, die Sie an den Twitter4jStatusClient-Konstruktor anstelle von queue übergeben.

BlockingQueue<String> hbcQueue = new LinkedBlockingQueue<>(10000); 
ExecutorService rawJsonSaver = Executors.newFixedThreadPool(numProcessingThreads); 
for (int i = 0; i < numProcessingThreads; i++) { 
    rawJsonSaver.execute(() -> { 
    for (;;) { 
     try { 
     String msg = queue.take(); 
     JSONObject jobj = new JSONObject(msg); 
     if (JSONObjectType.determine(jobj) == JSONObjectType.Type.STATUS) { 
      System.out.println(msg); // Save it 
      hbcQueue.add(msg); 
     } 
     } catch (InterruptedException e) { 
     Thread.currentThread().interrupt(); break; 
     } catch (JSONException e) { 
     continue; 
     } 
    } 
    }); 
} 
Twitter4jStatusClient t4jClient = new Twitter4jStatusClient(
    client, hbcQueue, Lists.newArrayList(listener), service); 

Aber natürlich Dies hat die Leistungsnachteile des JSON ein zweites Mal Parsing und das Hinzufügen eines weiteren blockierenden Sperroperation für die zweite parallele Warteschlange.

Re-Serialisierung

Wenn Sie später wird die rohe JSON in Java verarbeiten, könnte man glatt Java Serialisierung verwenden, da das Status Objekt übergeben, um Ihre StatusListenerSerializable implementiert. Dies ist nicht weit von der erneuten Serialisierung in JSON entfernt, aber zumindest müssen Sie nicht jedes Feld manuell serialisieren.

@Override 
    public void onStatus(final Status status) { 
    byte[] serializedStatus; 
    try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); 
     ObjectOutputStream objStream = new ObjectOutputStream(byteStream)) { 
     objStream.writeObject(status); 
     serializedStatus = byteStream.toByteArray(); 
    } catch (IOException e) { 
     throw new RuntimeException(e); 
    } 
    // store serializedStatus 
    // . . . 
    }