@@ -112,23 +112,17 @@ public ReadOnlyMemory<byte> ToBytes()
112
112
/// </summary>
113
113
/// <param name="speaker">Speaker to use for sending reply</param>
114
114
/// <param name="message">Original received message</param>
115
- public class ReplyableRpcMessage < TSi , TS , TRi , TR > ( Speaker < TSi , TS , TRi , TR > speaker , TR message ) : RpcMessage < TRi >
116
- where TSi : IMessage < TSi >
117
- where TS : RpcMessage < TSi >
118
- where TRi : class , IMessage < TRi > , new ( )
119
- where TR : RpcMessage < TRi >
115
+ public class ReplyableRpcMessage < TS , TR > ( Speaker < TS , TR > speaker , TR message ) : RpcMessage < TR >
116
+ where TS : RpcMessage < TS > , IMessage < TS >
117
+ where TR : RpcMessage < TR > , IMessage < TR > , new ( )
120
118
{
121
- public override RPC Rpc
119
+ public override RPC RpcField
122
120
{
123
- get => message . Rpc ;
124
- set => message . Rpc = value ;
121
+ get => message . RpcField ;
122
+ set => message . RpcField = value ;
125
123
}
126
124
127
- public override TRi Message
128
- {
129
- get => message . Message ;
130
- set => message . Message = value ;
131
- }
125
+ public override TR Message => message ;
132
126
133
127
/// <summary>
134
128
/// Sends a reply to the original message.
@@ -144,20 +138,15 @@ public async Task SendReply(TS reply, CancellationToken ct = default)
144
138
/// <summary>
145
139
/// Manages an RPC connection between two peers, allowing messages to be sent and received.
146
140
/// </summary>
147
- /// <typeparam name="TSi">The inner message type for sent messages</typeparam>
148
141
/// <typeparam name="TS">The wrapped message type for sent messages</typeparam>
149
- /// <typeparam name="TRi">The inner message type for received messages</typeparam>
150
142
/// <typeparam name="TR">The wrapped message type for received messages</typeparam>
151
- public class Speaker < TSi , TS , TRi , TR > : IDisposable , IAsyncDisposable
152
- // TODO: it would be nice if this could just be the inner or wrapped types instead
153
- where TSi : IMessage < TSi >
154
- where TS : RpcMessage < TSi >
155
- where TRi : class , IMessage < TRi > , new ( )
156
- where TR : RpcMessage < TRi >
143
+ public class Speaker < TS , TR > : IDisposable , IAsyncDisposable
144
+ where TS : RpcMessage < TS > , IMessage < TS >
145
+ where TR : RpcMessage < TR > , IMessage < TR > , new ( )
157
146
{
158
147
public delegate void OnErrorDelegate ( Exception e ) ;
159
148
160
- public delegate void OnReceiveDelegate ( ReplyableRpcMessage < TSi , TS , TRi , TR > message ) ;
149
+ public delegate void OnReceiveDelegate ( ReplyableRpcMessage < TS , TR > message ) ;
161
150
162
151
private readonly Stream _conn ;
163
152
@@ -168,7 +157,7 @@ public class Speaker<TSi, TS, TRi, TR> : IDisposable, IAsyncDisposable
168
157
169
158
private readonly ConcurrentDictionary < ulong , TaskCompletionSource < TR > > _pendingReplies = new ( ) ;
170
159
private readonly Task _receiveTask ;
171
- private readonly Serdes < TSi , TS , TRi , TR > _serdes = new ( ) ;
160
+ private readonly Serdes < TS , TR > _serdes = new ( ) ;
172
161
private readonly SpeakerRole _them ;
173
162
174
163
// _lastMessageId is incremented using an atomic operation, and as such the
@@ -282,15 +271,15 @@ private async Task ReceiveLoop(CancellationToken ct = default)
282
271
while ( ! ct . IsCancellationRequested )
283
272
{
284
273
var message = await _serdes . ReadMessage ( _conn , ct ) ;
285
- if ( message . Rpc . ResponseTo != 0 )
274
+ if ( message . RpcField . ResponseTo != 0 )
286
275
// Look up the TaskCompletionSource for the message ID and
287
276
// complete it with the message.
288
- if ( _pendingReplies . TryRemove ( message . Rpc . ResponseTo , out var tcs ) )
277
+ if ( _pendingReplies . TryRemove ( message . RpcField . ResponseTo , out var tcs ) )
289
278
tcs . SetResult ( message ) ;
290
279
291
280
// TODO: we should log unknown replies
292
281
// Start a new task in the background to handle the message.
293
- _ = Task . Run ( ( ) => Receive . Invoke ( new ReplyableRpcMessage < TSi , TS , TRi , TR > ( this , message ) ) , ct ) ;
282
+ _ = Task . Run ( ( ) => Receive . Invoke ( new ReplyableRpcMessage < TS , TR > ( this , message ) ) , ct ) ;
294
283
}
295
284
}
296
285
catch ( OperationCanceledException )
@@ -310,7 +299,7 @@ private async Task ReceiveLoop(CancellationToken ct = default)
310
299
/// <param name="ct">Optional cancellation token</param>
311
300
public async Task SendMessage ( TS message , CancellationToken ct = default )
312
301
{
313
- message . Rpc = new RPC
302
+ message . RpcField = new RPC
314
303
{
315
304
MsgId = Interlocked . Add ( ref _lastMessageId , 1 ) ,
316
305
ResponseTo = 0 ,
@@ -327,7 +316,7 @@ public async Task SendMessage(TS message, CancellationToken ct = default)
327
316
/// <returns>Received reply</returns>
328
317
public async ValueTask < TR > SendMessageAwaitReply ( TS message , CancellationToken ct = default )
329
318
{
330
- message . Rpc = new RPC
319
+ message . RpcField = new RPC
331
320
{
332
321
MsgId = Interlocked . Add ( ref _lastMessageId , 1 ) ,
333
322
ResponseTo = 0 ,
@@ -336,7 +325,7 @@ public async ValueTask<TR> SendMessageAwaitReply(TS message, CancellationToken c
336
325
// Configure a TaskCompletionSource to complete when the reply is
337
326
// received.
338
327
var tcs = new TaskCompletionSource < TR > ( ) ;
339
- _pendingReplies [ message . Rpc . MsgId ] = tcs ;
328
+ _pendingReplies [ message . RpcField . MsgId ] = tcs ;
340
329
try
341
330
{
342
331
await _serdes . WriteMessage ( _conn , message , ct ) ;
@@ -347,7 +336,7 @@ public async ValueTask<TR> SendMessageAwaitReply(TS message, CancellationToken c
347
336
{
348
337
// Clean up the pending reply if it was not received before
349
338
// cancellation.
350
- _pendingReplies . TryRemove ( message . Rpc . MsgId , out _ ) ;
339
+ _pendingReplies . TryRemove ( message . RpcField . MsgId , out _ ) ;
351
340
}
352
341
}
353
342
@@ -359,11 +348,11 @@ public async ValueTask<TR> SendMessageAwaitReply(TS message, CancellationToken c
359
348
/// <param name="ct">Optional cancellation token</param>
360
349
public async Task SendReply ( TR originalMessage , TS reply , CancellationToken ct = default )
361
350
{
362
- reply . Rpc = new RPC
351
+ reply . RpcField = new RPC
363
352
{
364
353
MsgId = Interlocked . Add ( ref _lastMessageId , 1 ) ,
365
- ResponseTo = originalMessage . Rpc . MsgId ,
354
+ ResponseTo = originalMessage . RpcField . MsgId ,
366
355
} ;
367
356
await _serdes . WriteMessage ( _conn , reply , ct ) ;
368
357
}
369
- }
358
+ }
0 commit comments