---
layout: doc
title: Unified Async/WebSocket API
---
<h2>Unified Async/WebSocket API</h2>
<p>This page describes http-kit's async and WebSocket extension for real-time applications. To use the latest release:</p>
{% highlight clojure %}
[http-kit "2.3.0"] ; add to :dependencies in your project.clj
(:use org.httpkit.server) ; add to your (ns ...) form to import the namespace
{% endhighlight %}
<p class="note">
The API is still a work in progress, but it is fully functional.
If you are interested, use the <a href="https://github.com/http-kit/http-kit/issues">GitHub issues page</a> for suggestions, bug reports, or general discussion.
Please refer to <a href="https://github.com/http-kit/http-kit/blob/protocol-api/src/org/httpkit/server.clj">server.clj</a> for more detail; its docstrings are fairly clear.
</p>
<p class="note">
This API is not compatible with <code>2.0.0-rc4</code>.
</p>
<h3 id="motivation" class="anchor">Motivation</h3>
<p>Why a new API?</p>
<ul>
<li>It is simpler and more consistent than the previous one.</li>
<li>It adds a new feature: HTTP streaming.</li>
<li>It unifies the API for HTTP (streaming or long polling) and WebSocket, which allows graceful degradation to long polling or streaming when WebSocket is not available in the client.</li>
<li>The server is also notified when a long-polling or streaming client closes the connection or goes away.</li>
</ul>
<h3 id="protocol" class="anchor">Channel Protocol</h3>
<p>Unified asynchronous channel interface for HTTP (streaming or long-polling) and WebSocket.</p>
<p>Channel defines the following contract:</p>
<ul>
<li>
<code>open?</code>: Returns true iff channel is open.
</li>
<li>
<code>close</code>: Closes the channel. Idempotent: returns true if the channel was actually closed, or false if it was already closed.
</li>
<li>
<code>websocket?</code>: Returns true iff channel is a WebSocket.
</li>
<li>
<code>send!</code>:
Sends data to the client and returns true if the data was successfully sent,
or false if the channel is closed. Data is sent directly to the client;
NO RING MIDDLEWARE IS APPLIED.
Data form: {:headers _ :status _ :body _} or just body. Note that :headers
and :status will be stripped for WebSocket and for HTTP streaming responses
after the first. An optional third argument controls whether the channel is
closed after sending, as the examples on this page show.
</li>
<li>
<code>on-receive</code>:
Sets handler (fn [message-string]) for notification of client WebSocket
messages. Message ordering is guaranteed by the server.
</li>
<li>
<code>on-close</code>:
Sets handler (fn [status]) for notification of the channel being closed by the
server or client. The handler is invoked at most once. Useful for clean-up.
</li>
</ul>
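<p>To make the return values in this contract concrete, here is a minimal sketch of a toy in-memory stand-in for a channel. It is not http-kit's implementation: the <code>toy-*</code> names and the <code>:server-close</code> status are hypothetical and exist only for illustration, and <code>swap-vals!</code> assumes Clojure 1.9 or later.</p>
{% highlight clojure %}
;; Toy model of the channel contract. NOT http-kit's implementation;
;; every name here is hypothetical.
(defn toy-channel []
  (atom {:open? true :sent [] :on-close nil}))

(defn toy-open? [ch]
  (:open? @ch))

(defn toy-on-close [ch f]
  (swap! ch assoc :on-close f)
  nil)

(defn toy-send!
  "Records data and returns true if the channel was open, false otherwise."
  [ch data]
  (let [[old _] (swap-vals! ch #(if (:open? %) (update % :sent conj data) %))]
    (:open? old)))

(defn toy-close
  "Returns true only for the call that actually closes the channel.
  The on-close handler therefore runs at most once."
  [ch]
  (let [[old _] (swap-vals! ch assoc :open? false)]
    (when (:open? old)
      (when-let [f (:on-close old)]
        (f :server-close))) ; placeholder status value
    (:open? old)))
{% endhighlight %}
<p>Closing twice returns true and then false, and a send after the close returns false without recording anything.</p>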
<h3 id="get-channel" class="anchor">Get the channel</h3>
<p>The <code>with-channel</code> macro gives a handler access to the channel. For example:</p>
{% highlight clojure %}
(defn handler [req]
  (with-channel req channel ; get the channel
    ;; channel can be HTTP (long polling or streaming) or WebSocket;
    ;; work with either one through the unified API
    ))
{% endhighlight %}
<h3 id="polling" class="anchor">Long polling example</h3>
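<p>Long polling is very similar to streaming: the handler holds the channel open until an event occurs, then sends a single response and closes the channel.</p>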
{% highlight clojure %}
(defn handler [request]
  (with-channel request channel
    (on-some-event ; placeholder: wait for an application event
      (send! channel {:status 200
                      :headers {"Content-Type" "application/json; charset=utf-8"}
                      :body "hello, polling"}
             true)))) ; send!, then close; true is already the default for streaming/polling, passed here to be explicit

(run-server handler {:port 9090})
{% endhighlight %}
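<p>The <code>on-some-event</code> form in the example above is a placeholder. As one possible way to fill it in, the sketch below uses a plain Clojure atom and a watch that fires once. The names <code>latest-event</code> and <code>on-next-event</code> are hypothetical and not part of http-kit; the http-kit wiring is shown inside a <code>comment</code> form so that the rest runs with Clojure alone.</p>
{% highlight clojure %}
;; Hypothetical event source: the most recent application event.
(defonce latest-event (atom nil))

(defn on-next-event
  "Calls (f event) once, the next time latest-event is set.
  Returns the watch key so the caller can unregister early."
  [f]
  (let [k (Object.)
        fired? (atom false)]
    (add-watch latest-event k
               (fn [_ r _ new-event]
                 ;; guard so that f runs at most once
                 (when (compare-and-set! fired? false true)
                   (remove-watch r k)
                   (f new-event))))
    k))

(comment
  ;; Wiring into a long polling handler (needs org.httpkit.server):
  (defn handler [request]
    (with-channel request channel
      (let [k (on-next-event
                (fn [event]
                  ;; no third argument: closing after send is the default here
                  (send! channel {:status 200
                                  :headers {"Content-Type" "text/plain; charset=utf-8"}
                                  :body (str event)})))]
        ;; drop the watch if the client goes away before any event
        (on-close channel (fn [_] (remove-watch latest-event k))))))

  ;; elsewhere in the application:
  (reset! latest-event "hello, polling"))
{% endhighlight %}
<p>Each waiting request registers its own watch, so a single <code>reset!</code> answers every client that is currently polling.</p>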
<h3 id="streaming" class="anchor">Streaming example</h3>
{% highlight clojure %}
;; schedule-task comes from the org.httpkit.timer namespace
(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "closed by server or client")))
    (loop [id 0]
      (when (< id 10)
        (schedule-task (* id 200) ; send a message every 200ms
                       (send! channel (str "message from server #" id) false)) ; false => don't close after send
        (recur (inc id))))
    (schedule-task 10000 (close channel)))) ; close after 10s

;;; open your browser at http://127.0.0.1:9090; a new message shows up every 200ms
(run-server handler {:port 9090})
{% endhighlight %}
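<p>Because <code>send!</code> returns false once the channel is closed, a streaming loop can use the return value to stop early when the client goes away. The following is a minimal sketch of that idea, not part of http-kit: <code>stream-until-closed</code> is a hypothetical helper that takes the send function as an argument, so it can be tried without a server.</p>
{% highlight clojure %}
(defn stream-until-closed
  "Calls (send-fn message) up to n times, sleeping interval-ms between
  sends, and stops as soon as send-fn returns false.
  Returns the number of messages that were sent successfully."
  [send-fn n interval-ms]
  (loop [id 0]
    (if (and (< id n)
             (send-fn (str "message from server #" id)))
      (do (Thread/sleep (long interval-ms))
          (recur (inc id)))
      id)))

(comment
  ;; Wiring into a streaming handler (needs org.httpkit.server).
  ;; The loop blocks, so it runs on another thread via future.
  (defn handler [request]
    (with-channel request channel
      (future
        (stream-until-closed #(send! channel % false) 10 200)
        (close channel)))))
{% endhighlight %}
<p>Unlike scheduling all ten sends up front, this variant does no further work after the first failed send.</p>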
<h3 id="websocket" class="anchor">WebSocket example</h3>
{% highlight clojure %}
(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "channel closed:" status)))
    (on-receive channel (fn [data] ; echo it back
                          (send! channel data)))))

(run-server handler {:port 9090})
{% endhighlight %}
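<p>Since <code>on-receive</code> hands the handler a message string, one way to go beyond a plain echo is to compute the reply in a pure function and keep the handler thin. The sketch below assumes a made-up text protocol (<code>ping</code> and <code>echo</code> commands); <code>reply-for</code> is a hypothetical name, not part of http-kit.</p>
{% highlight clojure %}
(require '[clojure.string :as str])

(defn reply-for
  "Maps an incoming text message to the reply string.
  The command set here is invented for illustration."
  [message]
  (let [[cmd arg] (str/split message #" " 2)]
    (case cmd
      "ping" "pong"
      "echo" (or arg "")
      (str "unknown command: " cmd))))

(comment
  ;; Wiring into a WebSocket handler (needs org.httpkit.server):
  (defn handler [request]
    (with-channel request channel
      (on-receive channel (fn [data]
                            (send! channel (reply-for data)))))))
{% endhighlight %}
<p>Keeping the reply logic separate from the channel makes it easy to test without opening a connection.</p>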
<h3 id="channel-group" class="anchor">A utility: ChannelStore</h3>
<p>I find a utility like ChannelStore (or ChannelGroup) quite helpful when using the API:</p>
{% highlight clojure %}
;; json-str is assumed to be a JSON serializer that is in scope,
;; e.g. from clojure.data.json (named write-str in newer releases)
(defprotocol ChannelStore
  (add-to-group [store group ch attach])
  (remove-from-group [store group ch])
  (broadcast [store group f]))

;;; implementation using an atom
(deftype MemoryChannelStore [store-map]
  ChannelStore
  (add-to-group [_ group ch attach]
    (swap! store-map (fn [old-store]
                       (assoc old-store
                              group
                              (assoc (or (old-store group) {})
                                     ch attach)))))
  (remove-from-group [_ group ch]
    (swap! store-map (fn [old-store]
                       (let [m (update-in old-store [group] dissoc ch)]
                         ;; drop the group entirely once it has no channels
                         (if (empty? (m group))
                           (dissoc m group)
                           m)))))
  (broadcast [store group f]
    (doseq [[ch attach] (@store-map group)]
      (if (websocket? ch)
        ;; pass the attachment (per-channel state) back to f
        (send! ch (json-str (f ch attach)))
        (send! ch {:status 200
                   :headers {"Content-Type"
                             "application/json; charset=utf-8"}
                   :body (json-str (f ch attach))}
               true))))) ; close after sending

;;; example usage
(defonce channel-store (MemoryChannelStore. (atom {})))

(defn handler [req]
  (let [group (-> req :params :group)
        since_ts (-> req :params :since_ts)] ; state variable, from the client
    (with-channel req ch
      (add-to-group channel-store group ch since_ts)
      (on-close ch (fn [status]
                     (remove-from-group channel-store group ch))))))

;;; some logic elsewhere (on-some-event and a-group are placeholders)
(on-some-event
  (broadcast channel-store a-group
             (fn [ch since_ts] ; since_ts is passed back
               (some-data-return-to-clients since_ts))))
{% endhighlight %}
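<p>The two <code>swap!</code> functions in MemoryChannelStore are pure transformations of a nested map, so they can be tried at the REPL without a server. The sketch below restates them as standalone functions under the hypothetical names <code>add-ch</code> and <code>remove-ch</code>, using keywords in place of real channels; <code>assoc-in</code> is used here as an equivalent of the nested <code>assoc</code> above.</p>
{% highlight clojure %}
(defn add-ch
  "Returns store with ch mapped to attach inside group."
  [store group ch attach]
  (assoc-in store [group ch] attach))

(defn remove-ch
  "Returns store without ch in group; removes the group once it is empty."
  [store group ch]
  (let [m (update-in store [group] dissoc ch)]
    (if (empty? (m group))
      (dissoc m group)
      m)))

(comment
  ;; Stand-in values: :c1 and :c2 play the role of channels,
  ;; 100 and 200 the role of since_ts attachments.
  (-> {}
      (add-ch "room" :c1 100)
      (add-ch "room" :c2 200)
      (remove-ch "room" :c1)))
{% endhighlight %}
<p>Removing the last channel of a group also removes the group key, so the store does not accumulate empty groups.</p>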
<p>If this proves useful, http-kit can provide a built-in, more efficient implementation.</p>
<!-- LocalWords: clojure defn json charset utf endhighlight fn str
-->
<!-- LocalWords: println Websocket defprotocol ChannelStore ch req
-->
<!-- LocalWords: deftype MemoryChannelStore assoc dissoc doseq
-->
<!-- LocalWords: websocket defonce params
-->