初始化

This commit is contained in:
2026-01-06 13:37:07 +08:00
parent c3435595fe
commit 00d7a381aa
70 changed files with 3913 additions and 1 deletions

26
src/conduit/Driver.php Normal file
View File

@@ -0,0 +1,26 @@
<?php
namespace think\worker\conduit;
abstract class Driver
{
abstract public function prepare();
abstract public function connect();
abstract public function get(string $name);
abstract public function set(string $name, $value);
abstract public function inc(string $name, int $step = 1);
abstract public function sAdd(string $name, ...$value);
abstract public function sRem(string $name, $value);
abstract public function sMembers(string $name);
abstract public function publish(string $name, $value);
abstract public function subscribe(string $name, $callback);
}

View File

@@ -0,0 +1,186 @@
<?php
namespace think\worker\conduit\driver;
use Exception;
use Revolt\EventLoop;
use Revolt\EventLoop\Suspension;
use think\worker\conduit\Driver;
use think\worker\conduit\driver\socket\Command;
use think\worker\conduit\driver\socket\Event;
use think\worker\conduit\driver\socket\Result;
use think\worker\conduit\driver\socket\Server;
use think\worker\Manager;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Protocols\Frame;
use Workerman\Timer;
class Socket extends Driver
{
protected $id = 0;
protected $domain;
/** @var AsyncTcpConnection|null */
protected $connection = null;
protected $reconnectTimer;
protected $pingInterval = 55;
/** @var array<int, array{0: Suspension, 1: int}> */
protected $suspensions = [];
protected $events = [];
public function __construct(protected Manager $manager)
{
$filename = runtime_path() . 'conduit.sock';
@unlink($filename);
$this->domain = "unix://{$filename}";
}
public function prepare()
{
//启动服务端
Server::run($this->domain);
}
public function connect()
{
$suspension = EventLoop::getSuspension();
$this->connection = $this->createConnection($suspension);
$suspension->suspend();
Timer::add($this->pingInterval, function () {
if ($this->connection) {
$this->connection->send('');
}
});
Timer::add(1, function () {
//检查是否超时
foreach ($this->suspensions as $id => $suspension) {
if (time() - $suspension[1] > 10) {
$suspension[0]->throw(new Exception('conduit connection is timeout'));
unset($this->suspensions[$id]);
}
}
});
}
public function get(string $name)
{
return $this->sendAndRecv(Command::create('get', $name));
}
public function set(string $name, $value)
{
$this->send(Command::create('set', $name, $value));
}
public function inc(string $name, int $step = 1)
{
return $this->sendAndRecv(Command::create('inc', $name, $step));
}
public function sAdd(string $name, ...$value)
{
$this->send(Command::create('sAdd', $name, $value));
}
public function sRem(string $name, $value)
{
$this->send(Command::create('sRem', $name, $value));
}
public function sMembers(string $name)
{
return $this->sendAndRecv(Command::create('sMembers', $name));
}
public function publish(string $name, $value)
{
$this->send(Command::create('publish', $name, $value));
}
public function subscribe(string $name, $callback)
{
$this->send(Command::create('subscribe', $name));
$this->events[$name] = $callback;
}
protected function sendAndRecv(Command $command)
{
$suspension = EventLoop::getSuspension();
$id = $this->id++;
$command->id = $id;
$this->suspensions[$id] = [$suspension, time()];
$this->send($command);
return $suspension->suspend();
}
protected function send(Command $command)
{
if (!$this->connection) {
throw new Exception('conduit connection is disconnected');
}
$this->connection->send(serialize($command));
}
protected function createConnection(?Suspension $suspension = null)
{
$connection = new AsyncTcpConnection($this->domain);
$connection->protocol = Frame::class;
$connection->onConnect = function () use ($suspension) {
$this->clearTimer();
if ($suspension) {
$suspension->resume();
}
//补订阅
foreach ($this->events as $name => $callback) {
$this->send(Command::create('subscribe', $name));
}
};
$connection->onMessage = function ($connection, $buffer) {
/** @var Result|Event $result */
$result = unserialize($buffer);
if ($result instanceof Event) {
if (isset($this->events[$result->name])) {
$this->events[$result->name]($result->data);
}
} elseif (isset($result->id) && isset($this->suspensions[$result->id])) {
[$suspension] = $this->suspensions[$result->id];
$suspension->resume($result->data);
unset($this->suspensions[$result->id]);
}
};
$connection->onClose = function () {
$this->connection = null;
//重连
$this->clearTimer();
$this->reconnectTimer = Timer::add(1, function () {
$this->connection = $this->createConnection();
});
};
$connection->connect();
return $connection;
}
protected function clearTimer()
{
if ($this->reconnectTimer) {
Timer::del($this->reconnectTimer);
$this->reconnectTimer = null;
}
}
}

View File

@@ -0,0 +1,22 @@
<?php
namespace think\worker\conduit\driver\socket;
class Command
{
public $id;
public $name;
public $key;
public $data;
public static function create($name, $key, $data = null)
{
$packet = new self();
$packet->name = $name;
$packet->key = $key;
$packet->data = $data;
return $packet;
}
}

View File

@@ -0,0 +1,15 @@
<?php
namespace think\worker\conduit\driver\socket;
class Event
{
public function __construct(public $name, public $data)
{
}
public static function create($name, $data)
{
return new self($name, $data);
}
}

View File

@@ -0,0 +1,16 @@
<?php
namespace think\worker\conduit\driver\socket;
class Result
{
public function __construct(public $id = null, public $data = null)
{
}
public static function create($id = null)
{
return new self($id);
}
}

View File

@@ -0,0 +1,102 @@
<?php
namespace think\worker\conduit\driver\socket;
use think\worker\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Frame;
class Server
{
protected $data = [];
/** @var array<string,TcpConnection[]> */
protected $subscribers = [];
public function onMessage(TcpConnection $connection, $buffer)
{
if (empty($buffer)) {
return;
}
/** @var Command $command */
$command = unserialize($buffer);
$result = Result::create($command->id);
switch ($command->name) {
case 'get':
$result->data = $this->data[$command->key] ?? null;
break;
case 'set':
$this->data[$command->key] = $command->data;
break;
case 'inc':
if (!isset($this->data[$command->key]) || !is_integer($this->data[$command->key])) {
$this->data[$command->key] = 0;
}
$result->data = $this->data[$command->key] += $command->data ?? 1;
break;
case 'sAdd':
if (!isset($this->data[$command->key]) || !is_array($this->data[$command->key])) {
$this->data[$command->key] = [];
}
$this->data[$command->key] = array_merge($this->data[$command->key], $command->data);
break;
case 'sRem':
if (!isset($this->data[$command->key]) || !is_array($this->data[$command->key])) {
$this->data[$command->key] = [];
}
$this->data[$command->key] = array_diff($this->data[$command->key], [$command->data]);
break;
case 'sMembers':
if (!isset($this->data[$command->key]) || !is_array($this->data[$command->key])) {
$this->data[$command->key] = [];
}
$result->data = $this->data[$command->key];
break;
case 'subscribe':
if (!isset($this->subscribers[$command->key])) {
$this->subscribers[$command->key] = [];
}
$this->subscribers[$command->key][] = $connection;
break;
case 'publish':
if (!empty($this->subscribers[$command->key])) {
foreach ($this->subscribers[$command->key] as $conn) {
$conn->send(serialize(Event::create($command->key, $command->data)));
}
}
break;
}
if (isset($result->id)) {
$connection->send(serialize($result));
}
}
public function onClose(TcpConnection $connection)
{
if (!empty($this->subscribers)) {
foreach ($this->subscribers as $key => $connections) {
$this->subscribers[$key] = array_udiff($connections, [$connection], function ($a, $b) {
return $a <=> $b;
});
}
}
}
public static function run($domain)
{
//启动服务端
$server = new self();
$worker = new Worker($domain);
$worker->name = 'conduit';
$worker->protocol = Frame::class;
$worker->reloadable = false;
$worker->onMessage = [$server, 'onMessage'];
$worker->onClose = [$server, 'onClose'];
}
}