From 00d7a381aa369870a773a5a7b8027f3d7532843f Mon Sep 17 00:00:00 2001 From: "X14XA\\shengli" Date: Tue, 6 Jan 2026 13:37:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 + LICENSE | 201 ++++++++++++ README.md | 139 ++++++++- composer.json | 55 ++++ phpstan.neon | 16 + phpunit.xml | 18 ++ src/App.php | 23 ++ src/Conduit.php | 22 ++ src/Http.php | 17 ++ src/Ipc.php | 41 +++ src/Manager.php | 27 ++ src/Sandbox.php | 210 +++++++++++++ src/Service.php | 23 ++ src/Watcher.php | 33 ++ src/Websocket.php | 121 ++++++++ src/Worker.php | 19 ++ src/command/Server.php | 35 +++ src/concerns/InteractsWithConduit.php | 20 ++ src/concerns/InteractsWithHttp.php | 321 ++++++++++++++++++++ src/concerns/InteractsWithQueue.php | 97 ++++++ src/concerns/InteractsWithServer.php | 119 ++++++++ src/concerns/InteractsWithWebsocket.php | 145 +++++++++ src/concerns/ModifyProperty.php | 18 ++ src/concerns/WithApplication.php | 77 +++++ src/concerns/WithContainer.php | 75 +++++ src/conduit/Driver.php | 26 ++ src/conduit/driver/Socket.php | 186 ++++++++++++ src/conduit/driver/socket/Command.php | 22 ++ src/conduit/driver/socket/Event.php | 15 + src/conduit/driver/socket/Result.php | 16 + src/conduit/driver/socket/Server.php | 102 +++++++ src/config/worker.php | 39 +++ src/contract/ResetterInterface.php | 17 ++ src/contract/websocket/HandlerInterface.php | 31 ++ src/message/PushMessage.php | 15 + src/protocols/FlexHttp.php | 39 +++ src/resetters/ClearInstances.php | 23 ++ src/resetters/ResetConfig.php | 18 ++ src/resetters/ResetEvent.php | 22 ++ src/resetters/ResetModel.php | 21 ++ src/resetters/ResetPaginator.php | 30 ++ src/resetters/ResetService.php | 33 ++ src/response/File.php | 112 +++++++ src/response/Iterator.php | 22 ++ src/response/Websocket.php | 66 ++++ src/watcher/Driver.php | 8 + src/watcher/Find.php | 72 +++++ src/watcher/Scan.php | 52 ++++ src/websocket/Event.php | 15 + src/websocket/Frame.php | 10 + src/websocket/Handler.php | 71 +++++ src/websocket/Pusher.php | 67 ++++ src/websocket/Room.php | 54 ++++ src/websocket/socketio/EnginePacket.php | 81 +++++ src/websocket/socketio/Handler.php | 198 ++++++++++++ src/websocket/socketio/Packet.php | 134 ++++++++ tests/Pest.php | 2 + tests/feature/HttpTest.php | 153 ++++++++++ tests/feature/QueueTest.php | 5 + tests/feature/WebsocketTest.php | 77 +++++ tests/stub/.env | 1 + tests/stub/app/controller/Index.php | 20 ++ tests/stub/config/app.php | 4 + tests/stub/config/cache.php | 9 + tests/stub/config/queue.php | 40 +++ tests/stub/config/worker.php | 45 +++ tests/stub/public/asset.txt | 1 + tests/stub/route/app.php | 51 ++++ tests/stub/runtime/.gitignore | 2 + tests/stub/think | 12 + 70 files changed, 3913 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 composer.json create mode 100644 phpstan.neon create mode 100644 phpunit.xml create mode 100644 src/App.php create mode 100644 src/Conduit.php create mode 100644 src/Http.php create mode 100644 src/Ipc.php create mode 100644 src/Manager.php create mode 100644 src/Sandbox.php create mode 100644 src/Service.php create mode 100644 src/Watcher.php create mode 100644 src/Websocket.php create mode 100644 src/Worker.php create mode 100644 src/command/Server.php create mode 100644 src/concerns/InteractsWithConduit.php create mode 100644 src/concerns/InteractsWithHttp.php create mode 100644 src/concerns/InteractsWithQueue.php create mode 100644 src/concerns/InteractsWithServer.php create mode 100644 src/concerns/InteractsWithWebsocket.php create mode 100644 src/concerns/ModifyProperty.php create mode 100644 src/concerns/WithApplication.php create mode 100644 src/concerns/WithContainer.php create mode 100644 src/conduit/Driver.php create mode 100644 src/conduit/driver/Socket.php create mode 100644 src/conduit/driver/socket/Command.php create mode 100644 src/conduit/driver/socket/Event.php create mode 100644 src/conduit/driver/socket/Result.php create mode 100644 src/conduit/driver/socket/Server.php create mode 100644 src/config/worker.php create mode 100644 src/contract/ResetterInterface.php create mode 100644 src/contract/websocket/HandlerInterface.php create mode 100644 src/message/PushMessage.php create mode 100644 src/protocols/FlexHttp.php create mode 100644 src/resetters/ClearInstances.php create mode 100644 src/resetters/ResetConfig.php create mode 100644 src/resetters/ResetEvent.php create mode 100644 src/resetters/ResetModel.php create mode 100644 src/resetters/ResetPaginator.php create mode 100644 src/resetters/ResetService.php create mode 100644 src/response/File.php create mode 100644 src/response/Iterator.php create mode 100644 src/response/Websocket.php create mode 100644 src/watcher/Driver.php create mode 100644 src/watcher/Find.php create mode 100644 src/watcher/Scan.php create mode 100644 src/websocket/Event.php create mode 100644 src/websocket/Frame.php create mode 100644 src/websocket/Handler.php create mode 100644 src/websocket/Pusher.php create mode 100644 src/websocket/Room.php create mode 100644 src/websocket/socketio/EnginePacket.php create mode 100644 src/websocket/socketio/Handler.php create mode 100644 src/websocket/socketio/Packet.php create mode 100644 tests/Pest.php create mode 100644 tests/feature/HttpTest.php create mode 100644 tests/feature/QueueTest.php create mode 100644 tests/feature/WebsocketTest.php create mode 100644 tests/stub/.env create mode 100644 tests/stub/app/controller/Index.php create mode 100644 tests/stub/config/app.php create mode 100644 tests/stub/config/cache.php create mode 100644 tests/stub/config/queue.php create mode 100644 tests/stub/config/worker.php create mode 100644 tests/stub/public/asset.txt create mode 100644 tests/stub/route/app.php create mode 100644 tests/stub/runtime/.gitignore create mode 100644 tests/stub/think diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9912bd5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +composer.lock +vendor/ +.idea/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index e75289b..f0a92d2 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,139 @@ -# think-worker +ThinkPHP Workerman 扩展 +=============== +交流群:981069000 [![点击加群](https://pub.idqqimg.com/wpa/images/group.png "点击加群")](https://qm.qq.com/q/A8YNpzrzC8) + +## 安装 +``` +composer require topthink/think-worker +``` + +## 说明 +> 由于windows下无法在一个文件里启动多个worker,所以本扩展不支持windows平台 + +## 使用方法 + +### HttpServer + +在命令行启动服务端 +~~~ +php think worker +~~~ + +然后就可以通过浏览器直接访问当前应用 + +~~~ +http://localhost:8080 +~~~ + +如果需要使用守护进程方式运行,建议使用supervisor来管理进程 + +## 访问静态文件 +> 建议使用nginx来支持静态文件访问,也可使用路由输出文件内容,下面是示例,可参照修改 +1. 添加静态文件路由: + +```php +Route::get('static/:path', function (string $path) { + $filename = public_path() . $path; + return new \think\worker\response\File($filename); +})->pattern(['path' => '.*\.\w+$']); +``` + +2. 访问路由 `http://localhost/static/文件路径` + +## 队列支持 + +使用方法见 [think-queue](https://github.com/top-think/think-queue) + +以下配置代替think-queue里的最后一步:`监听任务并执行`,无需另外起进程执行队列 + +```php +return [ + // ... + 'queue' => [ + 'enable' => true, + //键名是队列名称 + 'workers' => [ + //下面参数是不设置时的默认配置 + 'default' => [ + 'delay' => 0, + 'sleep' => 3, + 'tries' => 0, + 'timeout' => 60, + 'worker_num' => 1, + ], + //使用@符号后面可指定队列使用驱动 + 'default@connection' => [ + //此处可不设置任何参数,使用上面的默认配置 + ], + ], + ], + // ... +]; + +``` + +### websocket + +> 使用路由调度的方式,可以让不同路径的websocket服务响应不同的事件 + +#### 配置 + +``` +worker.websocket = true 时开启 +``` + +#### 路由定义 +```php +Route::get('path1','controller/action1'); +Route::get('path2','controller/action2'); +``` + +#### 控制器 + +```php +use \think\worker\Websocket; +use \think\worker\websocket\Frame; + +class Controller { + + public function action1(){ + + return (new \think\worker\response\Websocket()) + ->onOpen(...) + ->onMessage(function(Websocket $websocket, Frame $frame){ + ... + }) + ->onClose(...); + } + + public function action2(){ + + return (new \think\worker\response\Websocket()) + ->onOpen(...) + ->onMessage(function(Websocket $websocket, Frame $frame){ + ... + }) + ->onClose(...); + } +} +``` + + +## 自定义worker +监听`worker.init`事件 注入`Manager`对象,调用addWorker方法添加 +~~~php +use think\worker\Manager; +use \think\worker\Worker; + +//... + +public function handle(Manager $manager){ + $worker = $manager->addWorker(function(Worker $worker){ + //..其他回调或处理 + //动态添加监听可参考 https://www.workerman.net/doc/workerman/worker/listen.html + }); +} + +//... +~~~ diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..8395726 --- /dev/null +++ b/composer.json @@ -0,0 +1,55 @@ +{ + "name": "topthink/think-worker", + "description": "workerman extend for thinkphp", + "license": "Apache-2.0", + "authors": [ + { + "name": "liu21st", + "email": "liu21st@gmail.com" + } + ], + "require": { + "php": ">=8.2", + "workerman/workerman": "~5.0.0", + "topthink/framework": "^8.0", + "revolt/event-loop": "^1.0", + "workerman/redis": "^2.0", + "symfony/finder": ">=4.3" + }, + "autoload": { + "psr-4": { + "think\\worker\\": "src" + } + }, + "extra": { + "think": { + "services": [ + "think\\worker\\Service" + ], + "config": { + "worker": "src/config/worker.php" + } + } + }, + "require-dev": { + "pestphp/pest": "^3.7", + "guzzlehttp/guzzle": "^7.0", + "topthink/think-queue": "^3.0", + "phpstan/phpstan": "^2.0", + "ratchet/pawl": "^0.4.1" + }, + "autoload-dev": { + "psr-4": { + "app\\": "tests/stub/app" + } + }, + "config": { + "allow-plugins": { + "pestphp/pest-plugin": true + } + }, + "scripts": { + "analyze": "phpstan --memory-limit=1G", + "test": "pest --colors=always" + } +} diff --git a/phpstan.neon b/phpstan.neon new file mode 100644 index 0000000..6c8556d --- /dev/null +++ b/phpstan.neon @@ -0,0 +1,16 @@ +parameters: + level: 5 + paths: + - src + - tests + universalObjectCratesClasses: + - PHPUnit\Framework\TestCase + ignoreErrors: + - '#Function root_path not found.#' + - '#Function env not found.#' + - '#Function app_path not found.#' + - '#Function config_path not found.#' + - '#Function public_path not found.#' + - '#Function json not found.#' + - '#Function runtime_path not found.#' + - '#Constant STUB_DIR not found.#' diff --git a/phpunit.xml b/phpunit.xml new file mode 100644 index 0000000..715322c --- /dev/null +++ b/phpunit.xml @@ -0,0 +1,18 @@ + + + + + ./tests + + + + + + ./src + + + diff --git a/src/App.php b/src/App.php new file mode 100644 index 0000000..08a6210 --- /dev/null +++ b/src/App.php @@ -0,0 +1,23 @@ +inConsole = $inConsole; + } + + public function runningInConsole(): bool + { + return $this->inConsole; + } + + public function clearInstances() + { + $this->instances = []; + } +} diff --git a/src/Conduit.php b/src/Conduit.php new file mode 100644 index 0000000..683de5a --- /dev/null +++ b/src/Conduit.php @@ -0,0 +1,22 @@ +app->config->get("worker.conduit.{$name}", []); + } + + public function getDefaultDriver() + { + return $this->app->config->get('worker.conduit.type', 'socket'); + } +} diff --git a/src/Http.php b/src/Http.php new file mode 100644 index 0000000..e4f9e86 --- /dev/null +++ b/src/Http.php @@ -0,0 +1,17 @@ +subscribe(); + return $this->workerId; + } + + public function sendMessage($workerId, $message) + { + if ($workerId === $this->workerId) { + $this->manager->triggerEvent('message', $message); + } else { + $this->publish($workerId, $message); + } + } + + public function subscribe() + { + $this->workerId = $this->conduit->inc('ipc:worker'); + $this->conduit->subscribe("ipc:message:{$this->workerId}", function ($message) { + $this->manager->triggerEvent('message', unserialize($message)); + }); + } + + public function publish($workerId, $message) + { + $this->conduit->publish("ipc:message:{$workerId}", serialize($message)); + } +} diff --git a/src/Manager.php b/src/Manager.php new file mode 100644 index 0000000..ba9fcf4 --- /dev/null +++ b/src/Manager.php @@ -0,0 +1,27 @@ +prepareHttp(); + $this->prepareQueue(); + $this->prepareConduit(); + } +} diff --git a/src/Sandbox.php b/src/Sandbox.php new file mode 100644 index 0000000..f0599b6 --- /dev/null +++ b/src/Sandbox.php @@ -0,0 +1,210 @@ +app = $app; + $this->snapshots = new WeakMap(); + $this->initialize(); + } + + protected function initialize() + { + Container::setInstance(function () { + return $this->getSnapshot(); + }); + + $this->setInitialConfig(); + $this->setInitialServices(); + $this->setInitialEvent(); + $this->setInitialResetters(); + } + + public function run(Closure $callable, ?object $key = null) + { + $this->snapshot = $this->createApp($key); + try { + $this->snapshot->invoke($callable, [$this]); + } catch (Throwable $e) { + $this->snapshot->make(Handle::class)->report($e); + } finally { + if (empty($key)) { + $this->snapshot->clearInstances(); + } + $this->snapshot = null; + $this->setInstance($this->app); + } + } + + protected function createApp(?object $key = null) + { + if (!empty($key)) { + if (isset($this->snapshots[$key])) { + return $this->snapshots[$key]->app; + } + } + + $app = clone $this->app; + $this->setInstance($app); + $this->resetApp($app); + + if (!empty($key)) { + $this->snapshots[$key] = new class($app) { + public function __construct(public App $app) + { + } + + public function __destruct() + { + $this->app->clearInstances(); + } + }; + } + + return $app; + } + + protected function resetApp(App $app) + { + foreach ($this->resetters as $resetter) { + $resetter->handle($app, $this); + } + } + + protected function setInstance(App $app) + { + $app->instance('app', $app); + $app->instance(Container::class, $app); + + $reflectObject = new ReflectionObject($app); + $reflectProperty = $reflectObject->getProperty('services'); + $services = $reflectProperty->getValue($app); + + foreach ($services as $service) { + $this->modifyProperty($service, $app); + } + } + + /** + * Set initial config. + */ + protected function setInitialConfig() + { + $this->config = clone $this->app->config; + } + + protected function setInitialEvent() + { + $this->event = clone $this->app->event; + } + + protected function setInitialServices() + { + $services = $this->config->get('worker.services', []); + + foreach ($services as $service) { + if (class_exists($service) && !in_array($service, $this->services)) { + $serviceObj = new $service($this->app); + $this->services[$service] = $serviceObj; + } + } + } + + /** + * Initialize resetters. + */ + protected function setInitialResetters() + { + $resetters = [ + ClearInstances::class, + ResetConfig::class, + ResetEvent::class, + ResetService::class, + ResetModel::class, + ResetPaginator::class, + ]; + + $resetters = array_merge($resetters, $this->config->get('worker.resetters', [])); + + foreach ($resetters as $resetter) { + $resetterClass = $this->app->make($resetter); + if (!$resetterClass instanceof ResetterInterface) { + throw new RuntimeException("{$resetter} must implement " . ResetterInterface::class); + } + $this->resetters[$resetter] = $resetterClass; + } + } + + public function getSnapshot() + { + $snapshot = $this->snapshot; + if ($snapshot instanceof App) { + return $snapshot; + } + + throw new InvalidArgumentException('The app object has not been initialized'); + } + + /** + * Get config snapshot. + */ + public function getConfig() + { + return $this->config; + } + + public function getEvent() + { + return $this->event; + } + + public function getServices() + { + return $this->services; + } + +} diff --git a/src/Service.php b/src/Service.php new file mode 100644 index 0000000..9f4946b --- /dev/null +++ b/src/Service.php @@ -0,0 +1,23 @@ + +// +---------------------------------------------------------------------- +namespace think\worker; + +use think\worker\command\Server; + +class Service extends \think\Service +{ + public function boot() + { + $this->commands([ + Server::class, + ]); + } +} diff --git a/src/Watcher.php b/src/Watcher.php new file mode 100644 index 0000000..f538b0a --- /dev/null +++ b/src/Watcher.php @@ -0,0 +1,33 @@ +app->config->get('worker.hot_update.' . $name, $default); + } + + protected function resolveParams($name): array + { + return [ + array_filter($this->getConfig('include', []), function ($dir) { + return is_dir($dir); + }), + $this->getConfig('exclude', []), + $this->getConfig('name', []), + ]; + } + + public function getDefaultDriver() + { + return $this->getConfig('type', 'scan'); + } +} diff --git a/src/Websocket.php b/src/Websocket.php new file mode 100644 index 0000000..9533c47 --- /dev/null +++ b/src/Websocket.php @@ -0,0 +1,121 @@ +app = $app; + $this->room = $room; + $this->event = $event; + $this->connection = $connection; + } + + /** + * @return Pusher + */ + protected function makePusher() + { + return $this->app->invokeClass(Pusher::class); + } + + public function to(...$values) + { + return $this->makePusher()->to(...$values); + } + + public function push($data) + { + $this->makePusher()->to($this->getSender())->push($data); + } + + public function emit(string $event, ...$data) + { + $this->makePusher()->to($this->getSender())->emit($event, ...$data); + } + + public function join(...$rooms): self + { + $this->room->add($this->getSender(), ...$rooms); + + return $this; + } + + public function leave(...$rooms): self + { + $this->room->delete($this->getSender(), ...$rooms); + + return $this; + } + + public function setConnected($connected) + { + $this->connected = $connected; + } + + public function isEstablished() + { + return $this->connected; + } + + public function close() + { + if ($this->connection) { + $this->connection->close(); + } + } + + public function setSender(string $fd) + { + $this->sender = $fd; + return $this; + } + + public function getSender() + { + if (empty($this->sender)) { + throw new RuntimeException('Cannot use websocket as current client before handshake!'); + } + return $this->sender; + } +} diff --git a/src/Worker.php b/src/Worker.php new file mode 100644 index 0000000..ab1b144 --- /dev/null +++ b/src/Worker.php @@ -0,0 +1,19 @@ + +// +---------------------------------------------------------------------- + +namespace think\worker\command; + +use think\console\Command; +use think\worker\Manager; + +/** + * Worker Server 命令行类 + */ +class Server extends Command +{ + protected $config = []; + + public function configure() + { + $this->setName('worker') + ->setDescription('Workerman Server for ThinkPHP'); + } + + public function handle(Manager $manager) + { + $manager->start(); + } + +} diff --git a/src/concerns/InteractsWithConduit.php b/src/concerns/InteractsWithConduit.php new file mode 100644 index 0000000..29e6ff9 --- /dev/null +++ b/src/concerns/InteractsWithConduit.php @@ -0,0 +1,20 @@ +conduit = $this->container->make(Conduit::class); + $this->conduit->prepare(); + $this->onEvent('workerStart', function () { + $this->app->instance(Conduit::class, $this->conduit); + }); + } +} diff --git a/src/concerns/InteractsWithHttp.php b/src/concerns/InteractsWithHttp.php new file mode 100644 index 0000000..332a18a --- /dev/null +++ b/src/concerns/InteractsWithHttp.php @@ -0,0 +1,321 @@ +getConfig('http.enable', true)) { + + $this->wsEnable = $this->getConfig('websocket.enable', false); + + if ($this->wsEnable) { + $this->prepareWebsocket(); + } + + $workerNum = $this->getConfig('http.worker_num', 4); + $this->addWorker([$this, 'createHttpServer'], 'http server', $workerNum); + } + } + + public function createHttpServer() + { + $this->preloadHttp(); + + $host = $this->getConfig('http.host'); + $port = $this->getConfig('http.port'); + $options = $this->getConfig('http.options', []); + + $server = new Worker("\\think\\worker\\protocols\\FlexHttp://{$host}:{$port}", $options); + + $server->reusePort = true; + + $server->onMessage = function (TcpConnection $connection, $data) { + if ($data instanceof WorkerRequest) { + if ($this->wsEnable && $this->isWebsocketRequest($data)) { + $this->onHandShake($connection, $data); + } else { + $this->onRequest($connection, $data); + } + } elseif ($data instanceof Frame) { + $this->onMessage($connection, $data); + } + }; + + $server->onClose = function (TcpConnection $connection) { + if ($this->wsEnable) { + $this->onClose($connection); + } + }; + + $server->listen(); + } + + protected function preloadHttp() + { + $http = $this->app->http; + $this->app->invokeMethod([$http, 'loadMiddleware'], [], true); + + if ($this->app->config->get('app.with_route', true)) { + $this->app->invokeMethod([$http, 'loadRoutes'], [], true); + $route = clone $this->app->route; + unset($this->app->route); + + $this->app->resolving(WorkerHttp::class, function ($http, App $app) use ($route) { + $newRoute = clone $route; + $this->modifyProperty($newRoute, $app); + $app->instance('route', $newRoute); + }); + } + + $middleware = clone $this->app->middleware; + unset($this->app->middleware); + + $this->app->resolving(WorkerHttp::class, function ($http, App $app) use ($middleware) { + $newMiddleware = clone $middleware; + $this->modifyProperty($newMiddleware, $app); + $app->instance('middleware', $newMiddleware); + }); + + unset($this->app->http); + $this->app->bind(Http::class, WorkerHttp::class); + } + + public function onRequest(TcpConnection $connection, WorkerRequest $wkRequest) + { + $this->runInSandbox(function (Http $http, Event $event, WorkerApp $app) use ($connection, $wkRequest) { + + $app->setInConsole(false); + + $request = $this->prepareRequest($wkRequest); + + try { + $response = $this->handleRequest($http, $request); + $this->prepareResponse($response); + } catch (Throwable $e) { + $handle = $app->make(Handle::class); + $handle->report($e); + $response = $handle->render($request, $e); + } + + $this->sendResponse($connection, $request, $response, $app->cookie); + + //关闭连接 + $connection->close(); + + $http->end($response); + }); + } + + protected function handleRequest(Http $http, $request) + { + $level = ob_get_level(); + ob_start(); + + $response = $http->run($request); + + if (ob_get_length() > 0) { + $content = $response->getContent(); + $response->content(ob_get_contents() . $content); + } + + while (ob_get_level() > $level) { + ob_end_clean(); + } + + return $response; + } + + protected function prepareRequest(WorkerRequest $wkRequest) + { + $header = $wkRequest->header(); + $server = []; + + foreach ($header as $key => $value) { + $server['http_' . str_replace('-', '_', $key)] = $value; + } + + // 重新实例化请求对象 处理请求数据 + /** @var \think\Request $request */ + $request = $this->app->make('request', [], true);; + + $queryString = $wkRequest->queryString(); + + return $request + ->setMethod($wkRequest->method()) + ->withHeader($header) + ->withServer($server) + ->withGet($wkRequest->get()) + ->withPost($wkRequest->post()) + ->withCookie($wkRequest->cookie()) + ->withFiles($wkRequest->file()) + ->withInput($wkRequest->rawBody()) + ->setBaseUrl($wkRequest->uri()) + ->setUrl($wkRequest->uri() . (!empty($queryString) ? '?' . $queryString : '')) + ->setPathinfo(ltrim($wkRequest->path(), '/')); + } + + protected function prepareResponse(\think\Response $response) + { + switch (true) { + case $response instanceof View: + $response->getContent(); + break; + } + } + + protected function sendResponse(TcpConnection $connection, \think\Request $request, \think\Response $response, Cookie $cookie) + { + switch (true) { + case $response instanceof IteratorResponse: + $this->sendIterator($connection, $response, $cookie); + break; + case $response instanceof FileResponse: + $this->sendFile($connection, $request, $response, $cookie); + break; + default: + $this->sendContent($connection, $response, $cookie); + } + } + + protected function sendIterator(TcpConnection $connection, IteratorResponse $response, Cookie $cookie) + { + $wkResponse = $this->createResponse($response, $cookie); + $connection->send($wkResponse); + + foreach ($response as $content) { + $connection->send($content, true); + } + } + + protected function sendFile(TcpConnection $connection, \think\Request $request, FileResponse $response, Cookie $cookie) + { + $ifNoneMatch = $request->header('If-None-Match'); + $ifRange = $request->header('If-Range'); + + $code = $response->getCode(); + $file = $response->getFile(); + $eTag = $response->getHeader('ETag'); + $lastModified = $response->getHeader('Last-Modified'); + + $fileSize = $file->getSize(); + $offset = 0; + $length = -1; + + if ($ifNoneMatch == $eTag) { + $code = 304; + } elseif (!$ifRange || $ifRange === $eTag || $ifRange === $lastModified) { + $range = $request->header('Range', ''); + if (Str::startsWith($range, 'bytes=')) { + [$start, $end] = explode('-', substr($range, 6), 2) + [0]; + + $end = ('' === $end) ? $fileSize - 1 : (int) $end; + + if ('' === $start) { + $start = $fileSize - $end; + $end = $fileSize - 1; + } else { + $start = (int) $start; + } + + if ($start <= $end) { + $end = min($end, $fileSize - 1); + if ($start < 0 || $start > $end) { + $code = 416; + $response->header([ + 'Content-Range' => sprintf('bytes */%s', $fileSize), + ]); + } elseif ($end - $start < $fileSize - 1) { + $length = $end < $fileSize ? $end - $start + 1 : -1; + $offset = $start; + $code = 206; + $response->header([ + 'Content-Range' => sprintf('bytes %s-%s/%s', $start, $end, $fileSize), + 'Content-Length' => $end - $start + 1, + ]); + } + } + } + } + + $wkResponse = $this->createResponse($response, $cookie); + + if ($code >= 200 && $code < 300 && $length !== 0) { + $wkResponse->withFile($file->getPathname(), $offset, $length); + } + + $connection->send($wkResponse); + } + + protected function sendContent(TcpConnection $connection, \think\Response $response, Cookie $cookie) + { + $response->header(['Transfer-Encoding' => 'chunked']); + + $wkResponse = $this->createResponse($response, $cookie); + + $connection->send($wkResponse); + + $content = $response->getContent(); + if ($content) { + $contentSize = strlen($content); + $chunkSize = 8192; + + if ($contentSize > $chunkSize) { + $sendSize = 0; + do { + if (!$connection->send(new Chunk(substr($content, $sendSize, $chunkSize)))) { + break; + } + } while (($sendSize += $chunkSize) < $contentSize); + } else { + $connection->send(new Chunk($content)); + } + } + $connection->send(new Chunk('')); + } + + protected function createResponse(\think\Response $response, Cookie $cookie, $body = '') + { + $code = $response->getCode(); + $header = $response->getHeader(); + + $wkResponse = new Response($code, $header, $body); + + foreach ($cookie->getCookie() as $name => $val) { + [$value, $expire, $option] = $val; + $wkResponse->cookie($name, $value, $expire, $option['path'], $option['domain'], (bool) $option['secure'], (bool) $option['httponly'], $option['samesite']); + } + + return $wkResponse; + } +} diff --git a/src/concerns/InteractsWithQueue.php b/src/concerns/InteractsWithQueue.php new file mode 100644 index 0000000..2b7cf00 --- /dev/null +++ b/src/concerns/InteractsWithQueue.php @@ -0,0 +1,97 @@ +getConfig('queue.workers', []); + + foreach ($workers as $queue => $options) { + + if (strpos($queue, '@') !== false) { + [$queue, $connection] = explode('@', $queue); + } else { + $connection = null; + } + + $workerNum = Arr::get($options, 'worker_num', 1); + + $this->addWorker(function () use ($options, $connection, $queue) { + $delay = Arr::get($options, 'delay', 0); + $sleep = Arr::get($options, 'sleep', 3); + $tries = Arr::get($options, 'tries', 0); + $timeout = Arr::get($options, 'timeout', 60); + $qWorker = $this->app->make(Worker::class); + + if ($this->supportsAsyncSignals()) { + pcntl_signal(SIGALRM, function () { + \think\worker\Worker::stopAll(); + }); + + pcntl_alarm($timeout); + } + + $this->createRunTimer(function () use ($connection, $queue, $delay, $sleep, $tries, $qWorker) { + $this->runInSandbox(function () use ($connection, $queue, $delay, $sleep, $tries, $qWorker) { + $qWorker->runNextJob($connection, $queue, $delay, $sleep, $tries); + }); + if ($this->supportsAsyncSignals()) { + pcntl_alarm(0); + } + }); + }, "queue [$queue]", $workerNum); + } + } + + protected function supportsAsyncSignals() + { + return extension_loaded('pcntl'); + } + + protected function createRunTimer($func) + { + Timer::add(0.1, function () use ($func) { + $func(); + $this->createRunTimer($func); + }, [], false); + } + + protected function prepareQueue() + { + if ($this->getConfig('queue.enable', false)) { + $this->listenForEvents(); + $this->createQueueWorkers(); + } + } + + /** + * 注册事件 + */ + protected function listenForEvents() + { + $this->container->event->listen(JobFailed::class, function (JobFailed $event) { + $this->logFailedJob($event); + }); + } + + /** + * 记录失败任务 + * @param JobFailed $event + */ + protected function logFailedJob(JobFailed $event) + { + $this->container['queue.failer']->log( + $event->connection, + $event->job->getQueue(), + $event->job->getRawBody(), + $event->exception + ); + } +} diff --git a/src/concerns/InteractsWithServer.php b/src/concerns/InteractsWithServer.php new file mode 100644 index 0000000..a01809e --- /dev/null +++ b/src/concerns/InteractsWithServer.php @@ -0,0 +1,119 @@ +name = $name; + $worker->count = $count; + + $worker->onWorkerStart = function (Worker $worker) use ($func) { + $this->clearCache(); + $this->prepareApplication(); + + $this->conduit->connect(); + + $this->workerId = $this->ipc->listenMessage(); + + $this->triggerEvent('workerStart', $worker); + + $func($worker); + }; + + $worker->onWorkerReload = function () { + $this->stopping = true; + }; + + return $worker; + } + + public function getWorkerId() + { + return $this->workerId; + } + + public function isStopping() + { + return $this->stopping; + } + + /** + * 启动服务 + */ + public function start(): void + { + $this->initialize(); + $this->prepareIpc(); + $this->triggerEvent('init'); + + //热更新 + if ($this->getConfig('hot_update.enable', false)) { + $this->addHotUpdateWorker(); + } + + Worker::runAll(); + } + + protected function prepareIpc() + { + $this->ipc = $this->container->make(Ipc::class); + } + + public function sendMessage($workerId, $message) + { + $this->ipc->sendMessage($workerId, $message); + } + + /** + * 热更新 + */ + protected function addHotUpdateWorker() + { + $worker = new Worker(); + + $worker->name = 'hot update'; + $worker->reloadable = false; + + $worker->onWorkerStart = function () { + $watcher = $this->container->make(Watcher::class); + $watcher->watch(function () { + posix_kill(posix_getppid(), SIGUSR1); + }); + }; + } + + /** + * 清除apc、op缓存 + */ + protected function clearCache() + { + if (extension_loaded('apc')) { + apc_clear_cache(); + } + + if (extension_loaded('Zend OPcache')) { + opcache_reset(); + } + } + +} diff --git a/src/concerns/InteractsWithWebsocket.php b/src/concerns/InteractsWithWebsocket.php new file mode 100644 index 0000000..486a89e --- /dev/null +++ b/src/concerns/InteractsWithWebsocket.php @@ -0,0 +1,145 @@ +onEvent('workerStart', function () { + $handlerClass = $this->getConfig('websocket.handler', Handler::class); + $this->app->bind(HandlerInterface::class, $handlerClass); + + $this->onEvent('message', function ($message) { + if ($message instanceof PushMessage) { + if (isset($this->messageSender[$message->to])) { + $this->messageSender[$message->to]($message->data); + } + } + }); + }); + } + + public function onHandShake(TcpConnection $connection, WorkerRequest $wkRequest) + { + $this->runInSandbox(function (App $app, Http $http, Event $event) use ($connection, $wkRequest) { + $request = $this->prepareRequest($wkRequest); + + $response = $http->run($request); + if (!$response instanceof \think\worker\response\Websocket) { + $connection->close(); + return; + } + + $event->subscribe([$response]); + $this->upgrade($connection, $wkRequest); + + $websocket = $app->make(Websocket::class, [$connection], true); + $app->instance(Websocket::class, $websocket); + + $id = "{$this->workerId}.{$connection->id}"; + + $websocket->setSender($id); + $websocket->join($id); + + $handler = $app->make(HandlerInterface::class); + + $this->messageSender[$connection->id] = function ($data) use ($connection, $handler) { + $connection->send($handler->encodeMessage($data)); + }; + + try { + $handler->onOpen($request); + } catch (Throwable $e) { + $this->logServerError($e); + } + }, $connection); + } + + public function onMessage(TcpConnection $connection, Frame $frame) + { + $this->runInSandbox(function (App $app) use ($frame) { + $handler = $app->make(HandlerInterface::class); + try { + $handler->onMessage($frame); + } catch (Throwable $e) { + $this->logServerError($e); + } + }, $connection); + } + + public function onClose(TcpConnection $connection) + { + $this->runInSandbox(function (App $app) use ($connection) { + if ($app->exists(Websocket::class)) { + $websocket = $app->make(Websocket::class); + $handler = $app->make(HandlerInterface::class); + try { + $handler->onClose(); + } catch (Throwable $e) { + $this->logServerError($e); + } + + // leave all rooms + $websocket->leave(); + + unset($this->messageSender[$connection->id]); + + $websocket->setConnected(false); + } + }, $connection); + } + + protected function isWebsocketRequest(WorkerRequest $request) + { + $header = $request->header(); + return strcasecmp(Arr::get($header, 'connection', ''), 'upgrade') === 0 && + strcasecmp(Arr::get($header, 'upgrade', ''), 'websocket') === 0; + } + + protected function upgrade(TcpConnection $connection, WorkerRequest $request) + { + $key = $request->header('Sec-WebSocket-Key'); + + $headers = [ + 'Upgrade' => 'websocket', + 'Sec-WebSocket-Version' => '13', + 'Connection' => 'Upgrade', + 'Sec-WebSocket-Accept' => base64_encode(sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true)), + ]; + + if ($protocol = $request->header('Sec-Websocket-Protocol')) { + $headers['Sec-WebSocket-Protocol'] = $protocol; + } + + $response = new Response(101, $headers); + + $connection->send($response); + + // Websocket data buffer. + $connection->context->websocketDataBuffer = ''; + // Current websocket frame length. + $connection->context->websocketCurrentFrameLength = 0; + // Current websocket frame data. + $connection->context->websocketCurrentFrameBuffer = ''; + + $connection->context->websocketHandshake = true; + } +} diff --git a/src/concerns/ModifyProperty.php b/src/concerns/ModifyProperty.php new file mode 100644 index 0000000..cf75d03 --- /dev/null +++ b/src/concerns/ModifyProperty.php @@ -0,0 +1,18 @@ +hasProperty($property)) { + $reflectProperty = $reflectObject->getProperty($property); + $reflectProperty->setAccessible(true); + $reflectProperty->setValue($object, $value); + } + } +} diff --git a/src/concerns/WithApplication.php b/src/concerns/WithApplication.php new file mode 100644 index 0000000..00b68fb --- /dev/null +++ b/src/concerns/WithApplication.php @@ -0,0 +1,77 @@ +app instanceof WorkerApp) { + $this->app = new WorkerApp($this->container->getRootPath()); + + $this->app->bind(WorkerApp::class, App::class); + $this->app->bind(Manager::class, $this); + + $this->app->initialize(); + $this->app->instance('request', $this->container->request); + $this->prepareConcretes(); + } + } + + /** + * 预加载 + */ + protected function prepareConcretes() + { + $defaultConcretes = ['db', 'cache', 'event']; + + $concretes = array_merge($defaultConcretes, $this->getConfig('concretes', [])); + + foreach ($concretes as $concrete) { + $this->app->make($concrete); + } + } + + public function getApp() + { + return $this->app; + } + + /** + * 获取沙箱 + * @return Sandbox + */ + protected function getSandbox() + { + return $this->app->make(Sandbox::class); + } + + /** + * 在沙箱中执行 + * @param Closure $callable + */ + public function runInSandbox(Closure $callable, ?object $key = null) + { + try { + $this->getSandbox()->run($callable, $key); + } catch (Throwable $e) { + $this->logServerError($e); + } + } +} diff --git a/src/concerns/WithContainer.php b/src/concerns/WithContainer.php new file mode 100644 index 0000000..07152bc --- /dev/null +++ b/src/concerns/WithContainer.php @@ -0,0 +1,75 @@ +container = $container; + } + + protected function getContainer() + { + return $this->container; + } + + /** + * 获取配置 + * @param string $name + * @param mixed $default + * @return mixed + */ + public function getConfig(string $name, $default = null) + { + return $this->container->config->get("worker.{$name}", $default); + } + + /** + * 触发事件 + * @param string $event + * @param mixed $params + */ + public function triggerEvent(string $event, $params = null): void + { + $this->container->event->trigger("worker.{$event}", $params); + } + + /** + * 监听事件 + * @param string $event + * @param $listener + * @param bool $first + */ + public function onEvent(string $event, $listener, bool $first = false): void + { + $this->container->event->listen("worker.{$event}", $listener, $first); + } + + /** + * Log server error. + * + * @param Throwable $e + */ + public function logServerError(Throwable $e) + { + /** @var Handle $handle */ + $handle = $this->container->make(Handle::class); + + $handle->report($e); + } +} diff --git a/src/conduit/Driver.php b/src/conduit/Driver.php new file mode 100644 index 0000000..07bd309 --- /dev/null +++ b/src/conduit/Driver.php @@ -0,0 +1,26 @@ + */ + protected $suspensions = []; + protected $events = []; + + public function __construct(protected Manager $manager) + { + $filename = runtime_path() . 'conduit.sock'; + @unlink($filename); + $this->domain = "unix://{$filename}"; + } + + public function prepare() + { + //启动服务端 + Server::run($this->domain); + } + + public function connect() + { + $suspension = EventLoop::getSuspension(); + $this->connection = $this->createConnection($suspension); + $suspension->suspend(); + + Timer::add($this->pingInterval, function () { + if ($this->connection) { + $this->connection->send(''); + } + }); + + Timer::add(1, function () { + //检查是否超时 + foreach ($this->suspensions as $id => $suspension) { + if (time() - $suspension[1] > 10) { + $suspension[0]->throw(new Exception('conduit connection is timeout')); + unset($this->suspensions[$id]); + } + } + }); + } + + public function get(string $name) + { + return $this->sendAndRecv(Command::create('get', $name)); + } + + public function set(string $name, $value) + { + $this->send(Command::create('set', $name, $value)); + } + + public function inc(string $name, int $step = 1) + { + return $this->sendAndRecv(Command::create('inc', $name, $step)); + } + + public function sAdd(string $name, ...$value) + { + $this->send(Command::create('sAdd', $name, $value)); + } + + public function sRem(string $name, $value) + { + $this->send(Command::create('sRem', $name, $value)); + } + + public function sMembers(string $name) + { + return $this->sendAndRecv(Command::create('sMembers', $name)); + } + + public function publish(string $name, $value) + { + $this->send(Command::create('publish', $name, $value)); + } + + public function subscribe(string $name, $callback) + { + $this->send(Command::create('subscribe', $name)); + $this->events[$name] = $callback; + } + + protected function sendAndRecv(Command $command) + { + $suspension = EventLoop::getSuspension(); + + $id = $this->id++; + + $command->id = $id; + + $this->suspensions[$id] = [$suspension, time()]; + + $this->send($command); + + return $suspension->suspend(); + } + + protected function send(Command $command) + { + if (!$this->connection) { + throw new Exception('conduit connection is disconnected'); + } + + $this->connection->send(serialize($command)); + } + + protected function createConnection(?Suspension $suspension = null) + { + $connection = new AsyncTcpConnection($this->domain); + + $connection->protocol = Frame::class; + + $connection->onConnect = function () use ($suspension) { + $this->clearTimer(); + if ($suspension) { + $suspension->resume(); + } + //补订阅 + foreach ($this->events as $name => $callback) { + $this->send(Command::create('subscribe', $name)); + } + }; + + $connection->onMessage = function ($connection, $buffer) { + /** @var Result|Event $result */ + $result = unserialize($buffer); + + if ($result instanceof Event) { + if (isset($this->events[$result->name])) { + $this->events[$result->name]($result->data); + } + } elseif (isset($result->id) && isset($this->suspensions[$result->id])) { + [$suspension] = $this->suspensions[$result->id]; + $suspension->resume($result->data); + unset($this->suspensions[$result->id]); + } + }; + + $connection->onClose = function () { + $this->connection = null; + //重连 + $this->clearTimer(); + $this->reconnectTimer = Timer::add(1, function () { + $this->connection = $this->createConnection(); + }); + }; + + $connection->connect(); + + return $connection; + } + + protected function clearTimer() + { + if ($this->reconnectTimer) { + Timer::del($this->reconnectTimer); + $this->reconnectTimer = null; + } + } +} diff --git a/src/conduit/driver/socket/Command.php b/src/conduit/driver/socket/Command.php new file mode 100644 index 0000000..9c6450f --- /dev/null +++ b/src/conduit/driver/socket/Command.php @@ -0,0 +1,22 @@ +name = $name; + $packet->key = $key; + $packet->data = $data; + + return $packet; + } +} diff --git a/src/conduit/driver/socket/Event.php b/src/conduit/driver/socket/Event.php new file mode 100644 index 0000000..dc6b0c5 --- /dev/null +++ b/src/conduit/driver/socket/Event.php @@ -0,0 +1,15 @@ + */ + protected $subscribers = []; + + public function onMessage(TcpConnection $connection, $buffer) + { + if (empty($buffer)) { + return; + } + /** @var Command $command */ + $command = unserialize($buffer); + + $result = Result::create($command->id); + + switch ($command->name) { + case 'get': + $result->data = $this->data[$command->key] ?? null; + break; + case 'set': + $this->data[$command->key] = $command->data; + break; + case 'inc': + if (!isset($this->data[$command->key]) || !is_integer($this->data[$command->key])) { + $this->data[$command->key] = 0; + } + $result->data = $this->data[$command->key] += $command->data ?? 1; + break; + case 'sAdd': + if (!isset($this->data[$command->key]) || !is_array($this->data[$command->key])) { + $this->data[$command->key] = []; + } + $this->data[$command->key] = array_merge($this->data[$command->key], $command->data); + break; + case 'sRem': + if (!isset($this->data[$command->key]) || !is_array($this->data[$command->key])) { + $this->data[$command->key] = []; + } + $this->data[$command->key] = array_diff($this->data[$command->key], [$command->data]); + break; + case 'sMembers': + if (!isset($this->data[$command->key]) || !is_array($this->data[$command->key])) { + $this->data[$command->key] = []; + } + $result->data = $this->data[$command->key]; + break; + case 'subscribe': + if (!isset($this->subscribers[$command->key])) { + $this->subscribers[$command->key] = []; + } + $this->subscribers[$command->key][] = $connection; + break; + case 'publish': + if (!empty($this->subscribers[$command->key])) { + foreach ($this->subscribers[$command->key] as $conn) { + $conn->send(serialize(Event::create($command->key, $command->data))); + } + } + break; + } + + if (isset($result->id)) { + $connection->send(serialize($result)); + } + } + + public function onClose(TcpConnection $connection) + { + if (!empty($this->subscribers)) { + foreach ($this->subscribers as $key => $connections) { + $this->subscribers[$key] = array_udiff($connections, [$connection], function ($a, $b) { + return $a <=> $b; + }); + } + } + } + + public static function run($domain) + { + //启动服务端 + $server = new self(); + + $worker = new Worker($domain); + + $worker->name = 'conduit'; + $worker->protocol = Frame::class; + $worker->reloadable = false; + + $worker->onMessage = [$server, 'onMessage']; + $worker->onClose = [$server, 'onClose']; + } +} diff --git a/src/config/worker.php b/src/config/worker.php new file mode 100644 index 0000000..b8bd04f --- /dev/null +++ b/src/config/worker.php @@ -0,0 +1,39 @@ + +// +---------------------------------------------------------------------- + +use think\worker\websocket\Handler; + +return [ + 'http' => [ + 'enable' => true, + 'host' => '0.0.0.0', + 'port' => 8080, + 'worker_num' => 4, + 'options' => [], + ], + 'websocket' => [ + 'enable' => false, + 'handler' => Handler::class, + 'ping_interval' => 25000, + 'ping_timeout' => 60000, + ], + //队列 + 'queue' => [ + 'enable' => false, + 'workers' => [], + ], + 'hot_update' => [ + 'enable' => env('APP_DEBUG', false), + 'name' => ['*.php'], + 'include' => [app_path(), config_path(), root_path('route')], + 'exclude' => [], + ], +]; diff --git a/src/contract/ResetterInterface.php b/src/contract/ResetterInterface.php new file mode 100644 index 0000000..c9345e9 --- /dev/null +++ b/src/contract/ResetterInterface.php @@ -0,0 +1,17 @@ +to = $to; + $this->data = $data; + } +} diff --git a/src/protocols/FlexHttp.php b/src/protocols/FlexHttp.php new file mode 100644 index 0000000..a87297c --- /dev/null +++ b/src/protocols/FlexHttp.php @@ -0,0 +1,39 @@ +context->websocketHandshake)) { + return Http::input($buffer, $connection); + } else { + return Websocket::input($buffer, $connection); + } + } + + public static function decode(string $buffer, TcpConnection $connection) + { + if (empty($connection->context->websocketHandshake)) { + return Http::decode($buffer, $connection); + } else { + $data = Websocket::decode($buffer, $connection); + return new Frame($connection->id, $data); + } + } + + public static function encode(mixed $response, TcpConnection $connection): string + { + if (empty($connection->context->websocketHandshake)) { + return Http::encode($response, $connection); + } else { + return Websocket::encode($response, $connection); + } + } +} diff --git a/src/resetters/ClearInstances.php b/src/resetters/ClearInstances.php new file mode 100644 index 0000000..9aa919f --- /dev/null +++ b/src/resetters/ClearInstances.php @@ -0,0 +1,23 @@ +getConfig()->get('worker.instances', [])); + + foreach ($instances as $instance) { + $app->delete($instance); + } + + return $app; + } +} diff --git a/src/resetters/ResetConfig.php b/src/resetters/ResetConfig.php new file mode 100644 index 0000000..24c648c --- /dev/null +++ b/src/resetters/ResetConfig.php @@ -0,0 +1,18 @@ +instance('config', clone $sandbox->getConfig()); + + return $app; + } +} diff --git a/src/resetters/ResetEvent.php b/src/resetters/ResetEvent.php new file mode 100644 index 0000000..1530e0a --- /dev/null +++ b/src/resetters/ResetEvent.php @@ -0,0 +1,22 @@ +getEvent(); + $this->modifyProperty($event, $app); + $app->instance('event', $event); + + return $app; + } +} diff --git a/src/resetters/ResetModel.php b/src/resetters/ResetModel.php new file mode 100644 index 0000000..9899563 --- /dev/null +++ b/src/resetters/ResetModel.php @@ -0,0 +1,21 @@ +getSnapshot()->invoke(...$args); + }); + } + } +} diff --git a/src/resetters/ResetPaginator.php b/src/resetters/ResetPaginator.php new file mode 100644 index 0000000..3ef634a --- /dev/null +++ b/src/resetters/ResetPaginator.php @@ -0,0 +1,30 @@ +getSnapshot()->request->baseUrl(); + }); + + Paginator::currentPageResolver(function ($varPage = 'page') use ($sandbox) { + + $page = $sandbox->getSnapshot()->request->param($varPage); + + if (filter_var($page, FILTER_VALIDATE_INT) !== false && (int) $page >= 1) { + return (int) $page; + } + + return 1; + }); + } +} diff --git a/src/resetters/ResetService.php b/src/resetters/ResetService.php new file mode 100644 index 0000000..6f6d2bc --- /dev/null +++ b/src/resetters/ResetService.php @@ -0,0 +1,33 @@ +getServices() as $service) { + $this->modifyProperty($service, $app); + if (method_exists($service, 'register')) { + $service->register(); + } + if (method_exists($service, 'boot')) { + $app->invoke([$service, 'boot']); + } + } + } + +} diff --git a/src/response/File.php b/src/response/File.php new file mode 100644 index 0000000..afcca87 --- /dev/null +++ b/src/response/File.php @@ -0,0 +1,112 @@ + 'application/octet-stream', + 'Accept-Ranges' => 'bytes', + ]; + + /** + * @var SplFileInfo + */ + protected $file; + + public function __construct($file, ?string $contentDisposition = null, bool $autoEtag = true, bool $autoLastModified = true, bool $autoContentType = true) + { + $this->setFile($file, $contentDisposition, $autoEtag, $autoLastModified, $autoContentType); + } + + public function getFile() + { + return $this->file; + } + + public function setFile($file, ?string $contentDisposition = null, bool $autoEtag = true, bool $autoLastModified = true, bool $autoContentType = true) + { + if (!$file instanceof SplFileInfo) { + $file = new SplFileInfo((string) $file); + } + + if (!$file->isReadable()) { + throw new RuntimeException('File must be readable.'); + } + + $this->header['Content-Length'] = $file->getSize(); + + $this->file = $file; + + if ($autoEtag) { + $this->setAutoEtag(); + } + + if ($autoLastModified) { + $this->setAutoLastModified(); + } + + if ($contentDisposition) { + $this->setContentDisposition($contentDisposition); + } + + if ($autoContentType) { + $this->setAutoContentType(); + } + + return $this; + } + + public function setAutoContentType() + { + if (extension_loaded('fileinfo')) { + $finfo = finfo_open(FILEINFO_MIME_TYPE); + + $mimeType = finfo_file($finfo, $this->file->getPathname()); + if ($mimeType) { + $this->header['Content-Type'] = $mimeType; + } + } + } + + public function setContentDisposition(string $disposition, string $filename = '') + { + if ('' === $filename) { + $filename = $this->file->getFilename(); + } + + $this->header['Content-Disposition'] = "{$disposition}; filename=\"{$filename}\""; + + return $this; + } + + public function setAutoLastModified() + { + $mTime = $this->file->getMTime(); + if ($mTime) { + $date = DateTime::createFromFormat('U', (string) $mTime); + $this->lastModified($date->format('D, d M Y H:i:s') . ' GMT'); + } + return $this; + } + + public function setAutoEtag() + { + $eTag = "W/\"" . sha1_file($this->file->getPathname()) . "\""; + + return $this->eTag($eTag); + } + + protected function sendData(string $data): void + { + readfile($this->file->getPathname()); + } +} diff --git a/src/response/Iterator.php b/src/response/Iterator.php new file mode 100644 index 0000000..d74bb79 --- /dev/null +++ b/src/response/Iterator.php @@ -0,0 +1,22 @@ +iterator = $iterator; + } + + public function getIterator(): Traversable + { + return $this->iterator; + } +} diff --git a/src/response/Websocket.php b/src/response/Websocket.php new file mode 100644 index 0000000..614e01e --- /dev/null +++ b/src/response/Websocket.php @@ -0,0 +1,66 @@ +listeners['Open'] = $listener; + return $this; + } + + public function onMessage($listener) + { + $this->listeners['Message'] = $listener; + return $this; + } + + public function onEvent($listener) + { + $this->listeners['Event'] = $listener; + return $this; + } + + public function onClose($listener) + { + $this->listeners['Close'] = $listener; + return $this; + } + + public function onConnect($listener) + { + $this->listeners['Connect'] = $listener; + return $this; + } + + public function onDisconnect($listener) + { + $this->listeners['Disconnect'] = $listener; + return $this; + } + + public function onPing($listener) + { + $this->listeners['Ping'] = $listener; + return $this; + } + + public function onPong($listener) + { + $this->listeners['Pong'] = $listener; + return $this; + } + + public function subscribe(Event $event) + { + foreach ($this->listeners as $eventName => $listener) { + $event->listen('worker.websocket.' . $eventName, $listener); + } + } +} diff --git a/src/watcher/Driver.php b/src/watcher/Driver.php new file mode 100644 index 0000000..f02ca2c --- /dev/null +++ b/src/watcher/Driver.php @@ -0,0 +1,8 @@ +directory = $directory; + $this->exclude = $exclude; + $this->name = $name; + } + + public function watch(callable $callback) + { + $ms = 2000; + $seconds = ceil(($ms + 1000) / 1000); + $minutes = sprintf('-%.2f', $seconds / 60); + + $dest = implode(' ', $this->directory); + + $name = empty($this->name) ? '' : ' \( ' . join(' -o ', array_map(fn($v) => "-name \"{$v}\"", $this->name)) . ' \)'; + $notName = ''; + $notPath = ''; + if (!empty($this->exclude)) { + $excludeDirs = $excludeFiles = []; + foreach ($this->exclude as $directory) { + $directory = rtrim($directory, '/'); + if (is_dir($directory)) { + $excludeDirs[] = $directory; + } else { + $excludeFiles[] = $directory; + } + } + + if (!empty($excludeFiles)) { + $notPath = ' -not \( ' . join(' -and ', array_map(fn($v) => "-name \"{$v}\"", $excludeFiles)) . ' \)'; + } + + if (!empty($excludeDirs)) { + $notPath = ' -not \( ' . join(' -and ', array_map(fn($v) => "-path \"{$v}/*\"", $excludeDirs)) . ' \)'; + } + } + + $command = "find {$dest}{$name}{$notName}{$notPath} -mmin {$minutes} -type f -print"; + + Timer::add($ms / 1000, function () use ($callback, $command) { + $stdout = $this->exec($command); + if (!empty($stdout)) { + call_user_func($callback); + } + }); + } + + public function exec($command) + { + $process = Process::fromShellCommandline($command); + $process->run(); + if ($process->isSuccessful()) { + return $process->getOutput(); + } + return false; + } + +} diff --git a/src/watcher/Scan.php b/src/watcher/Scan.php new file mode 100644 index 0000000..bacd478 --- /dev/null +++ b/src/watcher/Scan.php @@ -0,0 +1,52 @@ +finder = new Finder(); + $this->finder + ->files() + ->name($name) + ->in($directory) + ->exclude($exclude); + } + + protected function findFiles() + { + $files = []; + /** @var SplFileInfo $f */ + foreach ($this->finder as $f) { + $files[$f->getRealpath()] = $f->getMTime(); + } + return $files; + } + + public function watch(callable $callback) + { + $this->files = $this->findFiles(); + + Timer::add(2, function () use ($callback) { + $files = $this->findFiles(); + + foreach ($files as $path => $time) { + if (empty($this->files[$path]) || $this->files[$path] != $time) { + call_user_func($callback); + break; + } + } + + $this->files = $files; + }); + } +} diff --git a/src/websocket/Event.php b/src/websocket/Event.php new file mode 100644 index 0000000..411a420 --- /dev/null +++ b/src/websocket/Event.php @@ -0,0 +1,15 @@ +type = $type; + $this->data = $data; + } +} diff --git a/src/websocket/Frame.php b/src/websocket/Frame.php new file mode 100644 index 0000000..82bce04 --- /dev/null +++ b/src/websocket/Frame.php @@ -0,0 +1,10 @@ +event = $event; + } + + /** + * "onOpen" listener. + * + * @param Request $request + */ + public function onOpen(Request $request) + { + $this->event->trigger('worker.websocket.Open', $request); + } + + /** + * "onMessage" listener. + * + * @param Frame $frame + */ + public function onMessage(Frame $frame) + { + $this->event->trigger('worker.websocket.Message', $frame); + + $event = $this->decode($frame->data); + if ($event) { + $this->event->trigger('worker.websocket.Event', $event); + } + } + + /** + * "onClose" listener. + */ + public function onClose() + { + $this->event->trigger('worker.websocket.Close'); + } + + protected function decode($payload) + { + $data = json_decode($payload, true); + if (!empty($data['type'])) { + return new WsEvent($data['type'], $data['data'] ?? null); + } + return null; + } + + public function encodeMessage($message) + { + if ($message instanceof WsEvent) { + return json_encode([ + 'type' => $message->type, + 'data' => $message->data, + ]); + } + return $message; + } +} diff --git a/src/websocket/Pusher.php b/src/websocket/Pusher.php new file mode 100644 index 0000000..0fcfaf6 --- /dev/null +++ b/src/websocket/Pusher.php @@ -0,0 +1,67 @@ +manager = $manager; + $this->room = $room; + } + + public function to(...$values) + { + foreach ($values as $value) { + if (is_array($value)) { + $this->to(...$value); + } elseif (!in_array($value, $this->to)) { + $this->to[] = $value; + } + } + + return $this; + } + + /** + * Push message to related descriptors + * @param $data + * @return void + */ + public function push($data): void + { + $fds = []; + + foreach ($this->to as $room) { + $clients = $this->room->getClients($room); + if (!empty($clients)) { + $fds = array_merge($fds, $clients); + } + } + + foreach (array_unique($fds) as $fd) { + [$workerId, $fd] = explode('.', $fd); + $this->manager->sendMessage((int) $workerId, new PushMessage((int) $fd, $data)); + } + } + + public function emit(string $event, ...$data): void + { + $this->push(new Event($event, $data)); + } +} diff --git a/src/websocket/Room.php b/src/websocket/Room.php new file mode 100644 index 0000000..3851ff3 --- /dev/null +++ b/src/websocket/Room.php @@ -0,0 +1,54 @@ +conduit->sAdd($this->getClientKey($fd), ...$rooms); + + foreach ($rooms as $room) { + $this->conduit->sAdd($this->getRoomKey($room), $fd); + } + } + + public function delete($fd, ...$rooms) + { + $rooms = count($rooms) ? $rooms : $this->getRooms($fd); + + $this->conduit->sRem($this->getClientKey($fd), ...$rooms); + + foreach ($rooms as $room) { + $this->conduit->sRem($this->getRoomKey($room), $fd); + } + } + + public function getClients(string $room) + { + return $this->conduit->sMembers($this->getRoomKey($room)) ?: []; + } + + public function getRooms(string $fd) + { + return $this->conduit->sMembers($this->getClientKey($fd)) ?: []; + } + + protected function getClientKey(string $key) + { + return "ws:client:{$key}"; + } + + protected function getRoomKey($room) + { + return "ws:room:{$room}"; + } +} diff --git a/src/websocket/socketio/EnginePacket.php b/src/websocket/socketio/EnginePacket.php new file mode 100644 index 0000000..d1335d7 --- /dev/null +++ b/src/websocket/socketio/EnginePacket.php @@ -0,0 +1,81 @@ +type = $type; + $this->data = $data; + } + + public static function open($payload) + { + return new self(self::OPEN, $payload); + } + + public static function pong($payload = '') + { + return new self(self::PONG, $payload); + } + + public static function ping() + { + return new self(self::PING); + } + + public static function message($payload) + { + return new self(self::MESSAGE, $payload); + } + + public static function fromString(string $packet) + { + return new self(substr($packet, 0, 1), substr($packet, 1) ?: ''); + } + + public function toString() + { + return $this->type . $this->data; + } +} diff --git a/src/websocket/socketio/Handler.php b/src/websocket/socketio/Handler.php new file mode 100644 index 0000000..948560c --- /dev/null +++ b/src/websocket/socketio/Handler.php @@ -0,0 +1,198 @@ +event = $event; + $this->config = $config; + $this->websocket = $websocket; + $this->pingInterval = $this->config->get('worker.websocket.ping_interval', 25000); + $this->pingTimeout = $this->config->get('worker.websocket.ping_timeout', 60000); + } + + /** + * "onOpen" listener. + * + * @param Request $request + */ + public function onOpen(Request $request) + { + $this->eio = $request->param('EIO'); + + $payload = json_encode( + [ + 'sid' => base64_encode(uniqid()), + 'upgrades' => [], + 'pingInterval' => $this->pingInterval, + 'pingTimeout' => $this->pingTimeout, + ] + ); + + $this->push(EnginePacket::open($payload)); + + $this->event->trigger('worker.websocket.Open', $request); + + if ($this->eio < 4) { + $this->resetPingTimeout($this->pingInterval + $this->pingTimeout); + $this->onConnect(); + } else { + $this->schedulePing(); + } + } + + /** + * "onMessage" listener. + * + * @param Frame $frame + */ + public function onMessage(Frame $frame) + { + $enginePacket = EnginePacket::fromString($frame->data); + + $this->event->trigger('worker.websocket.Message', $enginePacket); + + $this->resetPingTimeout($this->pingInterval + $this->pingTimeout); + + switch ($enginePacket->type) { + case EnginePacket::MESSAGE: + $packet = Packet::fromString($enginePacket->data); + switch ($packet->type) { + case Packet::CONNECT: + $this->onConnect($packet->data); + break; + case Packet::EVENT: + $type = array_shift($packet->data); + $data = $packet->data; + $result = $this->event->trigger('worker.websocket.Event', new WsEvent($type, $data)); + + if ($packet->id !== null) { + $responsePacket = Packet::create(Packet::ACK, [ + 'id' => $packet->id, + 'nsp' => $packet->nsp, + 'data' => $result, + ]); + + $this->push($responsePacket); + } + break; + case Packet::DISCONNECT: + $this->event->trigger('worker.websocket.Disconnect'); + $this->websocket->close(); + break; + default: + $this->websocket->close(); + break; + } + break; + case EnginePacket::PING: + $this->event->trigger('worker.websocket.Ping'); + $this->push(EnginePacket::pong($enginePacket->data)); + break; + case EnginePacket::PONG: + $this->event->trigger('worker.websocket.Pong'); + $this->schedulePing(); + break; + default: + $this->websocket->close(); + break; + } + } + + /** + * "onClose" listener. + */ + public function onClose() + { + Timer::del($this->pingTimeoutTimer); + Timer::del($this->pingIntervalTimer); + $this->event->trigger('worker.websocket.Close'); + } + + protected function onConnect($data = null) + { + try { + $this->event->trigger('worker.websocket.Connect', $data); + $packet = Packet::create(Packet::CONNECT); + if ($this->eio >= 4) { + $packet->data = ['sid' => base64_encode(uniqid())]; + } + } catch (Exception $exception) { + $packet = Packet::create(Packet::CONNECT_ERROR, [ + 'data' => ['message' => $exception->getMessage()], + ]); + } + + $this->push($packet); + } + + protected function resetPingTimeout($timeout) + { + Timer::del($this->pingTimeoutTimer); + $this->pingTimeoutTimer = Timer::delay($timeout, function () { + $this->websocket->close(); + }); + } + + protected function schedulePing() + { + Timer::del($this->pingIntervalTimer); + $this->pingIntervalTimer = Timer::delay($this->pingInterval, function () { + $this->push(EnginePacket::ping()); + $this->resetPingTimeout($this->pingTimeout); + }); + } + + public function encodeMessage($message) + { + if ($message instanceof WsEvent) { + $message = Packet::create(Packet::EVENT, [ + 'data' => array_merge([$message->type], $message->data), + ]); + } + + if ($message instanceof Packet) { + $message = EnginePacket::message($message->toString()); + } + + if ($message instanceof EnginePacket) { + $message = $message->toString(); + } + + return $message; + } + + protected function push($data) + { + $this->websocket->push($data); + } + +} diff --git a/src/websocket/socketio/Packet.php b/src/websocket/socketio/Packet.php new file mode 100644 index 0000000..9b277a8 --- /dev/null +++ b/src/websocket/socketio/Packet.php @@ -0,0 +1,134 @@ +type = $type; + } + + public static function create($type, array $decoded = []) + { + $new = new self($type); + $new->id = $decoded['id'] ?? null; + if (isset($decoded['nsp'])) { + $new->nsp = $decoded['nsp'] ?: '/'; + } else { + $new->nsp = '/'; + } + $new->data = $decoded['data'] ?? null; + return $new; + } + + public function toString() + { + $str = '' . $this->type; + if ($this->nsp && '/' !== $this->nsp) { + $str .= $this->nsp . ','; + } + + if ($this->id !== null) { + $str .= $this->id; + } + + if (null !== $this->data) { + $str .= json_encode($this->data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); + } + return $str; + } + + public static function fromString(string $str) + { + $i = 0; + + $packet = new Packet((int) substr($str, 0, 1)); + + // look up namespace (if any) + if ('/' === substr($str, $i + 1, 1)) { + $nsp = ''; + while (++$i) { + $c = substr($str, $i, 1); + if (',' === $c) { + break; + } + $nsp .= $c; + if ($i === strlen($str)) { + break; + } + } + $packet->nsp = $nsp; + } else { + $packet->nsp = '/'; + } + + // look up id + $next = substr($str, $i + 1, 1); + if ('' !== $next && is_numeric($next)) { + $id = ''; + while (++$i) { + $c = substr($str, $i, 1); + if (null == $c || !is_numeric($c)) { + --$i; + break; + } + $id .= substr($str, $i, 1); + if ($i === strlen($str)) { + break; + } + } + $packet->id = intval($id); + } + + // look up json data + if (substr($str, ++$i, 1)) { + $packet->data = json_decode(substr($str, $i), true); + } + + return $packet; + } +} diff --git a/tests/Pest.php b/tests/Pest.php new file mode 100644 index 0000000..48d3610 --- /dev/null +++ b/tests/Pest.php @@ -0,0 +1,2 @@ + 'false', + 'PHP_QUEUE_ENABLE' => 'false', + ]); + $process->start(); + $wait = 0; + + while (!$process->getOutput()) { + $wait++; + if ($wait > 30) { + throw new Exception('server start failed'); + } + sleep(1); + } +}); + +afterAll(function () use (&$process) { + echo $process->getOutput(); + $process->stop(); +}); + +beforeEach(function () { + $this->httpClient = new Client([ + 'base_uri' => 'http://127.0.0.1:8080', + 'cookies' => true, + 'http_errors' => false, + 'timeout' => 1, + ]); +}); + +it('callback route', function () { + $response = $this->httpClient->get('/'); + + expect($response->getStatusCode()) + ->toBe(200) + ->and($response->getBody()->getContents()) + ->toBe('hello world'); +}); + +it('controller route', function () { + $jar = new CookieJar(); + + $response = $this->httpClient->get('/test', ['cookies' => $jar]); + + expect($response->getStatusCode()) + ->toBe(200) + ->and($response->getBody()->getContents()) + ->toBe('test') + ->and($jar->getCookieByName('name')->getValue()) + ->toBe('think'); +}); + +it('json post', function () { + + $data = [ + 'name' => 'think', + ]; + $response = $this->httpClient->post('/json', [ + 'json' => $data, + ]); + + expect($response->getStatusCode()) + ->toBe(200) + ->and($response->getBody()->getContents()) + ->toBe(json_encode($data)); +}); + +it('put and delete request', function () { + $response = $this->httpClient->put('/'); + + expect($response->getStatusCode()) + ->toBe(200) + ->and($response->getBody()->getContents()) + ->toBe('put'); + + $response = $this->httpClient->delete('/'); + + expect($response->getStatusCode()) + ->toBe(200) + ->and($response->getBody()->getContents()) + ->toBe('delete'); +}); + +it('file response', function () { + $response = $this->httpClient->get('/static/asset.txt'); + + expect($response->getStatusCode()) + ->toBe(200) + ->and($response->getBody()->getContents()) + ->toBe(file_get_contents(STUB_DIR . '/public/asset.txt')); +}); + +it('sse', function () { + $response = $this->httpClient->get('/sse', [ + 'stream' => true, + 'timeout' => 3, + ]); + + $body = $response->getBody(); + + $buffer = ''; + while (!$body->eof()) { + $text = $body->read(1); + if ($text == "\r") { + continue; + } + $buffer .= $text; + if ($text == "\n") { + if ($buffer != "\n") { + expect($buffer)->toStartWith('data: '); + } + $buffer = ''; + } + } +}); + +it('hot update', function () { + $response = $this->httpClient->get('/hot'); + + expect($response->getStatusCode()) + ->toBe(404); + + $route = <<httpClient->get('/hot'); + + expect($response->getStatusCode()) + ->toBe(200) + ->and($response->getBody()->getContents()) + ->toBe('hot'); +})->after(function () { + @unlink(STUB_DIR . '/route/hot.php'); +})->skipOnWindows(); diff --git a/tests/feature/QueueTest.php b/tests/feature/QueueTest.php new file mode 100644 index 0000000..be3ed3e --- /dev/null +++ b/tests/feature/QueueTest.php @@ -0,0 +1,5 @@ + 'true', + 'PHP_QUEUE_ENABLE' => 'false', + 'PHP_HOT_ENABLE' => 'false', + ]); + $process->start(); + $wait = 0; + + while (!$process->getOutput()) { + $wait++; + if ($wait > 30) { + throw new Exception('server start failed'); + } + sleep(1); + } +}); + +afterAll(function () use (&$process) { + echo $process->getOutput(); + $process->stop(); +}); + +beforeEach(function () { + $this->httpClient = new Client([ + 'base_uri' => 'http://127.0.0.1:8080', + 'cookies' => true, + 'http_errors' => false, + 'timeout' => 1, + ]); +}); + +it('http', function () { + $response = $this->httpClient->get('/'); + + expect($response->getStatusCode()) + ->toBe(200) + ->and($response->getBody()->getContents()) + ->toBe('hello world'); +}); + +it('websocket', function () { + $connected = 0; + $messages = []; + connect('ws://127.0.0.1:8080/websocket') + ->then(function (\Ratchet\Client\WebSocket $conn) use (&$connected, &$messages) { + $connected++; + $conn->on('message', function ($msg) use ($conn, &$messages) { + $messages[] = (string) $msg; + $conn->close(); + }); + }); + + connect('ws://127.0.0.1:8080/websocket') + ->then(function (\Ratchet\Client\WebSocket $conn) use (&$connected, &$messages) { + $connected++; + $conn->on('message', function ($msg) use ($conn, &$messages) { + $messages[] = (string) $msg; + $conn->close(); + }); + + $conn->send('hello'); + }); + + Loop::get()->run(); + + expect($connected)->toBe(2); + expect($messages)->toBe(['hello', 'hello']); +}); diff --git a/tests/stub/.env b/tests/stub/.env new file mode 100644 index 0000000..ea0466f --- /dev/null +++ b/tests/stub/.env @@ -0,0 +1 @@ +APP_DEBUG=true diff --git a/tests/stub/app/controller/Index.php b/tests/stub/app/controller/Index.php new file mode 100644 index 0000000..823461c --- /dev/null +++ b/tests/stub/app/controller/Index.php @@ -0,0 +1,20 @@ +post()); + } +} diff --git a/tests/stub/config/app.php b/tests/stub/config/app.php new file mode 100644 index 0000000..9dbc43d --- /dev/null +++ b/tests/stub/config/app.php @@ -0,0 +1,4 @@ + 'error', +]; diff --git a/tests/stub/config/cache.php b/tests/stub/config/cache.php new file mode 100644 index 0000000..4b44a20 --- /dev/null +++ b/tests/stub/config/cache.php @@ -0,0 +1,9 @@ + 'file', + 'stores' => [ + 'file' => [ + 'type' => 'File', + ], + ] +]; diff --git a/tests/stub/config/queue.php b/tests/stub/config/queue.php new file mode 100644 index 0000000..fb911ff --- /dev/null +++ b/tests/stub/config/queue.php @@ -0,0 +1,40 @@ + +// +---------------------------------------------------------------------- + +return [ + 'default' => 'redis', + 'connections' => [ + 'sync' => [ + 'type' => 'sync', + ], + 'database' => [ + 'type' => 'database', + 'queue' => 'default', + 'table' => 'jobs', + 'connection' => null, + ], + 'redis' => [ + 'type' => 'redis', + 'queue' => 'default', + 'host' => env('REDIS_HOST', 'redis'), + 'port' => env('REDIS_PORT', 6379), + 'password' => '', + 'select' => 0, + 'timeout' => 0, + 'persistent' => true, + 'retry_after' => 600, + ], + ], + 'failed' => [ + 'type' => 'none', + 'table' => 'failed_jobs', + ], +]; diff --git a/tests/stub/config/worker.php b/tests/stub/config/worker.php new file mode 100644 index 0000000..952e79d --- /dev/null +++ b/tests/stub/config/worker.php @@ -0,0 +1,45 @@ + +// +---------------------------------------------------------------------- + +use think\worker\websocket\Handler; + +return [ + 'http' => [ + 'enable' => env('HTTP_ENABLE', true), + 'host' => '0.0.0.0', + 'port' => 8080, + 'worker_num' => 2, + 'options' => [], + ], + 'websocket' => [ + 'enable' => env('WEBSOCKET_ENABLE', true), + 'handler' => Handler::class, + 'ping_interval' => 25000, + 'ping_timeout' => 60000, + ], + //队列 + 'queue' => [ + 'enable' => env('QUEUE_ENABLE', true), + 'workers' => [ + 'default' => [], + ], + ], + //共享数据 + 'conduit' => [ + 'type' => 'socket', + ], + 'hot_update' => [ + 'enable' => env('HOT_ENABLE', true), + 'name' => ['*.php'], + 'include' => [app_path(), config_path(), root_path('route')], + 'exclude' => [], + ], +]; diff --git a/tests/stub/public/asset.txt b/tests/stub/public/asset.txt new file mode 100644 index 0000000..4c545ef --- /dev/null +++ b/tests/stub/public/asset.txt @@ -0,0 +1 @@ +Asset diff --git a/tests/stub/route/app.php b/tests/stub/route/app.php new file mode 100644 index 0000000..c137ddc --- /dev/null +++ b/tests/stub/route/app.php @@ -0,0 +1,51 @@ +header([ + 'Content-Type' => 'text/event-stream', + 'Cache-Control' => 'no-cache, must-revalidate', + ]); +}); + +Route::get('/websocket', function () { + return (new \think\worker\response\Websocket()) + ->onOpen(function (\think\worker\Websocket $websocket) { + $websocket->join('foo'); + }) + ->onMessage(function (\think\worker\Websocket $websocket, \think\worker\websocket\Frame $frame) { + $websocket->to('foo')->push($frame->data); + }); +}); + +Route::get('test', 'index/test'); +Route::post('json', 'index/json'); + +Route::get('static/:path', function (string $path) { + $filename = public_path() . $path; + return new \think\worker\response\File($filename); +})->pattern(['path' => '.*\.\w+$']); diff --git a/tests/stub/runtime/.gitignore b/tests/stub/runtime/.gitignore new file mode 100644 index 0000000..c96a04f --- /dev/null +++ b/tests/stub/runtime/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore \ No newline at end of file diff --git a/tests/stub/think b/tests/stub/think new file mode 100644 index 0000000..d82ca91 --- /dev/null +++ b/tests/stub/think @@ -0,0 +1,12 @@ +#!/usr/bin/env php +console->addCommands([\think\worker\command\Server::class]); + +$app->console->run();