9

Ich schreibe einen Anwendungsserver in Clojure, die ClojureScript auf dem Client verwenden wird.Server Push von Daten von Clojure zu ClojureScript

Ich mag einen effizienten, idiomatischen Weg finden, Daten vom Server an den Client als Echtzeit-Ereignisse zu drücken, im Idealfall einer gewisse Kombination der Verwendung:

  • http-kit
  • core.async
  • Ring

(Aber ich bin offen für andere Möglichkeiten)

Kann jemand bieten Ein gutes Beispiel dafür?

+0

siehe https://github.com/sunng87/ring-jetty9-adapter und http-kit unterstützen auch ws – edbond

+0

websockets unterstützung = http://caniuse.com/websockets http-kit docs: http: // http- kit.org/server.html#channel Sobald du eine Nachricht bekommen hast, leg los! es zu channeln und du fertig. – edbond

Antwort

4

Ich bin ein Projekt jetzt entwickelt, wo ich genau die gleiche Anforderung hatte, habe ich in Verbindung mit core.async Sockelservice verwendet, um SSE zu implementieren, und es funktioniert wirklich gut.

Leider kann ich nicht Open-Source diese Arbeit jetzt, aber im Grunde habe ich etwas wie die Schnipsel unten, nur komplizierter wegen der Authentifizierung, die in SSE aus Browser nicht besonders einfach ist, weil Sie nicht können Übergeben Sie benutzerdefinierte Header in Ihrer neuen EventSource (SOME_URI); Anruf.

So werden die Schnipsel:

(ns chat-service.service 
    (:require [clojure.set :as set] 
      [clojure.core.async :as async :refer [<!! >!! <! >!]] 
      [cheshire.core :as json] 
      [io.pedestal.service.http :as bootstrap] 
      [io.pedestal.service.log :as log] 
      [io.pedestal.service.http.route :as route] 
      [io.pedestal.service.http.sse :as sse] 
      [io.pedestal.service.http.route.definition :refer [defroutes]])) 

(def ^{:private true :doc "Formatting opts"} json-opts {:date-format "MMM dd, yyyy HH:mm:ss Z"}) 

(def ^{:private true :doc "Users to notification channels"} subscribers->notifications (atom {})) 

;; private helper functions 
(def ^:private generate-id #(.toString (java.util.UUID/randomUUID))) 

(defn- sse-msg [event msg-data] 
    {:event event :msg msg-data}) 

;; service functions  
(defn- remove-subscriber 
    "Removes transport channel from atom subscribers->notifications and tears down 
    SSE connection." 
    [transport-channel context] 
    (let [subscriber (get (set/map-invert @subscribers->notifications) transport-channel)] 
    (log/info :msg (str "Removing SSE connection for subscriber with ID : " subscriber)) 
    (swap! subscribers->notifications dissoc subscriber) 
    (sse/end-event-stream context))) 

(defn send-event 
    "Sends updates via SSE connection, takes also transport channel to close it 
    in case of the exception." 
    [transport-channel context {:keys [event msg]}] 
    (try 
    (log/info :msg "calling event sending fn") 
    (sse/send-event context event (json/generate-string msg json-opts)) 
    (catch java.io.IOException ioe 
     (async/close! transport-channel)))) 

(defn create-transport-channel 
    "Creates transport channel with receiving end pushing updates to SSE connection. 
    Associates this transport channel in atom subscribers->notifications under random 
    generated UUID." 
    [context] 
    (let [temporary-id (generate-id) 
     channel (async/chan)] 
    (swap! subscribers->notifications assoc temporary-id channel) 
    (async/go-loop [] 
     (when-let [payload (<! channel)] 
     (send-event channel context payload) 
     (recur)) 
     (remove-subscriber channel context)) 
    (async/put! channel (sse-msg "eventsourceVerification" 
           {:handshakeToken temporary-id})))) 

(defn subscribe 
    "Subscribes anonymous user to SSE connection. Transport channel with timeout set up 
    will be created for pushing any new data to this connection." 
    [context] 
    (create-transport-channel context)) 

(defroutes routes 
    [[["/notifications/chat" 
    {:get [::subscribe (sse/start-event-stream subscribe)]}]]]) 

(def service {:env :prod 
       ::bootstrap/routes routes 
       ::bootstrap/resource-path "/public" 
       ::bootstrap/type :jetty 
       ::bootstrap/port 8081}) 

Ein "Problem" i auftritt, ist die Standardmethode, wie Sockel Griffe SSE Verbindungen fallen gelassen.

Aufgrund des geplanten Heartbeat-Jobs wird eine Ausnahme protokolliert, wenn die Verbindung getrennt wird und Sie den End-Event-Stream-Kontext nicht aufgerufen haben.

Ich wünschte, es gäbe eine Möglichkeit, dieses Verhalten zu deaktivieren/zwicken oder zumindest eine eigene Abreißfunktion bereitzustellen, die aufgerufen wird, wenn der Heartbeat-Job mit EofException fehlschlägt.

5

Ich bevorzuge zu verwenden aleph, hier ist die wiki, können Sie einfach wrap-ring-handler Funktion verwenden, um existierende Handler zu wickeln.

Für die "Push" -Funktion ist der nützlichste Teil der asph-Handler von Aleph. Es baut auf netty auf, nicht ein one-connection-one-thread-Modell, so dass die Server-Seite sich keine Sorgen über die Anzahl der TCP-Verbindungen machen müssen.

Einige implementieren Details:

  • Server-Seite verwenden aysnc Handler, halten alle Client-Verbindungen (Kanäle)
  • In 60 (zum Beispiel) Sekunden, wenn es keine 'neuen Daten' ist, senden Sie eine leere Antwort
  • Wenn die Serverseite eine Antwort hat, senden Sie sie.
  • Die Client-Seite kann einfach eine normale HTTP-Anfrage an die
  • Server senden, wenn der Client die Antwort erhalten, behandeln Sie den Antworttext und dann wieder eine HTTP-Anfrage
  • Sie bitte auf den Client neu senden und alle der Proxy-Server den richtigen Timeout-Wert

es gibt mehr Möglichkeiten, hier zu setzen: http://en.wikipedia.org/wiki/Push_technology

+0

Aleph + websockets hat auch für mich gut funktioniert – Hendekagon

5

ich die Bibliothek Chord vor kurzem, und ich mag es wirklich ausprobiert habe.

Es bietet eine kleine core.async Wrapper um die Websocket-Unterstützung in http-kit.

Von der Github Seite:

Auf dem Server

(:require [chord.http-kit :refer [with-channel]] 
      [clojure.core.async :refer [<! >! put! close! go]]) 

(defn your-handler [req] 
    (with-channel req ws-ch 
    (go 
     (let [{:keys [message]} (<! ws-ch)] 
     (println "Message received:" message) 
     (>! ws-ch "Hello client from server!") 
     (close! ws-ch))))) 

auf dem Client

(:require [chord.client :refer [ws-ch]] 
      [cljs.core.async :refer [<! >! put! close!]]) 
(:require-macros [cljs.core.async.macros :refer [go]]) 

(go 
    (let [ws (<! (ws-ch "ws://localhost:3000/ws"))] 
    (>! ws "Hello server from client!"))) 

Ich denke, es aber in frühen Stadien nach wie vor ist - es funktioniert nicht handle Trennungen noch.