Monthly Archives: February 2018

Ensure list only consists of strings

A nice little to remember for future use is when a list must consist of only strings:

def arrayOfStrings(l):
    nl = [s if isinstance(s, str) else str(s) for s in l]
    return(nl)

It looks like it could be quite Pythonic and isn’t necessarily how I would have written it if left to my own devices; I might be braver and try that sort of structure in other situations.

Alas, I don’t have a reference for where I grabbed this from but it’s more than likely from StackOverflow.

Advertisements

Dynamic (in-memory) zip file creation

With a recent project I wanted to offer a download of a bunch of files as a ZIP download to make redistribution. I started by thinking that this would involve creating a zip file on disk and then serving it to the client.

It doesn’t.

This method creates an in-memory zip image of a temporary directory in which the required file are created. The directory is deleted before the zip file is returned to the client. Note the use of send_file to return the content and how it allows us to specify the name of the downloaded file.

If running the app under uwsgi there are some precautions to take:

  • import the send_file module from flask (the default wsgi version is broken)
  • Include ‘wsgi-disable-file-wrapper’ in the uwsgi settings
import os
import io
import tempfile
import zipfile
from peewee import *

# A simple function to write a file in a given directory
def makeFile(dir, filename, data):
    fname = str(dir) + '/' + str(filename)
    fh = open(fname, "w")
    fh.write(data)
    fh.close()
    return True
"""
Function to create a zip file containing the application configuration files
To create the zip file, we need to create a memory file (BytesIO))
Params:
 row: Database row object containing columns for filename and
 content to be written in the form
 { {"file1": "Content for file 1}, {"file2": "Different content for file"} }
Returns:
 data: BytesIO object containing the zipped files
"""
def mkZipFile(row):
    zipdir = tempfile.mkdtemp(prefix='/tmp/')
    oldpath = os.getcwd()
    os.chdir(zipdir)

    jdata = json.loads(row)
    for conf in jdata:
        for f in conf:
            makeFile(zipdir, f, conf[f])

    # Create the in-memory zip image
    data = io.BytesIO()
    with zipfile.ZipFile(data, mode='w') as z:
        for fname in os.listdir("."):
            z.write(fname)
            os.unlink(fname)
    data.seek(0)

    os.chdir(oldpath)
    os.rmdir(zipdir)
    return data

The route takes the form

"""
Route to create a zip file containing all the required data files
Params:
 id: the numeric id of the record in the database
Returns:
 ZIP: file in ZIP format containing the download files
"""
@app.route('/download/<int:id>.zip', methods=['GET'])
def showZip(id):
    rs = db_table.select(db_table.data_column).where(db_table.id == id).get()

    zfile = mkZipFile(rs)
    if not zfile:
        return Response(response="Invalid zip file", status="400")

    return send_file(
        zfile,
        mimetype='application/zip',
        as_attachment=True,
        attachment_filename='file_bundle.zip')

References

Form input validation

A technique for handling form input validation for running an arbitrary number of tests against the submitted data.

Probably doesn’t work with multiple select options

A database table to store all the possible input fields

CREATE TABLE form_items (
 id INT PRIMARY KEY NOT NULL,
 name CHAR(64) NOT NULL,
 type CHAR(6) NOT NULL DEFAULT 'string',
 updated_by CHAR(32),
 updated_at DATETIME
);

There might be additional columns to identify the page or section that an input value is found on.

Then we have a table to identify the validation functions available

CREATE TABLE column_validations (
 id INT PRIMARY KEY NOT NULL,
 name CHAR(32) UNIQUE NOT NULL,
 updated_by CHAR(32),
 updated_at DATETIME
);

There might be other columns to identify the severity and whether to continue or not. in the event of failure

Then we have a many-to-many map of validators to columns

CREATE TABLE column_validation_form_items(
 column_validation_id INT REFERENCES column_validations,
 form_item_id INT REFERENCES form_items
);

A PeeWee model might look like

class BaseModel(Model):
  with app.app_context():
    class Meta:
    database = SqliteDatabase(app.config['DATABASE'])

class form_items(BaseModel):
  id = IntegerField(primary_key=True)
  name = CharField(unique=False)
  type = CharField(default='string')
  updated_by = CharField()
  updated_at = DateTimeField(default=datetime.datetime.now())

class column_validations(BaseModel):
  id = IntegerField(primary_key=True)
  name = CharField(unique=True)
  updated_by = CharField()
  updated_at = DateTimeField(default=datetime.datetime.now())

class column_validation_config_columns(BaseModel):
  column_validation = ForeignKeyField(column_validations)
  form_item = ForeignKeyField(form_items)
  class Meta:
    primary_key = CompositeKey('column_validation', 'form_item')

Sample function to run through the form value validations:

# Take the name of the form field and the submitted value 
 def cleanInput(name, value):
  # Array to map the validator names from the database to the function
  # Not sure if it's possible in Python to do the equivalent of a callback
  # without this type of referencing

  validators = {
    "valid_ip_address": valid_ip_address,
    "valid_ip_range":valid_ip_range,
    "valid_url": valid_url,
    "true_or_false": true_or_false,
    "yes_or_no": yes_or_no,
    "is_integer": is_integer}

  rs = (column_validations.select(column_validations.name)
    .join(column_validation_form_items, JOIN.INNER)
    .join(form_items, JOIN.INNER)
    .where(form_items.name == name))
  # Provide a failsafe for when no validator is found
  # Try each of the validators when the input value in non-blank
  if len(value) > 0:
    status = True
    msg = ""
    for proc in rs:
      func = proc.name
      if func in validators:
        (status, msg) = validators[func](value)
        if not status:
          return (False, "Error: %s validation failure: %s" % (name, msg))
  else: # Empty value, just return True
    return(True, value)
  return (True, msg)

Some sample validation functions. Note that these return a tuple to indicate the status along with an error message. If the status is false, dsiplay the error message and redisplay the form; otherwise continue.

"""
Simple function to validate an IPv4 address
This needs to import socket but note how it avoids regex patterns
Params:
 addr: string containing IP address
Returns:
 (True|False, addr|errmsg)
"""
def valid_ip_address(addr):
  try:
    socket.inet_pton(socket.AF_INET, addr)
    return (True, addr)
  except socket.error:
    return (False, ("Invalid IP address, %s." % addr))

"""
Simple function to validate an IPv4 address range in CIDR format
Pattern match taken from http://blog.markhatton.co.uk/2011/03/15/regular-expressions-for-ip-addresses-cidr-ranges-and-hostnames/
Params:
 ip_range: string containing IP address range
Returns:
 (True|False, ip_range|errmsg)
"""
def valid_ip_range(ip_range):
  patt = "^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/([0-9]|[1-2][0-9]|3[0-2]))$"
  if re.search(patt, ip_range):
    return (True, ip_range)
  return(False, ("Invalid IPv4 range: %s" % ip_range))

"""
Function to validate a submitted website address
Params:
 addr: String containing a URL
Returns:
 (True|False, url|errmsg)
"""
def valid_url(url):
  if len(url) > 2083:
    return(False, ("Invalid url, %s: too long." % url))
  patt = "(http|https):\/\/(\w+:{0,1}\w*@)?(\S+)(:[0-9]+)?(\/|\/([\w#!:.?+=&%@!\-\/]))?"
  if re.search(patt, url, re.IGNORECASE):
    return (True, url)
  return (False, ("Invalid url, %s." % url))

def true_or_false(value):
  for patt in ('true', 'false', '1', '0'):
    if re.search(patt, value, re.IGNORECASE):
      return (True, value)
  return(False, ("%s must be either true or false, 0 or 1" % value))

def yes_or_no(value):
  for patt in ('yes', 'no', '1', '0'):
    if re.search(patt, value, re.IGNORECASE):
      return (True, value)
  return(False, ("%s must be either true or false, 0 or 1" % value))

"""
Simple function to return whether a value is an Integer
Params:
 value: String representation of an integer
Returns:
 (True|False, value|errmsg)
"""
def is_integer(value):
  if isinstance(int(value), int):
    return (True, value)
  return (False, ("%s is not an integer" % value))

References

HTTP-based DB backup

If working with  Python Flask application in a closed environment it can often be handy to be able to grab a copy of the database behind the application.

Here are a few simple functions that can be used to provide a URL which will yield a (very simple) SQL export of the database schema and table contents.

"""
Simple route that captures a request to backup the database. It will
create a file containing
* the SQL commands to delete existing tables
* SQL schema commands for the database
* SQL insert operations for all the records in all the tables
params:
 None
Returns:
 File: contains database schema and rows from all the tables
"""
@app.route('/myapp/db_backup', methods=['GET'])
def myAppDbBackup():
    db_bkp = ""
    for tbl in database.get_tables():
        db_bkp = db_bkp + backupDbTable(tbl)
        if db_bkp == "":
            return Response(status="409 Empty backup")
     h = Headers()
     h.add('Content-Type', 'text/plain')
     h.add('Content-Disposition', 'attachment', filename='my_lovely_db_backup.sql')
     return Response(db_bkp, mimetype='application/x-sql', headers=h)

"""
Table backup function
Params:
 table: The name of the table to db_backup
Returns:
 String: All the table records as insert statements
"""
def backupDbTable(table):
    bkpstr = ""
    values = ""
    inscol = ""
    for col in database.get_columns(table):
        inscol = inscol + (col.name if not inscol else (", %s" % col.name))
        insert = ("INSERT INTO %s (%s) VALUES" % (table, inscol))

    bkpstr = ("%s;\n" % getTableSchema(table))
    valstr = ""
    qry = ("SELECT %s FROM %s" % (inscol, table))
    rs = database.execute_sql(qry)
    for row in rs:
        valstr = "', '".join(flattenArray(row))
        values = values + ("('" + valstr + "')" if not values else ",\n('" + valstr + "')")
    bkpstr = bkpstr + insert + values + ";\n"
    return bkpstr

"""
Function to retrieve the statement to create a table in the database
Params:
 table: The name of the table being backed up
Returns:
 String: The CREATE TABLE statement for table
"""
def getTableSchema(table):
    schema_qry = ("SELECT sql FROM sqlite_master WHERE type='table' AND name = ?")
    rs = database.execute_sql(schema_qry, [table])
    for schema in rs:
        return schema
def flattenArray(ary):
    flat = []
    for el in ary:
    if isinstance(el, list):
        flat += flattenArray(el)
    else:
        flat.append(el)
    return(flat)

It is always worth remembering that this is an intrinsically insecure operation but a small measure of protection could come from putting the app behind an Apache httpd or nginx reverse proxy with an SSL cert and user authentication. But best to limit to a closed environment.

Python Flask error handlers

Writing general error handlers for Python Flask applications is very straightdorward

@app.errorhandler(sqlite3.OperationalError)
def sqlite3_op_error(err):
 return render_template('errors/500.html', errmsg=('Database error: %s' % err))
@app.errorhandler(OperationalError)
def peewee_op_error(err):
 return render_template('errors/500.html', errmsg=('ORM error: %s' % err))
@app.errorhandler(NameError)
def name_error(err):
 return render_template('errors/500.html', errmsg=('Application error: %s' % err))
@app.errorhandler(404)
def page_not_found(err):
 return render_template('errors/404.html', errmsg=(err))
@app.errorhandler(DoesNotExist)
def query_does_not_exist(err):
 return render_template('errors/409.html', errmsg=('Query error: %s' % err))
@app.errorhandler(IntegrityError)
def query_integrity_error(err):
 return render_template('errors/409.html', errmsg=('Integrify error: %s' % err))

A template might take the form

{%- extends "error_layout.html" -%}
{%- block fourzeronine -%}
{{ super() }}

<p>
A database query error has occurred: {{ errmsg }}
</p>
<p>Back to <a href="/">index</a>.
{%- endblock -%}

 

The new submission form

This post will detail the processing of a new submission form that contains a multiple select form element.

A widget is described as a set of versioned components where a component version could be used in many widgets:
components:
id: INT
name: STRING
description: TEXT

component_versions:
id: INT
name: STRING
component_id: INT

widgets:
id: INT
name: STRING
description: TEXT

component_version_widgets:
component_id: IN
widget_id: INT

A simple route to handle a request to create  new item could be something like:

@app.route('/widgets/new', methods=['GET', 'POST'])
def newWidget():
    # Wasn't able to find any real documentation or examples of how to use the concat
    # function but sort of figured it out by looking at the PeeWee source code. This
    # uses a local function to create a list of list of lists of the items to be
    # displayed in the multiple select element in the form
    # [ [1, <component_name>-<component_version> ], [2, <component_name>-<component_version> ], ... ]
    # that is hopefully easier to process in the template 
    cvids = dict2lol(component_versions
            .select(component_versions.id.alias("cmpntid"),
             components.name.concat("-").concat(component_versions.name).alias("cmpntname"))
             .join(components, JOIN.INNER,
             on=component_versions.component_id == components.id)
             .dicts(), 'cmpntid', 'cmpntname')
    if request.method == 'POST':
        (status, msg) = validateWidget(request.form)
         # Use .getlist() to retrieve the multiple select items
         slctd = request.form.getlist('component_version_ids')
         if status:
             # Although auto-increment works with SQLite3 it didn't seem to work
             # when creating records with PeeWee
             nextId = widgets.select(fn.Max(widgets.id)+1).scalar()
             widgets.create(id=nextId,
                                         name=request.form['name'], \
                                         description=request.form['description'], \
                                         updated_by="config.admin", \
                                          updated_at=datetime.datetime.now())
             for cvid in slctd:
                  component_version_widget.create(widget_id=nextId, component_version_id=cvid)
                  logActivity('create', 'component_version_widgets', nextId, ("component_version_id: %s" % pvid))

             logActivity('create', 'widgets', nextId, request.form['name'])
              flash(("Saved new widget, update for %s." % request.form['name']))
              return redirect('/widgets')
        else:
             flash("Widget creation failed: %s." % msg)
              slctd = []
              for pv in request.form.getlist('component_version_ids'):
                 slctd.append(int(pv))
                 wdgt = {"description": request.form['description'],
                           "name": request.form['name'],
                           "component_version_ids": slctd}
    else:
        wdgt = {'name': 'Widget name', 'description': 'Brief description', "component_version_ids": []}
    return render_template('views/widgets/new.html', wdgt=wdgt, cvids=cvids, req=request)

Notes:

  • cvids: a list of lists of select options in the form [id, name]. Possibly being
    lazy but this is a bit simpler than trying to process what might come out of a PeeWee
    result set
  • slctd: a list of selected options from the select form; either from the database or the
    submitted form data. When creating this list from the submitted form we need to cast the id values as int’s or they won’t be recognised by the template inline conditional.
  • wdgt: dict containing the values for the template form items
  • formatting python code in the edit window is quite tricky so no guarantees that it compiles cleanly

The template to display this might look a bit like,

<form name="widgets" method="POST" action="{{ req.path }}">

<p>
Widget Widget name Description {{ rel['description'] }}
Component versions:
{%- for opt in cvids-%} {{ opt[1] }} {%- endfor -%}
</p>
<input type="submit" name="submit" value="Save widget"> </form>

The only real point to note in here is the method by which the ‘selected’ attribute is added to the option in the select list: the inline if.

References

The edit form submission

This post will detail the processing of an edit submission form that contains a multiple select form element.

A widget is described as a set of versioned components where a component version could be used in many widgets:

components:
id: INT
name: STRING
description: TEXT

component_versions:
id: INT
name: STRING
component_id: INT

widgets:
id: INT
name: STRING
description: TEXT

component_version_widgets:
component_id: INT
widget_id: INT

@app.route('/widgets/<int:id>/edit', methods=['GET', 'POST'])
def editWidget(id):
    # Get the component names and versions and format them for easy display
    # in the template
    cvids = dict2lol(component_versions
                .select(component_versions.id.alias("cmpntverid"),
                 components.name.concat("-").concat(component_versions.name).alias("cmpntname"))
                 .join(components, JOIN.INNER,
                 on=component_versions.component_id == components.id)
                 .dicts(), 'cmpntverid', 'cmpntname')

    if request.method == 'POST':
         (status, msg) = validateWidget(request.form)
         slctd = request.form.getlist('component_version_ids')
         if status:
             savewdgt = widgets(id=id,
                       name=request.form['name'], \
                       description=request.form['description'], \
                       updated_by="config.admin", \
                       updated_at=datetime.datetime.now())
             savewdgt.save()
            # Delete the current component versions for this widget and save
             # the values submitted in the form
             qry = component_version_widgets.delete().where(component_version_widgets.widget_id == id)
             qry.execute()
             for cvid in slctd:
                 component_version_widgets.create(widget_id=id, component_version_id=cvid)
 logActivity('update', 'component_version_widgets', id, ("component_version_id: %s" % cvid))

                logActivity('update', 'widgets', id, request.form['name'])
            flash(("Saved update for widget, %s." % request.form['name']))
            return redirect('/widgets')
        else:
             flash(("widget update failed: %s." % msg))
             slctd = []
             for cv in request.form.getlist('component_version_ids'):
                slctd.append(int(cv))
                wdgt = {"description": request.form['description'], "name": request.form['name'],
 "component_version_ids": slctd}
    else:
        # Display the initial edit form with details from the database
         # Get the details of the item to be edited 
         try:
             rs = widgets.select().where(widgets.id == id).get()
         except DoesNotExist:
             flash(("Cannot locate widget record with id = %s" % id))
         return redirect('/widgets')


    slctd = []
    for cvid in component_version_widgets \
           .select(component_version_widgets.component_version_id) \
           .where(component_version_widgets.widget_id == id):
        slctd.append(cvid.component_version_id)
        wdgt = {'name': rs.name, 'description': rs.description, 'component_version_ids': slctd}

    return render_template('views/widgets/edit.html', wdgt=wdgt, cvids=cvids, req=request)

Notes:

  • cvids: a list of lists of select options in the form [id, name]. Possibly being lazy but this is a bit simpler than trying to process what might come out of a PeeWee result set
  • slctd: a list of selected options from the select form; either from the database or the
    submitted form data. When creating this list from the submitted form we need to cast the id values as int’s or they won’t be recognised by the template inline conditional.
  • wdgt: dict containing the values for the template form items

The dict2lol function is simply:

def dict2lol(rs, val, text):
    lol = []
    for row in rs:
        lol.append([row[val], row[text]])
    return(lol)

The template to display this might look a bit like,

<form name="widgets" method="POST" action="{{ req.path }}">
<p>
Widget Widget name Description {{ rel['description'] }}
Component versions:
{%- for opt in cvids-%} {{ opt[1] }} {%- endfor -%}
</p>
<input type="submit" name="submit" value="Save widget"> </form>

The only real point to note in here is the method by which the ‘selected’
attribute is added to the option in the select list: the inline if.

References