Skip to content

Commit 1158ab0

Browse files
authored
Merge branch 'develop-v1' into remove-main-project-reference
2 parents aba9fc5 + 0a1e6e6 commit 1158ab0

File tree

4 files changed

+174
-18
lines changed

4 files changed

+174
-18
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
### New in 1.2.1 (working version, not released yet)
22

33
* Fix: Prevent multiple calls of the same async subscribers when dispatching events (by @alexeyfv)
4+
* Fix: Better exception handling and propagation in `ReadModelPopulator`
45

56
### New in 1.2.0 (released 2025-03-09)
67

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// The MIT License (MIT)
2+
//
3+
// Copyright (c) 2015-2025 Rasmus Mikkelsen
4+
// https://github.com/eventflow/EventFlow
5+
//
6+
// Permission is hereby granted, free of charge, to any person obtaining a copy of
7+
// this software and associated documentation files (the "Software"), to deal in
8+
// the Software without restriction, including without limitation the rights to
9+
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
10+
// the Software, and to permit persons to whom the Software is furnished to do so,
11+
// subject to the following conditions:
12+
//
13+
// The above copyright notice and this permission notice shall be included in all
14+
// copies or substantial portions of the Software.
15+
//
16+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
18+
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
19+
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
20+
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21+
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22+
23+
using System;
24+
using System.Collections.Generic;
25+
using System.Threading.Tasks;
26+
using System.Threading;
27+
using EventFlow.Aggregates;
28+
using EventFlow.Core;
29+
using EventFlow.EventStores;
30+
using EventFlow.Extensions;
31+
using EventFlow.ReadStores;
32+
using Microsoft.Extensions.DependencyInjection;
33+
using NUnit.Framework;
34+
35+
namespace EventFlow.Tests.Exploration
36+
{
37+
// Related https://github.com/eventflow/EventFlow/issues/1083
38+
public class ReadModelRepopulateExplorationTest
39+
{
40+
private IServiceProvider _serviceProvider;
41+
42+
[SetUp]
43+
public void SetUp()
44+
{
45+
_serviceProvider = EventFlowOptions.New()
46+
.AddEvents(new[] { typeof(EventV1), typeof(EventV2) })
47+
.AddEventUpgraders(typeof(BrokenUpgradeV1ToV2))
48+
.UseInMemoryReadStoreFor<UpgradeReadModel>()
49+
.ServiceCollection.BuildServiceProvider();
50+
}
51+
52+
[TearDown]
53+
public void TearDown()
54+
{
55+
(_serviceProvider as IDisposable)?.Dispose();
56+
}
57+
58+
[Test]
59+
public async Task ActuallyStops()
60+
{
61+
// Arrange
62+
var id = BrokenId.New;
63+
var aggregateStore = _serviceProvider.GetRequiredService<IAggregateStore>();
64+
var readModelPopulator = _serviceProvider.GetRequiredService<IReadModelPopulator>();
65+
await aggregateStore.UpdateAsync<BrokenAggregate, BrokenId>(
66+
id,
67+
SourceId.New,
68+
(a, c) =>
69+
{
70+
a.EmitUpgradeEventV1();
71+
return Task.CompletedTask;
72+
},
73+
CancellationToken.None);
74+
75+
// Act and Assert
76+
using var timeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(2));
77+
Assert.ThrowsAsync<Exception>(() => readModelPopulator.PopulateAsync(typeof(UpgradeReadModel), timeoutSource.Token));
78+
}
79+
80+
public class UpgradeReadModel : IReadModel,
81+
IAmReadModelFor<BrokenAggregate, BrokenId, EventV1>,
82+
IAmReadModelFor<BrokenAggregate, BrokenId, EventV2>
83+
{
84+
public Task ApplyAsync(
85+
IReadModelContext context,
86+
IDomainEvent<BrokenAggregate, BrokenId, EventV1> domainEvent,
87+
CancellationToken cancellationToken)
88+
{
89+
return Task.CompletedTask;
90+
}
91+
92+
public Task ApplyAsync(
93+
IReadModelContext context,
94+
IDomainEvent<BrokenAggregate, BrokenId, EventV2> domainEvent,
95+
CancellationToken cancellationToken)
96+
{
97+
return Task.CompletedTask;
98+
}
99+
}
100+
101+
public class BrokenId : Identity<BrokenId>
102+
{
103+
public BrokenId(string value) : base(value) { }
104+
}
105+
106+
public class BrokenAggregate : AggregateRoot<BrokenAggregate, BrokenId>,
107+
IEmit<EventV1>,
108+
IEmit<EventV2>
109+
{
110+
public BrokenAggregate(BrokenId id) : base(id) { }
111+
112+
public bool V1Applied { get; private set; }
113+
public bool V2Applied { get; private set; }
114+
115+
public void EmitUpgradeEventV1()
116+
{
117+
Emit(new EventV1());
118+
}
119+
120+
public void Apply(EventV1 aggregateEvent)
121+
{
122+
V1Applied = true;
123+
}
124+
125+
public void Apply(EventV2 aggregateEvent)
126+
{
127+
V2Applied = true;
128+
}
129+
}
130+
131+
public class EventV1 : IAggregateEvent<BrokenAggregate, BrokenId>
132+
{
133+
}
134+
135+
public class EventV2 : IAggregateEvent<BrokenAggregate, BrokenId>
136+
{
137+
}
138+
139+
public class BrokenUpgradeV1ToV2 : EventUpgraderNonAsync<BrokenAggregate, BrokenId>
140+
{
141+
protected override IEnumerable<IDomainEvent<BrokenAggregate, BrokenId>> Upgrade(
142+
IDomainEvent<BrokenAggregate, BrokenId> domainEvent)
143+
{
144+
throw new Exception("Always broken!");
145+
}
146+
}
147+
}
148+
}

Source/EventFlow/EventStores/EventUpgrader.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
using System.Collections.Generic;
2424
using System.Runtime.CompilerServices;
2525
using System.Threading;
26-
using System.Threading.Tasks;
2726
using EventFlow.Aggregates;
2827
using EventFlow.Core;
2928

@@ -43,6 +42,9 @@ public override async IAsyncEnumerable<IDomainEvent<TAggregate, TIdentity>> Upgr
4342
IEventUpgradeContext eventUpgradeContext,
4443
[EnumeratorCancellation] CancellationToken cancellationToken)
4544
{
45+
// We check as it now before calling as it is not passed to the legacy method
46+
cancellationToken.ThrowIfCancellationRequested();
47+
4648
foreach (var upgradedDomainEvent in Upgrade(domainEvent))
4749
{
4850
yield return upgradedDomainEvent;
@@ -60,7 +62,7 @@ public virtual async IAsyncEnumerable<IDomainEvent> UpgradeAsync(
6062
[EnumeratorCancellation] CancellationToken cancellationToken)
6163
{
6264
var castDomainEvent = (IDomainEvent<TAggregate, TIdentity>) domainEvent;
63-
await foreach (var e in UpgradeAsync(castDomainEvent, eventUpgradeContext, cancellationToken).WithCancellation(cancellationToken))
65+
await foreach (var e in UpgradeAsync(castDomainEvent, eventUpgradeContext, cancellationToken))
6466
{
6567
yield return e;
6668
}

Source/EventFlow/ReadStores/ReadModelPopulator.cs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
using System;
2424
using System.Collections.Concurrent;
2525
using System.Collections.Generic;
26-
using System.Data;
2726
using System.Diagnostics;
2827
using System.Linq;
2928
using System.Reflection;
@@ -48,7 +47,7 @@ public class ReadModelPopulator : IReadModelPopulator
4847
private readonly IServiceProvider _serviceProvider;
4948
private readonly IEventUpgradeContextFactory _eventUpgradeContextFactory;
5049
private readonly IMemoryCache _memoryCache;
51-
private ConcurrentQueue<AllEventsPage> _pipedEvents = new ConcurrentQueue<AllEventsPage>();
50+
private readonly ConcurrentQueue<AllEventsPage> _pipedEvents = new ConcurrentQueue<AllEventsPage>();
5251

5352
public ReadModelPopulator(
5453
ILogger<ReadModelPopulator> logger,
@@ -118,14 +117,15 @@ public async Task PopulateAsync(IReadOnlyCollection<Type> readModelTypes, Cancel
118117
var combinedReadModelTypeString = string.Join(", ", readModelTypes.Select(type => type.PrettyPrint()));
119118
_logger.LogInformation("Starting populating of {ReadModelTypes}", combinedReadModelTypeString);
120119

121-
var loadEventsTasks = LoadEvents(cancellationToken);
122-
var processEventQueueTask = ProcessEventQueue(readModelTypes, cancellationToken);
120+
var loadEventsTasks = LoadEventsAsync(cancellationToken);
121+
var processEventQueueTask = ProcessEventQueueAsync(readModelTypes, cancellationToken);
122+
123123
await Task.WhenAll(loadEventsTasks, processEventQueueTask);
124124

125-
_logger.LogInformation("Population of readmodels completed");
125+
_logger.LogInformation("Population of read models completed");
126126
}
127127

128-
private async Task LoadEvents(CancellationToken cancellationToken)
128+
private async Task LoadEventsAsync(CancellationToken cancellationToken)
129129
{
130130
long totalEvents = 0;
131131
var currentPosition = GlobalPosition.Start;
@@ -162,22 +162,23 @@ private async Task LoadEvents(CancellationToken cancellationToken)
162162
}
163163
}
164164

165-
private async Task ProcessEventQueue(IReadOnlyCollection<Type> readModelTypes, CancellationToken cancellationToken)
165+
private async Task ProcessEventQueueAsync(
166+
IReadOnlyCollection<Type> readModelTypes,
167+
CancellationToken cancellationToken)
166168
{
167169
var domainEventsToProcess = new List<IDomainEvent>();
168-
AllEventsPage fetchedEvents;
169170

170171
var hasMoreEvents = true;
171172
do
172173
{
173174
var noEventsToReady = !_pipedEvents.Any();
174175
if (noEventsToReady)
175176
{
176-
await Task.Delay(100);
177+
await Task.Delay(100, cancellationToken);
177178
continue;
178179
}
179180

180-
_pipedEvents.TryDequeue(out fetchedEvents);
181+
_pipedEvents.TryDequeue(out var fetchedEvents);
181182
if (fetchedEvents == null)
182183
{
183184
continue;
@@ -190,7 +191,7 @@ private async Task ProcessEventQueue(IReadOnlyCollection<Type> readModelTypes, C
190191
var processEvents = !hasMoreEvents || batchExceedsThreshold;
191192
if (processEvents)
192193
{
193-
var readModelUpdateTasks = readModelTypes.Select(readModelType => ProcessEvents(readModelType, domainEventsToProcess, cancellationToken));
194+
var readModelUpdateTasks = readModelTypes.Select(readModelType => ProcessEventsAsync(readModelType, domainEventsToProcess, cancellationToken));
194195
await Task.WhenAll(readModelUpdateTasks);
195196

196197
domainEventsToProcess.Clear();
@@ -199,7 +200,10 @@ private async Task ProcessEventQueue(IReadOnlyCollection<Type> readModelTypes, C
199200
while (hasMoreEvents);
200201
}
201202

202-
private async Task ProcessEvents(Type readModelType, IReadOnlyCollection<IDomainEvent> processEvents, CancellationToken cancellationToken)
203+
private async Task ProcessEventsAsync(
204+
Type readModelType,
205+
IReadOnlyCollection<IDomainEvent> processEvents,
206+
CancellationToken cancellationToken)
203207
{
204208
try
205209
{
@@ -209,11 +213,12 @@ private async Task ProcessEvents(Type readModelType, IReadOnlyCollection<IDomain
209213

210214
var readModelTypes = new[]
211215
{
212-
typeof( IAmReadModelFor<,,> )
216+
typeof(IAmReadModelFor<,,> )
213217
};
214218

215-
var aggregateEventTypes = _memoryCache.GetOrCreate(CacheKey.With(GetType(), readModelType.ToString(), nameof(ProcessEvents)),
216-
e => new HashSet<Type>(readModelType.GetTypeInfo()
219+
var aggregateEventTypes = _memoryCache.GetOrCreate(
220+
CacheKey.With(GetType(), readModelType.ToString(), nameof(ProcessEventsAsync)),
221+
_ => new HashSet<Type>(readModelType.GetTypeInfo()
217222
.GetInterfaces()
218223
.Where(i => i.GetTypeInfo().IsGenericType && readModelTypes.Contains(i.GetGenericTypeDefinition()))
219224
.Select(i => i.GetTypeInfo().GetGenericArguments()[2])));
@@ -249,7 +254,7 @@ private async Task ProcessEvents(Type readModelType, IReadOnlyCollection<IDomain
249254
}
250255
catch (Exception e)
251256
{
252-
_logger.LogWarning($"Exception when populating: {readModelType}. Details: {e}");
257+
_logger.LogError(e, $"Exception when populating: {readModelType}");
253258
}
254259
}
255260

0 commit comments

Comments
 (0)