feat: 实现TCP Server

This commit is contained in:
zimoyin
2026-03-02 21:59:43 +08:00
parent 043306819b
commit a79dfae57d
144 changed files with 15785 additions and 140 deletions
+313
View File
@@ -0,0 +1,313 @@
<?php
namespace app\flow\nodes;
use app\flow\config\StepConfig;
use app\flow\ProcessContext;
use app\flow\strategies\ProcessStrategyInterface;
use app\utils\Logger;
/**
* 流程节点抽象基类
* 实现责任链的基础逻辑
* 如果要新增标准流程的节点,请继承本类并实现抽象方法。并且需要配置 ProcessConfig 。无论是运行时新增节点还是静态修改节点配置,都会在运行时生效
*/
abstract class AbstractProcessNode implements ProcessNodeInterface
{
/**
* 下一个处理节点
*/
protected ?ProcessNodeInterface $next = null;
/**
* 是否启用
*/
protected bool $enabled = true;
/**
* 策略列表
* @var ProcessStrategyInterface[]
*/
protected array $strategies = [];
/**
* 节点配置
*/
protected ?StepConfig $config = null;
/**
* 设置下一个节点
*/
public function setNext(ProcessNodeInterface $next): ProcessNodeInterface
{
$this->next = $next;
return $this;
}
/**
* 获取下一个节点
*/
public function getNext(): ?ProcessNodeInterface
{
return $this->next;
}
/**
* 从当前节点开始处理流程链
*/
public function handle(ProcessContext $context): ProcessContext
{
// 如果节点被禁用,直接传递给下一个节点
if (!$this->isEnabled()) {
Logger::debug('[{}-Node] 节点已禁用,跳过', [$this->getCode()]);
return $this->passToNext($context);
}
// 执行前置策略
$context = $this->executeBeforeStrategies($context);
// 如果前置策略返回错误,不再继续
if (!$context->success) {
Logger::debug('[{}-Node] 前置策略拦截 error={}', [
$this->getCode(),
$context->errorMessage,
]);
return $context;
}
// 如果不能处理当前步骤,传递给下一个节点
if (!$this->canHandle($context)) {
Logger::debug('[{}-Node] 不能处理当前步骤,跳过', [$this->getCode()]);
return $this->passToNext($context);
}
// 输出当前节点
Logger::debug('[{}-Node] 开始处理 step={} batch={}', [
$this->getCode(),
$context->currentStep,
$context->batchNo ?: '-',
]);
// 执行节点具体处理逻辑
$context = $this->doHandle($context);
Logger::debug('[{}-Node] 处理完成 step={} batch={} success={}', [
$this->getCode(),
$context->currentStep,
$context->batchNo ?: '-',
$context->success,
]);
// 执行后置策略
$context = $this->executeAfterStrategies($context);
// 后置策略拦截
if (!$context->success) {
Logger::debug('[{}-Node] 后置策略拦截 error={}', [
$this->getCode(),
$context->errorMessage,
]);
return $context;
}
$nextNode = $this->getNext();
// 跳过节点逻辑
for ($i = 0; $i < $context->skipNodeCount; $i++) {
Logger::debug('[{}-Node] 跳过节点 code={}', [$this->getCode(), $nextNode->getCode()]);
$nextNode = $nextNode->getNext();
}
// 传递给下一个节点
return empty($nextNode) ? $context : $nextNode->handle($context);
}
/**
* @return array 需要的前置节点列表
*/
public function getRequiredNodes($default = []): array
{
return (!empty($this->getConfig()->required)) ? $this->getConfig()->required : $default;
}
/**
* 是否是需要的前置节点
* @param string $currentStep
* @param array $default
* @return bool
*/
public function isRequiredNode(string $currentStep, array $default = []): bool
{
return in_array($currentStep, $this->getRequiredNodes($default));
}
/**
* 当前刷的读卡器类型,是否和当前节点的配置匹配
*/
public function isMatchReaderType(ProcessContext $context): bool
{
return $this->getCode() === $context->readerType;
}
/**
* 传递给下一个节点
*/
protected function passToNext(ProcessContext $context): ProcessContext
{
if ($this->next !== null) {
return $this->next->handle($context);
}
return $context;
}
/**
* 停止传递给下一个节点
*/
protected function stopNext(ProcessContext $context): ProcessContext
{
$context->skipNodeCount = count($this->getRemainingNodes());
return $context;
}
/**
* 获取剩余节点
*/
protected function getRemainingNodes(): array
{
$remainingNodes = [];
$node = $this->next;
while ($node !== null) {
$remainingNodes[] = $node;
$node = $node->getNext();
}
return $remainingNodes;
}
/**
* 执行前置策略
*
* 在节点核心逻辑执行之前,按顺序执行所有标记为 'before' 阶段的策略
*
* 执行流程:
* 1. 遍历所有已注册的策略
* 2. 筛选出阶段为 'before' 的策略
* 3. 依次执行策略的 execute 方法
* 4. 如果某个策略导致上下文错误(!$context->isSuccess()),立即中断后续策略执行
*
* 前置策略的应用:
* - 时间验证:检查步骤执行时间是否符合要求
* - 权限检查:验证操作员是否有权限执行该步骤
* - 状态校验:确认流程状态是否允许进入当前步骤
* - 数据准备:为节点处理准备必要的数据
*
* @param ProcessContext $context 流程上下文
*
* @return ProcessContext 经过策略处理后的上下文
* - 如果策略执行成功,返回修改后的上下文
* - 如果策略执行失败,返回包含错误信息的上下文
*
* @see ProcessStrategyInterface::execute() 策略执行接口
* @see ProcessStrategyInterface::getPhase() 获取策略执行阶段
*/
protected function executeBeforeStrategies(ProcessContext $context): ProcessContext
{
foreach ($this->strategies as $strategy) {
if ($strategy->getPhase() === 'before') {
$context = $strategy->execute($context, $this);
if (!$context->success) {
break;
}
}
}
return $context;
}
/**
* 执行后置策略
*/
protected function executeAfterStrategies(ProcessContext $context): ProcessContext
{
foreach ($this->strategies as $strategy) {
if ($strategy->getPhase() === 'after') {
Logger::debug('[{}-Node] 执行后置策略 code={}', [$this->getCode(), $strategy::class]);
$context = $strategy->execute($context, $this);
}
}
return $context;
}
/**
* 具体的处理逻辑,由子类实现
*/
abstract protected function doHandle(ProcessContext $context): ProcessContext;
/**
* 是否启用
*/
public function isEnabled(): bool
{
return $this->enabled;
}
/**
* 设置是否启用
*/
public function setEnabled(bool $enabled): ProcessNodeInterface
{
$this->enabled = $enabled;
return $this;
}
/**
* 添加策略
*/
public function addStrategy(ProcessStrategyInterface $strategy): self
{
$this->strategies[] = $strategy;
return $this;
}
/**
* 设置配置
*/
public function setConfig(StepConfig $config): self
{
$this->config = $config;
return $this;
}
/**
* 获取配置
*/
public function getConfig(): StepConfig
{
return $this->config;
}
/**
* 判断当前节点是否能处理该步骤
* 默认实现:检查当前步骤是否匹配节点编码
*/
public function canHandle(ProcessContext $context): bool
{
return $context->currentStep === $this->getCode();
}
/**
* 获取当前对象地址的哈希值
*/
public function _hash(): string
{
return spl_object_hash($this);
}
/**
* 获取节点名称
*/
abstract static public function getName(): string;
/**
* 获取节点编码
*/
abstract public function getCode(): string;
}