@@ -68,7 +68,7 @@ public void startBootstrap(RowMap startBootstrapRow, Schema schema, AbstractProd
68
68
LOGGER .info (String .format ("bootstrapping started for %s.%s, binlog position is %s" , databaseName , tableName , position .toString ()));
69
69
try ( Connection connection = getConnection () ) {
70
70
setBootstrapRowToStarted (startBootstrapRow , connection );
71
- ResultSet resultSet = getAllRows (databaseName , tableName , connection );
71
+ ResultSet resultSet = getAllRows (databaseName , tableName , schema , connection );
72
72
int insertedRows = 0 ;
73
73
while ( resultSet .next () ) {
74
74
RowMap row = new RowMap (
@@ -165,9 +165,14 @@ private void ensureTable(String tableName, Database database) {
165
165
findTable (tableName , database );
166
166
}
167
167
168
- private ResultSet getAllRows (String databaseName , String tableName , Connection connection ) throws SQLException , InterruptedException {
168
+ private ResultSet getAllRows (String databaseName , String tableName , Schema schema , Connection connection ) throws SQLException , InterruptedException {
169
169
Statement statement = createBatchStatement (connection );
170
- return statement .executeQuery (String .format ("select * from %s.%s" , databaseName , tableName ));
170
+ String pk = schema .findDatabase (databaseName ).findTable (tableName ).getPKString ();
171
+ if ( pk != null ) {
172
+ return statement .executeQuery (String .format ("select * from %s.%s order by %s" , databaseName , tableName , pk ));
173
+ } else {
174
+ return statement .executeQuery (String .format ("select * from %s.%s" , databaseName , tableName ));
175
+ }
171
176
}
172
177
173
178
private Statement createBatchStatement (Connection connection ) throws SQLException , InterruptedException {
0 commit comments