From 24ba4bdabef1ee01f968455a42cbc1ccd8625254 Mon Sep 17 00:00:00 2001 From: Thomasr Date: Wed, 20 Nov 2024 03:34:25 -0500 Subject: [PATCH 1/3] convert snapshot migration to use aggregation pipeline --- .../runner/migrations/DatabaseChangelog.java | 45 +++++++++++-------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java b/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java index ddf0422ab..2cd26381a 100644 --- a/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java +++ b/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java @@ -44,6 +44,7 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Arrays; import java.util.List; import java.util.Set; @@ -319,35 +320,43 @@ public void addTimeSeriesSnapshotHistory(MongockTemplate mongoTemplate, CommonCo // Create the time-series collection if it doesn't exist if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { - if(mongoVersion < 5) { + if (mongoVersion < 5) { mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class); } else { mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class, CollectionOptions.empty().timeSeries("createdAt")); } } + Instant thresholdDate = Instant.now().minus(commonConfig.getQuery().getAppSnapshotKeepDuration(), ChronoUnit.DAYS); - List snapshots = mongoTemplate.find(new Query().addCriteria(Criteria.where("createdAt").gte(thresholdDate)), ApplicationHistorySnapshot.class); - snapshots.forEach(snapshot -> { - ApplicationHistorySnapshotTS applicationHistorySnapshotTS = new ApplicationHistorySnapshotTS(); - applicationHistorySnapshotTS.setApplicationId(snapshot.getApplicationId()); - applicationHistorySnapshotTS.setDsl(snapshot.getDsl()); - applicationHistorySnapshotTS.setContext(snapshot.getContext()); - applicationHistorySnapshotTS.setCreatedAt(snapshot.getCreatedAt()); - applicationHistorySnapshotTS.setCreatedBy(snapshot.getCreatedBy()); - applicationHistorySnapshotTS.setModifiedBy(snapshot.getModifiedBy()); - applicationHistorySnapshotTS.setUpdatedAt(snapshot.getUpdatedAt()); - applicationHistorySnapshotTS.setId(snapshot.getId()); - mongoTemplate.insert(applicationHistorySnapshotTS); - mongoTemplate.remove(snapshot); - }); - // Ensure indexes if needed + // Use aggregation to move and transform data + Document match = new Document("$match", + new Document("createdAt", new Document("$gte", thresholdDate))); + + Document project = new Document("$project", new Document() + .append("applicationId", 1) + .append("dsl", 1) + .append("context", 1) + .append("createdAt", 1) + .append("createdBy", 1) + .append("modifiedBy", 1) + .append("updatedAt", 1) + .append("id", "$_id")); // Map MongoDB's default `_id` to `id` if needed. + + Document out = new Document("$out", "applicationHistorySnapshotTS"); // Target collection name + + // Execute the aggregation pipeline + mongoTemplate.getDb() + .getCollection("applicationHistorySnapshot") // Original collection name + .aggregate(Arrays.asList(match, project, out)) + .toCollection(); + ensureIndexes(mongoTemplate, ApplicationHistorySnapshotTS.class, makeIndex("applicationId"), - makeIndex("createdAt") - ); + makeIndex("createdAt")); } + private void addGidField(MongockTemplate mongoTemplate, String collectionName) { // Create a query to match all documents Query query = new Query(); From 0c5c344ea72ce57d26edd093ed468dc8b45c1cd5 Mon Sep 17 00:00:00 2001 From: Thomasr Date: Wed, 20 Nov 2024 03:48:52 -0500 Subject: [PATCH 2/3] delete after copying records --- .../org/lowcoder/runner/migrations/DatabaseChangelog.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java b/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java index 2cd26381a..ba5324923 100644 --- a/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java +++ b/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java @@ -4,6 +4,7 @@ import com.github.cloudyrock.mongock.ChangeSet; import com.github.cloudyrock.mongock.driver.mongodb.springdata.v4.decorator.impl.MongockTemplate; import com.github.f4b6a3.uuid.UuidCreator; +import com.mongodb.client.result.DeleteResult; import lombok.extern.slf4j.Slf4j; import org.bson.Document; import org.lowcoder.domain.application.model.Application; @@ -351,6 +352,10 @@ public void addTimeSeriesSnapshotHistory(MongockTemplate mongoTemplate, CommonCo .aggregate(Arrays.asList(match, project, out)) .toCollection(); + // Delete the migrated records + Query deleteQuery = new Query(Criteria.where("createdAt").gte(thresholdDate)); + DeleteResult deleteResult = mongoTemplate.remove(deleteQuery, ApplicationHistorySnapshot.class); + ensureIndexes(mongoTemplate, ApplicationHistorySnapshotTS.class, makeIndex("applicationId"), makeIndex("createdAt")); From 1eca3afffdb63da4ca7b5dca9d673c0a8fd62853 Mon Sep 17 00:00:00 2001 From: Thomasr Date: Wed, 20 Nov 2024 03:58:20 -0500 Subject: [PATCH 3/3] different migration based on mongo version. --- .../runner/migrations/DatabaseChangelog.java | 101 ++++++++++++------ 1 file changed, 68 insertions(+), 33 deletions(-) diff --git a/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java b/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java index ba5324923..153fd765f 100644 --- a/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java +++ b/server/api-service/lowcoder-server/src/main/java/org/lowcoder/runner/migrations/DatabaseChangelog.java @@ -4,6 +4,8 @@ import com.github.cloudyrock.mongock.ChangeSet; import com.github.cloudyrock.mongock.driver.mongodb.springdata.v4.decorator.impl.MongockTemplate; import com.github.f4b6a3.uuid.UuidCreator; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; import com.mongodb.client.result.DeleteResult; import lombok.extern.slf4j.Slf4j; import org.bson.Document; @@ -315,47 +317,80 @@ private int getMongoDBVersion(MongockTemplate mongoTemplate) { @ChangeSet(order = "026", id = "add-time-series-snapshot-history", author = "") public void addTimeSeriesSnapshotHistory(MongockTemplate mongoTemplate, CommonConfig commonConfig) { int mongoVersion = getMongoDBVersion(mongoTemplate); - if (mongoVersion < 5) { - log.warn("MongoDB version is below 5. Time-series collections are not supported. Upgrade the MongoDB version."); - } - - // Create the time-series collection if it doesn't exist - if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { - if (mongoVersion < 5) { - mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class); - } else { - mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class, CollectionOptions.empty().timeSeries("createdAt")); - } - } Instant thresholdDate = Instant.now().minus(commonConfig.getQuery().getAppSnapshotKeepDuration(), ChronoUnit.DAYS); - // Use aggregation to move and transform data - Document match = new Document("$match", - new Document("createdAt", new Document("$gte", thresholdDate))); + if (mongoVersion >= 5) { + // MongoDB version >= 5: Use manual insert query + if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { + mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class, + CollectionOptions.empty().timeSeries("createdAt")); + } - Document project = new Document("$project", new Document() - .append("applicationId", 1) - .append("dsl", 1) - .append("context", 1) - .append("createdAt", 1) - .append("createdBy", 1) - .append("modifiedBy", 1) - .append("updatedAt", 1) - .append("id", "$_id")); // Map MongoDB's default `_id` to `id` if needed. + // Aggregation pipeline to fetch the records + List aggregationPipeline = Arrays.asList( + new Document("$match", new Document("createdAt", new Document("$gte", thresholdDate))), + new Document("$project", new Document() + .append("applicationId", 1) + .append("dsl", 1) + .append("context", 1) + .append("createdAt", 1) + .append("createdBy", 1) + .append("modifiedBy", 1) + .append("updatedAt", 1) + .append("id", "$_id")) // Map `_id` to `id` if needed + ); + + MongoCollection sourceCollection = mongoTemplate.getDb().getCollection("applicationHistorySnapshot"); + MongoCollection targetCollection = mongoTemplate.getDb().getCollection("applicationHistorySnapshotTS"); + + // Fetch results and insert them into the time-series collection + try (MongoCursor cursor = sourceCollection.aggregate(aggregationPipeline).iterator()) { + while (cursor.hasNext()) { + Document document = cursor.next(); + targetCollection.insertOne(document); // Insert into the time-series collection + } + } - Document out = new Document("$out", "applicationHistorySnapshotTS"); // Target collection name + // Delete the migrated records + Query deleteQuery = new Query(Criteria.where("createdAt").gte(thresholdDate)); + DeleteResult deleteResult = mongoTemplate.remove(deleteQuery, ApplicationHistorySnapshot.class); - // Execute the aggregation pipeline - mongoTemplate.getDb() - .getCollection("applicationHistorySnapshot") // Original collection name - .aggregate(Arrays.asList(match, project, out)) - .toCollection(); + log.info("Deleted {} records from the source collection.", deleteResult.getDeletedCount()); + } else { + // MongoDB version < 5: Use aggregation with $out + if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { + mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class); // Create a regular collection + } - // Delete the migrated records - Query deleteQuery = new Query(Criteria.where("createdAt").gte(thresholdDate)); - DeleteResult deleteResult = mongoTemplate.remove(deleteQuery, ApplicationHistorySnapshot.class); + // Aggregation pipeline with $out + List aggregationPipeline = Arrays.asList( + new Document("$match", new Document("createdAt", new Document("$gte", thresholdDate))), + new Document("$project", new Document() + .append("applicationId", 1) + .append("dsl", 1) + .append("context", 1) + .append("createdAt", 1) + .append("createdBy", 1) + .append("modifiedBy", 1) + .append("updatedAt", 1) + .append("id", "$_id")), // Map `_id` to `id` if needed + new Document("$out", "applicationHistorySnapshotTS") // Write directly to the target collection + ); + + mongoTemplate.getDb() + .getCollection("applicationHistorySnapshot") + .aggregate(aggregationPipeline) + .toCollection(); + + // Delete the migrated records + Query deleteQuery = new Query(Criteria.where("createdAt").gte(thresholdDate)); + DeleteResult deleteResult = mongoTemplate.remove(deleteQuery, ApplicationHistorySnapshot.class); + + log.info("Deleted {} records from the source collection.", deleteResult.getDeletedCount()); + } + // Ensure indexes on the new collection ensureIndexes(mongoTemplate, ApplicationHistorySnapshotTS.class, makeIndex("applicationId"), makeIndex("createdAt"));