multithreading.php
author Dan
Tue, 26 May 2009 15:25:30 -0400
changeset 69 73780a159e15
parent 68 32f6e2ee15ab
child 76 487a16c7117c
permissions -rw-r--r--
DCOP: added safeguards for mysterious zombie behavior

<?php

/**
 * Multi-threading (well sort of) tools
 * 
 * Greyhound - real web management for Amarok
 * Copyright (C) 2008 Dan Fuhry
 *
 * This program is Free Software; you can redistribute and/or modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
 * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
 */

require_once('json.php');

/**
 * Global signal handler for SIGCHLD.
 *
 * Forwards the signal to every object stored in the global $threader_instances list by calling
 * its event_sigchld() method. Entries that are not objects are skipped.
 * @return null
 */

function Threader_SigChld()
{
  global $threader_instances;
  if ( !is_array($threader_instances) )
  {
    // nothing has been registered (or the registry was clobbered) - nobody to notify
    return;
  }
  // iterate by value: object handles are enough to call a method, and a by-reference loop
  // would leave the elements of the global array flagged as references.
  foreach ( $threader_instances as $instance )
  {
    if ( is_object($instance) )
    {
      $instance->event_sigchld();
    }
  }
}

/**
 * Global signal handler for SIGUSR2.
 *
 * Forwards the signal to every object stored in the global $threader_instances list by calling
 * its event_sigusr2() method. Does nothing while the global $threader_notick is set to a truthy value.
 * @return null
 */

function Threader_SigUsr2()
{
  global $threader_instances, $threader_notick;
  // $threader_notick may be undefined; !empty() covers that without error suppression
  if ( !empty($threader_notick) )
  {
    return;
  }
  if ( !is_array($threader_instances) )
  {
    return;
  }
  // by-value iteration is sufficient to call a method on each object handle
  foreach ( $threader_instances as $instance )
  {
    if ( is_object($instance) )
    {
      $instance->event_sigusr2();
    }
  }
}

/**
 * List of Threader instances. Needed for global handling of signals: the Threader constructor
 * appends each new object here (when pcntl_signal() is available), and the global
 * Threader_SigChld() / Threader_SigUsr2() handlers walk the list to reach every instance.
 * Declared with "global" so the list lands in the global scope whatever scope includes this file.
 * @var array
 */

global $threader_instances;
$threader_instances = array();

/**
 * Tools for emulating multi-threaded operation in PHP scripts.
 *
 * "Threads" are forked child processes. Every fork() creates a socket pair between parent and child;
 * IPC messages are JSON-encoded, newline-terminated lines written to that socket, and the receiving
 * process is signalled with SIGUSR2 after each write so that it reads the socket.
 * @package Amarok
 * @subpackage WebControl
 * @author Dan Fuhry
 * @license GNU General Public License <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>
 */

class Threader
{
  
  /**
   * Return value of fork() if the process is a child.
   * @const int
   */
  
  const FORK_CHILD = -1;
  
  /**
   * Set to true if this is a child process. No exceptions.
   * @var bool
   * @access private
   */
  
  private $is_child = false;
  
  /**
   * Sockets for inter-process communication, keyed by child PID. Only populated in the parent.
   * @var array
   * @access protected
   */
  
  protected $ipc_sockets = array();
  
  /**
   * Socket for communication with the parent. Obviously only used after calling fork().
   * @var resource
   * @access protected
   */
  
  protected $parent_sock = false;
  
  /**
   * Services_JSON instance.
   * @var object
   * @access protected
   */
  
  protected $json = false;
  
  /**
   * PID of the parent process (the process that constructed this object).
   * @var int
   * @access protected
   */
  
  protected $parent_pid = 1;
  
  /**
   * List of actions for IPC events: action name => callback.
   * @var array
   * @access protected
   */
  
  protected $ipc_actions = array();
  
  /**
   * Constructor. Registers this instance in the global list and sets up the SIGCHLD/SIGUSR2 handlers
   * when the pcntl extension is available, then creates the JSON codec and records the current PID.
   */
  
  public function __construct()
  {
    global $threader_instances;
    
    if ( function_exists('pcntl_signal') )
    {
      declare(ticks=1);
      
      // objects are handles, so a plain assignment shares the instance - no reference needed
      $threader_instances[] = $this;
      
      pcntl_signal(SIGCHLD, 'Threader_SigChld');
      pcntl_signal(SIGUSR2, 'Threader_SigUsr2');
    }
    
    $this->json = new Services_JSON(SERVICES_JSON_LOOSE_TYPE);
    $this->parent_pid = getmypid();
  }
  
  /**
   * Forks the current process. See your system's fork(2) man page for details.
   * @return int FORK_CHILD if child process, PID of child if parent process. Returns false on failure
   *   (pcntl unavailable, socket pair could not be created, or the fork itself failed).
   */
  
  public function fork()
  {
    if ( !function_exists('pcntl_fork') )
    {
      return false;
    }
    // create our new sockets for IPC
    $socket_pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
    if ( !is_array($socket_pair) )
    {
      // without a channel the child could never be talked to; treat as a failed fork
      return false;
    }
    // fork (emoticon of the day: --<E)
    $fork_result = pcntl_fork();
    if ( $fork_result == -1 )
    {
      // fork failed - don't leak the sockets we just made
      fclose($socket_pair[0]);
      fclose($socket_pair[1]);
      return false;
    }
    else if ( $fork_result )
    {
      // we are the parent - register the child
      fclose($socket_pair[0]);
      $this->ipc_sockets[$fork_result] = $socket_pair[1];
      return $fork_result;
    }
    else
    {
      // we are the child.
      fclose($socket_pair[1]);
      // the child inherited the parent's ends of its siblings' sockets; they are of no use here,
      // and keeping the PIDs around would let kill_all_children() in a child signal its siblings.
      foreach ( $this->ipc_sockets as $sibling_sock )
      {
        if ( is_resource($sibling_sock) )
        {
          fclose($sibling_sock);
        }
      }
      $this->ipc_sockets = array();
      $this->parent_sock = $socket_pair[0];
      $this->is_child = true;
      return self::FORK_CHILD;
    }
  }
  
  /**
   * Are we a child?
   * @return bool
   */
  
  public function is_child()
  {
    return $this->is_child;
  }
  
  /**
   * Register an action so that when it is fired over IPC, a custom function can be called.
   * The callback receives the decoded message array and this Threader instance.
   * @param string Action
   * @param callback Function to call
   * @return true on success, false on failure
   */
  
  function ipc_register($action, $callback)
  {
    if ( !is_string($action) || empty($action) || !is_callable($callback) )
    {
      return false;
    }
    $this->ipc_actions[$action] = $callback;
    return true;
  }
  
  /**
   * Send through an IPC event. If this is a child, it only notifies the parent; if we're the parent, all children are notified. 
   * @param array Data to be sent through. This must be an associative array containing an "action" key at minimum. If it has a key "propagate" set to true, a parent that receives this will propagate the message to all children.
   * @return null|bool false if $data has no "action" key (nothing is sent), null otherwise
   */
  
  function ipc_send($data)
  {
    if ( !is_array($data) || !isset($data['action']) )
    {
      return false;
    }
    $data = $this->json->encode($data);
    if ( $this->is_child() )
    {
      fwrite($this->parent_sock, "$data\n");
      // signal the parent that we've got an event
      posix_kill($this->parent_pid, SIGUSR2);
    }
    else
    {
      // signal each child
      foreach ( $this->ipc_sockets as $pid => $socket )
      {
        fwrite($socket, "$data\n");
        posix_kill($pid, SIGUSR2);
      }
    }
    return null;
  }
  
  /**
   * Handler for SIGCHLD events. Reaps every child that has exited and drops dead children from the
   * socket list. Never blocks, so it is safe to call when no child has actually exited.
   * @access private
   */
  
  function event_sigchld()
  {
    // this should never happen to children.
    if ( $this->is_child() )
    {
      return null;
    }
    
    if ( function_exists('pcntl_waitpid') )
    {
      // SIGCHLD is not queued: one delivery may stand for several exits, and this handler also runs
      // once per registered instance. Reap everything that is ready, without blocking when nothing is.
      while ( ($reaped_pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0 )
      {
        $this->ipc_forget_child($reaped_pid);
      }
    }
    // for each child PID, kill with signal 0 (effectively, test if process is alive)
    // if posix_kill fails, it's dead so remove it from the list. This also catches children
    // that were reaped on behalf of another instance.
    foreach ( $this->ipc_sockets as $pid => $socket )
    {
      if ( !@posix_kill($pid, 0) )
      {
        // signal failed.
        $this->ipc_forget_child($pid);
      }
    }
  }
  
  /**
   * Handler for IPC events. Reads every complete message currently waiting on the relevant socket(s)
   * and dispatches each one to its registered action.
   * @access private
   */
  
  function event_sigusr2()
  {
    if ( $this->is_child() )
    {
      // this is easy - the parent sent the signal.
      $this->ipc_drain($this->parent_sock);
    }
    else
    {
      // since we can't find which PID sent the signal, do a non-blocking read on every child socket.
      // SIGUSR2 is not queued either, so several children (or several messages) may be pending.
      foreach ( $this->ipc_sockets as $socket )
      {
        $this->ipc_drain($socket);
      }
    }
    return null;
  }
  
  /**
   * Kills all child processes and waits until each one has been removed from the socket list.
   * @access public
   */
  
  public function kill_all_children()
  {
    foreach ( array_keys($this->ipc_sockets) as $pid )
    {
      if ( !isset($this->ipc_sockets[$pid]) )
      {
        // already gone (died while we were waiting on an earlier child)
        continue;
      }
      posix_kill($pid, SIGTERM);
      // wait until we are conscious of this particular child's death. Waiting on the PID itself
      // (rather than on the list shrinking) cannot spin on an empty list, and reaping explicitly
      // means we do not depend on the SIGCHLD handler being delivered.
      while ( isset($this->ipc_sockets[$pid]) )
      {
        usleep(20000);
        $this->event_sigchld();
      }
    }
  }
  
  /**
   * Closes the IPC socket of a child and removes the child from the socket list. No-op for unknown PIDs.
   * @param int PID of the child
   * @access private
   */
  
  private function ipc_forget_child($pid)
  {
    if ( !isset($this->ipc_sockets[$pid]) )
    {
      return;
    }
    if ( is_resource($this->ipc_sockets[$pid]) )
    {
      fclose($this->ipc_sockets[$pid]);
    }
    unset($this->ipc_sockets[$pid]);
  }
  
  /**
   * Reads all lines currently available on a socket without blocking and dispatches each non-empty one.
   * @param resource Socket to read from
   * @access private
   */
  
  private function ipc_drain($socket)
  {
    if ( !is_resource($socket) )
    {
      return;
    }
    @stream_set_blocking($socket, false);
    // re-check the resource each round: a dispatched callback may have closed this socket
    while ( is_resource($socket) && ($line = @fgets($socket, 102400)) !== false )
    {
      $line = rtrim($line, "\n");
      if ( !empty($line) )
      {
        $this->ipc_dispatch($line);
      }
    }
  }
  
  /**
   * Decodes one IPC message and calls the callback registered for its action. Messages without an
   * action, or with an action nobody registered, are ignored (and in that case not propagated either).
   * @param string JSON-encoded message
   * @access private
   */
  
  private function ipc_dispatch($line)
  {
    $command = $this->json->decode($line);
    if ( !is_array($command) || !isset($command['action']) || !is_scalar($command['action']) )
    {
      // no action = no way to figure out how to handle this.
      return;
    }
    if ( !isset($this->ipc_actions[$command['action']]) )
    {
      // action not registered
      return;
    }
    // should we propagate this event?
    if ( !$this->is_child() && isset($command['propagate']) && $command['propagate'] === true )
    {
      $this->ipc_send($command);
    }
    // we're good
    @call_user_func($this->ipc_actions[$command['action']], $command, $this);
  }
  
}

?>