Ich bin ein Projekt jetzt entwickelt, wo ich genau die gleiche Anforderung hatte, habe ich in Verbindung mit core.async Sockelservice verwendet, um SSE zu implementieren, und es funktioniert wirklich gut.
Leider kann ich nicht Open-Source diese Arbeit jetzt, aber im Grunde habe ich etwas wie die Schnipsel unten, nur komplizierter wegen der Authentifizierung, die in SSE aus Browser nicht besonders einfach ist, weil Sie nicht können Übergeben Sie benutzerdefinierte Header in Ihrer neuen EventSource (SOME_URI); Anruf.
So werden die Schnipsel:
(ns chat-service.service
(:require [clojure.set :as set]
[clojure.core.async :as async :refer [<!! >!! <! >!]]
[cheshire.core :as json]
[io.pedestal.service.http :as bootstrap]
[io.pedestal.service.log :as log]
[io.pedestal.service.http.route :as route]
[io.pedestal.service.http.sse :as sse]
[io.pedestal.service.http.route.definition :refer [defroutes]]))
(def ^{:private true :doc "Formatting opts"} json-opts {:date-format "MMM dd, yyyy HH:mm:ss Z"})
(def ^{:private true :doc "Users to notification channels"} subscribers->notifications (atom {}))
;; private helper functions
(def ^:private generate-id #(.toString (java.util.UUID/randomUUID)))
(defn- sse-msg [event msg-data]
{:event event :msg msg-data})
;; service functions
(defn- remove-subscriber
"Removes transport channel from atom subscribers->notifications and tears down
SSE connection."
[transport-channel context]
(let [subscriber (get (set/map-invert @subscribers->notifications) transport-channel)]
(log/info :msg (str "Removing SSE connection for subscriber with ID : " subscriber))
(swap! subscribers->notifications dissoc subscriber)
(sse/end-event-stream context)))
(defn send-event
"Sends updates via SSE connection, takes also transport channel to close it
in case of the exception."
[transport-channel context {:keys [event msg]}]
(try
(log/info :msg "calling event sending fn")
(sse/send-event context event (json/generate-string msg json-opts))
(catch java.io.IOException ioe
(async/close! transport-channel))))
(defn create-transport-channel
"Creates transport channel with receiving end pushing updates to SSE connection.
Associates this transport channel in atom subscribers->notifications under random
generated UUID."
[context]
(let [temporary-id (generate-id)
channel (async/chan)]
(swap! subscribers->notifications assoc temporary-id channel)
(async/go-loop []
(when-let [payload (<! channel)]
(send-event channel context payload)
(recur))
(remove-subscriber channel context))
(async/put! channel (sse-msg "eventsourceVerification"
{:handshakeToken temporary-id}))))
(defn subscribe
"Subscribes anonymous user to SSE connection. Transport channel with timeout set up
will be created for pushing any new data to this connection."
[context]
(create-transport-channel context))
(defroutes routes
[[["/notifications/chat"
{:get [::subscribe (sse/start-event-stream subscribe)]}]]])
(def service {:env :prod
::bootstrap/routes routes
::bootstrap/resource-path "/public"
::bootstrap/type :jetty
::bootstrap/port 8081})
Ein "Problem" i auftritt, ist die Standardmethode, wie Sockel Griffe SSE Verbindungen fallen gelassen.
Aufgrund des geplanten Heartbeat-Jobs wird eine Ausnahme protokolliert, wenn die Verbindung getrennt wird und Sie den End-Event-Stream-Kontext nicht aufgerufen haben.
Ich wünschte, es gäbe eine Möglichkeit, dieses Verhalten zu deaktivieren/zwicken oder zumindest eine eigene Abreißfunktion bereitzustellen, die aufgerufen wird, wenn der Heartbeat-Job mit EofException fehlschlägt.
siehe https://github.com/sunng87/ring-jetty9-adapter und http-kit unterstützen auch ws – edbond
websockets unterstützung = http://caniuse.com/websockets http-kit docs: http: // http- kit.org/server.html#channel Sobald du eine Nachricht bekommen hast, leg los! es zu channeln und du fertig. – edbond