This commit is contained in:
wangchuxiao
2023-05-16 16:31:35 +08:00
parent 67ad00ab04
commit 2446a5d48c
11 changed files with 279 additions and 235 deletions
+23 -15
View File
@@ -2,10 +2,10 @@ package controller
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"gorm.io/gorm"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
)
type BlackDatabase interface {
@@ -31,12 +31,26 @@ func NewBlackDatabase(black relation.BlackModelInterface, cache cache.BlackCache
// Create 增加黑名单
func (b *blackDatabase) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) {
return b.black.Create(ctx, blacks)
if err := b.black.Create(ctx, blacks); err != nil {
return err
}
return b.deleteBlackIDsCache(ctx, blacks)
}
// Delete 删除黑名单
func (b *blackDatabase) Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) {
return b.black.Delete(ctx, blacks)
if err := b.black.Delete(ctx, blacks); err != nil {
return err
}
return b.deleteBlackIDsCache(ctx, blacks)
}
func (b *blackDatabase) deleteBlackIDsCache(ctx context.Context, blacks []*relation.BlackModel) (err error) {
cache := b.cache.NewCache()
for _, black := range blacks {
cache = cache.DelBlackIDs(ctx, black.OwnerUserID)
}
return cache.ExecDel(ctx)
}
// FindOwnerBlacks 获取黑名单列表
@@ -46,21 +60,15 @@ func (b *blackDatabase) FindOwnerBlacks(ctx context.Context, ownerUserID string,
// CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true)
func (b *blackDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error) {
_, err = b.black.Take(ctx, userID1, userID2)
userID1BlackIDs, err := b.cache.GetBlackIDs(ctx, userID1)
if err != nil {
if errs.Unwrap(err) != gorm.ErrRecordNotFound {
return
}
return
}
inUser1Blacks = err == nil
_, err = b.black.Take(ctx, userID2, userID1)
userID2BlackIDs, err := b.cache.GetBlackIDs(ctx, userID2)
if err != nil {
if errs.Unwrap(err) != gorm.ErrRecordNotFound {
return
}
return
}
inUser2Blacks = err == nil
return inUser1Blacks, inUser2Blacks, nil
return utils.IsContain(userID2, userID1BlackIDs), utils.IsContain(userID1, userID2BlackIDs), nil
}
func (b *blackDatabase) FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) {
+47 -28
View File
@@ -46,14 +46,14 @@ type ConversationDataBase struct {
tx tx.Tx
}
func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error {
return c.tx.Transaction(func(tx any) error {
func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) (err error) {
cache := c.cache.NewCache()
if err := c.tx.Transaction(func(tx any) error {
conversationTx := c.conversationDB.NewTx(tx)
haveUserIDs, err := conversationTx.FindUserID(ctx, userIDs, []string{conversation.ConversationID})
if err != nil {
return err
}
cache := c.cache.NewCache()
if len(haveUserIDs) > 0 {
_, err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap)
if err != nil {
@@ -71,19 +71,20 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context,
}
temp.OwnerUserID = v
conversations = append(conversations, temp)
}
}
if len(conversations) > 0 {
err = conversationTx.Create(ctx, conversations)
if err != nil {
return err
}
cache = cache.DelConversationIDs(NotUserIDs)
cache = cache.DelConversationIDs(NotUserIDs...)
}
// clear cache
log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDelKeys(), "addr", &cache)
return cache.ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error {
@@ -98,13 +99,17 @@ func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversat
if err := c.conversationDB.Create(ctx, conversations); err != nil {
return err
}
return nil
var userIDs []string
for _, conversation := range conversations {
userIDs = append(userIDs, conversation.OwnerUserID)
}
return c.cache.DelConversationIDs(userIDs...).ExecDel(ctx)
}
func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error {
return c.tx.Transaction(func(tx any) error {
cache := c.cache.NewCache()
if err := c.tx.Transaction(func(tx any) error {
conversationTx := c.conversationDB.NewTx(tx)
cache := c.cache.NewCache()
for _, conversation := range conversations {
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
haveUserIDs, err := conversationTx.FindUserID(ctx, []string{v[0]}, []string{conversation.ConversationID})
@@ -126,12 +131,15 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con
if err := conversationTx.Create(ctx, []*relationTb.ConversationModel{&newConversation}); err != nil {
return err
}
cache = cache.DelConversationIDs([]string{v[0]})
cache = cache.DelConversationIDs([]string{v[0]}...)
}
}
}
return c.cache.ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return c.cache.ExecDel(ctx)
}
func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
@@ -147,7 +155,8 @@ func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, owner
}
func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
return c.tx.Transaction(func(tx any) error {
cache := c.cache.NewCache()
if err := c.tx.Transaction(func(tx any) error {
var conversationIDs []string
for _, conversation := range conversations {
conversationIDs = append(conversationIDs, conversation.ConversationID)
@@ -181,13 +190,14 @@ func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUs
if err != nil {
return err
}
cache = cache.DelConversationIDs([]string{ownerUserID}...)
}
cache := c.cache.NewCache()
if len(notExistConversations) > 0 {
cache = cache.DelConversationIDs([]string{ownerUserID})
}
return cache.DelConvsersations(ownerUserID, existConversationIDs).ExecDel(ctx)
})
cache = cache.DelConvsersations(ownerUserID, existConversationIDs...)
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
@@ -195,27 +205,36 @@ func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context,
}
func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error {
cache := c.cache.NewCache()
conversationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
return c.tx.Transaction(func(tx any) error {
existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{groupID})
if err := c.tx.Transaction(func(tx any) error {
existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID})
if err != nil {
return err
}
notExistUserIDs := utils.DifferenceString(userIDs, existConversationUserIDs)
var conversations []*relationTb.ConversationModel
for _, v := range notExistUserIDs {
conversation := relationTb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID}
conversations = append(conversations, &conversation)
}
cache = cache.DelConversationIDs(notExistUserIDs...)
err = c.conversationDB.Create(ctx, conversations)
if err != nil {
return err
}
_, err = c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, map[string]interface{}{"max_seq": 0})
_, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]interface{}{"max_seq": 0})
if err != nil {
return err
}
for _, v := range existConversationUserIDs {
cache = cache.DelConvsersations(v, conversationID)
}
return nil
}); err != nil {
return err
})
}
return cache.ExecDel(ctx)
}
func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) {
+22 -27
View File
@@ -55,19 +55,15 @@ func NewFriendDatabase(friend relation.FriendModelInterface, friendRequest relat
// ok 检查user2是否在user1的好友列表中(inUser1Friends==true) 检查user1是否在user2的好友列表中(inUser2Friends==true)
func (f *friendDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Friends bool, inUser2Friends bool, err error) {
friends, err := f.friend.FindUserState(ctx, userID1, userID2)
userID1FriendIDs, err := f.cache.GetFriendIDs(ctx, userID1)
if err != nil {
return false, false, err
return
}
for _, v := range friends {
if v.OwnerUserID == userID1 && v.FriendUserID == userID2 {
inUser1Friends = true
}
if v.OwnerUserID == userID2 && v.FriendUserID == userID1 {
inUser2Friends = true
}
userID2FriendIDs, err := f.cache.GetFriendIDs(ctx, userID2)
if err != nil {
return
}
return
return utils.IsContain(userID2, userID1FriendIDs), utils.IsContain(userID1, userID2FriendIDs), nil
}
// 增加或者更新好友申请 如果之前有记录则更新,没有记录则新增
@@ -100,7 +96,8 @@ func (f *friendDatabase) AddFriendRequest(ctx context.Context, fromUserID, toUse
// (1)先判断是否在好友表 (在不在都不返回错误) (2)对于不在好友列表的 插入即可
func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, addSource int32) (err error) {
return f.tx.Transaction(func(tx any) error {
cache := f.cache.NewCache()
if err := f.tx.Transaction(func(tx any) error {
//先find 找出重复的 去掉重复的
fs1, err := f.friend.NewTx(tx).FindFriends(ctx, ownerUserID, friendUserIDs)
if err != nil {
@@ -135,8 +132,12 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string,
return err
}
newFriendIDs = append(newFriendIDs, ownerUserID)
return f.cache.DelFriendIDs(newFriendIDs...).ExecDel(ctx)
})
cache = cache.DelFriendIDs(newFriendIDs...)
return nil
}); err != nil {
return nil
}
return cache.ExecDel(ctx)
}
// 拒绝好友申请 (1)检查是否有申请记录且为未处理状态 (没有记录返回错误) (2)修改申请记录 已拒绝
@@ -199,24 +200,18 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest *
// 删除好友 外部判断是否好友关系
func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) {
return f.tx.Transaction(func(tx any) error {
if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil {
return err
}
return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx)
})
if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil {
return err
}
return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx)
}
// 更新好友备注 零值也支持
func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) {
return f.tx.Transaction(func(tx any) error {
err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark)
if err != nil {
return err
}
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
})
if err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark); err != nil {
return err
}
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
}
// 获取ownerUserID的好友列表 无结果不返回错误
+68 -57
View File
@@ -113,7 +113,8 @@ func (g *groupDatabase) FindGroupMemberUserID(ctx context.Context, groupID strin
}
func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error {
return g.tx.Transaction(func(tx any) error {
var cache = g.cache.NewCache()
if err := g.tx.Transaction(func(tx any) error {
if len(groups) > 0 {
if err := g.groupDB.NewTx(tx).Create(ctx, groups); err != nil {
return err
@@ -128,7 +129,7 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.Gr
return group.GroupID
})
m := make(map[string]struct{})
var cache = g.cache.NewCache()
for _, groupMember := range groupMembers {
if _, ok := m[groupMember.GroupID]; !ok {
m[groupMember.GroupID] = struct{}{}
@@ -137,8 +138,11 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.Gr
cache = cache.DelJoinedGroupID(groupMember.UserID).DelGroupMembersInfo(groupMember.GroupID, groupMember.UserID)
}
cache = cache.DelGroupsInfo(createGroupIDs...)
return cache.ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (g *groupDatabase) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
@@ -154,16 +158,15 @@ func (g *groupDatabase) SearchGroup(ctx context.Context, keyword string, pageNum
}
func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data map[string]any) error {
return g.tx.Transaction(func(tx any) error {
if err := g.groupDB.NewTx(tx).UpdateMap(ctx, groupID, data); err != nil {
return err
}
return g.cache.DelGroupsInfo(groupID).ExecDel(ctx)
})
if err := g.groupDB.UpdateMap(ctx, groupID, data); err != nil {
return err
}
return g.cache.DelGroupsInfo(groupID).ExecDel(ctx)
}
func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error {
return g.tx.Transaction(func(tx any) error {
cache := g.cache.NewCache()
if err := g.tx.Transaction(func(tx any) error {
if err := g.groupDB.NewTx(tx).UpdateStatus(ctx, groupID, constant.GroupStatusDismissed); err != nil {
return err
}
@@ -174,8 +177,12 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error
if err != nil {
return err
}
return g.cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID).ExecDel(ctx)
})
cache = cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID)
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (g *groupDatabase) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
@@ -236,7 +243,8 @@ func (g *groupDatabase) SearchGroupMember(ctx context.Context, keyword string, g
}
func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error {
return g.tx.Transaction(func(tx any) error {
cache := g.cache.NewCache()
if err := g.tx.Transaction(func(tx any) error {
if err := g.groupRequestDB.NewTx(tx).UpdateHandler(ctx, groupID, userID, handledMsg, handleResult); err != nil {
return err
}
@@ -244,19 +252,20 @@ func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string,
if err := g.groupMemberDB.NewTx(tx).Create(ctx, []*relationTb.GroupMemberModel{member}); err != nil {
return err
}
return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID).ExecDel(ctx)
cache = cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID)
}
return nil
})
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error {
return g.tx.Transaction(func(tx any) error {
if err := g.groupMemberDB.NewTx(tx).Delete(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).DelGroupMembersInfo(groupID, userIDs...).ExecDel(ctx)
})
if err := g.groupMemberDB.Delete(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).DelGroupMembersInfo(groupID, userIDs...).ExecDel(ctx)
}
func (g *groupDatabase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
@@ -276,7 +285,7 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string
}
func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error {
return g.tx.Transaction(func(tx any) error {
if err := g.tx.Transaction(func(tx any) error {
rowsAffected, err := g.groupMemberDB.NewTx(tx).UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel)
if err != nil {
return err
@@ -291,30 +300,34 @@ func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string,
if rowsAffected != 1 {
return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "")
}
return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx)
}
func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error {
return g.tx.Transaction(func(tx any) error {
if err := g.groupMemberDB.NewTx(tx).Update(ctx, groupID, userID, data); err != nil {
return err
}
return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx)
})
if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil {
return err
}
return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx)
}
func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error {
return g.tx.Transaction(func(tx any) error {
var cache = g.cache.NewCache()
var cache = g.cache.NewCache()
if err := g.tx.Transaction(func(tx any) error {
for _, item := range data {
if err := g.groupMemberDB.NewTx(tx).Update(ctx, item.GroupID, item.UserID, item.Map); err != nil {
return err
}
cache = cache.DelGroupMembersInfo(item.GroupID, item.UserID)
}
return cache.ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (g *groupDatabase) CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error {
@@ -346,16 +359,15 @@ func (g *groupDatabase) FindJoinSuperGroup(ctx context.Context, userID string) (
}
func (g *groupDatabase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error {
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx)
})
if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx)
}
func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) error {
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
cache := g.cache.NewCache()
if err := g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
if err := g.mongoDB.DeleteSuperGroup(ctx, groupID); err != nil {
return err
}
@@ -363,28 +375,27 @@ func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) er
if err != nil {
return err
}
cache := g.cache.DelSuperGroupMemberIDs(groupID)
cache = cache.DelSuperGroupMemberIDs(groupID)
if len(models) > 0 {
cache = cache.DelJoinedSuperGroupIDs(models[0].MemberIDs...)
}
return cache.ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (g *groupDatabase) DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
})
if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
}
func (g *groupDatabase) CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
})
if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
}
@@ -100,13 +100,12 @@ type commonMsgDatabase struct {
msgDocDatabase unRelationTb.MsgDocModelInterface
extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
extendMsgSetModel unRelationTb.ExtendMsgSetModel
msg unRelationTb.MsgDocModel
cache cache.MsgModel
producer *kafka.Producer
producerToMongo *kafka.Producer
producerToModify *kafka.Producer
producerToPush *kafka.Producer
// model
msg unRelationTb.MsgDocModel
}
func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error {
@@ -277,7 +276,7 @@ func (db *commonMsgDatabase) DelMsgBySeqs(ctx context.Context, conversationID st
}
func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
seqMsgs, indexes, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
return nil, err
}
@@ -289,37 +288,6 @@ func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID str
return unExistSeqs, nil
}
func (db *commonMsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
if err != nil {
return nil, nil, nil, err
}
singleCount := 0
var hasSeqList []int64
for i := 0; i < len(doc.Msg); i++ {
msgPb, err := db.unmarshalMsg(&doc.Msg[i])
if err != nil {
return nil, nil, nil, err
}
if utils.Contain(msgPb.Seq, seqs...) {
indexes = append(indexes, i)
seqMsgs = append(seqMsgs, msgPb)
hasSeqList = append(hasSeqList, msgPb.Seq)
singleCount++
if singleCount == len(seqs) {
break
}
}
}
for _, i := range seqs {
if utils.Contain(i, hasSeqList...) {
continue
}
unExistSeqs = append(unExistSeqs, i)
}
return seqMsgs, indexes, unExistSeqs, nil
}
func (db *commonMsgDatabase) GetNewestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
if err != nil {
@@ -395,12 +363,11 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio
}
func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, unExistSeqs []int64, err error) {
beginSeq, endSeq := db.msg.GetSeqsBeginEnd(seqs)
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, beginSeq, endSeq)
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs)
if err != nil {
return nil, nil, err
}
log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq, "len(msgs)", len(msgs))
log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "len(msgs)", len(msgs))
seqMsgs = append(seqMsgs, msgs...)
if len(msgs) == 0 {
unExistSeqs = seqs
@@ -416,7 +383,7 @@ func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seq
}
}
}
msgs, _, unExistSeqs, err = db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs)
msgs, _, unExistSeqs, err = db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs)
if err != nil {
return nil, nil, err
}
@@ -446,7 +413,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation
m = db.msg.GetDocIDSeqsMap(conversationID, totalNotExistSeqs)
for docID, seqs := range m {
docID = db.msg.ToNextDoc(docID)
msgs, _, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
msgs, _, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
missedSeqs = append(missedSeqs, seqs...)
log.ZError(ctx, "get message from mongo exception", err, "docID", docID, "seqs", seqs)
@@ -477,7 +444,6 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation
if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 {
sort.Sort(utils.MsgBySeq(seqMsgs))
}
// missSeqs为依然缺失的
return seqMsgs, nil
}
+18 -22
View File
@@ -76,40 +76,36 @@ func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*rel
// 插入多条 外部保证userID 不重复 且在db中不存在
func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) {
return u.tx.Transaction(func(tx any) error {
if err := u.tx.Transaction(func(tx any) error {
err = u.userDB.Create(ctx, users)
if err != nil {
return err
}
var userIDs []string
for _, user := range users {
userIDs = append(userIDs, user.UserID)
}
return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
var userIDs []string
for _, user := range users {
userIDs = append(userIDs, user.UserID)
}
return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx)
}
// 更新(非零值) 外部保证userID存在
func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) {
return u.tx.Transaction(func(tx any) error {
err = u.userDB.Update(ctx, user)
if err != nil {
return err
}
return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx)
})
if err := u.userDB.Update(ctx, user); err != nil {
return err
}
return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx)
}
// 更新(零值) 外部保证userID存在
func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) {
return u.tx.Transaction(func(tx any) error {
err = u.userDB.UpdateByMap(ctx, userID, args)
if err != nil {
return err
}
return u.cache.DelUsersInfo(userID).ExecDel(ctx)
})
if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil {
return err
}
return u.cache.DelUsersInfo(userID).ExecDel(ctx)
}
// 获取,如果没找到,不返回错误