Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{:paths ["src"]
:deps
{funcool/promesa {:mvn/version "8.0.450"}
manifold/manifold {:mvn/version "0.2.4"}
com.cognitect/transit-clj {:mvn/version "1.0.329"}
{funcool/promesa {:mvn/version "11.0.678"}
manifold/manifold {:mvn/version "0.4.3"}
com.cognitect/transit-clj {:mvn/version "1.0.333"}
com.cognitect/transit-cljs {:mvn/version "0.8.280"}
org.clojure/core.match {:mvn/version "1.0.0"}
org.clojure/core.match {:mvn/version "1.1.0"}
metosin/sieppari {:mvn/version "0.0.0-alpha13"}}

:aliases
{:outdated {:extra-deps {com.github.liquidz/antq {:mvn/version "1.9.867"}}
{:outdated {:extra-deps {com.github.liquidz/antq {:mvn/version "2.11.1250"}}
:main-opts ["-m" "antq.core"]}}}
89 changes: 67 additions & 22 deletions src/plasma/client.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,72 @@
"Build a transport using a websocket"
([url] (websocket-transport url {}))
([url {:keys [on-open on-close on-error
on-reconnect auto-reconnect?
transit-read-handlers
transit-write-handlers]}]
(let [ws (js/WebSocket. url)
reader (t/reader :json {:handlers transit-read-handlers})
writer (t/writer :json {:handlers transit-write-handlers})
state (atom {:connected? false :buffer []})
send-f #(if (:connected? @state)
(.send ws (t/write writer %&))
(swap! state update :buffer conj %&))]
(set! (.-onopen ws)
(fn []
(swap! state assoc :connected? true)
(doseq [msg (:buffer @state)]
(apply send-f msg))
(swap! state assoc :buffer [])
(when on-open (on-open))))
(set! (.-onclose ws) #(do (swap! state assoc :connected? false)
(when on-close (on-close))))
(when on-error
(set! (.-onerror ws) on-error))
(set! (.-onmessage ws)
#(receive! (t/read reader (.-data %))))
transit-write-handlers]
:or {auto-reconnect? true}}]
(let [reader (t/reader :json {:handlers transit-read-handlers})
writer (t/writer :json {:handlers transit-write-handlers})
ws-state (atom {:connected? false :buffer []})
ws (atom nil)
send-f #(if (:connected? @ws-state)
(.send @ws (t/write writer %&))
(swap! ws-state update :buffer conj %&))

reconnect-timer (atom nil)
reconnect-count (atom 0)

-on-message #(receive! (t/read reader (.-data %)))

-on-open (fn []
(let [reconnecting? @reconnect-timer]
(when @reconnect-timer
(js/clearTimeout @reconnect-timer)
;; re-request known streams
(->> @state vals (filter map?)
(map (fn [{:keys [resource-id event args]}]
(send-f :plasma/request resource-id event args)))
doall)
(reset! reconnect-count 0))

;; send any buffered messages
(swap! ws-state assoc :connected? true)
(doseq [msg (:buffer @ws-state)]
(apply send-f msg))
(swap! ws-state assoc :buffer [])

;; call either on-reconnect or on-open
(when (and reconnecting? on-reconnect)
(on-reconnect))
(when (and (not reconnecting?) on-open)
(on-open))))

-on-close (fn [reconnect-f]
(swap! ws-state assoc :connected? false)
(when on-close (on-close))

(when auto-reconnect?
(when @reconnect-timer
(js/clearTimeout @reconnect-timer))
(let [reconnect-ms (+ (* (dec (Math/pow 2 @reconnect-count)) 500) 1000)]
(reset! reconnect-timer
(js/setTimeout
(fn []
(swap! reconnect-count inc)
(js/clearTimeout @reconnect-timer)
(reconnect-f))
reconnect-ms)))))

setup-ws
(fn reconnect-ws []
(reset! ws (js/WebSocket. url))
(set! (.-onopen @ws) -on-open)
(set! (.-onclose @ws)
;; note passing _this_ function in as an arg here
(partial -on-close reconnect-ws))
(when on-error (set! (.-onerror @ws) on-error))
(set! (.-onmessage @ws) -on-message))]
(setup-ws)
send-f)))

(defn use-transport!
Expand All @@ -85,7 +129,8 @@

Returns a plasma stream."
[event args]
(let [{:keys [resource-id] :as s} (s/stream)]
(let [{:keys [resource-id] :as s} (s/stream)
s (assoc s :event event :args args)]
(swap! state assoc resource-id s)
(transport/send! :plasma/request resource-id event args)
s))