Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Делаем вебсокеты на PHP с нуля

Anna | 31.05.2014 | нет комментариев

Некоторое время назад я выбирал библиотеку для работы с вебсокетами. На просторах интернета я натыкался на статьи по интеграции node.js с yii, а примерно все статьи о вебсокетах на прогре ограничивались инструкциями к тому, как применять phpdaemon.

Я постигал библиотеки phpdaemon и ratchet, они довольно монструозны (причём применяя ratchet для отправки сообщения определенному пользователю рекомендовано добавочно применять wamp). Мне не вовсе было ясно для чего применять таких монстров, которые требуют установку других монстров. Почитав исходники этих, а также других библиотек, я разобрался как всё устроено и мне захотелось написать примитивный вебсокет-сервер на php независимо. Это помогло мне закрепить изученный материал и наткнуться на некоторые подводные камни, о которых я не имел представления.

Так я решил написать нужный для меня функционал с нуля.

Получившийся код и ссылка на демонстрационный чат в конце статьи.

Поставленные цели:

1) разобраться с серверными сокетами в php
2) разобраться с протоколом вебсокетов
3) написать с нуля примитивный сервер вебсокетов

1) Серверные сокеты в php

До этого момента я имел смутные представления о серверных сокетах. Почитав исходники нескольких библиотек для работы с вебсокетами я столкнулся с двумя схемами их реализаций:

применяя растяжение php «socket»:

$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);//создаём сокет
socket_bind($socket, '127.0.0.1', 8000);//привязываем его к указанным ip и порту
socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);//разрешаем применять один порт для нескольких соединений
socket_listen($socket);//слушаем сокет

либо применяя растяжение php «stream»:

$socket = stream_socket_server("tcp://127.0.0.1:8000", $errno, $errstr);

Я предпочёл 2-й вариант ввиду его краткости.

Выходит, мы сотворили серверный сокет и сейчас хотим обрабатывать новые соединения к нему, для этого вновь же есть два варианта

while ($connect = stream_socket_accept($socket, -1)) {//ожидаем новое соединение (без таймаута)
    ...обрабатываем $connect
}

 

Пример простого http сервера, тот, что на все запросы отвечает: Здравствуй

#!/usr/bin/env php
<?php

$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);

if (!$socket) {
    die("$errstr ($errno)\n");
}

while ($connect = stream_socket_accept($socket, -1)) {
    fwrite($connect, "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\nПривет");
    fclose($connect);
}

fclose($socket);

либо с применением stream_select

$connects = array();
while (true) {
    //формируем массив прослушиваемых сокетов:
    $read = $connects;
    $read[] = $socket;
    $write = $except = null;

    if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
        break;
    }

    if (in_array($socket, $read)) {//есть новое соединение
        $connect = stream_socket_accept($socket, -1);//принимаем новое соединение
        $connects[] = $connect;//добавляем его в список нужных для обработки
        unset($read[ array_search($socket, $read) ]);
    }

    foreach($read as $connect) {//обрабатываем все соединения
        ...обрабатываем $connect
        unset($connects[ array_search($connect, $connects) ]);
    }
}

 

Пример простого http сервера с применением stream_select, тот, что на все запросы отвечает: Здравствуй

#!/usr/bin/env php
<?php

$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);

if (!$socket) {
    die("$errstr ($errno)\n");
}

$connects = array();
while (true) {
    //формируем массив прослушиваемых сокетов:
    $read = $connects;
    $read []= $socket;
    $write = $except = null;

    if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
        break;
    }

    if (in_array($socket, $read)) {//есть новое соединение
        $connect = stream_socket_accept($socket, -1);//принимаем новое соединение
        $connects[] = $connect;//добавляем его в список нужных для обработки
        unset($read[ array_search($socket, $read) ]);
    }

    foreach($read as $connect) {//обрабатываем все соединения
        $headers = '';
        while ($buffer = rtrim(fgets($connect))) {
            $headers .= $buffer;
        }
        fwrite($connect, "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\nПривет");
        fclose($connect);
        unset($connects[ array_search($connect, $connects) ]);
    }
}

fclose($server);

Т.к. нам в последующем необходимо будет единовременно обрабатывать и серверный сокет на предмет новых соединений, и теснее существующие подключения, на предмет новых сообщений, то остановимся на втором варианте.

2) Протокол вебсокетов

В этой статье отлично описан протокол взаимодействия.
Нас волнует два момента:

«Рукопожатие» либо handshake:

Считываем значение Sec-WebSocket-Key из пришедшего заголовка от заказчика, рассчитываем на его основе Sec-WebSocket-Accept и отправляем итоговый результат:

$SecWebSocketAccept = base64_encode(pack('H*', sha1($SecWebSocketKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
$response = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
    "Upgrade: websocket\r\n" .
    "Connection: Upgrade\r\n" .
    "Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";

 

Пример функции, которая это делает

function handshake($connect) {
    $info = array();

    $line = fgets($connect);
    $header = explode(' ', $line);
    $info['method'] = $header[0];
    $info['uri'] = $header[1];

    //считываем заголовки из соединения
    while ($line = rtrim(fgets($connect))) {
        if (preg_match('/\A(\S ): (.*)\z/', $line, $matches)) {
            $info[$matches[1]] = $matches[2];
        } else {
            break;
        }
    }

    $address = explode(':', stream_socket_get_name($connect, true)); //получаем адрес заказчика
    $info['ip'] = $address[0];
    $info['port'] = $address[1];

    if (empty($info['Sec-WebSocket-Key'])) {
        return false;
    }

    //отправляем заголовок согласно протоколу вебсокета
    $SecWebSocketAccept = base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
    $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
	"Upgrade: websocket\r\n" .
	"Connection: Upgrade\r\n" .
	"Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";
    fwrite($connect, $upgrade);

    return $info;
}
обмен сообщениями

Позже приобретения данных из вебсокета нам необходимо их раскодировать, а при отправке закодировать.
Всё в той же статье отлично описано кодирование сообщений, но нам по-сути необходимы только две функции: decode и encode.

Пример реализации функций decode и encode

function decode($data)
{
$unmaskedPayload = ”;
$decodedData = array();// estimate frame type:
$firstByteBinary = sprintf(‘%08b’, ord($data[0]));
$secondByteBinary = sprintf(‘%08b’, ord($data[1]));
$opcode = bindec(substr($firstByteBinary, 4, 4));
$isMasked = ($secondByteBinary[0] == ’1′)? true: false;
$payloadLength = ord($data[1]) & 127;

// unmasked frame is received:
if (!$isMasked) {
return array(‘type’ => ”, ‘payload’ => ”, ‘error’ => ‘protocol error (1002)’);
}

switch ($opcode) {
// text frame:
case 1:
$decodedData['type'] = ‘text’;
break;

case 2:
$decodedData['type'] = ‘binary’;
break;

// connection close frame:
case 8:
$decodedData['type'] = ‘close’;
break;

// ping frame:
case 9:
$decodedData['type'] = ‘ping’;
break;

// pong frame:
case 10:
$decodedData['type'] = ‘pong’;
break;

default:
return array(‘type’ => ”, ‘payload’ => ”, ‘error’ => ‘unknown opcode (1003)’);
}

if ($payloadLength === 126) {
$mask = substr($data, 4, 4);
$payloadOffset = 8;
$dataLength = bindec(sprintf(‘%08b’, ord($data[2])). sprintf(‘%08b’, ord($data[3]))) $payloadOffset;
} elseif ($payloadLength === 127) {
$mask = substr($data, 10, 4);
$payloadOffset = 14;
$tmp = ”;
for ($i = 0; $i < 8; $i ) {
$tmp .= sprintf(‘%08b’, ord($data[$i 2]));
}
$dataLength = bindec($tmp) $payloadOffset;
unset($tmp);
} else {
$mask = substr($data, 2, 4);
$payloadOffset = 6;
$dataLength = $payloadLength $payloadOffset;
}

/**
* We have to check for large frames here. socket_recv cuts at 1024 bytes
* so if websocket-frame is > 1024 bytes we have to wait until whole
* data is transferd.
*/
if (strlen($data) < $dataLength) {
return false;
}

if ($isMasked) {
for ($i = $payloadOffset; $i < $dataLength; $i ) {
$j = $i — $payloadOffset;
if (isset($data[$i])) {
$unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
}
}
$decodedData['payload'] = $unmaskedPayload;
} else {
$payloadOffset = $payloadOffset — 4;
$decodedData['payload'] = substr($data, $payloadOffset);
}

return $decodedData;
}

function encode($payload, $type = ‘text’, $masked = false)
{
$frameHead = array();
$payloadLength = strlen($payload);

switch ($type) {
case ‘text’:
// first byte indicates FIN, Text-Frame (10000001):
$frameHead[0] = 129;
break;

case ‘close’:
// first byte indicates FIN, Close Frame(10001000):
$frameHead[0] = 136;
break;

case ‘ping’:
// first byte indicates FIN, Ping frame (10001001):
$frameHead[0] = 137;
break;

case ‘pong’:
// first byte indicates FIN, Pong frame (10001010):
$frameHead[0] = 138;
break;
}

// set mask and payload length (using 1, 3 or 9 bytes)
if ($payloadLength > 65535) {
$payloadLengthBin = str_split(sprintf(‘%064b’, $payloadLength), 8);
$frameHead[1] = ($masked === true)? 255: 127;
for ($i = 0; $i < 8; $i ) {
$frameHead[$i 2] = bindec($payloadLengthBin[$i]);
}
// most significant bit MUST be 0
if ($frameHead[2] > 127) {
return array(‘type’ => ”, ‘payload’ => ”, ‘error’ => ‘frame too large (1004)’);
}
} elseif ($payloadLength > 125) {
$payloadLengthBin = str_split(sprintf(‘%016b’, $payloadLength), 8);
$frameHead[1] = ($masked === true)? 254: 126;
$frameHead[2] = bindec($payloadLengthBin[0]);
$frameHead[3] = bindec($payloadLengthBin[1]);
} else {
$frameHead[1] = ($masked === true)? $payloadLength 128: $payloadLength;
}

// convert frame-head to string:
foreach (array_keys($frameHead) as $i) {
$frameHead[$i] = chr($frameHead[$i]);
}
if ($masked === true) {
// generate a random mask:
$mask = array();
for ($i = 0; $i < 4; $i ) {
$mask[$i] = chr(rand(0, 255));
}

$frameHead = array_merge($frameHead, $mask);
}
$frame = implode(”, $frameHead);

// append payload to frame:
for ($i = 0; $i < $payloadLength; $i ) {
$frame .= ($masked === true)? $payload[$i] ^ $mask[$i % 4]: $payload[$i];
}

return $frame;
}

Примитивный сервер вебсокетов

Выходит, у нас есть каждая нужная информация.
Применяя код простого http сервера из первой части, а также функции handshake, decode и encode из 2-й мы можем собрать примитивный сервер вебсокетов.

Пример реализации простого сервера вебсокетов

#!/usr/bin/env php
<?php

$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);

if (!$socket) {
    die("$errstr ($errno)\n");
}

$connects = array();
while (true) {
    //формируем массив прослушиваемых сокетов:
$read = $connects;
    $read []= $socket;
    $write = $except = null;

    if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
        break;
    }

    if (in_array($socket, $read)) {//есть новое соединение
        //принимаем новое соединение и изготавливаем рукопожатие:
        if (($connect = stream_socket_accept($socket, -1)) && $info = handshake($connect)) {
            $connects[] = $connect;//добавляем его в список нужных для обработки
            onOpen($connect, $info);//вызываем пользовательский сценарий
        }
        $connects[] = $connect;//добавляем его в список нужных для обработки
        unset($read[ array_search($socket, $read) ]);
    }

    foreach($read as $connect) {//обрабатываем все соединения
        $data = fread($connect, 100000);

        if (!$data) { //соединение было закрыто
            fclose($connect);
            unset($connects[ array_search($connect, $connects) ]);
            onClose($connect);//вызываем пользовательский сценарий
            continue;
        }

        onMessage($connect, $data);//вызываем пользовательский сценарий
    }
}

fclose($server);

function handshake($connect) {
    $info = array();

    $line = fgets($connect);
    $header = explode(' ', $line);
    $info['method'] = $header[0];
    $info['uri'] = $header[1];

    //считываем заголовки из соединения
    while ($line = rtrim(fgets($connect))) {
        if (preg_match('/\A(\S ): (.*)\z/', $line, $matches)) {
            $info[$matches[1]] = $matches[2];
        } else {
            break;
        }
    }

    $address = explode(':', stream_socket_get_name($connect, true)); //получаем адрес заказчика
    $info['ip'] = $address[0];
    $info['port'] = $address[1];

    if (empty($info['Sec-WebSocket-Key'])) {
        return false;
    }

    //отправляем заголовок согласно протоколу вебсокета
    $SecWebSocketAccept = base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
    $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
        "Upgrade: websocket\r\n" .
        "Connection: Upgrade\r\n" .
        "Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";
    fwrite($connect, $upgrade);

    return $info;
}

function encode($payload, $type = 'text', $masked = false)
{
    $frameHead = array();
    $payloadLength = strlen($payload);

    switch ($type) {
        case 'text':
            // first byte indicates FIN, Text-Frame (10000001):
            $frameHead[0] = 129;
            break;

        case 'close':
            // first byte indicates FIN, Close Frame(10001000):
            $frameHead[0] = 136;
            break;

        case 'ping':
            // first byte indicates FIN, Ping frame (10001001):
            $frameHead[0] = 137;
            break;

        case 'pong':
            // first byte indicates FIN, Pong frame (10001010):
            $frameHead[0] = 138;
            break;
    }

    // set mask and payload length (using 1, 3 or 9 bytes)
    if ($payloadLength > 65535) {
        $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
        $frameHead[1] = ($masked === true) ? 255 : 127;
        for ($i = 0; $i < 8; $i  ) {
            $frameHead[$i   2] = bindec($payloadLengthBin[$i]);
        }
        // most significant bit MUST be 0
        if ($frameHead[2] > 127) {
            return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
        }
    } elseif ($payloadLength > 125) {
        $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
        $frameHead[1] = ($masked === true) ? 254 : 126;
        $frameHead[2] = bindec($payloadLengthBin[0]);
        $frameHead[3] = bindec($payloadLengthBin[1]);
    } else {
        $frameHead[1] = ($masked === true) ? $payloadLength   128 : $payloadLength;
    }

    // convert frame-head to string:
    foreach (array_keys($frameHead) as $i) {
        $frameHead[$i] = chr($frameHead[$i]);
    }
    if ($masked === true) {
        // generate a random mask:
        $mask = array();
        for ($i = 0; $i < 4; $i  ) {
            $mask[$i] = chr(rand(0, 255));
        }

        $frameHead = array_merge($frameHead, $mask);
    }
    $frame = implode('', $frameHead);

    // append payload to frame:
    for ($i = 0; $i < $payloadLength; $i  ) {
        $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
    }

    return $frame;
}

function decode($data)
{
    $unmaskedPayload = '';
    $decodedData = array();

    // estimate frame type:
    $firstByteBinary = sprintf('%08b', ord($data[0]));
    $secondByteBinary = sprintf('%08b', ord($data[1]));
    $opcode = bindec(substr($firstByteBinary, 4, 4));
    $isMasked = ($secondByteBinary[0] == '1') ? true : false;
    $payloadLength = ord($data[1]) & 127;

    // unmasked frame is received:
    if (!$isMasked) {
        return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
    }

    switch ($opcode) {
        // text frame:
        case 1:
            $decodedData['type'] = 'text';
            break;

        case 2:
            $decodedData['type'] = 'binary';
            break;

        // connection close frame:
        case 8:
            $decodedData['type'] = 'close';
            break;

        // ping frame:
        case 9:
            $decodedData['type'] = 'ping';
            break;

        // pong frame:
        case 10:
            $decodedData['type'] = 'pong';
            break;

        default:
            return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
    }

    if ($payloadLength === 126) {
        $mask = substr($data, 4, 4);
        $payloadOffset = 8;
        $dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3])))   $payloadOffset;
    } elseif ($payloadLength === 127) {
        $mask = substr($data, 10, 4);
        $payloadOffset = 14;
        $tmp = '';
        for ($i = 0; $i < 8; $i  ) {
            $tmp .= sprintf('%08b', ord($data[$i   2]));
        }
        $dataLength = bindec($tmp)   $payloadOffset;
        unset($tmp);
    } else {
        $mask = substr($data, 2, 4);
        $payloadOffset = 6;
        $dataLength = $payloadLength   $payloadOffset;
    }

    /**
     * We have to check for large frames here. socket_recv cuts at 1024 bytes
     * so if websocket-frame is > 1024 bytes we have to wait until whole
     * data is transferd.
     */
    if (strlen($data) < $dataLength) {
        return false;
    }

    if ($isMasked) {
        for ($i = $payloadOffset; $i < $dataLength; $i  ) {
            $j = $i - $payloadOffset;
            if (isset($data[$i])) {
                $unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
            }
        }
        $decodedData['payload'] = $unmaskedPayload;
    } else {
        $payloadOffset = $payloadOffset - 4;
        $decodedData['payload'] = substr($data, $payloadOffset);
    }

    return $decodedData;
}

//пользовательские сценарии:

function onOpen($connect, $info) {
    echo "open\n";
    fwrite($connect, encode('Здравствуй'));
}

function onClose($connect) {
    echo "close\n";
}

function onMessage($connect, $data) {
    echo decode($data)['payload'] . "\n";
}

В приведённом примере дозволено менять пользовательские сценарии onOpen, onClose и onMessage для реализации нужного функционала.

Поставленные цели достигнуты.
Если данный материал вам покажется увлекательным, то в дальнейшей статье я опишу как дозволено запускать несколько процессов для обработки соединений (один мастер и несколько воркеров), межпроцессное взаимодействие, интеграцию с вашим фреймворком на примере компонента yii.

демонстрационный чат с вышеописанными функциями

Код демонстрационного чата

#!/usr/bin/env php
<?php

class WebsocketServer
{
    public function __construct($config) {
        $this->config = $config;
    }

    public function start() {
        //открываем серверный сокет
        $server = stream_socket_server("tcp://{$this->config['host']}:{$this->config['port']}", $errorNumber, $errorString);

        if (!$server) {
            die("error: stream_socket_server: $errorString ($errorNumber)\r\n");
        }

        list($pid, $master, $workers) = $this->spawnWorkers();//создаём дочерние процессы

        if ($pid) {//мастер
            fclose($server);//мастер не будет обрабатывать входящие соединения на основном сокете
            $WebsocketMaster = new WebsocketMaster($workers);//мастер будет пересылать сообщения между воркерами
            $WebsocketMaster->start();
        } else {//воркер
            $WebsocketHandler = new WebsocketHandler($server, $master);
            $WebsocketHandler->start();
        }
    }

    protected function spawnWorkers() {
        $master = null;
        $workers = array();
        $i = 0;
        while ($i < $this->config['workers']) {
            $i  ;
            //создаём парные сокеты, через них будут связываться мастер и воркер
            $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

            $pid = pcntl_fork();//создаём форк
            if ($pid == -1) {
                die("error: pcntl_fork\r\n");
            } elseif ($pid) { //мастер
                fclose($pair[0]);
                $workers[$pid] = $pair[1];//один из пары будет в мастере
            } else { //воркер
                fclose($pair[1]);
                $master = $pair[0];//второй в воркере
                break;
            }
        }

        return array($pid, $master, $workers);
    }
}

class WebsocketMaster
{
    protected $workers = array();
    protected $clients = array();

    public function __construct($workers) {
        $this->clients = $this->workers = $workers;
    }

    public function start() {
        while (true) {
            //подготавливаем массив всех сокетов, которые необходимо обработать
            $read = $this->clients;
            //$read[] = $service;

            stream_select($read, $write, $except, null);//обновляем массив сокетов, которые дозволено обработать

            if ($read) {//пришли данные от подключенных заказчиков
                foreach ($read as $client) {
                    $data = fread($client, 100000);

                    if (!$data) { //соединение было закрыто
                        unset($this->clients[intval($client)]);
                        @fclose($client);
                        continue;
                    }

                    foreach ($this->workers as $worker) {//пересылаем данные во все воркеры
                        if ($worker !== $client) {
                            fwrite($worker, $data);
                        }
                    }
                }
            }
        }
    }
}

abstract class WebsocketWorker
{
    protected $clients = array();
    protected $server;
    protected $master;
    protected $pid;

    public function __construct($server, $master) {
        $this->server = $server;
        $this->master = $master;
        $this->pid = posix_getpid();
    }

    public function start() {
        while (true) {
            //подготавливаем массив всех сокетов, которые необходимо обработать
            $read = $this->clients;
            $read[] = $this->server;
            $read[] = $this->master;

            stream_select($read, $write, $except, null);//обновляем массив сокетов, которые дозволено обработать

            if (in_array($this->server, $read)) { //на серверный сокет пришёл запрос от нового заказчика
                //подключаемся к нему и делаем рукопожатие, согласно протоколу вебсокета
                if (($client = stream_socket_accept($this->server, -1)) && $info = $this->handshake($client)) {
                    $this->clients[intval($client)] = $client;
                    $this->onOpen($client, $info);//вызываем пользовательский сценарий
                }

                //удаляем сервеный сокет из массива, Дабы не обработать его в этом цикле ещё раз
                unset($read[array_search($this->server, $read)]);
            }

            if (in_array($this->master, $read)) { //пришли данные от мастера
                $data = fread($this->master, 100000);

                $this->onSend($data);//вызываем пользовательский сценарий

                //удаляем мастера из массива, Дабы не обработать его в этом цикле ещё раз
                unset($read[array_search($this->master, $read)]);
            }

            if ($read) {//пришли данные от подключенных заказчиков
                foreach ($read as $client) {
                    $data = fread($client, 100000);

                    if (!$data) { //соединение было закрыто
                        unset($this->clients[intval($client)]);
                        @fclose($client);
                        $this->onClose($client);//вызываем пользовательский сценарий
                        continue;
                    }

                    $this->onMessage($client, $data);//вызываем пользовательский сценарий
                }
            }

        }
    }

    protected function handshake($client) {
        $info = array();

        $line = fgets($client);

        if (!$line) {
            return false;
        }

        $header = explode(' ', $line);
        $info['method'] = $header[0];
        $info['uri'] = $header[1];

        //считываем загаловки из соединения
        while ($line = rtrim(fgets($client))) {
            if (preg_match('/\A(\S ): (.*)\z/', $line, $matches)) {
                $info[$matches[1]] = $matches[2];
            } else {
                break;
            }
        }

        $address = explode(':', stream_socket_get_name($client, true)); //получаем адрес заказчика
        $info['ip'] = $address[0];
        $info['port'] = $address[1];

        if (empty($info['Sec-WebSocket-Key'])) {
            return false;
        }

        //отправляем заголовок согласно протоколу вебсокета
        $SecWebSocketAccept = base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
        $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
            "Upgrade: websocket\r\n" .
            "Connection: Upgrade\r\n" .
            "Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";
        fwrite($client, $upgrade);

        return $info;
    }

    protected function encode($payload, $type = 'text', $masked = false)
    {
        $frameHead = array();
        $payloadLength = strlen($payload);

        switch ($type) {
            case 'text':
                // first byte indicates FIN, Text-Frame (10000001):
                $frameHead[0] = 129;
                break;

            case 'close':
                // first byte indicates FIN, Close Frame(10001000):
                $frameHead[0] = 136;
                break;

            case 'ping':
                // first byte indicates FIN, Ping frame (10001001):
                $frameHead[0] = 137;
                break;

            case 'pong':
                // first byte indicates FIN, Pong frame (10001010):
                $frameHead[0] = 138;
                break;
        }

        // set mask and payload length (using 1, 3 or 9 bytes)
        if ($payloadLength > 65535) {
            $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
            $frameHead[1] = ($masked === true) ? 255 : 127;
            for ($i = 0; $i < 8; $i  ) {
                $frameHead[$i   2] = bindec($payloadLengthBin[$i]);
            }
            // most significant bit MUST be 0
            if ($frameHead[2] > 127) {
                return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
            }
        } elseif ($payloadLength > 125) {
            $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
            $frameHead[1] = ($masked === true) ? 254 : 126;
            $frameHead[2] = bindec($payloadLengthBin[0]);
            $frameHead[3] = bindec($payloadLengthBin[1]);
        } else {
            $frameHead[1] = ($masked === true) ? $payloadLength   128 : $payloadLength;
        }

        // convert frame-head to string:
        foreach (array_keys($frameHead) as $i) {
            $frameHead[$i] = chr($frameHead[$i]);
        }
        if ($masked === true) {
            // generate a random mask:
            $mask = array();
            for ($i = 0; $i < 4; $i  ) {
                $mask[$i] = chr(rand(0, 255));
            }

            $frameHead = array_merge($frameHead, $mask);
        }
        $frame = implode('', $frameHead);

        // append payload to frame:
        for ($i = 0; $i < $payloadLength; $i  ) {
            $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
        }

        return $frame;
    }

    protected function decode($data)
    {
        $unmaskedPayload = '';
        $decodedData = array();

        // estimate frame type:
        $firstByteBinary = sprintf('%08b', ord($data[0]));
        $secondByteBinary = sprintf('%08b', ord($data[1]));
        $opcode = bindec(substr($firstByteBinary, 4, 4));
        $isMasked = ($secondByteBinary[0] == '1') ? true : false;
        $payloadLength = ord($data[1]) & 127;

        // unmasked frame is received:
        if (!$isMasked) {
            return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
        }

        switch ($opcode) {
            // text frame:
            case 1:
                $decodedData['type'] = 'text';
                break;

            case 2:
                $decodedData['type'] = 'binary';
                break;

            // connection close frame:
            case 8:
                $decodedData['type'] = 'close';
                break;

            // ping frame:
            case 9:
                $decodedData['type'] = 'ping';
                break;

            // pong frame:
            case 10:
                $decodedData['type'] = 'pong';
                break;

            default:
                return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
        }

        if ($payloadLength === 126) {
            $mask = substr($data, 4, 4);
            $payloadOffset = 8;
            $dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3])))   $payloadOffset;
        } elseif ($payloadLength === 127) {
            $mask = substr($data, 10, 4);
            $payloadOffset = 14;
            $tmp = '';
            for ($i = 0; $i < 8; $i  ) {
                $tmp .= sprintf('%08b', ord($data[$i   2]));
            }
            $dataLength = bindec($tmp)   $payloadOffset;
            unset($tmp);
        } else {
            $mask = substr($data, 2, 4);
            $payloadOffset = 6;
            $dataLength = $payloadLength   $payloadOffset;
        }

        /**
         * We have to check for large frames here. socket_recv cuts at 1024 bytes
         * so if websocket-frame is > 1024 bytes we have to wait until whole
         * data is transferd.
         */
        if (strlen($data) < $dataLength) {
            return false;
        }

        if ($isMasked) {
            for ($i = $payloadOffset; $i < $dataLength; $i  ) {
                $j = $i - $payloadOffset;
                if (isset($data[$i])) {
                    $unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
                }
            }
            $decodedData['payload'] = $unmaskedPayload;
        } else {
            $payloadOffset = $payloadOffset - 4;
            $decodedData['payload'] = substr($data, $payloadOffset);
        }

        return $decodedData;
    }

    abstract protected function onOpen($client, $info);

    abstract protected function onClose($client);

    abstract protected function onMessage($client, $data);

    abstract protected function onSend($data);

    abstract protected function send($data);
}

//пример реализации чата
class WebsocketHandler extends WebsocketWorker
{
    protected function onOpen($client, $info) {//вызывается при соединении с новым заказчиком

    }

    protected function onClose($client) {//вызывается при закрытии соединения заказчиком

    }

    protected function onMessage($client, $data) {//вызывается при приобретении сообщения от заказчика
        $data = $this->decode($data);
        //var_export($data);
        //шлем каждому сообщение, о том, что пишет один из заказчиков
        $message = 'пользователь #' . intval($client) . ' (' . $this->pid . '): ' . $data['payload'];
        $this->send($message);

        $this->sendHelper($message);
    }

    protected function onSend($data) {//вызывается при приобретении сообщения от мастера
        $this->sendHelper($data);
    }

    protected function send($message) {//отправляем сообщение на мастер, Дабы он разослал его на все воркеры
        @fwrite($this->master, $message);
    }

    private function sendHelper($data) {
        $data = $this->encode($data);
        foreach ($this->clients as $client) {
            @fwrite($client, $data);
        }
    }
}

$config = array(
    'host' => '0.0.0.0',
    'port' => 8000,
    'workers' => 16,
);

$WebsocketServer = new WebsocketServer($config);
$WebsocketServer->start();

Источник: programmingmaster.ru
Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB