1
1
mutable struct Multi
2
2
lock :: ReentrantLock
3
3
handle :: Ptr{Cvoid}
4
- timer :: Ptr{Cvoid}
4
+ timer :: Timer
5
5
easies :: Vector{Easy}
6
6
grace :: UInt64
7
7
8
8
function Multi (grace:: Integer = typemax (UInt64))
9
- timer = jl_malloc (Base. _sizeof_uv_timer)
10
- uv_timer_init (timer)
11
- multi = new (ReentrantLock (), C_NULL , timer, Easy[], grace)
9
+ multi = new (ReentrantLock (), C_NULL , Timer (0 ), Easy[], grace)
12
10
finalizer (multi) do multi
13
- uv_timer_stop (multi. timer)
14
- uv_close (multi. timer, cglobal (:jl_free ))
11
+ close (multi. timer)
15
12
done! (multi)
16
13
end
17
14
end
32
29
33
30
# adding & removing easy handles
34
31
35
- function cleanup_callback (uv_timer_p:: Ptr{Cvoid} ):: Cvoid
36
- # # TODO : use a member access API
37
- multi_p = unsafe_load (convert (Ptr{Ptr{Cvoid}}, uv_timer_p))
38
- multi = unsafe_pointer_to_objref (multi_p):: Multi
39
- done! (multi)
40
- return
41
- end
42
-
43
32
function add_handle (multi:: Multi , easy:: Easy )
44
33
lock (multi. lock) do
45
34
if isempty (multi. easies)
46
35
preserve_handle (multi)
47
- uv_timer_stop (multi. timer) # stop grace timer
36
+ close (multi. timer) # stop grace timer
48
37
end
49
38
push! (multi. easies, easy)
50
39
init! (multi)
@@ -57,11 +46,14 @@ function remove_handle(multi::Multi, easy::Easy)
57
46
@check curl_multi_remove_handle (multi. handle, easy. handle)
58
47
deleteat! (multi. easies, findlast (== (easy), multi. easies):: Int )
59
48
! isempty (multi. easies) && return
60
- cleanup_cb = @cfunction (cleanup_callback, Cvoid, (Ptr{Cvoid},))
61
49
if multi. grace <= 0
62
50
done! (multi)
63
51
elseif 0 < multi. grace < typemax (multi. grace)
64
- uv_timer_start (multi. timer, cleanup_cb, multi. grace, 0 )
52
+ multi. timer = Timer (multi. grace/ 1000 )
53
+ @async begin
54
+ wait (multi. timer)
55
+ isopen (multi. timer) && done! (multi)
56
+ end
65
57
end
66
58
unpreserve_handle (multi)
67
59
end
@@ -73,15 +65,14 @@ function set_defaults(multi::Multi)
73
65
# currently no defaults
74
66
end
75
67
76
- # libuv callbacks
68
+ # multi-socket handle state updates
77
69
78
70
struct CURLMsg
79
71
msg :: CURLMSG
80
72
easy :: Ptr{Cvoid}
81
73
code :: CURLcode
82
74
end
83
75
84
- # should already be locked
85
76
function check_multi_info (multi:: Multi )
86
77
while true
87
78
p = curl_multi_info_read (multi. handle, Ref {Cint} ())
@@ -104,37 +95,15 @@ function check_multi_info(multi::Multi)
104
95
end
105
96
end
106
97
107
- function event_callback (
108
- uv_poll_p :: Ptr{Cvoid} ,
109
- status :: Cint ,
110
- events :: Cint ,
111
- ):: Cvoid
112
- # # TODO : use a member access API
113
- multi_p = unsafe_load (convert (Ptr{Ptr{Cvoid}}, uv_poll_p))
114
- multi = unsafe_pointer_to_objref (multi_p):: Multi
115
- sock_p = uv_poll_p + Base. _sizeof_uv_poll
116
- sock = unsafe_load (convert (Ptr{curl_socket_t}, sock_p))
117
- flags = 0
118
- events & UV_READABLE != 0 && (flags |= CURL_CSELECT_IN)
119
- events & UV_WRITABLE != 0 && (flags |= CURL_CSELECT_OUT)
120
- lock (multi. lock) do
121
- @check curl_multi_socket_action (multi. handle, sock, flags)
122
- check_multi_info (multi)
123
- end
124
- end
98
+ # curl callbacks
125
99
126
- function timeout_callback (uv_timer_p:: Ptr{Cvoid} ):: Cvoid
127
- # # TODO : use a member access API
128
- multi_p = unsafe_load (convert (Ptr{Ptr{Cvoid}}, uv_timer_p))
129
- multi = unsafe_pointer_to_objref (multi_p):: Multi
100
+ function do_multi (multi:: Multi )
130
101
lock (multi. lock) do
131
102
@check curl_multi_socket_action (multi. handle, CURL_SOCKET_TIMEOUT, 0 )
132
103
check_multi_info (multi)
133
104
end
134
105
end
135
106
136
- # curl callbacks
137
-
138
107
function timer_callback (
139
108
multi_h :: Ptr{Cvoid} ,
140
109
timeout_ms :: Clong ,
@@ -143,15 +112,13 @@ function timer_callback(
143
112
multi = unsafe_pointer_to_objref (multi_p):: Multi
144
113
@assert multi_h == multi. handle
145
114
if timeout_ms == 0
146
- lock (multi. lock) do
147
- @check curl_multi_socket_action (multi. handle, CURL_SOCKET_TIMEOUT, 0 )
148
- check_multi_info (multi)
149
- end
115
+ do_multi (multi)
150
116
elseif timeout_ms >= 0
151
- timeout_cb = @cfunction (timeout_callback, Cvoid, (Ptr{Cvoid},))
152
- uv_timer_start (multi. timer, timeout_cb, max (1 , timeout_ms), 0 )
117
+ multi. timer = Timer (timeout_ms/ 1000 ) do timer
118
+ do_multi (multi)
119
+ end
153
120
elseif timeout_ms == - 1
154
- uv_timer_stop (multi. timer)
121
+ close (multi. timer)
155
122
else
156
123
@async @error (" timer_callback: invalid timeout value" , timeout_ms)
157
124
return - 1
@@ -164,46 +131,47 @@ function socket_callback(
164
131
sock :: curl_socket_t ,
165
132
action :: Cint ,
166
133
multi_p :: Ptr{Cvoid} ,
167
- uv_poll_p :: Ptr{Cvoid} ,
134
+ watcher_p :: Ptr{Cvoid} ,
168
135
):: Cint
136
+ if action ∉ (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT, CURL_POLL_REMOVE)
137
+ @async @error (" socket_callback: unexpected action" , action)
138
+ return - 1
139
+ end
169
140
multi = unsafe_pointer_to_objref (multi_p):: Multi
141
+ if watcher_p != C_NULL
142
+ old_watcher = unsafe_pointer_to_objref (watcher_p):: FDWatcher
143
+ @check curl_multi_assign (multi. handle, sock, C_NULL )
144
+ unpreserve_handle (old_watcher)
145
+ end
170
146
if action in (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT)
171
- if uv_poll_p == C_NULL
172
- uv_poll_p = uv_poll_alloc ()
173
- uv_poll_init (uv_poll_p, sock)
174
- # # TODO : use a member access API
175
- unsafe_store! (convert (Ptr{Ptr{Cvoid}}, uv_poll_p), multi_p)
176
- sock_p = uv_poll_p + Base. _sizeof_uv_poll
177
- unsafe_store! (convert (Ptr{curl_socket_t}, sock_p), sock)
178
- lock (multi. lock) do
179
- @check curl_multi_assign (multi. handle, sock, uv_poll_p)
147
+ readable = action in (CURL_POLL_IN, CURL_POLL_INOUT)
148
+ writable = action in (CURL_POLL_OUT, CURL_POLL_INOUT)
149
+ watcher = FDWatcher (OS_HANDLE (sock), readable, writable)
150
+ preserve_handle (watcher)
151
+ watcher_p = pointer_from_objref (watcher)
152
+ @check curl_multi_assign (multi. handle, sock, watcher_p)
153
+ task = @async while true
154
+ events = try wait (watcher)
155
+ catch err
156
+ err isa EOFError && break
157
+ rethrow ()
180
158
end
181
- end
182
- events = 0
183
- action != CURL_POLL_IN && (events |= UV_WRITABLE)
184
- action != CURL_POLL_OUT && (events |= UV_READABLE)
185
- event_cb = @cfunction (event_callback, Cvoid, (Ptr{Cvoid}, Cint, Cint))
186
- uv_poll_start (uv_poll_p, events, event_cb)
187
- elseif action == CURL_POLL_REMOVE
188
- if uv_poll_p != C_NULL
189
- uv_poll_stop (uv_poll_p)
190
- uv_close (uv_poll_p, cglobal (:jl_free ))
159
+ flags = CURL_CSELECT_IN * isreadable (events) +
160
+ CURL_CSELECT_OUT * iswritable (events) +
161
+ CURL_CSELECT_ERR * events. disconnect
191
162
lock (multi. lock) do
192
- @check curl_multi_assign (multi. handle, sock, C_NULL )
163
+ @check curl_multi_socket_action (multi. handle, sock, flags)
164
+ check_multi_info (multi)
193
165
end
194
166
end
195
- else
196
- @async @error (" socket_callback: unexpected action" , action)
197
- return - 1
167
+ @isdefined (errormonitor) && errormonitor (task)
198
168
end
169
+ @isdefined (old_watcher) && close (old_watcher)
199
170
return 0
200
171
end
201
172
202
173
function add_callbacks (multi:: Multi )
203
- # stash multi handle pointer in timer
204
174
multi_p = pointer_from_objref (multi)
205
- # # TODO : use a member access API
206
- unsafe_store! (convert (Ptr{Ptr{Cvoid}}, multi. timer), multi_p)
207
175
208
176
# set timer callback
209
177
timer_cb = @cfunction (timer_callback, Cint, (Ptr{Cvoid}, Clong, Ptr{Cvoid}))
0 commit comments