Skip to content

FIX: Ensure copy_data callbacks run even when all rows are skipped #33002

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions migrations/lib/importer/copy_step.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,35 +75,34 @@ def execute
# Copies all rows for this step into the target Discourse table.
#
# Resolves the table and column list from class-level configuration
# (falling back to the demodulized class name and the table's actual
# columns), then streams rows from `fetch_rows` into the Discourse DB.
# After each committed batch, `DiscourseDB#copy_data` yields the rows
# that were inserted and the rows that were skipped, so the per-batch
# callbacks run even when a batch contains only skipped rows.
#
# NOTE(review): this span originally contained interleaved pre- and
# post-change diff lines; it is resolved here to the merged (post-PR)
# implementation, in which skipped-row bookkeeping lives inside
# DiscourseDB#copy_data rather than in a local `skipped_rows` array.
def copy_data
  table_name = self.class.table_name || self.class.name&.demodulize&.underscore
  column_names = self.class.column_names || @discourse_db.column_names(table_name)

  # When mapped IDs are stored, remember the current last ID so newly
  # inserted rows can be mapped back to their source IDs afterwards.
  if self.class.store_mapped_ids?
    @last_id = @discourse_db.last_id_of(table_name)
    @mapping_type = find_mapping_type(table_name)
  end

  @discourse_db.copy_data(table_name, column_names, fetch_rows) do |inserted_rows, skipped_rows|
    # Both callbacks are batch-scoped; guard with `any?` to avoid
    # invoking them for empty partitions of a batch.
    after_commit_of_inserted_rows(inserted_rows) if inserted_rows.any?
    after_commit_of_skipped_rows(skipped_rows) if skipped_rows.any?
  end

  # Re-sync the table's sequence after bulk COPY, then commit the
  # source-side transaction that tracked the mappings.
  @discourse_db.fix_last_id_of(table_name) if self.class.store_mapped_ids?
  @intermediate_db.commit_transaction
end

def fetch_rows(skipped_rows)
def fetch_rows
skip_row_marker = ::Migrations::Importer::DiscourseDB::SKIP_ROW_MARKER

Enumerator.new do |enumerator|
query, parameters = self.class.rows_query

@intermediate_db.query(query, *parameters) do |row|
if (transformed_row = transform_row(row))
enumerator << transformed_row
@stats.reset
else
skipped_rows << row
row[skip_row_marker] = true
enumerator << row
@stats.reset(skip_count: 1)
end

Expand Down
24 changes: 22 additions & 2 deletions migrations/lib/importer/discourse_db.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module Migrations::Importer
class DiscourseDB
COPY_BATCH_SIZE = 1_000
SKIP_ROW_MARKER = :"$skip"

def initialize
@encoder = PG::TextEncoder::CopyRow.new
Expand All @@ -14,6 +15,11 @@ def copy_data(table_name, column_names, rows)
quoted_column_name_list = column_names.map { |c| quote_identifier(c) }.join(",")
sql = "COPY #{table_name} (#{quoted_column_name_list}) FROM STDIN"

inserted_rows = []
skipped_rows = []
Comment on lines +18 to +19
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the same spirit of micro-optimizing, what do you think about pre-allocating these too? We know they won't be larger than COPY_BATCH_SIZE

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea

column_count = column_names.size
data = Array.new(column_count)

rows.each_slice(COPY_BATCH_SIZE) do |sliced_rows|
# TODO Maybe add error handling and check if all rows fail to insert, or only
# some of them fail. Currently, if a single row fails to insert, then an exception
Expand All @@ -23,14 +29,28 @@ def copy_data(table_name, column_names, rows)
@connection.transaction do
@connection.copy_data(sql, @encoder) do
sliced_rows.each do |row|
data = column_names.map { |c| row[c] }
if row[SKIP_ROW_MARKER]
skipped_rows << row
next
end

i = 0
while i < column_count
data[i] = row[column_names[i]]
i += 1
end

@connection.put_copy_data(data)
inserted_rows << row
end
end

# give the caller a chance to do some work when a batch has been committed,
# for example, to store ID mappings
yield sliced_rows
yield inserted_rows, skipped_rows

inserted_rows.clear
skipped_rows.clear
end
end

Expand Down