Background tasks with Photon

From: Loic d'Anterroches
Date: 2011-02-20 @ 20:56

Hello,

Here is the documentation; the code is available in the repository.
Basically, you can now write background tasks that run in their own
process and communicate with your web application over ZeroMQ. The
communication is hidden from you. For example, you can get the time
from the time server just by doing:

        $runner = new \photon\task\Runner();
        $time = $runner->run('photon_timeserver', 'ms');
        return new \photon\http\Response($time, 'text/plain');

or send something to be logged (you do not care if the logging is done
directly or not):

        $runner = new \photon\task\Runner();
        $runner->run('photon_logger', array(time(), 'ms'));
        return new \photon\http\Response('OK', 'text/plain');

The first call waits for the answer; the second returns immediately.
This is the base I needed to have the chat server running. So, the next
step is to understand a bit what Zed has done with the chat example and
adapt it to run on top of Photon. Once done, I think it will be time to
release (ok, first I will pay a designer to make the design of the
photon-project.com website).

Yes, I am happy, really happy, this is kick ass stuff and it will really
shake the current PHP framework ecosystem.

loïc

title: Background Tasks with Photon
description:
Background tasks allow you to perform operations in the background
to offload process intensive or specialized work. A good example is
to use a task to manage a chat server.
+++
keywords: asynchronous, background operation
author: Loïc d'Anterroches
authorurl: http://danterroches.org
date: 20th of February, 2011
content:

## A Task Runs Something in the Background

A few examples of tasks, to give you an idea:

- return the time to synchronize different systems;
- receive logs and write them to disk in batch every 60 seconds to make
large writes;
- keep track of all the connected users in a chat and send them the
current messages;
- send notification emails in the background so that the web page
returns fast;
- provide a small memory cache available to all the Photon processes;
- index documents after insert or update.

Good candidates for conversion to tasks:

- you do not need the answer to the operation for the client right now;
- the operation takes more than 50 ms on average to complete;
- the operation serves as reference data for others;
- the operation connects to remote systems with high latency.

**Tasks can be asynchronous**: they get a job to be done and do not
return the answer directly. Or they can be **synchronous**: they
answer directly.

## Asynchronous Mail Sender Task

Suppose we have a message to send to 25 people during the processing
of a request. For example, you have updated a bug with some comments
and 25 people want to be notified of the bug's evolution. Sending the
emails directly would cost you a lot of time, so we are going to send
them through an asynchronous task.

The task will take:

- the list of email addresses;
- the message subject;
- the message body.

You will see that it is extremely simple to create such a task: you
just need to define the name of the task and the method doing the
work, simply named `work`.

~~~~~
// We always put the code in a namespace. Here you will put this code in
// the yourproject/apps/mywebapp/task.php file.
namespace mywebapp\task;
// We need the project configuration to have the sender email.
use photon\config\Container as Conf;

class MailSender extends \photon\task\AsyncTask
{
    /**
     * The name of the task. It must be unique.
     */
    public $name = 'mailsender';

    /**
     * We receive the work and send the emails.
     */
    public function work($socket)
    {
        // The task gets in the message its name, the id of the client
        // requesting the work (good for logging) and the payload. Most of
        // the time the payload will be JSON encoded.
        list($taskname, $client, $payload) = explode(' ', $socket->recv(), 3);
        // Pass true to get an associative array instead of an object.
        $payload = json_decode($payload, true);
        // The sender address comes from the project configuration.
        $headers = 'From: ' . Conf::f('from_email') . "\r\n" .
                   'X-Mailer: Photon/' . \photon\VERSION;
        foreach ($payload['emails'] as $to) {
            // You just send your email as usual, maybe using some of the
            // nice PEAR libs to do so.
            mail($to, $payload['subject'], $payload['body'], $headers);
        }
    }
}
~~~~~
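
The message parsing at the top of `work()` can be tried out on its
own, without a socket. Here is a minimal sketch, assuming the
`taskname client payload` layout implied by the `explode()` call
above. The `parse_task_message()` helper and the `client-42` id are
hypothetical names used for illustration; they are not part of Photon.

~~~~~
<?php
// Hypothetical helper, not part of Photon: split a raw task message
// into its three parts (task name, client id, decoded payload).
function parse_task_message($raw)
{
    // A limit of 3 keeps the spaces inside the JSON payload intact.
    $parts = explode(' ', $raw, 3);
    if (count($parts) !== 3) {
        throw new InvalidArgumentException('Malformed task message.');
    }
    list($taskname, $client, $json) = $parts;
    // Second argument true: decode JSON objects as associative arrays.
    $payload = json_decode($json, true);
    if ($payload === null && json_last_error() !== JSON_ERROR_NONE) {
        throw new InvalidArgumentException('Payload is not valid JSON.');
    }
    return array($taskname, $client, $payload);
}

// Build a message the same way a client would, then parse it back.
$raw = 'mailsender client-42 ' . json_encode(array(
    'emails' => array('foo1@example.com'),
    'subject' => 'Ticket 123 has been updated.',
));
list($taskname, $client, $payload) = parse_task_message($raw);
echo $taskname, ' ', $client, ' ', $payload['subject'], "\n";
~~~~~

The subject contains spaces, yet it survives the round trip because
only the first two spaces of the message are used as separators.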

Now that you have created your task, the last step is to install it.
To do so, just add to your configuration file that the `mailsender`
task is available as the `\mywebapp\task\MailSender` class.

~~~~~
return array(
    ...
    'installed_tasks' => array('mailsender' => '\mywebapp\task\MailSender'),
);
~~~~~

Now stop and restart your project, then list the available processes:

~~~~~
$ hnu server stop && hnu server start
$ hnu server list
Photon id                                     Uptime        Served  Mem. (kB)  Peak mem. (kB)
-----------------------------------------------------------------------------------------------
loa-desktop-32559-1298195713                  0d00:00:54    0       1557       1567
loa-desktop-32560-1298195713                  0d00:00:54    0       1557       1567
loa-desktop-task-mailsender-32558-1298195713  0d00:00:54    0       1525       1540
loa-desktop-32561-1298195713                  0d00:00:54    0       1557       1567
-----------------------------------------------------------------------------------------------
~~~~~

You can see the line `loa-desktop-task-mailsender-32558-1298195713`:
your task is available and ready to process work for you. Yeah! The
next step is to send work to your task.

So, in your code, we are going to send work to the `mailsender` task:

~~~~~
class MyViews
{
    public function updateTicket($request, $match)
    {
        // Do a lot of work to update the ticket
        $payload = array(
                 'emails' => array('foo1@example.com',
                                   'foo2@example.com',
                                   'foo3@example.com',
                                   'foo4@example.com'),
                 'subject' => 'Ticket 123 has been updated.',
                 'body' => 'Hello, Photon is great and you too.');
        // The runner will take care of the communication with the task,
        // even if the task is on a remote system far away.
        $runner = new \photon\task\Runner();
        // The run call will return immediately!
        $runner->run('mailsender', $payload);
        return new \photon\http\Response('Ticket updated!', 'text/plain');
    }
}
~~~~~

Could it be simpler? You start a task runner `\photon\task\Runner` and
send a payload to the `mailsender` task. Nothing more, nothing less.

With a traditional framework, you need to manually set up a queue
system, define a protocol, get workers to connect to the queue and
supervise everything. Most of the time people end up installing
[Gearman][gearman] or rolling their own system. If this is just to
send emails, this is really a lot of work for such a small job!

[gearman]: http://gearman.org/ "Gearman Work Server"

## Synchronous Memory Cache

Now, even if we have a very fast framework, if you want to display a
feed on your user page, it is better to retrieve the feed from a cache
instead of getting it each time. So, we are going to create a memory
cache task which will be available for all the Photon processes. We
could cache the feed in the memory of each Photon process, but if you
have 20 of them and have a refresh every 20 minutes, it means that you
will have 20 fetches of the feed every 20 minutes. Not good.

For a memory cache, one wants to get the results of the request
directly, the request being either to store or to get a value. We are
going to add the task in the same namespace as the previous task.

~~~~~
// We always put the code in a namespace. Here you will put this code in
// the yourproject/apps/mywebapp/task.php file.
namespace mywebapp\task;

// Here the MailSender task

// Notice that we extend the "SyncTask"
class MemoryCache extends \photon\task\SyncTask
{
    /**
     * The name of the task. It must be unique.
     *
     * By convention, prefix the name with the name of your
     * application, this will save your day when you will need
     * to integrate another "mailer" task in your system.
     */
    public $name = 'mywebapp_memorycache';

    /**
     * This is the memory storage.
     *
     * For each key we store: array($value, $expiration_time, $stats)
     */
    public static $storage = array();
    public static $counter = 0;

    /**
     * We receive the work and answer directly.
     */
    public function work($socket)
    {
        list($taskname, $client, $payload) = explode(' ', $socket->recv(), 3);
        // Pass true to get an associative array instead of an object.
        $payload = json_decode($payload, true);
        $ans = sprintf('%s %s %%s', $client, $taskname);
        switch ($payload['action']) {
        case 'get':
            if (isset(self::$storage[$payload['key']])
                && self::$storage[$payload['key']][1] > time()) {
                $ans = sprintf($ans,
                               json_encode(self::$storage[$payload['key']][0]));
                self::$storage[$payload['key']][2]++;
            } else {
                $ans = sprintf($ans, json_encode(null));
                unset(self::$storage[$payload['key']]);
            }
            break;
        case 'put':
            self::$storage[$payload['key']] = array($payload['val'],
                                              $payload['timeout'] + time(),
                                              0);
            $ans = sprintf($ans, json_encode('OK'));
            break;
        default:
            $ans = sprintf($ans, json_encode('KO'));
        }
        $socket->send($ans);
    }

    /**
     * the loop() method is called after each call to work or at least
     * every 200 ms. We are going to use it to clean the old keys.
     *
     * This is not strictly needed, but may prevent accumulation of dead
     *  keys in the memory.
     */
    public function loop()
    {
        // checking every 200ms is not good, so we are going to check
        // every few minutes
        self::$counter++;
        if (0 !== (self::$counter % 2000)) {
            return;
        }
        self::$counter = 0; // We reset the counter
        $time = time();
        foreach (self::$storage as $key => $val) {
            if ($val[1] < $time) {
                unset(self::$storage[$key]);
            }
        }
    }
}
~~~~~

As you can see, it is a bit more complex, but in less than 40 lines of
code you have a functional, albeit limited, memory cache task. The
really important point to notice is that you must both receive the
work with `$socket->recv()` and send the answer with `$socket->send()`
within the `work()` method. This is a synchronous task, that is, your
client is waiting for your answer right now!
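
If you want to convince yourself that the get/put/expiry logic is
right before wiring it to a socket, you can exercise it in isolation.
The following is only a sketch: `ExpiringStore` is a hypothetical
stand-alone class mirroring the storage layout of the task above, and
the current time is passed in as an argument (an assumption made so
that expiry can be checked without waiting).

~~~~~
<?php
// Hypothetical stand-alone class, not part of Photon. It mirrors the
// get/put logic of the memory cache task, with the clock passed in.
class ExpiringStore
{
    // For each key we store: array($value, $expiration_time, $hits)
    private $storage = array();

    public function put($key, $val, $timeout, $now)
    {
        $this->storage[$key] = array($val, $timeout + $now, 0);
    }

    public function get($key, $now)
    {
        if (isset($this->storage[$key]) && $this->storage[$key][1] > $now) {
            $this->storage[$key][2]++;
            return $this->storage[$key][0];
        }
        // Missing or expired: drop the entry (a no-op if it is missing).
        unset($this->storage[$key]);
        return null;
    }

    public function hits($key)
    {
        return isset($this->storage[$key]) ? $this->storage[$key][2] : 0;
    }
}

$store = new ExpiringStore();
// Stored at t=1000 with a one hour timeout, so it expires at t=4600.
$store->put('my_feed', 'feed content', 3600, 1000);
$fresh = $store->get('my_feed', 1500);   // still valid
$hits = $store->hits('my_feed');         // one successful get so far
$expired = $store->get('my_feed', 5000); // past the expiration time
var_dump($fresh, $hits, $expired);
~~~~~

The first `get` returns the stored value, the second returns `null`
and removes the dead key, which is the same behaviour the task exposes
through its `get` action.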

Now, we have two tasks we can enable:

~~~~~
return array(
    ...
    'installed_tasks' => array(
        'mailsender' => '\mywebapp\task\MailSender',
        'mywebapp_memorycache' => '\mywebapp\task\MemoryCache',
    ),
);
~~~~~

By default, a task listens on the `tcp://127.0.0.1:5997` socket. As we
do not want both tasks to listen on the same socket, we are going to
tell the memory cache to listen on another socket. This is done in the
configuration file:

~~~~~
return array(
    ...
    'installed_tasks' => array(
        'mailsender' => '\mywebapp\task\MailSender',
        'mywebapp_memorycache' => '\mywebapp\task\MemoryCache',
    ),
    'photon_task_mywebapp_memorycache' =>
        array('sub_addr' => 'tcp://127.0.0.1:5998'),
);
~~~~~

This simply says: instead of listening on the standard socket, use
another one. Now, stop and start Photon, then check which tasks are
running:

~~~~~
$ hnu server list
Waiting for the answers...
Photon id                                              Uptime        Served  Mem. (kB)  Peak mem. (kB)
----------------------------------------------------------------------------------------------
loa-desktop-3201-1298206641                            0d00:00:02    0       1558       1568
loa-desktop-task-mailsender-3198-1298206641            0d00:00:02    0       1536       1551
loa-desktop-3200-1298206641                            0d00:00:02    0       1558       1568
loa-desktop-task-mywebapp_memorycache-3197-1298206641  0d00:00:01    0       1536       1551
loa-desktop-3199-1298206641                            0d00:00:02    0       1558       1568
----------------------------------------------------------------------------------------------
5 Photon servers running. Memory usage: 7749kB.
~~~~~

Perfect, you have the default three handlers receiving the requests
from Mongrel2 and the two background tasks. Let's have fun and use
this new task.

~~~~~
class MyViews
{
    public function displayFeed($request, $match)
    {
        $payload = array('action' => 'get',
                         'key' => 'my_feed');
        $runner = new \photon\task\Runner();
        // The run call will wait for the task answer.
        $ans = $runner->run('mywebapp_memorycache', $payload);
        if ($ans === null) {
            $ans = get_the_feed();
            $payload = array('action' => 'put',
                             'key' => 'my_feed',
                             'val' => $ans,
                             'timeout' => 3600); // 1 hour
            $runner->run('mywebapp_memorycache', $payload);
        }
        return new \photon\http\Response($ans);
    }
}
~~~~~

Simple... you just created a memory cache daemon which will be
available to the 3 photon processes running at the moment.

However, you can see that in your request the `get_the_feed()` call is
going to take time. A better way would be to create an asynchronous
task to retrieve the feed, store the result in memory for one year and
store the age of the feed in another key. If the feed is older than
1 hour, trigger the asynchronous task to refresh the feed and push it
into the memory cache. You just need to ensure that if 30 clients
trigger the retrieval of the feed, only one retrieval is performed per
hour. This is easy, as you would have only one task running to refresh
the feed.
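
To make the "only one retrieval per hour" idea concrete, here is a
rough sketch of the check such a refresh task could perform. Everything
in it is an assumption made for illustration: `FeedRefresher`, the
`my_feed_age` key and the `$fetch` callback are hypothetical, a plain
array stands in for the memory cache task, and the time is passed in
so the behaviour can be checked without waiting.

~~~~~
<?php
// Hypothetical sketch, not part of Photon: the decision logic of a
// single refresh task receiving many refresh requests.
class FeedRefresher
{
    private $lastRefresh = null;
    public $fetches = 0;

    // Called once per incoming refresh request.
    public function handle(array &$cache, $now, $fetch)
    {
        if ($this->lastRefresh !== null
            && $now - $this->lastRefresh < 3600) {
            return false; // refreshed less than an hour ago, skip
        }
        $cache['my_feed'] = call_user_func($fetch);
        $cache['my_feed_age'] = $now; // second key holding the feed age
        $this->lastRefresh = $now;
        $this->fetches++;
        return true;
    }
}

$cache = array(); // stands in for the memory cache task
$refresher = new FeedRefresher();
$fetch = function () { return 'fresh feed'; };

// 30 clients ask for a refresh within the same half minute.
for ($i = 0; $i < 30; $i++) {
    $refresher->handle($cache, 1000 + $i, $fetch);
}
$afterBurst = $refresher->fetches;

// One hour after the first retrieval, a new request is honoured.
$refresher->handle($cache, 1000 + 3600, $fetch);
echo $afterBurst, ' then ', $refresher->fetches, "\n";
~~~~~

Because a single process handles all the requests one after the other,
a simple timestamp comparison is enough here; no locking is needed.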

Now, I hope you understand how easy it is to write tasks and use
them. You can also start to design your application so that it
provides maximal speed to your end users.



-- 
Dr Loïc d'Anterroches
Founder Céondo Ltd

w: www.ceondo.com       |  e: loic@ceondo.com
t: +44 (0)207 183 0016  |  f: +44 (0)207 183 0124

Céondo Ltd
Dalton House
60 Windsor Avenue
London
SW19 2RR / United Kingdom