Useful MySQL Routines
From CodeCodex
Following are some useful routines for interfacing to MySQL.
Contents
Python[edit]
The following routines work with connection objects created with the MySQLdb module.
Queries[edit]
First of all, a generator to allow you to iterate over the records returned from a query:
def SQLIter(Conn, Cmd, Values = None, MapFn = None) : """generator which executes Cmd with Values in a new cursor on Conn, yielding the rows one at a time, optionally mapped through function MapFn.""" if MapFn == None : MapFn = lambda x : x #end if Cursor = Conn.cursor() Cursor.execute(Cmd, Values) while True : NextRow = Cursor.fetchone() if NextRow == None : break #end if yield MapFn(NextRow) #end while Cursor.close() #end SQLIter
For example, to get a list of values of one particular field:
TheList = list \ ( SQLIter ( Conn = TheConn, Cmd = "select name from people where town = %s", Values = ["mytown"], MapFn = lambda x : x[0] ) )
Building on the above, the following routine allows you to pass a list of field names to return, and results in iterating over a sequence of dictionaries, each one mapping those field names to corresponding values for each matching record in turn:
def GetEachRecord(Conn, TableName, Fields, Condition = None, Values = None, Extra = None) : """generator which does an SQL query which can return 0 or more result rows, yielding each record in turn as a mapping from field name to field value. TableName can be a single table name, or a comma-separated list of names for a join. Extra allows specification of order/group clauses.""" FieldNames = [] Alias = re.compile(r"^.+\s+as\s+(\S+)$", re.IGNORECASE) for Field in Fields : Match = Alias.search(Field) if Match != None : FieldNames.append(Match.group(1)) else : FieldNames.append(Field) #end if #end for Cmd = \ ( "select " + ", ".join(Fields) + " from " + TableName ) if Condition != None : Cmd += " where " + Condition #end if if Extra != None : Cmd += " " + Extra #end if return SQLIter \ ( Conn = Conn, Cmd = Cmd, Values = Values, MapFn = lambda Row : dict(zip(FieldNames, Row)) ) #end GetEachRecord
Additional feature of the above routine: each field name can be of the form “field as alias”, and the alias will be used instead of field for the dictionary key. This is handy when doing a selection from multiple tables, or where field is an expression, to allow for shorter references.
Example:
for \ Item \ in \ GetEachRecord \ ( Conn = MyConn, TableName = "people", Condition = "town = %s", Values = ["mytown"], Fields = ["name", "address"] ) \ : print "name: %s, address: %s\n" % (Item["name"], Item["address"]) #end for
Quoting[edit]
Sometimes the auto-quoting provided by the execute method isn’t enough; for example, when dynamically constructing complicated queries. In such situations, the following routine will turn any Python value into a properly-quoted MySQL string:
def SQLString(Str) : """returns a MySQL string literal which evaluates to Str. Needed for those times when MySQLdb's automatic quoting isn't good enough.""" Result = [] for Ch in str(Str) : if Ch == "\0" : Ch = "\\0" elif Ch == "\010" : Ch = "\\b" elif Ch == "\011" : Ch = "\\t" elif Ch == "\012" : Ch = "\\n" elif Ch == "\015" : Ch = "\\r" elif Ch == "\032" : Ch = "\\z" elif Ch == "'" or Ch == "\"" or Ch == "\\" : Ch = "\\" + Ch #end if Result.append(Ch) #end for return "\"" + "".join(Result) + "\"" #end SQLString
The following uses the above to construct a parenthesized list of correctly-quoted values, suitable for use with the MySQL “in” operator:
def SQLStringList(TheList) : """returns a MySQL list containing the items of TheList, suitable for use in an "in" clause.""" return \ "(" + ", ".join(SQLString(Str) for Str in TheList) + ")" #end SQLStringList
For example:
Towns = ["mytown", "yourtown", "theirtown"] MyCursor.execute \ ( "select name from people where town in " + SQLStringList(Towns) )
Another useful routine escapes MySQL’s wildcard characters (“%” and “_”):
def EscapeSQLWild(Str) : """escapes MySQL pattern wildcards in Str.""" Result = [] for Ch in str(Str) : if Ch == "\\" or Ch == "%" or Ch == "_" : Result.append("\\") #end if Result.append(Ch) #end for return "".join(Result) #end EscapeSQLWild
This is useful for doing substring matches against user input, blocking special interpretation of wildcard characters entered by the user, for example:
MyCursor.execute \ ( "select name from people where name like " + SQLString("%" + EscapeSQLWild(WhatUserTyped) + "%") ) # find all matching names
Bulk Insertion[edit]
If you’re inserting a large number of records into a MySQL table, here are a couple of simple tips:
- Insert multiple records by specifying more than one set of values in a single insert statement:
insert into table (fieldnames) values (values1), (values2) ...
. You can usefully create 100 or more records with a single insert statement. - Use the “insert delayed” form to tell MySQL to further batch up the inserts (but note the limitations on table types).
The following BulkInserter class encapsulates these tips into a convenient form:
class BulkInserter : """bulk insertion of lots of records into an SQL table.""" def __init__(self, Conn, TableName, FieldNames, IgnoreDuplicates) : """Conn is the MySQL connection to use; TableName is the name of the table into which to insert records; FieldNames is the list of field names; and IgnoreDuplicates is True to ignore duplicate insertions, False to report an error.""" self.sql = Conn self.cursor = None self.TableName = TableName self.FieldNames = tuple(FieldNames) self.IgnoreDuplicates = IgnoreDuplicates self.InsertLimit = 500 # could even be larger self.FieldValues = [] #end __init__ def AddRecord(self, FieldValues) : """adds another record to the table. FieldValues is the list of field values, corresponding in order to the previously-specified FieldNames.""" if len(self.FieldValues) == self.InsertLimit : self.DoneInsert() #end if if self.cursor == None : self.cursor = self.sql.cursor() #end if ThisRecord = [] if type(FieldValues) == dict : for FieldName in self.FieldNames : FieldValue = FieldValues[FieldName] if FieldValue != None : ThisRecord.append(SQLString(FieldValue)) else : ThisRecord.append("null") #end if #end for else : for FieldValue in FieldValues : if FieldValue != None : ThisRecord.append(SQLString(FieldValue)) else : ThisRecord.append("null") #end if #end for #end if self.FieldValues.append(ThisRecord) #end AddRecord def DoneInsert(self) : """Call this after the last AddRecord call to make sure all insertions have been flushed to the table.""" if len(self.FieldValues) != 0 : Insert = "insert delayed" if self.IgnoreDuplicates : Insert += " ignore" #end if Insert += " into " + self.TableName + " (" + ", ".join(self.FieldNames) + ") values" FirstInsert = True for ThisRecord in self.FieldValues : if FirstInsert : FirstInsert = False else : Insert += "," #end if Insert += " (" + ", ".join(ThisRecord) + ")" #end for self.cursor.execute(Insert) self.cursor.close() self.cursor = None self.FieldValues = [] #end if #end DoneInsert #end BulkInserter
For example, this class could be used very simply as follows:
Insert = BulkInserter \ ( Conn = MyConn, TableName = "people", FieldNames = ["name", "address"], IgnoreDuplicates = False ) for Line in open("people.txt", "r") : # assume each line in file has name and address separated by tab Items = line.rstrip("\n").split("\t", 1) Insert.AddRecord(Items) #end for Insert.DoneInsert()