使用标准日志接口

This commit is contained in:
X14XA\shengli 2025-05-13 21:59:28 +08:00
parent 2ff864b8c8
commit 9939ee4c8b
3 changed files with 59 additions and 28 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@ composer.lock
.idea/ .idea/
vendor/ vendor/
tests/prikey.pem tests/prikey.pem
demo.php

View File

@ -15,7 +15,8 @@
"php": ">=8.0", "php": ">=8.0",
"guzzlehttp/guzzle": "^7.0", "guzzlehttp/guzzle": "^7.0",
"phpseclib/phpseclib": "~3.0", "phpseclib/phpseclib": "~3.0",
"workerman/mqtt": "^2.2" "workerman/mqtt": "^2.2",
"psr/log": "^3.0"
}, },
"require-dev": { "require-dev": {
"phpunit/phpunit": "^11.0" "phpunit/phpunit": "^11.0"

View File

@ -3,6 +3,7 @@
namespace cn\com\maiyoule\mqttclient; namespace cn\com\maiyoule\mqttclient;
use cn\com\maiyoule\mqttclient\biz\client\SendMQTTMessage; use cn\com\maiyoule\mqttclient\biz\client\SendMQTTMessage;
use Psr\Log\LoggerInterface;
use Workerman\Mqtt\Client; use Workerman\Mqtt\Client;
use Workerman\Worker; use Workerman\Worker;
@ -22,7 +23,7 @@ class AuthorAppMQTTClient
} }
private string $appid = ''; private string $appid = '';
private string $server = ''; private string $server = 'mqtt://mqttauthor.maiyoule.com.cn';
private int $port = 1883; private int $port = 1883;
private string $clientId = ''; private string $clientId = '';
private string $username = ''; private string $username = '';
@ -99,6 +100,19 @@ class AuthorAppMQTTClient
} }
private LoggerInterface $logger;
public function getLogger(): LoggerInterface
{
return $this->logger;
}
public function setLogger(LoggerInterface $logger): void
{
$this->logger = $logger;
}
/** /**
* @throws \Exception * @throws \Exception
*/ */
@ -121,16 +135,18 @@ class AuthorAppMQTTClient
$this->worker = new Worker(); $this->worker = new Worker();
$this->worker->onWorkerStart = function () { $this->worker->onWorkerStart = function () {
$this->logger->info('Worker 启动');
$this->__startWorker(); $this->__startWorker();
}; };
$this->worker->onMessage = function ($conn, $data) { $this->worker->onMessage = function ($conn, $data) {
print_r($data); $this->logger->debug($data);
}; };
$this->worker->onWorkerStop = function () { $this->worker->onWorkerStop = function () {
$this->logger->info('Worker 停止');
$this->__stopWorker(); $this->__stopWorker();
}; };
$this->worker->onError = function () { $this->worker->onError = function () {
print_r('遇到错误'); $this->logger->error('Worker启动遇到错误');
}; };
if (!Worker::isRunning()) { if (!Worker::isRunning()) {
Worker::runAll(); Worker::runAll();
@ -163,6 +179,9 @@ class AuthorAppMQTTClient
$this->client->disconnect(); $this->client->disconnect();
} }
/**
* @throws \Exception
*/
private function __startWorker(): void private function __startWorker(): void
{ {
//参考 https://www.workerman.net/doc/workerman/components/workerman-mqtt.html //参考 https://www.workerman.net/doc/workerman/components/workerman-mqtt.html
@ -170,39 +189,49 @@ class AuthorAppMQTTClient
'keepalive' => 29, 'keepalive' => 29,
'client_id' => $this->getClientId(), 'client_id' => $this->getClientId(),
'protocol_name' => 'MQTT', 'protocol_name' => 'MQTT',
'protocol_level' => 4, 'protocol_level' => 5,
'clean_session' => true, 'clean_session' => true,
'reconnect_period' => 2, 'reconnect_period' => 2,
'connect_timeout' => 10, 'connect_timeout' => 10,
'username' => $this->getUsername(), 'username' => $this->getUsername(),
'password' => $this->getPassword(), 'password' => $this->getPassword(),
'resubscribe' => true 'resubscribe' => true,
'debug' => false
]; ];
$address = sprintf('mqtt://%s:%d', $this->getServer(), $this->getPort()); $address = sprintf('mqtt://%s:%d', $this->getServer(), $this->getPort());
try { $this->logger->debug('建立MQTT连接到' . $address);
$this->client = new Client($address, $option);
//连接回调
$this->client->onConnect = function (Client $connection) {
$topic = sprintf('biz/%s/#', $this->getAppid());
$connection->subscribe($topic, ['qos' => 0]);
$this->callEvent(self::EVENT_CONNECT, $connection); $this->client = new Client($address, $option);
}; //连接回调
$this->client->onClose = function () { $this->client->onConnect = function (Client $connection) {
$this->callEvent(self::EVENT_CLOSE); $this->logger->debug('MQTT 连接成功');
}; $topic = sprintf('biz/%s/#', $this->getAppid());
$this->client->onMessage = function (string $topic, string $data, Client $client) { $connection->subscribe($topic, ['qos' => 0], function (\Exception|null $exception, array $granted) {
//收到消息
$this->callEvent(self::EVENT_MESSAGE, $topic, $data, $client); if (!is_null($exception)) {
}; $this->logger->error($exception->getMessage());
//遇到错误 }
$this->client->onError = function (\Exception $err) { $this->logger->debug('主题订阅:' . json_encode($granted));
$this->callEvent(self::EVENT_ERROR, $err); });
};
$this->client->connect(); $this->callEvent(self::EVENT_CONNECT, $connection);
} catch (\Exception $e) { };
$this->client->onClose = function () {
$this->logger->debug('MQTT 连接断开');
$this->callEvent(self::EVENT_CLOSE);
};
$this->client->onMessage = function (string $topic, string $data, Client $client) {
//收到消息
$this->logger->debug(sprintf('MQTT 收到消息 Topic:%s Data:%s', $topic, $data));
$this->callEvent(self::EVENT_MESSAGE, $topic, $data, $client);
};
//遇到错误
$this->client->onError = function (\Exception $err) {
$this->logger->error('MQTT 遇到错误' . $err->getMessage());
$this->callEvent(self::EVENT_ERROR, $err);
};
$this->client->connect();
}
} }
const EVENT_CONNECT = 'connect'; const EVENT_CONNECT = 'connect';