Menu

[947b1a]: / MySQLdb / MySQLdb / converters.py  Maximize  Restore  History

Download this file

295 lines (249 with data), 8.1 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""
MySQLdb type conversion module
------------------------------
"""
from _mysql import NULL
from MySQLdb.constants import FIELD_TYPE, FLAG
from MySQLdb.times import (
    datetime_to_sql,
    timedelta_to_sql,
    timedelta_or_None,
    datetime_or_None,
    date_or_None,
    mysql_timestamp_converter,
)
from types import InstanceType
import array
import datetime
from decimal import Decimal
from itertools import izip
__revision__ = "$Revision$"[11:-2]
__author__ = "$Author$"[9:-2]


def bool_to_sql(connection, boolean):
    """Convert a Python bool to an SQL literal.

    The value is rendered as the decimal text of ``int(boolean)``, so
    ``True`` becomes ``"1"`` and ``False`` becomes ``"0"``.  The
    ``connection`` argument is not used by this encoder.
    """
    return str(int(boolean))


def SET_to_Set(value):
    """Convert the text of a MySQL SET column to a Python set.

    ``value`` is split on commas; empty members (as produced by an empty
    string or by doubled/trailing commas) are discarded.
    """
    return set(member for member in value.split(',') if member)
def Set_to_sql(connection, value):
    """Convert a Python set to an SQL literal.

    The members of ``value`` are joined with commas and the joined text is
    quoted by ``connection.string_literal``.  Members must therefore
    already be strings.
    """
    return connection.string_literal(','.join(value))
def object_to_sql(connection, obj):
    """Convert something into a string via str().

    The text is passed through ``connection.escape_string``; the result
    will not be quoted.
    """
    return connection.escape_string(str(obj))
def unicode_to_sql(connection, value):
    """Convert a unicode object to a string using the connection encoding.

    ``value`` is encoded with the name returned by
    ``connection.character_set_name()`` and the encoded text is quoted by
    ``connection.string_literal``.
    """
    encoded = value.encode(connection.character_set_name())
    return connection.string_literal(encoded)
def float_to_sql(connection, value):
    """Convert a float to an SQL literal.

    The number is formatted with ``%.15g``, i.e. at most 15 significant
    digits.  The ``connection`` argument is not used by this encoder.
    """
    return '%.15g' % value
def None_to_sql(connection, value):
    """Convert None to NULL.

    Both arguments are ignored; the module-level ``NULL`` object imported
    from ``_mysql`` is always returned.
    """
    return NULL
def object_to_quoted_sql(connection, obj):
    """Convert something into a SQL string literal.

    Objects that define ``__unicode__`` are first converted with
    ``unicode()`` and then encoded and quoted by ``unicode_to_sql``.
    Everything else is converted with ``str()`` and quoted by
    ``connection.string_literal``.
    """
    if hasattr(obj, "__unicode__"):
        # unicode_to_sql calls .encode() on its argument, so hand it the
        # unicode text rather than the object that merely knows how to
        # produce it.
        return unicode_to_sql(connection, unicode(obj))
    return connection.string_literal(str(obj))
def instance_to_sql(connection, obj):
    """Convert an instance to an SQL representation.

    If the __str__() method produces acceptable output, then you don't
    need to add the class to simple_type_encoders; it will be handled by
    the default converter.

    Lookup order:

    1. the encoder registered for the exact class of ``obj``;
    2. the first registered class of which ``obj`` is an instance; that
       encoder is then remembered under the exact class;
    3. the encoder registered for ``str`` (falling back to
       ``object_to_quoted_sql`` if that entry is missing).

    Entries that map back to this function are skipped in steps 1 and 2:
    every object is an instance of ``object``, so following such an entry
    would recurse without end.
    """
    encoders = simple_type_encoders
    cls = obj.__class__

    exact = encoders.get(cls)
    if exact is not None and exact is not instance_to_sql:
        return exact(connection, obj)

    for registered, encoder in encoders.items():
        if encoder is instance_to_sql or not isinstance(obj, registered):
            continue
        if cls not in encoders:
            # Remember the match so the scan is not repeated for this class.
            encoders[cls] = encoder
        return encoder(connection, obj)

    fallback = encoders.get(str, object_to_quoted_sql)
    return fallback(connection, obj)
def array_to_sql(connection, obj):
    """Convert an array to an SQL string literal.

    The raw contents returned by ``obj.tostring()`` are quoted by
    ``connection.string_literal``.
    """
    return connection.string_literal(obj.tostring())
# Python type -> encoder producing the SQL text for a value of that type.
# Each encoder takes (connection, value).
simple_type_encoders = {
    # None and numbers
    type(None): None_to_sql,
    bool: bool_to_sql,
    int: object_to_sql,
    long: object_to_sql,
    float: float_to_sql,
    # text
    str: object_to_quoted_sql,  # default
    unicode: unicode_to_sql,
    # dates and times
    datetime.datetime: datetime_to_sql,
    datetime.timedelta: timedelta_to_sql,
    # sets and arbitrary instances
    set: Set_to_sql,
    object: instance_to_sql,
}
# This is for MySQL column types that can be converted directly
# into Python types without having to look at metadata (flags,
# character sets, etc.). This should always be used as the last
# resort.
simple_field_decoders = {
    FIELD_TYPE.TINY: int,
    FIELD_TYPE.SHORT: int,
    FIELD_TYPE.LONG: int,
    FIELD_TYPE.FLOAT: float,
    FIELD_TYPE.DOUBLE: float,
    FIELD_TYPE.DECIMAL: Decimal,
    FIELD_TYPE.NEWDECIMAL: Decimal,
    FIELD_TYPE.LONGLONG: int,
    FIELD_TYPE.INT24: int,
    FIELD_TYPE.YEAR: int,
    FIELD_TYPE.SET: SET_to_Set,
    FIELD_TYPE.TIMESTAMP: mysql_timestamp_converter,
    FIELD_TYPE.DATETIME: datetime_or_None,
    FIELD_TYPE.TIME: timedelta_or_None,
    FIELD_TYPE.DATE: date_or_None,
}


# Decoder protocol
#
# Each decoder is passed a field object.
# The decoder returns a single value:
# * A callable that given an SQL value, returns a Python object.
# This can be as simple as int or str, etc. If the decoder
# returns None, this decoder will be ignored and the next decoder
# on the stack will be checked.

def default_decoder(field):
    """Return ``str`` as the decoder, whatever the field is."""
    return str


def default_encoder(value):
    """Return ``object_to_quoted_sql`` as the encoder, whatever the value is."""
    return object_to_quoted_sql


def simple_decoder(field):
    """Return the decoder registered in ``simple_field_decoders`` for
    ``field.type``, or None when the type is not registered."""
    return simple_field_decoders.get(field.type, None)


def simple_encoder(value):
    """Return the encoder registered in ``simple_type_encoders`` for the
    exact type of ``value``, or None when the type is not registered."""
    return simple_type_encoders.get(type(value), None)


# Field types handled by character_decoder.
character_types = [
    FIELD_TYPE.BLOB,
    FIELD_TYPE.STRING,
    FIELD_TYPE.VAR_STRING,
    FIELD_TYPE.VARCHAR,
]


def character_decoder(field):
    """Return a decoder for character-typed fields.

    Returns None when ``field.type`` is not listed in ``character_types``,
    so that the next decoder on the stack is consulted.  Fields whose
    ``charsetnr`` is 63 (binary) are decoded with ``str``.  For any other
    field the returned callable passes None through unchanged and
    otherwise calls ``.decode()`` with the connection's character set
    name, which is read once when the decoder is built.
    """
    if field.type not in character_types:
        return None
    if field.charsetnr == 63:  # BINARY
        return str

    charset = field.result.connection.character_set_name()

    def char_to_unicode(s):
        if s is None:
            return s
        return s.decode(charset)

    return char_to_unicode


# Decoder factories, tried in order by get_codec.
default_decoders = [
    character_decoder,
    simple_decoder,
    default_decoder,
]

# Encoder factories, tried in order by get_codec.
default_encoders = [
    simple_encoder,
    default_encoder,
]


def get_codec(field, codecs):
    """Return the first truthy result of calling each codec with ``field``.

    ``codecs`` is iterated in order.  If every codec declines, the
    function falls through and returns None.
    """
    for c in codecs:
        func = c(field)
        if func:
            return func
    # the default codec is guaranteed to work


def iter_row_decoder(decoders, row):
    """Return a generator applying each decoder to its column of ``row``.

    Decoders and columns are paired positionally.  A ``row`` of None is
    returned as None.
    """
    if row is None:
        return None
    return (d(col) for d, col in izip(decoders, row))


def tuple_row_decoder(decoders, row):
    """Return ``row`` decoded into a tuple, or None when ``row`` is None."""
    if row is None:
        return None
    return tuple(iter_row_decoder(decoders, row))


default_row_formatter = tuple_row_decoder