Créer des batchs multi-thread avec Symfony / Propel
La création de batchs multi-thread avec des applications basées sur Propel n'est pas simple, car la connexion à la base de données est détruite à chaque fin de tâche enfant. De cela résulte des erreurs de connexion assez difficiles à diagnostiquer. Cet article propose une solution à ce problème.
Ce guide est testé sur:
- Symfony 1.0
Description du problème
Lorsque l'on utilise Propel dans un environnement multi-threadé, la fin d'un thread entraine la destruction de la connexion à la base de donnée. Cela entraine des erreur du type :
PHP Fatal error: Uncaught exception 'PropelException' with message ' [wrapped: No database selected [Native Error: MySQL server has gone away]]' in /usr/share/php/symfony/vendor/propel/util/BasePeer.php:458
Stack trace:
#0 lib/model/om/BaseMonObjetPeer.php(323): BasePeer::doSelect(Object(Criteria), Object(sfDebugConnection))
#1 lib/model/om/BaseMonObjetPeer.php(291): BaseMonObjetPeer::doSelectRS(Object(Criteria), Object(sfDebugConnection))
#2 lib/model/om/BaseMonObjetPeer.php(1185): BaseMonObjetPeer::doSelect(Object(Criteria), Object(sfDebugConnection))
#3 lib/model/MonObjetPeer.php(169): BaseMonObjetPeer::retrieveByPK('2')
#4 batch/mon-batch.php(11): MonObjetPeer::maMethodeMultithread()
#5 {main}
thrown in /usr/share/php/symfony/vendor/propel/util/BasePeer.php on line 458
Correction de l'erreur
Pour résoudre cette erreur de connexion, il suffit de forcer la fermeture de la connexion SQL, et recréer ensuite cette connexion. Cette action doit se placer avant toute requête SQL et se résume aux deux appels suivants :
Propel::close();
Propel::initialize();
Vous pouvez aussi vous référer à l'exemple ci-dessous.
Source : Merci à Hans pour son travail sur le ticket #556 : Problems with connection closing and use in PHP daemon.
Exemple de batch utilisant la base de données dans un projet Symfony 1.0
Pour créer un batch, il suffit d'ajouter le fichier "mon-batch.php" dans le dossier "batch" de votre projet. Ce fichier doit contenir quelquechose de ce genre :
#!/bin/php
<?php
define('SF_ROOT_DIR', realpath(dirname(__FILE__).'/..'));
define('SF_APP', 'frontend');
define('SF_ENVIRONMENT', 'dev');
define('SF_DEBUG', true);
require_once(SF_ROOT_DIR.DIRECTORY_SEPARATOR.'apps'.DIRECTORY_SEPARATOR.SF_APP.DIRECTORY_SEPARATOR.'config'.DIRECTORY_SEPARATOR.'config.php');
MonObjetPeer::maMethodeMultithread()
Une fois ce fichier créé, vous pouvez ajouter la méthode lancée par ce batch dans votre objet "MonObjetPeer". Par exemple :
class MonObjetPeer extends BaseMonObjetPeer
{
/**
* Forking batch for example
*
* @static
* @access public
* @return void
*/
public static function maMethodeMultithread()
{
// initialize database manager
// This allow to fetch data from database.
$databaseManager = new sfDatabaseManager();
$databaseManager->initialize();
$global_start_time = microtime(true);
$max_forks = sfConfig::get('app_batchs_max_forks', 1);
global $children;
$children = array();
function reap_children()
{
global $children;
$tmp = array();
foreach ($children as $pid) // for each living child
{
if (pcntl_waitpid($pid, $status, WNOHANG) != $pid)
{
array_push($tmp, $pid);
}
// else
// {
// echo "[SIGCHLD] child $pid reaped.\n";
// }
} // for each living child
$children = $tmp;
return count($tmp);
} // reap_children()
function sigalrm_handler()
{
die("[SIGALRM] hang in feed update?\n");
} // sigalrm_handler()
function sigchld_handler($signal)
{
$running_jobs = reap_children();
// echo "[SIGCHLD] jobs left: $running_jobs\n";
pcntl_waitpid(-1, $status, WNOHANG);
} // sigchld_handler()
pcntl_signal(SIGALRM, 'sigalrm_handler');
pcntl_signal(SIGCHLD, 'sigchld_handler');
// We fetch existings MonObjet IDs.
$criteria = new Criteria();
$criteria->addSelectColumn(MonObjetPeer::ID);
$rs = MonObjetPeer::doSelectRS($criteria);
$rs->setFetchMode(ResultSet::FETCHMODE_ASSOC);
$mon_objet_ids = array();
while($rs->next()) // For each resultset line.
{
$record = $rs->getRow();
$mon_objet_ids[] = $record['ID'];
} // For each resultset line.
unset($record);
unset($rs);
foreach($mon_objet_ids as $mon_objet_id) // For each MonObjet ID.
{
// Since each child make the SQL connection die with him,
// We force the closing of database connections in order to be able to reconnect.
Propel::close();
Propel::initialize();
// We fetch the object.
$mon_objet = MonObjetPeer::retrieveByPK($mon_objet_id);
if(count($children) < $max_forks) // Test if a fork slot is available.
{
$pid = pcntl_fork();
if ($pid == -1)
{
die("fork failed!\n");
}
else if ($pid)
{
array_push($children, $pid);
// echo sprintf("[MASTER] spawned client %d [PID:%d]...\n", count($children), $pid);
}
else
{
pcntl_signal(SIGCHLD, SIG_IGN);
pcntl_signal(SIGINT, SIG_DFL);
// Processing MonObjet.
echo sprintf("MonObjet n°%d : name : %s.", $mon_objet->getId(), $mon_objet->getName());
$start_time = microtime(true);
$mon_objet->maGrosseMethode();
}
}
} // For each MonObjet ID.
// We wait for all forks to exit.
while(count($children))
{
// We clean children list.
reap_children();
sleep(1);
}
echo sprintf("Completed in %01.2f seconds.\n", microtime(true) - $global_start_time);
} // maMethodeMultithread()
} // class MonObjetPeer