Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only disconnect socket when sending is done #295

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion capnp-rpc-net/capTP_capnp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ module Make (Network : S.NETWORK) = struct
if not t.disconnecting then (
t.disconnecting <- true;
send_abort t ex;
Endpoint.disconnect t.endpoint;
Conn.disconnect t.conn ex
)

Expand Down
18 changes: 10 additions & 8 deletions capnp-rpc-net/endpoint.ml
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,16 @@ let shutdown_send t =
Write.close t.writer

let rec run_writer ~tags t =
let bufs = Write.await_batch t.writer in
match Eio.Flow.single_write t.flow bufs with
| n -> Write.shift t.writer n; run_writer ~tags t
| exception (Eio.Io (Eio.Net.E Connection_reset _, _) as ex) ->
Log.info (fun f -> f ~tags "Send failed: %a" Eio.Exn.pp ex)
| exception ex ->
Eio.Fiber.check ();
Log.warn (fun f -> f ~tags "Error sending messages: %a (will shutdown connection)" Fmt.exn ex)
match Write.await_batch t.writer with
| exception End_of_file -> () (* Due to [shutdown_send] closing it. *)
| bufs ->
match Eio.Flow.single_write t.flow bufs with
| n -> Write.shift t.writer n; run_writer ~tags t
| exception (Eio.Io (Eio.Net.E Connection_reset _, _) as ex) ->
Log.info (fun f -> f ~tags "Send failed: %a" Eio.Exn.pp ex)
| exception ex ->
Eio.Fiber.check ();
Log.warn (fun f -> f ~tags "Error sending messages: %a (will shutdown connection)" Fmt.exn ex)

let run_writer ~tags t =
let cleanup () =
Expand Down
2 changes: 1 addition & 1 deletion capnp-rpc-net/vat.ml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ module Make (Network : S.NETWORK) = struct
let my_id = Auth.Secret_key.digest ~hash (Lazy.force t.secret_key) in
let keep_new = (my_id > peer_id) = (mode = `Connect) in
if keep_new then (
let reason = Capnp_rpc.Exception.v "Closing duplicate connection" in
let reason = Capnp_rpc.Exception.v "Invalidated by newer connection" in
CapTP.disconnect existing reason;
run_connection_tls t endpoint r
) else (
Expand Down
22 changes: 12 additions & 10 deletions capnp-rpc/proto/capTP.ml
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,6 @@ module Make (EP : Message_types.ENDPOINT) = struct
else `SenderPromise id

let bootstrap t remote_promise =
check_connected t;
Questions.alloc t.questions (Question.v ~params_for_release:[] ~remote_promise)

(* This is for level 0 implementations, which don't understand about releasing caps. *)
Expand Down Expand Up @@ -1086,15 +1085,18 @@ module Make (EP : Message_types.ENDPOINT) = struct
t.queue_send (`Disembargo_request request)

let bootstrap t object_id =
let result = make_remote_promise t in
let question = Send.bootstrap t (result :> Core_types.struct_resolver) in
result#set_question question;
let qid = Question.id question in
Log.debug (fun f -> f ~tags:(with_qid qid t) "Sending: bootstrap");
t.queue_send (`Bootstrap (qid, object_id));
let service = result#cap Wire.Path.root in
dec_ref result;
service
match t.disconnected with
| Some ex -> Core_types.broken_cap ex
| None ->
let result = make_remote_promise t in
let question = Send.bootstrap t (result :> Core_types.struct_resolver) in
result#set_question question;
let qid = Question.id question in
Log.debug (fun f -> f ~tags:(with_qid qid t) "Sending: bootstrap");
t.queue_send (`Bootstrap (qid, object_id));
let service = result#cap Wire.Path.root in
dec_ref result;
service

module Switchable = struct
class type handler = object
Expand Down
6 changes: 4 additions & 2 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -598,13 +598,15 @@ let test_crossed_calls ~net =
let c_got, s_got =
match c_got, s_got with
| Ok x, Ok y -> (x, y)
| Ok x, Error _ ->
| Ok x, Error (`Capnp e) ->
(* Server got an error. Try client again. *)
Logs.info (fun f -> f ~tags:Test_utils.server_tags "%a" Capnp_rpc.Error.pp e);
let to_client = Sturdy_ref.connect_exn sr_to_client in
Capability.with_ref to_client @@ fun to_client ->
Echo.ping to_client "ping" |> fun s_got -> (x, s_got)
| Error _, Ok y ->
| Error (`Capnp e), Ok y ->
(* Client got an error. Try server again. *)
Logs.info (fun f -> f ~tags:Test_utils.client_tags "%a" Capnp_rpc.Error.pp e);
let to_server = Sturdy_ref.connect_exn sr_to_server in
Capability.with_ref to_server @@ fun to_server ->
Echo.ping to_server "ping" |> fun c_got -> (c_got, y)
Expand Down
Loading