IComeFromTheNet.

Released LaterJob - Database queue for shared web hosting with metrics

  • Tags: php, projects
  • Written on: December 20, 2012


I released a new library today: LaterJob.

I have been interested in simple queues since writing an email campaign tool in 2010. That project used a single cron worker and a database table, and given the small scale it worked. During development I began to ask myself an important question: how long does it take to send? Phrased differently, what is the service time of the queue, the time from a job entering the queue to it leaving?

While it is possible to quote an arbitrary number (‘all emails are sent in 3 hours’), only analyzing a set of sending metrics gives any certainty. I couldn’t find a PHP queuing library that provided metrics, so after finishing my search I began LaterJob.

I wanted a library that:
  1. Is designed for a single node.
  2. Works in a limited PHP environment (shared hosting).
  3. Installs via Composer.
  4. Provides a RESTful API to build client-side graphs and tools.

Concepts

Workers.

A worker requires a cron script, not a daemon. Executions may overlap, with each obtaining a lock on its own set of jobs. The lock is temporal and will expire, which is useful if a script dies in the middle of an execution: once the lock expires, those jobs will be picked up by a later worker execution.

I advise you not to run more than one cron script for the same worker (it is fine if executions overlap). A single cron job that runs frequently (every minute) should be sufficient for a database-backed queue.

A worker has the following 3 states in the activity log:
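The expiring lock can be pictured with a small sketch. This is not LaterJob's storage code: the `lock_until` key and the function name `claimableJobs` are assumptions made for illustration. A job is treated as claimable when it has no lock or when its lock timeout has already passed.

```php
<?php
// Hypothetical sketch of a temporal (expiring) lock; not LaterJob's storage layer.

/**
 * Return the ids of the jobs a worker execution may claim at $now.
 * Each job is an array with an 'id' and a nullable 'lock_until' timestamp.
 */
function claimableJobs(array $jobs, DateTimeImmutable $now): array
{
    $ids = [];
    foreach ($jobs as $job) {
        // No lock, or a lock whose timeout has passed (e.g. the worker died mid-run).
        if ($job['lock_until'] === null || $job['lock_until'] <= $now) {
            $ids[] = $job['id'];
        }
    }
    return $ids;
}

$now  = new DateTimeImmutable('2012-12-20 10:00:00');
$jobs = [
    ['id' => 1, 'lock_until' => null],                       // never locked
    ['id' => 2, 'lock_until' => $now->modify('+5 minutes')], // held by a running worker
    ['id' => 3, 'lock_until' => $now->modify('-1 minute')],  // lock has expired
];

$claimable = claimableJobs($jobs, $now);
```

Here job 2 is skipped because another execution still holds it, while job 3 becomes available again without any cleanup step.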

  1. Starting.
  2. Finished.
  3. Error.

Jobs.

A job represents a unit of work and is processed by a worker. Upon entering a queue a job has the initial state added. When processing begins it transitions to starting, and from there it can transition to one of 3 states: finished, when the job reaches a successful conclusion; error, when the job could not be completed but will be retried later; and failed, when an error occurs and the retry limit has been exhausted.

After a job has transitioned to the error state, a temporal lock is added, introducing a delay before the next retry. This timer is set, along with the retry limit, in the queue configuration.
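The job lifecycle described above can be written down as a small transition table. This is only a sketch: the state names come from the text, but the constant, the function names and the assumption that a retried job moves from error back to starting are mine, not taken from LaterJob.

```php
<?php
// Hypothetical sketch of the job lifecycle; not LaterJob's implementation.

const JOB_TRANSITIONS = [
    'added'    => ['starting'],
    'starting' => ['finished', 'error', 'failed'],
    'error'    => ['starting'], // assumed: retried once the retry timer has expired
    'finished' => [],
    'failed'   => [],
];

/** True when a job in state $from is allowed to move to state $to. */
function canTransition(string $from, string $to): bool
{
    return in_array($to, JOB_TRANSITIONS[$from] ?? [], true);
}

/** State a job moves to when processing throws, given the retries it has left. */
function stateAfterException(int $retriesLeft): string
{
    return $retriesLeft > 0 ? 'error' : 'failed';
}
```

With a table like this, finished and failed are terminal states, and error is the only state that leads back into processing.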

Activity.

Each time a worker or job transitions, the action is recorded in the activity log. Accessed via the activity API, this information is used by the monitor to generate metrics.

Monitor.

The monitor is run via a cron script once an hour. It runs over the activity for the last hour and generates metrics, which can be combined into periods such as 6 hours, 12 hours, 1 day or 1 week to identify trends and generate queue health information.

The monitor locks itself, preventing overlapping executions and recounts. For example, a script that runs 4 times an hour will only execute once, but if the server was down for 2 hours the monitor will allow the next 2 executions to run and cover the backlog.
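One way to get this behaviour is to remember the last hour that was summarised and only ever process the next uncovered hour. The sketch below assumes that approach; `nextHourToMonitor` and its parameters are hypothetical names, and LaterJob may track this differently.

```php
<?php
// Hypothetical sketch of hour-by-hour monitor bookkeeping; not LaterJob's code.

/**
 * Return the start of the next hour the monitor should summarise, or null
 * when every completed hour up to $now has already been covered.
 */
function nextHourToMonitor(?DateTimeImmutable $lastCovered, DateTimeImmutable $now): ?DateTimeImmutable
{
    // Start of the most recent hour that has fully elapsed.
    $currentHour    = $now->setTime((int) $now->format('G'), 0, 0);
    $latestComplete = $currentHour->modify('-1 hour');

    if ($lastCovered === null) {
        return $latestComplete; // first run: summarise the hour that just ended
    }

    $candidate = $lastCovered->modify('+1 hour');

    // Nothing to do if the next hour has not finished yet (prevents recounts).
    return $candidate <= $latestComplete ? $candidate : null;
}

$utc         = new DateTimeZone('UTC');
$lastCovered = new DateTimeImmutable('2012-12-20 09:00:00', $utc);

// A second run within the same hour finds nothing to do.
$repeat = nextHourToMonitor($lastCovered, new DateTimeImmutable('2012-12-20 10:15:00', $utc));

// After an outage, each run picks up one hour of backlog, oldest first.
$backlog = nextHourToMonitor($lastCovered, new DateTimeImmutable('2012-12-20 12:05:00', $utc));
```

In this example `$repeat` is null, while `$backlog` is the 10:00 hour; a following run would then pick up 11:00.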

Metrics

Metrics are recorded for jobs and workers.

Workers

  1. Maximum executing time of a worker in the hour.
  2. Minimum executing time of a worker in the hour.
  3. Mean executing time of a worker in the hour.
  4. Mean throughput of workers in the hour, i.e. the number of jobs processed per worker.
  5. Maximum throughput, i.e. the maximum number of jobs that a worker can process (set in the config).
  6. Worker utilization (mean throughput / maximum throughput).
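As a rough illustration of how these six figures relate, the sketch below summarises one hour of worker executions. The input layout (`seconds`, `jobs`) and the function name `workerMetrics` are assumptions for the example, not LaterJob's activity schema.

```php
<?php
// Hypothetical sketch: summarising one hour of worker executions.

/**
 * @param array $runs          one entry per execution: ['seconds' => int, 'jobs' => int]
 * @param int   $maxThroughput maximum jobs a single execution may process (from config)
 */
function workerMetrics(array $runs, int $maxThroughput): array
{
    if ($runs === [] || $maxThroughput <= 0) {
        throw new InvalidArgumentException('Need at least one run and a positive maximum throughput.');
    }

    $durations      = array_column($runs, 'seconds');
    $processed      = array_column($runs, 'jobs');
    $meanThroughput = array_sum($processed) / count($runs);

    return [
        'max_time'        => max($durations),
        'min_time'        => min($durations),
        'mean_time'       => array_sum($durations) / count($runs),
        'mean_throughput' => $meanThroughput,
        'max_throughput'  => $maxThroughput,
        'utilization'     => $meanThroughput / $maxThroughput,
    ];
}

$metrics = workerMetrics([
    ['seconds' => 40, 'jobs' => 10],
    ['seconds' => 20, 'jobs' => 5],
    ['seconds' => 30, 'jobs' => 15],
], 20);
```

For these three runs the mean throughput is 10 jobs per execution, so with a maximum of 20 the utilization is 0.5.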

Jobs

  1. Number of jobs added in the last hour.
  2. Number of jobs started in the last hour.
  3. Number of jobs failed in the last hour.
  4. Number of jobs finished in the last hour.
  5. Number of jobs in error in the last hour.
  6. Mean service time, i.e. the time from when a job transitioned to added until it transitioned to finished or failed.
  7. Minimum service time.
  8. Maximum service time.
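The service time figures can be sketched the same way. The example assumes each closed job carries the time it was added and the time it reached finished or failed; the keys `added` and `closed` and the function name are illustrative only.

```php
<?php
// Hypothetical sketch: service-time statistics (in seconds) for jobs closed in one hour.

function serviceTimeStats(array $jobs): array
{
    if ($jobs === []) {
        throw new InvalidArgumentException('At least one closed job is required.');
    }

    $seconds = [];
    foreach ($jobs as $job) {
        // Service time: from the `added` transition to `finished` or `failed`.
        $seconds[] = $job['closed']->getTimestamp() - $job['added']->getTimestamp();
    }

    return [
        'mean' => array_sum($seconds) / count($seconds),
        'min'  => min($seconds),
        'max'  => max($seconds),
    ];
}

$stats = serviceTimeStats([
    ['added' => new DateTimeImmutable('2012-12-20 10:00:00'), 'closed' => new DateTimeImmutable('2012-12-20 10:02:00')],
    ['added' => new DateTimeImmutable('2012-12-20 10:01:00'), 'closed' => new DateTimeImmutable('2012-12-20 10:06:00')],
    ['added' => new DateTimeImmutable('2012-12-20 10:05:00'), 'closed' => new DateTimeImmutable('2012-12-20 10:08:00')],
]);
```

The three jobs took 120, 300 and 180 seconds, giving a mean service time of 200 seconds.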

The most important metric?

The mean service time and worker utilization are the key metrics. If the service time is greater than the base time, jobs are not being cleared fast enough. If worker utilization is high, scripts are processing close to their maximum.

Increase Capacity?

There are two options: run workers more frequently, or increase the maximum throughput, thus lowering utilization and increasing capacity for busy times.
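A little arithmetic shows the effect of either option. The sketch assumes hourly capacity is simply executions per hour multiplied by the maximum throughput; the function names and the figures are made up for illustration.

```php
<?php
// Hypothetical sketch: how run frequency and maximum throughput affect capacity.

/** Jobs the queue can clear in an hour. */
function hourlyCapacity(int $runsPerHour, int $maxThroughput): int
{
    return $runsPerHour * $maxThroughput;
}

/** Share of capacity used by a given arrival rate, capped at 1.0. */
function expectedUtilization(int $jobsPerHour, int $runsPerHour, int $maxThroughput): float
{
    return min(1.0, $jobsPerHour / hourlyCapacity($runsPerHour, $maxThroughput));
}

// Cron every minute, up to 20 jobs per execution, 900 jobs arriving per hour.
$before = expectedUtilization(900, 60, 20);

// Doubling the maximum throughput halves utilization for the same load.
$after = expectedUtilization(900, 60, 40);
```

In this example the queue goes from 0.75 to 0.375 utilization, leaving more headroom for busy periods.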

Example Runner of a job.

<?php
 
    /**
    *  Example Runner for a job
    *
    * @param InputInterface $input An InputInterface instance
    * @param OutputInterface $output An OutputInterface instance
    * @return null|integer null or 0 if everything went fine, or an error code
    */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $queue = $this->getHelper('queue')->getQueue();
 
        # output to console.
        $queue->getDispatcher()->addSubscriber(new ConsoleSubscriber($output));
 
        # fetch a worker instance from queue api
        $worker = $queue->worker();
 
 
        try {
            # start the worker, record started state in the activity log
            $worker->start(new DateTime());
 
            # load the allocator, allocates work from pool of locked jobs 
            $allocator = $worker->receive(new DateTime());
 
            # fetch worker handle
            $handle = $worker->getId();
 
            # iterate over jobs to process
            foreach($allocator as $job) {
 
                # inner error catch so error job wont stop worker processing
                try {
 
                    # start job, will transition job from added to starting in the activity log
                    $job->start($handle,new DateTime());
 
                    # simulate time taken by a single job to process
                    $sleep = mt_rand(0,15);
                    sleep($sleep);

                    # simulate a chance to fail (floatRand is a helper on this class, not shown)
                    $value = $this->floatRand(0,1);

                    if($value <= 0.08) {
                        # Setting the retries left to 0 to force a `failed` transition.
                        # the api will decrement the count for you normally.
                        $job->getStorage()->setRetryLeft(0);

                        # throw exception to be caught by inner handler
                        throw new LaterJobException('failure has occurred');
                    }

                    # cause the `error` transition if retry counter > 0
                    # (0.16 is an illustrative threshold; it must exceed 0.08 for this branch to be reachable)
                    if($value <= 0.16) {
                        throw new LaterJobException('error has occurred');
                    }
 
                    # normal execution finished, transition from starting to finished in activity log
                    $job->finish($handle,new DateTime());
                }
                catch(LaterJobException $e) {
 
                    # transition to failed or error; it is up to the developer to decide
                    # which transition to pick, you may want the option to ignore the
                    # retry count and go to failed.
                    if($job->getRetryCount() > 0) {
                        $job->error($handle,new DateTime(),$e->getMessage());    
                    }
                    else {
                        $job->fail($handle,new DateTime(),$e->getMessage());    
                    }
                }
 
            }
 
            # finish the worker, will record finished state in activity log
            $worker->finish(new DateTime());

        } catch(LaterJobException $e) {
            # transition to error state, will record error state in activity log
            # ($handle is unset if start() or receive() threw, so ask the worker for its id)
            $worker->error($worker->getId(),new DateTime(),$e->getMessage());
            throw $e;
        }

        return 0;
    }
 
?>