Skip to content

Commit 53b1987

Browse files
committed
core: improve rpc protocol
- Ensure that pending requests are properly rejected when the underlying service channel is closed. - Make id-property from `NotificationMessage`s. optional. Ids in notification are not strictly required and can be safely omitted. - Rename `RpcConnectionFactory` to `RpcProtocolFactory` - Use a deferred in the `RpcProxyFactory` for protocol initialization Part of #12581
1 parent c1a2b7b commit 53b1987

File tree

3 files changed

+25
-23
lines changed

3 files changed

+25
-23
lines changed

packages/core/src/common/message-rpc/rpc-message-encoder.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ export interface RequestMessage {
5151

5252
export interface NotificationMessage {
5353
type: RpcMessageType.Notification;
54-
id: number;
54+
id?: number;
5555
method: string;
5656
args: any[];
5757
}
@@ -111,7 +111,7 @@ export interface RpcMessageDecoder {
111111
export interface RpcMessageEncoder {
112112
cancel(buf: WriteBuffer, requestId: number): void;
113113

114-
notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void
114+
notification(buf: WriteBuffer, method: string, args: any[], id?: number): void
115115

116116
request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void
117117

@@ -130,8 +130,8 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder {
130130
cancel(buf: WriteBuffer, requestId: number): void {
131131
this.encode<CancelMessage>(buf, { type: RpcMessageType.Cancel, id: requestId });
132132
}
133-
notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void {
134-
this.encode<NotificationMessage>(buf, { type: RpcMessageType.Notification, id: requestId, method, args });
133+
notification(buf: WriteBuffer, method: string, args: any[], id?: number): void {
134+
this.encode<NotificationMessage>(buf, { type: RpcMessageType.Notification, method, args, id });
135135
}
136136
request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void {
137137
this.encode<RequestMessage>(buf, { type: RpcMessageType.Request, id: requestId, method, args });

packages/core/src/common/message-rpc/rpc-protocol.ts

+8-4
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ export class RpcProtocol {
7777
this.encoder = options.encoder ?? new MsgPackMessageEncoder();
7878
this.decoder = options.decoder ?? new MsgPackMessageDecoder();
7979
this.toDispose.push(this.onNotificationEmitter);
80-
channel.onClose(() => this.toDispose.dispose());
80+
channel.onClose(event => {
81+
this.pendingRequests.forEach(pending => pending.reject(new Error(event.reason)));
82+
this.pendingRequests.clear();
83+
this.toDispose.dispose();
84+
});
8185
this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
8286
this.mode = options.mode ?? 'default';
8387

@@ -98,7 +102,7 @@ export class RpcProtocol {
98102
return;
99103
}
100104
case RpcMessageType.Notification: {
101-
this.handleNotify(message.id, message.method, message.args);
105+
this.handleNotify(message.method, message.args, message.id);
102106
return;
103107
}
104108
}
@@ -179,7 +183,7 @@ export class RpcProtocol {
179183
}
180184

181185
const output = this.channel.getWriteBuffer();
182-
this.encoder.notification(output, this.nextMessageId++, method, args);
186+
this.encoder.notification(output, method, args, this.nextMessageId++);
183187
output.commit();
184188
}
185189

@@ -226,7 +230,7 @@ export class RpcProtocol {
226230
}
227231
}
228232

229-
protected async handleNotify(id: number, method: string, args: any[]): Promise<void> {
233+
protected async handleNotify(method: string, args: any[], id?: number): Promise<void> {
230234
if (this.toDispose.disposed) {
231235
return;
232236
}

packages/core/src/common/messaging/proxy-factory.ts

+13-15
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { Emitter, Event } from '../event';
2323
import { Channel } from '../message-rpc/channel';
2424
import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol';
2525
import { ConnectionHandler } from './handler';
26+
import { Deferred } from '../promise-util';
2627

2728
export type JsonRpcServer<Client> = Disposable & {
2829
/**
@@ -55,11 +56,11 @@ export class JsonRpcConnectionHandler<T extends object> implements ConnectionHan
5556
}
5657
}
5758
/**
58-
* Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}.
59+
* Factory for creating a new {@link RpcProtocol} for a given chanel and {@link RequestHandler}.
5960
*/
60-
export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol;
61+
export type RpcProtocolFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol;
6162

62-
const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler);
63+
const defaultRpcProtocolFactory: RpcProtocolFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler);
6364

6465
/**
6566
* Factory for JSON-RPC proxy objects.
@@ -109,25 +110,22 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
109110
protected readonly onDidOpenConnectionEmitter = new Emitter<void>();
110111
protected readonly onDidCloseConnectionEmitter = new Emitter<void>();
111112

112-
protected connectionPromiseResolve: (connection: RpcProtocol) => void;
113-
protected connectionPromise: Promise<RpcProtocol>;
113+
protected rpcDeferred: Deferred<RpcProtocol>;
114114

115115
/**
116116
* Build a new JsonRpcProxyFactory.
117117
*
118118
* @param target - The object to expose to JSON-RPC methods calls. If this
119119
* is omitted, the proxy won't be able to handle requests, only send them.
120120
*/
121-
constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) {
121+
constructor(public target?: any, protected rpcProtocolFactory = defaultRpcProtocolFactory) {
122122
this.waitForConnection();
123123
}
124124

125125
protected waitForConnection(): void {
126-
this.connectionPromise = new Promise(resolve =>
127-
this.connectionPromiseResolve = resolve
128-
);
129-
this.connectionPromise.then(connection => {
130-
connection.channel.onClose(() => {
126+
this.rpcDeferred = new Deferred<RpcProtocol>();
127+
this.rpcDeferred.promise.then(protocol => {
128+
protocol.channel.onClose(() => {
131129
this.onDidCloseConnectionEmitter.fire(undefined);
132130
// Wait for connection in case the backend reconnects
133131
this.waitForConnection();
@@ -143,10 +141,10 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
143141
* response.
144142
*/
145143
listen(channel: Channel): void {
146-
const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args));
147-
connection.onNotification(event => this.onNotification(event.method, ...event.args));
144+
const protocol = this.rpcProtocolFactory(channel, (meth, args) => this.onRequest(meth, ...args));
145+
protocol.onNotification(event => this.onNotification(event.method, ...event.args));
148146

149-
this.connectionPromiseResolve(connection);
147+
this.rpcDeferred.resolve(protocol);
150148
}
151149

152150
/**
@@ -249,7 +247,7 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
249247
return (...args: any[]) => {
250248
const method = p.toString();
251249
const capturedError = new Error(`Request '${method}' failed`);
252-
return this.connectionPromise.then(connection =>
250+
return this.rpcDeferred.promise.then(connection =>
253251
new Promise<void>((resolve, reject) => {
254252
try {
255253
if (isNotify) {

0 commit comments

Comments
 (0)