18
18
import os
19
19
from typing import Dict , Any , List
20
20
21
- import pandas as pd
22
- from pandas ._testing import assert_frame_equal
23
21
from pyflink .table import Table , StreamTableEnvironment
24
22
25
23
from pyflink .ml .core .api import Model
@@ -39,17 +37,19 @@ def test_pipeline_model(self):
39
37
model = PipelineModel ([model_a , model_b , model_c ])
40
38
output_table = model .transform (input_table )[0 ]
41
39
42
- assert_frame_equal (output_table .to_pandas (),
43
- pd .DataFrame ([[31 ], [32 ], [33 ]], columns = ['a' ]))
40
+ predicted_results = [result [0 ] for result in
41
+ self .t_env .to_data_stream (output_table ).execute_and_collect ()]
42
+ self .assertEqual (predicted_results , [31 , 32 , 33 ])
44
43
45
44
# Saves and loads the PipelineModel.
46
45
path = os .path .join (self .temp_dir , "test_pipeline_model" )
47
46
model .save (path )
48
47
loaded_model = PipelineModel .load (self .t_env , path )
49
48
50
49
output_table2 = loaded_model .transform (input_table )[0 ]
51
- assert_frame_equal (output_table2 .to_pandas (),
52
- pd .DataFrame ([[31 ], [32 ], [33 ]], columns = ['a' ]))
50
+ predicted_results = [result [0 ] for result in
51
+ self .t_env .to_data_stream (output_table2 ).execute_and_collect ()]
52
+ self .assertEqual (predicted_results , [31 , 32 , 33 ])
53
53
54
54
def test_pipeline (self ):
55
55
input_table = self .t_env .from_elements ([(1 ,), (2 ,), (3 ,)], ['a' ])
@@ -60,8 +60,9 @@ def test_pipeline(self):
60
60
model = estimator .fit (input_table )
61
61
output_table = model .transform (input_table )[0 ]
62
62
63
- assert_frame_equal (output_table .to_pandas (),
64
- pd .DataFrame ([[21 ], [22 ], [23 ]], columns = ['a' ]))
63
+ predicted_results = [result [0 ] for result in
64
+ self .t_env .to_data_stream (output_table ).execute_and_collect ()]
65
+ self .assertEqual (predicted_results , [21 , 22 , 23 ])
65
66
66
67
# Saves and loads the PipelineModel.
67
68
path = os .path .join (self .temp_dir , "test_pipeline" )
@@ -70,8 +71,10 @@ def test_pipeline(self):
70
71
71
72
model = loaded_estimator .fit (input_table )
72
73
output_table = model .transform (input_table )[0 ]
73
- assert_frame_equal (output_table .to_pandas (),
74
- pd .DataFrame ([[21 ], [22 ], [23 ]], columns = ['a' ]))
74
+
75
+ predicted_results = [result [0 ] for result in
76
+ self .t_env .to_data_stream (output_table ).execute_and_collect ()]
77
+ self .assertEqual (predicted_results , [21 , 22 , 23 ])
75
78
76
79
77
80
class Add10Model (Model ):
0 commit comments