Maybe I am just biased because I have made a few changes to the deferred library over the past few months? One such change I made added a feature that allows a task to fail once without having an impact on subsequent retries; this can be accomplished by raising a SingularTaskFailure. Over this weekend, I found that I wanted to use this feature for a special type* of worker. Since I wanted to utilize this unique exception, I wanted to make sure that this worker only ran in a deferred task.
Initially I thought I was lost, since a pickled method wouldn't directly have access to the task-queue-specific headers from the request. Luckily, many of these headers persist as environment variables, so they can be accessed via os.environ or os.getenv, yippee! Being a good little (Python) boy, I decided to abstract this requirement into a decorator and let the function do its own work in peace.
Upon realizing the usefulness of such a decorator, I decided to write about it, so here it is:

    import functools
    import os

    from google.appengine.ext.deferred import defer
    from google.appengine.ext.deferred.deferred import _DEFAULT_QUEUE as DEFAULT_QUEUE
    from google.appengine.ext.deferred.deferred import _DEFAULT_URL as DEFERRED_URL

    QUEUE_KEY = 'HTTP_X_APPENGINE_QUEUENAME'
    URL_KEY = 'PATH_INFO'


    def DeferredWorkerDecorator(method):

      @functools.wraps(method)
      def DeferredOnlyMethod(*args, **kwargs):
        path_info = os.environ.get(URL_KEY, '')
        if path_info != DEFERRED_URL:
          raise EnvironmentError('Wrong path of execution: {}'.format(path_info))

        queue_name = os.environ.get(QUEUE_KEY, '')
        if queue_name != DEFAULT_QUEUE:
          raise EnvironmentError('Wrong queue name: {}'.format(queue_name))

        return method(*args, **kwargs)

      return DeferredOnlyMethod

This decorator first checks if the environment variable PATH_INFO is set to the default value for the deferred queue: /_ah/queue/deferred. If this is not the case (or if the environment variable is not set), an EnvironmentError is raised. Then the environment variable HTTP_X_APPENGINE_QUEUENAME is checked against the name of the default queue: default. Again, if this is incorrect or unset, an EnvironmentError is raised. If both these checks pass, the decorated method is called with its arguments and the value is returned.
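If a worker is deferred to something other than the default queue (for example with defer's `_queue` argument), the same two checks could take the expected values as parameters. The following is only a sketch of that idea, not part of the deferred library: `deferred_only` and the queue name `slow-jobs` are hypothetical, and the URL and queue defaults are hard-coded from the description above so the snippet runs without the App Engine SDK.

```python
import functools
import os

# Values taken from the description above; hard-coded (an assumption) so this
# sketch can run without the App Engine SDK installed.
DEFERRED_URL = '/_ah/queue/deferred'
DEFAULT_QUEUE = 'default'


def deferred_only(queue_name=DEFAULT_QUEUE, url=DEFERRED_URL):
    """Hypothetical parameterized variant of DeferredWorkerDecorator."""

    def decorator(method):
        @functools.wraps(method)
        def wrapper(*args, **kwargs):
            # Same path check as before, against a configurable URL.
            path_info = os.environ.get('PATH_INFO', '')
            if path_info != url:
                raise EnvironmentError(
                    'Wrong path of execution: {}'.format(path_info))

            # Same queue check as before, against a configurable queue name.
            actual_queue = os.environ.get('HTTP_X_APPENGINE_QUEUENAME', '')
            if actual_queue != queue_name:
                raise EnvironmentError(
                    'Wrong queue name: {}'.format(actual_queue))

            return method(*args, **kwargs)
        return wrapper
    return decorator


# 'slow-jobs' is a made-up queue name used only for illustration.
@deferred_only(queue_name='slow-jobs')
def add(a, b):
    return a + b
```

Because the checks only read os.environ, they can be exercised locally by setting PATH_INFO and HTTP_X_APPENGINE_QUEUENAME by hand before calling the decorated function.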
To use this decorator:

    import time

    from google.appengine.ext.deferred import SingularTaskFailure


    @DeferredWorkerDecorator
    def WorkerMethod():
      if too_busy():
        time.sleep(30)
        raise SingularTaskFailure

      # do work


    WorkerMethod()  # This will fail with an EnvironmentError
    defer(WorkerMethod)  # This will perform the work, but in its own task

In case you want to extend this, here is a more "complete" list of some helpful values that you may be able to retrieve from environment variables:
HTTP_X_APPENGINE_TASKRETRYCOUNT
HTTP_X_APPENGINE_QUEUENAME
HTTP_X_APPENGINE_TASKNAME
HTTP_X_APPENGINE_TASKEXECUTIONCOUNT
HTTP_X_APPENGINE_TASKETA
HTTP_X_APPENGINE_COUNTRY
HTTP_X_APPENGINE_CURRENT_NAMESPACE
PATH_INFO
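As a rough illustration of how these values might be collected in one place, here is a small sketch. The helper names `get_task_environment` and `get_retry_count` are my own inventions, and I am assuming the retry count arrives as a decimal string; neither helper exists in the App Engine SDK.

```python
import os

# The environment variable names listed above.
TASK_ENV_KEYS = (
    'HTTP_X_APPENGINE_TASKRETRYCOUNT',
    'HTTP_X_APPENGINE_QUEUENAME',
    'HTTP_X_APPENGINE_TASKNAME',
    'HTTP_X_APPENGINE_TASKEXECUTIONCOUNT',
    'HTTP_X_APPENGINE_TASKETA',
    'HTTP_X_APPENGINE_COUNTRY',
    'HTTP_X_APPENGINE_CURRENT_NAMESPACE',
    'PATH_INFO',
)


def get_task_environment(environ=None):
    """Return only the task-related variables that are actually set."""
    environ = os.environ if environ is None else environ
    return {key: environ[key] for key in TASK_ENV_KEYS if key in environ}


def get_retry_count(environ=None):
    """Return the retry count as an int, or None if unset or malformed."""
    environ = os.environ if environ is None else environ
    try:
        # Assumption: the value is a decimal string such as '0' or '3'.
        return int(environ.get('HTTP_X_APPENGINE_TASKRETRYCOUNT', ''))
    except ValueError:
        return None
```

Both helpers accept an optional mapping so they can be tried with a plain dict instead of the real process environment.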
*Specialized Worker: I had two different reasons to raise a SingularTaskFailure in my worker. First, I was polling for resources that may not have been online, so I wanted the task to sleep and then restart (after raising the one-time failure). Second, I was using a special sentinel in the datastore to determine if the current user had any other job in progress. Again, I wanted the task to sleep and try again until the current user's other job had completed.
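The two conditions in this footnote share one shape: check something, and if it is not ready, sleep and raise the one-time failure. A minimal sketch of that shape follows. Everything in it is hypothetical: the exception is a local stand-in for SingularTaskFailure so the snippet runs without the SDK, and `resources_online` and `user_has_no_other_job` are placeholders for the real polling and datastore lookups.

```python
import time


class SingularTaskFailure(Exception):
    """Local stand-in so this sketch runs without the App Engine SDK."""


def retry_unless_ready(checks, delay_seconds=30, sleep=time.sleep):
    """Sleep and raise a one-time failure if any readiness check fails."""
    for check in checks:
        if not check():
            sleep(delay_seconds)
            # The check's name is attached only to make debugging easier.
            raise SingularTaskFailure(check.__name__)


def resources_online():
    # Placeholder: a real worker would poll the external resource here.
    return True


def user_has_no_other_job():
    # Placeholder: a real worker would look up the datastore sentinel here.
    return False
```

A worker would call `retry_unless_ready([resources_online, user_has_no_other_job])` as its first step and carry on with the real work only if no exception was raised.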