32
32
import re
33
33
import time
34
34
import weakref
35
+ TLS_V1_3_SUPPORTED = False
36
+ try :
37
+ import ssl
38
+ if hasattr (ssl , "HAS_TLSv1_3" ) and ssl .HAS_TLSv1_3 :
39
+ TLS_V1_3_SUPPORTED = True
40
+ except :
41
+ # If import fails, we don't have SSL support.
42
+ pass
35
43
36
44
from .catch23 import make_abc , BYTE_TYPES , STRING_TYPES
37
45
from .conversion import MySQLConverterBase
38
46
from .constants import (ClientFlag , CharacterSet , CONN_ATTRS_DN ,
39
- DEFAULT_CONFIGURATION )
47
+ DEFAULT_CONFIGURATION , OPENSSL_CS_NAMES ,
48
+ TLS_CIPHER_SUITES , TLS_VERSIONS )
40
49
from .optionfiles import MySQLOptionsParser
41
50
from . import errors
42
51
43
52
NAMED_TUPLE_CACHE = weakref .WeakValueDictionary ()
44
53
54
+ DUPLICATED_IN_LIST_ERROR = (
55
+ "The '{list}' list must not contain repeated values, the value "
56
+ "'{value}' is duplicated." )
57
+
58
+ TLS_VERSION_ERROR = ("The given tls_version: '{}' is not recognized as a valid "
59
+ "TLS protocol version (should be one of {})." )
60
+
61
+ TLS_VER_NO_SUPPORTED = ("No supported TLS protocol version found in the "
62
+ "'tls-versions' list '{}'. " )
63
+
64
+
45
65
@make_abc (ABCMeta )
46
66
class MySQLConnectionAbstract (object ):
47
67
@@ -152,6 +172,151 @@ def _read_option_files(self, config):
152
172
config [option ] = value [0 ]
153
173
return config
154
174
175
+ def _validate_tls_ciphersuites (self ):
176
+ """Validates the tls_ciphersuites option.
177
+ """
178
+ tls_ciphersuites = []
179
+ tls_cs = self ._ssl ["tls_ciphersuites" ]
180
+
181
+ if isinstance (tls_cs , STRING_TYPES ):
182
+ if not (tls_cs .startswith ("[" ) and
183
+ tls_cs .endswith ("]" )):
184
+ raise AttributeError ("tls_ciphersuites must be a list, "
185
+ "found: '{}'" .format (tls_cs ))
186
+ else :
187
+ tls_css = tls_cs [1 :- 1 ].split ("," )
188
+ if not tls_css :
189
+ raise AttributeError ("No valid cipher suite found "
190
+ "in 'tls_ciphersuites' list." )
191
+ for _tls_cs in tls_css :
192
+ _tls_cs = tls_cs .strip ().upper ()
193
+ if _tls_cs :
194
+ tls_ciphersuites .append (_tls_cs )
195
+
196
+ elif isinstance (tls_cs , list ):
197
+ tls_ciphersuites = [tls_cs for tls_cs in tls_cs if tls_cs ]
198
+
199
+ elif isinstance (tls_cs , set ):
200
+ for tls_cs in tls_ciphersuites :
201
+ if tls_cs :
202
+ tls_ciphersuites .append (tls_cs )
203
+ else :
204
+ raise AttributeError (
205
+ "tls_ciphersuites should be a list with one or more "
206
+ "ciphersuites. Found: '{}'" .format (tls_cs ))
207
+
208
+ tls_versions = TLS_VERSIONS [:] if self ._ssl .get ("tls_versions" , None ) \
209
+ is None else self ._ssl ["tls_versions" ][:]
210
+
211
+ # A newer TLS version can use a cipher introduced on
212
+ # an older version.
213
+ tls_versions .sort (reverse = True )
214
+ newer_tls_ver = tls_versions [0 ]
215
+ # translated_names[0] belongs to TLSv1, TLSv1.1 and TLSv1.2
216
+ # translated_names[1] are TLSv1.3 only
217
+ translated_names = [[],[]]
218
+ iani_cipher_suites_names = {}
219
+ ossl_cipher_suites_names = []
220
+
221
+ # Old ciphers can work with new TLS versions.
222
+ # Find all the ciphers introduced on previous TLS versions.
223
+ for tls_ver in TLS_VERSIONS [:TLS_VERSIONS .index (newer_tls_ver ) + 1 ]:
224
+ iani_cipher_suites_names .update (TLS_CIPHER_SUITES [tls_ver ])
225
+ ossl_cipher_suites_names .extend (OPENSSL_CS_NAMES [tls_ver ])
226
+
227
+ for name in tls_ciphersuites :
228
+ if "-" in name and name in ossl_cipher_suites_names :
229
+ if name in OPENSSL_CS_NAMES ["TLSv1.3" ]:
230
+ translated_names [1 ].append (name )
231
+ else :
232
+ translated_names [0 ].append (name )
233
+ elif name in iani_cipher_suites_names :
234
+ translated_name = iani_cipher_suites_names [name ]
235
+ if translated_name in translated_names :
236
+ raise AttributeError (
237
+ DUPLICATED_IN_LIST_ERROR .format (
238
+ list = "tls_ciphersuites" , value = translated_name ))
239
+ else :
240
+ if name in TLS_CIPHER_SUITES ["TLSv1.3" ]:
241
+ translated_names [1 ].append (
242
+ iani_cipher_suites_names [name ])
243
+ else :
244
+ translated_names [0 ].append (
245
+ iani_cipher_suites_names [name ])
246
+ else :
247
+ raise AttributeError (
248
+ "The value '{}' in tls_ciphersuites is not a valid "
249
+ "cipher suite" .format (name ))
250
+ if not translated_names [0 ] and not translated_names [1 ]:
251
+ raise AttributeError ("No valid cipher suite found in the "
252
+ "'tls_ciphersuites' list." )
253
+ translated_names = [":" .join (translated_names [0 ]),
254
+ ":" .join (translated_names [1 ])]
255
+ self ._ssl ["tls_ciphersuites" ] = translated_names
256
+
257
+ def _validate_tls_versions (self ):
258
+ """Validates the tls_versions option.
259
+ """
260
+ tls_versions = []
261
+ tls_version = self ._ssl ["tls_versions" ]
262
+
263
+ if isinstance (tls_version , STRING_TYPES ):
264
+ if not (tls_version .startswith ("[" ) and tls_version .endswith ("]" )):
265
+ raise AttributeError ("tls_versions must be a list, found: '{}'"
266
+ "" .format (tls_version ))
267
+ else :
268
+ tls_vers = tls_version [1 :- 1 ].split ("," )
269
+ for tls_ver in tls_vers :
270
+ tls_version = tls_ver .strip ()
271
+ if tls_version == "" :
272
+ continue
273
+ elif tls_version not in TLS_VERSIONS :
274
+ raise AttributeError (
275
+ TLS_VERSION_ERROR .format (tls_version , TLS_VERSIONS ))
276
+ elif tls_version in tls_versions :
277
+ raise AttributeError (
278
+ DUPLICATED_IN_LIST_ERROR .format (
279
+ list = "tls_versions" , value = tls_version ))
280
+ tls_versions .append (tls_version )
281
+ if tls_vers == ["TLSv1.3" ] and not TLS_V1_3_SUPPORTED :
282
+ raise AttributeError (
283
+ TLS_VER_NO_SUPPORTED .format (tls_version , TLS_VERSIONS ))
284
+ elif isinstance (tls_version , list ):
285
+ if not tls_version :
286
+ raise AttributeError (
287
+ "At least one TLS protocol version must be specified in "
288
+ "'tls_versions' list." )
289
+ for tls_ver in tls_version :
290
+ if tls_ver not in TLS_VERSIONS :
291
+ raise AttributeError (
292
+ TLS_VERSION_ERROR .format (tls_ver , TLS_VERSIONS ))
293
+ elif tls_ver in tls_versions :
294
+ raise AttributeError (
295
+ DUPLICATED_IN_LIST_ERROR .format (
296
+ list = "tls_versions" , value = tls_ver ))
297
+ else :
298
+ tls_versions .append (tls_ver )
299
+ elif isinstance (tls_version , set ):
300
+ for tls_ver in tls_version :
301
+ if tls_ver not in TLS_VERSIONS :
302
+ raise AttributeError (
303
+ TLS_VERSION_ERROR .format (tls_ver , TLS_VERSIONS ))
304
+ tls_versions .append (tls_ver )
305
+ else :
306
+ raise AttributeError (
307
+ "tls_versions should be a list with one or more of versions in "
308
+ "{}. found: '{}'" .format (", " .join (TLS_VERSIONS ), tls_versions ))
309
+
310
+ if not tls_versions :
311
+ raise AttributeError (
312
+ "At least one TLS protocol version must be specified "
313
+ "in 'tls_versions' list when this option is given." )
314
+ if tls_versions == ["TLSv1.3" ] and not TLS_V1_3_SUPPORTED :
315
+ raise AttributeError (
316
+ TLS_VER_NO_SUPPORTED .format (tls_version , TLS_VERSIONS ))
317
+ tls_versions .sort ()
318
+ self ._ssl ["tls_versions" ] = tls_versions
319
+
155
320
@property
156
321
def user (self ):
157
322
"""User used while connecting to MySQL"""
@@ -333,6 +498,9 @@ def config(self, **kwargs):
333
498
if key .startswith ('ssl_' ):
334
499
set_ssl_flag = True
335
500
self ._ssl .update ({key .replace ('ssl_' , '' ): value })
501
+ elif key .startswith ('tls_' ):
502
+ set_ssl_flag = True
503
+ self ._ssl .update ({key : value })
336
504
else :
337
505
attribute = '_' + key
338
506
try :
@@ -349,8 +517,7 @@ def config(self, **kwargs):
349
517
DEFAULT_CONFIGURATION ['ssl_verify_identity' ]
350
518
# Make sure both ssl_key/ssl_cert are set, or neither (XOR)
351
519
if 'ca' not in self ._ssl or self ._ssl ['ca' ] is None :
352
- raise AttributeError (
353
- "Missing ssl_ca argument." )
520
+ self ._ssl ['ca' ] = ""
354
521
if bool ('key' in self ._ssl ) != bool ('cert' in self ._ssl ):
355
522
raise AttributeError (
356
523
"ssl_key and ssl_cert need to be both "
@@ -365,6 +532,18 @@ def config(self, **kwargs):
365
532
"ssl_key and ssl_cert need to be both "
366
533
"set, or neither."
367
534
)
535
+ if "tls_versions" in self ._ssl and \
536
+ self ._ssl ["tls_versions" ] is not None :
537
+ if self ._ssl_disabled :
538
+ raise AttributeError ("The tls_versions option can not be "
539
+ "used along with ssl_disabled." )
540
+ self ._validate_tls_versions ()
541
+
542
+ if "tls_ciphersuites" in self ._ssl and self ._ssl ["tls_ciphersuites" ] is not None :
543
+ if self ._ssl_disabled :
544
+ raise AttributeError ("The tls_ciphersuites option can not "
545
+ "be used along with ssl_disabled." )
546
+ self ._validate_tls_ciphersuites ()
368
547
369
548
if self ._conn_attrs is None :
370
549
self ._conn_attrs = {}
@@ -779,7 +958,7 @@ def connect(self, **kwargs):
779
958
780
959
self .disconnect ()
781
960
self ._open_connection ()
782
- # Server does not allow to run any other statement different from ALTER
961
+ # Server does not allow to run any other statement different from ALTER
783
962
# when user's password has been expired.
784
963
if not self ._client_flags & ClientFlag .CAN_HANDLE_EXPIRED_PASSWORDS :
785
964
self ._post_connection ()
0 commit comments