diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..9912bd5
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+composer.lock
+vendor/
+.idea/
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..8dada3e
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright {yyyy} {name of copyright owner}
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/README.md b/README.md
index e75289b..f0a92d2 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,139 @@
-# think-worker
+ThinkPHP Workerman 扩展
+===============
+交流群:981069000 [](https://qm.qq.com/q/A8YNpzrzC8)
+
+## 安装
+```
+composer require topthink/think-worker
+```
+
+## 说明
+> 由于windows下无法在一个文件里启动多个worker,所以本扩展不支持windows平台
+
+## 使用方法
+
+### HttpServer
+
+在命令行启动服务端
+~~~
+php think worker
+~~~
+
+然后就可以通过浏览器直接访问当前应用
+
+~~~
+http://localhost:8080
+~~~
+
+如果需要使用守护进程方式运行,建议使用supervisor来管理进程
+
+## 访问静态文件
+> 建议使用nginx来支持静态文件访问,也可使用路由输出文件内容,下面是示例,可参照修改
+1. 添加静态文件路由:
+
+```php
+Route::get('static/:path', function (string $path) {
+ $filename = public_path() . $path;
+ return new \think\worker\response\File($filename);
+})->pattern(['path' => '.*\.\w+$']);
+```
+
+2. 访问路由 `http://localhost/static/文件路径`
+
+## 队列支持
+
+使用方法见 [think-queue](https://github.com/top-think/think-queue)
+
+以下配置代替think-queue里的最后一步:`监听任务并执行`,无需另外起进程执行队列
+
+```php
+return [
+ // ...
+ 'queue' => [
+ 'enable' => true,
+ //键名是队列名称
+ 'workers' => [
+ //下面参数是不设置时的默认配置
+ 'default' => [
+ 'delay' => 0,
+ 'sleep' => 3,
+ 'tries' => 0,
+ 'timeout' => 60,
+ 'worker_num' => 1,
+ ],
+ //使用@符号后面可指定队列使用驱动
+ 'default@connection' => [
+ //此处可不设置任何参数,使用上面的默认配置
+ ],
+ ],
+ ],
+ // ...
+];
+
+```
+
+### websocket
+
+> 使用路由调度的方式,可以让不同路径的websocket服务响应不同的事件
+
+#### 配置
+
+```
+worker.websocket = true 时开启
+```
+
+#### 路由定义
+```php
+Route::get('path1','controller/action1');
+Route::get('path2','controller/action2');
+```
+
+#### 控制器
+
+```php
+use \think\worker\Websocket;
+use \think\worker\websocket\Frame;
+
+class Controller {
+
+ public function action1(){
+
+ return (new \think\worker\response\Websocket())
+ ->onOpen(...)
+ ->onMessage(function(Websocket $websocket, Frame $frame){
+ ...
+ })
+ ->onClose(...);
+ }
+
+ public function action2(){
+
+ return (new \think\worker\response\Websocket())
+ ->onOpen(...)
+ ->onMessage(function(Websocket $websocket, Frame $frame){
+ ...
+ })
+ ->onClose(...);
+ }
+}
+```
+
+
+## 自定义worker
+监听`worker.init`事件 注入`Manager`对象,调用addWorker方法添加
+~~~php
+use think\worker\Manager;
+use \think\worker\Worker;
+
+//...
+
+public function handle(Manager $manager){
+ $worker = $manager->addWorker(function(Worker $worker){
+ //..其他回调或处理
+ //动态添加监听可参考 https://www.workerman.net/doc/workerman/worker/listen.html
+ });
+}
+
+//...
+~~~
diff --git a/composer.json b/composer.json
new file mode 100644
index 0000000..8395726
--- /dev/null
+++ b/composer.json
@@ -0,0 +1,55 @@
+{
+ "name": "topthink/think-worker",
+ "description": "workerman extend for thinkphp",
+ "license": "Apache-2.0",
+ "authors": [
+ {
+ "name": "liu21st",
+ "email": "liu21st@gmail.com"
+ }
+ ],
+ "require": {
+ "php": ">=8.2",
+ "workerman/workerman": "~5.0.0",
+ "topthink/framework": "^8.0",
+ "revolt/event-loop": "^1.0",
+ "workerman/redis": "^2.0",
+ "symfony/finder": ">=4.3"
+ },
+ "autoload": {
+ "psr-4": {
+ "think\\worker\\": "src"
+ }
+ },
+ "extra": {
+ "think": {
+ "services": [
+ "think\\worker\\Service"
+ ],
+ "config": {
+ "worker": "src/config/worker.php"
+ }
+ }
+ },
+ "require-dev": {
+ "pestphp/pest": "^3.7",
+ "guzzlehttp/guzzle": "^7.0",
+ "topthink/think-queue": "^3.0",
+ "phpstan/phpstan": "^2.0",
+ "ratchet/pawl": "^0.4.1"
+ },
+ "autoload-dev": {
+ "psr-4": {
+ "app\\": "tests/stub/app"
+ }
+ },
+ "config": {
+ "allow-plugins": {
+ "pestphp/pest-plugin": true
+ }
+ },
+ "scripts": {
+ "analyze": "phpstan --memory-limit=1G",
+ "test": "pest --colors=always"
+ }
+}
diff --git a/phpstan.neon b/phpstan.neon
new file mode 100644
index 0000000..6c8556d
--- /dev/null
+++ b/phpstan.neon
@@ -0,0 +1,16 @@
+parameters:
+ level: 5
+ paths:
+ - src
+ - tests
+ universalObjectCratesClasses:
+ - PHPUnit\Framework\TestCase
+ ignoreErrors:
+ - '#Function root_path not found.#'
+ - '#Function env not found.#'
+ - '#Function app_path not found.#'
+ - '#Function config_path not found.#'
+ - '#Function public_path not found.#'
+ - '#Function json not found.#'
+ - '#Function runtime_path not found.#'
+ - '#Constant STUB_DIR not found.#'
diff --git a/phpunit.xml b/phpunit.xml
new file mode 100644
index 0000000..715322c
--- /dev/null
+++ b/phpunit.xml
@@ -0,0 +1,18 @@
+
+
+
+
+ ./tests
+
+
+
+
+
+ ./src
+
+
+
diff --git a/src/App.php b/src/App.php
new file mode 100644
index 0000000..08a6210
--- /dev/null
+++ b/src/App.php
@@ -0,0 +1,23 @@
+inConsole = $inConsole;
+ }
+
+ public function runningInConsole(): bool
+ {
+ return $this->inConsole;
+ }
+
+ public function clearInstances()
+ {
+ $this->instances = [];
+ }
+}
diff --git a/src/Conduit.php b/src/Conduit.php
new file mode 100644
index 0000000..683de5a
--- /dev/null
+++ b/src/Conduit.php
@@ -0,0 +1,22 @@
+app->config->get("worker.conduit.{$name}", []);
+ }
+
+ public function getDefaultDriver()
+ {
+ return $this->app->config->get('worker.conduit.type', 'socket');
+ }
+}
diff --git a/src/Http.php b/src/Http.php
new file mode 100644
index 0000000..e4f9e86
--- /dev/null
+++ b/src/Http.php
@@ -0,0 +1,17 @@
+subscribe();
+ return $this->workerId;
+ }
+
+ public function sendMessage($workerId, $message)
+ {
+ if ($workerId === $this->workerId) {
+ $this->manager->triggerEvent('message', $message);
+ } else {
+ $this->publish($workerId, $message);
+ }
+ }
+
+ public function subscribe()
+ {
+ $this->workerId = $this->conduit->inc('ipc:worker');
+ $this->conduit->subscribe("ipc:message:{$this->workerId}", function ($message) {
+ $this->manager->triggerEvent('message', unserialize($message));
+ });
+ }
+
+ public function publish($workerId, $message)
+ {
+ $this->conduit->publish("ipc:message:{$workerId}", serialize($message));
+ }
+}
diff --git a/src/Manager.php b/src/Manager.php
new file mode 100644
index 0000000..ba9fcf4
--- /dev/null
+++ b/src/Manager.php
@@ -0,0 +1,27 @@
+prepareHttp();
+ $this->prepareQueue();
+ $this->prepareConduit();
+ }
+}
diff --git a/src/Sandbox.php b/src/Sandbox.php
new file mode 100644
index 0000000..f0599b6
--- /dev/null
+++ b/src/Sandbox.php
@@ -0,0 +1,210 @@
+app = $app;
+ $this->snapshots = new WeakMap();
+ $this->initialize();
+ }
+
+ protected function initialize()
+ {
+ Container::setInstance(function () {
+ return $this->getSnapshot();
+ });
+
+ $this->setInitialConfig();
+ $this->setInitialServices();
+ $this->setInitialEvent();
+ $this->setInitialResetters();
+ }
+
+ public function run(Closure $callable, ?object $key = null)
+ {
+ $this->snapshot = $this->createApp($key);
+ try {
+ $this->snapshot->invoke($callable, [$this]);
+ } catch (Throwable $e) {
+ $this->snapshot->make(Handle::class)->report($e);
+ } finally {
+ if (empty($key)) {
+ $this->snapshot->clearInstances();
+ }
+ $this->snapshot = null;
+ $this->setInstance($this->app);
+ }
+ }
+
+ protected function createApp(?object $key = null)
+ {
+ if (!empty($key)) {
+ if (isset($this->snapshots[$key])) {
+ return $this->snapshots[$key]->app;
+ }
+ }
+
+ $app = clone $this->app;
+ $this->setInstance($app);
+ $this->resetApp($app);
+
+ if (!empty($key)) {
+ $this->snapshots[$key] = new class($app) {
+ public function __construct(public App $app)
+ {
+ }
+
+ public function __destruct()
+ {
+ $this->app->clearInstances();
+ }
+ };
+ }
+
+ return $app;
+ }
+
+ protected function resetApp(App $app)
+ {
+ foreach ($this->resetters as $resetter) {
+ $resetter->handle($app, $this);
+ }
+ }
+
+ protected function setInstance(App $app)
+ {
+ $app->instance('app', $app);
+ $app->instance(Container::class, $app);
+
+ $reflectObject = new ReflectionObject($app);
+ $reflectProperty = $reflectObject->getProperty('services');
+ $services = $reflectProperty->getValue($app);
+
+ foreach ($services as $service) {
+ $this->modifyProperty($service, $app);
+ }
+ }
+
+ /**
+ * Set initial config.
+ */
+ protected function setInitialConfig()
+ {
+ $this->config = clone $this->app->config;
+ }
+
+ protected function setInitialEvent()
+ {
+ $this->event = clone $this->app->event;
+ }
+
+ protected function setInitialServices()
+ {
+ $services = $this->config->get('worker.services', []);
+
+ foreach ($services as $service) {
+ if (class_exists($service) && !in_array($service, $this->services)) {
+ $serviceObj = new $service($this->app);
+ $this->services[$service] = $serviceObj;
+ }
+ }
+ }
+
+ /**
+ * Initialize resetters.
+ */
+ protected function setInitialResetters()
+ {
+ $resetters = [
+ ClearInstances::class,
+ ResetConfig::class,
+ ResetEvent::class,
+ ResetService::class,
+ ResetModel::class,
+ ResetPaginator::class,
+ ];
+
+ $resetters = array_merge($resetters, $this->config->get('worker.resetters', []));
+
+ foreach ($resetters as $resetter) {
+ $resetterClass = $this->app->make($resetter);
+ if (!$resetterClass instanceof ResetterInterface) {
+ throw new RuntimeException("{$resetter} must implement " . ResetterInterface::class);
+ }
+ $this->resetters[$resetter] = $resetterClass;
+ }
+ }
+
+ public function getSnapshot()
+ {
+ $snapshot = $this->snapshot;
+ if ($snapshot instanceof App) {
+ return $snapshot;
+ }
+
+ throw new InvalidArgumentException('The app object has not been initialized');
+ }
+
+ /**
+ * Get config snapshot.
+ */
+ public function getConfig()
+ {
+ return $this->config;
+ }
+
+ public function getEvent()
+ {
+ return $this->event;
+ }
+
+ public function getServices()
+ {
+ return $this->services;
+ }
+
+}
diff --git a/src/Service.php b/src/Service.php
new file mode 100644
index 0000000..9f4946b
--- /dev/null
+++ b/src/Service.php
@@ -0,0 +1,23 @@
+
+// +----------------------------------------------------------------------
+namespace think\worker;
+
+use think\worker\command\Server;
+
+class Service extends \think\Service
+{
+ public function boot()
+ {
+ $this->commands([
+ Server::class,
+ ]);
+ }
+}
diff --git a/src/Watcher.php b/src/Watcher.php
new file mode 100644
index 0000000..f538b0a
--- /dev/null
+++ b/src/Watcher.php
@@ -0,0 +1,33 @@
+app->config->get('worker.hot_update.' . $name, $default);
+ }
+
+ protected function resolveParams($name): array
+ {
+ return [
+ array_filter($this->getConfig('include', []), function ($dir) {
+ return is_dir($dir);
+ }),
+ $this->getConfig('exclude', []),
+ $this->getConfig('name', []),
+ ];
+ }
+
+ public function getDefaultDriver()
+ {
+ return $this->getConfig('type', 'scan');
+ }
+}
diff --git a/src/Websocket.php b/src/Websocket.php
new file mode 100644
index 0000000..9533c47
--- /dev/null
+++ b/src/Websocket.php
@@ -0,0 +1,121 @@
+app = $app;
+ $this->room = $room;
+ $this->event = $event;
+ $this->connection = $connection;
+ }
+
+ /**
+ * @return Pusher
+ */
+ protected function makePusher()
+ {
+ return $this->app->invokeClass(Pusher::class);
+ }
+
+ public function to(...$values)
+ {
+ return $this->makePusher()->to(...$values);
+ }
+
+ public function push($data)
+ {
+ $this->makePusher()->to($this->getSender())->push($data);
+ }
+
+ public function emit(string $event, ...$data)
+ {
+ $this->makePusher()->to($this->getSender())->emit($event, ...$data);
+ }
+
+ public function join(...$rooms): self
+ {
+ $this->room->add($this->getSender(), ...$rooms);
+
+ return $this;
+ }
+
+ public function leave(...$rooms): self
+ {
+ $this->room->delete($this->getSender(), ...$rooms);
+
+ return $this;
+ }
+
+ public function setConnected($connected)
+ {
+ $this->connected = $connected;
+ }
+
+ public function isEstablished()
+ {
+ return $this->connected;
+ }
+
+ public function close()
+ {
+ if ($this->connection) {
+ $this->connection->close();
+ }
+ }
+
+ public function setSender(string $fd)
+ {
+ $this->sender = $fd;
+ return $this;
+ }
+
+ public function getSender()
+ {
+ if (empty($this->sender)) {
+ throw new RuntimeException('Cannot use websocket as current client before handshake!');
+ }
+ return $this->sender;
+ }
+}
diff --git a/src/Worker.php b/src/Worker.php
new file mode 100644
index 0000000..ab1b144
--- /dev/null
+++ b/src/Worker.php
@@ -0,0 +1,19 @@
+
+// +----------------------------------------------------------------------
+
+namespace think\worker\command;
+
+use think\console\Command;
+use think\worker\Manager;
+
+/**
+ * Worker Server 命令行类
+ */
+class Server extends Command
+{
+ protected $config = [];
+
+ public function configure()
+ {
+ $this->setName('worker')
+ ->setDescription('Workerman Server for ThinkPHP');
+ }
+
+ public function handle(Manager $manager)
+ {
+ $manager->start();
+ }
+
+}
diff --git a/src/concerns/InteractsWithConduit.php b/src/concerns/InteractsWithConduit.php
new file mode 100644
index 0000000..29e6ff9
--- /dev/null
+++ b/src/concerns/InteractsWithConduit.php
@@ -0,0 +1,20 @@
+conduit = $this->container->make(Conduit::class);
+ $this->conduit->prepare();
+ $this->onEvent('workerStart', function () {
+ $this->app->instance(Conduit::class, $this->conduit);
+ });
+ }
+}
diff --git a/src/concerns/InteractsWithHttp.php b/src/concerns/InteractsWithHttp.php
new file mode 100644
index 0000000..332a18a
--- /dev/null
+++ b/src/concerns/InteractsWithHttp.php
@@ -0,0 +1,321 @@
+getConfig('http.enable', true)) {
+
+ $this->wsEnable = $this->getConfig('websocket.enable', false);
+
+ if ($this->wsEnable) {
+ $this->prepareWebsocket();
+ }
+
+ $workerNum = $this->getConfig('http.worker_num', 4);
+ $this->addWorker([$this, 'createHttpServer'], 'http server', $workerNum);
+ }
+ }
+
+ public function createHttpServer()
+ {
+ $this->preloadHttp();
+
+ $host = $this->getConfig('http.host');
+ $port = $this->getConfig('http.port');
+ $options = $this->getConfig('http.options', []);
+
+ $server = new Worker("\\think\\worker\\protocols\\FlexHttp://{$host}:{$port}", $options);
+
+ $server->reusePort = true;
+
+ $server->onMessage = function (TcpConnection $connection, $data) {
+ if ($data instanceof WorkerRequest) {
+ if ($this->wsEnable && $this->isWebsocketRequest($data)) {
+ $this->onHandShake($connection, $data);
+ } else {
+ $this->onRequest($connection, $data);
+ }
+ } elseif ($data instanceof Frame) {
+ $this->onMessage($connection, $data);
+ }
+ };
+
+ $server->onClose = function (TcpConnection $connection) {
+ if ($this->wsEnable) {
+ $this->onClose($connection);
+ }
+ };
+
+ $server->listen();
+ }
+
+ protected function preloadHttp()
+ {
+ $http = $this->app->http;
+ $this->app->invokeMethod([$http, 'loadMiddleware'], [], true);
+
+ if ($this->app->config->get('app.with_route', true)) {
+ $this->app->invokeMethod([$http, 'loadRoutes'], [], true);
+ $route = clone $this->app->route;
+ unset($this->app->route);
+
+ $this->app->resolving(WorkerHttp::class, function ($http, App $app) use ($route) {
+ $newRoute = clone $route;
+ $this->modifyProperty($newRoute, $app);
+ $app->instance('route', $newRoute);
+ });
+ }
+
+ $middleware = clone $this->app->middleware;
+ unset($this->app->middleware);
+
+ $this->app->resolving(WorkerHttp::class, function ($http, App $app) use ($middleware) {
+ $newMiddleware = clone $middleware;
+ $this->modifyProperty($newMiddleware, $app);
+ $app->instance('middleware', $newMiddleware);
+ });
+
+ unset($this->app->http);
+ $this->app->bind(Http::class, WorkerHttp::class);
+ }
+
+ public function onRequest(TcpConnection $connection, WorkerRequest $wkRequest)
+ {
+ $this->runInSandbox(function (Http $http, Event $event, WorkerApp $app) use ($connection, $wkRequest) {
+
+ $app->setInConsole(false);
+
+ $request = $this->prepareRequest($wkRequest);
+
+ try {
+ $response = $this->handleRequest($http, $request);
+ $this->prepareResponse($response);
+ } catch (Throwable $e) {
+ $handle = $app->make(Handle::class);
+ $handle->report($e);
+ $response = $handle->render($request, $e);
+ }
+
+ $this->sendResponse($connection, $request, $response, $app->cookie);
+
+ //关闭连接
+ $connection->close();
+
+ $http->end($response);
+ });
+ }
+
+ protected function handleRequest(Http $http, $request)
+ {
+ $level = ob_get_level();
+ ob_start();
+
+ $response = $http->run($request);
+
+ if (ob_get_length() > 0) {
+ $content = $response->getContent();
+ $response->content(ob_get_contents() . $content);
+ }
+
+ while (ob_get_level() > $level) {
+ ob_end_clean();
+ }
+
+ return $response;
+ }
+
+ protected function prepareRequest(WorkerRequest $wkRequest)
+ {
+ $header = $wkRequest->header();
+ $server = [];
+
+ foreach ($header as $key => $value) {
+ $server['http_' . str_replace('-', '_', $key)] = $value;
+ }
+
+ // 重新实例化请求对象 处理请求数据
+ /** @var \think\Request $request */
+ $request = $this->app->make('request', [], true);;
+
+ $queryString = $wkRequest->queryString();
+
+ return $request
+ ->setMethod($wkRequest->method())
+ ->withHeader($header)
+ ->withServer($server)
+ ->withGet($wkRequest->get())
+ ->withPost($wkRequest->post())
+ ->withCookie($wkRequest->cookie())
+ ->withFiles($wkRequest->file())
+ ->withInput($wkRequest->rawBody())
+ ->setBaseUrl($wkRequest->uri())
+ ->setUrl($wkRequest->uri() . (!empty($queryString) ? '?' . $queryString : ''))
+ ->setPathinfo(ltrim($wkRequest->path(), '/'));
+ }
+
+ protected function prepareResponse(\think\Response $response)
+ {
+ switch (true) {
+ case $response instanceof View:
+ $response->getContent();
+ break;
+ }
+ }
+
+ protected function sendResponse(TcpConnection $connection, \think\Request $request, \think\Response $response, Cookie $cookie)
+ {
+ switch (true) {
+ case $response instanceof IteratorResponse:
+ $this->sendIterator($connection, $response, $cookie);
+ break;
+ case $response instanceof FileResponse:
+ $this->sendFile($connection, $request, $response, $cookie);
+ break;
+ default:
+ $this->sendContent($connection, $response, $cookie);
+ }
+ }
+
+ protected function sendIterator(TcpConnection $connection, IteratorResponse $response, Cookie $cookie)
+ {
+ $wkResponse = $this->createResponse($response, $cookie);
+ $connection->send($wkResponse);
+
+ foreach ($response as $content) {
+ $connection->send($content, true);
+ }
+ }
+
+ protected function sendFile(TcpConnection $connection, \think\Request $request, FileResponse $response, Cookie $cookie)
+ {
+ $ifNoneMatch = $request->header('If-None-Match');
+ $ifRange = $request->header('If-Range');
+
+ $code = $response->getCode();
+ $file = $response->getFile();
+ $eTag = $response->getHeader('ETag');
+ $lastModified = $response->getHeader('Last-Modified');
+
+ $fileSize = $file->getSize();
+ $offset = 0;
+ $length = -1;
+
+ if ($ifNoneMatch == $eTag) {
+ $code = 304;
+ } elseif (!$ifRange || $ifRange === $eTag || $ifRange === $lastModified) {
+ $range = $request->header('Range', '');
+ if (Str::startsWith($range, 'bytes=')) {
+ [$start, $end] = explode('-', substr($range, 6), 2) + [0];
+
+ $end = ('' === $end) ? $fileSize - 1 : (int) $end;
+
+ if ('' === $start) {
+ $start = $fileSize - $end;
+ $end = $fileSize - 1;
+ } else {
+ $start = (int) $start;
+ }
+
+ if ($start <= $end) {
+ $end = min($end, $fileSize - 1);
+ if ($start < 0 || $start > $end) {
+ $code = 416;
+ $response->header([
+ 'Content-Range' => sprintf('bytes */%s', $fileSize),
+ ]);
+ } elseif ($end - $start < $fileSize - 1) {
+ $length = $end < $fileSize ? $end - $start + 1 : -1;
+ $offset = $start;
+ $code = 206;
+ $response->header([
+ 'Content-Range' => sprintf('bytes %s-%s/%s', $start, $end, $fileSize),
+ 'Content-Length' => $end - $start + 1,
+ ]);
+ }
+ }
+ }
+ }
+
+ $wkResponse = $this->createResponse($response, $cookie);
+
+ if ($code >= 200 && $code < 300 && $length !== 0) {
+ $wkResponse->withFile($file->getPathname(), $offset, $length);
+ }
+
+ $connection->send($wkResponse);
+ }
+
+ protected function sendContent(TcpConnection $connection, \think\Response $response, Cookie $cookie)
+ {
+ $response->header(['Transfer-Encoding' => 'chunked']);
+
+ $wkResponse = $this->createResponse($response, $cookie);
+
+ $connection->send($wkResponse);
+
+ $content = $response->getContent();
+ if ($content) {
+ $contentSize = strlen($content);
+ $chunkSize = 8192;
+
+ if ($contentSize > $chunkSize) {
+ $sendSize = 0;
+ do {
+ if (!$connection->send(new Chunk(substr($content, $sendSize, $chunkSize)))) {
+ break;
+ }
+ } while (($sendSize += $chunkSize) < $contentSize);
+ } else {
+ $connection->send(new Chunk($content));
+ }
+ }
+ $connection->send(new Chunk(''));
+ }
+
+ protected function createResponse(\think\Response $response, Cookie $cookie, $body = '')
+ {
+ $code = $response->getCode();
+ $header = $response->getHeader();
+
+ $wkResponse = new Response($code, $header, $body);
+
+ foreach ($cookie->getCookie() as $name => $val) {
+ [$value, $expire, $option] = $val;
+ $wkResponse->cookie($name, $value, $expire, $option['path'], $option['domain'], (bool) $option['secure'], (bool) $option['httponly'], $option['samesite']);
+ }
+
+ return $wkResponse;
+ }
+}
diff --git a/src/concerns/InteractsWithQueue.php b/src/concerns/InteractsWithQueue.php
new file mode 100644
index 0000000..2b7cf00
--- /dev/null
+++ b/src/concerns/InteractsWithQueue.php
@@ -0,0 +1,97 @@
+getConfig('queue.workers', []);
+
+ foreach ($workers as $queue => $options) {
+
+ if (strpos($queue, '@') !== false) {
+ [$queue, $connection] = explode('@', $queue);
+ } else {
+ $connection = null;
+ }
+
+ $workerNum = Arr::get($options, 'worker_num', 1);
+
+ $this->addWorker(function () use ($options, $connection, $queue) {
+ $delay = Arr::get($options, 'delay', 0);
+ $sleep = Arr::get($options, 'sleep', 3);
+ $tries = Arr::get($options, 'tries', 0);
+ $timeout = Arr::get($options, 'timeout', 60);
+ $qWorker = $this->app->make(Worker::class);
+
+ if ($this->supportsAsyncSignals()) {
+ pcntl_signal(SIGALRM, function () {
+ \think\worker\Worker::stopAll();
+ });
+
+ pcntl_alarm($timeout);
+ }
+
+ $this->createRunTimer(function () use ($connection, $queue, $delay, $sleep, $tries, $qWorker) {
+ $this->runInSandbox(function () use ($connection, $queue, $delay, $sleep, $tries, $qWorker) {
+ $qWorker->runNextJob($connection, $queue, $delay, $sleep, $tries);
+ });
+ if ($this->supportsAsyncSignals()) {
+ pcntl_alarm(0);
+ }
+ });
+ }, "queue [$queue]", $workerNum);
+ }
+ }
+
+ protected function supportsAsyncSignals()
+ {
+ return extension_loaded('pcntl');
+ }
+
+ protected function createRunTimer($func)
+ {
+ Timer::add(0.1, function () use ($func) {
+ $func();
+ $this->createRunTimer($func);
+ }, [], false);
+ }
+
+ protected function prepareQueue()
+ {
+ if ($this->getConfig('queue.enable', false)) {
+ $this->listenForEvents();
+ $this->createQueueWorkers();
+ }
+ }
+
+ /**
+ * 注册事件
+ */
+ protected function listenForEvents()
+ {
+ $this->container->event->listen(JobFailed::class, function (JobFailed $event) {
+ $this->logFailedJob($event);
+ });
+ }
+
+ /**
+ * 记录失败任务
+ * @param JobFailed $event
+ */
+ protected function logFailedJob(JobFailed $event)
+ {
+ $this->container['queue.failer']->log(
+ $event->connection,
+ $event->job->getQueue(),
+ $event->job->getRawBody(),
+ $event->exception
+ );
+ }
+}
diff --git a/src/concerns/InteractsWithServer.php b/src/concerns/InteractsWithServer.php
new file mode 100644
index 0000000..a01809e
--- /dev/null
+++ b/src/concerns/InteractsWithServer.php
@@ -0,0 +1,119 @@
+name = $name;
+ $worker->count = $count;
+
+ $worker->onWorkerStart = function (Worker $worker) use ($func) {
+ $this->clearCache();
+ $this->prepareApplication();
+
+ $this->conduit->connect();
+
+ $this->workerId = $this->ipc->listenMessage();
+
+ $this->triggerEvent('workerStart', $worker);
+
+ $func($worker);
+ };
+
+ $worker->onWorkerReload = function () {
+ $this->stopping = true;
+ };
+
+ return $worker;
+ }
+
+ public function getWorkerId()
+ {
+ return $this->workerId;
+ }
+
+ public function isStopping()
+ {
+ return $this->stopping;
+ }
+
+ /**
+ * 启动服务
+ */
+ public function start(): void
+ {
+ $this->initialize();
+ $this->prepareIpc();
+ $this->triggerEvent('init');
+
+ //热更新
+ if ($this->getConfig('hot_update.enable', false)) {
+ $this->addHotUpdateWorker();
+ }
+
+ Worker::runAll();
+ }
+
+ protected function prepareIpc()
+ {
+ $this->ipc = $this->container->make(Ipc::class);
+ }
+
+ public function sendMessage($workerId, $message)
+ {
+ $this->ipc->sendMessage($workerId, $message);
+ }
+
+ /**
+ * 热更新
+ */
+ protected function addHotUpdateWorker()
+ {
+ $worker = new Worker();
+
+ $worker->name = 'hot update';
+ $worker->reloadable = false;
+
+ $worker->onWorkerStart = function () {
+ $watcher = $this->container->make(Watcher::class);
+ $watcher->watch(function () {
+ posix_kill(posix_getppid(), SIGUSR1);
+ });
+ };
+ }
+
+ /**
+ * 清除apc、op缓存
+ */
+ protected function clearCache()
+ {
+ if (extension_loaded('apc')) {
+ apc_clear_cache();
+ }
+
+ if (extension_loaded('Zend OPcache')) {
+ opcache_reset();
+ }
+ }
+
+}
diff --git a/src/concerns/InteractsWithWebsocket.php b/src/concerns/InteractsWithWebsocket.php
new file mode 100644
index 0000000..486a89e
--- /dev/null
+++ b/src/concerns/InteractsWithWebsocket.php
@@ -0,0 +1,145 @@
+onEvent('workerStart', function () {
+ $handlerClass = $this->getConfig('websocket.handler', Handler::class);
+ $this->app->bind(HandlerInterface::class, $handlerClass);
+
+ $this->onEvent('message', function ($message) {
+ if ($message instanceof PushMessage) {
+ if (isset($this->messageSender[$message->to])) {
+ $this->messageSender[$message->to]($message->data);
+ }
+ }
+ });
+ });
+ }
+
+ public function onHandShake(TcpConnection $connection, WorkerRequest $wkRequest)
+ {
+ $this->runInSandbox(function (App $app, Http $http, Event $event) use ($connection, $wkRequest) {
+ $request = $this->prepareRequest($wkRequest);
+
+ $response = $http->run($request);
+ if (!$response instanceof \think\worker\response\Websocket) {
+ $connection->close();
+ return;
+ }
+
+ $event->subscribe([$response]);
+ $this->upgrade($connection, $wkRequest);
+
+ $websocket = $app->make(Websocket::class, [$connection], true);
+ $app->instance(Websocket::class, $websocket);
+
+ $id = "{$this->workerId}.{$connection->id}";
+
+ $websocket->setSender($id);
+ $websocket->join($id);
+
+ $handler = $app->make(HandlerInterface::class);
+
+ $this->messageSender[$connection->id] = function ($data) use ($connection, $handler) {
+ $connection->send($handler->encodeMessage($data));
+ };
+
+ try {
+ $handler->onOpen($request);
+ } catch (Throwable $e) {
+ $this->logServerError($e);
+ }
+ }, $connection);
+ }
+
+ public function onMessage(TcpConnection $connection, Frame $frame)
+ {
+ $this->runInSandbox(function (App $app) use ($frame) {
+ $handler = $app->make(HandlerInterface::class);
+ try {
+ $handler->onMessage($frame);
+ } catch (Throwable $e) {
+ $this->logServerError($e);
+ }
+ }, $connection);
+ }
+
+ public function onClose(TcpConnection $connection)
+ {
+ $this->runInSandbox(function (App $app) use ($connection) {
+ if ($app->exists(Websocket::class)) {
+ $websocket = $app->make(Websocket::class);
+ $handler = $app->make(HandlerInterface::class);
+ try {
+ $handler->onClose();
+ } catch (Throwable $e) {
+ $this->logServerError($e);
+ }
+
+ // leave all rooms
+ $websocket->leave();
+
+ unset($this->messageSender[$connection->id]);
+
+ $websocket->setConnected(false);
+ }
+ }, $connection);
+ }
+
+ protected function isWebsocketRequest(WorkerRequest $request)
+ {
+ $header = $request->header();
+ return strcasecmp(Arr::get($header, 'connection', ''), 'upgrade') === 0 &&
+ strcasecmp(Arr::get($header, 'upgrade', ''), 'websocket') === 0;
+ }
+
+ protected function upgrade(TcpConnection $connection, WorkerRequest $request)
+ {
+ $key = $request->header('Sec-WebSocket-Key');
+
+ $headers = [
+ 'Upgrade' => 'websocket',
+ 'Sec-WebSocket-Version' => '13',
+ 'Connection' => 'Upgrade',
+ 'Sec-WebSocket-Accept' => base64_encode(sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true)),
+ ];
+
+ if ($protocol = $request->header('Sec-Websocket-Protocol')) {
+ $headers['Sec-WebSocket-Protocol'] = $protocol;
+ }
+
+ $response = new Response(101, $headers);
+
+ $connection->send($response);
+
+ // Websocket data buffer.
+ $connection->context->websocketDataBuffer = '';
+ // Current websocket frame length.
+ $connection->context->websocketCurrentFrameLength = 0;
+ // Current websocket frame data.
+ $connection->context->websocketCurrentFrameBuffer = '';
+
+ $connection->context->websocketHandshake = true;
+ }
+}
diff --git a/src/concerns/ModifyProperty.php b/src/concerns/ModifyProperty.php
new file mode 100644
index 0000000..83d9a4e
--- /dev/null
+++ b/src/concerns/ModifyProperty.php
@@ -0,0 +1,20 @@
+hasProperty($property)) {
+ $reflectProperty = $reflectObject->getProperty($property);
+ if(PHP_VERSION_ID < 80100) {
+ $reflectProperty->setAccessible(true);
+ }
+ $reflectProperty->setValue($object, $value);
+ }
+ }
+}
diff --git a/src/concerns/WithApplication.php b/src/concerns/WithApplication.php
new file mode 100644
index 0000000..00b68fb
--- /dev/null
+++ b/src/concerns/WithApplication.php
@@ -0,0 +1,77 @@
+app instanceof WorkerApp) {
+ $this->app = new WorkerApp($this->container->getRootPath());
+
+ $this->app->bind(WorkerApp::class, App::class);
+ $this->app->bind(Manager::class, $this);
+
+ $this->app->initialize();
+ $this->app->instance('request', $this->container->request);
+ $this->prepareConcretes();
+ }
+ }
+
+ /**
+ * 预加载
+ */
+ protected function prepareConcretes()
+ {
+ $defaultConcretes = ['db', 'cache', 'event'];
+
+ $concretes = array_merge($defaultConcretes, $this->getConfig('concretes', []));
+
+ foreach ($concretes as $concrete) {
+ $this->app->make($concrete);
+ }
+ }
+
+ public function getApp()
+ {
+ return $this->app;
+ }
+
+ /**
+ * 获取沙箱
+ * @return Sandbox
+ */
+ protected function getSandbox()
+ {
+ return $this->app->make(Sandbox::class);
+ }
+
+ /**
+ * 在沙箱中执行
+ * @param Closure $callable
+ */
+ public function runInSandbox(Closure $callable, ?object $key = null)
+ {
+ try {
+ $this->getSandbox()->run($callable, $key);
+ } catch (Throwable $e) {
+ $this->logServerError($e);
+ }
+ }
+}
diff --git a/src/concerns/WithContainer.php b/src/concerns/WithContainer.php
new file mode 100644
index 0000000..07152bc
--- /dev/null
+++ b/src/concerns/WithContainer.php
@@ -0,0 +1,75 @@
+container = $container;
+ }
+
+ protected function getContainer()
+ {
+ return $this->container;
+ }
+
+ /**
+ * 获取配置
+ * @param string $name
+ * @param mixed $default
+ * @return mixed
+ */
+ public function getConfig(string $name, $default = null)
+ {
+ return $this->container->config->get("worker.{$name}", $default);
+ }
+
+ /**
+ * 触发事件
+ * @param string $event
+ * @param mixed $params
+ */
+ public function triggerEvent(string $event, $params = null): void
+ {
+ $this->container->event->trigger("worker.{$event}", $params);
+ }
+
+ /**
+ * 监听事件
+ * @param string $event
+ * @param $listener
+ * @param bool $first
+ */
+ public function onEvent(string $event, $listener, bool $first = false): void
+ {
+ $this->container->event->listen("worker.{$event}", $listener, $first);
+ }
+
+ /**
+ * Log server error.
+ *
+ * @param Throwable $e
+ */
+ public function logServerError(Throwable $e)
+ {
+ /** @var Handle $handle */
+ $handle = $this->container->make(Handle::class);
+
+ $handle->report($e);
+ }
+}
diff --git a/src/conduit/Driver.php b/src/conduit/Driver.php
new file mode 100644
index 0000000..07bd309
--- /dev/null
+++ b/src/conduit/Driver.php
@@ -0,0 +1,26 @@
+ */
+ protected $suspensions = [];
+ protected $events = [];
+
+ public function __construct(protected Manager $manager)
+ {
+ $filename = runtime_path() . 'conduit.sock';
+ @unlink($filename);
+ $this->domain = "unix://{$filename}";
+ }
+
+ public function prepare()
+ {
+ //启动服务端
+ Server::run($this->domain);
+ }
+
+ public function connect()
+ {
+ $suspension = EventLoop::getSuspension();
+ $this->connection = $this->createConnection($suspension);
+ $suspension->suspend();
+
+ Timer::add($this->pingInterval, function () {
+ if ($this->connection) {
+ $this->connection->send('');
+ }
+ });
+
+ Timer::add(1, function () {
+ //检查是否超时
+ foreach ($this->suspensions as $id => $suspension) {
+ if (time() - $suspension[1] > 10) {
+ $suspension[0]->throw(new Exception('conduit connection is timeout'));
+ unset($this->suspensions[$id]);
+ }
+ }
+ });
+ }
+
+ public function get(string $name)
+ {
+ return $this->sendAndRecv(Command::create('get', $name));
+ }
+
+ public function set(string $name, $value)
+ {
+ $this->send(Command::create('set', $name, $value));
+ }
+
+ public function inc(string $name, int $step = 1)
+ {
+ return $this->sendAndRecv(Command::create('inc', $name, $step));
+ }
+
+ public function sAdd(string $name, ...$value)
+ {
+ $this->send(Command::create('sAdd', $name, $value));
+ }
+
+ public function sRem(string $name, $value)
+ {
+ $this->send(Command::create('sRem', $name, $value));
+ }
+
+ public function sMembers(string $name)
+ {
+ return $this->sendAndRecv(Command::create('sMembers', $name));
+ }
+
+ public function publish(string $name, $value)
+ {
+ $this->send(Command::create('publish', $name, $value));
+ }
+
+ public function subscribe(string $name, $callback)
+ {
+ $this->send(Command::create('subscribe', $name));
+ $this->events[$name] = $callback;
+ }
+
+ protected function sendAndRecv(Command $command)
+ {
+ $suspension = EventLoop::getSuspension();
+
+ $id = $this->id++;
+
+ $command->id = $id;
+
+ $this->suspensions[$id] = [$suspension, time()];
+
+ $this->send($command);
+
+ return $suspension->suspend();
+ }
+
+ protected function send(Command $command)
+ {
+ if (!$this->connection) {
+ throw new Exception('conduit connection is disconnected');
+ }
+
+ $this->connection->send(serialize($command));
+ }
+
+ protected function createConnection(?Suspension $suspension = null)
+ {
+ $connection = new AsyncTcpConnection($this->domain);
+
+ $connection->protocol = Frame::class;
+
+ $connection->onConnect = function () use ($suspension) {
+ $this->clearTimer();
+ if ($suspension) {
+ $suspension->resume();
+ }
+ //补订阅
+ foreach ($this->events as $name => $callback) {
+ $this->send(Command::create('subscribe', $name));
+ }
+ };
+
+ $connection->onMessage = function ($connection, $buffer) {
+ /** @var Result|Event $result */
+ $result = unserialize($buffer);
+
+ if ($result instanceof Event) {
+ if (isset($this->events[$result->name])) {
+ $this->events[$result->name]($result->data);
+ }
+ } elseif (isset($result->id) && isset($this->suspensions[$result->id])) {
+ [$suspension] = $this->suspensions[$result->id];
+ $suspension->resume($result->data);
+ unset($this->suspensions[$result->id]);
+ }
+ };
+
+ $connection->onClose = function () {
+ $this->connection = null;
+ //重连
+ $this->clearTimer();
+ $this->reconnectTimer = Timer::add(1, function () {
+ $this->connection = $this->createConnection();
+ });
+ };
+
+ $connection->connect();
+
+ return $connection;
+ }
+
+ protected function clearTimer()
+ {
+ if ($this->reconnectTimer) {
+ Timer::del($this->reconnectTimer);
+ $this->reconnectTimer = null;
+ }
+ }
+}
diff --git a/src/conduit/driver/socket/Command.php b/src/conduit/driver/socket/Command.php
new file mode 100644
index 0000000..9c6450f
--- /dev/null
+++ b/src/conduit/driver/socket/Command.php
@@ -0,0 +1,22 @@
+name = $name;
+ $packet->key = $key;
+ $packet->data = $data;
+
+ return $packet;
+ }
+}
diff --git a/src/conduit/driver/socket/Event.php b/src/conduit/driver/socket/Event.php
new file mode 100644
index 0000000..dc6b0c5
--- /dev/null
+++ b/src/conduit/driver/socket/Event.php
@@ -0,0 +1,15 @@
+ */
+ protected $subscribers = [];
+
+ public function onMessage(TcpConnection $connection, $buffer)
+ {
+ if (empty($buffer)) {
+ return;
+ }
+ /** @var Command $command */
+ $command = unserialize($buffer);
+
+ $result = Result::create($command->id);
+
+ switch ($command->name) {
+ case 'get':
+ $result->data = $this->data[$command->key] ?? null;
+ break;
+ case 'set':
+ $this->data[$command->key] = $command->data;
+ break;
+ case 'inc':
+ if (!isset($this->data[$command->key]) || !is_integer($this->data[$command->key])) {
+ $this->data[$command->key] = 0;
+ }
+ $result->data = $this->data[$command->key] += $command->data ?? 1;
+ break;
+ case 'sAdd':
+ if (!isset($this->data[$command->key]) || !is_array($this->data[$command->key])) {
+ $this->data[$command->key] = [];
+ }
+ $this->data[$command->key] = array_merge($this->data[$command->key], $command->data);
+ break;
+ case 'sRem':
+ if (!isset($this->data[$command->key]) || !is_array($this->data[$command->key])) {
+ $this->data[$command->key] = [];
+ }
+ $this->data[$command->key] = array_diff($this->data[$command->key], [$command->data]);
+ break;
+ case 'sMembers':
+ if (!isset($this->data[$command->key]) || !is_array($this->data[$command->key])) {
+ $this->data[$command->key] = [];
+ }
+ $result->data = $this->data[$command->key];
+ break;
+ case 'subscribe':
+ if (!isset($this->subscribers[$command->key])) {
+ $this->subscribers[$command->key] = [];
+ }
+ $this->subscribers[$command->key][] = $connection;
+ break;
+ case 'publish':
+ if (!empty($this->subscribers[$command->key])) {
+ foreach ($this->subscribers[$command->key] as $conn) {
+ $conn->send(serialize(Event::create($command->key, $command->data)));
+ }
+ }
+ break;
+ }
+
+ if (isset($result->id)) {
+ $connection->send(serialize($result));
+ }
+ }
+
+ public function onClose(TcpConnection $connection)
+ {
+ if (!empty($this->subscribers)) {
+ foreach ($this->subscribers as $key => $connections) {
+ $this->subscribers[$key] = array_udiff($connections, [$connection], function ($a, $b) {
+ return $a <=> $b;
+ });
+ }
+ }
+ }
+
+ public static function run($domain)
+ {
+ //启动服务端
+ $server = new self();
+
+ $worker = new Worker($domain);
+
+ $worker->name = 'conduit';
+ $worker->protocol = Frame::class;
+ $worker->reloadable = false;
+
+ $worker->onMessage = [$server, 'onMessage'];
+ $worker->onClose = [$server, 'onClose'];
+ }
+}
diff --git a/src/config/worker.php b/src/config/worker.php
new file mode 100644
index 0000000..b8bd04f
--- /dev/null
+++ b/src/config/worker.php
@@ -0,0 +1,39 @@
+
+// +----------------------------------------------------------------------
+
+use think\worker\websocket\Handler;
+
+return [
+ 'http' => [
+ 'enable' => true,
+ 'host' => '0.0.0.0',
+ 'port' => 8080,
+ 'worker_num' => 4,
+ 'options' => [],
+ ],
+ 'websocket' => [
+ 'enable' => false,
+ 'handler' => Handler::class,
+ 'ping_interval' => 25000,
+ 'ping_timeout' => 60000,
+ ],
+ //队列
+ 'queue' => [
+ 'enable' => false,
+ 'workers' => [],
+ ],
+ 'hot_update' => [
+ 'enable' => env('APP_DEBUG', false),
+ 'name' => ['*.php'],
+ 'include' => [app_path(), config_path(), root_path('route')],
+ 'exclude' => [],
+ ],
+];
diff --git a/src/contract/ResetterInterface.php b/src/contract/ResetterInterface.php
new file mode 100644
index 0000000..c9345e9
--- /dev/null
+++ b/src/contract/ResetterInterface.php
@@ -0,0 +1,17 @@
+to = $to;
+ $this->data = $data;
+ }
+}
diff --git a/src/protocols/FlexHttp.php b/src/protocols/FlexHttp.php
new file mode 100644
index 0000000..a87297c
--- /dev/null
+++ b/src/protocols/FlexHttp.php
@@ -0,0 +1,39 @@
+context->websocketHandshake)) {
+ return Http::input($buffer, $connection);
+ } else {
+ return Websocket::input($buffer, $connection);
+ }
+ }
+
+ public static function decode(string $buffer, TcpConnection $connection)
+ {
+ if (empty($connection->context->websocketHandshake)) {
+ return Http::decode($buffer, $connection);
+ } else {
+ $data = Websocket::decode($buffer, $connection);
+ return new Frame($connection->id, $data);
+ }
+ }
+
+ public static function encode(mixed $response, TcpConnection $connection): string
+ {
+ if (empty($connection->context->websocketHandshake)) {
+ return Http::encode($response, $connection);
+ } else {
+ return Websocket::encode($response, $connection);
+ }
+ }
+}
diff --git a/src/resetters/ClearInstances.php b/src/resetters/ClearInstances.php
new file mode 100644
index 0000000..9aa919f
--- /dev/null
+++ b/src/resetters/ClearInstances.php
@@ -0,0 +1,23 @@
+getConfig()->get('worker.instances', []));
+
+ foreach ($instances as $instance) {
+ $app->delete($instance);
+ }
+
+ return $app;
+ }
+}
diff --git a/src/resetters/ResetConfig.php b/src/resetters/ResetConfig.php
new file mode 100644
index 0000000..24c648c
--- /dev/null
+++ b/src/resetters/ResetConfig.php
@@ -0,0 +1,18 @@
+instance('config', clone $sandbox->getConfig());
+
+ return $app;
+ }
+}
diff --git a/src/resetters/ResetEvent.php b/src/resetters/ResetEvent.php
new file mode 100644
index 0000000..1530e0a
--- /dev/null
+++ b/src/resetters/ResetEvent.php
@@ -0,0 +1,22 @@
+getEvent();
+ $this->modifyProperty($event, $app);
+ $app->instance('event', $event);
+
+ return $app;
+ }
+}
diff --git a/src/resetters/ResetModel.php b/src/resetters/ResetModel.php
new file mode 100644
index 0000000..9899563
--- /dev/null
+++ b/src/resetters/ResetModel.php
@@ -0,0 +1,21 @@
+getSnapshot()->invoke(...$args);
+ });
+ }
+ }
+}
diff --git a/src/resetters/ResetPaginator.php b/src/resetters/ResetPaginator.php
new file mode 100644
index 0000000..3ef634a
--- /dev/null
+++ b/src/resetters/ResetPaginator.php
@@ -0,0 +1,30 @@
+getSnapshot()->request->baseUrl();
+ });
+
+ Paginator::currentPageResolver(function ($varPage = 'page') use ($sandbox) {
+
+ $page = $sandbox->getSnapshot()->request->param($varPage);
+
+ if (filter_var($page, FILTER_VALIDATE_INT) !== false && (int) $page >= 1) {
+ return (int) $page;
+ }
+
+ return 1;
+ });
+ }
+}
diff --git a/src/resetters/ResetService.php b/src/resetters/ResetService.php
new file mode 100644
index 0000000..6f6d2bc
--- /dev/null
+++ b/src/resetters/ResetService.php
@@ -0,0 +1,33 @@
+getServices() as $service) {
+ $this->modifyProperty($service, $app);
+ if (method_exists($service, 'register')) {
+ $service->register();
+ }
+ if (method_exists($service, 'boot')) {
+ $app->invoke([$service, 'boot']);
+ }
+ }
+ }
+
+}
diff --git a/src/response/File.php b/src/response/File.php
new file mode 100644
index 0000000..afcca87
--- /dev/null
+++ b/src/response/File.php
@@ -0,0 +1,112 @@
+ 'application/octet-stream',
+ 'Accept-Ranges' => 'bytes',
+ ];
+
+ /**
+ * @var SplFileInfo
+ */
+ protected $file;
+
+ public function __construct($file, ?string $contentDisposition = null, bool $autoEtag = true, bool $autoLastModified = true, bool $autoContentType = true)
+ {
+ $this->setFile($file, $contentDisposition, $autoEtag, $autoLastModified, $autoContentType);
+ }
+
+ public function getFile()
+ {
+ return $this->file;
+ }
+
+ public function setFile($file, ?string $contentDisposition = null, bool $autoEtag = true, bool $autoLastModified = true, bool $autoContentType = true)
+ {
+ if (!$file instanceof SplFileInfo) {
+ $file = new SplFileInfo((string) $file);
+ }
+
+ if (!$file->isReadable()) {
+ throw new RuntimeException('File must be readable.');
+ }
+
+ $this->header['Content-Length'] = $file->getSize();
+
+ $this->file = $file;
+
+ if ($autoEtag) {
+ $this->setAutoEtag();
+ }
+
+ if ($autoLastModified) {
+ $this->setAutoLastModified();
+ }
+
+ if ($contentDisposition) {
+ $this->setContentDisposition($contentDisposition);
+ }
+
+ if ($autoContentType) {
+ $this->setAutoContentType();
+ }
+
+ return $this;
+ }
+
+ public function setAutoContentType()
+ {
+ if (extension_loaded('fileinfo')) {
+ $finfo = finfo_open(FILEINFO_MIME_TYPE);
+
+ $mimeType = finfo_file($finfo, $this->file->getPathname());
+ if ($mimeType) {
+ $this->header['Content-Type'] = $mimeType;
+ }
+ }
+ }
+
+ public function setContentDisposition(string $disposition, string $filename = '')
+ {
+ if ('' === $filename) {
+ $filename = $this->file->getFilename();
+ }
+
+ $this->header['Content-Disposition'] = "{$disposition}; filename=\"{$filename}\"";
+
+ return $this;
+ }
+
+ public function setAutoLastModified()
+ {
+ $mTime = $this->file->getMTime();
+ if ($mTime) {
+ $date = DateTime::createFromFormat('U', (string) $mTime);
+ $this->lastModified($date->format('D, d M Y H:i:s') . ' GMT');
+ }
+ return $this;
+ }
+
+ public function setAutoEtag()
+ {
+ $eTag = "W/\"" . sha1_file($this->file->getPathname()) . "\"";
+
+ return $this->eTag($eTag);
+ }
+
+ protected function sendData(string $data): void
+ {
+ readfile($this->file->getPathname());
+ }
+}
diff --git a/src/response/Iterator.php b/src/response/Iterator.php
new file mode 100644
index 0000000..d74bb79
--- /dev/null
+++ b/src/response/Iterator.php
@@ -0,0 +1,22 @@
+iterator = $iterator;
+ }
+
+ public function getIterator(): Traversable
+ {
+ return $this->iterator;
+ }
+}
diff --git a/src/response/Websocket.php b/src/response/Websocket.php
new file mode 100644
index 0000000..614e01e
--- /dev/null
+++ b/src/response/Websocket.php
@@ -0,0 +1,66 @@
+listeners['Open'] = $listener;
+ return $this;
+ }
+
+ public function onMessage($listener)
+ {
+ $this->listeners['Message'] = $listener;
+ return $this;
+ }
+
+ public function onEvent($listener)
+ {
+ $this->listeners['Event'] = $listener;
+ return $this;
+ }
+
+ public function onClose($listener)
+ {
+ $this->listeners['Close'] = $listener;
+ return $this;
+ }
+
+ public function onConnect($listener)
+ {
+ $this->listeners['Connect'] = $listener;
+ return $this;
+ }
+
+ public function onDisconnect($listener)
+ {
+ $this->listeners['Disconnect'] = $listener;
+ return $this;
+ }
+
+ public function onPing($listener)
+ {
+ $this->listeners['Ping'] = $listener;
+ return $this;
+ }
+
+ public function onPong($listener)
+ {
+ $this->listeners['Pong'] = $listener;
+ return $this;
+ }
+
+ public function subscribe(Event $event)
+ {
+ foreach ($this->listeners as $eventName => $listener) {
+ $event->listen('worker.websocket.' . $eventName, $listener);
+ }
+ }
+}
diff --git a/src/watcher/Driver.php b/src/watcher/Driver.php
new file mode 100644
index 0000000..f02ca2c
--- /dev/null
+++ b/src/watcher/Driver.php
@@ -0,0 +1,8 @@
+directory = $directory;
+ $this->exclude = $exclude;
+ $this->name = $name;
+ }
+
+ public function watch(callable $callback)
+ {
+ $ms = 2000;
+ $seconds = ceil(($ms + 1000) / 1000);
+ $minutes = sprintf('-%.2f', $seconds / 60);
+
+ $dest = implode(' ', $this->directory);
+
+ $name = empty($this->name) ? '' : ' \( ' . join(' -o ', array_map(fn($v) => "-name \"{$v}\"", $this->name)) . ' \)';
+ $notName = '';
+ $notPath = '';
+ if (!empty($this->exclude)) {
+ $excludeDirs = $excludeFiles = [];
+ foreach ($this->exclude as $directory) {
+ $directory = rtrim($directory, '/');
+ if (is_dir($directory)) {
+ $excludeDirs[] = $directory;
+ } else {
+ $excludeFiles[] = $directory;
+ }
+ }
+
+ if (!empty($excludeFiles)) {
+ $notPath = ' -not \( ' . join(' -and ', array_map(fn($v) => "-name \"{$v}\"", $excludeFiles)) . ' \)';
+ }
+
+ if (!empty($excludeDirs)) {
+ $notPath = ' -not \( ' . join(' -and ', array_map(fn($v) => "-path \"{$v}/*\"", $excludeDirs)) . ' \)';
+ }
+ }
+
+ $command = "find {$dest}{$name}{$notName}{$notPath} -mmin {$minutes} -type f -print";
+
+ Timer::add($ms / 1000, function () use ($callback, $command) {
+ $stdout = $this->exec($command);
+ if (!empty($stdout)) {
+ call_user_func($callback);
+ }
+ });
+ }
+
+ public function exec($command)
+ {
+ $process = Process::fromShellCommandline($command);
+ $process->run();
+ if ($process->isSuccessful()) {
+ return $process->getOutput();
+ }
+ return false;
+ }
+
+}
diff --git a/src/watcher/Scan.php b/src/watcher/Scan.php
new file mode 100644
index 0000000..bacd478
--- /dev/null
+++ b/src/watcher/Scan.php
@@ -0,0 +1,52 @@
+finder = new Finder();
+ $this->finder
+ ->files()
+ ->name($name)
+ ->in($directory)
+ ->exclude($exclude);
+ }
+
+ protected function findFiles()
+ {
+ $files = [];
+ /** @var SplFileInfo $f */
+ foreach ($this->finder as $f) {
+ $files[$f->getRealpath()] = $f->getMTime();
+ }
+ return $files;
+ }
+
+ public function watch(callable $callback)
+ {
+ $this->files = $this->findFiles();
+
+ Timer::add(2, function () use ($callback) {
+ $files = $this->findFiles();
+
+ foreach ($files as $path => $time) {
+ if (empty($this->files[$path]) || $this->files[$path] != $time) {
+ call_user_func($callback);
+ break;
+ }
+ }
+
+ $this->files = $files;
+ });
+ }
+}
diff --git a/src/websocket/Event.php b/src/websocket/Event.php
new file mode 100644
index 0000000..411a420
--- /dev/null
+++ b/src/websocket/Event.php
@@ -0,0 +1,15 @@
+type = $type;
+ $this->data = $data;
+ }
+}
diff --git a/src/websocket/Frame.php b/src/websocket/Frame.php
new file mode 100644
index 0000000..82bce04
--- /dev/null
+++ b/src/websocket/Frame.php
@@ -0,0 +1,10 @@
+event = $event;
+ }
+
+ /**
+ * "onOpen" listener.
+ *
+ * @param Request $request
+ */
+ public function onOpen(Request $request)
+ {
+ $this->event->trigger('worker.websocket.Open', $request);
+ }
+
+ /**
+ * "onMessage" listener.
+ *
+ * @param Frame $frame
+ */
+ public function onMessage(Frame $frame)
+ {
+ $this->event->trigger('worker.websocket.Message', $frame);
+
+ $event = $this->decode($frame->data);
+ if ($event) {
+ $this->event->trigger('worker.websocket.Event', $event);
+ }
+ }
+
+ /**
+ * "onClose" listener.
+ */
+ public function onClose()
+ {
+ $this->event->trigger('worker.websocket.Close');
+ }
+
+ protected function decode($payload)
+ {
+ $data = json_decode($payload, true);
+ if (!empty($data['type'])) {
+ return new WsEvent($data['type'], $data['data'] ?? null);
+ }
+ return null;
+ }
+
+ public function encodeMessage($message)
+ {
+ if ($message instanceof WsEvent) {
+ return json_encode([
+ 'type' => $message->type,
+ 'data' => $message->data,
+ ]);
+ }
+ return $message;
+ }
+}
diff --git a/src/websocket/Pusher.php b/src/websocket/Pusher.php
new file mode 100644
index 0000000..0fcfaf6
--- /dev/null
+++ b/src/websocket/Pusher.php
@@ -0,0 +1,67 @@
+manager = $manager;
+ $this->room = $room;
+ }
+
+ public function to(...$values)
+ {
+ foreach ($values as $value) {
+ if (is_array($value)) {
+ $this->to(...$value);
+ } elseif (!in_array($value, $this->to)) {
+ $this->to[] = $value;
+ }
+ }
+
+ return $this;
+ }
+
+ /**
+ * Push message to related descriptors
+ * @param $data
+ * @return void
+ */
+ public function push($data): void
+ {
+ $fds = [];
+
+ foreach ($this->to as $room) {
+ $clients = $this->room->getClients($room);
+ if (!empty($clients)) {
+ $fds = array_merge($fds, $clients);
+ }
+ }
+
+ foreach (array_unique($fds) as $fd) {
+ [$workerId, $fd] = explode('.', $fd);
+ $this->manager->sendMessage((int) $workerId, new PushMessage((int) $fd, $data));
+ }
+ }
+
+ public function emit(string $event, ...$data): void
+ {
+ $this->push(new Event($event, $data));
+ }
+}
diff --git a/src/websocket/Room.php b/src/websocket/Room.php
new file mode 100644
index 0000000..3851ff3
--- /dev/null
+++ b/src/websocket/Room.php
@@ -0,0 +1,54 @@
+conduit->sAdd($this->getClientKey($fd), ...$rooms);
+
+ foreach ($rooms as $room) {
+ $this->conduit->sAdd($this->getRoomKey($room), $fd);
+ }
+ }
+
+ public function delete($fd, ...$rooms)
+ {
+ $rooms = count($rooms) ? $rooms : $this->getRooms($fd);
+
+ $this->conduit->sRem($this->getClientKey($fd), ...$rooms);
+
+ foreach ($rooms as $room) {
+ $this->conduit->sRem($this->getRoomKey($room), $fd);
+ }
+ }
+
+ public function getClients(string $room)
+ {
+ return $this->conduit->sMembers($this->getRoomKey($room)) ?: [];
+ }
+
+ public function getRooms(string $fd)
+ {
+ return $this->conduit->sMembers($this->getClientKey($fd)) ?: [];
+ }
+
+ protected function getClientKey(string $key)
+ {
+ return "ws:client:{$key}";
+ }
+
+ protected function getRoomKey($room)
+ {
+ return "ws:room:{$room}";
+ }
+}
diff --git a/src/websocket/socketio/EnginePacket.php b/src/websocket/socketio/EnginePacket.php
new file mode 100644
index 0000000..d1335d7
--- /dev/null
+++ b/src/websocket/socketio/EnginePacket.php
@@ -0,0 +1,81 @@
+type = $type;
+ $this->data = $data;
+ }
+
+ public static function open($payload)
+ {
+ return new self(self::OPEN, $payload);
+ }
+
+ public static function pong($payload = '')
+ {
+ return new self(self::PONG, $payload);
+ }
+
+ public static function ping()
+ {
+ return new self(self::PING);
+ }
+
+ public static function message($payload)
+ {
+ return new self(self::MESSAGE, $payload);
+ }
+
+ public static function fromString(string $packet)
+ {
+ return new self(substr($packet, 0, 1), substr($packet, 1) ?: '');
+ }
+
+ public function toString()
+ {
+ return $this->type . $this->data;
+ }
+}
diff --git a/src/websocket/socketio/Handler.php b/src/websocket/socketio/Handler.php
new file mode 100644
index 0000000..948560c
--- /dev/null
+++ b/src/websocket/socketio/Handler.php
@@ -0,0 +1,198 @@
+event = $event;
+ $this->config = $config;
+ $this->websocket = $websocket;
+ $this->pingInterval = $this->config->get('worker.websocket.ping_interval', 25000);
+ $this->pingTimeout = $this->config->get('worker.websocket.ping_timeout', 60000);
+ }
+
+ /**
+ * "onOpen" listener.
+ *
+ * @param Request $request
+ */
+ public function onOpen(Request $request)
+ {
+ $this->eio = $request->param('EIO');
+
+ $payload = json_encode(
+ [
+ 'sid' => base64_encode(uniqid()),
+ 'upgrades' => [],
+ 'pingInterval' => $this->pingInterval,
+ 'pingTimeout' => $this->pingTimeout,
+ ]
+ );
+
+ $this->push(EnginePacket::open($payload));
+
+ $this->event->trigger('worker.websocket.Open', $request);
+
+ if ($this->eio < 4) {
+ $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);
+ $this->onConnect();
+ } else {
+ $this->schedulePing();
+ }
+ }
+
+ /**
+ * "onMessage" listener.
+ *
+ * @param Frame $frame
+ */
+ public function onMessage(Frame $frame)
+ {
+ $enginePacket = EnginePacket::fromString($frame->data);
+
+ $this->event->trigger('worker.websocket.Message', $enginePacket);
+
+ $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);
+
+ switch ($enginePacket->type) {
+ case EnginePacket::MESSAGE:
+ $packet = Packet::fromString($enginePacket->data);
+ switch ($packet->type) {
+ case Packet::CONNECT:
+ $this->onConnect($packet->data);
+ break;
+ case Packet::EVENT:
+ $type = array_shift($packet->data);
+ $data = $packet->data;
+ $result = $this->event->trigger('worker.websocket.Event', new WsEvent($type, $data));
+
+ if ($packet->id !== null) {
+ $responsePacket = Packet::create(Packet::ACK, [
+ 'id' => $packet->id,
+ 'nsp' => $packet->nsp,
+ 'data' => $result,
+ ]);
+
+ $this->push($responsePacket);
+ }
+ break;
+ case Packet::DISCONNECT:
+ $this->event->trigger('worker.websocket.Disconnect');
+ $this->websocket->close();
+ break;
+ default:
+ $this->websocket->close();
+ break;
+ }
+ break;
+ case EnginePacket::PING:
+ $this->event->trigger('worker.websocket.Ping');
+ $this->push(EnginePacket::pong($enginePacket->data));
+ break;
+ case EnginePacket::PONG:
+ $this->event->trigger('worker.websocket.Pong');
+ $this->schedulePing();
+ break;
+ default:
+ $this->websocket->close();
+ break;
+ }
+ }
+
+ /**
+ * "onClose" listener.
+ */
+ public function onClose()
+ {
+ Timer::del($this->pingTimeoutTimer);
+ Timer::del($this->pingIntervalTimer);
+ $this->event->trigger('worker.websocket.Close');
+ }
+
+ protected function onConnect($data = null)
+ {
+ try {
+ $this->event->trigger('worker.websocket.Connect', $data);
+ $packet = Packet::create(Packet::CONNECT);
+ if ($this->eio >= 4) {
+ $packet->data = ['sid' => base64_encode(uniqid())];
+ }
+ } catch (Exception $exception) {
+ $packet = Packet::create(Packet::CONNECT_ERROR, [
+ 'data' => ['message' => $exception->getMessage()],
+ ]);
+ }
+
+ $this->push($packet);
+ }
+
+ protected function resetPingTimeout($timeout)
+ {
+ Timer::del($this->pingTimeoutTimer);
+ $this->pingTimeoutTimer = Timer::delay($timeout, function () {
+ $this->websocket->close();
+ });
+ }
+
+ protected function schedulePing()
+ {
+ Timer::del($this->pingIntervalTimer);
+ $this->pingIntervalTimer = Timer::delay($this->pingInterval, function () {
+ $this->push(EnginePacket::ping());
+ $this->resetPingTimeout($this->pingTimeout);
+ });
+ }
+
+ public function encodeMessage($message)
+ {
+ if ($message instanceof WsEvent) {
+ $message = Packet::create(Packet::EVENT, [
+ 'data' => array_merge([$message->type], $message->data),
+ ]);
+ }
+
+ if ($message instanceof Packet) {
+ $message = EnginePacket::message($message->toString());
+ }
+
+ if ($message instanceof EnginePacket) {
+ $message = $message->toString();
+ }
+
+ return $message;
+ }
+
+ protected function push($data)
+ {
+ $this->websocket->push($data);
+ }
+
+}
diff --git a/src/websocket/socketio/Packet.php b/src/websocket/socketio/Packet.php
new file mode 100644
index 0000000..9b277a8
--- /dev/null
+++ b/src/websocket/socketio/Packet.php
@@ -0,0 +1,134 @@
+type = $type;
+ }
+
+ public static function create($type, array $decoded = [])
+ {
+ $new = new self($type);
+ $new->id = $decoded['id'] ?? null;
+ if (isset($decoded['nsp'])) {
+ $new->nsp = $decoded['nsp'] ?: '/';
+ } else {
+ $new->nsp = '/';
+ }
+ $new->data = $decoded['data'] ?? null;
+ return $new;
+ }
+
+ public function toString()
+ {
+ $str = '' . $this->type;
+ if ($this->nsp && '/' !== $this->nsp) {
+ $str .= $this->nsp . ',';
+ }
+
+ if ($this->id !== null) {
+ $str .= $this->id;
+ }
+
+ if (null !== $this->data) {
+ $str .= json_encode($this->data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
+ }
+ return $str;
+ }
+
+ public static function fromString(string $str)
+ {
+ $i = 0;
+
+ $packet = new Packet((int) substr($str, 0, 1));
+
+ // look up namespace (if any)
+ if ('/' === substr($str, $i + 1, 1)) {
+ $nsp = '';
+ while (++$i) {
+ $c = substr($str, $i, 1);
+ if (',' === $c) {
+ break;
+ }
+ $nsp .= $c;
+ if ($i === strlen($str)) {
+ break;
+ }
+ }
+ $packet->nsp = $nsp;
+ } else {
+ $packet->nsp = '/';
+ }
+
+ // look up id
+ $next = substr($str, $i + 1, 1);
+ if ('' !== $next && is_numeric($next)) {
+ $id = '';
+ while (++$i) {
+ $c = substr($str, $i, 1);
+ if (null == $c || !is_numeric($c)) {
+ --$i;
+ break;
+ }
+ $id .= substr($str, $i, 1);
+ if ($i === strlen($str)) {
+ break;
+ }
+ }
+ $packet->id = intval($id);
+ }
+
+ // look up json data
+ if (substr($str, ++$i, 1)) {
+ $packet->data = json_decode(substr($str, $i), true);
+ }
+
+ return $packet;
+ }
+}
diff --git a/tests/Pest.php b/tests/Pest.php
new file mode 100644
index 0000000..48d3610
--- /dev/null
+++ b/tests/Pest.php
@@ -0,0 +1,2 @@
+ 'false',
+ 'PHP_QUEUE_ENABLE' => 'false',
+ ]);
+ $process->start();
+ $wait = 0;
+
+ while (!$process->getOutput()) {
+ $wait++;
+ if ($wait > 30) {
+ throw new Exception('server start failed');
+ }
+ sleep(1);
+ }
+});
+
+afterAll(function () use (&$process) {
+ echo $process->getOutput();
+ $process->stop();
+});
+
+beforeEach(function () {
+ $this->httpClient = new Client([
+ 'base_uri' => 'http://127.0.0.1:8080',
+ 'cookies' => true,
+ 'http_errors' => false,
+ 'timeout' => 1,
+ ]);
+});
+
+it('callback route', function () {
+ $response = $this->httpClient->get('/');
+
+ expect($response->getStatusCode())
+ ->toBe(200)
+ ->and($response->getBody()->getContents())
+ ->toBe('hello world');
+});
+
+it('controller route', function () {
+ $jar = new CookieJar();
+
+ $response = $this->httpClient->get('/test', ['cookies' => $jar]);
+
+ expect($response->getStatusCode())
+ ->toBe(200)
+ ->and($response->getBody()->getContents())
+ ->toBe('test')
+ ->and($jar->getCookieByName('name')->getValue())
+ ->toBe('think');
+});
+
+it('json post', function () {
+
+ $data = [
+ 'name' => 'think',
+ ];
+ $response = $this->httpClient->post('/json', [
+ 'json' => $data,
+ ]);
+
+ expect($response->getStatusCode())
+ ->toBe(200)
+ ->and($response->getBody()->getContents())
+ ->toBe(json_encode($data));
+});
+
+it('put and delete request', function () {
+ $response = $this->httpClient->put('/');
+
+ expect($response->getStatusCode())
+ ->toBe(200)
+ ->and($response->getBody()->getContents())
+ ->toBe('put');
+
+ $response = $this->httpClient->delete('/');
+
+ expect($response->getStatusCode())
+ ->toBe(200)
+ ->and($response->getBody()->getContents())
+ ->toBe('delete');
+});
+
+it('file response', function () {
+ $response = $this->httpClient->get('/static/asset.txt');
+
+ expect($response->getStatusCode())
+ ->toBe(200)
+ ->and($response->getBody()->getContents())
+ ->toBe(file_get_contents(STUB_DIR . '/public/asset.txt'));
+});
+
+it('sse', function () {
+ $response = $this->httpClient->get('/sse', [
+ 'stream' => true,
+ 'timeout' => 3,
+ ]);
+
+ $body = $response->getBody();
+
+ $buffer = '';
+ while (!$body->eof()) {
+ $text = $body->read(1);
+ if ($text == "\r") {
+ continue;
+ }
+ $buffer .= $text;
+ if ($text == "\n") {
+ if ($buffer != "\n") {
+ expect($buffer)->toStartWith('data: ');
+ }
+ $buffer = '';
+ }
+ }
+});
+
+it('hot update', function () {
+ $response = $this->httpClient->get('/hot');
+
+ expect($response->getStatusCode())
+ ->toBe(404);
+
+ $route = <<httpClient->get('/hot');
+
+ expect($response->getStatusCode())
+ ->toBe(200)
+ ->and($response->getBody()->getContents())
+ ->toBe('hot');
+})->after(function () {
+ @unlink(STUB_DIR . '/route/hot.php');
+})->skipOnWindows();
diff --git a/tests/feature/QueueTest.php b/tests/feature/QueueTest.php
new file mode 100644
index 0000000..be3ed3e
--- /dev/null
+++ b/tests/feature/QueueTest.php
@@ -0,0 +1,5 @@
+ 'true',
+ 'PHP_QUEUE_ENABLE' => 'false',
+ 'PHP_HOT_ENABLE' => 'false',
+ ]);
+ $process->start();
+ $wait = 0;
+
+ while (!$process->getOutput()) {
+ $wait++;
+ if ($wait > 30) {
+ throw new Exception('server start failed');
+ }
+ sleep(1);
+ }
+});
+
+afterAll(function () use (&$process) {
+ echo $process->getOutput();
+ $process->stop();
+});
+
+beforeEach(function () {
+ $this->httpClient = new Client([
+ 'base_uri' => 'http://127.0.0.1:8080',
+ 'cookies' => true,
+ 'http_errors' => false,
+ 'timeout' => 1,
+ ]);
+});
+
+it('http', function () {
+ $response = $this->httpClient->get('/');
+
+ expect($response->getStatusCode())
+ ->toBe(200)
+ ->and($response->getBody()->getContents())
+ ->toBe('hello world');
+});
+
+it('websocket', function () {
+ $connected = 0;
+ $messages = [];
+ connect('ws://127.0.0.1:8080/websocket')
+ ->then(function (\Ratchet\Client\WebSocket $conn) use (&$connected, &$messages) {
+ $connected++;
+ $conn->on('message', function ($msg) use ($conn, &$messages) {
+ $messages[] = (string) $msg;
+ $conn->close();
+ });
+ });
+
+ connect('ws://127.0.0.1:8080/websocket')
+ ->then(function (\Ratchet\Client\WebSocket $conn) use (&$connected, &$messages) {
+ $connected++;
+ $conn->on('message', function ($msg) use ($conn, &$messages) {
+ $messages[] = (string) $msg;
+ $conn->close();
+ });
+
+ $conn->send('hello');
+ });
+
+ Loop::get()->run();
+
+ expect($connected)->toBe(2);
+ expect($messages)->toBe(['hello', 'hello']);
+});
diff --git a/tests/stub/.env b/tests/stub/.env
new file mode 100644
index 0000000..ea0466f
--- /dev/null
+++ b/tests/stub/.env
@@ -0,0 +1 @@
+APP_DEBUG=true
diff --git a/tests/stub/app/controller/Index.php b/tests/stub/app/controller/Index.php
new file mode 100644
index 0000000..823461c
--- /dev/null
+++ b/tests/stub/app/controller/Index.php
@@ -0,0 +1,20 @@
+post());
+ }
+}
diff --git a/tests/stub/config/app.php b/tests/stub/config/app.php
new file mode 100644
index 0000000..9dbc43d
--- /dev/null
+++ b/tests/stub/config/app.php
@@ -0,0 +1,4 @@
+ 'error',
+];
diff --git a/tests/stub/config/cache.php b/tests/stub/config/cache.php
new file mode 100644
index 0000000..4b44a20
--- /dev/null
+++ b/tests/stub/config/cache.php
@@ -0,0 +1,9 @@
+ 'file',
+ 'stores' => [
+ 'file' => [
+ 'type' => 'File',
+ ],
+ ]
+];
diff --git a/tests/stub/config/queue.php b/tests/stub/config/queue.php
new file mode 100644
index 0000000..fb911ff
--- /dev/null
+++ b/tests/stub/config/queue.php
@@ -0,0 +1,40 @@
+
+// +----------------------------------------------------------------------
+
+return [
+ 'default' => 'redis',
+ 'connections' => [
+ 'sync' => [
+ 'type' => 'sync',
+ ],
+ 'database' => [
+ 'type' => 'database',
+ 'queue' => 'default',
+ 'table' => 'jobs',
+ 'connection' => null,
+ ],
+ 'redis' => [
+ 'type' => 'redis',
+ 'queue' => 'default',
+ 'host' => env('REDIS_HOST', 'redis'),
+ 'port' => env('REDIS_PORT', 6379),
+ 'password' => '',
+ 'select' => 0,
+ 'timeout' => 0,
+ 'persistent' => true,
+ 'retry_after' => 600,
+ ],
+ ],
+ 'failed' => [
+ 'type' => 'none',
+ 'table' => 'failed_jobs',
+ ],
+];
diff --git a/tests/stub/config/worker.php b/tests/stub/config/worker.php
new file mode 100644
index 0000000..952e79d
--- /dev/null
+++ b/tests/stub/config/worker.php
@@ -0,0 +1,45 @@
+
+// +----------------------------------------------------------------------
+
+use think\worker\websocket\Handler;
+
+return [
+ 'http' => [
+ 'enable' => env('HTTP_ENABLE', true),
+ 'host' => '0.0.0.0',
+ 'port' => 8080,
+ 'worker_num' => 2,
+ 'options' => [],
+ ],
+ 'websocket' => [
+ 'enable' => env('WEBSOCKET_ENABLE', true),
+ 'handler' => Handler::class,
+ 'ping_interval' => 25000,
+ 'ping_timeout' => 60000,
+ ],
+ //队列
+ 'queue' => [
+ 'enable' => env('QUEUE_ENABLE', true),
+ 'workers' => [
+ 'default' => [],
+ ],
+ ],
+ //共享数据
+ 'conduit' => [
+ 'type' => 'socket',
+ ],
+ 'hot_update' => [
+ 'enable' => env('HOT_ENABLE', true),
+ 'name' => ['*.php'],
+ 'include' => [app_path(), config_path(), root_path('route')],
+ 'exclude' => [],
+ ],
+];
diff --git a/tests/stub/public/asset.txt b/tests/stub/public/asset.txt
new file mode 100644
index 0000000..4c545ef
--- /dev/null
+++ b/tests/stub/public/asset.txt
@@ -0,0 +1 @@
+Asset
diff --git a/tests/stub/route/app.php b/tests/stub/route/app.php
new file mode 100644
index 0000000..c137ddc
--- /dev/null
+++ b/tests/stub/route/app.php
@@ -0,0 +1,51 @@
+header([
+ 'Content-Type' => 'text/event-stream',
+ 'Cache-Control' => 'no-cache, must-revalidate',
+ ]);
+});
+
+Route::get('/websocket', function () {
+ return (new \think\worker\response\Websocket())
+ ->onOpen(function (\think\worker\Websocket $websocket) {
+ $websocket->join('foo');
+ })
+ ->onMessage(function (\think\worker\Websocket $websocket, \think\worker\websocket\Frame $frame) {
+ $websocket->to('foo')->push($frame->data);
+ });
+});
+
+Route::get('test', 'index/test');
+Route::post('json', 'index/json');
+
+Route::get('static/:path', function (string $path) {
+ $filename = public_path() . $path;
+ return new \think\worker\response\File($filename);
+})->pattern(['path' => '.*\.\w+$']);
diff --git a/tests/stub/runtime/.gitignore b/tests/stub/runtime/.gitignore
new file mode 100644
index 0000000..c96a04f
--- /dev/null
+++ b/tests/stub/runtime/.gitignore
@@ -0,0 +1,2 @@
+*
+!.gitignore
\ No newline at end of file
diff --git a/tests/stub/think b/tests/stub/think
new file mode 100644
index 0000000..d82ca91
--- /dev/null
+++ b/tests/stub/think
@@ -0,0 +1,12 @@
+#!/usr/bin/env php
+console->addCommands([\think\worker\command\Server::class]);
+
+$app->console->run();