Skip to content

Commit 817c1a0

Browse files
authored
Don't use ChannelMultiplexer in RPCProtocol (#13980)
Fixes #13960 Contributed on behalf of STMicroelectronics Signed-off-by: Thomas Mäder <[email protected]>
1 parent 44d42ec commit 817c1a0

File tree

2 files changed

+62
-209
lines changed

2 files changed

+62
-209
lines changed

packages/plugin-ext/src/common/proxy-handler.ts

-143
This file was deleted.

packages/plugin-ext/src/common/rpc-protocol.ts

+62-66
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,14 @@
2222

2323
/* eslint-disable @typescript-eslint/no-explicit-any */
2424

25-
import { Channel, Disposable, DisposableCollection, isObject, ReadBuffer, URI, WriteBuffer } from '@theia/core';
25+
import { Channel, Disposable, DisposableCollection, isObject, ReadBuffer, RpcProtocol, URI, WriteBuffer } from '@theia/core';
2626
import { Emitter, Event } from '@theia/core/lib/common/event';
27-
import { ChannelMultiplexer, MessageProvider } from '@theia/core/lib/common/message-rpc/channel';
28-
import { MsgPackMessageDecoder, MsgPackMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder';
27+
import { MessageProvider } from '@theia/core/lib/common/message-rpc/channel';
2928
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer';
30-
import { ClientProxyHandler, ProxySynchronizer, RpcInvocationHandler } from './proxy-handler';
3129
import { MsgPackExtensionManager } from '@theia/core/lib/common/message-rpc/msg-pack-extension-manager';
3230
import { URI as VSCodeURI } from '@theia/core/shared/vscode-uri';
3331
import { BinaryBuffer } from '@theia/core/lib/common/buffer';
3432
import { Range, Position } from '../plugin/types-impl';
35-
import { Deferred } from '@theia/core/lib/common/promise-util';
3633

3734
export interface MessageConnection {
3835
send(msg: string): void;
@@ -79,21 +76,36 @@ export namespace ConnectionClosedError {
7976
}
8077

8178
export class RPCProtocolImpl implements RPCProtocol {
82-
private readonly locals = new Map<string, RpcInvocationHandler>();
79+
private readonly locals = new Map<string, any>();
8380
private readonly proxies = new Map<string, any>();
84-
private readonly multiplexer: ChannelMultiplexer;
85-
private readonly encoder = new MsgPackMessageEncoder();
86-
private readonly decoder = new MsgPackMessageDecoder();
87-
private readonly initCallback: ProxySynchronizer;
81+
private readonly rpc: RpcProtocol;
8882

8983
private readonly toDispose = new DisposableCollection(
9084
Disposable.create(() => { /* mark as no disposed */ })
9185
);
9286

9387
constructor(channel: Channel) {
94-
this.toDispose.push(this.multiplexer = new ChannelMultiplexer(new BatchingChannel(channel)));
88+
this.rpc = new RpcProtocol(new BatchingChannel(channel), (method, args) => this.handleRequest(method, args));
89+
this.rpc.onNotification((evt: { method: string; args: any[]; }) => this.handleNotification(evt.method, evt.args));
9590
this.toDispose.push(Disposable.create(() => this.proxies.clear()));
96-
this.initCallback = new ProxySynchronizerImpl();
91+
}
92+
93+
handleNotification(method: any, args: any[]): void {
94+
const serviceId = args[0] as string;
95+
const handler: any = this.locals.get(serviceId);
96+
if (!handler) {
97+
throw new Error(`no local service handler with id ${serviceId}`);
98+
}
99+
handler[method](...(args.slice(1)));
100+
}
101+
102+
handleRequest(method: string, args: any[]): Promise<any> {
103+
const serviceId = args[0] as string;
104+
const handler: any = this.locals.get(serviceId);
105+
if (!handler) {
106+
throw new Error(`no local service handler with id ${serviceId}`);
107+
}
108+
return handler[method](...(args.slice(1)));
97109
}
98110

99111
dispose(): void {
@@ -117,76 +129,60 @@ export class RPCProtocolImpl implements RPCProtocol {
117129
}
118130

119131
protected createProxy<T>(proxyId: string): T {
120-
const handler = new ClientProxyHandler({
121-
id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId), proxySynchronizer: this.initCallback
122-
});
132+
const handler = {
133+
get: (target: any, name: string, receiver: any): any => {
134+
if (target[name] || name.charCodeAt(0) !== 36 /* CharCode.DollarSign */) {
135+
// not a remote property
136+
return target[name];
137+
}
138+
const isNotify = this.isNotification(name);
139+
return async (...args: any[]) => {
140+
const method = name.toString();
141+
if (isNotify) {
142+
this.rpc.sendNotification(method, [proxyId, ...args]);
143+
} else {
144+
return await this.rpc.sendRequest(method, [proxyId, ...args]) as Promise<any>;
145+
}
146+
};
147+
}
148+
149+
};
123150
return new Proxy(Object.create(null), handler);
124151
}
125152

153+
/**
154+
* Return whether the given property represents a notification. If true,
155+
* the promise returned from the invocation will resolve immediately to `undefined`
156+
*
157+
* A property leads to a notification rather than a method call if its name
158+
* begins with `notify` or `on`.
159+
*
160+
* @param p - The property being called on the proxy.
161+
* @return Whether `p` represents a notification.
162+
*/
163+
protected isNotification(p: PropertyKey): boolean {
164+
let propertyString = p.toString();
165+
if (propertyString.charCodeAt(0) === 36/* CharCode.DollarSign */) {
166+
propertyString = propertyString.substring(1);
167+
}
168+
return propertyString.startsWith('notify') || propertyString.startsWith('on');
169+
}
170+
126171
set<T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R {
127172
if (this.isDisposed) {
128173
throw ConnectionClosedError.create();
129174
}
130-
const invocationHandler = this.locals.get(identifier.id);
131-
if (!invocationHandler) {
132-
const handler = new RpcInvocationHandler({ id: identifier.id, target: instance, encoder: this.encoder, decoder: this.decoder });
133-
134-
const channel = this.multiplexer.getOpenChannel(identifier.id);
135-
if (channel) {
136-
handler.listen(channel);
137-
} else {
138-
const channelOpenListener = this.multiplexer.onDidOpenChannel(event => {
139-
if (event.id === identifier.id) {
140-
handler.listen(event.channel);
141-
channelOpenListener.dispose();
142-
}
143-
});
144-
}
145-
146-
this.locals.set(identifier.id, handler);
175+
if (!this.locals.has(identifier.id)) {
176+
this.locals.set(identifier.id, instance);
147177
if (Disposable.is(instance)) {
148178
this.toDispose.push(instance);
149179
}
150180
this.toDispose.push(Disposable.create(() => this.locals.delete(identifier.id)));
151-
152181
}
153182
return instance;
154183
}
155184
}
156185

157-
export class ProxySynchronizerImpl implements ProxySynchronizer {
158-
159-
private readonly runningInitializations = new Set<string>();
160-
161-
private _pendingProxyInitializations: Deferred<void>;
162-
163-
constructor() {
164-
this._pendingProxyInitializations = new Deferred();
165-
/* after creation no init is active */
166-
this._pendingProxyInitializations.resolve();
167-
}
168-
169-
startProxyInitialization(id: string, init: Promise<void>): void {
170-
if (this.runningInitializations.size === 0) {
171-
this._pendingProxyInitializations = new Deferred();
172-
}
173-
init.then(() => this.finishedProxyInitialization(id));
174-
this.runningInitializations.add(id);
175-
}
176-
177-
protected finishedProxyInitialization(id: string): void {
178-
this.runningInitializations.delete(id);
179-
if (this.runningInitializations.size === 0) {
180-
this._pendingProxyInitializations.resolve();
181-
}
182-
}
183-
184-
pendingProxyInitializations(): Promise<void> {
185-
return this._pendingProxyInitializations.promise;
186-
}
187-
188-
}
189-
190186
/**
191187
* Wraps and underlying channel to send/receive multiple messages in one go:
192188
* - multiple messages to be sent from one stack get sent in bulk at `process.nextTick`.

0 commit comments

Comments
 (0)