parallel\run

(1.0.0)

parallel\run - Execution

Description

function parallel\run(Closure $task): ?Future

Schedules task for execution in parallel.

function parallel\run(Closure $task, array $argv): ?Future

Schedules task for execution in parallel, passing argv to it at execution time.

Automatic Scheduling

If a \parallel\Runtime that was created and cached by a previous call to parallel\run() is idle, it will be used to execute the task. If no \parallel\Runtime is idle, parallel will create and cache a new \parallel\Runtime.

Note: \parallel\Runtime objects created by the programmer are not used for automatic scheduling.
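
The two signatures above can be sketched as follows. This is a minimal illustration, assuming the parallel extension is loaded (it requires a thread-safe build of PHP); the variable names are chosen for the example only.

```php
<?php
if (!extension_loaded('parallel')) {
    exit("This sketch needs the parallel extension.\n");
}

// Task without arguments. The Future is kept because the task returns a value.
$first = \parallel\run(function () {
    return 6 * 7;
});

// Task with arguments: the elements of argv are passed to the
// closure as positional parameters.
$second = \parallel\run(function (int $a, int $b) {
    return $a + $b;
}, [1, 3]);

// value() waits for the task if it has not finished yet.
$answer = $first->value();
$sum = $second->value();

echo $answer, "\n"; // 42
echo $sum, "\n";    // 4
```

Whether the second call reuses the runtime created by the first depends on whether that runtime is already idle when the second task is scheduled.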

Parameters

task
A Closure with specific characteristics.
argv
An array of arguments with specific characteristics, to be passed to task at execution time.

Task Characteristics

Closures scheduled for parallel execution must not:

  • accept or return by reference
  • accept or return internal objects (see notes)
  • contain any of the prohibited instructions listed below

Instructions prohibited in closures intended for parallel execution are:

  • yield
  • use by-reference (binding a variable with use (&$var))
  • declare classes
  • declare named functions

Note: Nested closures may yield or use by-reference, but must not contain class or named function declarations.

Note: No instructions are prohibited in the files that the task may include.
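
Some of the rules above are visible through reflection, so a closure can be inspected before it is scheduled. The sketch below uses a hypothetical helper, taskRuleViolations(), which is not part of parallel; it covers only by-reference parameters, by-reference returns and yield, and it does not replace the checks that parallel itself performs.

```php
<?php
/**
 * Hypothetical helper (not part of parallel): lists the rules that
 * reflection can show a closure to break.
 */
function taskRuleViolations(Closure $task): array
{
    $reflection = new ReflectionFunction($task);
    $violations = [];

    if ($reflection->returnsReference()) {
        $violations[] = 'returns by reference';
    }
    if ($reflection->isGenerator()) {
        $violations[] = 'contains yield';
    }
    foreach ($reflection->getParameters() as $parameter) {
        if ($parameter->isPassedByReference()) {
            $violations[] = 'accepts $' . $parameter->getName() . ' by reference';
        }
    }

    return $violations;
}

$ok = function (int $a, int $b) { return $a + $b; };
$byRef = function (array &$rows) { $rows[] = 1; };
$generator = function () { yield 1; };

print_r(taskRuleViolations($ok));        // empty array
print_r(taskRuleViolations($byRef));     // one entry: by-reference parameter
print_r(taskRuleViolations($generator)); // one entry: yield
```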

Arguments Characteristics

Arguments must not:

  • contain references
  • contain resources
  • contain internal objects (see notes)

Note: In the case of file stream resources, the resource will be cast to its file descriptor and passed as int where possible. This is not supported on Windows.

Internal Objects Notes

Internal objects generally use a custom structure that cannot be copied by value safely. PHP currently lacks the mechanics to do this (without serialization), so only objects that do not use a custom structure may be shared.

Some internal objects do not use a custom structure, for example parallel\Events\Event, and so may be shared.

Closures are a special kind of internal object that supports being copied by value, and so may be shared.

Channels are central to writing parallel code and support concurrent access and execution by necessity, and so may be shared.

Warning

A user class that extends an internal class may use a custom structure as defined by the internal class, in which case it cannot be copied by value safely, and so may not be shared.
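
The argument rules and the warning above can be approximated with a pre-flight check. The sketch below uses a hypothetical helper, suspectArguments(), which is not part of parallel. It is a heuristic only: ReflectionClass::isInternal() is used as a stand-in for "may use a custom structure", so it over-reports objects that the notes above describe as shareable (only Closure is exempted here), it also flags file streams that parallel may be able to pass as file descriptors, and it does not detect references.

```php
<?php
/**
 * Hypothetical helper (not part of parallel): walks $argv and lists
 * values that look unsafe to pass to a task. Heuristic only.
 */
function suspectArguments(array $argv, string $path = 'argv'): array
{
    $suspects = [];

    foreach ($argv as $key => $value) {
        $here = $path . '[' . $key . ']';

        if (is_resource($value)) {
            $suspects[] = $here . ': resource';
        } elseif (is_array($value)) {
            // Arguments may be nested, so descend into arrays.
            $suspects = array_merge($suspects, suspectArguments($value, $here));
        } elseif (is_object($value) && !($value instanceof Closure)) {
            // Walk up the hierarchy: a user class that extends an
            // internal class is flagged as well.
            for ($class = new ReflectionClass($value); $class; $class = $class->getParentClass()) {
                if ($class->isInternal()) {
                    $suspects[] = $here . ': object based on internal class ' . $class->getName();
                    break;
                }
            }
        }
    }

    return $suspects;
}

class Row { public $id = 1; }        // plain user class: not flagged
class Bag extends ArrayObject {}     // extends an internal class: flagged

$report = suspectArguments([1, 'two', new Row(), [new Bag()], fopen('php://memory', 'r')]);
print_r($report);
```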

Return Values

Warning

The returned Future must not be ignored when the task contains a return or throw statement.

Exceptions

Warning

Throws parallel\Runtime\Error\Closed if parallel\Runtime was closed.

Warning

Throws parallel\Runtime\Error\IllegalFunction if task is a closure created from an internal function.

Warning

Throws parallel\Runtime\Error\IllegalInstruction if task contains illegal instructions.

Warning

Throws parallel\Runtime\Error\IllegalParameter if task accepts or argv contains illegal variables.

Warning

Throws parallel\Runtime\Error\IllegalReturn if task returns illegally.
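
As a minimal sketch of handling one of these errors, the example below schedules a closure that contains yield, which is on the prohibited list, and catches the resulting parallel\Runtime\Error\IllegalInstruction. It assumes the parallel extension is loaded; the wording of the error message is not specified here and may vary between versions.

```php
<?php
if (!extension_loaded('parallel')) {
    exit("This sketch needs the parallel extension.\n");
}

$caught = null;

try {
    // yield is prohibited in a task, so scheduling is refused.
    \parallel\run(function () {
        yield 1;
    });
} catch (\parallel\Runtime\Error\IllegalInstruction $error) {
    $caught = get_class($error);
    echo "Task rejected: ", $error->getMessage(), "\n";
}
```

The other error classes listed above can be caught in the same way, each in its own catch block.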


User Contributed Notes 3 notes

john_2885 at yahoo dot com
6 years ago
Here's a more substantial example of how to use the run functional API.

<?php
/*********************************************
 * Sample parallel functional API
 * 
 * Scenario
 * -------------------------------------------
 * Given a large number of rows of
 * data to process, divide the work amongst
 * a set of workers.  Each worker is responsible
 * for finishing their assigned task.
 * 
 * In the code below, assume we have arbitrary 
 * start and end IDs (rows) - we will try to
 * divide the number of IDs (rows) evenly
 * across 8 workers.  The workers will get the
 * following batches to process to completion:
 *
 * Total number of IDs (rows): 1371129
 * Each worker will get 171392 IDs to process
 *
 * Worker 1: IDs from 11001 to 182393
 * Worker 2: IDs from 182393 to 353785
 * Worker 3: IDs from 353785 to 525177
 * Worker 4: IDs from 525177 to 696569
 * Worker 5: IDs from 696569 to 867961
 * Worker 6: IDs from 867961 to 1039353
 * Worker 7: IDs from 1039353 to 1210745
 * Worker 8: IDs from 1210745 to 1382130
 *
 * Each worker then processes 5000 rows at a time
 * until they are done with their assigned work
 *
 *********************************************/

use \parallel\{Runtime, Future, Channel, Events};

$minId = 11001;
$maxId = 1382130;
$workers = 8;
$totalIds = $maxId - $minId;
// Try to divide IDs evenly across the number of workers
// (cast to int: ceil() returns a float, and the worker closure expects int IDs)
$batchSize = (int) ceil($totalIds / $workers);
// The last batch gets whatever is left over
$lastBatch = $totalIds % $batchSize;
// The number of IDs (rows) to divide the overall
// task into sub-batches 
$rowsToFetch = 5000;

print "Total IDs: " . $totalIds . "\n";
print "Batch Size: " . $batchSize . "\n";
print "Last Batch: " . $lastBatch . "\n";

$producer = function(int $worker, int $startId, int $endId, int $fetchSize) {
    $tempMinId = $startId;
    $tempMaxId = $tempMinId + $fetchSize;
    $fetchCount = 1;
    
    print "Worker " . $worker . " working on IDs from " . $startId . " to " . $endId . "\n";
    
    while($tempMinId < $endId) {
        for($i = $tempMinId; $i < $tempMaxId; $i++) {
            $usleep = rand(500000, 1000000);
            usleep($usleep);
            print "Worker " . $worker .  " finished batch " . $fetchCount . " from ID " . $tempMinId . " to " . $tempMaxId . "\n";
            // Need to explicitly break out of the for loop once complete or else it will forever process only the first sub-batch
            break;
        }
        
        // Now we move on to the next sub-batch for this worker
        $tempMinId = $tempMaxId;
        $tempMaxId = $tempMinId + $fetchSize;
        if($tempMaxId > $endId) {
            $tempMaxId = $endId;
        }
        // Introduce some timing randomness
        $sleep = rand(1,5);
        sleep($sleep);
        $fetchCount++;
    }
    
    // This worker has completed their entire batch
    print "Worker " . $worker .  " finished\n";
    
};

// Create our workers and have them start working on their task
// In this case, it's a set of 171392 IDs to process
for($i = 0; $i < $workers; $i++) {
    $startId = $minId + ($i * $batchSize);
    $endId = $startId + $batchSize;
    if($i == ($workers - 1)) {
        $endId = $maxId;
    }
    \parallel\run($producer, array(($i+1), $startId, $endId, $rowsToFetch));
}

?>
anonymous user
5 years ago
Although function declarations are not allowed inside code executed in a thread, include is allowed. So if we want to declare a function, we can write another file that contains the function and include it.
# main.php
<?php
$runtime = new parallel\Runtime ();
$future = $runtime->run ( function () {
    include "included.php";
    return add (1, 3);
}, [ ] );
echo $future->value ();
# output: 4
# included.php
<?php
function add($a, $b){
    return $a + $b;
}
Thierry Kauffmann
4 years ago
<?php

/**
 * Sample parallel functional API
 * using a generator instead of a static list of items to process
 * 
 * Items to process in parallel come from a generator
 * It could be anything: e.g. rows fetched from MySQL, a DirectoryIterator...
 * Thus the number of items to process in parallel is NOT known in advance
 * 
 * This algorithm attributes items to each parallel thread dynamically
 * As soon as a thread has finished working
 * It is assigned a new item to process
 * until all items are processed (generator closes)
 * 
 * In this example we process 50 items in 5 parallel threads
 * It produces output in this form (output changes at each run) :
 * 
 * ThreadId: 1 => Item: 1 (Start)
 * ThreadId: 2 => Item: 2 (Start)
 * ThreadId: 3 => Item: 3 (Start)
 * ThreadId: 4 => Item: 4 (Start)
 * ThreadId: 5 => Item: 5 (Start)
 * ThreadId: 5 => Item: 5 Sleep: 3s (End)
 * ThreadId: 5 => Item: 6 (Start)
 * ThreadId: 3 => Item: 3 Sleep: 4s (End)
 * ThreadId: 3 => Item: 7 (Start)
 * ThreadId: 2 => Item: 2 Sleep: 6s (End)
 * ThreadId: 2 => Item: 8 (Start)
 * ...
 * ThreadId: 4 => Item: 44 Sleep: 6s (End)
 * ThreadId: 4 => Item: 49 (Start)
 * ThreadId: 3 => Item: 46 Sleep: 5s (End)
 * ThreadId: 3 => Item: 50 (Start)
 * ThreadId: 2 => Item: 43 Sleep: 9s (End)
 * Destroy ThreadId: 2
 * ThreadId: 1 => Item: 47 Sleep: 5s (End)
 * Destroy ThreadId: 1
 * ThreadId: 4 => Item: 49 Sleep: 7s (End)
 * Destroy ThreadId: 4
 * ThreadId: 5 => Item: 48 Sleep: 10s (End)
 * Destroy ThreadId: 5
 * ThreadId: 3 => Item: 50 Sleep: 10s (End)
 * Destroy ThreadId: 3
 */

use \parallel\{Runtime, Future, Channel, Events};

// Generate list of items to process with a generator
function generator(int $item_count) {
    for ($i=1; $i <= $item_count; $i++) {
        yield $i;
    }
}

function testConcurrency(int $concurrency, int $item_count) {

    $generator = generator($item_count);

    // Function executing in each thread. As an example, it sleeps for a random time.
    $producer = function (int $item_id) {
        $seconds = rand(1, 10);
        sleep($seconds);
        return ['item_id' => $item_id, 'sleep_seconds' => $seconds];
    };

    // Fill up threads with initial 'inactive' state
    $threads = array_fill(1, $concurrency, ['is_active' => false]);

    while (true) {
        // Loop through threads until all threads are finished
        foreach ($threads as $thread_id => $thread) {
            if (!$thread['is_active'] and $generator->valid()) {
                // Thread is inactive and generator still has values : run something in the thread
                $item_id = $generator->current();
                $threads[$thread_id]['run'] = \parallel\run($producer, [$item_id]);
                echo "ThreadId: $thread_id => Item: $item_id (Start)\n";
                $threads[$thread_id]['is_active'] = true;
                $generator->next();
            } elseif (!isset($threads[$thread_id]['run'])) {
                // Destroy supplementary threads in case generator closes sooner than number of threads
                echo "Destroy ThreadId: $thread_id\n";
                unset($threads[$thread_id]);
            } elseif ($threads[$thread_id]['run']->done()) {
                // Thread finished. Get results
                $item = $threads[$thread_id]['run']->value();
                echo "ThreadId: $thread_id => Item: {$item['item_id']} Sleep: {$item['sleep_seconds']}s (End)\n";

                if (!$generator->valid()) {
                    // Generator is closed then destroy thread
                    echo "Destroy ThreadId: $thread_id\n";
                    unset($threads[$thread_id]);
                } else {
                    // Thread is ready to run again
                    $threads[$thread_id]['is_active'] = false;
                }
            }
        }

        // Escape loop when all threads are destroyed
        if (empty($threads)) break;
    }
}

$concurrency = 5;
$item_count = 50;

testConcurrency($concurrency, $item_count);

?>