-
Notifications
You must be signed in to change notification settings - Fork 117
Expand file tree
/
Copy pathserver_ev.lua
More file actions
266 lines (249 loc) · 7.19 KB
/
server_ev.lua
File metadata and controls
266 lines (249 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
local socket = require'socket'
local tools = require'websocket.tools'
local frame = require'websocket.frame'
local handshake = require'websocket.handshake'
local tconcat = table.concat
local tinsert = table.insert
local ev
local loop
local clients = {}
clients[true] = {}
local client = function(sock,protocol)
assert(sock)
sock:setoption('tcp-nodelay',true)
local fd = sock:getfd()
local message_io
local close_timer
local async_send = require'websocket.ev_common'.async_send(sock,loop)
local self = {}
self.state = 'OPEN'
self.sock = sock
local user_on_error
local on_error = function(s,err)
if clients[protocol] ~= nil and clients[protocol][self] ~= nil then
clients[protocol][self] = nil
end
if user_on_error then
user_on_error(self,err)
else
print('Websocket server error',err)
end
end
local user_on_close
local on_close = function(was_clean,code,reason)
if clients[protocol] ~= nil and clients[protocol][self] ~= nil then
clients[protocol][self] = nil
end
if close_timer then
close_timer:stop(loop)
close_timer = nil
end
message_io:stop(loop)
self.state = 'CLOSED'
if user_on_close then
user_on_close(self,was_clean,code,reason or '')
end
sock:shutdown()
sock:close()
end
local handle_sock_err = function(err)
if err == 'closed' then
if self.state ~= 'CLOSED' then
on_close(false,1006,'')
end
else
on_error(err)
end
end
local user_on_message = function() end
local TEXT = frame.TEXT
local BINARY = frame.BINARY
local on_message = function(message,opcode)
if opcode == TEXT or opcode == BINARY then
user_on_message(self,message,opcode)
elseif opcode == frame.CLOSE then
if self.state ~= 'CLOSING' then
self.state = 'CLOSING'
local code,reason = frame.decode_close(message)
local encoded = frame.encode_close(code)
encoded = frame.encode(encoded,frame.CLOSE)
async_send(encoded,
function()
on_close(true,code or 1006,reason)
end,handle_sock_err)
else
on_close(true,1006,'')
end
end
end
self.send = function(_,message,opcode)
local encoded = frame.encode(message,opcode or frame.TEXT)
return async_send(encoded)
end
self.on_close = function(_,on_close_arg)
user_on_close = on_close_arg
end
self.on_error = function(_,on_error_arg)
user_on_error = on_error_arg
end
self.on_message = function(_,on_message_arg)
user_on_message = on_message_arg
end
self.broadcast = function(_,...)
for client in pairs(clients[protocol]) do
if client.state == 'OPEN' then
client:send(...)
end
end
end
self.close = function(_,code,reason,timeout)
if clients[protocol] ~= nil and clients[protocol][self] ~= nil then
clients[protocol][self] = nil
end
if not message_io then
self:start()
end
if self.state == 'OPEN' then
self.state = 'CLOSING'
assert(message_io)
timeout = timeout or 3
local encoded = frame.encode_close(code or 1000,reason or '')
encoded = frame.encode(encoded,frame.CLOSE)
async_send(encoded)
close_timer = ev.Timer.new(function()
close_timer = nil
on_close(false,1006,'timeout')
end,timeout)
close_timer:start(loop)
end
end
self.start = function()
message_io = require'websocket.ev_common'.message_io(
sock,loop,
on_message,
handle_sock_err)
end
return self
end
local listen = function(opts)
assert(opts and (opts.protocols or opts.default))
ev = require'ev'
loop = opts.loop or ev.Loop.default
local user_on_error
local on_error = function(s,err)
if user_on_error then
user_on_error(s,err)
else
print(err)
end
end
local protocols = {}
if opts.protocols then
for protocol in pairs(opts.protocols) do
clients[protocol] = {}
tinsert(protocols,protocol)
end
end
local self = {}
self.on_error = function(self,on_error)
user_on_error = on_error
end
local listener,err = socket.bind(opts.interface or '*',opts.port or 80)
if not listener then
error(err)
end
listener:settimeout(0)
self.sock = function()
return listener
end
local listen_io = ev.IO.new(
function()
local client_sock = listener:accept()
client_sock:settimeout(0)
assert(client_sock)
local request = {}
local last
ev.IO.new(
function(loop,read_io)
repeat
local line,err,part = client_sock:receive('*l')
if line then
if last then
line = last..line
last = nil
end
request[#request+1] = line
elseif err ~= 'timeout' then
on_error(self,'Websocket Handshake failed due to socket err:'..err)
read_io:stop(loop)
return
else
last = part
return
end
until line == ''
read_io:stop(loop)
local upgrade_request = tconcat(request,'\r\n')
local response,protocol = handshake.accept_upgrade(upgrade_request,protocols)
if not response then
print('Handshake failed, Request:')
print(upgrade_request)
client_sock:close()
return
end
local index
ev.IO.new(
function(loop,write_io)
local len = #response
local sent,err = client_sock:send(response,index)
if not sent then
write_io:stop(loop)
print('Websocket client closed while handshake',err)
elseif sent == len then
write_io:stop(loop)
local protocol_handler
local new_client
local protocol_index
if protocol and opts.protocols[protocol] then
protocol_index = protocol
protocol_handler = opts.protocols[protocol]
elseif opts.default then
-- true is the 'magic' index for the default handler
protocol_index = true
protocol_handler = opts.default
else
client_sock:close()
if on_error then
on_error('bad protocol')
end
return
end
new_client = client(client_sock,protocol_index)
clients[protocol_index][new_client] = true
protocol_handler(new_client)
new_client:start(loop)
else
assert(sent < len)
index = sent
end
end,client_sock:getfd(),ev.WRITE):start(loop)
end,client_sock:getfd(),ev.READ):start(loop)
end,listener:getfd(),ev.READ)
self.close = function(keep_clients)
listen_io:stop(loop)
listener:close()
listener = nil
if not keep_clients then
for protocol,clients in pairs(clients) do
for client in pairs(clients) do
client:close()
end
end
end
end
listen_io:start(loop)
return self
end
return {
listen = listen
}