2016-07-27 81 views
8

Ich möchte senden einen Strom von einer Reihe von Dokumenten an einen Web-Service. Dies spart den HTTP-Request/Response-Overhead und konzentriert sich auf die Dokumente selbst.Senden eines Stream von Dokumenten an einen Jersey @ POST-Endpunkt

In Python können Sie etwas tun:

r = requests.post('https://stream.twitter.com/1/statuses/filter.json', 
    data={'track': 'requests'}, auth=('username', 'password'), 
    stream=True) 

for line in r.iter_lines(): 
    if line: # filter out keep-alive new lines 
     print json.loads(line) 

Ich bin ein Beispiel für jemanden, der eine Anfrage an einen Jersey Rest api Streaming. Ich hatte gehofft, die Client-Seite und die Server-Seite zu sehen, dass es funktioniert. Aber ich kämpfe hart, um dort ein Beispiel zu finden.

Das Beispiel würde zeigen Idealer:

Client: 
    Open request 
    Iterate over huge document list 
    Write document to open request stream 
    Close request 

Server: 
    @POST method 
    Open entity stream 
    Iterate over entity stream while next document is available 
     Process document 
    Close entity stream    

Wenn wir es richtig finden Sie auf dem Server-Verarbeitungseinheiten, während sie noch auf dem Client zu senden! Großer Gewinn!

+1

Wenn Ihr Client und Server sind beide Jersey-basiert, könnten Sie versuchen, [ChunkedOutput] (https://jersey.java.net/documentation/latest/async. html # chunked-output) und ein entsprechender ChunkedInput. Ich weiß nicht, wie ein Client in Javascript ChunkedInput jedoch tun würde. –

+0

Gibt es einen Grund, warum wir die Dateien nicht einzeln einsenden? –

+0

Riesige Batch-Prozess. Interesse an Geschwindigkeit mit wenigen Knoten wie möglich. Streaming wird viel Aufwand sparen, wenn ich mir das richtig überlege. –

Antwort

5

Eine der einfachsten Möglichkeiten, dies zu erreichen, besteht darin, Jersey den POST-Handler mit InputStream für den HTTP-POST-Text bereitstellen zu lassen. Die Methode kann den InputStream und den JSON-Parser Ihrer Wahl verwenden, um jedes Objekt zu analysieren und dann zu behandeln.

Im folgenden Beispiel, das ein Jackson ObjectReader eine MappingIterator erzeugt, die jedes Person Dokument in dem Array und verarbeitet parst, wie sie an den Server

/** 
* Parse and process an arbitrarily large JSON array of Person documents 
*/ 
@Path("persons") 
public static class PersonResource { 

    private static final ObjectReader reader = new ObjectMapper().readerFor(Person.class); 

    @Path("inputstream") 
    @Consumes("application/json") 
    @POST 
    public void inputstream(final InputStream is) throws IOException { 
     final MappingIterator<Person> persons = reader.readValues(is); 
     while (persons.hasNext()) { 
      final Person person = persons.next(); 
      // process 
      System.out.println(person); 
     } 
    } 
} 

Ebenfalls geliefert wird, kann der Jersey Client Framework einen Strom schicken von Dokumenten, wenn mit einem Jackson ObjectMapper konfiguriert. Das folgende Beispiel demonstriert dies mit dem Jersey Test-Framework. Die Client-Streams einen beliebig großen Iterator von Person Dokumenten

public class JacksonStreamingTest extends JerseyTest { 

    @Override 
    protected Application configure() { 
     return new ResourceConfig(PersonResource.class, ObjectMapperProvider.class); 
    } 

    /** 
    * Registers the application {@link ObjectMapper} as the JAX-RS provider for application/json 
    */ 
    @Provider 
    @Produces(MediaType.APPLICATION_JSON) 
    public static class ObjectMapperProvider implements ContextResolver<ObjectMapper> { 

     private static final ObjectMapper mapper = new ObjectMapper(); 

     public ObjectMapper getContext(final Class<?> objectType) { 
      return mapper; 
     } 
    } 

    @Override 
    protected void configureClient(final ClientConfig config) { 
     config.register(ObjectMapperProvider.class); 
    } 

    @Test 
    public void test() { 
     final Set<Person> persons = Collections.singleton(Person.of("Tracy", "Jordan")); 
     final Response response = target("persons/inputstream").request().post(Entity.json(persons.iterator())); 
     assertThat(response.getStatus()).isEqualTo(204); 
    } 
} 
+0

Große Antwort! Führt Jetty eine Pufferung durch, die die Leistung beeinträchtigen könnte? –

+0

Jersey (nicht Jetty) ist der Rahmen, der schließlich die 'InputStream'-Methode liefert, die mit '@ POST' verziert ist, natürlich kann Jersey auf einem Jetty-Server laufen, in welchem ​​Fall der' InputStream' übergeben werden kann entlang von Jetty ohne Modifikation - es ist unklar. Der JSON-Parser hat wahrscheinlich eine eigene Pufferung (Jackson verwendet einen Puffer beim Parsen), aber um sicher zu gehen, könnte man den 'InputStream' immer mit einem [BufferedInputStream] (https://docs.oracle.com/javase) umschließen /7/docs/api/java/io/BufferedInputStream.html) – jdgilday

+0

OK @jdgilday zwei Dinge: 1) Ohne das folgende Snippet würden die Dokumente niemals leeren: 'clientConfig.property (ClientProperties.CHUNKED_ENCODING_SIZE, Integer.valueOf (0)); ' 2) wir verlieren wie ~ 1% der Dokumente.Ich kann bestätigen, dass sie von der 'next() 'Methode des Iterators auf dem Client mit dem Debugger zurückgegeben werden, aber aus irgendeinem Grund gehen ein paar Dokumente nicht dorthin. –