diff --git a/.gitignore b/.gitignore index 046d499..252485f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ composer.lock .idea/ vendor/ tests/prikey.pem +demo.php diff --git a/README.md b/README.md index 87a7420..0f58b6b 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ composer require maiyoule/mqttclient_author ### 配置 ```php -$manager = new MQTTManager(); +$manager = new AuthorServerClient(); //设置APPID $manager->setAppId(''); //设置私钥 diff --git a/composer.json b/composer.json index 289426e..50a9fbc 100644 --- a/composer.json +++ b/composer.json @@ -2,7 +2,7 @@ "name": "maiyoule/mqttclient_author", "type": "library", "description": "MQTT管理模块操作库", - "version": "1.0.3", + "version": "1.1.0", "license": "MIT", "authors": [ { @@ -14,7 +14,9 @@ "require": { "php": ">=8.0", "guzzlehttp/guzzle": "^7.0", - "phpseclib/phpseclib": "~3.0" + "phpseclib/phpseclib": "~3.0", + "workerman/mqtt": "^2.2", + "psr/log": "^3.0" }, "require-dev": { "phpunit/phpunit": "^11.0" @@ -29,4 +31,4 @@ "cn\\com\\maiyoule\\mqttclient\\test\\": "tests" } } -} \ No newline at end of file +} diff --git a/src/AuthorAppMQTTClient.php b/src/AuthorAppMQTTClient.php new file mode 100644 index 0000000..a7f5b5a --- /dev/null +++ b/src/AuthorAppMQTTClient.php @@ -0,0 +1,282 @@ +server; + } + + public function setServer(string $server): AuthorAppMQTTClient + { + $this->server = $server; + return $this; + } + + public function getPort(): int + { + return $this->port; + } + + public function setPort(int $port): AuthorAppMQTTClient + { + $this->port = $port; + return $this; + } + + public function getClientId(): string + { + return $this->clientId; + } + + public function setClientId(string $clientId): AuthorAppMQTTClient + { + $this->clientId = $clientId; + return $this; + } + + public function getUsername(): string + { + return $this->username; + } + + public function setUsername(string $username): AuthorAppMQTTClient + { + $this->username = $username; + return $this; + } + + public function getPassword(): string + { + return $this->password; + } + + public function setPassword(string $password): AuthorAppMQTTClient + { + $this->password = $password; + return $this; + } + + + public function getAppid(): string + { + return $this->appid; + } + + public function setAppid(string $appid): AuthorAppMQTTClient + { + $this->appid = $appid; + return $this; + } + + + private LoggerInterface $logger; + + public function getLogger(): LoggerInterface + { + return $this->logger; + } + + public function setLogger(LoggerInterface $logger): void + { + $this->logger = $logger; + } + + + /** + * @throws \Exception + */ + private function __construct() + { + if (class_exists('Worker')) { + throw new \Exception('没有找到Workman'); + } + } + + private ?Worker $worker; + + /** + * 启动服务 + * @return bool + * @throws \Throwable + */ + public function start(): bool + { + $this->worker = new Worker(); + + $this->worker->onWorkerStart = function () { + $this->logger->info('Worker 启动'); + $this->__startWorker(); + }; + $this->worker->onMessage = function ($conn, $data) { + $this->logger->debug($data); + }; + $this->worker->onWorkerStop = function () { + $this->logger->info('Worker 停止'); + $this->__stopWorker(); + }; + $this->worker->onError = function () { + $this->logger->error('Worker启动遇到错误'); + }; + if (!Worker::isRunning()) { + Worker::runAll(); + } + return true; + } + + /** + * 停止 + * @return void + */ + public function stop(): void + { + if (is_null($this->worker)) { + return; + } + $this->worker->stop(); + $this->worker = null; + } + + private function __stopWorker(): void + { + if (is_null($this->client)) { + return; + } + $topic = sprintf('biz/%s/#', $this->getAppid()); + + $this->client->unsubscribe($topic); + //清理订阅 + $this->client->disconnect(); + } + + /** + * @throws \Exception + */ + private function __startWorker(): void + { + //参考 https://www.workerman.net/doc/workerman/components/workerman-mqtt.html + $option = [ + 'keepalive' => 29, + 'client_id' => $this->getClientId(), + 'protocol_name' => 'MQTT', + 'protocol_level' => 5, + 'clean_session' => true, + 'reconnect_period' => 2, + 'connect_timeout' => 10, + 'username' => $this->getUsername(), + 'password' => $this->getPassword(), + 'resubscribe' => true, + 'debug' => false + ]; + $address = sprintf('mqtt://%s:%d', $this->getServer(), $this->getPort()); + $this->logger->debug('建立MQTT连接到' . $address); + + $this->client = new Client($address, $option); + //连接回调 + $this->client->onConnect = function (Client $connection) { + $this->logger->debug('MQTT 连接成功'); + $topic = sprintf('biz/%s/#', $this->getAppid()); + $connection->subscribe($topic, ['qos' => 0], function (\Exception|null $exception, array $granted) { + + if (!is_null($exception)) { + $this->logger->error($exception->getMessage()); + } + $this->logger->debug('主题订阅:' . json_encode($granted)); + }); + + $this->callEvent(self::EVENT_CONNECT, $connection); + }; + $this->client->onClose = function () { + $this->logger->debug('MQTT 连接断开'); + $this->callEvent(self::EVENT_CLOSE); + }; + $this->client->onMessage = function (string $topic, string $data, Client $client) { + //收到消息 + $this->logger->debug(sprintf('MQTT 收到消息 Topic:%s Data:%s', $topic, $data)); + $this->callEvent(self::EVENT_MESSAGE, $topic, $data, $client); + }; + //遇到错误 + $this->client->onError = function (\Exception $err) { + $this->logger->error('MQTT 遇到错误' . $err->getMessage()); + $this->callEvent(self::EVENT_ERROR, $err); + }; + $this->client->connect(); + + } + + const EVENT_CONNECT = 'connect'; + const EVENT_CLOSE = 'close'; + + const EVENT_MESSAGE = 'message'; + const EVENT_ERROR = 'error'; + + private array $mapEvent = []; + + public function registerEvent(string $event, \Closure $callback): AuthorAppMQTTClient + { + $this->mapEvent[$event][] = $callback; + return $this; + } + + private function callEvent(string $event, ...$args): void + { + if (isset($this->mapEvent[$event])) { + foreach ($this->mapEvent[$event] as $callback) { + call_user_func_array($callback, $args); + } + } + } + + /** + * 发送消息到目标客户端 + * @param SendMQTTMessage $message + * @return bool + */ + public function sendToTarget(SendMQTTMessage $message): bool + { + //组装主题 + $topic = sprintf('biz/%s/%s/%s/downstream', $this->getAppid(), $message->getBiz(), $message->getTargetId()); + if (is_null($this->client)) { + return false; + } + $options = [ + 'qos' => $message->getQoS(), + 'retain' => $message->isRetain(), + 'dup' => false + ]; + $this->client->publish($topic, $message->getBiz(), $options); + return true; + } + + +} \ No newline at end of file diff --git a/src/MQTTManager.php b/src/AuthorServerClient.php similarity index 98% rename from src/MQTTManager.php rename to src/AuthorServerClient.php index be07361..53990d6 100644 --- a/src/MQTTManager.php +++ b/src/AuthorServerClient.php @@ -8,7 +8,10 @@ use GuzzleHttp\Client; use GuzzleHttp\Exception\ClientException; use phpseclib3\Crypt\RSA; -class MQTTManager +/** + * MQTT服务端接口 + */ +class AuthorServerClient { /** * 私钥 diff --git a/src/biz/AppUserCreateRequest.php b/src/biz/AppUserCreateRequest.php index a7d2a77..16f0075 100644 --- a/src/biz/AppUserCreateRequest.php +++ b/src/biz/AppUserCreateRequest.php @@ -29,8 +29,11 @@ class AppUserCreateRequest extends IRequest return $this->biz; } - public function setBiz(array $biz): void + public function setBiz(array|string $biz): void { + if (is_string($biz)) { + $biz = [$biz]; + } $this->biz = $biz; } diff --git a/src/biz/AppUsersRequest.php b/src/biz/AppUsersRequest.php new file mode 100644 index 0000000..beb095d --- /dev/null +++ b/src/biz/AppUsersRequest.php @@ -0,0 +1,34 @@ +role; + } + + public function setRole(string $role): void + { + $this->role = $role; + } + + + public function body(): array + { + return [ + 'role' => $this->role, + ]; + } +} \ No newline at end of file diff --git a/src/biz/client/SendMQTTMessage.php b/src/biz/client/SendMQTTMessage.php new file mode 100644 index 0000000..6a626c9 --- /dev/null +++ b/src/biz/client/SendMQTTMessage.php @@ -0,0 +1,91 @@ +setBiz($biz); + $instance->setTargetId($targetId); + $instance->setMessage($message); + $instance->setQoS($qos); + $instance->setRetain($retain); + return $instance; + } + + const QOS_0 = 0; + const QOS_1 = 1; + const QOS_2 = 2; + + public function isRetain(): bool + { + return $this->retain; + } + + public function setRetain(bool $retain): void + { + $this->retain = $retain; + } + + public function getQoS(): int + { + return $this->qoS; + } + + public function setQoS(int $qoS): void + { + $this->qoS = $qoS; + } + + public function getMessage(): string + { + return $this->message; + } + + public function setMessage(string $message): void + { + $this->message = $message; + } + + public function getTargetId(): string + { + return $this->targetId; + } + + public function setTargetId(string $targetId): void + { + $this->targetId = $targetId; + } + + public function getBiz(): string + { + return $this->biz; + } + + public function setBiz(string $biz): void + { + $this->biz = $biz; + } + +} \ No newline at end of file diff --git a/tests/AppManagerTest.php b/tests/AppManagerTest.php index d5867c1..7eb02a3 100644 --- a/tests/AppManagerTest.php +++ b/tests/AppManagerTest.php @@ -6,34 +6,43 @@ use cn\com\maiyoule\mqttclient\biz\AppUserCreateRequest; use cn\com\maiyoule\mqttclient\biz\AppUserDeleteRequest; use cn\com\maiyoule\mqttclient\biz\AppUserUpdateRequest; use cn\com\maiyoule\mqttclient\exception\ApiException; -use cn\com\maiyoule\mqttclient\MQTTManager; +use cn\com\maiyoule\mqttclient\AuthorServerClient; use PHPUnit\Framework\Assert; use PHPUnit\Framework\TestCase; class AppManagerTest extends TestCase { - private MQTTManager $manager; + private AuthorServerClient $manager; protected function setUp(): void { - $this->manager = new MQTTManager(); - $this->manager->setAppId('6MMVTLW66D'); + $this->manager = new AuthorServerClient(); + $this->manager->setAppId('6N6QPV2FYD'); $this->manager->setPrivateKey(file_get_contents(__DIR__ . '/prikey.pem')); - $this->manager->setApi('http://localhost:8000/api/'); + $this->manager->setApi('https://mqttauthor_dev.maiyoule.com.cn/api/'); $this->manager->setDebug(true); } + public function testAdmin() + { + $request = new AppUserCreateRequest(); + $request->setPassword('123'); + $request->setRole('admin'); + $request->setBiz('ch'); + $biz = $this->manager->exec($request); + $this->assertTrue($biz->isSuccess(), $biz->getMessage()); + } + + public function testRunUser() { - - try { $request = new AppUserCreateRequest(); $request->setPassword('111'); $request->setFettle(''); $request->setRole('admin'); - $request->setBiz(['ch','ws']); + $request->setBiz(['ch', 'ws']); $biz = $this->manager->exec($request); if (!$biz->isSuccess()) { diff --git a/tests/AuthorAppMQTTClientTest.php b/tests/AuthorAppMQTTClientTest.php new file mode 100644 index 0000000..74edecc --- /dev/null +++ b/tests/AuthorAppMQTTClientTest.php @@ -0,0 +1,39 @@ +client->stop(); + } + + + public function testInit() + { + $this->client = AuthorAppMQTTClient::getInstance(); + $this->client->setServer('202.200.18.22'); + $this->client->setAppid('6N6QPV2FYD'); + $this->client + ->setClientId('6NEC8SKNK5') + ->setUsername('6N6QPV2FYD') + ->setPassword('123') + ->registerEvent(AuthorAppMQTTClient::EVENT_CONNECT, function ($connection) { + print_r('连接成功'); + }); + $result = $this->client->start(); + + self::assertTrue($result, '启动失败'); + } +}