2016-04-14 6 views
0

Ich versuche, einen umgekehrten HTTP-Proxy mit Spray/Akka zu implementieren, aber läuft in Schwierigkeiten. Ich habe festgestellt, dass mein Proxy-Server unter bestimmten Umständen weiterhin Daten vom Upstream-Server empfängt, selbst nachdem der Client die Verbindung getrennt hat.Spray Reverse Proxy: Daten weiterleiten, nachdem der Client die Verbindung getrennt hat

Hier ist, wie ich meine Spray-Proxy-Richtlinie umzusetzen (nur eine wenig Änderung bthuillier's implementation):

trait ProxyDirectives { 

    private def sending(f: RequestContext ⇒ HttpRequest)(implicit system: ActorSystem): Route = { 
    val transport = IO(Http)(system) 
    ctx ⇒ transport.tell(f(ctx), ctx.responder) 
    } 

    /** 
    * Re-shape the original request, to match the destination server. 
    */ 
    private def reShapeRequest(req: HttpRequest, uri: Uri): HttpRequest = { 
    req.copy(
     uri = uri, 
     headers = req.headers.map { 
     case x: HttpHeaders.Host => HttpHeaders.Host(uri.authority.host.address, uri.authority.port) 
     case x => x 
     } 
    ) 
    } 

    /** 
    * proxy the request to the specified uri 
    * 
    */ 
    def proxyTo(uri: Uri)(implicit system: ActorSystem): Route = { 
    sending(ctx => reShapeRequest(ctx.request, uri)) 
    } 
} 

Dieser Reverse-Proxy wird gut funktionieren, wenn ich eine Proxy-Schicht zwischen dem Client setzen und dem Server (das heißt, Client < -> proxyTo < -> Server), aber es wird Probleme haben, wenn ich zwei Schichten zwischen dem Client und dem Server. Zum Beispiel, wenn ich habe den folgenden einfachen Python HTTP-Server erhalten:

import socket 
from threading import Thread, Semaphore 
import time 

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer 
from SocketServer import ThreadingMixIn 


class MyHTTPHandler(BaseHTTPRequestHandler): 
    protocol_version = 'HTTP/1.1' 

    def do_GET(self): 
     self.send_response(200) 
     self.send_header('Transfer-Encoding', 'chunked') 
     self.end_headers() 

     for i in range(100): 
      data = ('%s\n' % i).encode('utf-8') 
      self.wfile.write(hex(len(data))[2:].encode('utf-8')) 
      self.wfile.write(b'\r\n') 
      self.wfile.write(data) 
      self.wfile.write(b'\r\n') 
      time.sleep(1) 
     self.wfile.write(b'0\r\n\r\n') 


class MyServer(ThreadingMixIn, HTTPServer): 
    def server_bind(self): 
     HTTPServer.server_bind(self) 
     self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 

    def server_close(self): 
     HTTPServer.server_close(self) 


if __name__ == '__main__': 
    server = MyServer(('127.0.0.1', 8080), MyHTTPHandler) 
    server.serve_forever() 

die im Grunde nichts tut, sondern eine segmentierte Antwort öffnen (für den Lauf langfristigen, damit können wir die Fragen Prüfung). Und wenn ich die Kette zwei Schichten von Proxies auf folgende Weise:

class TestActor(val target: String)(implicit val system: ActorSystem) extends Actor 
    with HttpService 
    with ProxyDirectives 
{ 
    // we use the enclosing ActorContext's or ActorSystem's dispatcher for our Futures and Scheduler 
    implicit private def executionContext = actorRefFactory.dispatcher 

    // the HttpService trait defines only one abstract member, which 
    // connects the services environment to the enclosing actor or test 
    def actorRefFactory = context 

    val serviceRoute: Route = { 
    get { 
     proxyTo(target) 
    } 
    } 

    // runs the service routes. 
    def receive = runRoute(serviceRoute) orElse handleTimeouts 

    private def handleTimeouts: Receive = { 
    case Timedout(x: HttpRequest) => 
     sender ! HttpResponse(StatusCodes.InternalServerError, "Request timed out.") 
    } 
} 

object DebugMain extends App { 
    val actorName = "TestActor" 
    implicit val system = ActorSystem(actorName) 

    // create and start our service actor 
    val service = system.actorOf(
    Props { new TestActor("http://127.0.0.1:8080") }, 
    s"${actorName}Service" 
) 
    val service2 = system.actorOf(
    Props { new TestActor("http://127.0.0.1:8081") }, 
    s"${actorName}2Service" 
) 

    IO(Http) ! Http.Bind(service, "::0", port = 8081) 
    IO(Http) ! Http.Bind(service2, "::0", port = 8082) 
} 

Verwenden curl http://localhost:8082 an den Proxy-Server verbinden, und Sie werden das Akka System hält Daten sehen die Übertragung auch nach curl getötet wurde (möglicherweise einschalten die Protokolle der DEBUG-Ebene, um Details zu sehen).

Wie kann ich mit diesem Problem umgehen? Vielen Dank.

Antwort

0

Nun, es stellt sich heraus, ein sehr komplexes Problem, während meine Lösung fast 100 Zeilen Codes dauert.

Eigentlich existiert das Problem nicht nur, wenn ich zwei Schichten von Proxies stapeln. Wenn ich einen Layer-Proxy verwende, existiert das Problem zwar, aber es wird kein Protokoll gedruckt, daher ist mir dieses Problem vorher nicht bekannt.

Das Schlüsselproblem ist, dass, während wir IO(Http) ! HttpRequest verwenden, es tatsächlich eine Host-Level-API von Spraydose ist. Die Verbindungen von APIs auf Host-Ebene werden von Spray HttpManager verwaltet, auf das unser Code nicht zugreifen kann. Daher können wir nichts mit dieser Verbindung tun, es sei denn, wir senden eine Http.CloseAll an IO(Http), was dazu führt, dass alle Upstream-Verbindungen geschlossen werden.

(Wenn jemand weiß, wie man die Verbindung von HttpManager bekommt, bitte sagen Sie mir).

Wir müssen Verbindung-Level-APIs von Spraydose verwenden, um für diese Situation zu dienen. Also habe ich mit so etwas wie diese kommen:

/** 
    * Proxy to upstream server, where the server response may be a long connection. 
    * 
    * @param uri Target URI, where to proxy to. 
    * @param system Akka actor system. 
    */ 
def proxyToLongConnection(uri: Uri)(implicit system: ActorSystem): Route = { 
    val io = IO(Http)(system) 

    ctx => { 
    val request = reShapeRequest(ctx.request, uri) 

    // We've successfully opened a connection to upstream server, now start proxying data. 
    actorRefFactory.actorOf { 
     Props { 
     new Actor with ActorLogging { 
      private var upstream: ActorRef = null 
      private val upstreamClosed = new AtomicBoolean(false) 
      private val clientClosed = new AtomicBoolean(false) 
      private val contextStopped = new AtomicBoolean(false) 

      // Connect to the upstream server. 
      { 
      implicit val timeout = Timeout(FiniteDuration(10, TimeUnit.SECONDS)) 
      io ! Http.Connect(
       request.uri.authority.host.toString, 
       request.uri.effectivePort, 
       sslEncryption = request.uri.scheme == "https" 
      ) 
      context.become(connecting) 
      } 

      def connecting: Receive = { 
      case _: Http.Connected => 
       upstream = sender() 
       upstream ! request 
       context.unbecome() // Restore the context to [[receive]] 

      case Http.CommandFailed(Http.Connect(address, _, _, _, _)) => 
       log.warning("Could not connect to {}", address) 
       complete(StatusCodes.GatewayTimeout)(ctx) 
       closeBothSide() 

      case x: Http.ConnectionClosed => 
       closeBothSide() 
      } 

      override def receive: Receive = { 
      case x: HttpResponse => 
       ctx.responder ! x.withAck(ContinueSend(0)) 

      case x: ChunkedMessageEnd => 
       ctx.responder ! x.withAck(ContinueSend(0)) 

      case x: ContinueSend => 
       closeBothSide() 

      case x: Failure => 
       closeBothSide() 

      case x: Http.ConnectionClosed => 
       closeBothSide() 

      case x => 
       // Proxy everything else from server to the client. 
       ctx.responder ! x 
      } 

      private def closeBothSide(): Unit = { 
      if (upstream != null) { 
       if (!upstreamClosed.getAndSet(true)) { 
       upstream ! Http.Close 
       } 
      } 
      if (!clientClosed.getAndSet(true)) { 
       ctx.responder ! Http.Close 
      } 
      if (!contextStopped.getAndSet(true)) { 
       context.stop(self) 
      } 
      } 
     } // new Actor 
     } // Props 
    } // actorOf 
    } // (ctx: RequestContext) => Unit 
} 

Der Code ist wenig lang, und ich bezweifle, dass es sollte etwas mehr saubere und einfache Implementierung sein (eigentlich bin ich nicht vertraut mit Akka). Trotzdem funktioniert dieser Code, daher stelle ich diese Lösung hier vor. Sie können Ihre Lösung für dieses Problem frei posten, wenn Sie etwas besseres gefunden haben.