Skip to content
Browse files

init

  • Loading branch information...
0 parents commit 1e63ad1eedbf8f345567d0a2e73330ada064b424 @marcj marcj committed
Showing with 613 additions and 0 deletions.
  1. +9 −0 Bridges/BridgeInterface.php
  2. +36 −0 Bridges/Symfony.php
  3. +67 −0 Client.php
  4. +41 −0 Commands/StartCommand.php
  5. +34 −0 Commands/StatusCommand.php
  6. +269 −0 ProcessManager.php
  7. +119 −0 ProcessSlave.php
  8. +12 −0 README.md
  9. +15 −0 bin/ppm
  10. +11 −0 composer.json
9 Bridges/BridgeInterface.php
@@ -0,0 +1,9 @@
+<?php
+
+namespace PHPPM\Bridges;
+
+interface BridgeInterface
+{
+ public function bootstrap();
+ public function onRequest(\React\Http\Request $request, \React\Http\Response $response);
+}
36 Bridges/Symfony.php
@@ -0,0 +1,36 @@
+<?php
+
+namespace PHPPM\Bridges;
+
+use PHPPM\Bridges\BridgeInterface;
+
+class Symfony implements BridgeInterface
+{
+
+ /**
+ * @var \AppKernel
+ */
+ protected $kernel;
+
+ public function bootstrap()
+ {
+ require_once './vendor/autoload.php';
+ require_once './app/AppKernel.php';
+ $this->kernel = new \AppKernel('prod', false);
+ $this->kernel->loadClassCache();
+ }
+
+ public function onRequest(\React\Http\Request $request, \React\Http\Response $response)
+ {
+ $syRequest = new \Symfony\Component\HttpFoundation\Request();
+ $syRequest->headers->replace($request->getHeaders());
+ $syRequest->setMethod($request->getMethod());
+ $syRequest->server->set('REQUEST_URI', $request->getPath());
+ $syRequest->server->set('SERVER_NAME', explode(':', $request->getHeaders()['Host'])[0]);
+
+ $syResponse = $this->kernel->handle($syRequest);
+ $headers = array_map('current', $syResponse->headers->all());
+ $response->writeHead($syResponse->getStatusCode(), $headers);
+ $response->end($syResponse->getContent());
+ }
+}
67 Client.php
@@ -0,0 +1,67 @@
+<?php
+
+namespace PHPPM;
+
+class Client
+{
+ /**
+ * @var int
+ */
+ protected $controllerPort = 5100;
+
+ /**
+ * @var \React\EventLoop\ExtEventLoop|\React\EventLoop\LibEventLoop|\React\EventLoop\LibEvLoop|\React\EventLoop\StreamSelectLoop
+ */
+ protected $loop;
+
+ /**
+ * @var \React\Socket\Connection
+ */
+ protected $connection;
+
+ function __construct($controllerPort = 8080)
+ {
+ $this->controllerPort = $controllerPort;
+ $this->loop = \React\EventLoop\Factory::create();
+ }
+
+ /**
+ * @return \React\Socket\Connection
+ */
+ protected function getConnection()
+ {
+ if ($this->connection) {
+ $this->connection->close();
+ unset($this->connection);
+ }
+ $client = stream_socket_client('tcp://127.0.0.1:5500');
+ $this->connection = new \React\Socket\Connection($client, $this->loop);
+ return $this->connection;
+ }
+
+ protected function request($command, $options, $callback)
+ {
+ $data['cmd'] = $command;
+ $data['options'] = $options;
+ $connection = $this->getConnection();
+
+ $result = '';
+ $connection->on('data', function($data) use ($result) {
+ $result .= $data;
+ });
+
+ $connection->on('close', function() use ($callback, $result) {
+ $callback($result);
+ });
+
+ $connection->write(json_encode($data));
+ }
+
+ public function getStatus(callable $callback)
+ {
+ $this->request('status', [], function($result) use ($callback) {
+ $callback(json_decode($result, true));
+ });
+ }
+
+}
41 Commands/StartCommand.php
@@ -0,0 +1,41 @@
+<?php
+
+namespace PHPPM\Commands;
+
+use PHPPM\ProcessManager;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Output\OutputInterface;
+
+class StartCommand extends Command
+{
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure()
+ {
+ parent::configure();
+
+ $this
+ ->setName('start')
+ ->addArgument('working-directory', null, 'working directory', './')
+ ->addOption('bridge', null, InputOption::VALUE_REQUIRED)
+ ->setDescription('Starts the server')
+ ;
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ if ($workingDir = $input->getArgument('working-directory')) {
+ chdir($workingDir);
+ }
+
+ $bridge = $input->getOption('bridge');
+
+ $handler = new ProcessManager();
+ $handler->setBridge($bridge);
+ $handler->run();
+ }
+
+}
34 Commands/StatusCommand.php
@@ -0,0 +1,34 @@
+<?php
+
+namespace PHPPM\Commands;
+
+use PHPPM\Client;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
+class StatusCommand extends Command
+{
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure()
+ {
+ parent::configure();
+
+ $this
+ ->setName('status')
+ ->addArgument('working-directory', null, 'working directory', './')
+ ->setDescription('Status of all processes')
+ ;
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ $handler = new Client();
+ $handler->getStatus(function($status) {
+ var_dump($status);
+ });
+ }
+
+}
269 ProcessManager.php
@@ -0,0 +1,269 @@
+<?php
+
+namespace PHPPM;
+
+class ProcessManager
+{
+ /**
+ * @var array
+ */
+ protected $slaves = [];
+
+ /**
+ * @var \React\EventLoop\LibEventLoop|\React\EventLoop\StreamSelectLoop
+ */
+ protected $loop;
+
+ /**
+ * @var \React\Socket\Server
+ */
+ protected $controller;
+
+ /**
+ * @var \React\Socket\Server
+ */
+ protected $web;
+
+ /**
+ * @var int
+ */
+ protected $slaveCount = 1;
+
+ /**
+ * @var bool
+ */
+ protected $waitForSlaves = true;
+
+ /**
+ * Whether the server is up and thus creates new slaves when they die or not.
+ *
+ * @var bool
+ */
+ protected $run = false;
+
+ /**
+ * @var int
+ */
+ protected $index = 0;
+
+ /**
+ * @var string
+ */
+ protected $bridge;
+
+ /**
+ * @var int
+ */
+ protected $port = 8080;
+
+ function __construct($port = 8080, $slaveCount = 8)
+ {
+ $this->slaveCount = $slaveCount;
+ $this->port = $port;
+ }
+
+ public function fork()
+ {
+ if ($this->run) {
+ throw new \LogicException('Can not fork when already run.');
+ }
+
+ if (!pcntl_fork()) {
+ $this->run();
+ } else {
+ }
+ }
+
+ /**
+ * @param string $bridge
+ */
+ public function setBridge($bridge)
+ {
+ $this->bridge = $bridge;
+ }
+
+ /**
+ * @return string
+ */
+ public function getBridge()
+ {
+ return $this->bridge;
+ }
+
+ public function run()
+ {
+ $this->loop = \React\EventLoop\Factory::create();
+ $this->controller = new \React\Socket\Server($this->loop);
+ $this->controller->on('connection', array($this, 'onSlaveConnection'));
+ $this->controller->listen(5500);
+
+ $this->web = new \React\Socket\Server($this->loop);
+ $this->web->on('connection', array($this, 'onWeb'));
+ $this->web->listen($this->port);
+
+ echo "Waiting for slaves ... ";
+
+ for ($i = 0; $i < $this->slaveCount; $i++) {
+ $this->newInstance();
+ }
+
+ $this->run = true;
+ $this->loop();
+ }
+
+ public function onWeb(\React\Socket\Connection $incoming)
+ {
+ $slaveId = $this->getNextSlave();
+ echo sprintf("Slave #%d, fly and win!\n", $slaveId);
+ $port = $this->slaves[$slaveId]['port'];
+ $client = stream_socket_client('tcp://127.0.0.1:' . $port);
+ $redirect = new \React\Stream\Stream($client, $this->loop);
+
+ $redirect->on(
+ 'close',
+ function () use ($incoming) {
+ $incoming->end();
+ }
+ );
+
+ $incoming->on(
+ 'data',
+ function ($data) use ($redirect) {
+ $redirect->write($data);
+ }
+ );
+
+ $redirect->on(
+ 'data',
+ function ($data) use ($incoming) {
+ $incoming->write($data);
+ }
+ );
+ }
+
+ /**
+ * @return integer
+ */
+ protected function getNextSlave()
+ {
+ $count = count($this->slaves);
+
+ $this->index++;
+ if ($count === $this->index) {
+ //end
+ $this->index = 0;
+ }
+
+ return $this->index;
+ }
+
+ public function onSlaveConnection(\React\Socket\Connection $conn)
+ {
+ $conn->on(
+ 'data',
+ \Closure::bind(
+ function ($data) use ($conn) {
+ $this->onData($data, $conn);
+ },
+ $this
+ )
+ );
+ $conn->on(
+ 'close',
+ \Closure::bind(
+ function () use ($conn) {
+ foreach ($this->slaves as $idx => $slave) {
+ if ($slave['connection'] === $conn) {
+ unset($this->slaves[$idx]);
+ $this->checkSlaves();
+ }
+ }
+ },
+ $this
+ )
+ );
+ }
+
+ public function onData($data, $conn)
+ {
+ $this->processMessage($data, $conn);
+ }
+
+ public function processMessage($data, $conn)
+ {
+ $data = json_decode($data, true);
+
+ $method = 'command' . ucfirst($data['cmd']);
+ if (is_callable(array($this, $method))) {
+ $this->$method($data, $conn);
+ }
+ }
+
+ protected function commandStatus($options, $conn)
+ {
+ $result['activeSlaves'] = count($this->slaves);
+ $conn->end(json_encode($result));
+ }
+
+ protected function commandRegister(array $data, $conn)
+ {
+ $pid = (int)$data['pid'];
+ $port = (int)$data['port'];
+ $this->slaves[] = array(
+ 'pid' => $pid,
+ 'port' => $port,
+ 'connection' => $conn
+ );
+ if ($this->waitForSlaves && $this->slaveCount === count($this->slaves)) {
+ $slaves = array();
+ foreach ($this->slaves as $slave) {
+ $slaves[] = $slave['port'];
+ }
+ echo sprintf("%d slaves (%s) up and ready.\n", $this->slaveCount, implode(', ', $slaves));
+ }
+ }
+
+ protected function commandUnregister(array $data)
+ {
+ $pid = (int)$data['pid'];
+ echo sprintf("Slave died. (pid %d)\n", $pid);
+ foreach ($this->slaves as $idx => $slave) {
+ if ($slave['pid'] === $pid) {
+ unset($this->slaves[$idx]);
+ $this->checkSlaves();
+ }
+ }
+ $this->checkSlaves();
+ }
+
+ protected function checkSlaves()
+ {
+ if (!$this->run) {
+ return;
+ }
+
+ $i = count($this->slaves);
+ if ($this->slaveCount !== $i) {
+ echo sprintf('Boot %d new slaves ... ', $this->slaveCount - $i);
+ $this->waitForSlaves = true;
+ for (; $i < $this->slaveCount; $i++) {
+ $this->newInstance();
+ }
+ }
+ }
+
+ function loop()
+ {
+ $this->loop->run();
+ }
+
+ function newInstance()
+ {
+ $pid = pcntl_fork();
+ if (!$pid) {
+ //we're in the slave now
+ new ProcessSlave($this->getBridge());
+ exit;
+ }
+ }
+}
119 ProcessSlave.php
@@ -0,0 +1,119 @@
+<?php
+
+namespace PHPPM;
+
+class ProcessSlave
+{
+
+ /**
+ * @var \React\EventLoop\LibEventLoop|\React\EventLoop\StreamSelectLoop
+ */
+ protected $loop;
+
+ /**
+ * @var resource
+ */
+ protected $client;
+
+ /**
+ * @var \React\Socket\Connection
+ */
+ protected $connection;
+
+ /**
+ * @var string
+ */
+ protected $bridgeName;
+
+ /**
+ * @var Bridges\BridgeInterface
+ */
+ protected $bridge;
+
+ public function __construct($bridgeName = null)
+ {
+ $this->bridgeName = $bridgeName;
+ $this->bootstrap();
+ $this->connectToMaster();
+ $this->loop->run();
+ }
+
+ protected function shutdown()
+ {
+ echo "SHUTTING SLAVE PROCESS DOWN\n";
+ $this->bye();
+ exit;
+ }
+
+ /**
+ * @return Bridges\BridgeInterface
+ */
+ protected function getBridge()
+ {
+ if (null === $this->bridge && $this->bridgeName) {
+ $bridgeClass = sprintf('PHPPM\Bridges\\%s', ucfirst($this->bridgeName));
+ $this->bridge = new $bridgeClass;
+ }
+
+ return $this->bridge;
+ }
+
+ protected function bootstrap()
+ {
+ if ($bridge = $this->getBridge()) {
+ $bridge->bootstrap();
+ }
+ }
+
+ public function connectToMaster()
+ {
+ $this->loop = \React\EventLoop\Factory::create();
+ $this->client = stream_socket_client('tcp://127.0.0.1:5500');
+ $this->connection = new \React\Socket\Connection($this->client, $this->loop);
+
+ $this->connection->on(
+ 'close',
+ \Closure::bind(
+ function () {
+ $this->shutdown();
+ },
+ $this
+ )
+ );
+
+ $socket = new \React\Socket\Server($this->loop);
+ $http = new \React\Http\Server($socket);
+ $http->on('request', array($this, 'onRequest'));
+
+ $port = 5501;
+ while ($port < 5600) {
+ try {
+ $socket->listen($port);
+ break;
+ } catch( \React\Socket\ConnectionException $e ) {
+ $port++;
+ }
+ }
+
+ $this->connection->write(json_encode(array('cmd' => 'register', 'pid' => getmypid(), 'port' => $port)));
+ }
+
+ public function onRequest(\React\Http\Request $request, \React\Http\Response $response)
+ {
+ if ($bridge = $this->getBridge()) {
+ return $bridge->onRequest($request, $response);
+ } else {
+ $response->writeHead('404');
+ $response->end('No Bridge Defined.');
+ }
+ }
+
+ public function bye()
+ {
+ if ($this->connection->isWritable()) {
+ $this->connection->write(json_encode(array('cmd' => 'unregister', 'pid' => getmypid())));
+ $this->connection->close();
+ }
+ $this->loop->stop();
+ }
+}
12 README.md
@@ -0,0 +1,12 @@
+PHP ProcessManager for ReactPHP
+===============================
+
+This is the library used in my blog entry about high performance PHP applications.
+
+First initial version. Final version will be pushed when the blog entry is published.
+
+Example:
+
+```bash
+$ ./bin/ppm start ~/bude/symfony-24/ --bridge=symfony
+```
15 bin/ppm
@@ -0,0 +1,15 @@
+#!/usr/bin/env php
+<?php
+
+set_time_limit(0);
+
+$loader = require_once __DIR__ . '/../vendor/autoload.php';
+
+use Symfony\Component\Console\Application;
+use PHPPM\Commands\StartCommand;
+use PHPPM\Commands\StatusCommand;
+
+$app = new Application('PHP-PM');
+$app->add(new StartCommand);
+$app->add(new StatusCommand);
+$app->run();
11 composer.json
@@ -0,0 +1,11 @@
+{
+ "require": {
+ "symfony/console": "~2.4",
+ "react/react": "0.3.3"
+ },
+ "autoload": {
+ "psr-4": {
+ "PHPPM\\": ""
+ }
+ }
+}

0 comments on commit 1e63ad1

Please sign in to comment.
Something went wrong with that request. Please try again.