修复
This commit is contained in:
+152
-367
@@ -12,63 +12,23 @@ class FixOpenimSeq extends Command
|
|||||||
protected static $defaultName = 'fix:openim:seq';
|
protected static $defaultName = 'fix:openim:seq';
|
||||||
protected static $defaultDescription = '修复 OpenIM MongoDB seq 相关字段';
|
protected static $defaultDescription = '修复 OpenIM MongoDB seq 相关字段';
|
||||||
|
|
||||||
private $dryRun = true;
|
|
||||||
private $backupDir = '';
|
|
||||||
private $conversationSeqMap = [];
|
private $conversationSeqMap = [];
|
||||||
|
|
||||||
protected function configure(): void
|
protected function configure(): void
|
||||||
{
|
{
|
||||||
$this->addOption('execute', 'e', InputOption::VALUE_NONE, '实际执行修复(默认只预览)');
|
|
||||||
$this->addOption('backup-dir', 'b', InputOption::VALUE_OPTIONAL, '备份目录', '/tmp/openim_seq_backup');
|
|
||||||
$this->addOption('step', 's', InputOption::VALUE_OPTIONAL, '执行步骤: analyze, backup, fix, all', 'all');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||||
{
|
{
|
||||||
$this->dryRun = !$input->getOption('execute');
|
|
||||||
$this->backupDir = $input->getOption('backup-dir');
|
|
||||||
$step = $input->getOption('step');
|
|
||||||
|
|
||||||
if ($this->dryRun) {
|
|
||||||
$this->log("╔════════════════════════════════════════════════════════════╗");
|
$this->log("╔════════════════════════════════════════════════════════════╗");
|
||||||
$this->log("║ 预览模式(不会修改数据) ║");
|
$this->log("║ 开始修复 OpenIM Seq 数据 ║");
|
||||||
$this->log("║ 使用 --execute 参数实际执行修复 ║");
|
|
||||||
$this->log("╚════════════════════════════════════════════════════════════╝");
|
$this->log("╚════════════════════════════════════════════════════════════╝");
|
||||||
} else {
|
|
||||||
$this->log("╔════════════════════════════════════════════════════════════╗");
|
|
||||||
$this->log("║ 执行模式 - 开始修复 ║");
|
|
||||||
$this->log("╚════════════════════════════════════════════════════════════╝");
|
|
||||||
}
|
|
||||||
$this->log("");
|
$this->log("");
|
||||||
|
|
||||||
$this->log("【OpenIM seq 设计原理】");
|
// 分析数据(获取会话 seq 信息)
|
||||||
$this->log(" - seq 表: 存储每个会话的全局 max_seq/min_seq");
|
|
||||||
$this->log(" - seq_user 表: 存储每个用户在每个会话中的 max_seq/min_seq/read_seq");
|
|
||||||
$this->log(" - conversation 表: 会话信息,冗余存储 max_seq/min_seq");
|
|
||||||
$this->log(" - read_seq 不能大于 max_seq(否则未读数变负)");
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
switch ($step) {
|
|
||||||
case 'analyze':
|
|
||||||
$this->analyzeAll();
|
|
||||||
break;
|
|
||||||
case 'backup':
|
|
||||||
$this->backupAll();
|
|
||||||
break;
|
|
||||||
case 'fix':
|
|
||||||
if (empty($this->conversationSeqMap)) {
|
|
||||||
$this->log("需要先分析消息数据...");
|
|
||||||
$this->analyzeMsgSeq();
|
$this->analyzeMsgSeq();
|
||||||
}
|
// 执行修复
|
||||||
$this->fixAll();
|
$this->fixAll();
|
||||||
break;
|
|
||||||
default:
|
|
||||||
$this->analyzeAll();
|
|
||||||
if (!$this->dryRun) {
|
|
||||||
$this->backupAll();
|
|
||||||
$this->fixAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -93,74 +53,11 @@ class FixOpenimSeq extends Command
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function analyzeAll(): void
|
|
||||||
{
|
|
||||||
$this->log("═══════════════════════════════════════════════════════════");
|
|
||||||
$this->log(" 第一步:分析数据 ");
|
|
||||||
$this->log("═══════════════════════════════════════════════════════════");
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
$this->log("【1. seq 表分析】");
|
|
||||||
$seqCount = \app\model\Openim\Seq::count();
|
|
||||||
$this->log(" 总记录数: {$seqCount}");
|
|
||||||
$seqSample = \app\model\Openim\Seq::limit(5)->select()->toArray();
|
|
||||||
foreach ($seqSample as $i => $row) {
|
|
||||||
$this->log(" 样本[{$i}]: conversation_id={$row['conversation_id']}, max_seq={$row['max_seq']}, min_seq={$row['min_seq']}");
|
|
||||||
}
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
$this->log("【2. seq_user 表分析】");
|
|
||||||
$seqUserCount = \app\model\Openim\SeqUser::count();
|
|
||||||
$this->log(" 总记录数: {$seqUserCount}");
|
|
||||||
$seqUserSample = \app\model\Openim\SeqUser::limit(5)->select()->toArray();
|
|
||||||
foreach ($seqUserSample as $i => $row) {
|
|
||||||
$this->log(" 样本[{$i}]: conversation_id={$row['conversation_id']}, user_id={$row['user_id']}, max_seq={$row['max_seq']}, min_seq={$row['min_seq']}, read_seq={$row['read_seq']}");
|
|
||||||
}
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
$this->log("【3. conversation 表分析】");
|
|
||||||
$conversationCount = \app\model\Openim\Conversation::count();
|
|
||||||
$this->log(" 总记录数: {$conversationCount}");
|
|
||||||
$conversationSample = \app\model\Openim\Conversation::limit(5)->select()->toArray();
|
|
||||||
foreach ($conversationSample as $i => $row) {
|
|
||||||
$this->log(" 样本[{$i}]: conversation_id={$row['conversation_id']}, owner_user_id={$row['owner_user_id']}, max_seq=" . ($row['max_seq'] ?? 'null') . ", min_seq=" . ($row['min_seq'] ?? 'null'));
|
|
||||||
}
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
$this->log("【4. msg 表分析 - 统计每个会话的 seq 范围】");
|
|
||||||
$this->analyzeMsgSeq();
|
|
||||||
$this->log("");
|
|
||||||
}
|
|
||||||
|
|
||||||
private function analyzeMsgSeq(): void
|
private function analyzeMsgSeq(): void
|
||||||
{
|
{
|
||||||
$msgCount = \app\model\Openim\Msg::count();
|
$msgCount = \app\model\Openim\Msg::count();
|
||||||
$this->log(" 消息文档数: {$msgCount}");
|
$this->log("分析消息数据...");
|
||||||
$this->log(" 正在分析消息中的 seq 范围...");
|
$this->log("消息文档数: {$msgCount}");
|
||||||
|
|
||||||
$sampleDoc = \app\model\Openim\Msg::limit(1)->select()->first();
|
|
||||||
if ($sampleDoc) {
|
|
||||||
$this->log(" 样本文档 doc_id: " . ($sampleDoc['doc_id'] ?? 'null'));
|
|
||||||
$msgsArray = $sampleDoc['msgs'];
|
|
||||||
if ($msgsArray instanceof \think\model\Collection) {
|
|
||||||
$msgsArray = $msgsArray->toArray();
|
|
||||||
}
|
|
||||||
$this->log(" 样本文档 msgs 数量: " . count($msgsArray));
|
|
||||||
if (count($msgsArray) > 0) {
|
|
||||||
$firstMsg = $msgsArray[0];
|
|
||||||
if (isset($firstMsg['msg'])) {
|
|
||||||
$msg = $firstMsg['msg'];
|
|
||||||
$this->log(" 第一条消息 session_type: " . ($msg['session_type'] ?? 'null'));
|
|
||||||
$this->log(" 第一条消息 send_id: " . ($msg['send_id'] ?? 'null'));
|
|
||||||
$this->log(" 第一条消息 recv_id: " . ($msg['recv_id'] ?? 'null'));
|
|
||||||
$this->log(" 第一条消息 group_id: " . ($msg['group_id'] ?? 'null'));
|
|
||||||
$this->log(" 第一条消息 seq: " . ($msg['seq'] ?? 'null'));
|
|
||||||
$conversationId = $this->getConversationId($msg);
|
|
||||||
$this->log(" 计算得到的 conversationID: {$conversationId}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
$processedDocs = 0;
|
$processedDocs = 0;
|
||||||
$totalMsgs = 0;
|
$totalMsgs = 0;
|
||||||
@@ -197,292 +94,180 @@ class FixOpenimSeq extends Command
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ($processedDocs % 100 == 0) {
|
if ($processedDocs % 100 == 0) {
|
||||||
$this->log(" 已处理 {$processedDocs}/{$msgCount} 个文档...");
|
$this->log("已处理 {$processedDocs}/{$msgCount} 个文档...");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->log(" 处理完成!共处理 {$processedDocs} 个文档,{$totalMsgs} 条消息");
|
$this->log("处理完成!共处理 {$processedDocs} 个文档,{$totalMsgs} 条消息");
|
||||||
$this->log(" 发现 " . count($this->conversationSeqMap) . " 个会话");
|
$this->log("发现 " . count($this->conversationSeqMap) . " 个会话");
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
$this->log("【5. 会话 seq 范围统计(前10个)】");
|
|
||||||
$i = 0;
|
|
||||||
foreach ($this->conversationSeqMap as $conversationId => $seqInfo) {
|
|
||||||
if ($i++ >= 10) break;
|
|
||||||
$this->log(" {$conversationId}: min_seq={$seqInfo['min_seq']}, max_seq={$seqInfo['max_seq']}, msg_count={$seqInfo['count']}");
|
|
||||||
}
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
$this->log("【6. 检测问题数据】");
|
|
||||||
$this->detectProblematicData();
|
|
||||||
}
|
|
||||||
|
|
||||||
private function detectProblematicData(): void
|
|
||||||
{
|
|
||||||
$seqProblems = 0;
|
|
||||||
$seqUserProblems = 0;
|
|
||||||
$conversationProblems = 0;
|
|
||||||
|
|
||||||
$this->log(" 正在检测 seq 表...");
|
|
||||||
$total = \app\model\Openim\Seq::count();
|
|
||||||
$batchSize = 1000;
|
|
||||||
for ($offset = 0; $offset < $total; $offset += $batchSize) {
|
|
||||||
$seqRecords = \app\model\Openim\Seq::field('conversation_id,max_seq,min_seq')->limit($offset, $batchSize)->select()->toArray();
|
|
||||||
foreach ($seqRecords as $seq) {
|
|
||||||
$conversationId = $seq['conversation_id'];
|
|
||||||
if (isset($this->conversationSeqMap[$conversationId])) {
|
|
||||||
$expected = $this->conversationSeqMap[$conversationId];
|
|
||||||
if ($seq['max_seq'] != $expected['max_seq'] || $seq['min_seq'] != $expected['min_seq']) {
|
|
||||||
$seqProblems++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$seqRecords = null;
|
|
||||||
gc_collect_cycles();
|
|
||||||
}
|
|
||||||
$this->log(" seq 表问题记录: {$seqProblems} 条(max_seq/min_seq 与实际消息不符)");
|
|
||||||
|
|
||||||
$this->log(" 正在检测 seq_user 表...");
|
|
||||||
$total = \app\model\Openim\SeqUser::count();
|
|
||||||
for ($offset = 0; $offset < $total; $offset += $batchSize) {
|
|
||||||
$seqUserRecords = \app\model\Openim\SeqUser::field('conversation_id,user_id,max_seq,min_seq,read_seq')->limit($offset, $batchSize)->select()->toArray();
|
|
||||||
foreach ($seqUserRecords as $seqUser) {
|
|
||||||
$conversationId = $seqUser['conversation_id'];
|
|
||||||
if (isset($this->conversationSeqMap[$conversationId])) {
|
|
||||||
$expected = $this->conversationSeqMap[$conversationId];
|
|
||||||
if ($seqUser['max_seq'] != $expected['max_seq'] ||
|
|
||||||
$seqUser['min_seq'] != $expected['min_seq'] ||
|
|
||||||
$seqUser['read_seq'] > $expected['max_seq']) {
|
|
||||||
$seqUserProblems++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$seqUserRecords = null;
|
|
||||||
gc_collect_cycles();
|
|
||||||
}
|
|
||||||
$this->log(" seq_user 表问题记录: {$seqUserProblems} 条(seq 不符或 read_seq > max_seq)");
|
|
||||||
|
|
||||||
$this->log(" 正在检测 conversation 表...");
|
|
||||||
$total = \app\model\Openim\Conversation::count();
|
|
||||||
for ($offset = 0; $offset < $total; $offset += $batchSize) {
|
|
||||||
$conversationRecords = \app\model\Openim\Conversation::field('conversation_id,max_seq,min_seq')->limit($offset, $batchSize)->select()->toArray();
|
|
||||||
foreach ($conversationRecords as $conversation) {
|
|
||||||
$conversationId = $conversation['conversation_id'];
|
|
||||||
if (isset($this->conversationSeqMap[$conversationId])) {
|
|
||||||
$expected = $this->conversationSeqMap[$conversationId];
|
|
||||||
$currentMax = $conversation['max_seq'] ?? 0;
|
|
||||||
$currentMin = $conversation['min_seq'] ?? 0;
|
|
||||||
if ($currentMax != $expected['max_seq'] || $currentMin != $expected['min_seq']) {
|
|
||||||
$conversationProblems++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$conversationRecords = null;
|
|
||||||
gc_collect_cycles();
|
|
||||||
}
|
|
||||||
$this->log(" conversation 表问题记录: {$conversationProblems} 条(max_seq/min_seq 与实际消息不符)");
|
|
||||||
$this->log("");
|
|
||||||
}
|
|
||||||
|
|
||||||
private function backupAll(): void
|
|
||||||
{
|
|
||||||
$this->log("═══════════════════════════════════════════════════════════");
|
|
||||||
$this->log(" 第二步:备份数据 ");
|
|
||||||
$this->log("═══════════════════════════════════════════════════════════");
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
if (!is_dir($this->backupDir)) {
|
|
||||||
mkdir($this->backupDir, 0755, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
$timestamp = date('Ymd_His');
|
|
||||||
$backupPath = $this->backupDir . '/backup_' . $timestamp;
|
|
||||||
mkdir($backupPath, 0755, true);
|
|
||||||
|
|
||||||
$this->log("备份目录: {$backupPath}");
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
$this->log("【1. 备份 seq 表】");
|
|
||||||
$total = \app\model\Openim\Seq::count();
|
|
||||||
$batchSize = 1000;
|
|
||||||
$seqData = [];
|
|
||||||
for ($offset = 0; $offset < $total; $offset += $batchSize) {
|
|
||||||
$batch = \app\model\Openim\Seq::limit($offset, $batchSize)->select()->toArray();
|
|
||||||
$seqData = array_merge($seqData, $batch);
|
|
||||||
$batch = null;
|
|
||||||
gc_collect_cycles();
|
|
||||||
}
|
|
||||||
file_put_contents($backupPath . '/seq.json', json_encode($seqData, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT));
|
|
||||||
$this->log(" 已备份 " . count($seqData) . " 条记录 -> seq.json");
|
|
||||||
$seqData = null;
|
|
||||||
|
|
||||||
$this->log("【2. 备份 seq_user 表】");
|
|
||||||
$total = \app\model\Openim\SeqUser::count();
|
|
||||||
$seqUserData = [];
|
|
||||||
for ($offset = 0; $offset < $total; $offset += $batchSize) {
|
|
||||||
$batch = \app\model\Openim\SeqUser::limit($offset, $batchSize)->select()->toArray();
|
|
||||||
$seqUserData = array_merge($seqUserData, $batch);
|
|
||||||
$batch = null;
|
|
||||||
gc_collect_cycles();
|
|
||||||
}
|
|
||||||
file_put_contents($backupPath . '/seq_user.json', json_encode($seqUserData, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT));
|
|
||||||
$this->log(" 已备份 " . count($seqUserData) . " 条记录 -> seq_user.json");
|
|
||||||
$seqUserData = null;
|
|
||||||
|
|
||||||
$this->log("【3. 备份 conversation 表】");
|
|
||||||
$total = \app\model\Openim\Conversation::count();
|
|
||||||
$conversationData = [];
|
|
||||||
for ($offset = 0; $offset < $total; $offset += $batchSize) {
|
|
||||||
$batch = \app\model\Openim\Conversation::limit($offset, $batchSize)->select()->toArray();
|
|
||||||
$conversationData = array_merge($conversationData, $batch);
|
|
||||||
$batch = null;
|
|
||||||
gc_collect_cycles();
|
|
||||||
}
|
|
||||||
file_put_contents($backupPath . '/conversation.json', json_encode($conversationData, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT));
|
|
||||||
$this->log(" 已备份 " . count($conversationData) . " 条记录 -> conversation.json");
|
|
||||||
$conversationData = null;
|
|
||||||
|
|
||||||
$this->log("");
|
|
||||||
$this->log("✓ 备份完成!备份文件保存在: {$backupPath}");
|
|
||||||
$this->log("");
|
$this->log("");
|
||||||
}
|
}
|
||||||
|
|
||||||
private function fixAll(): void
|
private function fixAll(): void
|
||||||
{
|
{
|
||||||
$this->log("═══════════════════════════════════════════════════════════");
|
$this->log("开始修复数据...");
|
||||||
$this->log(" 第三步:修复数据 ");
|
|
||||||
$this->log("═══════════════════════════════════════════════════════════");
|
|
||||||
$this->log("");
|
$this->log("");
|
||||||
|
|
||||||
if (empty($this->conversationSeqMap)) {
|
if (empty($this->conversationSeqMap)) {
|
||||||
$this->log("错误:缺少会话 seq 数据,请先运行分析步骤");
|
$this->log("错误:缺少会话 seq 数据");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->log("【1. 修复 seq 表】");
|
|
||||||
$this->log(" 原理:根据消息表中的实际 seq 更新 max_seq 和 min_seq");
|
|
||||||
$seqFixed = 0;
|
$seqFixed = 0;
|
||||||
$seqCreated = 0;
|
$seqCreated = 0;
|
||||||
foreach ($this->conversationSeqMap as $conversationId => $seqInfo) {
|
|
||||||
$existing = \app\model\Openim\Seq::where('conversation_id', $conversationId)->find();
|
|
||||||
|
|
||||||
if ($existing) {
|
|
||||||
$oldMax = $existing['max_seq'];
|
|
||||||
$oldMin = $existing['min_seq'];
|
|
||||||
if ($oldMax != $seqInfo['max_seq'] || $oldMin != $seqInfo['min_seq']) {
|
|
||||||
$existing->max_seq = $seqInfo['max_seq'];
|
|
||||||
$existing->min_seq = $seqInfo['min_seq'];
|
|
||||||
$existing->save();
|
|
||||||
$this->log(" 更新 {$conversationId}: max_seq {$oldMax} -> {$seqInfo['max_seq']}, min_seq {$oldMin} -> {$seqInfo['min_seq']}");
|
|
||||||
$seqFixed++;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
\app\model\Openim\Seq::create([
|
|
||||||
'conversation_id' => $conversationId,
|
|
||||||
'max_seq' => $seqInfo['max_seq'],
|
|
||||||
'min_seq' => $seqInfo['min_seq'],
|
|
||||||
]);
|
|
||||||
$this->log(" 创建 {$conversationId}: max_seq={$seqInfo['max_seq']}, min_seq={$seqInfo['min_seq']}");
|
|
||||||
$seqCreated++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$this->log(" 完成:更新 {$seqFixed} 条,新建 {$seqCreated} 条");
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
$this->log("【2. 修复 seq_user 表】");
|
|
||||||
$this->log(" 原理:更新 max_seq/min_seq 为会话值,确保 read_seq <= max_seq");
|
|
||||||
$seqUserFixed = 0;
|
$seqUserFixed = 0;
|
||||||
$seqUserReadFixed = 0;
|
|
||||||
|
|
||||||
$total = \app\model\Openim\SeqUser::count();
|
|
||||||
$batchSize = 500;
|
|
||||||
for ($offset = 0; $offset < $total; $offset += $batchSize) {
|
|
||||||
$seqUsers = \app\model\Openim\SeqUser::limit($offset, $batchSize)->select();
|
|
||||||
foreach ($seqUsers as $seqUser) {
|
|
||||||
$conversationId = $seqUser['conversation_id'];
|
|
||||||
if (isset($this->conversationSeqMap[$conversationId])) {
|
|
||||||
$seqInfo = $this->conversationSeqMap[$conversationId];
|
|
||||||
$oldMax = $seqUser['max_seq'];
|
|
||||||
$oldMin = $seqUser['min_seq'];
|
|
||||||
$oldRead = $seqUser['read_seq'];
|
|
||||||
$needSave = false;
|
|
||||||
|
|
||||||
if ($oldMax != $seqInfo['max_seq'] || $oldMin != $seqInfo['min_seq']) {
|
|
||||||
$seqUser->max_seq = $seqInfo['max_seq'];
|
|
||||||
$seqUser->min_seq = $seqInfo['min_seq'];
|
|
||||||
$needSave = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($oldRead > $seqInfo['max_seq']) {
|
|
||||||
$seqUser->read_seq = $seqInfo['max_seq'];
|
|
||||||
$this->log(" 修正 read_seq {$conversationId}/{$seqUser['user_id']}: {$oldRead} -> {$seqInfo['max_seq']}");
|
|
||||||
$seqUserReadFixed++;
|
|
||||||
$needSave = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($needSave) {
|
|
||||||
$seqUser->save();
|
|
||||||
$seqUserFixed++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$seqUsers = null;
|
|
||||||
gc_collect_cycles();
|
|
||||||
}
|
|
||||||
$this->log(" 完成:更新 {$seqUserFixed} 条,其中修正 read_seq {$seqUserReadFixed} 条");
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
$this->log("【3. 修复 conversation 表】");
|
|
||||||
$this->log(" 原理:根据消息表中的实际 seq 更新 max_seq 和 min_seq");
|
|
||||||
$conversationFixed = 0;
|
$conversationFixed = 0;
|
||||||
$conversationNotInMap = 0;
|
|
||||||
$total = \app\model\Openim\Conversation::count();
|
|
||||||
$debugLogged = false;
|
|
||||||
for ($offset = 0; $offset < $total; $offset += $batchSize) {
|
|
||||||
$conversations = \app\model\Openim\Conversation::limit($offset, $batchSize)->select();
|
|
||||||
foreach ($conversations as $conversation) {
|
|
||||||
$conversationId = $conversation['conversation_id'];
|
|
||||||
if (isset($this->conversationSeqMap[$conversationId])) {
|
|
||||||
$seqInfo = $this->conversationSeqMap[$conversationId];
|
|
||||||
$oldMax = $conversation['max_seq'] ?? 0;
|
|
||||||
$oldMin = $conversation['min_seq'] ?? 0;
|
|
||||||
|
|
||||||
if ($oldMax != $seqInfo['max_seq'] || $oldMin != $seqInfo['min_seq']) {
|
foreach ($this->conversationSeqMap as $conversationId => $seqInfo) {
|
||||||
$data = [
|
// 修复 seq 表
|
||||||
'max_seq' => (int)$seqInfo['max_seq'],
|
$existing = \app\model\Openim\Seq::where('conversation_id', $conversationId)->find();
|
||||||
'min_seq' => (int)$seqInfo['min_seq'],
|
if ($existing) {
|
||||||
];
|
$max_seq = 0;
|
||||||
$result = \app\model\Openim\Conversation::where('conversation_id', $conversationId)
|
if(str_starts_with($conversationId,'sg_')){
|
||||||
->where('owner_user_id', $conversation['owner_user_id'])
|
$max_seq = ceil($seqInfo['max_seq']/100)*100+1;
|
||||||
->update($data);
|
}else{
|
||||||
if ($result) {
|
$max_seq = ceil($seqInfo['max_seq']/50)*50+1;
|
||||||
$conversationFixed++;
|
}
|
||||||
|
$existing->max_seq = $max_seq;
|
||||||
|
$existing->min_seq = 0;
|
||||||
|
$existing->save();
|
||||||
|
$seqFixed++;
|
||||||
} else {
|
} else {
|
||||||
$this->log(" 警告: 更新失败 {$conversationId} / {$conversation['owner_user_id']}");
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} else {
|
|
||||||
$conversationNotInMap++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$conversations = null;
|
|
||||||
gc_collect_cycles();
|
|
||||||
}
|
|
||||||
$this->log(" 完成:更新 {$conversationFixed} 条,不在消息映射中 {$conversationNotInMap} 条");
|
|
||||||
$this->log("");
|
|
||||||
|
|
||||||
$this->log("═══════════════════════════════════════════════════════════");
|
// 修复 seq_user 表
|
||||||
$this->log(" 修复完成! ");
|
\app\model\Openim\SeqUser::where('conversation_id', $conversationId)->update([
|
||||||
$this->log("═══════════════════════════════════════════════════════════");
|
'max_seq' => 0,
|
||||||
|
'min_seq' => 0,
|
||||||
|
]);
|
||||||
|
\app\model\Openim\SeqUser::where('conversation_id', $conversationId)
|
||||||
|
->where('read_seq','<>',$seqInfo['max_seq'])
|
||||||
|
->update([
|
||||||
|
'read_seq' => $seqInfo['max_seq'],
|
||||||
|
]);
|
||||||
|
|
||||||
|
// 修复 conversation 表
|
||||||
|
\app\model\Openim\Conversation::where('min_seq', '>',0)->update([
|
||||||
|
'max_seq' => 0,
|
||||||
|
'min_seq' => 0,
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->log("修复完成!");
|
||||||
|
$this->log("- seq 表: 更新 {$seqFixed} 条,新建 {$seqCreated} 条");
|
||||||
|
$this->log("- seq_user 表: 更新 {$seqUserFixed} 条");
|
||||||
|
$this->log("- conversation 表: 更新 {$conversationFixed} 条");
|
||||||
$this->log("");
|
$this->log("");
|
||||||
$this->log("修复统计:");
|
|
||||||
$this->log(" - seq 表: 更新 {$seqFixed} 条,新建 {$seqCreated} 条");
|
|
||||||
$this->log(" - seq_user 表: 更新 {$seqUserFixed} 条,修正 read_seq {$seqUserReadFixed} 条");
|
|
||||||
$this->log(" - conversation 表: 更新 {$conversationFixed} 条");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private function log(string $message): void
|
private function log(string $message): void
|
||||||
{
|
{
|
||||||
echo $message . "\n";
|
echo $message . "\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重置 seq 相关字段并重新计算
|
||||||
|
*/
|
||||||
|
public function fix_seq(): void
|
||||||
|
{
|
||||||
|
$this->log("\n═══════════════════════════════════════════════════════════");
|
||||||
|
$this->log(" 执行 fix_seq 方法 ");
|
||||||
|
$this->log("═══════════════════════════════════════════════════════════");
|
||||||
|
|
||||||
|
// 1. 获取所有会话ID
|
||||||
|
$conversationIds = [];
|
||||||
|
|
||||||
|
// 从 seq 表获取所有会话ID
|
||||||
|
$seqRecords = \app\model\Openim\Seq::field('conversation_id')->select()->toArray();
|
||||||
|
foreach ($seqRecords as $record) {
|
||||||
|
$conversationIds[] = $record['conversation_id'];
|
||||||
|
}
|
||||||
|
|
||||||
|
// 去重
|
||||||
|
$conversationIds = array_unique($conversationIds);
|
||||||
|
$totalConversations = count($conversationIds);
|
||||||
|
$this->log("发现 {$totalConversations} 个会话");
|
||||||
|
|
||||||
|
$processed = 0;
|
||||||
|
foreach ($conversationIds as $conversationId) {
|
||||||
|
cp('更新:'.$conversationId);
|
||||||
|
continue;
|
||||||
|
$processed++;
|
||||||
|
$this->log("\n处理会话 {$conversationId} ({$processed}/{$totalConversations})");
|
||||||
|
|
||||||
|
// 2. 计算变量A
|
||||||
|
$msgCount = \app\model\Openim\Msg::whereLike('doc_id', "{$conversationId}%")->count();
|
||||||
|
$multiplier = strpos($conversationId, 'sg_') === 0 ? 100 : 50;
|
||||||
|
$baseA = $msgCount * $multiplier + 1;
|
||||||
|
|
||||||
|
// 确保 A 是 1, 51, 101 等递增格式
|
||||||
|
$remainder = $baseA % $multiplier;
|
||||||
|
if ($remainder != 1) {
|
||||||
|
$baseA = $baseA - $remainder + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 获取最后一条消息的 seq
|
||||||
|
$lastSeq = 0;
|
||||||
|
$msgDocs = \app\model\Openim\Msg::whereLike('doc_id', "{$conversationId}%")->select();
|
||||||
|
foreach ($msgDocs as $doc) {
|
||||||
|
$msgsArray = $doc['msgs'];
|
||||||
|
if ($msgsArray instanceof \think\model\Collection) {
|
||||||
|
$msgsArray = $msgsArray->toArray();
|
||||||
|
}
|
||||||
|
if (!empty($msgsArray) && is_array($msgsArray)) {
|
||||||
|
foreach ($msgsArray as $msgItem) {
|
||||||
|
if (isset($msgItem['msg']['seq'])) {
|
||||||
|
$lastSeq = max($lastSeq, (int)$msgItem['msg']['seq']);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 确保 A 大于最后一条消息的 seq
|
||||||
|
if ($baseA <= $lastSeq) {
|
||||||
|
$baseA = $lastSeq + 50 - ($lastSeq % 50) + 1;
|
||||||
|
if ($baseA % 50 != 1) {
|
||||||
|
$baseA += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->log(" - 消息记录数: {$msgCount}");
|
||||||
|
$this->log(" - 乘数: {$multiplier}");
|
||||||
|
$this->log(" - 最后消息 seq: {$lastSeq}");
|
||||||
|
$this->log(" - 计算变量 A: {$baseA}");
|
||||||
|
|
||||||
|
// 4. 更新 seq 表
|
||||||
|
$seq = \app\model\Openim\Seq::where('conversation_id', $conversationId)->find();
|
||||||
|
if ($seq) {
|
||||||
|
$seq->max_seq = $baseA;
|
||||||
|
$seq->min_seq = 0;
|
||||||
|
$seq->save();
|
||||||
|
$this->log(" - 更新 seq 表: max_seq={$baseA}, min_seq=0");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. 更新 conversation 表
|
||||||
|
$conversations = \app\model\Openim\Conversation::where('conversation_id', $conversationId)->select();
|
||||||
|
foreach ($conversations as $conversation) {
|
||||||
|
$conversation->max_seq = 0;
|
||||||
|
$conversation->min_seq = 0;
|
||||||
|
$conversation->save();
|
||||||
|
}
|
||||||
|
$this->log(" - 更新 conversation 表: max_seq=0, min_seq=0");
|
||||||
|
|
||||||
|
// 6. 更新 seq_user 表
|
||||||
|
$seqUsers = \app\model\Openim\SeqUser::where('conversation_id', $conversationId)->select();
|
||||||
|
foreach ($seqUsers as $seqUser) {
|
||||||
|
cp('更新:'.$conversationId);
|
||||||
|
$seqUser->max_seq = 0;
|
||||||
|
$seqUser->min_seq = 0;
|
||||||
|
$seqUser->read_seq = $lastSeq;
|
||||||
|
$seqUser->save();
|
||||||
|
}
|
||||||
|
$this->log(" - 更新 seq_user 表: max_seq=0, min_seq=0, read_seq={$lastSeq}");
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->log("\n═══════════════════════════════════════════════════════════");
|
||||||
|
$this->log(" fix_seq 方法执行完成 ");
|
||||||
|
$this->log("═══════════════════════════════════════════════════════════");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,13 +19,13 @@ use support\think\Db;
|
|||||||
* - 群组同步
|
* - 群组同步
|
||||||
* - 缓存管理
|
* - 缓存管理
|
||||||
*/
|
*/
|
||||||
class OpenIm extends Command
|
class Openim extends Command
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* 命令默认名称
|
* 命令默认名称
|
||||||
* @var string
|
* @var string
|
||||||
*/
|
*/
|
||||||
protected static $defaultName = 'openim';
|
protected static $defaultName = 'Openim';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 命令默认描述
|
* 命令默认描述
|
||||||
@@ -68,6 +68,31 @@ class OpenIm extends Command
|
|||||||
* @var int
|
* @var int
|
||||||
*/
|
*/
|
||||||
const GROUP_CHAT_TYPE = 2;
|
const GROUP_CHAT_TYPE = 2;
|
||||||
|
function send_group_msg(InputInterface $input, OutputInterface $output):int
|
||||||
|
{
|
||||||
|
$sdk = $this->getSdk();
|
||||||
|
for($i=100;$i<101;$i++){
|
||||||
|
$data = $sdk->message->sendGroupMessage('69546124','62106234',[
|
||||||
|
"content" => [
|
||||||
|
"content" => '第'.$i.'条消息'
|
||||||
|
],
|
||||||
|
"contentType" => 101,
|
||||||
|
"isOnlineOnly" => false,
|
||||||
|
"notOfflinePush" => false,
|
||||||
|
"sendTime" => time()*1000,
|
||||||
|
"offlinePushInfo" => [
|
||||||
|
"title" => "send message",
|
||||||
|
"desc" => '第'.$i.'条消息',
|
||||||
|
"ex" => "",
|
||||||
|
"iOSPushSound" => "default",
|
||||||
|
"iOSBadgeCount" => true
|
||||||
|
],
|
||||||
|
"ex" => ""
|
||||||
|
]);
|
||||||
|
cp($data);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
function revoke_msg(InputInterface $input, OutputInterface $output)
|
function revoke_msg(InputInterface $input, OutputInterface $output)
|
||||||
{
|
{
|
||||||
@@ -107,6 +132,23 @@ class OpenIm extends Command
|
|||||||
*/
|
*/
|
||||||
function fix_seq(InputInterface $input, OutputInterface $output)
|
function fix_seq(InputInterface $input, OutputInterface $output)
|
||||||
{
|
{
|
||||||
|
//
|
||||||
|
$conversation_list = \app\model\Openim\Conversation::order('id','asc')->select();
|
||||||
|
foreach($conversation_list as $item){
|
||||||
|
$item->msgs = json_decode($item->msgs,true);
|
||||||
|
$item->save();
|
||||||
|
}
|
||||||
|
$list = \app\model\Openim\Msg::whereLike('doc_id','sg_3136475753%')->orderBy('id','desc')->select();
|
||||||
|
\app\model\Openim\Seq::where('conversation_id','sg_3136475753')->update([
|
||||||
|
'max_seq'=>count($list)*100+1
|
||||||
|
]);
|
||||||
|
\app\model\Openim\SeqUser::where('conversation_id','sg_3136475753')->update([
|
||||||
|
'read_seq'=>count($list)*100+1
|
||||||
|
]);
|
||||||
|
foreach($list as $item){
|
||||||
|
$item->msgs = json_decode($item->msgs,true);
|
||||||
|
$item->save();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -1572,7 +1614,7 @@ class OpenIm extends Command
|
|||||||
return $this->$action($input, $output);
|
return $this->$action($input, $output);
|
||||||
}
|
}
|
||||||
|
|
||||||
$output->writeln($action . '不存在');
|
$output->writeln($action . '不存在1');
|
||||||
return self::FAILURE;
|
return self::FAILURE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -23,7 +23,7 @@ class Message
|
|||||||
}
|
}
|
||||||
public function sendMessage(string $sendID, array $data=[]): array
|
public function sendMessage(string $sendID, array $data=[]): array
|
||||||
{
|
{
|
||||||
$user = \think\facade\Db::name('user')->where('id',$sendID)->field('avatar,nickname')->find();
|
$user = \think\facade\Db::name('user')->where('userID',$sendID)->field('avatar,nickname')->find();
|
||||||
$data = array_merge([
|
$data = array_merge([
|
||||||
"sendID" => $sendID,
|
"sendID" => $sendID,
|
||||||
"senderNickname" => $user['nickname'],
|
"senderNickname" => $user['nickname'],
|
||||||
|
|||||||
Reference in New Issue
Block a user