Sunday, August 19, 2012

A Decorator for App Engine Deferred Tasks

I happen to be a big fan of the deferred library for both Python runtimes in Google App Engine. If an application needs to queue up work, breaking the work into easy to understand units by writing worker methods and then deferring the work into tasks is a breeze using the deferred library. For the majority of cases, if fine grained control over the method of execution is not needed, using the deferred library is a great (and in my opinion, the correct) abstraction.

Maybe I am just biased because I have made a few changes to the deferred library over the past few months? One such change I made added a feature that allows a task to fail once without having an impact on subsequent retries; this can be accomplished by raising a SingularTaskFailure. Over this weekend, I found that I wanted to use this feature for a special type* of worker. Since I wanted to utilize this unique exception, I wanted to make sure that this worker only ran in a deferred task.

Initially I thought I was lost, since any pickled method wouldn't directly have access to the task queue specific headers from the request. But luckily, many of these headers persist as environment variables, so can be accessed via os.environ or os.getenv, yippee! Being a good little (Python) boy, I decided to abstract this requirement into a decorator and let the function do it's own work in peace.

Upon realizing the usefulness of such a decorator, I decided to write about it, so here it is:
import functools
import os

from google.appengine.ext.deferred import defer
from google.appengine.ext.deferred.deferred import _DEFAULT_QUEUE as DEFAULT_QUEUE
from google.appengine.ext.deferred.deferred import _DEFAULT_URL as DEFERRED_URL

QUEUE_KEY = 'HTTP_X_APPENGINE_QUEUENAME'
URL_KEY = 'PATH_INFO'

def DeferredWorkerDecorator(method):
  @functools.wraps(method)
  def DeferredOnlyMethod(*args, **kwargs):
    path_info = os.environ.get(URL_KEY, '')
    if path_info != DEFERRED_URL:
      raise EnvironmentError('Wrong path of execution: {}'.format(path_info))
    queue_name = os.environ.get(QUEUE_KEY, '')
    if queue_name != DEFAULT_QUEUE:
      raise EnvironmentError('Wrong queue name: {}'.format(queue_name))

    return method(*args, **kwargs)

  return DeferredOnlyMethod
This decorator first checks if the environment variable PATH_INFO is set to the default value for the deferred queue: /_ah/queue/deferred. If this is not the case (or if the environment variable is not set), an EnvironmentError is raised. Then the environment variable HTTP_X_APPENGINE_QUEUENAME is checked against the name of the default queue: default. Again, if this is incorrect or unset, an EnvironmentError is raised. If both these checks pass, the decorated method is called with its arguments and the value is returned.

To use this decorator:
import time

from google.appengine.ext.deferred import SingularTaskFailure

@DeferredWorkerDecorator
def WorkerMethod():
  if too_busy():
    time.sleep(30)
    raise SingularTaskFailure

   # do work

WorkerMethod()  # This will fail with an EnvironmentError
defer(WorkerMethod)  # This will perform the work, but in it's own task
In case you want to extend this, here is a more "complete" list of some helpful values that you may be able to retrieve from environment variables:
HTTP_X_APPENGINE_TASKRETRYCOUNT
HTTP_X_APPENGINE_QUEUENAME
HTTP_X_APPENGINE_TASKNAME
HTTP_X_APPENGINE_TASKEXECUTIONCOUNT
HTTP_X_APPENGINE_TASKETA
HTTP_X_APPENGINE_COUNTRY
HTTP_X_APPENGINE_CURRENT_NAMESPACE
PATH_INFO

*Specialized WorkerI had two different reasons to raise a SingularTaskFailure in my worker. First, I was polling for resources that may not have been online, so wanted the task to sleep and then restart (after raising the one time failure). Second, I was using a special sentinel in the datastore to determine if the current user had any other job in progress. Again, I wanted to sleep and try again until the current user's other job had completed.

No comments: