@@ -882,6 +882,108 @@ def test_caching_sha2_password(self):
882
882
# Check the first auth packet
883
883
self .assertEqual (packets [1 ][4 :], auth_pkt )
884
884
885
+ @unittest .skipIf (not tests .SSL_AVAILABLE , "Python has no SSL support" )
886
+ def test_auth_plugin_is_not_supported_but_required (self ):
887
+ """Test behavior when server's default auth_plugin is required"""
888
+ self .cnx ._socket .sock = tests .DummySocket ()
889
+ flags = constants .ClientFlag .get_default ()
890
+ flags |= constants .ClientFlag .SSL
891
+ kwargs = {
892
+ 'username' : 'ham' ,
893
+ 'password' : 'spam' ,
894
+ 'database' : 'test' ,
895
+ 'charset' : 33 ,
896
+ 'client_flags' : flags ,
897
+ 'ssl_options' : {
898
+ 'ca' : os .path .join (tests .SSL_DIR , 'tests_CA_cert.pem' ),
899
+ 'cert' : os .path .join (tests .SSL_DIR , 'tests_client_cert.pem' ),
900
+ 'key' : os .path .join (tests .SSL_DIR , 'tests_client_key.pem' ),
901
+ },
902
+ }
903
+
904
+ self .cnx ._handshake ['auth_plugin' ] = 'unsupported_auth_plugin'
905
+ self .cnx ._handshake ['auth_data' ] = b'abcdef!012345'
906
+ self .cnx ._socket .switch_to_ssl = \
907
+ lambda ca , cert , key , verify_cert , cipher : None
908
+
909
+ # Test perform_full_authentication
910
+ # Exchange:
911
+ # Client Server
912
+ # ------ ------
913
+ # make_ssl_auth
914
+ # first_auth
915
+ # Sorry user requires the unsupported_auth_plugin
916
+ # second_auth
917
+ # OK
918
+ self .cnx ._socket .sock .reset ()
919
+ self .cnx ._socket .sock .add_packets ([
920
+ bytearray (b'\x2c \x00 \x00 \x03 \xfe \x75 \x6e \x73 \x75 \x70 \x70 \x6f \x72 '
921
+ b'\x74 \x65 \x64 \x5f \x61 \x75 \x74 \x68 \x5f \x70 \x6c \x75 \x67 '
922
+ b'\x69 \x6e \x00 \x60 \x1e \x10 \x78 \x01 \x3c \x1e \x33 \x38 \x6f '
923
+ b'\x08 \x5f \x0d \x7a \x6f \x01 \x7b \x7a \x4a \x0d \x00 ' )
924
+ ])
925
+ # Since the fictional authoritation plugin 'unsupported_auth_plugin' is
926
+ # not supported a NotSupportedError is raised
927
+ self .assertRaises (errors .NotSupportedError , self .cnx ._do_auth , ** kwargs )
928
+
929
+ @unittest .skipIf (not tests .SSL_AVAILABLE , "Python has no SSL support" )
930
+ @unittest .skipIf (tests .MYSQL_VERSION < (8 , 0 , 3 ),
931
+ "caching_sha2_password plugin not supported by server." )
932
+ def test_auth_plugin_fall_back_if_not_supported (self ):
933
+ """Test Fall back to dflt auth_plugin if server's plugin is unknown"""
934
+ self .cnx ._socket .sock = tests .DummySocket ()
935
+ flags = constants .ClientFlag .get_default ()
936
+ flags |= constants .ClientFlag .SSL
937
+ kwargs = {
938
+ 'username' : 'ham' ,
939
+ 'password' : 'spam' ,
940
+ 'database' : 'test' ,
941
+ 'charset' : 33 ,
942
+ 'client_flags' : flags ,
943
+ 'ssl_options' : {
944
+ 'ca' : os .path .join (tests .SSL_DIR , 'tests_CA_cert.pem' ),
945
+ 'cert' : os .path .join (tests .SSL_DIR , 'tests_client_cert.pem' ),
946
+ 'key' : os .path .join (tests .SSL_DIR , 'tests_client_key.pem' ),
947
+ },
948
+ }
949
+
950
+ self .cnx ._handshake ['auth_plugin' ] = 'unsupported_auth_plugin'
951
+ self .cnx ._handshake ['auth_data' ] = b'abcdef!012345'
952
+ self .cnx ._socket .switch_to_ssl = \
953
+ lambda ca , cert , key , verify_cert , cipher : None
954
+
955
+ # Test perform_full_authentication
956
+ # Exchange:
957
+ # Client Server
958
+ # ------ ------
959
+ # make_ssl_auth
960
+ # first_auth
961
+ # auth_switch to mysql_native_password
962
+ # second_auth
963
+ # OK
964
+ self .cnx ._socket .sock .reset ()
965
+ self .cnx ._socket .sock .add_packets ([
966
+ bytearray (b'\x8d \xaa \x0b \x00 \x00 \x00 ' ), # full_auth request
967
+ bytearray (b'\x00 @!\x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 '
968
+ b'\x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 ' ),
969
+ bytearray (b'\x07 \x00 \x00 \x05 \x00 \x00 \x00 \x02 \x00 \x00 \x00 ' ) # OK
970
+ ])
971
+
972
+ # No exception should be raided
973
+ self .cnx ._do_auth (** kwargs )
974
+
975
+ packets = self .cnx ._socket .sock ._client_sends
976
+
977
+ self .cnx ._handshake ['auth_plugin' ] = 'mysql_native_password'
978
+ auth_pkt = self .cnx ._protocol .make_auth (
979
+ self .cnx ._handshake , kwargs ['username' ],
980
+ kwargs ['password' ], kwargs ['database' ],
981
+ charset = kwargs ['charset' ],
982
+ client_flags = kwargs ['client_flags' ],
983
+ ssl_enabled = True )
984
+ # Check the first_auth packet
985
+ self .assertEqual (packets [1 ][4 :], auth_pkt )
986
+
885
987
@unittest .skipIf (not tests .SSL_AVAILABLE , "Python has no SSL support" )
886
988
def test__do_auth_ssl (self ):
887
989
"""Authenticate with the MySQL server using SSL"""
0 commit comments