@@ -81,7 +81,7 @@ protected override void MessageSeversForIdsOrJson(IEnumerable<IServerAddress> se
81
81
/// <returns></returns>
82
82
protected override bool ShouldMakeDistributedCall ( IEnumerable < IServerAddress > servers , ICacheRefresher refresher , MessageType dispatchType )
83
83
{
84
-
84
+
85
85
if ( _initialized == false )
86
86
{
87
87
return false ;
@@ -92,11 +92,11 @@ protected override bool ShouldMakeDistributedCall(IEnumerable<IServerAddress> se
92
92
}
93
93
94
94
protected override void PerformDistributedCall (
95
- IEnumerable < IServerAddress > servers ,
96
- ICacheRefresher refresher ,
97
- MessageType dispatchType ,
98
- IEnumerable < object > ids = null ,
99
- Type idArrayType = null ,
95
+ IEnumerable < IServerAddress > servers ,
96
+ ICacheRefresher refresher ,
97
+ MessageType dispatchType ,
98
+ IEnumerable < object > ids = null ,
99
+ Type idArrayType = null ,
100
100
string jsonPayload = null )
101
101
{
102
102
var msg = new DistributedMessage
@@ -131,7 +131,7 @@ internal void FirstSync()
131
131
// server and it will need to rebuild it's own persisted cache. Currently in that case it is Lucene and the xml
132
132
// cache file.
133
133
LogHelper . Warn < DatabaseServerMessenger > ( "No last synced Id found, this generally means this is a new server/install. The server will rebuild its caches and indexes and then adjust it's last synced id to the latest found in the database and will start maintaining cache updates based on that id" ) ;
134
-
134
+
135
135
//perform rebuilds if specified
136
136
if ( _options . RebuildingCallbacks != null )
137
137
{
@@ -140,8 +140,8 @@ internal void FirstSync()
140
140
callback ( ) ;
141
141
}
142
142
}
143
-
144
-
143
+
144
+
145
145
//go get the last id in the db and store it
146
146
var lastId = _appContext . DatabaseContext . Database . ExecuteScalar < int > (
147
147
"SELECT MAX(id) FROM umbracoCacheInstruction" ) ;
@@ -174,53 +174,69 @@ internal void Sync()
174
174
175
175
using ( DisposableTimer . DebugDuration < DatabaseServerMessenger > ( "Syncing from database..." ) )
176
176
{
177
- //get the outstanding items
178
-
179
- var sql = new Sql ( ) . Select ( "*" )
180
- . From < CacheInstructionDto > ( )
181
- . Where < CacheInstructionDto > ( dto => dto . Id > _lastId )
182
- . OrderBy < CacheInstructionDto > ( dto => dto . Id ) ;
183
-
184
- var list = _appContext . DatabaseContext . Database . Fetch < CacheInstructionDto > ( sql ) ;
185
-
186
- if ( list . Count > 0 )
187
- {
188
-
189
- // TODO: Here we could ignore anything that originated from this server since it will already have processed that
190
- // change locally (see notes in DatabaseServerMessenger.MessageSeversForIdsOrJson ). However, depending on the sequence
191
- // of events, I think this server will still need to process it's own instructions from the database to ensure the correct
192
- // instruction's sequence order is processed.
193
- //
194
- // var toProcess = list.Where(x => x.OriginatedFrom != GetOriginatorValue());
195
-
196
- foreach ( var item in list )
197
- {
198
- try
199
- {
200
- var jsonArray = JsonConvert . DeserializeObject < JArray > ( item . JsonInstruction ) ;
201
- UpdateRefreshers ( jsonArray ) ;
202
- }
203
- catch ( JsonException ex )
204
- {
205
- LogHelper . Error < DatabaseServerMessenger > ( "Could not deserialize a distributed cache instruction! Value: " + item . JsonInstruction , ex ) ;
206
- }
207
- }
208
-
209
- SaveLastSynced ( list . Max ( x => x . Id ) ) ;
210
- }
177
+ PerformSyncFromDb ( ) ;
211
178
212
179
//prune old records
213
- _appContext . DatabaseContext . Database . Delete < CacheInstructionDto > ( "WHERE utcStamp < @pruneDate" , new { pruneDate = DateTime . UtcNow . AddDays ( _options . DaysToRetainInstructionRecords * - 1 ) } ) ;
180
+ _appContext . DatabaseContext . Database . Delete < CacheInstructionDto > ( "WHERE utcStamp < @pruneDate" , new { pruneDate = DateTime . UtcNow . AddDays ( _options . DaysToRetainInstructionRecords * - 1 ) } ) ;
214
181
215
182
//reset
216
183
_syncing = false ;
217
184
}
218
-
219
185
}
220
186
}
221
187
}
222
188
}
223
189
190
+ /// <summary>
191
+ /// This checks outstanding cache instructions from the db and processes them
192
+ /// </summary>
193
+ /// <remarks>
194
+ /// This is not thread safe
195
+ /// </remarks>
196
+ private void PerformSyncFromDb ( )
197
+ {
198
+ //TODO: We 'could' recurse to ensure that no remaining instructions are pending in the table before proceeding but I don't think that
199
+ // would be a good idea since instructions could keep getting added and then all other threads will probably get stuck from serving requests
200
+ // (depending on what the cache refreshers are doing). I think it's best we do the one time check, process them and continue, if there are
201
+ // pending requests after being processed, they'll just be processed on the next poll.
202
+
203
+ //get the outstanding items
204
+
205
+ var sql = new Sql ( ) . Select ( "*" )
206
+ . From < CacheInstructionDto > ( )
207
+ . Where < CacheInstructionDto > ( dto => dto . Id > _lastId )
208
+ . OrderBy < CacheInstructionDto > ( dto => dto . Id ) ;
209
+
210
+ var list = _appContext . DatabaseContext . Database . Fetch < CacheInstructionDto > ( sql ) ;
211
+
212
+ if ( list . Count > 0 )
213
+ {
214
+
215
+ // TODO: Here we could ignore anything that originated from this server since it will already have processed that
216
+ // change locally (see notes in DatabaseServerMessenger.MessageSeversForIdsOrJson ). However, depending on the sequence
217
+ // of events, I think this server will still need to process it's own instructions from the database to ensure the correct
218
+ // instruction's sequence order is processed.
219
+ //
220
+ // var toProcess = list.Where(x => x.OriginatedFrom != GetOriginatorValue());
221
+
222
+ foreach ( var item in list )
223
+ {
224
+ try
225
+ {
226
+ var jsonArray = JsonConvert . DeserializeObject < JArray > ( item . JsonInstruction ) ;
227
+ UpdateRefreshers ( jsonArray ) ;
228
+ }
229
+ catch ( JsonException ex )
230
+ {
231
+ LogHelper . Error < DatabaseServerMessenger > ( "Could not deserialize a distributed cache instruction! Value: " + item . JsonInstruction , ex ) ;
232
+ }
233
+ }
234
+
235
+ SaveLastSynced ( list . Max ( x => x . Id ) ) ;
236
+ }
237
+ }
238
+
239
+
224
240
internal void UpdateRefreshers ( JArray jsonArray )
225
241
{
226
242
foreach ( var jsonItem in jsonArray )
@@ -287,7 +303,7 @@ internal void ReadLastSynced()
287
303
/// <returns></returns>
288
304
protected string GetOriginatorValue ( )
289
305
{
290
- return JsonConvert . SerializeObject ( new { machineName = NetworkHelper . MachineName , appDomainAppId = HttpRuntime . AppDomainAppId } ) ;
306
+ return JsonConvert . SerializeObject ( new { machineName = NetworkHelper . MachineName , appDomainAppId = HttpRuntime . AppDomainAppId } ) ;
291
307
}
292
308
293
309
/// <summary>
@@ -309,7 +325,7 @@ private void SaveLastSynced(int id)
309
325
File . WriteAllText ( Path . Combine ( tempFolder , HttpRuntime . AppDomainAppId . ReplaceNonAlphanumericChars ( string . Empty ) + "-lastsynced.txt" ) , id . ToString ( CultureInfo . InvariantCulture ) ) ;
310
326
}
311
327
312
-
328
+
313
329
#region Updates the refreshers
314
330
private void RefreshAll ( Guid uniqueIdentifier )
315
331
{
@@ -355,7 +371,7 @@ private void RemoveById(Guid uniqueIdentifier, int Id)
355
371
{
356
372
var cr = CacheRefreshersResolver . Current . GetById ( uniqueIdentifier ) ;
357
373
cr . Remove ( Id ) ;
358
- }
374
+ }
359
375
#endregion
360
376
361
377
/// <summary>
@@ -379,7 +395,7 @@ public void OnApplicationStarted(object sender, EventArgs eventArgs)
379
395
{
380
396
LogHelper . Warn < DatabaseServerMessenger > ( "The app is not configured or cannot connect to the database, this server cannot be initialized with " + typeof ( DatabaseServerMessenger ) + ", distributed calls will not be enabled for this server" ) ;
381
397
}
382
-
398
+
383
399
}
384
400
}
385
401
}
0 commit comments