Useful MySQL Routines

From CodeCodex

Following are some useful routines for interfacing to MySQL.

Python[edit]

The following routines work with connection objects created with the MySQLdb module.

Queries[edit]

First of all, a generator to allow you to iterate over the records returned from a query:

def SQLIter(Conn, Cmd, Values = None, MapFn = None) :
    """generator which executes Cmd with Values in a new cursor on Conn,
    yielding the rows one at a time, optionally mapped through function MapFn."""
    if MapFn == None :
        MapFn = lambda x : x
    #end if
    Cursor = Conn.cursor()
    Cursor.execute(Cmd, Values)
    while True :
        NextRow = Cursor.fetchone()
        if NextRow == None :
            break
        #end if
        yield MapFn(NextRow)
    #end while
    Cursor.close()
#end SQLIter

For example, to get a list of values of one particular field:

TheList = list \
  (
    SQLIter
      (
        Conn = TheConn,
        Cmd = "select name from people where town = %s",
        Values = ["mytown"],
        MapFn = lambda x : x[0]
      )
  )

Building on the above, the following routine allows you to pass a list of field names to return, and results in iterating over a sequence of dictionaries, each one mapping those field names to corresponding values for each matching record in turn:

def GetEachRecord(Conn, TableName, Fields, Condition = None, Values = None, Extra = None) :
    """generator which does an SQL query which can return 0 or more
    result rows, yielding each record in turn as a mapping from
    field name to field value. TableName can be a single table name,
    or a comma-separated list of names for a join. Extra allows
    specification of order/group clauses."""
    FieldNames = []
    Alias = re.compile(r"^.+\s+as\s+(\S+)$", re.IGNORECASE)
    for Field in Fields :
        Match = Alias.search(Field)
        if Match != None :
            FieldNames.append(Match.group(1))
        else :
            FieldNames.append(Field)
        #end if
    #end for
    Cmd = \
      (
            "select "
        +
            ", ".join(Fields)
        +
            " from "
        +
            TableName
      )
    if Condition != None :
        Cmd += " where " + Condition
    #end if
    if Extra != None :
        Cmd += " " + Extra
    #end if
    return SQLIter \
      (
        Conn = Conn,
        Cmd = Cmd,
        Values = Values,
        MapFn = lambda Row : dict(zip(FieldNames, Row))
      )
#end GetEachRecord

Additional feature of the above routine: each field name can be of the form “field as alias”, and the alias will be used instead of field for the dictionary key. This is handy when doing a selection from multiple tables, or where field is an expression, to allow for shorter references.

Example:

for \
        Item \
    in \
        GetEachRecord \
          (
            Conn = MyConn,
            TableName = "people",
            Condition = "town = %s",
            Values = ["mytown"],
            Fields = ["name", "address"]
          ) \
:
    print "name: %s, address: %s\n" % (Item["name"], Item["address"])
#end for

Quoting[edit]

Sometimes the auto-quoting provided by the execute method isn’t enough; for example, when dynamically constructing complicated queries. In such situations, the following routine will turn any Python value into a properly-quoted MySQL string:

def SQLString(Str) :
    """returns a MySQL string literal which evaluates to Str. Needed
    for those times when MySQLdb's automatic quoting isn't good enough."""
    Result = []
    for Ch in str(Str) :
        if Ch == "\0" :
            Ch = "\\0"
        elif Ch == "\010" :
            Ch = "\\b"
        elif Ch == "\011" :
            Ch = "\\t"
        elif Ch == "\012" :
            Ch = "\\n"
        elif Ch == "\015" :
            Ch = "\\r"
        elif Ch == "\032" :
            Ch = "\\z"
        elif Ch == "'" or Ch == "\"" or Ch == "\\" :
            Ch = "\\" + Ch
        #end if
        Result.append(Ch)
    #end for
    return "\"" + "".join(Result) + "\""
#end SQLString

The following uses the above to construct a parenthesized list of correctly-quoted values, suitable for use with the MySQL “in” operator:

def SQLStringList(TheList) :
    """returns a MySQL list containing the items of TheList, suitable
    for use in an "in" clause."""
    return \
        "(" + ", ".join(SQLString(Str) for Str in TheList) + ")"
#end SQLStringList

For example:

Towns = ["mytown", "yourtown", "theirtown"]
MyCursor.execute \
  (
        "select name from people where town in "
    +
        SQLStringList(Towns)
  )

Another useful routine escapes MySQL’s wildcard characters (“%” and “_”):

def EscapeSQLWild(Str) :
    """escapes MySQL pattern wildcards in Str."""
    Result = []
    for Ch in str(Str) :
        if Ch == "\\" or Ch == "%" or Ch == "_" :
            Result.append("\\")
        #end if
        Result.append(Ch)
    #end for
    return "".join(Result)
#end EscapeSQLWild

This is useful for doing substring matches against user input, blocking special interpretation of wildcard characters entered by the user, for example:

MyCursor.execute \
  (
        "select name from people where name like "
    +
        SQLString("%" + EscapeSQLWild(WhatUserTyped) + "%")
  ) # find all matching names

Bulk Insertion[edit]

If you’re inserting a large number of records into a MySQL table, here are a couple of simple tips:

  • Insert multiple records by specifying more than one set of values in a single insert statement: insert into table (fieldnames) values (values1), (values2) .... You can usefully create 100 or more records with a single insert statement.
  • Use the “insert delayed” form to tell MySQL to further batch up the inserts (but note the limitations on table types).

The following BulkInserter class encapsulates these tips into a convenient form:

class BulkInserter :
    """bulk insertion of lots of records into an SQL table."""

    def __init__(self, Conn, TableName, FieldNames, IgnoreDuplicates) :
        """Conn is the MySQL connection to use; TableName is the name of the
        table into which to insert records; FieldNames is the list of field names;
        and IgnoreDuplicates is True to ignore duplicate insertions, False to
        report an error."""
        self.sql = Conn
        self.cursor = None
        self.TableName = TableName
        self.FieldNames = tuple(FieldNames)
        self.IgnoreDuplicates = IgnoreDuplicates
        self.InsertLimit = 500 # could even be larger
        self.FieldValues = []
    #end __init__

    def AddRecord(self, FieldValues) :
        """adds another record to the table. FieldValues is the list of field
        values, corresponding in order to the previously-specified FieldNames."""
        if len(self.FieldValues) == self.InsertLimit :
            self.DoneInsert()
        #end if
        if self.cursor == None :
            self.cursor = self.sql.cursor()
        #end if
        ThisRecord = []
        if type(FieldValues) == dict :
            for FieldName in self.FieldNames :
                FieldValue = FieldValues[FieldName]
                if FieldValue != None :
                    ThisRecord.append(SQLString(FieldValue))
                else :
                    ThisRecord.append("null")
                #end if
            #end for
        else :
            for FieldValue in FieldValues :
                if FieldValue != None :
                    ThisRecord.append(SQLString(FieldValue))
                else :
                    ThisRecord.append("null")
                #end if
            #end for
        #end if
        self.FieldValues.append(ThisRecord)
    #end AddRecord

    def DoneInsert(self) :
        """Call this after the last AddRecord call to make sure all insertions
        have been flushed to the table."""
        if len(self.FieldValues) != 0 :
            Insert = "insert delayed"
            if self.IgnoreDuplicates :
                Insert += " ignore"
            #end if
            Insert += " into " + self.TableName + " (" + ", ".join(self.FieldNames) + ") values"
            FirstInsert = True
            for ThisRecord in self.FieldValues :
                if FirstInsert :
                    FirstInsert = False
                else :
                    Insert += ","
                #end if
                Insert += " (" + ", ".join(ThisRecord) + ")"
            #end for
            self.cursor.execute(Insert)
            self.cursor.close()
            self.cursor = None
            self.FieldValues = []
        #end if
    #end DoneInsert

#end BulkInserter

For example, this class could be used very simply as follows:

Insert = BulkInserter \
  (
    Conn = MyConn,
    TableName = "people",
    FieldNames = ["name", "address"],
    IgnoreDuplicates = False
  )
for Line in open("people.txt", "r") :
    # assume each line in file has name and address separated by tab
    Items = line.rstrip("\n").split("\t", 1)
    Insert.AddRecord(Items)
#end for
Insert.DoneInsert()