Files
im/app/command/FixOpenimSeq.php
2026-04-10 18:04:33 +08:00

274 lines
11 KiB
PHP

<?php
namespace app\command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/**
 * Console command that repairs the seq bookkeeping of the OpenIM collections.
 *
 * The command scans every message document, derives the min/max seq seen per
 * conversation, and then rewrites the `seq`, `seq_user` and `conversation`
 * records so that they agree with the stored messages.
 */
class FixOpenimSeq extends Command
{
    protected static $defaultName = 'fix:openim:seq';
    protected static $defaultDescription = '修复 OpenIM MongoDB seq 相关字段';

    /** Rounding step applied to max_seq for conversations whose ID starts with "sg_". */
    private const GROUP_SEQ_STEP = 100;

    /** Rounding step applied to max_seq for every other conversation. */
    private const SINGLE_SEQ_STEP = 50;

    /**
     * Per-conversation statistics collected by analyzeMsgSeq().
     *
     * @var array<string, array{min_seq: int, max_seq: int, count: int}>
     */
    private $conversationSeqMap = [];

    /**
     * Console output of the running command; null when a method is invoked
     * outside of execute(), in which case log() falls back to echo.
     *
     * @var OutputInterface|null
     */
    private $output = null;

    protected function configure(): void
    {
    }

    /**
     * Entry point: analyse the stored messages, then apply the repair.
     *
     * @return int Always 0; problems are reported through the log output.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $this->output = $output;
        // Start from a clean slate so a second run in the same process does not
        // accumulate message counts from the previous one.
        $this->conversationSeqMap = [];

        $this->log("╔════════════════════════════════════════════════════════════╗");
        $this->log("║ 开始修复 OpenIM Seq 数据 ║");
        $this->log("╚════════════════════════════════════════════════════════════╝");
        $this->log("");

        // Collect per-conversation seq information.
        $this->analyzeMsgSeq();
        // Apply the repair.
        $this->fixAll();

        return 0;
    }

    /**
     * Derive the conversation ID a message belongs to.
     *
     * Group messages (session_type 3, or any message carrying a group_id) map
     * to "sg_<groupId>"; single-chat messages map to "si_<a>_<b>" with the two
     * user IDs in sorted order.
     *
     * @param array $msg Message payload (reads session_type, send_id, recv_id, group_id).
     * @return string|null Conversation ID, or null when it cannot be determined.
     */
    private function getConversationId(array $msg): ?string
    {
        $sessionType = $msg['session_type'] ?? null;
        $sendId = $msg['send_id'] ?? '';
        $recvId = $msg['recv_id'] ?? '';
        $groupId = $msg['group_id'] ?? '';

        if ($sessionType === 3 || !empty($groupId)) {
            // A group message without a group ID would yield the meaningless
            // ID "sg_", so it is treated as unresolvable instead.
            return empty($groupId) ? null : 'sg_' . $groupId;
        }

        if ($sessionType === 1 || ($sendId && $recvId)) {
            $first = (string) $sendId;
            $second = (string) $recvId;
            if ($first === '' || $second === '') {
                return null;
            }

            // Only compare numerically when both IDs survive an int round-trip.
            // Non-numeric IDs, IDs with leading zeros, and IDs beyond PHP_INT_MAX
            // would otherwise be mangled by the cast (e.g. collapse to 0).
            $bothIntegers = (string) (int) $first === $first
                && (string) (int) $second === $second;

            if ($bothIntegers) {
                // NOTE(review): numeric ordering is kept from the original code;
                // upstream OpenIM appears to order the pair lexicographically,
                // which differs for IDs of unequal length — confirm against the
                // conversation IDs actually stored in this deployment.
                $pair = [(int) $first, (int) $second];
                sort($pair);
            } else {
                $pair = [$first, $second];
                sort($pair, SORT_STRING);
            }

            return 'si_' . $pair[0] . '_' . $pair[1];
        }

        return null;
    }

    /**
     * Scan all message documents and fill $conversationSeqMap with the
     * min/max seq and message count of every conversation found.
     */
    private function analyzeMsgSeq(): void
    {
        $msgCount = \app\model\Openim\Msg::count();
        $this->log("分析消息数据...");
        $this->log("消息文档数: {$msgCount}");

        $processedDocs = 0;
        $totalMsgs = 0;

        // NOTE(review): select() loads the whole collection at once; for very
        // large collections a chunked/cursor read may be needed.
        $docs = \app\model\Openim\Msg::select();
        foreach ($docs as $doc) {
            $processedDocs++;

            foreach ($this->extractMessages($doc) as $msg) {
                if ($this->recordSeq($msg)) {
                    $totalMsgs++;
                }
            }

            if ($processedDocs % 100 === 0) {
                $this->log("已处理 {$processedDocs}/{$msgCount} 个文档...");
            }
        }

        $this->log("处理完成!共处理 {$processedDocs} 个文档,{$totalMsgs} 条消息");
        $this->log("发现 " . count($this->conversationSeqMap) . " 个会话");
        $this->log("");
    }

    /**
     * Return the message payloads stored in a message document.
     *
     * Entries without a `msg` payload, or whose payload is not an array, are skipped.
     *
     * @param mixed $doc Message document (model or array) exposing a `msgs` field.
     * @return array<int, array> List of `msg` payloads.
     */
    private function extractMessages($doc): array
    {
        $items = $doc['msgs'] ?? null;
        if ($items instanceof \think\model\Collection) {
            $items = $items->toArray();
        }
        if (empty($items) || !is_array($items)) {
            return [];
        }

        $messages = [];
        foreach ($items as $item) {
            if (isset($item['msg']) && is_array($item['msg'])) {
                $messages[] = $item['msg'];
            }
        }

        return $messages;
    }

    /**
     * Fold one message into the per-conversation statistics.
     *
     * @param array $msg Message payload.
     * @return bool True when the message was counted, false when it had no
     *              resolvable conversation or no numeric seq.
     */
    private function recordSeq(array $msg): bool
    {
        $conversationId = $this->getConversationId($msg);
        $seq = $msg['seq'] ?? null;
        if ($conversationId === null || !is_numeric($seq)) {
            return false;
        }

        // Normalise to int so min()/max() never compare mixed types and the
        // values written back later are integers.
        $seq = (int) $seq;

        if (!isset($this->conversationSeqMap[$conversationId])) {
            $this->conversationSeqMap[$conversationId] = [
                'min_seq' => $seq,
                'max_seq' => $seq,
                'count' => 0,
            ];
        }

        $stats = &$this->conversationSeqMap[$conversationId];
        $stats['min_seq'] = min($stats['min_seq'], $seq);
        $stats['max_seq'] = max($stats['max_seq'], $seq);
        $stats['count']++;
        unset($stats);

        return true;
    }

    /**
     * Rounding step for a conversation: 100 for "sg_" IDs, 50 otherwise.
     */
    private function seqStep(string $conversationId): int
    {
        return str_starts_with($conversationId, 'sg_')
            ? self::GROUP_SEQ_STEP
            : self::SINGLE_SEQ_STEP;
    }

    /**
     * Rewrite seq, seq_user and conversation records from the analysed data.
     *
     * For every analysed conversation the seq row gets max_seq rounded up to
     * the next step boundary plus one and min_seq 0; its seq_user rows get
     * max_seq/min_seq 0 and read_seq set to the highest seq seen. Finally every
     * conversation row with min_seq > 0 has max_seq/min_seq reset to 0.
     */
    private function fixAll(): void
    {
        $this->log("开始修复数据...");
        $this->log("");

        if (empty($this->conversationSeqMap)) {
            $this->log("错误:缺少会话 seq 数据");
            return;
        }

        $seqFixed = 0;
        $seqCreated = 0; // Missing seq rows are not created; kept for the summary line.
        $seqMissing = 0;
        $seqUserFixed = 0;

        foreach ($this->conversationSeqMap as $conversationId => $seqInfo) {
            $maxSeq = (int) $seqInfo['max_seq'];

            // seq collection
            $existing = \app\model\Openim\Seq::where('conversation_id', $conversationId)->find();
            if ($existing) {
                $step = $this->seqStep((string) $conversationId);
                // ceil() returns a float; cast so an integer is persisted.
                $existing->max_seq = ((int) ceil($maxSeq / $step)) * $step + 1;
                $existing->min_seq = 0;
                $existing->save();
                $seqFixed++;
            } else {
                $seqMissing++;
            }

            // seq_user collection: one update yields the same final state as
            // resetting max/min first and then patching read_seq separately.
            $seqUserFixed += (int) \app\model\Openim\SeqUser::where('conversation_id', $conversationId)
                ->update([
                    'max_seq' => 0,
                    'min_seq' => 0,
                    'read_seq' => $maxSeq,
                ]);
        }

        // conversation collection: this reset is not scoped to a conversation,
        // so it is executed once instead of once per loop iteration.
        // NOTE(review): rows with min_seq = 0 but max_seq > 0 are not touched
        // by this filter — confirm that is intended.
        $conversationFixed = (int) \app\model\Openim\Conversation::where('min_seq', '>', 0)->update([
            'max_seq' => 0,
            'min_seq' => 0,
        ]);

        $this->log("修复完成!");
        $this->log("- seq 表: 更新 {$seqFixed} 条,新建 {$seqCreated}");
        if ($seqMissing > 0) {
            $this->log("- seq 表: 未找到 {$seqMissing} 条");
        }
        $this->log("- seq_user 表: 更新 {$seqUserFixed}");
        $this->log("- conversation 表: 更新 {$conversationFixed}");
        $this->log("");
    }

    /**
     * Write one line to the console output, or to stdout when no console
     * output is attached.
     */
    private function log(string $message): void
    {
        if ($this->output !== null) {
            // OUTPUT_RAW keeps the text verbatim (no formatter tag parsing),
            // matching what a plain echo would print.
            $this->output->writeln($message, OutputInterface::OUTPUT_RAW);
            return;
        }

        echo $message . "\n";
    }

    /**
     * Reset the seq related fields of every conversation in the seq
     * collection and recompute them from the stored messages.
     *
     * @param bool $dryRun When true (default) only the conversation IDs are
     *                     printed and nothing is written; pass false to
     *                     actually perform the repair.
     */
    public function fix_seq(bool $dryRun = true): void
    {
        $this->log("\n═══════════════════════════════════════════════════════════");
        $this->log(" 执行 fix_seq 方法 ");
        $this->log("═══════════════════════════════════════════════════════════");

        // 1. Collect the distinct conversation IDs known to the seq collection.
        $seqRecords = \app\model\Openim\Seq::field('conversation_id')->select()->toArray();
        $conversationIds = array_unique(array_column($seqRecords, 'conversation_id'));
        $totalConversations = count($conversationIds);
        $this->log("发现 {$totalConversations} 个会话");

        $processed = 0;
        foreach ($conversationIds as $conversationId) {
            if ($dryRun) {
                cp('更新:'.$conversationId);
                continue;
            }

            $processed++;
            $this->log("\n处理会话 {$conversationId} ({$processed}/{$totalConversations})");

            // 2. Base value A: one step per message document, plus one.
            // NOTE(review): a bare prefix match also matches other conversations
            // whose ID merely starts with this one (e.g. "si_1_2" vs "si_1_20");
            // if doc_id has a fixed separator after the conversation ID, include
            // it in the pattern.
            $msgDocs = \app\model\Openim\Msg::whereLike('doc_id', "{$conversationId}%")->select();
            $msgCount = count($msgDocs);
            $multiplier = $this->seqStep((string) $conversationId);
            $baseA = $msgCount * $multiplier + 1;

            // 3. Highest seq among the stored messages.
            $lastSeq = 0;
            foreach ($msgDocs as $doc) {
                foreach ($this->extractMessages($doc) as $msg) {
                    if (isset($msg['seq'])) {
                        $lastSeq = max($lastSeq, (int) $msg['seq']);
                    }
                }
            }

            // A must stay above the last stored seq: move it to the next step
            // boundary after lastSeq, plus one.
            if ($baseA <= $lastSeq) {
                $baseA = $lastSeq + $multiplier - ($lastSeq % $multiplier) + 1;
            }

            $this->log(" - 消息记录数: {$msgCount}");
            $this->log(" - 乘数: {$multiplier}");
            $this->log(" - 最后消息 seq: {$lastSeq}");
            $this->log(" - 计算变量 A: {$baseA}");

            // 4. seq collection
            $seq = \app\model\Openim\Seq::where('conversation_id', $conversationId)->find();
            if ($seq) {
                $seq->max_seq = $baseA;
                $seq->min_seq = 0;
                $seq->save();
                $this->log(" - 更新 seq 表: max_seq={$baseA}, min_seq=0");
            }

            // 5. conversation collection
            $conversations = \app\model\Openim\Conversation::where('conversation_id', $conversationId)->select();
            foreach ($conversations as $conversation) {
                $conversation->max_seq = 0;
                $conversation->min_seq = 0;
                $conversation->save();
            }
            $this->log(" - 更新 conversation 表: max_seq=0, min_seq=0");

            // 6. seq_user collection
            $seqUsers = \app\model\Openim\SeqUser::where('conversation_id', $conversationId)->select();
            foreach ($seqUsers as $seqUser) {
                $seqUser->max_seq = 0;
                $seqUser->min_seq = 0;
                $seqUser->read_seq = $lastSeq;
                $seqUser->save();
            }
            $this->log(" - 更新 seq_user 表: max_seq=0, min_seq=0, read_seq={$lastSeq}");
        }

        $this->log("\n═══════════════════════════════════════════════════════════");
        $this->log(" fix_seq 方法执行完成 ");
        $this->log("═══════════════════════════════════════════════════════════");
    }
}