Compare commits

..

No commits in common. "d9cf1f12b4f8844d6e57c0f2d5d6b839fec54f2c" and "2ff864b8c87689e7253ff51f34cfed138921fb4c" have entirely different histories.

3 changed files with 29 additions and 60 deletions

1
.gitignore vendored
View File

@ -2,4 +2,3 @@ composer.lock
.idea/ .idea/
vendor/ vendor/
tests/prikey.pem tests/prikey.pem
demo.php

View File

@ -2,7 +2,7 @@
"name": "maiyoule/mqttclient_author", "name": "maiyoule/mqttclient_author",
"type": "library", "type": "library",
"description": "MQTT管理模块操作库", "description": "MQTT管理模块操作库",
"version": "1.1.0", "version": "1.0.3",
"license": "MIT", "license": "MIT",
"authors": [ "authors": [
{ {
@ -15,8 +15,7 @@
"php": ">=8.0", "php": ">=8.0",
"guzzlehttp/guzzle": "^7.0", "guzzlehttp/guzzle": "^7.0",
"phpseclib/phpseclib": "~3.0", "phpseclib/phpseclib": "~3.0",
"workerman/mqtt": "^2.2", "workerman/mqtt": "^2.2"
"psr/log": "^3.0"
}, },
"require-dev": { "require-dev": {
"phpunit/phpunit": "^11.0" "phpunit/phpunit": "^11.0"

View File

@ -3,7 +3,6 @@
namespace cn\com\maiyoule\mqttclient; namespace cn\com\maiyoule\mqttclient;
use cn\com\maiyoule\mqttclient\biz\client\SendMQTTMessage; use cn\com\maiyoule\mqttclient\biz\client\SendMQTTMessage;
use Psr\Log\LoggerInterface;
use Workerman\Mqtt\Client; use Workerman\Mqtt\Client;
use Workerman\Worker; use Workerman\Worker;
@ -23,7 +22,7 @@ class AuthorAppMQTTClient
} }
private string $appid = ''; private string $appid = '';
private string $server = 'mqtt://mqttauthor.maiyoule.com.cn'; private string $server = '';
private int $port = 1883; private int $port = 1883;
private string $clientId = ''; private string $clientId = '';
private string $username = ''; private string $username = '';
@ -100,19 +99,6 @@ class AuthorAppMQTTClient
} }
private LoggerInterface $logger;
public function getLogger(): LoggerInterface
{
return $this->logger;
}
public function setLogger(LoggerInterface $logger): void
{
$this->logger = $logger;
}
/** /**
* @throws \Exception * @throws \Exception
*/ */
@ -135,18 +121,16 @@ class AuthorAppMQTTClient
$this->worker = new Worker(); $this->worker = new Worker();
$this->worker->onWorkerStart = function () { $this->worker->onWorkerStart = function () {
$this->logger->info('Worker 启动');
$this->__startWorker(); $this->__startWorker();
}; };
$this->worker->onMessage = function ($conn, $data) { $this->worker->onMessage = function ($conn, $data) {
$this->logger->debug($data); print_r($data);
}; };
$this->worker->onWorkerStop = function () { $this->worker->onWorkerStop = function () {
$this->logger->info('Worker 停止');
$this->__stopWorker(); $this->__stopWorker();
}; };
$this->worker->onError = function () { $this->worker->onError = function () {
$this->logger->error('Worker启动遇到错误'); print_r('遇到错误');
}; };
if (!Worker::isRunning()) { if (!Worker::isRunning()) {
Worker::runAll(); Worker::runAll();
@ -179,9 +163,6 @@ class AuthorAppMQTTClient
$this->client->disconnect(); $this->client->disconnect();
} }
/**
* @throws \Exception
*/
private function __startWorker(): void private function __startWorker(): void
{ {
//参考 https://www.workerman.net/doc/workerman/components/workerman-mqtt.html //参考 https://www.workerman.net/doc/workerman/components/workerman-mqtt.html
@ -189,49 +170,39 @@ class AuthorAppMQTTClient
'keepalive' => 29, 'keepalive' => 29,
'client_id' => $this->getClientId(), 'client_id' => $this->getClientId(),
'protocol_name' => 'MQTT', 'protocol_name' => 'MQTT',
'protocol_level' => 5, 'protocol_level' => 4,
'clean_session' => true, 'clean_session' => true,
'reconnect_period' => 2, 'reconnect_period' => 2,
'connect_timeout' => 10, 'connect_timeout' => 10,
'username' => $this->getUsername(), 'username' => $this->getUsername(),
'password' => $this->getPassword(), 'password' => $this->getPassword(),
'resubscribe' => true, 'resubscribe' => true
'debug' => false
]; ];
$address = sprintf('mqtt://%s:%d', $this->getServer(), $this->getPort()); $address = sprintf('mqtt://%s:%d', $this->getServer(), $this->getPort());
$this->logger->debug('建立MQTT连接到' . $address); try {
$this->client = new Client($address, $option);
//连接回调
$this->client->onConnect = function (Client $connection) {
$topic = sprintf('biz/%s/#', $this->getAppid());
$connection->subscribe($topic, ['qos' => 0]);
$this->client = new Client($address, $option); $this->callEvent(self::EVENT_CONNECT, $connection);
//连接回调 };
$this->client->onConnect = function (Client $connection) { $this->client->onClose = function () {
$this->logger->debug('MQTT 连接成功'); $this->callEvent(self::EVENT_CLOSE);
$topic = sprintf('biz/%s/#', $this->getAppid()); };
$connection->subscribe($topic, ['qos' => 0], function (\Exception|null $exception, array $granted) { $this->client->onMessage = function (string $topic, string $data, Client $client) {
//收到消息
if (!is_null($exception)) { $this->callEvent(self::EVENT_MESSAGE, $topic, $data, $client);
$this->logger->error($exception->getMessage()); };
} //遇到错误
$this->logger->debug('主题订阅:' . json_encode($granted)); $this->client->onError = function (\Exception $err) {
}); $this->callEvent(self::EVENT_ERROR, $err);
};
$this->callEvent(self::EVENT_CONNECT, $connection); $this->client->connect();
}; } catch (\Exception $e) {
$this->client->onClose = function () {
$this->logger->debug('MQTT 连接断开');
$this->callEvent(self::EVENT_CLOSE);
};
$this->client->onMessage = function (string $topic, string $data, Client $client) {
//收到消息
$this->logger->debug(sprintf('MQTT 收到消息 Topic:%s Data:%s', $topic, $data));
$this->callEvent(self::EVENT_MESSAGE, $topic, $data, $client);
};
//遇到错误
$this->client->onError = function (\Exception $err) {
$this->logger->error('MQTT 遇到错误' . $err->getMessage());
$this->callEvent(self::EVENT_ERROR, $err);
};
$this->client->connect();
}
} }
const EVENT_CONNECT = 'connect'; const EVENT_CONNECT = 'connect';