From 9939ee4c8bfa14194e77fd66e12963be26ff888f Mon Sep 17 00:00:00 2001 From: "X14XA\\shengli" Date: Tue, 13 May 2025 21:59:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8=E6=A0=87=E5=87=86=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + composer.json | 3 +- src/AuthorAppMQTTClient.php | 83 +++++++++++++++++++++++++------------ 3 files changed, 59 insertions(+), 28 deletions(-) diff --git a/.gitignore b/.gitignore index 046d499..252485f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ composer.lock .idea/ vendor/ tests/prikey.pem +demo.php diff --git a/composer.json b/composer.json index 8c2cb4b..4216350 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,8 @@ "php": ">=8.0", "guzzlehttp/guzzle": "^7.0", "phpseclib/phpseclib": "~3.0", - "workerman/mqtt": "^2.2" + "workerman/mqtt": "^2.2", + "psr/log": "^3.0" }, "require-dev": { "phpunit/phpunit": "^11.0" diff --git a/src/AuthorAppMQTTClient.php b/src/AuthorAppMQTTClient.php index 37ec268..a7f5b5a 100644 --- a/src/AuthorAppMQTTClient.php +++ b/src/AuthorAppMQTTClient.php @@ -3,6 +3,7 @@ namespace cn\com\maiyoule\mqttclient; use cn\com\maiyoule\mqttclient\biz\client\SendMQTTMessage; +use Psr\Log\LoggerInterface; use Workerman\Mqtt\Client; use Workerman\Worker; @@ -22,7 +23,7 @@ class AuthorAppMQTTClient } private string $appid = ''; - private string $server = ''; + private string $server = 'mqtt://mqttauthor.maiyoule.com.cn'; private int $port = 1883; private string $clientId = ''; private string $username = ''; @@ -99,6 +100,19 @@ class AuthorAppMQTTClient } + private LoggerInterface $logger; + + public function getLogger(): LoggerInterface + { + return $this->logger; + } + + public function setLogger(LoggerInterface $logger): void + { + $this->logger = $logger; + } + + /** * @throws \Exception */ @@ -121,16 +135,18 @@ class AuthorAppMQTTClient $this->worker = new Worker(); $this->worker->onWorkerStart = function () { + $this->logger->info('Worker 启动'); $this->__startWorker(); }; $this->worker->onMessage = function ($conn, $data) { - print_r($data); + $this->logger->debug($data); }; $this->worker->onWorkerStop = function () { + $this->logger->info('Worker 停止'); $this->__stopWorker(); }; $this->worker->onError = function () { - print_r('遇到错误'); + $this->logger->error('Worker启动遇到错误'); }; if (!Worker::isRunning()) { Worker::runAll(); @@ -163,6 +179,9 @@ class AuthorAppMQTTClient $this->client->disconnect(); } + /** + * @throws \Exception + */ private function __startWorker(): void { //参考 https://www.workerman.net/doc/workerman/components/workerman-mqtt.html @@ -170,39 +189,49 @@ class AuthorAppMQTTClient 'keepalive' => 29, 'client_id' => $this->getClientId(), 'protocol_name' => 'MQTT', - 'protocol_level' => 4, + 'protocol_level' => 5, 'clean_session' => true, 'reconnect_period' => 2, 'connect_timeout' => 10, 'username' => $this->getUsername(), 'password' => $this->getPassword(), - 'resubscribe' => true + 'resubscribe' => true, + 'debug' => false ]; $address = sprintf('mqtt://%s:%d', $this->getServer(), $this->getPort()); - try { - $this->client = new Client($address, $option); - //连接回调 - $this->client->onConnect = function (Client $connection) { - $topic = sprintf('biz/%s/#', $this->getAppid()); - $connection->subscribe($topic, ['qos' => 0]); + $this->logger->debug('建立MQTT连接到' . $address); - $this->callEvent(self::EVENT_CONNECT, $connection); - }; - $this->client->onClose = function () { - $this->callEvent(self::EVENT_CLOSE); - }; - $this->client->onMessage = function (string $topic, string $data, Client $client) { - //收到消息 - $this->callEvent(self::EVENT_MESSAGE, $topic, $data, $client); - }; - //遇到错误 - $this->client->onError = function (\Exception $err) { - $this->callEvent(self::EVENT_ERROR, $err); - }; - $this->client->connect(); - } catch (\Exception $e) { + $this->client = new Client($address, $option); + //连接回调 + $this->client->onConnect = function (Client $connection) { + $this->logger->debug('MQTT 连接成功'); + $topic = sprintf('biz/%s/#', $this->getAppid()); + $connection->subscribe($topic, ['qos' => 0], function (\Exception|null $exception, array $granted) { + + if (!is_null($exception)) { + $this->logger->error($exception->getMessage()); + } + $this->logger->debug('主题订阅:' . json_encode($granted)); + }); + + $this->callEvent(self::EVENT_CONNECT, $connection); + }; + $this->client->onClose = function () { + $this->logger->debug('MQTT 连接断开'); + $this->callEvent(self::EVENT_CLOSE); + }; + $this->client->onMessage = function (string $topic, string $data, Client $client) { + //收到消息 + $this->logger->debug(sprintf('MQTT 收到消息 Topic:%s Data:%s', $topic, $data)); + $this->callEvent(self::EVENT_MESSAGE, $topic, $data, $client); + }; + //遇到错误 + $this->client->onError = function (\Exception $err) { + $this->logger->error('MQTT 遇到错误' . $err->getMessage()); + $this->callEvent(self::EVENT_ERROR, $err); + }; + $this->client->connect(); - } } const EVENT_CONNECT = 'connect';