Skip to content

Commit f630184

Browse files
vvittis@isc.tuc.grvvittis@isc.tuc.gr
authored and committed
Base Learner Proposition: No VDDM , Yes: SVFDT-II new new
1 parent f864148 commit f630184

File tree

3 files changed

+29
-31
lines changed

3 files changed

+29
-31
lines changed

src/main/java/DistributedLearning.java

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public static void main(String[] args) throws Exception {
9292

9393
//KAFKA INPUT SOURCE
9494
DataStream<String> stream = env
95-
.addSource(new FlinkKafkaConsumer<>("sine_dataset_3M_full_abrupt", new SimpleStringSchema(), properties).setStartFromEarliest())
95+
.addSource(new FlinkKafkaConsumer<>("sine_3M_2_drifts_500000_2000000", new SimpleStringSchema(), properties).setStartFromEarliest())
9696
.name("Kafka Input Source").setParallelism(parallelism).setMaxParallelism(parallelism);
9797

9898

@@ -151,7 +151,7 @@ public void flatMap(String input_stream, Collector<Tuple3<String, Integer, Integ
151151

152152

153153
// PERFORMANCE (ERROR-RATE) MONITORING SINK
154-
partial_result.addSink(new FlinkKafkaProducer<>("localhost:9092", "sine_dataset_3M_full_abrupt_without",
154+
partial_result.addSink(new FlinkKafkaProducer<>("localhost:9092", "sine_3M_2_drifts_500000_2000000_without_prop",
155155
(SerializationSchema<Tuple6<Integer, Integer, Integer, Integer, Double, Integer>>)
156156
element -> (element.getField(5).toString() + "," + element.getField(4).toString() + "," + element.getField(0).toString()).getBytes()))
157157
.name("Visualizing Performance Metrics").setParallelism(parallelism);
@@ -192,7 +192,6 @@ public boolean filter(Tuple6<Integer, Integer, Integer, Integer, Double, Integer
192192
static class StatefulMap extends RichFlatMapFunction<Tuple3<String, Integer, Integer>, Tuple6<Integer, Integer, Integer, Integer, Double, Integer>> {
193193

194194
private transient ValueState<HoeffdingTree> hoeffdingTreeValueState;
195-
private transient ValueState<HoeffdingTree> backup_hoeffdingTreeValueState;
196195
private transient ValueState<HoeffdingTree> background_hoeffdingTreeValueState;
197196
private transient ValueState<ConceptDriftDetector> ConceptDriftDetectorValueState;
198197
private transient ValueState<Boolean> empty_state;
@@ -247,10 +246,24 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
247246
* "As each training example is presented to our algorithm, for each base model,
248247
* choose the example K ~ Poisson(1) times and update the base model accordingly."
249248
*/
250-
if (instance_id > 1000000 && instance_id < 1000010) {
249+
250+
if (instance_id > 10000 && instance_id < 10010) {
251251
System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
252252
}
253-
if (instance_id > 3000000 && instance_id < 3000010) {
253+
if (instance_id > 500000 && instance_id < 500010) {
254+
System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
255+
}
256+
257+
if (instance_id > 600000 && instance_id < 600010) {
258+
System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
259+
}
260+
if (instance_id > 1900000 && instance_id < 1900010) {
261+
System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
262+
}
263+
if (instance_id > 2100000 && instance_id < 2100010) {
264+
System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
265+
}
266+
if (instance_id > 2990000 && instance_id < 2990010) {
254267
System.out.println("HT " + hoeffding_tree_id + " Instance " + instance_id + " size " + ht.SizeHT(ht.root) + " accuracy " + ht.getAccuracy());
255268
}
256269

@@ -262,13 +275,8 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
262275
//error_rate = ht.getErrorRate();
263276
hoeffdingTreeValueState.update(ht);
264277

265-
HoeffdingTree backup_ht = backup_hoeffdingTreeValueState.value();
266-
HoeffdingTree.Returninfo rn1 = backup_ht.TestHoeffdingTree(backup_ht.root, features, purpose_id);
267-
prediction1 = rn1.getPrediction();
268-
backup_ht.UpdateHoeffdingTree(rn1.getNode(), features, instance_weight);
269-
backup_hoeffdingTreeValueState.update(backup_ht);
270278

271-
collector.collect(new Tuple6<>(instance_id, prediction1, -1, purpose_id, backup_ht.getErrorRate(), 2));
279+
272280
// Concept Drift Handler
273281
if (drift_detection_method_id != 0) {
274282

@@ -296,7 +304,7 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
296304
empty_background_state.update(false);
297305
// Warning Signal. Create & Train the Background Tree
298306
HoeffdingTree background_hoeffdingTree = new HoeffdingTree();
299-
background_hoeffdingTree.NEW_CreateHoeffdingTree(7, 9, 200, 0.0001, 0.05, this.combination_function, hoeffding_tree_id, 1);
307+
background_hoeffdingTree.NEW_CreateHoeffdingTree(4, 4, 200, 0.0001, 0.05, this.combination_function, hoeffding_tree_id, 1);
300308
// background_hoeffdingTree.print_m_features();
301309
background_hoeffdingTreeValueState.update(background_hoeffdingTree);
302310
}
@@ -336,16 +344,13 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
336344
* Otherwise, we would have had the same weight throughout the streaming passage.
337345
* */
338346
hoeffdingTreeValueState.update(ht);
339-
HoeffdingTree backup_ht = backup_hoeffdingTreeValueState.value();
340-
prediction1 = backup_ht.TestHoeffdingTree(backup_ht.root, features, purpose_id).getPrediction();
341-
backup_hoeffdingTreeValueState.update(backup_ht);
347+
342348
/* In case of instances which are fed to the system for prediction, we do not know their true label.
343349
* Therefore, our need for a homogeneous output from the state leaves us no other choice but to assign
344350
* an identifier in the true_label position (aka 3rd Integer in the collector)
345351
* */
346352
//System.out.println("Testing HT with id " + hoeffding_tree_id + " which has error-rate " + ht.getErrorRate() + " predicts " + prediction + " for the instance with id " + instance_id + " while the true label is " + true_label);
347353
collector.collect(new Tuple6<>(instance_id, prediction, true_label, purpose_id, ht.getErrorRate(), 1));
348-
collector.collect(new Tuple6<>(instance_id, prediction, true_label, purpose_id, backup_ht.getErrorRate(), 2));
349354
} else if (instance_id == -1 && true_label == -1 && purpose_id == -1) {
350355
HoeffdingTree ht = hoeffdingTreeValueState.value();
351356
int size = ht.SizeHT(ht.root);
@@ -370,7 +375,6 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
370375
age_of_maturity.update(age_of_maturity.value() + 1);
371376
if (purpose_id == 5) {
372377
HoeffdingTree ht = hoeffdingTreeValueState.value();
373-
HoeffdingTree backup_ht = backup_hoeffdingTreeValueState.value();
374378
/* Online Bagging*/
375379
// if(true_label == 0) {
376380
// System.out.println("Training 0 " + features[0] + "," + features[1] + "," + true_label + "," + instance_weight + "," + instance_id);
@@ -379,10 +383,8 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
379383
// System.out.println(" Training 1 " + features[0] + "," + features[1] + "," + true_label + "," + instance_weight + "," + instance_id);
380384
// }
381385
ht.UpdateHoeffdingTree(ht.root, features, instance_weight);
382-
backup_ht.UpdateHoeffdingTree(backup_ht.root, features, instance_weight);
383386

384387
hoeffdingTreeValueState.update(ht);
385-
backup_hoeffdingTreeValueState.update(backup_ht);
386388
}
387389
}
388390
} else if (empty_state.value()) {
@@ -396,14 +398,11 @@ public void flatMap(Tuple3<String, Integer, Integer> input_stream, Collector<Tup
396398
/* If state is empty, we have to Create a new HoeffdingTree.HoeffdingTree. */
397399
HoeffdingTree hoeffdingTree = new HoeffdingTree();
398400
// hoeffdingTree.CreateHoeffdingTree(2, 2, 200, 0.0001, 0.05, this.combination_function, hoeffding_tree_id, 0);
399-
hoeffdingTree.NEW_CreateHoeffdingTree(2, 2, 200, 0.000001, 0.05, this.combination_function, hoeffding_tree_id, age_of_maturity_input);
401+
hoeffdingTree.NEW_CreateHoeffdingTree(4, 4, 200, 0.000001, 0.05, this.combination_function, hoeffding_tree_id, age_of_maturity_input);
400402

401403
hoeffdingTreeValueState.update(hoeffdingTree);
402404
hoeffdingTree.print_m_features();
403-
HoeffdingTree back_up_hoeffdingTree = new HoeffdingTree();
404-
back_up_hoeffdingTree.NEW_CreateHoeffdingTree(2, 2, 200, 0.000001, 0.05, this.combination_function, hoeffding_tree_id, age_of_maturity_input);
405405

406-
backup_hoeffdingTreeValueState.update(back_up_hoeffdingTree);
407406
/* Also, we create a new ConceptDriftDetector.ConceptDriftDetector */
408407
if (drift_detection_method_id != 0) {
409408
// System.out.println("Intro to Concept Drift");
@@ -421,8 +420,7 @@ public void open(Configuration conf) {
421420
hoeffdingTreeValueState = getRuntimeContext().getState(descriptor_hoeffding_tree);
422421

423422

424-
ValueStateDescriptor<HoeffdingTree> descriptor_backup_hoeffdingTreeValueState = new ValueStateDescriptor<HoeffdingTree>("backup_hoeffdingTreeValueState", HoeffdingTree.class);
425-
backup_hoeffdingTreeValueState = getRuntimeContext().getState(descriptor_backup_hoeffdingTreeValueState);
423+
426424

427425
/* Background Hoeffding Tree */
428426
ValueStateDescriptor<HoeffdingTree> descriptor_background_hoeffding_tree = new ValueStateDescriptor<HoeffdingTree>("background_hoeffdingTreeValueState", HoeffdingTree.class);

src/main/java/HoeffdingTree/HoeffdingTree.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,15 @@ public boolean NeedUpdate() {
7373
statistics_ht_history[3] = 0;
7474
statistics_ht_history[4] = 0;
7575
}
76-
if (this.getAccuracy() < statistics_ht_history[0] + 2*statistics_ht_history[3]) {
77-
if (this.getAccuracy() > statistics_ht_history[0] - 2*statistics_ht_history[3]) {
76+
if (this.getAccuracy() < statistics_ht_history[0] + statistics_ht_history[3]) {
77+
if (this.getAccuracy() > statistics_ht_history[0] - statistics_ht_history[3]) {
7878
this.counter1 = this.counter1 + 1;
7979
// System.out.println("Counter 1" + this.counter1 + " Counter 2" + this.counter);
8080
}
8181
}
82-
if ((this.counter1 / this.counter) > 0.8) {
83-
return false;
84-
}
82+
// if ((this.counter1 / this.counter) > 0.8) {
83+
// return false;
84+
// }
8585
}
8686
return true;
8787

src/main/java/Utilities/DefaultValues.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class DefaultValues {
2727
int combination_function = 3;
2828
double weighted_voting_parameter = 1;
2929
int age_of_maturity = 1000;
30-
int drift_detection_method_id = 0;
30+
int drift_detection_method_id = 1;
3131

3232

3333

0 commit comments

Comments
 (0)