2
2
3
3
import lombok .RequiredArgsConstructor ;
4
4
import lombok .extern .slf4j .Slf4j ;
5
- import org .lowcoder .domain .application .model .ApplicationHistorySnapshot ;
6
- import org .lowcoder .domain .application .model .ApplicationHistorySnapshotTS ;
7
5
import org .lowcoder .sdk .config .CommonConfig ;
8
6
import org .springframework .data .mongodb .core .MongoTemplate ;
9
- import org .springframework .data .mongodb .core .query .Criteria ;
10
- import org .springframework .data .mongodb .core .query .Query ;
11
7
import org .springframework .scheduling .annotation .Scheduled ;
12
8
import org .springframework .stereotype .Component ;
13
9
16
12
import java .util .List ;
17
13
import java .util .concurrent .TimeUnit ;
18
14
15
+ import com .mongodb .client .MongoCollection ;
16
+ import com .mongodb .client .MongoCursor ;
17
+ import com .mongodb .client .model .Filters ;
18
+ import org .bson .Document ;
19
+
19
20
@ Slf4j
20
21
@ RequiredArgsConstructor
21
22
@ Component
@@ -24,23 +25,122 @@ public class ArchiveSnapshotTask {
24
25
private final CommonConfig commonConfig ;
25
26
private final MongoTemplate mongoTemplate ;
26
27
27
- @ Scheduled (initialDelay = 1 , fixedRate = 1 , timeUnit = TimeUnit .DAYS )
28
+ @ Scheduled (initialDelay = 0 , fixedRate = 1 , timeUnit = TimeUnit .DAYS )
28
29
public void archive () {
30
+ int mongoVersion = getMongoDBVersion ();
29
31
Instant thresholdDate = Instant .now ().minus (commonConfig .getQuery ().getAppSnapshotKeepDuration (), ChronoUnit .DAYS );
30
- List <ApplicationHistorySnapshotTS > snapshots = mongoTemplate .find (new Query ().addCriteria (Criteria .where ("createdAt" ).lte (thresholdDate )), ApplicationHistorySnapshotTS .class );
31
- snapshots .forEach (snapshot -> {
32
- ApplicationHistorySnapshot applicationHistorySnapshot = new ApplicationHistorySnapshot ();
33
- applicationHistorySnapshot .setApplicationId (snapshot .getApplicationId ());
34
- applicationHistorySnapshot .setDsl (snapshot .getDsl ());
35
- applicationHistorySnapshot .setContext (snapshot .getContext ());
36
- applicationHistorySnapshot .setCreatedAt (snapshot .getCreatedAt ());
37
- applicationHistorySnapshot .setCreatedBy (snapshot .getCreatedBy ());
38
- applicationHistorySnapshot .setModifiedBy (snapshot .getModifiedBy ());
39
- applicationHistorySnapshot .setUpdatedAt (snapshot .getUpdatedAt ());
40
- applicationHistorySnapshot .setId (snapshot .getId ());
41
- mongoTemplate .insert (applicationHistorySnapshot );
42
- mongoTemplate .remove (snapshot );
43
- });
32
+
33
+ if (mongoVersion >= 5 ) {
34
+ archiveForVersion5AndAbove (thresholdDate );
35
+ } else {
36
+ archiveForVersionBelow5 (thresholdDate );
37
+ }
38
+ }
39
+
40
+ private int getMongoDBVersion () {
41
+ Document buildInfo = mongoTemplate .getDb ().runCommand (new Document ("buildInfo" , 1 ));
42
+ String version = buildInfo .getString ("version" );
43
+ return Integer .parseInt (version .split ("\\ ." )[0 ]); // Parse major version
44
44
}
45
45
46
+ private void archiveForVersion5AndAbove (Instant thresholdDate ) {
47
+ log .info ("Running archival for MongoDB version >= 5" );
48
+
49
+ MongoCollection <Document > sourceCollection = mongoTemplate .getDb ().getCollection ("applicationHistorySnapshotTS" );
50
+ MongoCollection <Document > targetCollection = mongoTemplate .getDb ().getCollection ("applicationHistorySnapshot" );
51
+
52
+ long totalDocuments = sourceCollection .countDocuments (Filters .lte ("createdAt" , thresholdDate ));
53
+ log .info ("Total documents to archive: {}" , totalDocuments );
54
+
55
+ long processedCount = 0 ;
56
+
57
+ try (MongoCursor <Document > cursor = sourceCollection .find (Filters .lte ("createdAt" , thresholdDate )).iterator ()) {
58
+ while (cursor .hasNext ()) {
59
+ Document document = cursor .next ();
60
+
61
+ // Transform the document for the target collection
62
+ document .put ("id" , document .getObjectId ("_id" )); // Map `_id` to `id`
63
+ document .remove ("_id" );
64
+
65
+ // Insert the document into the target collection
66
+ try {
67
+ targetCollection .insertOne (document );
68
+ } catch (Exception e ) {
69
+ log .error ("Failed to insert document with ID {}. Error: {}" , document .getObjectId ("id" ), e .getMessage ());
70
+ continue ;
71
+ }
72
+
73
+ // Remove the document from the source collection
74
+ try {
75
+ sourceCollection .deleteOne (Filters .eq ("_id" , document .getObjectId ("id" )));
76
+ } catch (Exception e ) {
77
+ log .error ("Failed to delete document with ID {}. Error: {}" , document .getObjectId ("id" ), e .getMessage ());
78
+ continue ;
79
+ }
80
+
81
+ processedCount ++;
82
+ log .info ("Processed document {} / {}" , processedCount , totalDocuments );
83
+ }
84
+ } catch (Exception e ) {
85
+ log .error ("Failed during archival process. Error: {}" , e .getMessage ());
86
+ }
87
+
88
+ log .info ("Archival process completed. Total documents archived: {}" , processedCount );
89
+ }
90
+
91
+ private void archiveForVersionBelow5 (Instant thresholdDate ) {
92
+ log .info ("Running archival for MongoDB version < 5" );
93
+
94
+ MongoCollection <Document > sourceCollection = mongoTemplate .getDb ().getCollection ("applicationHistorySnapshotTS" );
95
+
96
+ long totalDocuments = sourceCollection .countDocuments (Filters .lte ("createdAt" , thresholdDate ));
97
+ log .info ("Total documents to archive: {}" , totalDocuments );
98
+
99
+ long processedCount = 0 ;
100
+
101
+ try (MongoCursor <Document > cursor = sourceCollection .find (Filters .lte ("createdAt" , thresholdDate )).iterator ()) {
102
+ while (cursor .hasNext ()) {
103
+ Document document = cursor .next ();
104
+
105
+ // Transform the document for the target collection
106
+ document .put ("id" , document .getObjectId ("_id" )); // Map `_id` to `id`
107
+ document .remove ("_id" );
108
+
109
+ // Use aggregation with $out for the single document
110
+ try {
111
+ sourceCollection .aggregate (List .of (
112
+ Filters .eq ("_id" , document .getObjectId ("id" )),
113
+ new Document ("$project" , new Document ()
114
+ .append ("applicationId" , document .get ("applicationId" ))
115
+ .append ("dsl" , document .get ("dsl" ))
116
+ .append ("context" , document .get ("context" ))
117
+ .append ("createdAt" , document .get ("createdAt" ))
118
+ .append ("createdBy" , document .get ("createdBy" ))
119
+ .append ("modifiedBy" , document .get ("modifiedBy" ))
120
+ .append ("updatedAt" , document .get ("updatedAt" ))
121
+ .append ("id" , document .get ("id" ))),
122
+ new Document ("$out" , "applicationHistorySnapshot" )
123
+ )).first ();
124
+ } catch (Exception e ) {
125
+ log .error ("Failed to aggregate and insert document with ID {}. Error: {}" , document .getObjectId ("id" ), e .getMessage ());
126
+ continue ;
127
+ }
128
+
129
+ // Remove the document from the source collection
130
+ try {
131
+ sourceCollection .deleteOne (Filters .eq ("_id" , document .getObjectId ("id" )));
132
+ } catch (Exception e ) {
133
+ log .error ("Failed to delete document with ID {}. Error: {}" , document .getObjectId ("id" ), e .getMessage ());
134
+ continue ;
135
+ }
136
+
137
+ processedCount ++;
138
+ log .info ("Processed document {} / {}" , processedCount , totalDocuments );
139
+ }
140
+ } catch (Exception e ) {
141
+ log .error ("Failed during archival process. Error: {}" , e .getMessage ());
142
+ }
143
+
144
+ log .info ("Archival process completed. Total documents archived: {}" , processedCount );
145
+ }
46
146
}
0 commit comments