Files
2026-04-13 20:00:32 +08:00

1621 lines
50 KiB
PHP
Executable File

<?php
namespace app\command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Output\OutputInterface;
use support\think\Db;
/**
* OpenIM 命令行工具
*
* 提供OpenIM相关的命令行操作,包括:
* - 用户ID转换
* - URL修复
* - 消息处理
* - 群组同步
* - 缓存管理
*/
class Openim extends Command
{
/**
* 命令默认名称
* @var string
*/
protected static $defaultName = 'Openim';
/**
* 命令默认描述
* @var string
*/
protected static $defaultDescription = 'OpenIM 命令行工具';
/**
* OpenIM SDK实例
* @var object|null
*/
public $sdk = null;
/**
* 每批处理消息数量
* @var int
*/
const BATCH_SIZE = 1000;
/**
* 特殊系统账号映射表
* @var array
*/
const SPECIAL_ACCOUNTS = [
'official_team' => 'SystemOfficialTeam',
'officialteam' => 'SystemOfficialTeam',
'group_bot' => 'SystemGroupBot',
'imAdmin' => 'SystemImAdmin',
'system' => 'SystemAccount'
];
/**
* 单聊类型常量
* @var int
*/
const SINGLE_CHAT_TYPE = 1;
/**
* 群聊类型常量
* @var int
*/
const GROUP_CHAT_TYPE = 2;
function send_group_msg(InputInterface $input, OutputInterface $output):int
{
$sdk = $this->getSdk();
for($i=100;$i<101;$i++){
$data = $sdk->message->sendGroupMessage('69546124','62106234',[
"content" => [
"content" => '第'.$i.'条消息'
],
"contentType" => 101,
"isOnlineOnly" => false,
"notOfflinePush" => false,
"sendTime" => time()*1000,
"offlinePushInfo" => [
"title" => "send message",
"desc" => '第'.$i.'条消息',
"ex" => "",
"iOSPushSound" => "default",
"iOSBadgeCount" => true
],
"ex" => ""
]);
cp($data);
}
return 0;
}
function revoke_msg(InputInterface $input, OutputInterface $output)
{
$user_id = $input->getOption('userID');
$msg_seq = $input->getOption('seq');
if(!$user_id){
cp('user_id不能为空');
return 1;
}
if(!$msg_seq){
cp('msg_seq不能为空');
return 1;
}
$data = $this->getSdk()->message->revokeMessage('sg_2639473367',$msg_seq,$user_id);
//$data = $this->getSdk()->message->userClearAllMsg('100014');
cp($data);
return 0;
}
function delete_all_msg(InputInterface $input, OutputInterface $output)
{
$user_id = $input->getOption('userID');
if(!$user_id){
cp('user_id不能为空');
return 1;
}
$data = $this->getSdk()->message->userClearAllMsg($user_id);
cp($data);
return 0;
}
/**
* 修复seq(空方法,待实现)
*
* @param InputInterface $input
* @param OutputInterface $output
* @return void
*/
function fix_seq(InputInterface $input, OutputInterface $output)
{
//
$conversation_list = \app\model\Openim\Conversation::order('id','asc')->select();
foreach($conversation_list as $item){
$item->msgs = json_decode($item->msgs,true);
$item->save();
}
$list = \app\model\Openim\Msg::whereLike('doc_id','sg_3136475753%')->orderBy('id','desc')->select();
\app\model\Openim\Seq::where('conversation_id','sg_3136475753')->update([
'max_seq'=>count($list)*100+1
]);
\app\model\Openim\SeqUser::where('conversation_id','sg_3136475753')->update([
'read_seq'=>count($list)*100+1
]);
foreach($list as $item){
$item->msgs = json_decode($item->msgs,true);
$item->save();
}
}
/**
* 修复并清理消息数据
*
* 该方法是 OpenIM seq 修复的核心功能,包含以下操作:
* 1. 清理消息文档中的空消息(msg 为 null 的条目)
* 2. 根据实际消息数据重新计算并修复 seq 表
* 3. 根据用户加入时间重新计算并修复 seq_user 表
* 4. 清理 Redis 缓存
*
* OpenIM Seq 设计机制说明:
* - seq 表:记录每个会话(conversation)的最小和最大消息序号
* - min_seq: 会话中第一条消息的 seq
* - max_seq: 会话中最后一条消息的 seq + 1(用于分配下一个 seq)
*
* - seq_user 表:记录每个用户在每个会话中的消息范围
* - min_seq: 用户加入会话后第一条消息的 seq
* - max_seq: 会话的当前最大 seq
* - read_seq: 用户已读到的 seq(默认为 max_seq,表示全部已读)
*
* - 消息读取流程:
* 1. 客户端通过 GetConversationMaxSeq 获取会话最大 seq
* 2. 客户端通过 GetHasReadSeq 获取用户已读 seq
* 3. 未读消息数 = max_seq - read_seq
* 4. 拉取消息时,根据 seq 范围从 msg 表查询
*
* - 已读标记流程:
* 1. 客户端调用 MarkConversationAsRead 标记已读
* 2. 服务端更新 seq_user 表的 read_seq
* 3. 发送已读回执通知其他端
*
* @param bool $dryRun 是否为试运行模式(只显示不执行)
* @param bool $clearCache 是否清理 Redis 缓存
* @return array 统计信息
*/
protected function cleanEmptyMessages($dryRun = false, $clearCache = true)
{
cp("========== 开始修复消息和 Seq 数据 ==========");
$stats = [
'total_docs' => 0,
'cleaned_docs' => 0,
'total_msgs' => 0,
'cleaned_msgs' => 0,
'seq_updated' => 0,
'seq_user_updated' => 0,
'conversations' => []
];
$msgModel = new \app\model\Openim\Msg();
$seqModel = new \app\model\Openim\Seq();
$seqUserModel = new \app\model\Openim\SeqUser();
$friendModel = new \app\model\Openim\Friend();
$groupMemberModel = new \app\model\Openim\GroupMember();
$total = $msgModel->count('id');
cp("消息文档总数: {$total}");
$conversations = [];
for ($i = 0; $i < $total; $i += self::BATCH_SIZE) {
$list = $msgModel->limit($i, self::BATCH_SIZE)->select();
foreach ($list as $item) {
$stats['total_docs']++;
$docIdParts = explode(':', $item['doc_id']);
$conversationId = $docIdParts[0];
if (!isset($conversations[$conversationId])) {
$conversations[$conversationId] = [
'messages' => [],
'min_seq' => PHP_INT_MAX,
'max_seq' => 0,
'user_joins' => []
];
}
$originalMsgCount = count($item['msgs'] ?? []);
$validMsgs = [];
if (!empty($item['msgs']) && is_array($item['msgs'])) {
foreach ($item['msgs'] as $msg) {
if (!is_null($msg['msg'])) {
$validMsgs[] = $msg;
$seq = $msg['msg']['seq'] ?? 0;
$sendTime = $msg['msg']['send_time'] ?? 0;
$sendId = $msg['msg']['send_id'] ?? '';
$recvId = $msg['msg']['recv_id'] ?? '';
$groupId = $msg['msg']['group_id'] ?? '';
$sessionType = $msg['msg']['session_type'] ?? self::SINGLE_CHAT_TYPE;
if ($seq > 0) {
$conversations[$conversationId]['messages'][] = [
'seq' => $seq,
'send_time' => $sendTime,
'send_id' => $sendId,
'recv_id' => $recvId,
'group_id' => $groupId,
'session_type' => $sessionType
];
$conversations[$conversationId]['min_seq'] = min($conversations[$conversationId]['min_seq'], $seq);
$conversations[$conversationId]['max_seq'] = max($conversations[$conversationId]['max_seq'], $seq);
}
$stats['total_msgs']++;
} else {
$stats['cleaned_msgs']++;
}
}
}
$cleanedMsgCount = count($validMsgs);
if ($cleanedMsgCount != $originalMsgCount) {
if (!$dryRun) {
$item->msgs = $validMsgs;
$item->save();
}
$stats['cleaned_docs']++;
cp("文档 {$item['doc_id']}: 清理了 " . ($originalMsgCount - $cleanedMsgCount) . " 条空消息");
}
if ($stats['total_docs'] % 100 == 0) {
cp("已处理 {$stats['total_docs']}/{$total} 个文档...");
}
}
}
cp("\n========== 分析会话数据 ==========");
$stats['conversations'] = $conversations;
foreach ($conversations as $convId => $convData) {
if (empty($convData['messages'])) {
continue;
}
sort($convData['messages']);
$minSeq = $convData['min_seq'];
$maxSeq = $convData['max_seq'] + 1;
cp("会话 {$convId}: min_seq={$minSeq}, max_seq={$maxSeq}, 消息数=" . count($convData['messages']));
if (!$dryRun) {
$this->updateSeq($convId, $minSeq, $maxSeq);
}
$stats['seq_updated']++;
$userJoins = $this->getUserJoinTimes($convId, $friendModel, $groupMemberModel);
foreach ($userJoins as $userId => $joinTime) {
$userMinSeq = $this->calculateUserMinSeq($convData['messages'], $joinTime, $maxSeq);
$userMaxSeq = $maxSeq - 1;
$readSeq = $userMaxSeq;
if (!$dryRun) {
$this->updateSeqUser($userId, $convId, $userMinSeq, $userMaxSeq, $readSeq);
}
$stats['seq_user_updated']++;
if ($stats['seq_user_updated'] % 100 == 0) {
cp("已更新 {$stats['seq_user_updated']} 条 seq_user 记录...");
}
}
}
if ($clearCache && !$dryRun) {
$this->clearSeqCache();
}
cp("\n========== 修复完成 ==========");
cp("统计信息:");
cp(" - 处理文档数: {$stats['total_docs']}");
cp(" - 清理文档数: {$stats['cleaned_docs']}");
cp(" - 消息总数: {$stats['total_msgs']}");
cp(" - 清理消息数: {$stats['cleaned_msgs']}");
cp(" - 更新 seq 数: {$stats['seq_updated']}");
cp(" - 更新 seq_user 数: {$stats['seq_user_updated']}");
return $stats;
}
/**
* 更新 seq 表
*
* @param string $conversationId 会话ID
* @param int $minSeq 最小seq
* @param int $maxSeq 最大seq
* @return void
*/
protected function updateSeq($conversationId, $minSeq, $maxSeq)
{
$seqModel = new \app\model\Openim\Seq();
$existing = $seqModel->where('conversation_id', $conversationId)->find();
if ($existing) {
$seqModel->where('conversation_id', $conversationId)->update([
'min_seq' => $minSeq,
'max_seq' => $maxSeq
]);
} else {
$seqModel->insert([
'conversation_id' => $conversationId,
'min_seq' => $minSeq,
'max_seq' => $maxSeq
]);
}
}
/**
* 更新 seq_user 表
*
* @param string $userId 用户ID
* @param string $conversationId 会话ID
* @param int $minSeq 用户最小seq
* @param int $maxSeq 用户最大seq
* @param int $readSeq 已读seq
* @return void
*/
protected function updateSeqUser($userId, $conversationId, $minSeq, $maxSeq, $readSeq)
{
$seqUserModel = new \app\model\Openim\SeqUser();
$existing = $seqUserModel->where('user_id', $userId)
->where('conversation_id', $conversationId)
->find();
if ($existing) {
$seqUserModel->where('user_id', $userId)
->where('conversation_id', $conversationId)
->update([
'min_seq' => $minSeq,
'max_seq' => $maxSeq,
'read_seq' => $readSeq
]);
} else {
$seqUserModel->insert([
'user_id' => $userId,
'conversation_id' => $conversationId,
'min_seq' => $minSeq,
'max_seq' => $maxSeq,
'read_seq' => $readSeq
]);
}
}
/**
* 获取用户加入会话的时间
*
* @param string $conversationId 会话ID
* @param object $friendModel 好友模型
* @param object $groupMemberModel 群成员模型
* @return array 用户ID => 加入时间戳
*/
protected function getUserJoinTimes($conversationId, $friendModel, $groupMemberModel)
{
$userJoins = [];
$parts = explode('_', $conversationId);
if (count($parts) >= 2 && $parts[0] === 'si') {
$user1 = $parts[1];
$user2 = $parts[2] ?? '';
$friend = $friendModel->where('owner_user_id', $user1)
->where('friend_user_id', $user2)
->find();
if ($friend) {
$joinTime = strtotime($friend['create_time']) * 1000;
$userJoins[$user1] = $joinTime;
$userJoins[$user2] = $joinTime;
}
} elseif (count($parts) >= 2 && $parts[0] === 'sg') {
$groupId = $parts[1];
$members = $groupMemberModel->where('group_id', $groupId)->select();
foreach ($members as $member) {
$userJoins[$member['user_id']] = strtotime($member['join_time']) * 1000;
}
}
return $userJoins;
}
/**
* 计算用户在会话中的最小 seq
*
* 根据用户加入时间,找到用户加入后的第一条消息的 seq
*
* @param array $messages 消息列表
* @param int $joinTime 用户加入时间戳
* @param int $maxSeq 会话最大seq
* @return int 用户最小seq
*/
protected function calculateUserMinSeq($messages, $joinTime, $maxSeq)
{
foreach ($messages as $msg) {
if ($msg['send_time'] >= $joinTime) {
return $msg['seq'];
}
}
return $maxSeq;
}
/**
* 清理 Seq 相关的 Redis 缓存
*
* 清理以下缓存键:
* - SEQ_USER_MAX:* - 用户最大 seq 缓存
* - SEQ_USER_MIN:* - 用户最小 seq 缓存
* - SEQ_USER_READ:* - 用户已读 seq 缓存
* - MALLOC_SEQ:* - 会话 seq 分配缓存
* - MSG_CACHE:* - 消息缓存
*
* @return void
*/
protected function clearSeqCache()
{
cp("\n清理 Redis 缓存...");
$patterns = [
'SEQ_USER_MAX:*',
'SEQ_USER_MIN:*',
'SEQ_USER_READ:*',
'MALLOC_SEQ:*',
'MSG_CACHE:*'
];
$totalDeleted = 0;
foreach ($patterns as $pattern) {
try {
$keys = cache()->handler()->keys($pattern);
if (!empty($keys)) {
cache()->handler()->del(...$keys);
$totalDeleted += count($keys);
cp(" 删除 {$pattern}: " . count($keys) . " 个键");
}
} catch (\Exception $e) {
cp(" 警告: 清理 {$pattern} 失败: " . $e->getMessage());
}
}
cp("共清理 {$totalDeleted} 个缓存键");
}
/**
* 修复头像URL
*
* 将旧的CDN域名替换为新的CDN域名,
* 并同步更新用户头像和群组头像
*
* @return int
*/
function fixurl()
{
$sdk = $this->getSdk();
$oldUrl = 'http://103.39.222.184:10002/object/';
$newUrl = 'https://s1.shun777.com/imapi/object/';
$this->updateUserAvatars($sdk, $oldUrl, $newUrl);
$this->updateGroupAvatars($oldUrl, $newUrl);
$this->cleanEmptyMessages();
return 0;
}
/**
* 更新用户头像
*
* @param object $sdk OpenIM SDK实例
* @param string $oldUrl 旧URL
* @param string $newUrl 新URL
* @return void
*/
protected function updateUserAvatars($sdk, $oldUrl, $newUrl)
{
$users = Db::name('User')
->whereLike('avatar', '%' . $oldUrl)
->field('avatar,id,userID')
->select();
foreach ($users as $user) {
$avatar = str_replace($oldUrl, $newUrl, $user['avatar']);
Db::name('User')
->where('id', $user['id'])
->update(['avatar' => $avatar]);
$userData = [
'faceURL' => $avatar
];
$sdk->user->updateUserInfo($user['userID'], $userData);
}
}
/**
* 更新群组头像
*
* @param string $oldUrl 旧URL
* @param string $newUrl 新URL
* @return void
*/
protected function updateGroupAvatars($oldUrl, $newUrl)
{
$groupModel = new \app\model\Openim\Group();
$groupList = $groupModel->whereLike('face_url', '%' . $oldUrl)->select();
foreach ($groupList as $group) {
$avatar = str_replace($oldUrl, $newUrl, $group['face_url']);
$groupModel->where('id', $group['id'])->update(['face_url' => $avatar]);
}
}
/**
* 转换用户ID命令入口
*
* @param InputInterface $input
* @param OutputInterface $output
* @return int
*/
function convertUserID(InputInterface $input, OutputInterface $output): int
{
$userID = $input->getOption('userID');
$this->_convertUserID($userID, $output);
return 0;
}
/**
* 转换用户ID内部方法
*
* 批量将用户ID从旧格式转换为新格式
*
* @param int $userID
* @param OutputInterface $output
* @return int
*/
function _convertUserID($userID, $output): int
{
$oldUserID = \support\Encrypt::userIDEncode($userID);
$newUserID = \support\Encrypt::intEncode($userID);
cp("开始转换用户ID: {$userID} -> {$oldUserID} -> {$newUserID}");
for ($i = 100006; $i < 102028; $i++) {
$newUserID = \support\Encrypt::intEncode($i);
Db::name('User')->where('id', $i)->update(['userID' => $newUserID]);
}
cp("用户ID:{$userID} -> {$newUserID} 转换完成");
return 0;
}
/**
* 转换好友表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertFriendTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\Friend();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$saveData = [];
if ($item['owner_user_id']) {
$saveData['owner_user_id'] = $this->turnID($item['owner_user_id']);
}
if ($item['friend_user_id']) {
$saveData['friend_user_id'] = $this->turnID($item['friend_user_id']);
}
if ($item['operator_user_id']) {
$saveData['operator_user_id'] = $this->turnID($item['operator_user_id']);
}
if (!empty($saveData)) {
$model->where('id', $item['id'])->update($saveData);
}
$count++;
}
$output->writeln("Friend 表转换完成,共 {$count} 条记录");
}
/**
* 转换好友请求表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertFriendRequestTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\FriendRequest();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$saveData = [];
if ($item['from_user_id']) {
$saveData['from_user_id'] = $this->turnID($item['from_user_id']);
}
if ($item['to_user_id']) {
$saveData['to_user_id'] = $this->turnID($item['to_user_id']);
}
if ($item['handler_user_id']) {
$saveData['handler_user_id'] = $this->turnID($item['handler_user_id']);
}
if (!empty($saveData)) {
$model->where('id', $item['id'])->update($saveData);
}
$count++;
}
$output->writeln("FriendRequest 表转换完成,共 {$count} 条记录");
}
/**
* 转换群组表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertGroupTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\Group();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$saveData = [];
if ($item['creator_user_id']) {
$saveData['creator_user_id'] = $this->turnID($item['creator_user_id']);
}
if ($item['notification_user_id']) {
$saveData['notification_user_id'] = $this->turnID($item['notification_user_id']);
}
if (!empty($saveData)) {
$model->where('id', $item['id'])->update($saveData);
}
$count++;
}
$output->writeln("Group 表转换完成,共 {$count} 条记录");
}
/**
* 转换群组成员表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertGroupMemberTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\GroupMember();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$saveData = [];
if ($item['user_id']) {
$saveData['user_id'] = $this->turnID($item['user_id']);
}
if ($item['inviter_user_id']) {
$saveData['inviter_user_id'] = $this->turnID($item['inviter_user_id']);
}
if ($item['operator_user_id']) {
$saveData['operator_user_id'] = $this->turnID($item['operator_user_id']);
}
if (!empty($saveData)) {
$model->where('id', $item['id'])->update($saveData);
}
$count++;
}
$output->writeln("GroupMember 表转换完成,共 {$count} 条记录");
}
/**
* 转换黑名单表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertBlackTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\Black();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$saveData = [];
if ($item['block_user_id']) {
$saveData['block_user_id'] = $this->turnID($item['block_user_id']);
}
if ($item['owner_user_id']) {
$saveData['owner_user_id'] = $this->turnID($item['owner_user_id']);
}
if ($item['operator_user_id']) {
$saveData['operator_user_id'] = $this->turnID($item['operator_user_id']);
}
if (!empty($saveData)) {
$model->where('id', $item['id'])->update($saveData);
}
$count++;
}
$output->writeln("Black 表转换完成,共 {$count} 条记录");
}
/**
* 转换会话表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertConversationTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\Conversation();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$saveData = [];
if ($item['user_id']) {
$saveData['user_id'] = $this->turnID($item['user_id']);
}
if ($item['owner_user_id']) {
$saveData['owner_user_id'] = $this->turnID($item['owner_user_id']);
}
if ($item['conversation_id']) {
$saveData['conversation_id'] = $this->handlerConversationID($item['conversation_id']);
}
$model->where('id', $item['id'])->update($saveData);
$count++;
}
$output->writeln("Conversation 表转换完成,共 {$count} 条记录");
}
/**
* 转换用户表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertUserTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\User();
$list = $model->select();
foreach ($list as $user) {
$model->where('id', $user['id'])->update([
'user_id' => $this->turnID($user['user_id'])
]);
}
}
/**
* 转换群组请求表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertGroupRequestTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\GroupRequest();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$saveData = [];
if ($item['user_id']) {
$saveData['user_id'] = $this->turnID($item['user_id']);
}
if ($item['handle_user_id']) {
$saveData['handle_user_id'] = $this->turnID($item['handle_user_id']);
}
if ($item['inviter_user_id']) {
$saveData['inviter_user_id'] = $this->turnID($item['inviter_user_id']);
}
$model->where('id', $item['id'])->update($saveData);
$count++;
}
$output->writeln("GroupRequest 表转换完成,共 {$count} 条记录");
}
/**
* 转换seq表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertSeqTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\Seq();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$model->where('id', $item['id'])->update([
'conversation_id' => $this->handlerConversationID($item['conversation_id'])
]);
$count++;
}
$output->writeln("Seq 表转换完成,共 {$count} 条记录");
}
/**
* 转换seq_user表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertSeqUserTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\SeqUser();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$model->where('id', $item['id'])->update([
'user_id' => $this->turnID($item->user_id),
'conversation_id' => $this->handlerConversationID($item['conversation_id'])
]);
$count++;
}
if ($count) {
$output->writeln("SeqUser 表转换完成,共 {$count} 条记录");
}
}
/**
* 转换所有版本表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertVersionTables($oldUserID, $newUserID, $output)
{
$this->convertConversationVersionTable($oldUserID, $newUserID, $output);
$this->convertFriendVersionTable($oldUserID, $newUserID, $output);
$this->convertGroupMemberVersionTable($oldUserID, $newUserID, $output);
$this->convertGroupJoinVersionTable($oldUserID, $newUserID, $output);
}
/**
* 转换会话版本表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertConversationVersionTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\ConversationVersion();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$saveData = [];
$saveData['d_id'] = $this->turnID($item['d_id']);
if (!empty($item['logs']) && is_array($item['logs'])) {
$logs = $item['logs'];
foreach ($logs as $key => $log) {
if (isset($log['e_id'])) {
$logs[$key]['e_id'] = $this->handlerConversationID($log['e_id']);
}
}
$saveData['logs'] = $logs;
}
$model->where('id', $item['id'])->update($saveData);
$count++;
}
$output->writeln("ConversationVersion 表转换完成,共 {$count} 条记录");
}
/**
* 转换好友版本表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertFriendVersionTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\FriendVersion();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$saveData = [];
$saveData['d_id'] = $this->turnID($item['d_id']);
if (!empty($item['logs']) && is_array($item['logs'])) {
$logs = $item['logs'];
foreach ($logs as $key => $log) {
if (isset($log['e_id'])) {
$logs[$key]['e_id'] = $this->handlerConversationID($log['e_id']);
}
}
$saveData['logs'] = $logs;
}
$model->where('id', $item['id'])->update($saveData);
$count++;
}
$output->writeln("FriendVersion 表转换完成,共 {$count} 条记录");
}
/**
* 转换群组成员版本表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertGroupMemberVersionTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\GroupMemberVersion();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$saveData = [];
$saveData['d_id'] = $this->turnID($item['d_id']);
if (!empty($item['logs']) && is_array($item['logs'])) {
$logs = $item['logs'];
foreach ($logs as $key => $log) {
if (isset($log['e_id'])) {
$logs[$key]['e_id'] = $this->handlerConversationID($log['e_id']);
}
}
$saveData['logs'] = $logs;
}
$model->where('id', $item['id'])->update($saveData);
$count++;
}
$output->writeln("GroupMemberVersion 表转换完成,共 {$count} 条记录");
}
/**
* 转换群组加入版本表
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertGroupJoinVersionTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\GroupJoinVersion();
$list = $model->select();
$count = 0;
foreach ($list as $item) {
$saveData = [];
$saveData['d_id'] = $this->turnID($item['d_id']);
if (!empty($item['logs']) && is_array($item['logs'])) {
$logs = $item['logs'];
foreach ($logs as $key => $log) {
if (isset($log['e_id'])) {
$logs[$key]['e_id'] = $this->handlerConversationID($log['e_id']);
}
}
$saveData['logs'] = $logs;
}
$model->where('id', $item['id'])->update($saveData);
$count++;
}
$output->writeln("GroupJoinVersion 表转换完成,共 {$count} 条记录");
}
/**
* 转换用户ID
*
* 将用户ID从旧格式转换为新格式
*
* @param string $id
* @return string
*/
function turnID($id)
{
if (isset(self::SPECIAL_ACCOUNTS[$id])) {
return self::SPECIAL_ACCOUNTS[$id];
}
try {
return \support\Encrypt::intEncode(\support\Encrypt::userIDDecode($id));
} catch (\Exception $e) {
return $id;
}
}
/**
* 转换消息表
*
* 处理消息内容中的用户ID、URL等
*
* @param string $oldUserID
* @param string $newUserID
* @param OutputInterface $output
* @return void
*/
protected function convertMsgTable($oldUserID, $newUserID, $output)
{
$model = new \app\model\Openim\Msg();
$total = $model->count('id');
$count = 0;
for ($i = 0; $i < $total; $i += self::BATCH_SIZE) {
$list = $model->limit($i, self::BATCH_SIZE)->select();
foreach ($list as $item) {
$saveData = $this->processMessageItem($item);
if (!empty($saveData)) {
$model->where('id', $item['id'])->update($saveData);
$count++;
}
}
}
}
/**
* 处理单条消息记录
*
* @param array $item
* @return array
*/
protected function processMessageItem($item)
{
$saveData = [];
$docIdParts = explode(':', $item['doc_id']);
$docIdParts[0] = $this->handlerConversationID($docIdParts[0]);
$saveData['doc_id'] = $docIdParts[0] . ':' . $docIdParts[1];
foreach ($item['msgs'] as $key => $msg) {
$saveData['msgs'][$key] = $this->processSingleMessage($msg);
}
return $saveData;
}
/**
* 处理单条消息
*
* @param array $msg
* @return array
*/
protected function processSingleMessage($msg)
{
if (isset($msg['msg']['send_id'])) {
$msg['msg']['send_id'] = $this->turnID($msg['msg']['send_id']);
}
if (isset($msg['msg']['recv_id'])) {
$msg['msg']['recv_id'] = $this->turnID($msg['msg']['recv_id']);
}
$content = [];
if ($msg['msg']['content']) {
$content = json_decode($msg['msg']['content'], true);
}
$content = $this->processMessageContent($content, $msg['msg']['content_type']);
$msg['msg']['content'] = json_encode($content, JSON_UNESCAPED_UNICODE);
return $msg;
}
/**
* 处理消息内容
*
* @param array $content
* @param string $contentType
* @return array
*/
protected function processMessageContent($content, $contentType)
{
switch ($contentType) {
case '101':
break;
case '102':
$urlFields = ['sourcePath'];
$this->handlerUrl($content, $urlFields);
if (isset($content['sourcePicture'])) {
$this->handlerUrl($content['sourcePicture'], ['url']);
}
if (isset($content['bigPicture'])) {
$this->handlerUrl($content['bigPicture'], ['url']);
}
if (isset($content['snapshotPicture'])) {
$this->handlerUrl($content['snapshotPicture'], ['url']);
}
break;
case '103':
$this->handlerUrl($content, ['soundPath', 'sourceUrl']);
break;
case '104':
$this->handlerUrl($content, ['videoUrl', 'snapshotUrl', 'snapshotPath', 'videoPath']);
break;
case '106':
if (isset($content['atUserList'])) {
foreach ($content['atUserList'] as $k => $userID) {
$content['atUserList'][$k] = $this->turnID($userID);
}
}
if (isset($content['atUsersInfo'])) {
foreach ($content['atUsersInfo'] as $k => $v) {
$content['atUsersInfo'][$k]['atUserID'] = $this->turnID($v['atUserID']);
}
}
break;
case '108':
$this->handlerUserId($content, ['userID']);
$this->handlerUrl($content, ['faceURL']);
break;
case '114':
if (isset($content['quoteMessage'])) {
$this->handlerUserId($content['quoteMessage'], ['sendID', 'recvID']);
$this->handlerUrl($content['quoteMessage'], ['senderFaceUrl']);
}
break;
case '1201':
$content = $this->processFriendNotification($content);
break;
case '1501':
case '1504':
case '1507':
case '1508':
case '1509':
case '1510':
case '1512':
case '1513':
case '1514':
case '1519':
case '1515':
case '1520':
$content = $this->processGroupNotification($content);
break;
case '1701':
$content = $this->processReadNotification($content);
break;
case '2001':
break;
case '2200':
$content = $this->processReadReceipt($content);
break;
case '2101':
$content = $this->processMessageRecall($content);
break;
}
return $content;
}
/**
* 处理好友通知消息
*
* @param array $content
* @return array
*/
protected function processFriendNotification($content)
{
$detail = json_decode($content['detail'], true);
if (isset($detail['fromToUserID'])) {
$this->handlerUserId($detail['fromToUserID'], ['fromUserID', 'toUserID']);
}
if (isset($detail['request'])) {
$this->handlerUserId($detail['request'], ['handlerUserID', 'fromUserID', 'toUserID']);
$this->handlerUrl($detail['request'], ['fromFaceURL', 'toFaceURL']);
}
$content['detail'] = json_encode($detail, JSON_UNESCAPED_UNICODE);
return $content;
}
/**
* 处理群组通知消息
*
* @param array $content
* @return array
*/
protected function processGroupNotification($content)
{
$detail = json_decode($content['detail'], true);
if (isset($detail['memberList'])) {
foreach ($detail['memberList'] as $memberIndex => $member) {
$this->handlerUserId($member, ['userID', 'operatorUserID', 'inviterUserID']);
$this->handlerUrl($member, ['faceURL']);
$detail['memberList'][$memberIndex] = $member;
}
}
if (isset($detail['invitedUserList'])) {
foreach ($detail['invitedUserList'] as $memberIndex => $member) {
$this->handlerUserId($member, ['userID', 'operatorUserID', 'inviterUserID']);
$this->handlerUrl($member, ['faceURL']);
$detail['invitedUserList'][$memberIndex] = $member;
}
}
$userFields = [
'groupOwnerUser', 'inviterUser', 'oldGroupOwnerInfo',
'newGroupOwner', 'entrantUser', 'mutedUser', 'opUser', 'quitUser'
];
foreach ($userFields as $field) {
if (isset($detail[$field])) {
$this->handlerUserId($detail[$field], ['userID', 'operatorUserID', 'inviterUserID']);
$this->handlerUrl($detail[$field], ['faceURL']);
}
}
if (isset($detail['group'])) {
$this->handlerUserId($detail['group'], ['ownerUserID', 'creatorUserID']);
$this->handlerUrl($detail['group'], ['faceURL']);
}
if (isset($detail['kickedUserList'])) {
foreach ($detail['kickedUserList'] as $memberIndex => $member) {
$this->handlerUserId($member, ['userID', 'operatorUserID', 'inviterUserID']);
$this->handlerUrl($member, ['faceURL']);
$detail['kickedUserList'][$memberIndex] = $member;
}
}
$content['detail'] = json_encode($detail, JSON_UNESCAPED_UNICODE);
return $content;
}
/**
* 处理消息已读通知
*
* @param array $content
* @return array
*/
protected function processReadNotification($content)
{
$detail = json_decode($content['detail'], true);
$this->handlerUserId($detail, ['recvID', 'sendID']);
if (isset($detail['conversationID'])) {
$detail['conversationID'] = $this->handlerConversationID($detail['conversationID']);
}
$content['detail'] = json_encode($detail, JSON_UNESCAPED_UNICODE);
return $content;
}
/**
* 处理消息已读回执
*
* @param array $content
* @return array
*/
protected function processReadReceipt($content)
{
$detail = json_decode($content['detail'], true);
$this->handlerUserId($detail, ['markAsReadUserID']);
if (isset($detail['conversationID'])) {
$detail['conversationID'] = $this->handlerConversationID($detail['conversationID']);
}
$content['detail'] = json_encode($detail, JSON_UNESCAPED_UNICODE);
return $content;
}
/**
* 处理消息撤回
*
* @param array $content
* @return array
*/
protected function processMessageRecall($content)
{
$detail = json_decode($content['detail'], true);
$this->handlerUserId($detail, ['revokerUserID']);
$detail['conversationID'] = $this->handlerConversationID($detail['conversationID']);
$content['detail'] = json_encode($detail, JSON_UNESCAPED_UNICODE);
return $content;
}
/**
* 处理会话ID
*
* 将会话ID中的用户ID转换为新格式
*
* @param string $data
* @return string
*/
function handlerConversationID($data)
{
$data = str_replace('official_team', 'officialteam', $data);
$cids = explode('_', $data);
$cids[1] = $this->turnID($cids[1]);
if (count($cids) > 2) {
$cids[2] = $this->turnID($cids[2]);
}
return implode('_', $cids);
}
/**
* 处理用户ID字段
*
* @param array $data
* @param array $userIDFields
* @return void
*/
function handlerUserId(&$data, $userIDFields)
{
foreach ($userIDFields as $userIDField) {
if (isset($data[$userIDField]) && $data[$userIDField]) {
$data[$userIDField] = $this->turnID($data[$userIDField]);
}
}
}
/**
* 处理URL字段
*
* @param array $data
* @param array $urlFields
* @return void
*/
function handlerUrl(&$data, $urlFields)
{
foreach ($urlFields as $urlField) {
if (isset($data[$urlField])) {
$data[$urlField] = $this->cdn($data[$urlField]);
}
}
}
/**
* CDN URL处理
*
* 将HTTP协议转换为HTTPS
*
* @param string $url
* @return string
*/
function cdn($url)
{
if (substr($url, 0, 2) == '//') {
$url = 'https:' . $url;
}
if (substr($url, 0, 7) == 'http://') {
$url = 'https://' . substr($url, 7);
}
return $url;
}
/**
* 获取用户(测试方法)
*
* @return int
*/
function getUser()
{
$im = $this->getSdk();
$data = $im->user->searchNotificationAccount('');
cp($data);
return 0;
}
/**
* 修改用户(测试方法)
*
* @param InputInterface $input
* @param OutputInterface $output
* @return int
*/
private function change_user(InputInterface $input, OutputInterface $output): int
{
$im = $this->getSdk();
$data = $im->user->updateUserInfo(
\support\Encrypt::userIDencode('100006'),
['userInfo' => ['userId' => 'wx100001']]
);
cp($data);
return self::SUCCESS;
}
/**
* 同步用户(未完成)
*
* @param InputInterface $input
* @param OutputInterface $output
* @return int
*/
private function sync_users(InputInterface $input, OutputInterface $output): int
{
$im = $this->getSdk();
$data = $im->user->getAllUsersUid(1, 1000);
cp($data);
return self::SUCCESS;
}
/**
* 同步缓存
*
* 同步群组相关数据到缓存
*
* @return int
*/
function sync_cache()
{
$res = \app\model\Openim\Group::field('group_id,creator_user_id')->select();
$groupCreate = [];
foreach ($res as $v) {
if (!isset($groupCreate[$v['creator_user_id']])) {
$groupCreate[$v['creator_user_id']] = 0;
}
if ($v['status'] != 2) {
$groupCreate[$v['creator_user_id']] += 1;
}
cache('group_owner_' . $v['group_id'], $v['creator_user_id']);
$groupUserCount = \app\model\Openim\GroupMember::field('group_id,count(*) as count')
->where('group_id', $v['group_id'])
->count('user_id');
cp('群组数量', $v['group_id'], '成员数量:', $groupUserCount);
cache('group_' . $v['group_id'] . '_user_count', $groupUserCount);
}
foreach ($groupCreate as $userId => $count) {
cp('用户:', $userId, '创建群组数量:', $count);
cache('user_' . $userId . '_create_group_count', $count);
}
return 0;
}
/**
* 获取OpenIM SDK实例
*
* @return object
*/
function getSdk()
{
if ($this->sdk) {
return $this->sdk;
}
$this->sdk = new \support\OpenImSdk\Client([
'host' => config('openim.server'),
'secret' => config('openim.secret'),
]);
return $this->sdk;
}
/**
* 配置命令参数
*
* @return void
*/
protected function configure()
{
$this->addOption('action', 'a', InputOption::VALUE_OPTIONAL, '操作类型');
$this->addOption('conversationID', 'c', InputOption::VALUE_OPTIONAL, '会话ID');
$this->addOption('userID', 'u', InputOption::VALUE_OPTIONAL, '用户ID');
$this->addOption('seq', 's', InputOption::VALUE_OPTIONAL, '消息序列');
}
/**
* 执行命令
*
* @param InputInterface $input
* @param OutputInterface $output
* @return int
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$action = $input->getOption('action');
if (!$action) {
$output->writeln('空操作');
return self::FAILURE;
}
if (method_exists($this, $action)) {
return $this->$action($input, $output);
}
$output->writeln($action . '不存在1');
return self::FAILURE;
}
}