['total' => 0, 'success' => 0, 'failed' => 0],
        'groups' => ['total' => 0, 'success' => 0, 'failed' => 0],
        'members' => ['total' => 0, 'success' => 0, 'failed' => 0],
        'messages' => ['total' => 0, 'success' => 0, 'failed' => 0, 'skipped' => 0],
    ];

    /**
     * Registers the command-line options understood by this command.
     */
    protected function configure(): void
    {
        $this->addOption('step', 's', InputOption::VALUE_OPTIONAL, '执行步骤: users/groups/members/messages/all', 'all');
        $this->addOption('skip-users', null, InputOption::VALUE_OPTIONAL, '跳过的用户ID(逗号分隔)');
        $this->addOption('skip-groups', null, InputOption::VALUE_OPTIONAL, '跳过的群ID(逗号分隔)');
        $this->addOption('clean', null, InputOption::VALUE_NONE, '清空现有数据后再迁移');
        $this->addOption('retry', 'r', InputOption::VALUE_OPTIONAL, '失败重试次数', 3);
    }

    /**
     * Command entry point: parses options, then runs the requested migration step(s).
     *
     * Each step is preceded by a JSON backup of the target database; when a step
     * throws, that backup is restored before the error is reported.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     * @return int self::SUCCESS on success, self::FAILURE on any error
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $step = (string)$input->getOption('step');
        $clean = $input->getOption('clean');

        // Special accounts that are always skipped, in addition to --skip-users.
        $defaultSkipUsers = ['group_bot', 'official_team', 'system', 'imAdmin'];
        $this->skipUsers = array_values(array_unique(array_merge(
            $this->parseIdList($input->getOption('skip-users')),
            $defaultSkipUsers
        )));
        $this->skipGroups = array_values(array_unique($this->parseIdList($input->getOption('skip-groups'))));
        $this->retry = (int)$input->getOption('retry');

        $this->log($output, "╔════════════════════════════════════════════════════════════╗");
        $this->log($output, "║ OpenIM 数据迁移工具 v2.0 ║");
        $this->log($output, "╚════════════════════════════════════════════════════════════╝");
        $this->log($output, "");
        if ($clean) {
            $this->log($output, "🗑️ 清理模式:会清空现有数据");
        }
        $this->log($output, "📍 执行步骤: {$step}");
        $this->log($output, "");

        // Reject unknown steps before anything destructive (Redis flush, backup) happens.
        $knownSteps = ['all', 'users', 'friends', 'groups', 'members', 'messages', 'restore'];
        if (!in_array($step, $knownSteps, true)) {
            $this->log($output, "❌ 未知的执行步骤: {$step}");
            return self::FAILURE;
        }

        if ($step === 'restore') {
            // NOTE(review): the backup file path is hard-coded; consider exposing it as an option.
            $restored = $this->restoreMongoDB($output, '/vol3/1000/code/im/admin/backup/openim_v3_groups_20260413141105.json');
            return $restored ? self::SUCCESS : self::FAILURE;
        }

        // With an empty collection list this call only flushes Redis (see cleanExistingData()).
        // NOTE(review): it runs for every step, even without --clean — confirm that is intended.
        $this->cleanExistingData($output, []);

        try {
            $this->initConnections($output);

            if ($clean) {
                $this->cleanExistingData($output, [
                    'conversation', 'conversation_version', // conversation collections
                    'data_version', // data version collection
                    'friend', 'friend_request', 'friend_version', // friend relationship collections
                    'group', 'group_join_version', 'group_member', 'group_member_version', 'group_request', // group collections
                    'msg', 'seq', 'seq_user', // message and sequence collections
                ]);
                // NOTE(review): the option help says "clean, then migrate", but the command stops here.
                return self::SUCCESS;
            }

            // Drops the cached admin token (presumably so a fresh one is requested — confirm).
            cache('admin_token_imAdmin', null);

            // 'members' is not part of 'all': migrateGroups() already invites the regular members.
            $steps = $step === 'all'
                ? ['users', 'friends', 'groups', 'messages']
                : [$step];

            foreach ($steps as $s) {
                // Snapshot the target database so a failed step can be rolled back.
                $backupFile = $this->backupMongoDB($output, $s);
                try {
                    switch ($s) {
                        case 'users':
                            $this->migrateUsers($output);
                            break;
                        case 'friends':
                            $this->migrateFriends($output);
                            break;
                        case 'groups':
                            $this->migrateGroups($output);
                            break;
                        case 'members':
                            $this->migrateGroupMembers($output);
                            break;
                        case 'messages':
                            $this->migrateMessages($output);
                            break;
                    }
                } catch (\Exception $e) {
                    // Roll back to the snapshot taken before this step, if one was written.
                    if (!empty($backupFile)) {
                        $this->restoreMongoDB($output, $backupFile);
                    }
                    throw $e;
                }
            }

            $this->printStats($output);
            return self::SUCCESS;
        } catch (\Exception $e) {
            $this->log($output, "❌ 错误: " . $e->getMessage());
            return self::FAILURE;
        }
    }

    /**
     * Splits a comma-separated option value into a list of trimmed, non-empty IDs.
     *
     * @param mixed $raw raw option value (string, or null when the option is absent)
     * @return string[]
     */
    private function parseIdList($raw): array
    {
        if (!is_string($raw) || $raw === '') {
            return [];
        }

        $ids = [];
        foreach (explode(',', $raw) as $id) {
            $id = trim($id);
            if ($id !== '') {
                $ids[] = $id;
            }
        }
        return $ids;
    }

    /**
     * Builds a progress bar with the block-character style shared by all migration steps.
     * The caller is responsible for start()/advance()/finish().
     */
    private function createProgressBar(OutputInterface $output, int $max): ProgressBar
    {
        $bar = new ProgressBar($output, $max);
        $bar->setBarCharacter('█');
        $bar->setEmptyBarCharacter('░');
        $bar->setProgressCharacter('▶');
        $bar->setBarWidth(400);
        return $bar;
    }

    /**
     * Step 1: clears the target `user` collection and registers all old users
     * (except skipped ones) through the SDK in batches of 100.
     *
     * Exceptions thrown by the SDK propagate to the caller.
     */
    private function migrateUsers(OutputInterface $output): void
    {
        // Some one-way friend records were left over from an earlier run and are not removed
        // here, so data sizes may differ from before; deleting the users again resolves it.
        $this->log($output, "");
        $this->log($output, "═════════════════ 步骤1: 迁移用户 ═════════════════");
        $this->log($output, "清理旧的数据");
        $this->cleanExistingData($output, ['user']);

        $users = (new \app\model\Openim\User())->setOption('connection', 'tettt')
            ->whereNotIn('user_id', $this->skipUsers)
            ->field('user_id,nickname,face_url')
            ->select()
            ->toArray();

        $this->stats['users']['total'] = count($users);

        $progressBar = $this->createProgressBar($output, count($users));
        $progressBar->start();
        echo "\r";

        foreach (array_chunk($users, 100) as $batch) {
            $this->sdk->user->userRegister($batch);
            $this->stats['users']['success'] += count($batch);
            // Advance by the real batch size so the final short batch does not overshoot the bar.
            $progressBar->advance(count($batch));
        }

        $progressBar->finish();
    }

    /**
     * Imports every old user's friend list through the SDK, 500 friends per call.
     *
     * Exceptions thrown by the SDK propagate to the caller.
     */
    private function migrateFriends(OutputInterface $output): void
    {
        // Some one-way friend records were left over from an earlier run and are not removed
        // here, so data sizes may differ from before; deleting the users again resolves it.
        $this->log($output, "");
        $this->log($output, "═════════════════ 步骤3: 迁移好友 ═════════════════");
        $this->log($output, "清理旧的数据");

        $userIDs = (new \app\model\Openim\User())->setOption('connection', 'tettt')
            ->whereNotNull('user_id')
            ->column('user_id');

        $progressBar = $this->createProgressBar($output, count($userIDs));
        $progressBar->start();

        foreach ($userIDs as $userID) {
            $friendIDs = (new \app\model\Openim\Friend())->setOption('connection', 'tettt')
                ->where('owner_user_id', $userID)
                ->column('friend_user_id');

            foreach (array_chunk($friendIDs, 500) as $batch) {
                $this->sdk->friend->importFriend($userID, $batch);
            }
            $progressBar->advance();
        }

        $progressBar->finish();
    }

    /**
     * Step 2: recreates every old group (with its admins and regular members) via the SDK.
     *
     * Each group is counted exactly once as success or failed, and the progress
     * bar advances once per group regardless of the outcome.
     */
    private function migrateGroups(OutputInterface $output): void
    {
        $this->log($output, "");
        $this->log($output, "═════════════════ 步骤2: 迁移群组 ═════════════════");
        $this->log($output, "");

        $groups = $this->queryOldDb('group');
        $total = count($groups);
        $this->stats['groups']['total'] = $total;
        $this->log($output, "📊 找到 {$this->stats['groups']['total']} 个群组");

        $progressBar = $this->createProgressBar($output, $total);
        $progressBar->start();

        $processed = 0;
        foreach ($groups as $group) {
            $processed++;
            if ($this->migrateGroup($output, $group, $processed, $total)) {
                $this->stats['groups']['success']++;
            } else {
                $this->stats['groups']['failed']++;
            }
            $progressBar->advance();
        }

        $progressBar->finish();
    }

    /**
     * Creates one group on the new server and invites its regular members.
     *
     * @param OutputInterface $output
     * @param array           $group     old group document (snake_case or camelCase keys)
     * @param int             $processed 1-based position of this group, for log prefixes
     * @param int             $total     total number of groups, for log prefixes
     * @return bool true when the group was created or already existed; false when it was
     *              skipped (no ID, in the skip list, no owner) or creation failed
     */
    private function migrateGroup(OutputInterface $output, array $group, int $processed, int $total): bool
    {
        $groupID = (string)($group['group_id'] ?? $group['groupID'] ?? '');
        if (empty($groupID) || in_array($groupID, $this->skipGroups, true)) {
            return false;
        }

        $ownerUserID = (string)($group['owner_user_id'] ?? $group['ownerUserID'] ?? $group['creator_user_id'] ?? $group['creatorUserID'] ?? '');
        if (empty($ownerUserID)) {
            return false;
        }

        $groupName = (string)($group['group_name'] ?? $group['groupName'] ?? '');
        $faceURL = (string)($group['face_url'] ?? $group['faceURL'] ?? '');
        $introduction = (string)($group['introduction'] ?? '');
        $notification = (string)($group['notification'] ?? '');
        $ex = (string)($group['ex'] ?? '');

        // Group settings.
        $groupType = (int)($group['group_type'] ?? $group['groupType'] ?? 2);
        $needVerification = (int)($group['need_verification'] ?? $group['needVerification'] ?? 0);
        $lookMemberInfo = (int)($group['look_member_info'] ?? $group['lookMemberInfo'] ?? 0);
        $applyMemberFriend = (int)($group['apply_member_friend'] ?? $group['applyMemberFriend'] ?? 0);

        $progress = sprintf("[群组 %d/%d]", $processed, $total);
        if ($processed % 20 == 0 || $processed == 1) {
            $this->log($output, "{$progress} 处理中...");
        }
        $this->log($output, "{$progress} 尝试创建群组: {$groupID}, 群主: {$ownerUserID}");

        // Admins are the rows with role_level 60, regular members those with role_level 20.
        $adminUserIDs = (new \app\model\Openim\GroupMember())->setOption('connection', 'tettt')
            ->where('group_id', $groupID)
            ->where('role_level', 60)
            ->column('user_id');
        $memberUserIDs = (new \app\model\Openim\GroupMember())->setOption('connection', 'tettt')
            ->where('group_id', $groupID)
            ->where('role_level', 20)
            ->column('user_id');

        // The first 10 members go into createGroup(); the rest are invited 10 at a time.
        $memberBatches = array_chunk(array_unique($memberUserIDs), 10);
        $initialMembers = array_shift($memberBatches) ?? [];

        try {
            $this->sdk->group->createGroup(
                $ownerUserID,
                $initialMembers,
                $adminUserIDs,
                $groupName,
                $groupID,
                $faceURL,
                $introduction,
                $notification,
                $ex,
                $groupType,
                $needVerification,
                $lookMemberInfo,
                $applyMemberFriend
            );
        } catch (\Exception $e) {
            if ($e->getCode() == 1202 || strpos($e->getMessage(), 'GroupIDExisted') !== false) {
                // An already-existing group counts as success; its members are not re-invited.
                $this->log($output, "{$progress} ℹ️ 群组已存在,跳过创建");
                return true;
            }
            $this->log($output, "{$progress} ❌ 创建失败: " . $e->getMessage());
            return false;
        }

        foreach ($memberBatches as $batch) {
            try {
                $this->sdk->group->inviteUserToGroup($groupID, $batch);
            } catch (\Exception $e) {
                // A failed invite batch is logged but does not fail the group.
                $this->log($output, "{$progress} ❌ 邀请成员失败: " . $e->getMessage());
            }
        }

        return true;
    }

    /**
     * Returns true when an error message reports a MongoDB duplicate-key condition.
     */
    private function isDuplicateKeyError(string $message): bool
    {
        return strpos($message, 'duplicate key') !== false
            || strpos($message, 'DuplicateKey') !== false;
    }

    /**
     * Standalone step: invites the non-owner, non-admin members of every old group,
     * 50 per call, retrying each batch on exceptions.
     */
    private function migrateGroupMembers(OutputInterface $output): void
    {
        $this->log($output, "");
        $this->log($output, "═════════════════ 步骤3: 迁移群成员 ═════════════════");
        $this->log($output, "");

        $groups = $this->queryOldDb('group', [], ['projection' => ['group_id' => 1, 'groupID' => 1]]);
        $groupIDs = [];
        foreach ($groups as $g) {
            $gid = (string)($g['group_id'] ?? $g['groupID'] ?? '');
            if (!empty($gid) && !in_array($gid, $this->skipGroups, true)) {
                $groupIDs[] = $gid;
            }
        }

        // Always make at least one attempt, even when --retry is 0 or negative.
        $maxAttempts = max(1, (int)$this->retry);
        $totalMembers = 0;

        foreach ($groupIDs as $groupID) {
            $members = $this->queryOldDb('group_member', ['group_id' => $groupID]);

            $ownerUserID = null;
            $adminUserIDs = [];
            $memberUserIDs = [];
            foreach ($members as $member) {
                $userID = (string)($member['user_id'] ?? $member['userID'] ?? '');
                if (empty($userID)) {
                    continue;
                }
                $roleLevel = (int)($member['role_level'] ?? $member['roleLevel'] ?? 0);
                if ($roleLevel == 100) {
                    $ownerUserID = $userID;
                } elseif ($roleLevel == 60) {
                    $adminUserIDs[] = $userID;
                } else {
                    $memberUserIDs[] = $userID;
                }
            }

            if (empty($memberUserIDs) && empty($adminUserIDs)) {
                continue;
            }

            $totalMembers += count($memberUserIDs);
            $this->stats['members']['total'] += count($memberUserIDs);

            $progress = sprintf("[群 %s 成员 %d]", $groupID, count($memberUserIDs));
            $this->log($output, "{$progress} 处理中...");

            // Invite in batches of at most 50 users.
            foreach (array_chunk($memberUserIDs, 50) as $batch) {
                $attempts = 0;
                while ($attempts < $maxAttempts) {
                    try {
                        $this->log($output, "{$progress} 邀请成员: " . implode(', ', array_slice($batch, 0, 5)) . (count($batch) > 5 ? '...' : ''));
                        // NOTE(review): migrateGroups() calls inviteUserToGroup($groupID, $userIDs) with
                        // two arguments; confirm which signature the SDK actually exposes.
                        $result = $this->sdk->group->inviteUserToGroup($groupID, $ownerUserID ?? 'admin', $batch);
                        $this->log($output, "{$progress} API返回: " . json_encode($result, JSON_UNESCAPED_UNICODE));

                        if (isset($result['errCode']) && $result['errCode'] != 0) {
                            // A duplicate-key error means the members are already present.
                            if ($this->isDuplicateKeyError((string)($result['errMsg'] ?? ''))) {
                                $this->log($output, "{$progress} ℹ️ 部分成员已存在,跳过");
                                $this->stats['members']['success'] += count($batch);
                            } else {
                                $this->stats['members']['failed'] += count($batch);
                                $this->log($output, "{$progress} ❌ 邀请失败: " . ($result['errMsg'] ?? '未知错误'));
                            }
                        } else {
                            $this->stats['members']['success'] += count($batch);
                            $this->log($output, "{$progress} ✅ 邀请成功");
                        }
                        break;
                    } catch (\Exception $e) {
                        $attempts++;
                        if ($this->isDuplicateKeyError($e->getMessage())) {
                            $this->log($output, "{$progress} ℹ️ 部分成员已存在,跳过");
                            $this->stats['members']['success'] += count($batch);
                            break;
                        } elseif ($attempts >= $maxAttempts) {
                            $this->stats['members']['failed'] += count($batch);
                            $this->log($output, "{$progress} ❌ 邀请异常: " . $e->getMessage());
                        } else {
                            $this->log($output, "{$progress} ⚠️ 邀请失败,第 {$attempts}/{$maxAttempts} 次重试...");
                            usleep(100000);
                        }
                    }
                }
                usleep(10000);
            }
        }

        $this->log($output, "📊 共处理 {$totalMembers} 个群成员");
    }

    /**
     * Step 4: replays all old messages, ordered by send time, through the SDK.
     *
     * Messages from skipped users/groups and content types 200-205 are skipped.
     * A 'NotInGroupYetError' failure is logged and skipped; any other failure
     * aborts the step by throwing.
     *
     * @throws \Exception on the first non-skippable send failure
     */
    private function migrateMessages(OutputInterface $output): void
    {
        $this->log($output, "");
        $this->log($output, "═════════════════ 步骤4: 迁移消息 ═════════════════");
        $this->log($output, "");

        $pipeline = [
            ['$unwind' => '$msgs'],
            ['$match' => ['msgs.msg' => ['$ne' => null]]],
            ['$sort' => ['msgs.msg.send_time' => 1]],
            ['$project' => ['doc_id' => 1, 'msg' => '$msgs.msg']],
        ];
        $command = new \MongoDB\Driver\Command([
            'aggregate' => 'msg',
            'pipeline' => $pipeline,
            'cursor' => new \stdClass,
            // Let the server spill the $sort to disk instead of failing on large collections.
            'allowDiskUse' => true,
        ]);
        $cursor = $this->oldManager->executeCommand('tettt', $command);

        $messages = [];
        foreach ($cursor as $doc) {
            $messages[] = $this->bsonToArray($doc);
        }

        $this->stats['messages']['total'] = count($messages);
        $this->log($output, "📊 找到 {$this->stats['messages']['total']} 条消息");

        $processed = 0;
        foreach ($messages as $doc) {
            $processed++;
            $msg = $doc['msg'] ?? [];
            if (empty($msg)) {
                $this->stats['messages']['skipped']++;
                continue;
            }

            $sendID = (string)($msg['send_id'] ?? $msg['sendID'] ?? '');
            $recvID = (string)($msg['recv_id'] ?? $msg['recvID'] ?? '');
            $groupID = (string)($msg['group_id'] ?? $msg['groupID'] ?? '');
            $contentType = (int)($msg['content_type'] ?? $msg['contentType'] ?? 101);
            $sessionType = (int)($msg['session_type'] ?? $msg['sessionType'] ?? 1);

            if (in_array($sendID, $this->skipUsers, true)) {
                $this->stats['messages']['skipped']++;
                continue;
            }
            if ($sessionType == 3 && in_array($groupID, $this->skipGroups, true)) {
                $this->stats['messages']['skipped']++;
                continue;
            }
            // Skip special content types (e.g. system notifications).
            if (in_array($contentType, [200, 201, 202, 203, 204, 205], true)) {
                $this->stats['messages']['skipped']++;
                continue;
            }

            $progress = sprintf("[消息 %d/%d]", $processed, $this->stats['messages']['total']);
            if ($processed % 100 == 0 || $processed == 1) {
                $this->log($output, "{$progress} 处理中...");
            }

            // Set when the API reports a non-skippable failure; thrown after the try block
            // so that the failure is counted only once.
            $fatal = null;
            try {
                $this->log($output, "{$progress} 发送消息: sendID={$sendID}, recvID={$recvID}, groupID={$groupID}, contentType={$contentType}, sessionType={$sessionType}");
                $result = $this->sendMessage($msg);
                $this->log($output, "{$progress} API返回: " . json_encode($result, JSON_UNESCAPED_UNICODE));

                if ($result['success'] ?? false) {
                    $this->stats['messages']['success']++;
                    if ($processed % 100 == 0) {
                        $this->log($output, "{$progress} ✅ 发送成功");
                    }
                } else {
                    $errMsg = $result['errMsg'] ?? '未知错误';
                    $this->stats['messages']['failed']++;
                    $this->log($output, "{$progress} ❌ 发送失败: " . $errMsg);
                    if (strpos(($result['errMsg'] ?? ''), 'NotInGroupYetError') === false) {
                        $fatal = new \Exception("消息发送失败: " . $errMsg);
                    } else {
                        $this->log($output, "{$progress} ℹ️ 跳过NotInGroupYetError错误,继续迁移");
                    }
                }
            } catch (\Exception $e) {
                $this->stats['messages']['failed']++;
                $this->log($output, "{$progress} ❌ 发送异常: " . $e->getMessage());
                if (strpos($e->getMessage(), 'NotInGroupYetError') === false) {
                    throw $e;
                }
                $this->log($output, "{$progress} ℹ️ 跳过NotInGroupYetError异常,继续迁移");
            }

            if ($fatal !== null) {
                throw $fatal;
            }

            // Optional throttle between messages, in milliseconds.
            if ($this->delay > 0) {
                usleep($this->delay * 1000);
            }
        }
    }

    /**
     * Sends one old message through the SDK as a single-chat or group message.
     *
     * @param array $msg old message document (snake_case or camelCase keys)
     * @return array{success: bool, errMsg: string}
     */
    private function sendMessage(array $msg): array
    {
        $sendID = (string)($msg['send_id'] ?? $msg['sendID'] ?? '');
        $recvID = (string)($msg['recv_id'] ?? $msg['recvID'] ?? '');
        $groupID = (string)($msg['group_id'] ?? $msg['groupID'] ?? '');
        $contentType = (int)($msg['content_type'] ?? $msg['contentType'] ?? 101);
        $sessionType = (int)($msg['session_type'] ?? $msg['sessionType'] ?? 1);
        $sendTime = (int)($msg['send_time'] ?? $msg['sendTime'] ?? 0);
        $content = $msg['content'] ?? '';
        $ex = (string)($msg['ex'] ?? '');

        if (empty($sendID)) {
            return ['success' => false, 'errMsg' => 'sendID为空'];
        }

        $messageData = [
            'content' => $this->parseContent($content, $contentType),
            'contentType' => $contentType,
            'sendTime' => $sendTime,
            'ex' => $ex,
            'isOnlineOnly' => false,
            'notOfflinePush' => true,
        ];

        // Session type 1 with a receiver is a single chat; anything else with a group ID is a group chat.
        if ($sessionType == 1 && !empty($recvID)) {
            $result = $this->sdk->message->sendSingleMessage($sendID, $recvID, $messageData);
        } elseif (!empty($groupID)) {
            $result = $this->sdk->message->sendGroupMessage($sendID, $groupID, $messageData);
        } else {
            return ['success' => false, 'errMsg' => '缺少必要的参数'];
        }

        return [
            'success' => !($result['errCode'] ?? 0),
            'errMsg' => $result['errMsg'] ?? '',
        ];
    }

    /**
     * Normalises stored message content into an array.
     *
     * JSON strings that decode to an array are decoded; other strings are wrapped
     * as ['content' => ..., 'text' => ...]; arrays pass through unchanged.
     *
     * @param mixed $content     raw content from the old message
     * @param int   $contentType currently unused
     * @return array
     */
    private function parseContent($content, int $contentType): array
    {
        if (is_array($content)) {
            return $content;
        }
        if (!is_string($content)) {
            return ['content' => '', 'text' => ''];
        }

        $decoded = json_decode($content, true);
        if (json_last_error() === JSON_ERROR_NONE && is_array($decoded)) {
            return $decoded;
        }
        return ['content' => $content, 'text' => $content];
    }

    /**
     * Converts a MongoDB result document into a plain PHP array.
     *
     * @param mixed $data BSONDocument/BSONArray, any other object, or an array
     * @return array empty array when the value cannot be converted
     */
    private function bsonToArray($data): array
    {
        if ($data instanceof \MongoDB\Model\BSONArray || $data instanceof \MongoDB\Model\BSONDocument) {
            return $data->getArrayCopy();
        }
        if (is_object($data)) {
            // A JSON round trip turns nested objects into arrays; guard against an
            // encode/decode failure so the declared array return type always holds.
            $converted = json_decode((string)json_encode($data), true);
            return is_array($converted) ? $converted : [];
        }
        return is_array($data) ? $data : [];
    }

    /**
     * Prints the per-category counters collected in $this->stats.
     */
    private function printStats(OutputInterface $output): void
    {
        $this->log($output, "");
        $this->log($output, "╔════════════════════════════════════════════════════════════╗");
        $this->log($output, "║ 迁移统计报告 ║");
        $this->log($output, "╠════════════════════════════════════════════════════════════╣");
        $this->log($output, "║ 用户: 总数 {$this->stats['users']['total']}, 成功 {$this->stats['users']['success']}, 失败 {$this->stats['users']['failed']}");
        $this->log($output, "║ 群组: 总数 {$this->stats['groups']['total']}, 成功 {$this->stats['groups']['success']}, 失败 {$this->stats['groups']['failed']}");
        $this->log($output, "║ 成员: 总数 {$this->stats['members']['total']}, 成功 {$this->stats['members']['success']}, 失败 {$this->stats['members']['failed']}");
        $this->log($output, "║ 消息: 总数 {$this->stats['messages']['total']}, 成功 {$this->stats['messages']['success']}, 失败 {$this->stats['messages']['failed']}, 跳过 {$this->stats['messages']['skipped']}");
        $this->log($output, "╚════════════════════════════════════════════════════════════╝");
    }

    /**
     * Writes one line to the console output.
     */
    private function log(OutputInterface $output, string $message): void
    {
        $output->writeln($message);
    }

    /**
     * Returns the OpenIM SDK client, creating it from the `openim.*` config on first use.
     *
     * @return object
     */
    public function getSdk()
    {
        if ($this->sdk) {
            return $this->sdk;
        }

        $this->sdk = new \support\OpenImSdk\Client([
            'host' => config('openim.server'),
            'secret' => config('openim.secret'),
        ]);
        return $this->sdk;
    }

    /**
     * Builds the MongoDB connection URI for the given database.
     *
     * NOTE(review): credentials and host are hard-coded in source; they should be
     * moved to configuration and rotated.
     */
    private function mongoUri(string $database): string
    {
        return "mongodb://commie:n1e5a6s6m7@127.0.0.1:37017/{$database}?authSource=admin";
    }

    /**
     * Returns the driver manager for the target (openim_v3) database, creating it on first use.
     */
    private function targetManager(): \MongoDB\Driver\Manager
    {
        if (!$this->newManager) {
            $this->newManager = new \MongoDB\Driver\Manager($this->mongoUri('openim_v3'));
        }
        return $this->newManager;
    }

    /**
     * Creates the SDK client and the driver manager for the old (`tettt`) database.
     */
    private function initConnections(OutputInterface $output): void
    {
        $this->log($output, "正在初始化连接...");
        $this->getSdk();
        $this->oldManager = new \MongoDB\Driver\Manager($this->mongoUri('tettt'));
        $this->log($output, "✅ 连接成功");
    }

    /**
     * Empties the given collections of the target database and flushes Redis.
     *
     * Failures are logged and never thrown. Redis is flushed with flushAll()
     * even when $collections is empty.
     *
     * @param OutputInterface $output
     * @param array           $collections names of openim_v3 collections to empty
     */
    private function cleanExistingData(OutputInterface $output, $collections = []): void
    {
        $this->log($output, "\n═════════════════ 清理现有数据 ═════════════════");
        $this->log($output, "");

        try {
            $manager = $this->targetManager();
            $this->log($output, "正在清理mongodb数据...");

            foreach ($collections as $collection) {
                try {
                    // An empty filter deletes every document in the collection.
                    $bulk = new \MongoDB\Driver\BulkWrite;
                    $bulk->delete([]);
                    $manager->executeBulkWrite('openim_v3.' . $collection, $bulk);
                    $this->log($output, "已清空集合: {$collection}");
                } catch (\Exception $e) {
                    // One failing collection must not stop the others from being cleared.
                    $this->log($output, "⚠️ 清空集合 {$collection} 失败: " . $e->getMessage());
                }
            }

            $this->log($output, "正在清理redis数据...");
            // NOTE(review): Redis host, port and password are hard-coded; move to configuration.
            $redis = new \Redis();
            $host = '127.0.0.1';
            $port = 16379;
            $password = 'n1e5a6s6m7';
            $output->writeln("连接 Redis: {$host}:{$port}");

            if ($redis->connect($host, $port)) {
                try {
                    if (!empty($password)) {
                        $redis->auth($password);
                    }
                    if ($redis->flushAll()) {
                        $output->writeln("✅ Redis 清空成功");
                    } else {
                        $output->writeln("❌ Redis 清空失败");
                    }
                } finally {
                    $redis->close();
                }
            } else {
                $output->writeln("❌ 无法连接到 Redis");
            }

            $this->log($output, "✅ 数据清理完成");
        } catch (\Exception $e) {
            // Deliberately best-effort: log and let the caller continue.
            $this->log($output, "❌ 清理数据失败: " . $e->getMessage());
        }
    }

    /**
     * Runs a find query against a collection of the old (`tettt`) database.
     *
     * @param string $collection collection name without database prefix
     * @param array  $filter     query filter
     * @param array  $options    driver query options (e.g. projection)
     * @return array[] matching documents as plain arrays
     */
    private function queryOldDb(string $collection, array $filter = [], array $options = []): array
    {
        $query = new \MongoDB\Driver\Query($filter, $options);
        $cursor = $this->oldManager->executeQuery('tettt.' . $collection, $query);

        $result = [];
        foreach ($cursor as $doc) {
            $result[] = $this->bsonToArray($doc);
        }
        return $result;
    }

    /**
     * Dumps every collection of the target database into one JSON file.
     *
     * Empty collections are recorded too, so that a rollback can clear data a
     * failed step wrote into a collection that was empty beforehand.
     * NOTE(review): documents go through a JSON round trip, so BSON types other
     * than a top-level ObjectId `_id` are not restored as their original types.
     *
     * @param OutputInterface $output
     * @param string          $step step name, used in the file name
     * @return string path of the backup file, or '' when the backup failed
     */
    private function backupMongoDB(OutputInterface $output, string $step): string
    {
        $this->log($output, "═════════════════ 备份MongoDB数据 ═════════════════");

        $timestamp = date('YmdHis');
        $backupFile = "{$this->backupDir}/openim_v3_{$step}_{$timestamp}.json";

        try {
            // Make sure the backup directory exists.
            if (!is_dir($this->backupDir) && !mkdir($this->backupDir, 0755, true) && !is_dir($this->backupDir)) {
                throw new \RuntimeException("无法创建备份目录: {$this->backupDir}");
            }

            $manager = $this->targetManager();
            $command = new \MongoDB\Driver\Command(['listCollections' => 1]);
            $collections = $manager->executeCommand('openim_v3', $command)->toArray();

            $backupData = [];
            foreach ($collections as $collection) {
                $collectionName = $collection->name;
                if (in_array($collectionName, ['system.indexes', 'system.profile'], true)) {
                    continue;
                }

                $cursor = $manager->executeQuery('openim_v3.' . $collectionName, new \MongoDB\Driver\Query([]));
                $documents = [];
                foreach ($cursor as $doc) {
                    $document = $this->bsonToArray($doc);
                    // Flatten {"$oid": "..."} to its hex string; restoreMongoDB() reverses this.
                    if (isset($document['_id']) && is_array($document['_id']) && isset($document['_id']['$oid'])) {
                        $document['_id'] = $document['_id']['$oid'];
                    }
                    $documents[] = $document;
                }
                $backupData[$collectionName] = $documents;
            }

            $json = json_encode($backupData, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT);
            if ($json === false) {
                throw new \RuntimeException('JSON编码失败: ' . json_last_error_msg());
            }
            if (file_put_contents($backupFile, $json) === false) {
                throw new \RuntimeException("无法写入备份文件: {$backupFile}");
            }

            $this->log($output, "✅ 备份成功: {$backupFile} (" . filesize($backupFile) . " 字节)");
            $this->currentBackup = $backupFile;
            return $backupFile;
        } catch (\Exception $e) {
            $this->log($output, "❌ 备份失败: " . $e->getMessage());
            return '';
        }
    }

    /**
     * Restores the target database from a JSON file written by backupMongoDB().
     *
     * Every collection named in the file is emptied first, then refilled.
     *
     * @param OutputInterface $output
     * @param string          $backupFile path of the backup file
     * @return bool true on success; false when the file is missing, empty or the restore failed
     */
    private function restoreMongoDB(OutputInterface $output, string $backupFile): bool
    {
        $this->log($output, "═════════════════ 回滚MongoDB数据 ═════════════════");

        if (!file_exists($backupFile)) {
            $this->log($output, "❌ 备份文件不存在: {$backupFile}");
            return false;
        }

        try {
            $backupData = json_decode((string)file_get_contents($backupFile), true);
            if (empty($backupData) || !is_array($backupData)) {
                $this->log($output, "❌ 备份文件为空");
                return false;
            }

            $manager = $this->targetManager();

            // Empty every collection contained in the backup.
            $this->log($output, "清空现有数据...");
            foreach (array_keys($backupData) as $collectionName) {
                $bulk = new \MongoDB\Driver\BulkWrite;
                $bulk->delete([]);
                $manager->executeBulkWrite('openim_v3.' . $collectionName, $bulk);
            }

            // Re-insert the backed-up documents.
            foreach ($backupData as $collectionName => $documents) {
                $documents = is_array($documents) ? $documents : [];
                $documentCount = count($documents);
                $this->log($output, "恢复集合: {$collectionName} ({$documentCount} 条记录)");

                // executeBulkWrite() rejects an empty bulk; an empty collection is already restored.
                if ($documentCount === 0) {
                    continue;
                }

                $bulk = new \MongoDB\Driver\BulkWrite;
                foreach ($documents as $document) {
                    if (isset($document['_id']) && is_string($document['_id'])) {
                        try {
                            // Any string _id that parses as an ObjectId is converted back to one.
                            $document['_id'] = new \MongoDB\BSON\ObjectId($document['_id']);
                        } catch (\Exception $e) {
                            // Not a valid ObjectId string: keep the _id as it is.
                        }
                    }
                    $bulk->insert($document);
                }
                $result = $manager->executeBulkWrite('openim_v3.' . $collectionName, $bulk);
                $this->log($output, "恢复成功: {$result->getInsertedCount()} 条记录");
            }

            $this->log($output, "✅ 回滚成功");
            return true;
        } catch (\Exception $e) {
            $this->log($output, "❌ 回滚失败: " . $e->getMessage());
            return false;
        }
    }
}