16
16
/* eslint-disable @typescript-eslint/no-explicit-any */
17
17
18
18
import { CancellationToken , CancellationTokenSource } from '../cancellation' ;
19
- import { Disposable , DisposableCollection } from '../disposable' ;
19
+ import { DisposableWrapper , Disposable , DisposableCollection } from '../disposable' ;
20
20
import { Emitter , Event } from '../event' ;
21
21
import { Deferred } from '../promise-util' ;
22
22
import { Channel } from './channel' ;
@@ -57,6 +57,7 @@ export class RpcProtocol {
57
57
static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token' ;
58
58
59
59
protected readonly pendingRequests : Map < number , Deferred < any > > = new Map ( ) ;
60
+ protected readonly pendingRequestCancellationEventListeners : Map < number , DisposableWrapper > = new Map ( ) ;
60
61
61
62
protected nextMessageId : number = 0 ;
62
63
@@ -80,6 +81,8 @@ export class RpcProtocol {
80
81
channel . onClose ( event => {
81
82
this . pendingRequests . forEach ( pending => pending . reject ( new Error ( event . reason ) ) ) ;
82
83
this . pendingRequests . clear ( ) ;
84
+ this . pendingRequestCancellationEventListeners . forEach ( disposable => disposable . dispose ( ) ) ;
85
+ this . pendingRequestCancellationEventListeners . clear ( ) ;
83
86
this . toDispose . dispose ( ) ;
84
87
} ) ;
85
88
this . toDispose . push ( channel . onMessage ( readBuffer => this . handleMessage ( this . decoder . parse ( readBuffer ( ) ) ) ) ) ;
@@ -131,6 +134,7 @@ export class RpcProtocol {
131
134
} else {
132
135
throw new Error ( `No reply handler for reply with id: ${ id } ` ) ;
133
136
}
137
+ this . disposeCancellationEventListener ( id ) ;
134
138
}
135
139
136
140
protected handleReplyErr ( id : number , error : any ) : void {
@@ -141,6 +145,15 @@ export class RpcProtocol {
141
145
} else {
142
146
throw new Error ( `No reply handler for error reply with id: ${ id } ` ) ;
143
147
}
148
+ this . disposeCancellationEventListener ( id ) ;
149
+ }
150
+
151
+ protected disposeCancellationEventListener ( id : number ) : void {
152
+ const toDispose = this . pendingRequestCancellationEventListeners . get ( id ) ;
153
+ if ( toDispose ) {
154
+ this . pendingRequestCancellationEventListeners . delete ( id ) ;
155
+ toDispose . dispose ( ) ;
156
+ }
144
157
}
145
158
146
159
sendRequest < T > ( method : string , args : any [ ] ) : Promise < T > {
@@ -157,14 +170,21 @@ export class RpcProtocol {
157
170
158
171
this . pendingRequests . set ( id , reply ) ;
159
172
173
+ // register disposable before output.commit() even when not available yet
174
+ const disposableWrapper = new DisposableWrapper ( ) ;
175
+ this . pendingRequestCancellationEventListeners . set ( id , disposableWrapper ) ;
176
+
160
177
const output = this . channel . getWriteBuffer ( ) ;
161
178
this . encoder . request ( output , id , method , args ) ;
162
179
output . commit ( ) ;
163
180
164
181
if ( cancellationToken ?. isCancellationRequested ) {
165
182
this . sendCancel ( id ) ;
166
183
} else {
167
- cancellationToken ?. onCancellationRequested ( ( ) => this . sendCancel ( id ) ) ;
184
+ const disposable = cancellationToken ?. onCancellationRequested ( ( ) => this . sendCancel ( id ) ) ;
185
+ if ( disposable ) {
186
+ disposableWrapper . set ( disposable ) ;
187
+ }
168
188
}
169
189
170
190
return reply . promise ;
0 commit comments