Skip to content

Commit 9e19d40

Browse files
committed
Simplify generics usage
The RpcMessage<T> type is now implemented directly on the protobuf types using partial classes.
1 parent 44b1c32 commit 9e19d40

File tree

6 files changed

+86
-129
lines changed

6 files changed

+86
-129
lines changed

.idea/.idea.Coder.Desktop/.idea/workspace.xml

Lines changed: 29 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Rpc.Proto/RpcMessage.cs

Lines changed: 16 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,63 +9,36 @@ namespace Coder.Desktop.Rpc.Proto;
99
public abstract class RpcMessage<T> where T : IMessage<T>
1010
{
1111
/// <summary>
12-
/// The inner RPC component of the message.
12+
/// The inner RPC component of the message. This is a separate field as the C# compiler does not allow the existing Rpc
13+
/// field to be overridden or implement this abstract property.
1314
/// </summary>
14-
public abstract RPC Rpc { get; set; }
15+
public abstract RPC RpcField { get; set; }
1516

1617
/// <summary>
17-
/// The actual full message.
18+
/// The inner message component of the message. This exists so values of type RpcMessage can easily get message
19+
/// contents.
1820
/// </summary>
19-
public abstract T Message { get; set; }
21+
public abstract T Message { get; }
2022
}
2123

22-
/// <summary>
23-
/// Wraps a protobuf <c>ManagerMessage</c> to implement <c>RpcMessage</c>.
24-
/// </summary>
25-
public class ManagerMessageWrapper(ManagerMessage message) : RpcMessage<ManagerMessage>
24+
public partial class ManagerMessage : RpcMessage<ManagerMessage>
2625
{
27-
public override RPC Rpc
26+
public override RPC RpcField
2827
{
29-
get => Message.Rpc;
30-
set => Message.Rpc = value;
28+
get => Rpc;
29+
set => Rpc = value;
3130
}
3231

33-
public override ManagerMessage Message { get; set; } = message;
32+
public override ManagerMessage Message => this;
3433
}
3534

36-
/// <summary>
37-
/// Wraps a protobuf <c>TunnelMessage</c> to implement <c>RpcMessage</c>.
38-
/// </summary>
39-
public class TunnelMessageWrapper(TunnelMessage message) : RpcMessage<TunnelMessage>
35+
public partial class TunnelMessage : RpcMessage<TunnelMessage>
4036
{
41-
public override RPC Rpc
37+
public override RPC RpcField
4238
{
43-
get => Message.Rpc;
44-
set => Message.Rpc = value;
39+
get => Rpc;
40+
set => Rpc = value;
4541
}
4642

47-
public override TunnelMessage Message { get; set; } = message;
43+
public override TunnelMessage Message => this;
4844
}
49-
50-
/// <summary>
51-
/// Provides extension methods for Protobuf messages.
52-
/// </summary>
53-
public static class ProtoWrappers
54-
{
55-
/// <summary>
56-
/// Attempts to convert a Protobuf message to an <c>RpcMessage</c>.
57-
/// </summary>
58-
/// <param name="message">Protobuf message</param>
59-
/// <typeparam name="T">Protobuf message type</typeparam>
60-
/// <returns>A wrapped message</returns>
61-
/// <exception cref="ArgumentException">Unknown message type</exception>
62-
public static RpcMessage<T> ToRpcMessage<T>(this IMessage<T> message) where T : IMessage<T>
63-
{
64-
return message switch
65-
{
66-
TunnelMessage tunnelMessage => (RpcMessage<T>)(object)new TunnelMessageWrapper(tunnelMessage),
67-
ManagerMessage managerMessage => (RpcMessage<T>)(object)new ManagerMessageWrapper(managerMessage),
68-
_ => throw new ArgumentException($"Unknown message type {message.GetType()}"),
69-
};
70-
}
71-
}

Rpc/Serdes.cs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,13 @@ public void Dispose()
3030
/// <summary>
3131
/// Serdes provides serialization and deserialization of messages read from a Stream.
3232
/// </summary>
33-
public class Serdes<TSi, TS, TRi, TR>
34-
where TSi : IMessage<TSi>
35-
where TS : RpcMessage<TSi>
36-
where TRi : class, IMessage<TRi>, new()
37-
where TR : RpcMessage<TRi>
33+
public class Serdes<TS, TR>
34+
where TS : RpcMessage<TS>, IMessage<TS>
35+
where TR : RpcMessage<TR>, IMessage<TR>, new()
3836
{
3937
private const int MaxMessageSize = 0x1000000; // 16MiB
4038

41-
private readonly MessageParser<TRi> _parser = new(() => new TRi());
39+
private readonly MessageParser<TR> _parser = new(() => new TR());
4240

4341
private readonly RaiiSemaphoreSlim _readLock = new(1, 1);
4442
private readonly RaiiSemaphoreSlim _writeLock = new(1, 1);
@@ -54,7 +52,7 @@ public async Task WriteMessage(Stream conn, TS message, CancellationToken ct = d
5452
{
5553
using var _ = await _writeLock.LockAsync(ct);
5654

57-
var mb = message.Message.ToByteArray();
55+
var mb = message.ToByteArray();
5856
if (mb.Length > MaxMessageSize)
5957
throw new ArgumentException($"Marshalled message size {mb.Length} exceeds maximum {MaxMessageSize}");
6058

@@ -87,7 +85,6 @@ public async Task<TR> ReadMessage(Stream conn, CancellationToken ct = default)
8785

8886
var msg = _parser.ParseFrom(msgBytes);
8987
if (msg == null) throw new IOException("Failed to parse message");
90-
91-
return msg.ToRpcMessage() as TR ?? throw new InvalidOperationException("Failed to cast message");
88+
return msg;
9289
}
93-
}
90+
}

Rpc/Speaker.cs

Lines changed: 22 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -112,23 +112,17 @@ public ReadOnlyMemory<byte> ToBytes()
112112
/// </summary>
113113
/// <param name="speaker">Speaker to use for sending reply</param>
114114
/// <param name="message">Original received message</param>
115-
public class ReplyableRpcMessage<TSi, TS, TRi, TR>(Speaker<TSi, TS, TRi, TR> speaker, TR message) : RpcMessage<TRi>
116-
where TSi : IMessage<TSi>
117-
where TS : RpcMessage<TSi>
118-
where TRi : class, IMessage<TRi>, new()
119-
where TR : RpcMessage<TRi>
115+
public class ReplyableRpcMessage<TS, TR>(Speaker<TS, TR> speaker, TR message) : RpcMessage<TR>
116+
where TS : RpcMessage<TS>, IMessage<TS>
117+
where TR : RpcMessage<TR>, IMessage<TR>, new()
120118
{
121-
public override RPC Rpc
119+
public override RPC RpcField
122120
{
123-
get => message.Rpc;
124-
set => message.Rpc = value;
121+
get => message.RpcField;
122+
set => message.RpcField = value;
125123
}
126124

127-
public override TRi Message
128-
{
129-
get => message.Message;
130-
set => message.Message = value;
131-
}
125+
public override TR Message => message;
132126

133127
/// <summary>
134128
/// Sends a reply to the original message.
@@ -144,20 +138,15 @@ public async Task SendReply(TS reply, CancellationToken ct = default)
144138
/// <summary>
145139
/// Manages an RPC connection between two peers, allowing messages to be sent and received.
146140
/// </summary>
147-
/// <typeparam name="TSi">The inner message type for sent messages</typeparam>
148141
/// <typeparam name="TS">The wrapped message type for sent messages</typeparam>
149-
/// <typeparam name="TRi">The inner message type for received messages</typeparam>
150142
/// <typeparam name="TR">The wrapped message type for received messages</typeparam>
151-
public class Speaker<TSi, TS, TRi, TR> : IDisposable, IAsyncDisposable
152-
// TODO: it would be nice if this could just be the inner or wrapped types instead
153-
where TSi : IMessage<TSi>
154-
where TS : RpcMessage<TSi>
155-
where TRi : class, IMessage<TRi>, new()
156-
where TR : RpcMessage<TRi>
143+
public class Speaker<TS, TR> : IDisposable, IAsyncDisposable
144+
where TS : RpcMessage<TS>, IMessage<TS>
145+
where TR : RpcMessage<TR>, IMessage<TR>, new()
157146
{
158147
public delegate void OnErrorDelegate(Exception e);
159148

160-
public delegate void OnReceiveDelegate(ReplyableRpcMessage<TSi, TS, TRi, TR> message);
149+
public delegate void OnReceiveDelegate(ReplyableRpcMessage<TS, TR> message);
161150

162151
private readonly Stream _conn;
163152

@@ -168,7 +157,7 @@ public class Speaker<TSi, TS, TRi, TR> : IDisposable, IAsyncDisposable
168157

169158
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<TR>> _pendingReplies = new();
170159
private readonly Task _receiveTask;
171-
private readonly Serdes<TSi, TS, TRi, TR> _serdes = new();
160+
private readonly Serdes<TS, TR> _serdes = new();
172161
private readonly SpeakerRole _them;
173162

174163
// _lastMessageId is incremented using an atomic operation, and as such the
@@ -282,15 +271,15 @@ private async Task ReceiveLoop(CancellationToken ct = default)
282271
while (!ct.IsCancellationRequested)
283272
{
284273
var message = await _serdes.ReadMessage(_conn, ct);
285-
if (message.Rpc.ResponseTo != 0)
274+
if (message.RpcField.ResponseTo != 0)
286275
// Look up the TaskCompletionSource for the message ID and
287276
// complete it with the message.
288-
if (_pendingReplies.TryRemove(message.Rpc.ResponseTo, out var tcs))
277+
if (_pendingReplies.TryRemove(message.RpcField.ResponseTo, out var tcs))
289278
tcs.SetResult(message);
290279

291280
// TODO: we should log unknown replies
292281
// Start a new task in the background to handle the message.
293-
_ = Task.Run(() => Receive.Invoke(new ReplyableRpcMessage<TSi, TS, TRi, TR>(this, message)), ct);
282+
_ = Task.Run(() => Receive.Invoke(new ReplyableRpcMessage<TS, TR>(this, message)), ct);
294283
}
295284
}
296285
catch (OperationCanceledException)
@@ -310,7 +299,7 @@ private async Task ReceiveLoop(CancellationToken ct = default)
310299
/// <param name="ct">Optional cancellation token</param>
311300
public async Task SendMessage(TS message, CancellationToken ct = default)
312301
{
313-
message.Rpc = new RPC
302+
message.RpcField = new RPC
314303
{
315304
MsgId = Interlocked.Add(ref _lastMessageId, 1),
316305
ResponseTo = 0,
@@ -327,7 +316,7 @@ public async Task SendMessage(TS message, CancellationToken ct = default)
327316
/// <returns>Received reply</returns>
328317
public async ValueTask<TR> SendMessageAwaitReply(TS message, CancellationToken ct = default)
329318
{
330-
message.Rpc = new RPC
319+
message.RpcField = new RPC
331320
{
332321
MsgId = Interlocked.Add(ref _lastMessageId, 1),
333322
ResponseTo = 0,
@@ -336,7 +325,7 @@ public async ValueTask<TR> SendMessageAwaitReply(TS message, CancellationToken c
336325
// Configure a TaskCompletionSource to complete when the reply is
337326
// received.
338327
var tcs = new TaskCompletionSource<TR>();
339-
_pendingReplies[message.Rpc.MsgId] = tcs;
328+
_pendingReplies[message.RpcField.MsgId] = tcs;
340329
try
341330
{
342331
await _serdes.WriteMessage(_conn, message, ct);
@@ -347,7 +336,7 @@ public async ValueTask<TR> SendMessageAwaitReply(TS message, CancellationToken c
347336
{
348337
// Clean up the pending reply if it was not received before
349338
// cancellation.
350-
_pendingReplies.TryRemove(message.Rpc.MsgId, out _);
339+
_pendingReplies.TryRemove(message.RpcField.MsgId, out _);
351340
}
352341
}
353342

@@ -359,11 +348,11 @@ public async ValueTask<TR> SendMessageAwaitReply(TS message, CancellationToken c
359348
/// <param name="ct">Optional cancellation token</param>
360349
public async Task SendReply(TR originalMessage, TS reply, CancellationToken ct = default)
361350
{
362-
reply.Rpc = new RPC
351+
reply.RpcField = new RPC
363352
{
364353
MsgId = Interlocked.Add(ref _lastMessageId, 1),
365-
ResponseTo = originalMessage.Rpc.MsgId,
354+
ResponseTo = originalMessage.RpcField.MsgId,
366355
};
367356
await _serdes.WriteMessage(_conn, reply, ct);
368357
}
369-
}
358+
}

Rpc/Version.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,4 @@ public void Validate(ApiVersion other)
6868
if (AdditionalMajors.Any(major => other.Major == major)) return;
6969
throw new ApiCompatibilityException(this, other, "Version is no longer supported");
7070
}
71-
}
71+
}

0 commit comments

Comments
 (0)