2013-03-25 15 views
6

Ich habe einen JSON-RPC-Dienst, der für eine der Anforderungen einen kontinuierlichen Strom von JSON-Objekten zurückgibt.HTTP Continuous Packeted Stream mit Indy

I.e. :

{id:'1'} 
{id:'2'} 
//30 minutes of no data 
{id:'3'} 
//... 

Natürlich gibt es keine Content-Length, weil der Strom ist endlos.

Ich benutze benutzerdefinierte TStream-Nachkommen zum Empfangen und Parsen der Daten. Aber intern TIdHttp puffert die Daten und übergibt es mir nicht, bis RecvBufferSize Bytes empfangen werden.

Daraus ergibt sich:

{id:'1'} //received 
{id:'2'} //buffered by Indy but not received 
//30 minutes of no data 
{id:'3'} //this is where Indy commits {id:'2'} to me 

Offensichtlich wird dies nicht tun, weil die Botschaft, die vor 30 Minuten zählte, sollte vor 30 Minuten geliefert wurden.

Ich möchte, dass Indy genau das macht, was Sockets tun: Lesen Sie RecvBufferSize oder weniger, wenn Daten verfügbar sind, und kehren Sie sofort zurück.

Ich habe this discussion von 2005 gefunden, wo einige arme Seele versucht, das Problem zu Indy-Entwicklern zu erklären, aber sie haben ihn nicht verstanden. (Lesen Sie es; es ist ein trauriger Anblick)

Wie auch immer, er arbeitete um dies zu schreiben, indem er benutzerdefinierte IOHandler Nachkommen schrieb, aber das war im Jahr 2005, vielleicht gibt es heute einige fertige Lösungen?

Antwort

2

Während TCP-Stream war eine Option, am Ende ging ich mit der ursprünglichen Lösung des Schreibens benutzerdefinierte TIdIOHandlerStack Nachkommen.

Die Motivation war, dass ich mit TIdHTTP weiß, was nicht funktioniert und nur das beheben muss, während das Umschalten auf TCP niedrigerer Ebene bedeutet, dass neue Probleme entstehen können.

Here's the code that I'm using, und ich werde die wichtigsten Punkte hier diskutieren.

Neu TIdStreamIoHandler muss von TIdIOHandlerStack erben.

Zwei benötigen Funktionen neu geschrieben werden: ReadBytes und ReadStream:

function TryReadBytes(var VBuffer: TIdBytes; AByteCount: Integer; 
    AAppend: Boolean = True): integer; virtual; 
procedure ReadStream(AStream: TStream; AByteCount: TIdStreamSize = -1; 
    AReadUntilDisconnect: Boolean = False); override; 

Beide Indy Funktionen geändert werden, die in IdIOHandler.TIdIOHandler gefunden werden kann. In ReadBytes die while Klausel muss durch eine Singe ReadFromSource() Anfrage ersetzt werden, so dass TryReadBytes kehrt nach dem Lesen auf AByteCount Bytes auf einmal.

Auf dieser Basis ReadStream hat alle Kombinationen von AByteCount zu behandeln (> 0, < 0) und ReadUntilDisconnect (wahr, falsch) zyklisch lesen und schreiben Sie dann Datenblocks aus der Steckdose ankommen zu streamen.

Beachten Sie, dass ReadStream auch in dieser Stream-Version nicht vorzeitig beendet werden muss, wenn nur ein Teil der angeforderten Daten im Socket verfügbar ist. Es muss nur diesen Teil sofort in den Stream schreiben, anstatt ihn in FInputBuffer zwischenzuspeichern, dann blocken und auf den nächsten Teil der Daten warten.

+0

da Indy Open Source ist, können modifizierte Quellen (und, falls hilfreich für andere, sollten) veröffentlicht werden – mjn

+0

@mjn: Ich wusste das nicht, danke. Der Code wurde hinzugefügt. – himself

2

Sie müssen keinen IOHandler-Nachfolger schreiben, es ist bereits mit der Klasse TIdTCPClient möglich. Es stellt ein TIdIOHandler-Objekt zur Verfügung, das über Methoden zum Lesen aus dem Socket verfügt. Diese ReadXXX-Methoden blockieren, bis die angeforderten Daten gelesen wurden oder eine Zeitüberschreitung auftritt. Solange die Verbindung besteht, kann ReadXXX in einer Schleife ausgeführt werden und jedesmal, wenn es ein neues JSON-Objekt empfängt, an die Anwendungslogik übergeben.

Ihr Beispiel sieht so aus, als ob alle JSON-Objekte nur eine Zeile haben. JSON-Objekte können jedoch mehrere Zeilen umfassen. In diesem Fall muss der Client-Code wissen, wie sie getrennt sind.


Update: in einer ähnlichen Frage Stackoverflow (für .Net) für einen 'Streaming' HTTP JSON Web-Service, die upvoted Lösung verwendet, um ein untergeordnetes TCP-Client anstelle einem HTTP-Client: Reading data from an open HTTP stream

4

Klingt für mich wie eine WebSocket Aufgabe, da Ihre Verbindung nicht einfach HTTP-Frage/Antwort mehr orientiert ist, sondern ein Strom von Inhalten.

Für einen Code siehe WebSocket server implementations for Delphi.

Es gibt at least one based on Indy, vom Autor von AsmProfiler.

AFAIK gibt es zwei Arten von Stream in Websockets: Binär und Text. Ich vermute, dass Ihr JSON-Stream ein Textinhalt ist, aus der Sicht des Web-Sockets.

Eine andere Möglichkeit ist es, long-pooling oder einige ältere Protokolle zu verwenden, die rooterfreundlicher sind - wenn die Verbindung zum WebSockets-Modus wechselt, ist es kein Standard-HTTP mehr, also einige "sinnvolle" Paketprüfungs-Tools (auf einem Unternehmensnetzwerk) kann es als Sicherheitsangriff (z. B. DoS) identifizieren und die Verbindung möglicherweise beenden.

+0

Wenn ich es richtig mache, müssen beide Lösungen den Dienst neu schreiben? Weil ich keinen Zugang dazu habe. – himself

+0

@himself Wenn Ihre Anfrage ist, dass die Verbindung offen ist und keine Content-Length-Header verwendet wird, ist dies kein HTTP mehr, daher müssen Sie die Serviceseite ändern. –

+0

Mhm, rate mal was die Serviceseite sagen wird? "Nirgendwo im HTTP-Standard heißt es, dass HTTP-Middleware Daten für längere Zeiträume puffern kann. Daher ist unser Service in Ordnung, ich nehme an, dass Sie Ihren HTTP-Client-Code reparieren müssen". Zurück zum Anfang. – himself

0

Es gibt tatsächlich eine Länge Daten direkt vor dem Inhalt des Pakets, die im Chunked-Encoding-Transfer-Modus übertragen. Unter Verwendung dieser Längendaten liest IOhandler von idhttp ein Paket um ein Paket, um zu streamen. Die kleinste bedeutungstragende Einheit ist ein Paket. Daher sollte es nicht notwendig sein, Zeichen nacheinander aus einem Paket zu lesen, und dann müssen die Funktionen von IOHandler nicht geändert werden. Das einzige Problem ist, dass idhttp nicht aufhören würde, die Stream-Daten wegen der endlosen Stream-Daten zum nächsten Schritt zu schalten: Es gibt kein End-Paket. So ist die Lösung wird mit idhttp onwork Ereignisse eine Lesung aus dem Strom zu triggern und der Einstellung der Stromposition auf Null zu vermeiden, um Überlauf .like dies:

//add a event handler to idhttp  
    IdHTTP.OnWork := IdHTTPWork; 


    procedure TRatesStreamWorker.IdHTTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); 
    begin 
     ..... 
     ResponseStringStream.Position :=0; 
     s:=ResponseStringStream.ReadString(ResponseStringStream.Size) ;//this is the packet conten 
     ResponseStringStream.Clear; 
     ... 
    end; 

procedure TForm1.ButtonGetStreamPricesClick(Sender: TObject); 
var 
begin 
    .....  
    source := RatesWorker.RatesURL+'EUR_USD'; 
    RatesWorker.IdHTTP.Get(source,RatesWorker.ResponseStringStream); 
end; 

Noch eine benutzerdefinierte Schreib() -Funktion von Tstream kann sein, bessere Lösung für diese Art von Anforderung.