Files
im/app/command/MigrateMessages.php
2026-04-13 20:00:32 +08:00

873 lines
38 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
// 引入内置进度条类
use Symfony\Component\Console\Helper\ProgressBar;
use support\think\Db;
class MigrateMessages extends Command
{
protected static $defaultName = 'migrate:messages';
protected static $defaultDescription = '从老数据库迁移数据到新OpenIM数据库';
private $sdk = null;
private $oldManager = null;
private $newManager = null;
private $retry = 3;
private $delay = 2;
private $backupDir = '/vol3/1000/code/im/admin/backup';
private $currentBackup = null;
private $skipUsers = [];
private $skipGroups = [];
private $stats = [
'users' => ['total' => 0, 'success' => 0, 'failed' => 0],
'groups' => ['total' => 0, 'success' => 0, 'failed' => 0],
'members' => ['total' => 0, 'success' => 0, 'failed' => 0],
'messages' => ['total' => 0, 'success' => 0, 'failed' => 0, 'skipped' => 0],
];
protected function configure(): void
{
$this->addOption('step', 's', InputOption::VALUE_OPTIONAL, '执行步骤: users/groups/members/messages/all', 'all');
$this->addOption('skip-users', null, InputOption::VALUE_OPTIONAL, '跳过的用户ID(逗号分隔)');
$this->addOption('skip-groups', null, InputOption::VALUE_OPTIONAL, '跳过的群ID(逗号分隔)');
$this->addOption('clean', null, InputOption::VALUE_NONE, '清空现有数据后再迁移');
$this->addOption('retry', 'r', InputOption::VALUE_OPTIONAL, '失败重试次数', 3);
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$step = $input->getOption('step');
$skipUsers = $input->getOption('skip-users') ? explode(',', $input->getOption('skip-users')) : [];
$skipGroups = $input->getOption('skip-groups') ? explode(',', $input->getOption('skip-groups')) : [];
$clean = $input->getOption('clean');
$retry = (int)$input->getOption('retry');
// 自动忽略特殊用户
$defaultSkipUsers = ['group_bot', 'official_team', 'system','imAdmin'];
$skipUsers = array_merge($skipUsers, $defaultSkipUsers);
$this->skipUsers = array_unique($skipUsers);
$this->skipGroups = array_unique($skipGroups);
$this->retry = $retry;
$this->log($output, "╔════════════════════════════════════════════════════════════╗");
$this->log($output, "║ OpenIM 数据迁移工具 v2.0 ║");
$this->log($output, "╚════════════════════════════════════════════════════════════╝");
$this->log($output, "");
if ($clean) {
$this->log($output, "🗑️ 清理模式:会清空现有数据");
}
$this->log($output, "📍 执行步骤: {$step}");
$this->log($output, "");
if($step == 'restore'){
$this->restoreMongoDB($output, '/vol3/1000/code/im/admin/backup/openim_v3_groups_20260413141105.json');
return 0;
}
$this->cleanExistingData($output,[]);
try {
$this->initConnections($output);
//return 0 ;
if ($clean) {
$this->cleanExistingData($output,[
'conversation', 'conversation_version', // 会话相关集合
'data_version', // 数据版本集合
'friend', 'friend_request', 'friend_version', // 好友关系相关集合
'group', 'group_join_version','group_member','group_member_version','group_request', // 群组相关集合
'msg','seq','seq_user' // 消息和序列号相关集合
]);
return 0;
}
cache('admin_token_imAdmin',null);
$steps = $step === 'all' ? [
'users',
'friends',
'groups',
//'members',
'messages'
] : [$step];
foreach ($steps as $s) {
// 备份数据
$backupFile = $this->backupMongoDB($output, $s);
try {
switch ($s) {
case 'users':
$this->migrateUsers($output);
break;
case 'friends':
$this->migrateFriends($output);
break;
case 'groups':
$this->migrateGroups($output);
break;
case 'members':
$this->migrateGroupMembers($output);
break;
case 'messages':
$this->migrateMessages($output);
break;
}
} catch (\Exception $e) {
// 遇到错误,回滚数据
if (!empty($backupFile)) {
$this->restoreMongoDB($output, $backupFile);
}
throw $e;
}
}
$this->printStats($output);
return self::SUCCESS;
} catch (\Exception $e) {
$this->log($output, "❌ 错误: " . $e->getMessage());
//$this->log($output, $e->getTraceAsString());
return self::FAILURE;
}
}
private function migrateUsers(OutputInterface $output): void
{
//之前残留了一部分数据,是单向好友,这里没做删除,所以数据大小和之前的不一样,用户重新删除一次就好了
$this->log($output, "");
$this->log($output, "═════════════════ 步骤1: 迁移用户 ═════════════════");
$this->log($output, "清理旧的数据");
$this->cleanExistingData($output,[
'user'
]);
$user_list = (new \app\model\Openim\User())->setOption('connection','tettt')
->whereNotIn('user_id',$this->skipUsers)
->field('user_id,nickname,face_url')
->select();
$user_list = $user_list->toArray();
// 1. 创建进度条(内置核心方法)
$progressBar = new ProgressBar($output, count($user_list));
// 可选:设置进度条样式(字符、长度等)
$progressBar->setBarCharacter('█');
$progressBar->setEmptyBarCharacter('░');
$progressBar->setProgressCharacter('▶');
$progressBar->setBarWidth(400);
// 2. 开始显示
$progressBar->start();
echo sprintf("\r");
while(count($user_list) > 0){
$step = 100;
$user = array_slice($user_list,0,$step);
$user_list = array_slice($user_list,$step);
$this->sdk->user->userRegister($user);
$progressBar->advance($step);
}
// 4. 结束进度条
$progressBar->finish();
}
private function migrateFriends(OutputInterface $output): void
{
//之前残留了一部分数据,是单向好友,这里没做删除,所以数据大小和之前的不一样,用户重新删除一次就好了
$this->log($output, "");
$this->log($output, "═════════════════ 步骤3: 迁移好友 ═════════════════");
$this->log($output, "清理旧的数据");
// $this->cleanExistingData($output,[
// 'conversation', 'conversation_version', // 会话相关集合
// 'data_version', // 数据版本集合
// 'friend', 'friend_request', 'friend_version', // 好友关系相关集合
// 'group', 'group_join_version','group_member','group_member_version','group_request', // 群组相关集合
// 'msg','seq','seq_user' // 消息和序列号相关集合
// ]);
$user_list = (new \app\model\Openim\User())->setOption('connection','tettt')
->whereNotNull('user_id')
->column('user_id');
// 1. 创建进度条(内置核心方法)
$progressBar = new ProgressBar($output, count($user_list));
// 可选:设置进度条样式(字符、长度等)
$progressBar->setBarCharacter('█');
$progressBar->setEmptyBarCharacter('░');
$progressBar->setProgressCharacter('▶');
$progressBar->setBarWidth(400);
// 2. 开始显示
$progressBar->start();
foreach($user_list as $userID){
$friend_list = (new \app\model\Openim\Friend())->setOption('connection','tettt')
->where('owner_user_id',$userID)
->column('friend_user_id');
if(count($friend_list)){
while(count($friend_list)){
$_friend_list = array_slice($friend_list, 0, 500);
$friend_list = array_slice($friend_list, 500);
$this->sdk->friend->importFriend($userID,$_friend_list);
}
}
$progressBar->advance();
}
// 4. 结束进度条
$progressBar->finish();
}
private function migrateGroups(OutputInterface $output): void
{
$this->log($output, "");
$this->log($output, "═════════════════ 步骤2: 迁移群组 ═════════════════");
$this->log($output, "");
$options = [];
$groups = $this->queryOldDb('group', [], $options);
$this->stats['groups']['total'] = count($groups);
$this->log($output, "📊 找到 {$this->stats['groups']['total']} 个群组");
$processed = 0;
// 1. 创建进度条(内置核心方法)
$progressBar = new ProgressBar($output, count($groups));
// 可选:设置进度条样式(字符、长度等)
$progressBar->setBarCharacter('█');
$progressBar->setEmptyBarCharacter('░');
$progressBar->setProgressCharacter('▶');
$progressBar->setBarWidth(400);
// 2. 开始显示
$progressBar->start();
foreach ($groups as $group) {
$processed++;
$groupID = (string)($group['group_id'] ?? $group['groupID'] ?? '');
if (empty($groupID) || in_array($groupID, $this->skipGroups)) {
$this->stats['groups']['failed']++;
continue;
}
$ownerUserID = (string)($group['owner_user_id'] ?? $group['ownerUserID'] ?? $group['creator_user_id'] ?? $group['creatorUserID'] ?? '');
if (empty($ownerUserID)) {
$this->stats['groups']['failed']++;
continue;
}
$groupName = (string)($group['group_name'] ?? $group['groupName'] ?? '');
$faceURL = (string)($group['face_url'] ?? $group['faceURL'] ?? '');
$introduction = (string)($group['introduction'] ?? '');
$notification = (string)($group['notification'] ?? '');
$ex = (string)($group['ex'] ?? '');
// 群组设置字段
$groupType = (int)($group['group_type'] ?? $group['groupType'] ?? 2);
$needVerification = (int)($group['need_verification'] ?? $group['needVerification'] ?? 0);
$lookMemberInfo = (int)($group['look_member_info'] ?? $group['lookMemberInfo'] ?? 0);
$applyMemberFriend = (int)($group['apply_member_friend'] ?? $group['applyMemberFriend'] ?? 0);
$progress = sprintf("[群组 %d/%d]", $processed, $this->stats['groups']['total']);
if ($processed % 20 == 0 || $processed == 1) {
$this->log($output, "{$progress} 处理中...");
}
$this->log($output, "{$progress} 尝试创建群组: {$groupID}, 群主: {$ownerUserID}");
// 管理员信息
$adminUserIDs = (new \app\model\Openim\GroupMember())->setOption('connection','tettt')
->where('group_id',$groupID)
->where('role_level',60)
->column('user_id');
//cp($adminUserIDs );
// 成员信息
$memberUserIDs = (new \app\model\Openim\GroupMember())->setOption('connection','tettt')
->where('group_id',$groupID)
->where('role_level',20)
->column('user_id');
//cp($memberUserIDs );
$memberUserIDs = array_unique($memberUserIDs);
$_memberUserIDs = array_slice($memberUserIDs, 0, 10);
$memberUserIDs = array_slice($memberUserIDs, 10);
try {
$this->sdk->group->createGroup(
$ownerUserID,
$_memberUserIDs,
$adminUserIDs,
$groupName,
$groupID,
$faceURL,
$introduction,
$notification,
$ex,
$groupType,
$needVerification,
$lookMemberInfo,
$applyMemberFriend
);
while(count($memberUserIDs)){
$_memberUserIDs = array_slice($memberUserIDs, 0, 10);
$memberUserIDs = array_slice($memberUserIDs, 10);
try{
$this->sdk->group->inviteUserToGroup($groupID, $_memberUserIDs);
} catch (\Exception $e) {
$this->log($output, "{$progress} ❌ 邀请成员失败: " . $e->getMessage());
}
}
$this->stats['groups']['success']++;
//$this->log($output, "{$progress} ✅ 创建成功");
} catch (\Exception $e) {
$this->stats['groups']['failed']++;
if ($e->getCode() == 1202 || strpos($e->getMessage(), 'GroupIDExisted') !== false) {
$this->log($output, "{$progress} ️ 群组已存在,跳过创建");
$this->stats['groups']['success']++;
continue;
} else {
$this->log($output, "{$progress} ❌ 创建失败: " . $e->getMessage());
}
}
$progressBar->advance();
}
// 4. 结束进度条
$progressBar->finish();
}
private function migrateGroupMembers(OutputInterface $output): void
{
$this->log($output, "");
$this->log($output, "═════════════════ 步骤3: 迁移群成员 ═════════════════");
$this->log($output, "");
$groups = $this->queryOldDb('group', [], ['projection' => ['group_id' => 1, 'groupID' => 1]]);
$groupIDs = [];
foreach ($groups as $g) {
$gid = (string)($g['group_id'] ?? $g['groupID'] ?? '');
if (!empty($gid) && !in_array($gid, $this->skipGroups)) {
$groupIDs[] = $gid;
}
}
$totalMembers = 0;
foreach ($groupIDs as $groupID) {
$members = $this->queryOldDb('group_member', ['group_id' => $groupID]);
$ownerUserID = null;
$adminUserIDs = [];
$memberUserIDs = [];
foreach ($members as $member) {
$userID = (string)($member['user_id'] ?? $member['userID'] ?? '');
if (empty($userID)) continue;
$roleLevel = (int)($member['role_level'] ?? $member['roleLevel'] ?? 0);
if ($roleLevel == 100) {
$ownerUserID = $userID;
} elseif ($roleLevel == 60) {
$adminUserIDs[] = $userID;
} else {
$memberUserIDs[] = $userID;
}
}
if (empty($memberUserIDs) && empty($adminUserIDs)) {
continue;
}
$totalMembers += count($memberUserIDs);
$this->stats['members']['total'] += count($memberUserIDs);
$progress = sprintf("[群 %s 成员 %d]", $groupID, count($memberUserIDs));
$this->log($output, "{$progress} 处理中...");
// 分批邀请,每批最多50人
$batches = array_chunk($memberUserIDs, 50);
foreach ($batches as $batch) {
if (empty($batch)) {
continue;
}
$attempts = 0;
while ($attempts < $this->retry) {
try {
$this->log($output, "{$progress} 邀请成员: " . implode(', ', array_slice($batch, 0, 5)) . (count($batch) > 5 ? '...' : ''));
$result = $this->sdk->group->inviteUserToGroup($groupID, $ownerUserID ?? 'admin', $batch);
$this->log($output, "{$progress} API返回: " . json_encode($result, JSON_UNESCAPED_UNICODE));
if (isset($result['errCode']) && $result['errCode'] != 0) {
// 检查是否是重复键错误
if (strpos($result['errMsg'] ?? '', 'duplicate key') !== false || strpos($result['errMsg'] ?? '', 'DuplicateKey') !== false) {
$this->log($output, "{$progress} ️ 部分成员已存在,跳过");
$this->stats['members']['success'] += count($batch);
} else {
$this->stats['members']['failed'] += count($batch);
$this->log($output, "{$progress} ❌ 邀请失败: " . ($result['errMsg'] ?? '未知错误'));
}
} else {
$this->stats['members']['success'] += count($batch);
$this->log($output, "{$progress} ✅ 邀请成功");
}
break;
} catch (\Exception $e) {
$attempts++;
// 检查是否是重复键错误
if (strpos($e->getMessage(), 'duplicate key') !== false || strpos($e->getMessage(), 'DuplicateKey') !== false) {
$this->log($output, "{$progress} ️ 部分成员已存在,跳过");
$this->stats['members']['success'] += count($batch);
break;
} elseif ($attempts >= $this->retry) {
$this->stats['members']['failed'] += count($batch);
$this->log($output, "{$progress} ❌ 邀请异常: " . $e->getMessage());
} else {
$this->log($output, "{$progress} ⚠️ 邀请失败,第 {$attempts}/{$this->retry} 次重试...");
usleep(100000);
}
}
}
usleep(10000);
}
}
$this->log($output, "📊 共处理 {$totalMembers} 个群成员");
}
private function migrateMessages(OutputInterface $output): void
{
$this->log($output, "");
$this->log($output, "═════════════════ 步骤4: 迁移消息 ═════════════════");
$this->log($output, "");
$pipeline = [
['$unwind' => '$msgs'],
['$match' => ['msgs.msg' => ['$ne' => null]]],
['$sort' => ['msgs.msg.send_time' => 1]],
];
$pipeline[] = ['$project' => ['doc_id' => 1, 'msg' => '$msgs.msg']];
$command = new \MongoDB\Driver\Command([
'aggregate' => 'msg',
'pipeline' => $pipeline,
'cursor' => new \stdClass
]);
$cursor = $this->oldManager->executeCommand('tettt', $command);
$messages = [];
foreach ($cursor as $doc) {
$messages[] = $this->bsonToArray($doc);
}
$this->stats['messages']['total'] = count($messages);
$this->log($output, "📊 找到 {$this->stats['messages']['total']} 条消息");
$processed = 0;
foreach ($messages as $doc) {
$processed++;
$msg = $doc['msg'] ?? [];
if (empty($msg)) {
$this->stats['messages']['skipped']++;
continue;
}
$sendID = (string)($msg['send_id'] ?? $msg['sendID'] ?? '');
$recvID = (string)($msg['recv_id'] ?? $msg['recvID'] ?? '');
$groupID = (string)($msg['group_id'] ?? $msg['groupID'] ?? '');
$contentType = (int)($msg['content_type'] ?? $msg['contentType'] ?? 101);
$sessionType = (int)($msg['session_type'] ?? $msg['sessionType'] ?? 1);
if (in_array($sendID, $this->skipUsers)) {
$this->stats['messages']['skipped']++;
continue;
}
if ($sessionType == 3 && in_array($groupID, $this->skipGroups)) {
$this->stats['messages']['skipped']++;
continue;
}
// 跳过特殊消息类型(如系统通知等)
if (in_array($contentType, [200, 201, 202, 203, 204, 205])) {
$this->stats['messages']['skipped']++;
continue;
}
$progress = sprintf("[消息 %d/%d]", $processed, $this->stats['messages']['total']);
if ($processed % 100 == 0 || $processed == 1) {
$this->log($output, "{$progress} 处理中...");
}
try {
$this->log($output, "{$progress} 发送消息: sendID={$sendID}, recvID={$recvID}, groupID={$groupID}, contentType={$contentType}, sessionType={$sessionType}");
$result = $this->sendMessage($msg);
$this->log($output, "{$progress} API返回: " . json_encode($result, JSON_UNESCAPED_UNICODE));
if ($result['success'] ?? false) {
$this->stats['messages']['success']++;
if ($processed % 100 == 0) {
$this->log($output, "{$progress} ✅ 发送成功");
}
} else {
$this->stats['messages']['failed']++;
$this->log($output, "{$progress} ❌ 发送失败: " . ($result['errMsg'] ?? '未知错误'));
// 遇到NotInGroupYetError时跳过,继续迁移其他消息
if (strpos(($result['errMsg'] ?? ''), 'NotInGroupYetError') === false) {
// 遇到其他错误时退出
throw new \Exception("消息发送失败: " . ($result['errMsg'] ?? '未知错误'));
} else {
$this->log($output, "{$progress} ️ 跳过NotInGroupYetError错误,继续迁移");
}
}
} catch (\Exception $e) {
$this->stats['messages']['failed']++;
$this->log($output, "{$progress} ❌ 发送异常: " . $e->getMessage());
// 遇到NotInGroupYetError异常时跳过,继续迁移其他消息
if (strpos($e->getMessage(), 'NotInGroupYetError') === false) {
// 遇到其他异常时退出
throw $e;
} else {
$this->log($output, "{$progress} ️ 跳过NotInGroupYetError异常,继续迁移");
}
}
if ($this->delay > 0) {
usleep($this->delay * 1000);
}
}
}
private function sendMessage(array $msg): array
{
$sendID = (string)($msg['send_id'] ?? $msg['sendID'] ?? '');
$recvID = (string)($msg['recv_id'] ?? $msg['recvID'] ?? '');
$groupID = (string)($msg['group_id'] ?? $msg['groupID'] ?? '');
$contentType = (int)($msg['content_type'] ?? $msg['contentType'] ?? 101);
$sessionType = (int)($msg['session_type'] ?? $msg['sessionType'] ?? 1);
$sendTime = (int)($msg['send_time'] ?? $msg['sendTime'] ?? 0);
$content = $msg['content'] ?? '';
$ex = (string)($msg['ex'] ?? '');
if (empty($sendID)) {
return ['success' => false, 'errMsg' => 'sendID为空'];
}
$contentData = $this->parseContent($content, $contentType);
// 构建消息数据
$messageData = [
'content' => $contentData,
'contentType' => $contentType,
'sendTime' => $sendTime,
'ex' => $ex,
'isOnlineOnly' => false,
'notOfflinePush' => true
];
// 根据会话类型调用不同的发送方法
if ($sessionType == 1 && !empty($recvID)) {
// 单聊
$result = $this->sdk->message->sendSingleMessage($sendID, $recvID, $messageData);
} elseif (!empty($groupID)) {
// 群聊
$result = $this->sdk->message->sendGroupMessage($sendID, $groupID, $messageData);
} else {
return ['success' => false, 'errMsg' => '缺少必要的参数'];
}
return [
'success' => !($result['errCode'] ?? 0),
'errMsg' => $result['errMsg'] ?? ''
];
}
private function parseContent($content, int $contentType): array
{
if (is_string($content)) {
$decoded = json_decode($content, true);
if (json_last_error() === JSON_ERROR_NONE && is_array($decoded)) {
return $decoded;
}
return ['content' => $content, 'text' => $content];
}
if (is_array($content)) {
return $content;
}
return ['content' => '', 'text' => ''];
}
private function bsonToArray($data): array
{
if ($data instanceof \MongoDB\Model\BSONArray) {
return $data->getArrayCopy();
}
if ($data instanceof \MongoDB\Model\BSONDocument) {
return $data->getArrayCopy();
}
if (is_object($data)) {
return json_decode(json_encode($data), true);
}
return is_array($data) ? $data : [];
}
private function printStats(OutputInterface $output): void
{
$this->log($output, "");
$this->log($output, "╔════════════════════════════════════════════════════════════╗");
$this->log($output, "║ 迁移统计报告 ║");
$this->log($output, "╠════════════════════════════════════════════════════════════╣");
$this->log($output, "║ 用户: 总数 {$this->stats['users']['total']}, 成功 {$this->stats['users']['success']}, 失败 {$this->stats['users']['failed']}");
$this->log($output, "║ 群组: 总数 {$this->stats['groups']['total']}, 成功 {$this->stats['groups']['success']}, 失败 {$this->stats['groups']['failed']}");
$this->log($output, "║ 成员: 总数 {$this->stats['members']['total']}, 成功 {$this->stats['members']['success']}, 失败 {$this->stats['members']['failed']}");
$this->log($output, "║ 消息: 总数 {$this->stats['messages']['total']}, 成功 {$this->stats['messages']['success']}, 失败 {$this->stats['messages']['failed']}, 跳过 {$this->stats['messages']['skipped']}");
$this->log($output, "╚════════════════════════════════════════════════════════════╝");
}
private function log(OutputInterface $output, string $message): void
{
$output->writeln($message);
}
/**
* 获取OpenIM SDK实例
*
* @return object
*/
function getSdk()
{
if ($this->sdk) {
return $this->sdk;
}
$this->sdk = new \support\OpenImSdk\Client([
'host' => config('openim.server'),
'secret' => config('openim.secret'),
]);
return $this->sdk;
}
private function initConnections(OutputInterface $output): void
{
$this->log($output, "正在初始化连接...");
$this->getSdk();
$uri = 'mongodb://commie:n1e5a6s6m7@127.0.0.1:37017/tettt?authSource=admin';
$this->oldManager = new \MongoDB\Driver\Manager($uri);
$this->log($output, "✅ 连接成功");
}
private function cleanExistingData(OutputInterface $output,$collections=[]): void
{
// 记录开始清理数据的日志信息
$this->log($output, "\n═════════════════ 清理现有数据 ═════════════════");
$this->log($output, "");
// 构建新数据库(OpenIM v3)的MongoDB连接URI
$uri = 'mongodb://commie:n1e5a6s6m7@127.0.0.1:37017/openim_v3?authSource=admin';
// 创建MongoDB驱动管理器实例,用于操作新数据库
$this->newManager = new \MongoDB\Driver\Manager($uri);
try {
// 记录开始清理数据的状态
$this->log($output, "正在清理mongodb数据...");
// 定义需要清空的数据集合列表(保留user集合,清空其他所有业务数据)
// 遍历所有需要清理的集合,逐个执行清空操作
foreach ($collections as $collection) {
try {
// 创建批量写入操作对象
$bulk = new \MongoDB\Driver\BulkWrite;
// 添加删除所有文档的操作(空条件表示删除全部)
$bulk->delete([]);
// 执行批量删除操作,指定数据库和集合名称
$this->newManager->executeBulkWrite('openim_v3.' . $collection, $bulk);
// 记录该集合清空成功的日志
$this->log($output, "已清空集合: {$collection}");
} catch (\Exception $e) {
// 单个集合清空失败时记录警告信息,不影响其他集合的清理
$this->log($output, "⚠️ 清空集合 {$collection} 失败: " . $e->getMessage());
}
}
$this->log($output, "正在清理redis数据...");
$redis = new \Redis();
$host = '127.0.0.1';
$port = 16379;
$password = 'n1e5a6s6m7';
$output->writeln("连接 Redis: {$host}:{$port}");
if ($redis->connect($host, $port)) {
if (!empty($password)) {
$redis->auth($password);
}
$result = $redis->flushAll();
if ($result) {
$output->writeln("✅ Redis 清空成功");
} else {
$output->writeln("❌ Redis 清空失败");
}
} else {
$output->writeln("❌ 无法连接到 Redis");
}
// 记录所有数据清理完成的日志
$this->log($output, "✅ 数据清理完成");
} catch (\Exception $e) {
// 捕获整体清理过程中的异常,记录错误但不抛出,确保程序继续执行
$this->log($output, "❌ 清理数据失败: " . $e->getMessage());
// 不抛出异常,继续执行
}
}
private function queryOldDb(string $collection, array $filter = [], array $options = []): array
{
$query = new \MongoDB\Driver\Query($filter, $options);
$cursor = $this->oldManager->executeQuery('tettt.' . $collection, $query);
$result = [];
foreach ($cursor as $doc) {
$result[] = $this->bsonToArray($doc);
}
return $result;
}
/**
* 备份MongoDB数据
* @param OutputInterface $output
* @param string $step
* @return string
*/
private function backupMongoDB(OutputInterface $output, string $step): string
{
$this->log($output, "═════════════════ 备份MongoDB数据 ═════════════════");
// 确保备份目录存在
if (!is_dir($this->backupDir)) {
mkdir($this->backupDir, 0755, true);
}
// 生成备份文件名
$timestamp = date('YmdHis');
$backupFile = "{$this->backupDir}/openim_v3_{$step}_{$timestamp}.json";
try {
// 使用现有的新数据库连接
if (!$this->newManager) {
$uri = 'mongodb://commie:n1e5a6s6m7@127.0.0.1:37017/openim_v3?authSource=admin';
$this->newManager = new \MongoDB\Driver\Manager($uri);
}
// 获取所有集合
$command = new \MongoDB\Driver\Command(['listCollections' => 1]);
$cursor = $this->newManager->executeCommand('openim_v3', $command);
$collections = $cursor->toArray();
//$this->log($output, "找到 " . count($collections) . " 个集合");
$backupData = [];
// 备份每个集合
foreach ($collections as $collection) {
$collectionName = $collection->name;
if (in_array($collectionName, ['system.indexes', 'system.profile'])) {
continue;
}
//$this->log($output, "备份集合: {$collectionName}");
$query = new \MongoDB\Driver\Query([]);
$cursor = $this->newManager->executeQuery('openim_v3.' . $collectionName, $query);
$documents = [];
foreach ($cursor as $doc) {
$document = $this->bsonToArray($doc);
// 处理ObjectId
if (isset($document['_id']) && is_array($document['_id']) && isset($document['_id']['$oid'])) {
$document['_id'] = $document['_id']['$oid'];
}
$documents[] = $document;
}
if (!empty($documents)) {
$backupData[$collectionName] = $documents;
//$this->log($output, " - 备份了 " . count($documents) . " 条记录");
}
}
// 保存备份文件
file_put_contents($backupFile, json_encode($backupData, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT));
$this->log($output, "✅ 备份成功: {$backupFile} (" . filesize($backupFile) . " 字节)");
$this->currentBackup = $backupFile;
return $backupFile;
} catch (\Exception $e) {
$this->log($output, "❌ 备份失败: " . $e->getMessage());
return '';
}
}
/**
* 回滚MongoDB数据
* @param OutputInterface $output
* @param string $backupFile
* @return bool
*/
private function restoreMongoDB(OutputInterface $output, string $backupFile): bool
{
$this->log($output, "═════════════════ 回滚MongoDB数据 ═════════════════");
if (!file_exists($backupFile)) {
$this->log($output, "❌ 备份文件不存在: {$backupFile}");
return false;
}
try {
// 读取备份文件
$backupData = json_decode(file_get_contents($backupFile), true);
if (empty($backupData)) {
$this->log($output, "❌ 备份文件为空");
return false;
}
// 连接到新数据库
$uri = 'mongodb://commie:n1e5a6s6m7@127.0.0.1:37017/openim_v3?authSource=admin';
$manager = new \MongoDB\Driver\Manager($uri);
// 清空所有集合
$this->log($output, "清空现有数据...");
foreach (array_keys($backupData) as $collectionName) {
$bulk = new \MongoDB\Driver\BulkWrite;
$bulk->delete([]);
$manager->executeBulkWrite('openim_v3.' . $collectionName, $bulk);
}
// 恢复数据
foreach ($backupData as $collectionName => $documents) {
$documentCount = count($documents);
$this->log($output, "恢复集合: {$collectionName} ({$documentCount} 条记录)");
$bulk = new \MongoDB\Driver\BulkWrite;
foreach ($documents as $document) {
// 处理_id字段
if (isset($document['_id']) && is_string($document['_id'])) {
// 尝试创建ObjectId
try {
$document['_id'] = new \MongoDB\BSON\ObjectId($document['_id']);
} catch (\Exception $e) {
// 如果不是有效的ObjectId格式,保持原样
}
}
$bulk->insert($document);
}
$result = $manager->executeBulkWrite('openim_v3.' . $collectionName, $bulk);
$this->log($output, "恢复成功: {$result->getInsertedCount()} 条记录");
}
$this->log($output, "✅ 回滚成功");
return true;
} catch (\Exception $e) {
$this->log($output, "❌ 回滚失败: " . $e->getMessage());
return false;
}
}
}