Workerman 是什么?
Workerman,高性能socket服务框架 Workerman - Github
下载/安装 WorkerMan实际上就是一个PHP代码包,如果你的PHP环境已经装好,只需要把WorkerMan源代码或者demo下载下来即可运行。
1 composer require workerman/workerman
启动停止 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 # 启动 // 以debug(调试)方式启动 php start.php start // 以daemon(守护进程)方式启动 php start.php start-d # 停止 php start.php stop # 重启 php start.php restart # 平滑重启 php start.php reload # 查看状态 php start.php status # 查看连接状态 php start.php connections
简单的开发示例 实例一、使用HTTP协议对外提供Web服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <?php require_once '../vendor/autoload.php' ;use Workerman \Worker ;$http_worker = new Worker ("http://0.0.0.0:2345" );$http_worker ->count = 4 ;$http_worker ->onMessage = function ($connection , $data ) { var_dump ($_GET , $_POST , $_COOKIE , $_SESSION , $_SERVER , $_FILES ); $connection ->send ("Hello Workerman\n" ); }; Worker ::runAll ();
命令行运行 (windows用户用 cmd命令行,下同)
1 2 3 4 5 6 $ php http_server.php ----------------------- WORKERMAN ----------------------------- Workerman version:3.5.22 PHP version:7.2.1 ------------------------ WORKERS ------------------------------- worker listen processes status none http://0.0.0.0:2345 4 [ok]
测试
在浏览器中访问url http://127.0.0.1:2345
浏览器返回: Hello Workerman 命令行: 打印数据$_GET等数据
实例二、使用WebSocket协议对外提供服务
Test\websocket_server.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 <?php require_once '../vendor/autoload.php' ;use Workerman \Worker ;$ws_worker = new Worker ("websocket://0.0.0.0:2000" );$ws_worker ->count = 4 ;$ws_worker ->onConnect = function ($connection ) { echo "New Connection\n" ; }; $ws_worker ->onMessage = function ($connection , $data ) { $connection ->send ('Hello' . $data ); }; $ws_worker ->onClose = function ($connection ) { echo "Connection Closed\n" ; }; Worker ::runAll ();
命令行运行
1 2 3 4 5 6 $ php websocket_server.php ----------------------- WORKERMAN ----------------------------- Workerman version:3.5.22 PHP version:7.2.1 ------------------------ WORKERS ------------------------------- worker listen processes status none websocket://0.0.0.0:2000 4 [ok]
测试
打开chrome浏览器,按F12打开调试控制台,在Console一栏输入(或者把下面代码放入到html页面用js运行)
1 2 3 4 5 6 7 8 9 10 // 假设服务端ip为127.0.0.1 ws = new WebSocket("ws://127.0.0.1:2000"); ws.onopen = function() { alert("连接成功"); ws.send('WEBSOCKET'); alert("给服务端发送一个字符串:WEBSOCKET"); }; ws.onmessage = function(e) { alert("收到服务端的消息:" + e.data); };
实例三、直接使用TCP传输数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 <?php require_once '../vendor/autoload.php' ;use Workerman \Worker ;$tcp_worker = new Worker ("tcp://0.0.0.0:1234" );$tcp_worker ->count = 4 ;$tcp_worker ->onConnect = function ($connection ) { echo "New TCP Connection\n" ; }; $tcp_worker ->onMessage = function ($connection , $data ) { $connection ->send ("hello tcp $data " ); }; $tcp_worker ->onClose = function ($connection ) { echo "Connection Closed\n" ; }; Worker ::runAll ();
命令行运行
1 2 3 4 5 6 $ php tcp_server.php ----------------------- WORKERMAN ----------------------------- Workerman version:3.5.22 PHP version:7.2.1 ------------------------ WORKERS ------------------------------- worker listen processes status none tcp://0.0.0.0:1234 4 [ok]
测试: 命令行运行
开发须知 目录结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 Workerman ├── Connection │ ├── AsyncTcpConnection.php // 异步TCP连接类 │ ├── AsyncUdpConnection.php // 异步UDP连接类 │ ├── ConnectionInterface.php// Socket连接接口 │ ├── TcpConnection.php // TCP连接类 │ └── UdpConnection.php // UDP连接类 ├── Events │ ├── React │ │ ├── Base.php // mime类型 │ │ ├── ExtEventLoop.php // 外部事件循环 │ │ ├── ExtLibEventLoop.php // 外部库事件循环 │ │ └── StreamSelectLoop.php // 流选择循环 │ ├── Ev.php // Libev网络事件库 │ ├── Event.php // Livevent事件循环 │ ├── EventInterface.php // 网络事件库接口 │ ├── Libevent.php // Libevent网络事件库 │ ├── Select.php // Select网络事件库 │ └── Swoole.php // Swoole网络事件库 ├── Lib │ ├── Constants.php // 常量定义 │ └── Timer.php // 定时器 ├── Protocols │ ├── Http │ │ └── mime.types // mime类型 │ ├── Frame.php // Frame协议实现 │ ├── Http.php // Http协议实现 │ ├── ProtocolInterface.php // 协议接口类 │ ├── Text.php // Text协议实现 │ ├── Websocket.php // Websocket协议实现 │ └── Ws.php // 客户端的Websocket协议实现 ├── Autoloader.php // 自动加载类 ├── WebServer.php // WebServer └── Worker.php // Worker
通讯协议 WorkerMan已经支持的协议 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 use Workerman \Worker ;$websocket_worker = new Worker ('websocket://0.0.0.0:2345' );$text_worker = new Worker ('text://0.0.0.0:2346' );$frame_worker = new Worker ('frame://0.0.0.0:2347' );$tcp_worker = new Worker ('tcp://0.0.0.0:2348' );$udp_worker = new Worker ('udp://0.0.0.0:2349' );$unix_worker = new Worker ('unix:///tmp/wm.sock' );
基础使用 常见问题 心跳检测 注意 :长连接应用必须加心跳,否则连接可能由于长时间未通讯被路由节点强行断开。
心跳作用主要有两个:
客户端定时给服务端发送点数据,防止连接由于长时间没有通讯而被某些节点的防火墙关闭导致连接断开的情况。
服务端可以通过心跳来判断客户端是否在线,如果客户端在规定时间内没有发来任何数据,就认为客户端下线。这样可以检测到客户端由于极端情况(断电、断网等)下线的事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 <?php require_once '../vendor/autoload.php' ;use Workerman \Worker ;use Workerman \Lib \Timer ;define ('HEARTBEAT_TIME' , 59 );$text_worker = new Worker ("text://0.0.0.0:2345" );$text_worker ->onMessage = function ($connection , $msg ) { $connection ->lastMsgTime = time (); }; $text_worker ->onWorkerStart = function ($worker ) { Timer ::add (1 , function () use ($worker ) { $time_now = time (); foreach ($worker ->connections as $connection ) { if (empty ($connection ->lastMsgTime)) { $connection ->lastMsgTime = $time_now ; continue ; } if ($time_now - $connection ->lastMsgTime > HEARTBEAT_TIME) { $connection ->close (); } }; }); }; Worker ::runAll ();
以上配置为如果客户端超过59秒没有发送任何数据给服务端,则服务端认为客户端已经掉线,服务端关闭连接并触发onClose。
客户端连接失败原因 连接失败客户端一般会有两种报错,connection refuse
和 connection timeout
Connection refuse (连接拒绝)
Connection timeout (连接超时)
是否支持多线程 ? Workerman有一个依赖pthreads扩展
的MT多线程版本
,但是由于pthreads扩展
还不够稳定,所以这个Workerman多线程版本已经不再维护。
目前Workerman及其周边产品都是基于多进程单线程
的。
支持哪些协议 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // http协议 $worker1 = new Worker('http://0.0.0.0:1221'); // websocket协议 $worker2 = new Worker('websocket://0.0.0.0:1222'); // text文本协议(telnet协议) $worker3 = new Worker('text://0.0.0.0:1223'); // frame文本协议(可用于二进制数传输) $worker3 = new Worker('frame://0.0.0.0:1223'); // 直接基于tcp传输 $worker4 = new Worker('tcp://0.0.0.0:1224'); // 直接基于udp传输 $worker5 = new Worker('udp://0.0.0.0:1225');
对象和资源的持久化 在传统的Web开发中,PHP创建的对象、数据、资源等会在请求完毕后全部释放,导致很难做到持久化。而在WorkerMan中可以轻松做到这些。
在WorkerMan中如果想在内存中永久保存某些数据资源,可以将资源放到全局变量中或者类的静态成员中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 <?php require_once '../vendor/autoload.php'; use Workerman\Worker; $worker = new Worker("http://0.0.0.0:5678"); // 全局变量,保存当前进程的客户端连接数 $connection_ count = 0;// 进程数 $worker->count = 4; $worker->onConnect = function ($connection) { // 有新的客户端连接时,连接数+1 global $connection_count; ++$connection_count; echo "现在客户端连接数为: $connection_count\n"; }; $worker->onClose = function ($connection) { // 客户端关闭时,连接数-1 global $connection_count; $connection_count--; echo "现在客户端连接数为: $connection_count\n"; }; Worker::runAll();
支出多少并发 并发 概念太模糊,这里以两种可以量化的指标并发连接数 和并发请求数 来说明。
是指服务器当前时刻一共维持了多少TCP连接,而这些连接上是否有数据通讯并不关注,例如一台消息推送服务器上可能维持了百万的设备连接,由于连接上很少有数据通讯,所以这台服务器上负载可能几乎为0,只要内存足够,还可以继续接受连接。
并发连接数 受限于服务器内存,一般24G内存workerman服务器可以支持大概120W 并发连接。
一般用QPS(服务器每秒处理多少请求)来衡量,而当前时刻服务器上有多少个tcp连接并不十分关注。例如一台服务器只有10个客户端连接,每个客户端连接上每秒有1W个请求,那么要求服务端需要至少能支撑10*1W=10W每秒的吞吐量(QPS)。假设10W吞吐量每秒是这台服务器的极限,如果每个客户端每秒发送1个请求给服务端,那么这台服务器能够支撑10W个客户端。
并发请求数 受限于服务器cpu处理能力,一台24核workerman服务器可以达到45W 每秒的吞吐量(QPS),实际值根据业务复杂度以及代码质量有所变化。
更改代码后不生效 Workerman是常驻内存运行的,常驻内存可以避免重复读取磁盘、重复解释编译PHP,以便达到最高性能。所以更改业务代码后需要手动reload 或者restart 才能生效。
同时workerman提供一个监控文件更新的服务,该服务检测到有文件更新后会自动运行reload,从新载入PHP文件。开发者将其放入到项目中随着项目启动即可。
注意:windows系统不支持reload,无法使用监控服务
文件监控服务下载地址
向某个特定客户端发送数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 <?php require_once '../vendor/autoload.php'; use Workerman\Worker; // 创建Websocket服务 $worker = new Worker("websocket://0.0.0.0:2000"); // 进程数 $worker->count = 1; // 新增加一个属性,用来保存uid到connection的映射(uid是用户id或者客户端唯一标识) $worker->uidConnections = array(); $worker->onMessage = function ($connection, $data) { global $worker; // 判断当前客户端是否已经验证,即是否设置了uid if (!isset($connection->uid)) { // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证) $connection->uid = $data; $worker->uidConnections[$connection->uid] = $connection; return $connection->send("Login Success, your uid is $connection->uid"); } list($recv_ uid, $msg) = explode(':', $data); // 全局广播 if ($recv_uid == 'all') { broadcast($msg); } else { sendMsgByUid($recv_uid, $msg); } }; $worker->onClose = function ($connection) { global $worker; if (isset($connection->uid)) { // 连接断开时删除映射 unset($worker->uidConnections[$connection->uid]); } }; // 向所有验证的用户推送数据 function broadcast($msg) { global $worker; foreach ($worker->uidConnections as $connection) { $connection->send($msg); } } // 针对uid推送数据 function sendMsgByUid($uid, $msg) { global $worker; if (isset($worker->uidConnections[$uid])) { $connection = $worker->uidConnections[$uid]; $connection->send($msg); } }; Worker::runAll();
前端调用
1 2 3 4 5 6 7 8 var ws = new WebSocket('ws://127.0.0.1:2000'); ws.onopen = function(){ var uid = 'uid1'; ws.send(uid); }; ws.onmessage = function(e){ alert(e.data); };
如何主动推送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <?php require_once '../vendor/autoload.php'; use Workerman\Worker; use Workerman\Lib\Timer; $worker = new Worker("websocket://0.0.0.0:2000"); $worker->onWorkerStart = function ($worker) { // 进程启动后定时推送数据给客户端 Timer::add(1, function () use ($worker) { foreach ($worker->connections as $connection) { $connection->send("Hello Man"); } }); }; Worker::runAll();
如何实现异步任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <?php require_once '../vendor/autoload.php' ;use Workerman \Worker ;$task_worker = new Worker ("text://0.0.0.0:12345" );$task_worker ->count = 100 ;$task_worker ->name = 'TaskWorker' ;$task_worker ->reusePort = true ;$task_worker ->onMessage = function ($connection , $task_data ) { $task_data = json_decode ($task_data , true ); $connection ->send (json_encode ($task_data )); }; Worker ::runAll ();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 <?php require_once '../vendor/autoload.php' ;use Workerman \Worker ;use Workerman \Connection \AsyncTcpConnection ;$worker = new Worker ("websocket://0.0.0.0:8080" );$worker ->onMessage = function ($ws_connection , $msg ) { $task_connection = new AsyncTcpConnection ('Text://127.0.0.1:12345' ); $task_data = array ( 'function' => 'send_mail' , 'args' => array ('from' =>'xxx' , 'to' =>'xxx' , 'contents' =>'xxx' ), ); $task_connection ->send (json_encode ($task_data )); $task_connection ->onMessage = function ($task_connection , $task_result ) use ($ws_connection ) { var_dump ($task_result ); $task_connection ->close (); $ws_connection ->send ('task_complete' ); }; $task_connection ->connect (); }; Worker ::runAll ();
1 2 3 4 5 6 7 var ws = new WebSocket('ws://127.0.0.1:8080'); ws.onopen = function(){ ws.send(); }; ws.onmessage = function(e){ alert(e.data); };
监听IPv6
问:如何让客户端即能通过ipv4地址访问,也能通过ipv6地址访问?
答:在初始化容器的时候监听地址写[::]即可。
1 2 $worker = new Worker('http://[::]:8080'); $gateway = new Gateway('websocket://[::]:8081');
PHP几种回调写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 <?php require_once '../vendor/autoload.php' ;use Workerman \Worker ;$http_worker = new Worker ("http://0.0.0.0:2345" );$http_worker ->onMessage = function ($connection , $data ) { $connection ->send ("Hello World" ); }; Worker ::runAll ();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 <?php require_once '../vendor/autoload.php' ;use Workerman \Worker ;$http_worker = new Worker ("http://0.0.0.0:2345" );$http_worker ->onMessage = 'on_message' ;function on_message ($connection , $data ) { $connection ->send ("Hello World" ); }; Worker ::runAll ();
WorkerClass.php
1 2 3 4 5 6 7 8 9 10 11 <?php class WorkerClass { public function __construct ( ) {} public function onWorkerStart ($worker ) {} public function onConnect ($connect ) {} public function onMessage ($connection , $message ) {} public function onClose ($connection ) {} public function onWorkerStop ($connection ) {} }
启动脚本 start.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <?php use Workerman \Worker ;require_once '../vendor/autoload.php' ;require_once __DIR__ . '/WorkerClass.php' ;$worker = new Worker ("websocket://0.0.0.0:1234" );$object = new WorkerClass ();$worker ->onWorkerStart = array ($object , 'onWorkerStart' );$worker ->onConnect = array ($object , 'onConnect' );$worker ->onMessage = array ($object , 'onMessage' );$worker ->onClose = array ($object , 'onClose' );$worker ->onWorkerStop = array ($object , 'onWorkerStop' );Worker ::runAll ();
注意: 以上的代码结构不允许在构造函数里初始化资源(MySQL连接、Redis连接、Memcache连接等),因为$my_object = new MyClass();运行在主进程。以MySQL为例,在主进程初始化的MySQL连接等资源会被子进程继承,每个子进程都可以操作这个数据库连接,但是这些连接在MySQL服务端对应的是同一个连接,会发生不可预期的错误,例如mysql gone away 错误。
以上代码结构如果需要在类的构造函数里初始化资源,可以采用以下写法。
WorkerClass.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <?php class WorkerClass { protected $db = null ; public function __construct ( ) { $db = new MyDbClass (); } public function onWorkerStart ($worker ) {} public function onConnect ($connect ) {} public function onMessage ($connection , $message ) {} public function onClose ($connection ) {} public function onWorkerStop ($connection ) {} }
启动脚本 start.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 <?php use Workerman \Worker ;require_once '../vendor/autoload.php' ;$worker = new Worker ("websocket://0.0.0.0:1234" );$worker ->onWorkerStart = function ($worker ) { require_once __DIR__ . '/WorkerClass.php' ; $object = new WorkerClass (); $worker ->onWorkerStart = array ($object , 'onWorkerStart' ); $worker ->onConnect = array ($object , 'onConnect' ); $worker ->onMessage = array ($object , 'onMessage' ); $worker ->onClose = array ($object , 'onClose' ); $worker ->onWorkerStop = array ($object , 'onWorkerStop' ); }; Worker ::runAll ();
1 2 3 4 5 6 7 8 9 10 11 <?php class WorkerClass { public function __construct ( ) {} public static function onWorkerStart ($worker ) {} public static function onConnect ($connect ) {} public static function onMessage ($connection , $message ) {} public static function onClose ($connection ) {} public static function onWorkerStop ($connection ) {} }
接收一定请求后重启 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <?php require_once '../vendor/autoload.php' ;use Workerman \Worker ;$worker = new Worker ("websocket://0.0.0.0:8080" );$worker ->onMessage = function ($connection , $data ) { static $request_count ; if (++$request_count > 10000 ) { Worker ::stopAll (); } };
Chrome 请求两次问题 上面操作, 计数客户端连接操作,一次chrome请求有两个连接
1 2 3 4 5 6 7 8 $ php global .php ----------------------- WORKERMAN ----------------------------- Workerman version:3.5 .22 PHP version:7.2 .1 ------------------------ WORKERS ------------------------------- worker listen processes status none http: 现在客户端连接数为: 1 现在客户端连接数为: 2
在Swoole里面找到了答案: Chrome 请求两次问题
使用Chrome浏览器访问服务器,会产生额外的一次请求,/favicon.ico,可以在代码中响应404错误。
1 2 3 4 5 6 7 8 $http ->on ('request' , function ($request , $response ) { if ($request ->server['path_info' ] == '/favicon.ico' || $request ->server['request_uri' ] == '/favicon.ico' ) { return $response ->end (); } var_dump ($request ->get, $request ->post); $response ->header ("Content-Type" , "text/html; charset=utf-8" ); $response ->end ("<h1>Hello Swoole. #" .rand (1000 , 9999 )."</h1>" ); });
相关协议 Websocket 协议 目前Workerman的WebSocke协议版本为13
WebSocket protocol 是HTML5一种新的协议。它实现了浏览器与服务器全双工通信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <?php use Workerman \Worker ;require_once '../vendor/autoload.php' ;$websocket = new Worker ("websocket://0.0.0.0:8181" );$websocket ->onConnect = function ($connection ) { $connection ->onWebSocketConnect = function ($connection , $http_header ) { if ($_SERVER ['HTTP_ORIGIN' ] != '127.0.0.1' ) { $connection ->close (); } }; };
WS 协议 目前Workerman的ws协议版本为13
workerman可以作为客户端通过ws协议发起websocket连接,连到远程websocket服务器,实现双向通讯。
注意 :ws协议只能通过AsyncTcpConnection作为客户端使用,不能作为websocket服务端监听协议。也就是说以下写法是错误的。
1 $worker = new Worker('ws://0.0.0.0:8080');
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 <?php require_once '../vendor/autoload.php' ;use Workerman \Worker ;use Workerman \Connection \AsyncTcpConnection ;$worker = new Worker ();$worker ->onWorkerStart = function ( ) { $ws_connection = new AsyncTcpConnection ("ws:echo.websocket.org:80" ); $ws_connection ->onConnect = function ($connection ) { $connection ->send ('Hello' ); }; $ws_connection ->onMessage = function ($connection , $data ) { echo "recv: $data \n" ; }; $ws_connection ->onError = function ($connection , $code , $msg ) { echo "error: $msg \n" ; }; $ws_connection ->onClose = function ($connection ) { echo "connection closed\n" ; }; $ws_connection ->connect (); }; Worker ::runAll ();
Text 协议 Workerman定义了一种叫做text的文本协议,协议格式为 数据包+换行符
,即在每个数据包末尾加上一个换行符表示包的结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 <?php require_once '../vendor/autoload.php' ;use Workerman \Worker ;$text_worker = new Worker ("text://0.0.0.0:5678" );$text_worker ->onMessage = function ($connection , $data ) { var_dump ($data ); $connection ->send ("hello world" ); }; Worker ::runAll ();
Frame 协议 workerman定义了一种叫做frame的协议,协议格式为 总包长+包体
,其中包长为4字节网络字节序的整数,包体可以是普通文本或者二进制数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 <?php use Workerman \Connection \TcpConnection ;class Frame { public static function input ($buffer , TcpConnection $connection ) { if (strlen ($buffer ) < 4 ) { return 0 ; } $unpack_data = unpack ('Ntotal_length' , $buffer ); return $unpack_data ['total_length' ]; } public static function decode ($buffer ) { return substr ($buffer , 4 ); } public static function encode ($buffer ) { $total_length = 4 + strlen ($buffer ); return pack ('N' , $total_length ) . $buffer ; } }
参考