Skip to content

Commit b605907

Browse files
dragonpooludomikula
authored andcommitted
modify to use aggregate
1 parent dc98eb3 commit b605907

File tree

1 file changed

+119
-19
lines changed

1 file changed

+119
-19
lines changed

server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/task/ArchiveSnapshotTask.java

Lines changed: 119 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,8 @@
22

33
import lombok.RequiredArgsConstructor;
44
import lombok.extern.slf4j.Slf4j;
5-
import org.lowcoder.domain.application.model.ApplicationHistorySnapshot;
6-
import org.lowcoder.domain.application.model.ApplicationHistorySnapshotTS;
75
import org.lowcoder.sdk.config.CommonConfig;
86
import org.springframework.data.mongodb.core.MongoTemplate;
9-
import org.springframework.data.mongodb.core.query.Criteria;
10-
import org.springframework.data.mongodb.core.query.Query;
117
import org.springframework.scheduling.annotation.Scheduled;
128
import org.springframework.stereotype.Component;
139

@@ -16,6 +12,11 @@
1612
import java.util.List;
1713
import java.util.concurrent.TimeUnit;
1814

15+
import com.mongodb.client.MongoCollection;
16+
import com.mongodb.client.MongoCursor;
17+
import com.mongodb.client.model.Filters;
18+
import org.bson.Document;
19+
1920
@Slf4j
2021
@RequiredArgsConstructor
2122
@Component
@@ -24,23 +25,122 @@ public class ArchiveSnapshotTask {
2425
private final CommonConfig commonConfig;
2526
private final MongoTemplate mongoTemplate;
2627

27-
@Scheduled(initialDelay = 1, fixedRate = 1, timeUnit = TimeUnit.DAYS)
28+
@Scheduled(initialDelay = 0, fixedRate = 1, timeUnit = TimeUnit.DAYS)
2829
public void archive() {
30+
int mongoVersion = getMongoDBVersion();
2931
Instant thresholdDate = Instant.now().minus(commonConfig.getQuery().getAppSnapshotKeepDuration(), ChronoUnit.DAYS);
30-
List<ApplicationHistorySnapshotTS> snapshots = mongoTemplate.find(new Query().addCriteria(Criteria.where("createdAt").lte(thresholdDate)), ApplicationHistorySnapshotTS.class);
31-
snapshots.forEach(snapshot -> {
32-
ApplicationHistorySnapshot applicationHistorySnapshot = new ApplicationHistorySnapshot();
33-
applicationHistorySnapshot.setApplicationId(snapshot.getApplicationId());
34-
applicationHistorySnapshot.setDsl(snapshot.getDsl());
35-
applicationHistorySnapshot.setContext(snapshot.getContext());
36-
applicationHistorySnapshot.setCreatedAt(snapshot.getCreatedAt());
37-
applicationHistorySnapshot.setCreatedBy(snapshot.getCreatedBy());
38-
applicationHistorySnapshot.setModifiedBy(snapshot.getModifiedBy());
39-
applicationHistorySnapshot.setUpdatedAt(snapshot.getUpdatedAt());
40-
applicationHistorySnapshot.setId(snapshot.getId());
41-
mongoTemplate.insert(applicationHistorySnapshot);
42-
mongoTemplate.remove(snapshot);
43-
});
32+
33+
if (mongoVersion >= 5) {
34+
archiveForVersion5AndAbove(thresholdDate);
35+
} else {
36+
archiveForVersionBelow5(thresholdDate);
37+
}
38+
}
39+
40+
private int getMongoDBVersion() {
41+
Document buildInfo = mongoTemplate.getDb().runCommand(new Document("buildInfo", 1));
42+
String version = buildInfo.getString("version");
43+
return Integer.parseInt(version.split("\\.")[0]); // Parse major version
4444
}
4545

46+
private void archiveForVersion5AndAbove(Instant thresholdDate) {
47+
log.info("Running archival for MongoDB version >= 5");
48+
49+
MongoCollection<Document> sourceCollection = mongoTemplate.getDb().getCollection("applicationHistorySnapshotTS");
50+
MongoCollection<Document> targetCollection = mongoTemplate.getDb().getCollection("applicationHistorySnapshot");
51+
52+
long totalDocuments = sourceCollection.countDocuments(Filters.lte("createdAt", thresholdDate));
53+
log.info("Total documents to archive: {}", totalDocuments);
54+
55+
long processedCount = 0;
56+
57+
try (MongoCursor<Document> cursor = sourceCollection.find(Filters.lte("createdAt", thresholdDate)).iterator()) {
58+
while (cursor.hasNext()) {
59+
Document document = cursor.next();
60+
61+
// Transform the document for the target collection
62+
document.put("id", document.getObjectId("_id")); // Map `_id` to `id`
63+
document.remove("_id");
64+
65+
// Insert the document into the target collection
66+
try {
67+
targetCollection.insertOne(document);
68+
} catch (Exception e) {
69+
log.error("Failed to insert document with ID {}. Error: {}", document.getObjectId("id"), e.getMessage());
70+
continue;
71+
}
72+
73+
// Remove the document from the source collection
74+
try {
75+
sourceCollection.deleteOne(Filters.eq("_id", document.getObjectId("id")));
76+
} catch (Exception e) {
77+
log.error("Failed to delete document with ID {}. Error: {}", document.getObjectId("id"), e.getMessage());
78+
continue;
79+
}
80+
81+
processedCount++;
82+
log.info("Processed document {} / {}", processedCount, totalDocuments);
83+
}
84+
} catch (Exception e) {
85+
log.error("Failed during archival process. Error: {}", e.getMessage());
86+
}
87+
88+
log.info("Archival process completed. Total documents archived: {}", processedCount);
89+
}
90+
91+
private void archiveForVersionBelow5(Instant thresholdDate) {
92+
log.info("Running archival for MongoDB version < 5");
93+
94+
MongoCollection<Document> sourceCollection = mongoTemplate.getDb().getCollection("applicationHistorySnapshotTS");
95+
96+
long totalDocuments = sourceCollection.countDocuments(Filters.lte("createdAt", thresholdDate));
97+
log.info("Total documents to archive: {}", totalDocuments);
98+
99+
long processedCount = 0;
100+
101+
try (MongoCursor<Document> cursor = sourceCollection.find(Filters.lte("createdAt", thresholdDate)).iterator()) {
102+
while (cursor.hasNext()) {
103+
Document document = cursor.next();
104+
105+
// Transform the document for the target collection
106+
document.put("id", document.getObjectId("_id")); // Map `_id` to `id`
107+
document.remove("_id");
108+
109+
// Use aggregation with $out for the single document
110+
try {
111+
sourceCollection.aggregate(List.of(
112+
Filters.eq("_id", document.getObjectId("id")),
113+
new Document("$project", new Document()
114+
.append("applicationId", document.get("applicationId"))
115+
.append("dsl", document.get("dsl"))
116+
.append("context", document.get("context"))
117+
.append("createdAt", document.get("createdAt"))
118+
.append("createdBy", document.get("createdBy"))
119+
.append("modifiedBy", document.get("modifiedBy"))
120+
.append("updatedAt", document.get("updatedAt"))
121+
.append("id", document.get("id"))),
122+
new Document("$out", "applicationHistorySnapshot")
123+
)).first();
124+
} catch (Exception e) {
125+
log.error("Failed to aggregate and insert document with ID {}. Error: {}", document.getObjectId("id"), e.getMessage());
126+
continue;
127+
}
128+
129+
// Remove the document from the source collection
130+
try {
131+
sourceCollection.deleteOne(Filters.eq("_id", document.getObjectId("id")));
132+
} catch (Exception e) {
133+
log.error("Failed to delete document with ID {}. Error: {}", document.getObjectId("id"), e.getMessage());
134+
continue;
135+
}
136+
137+
processedCount++;
138+
log.info("Processed document {} / {}", processedCount, totalDocuments);
139+
}
140+
} catch (Exception e) {
141+
log.error("Failed during archival process. Error: {}", e.getMessage());
142+
}
143+
144+
log.info("Archival process completed. Total documents archived: {}", processedCount);
145+
}
46146
}

0 commit comments

Comments
 (0)