@@ -92,7 +92,7 @@ public static void main(String[] args) throws Exception {

        //KAFKA INPUT SOURCE
        DataStream<String> stream = env
-                .addSource(new FlinkKafkaConsumer<>("sine_dataset_3M_full_abrupt", new SimpleStringSchema(), properties).setStartFromEarliest())
+                .addSource(new FlinkKafkaConsumer<>("sine_3M_2_drifts_500000_2000000", new SimpleStringSchema(), properties).setStartFromEarliest())
                .name("Kafka Input Source").setParallelism(parallelism).setMaxParallelism(parallelism);

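Note: the `properties` object handed to the consumer is built earlier in main() and is not part of this diff. A minimal sketch of what it would need, assuming java.util.Properties and taking the broker address from the producer sink further down (the group id is purely hypothetical):

    // Minimal sketch, not the project's actual setup: the consumer only needs the
    // broker list and a consumer group; everything else can stay at its defaults.
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092"); // same broker as the sink below
    properties.setProperty("group.id", "hoeffding-tree-job");      // hypothetical group id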
@@ -151,7 +151,7 @@ public void flatMap(String input_stream, Collector<Tuple3<String, Integer, Integ


        // PERFORMANCE (ERROR-RATE) MONITORING SINK
-        partial_result.addSink(new FlinkKafkaProducer<>("localhost:9092", "sine_dataset_3M_full_abrupt_without",
+        partial_result.addSink(new FlinkKafkaProducer<>("localhost:9092", "sine_3M_2_drifts_500000_2000000_without_prop",
                (SerializationSchema<Tuple6<Integer, Integer, Integer, Integer, Double, Integer>>)
                        element -> (element.getField(5).toString() + "," + element.getField(4).toString() + "," + element.getField(0).toString()).getBytes()))
                .name("Visualizing Performance Metrics").setParallelism(parallelism);
@@ -192,7 +192,6 @@ public boolean filter(Tuple6<Integer, Integer, Integer, Integer, Double, Integer
    static class StatefulMap extends RichFlatMapFunction<Tuple3<String, Integer, Integer>, Tuple6<Integer, Integer, Integer, Integer, Double, Integer>> {

        private transient ValueState<HoeffdingTree> hoeffdingTreeValueState;
-        private transient ValueState<HoeffdingTree> backup_hoeffdingTreeValueState;
        private transient ValueState<HoeffdingTree> background_hoeffdingTreeValueState;
        private transient ValueState<ConceptDriftDetector> ConceptDriftDetectorValueState;
        private transient ValueState<Boolean> empty_state;
@@ -247,10 +246,24 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
                 * "As each training example is presented to our algorithm, for each base model,
                 * choose the example K ~ Poisson(1) times and update the base model accordingly."
                 */
-                if (instance_id > 1000000 && instance_id < 1000010) {
+
+                if (instance_id > 10000 && instance_id < 10010) {
                    System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
                }
-                if (instance_id > 3000000 && instance_id < 3000010) {
+                if (instance_id > 500000 && instance_id < 500010) {
+                    System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
+                }
+
+                if (instance_id > 600000 && instance_id < 600010) {
+                    System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
+                }
+                if (instance_id > 1900000 && instance_id < 1900010) {
+                    System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
+                }
+                if (instance_id > 2100000 && instance_id < 2100010) {
+                    System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
+                }
+                if (instance_id > 2990000 && instance_id < 2990010) {
                    System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
                }

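Note: the added print windows sample tree size and accuracy just before and after the drift points (roughly 500,000 and 2,000,000) suggested by the new topic name. The comment above quotes the Oza/Russell online-bagging rule; how `instance_weight` is actually drawn lies outside this hunk. For reference only, a self-contained Poisson(1) draw (Knuth's method) would look like this sketch, with a made-up helper name:

    import java.util.Random;

    // Hypothetical helper: draws k ~ Poisson(1), the per-example weight used by
    // online bagging ("choose the example k times and update the base model").
    static int samplePoissonOne(Random rng) {
        final double threshold = Math.exp(-1.0); // e^-lambda with lambda = 1
        int k = 0;
        double p = 1.0;
        do {
            k++;
            p *= rng.nextDouble();
        } while (p > threshold);
        return k - 1;
    }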
@@ -262,13 +275,8 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
                //error_rate = ht.getErrorRate();
                hoeffdingTreeValueState.update(ht);

-                HoeffdingTree backup_ht = backup_hoeffdingTreeValueState.value();
-                HoeffdingTree.Returninfo rn1 = backup_ht.TestHoeffdingTree(backup_ht.root, features, purpose_id);
-                prediction1 = rn1.getPrediction();
-                backup_ht.UpdateHoeffdingTree(rn1.getNode(), features, instance_weight);
-                backup_hoeffdingTreeValueState.update(backup_ht);

-                collector.collect(new Tuple6<>(instance_id, prediction1, -1, purpose_id, backup_ht.getErrorRate(), 2));
+
                // Concept Drift Handler
                if (drift_detection_method_id != 0) {

@@ -296,7 +304,7 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
                            empty_background_state.update(false);
                            // Warning Signal. Create & Train the Background Tree
                            HoeffdingTree background_hoeffdingTree = new HoeffdingTree();
-                            background_hoeffdingTree.NEW_CreateHoeffdingTree(7, 9, 200, 0.0001, 0.05, this.combination_function, hoeffding_tree_id, 1);
+                            background_hoeffdingTree.NEW_CreateHoeffdingTree(4, 4, 200, 0.0001, 0.05, this.combination_function, hoeffding_tree_id, 1);
                            // background_hoeffdingTree.print_m_features();
                            background_hoeffdingTreeValueState.update(background_hoeffdingTree);
                        }
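Note: this hunk only changes the background tree's constructor arguments; the warning/drift control flow around it is not shown. Under the usual background-tree scheme (assumed here, not confirmed by the diff), a warning starts a background learner and a confirmed drift promotes it, roughly:

    // Hypothetical sketch of the surrounding detector logic; `signal`, WARNING and
    // DRIFT are placeholders, not the project's API.
    if (signal == WARNING && empty_background_state.value()) {
        empty_background_state.update(false);
        HoeffdingTree background = new HoeffdingTree();
        background.NEW_CreateHoeffdingTree(4, 4, 200, 0.0001, 0.05, this.combination_function, hoeffding_tree_id, 1);
        background_hoeffdingTreeValueState.update(background);
    } else if (signal == DRIFT) {
        // promote the background tree to primary and free the background slot
        hoeffdingTreeValueState.update(background_hoeffdingTreeValueState.value());
        empty_background_state.update(true);
    }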
@@ -336,16 +344,13 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
                 * Otherwise, we would have had the same weight throughout the streaming passage.
                 * */
                hoeffdingTreeValueState.update(ht);
-                HoeffdingTree backup_ht = backup_hoeffdingTreeValueState.value();
-                prediction1 = backup_ht.TestHoeffdingTree(backup_ht.root, features, purpose_id).getPrediction();
-                backup_hoeffdingTreeValueState.update(backup_ht);
+
                /* In case of instances which are fed to the system for prediction, we do not know the true label.
                 * Therefore, our need for a homogeneous output from the state leaves us no choice but to assign
                 * an identifier in the true_label position (aka the 3rd Integer in the collector).
                 * */
                //System.out.println("Testing HT with id " + hoeffding_tree_id + " which has error-rate " + ht.getErrorRate() + " predicts " + prediction + " for the instance with id " + instance_id + " while the true label is " + true_label);
                collector.collect(new Tuple6<>(instance_id, prediction, true_label, purpose_id, ht.getErrorRate(), 1));
-                collector.collect(new Tuple6<>(instance_id, prediction, true_label, purpose_id, backup_ht.getErrorRate(), 2));
            } else if (instance_id == -1 && true_label == -1 && purpose_id == -1) {
                HoeffdingTree ht = hoeffdingTreeValueState.value();
                int size = ht.SizeHT(ht.root);
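Note: to make the comment about homogeneous output concrete, a prediction-only record still carries all six fields, with a sentinel standing in for the unknown label. The exact placeholder is set elsewhere in the class; -1 is assumed here for illustration only:

    // Hedged example of a prediction-only record: the third field cannot be a real
    // label, so a sentinel (assumed -1) keeps the output shape homogeneous.
    collector.collect(new Tuple6<>(instance_id, prediction, -1 /* unknown true label */, purpose_id, ht.getErrorRate(), 1));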
@@ -370,7 +375,6 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
                age_of_maturity.update(age_of_maturity.value() + 1);
                if (purpose_id == 5) {
                    HoeffdingTree ht = hoeffdingTreeValueState.value();
-                    HoeffdingTree backup_ht = backup_hoeffdingTreeValueState.value();
                    /* Online Bagging */
                    // if(true_label == 0) {
                    //     System.out.println("Training 0 " + features[0] + "," + features[1] + "," + true_label + "," + instance_weight + "," + instance_id);
@@ -379,10 +383,8 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
                    //     System.out.println(" Training 1 " + features[0] + "," + features[1] + "," + true_label + "," + instance_weight + "," + instance_id);
                    // }
                    ht.UpdateHoeffdingTree(ht.root, features, instance_weight);
-                    backup_ht.UpdateHoeffdingTree(backup_ht.root, features, instance_weight);

                    hoeffdingTreeValueState.update(ht);
-                    backup_hoeffdingTreeValueState.update(backup_ht);
                }
            }
        } else if (empty_state.value()) {
@@ -396,14 +398,11 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
            /* If the state is empty, we have to create a new HoeffdingTree.HoeffdingTree. */
            HoeffdingTree hoeffdingTree = new HoeffdingTree();
            // hoeffdingTree.CreateHoeffdingTree(2, 2, 200, 0.0001, 0.05, this.combination_function, hoeffding_tree_id, 0);
-            hoeffdingTree.NEW_CreateHoeffdingTree(2, 2, 200, 0.000001, 0.05, this.combination_function, hoeffding_tree_id, age_of_maturity_input);
+            hoeffdingTree.NEW_CreateHoeffdingTree(4, 4, 200, 0.000001, 0.05, this.combination_function, hoeffding_tree_id, age_of_maturity_input);

            hoeffdingTreeValueState.update(hoeffdingTree);
            hoeffdingTree.print_m_features();
-            HoeffdingTree back_up_hoeffdingTree = new HoeffdingTree();
-            back_up_hoeffdingTree.NEW_CreateHoeffdingTree(2, 2, 200, 0.000001, 0.05, this.combination_function, hoeffding_tree_id, age_of_maturity_input);

-            backup_hoeffdingTreeValueState.update(back_up_hoeffdingTree);
            /* We also create a new ConceptDriftDetector.ConceptDriftDetector. */
            if (drift_detection_method_id != 0) {
                // System.out.println("Intro to Concept Drift");
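Note: in VFDT-style Hoeffding trees the split decision compares the gain difference of the two best attributes against the Hoeffding bound; the 0.000001 above is presumably the split confidence delta and 0.05 the tie threshold, but the exact meaning of each NEW_CreateHoeffdingTree argument is defined in the project's HoeffdingTree class, not in this diff. For orientation only, the standard bound is:

    // Standard Hoeffding bound eps = sqrt(R^2 * ln(1/delta) / (2 * n)); shown for
    // reference, not a call into the project's HoeffdingTree API.
    static double hoeffdingBound(double range, double delta, long nSeen) {
        // range R of the split criterion (log2(numClasses) for information gain),
        // confidence delta, and nSeen examples observed at the leaf
        return Math.sqrt((range * range * Math.log(1.0 / delta)) / (2.0 * nSeen));
    }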
@@ -421,8 +420,7 @@ public void open(Configuration conf) {
            hoeffdingTreeValueState = getRuntimeContext().getState(descriptor_hoeffding_tree);


-            ValueStateDescriptor<HoeffdingTree> descriptor_backup_hoeffdingTreeValueState = new ValueStateDescriptor<HoeffdingTree>("backup_hoeffdingTreeValueState", HoeffdingTree.class);
-            backup_hoeffdingTreeValueState = getRuntimeContext().getState(descriptor_backup_hoeffdingTreeValueState);
+

            /* Background Hoeffding Tree */
            ValueStateDescriptor<HoeffdingTree> descriptor_background_hoeffding_tree = new ValueStateDescriptor<HoeffdingTree>("background_hoeffdingTreeValueState", HoeffdingTree.class);