@@ -5816,6 +5816,34 @@ handle_error(NdbEventOperation *pOp)
5816
5816
DBUG_RETURN (0 );
5817
5817
}
5818
5818
5819
+ /*
5820
+ * Handle exceptional events- event buffer inconsistent and full
5821
+ * by injecting a GAP record into the binlog.
5822
+ */
5823
+ static
5824
+ void
5825
+ handle_gap_epoch (THD *thd,
5826
+ NdbDictionary::Event::TableEvent event_type,
5827
+ Uint64 gap_epoch,
5828
+ injector *inj) {
5829
+ char reason[20 ];
5830
+ if (event_type == NdbDictionary::Event::TE_INCONSISTENT) {
5831
+ my_snprintf (reason, sizeof (reason), " missing data" );
5832
+ }
5833
+ else if (event_type == NdbDictionary::Event::TE_OUT_OF_MEMORY) {
5834
+ my_snprintf (reason, sizeof (reason), " event buffer full" );
5835
+ }
5836
+
5837
+ char errmsg[80 ];
5838
+ my_snprintf (errmsg, sizeof (errmsg),
5839
+ " Detected %s in GCI %llu, "
5840
+ " inserting GAP event" ,
5841
+ reason, gap_epoch);
5842
+ DBUG_PRINT (" info" , (" %s" , errmsg));
5843
+
5844
+ LEX_STRING const msg= { C_STRING_WITH_LEN (errmsg) };
5845
+ inj->record_incident (thd, binary_log::Incident_event::INCIDENT_LOST_EVENTS, msg);
5846
+ }
5819
5847
5820
5848
/*
5821
5849
Handle _non_ data events from the storage nodes
@@ -7185,7 +7213,7 @@ Ndb_binlog_thread::do_run()
7185
7213
7186
7214
my_thread_yield ();
7187
7215
mysql_mutex_lock (&injector_event_mutex);
7188
- res= i_ndb->pollEvents (10 , &gci);
7216
+ res= i_ndb->pollEvents2 (10 , &gci);
7189
7217
mysql_mutex_unlock (&injector_event_mutex);
7190
7218
}
7191
7219
if (gci > schema_gci)
@@ -7313,13 +7341,18 @@ Ndb_binlog_thread::do_run()
7313
7341
// Capture any dynamic changes to max_alloc
7314
7342
i_ndb->set_eventbuf_max_alloc (opt_ndb_eventbuffer_max_alloc);
7315
7343
7344
+ if (ndb_log_empty_epochs ()) {
7345
+ // Ensure that empty epochs (event type TE_EMPTY) are queued
7346
+ i_ndb->setEventBufferQueueEmptyEpoch (true );
7347
+ }
7348
+
7316
7349
mysql_mutex_lock (&injector_event_mutex);
7317
7350
Uint64 latest_epoch= 0 ;
7318
7351
const int poll_wait= (ndb_binlog_running) ? tot_poll_wait : 0 ;
7319
- const int res= i_ndb->pollEvents (poll_wait, &latest_epoch);
7352
+ const int res= i_ndb->pollEvents2 (poll_wait, &latest_epoch);
7320
7353
(void )res; // Unused except DBUG_PRINT
7321
7354
mysql_mutex_unlock (&injector_event_mutex);
7322
- i_pOp = i_ndb->nextEvent ();
7355
+ i_pOp = i_ndb->nextEvent2 ();
7323
7356
if (ndb_binlog_running)
7324
7357
{
7325
7358
ndb_latest_received_binlog_epoch= latest_epoch;
@@ -7476,7 +7509,6 @@ Ndb_binlog_thread::do_run()
7476
7509
updateInjectorStats (s_ndb, i_ndb);
7477
7510
}
7478
7511
7479
- Uint64 inconsistent_epoch= 0 ;
7480
7512
if (!ndb_binlog_running)
7481
7513
{
7482
7514
/*
@@ -7495,127 +7527,151 @@ Ndb_binlog_thread::do_run()
7495
7527
}
7496
7528
updateInjectorStats (s_ndb, i_ndb);
7497
7529
}
7530
+ /*
7531
+ * The logic of handling the current epoch:
7532
+ * if <current event operation belongs to current_epoch)> {
7533
+ * if <event_type of the op is a gap> {
7534
+ * <handle gap epoch>; <get nextEvent2>
7535
+ * } else {
7536
+ * if <empty epoch> {
7537
+ * <handle empty epoch>; <get nextEvent2>
7538
+ * } else { // handle non-empty epoch
7539
+ * for <all ops belonging to the current epoch>
7540
+ * <handle one op> ; <get nextEvent2>
7541
+ * } // else: handle non-empty epoch
7542
+ * } // else: handle non-gap epoch
7543
+ * } // else if (i_pOp != NULL ..
7544
+ */
7545
+ else if (i_pOp != NULL && i_pOp->getEpoch () == current_epoch) {
7498
7546
7499
- // i_pOp == NULL means an inconsistent epoch or the queue is empty
7500
- else if (i_pOp == NULL && !i_ndb->isConsistent (inconsistent_epoch))
7501
- {
7502
- char errmsg[64 ];
7503
- uint end= sprintf (&errmsg[0 ],
7504
- " Detected missing data in GCI %llu, "
7505
- " inserting GAP event" , inconsistent_epoch);
7506
- errmsg[end]= ' \0 ' ;
7507
- DBUG_PRINT (" info" ,
7508
- (" Detected missing data in GCI %llu, "
7509
- " inserting GAP event" , inconsistent_epoch));
7510
- LEX_STRING const msg= { C_STRING_WITH_LEN (errmsg) };
7511
- inj->record_incident (thd,
7512
- binary_log::Incident_event::INCIDENT_LOST_EVENTS,
7513
- msg);
7514
- }
7547
+ // Handle current_epoch which can be a gap, empty or a non-empty epoch
7548
+ const NdbDictionary::Event::TableEvent event_type = i_pOp->getEventType2 ();
7515
7549
7516
- /* Handle all events withing 'current_epoch', or possible
7517
- * log an empty epoch if log_empty_epoch is specified.
7518
- */
7519
- else if ((i_pOp != NULL && i_pOp->getEpoch () == current_epoch) ||
7520
- (ndb_log_empty_epochs () &&
7521
- current_epoch > ndb_latest_handled_binlog_epoch))
7522
- {
7523
- thd->proc_info = " Processing events" ;
7524
- ndb_binlog_index_row _row;
7525
- ndb_binlog_index_row *rows= &_row;
7526
- injector::transaction trans;
7527
- unsigned trans_row_count= 0 ;
7528
- unsigned trans_slave_row_count= 0 ;
7529
- rows= &_row;
7530
- memset (&_row, 0 , sizeof (_row));
7531
- thd->variables .character_set_client = &my_charset_latin1;
7532
- DBUG_PRINT (" info" , (" Initializing transaction" ));
7533
- inj->new_trans (thd, &trans);
7534
-
7535
- if (i_pOp == NULL || i_pOp->getEpoch () != current_epoch)
7536
- {
7537
- /*
7538
- Must be an empty epoch since the condition
7539
- (ndb_log_empty_epochs() &&
7540
- current_epoch > ndb_latest_handled_binlog_epoch)
7541
- must be true we write empty epoch into
7542
- ndb_binlog_index
7543
- */
7544
- assert (ndb_log_empty_epochs ());
7545
- assert (current_epoch > ndb_latest_handled_binlog_epoch);
7546
- DBUG_PRINT (" info" , (" Writing empty epoch for gci %llu" , current_epoch));
7547
- commit_trans (thd, trans, current_epoch, rows, trans_row_count,
7548
- trans_slave_row_count);
7550
+ if (event_type == NdbDictionary::Event::TE_INCONSISTENT ||
7551
+ event_type == NdbDictionary::Event::TE_OUT_OF_MEMORY) {
7552
+
7553
+ DBUG_PRINT (" info" , (" Handle gap epoch" ));
7554
+ handle_gap_epoch (thd, event_type, current_epoch, inj);
7555
+
7556
+ i_pOp= i_ndb->nextEvent2 ();
7557
+
7558
+ // No need to updateInjectorStats(), since these events are
7559
+ // created by the event buffer, not sent by Ndb.
7549
7560
}
7550
- else
7551
- {
7552
- assert (i_pOp != NULL && i_pOp->getEpoch () == current_epoch);
7553
- DBUG_PRINT (" info" , (" Handling epoch: %u/%u" ,
7554
- (uint)(current_epoch >> 32 ),
7555
- (uint)(current_epoch)));
7556
- // sometimes get TE_ALTER with invalid table
7557
- assert (i_pOp->getEventType () == NdbDictionary::Event::TE_ALTER ||
7558
- ! IS_NDB_BLOB_PREFIX (i_pOp->getEvent ()->getTable ()->getName ()));
7559
- assert (current_epoch <= ndb_latest_received_binlog_epoch);
7560
- initial_work (i_ndb);
7561
- pass_table_map_before_epoch (i_ndb, trans);
7562
-
7563
- if (trans.good ())
7564
- {
7565
- /* Inject ndb_apply_status WRITE_ROW event */
7566
- if (!injectApplyStatusWriteRow (trans, current_epoch))
7561
+
7562
+ else {
7563
+ // Handle empty or non-empty epoch
7564
+ thd->proc_info = " Processing events" ;
7565
+ ndb_binlog_index_row _row;
7566
+ ndb_binlog_index_row *rows= &_row;
7567
+ injector::transaction trans;
7568
+ unsigned trans_row_count= 0 ;
7569
+ unsigned trans_slave_row_count= 0 ;
7570
+
7571
+ rows= &_row;
7572
+ memset (&_row, 0 , sizeof (_row));
7573
+ thd->variables .character_set_client = &my_charset_latin1;
7574
+ DBUG_PRINT (" info" , (" Initializing transaction" ));
7575
+ inj->new_trans (thd, &trans);
7576
+
7577
+ if (event_type == NdbDictionary::Event::TE_EMPTY) {
7578
+ // Handle empty epoch
7579
+ if (ndb_log_empty_epochs ()) {
7580
+ DBUG_PRINT (" info" , (" Writing empty epoch %u/%u "
7581
+ " latest_handled_binlog_epoch %u/%u" ,
7582
+ (uint)(current_epoch >> 32 ),
7583
+ (uint)(current_epoch),
7584
+ (uint)(ndb_latest_handled_binlog_epoch >> 32 ),
7585
+ (uint)(ndb_latest_handled_binlog_epoch)));
7586
+
7587
+ DBUG_PRINT (" info" , (" Writing empty epoch for gci %llu" , current_epoch));
7588
+ commit_trans (thd, trans, current_epoch, rows, trans_row_count,
7589
+ trans_slave_row_count);
7590
+ }
7591
+
7592
+ i_pOp= i_ndb->nextEvent2 ();
7593
+
7594
+ // No need to updateInjectorStats(), since this event is
7595
+ // created by the event buffer, not sent by Ndb.
7596
+
7597
+ } else {
7598
+ // Handle non-empty epoch
7599
+ DBUG_PRINT (" info" , (" Handling epoch: %u/%u" ,
7600
+ (uint)(current_epoch >> 32 ),
7601
+ (uint)(current_epoch)));
7602
+
7603
+ // sometimes get TE_ALTER with invalid table
7604
+ assert (i_pOp->getEventType () == NdbDictionary::Event::TE_ALTER ||
7605
+ ! IS_NDB_BLOB_PREFIX (i_pOp->getEvent ()->getTable ()->getName ()));
7606
+
7607
+ /*
7608
+ * ndb_latest_received_binlog_epoch ==0: cluster is restarted after
7609
+ * a cluster failure. Let injector_ndb handle the received events
7610
+ * including TE_NODE_FAILURE and/or TE_CLUSTER_FAILURE.
7611
+ */
7612
+ if (ndb_latest_received_binlog_epoch != 0 )
7613
+ assert (current_epoch <= ndb_latest_received_binlog_epoch);
7614
+
7615
+ initial_work (i_ndb);
7616
+ pass_table_map_before_epoch (i_ndb, trans);
7617
+
7618
+ if (trans.good ())
7567
7619
{
7568
- sql_print_error (" NDB Binlog: Failed to inject apply status write row" );
7620
+ /* Inject ndb_apply_status WRITE_ROW event */
7621
+ if (!injectApplyStatusWriteRow (trans, current_epoch))
7622
+ {
7623
+ sql_print_error (" NDB Binlog: Failed to inject apply status write row" );
7624
+ }
7569
7625
}
7570
- }
7571
7626
7572
- do
7573
- {
7574
- if (i_pOp->hasError () &&
7575
- handle_error (i_pOp) < 0 )
7576
- goto err;
7627
+ do
7628
+ {
7629
+ if (i_pOp->hasError () &&
7630
+ handle_error (i_pOp) < 0 )
7631
+ goto err;
7577
7632
7578
7633
#ifndef NDEBUG
7579
- check_event_list_consistency (i_ndb, i_pOp, current_epoch);
7634
+ check_event_list_consistency (i_ndb, i_pOp, current_epoch);
7580
7635
#endif
7581
- if ((unsigned ) i_pOp->getEventType () <
7582
- (unsigned ) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
7583
- handle_data_event (thd, i_ndb, i_pOp, &rows, trans,
7584
- trans_row_count, trans_slave_row_count);
7585
- else
7586
- {
7587
- handle_non_data_event (thd, i_pOp, *rows);
7588
- DBUG_PRINT (" info" , (" s_ndb first: %s" , s_ndb->getEventOperation () ?
7589
- s_ndb->getEventOperation ()->getEvent ()->getTable ()->getName () :
7590
- " <empty>" ));
7591
- DBUG_PRINT (" info" , (" i_ndb first: %s" , i_ndb->getEventOperation () ?
7592
- i_ndb->getEventOperation ()->getEvent ()->getTable ()->getName () :
7593
- " <empty>" ));
7594
- }
7636
+ if ((unsigned ) i_pOp->getEventType () <
7637
+ (unsigned ) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
7638
+ handle_data_event (thd, i_ndb, i_pOp, &rows, trans,
7639
+ trans_row_count, trans_slave_row_count);
7640
+ else
7641
+ {
7642
+ handle_non_data_event (thd, i_pOp, *rows);
7643
+ DBUG_PRINT (" info" , (" s_ndb first: %s" , s_ndb->getEventOperation () ?
7644
+ s_ndb->getEventOperation ()->getEvent ()->getTable ()->getName () :
7645
+ " <empty>" ));
7646
+ DBUG_PRINT (" info" , (" i_ndb first: %s" , i_ndb->getEventOperation () ?
7647
+ i_ndb->getEventOperation ()->getEvent ()->getTable ()->getName () :
7648
+ " <empty>" ));
7649
+ }
7595
7650
7596
- // Capture any dynamic changes to max_alloc
7597
- i_ndb->set_eventbuf_max_alloc (opt_ndb_eventbuffer_max_alloc);
7651
+ // Capture any dynamic changes to max_alloc
7652
+ i_ndb->set_eventbuf_max_alloc (opt_ndb_eventbuffer_max_alloc);
7598
7653
7599
- i_pOp = i_ndb->nextEvent ();
7600
- } while (i_pOp && i_pOp->getEpoch () == current_epoch);
7654
+ i_pOp = i_ndb->nextEvent2 ();
7655
+ } while (i_pOp && i_pOp->getEpoch () == current_epoch);
7601
7656
7602
- updateInjectorStats (s_ndb, i_ndb);
7657
+ updateInjectorStats (s_ndb, i_ndb);
7603
7658
7604
- /*
7605
- NOTE: i_pOp is now referring to an event in the next epoch
7606
- or is == NULL
7607
- */
7659
+ /*
7660
+ NOTE: i_pOp is now referring to an event in the next epoch
7661
+ or is == NULL
7662
+ */
7608
7663
7609
- commit_trans (thd, trans, current_epoch, rows, trans_row_count,
7610
- trans_slave_row_count);
7664
+ commit_trans (thd, trans, current_epoch, rows, trans_row_count,
7665
+ trans_slave_row_count);
7611
7666
7612
- /*
7613
- NOTE: There are possible more i_pOp available.
7614
- However, these are from another epoch and should be handled
7615
- in next iteration of the binlog injector loop.
7616
- */
7617
- }
7618
- } // end: 'handled a 'current_epoch' of i_pOp's
7667
+ /*
7668
+ NOTE: There are possible more i_pOp available.
7669
+ However, these are from another epoch and should be handled
7670
+ in next iteration of the binlog injector loop.
7671
+ */
7672
+ } // else: handle non-empty epoch
7673
+ } // else: handle non-gap epoch
7674
+ } // else if (i_pOp != NULL ..
7619
7675
7620
7676
// Notify the schema event handler about post_epoch so it may finish
7621
7677
// any outstanding business
0 commit comments