1
1
mutable struct Multi
2
2
lock :: ReentrantLock
3
3
handle :: Ptr{Cvoid}
4
- timer :: Timer
4
+ timer :: Ptr{Cvoid}
5
5
easies :: Vector{Easy}
6
6
grace :: UInt64
7
7
8
8
function Multi (grace:: Integer = typemax (UInt64))
9
- multi = new (ReentrantLock (), C_NULL , Timer (0 ), Easy[], grace)
9
+ timer = jl_malloc (Base. _sizeof_uv_timer)
10
+ uv_timer_init (timer)
11
+ multi = new (ReentrantLock (), C_NULL , timer, Easy[], grace)
10
12
finalizer (multi) do multi
11
- close (multi. timer)
13
+ uv_timer_stop (multi. timer)
14
+ uv_close (multi. timer, cglobal (:jl_free ))
12
15
done! (multi)
13
16
end
14
17
end
29
32
30
33
# adding & removing easy handles
31
34
35
+ function cleanup_callback (uv_timer_p:: Ptr{Cvoid} ):: Cvoid
36
+ # # TODO : use a member access API
37
+ multi_p = unsafe_load (convert (Ptr{Ptr{Cvoid}}, uv_timer_p))
38
+ multi = unsafe_pointer_to_objref (multi_p):: Multi
39
+ done! (multi)
40
+ return
41
+ end
42
+
32
43
function add_handle (multi:: Multi , easy:: Easy )
33
44
lock (multi. lock) do
34
45
if isempty (multi. easies)
35
46
preserve_handle (multi)
36
- close (multi. timer) # stop grace timer
47
+ uv_timer_stop (multi. timer) # stop grace timer
37
48
end
38
49
push! (multi. easies, easy)
39
50
init! (multi)
@@ -46,14 +57,11 @@ function remove_handle(multi::Multi, easy::Easy)
46
57
@check curl_multi_remove_handle (multi. handle, easy. handle)
47
58
deleteat! (multi. easies, findlast (== (easy), multi. easies):: Int )
48
59
! isempty (multi. easies) && return
60
+ cleanup_cb = @cfunction (cleanup_callback, Cvoid, (Ptr{Cvoid},))
49
61
if multi. grace <= 0
50
62
done! (multi)
51
63
elseif 0 < multi. grace < typemax (multi. grace)
52
- multi. timer = Timer (multi. grace/ 1000 )
53
- @async begin
54
- wait (multi. timer)
55
- isopen (multi. timer) && done! (multi)
56
- end
64
+ uv_timer_start (multi. timer, cleanup_cb, multi. grace, 0 )
57
65
end
58
66
unpreserve_handle (multi)
59
67
end
@@ -65,14 +73,15 @@ function set_defaults(multi::Multi)
65
73
# currently no defaults
66
74
end
67
75
68
- # multi-socket handle state updates
76
+ # libuv callbacks
69
77
70
78
struct CURLMsg
71
79
msg :: CURLMSG
72
80
easy :: Ptr{Cvoid}
73
81
code :: CURLcode
74
82
end
75
83
84
+ # should already be locked
76
85
function check_multi_info (multi:: Multi )
77
86
while true
78
87
p = curl_multi_info_read (multi. handle, Ref {Cint} ())
@@ -95,15 +104,37 @@ function check_multi_info(multi::Multi)
95
104
end
96
105
end
97
106
98
- # curl callbacks
107
+ function event_callback (
108
+ uv_poll_p :: Ptr{Cvoid} ,
109
+ status :: Cint ,
110
+ events :: Cint ,
111
+ ):: Cvoid
112
+ # # TODO : use a member access API
113
+ multi_p = unsafe_load (convert (Ptr{Ptr{Cvoid}}, uv_poll_p))
114
+ multi = unsafe_pointer_to_objref (multi_p):: Multi
115
+ sock_p = uv_poll_p + Base. _sizeof_uv_poll
116
+ sock = unsafe_load (convert (Ptr{curl_socket_t}, sock_p))
117
+ flags = 0
118
+ events & UV_READABLE != 0 && (flags |= CURL_CSELECT_IN)
119
+ events & UV_WRITABLE != 0 && (flags |= CURL_CSELECT_OUT)
120
+ lock (multi. lock) do
121
+ @check curl_multi_socket_action (multi. handle, sock, flags)
122
+ check_multi_info (multi)
123
+ end
124
+ end
99
125
100
- function do_multi (multi:: Multi )
126
+ function timeout_callback (uv_timer_p:: Ptr{Cvoid} ):: Cvoid
127
+ # # TODO : use a member access API
128
+ multi_p = unsafe_load (convert (Ptr{Ptr{Cvoid}}, uv_timer_p))
129
+ multi = unsafe_pointer_to_objref (multi_p):: Multi
101
130
lock (multi. lock) do
102
131
@check curl_multi_socket_action (multi. handle, CURL_SOCKET_TIMEOUT, 0 )
103
132
check_multi_info (multi)
104
133
end
105
134
end
106
135
136
+ # curl callbacks
137
+
107
138
function timer_callback (
108
139
multi_h :: Ptr{Cvoid} ,
109
140
timeout_ms :: Clong ,
@@ -112,13 +143,15 @@ function timer_callback(
112
143
multi = unsafe_pointer_to_objref (multi_p):: Multi
113
144
@assert multi_h == multi. handle
114
145
if timeout_ms == 0
115
- do_multi (multi)
116
- elseif timeout_ms >= 0
117
- multi. timer = Timer (timeout_ms/ 1000 ) do timer
118
- do_multi (multi)
146
+ lock (multi. lock) do
147
+ @check curl_multi_socket_action (multi. handle, CURL_SOCKET_TIMEOUT, 0 )
148
+ check_multi_info (multi)
119
149
end
150
+ elseif timeout_ms >= 0
151
+ timeout_cb = @cfunction (timeout_callback, Cvoid, (Ptr{Cvoid},))
152
+ uv_timer_start (multi. timer, timeout_cb, max (1 , timeout_ms), 0 )
120
153
elseif timeout_ms == - 1
121
- close (multi. timer)
154
+ uv_timer_stop (multi. timer)
122
155
else
123
156
@async @error (" timer_callback: invalid timeout value" , timeout_ms)
124
157
return - 1
@@ -131,47 +164,46 @@ function socket_callback(
131
164
sock :: curl_socket_t ,
132
165
action :: Cint ,
133
166
multi_p :: Ptr{Cvoid} ,
134
- watcher_p :: Ptr{Cvoid} ,
167
+ uv_poll_p :: Ptr{Cvoid} ,
135
168
):: Cint
136
- if action ∉ (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT, CURL_POLL_REMOVE)
137
- @async @error (" socket_callback: unexpected action" , action)
138
- return - 1
139
- end
140
169
multi = unsafe_pointer_to_objref (multi_p):: Multi
141
- if watcher_p != C_NULL
142
- old_watcher = unsafe_pointer_to_objref (watcher_p):: FDWatcher
143
- @check curl_multi_assign (multi. handle, sock, C_NULL )
144
- unpreserve_handle (old_watcher)
145
- end
146
170
if action in (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT)
147
- readable = action in (CURL_POLL_IN, CURL_POLL_INOUT)
148
- writable = action in (CURL_POLL_OUT, CURL_POLL_INOUT)
149
- watcher = FDWatcher (OS_HANDLE (sock), readable, writable)
150
- preserve_handle (watcher)
151
- watcher_p = pointer_from_objref (watcher)
152
- @check curl_multi_assign (multi. handle, sock, watcher_p)
153
- task = @async while true
154
- events = try wait (watcher)
155
- catch err
156
- err isa EOFError && break
157
- rethrow ()
171
+ if uv_poll_p == C_NULL
172
+ uv_poll_p = uv_poll_alloc ()
173
+ uv_poll_init (uv_poll_p, sock)
174
+ # # TODO : use a member access API
175
+ unsafe_store! (convert (Ptr{Ptr{Cvoid}}, uv_poll_p), multi_p)
176
+ sock_p = uv_poll_p + Base. _sizeof_uv_poll
177
+ unsafe_store! (convert (Ptr{curl_socket_t}, sock_p), sock)
178
+ lock (multi. lock) do
179
+ @check curl_multi_assign (multi. handle, sock, uv_poll_p)
158
180
end
159
- flags = CURL_CSELECT_IN * isreadable (events) +
160
- CURL_CSELECT_OUT * iswritable (events) +
161
- CURL_CSELECT_ERR * events. disconnect
181
+ end
182
+ events = 0
183
+ action != CURL_POLL_IN && (events |= UV_WRITABLE)
184
+ action != CURL_POLL_OUT && (events |= UV_READABLE)
185
+ event_cb = @cfunction (event_callback, Cvoid, (Ptr{Cvoid}, Cint, Cint))
186
+ uv_poll_start (uv_poll_p, events, event_cb)
187
+ elseif action == CURL_POLL_REMOVE
188
+ if uv_poll_p != C_NULL
189
+ uv_poll_stop (uv_poll_p)
190
+ uv_close (uv_poll_p, cglobal (:jl_free ))
162
191
lock (multi. lock) do
163
- @check curl_multi_socket_action (multi. handle, sock, flags)
164
- check_multi_info (multi)
192
+ @check curl_multi_assign (multi. handle, sock, C_NULL )
165
193
end
166
194
end
167
- @isdefined (errormonitor) && errormonitor (task)
195
+ else
196
+ @async @error (" socket_callback: unexpected action" , action)
197
+ return - 1
168
198
end
169
- @isdefined (old_watcher) && close (old_watcher)
170
199
return 0
171
200
end
172
201
173
202
function add_callbacks (multi:: Multi )
203
+ # stash multi handle pointer in timer
174
204
multi_p = pointer_from_objref (multi)
205
+ # # TODO : use a member access API
206
+ unsafe_store! (convert (Ptr{Ptr{Cvoid}}, multi. timer), multi_p)
175
207
176
208
# set timer callback
177
209
timer_cb = @cfunction (timer_callback, Cint, (Ptr{Cvoid}, Clong, Ptr{Cvoid}))
0 commit comments