From 005c7756940d9575ac34d8578e31c8137d2dcad9 Mon Sep 17 00:00:00 2001 From: commie Date: Mon, 6 Apr 2026 04:35:35 +0800 Subject: [PATCH] 22 --- app/api/controller/MomentsController.php | 12 +- app/command/CheckConversation.php | 60 +++ app/command/FixOpenimSeq.php | 488 +++++++++++++++++++++++ check_conversation.php | 52 +++ 4 files changed, 608 insertions(+), 4 deletions(-) create mode 100644 app/command/CheckConversation.php create mode 100644 app/command/FixOpenimSeq.php create mode 100644 check_conversation.php diff --git a/app/api/controller/MomentsController.php b/app/api/controller/MomentsController.php index 2ea5a9b..0e7a8ed 100644 --- a/app/api/controller/MomentsController.php +++ b/app/api/controller/MomentsController.php @@ -160,12 +160,16 @@ class MomentsController extends BaseController $item->user->avatar = cdnurl($item->user->avatar); } - foreach ($item->likes as &$like) { - if (!empty($like['avatar'])) { - $like['avatar'] = cdnurl($like['avatar']); + $likes = $item->likes; + if (is_array($likes) || $likes instanceof \Traversable) { + foreach ($likes as &$like) { + if (!empty($like['avatar'])) { + $like['avatar'] = cdnurl($like['avatar']); + } } + unset($like); + $item->likes = $likes; } - unset($like); foreach ($item->comments as $comment) { if ($comment->user && $comment->user->avatar) { diff --git a/app/command/CheckConversation.php b/app/command/CheckConversation.php new file mode 100644 index 0000000..9e27897 --- /dev/null +++ b/app/command/CheckConversation.php @@ -0,0 +1,60 @@ +where('conversation_id', $conversationId) + ->where('owner_user_id', $ownerUserId) + ->find(); + + if ($conv) { + cp("找到记录:"); + print_r($conv->toArray()); + cp(""); + if (isset($conv['max_seq'])) { + cp("max_seq: " . $conv['max_seq']); + } else { + cp("max_seq 不存在"); + } + if (isset($conv['min_seq'])) { + cp("min_seq: " . $conv['min_seq']); + } else { + cp("min_seq 不存在"); + } + } else { + cp("未找到记录"); + cp("\n查找同一 conversation_id 的其他记录:"); + $allConvs = $convModel->where('conversation_id', $conversationId)->select(); + foreach ($allConvs as $c) { + cp("owner_user_id: " . ($c['owner_user_id'] ?? 'null')); + cp(" max_seq: " . ($c['max_seq'] ?? 'null')); + cp(" min_seq: " . ($c['min_seq'] ?? 'null')); + } + } + + cp("\n=== 检查 seq 表 ==="); + $seqModel = new \app\model\Openim\Seq(); + $seq = $seqModel->where('conversation_id', $conversationId)->find(); + if ($seq) { + cp("conversation_id: " . $seq['conversation_id']); + cp("max_seq: " . $seq['max_seq']); + cp("min_seq: " . $seq['min_seq']); + } + + return 0; + } +} diff --git a/app/command/FixOpenimSeq.php b/app/command/FixOpenimSeq.php new file mode 100644 index 0000000..b4f5e5a --- /dev/null +++ b/app/command/FixOpenimSeq.php @@ -0,0 +1,488 @@ +addOption('execute', 'e', InputOption::VALUE_NONE, '实际执行修复(默认只预览)'); + $this->addOption('backup-dir', 'b', InputOption::VALUE_OPTIONAL, '备份目录', '/tmp/openim_seq_backup'); + $this->addOption('step', 's', InputOption::VALUE_OPTIONAL, '执行步骤: analyze, backup, fix, all', 'all'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $this->dryRun = !$input->getOption('execute'); + $this->backupDir = $input->getOption('backup-dir'); + $step = $input->getOption('step'); + + if ($this->dryRun) { + $this->log("╔════════════════════════════════════════════════════════════╗"); + $this->log("║ 预览模式(不会修改数据) ║"); + $this->log("║ 使用 --execute 参数实际执行修复 ║"); + $this->log("╚════════════════════════════════════════════════════════════╝"); + } else { + $this->log("╔════════════════════════════════════════════════════════════╗"); + $this->log("║ 执行模式 - 开始修复 ║"); + $this->log("╚════════════════════════════════════════════════════════════╝"); + } + $this->log(""); + + $this->log("【OpenIM seq 设计原理】"); + $this->log(" - seq 表: 存储每个会话的全局 max_seq/min_seq"); + $this->log(" - seq_user 表: 存储每个用户在每个会话中的 max_seq/min_seq/read_seq"); + $this->log(" - conversation 表: 会话信息,冗余存储 max_seq/min_seq"); + $this->log(" - read_seq 不能大于 max_seq(否则未读数变负)"); + $this->log(""); + + switch ($step) { + case 'analyze': + $this->analyzeAll(); + break; + case 'backup': + $this->backupAll(); + break; + case 'fix': + if (empty($this->conversationSeqMap)) { + $this->log("需要先分析消息数据..."); + $this->analyzeMsgSeq(); + } + $this->fixAll(); + break; + default: + $this->analyzeAll(); + if (!$this->dryRun) { + $this->backupAll(); + $this->fixAll(); + } + } + + return 0; + } + + private function getConversationId(array $msg): ?string + { + $sessionType = $msg['session_type'] ?? null; + $sendId = $msg['send_id'] ?? ''; + $recvId = $msg['recv_id'] ?? ''; + $groupId = $msg['group_id'] ?? ''; + + if ($sessionType === 3 || !empty($groupId)) { + return 'sg_' . $groupId; + } + + if ($sessionType === 1 || ($sendId && $recvId)) { + $ids = [(int)$sendId, (int)$recvId]; + sort($ids); + return 'si_' . $ids[0] . '_' . $ids[1]; + } + + return null; + } + + private function analyzeAll(): void + { + $this->log("═══════════════════════════════════════════════════════════"); + $this->log(" 第一步:分析数据 "); + $this->log("═══════════════════════════════════════════════════════════"); + $this->log(""); + + $this->log("【1. seq 表分析】"); + $seqCount = \app\model\Openim\Seq::count(); + $this->log(" 总记录数: {$seqCount}"); + $seqSample = \app\model\Openim\Seq::limit(5)->select()->toArray(); + foreach ($seqSample as $i => $row) { + $this->log(" 样本[{$i}]: conversation_id={$row['conversation_id']}, max_seq={$row['max_seq']}, min_seq={$row['min_seq']}"); + } + $this->log(""); + + $this->log("【2. seq_user 表分析】"); + $seqUserCount = \app\model\Openim\SeqUser::count(); + $this->log(" 总记录数: {$seqUserCount}"); + $seqUserSample = \app\model\Openim\SeqUser::limit(5)->select()->toArray(); + foreach ($seqUserSample as $i => $row) { + $this->log(" 样本[{$i}]: conversation_id={$row['conversation_id']}, user_id={$row['user_id']}, max_seq={$row['max_seq']}, min_seq={$row['min_seq']}, read_seq={$row['read_seq']}"); + } + $this->log(""); + + $this->log("【3. conversation 表分析】"); + $conversationCount = \app\model\Openim\Conversation::count(); + $this->log(" 总记录数: {$conversationCount}"); + $conversationSample = \app\model\Openim\Conversation::limit(5)->select()->toArray(); + foreach ($conversationSample as $i => $row) { + $this->log(" 样本[{$i}]: conversation_id={$row['conversation_id']}, owner_user_id={$row['owner_user_id']}, max_seq=" . ($row['max_seq'] ?? 'null') . ", min_seq=" . ($row['min_seq'] ?? 'null')); + } + $this->log(""); + + $this->log("【4. msg 表分析 - 统计每个会话的 seq 范围】"); + $this->analyzeMsgSeq(); + $this->log(""); + } + + private function analyzeMsgSeq(): void + { + $msgCount = \app\model\Openim\Msg::count(); + $this->log(" 消息文档数: {$msgCount}"); + $this->log(" 正在分析消息中的 seq 范围..."); + + $sampleDoc = \app\model\Openim\Msg::limit(1)->select()->first(); + if ($sampleDoc) { + $this->log(" 样本文档 doc_id: " . ($sampleDoc['doc_id'] ?? 'null')); + $msgsArray = $sampleDoc['msgs']; + if ($msgsArray instanceof \think\model\Collection) { + $msgsArray = $msgsArray->toArray(); + } + $this->log(" 样本文档 msgs 数量: " . count($msgsArray)); + if (count($msgsArray) > 0) { + $firstMsg = $msgsArray[0]; + if (isset($firstMsg['msg'])) { + $msg = $firstMsg['msg']; + $this->log(" 第一条消息 session_type: " . ($msg['session_type'] ?? 'null')); + $this->log(" 第一条消息 send_id: " . ($msg['send_id'] ?? 'null')); + $this->log(" 第一条消息 recv_id: " . ($msg['recv_id'] ?? 'null')); + $this->log(" 第一条消息 group_id: " . ($msg['group_id'] ?? 'null')); + $this->log(" 第一条消息 seq: " . ($msg['seq'] ?? 'null')); + $conversationId = $this->getConversationId($msg); + $this->log(" 计算得到的 conversationID: {$conversationId}"); + } + } + } + $this->log(""); + + $processedDocs = 0; + $totalMsgs = 0; + + $msgs = \app\model\Openim\Msg::select(); + foreach ($msgs as $doc) { + $processedDocs++; + $msgsArray = $doc['msgs']; + if ($msgsArray instanceof \think\model\Collection) { + $msgsArray = $msgsArray->toArray(); + } + if (!empty($msgsArray) && is_array($msgsArray)) { + foreach ($msgsArray as $msgItem) { + if (isset($msgItem['msg'])) { + $msg = $msgItem['msg']; + $conversationId = $this->getConversationId($msg); + $seq = $msg['seq'] ?? null; + + if ($conversationId && $seq !== null) { + $totalMsgs++; + if (!isset($this->conversationSeqMap[$conversationId])) { + $this->conversationSeqMap[$conversationId] = [ + 'min_seq' => $seq, + 'max_seq' => $seq, + 'count' => 0 + ]; + } + $this->conversationSeqMap[$conversationId]['min_seq'] = min($this->conversationSeqMap[$conversationId]['min_seq'], $seq); + $this->conversationSeqMap[$conversationId]['max_seq'] = max($this->conversationSeqMap[$conversationId]['max_seq'], $seq); + $this->conversationSeqMap[$conversationId]['count']++; + } + } + } + } + + if ($processedDocs % 100 == 0) { + $this->log(" 已处理 {$processedDocs}/{$msgCount} 个文档..."); + } + } + + $this->log(" 处理完成!共处理 {$processedDocs} 个文档,{$totalMsgs} 条消息"); + $this->log(" 发现 " . count($this->conversationSeqMap) . " 个会话"); + $this->log(""); + + $this->log("【5. 会话 seq 范围统计(前10个)】"); + $i = 0; + foreach ($this->conversationSeqMap as $conversationId => $seqInfo) { + if ($i++ >= 10) break; + $this->log(" {$conversationId}: min_seq={$seqInfo['min_seq']}, max_seq={$seqInfo['max_seq']}, msg_count={$seqInfo['count']}"); + } + $this->log(""); + + $this->log("【6. 检测问题数据】"); + $this->detectProblematicData(); + } + + private function detectProblematicData(): void + { + $seqProblems = 0; + $seqUserProblems = 0; + $conversationProblems = 0; + + $this->log(" 正在检测 seq 表..."); + $total = \app\model\Openim\Seq::count(); + $batchSize = 1000; + for ($offset = 0; $offset < $total; $offset += $batchSize) { + $seqRecords = \app\model\Openim\Seq::field('conversation_id,max_seq,min_seq')->limit($offset, $batchSize)->select()->toArray(); + foreach ($seqRecords as $seq) { + $conversationId = $seq['conversation_id']; + if (isset($this->conversationSeqMap[$conversationId])) { + $expected = $this->conversationSeqMap[$conversationId]; + if ($seq['max_seq'] != $expected['max_seq'] || $seq['min_seq'] != $expected['min_seq']) { + $seqProblems++; + } + } + } + $seqRecords = null; + gc_collect_cycles(); + } + $this->log(" seq 表问题记录: {$seqProblems} 条(max_seq/min_seq 与实际消息不符)"); + + $this->log(" 正在检测 seq_user 表..."); + $total = \app\model\Openim\SeqUser::count(); + for ($offset = 0; $offset < $total; $offset += $batchSize) { + $seqUserRecords = \app\model\Openim\SeqUser::field('conversation_id,user_id,max_seq,min_seq,read_seq')->limit($offset, $batchSize)->select()->toArray(); + foreach ($seqUserRecords as $seqUser) { + $conversationId = $seqUser['conversation_id']; + if (isset($this->conversationSeqMap[$conversationId])) { + $expected = $this->conversationSeqMap[$conversationId]; + if ($seqUser['max_seq'] != $expected['max_seq'] || + $seqUser['min_seq'] != $expected['min_seq'] || + $seqUser['read_seq'] > $expected['max_seq']) { + $seqUserProblems++; + } + } + } + $seqUserRecords = null; + gc_collect_cycles(); + } + $this->log(" seq_user 表问题记录: {$seqUserProblems} 条(seq 不符或 read_seq > max_seq)"); + + $this->log(" 正在检测 conversation 表..."); + $total = \app\model\Openim\Conversation::count(); + for ($offset = 0; $offset < $total; $offset += $batchSize) { + $conversationRecords = \app\model\Openim\Conversation::field('conversation_id,max_seq,min_seq')->limit($offset, $batchSize)->select()->toArray(); + foreach ($conversationRecords as $conversation) { + $conversationId = $conversation['conversation_id']; + if (isset($this->conversationSeqMap[$conversationId])) { + $expected = $this->conversationSeqMap[$conversationId]; + $currentMax = $conversation['max_seq'] ?? 0; + $currentMin = $conversation['min_seq'] ?? 0; + if ($currentMax != $expected['max_seq'] || $currentMin != $expected['min_seq']) { + $conversationProblems++; + } + } + } + $conversationRecords = null; + gc_collect_cycles(); + } + $this->log(" conversation 表问题记录: {$conversationProblems} 条(max_seq/min_seq 与实际消息不符)"); + $this->log(""); + } + + private function backupAll(): void + { + $this->log("═══════════════════════════════════════════════════════════"); + $this->log(" 第二步:备份数据 "); + $this->log("═══════════════════════════════════════════════════════════"); + $this->log(""); + + if (!is_dir($this->backupDir)) { + mkdir($this->backupDir, 0755, true); + } + + $timestamp = date('Ymd_His'); + $backupPath = $this->backupDir . '/backup_' . $timestamp; + mkdir($backupPath, 0755, true); + + $this->log("备份目录: {$backupPath}"); + $this->log(""); + + $this->log("【1. 备份 seq 表】"); + $total = \app\model\Openim\Seq::count(); + $batchSize = 1000; + $seqData = []; + for ($offset = 0; $offset < $total; $offset += $batchSize) { + $batch = \app\model\Openim\Seq::limit($offset, $batchSize)->select()->toArray(); + $seqData = array_merge($seqData, $batch); + $batch = null; + gc_collect_cycles(); + } + file_put_contents($backupPath . '/seq.json', json_encode($seqData, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT)); + $this->log(" 已备份 " . count($seqData) . " 条记录 -> seq.json"); + $seqData = null; + + $this->log("【2. 备份 seq_user 表】"); + $total = \app\model\Openim\SeqUser::count(); + $seqUserData = []; + for ($offset = 0; $offset < $total; $offset += $batchSize) { + $batch = \app\model\Openim\SeqUser::limit($offset, $batchSize)->select()->toArray(); + $seqUserData = array_merge($seqUserData, $batch); + $batch = null; + gc_collect_cycles(); + } + file_put_contents($backupPath . '/seq_user.json', json_encode($seqUserData, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT)); + $this->log(" 已备份 " . count($seqUserData) . " 条记录 -> seq_user.json"); + $seqUserData = null; + + $this->log("【3. 备份 conversation 表】"); + $total = \app\model\Openim\Conversation::count(); + $conversationData = []; + for ($offset = 0; $offset < $total; $offset += $batchSize) { + $batch = \app\model\Openim\Conversation::limit($offset, $batchSize)->select()->toArray(); + $conversationData = array_merge($conversationData, $batch); + $batch = null; + gc_collect_cycles(); + } + file_put_contents($backupPath . '/conversation.json', json_encode($conversationData, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT)); + $this->log(" 已备份 " . count($conversationData) . " 条记录 -> conversation.json"); + $conversationData = null; + + $this->log(""); + $this->log("✓ 备份完成!备份文件保存在: {$backupPath}"); + $this->log(""); + } + + private function fixAll(): void + { + $this->log("═══════════════════════════════════════════════════════════"); + $this->log(" 第三步:修复数据 "); + $this->log("═══════════════════════════════════════════════════════════"); + $this->log(""); + + if (empty($this->conversationSeqMap)) { + $this->log("错误:缺少会话 seq 数据,请先运行分析步骤"); + return; + } + + $this->log("【1. 修复 seq 表】"); + $this->log(" 原理:根据消息表中的实际 seq 更新 max_seq 和 min_seq"); + $seqFixed = 0; + $seqCreated = 0; + foreach ($this->conversationSeqMap as $conversationId => $seqInfo) { + $existing = \app\model\Openim\Seq::where('conversation_id', $conversationId)->find(); + + if ($existing) { + $oldMax = $existing['max_seq']; + $oldMin = $existing['min_seq']; + if ($oldMax != $seqInfo['max_seq'] || $oldMin != $seqInfo['min_seq']) { + $existing->max_seq = $seqInfo['max_seq']; + $existing->min_seq = $seqInfo['min_seq']; + $existing->save(); + $this->log(" 更新 {$conversationId}: max_seq {$oldMax} -> {$seqInfo['max_seq']}, min_seq {$oldMin} -> {$seqInfo['min_seq']}"); + $seqFixed++; + } + } else { + \app\model\Openim\Seq::create([ + 'conversation_id' => $conversationId, + 'max_seq' => $seqInfo['max_seq'], + 'min_seq' => $seqInfo['min_seq'], + ]); + $this->log(" 创建 {$conversationId}: max_seq={$seqInfo['max_seq']}, min_seq={$seqInfo['min_seq']}"); + $seqCreated++; + } + } + $this->log(" 完成:更新 {$seqFixed} 条,新建 {$seqCreated} 条"); + $this->log(""); + + $this->log("【2. 修复 seq_user 表】"); + $this->log(" 原理:更新 max_seq/min_seq 为会话值,确保 read_seq <= max_seq"); + $seqUserFixed = 0; + $seqUserReadFixed = 0; + + $total = \app\model\Openim\SeqUser::count(); + $batchSize = 500; + for ($offset = 0; $offset < $total; $offset += $batchSize) { + $seqUsers = \app\model\Openim\SeqUser::limit($offset, $batchSize)->select(); + foreach ($seqUsers as $seqUser) { + $conversationId = $seqUser['conversation_id']; + if (isset($this->conversationSeqMap[$conversationId])) { + $seqInfo = $this->conversationSeqMap[$conversationId]; + $oldMax = $seqUser['max_seq']; + $oldMin = $seqUser['min_seq']; + $oldRead = $seqUser['read_seq']; + $needSave = false; + + if ($oldMax != $seqInfo['max_seq'] || $oldMin != $seqInfo['min_seq']) { + $seqUser->max_seq = $seqInfo['max_seq']; + $seqUser->min_seq = $seqInfo['min_seq']; + $needSave = true; + } + + if ($oldRead > $seqInfo['max_seq']) { + $seqUser->read_seq = $seqInfo['max_seq']; + $this->log(" 修正 read_seq {$conversationId}/{$seqUser['user_id']}: {$oldRead} -> {$seqInfo['max_seq']}"); + $seqUserReadFixed++; + $needSave = true; + } + + if ($needSave) { + $seqUser->save(); + $seqUserFixed++; + } + } + } + $seqUsers = null; + gc_collect_cycles(); + } + $this->log(" 完成:更新 {$seqUserFixed} 条,其中修正 read_seq {$seqUserReadFixed} 条"); + $this->log(""); + + $this->log("【3. 修复 conversation 表】"); + $this->log(" 原理:根据消息表中的实际 seq 更新 max_seq 和 min_seq"); + $conversationFixed = 0; + $conversationNotInMap = 0; + $total = \app\model\Openim\Conversation::count(); + $debugLogged = false; + for ($offset = 0; $offset < $total; $offset += $batchSize) { + $conversations = \app\model\Openim\Conversation::limit($offset, $batchSize)->select(); + foreach ($conversations as $conversation) { + $conversationId = $conversation['conversation_id']; + if (isset($this->conversationSeqMap[$conversationId])) { + $seqInfo = $this->conversationSeqMap[$conversationId]; + $oldMax = $conversation['max_seq'] ?? 0; + $oldMin = $conversation['min_seq'] ?? 0; + + if ($oldMax != $seqInfo['max_seq'] || $oldMin != $seqInfo['min_seq']) { + $data = [ + 'max_seq' => (int)$seqInfo['max_seq'], + 'min_seq' => (int)$seqInfo['min_seq'], + ]; + $result = \app\model\Openim\Conversation::where('conversation_id', $conversationId) + ->where('owner_user_id', $conversation['owner_user_id']) + ->update($data); + if ($result) { + $conversationFixed++; + } else { + $this->log(" 警告: 更新失败 {$conversationId} / {$conversation['owner_user_id']}"); + } + } + } else { + $conversationNotInMap++; + } + } + $conversations = null; + gc_collect_cycles(); + } + $this->log(" 完成:更新 {$conversationFixed} 条,不在消息映射中 {$conversationNotInMap} 条"); + $this->log(""); + + $this->log("═══════════════════════════════════════════════════════════"); + $this->log(" 修复完成! "); + $this->log("═══════════════════════════════════════════════════════════"); + $this->log(""); + $this->log("修复统计:"); + $this->log(" - seq 表: 更新 {$seqFixed} 条,新建 {$seqCreated} 条"); + $this->log(" - seq_user 表: 更新 {$seqUserFixed} 条,修正 read_seq {$seqUserReadFixed} 条"); + $this->log(" - conversation 表: 更新 {$conversationFixed} 条"); + } + + private function log(string $message): void + { + echo $message . "\n"; + } +} diff --git a/check_conversation.php b/check_conversation.php new file mode 100644 index 0000000..d01e208 --- /dev/null +++ b/check_conversation.php @@ -0,0 +1,52 @@ +selectDatabase($mongoConfig['database']); + +$conversationId = 'sg_2639473367'; +$ownerUserId = '83484627'; + +echo "=== 检查 conversation 表记录 ===\n"; +$convCollection = $db->selectCollection('conversation'); +$conv = $convCollection->findOne([ + 'conversation_id' => $conversationId, + 'owner_user_id' => $ownerUserId +]); + +if ($conv) { + echo "找到记录:\n"; + print_r($conv); + echo "\n"; + if (isset($conv['max_seq'])) { + echo "max_seq: " . $conv['max_seq'] . "\n"; + } else { + echo "max_seq 不存在\n"; + } + if (isset($conv['min_seq'])) { + echo "min_seq: " . $conv['min_seq'] . "\n"; + } else { + echo "min_seq 不存在\n"; + } +} else { + echo "未找到记录\n"; + echo "\n查找同一 conversation_id 的其他记录:\n"; + $allConvs = $convCollection->find(['conversation_id' => $conversationId])->toArray(); + foreach ($allConvs as $c) { + echo "owner_user_id: " . ($c['owner_user_id'] ?? 'null') . "\n"; + echo " max_seq: " . ($c['max_seq'] ?? 'null') . "\n"; + echo " min_seq: " . ($c['min_seq'] ?? 'null') . "\n"; + } +} + +echo "\n=== 检查 seq 表 ===\n"; +$seqCollection = $db->selectCollection('seq'); +$seq = $seqCollection->findOne(['conversation_id' => $conversationId]); +if ($seq) { + echo "conversation_id: " . $seq['conversation_id'] . "\n"; + echo "max_seq: " . $seq['max_seq'] . "\n"; + echo "min_seq: " . $seq['min_seq'] . "\n"; +}