Wednesday, August 29, 2012

Last to Cross the Finish Line: Part Two

Recently, my colleague +Fred Sauer and I gave a tech talk called "Last Across the Finish Line: Asynchronous Tasks with App Engine". This is part two in a three-part series where I will share our learnings and give some helpful references to the App Engine documentation.

Check out the previous post if you haven't already. In this section, we'll cover the two WSGI handlers in main.py that serve requests for our application, and the client-side code that communicates with our application.

Imports

Before defining the handlers, let's first review the imports:
import json

from google.appengine.api import channel
from google.appengine.api import users
from google.appengine.ext.webapp.util import login_required
import webapp2
from webapp2_extras import jinja2

from display import RandomRowColumnOrdering
from display import SendColor
from models import PopulateBatch
We import json to serialize messages. Specific to App Engine, we import channel to use the Channel API, users and login_required to authenticate users within a request, webapp2 to create WSGI handlers, and jinja2 for templating.

Finally, we import three functions from the two other modules defined within our project. From the display module, we import the SendColor function that we explored in part one and the RandomRowColumnOrdering function, which generates all possible row, column pairs in a random order. From the as-yet undiscussed models module, we import the PopulateBatch function, which takes a session ID and a batch of work to be done and spawns workers to carry out that work.
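The post doesn't show how RandomRowColumnOrdering is written. One plausible sketch, assuming it only needs to return every (row, column) pair of the grid exactly once in shuffled order, uses itertools.product and random.shuffle; the display.py version from the talk may differ (for instance, it may be a generator rather than a list):

```python
import itertools
import random


def RandomRowColumnOrdering(rows, columns):
  """Returns every (row, column) pair of a rows x columns grid, shuffled.

  A sketch only; the real display.py implementation may differ.
  """
  pairs = list(itertools.product(range(rows), range(columns)))
  random.shuffle(pairs)  # In-place shuffle, so each run fills the quilt differently
  return pairs


ordering = RandomRowColumnOrdering(8, 8)
print(len(ordering))  # 64
```

Since BeginWork only iterates over the result, a list and a generator would work equally well there.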

Handlers

This module defines two handlers: the main page for the user interface, and an AJAX handler that begins spawning the workers.

For the main page we use jinja2 templates to render from the template main.html in the templates folder:
class MainPage(webapp2.RequestHandler):
  def RenderResponse(self, template, **context):
    jinja2_renderer = jinja2.get_jinja2(app=self.app)
    rendered_value = jinja2_renderer.render_template(template, **context)
    self.response.write(rendered_value)
  @login_required
  def get(self):
    user_id = users.get_current_user().user_id()
    token = channel.create_channel(user_id)
    self.RenderResponse('main.html', token=token, table_id='pixels',
                        rows=8, columns=8)
In get (the method that actually serves the GET request from the browser), we use the login_required decorator to make sure the user is signed in, and then create a channel for message passing using the ID of the signed-in user. The template takes an HTML ID, rows and columns to create an HTML table as the "quilt" that the user will see. We pass the channel token, the table's HTML ID, and the rows and columns to the template simply as keyword arguments.
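The template itself isn't shown here. As a rough Python stand-in for the markup main.html might produce, the hypothetical RenderQuiltTable below builds a table with the given HTML ID and one cell per square. The cell IDs follow the "square" + (8*row + column) scheme that the client-side onmessage callback relies on; the exact markup in main.html is an assumption:

```python
def RenderQuiltTable(table_id, rows, columns):
  """Builds an HTML table whose cells carry IDs square0 ... square(N-1).

  Hypothetical stand-in for the table markup in templates/main.html.
  """
  lines = ['<table id="%s">' % table_id]
  for row in range(rows):
    cells = []
    for column in range(columns):
      square_index = columns * row + column  # Row-major index into the grid
      cells.append('<td id="square%d"></td>' % square_index)
    lines.append('  <tr>' + ''.join(cells) + '</tr>')
  lines.append('</table>')
  return '\n'.join(lines)


html = RenderQuiltTable('pixels', rows=8, columns=8)
```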

For the handler that spawns the workers, we use RandomRowColumnOrdering to generate row, column pairs. For each pair, we add a unit of work to the batch: the SendColor function, along with the row, the column and the user ID (used as a proxy for a session ID for message passing):
class BeginWork(webapp2.RequestHandler):
  # Can't use login_required decorator here because it is not
  # supported for POST requests
  def post(self):
    response = {'batch_populated': False}
    try:
      # Will raise an AttributeError if no current user
      user_id = users.get_current_user().user_id()
      # TODO: return 400 if not logged in 
      work = []
      for row, column in RandomRowColumnOrdering(8, 8):
        args = (row, column, user_id)
        work.append((SendColor, args, {}))  # No keyword args
      PopulateBatch(user_id, work)
      response['batch_populated'] = True
    except Exception:  # Avoid a bare except, which also traps SystemExit
      # TODO: Consider logging traceback.format_exception(*sys.exc_info()) here
      pass
    self.response.write(json.dumps(response))
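Each unit of work in the batch is a (callable, args, kwargs) triple. To see how such a triple is meant to be invoked, here is a local, synchronous sketch: SendColor is replaced by a hypothetical stub that records calls, and BuildWork and RunBatchLocally are hypothetical helpers. PopulateBatch, by contrast, spawns workers for the batch rather than calling each unit inline:

```python
SENT = []  # Records stub calls in place of Channel API messages


def SendColor(row, column, session_id):
  # Hypothetical stub standing in for display.SendColor
  SENT.append((session_id, row, column))


def BuildWork(user_id, ordering):
  """Builds the batch the same way BeginWork.post does."""
  work = []
  for row, column in ordering:
    work.append((SendColor, (row, column, user_id), {}))  # No keyword args
  return work


def RunBatchLocally(work):
  """Hypothetical synchronous runner: invokes each unit in turn."""
  for func, args, kwargs in work:
    func(*args, **kwargs)


ordering = [(row, column) for row in range(8) for column in range(8)]
work = BuildWork('user-123', ordering)
RunBatchLocally(work)
```

Keeping the callable, positional args and keyword args separate is what lets the batch be handed to another process (or task) and invoked later.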
Finally, to route requests to these handlers, we define:
app = webapp2.WSGIApplication([('/begin-work', BeginWork),
                               ('/', MainPage)],
                              debug=True)
and specify the following in app.yaml:
handlers:
- url: /.*
  script: main.app
To use a WSGI app as the script like this, the App Engine runtime must be python27.

Client-Side JavaScript and jQuery

In the template main.html, we use jQuery to make AJAX requests and manage the CSS for each square in our "quilt". We also define some other JavaScript functions for interacting with the App Engine Channel API. In the HTML <head> element we load the Channel JavaScript API, and in the <body> element we open a channel using the {{ token }} passed in to the template:
<head>
  <script src="/_ah/channel/jsapi"></script>
</head>
<body>
  <script type="text/javascript">
    var channel = new goog.appengine.Channel('{{ token }}');
    var socket = channel.open();
    socket.onerror = function() { console.log('Socket error'); };
    socket.onclose = function() { console.log('Socket closed'); };
  </script>
</body>
In addition to onerror and onclose, we define more complex functions for the onopen and onmessage callbacks.

First, once the socket has been opened, we send a POST request to /begin-work to signal that the channel is ready for communication. If the response indicates that the batch of workers was initialized successfully, we call setStatus, which reveals the progress spinner:
socket.onopen = function() {
  $.post('/begin-work', function(data) {
    var response = JSON.parse(data);
    if (response.batch_populated) {
      setStatus('Loading began');
    }
  });
};
As we defined in part one, each SendColor worker sends back a message along the channel containing a row, a column and a color. When a message arrives, we set the background color of the corresponding square to the color provided:
socket.onmessage = function(msg) {
  var response = JSON.parse(msg.data);
  var squareIndex = 8*response.row + response.column;
  var squareId = '#square' + squareIndex.toString();
  $(squareId).css('background-color', response.color);
};
As squareId shows, each square in the table generated by the template has its own HTML ID, so we can target it directly.
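The lookup relies on 8*row + column being a one-to-one, row-major index into an 8-column table. Here is the same arithmetic in Python, with hypothetical helper names, plus the inverse mapping via divmod:

```python
import json

COLUMNS = 8  # Matches the 8 x 8 quilt rendered by MainPage


def SquareIdFromMessage(message_data, columns=COLUMNS):
  """Python mirror of the onmessage lookup: JSON message -> '#squareN'."""
  response = json.loads(message_data)
  square_index = columns * response['row'] + response['column']
  return '#square' + str(square_index)


def RowColumnFromIndex(square_index, columns=COLUMNS):
  """Inverse mapping: divmod recovers (row, column) from a row-major index."""
  return divmod(square_index, columns)


message = json.dumps({'row': 2, 'column': 5, 'color': '#A1B2C3'})
print(SquareIdFromMessage(message))  # #square21
print(RowColumnFromIndex(21))        # (2, 5)
```

Note that the multiplier must equal the number of columns; if the template were rendered with a different width, the hard-coded 8 in the JavaScript would need to change too.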

Next...

In the final post, we'll define the PopulateBatch function and explore the ndb models and Task Queue operations that make it work.

Monday, August 27, 2012

Last to Cross the Finish Line: Part One

Recently, my colleague +Fred Sauer and I gave a tech talk called "Last Across the Finish Line: Asynchronous Tasks with App Engine". This is part one in a three-part series where I will share our learnings and give some helpful references to the App Engine documentation.

Intro

Before I dive in, a quick overview of our approach:
  • "Fan out; fan in": first spread tasks over independent workers, then gather the results back together
  • Use task queues to perform background work in parallel
    • Tasks have built-in retries
    • Can respond quickly to the client, making UI more responsive
  • Operate asynchronously when individual tasks can be executed independently, hence can be run concurrently
    • If tasks are too work-intensive to run synchronously, (attempt to) break the work into small, independent pieces
  • Break work into smaller tasks, for example:
    • rendering media (sounds, images, video)
    • retrieving and parsing data from an external service (Google Drive, Cloud Storage, GitHub, ...)
  • Keep track of all workers; notify client when work is complete
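To make the "fan out; fan in" and "keep track of all workers" ideas concrete, here is a local analogy that uses Python threads in place of task queue tasks. FinishLine and FanOutFanIn are hypothetical names; there are no retries here, and the bookkeeping lives in memory rather than in the datastore, so treat this as an illustration of the pattern, not of the App Engine implementation:

```python
import threading


class FinishLine(object):
  """Counts finished workers and fires a callback when the last one is done."""

  def __init__(self, num_workers, on_complete):
    self._remaining = num_workers
    self._results = {}
    self._lock = threading.Lock()
    self._on_complete = on_complete

  def Cross(self, unit, result):
    with self._lock:
      self._results[unit] = result
      self._remaining -= 1
      is_last = self._remaining == 0
    if is_last:
      # Only the last worker across the line notifies the "client"
      self._on_complete(dict(self._results))


def FanOutFanIn(units, work_fn):
  """Runs work_fn on each (hashable) unit in its own thread; gathers results."""
  if not units:
    return {}
  collected = {}
  done = threading.Event()

  def OnComplete(results):
    collected.update(results)
    done.set()

  finish_line = FinishLine(len(units), OnComplete)

  def Worker(unit):
    try:
      result = work_fn(unit)
    except Exception as error:  # Record failures so the count still reaches zero
      result = error
    finish_line.Cross(unit, result)

  for unit in units:  # Fan out: each unit runs independently
    threading.Thread(target=Worker, args=(unit,)).start()
  done.wait()  # Fan in: block until the last worker has crossed
  return collected


squares = FanOutFanIn(list(range(10)), lambda x: x * x)
```

The key design point carries over to the real application: no worker knows whether it is last until it decrements a shared counter, so the completion check has to be atomic.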
Before talking about the sample, let's check it out in action:
We are randomly generating a color in a worker and sending it back to the client to fill in a square in the "quilt". (Thanks to +Iein Valdez for this term.) In this example, think of each square as a (most likely more complex) compute task.

Application Overview

The application has a simple structure:
gae-last-across-the-finish-line/
|-- app.yaml
|-- display.py
|-- main.py
|-- models.py
+-- templates/
       +-- main.html
We'll inspect each of the Python modules display.py, main.py and models.py individually and explore how they interact with one another. In addition, we'll briefly inspect the HTML and JavaScript contained in the template main.html, to understand how the workers pass messages back to the client.

In this post, I will explain the actual background work we did and briefly touch on the methods for communicating with the client, but won't get into the client-side code or the generic code for running the workers and watching them all as they cross the finish line. In the second post, we'll examine the client-side code, and in the third, we'll discuss the models that orchestrate the work.

Workers

These worker methods are defined in display.py. To generate a random color, we simply choose a hexadecimal digit six times and prepend a #:
import random

HEX_DIGITS = '0123456789ABCDEF'

def RandHexColor(length=6):
  result = [random.choice(HEX_DIGITS) for _ in range(length)]
  return '#' + ''.join(result)
With RandHexColor in hand, we define a worker that will take a row and column to be colored and a session ID that will identify the client requesting the work. This worker will generate a random color and then send it to the specified client along with the row and column. To pass messages to the client, we use the Channel API and serialize our messages using the json library in Python.
import json
from google.appengine.api import channel

def SendColor(row, column, session_id):
  color = RandHexColor(length=6)
  color_dict = {'row': row, 'column': column, 'color': color}
  channel.send_message(session_id, json.dumps(color_dict))
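To check the message format without the App Engine SDK, one option is to swap the channel module for an in-memory fake. FakeChannel below is hypothetical and mimics only the send_message(client_id, message) call used above; RandHexColor and SendColor are copied unchanged:

```python
import json
import random

HEX_DIGITS = '0123456789ABCDEF'


class FakeChannel(object):
  """Hypothetical in-memory stand-in for google.appengine.api.channel.

  Stores messages per client instead of pushing them to a browser.
  """

  def __init__(self):
    self.outbox = {}

  def send_message(self, client_id, message):
    self.outbox.setdefault(client_id, []).append(message)


channel = FakeChannel()


def RandHexColor(length=6):
  result = [random.choice(HEX_DIGITS) for _ in range(length)]
  return '#' + ''.join(result)


def SendColor(row, column, session_id):
  color = RandHexColor(length=6)
  color_dict = {'row': row, 'column': column, 'color': color}
  channel.send_message(session_id, json.dumps(color_dict))


SendColor(3, 4, 'session-abc')
message = json.loads(channel.outbox['session-abc'][0])
```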

Next...

In the next post, we'll explore the WSGI handlers that run the application and the client-side code that handles the messages from the workers.

Sunday, August 19, 2012

A Decorator for App Engine Deferred Tasks

I happen to be a big fan of the deferred library for both Python runtimes in Google App Engine. If an application needs to queue up work, breaking the work into easy-to-understand units by writing worker methods and then deferring the work into tasks is a breeze using the deferred library. In the majority of cases, when fine-grained control over how the work is executed is not needed, the deferred library is a great (and in my opinion, the correct) abstraction.

Maybe I am just biased because I have made a few changes to the deferred library over the past few months? One such change added a feature that allows a task to fail once without having an impact on subsequent retries; this is accomplished by raising a SingularTaskFailure. Over this weekend, I found that I wanted to use this feature for a special type* of worker. Since this worker relies on that exception, I needed to make sure it only ever ran inside a deferred task.

Initially I thought I was lost, since a pickled method wouldn't have direct access to the task-queue-specific headers from the request. But luckily, many of these headers persist as environment variables, so they can be accessed via os.environ or os.getenv, yippee! Being a good little (Python) boy, I decided to abstract this requirement into a decorator and let the function do its own work in peace.

Upon realizing the usefulness of such a decorator, I decided to write about it, so here it is:
import functools
import os

from google.appengine.ext.deferred import defer
from google.appengine.ext.deferred.deferred import _DEFAULT_QUEUE as DEFAULT_QUEUE
from google.appengine.ext.deferred.deferred import _DEFAULT_URL as DEFERRED_URL

QUEUE_KEY = 'HTTP_X_APPENGINE_QUEUENAME'
URL_KEY = 'PATH_INFO'

def DeferredWorkerDecorator(method):
  @functools.wraps(method)
  def DeferredOnlyMethod(*args, **kwargs):
    path_info = os.environ.get(URL_KEY, '')
    if path_info != DEFERRED_URL:
      raise EnvironmentError('Wrong path of execution: {}'.format(path_info))
    queue_name = os.environ.get(QUEUE_KEY, '')
    if queue_name != DEFAULT_QUEUE:
      raise EnvironmentError('Wrong queue name: {}'.format(queue_name))

    return method(*args, **kwargs)

  return DeferredOnlyMethod
This decorator first checks whether the environment variable PATH_INFO is set to the default URL of the deferred handler: /_ah/queue/deferred. If it is not (or if the environment variable is unset), an EnvironmentError is raised. Then the environment variable HTTP_X_APPENGINE_QUEUENAME is checked against the name of the default queue: default. Again, if it is incorrect or unset, an EnvironmentError is raised. If both checks pass, the decorated method is called with its arguments and its return value is passed back to the caller.

To use this decorator:
import time

from google.appengine.ext.deferred import SingularTaskFailure

@DeferredWorkerDecorator
def WorkerMethod():
  # too_busy() is a placeholder for your own check (e.g. polling a resource)
  if too_busy():
    time.sleep(30)
    raise SingularTaskFailure

  # do work

WorkerMethod()  # Called directly, this fails with an EnvironmentError
defer(WorkerMethod)  # This performs the work, but in its own task
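Because the checks only read os.environ, the decorator can be exercised locally by faking those two variables. In this sketch the decorator is copied with the deferred constants inlined as plain strings (the values given above, /_ah/queue/deferred and default) so it runs without the SDK; Add and RunAsIfDeferred are hypothetical names used only for the test:

```python
import functools
import os

# Assumed values, as described above: the deferred handler's default URL
# and the name of the default queue.
DEFERRED_URL = '/_ah/queue/deferred'
DEFAULT_QUEUE = 'default'
QUEUE_KEY = 'HTTP_X_APPENGINE_QUEUENAME'
URL_KEY = 'PATH_INFO'


def DeferredWorkerDecorator(method):
  @functools.wraps(method)
  def DeferredOnlyMethod(*args, **kwargs):
    path_info = os.environ.get(URL_KEY, '')
    if path_info != DEFERRED_URL:
      raise EnvironmentError('Wrong path of execution: {}'.format(path_info))
    queue_name = os.environ.get(QUEUE_KEY, '')
    if queue_name != DEFAULT_QUEUE:
      raise EnvironmentError('Wrong queue name: {}'.format(queue_name))
    return method(*args, **kwargs)
  return DeferredOnlyMethod


@DeferredWorkerDecorator
def Add(a, b):
  return a + b


def RunAsIfDeferred(func, *args):
  """Hypothetical test helper: fakes the task queue environment variables."""
  saved = dict((key, os.environ.get(key)) for key in (URL_KEY, QUEUE_KEY))
  os.environ[URL_KEY] = DEFERRED_URL
  os.environ[QUEUE_KEY] = DEFAULT_QUEUE
  try:
    return func(*args)
  finally:
    # Restore the original environment, removing keys that were unset
    for key, value in saved.items():
      if value is None:
        os.environ.pop(key, None)
      else:
        os.environ[key] = value
```

Restoring the environment in a finally block keeps one test's fake task context from leaking into the next.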
In case you want to extend this, here is a more "complete" list of some helpful values that you may be able to retrieve from environment variables:
HTTP_X_APPENGINE_TASKRETRYCOUNT
HTTP_X_APPENGINE_QUEUENAME
HTTP_X_APPENGINE_TASKNAME
HTTP_X_APPENGINE_TASKEXECUTIONCOUNT
HTTP_X_APPENGINE_TASKETA
HTTP_X_APPENGINE_COUNTRY
HTTP_X_APPENGINE_CURRENT_NAMESPACE
PATH_INFO
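As one example of extending this, a hypothetical helper could read the retry count, for instance to give up after too many SingularTaskFailure retries. This sketch assumes the header value arrives as a base-10 integer string and falls back to a default outside a task:

```python
import os

RETRY_COUNT_KEY = 'HTTP_X_APPENGINE_TASKRETRYCOUNT'


def GetTaskRetryCount(default=0):
  """Returns the current task's retry count, or default outside a task.

  Hypothetical helper; also returns default if the value is not an integer.
  """
  value = os.environ.get(RETRY_COUNT_KEY)
  if value is None:
    return default
  try:
    return int(value)
  except ValueError:
    return default
```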

*Specialized Worker: I had two different reasons to raise a SingularTaskFailure in my worker. First, I was polling for resources that may not have been online, so I wanted the task to sleep and then restart (after raising the one-time failure). Second, I was using a special sentinel in the datastore to determine whether the current user had any other job in progress. Again, I wanted to sleep and try again until the current user's other job had completed.