Installation

Install the latest version using composer:

composer require sweikenb/pcntl

Changelog

Please consult the CHANGELOG.md for latest update information.

System Requirements

This library requires at least PHP v8.2 with the following extensions enabled:

  • pcntl
  • posix
  • sockets

Note that this library is incompatible with the grpc extension!

In order to execute the unit-tests you will need to run them in a linux environment.

Process Manager

The process manager is the core of this library and provides the basic threading functionality by utilizing the native functions of the pcntl and posix modules.

Basic Usage

use Sweikenb\Library\Pcntl\Api\ChildProcessInterface as ChildProcess;
use Sweikenb\Library\Pcntl\Api\ParentProcessInterface as ParentProcess;
use Sweikenb\Library\Pcntl\Api\ProcessOutputInterface as Output;
use Sweikenb\Library\Pcntl\ProcessManager;

require __DIR__ . '/vendor/autoload.php';

$pm = new ProcessManager();

// inline callback
$pm->runProcess(fn() => sleep(3));

// example with all available callback parameters
$pm->runProcess(function (ChildProcess $child, ParentProcess $parent, Output $output) {
    $output->stdout(sprintf("Parent PID: %s\n", $parent->getId()));
    $output->stdout(sprintf("Child PID: %s\n", $child->getId()));
});

// return 'true' or 'null'/'void' to indicate that the execution was successful
$pm->runProcess(function () {
    //...
    return true; // this will exit the child process with exit-code `0`
});

// return 'false' to indicate that the execution failed
$pm->runProcess(function () {
    //...
    return false; // this will exit the child process with exit-code `1`
});

// prints the PID of the main process
var_dump($pm->getMainProcess()->getId());

Wait for Children

If you want to continue synchronously after creating child-threads, simply call the wait()-method of the process manager. By default, the method is called automatically at the end of each script-execution but this can be configured using the $autoWait-setting of the ProcessManager.

use Sweikenb\Library\Pcntl\ProcessManager;

require __DIR__ . '/vendor/autoload.php';

$pm = new ProcessManager();

// run in sync
// ...

// run the next three lines async
$pm->runProcess(fn() => sleep(3));
$pm->runProcess(fn() => sleep(1));
$pm->runProcess(fn() => sleep(2));

// wait for all threads to finish
$pm->wait();

// continue to run in sync from here
// ...

If you wat to know which children exited, you can provide a callback function for the wait()-method:

// wait for all threads to finish
$pm->wait(function (int $status, int $pid) {
    echo sprintf("The child with pid %s exited with status code %s\n", $pid, $status);
});

By default, the wait()-method will wait for ALL children to exit before it continues the programm. If you wish to only wait for a specific children to exit, you can modify this behavior by returning false in the callback:

$pm->wait(function (int $status, int $pid) {
    if ($status === 1) {
        // the child failed, lets stop waiting and continue with the programm-flow
        return false;
    }

    // info
    echo sprintf("The child with pid %s exited with status code %s\n", $pid, $status);

    // continue to wait
    return true;
});

Beside the callback in the wait()-method itself, there are also thread-hooks that can be used to get notified when a thread is created or finished.

Thread-Hooks

You can register hooks that gets triggered during the lifetime of a thread. Note that you can register multiple hooks for the same lifetime-event and that the callbacks are executed in the order of registration.

onThreadCreate

Registers a callback that gets called whenever a thread is created:

$pm->onThreadCreate(function (ChildProcessInterface $child) {
    echo sprintf("The child with pid %s was created.", $child->getId());
});

onThreadExit

Registers a callback that gets called whenever a thread exits:

$pm->onThreadExit(function (int $status, int $pid) {
    echo sprintf("The child with pid %s exited with status %s", $pid, $status);
});

Please note that the callback of the wait() method gets called BEFORE the lifecycle hooks.

Settings

  • $autoWait
    • enables or disables the automatic wait at the end of the script
    • default: true RECOMMENDED!
  • $propagateSignals
    • list of signals that should be propagated to the child-processes
    • default signals:
      • SIGTERM graceful exit request by the system or user
      • SIGINT user interrupts the execution (e.g. ctrl + c in the terminal)
      • SIGHUP usually used to request a config reload
      • SIGALRM usually used for timeout management
      • SIGUSR1 custom signal 1
      • SIGUSR2 custom signal 2
    • please note that SIGCHLD can NOT be propagated due to how the process-manager internally handles this signal
  • $processFactory
    • factory instance that should be used to create the process models
    • default: Sweikenb\Library\Pcntl\Factory\ProcessFactory
  • $processOutput
    • output instance that should be used as proxy for writing data to STDOUT and STDERR
    • default: Sweikenb\Library\Pcntl\ProcessOutput

Process Queue

If you do not know how many threads you might need, but you want to limit the amount of threads that will be forked simultaneously, you can use the ProcessQueue which internally ensures that your thread-limit is never exceeded.

Basic Usage

use Sweikenb\Library\Pcntl\ProcessQueue;

require __DIR__ . '/vendor/autoload.php';

// only 4 threads will be forked and further callbacks must wait
// until free slots are available
$maxThreads = 4;

$queue = new ProcessQueue($maxThreads);
for ($i = 0; $i < 100; $i++) {
    $queue->addToQueue(fn() => sleep(3));
}

// wait until the whole queue is done
$queue->wait();

Settings

  • $maxThreads
    • the maximum number of threads that might be forked (min. 1)
  • $processManager
    • instance of the process manager to be used
    • default: Sweikenb\Library\Pcntl\ProcessManager

Inter Process Communication (IPC)

When working with threads, you might want to send data between the parent- and the child-process (e.g. to update data or return the result of an asynchronous workload).

In order to do so, the threads have a direct socket-connection which can be used to send messages with custom payloads.

Topic and Payload

The topic is intended to be used as description, intention or routing of the message. Beside the fact that it must be a string, the topic can be anything you like.

The payload on the other hand is intended to carry the actual data you want to transfer between the threads. Please beware that you can only send payloads that are serializable. Any kind of file-pointer or resource (e.g. database-connections) will NOT work!

You might also want to refer to the Common Pitfalls and Workarounds section if you run into trouble.

Basic Usage

use Sweikenb\Library\Pcntl\Api\ChildProcessInterface as ChildProcess;
use Sweikenb\Library\Pcntl\Api\ParentProcessInterface as ParentProcess;
use Sweikenb\Library\Pcntl\Factory\MessageFactory;
use Sweikenb\Library\Pcntl\ProcessManager;

require __DIR__ . '/vendor/autoload.php';

$pm = new ProcessManager();
$factory = new MessageFactory();

$child = $pm->runProcess(
    function (ChildProcess $child, ParentProcess $parent) use ($factory) {
        $messageFromParent = $parent->getNextMessage();
        $parent->sendMessage(
            $factory->create(
                'hello parent',
                [
                    'pid' => $child->getId(),
                    'lastMessage' => $messageFromParent->getTopic()
                ]
            )
        );
    }
);

$child->sendMessage($factory->create('hello child', null));
$messageFromChild = $child->getNextMessage();

var_dump($messageFromChild);

This will output something like this:

class Sweikenb\Library\Pcntl\Model\Ipc\MessageModel#14 (2) {
  private string $topic =>
  string(12) "hello parent"
  private mixed $payload =>
  array(2) {
    'pid' =>
    int(54723)
    'lastMessage' =>
    string(11) "hello child"
  }
}

Process finished with exit code 0

Common Pitfalls and Workarounds

Database Connections and File-Pointer

When working with database-connections, file-pointer or any other kind of ressource, you have to be careful when forking processes.

A fork is a copy of the original thread that contains a copy of the memory too. The pointer and resources referenced by the copied memory still belong to the main thread and when accessing them it wil cause errors and a lot of problems.

Fortunately, there are some simple workarounds:

  1. Do not open any connection or pointer before you fork your process (easy enough)
  2. If you need to open a connection or resource, close it and create a new instance inside the fork:
use Sweikenb\Library\Pcntl\ProcessManager;

require __DIR__ . '/vendor/autoload.php';

$pm = new ProcessManager();

// get the connection
$connection = new Connection();

// load results
$results = $connection->getReults();

// close connection
$connection->close();
foreach ($results as $result) {
    $pm->runProcess(function () use ($result) {
        // re-open connection
        $connection = new Connection();

        // TODO process data

        // update data
        $connection->update($result);

        // close connection
        $connection->close();
    });
}