mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-28 06:19:20 +08:00
bugfix(conversation):removed unexpectedly called functions and itself to avoid out of index query. (#3668)
# Conflicts: # internal/rpc/conversation/conversation.go # pkg/common/storage/database/mgo/conversation.go
This commit is contained in:
@@ -513,14 +513,6 @@ func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req
|
|||||||
return &pbconversation.GetUserConversationIDsHashResp{Hash: hash}, nil
|
return &pbconversation.GetUserConversationIDsHashResp{Hash: hash}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationServer) GetConversationsByConversationID(ctx context.Context, req *pbconversation.GetConversationsByConversationIDReq) (*pbconversation.GetConversationsByConversationIDResp, error) {
|
|
||||||
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, req.ConversationIDs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *conversationServer) GetConversationOfflinePushUserIDs(ctx context.Context, req *pbconversation.GetConversationOfflinePushUserIDsReq) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) {
|
func (c *conversationServer) GetConversationOfflinePushUserIDs(ctx context.Context, req *pbconversation.GetConversationOfflinePushUserIDsReq) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) {
|
||||||
if req.ConversationID == "" {
|
if req.ConversationID == "" {
|
||||||
return nil, errs.ErrArgs.WrapMsg("conversationID is empty")
|
return nil, errs.ErrArgs.WrapMsg("conversationID is empty")
|
||||||
@@ -717,56 +709,6 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) {
|
|
||||||
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
|
|
||||||
if err != nil {
|
|
||||||
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
const batchNum = 100
|
|
||||||
|
|
||||||
if num == 0 {
|
|
||||||
return nil, errs.New("Need Destruct Msg is nil").Wrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
maxPage := (num + batchNum - 1) / batchNum
|
|
||||||
|
|
||||||
temp := make([]*dbModel.Conversation, 0, maxPage*batchNum)
|
|
||||||
|
|
||||||
for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ {
|
|
||||||
pagination := &sdkws.RequestPagination{
|
|
||||||
PageNumber: int32(pageNumber),
|
|
||||||
ShowNumber: batchNum,
|
|
||||||
}
|
|
||||||
|
|
||||||
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
|
|
||||||
if err != nil {
|
|
||||||
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
|
|
||||||
if len(conversationIDs) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
|
|
||||||
if err != nil {
|
|
||||||
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, conversation := range conversations {
|
|
||||||
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8
|
|
||||||
conversation.LatestMsgDestructTime.IsZero()) {
|
|
||||||
temp = append(temp, conversation)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
|
func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
|
||||||
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
|
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -19,7 +19,6 @@ import (
|
|||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
"github.com/openimsdk/protocol/conversation"
|
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
@@ -74,7 +73,7 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms
|
|||||||
if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil {
|
if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
conv, err := m.conversationClient.GetConversationsByConversationID(ctx, req.ConversationID)
|
conv, err := m.conversationClient.GetConversation(ctx, req.ConversationID, req.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -116,14 +115,12 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error {
|
func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error {
|
||||||
conversations, err := m.conversationClient.GetConversationsByConversationIDs(ctx, conversationIDs)
|
conversations, err := m.conversationClient.GetConversations(ctx, conversationIDs, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
var existConversations []*conversation.Conversation
|
|
||||||
var existConversationIDs []string
|
var existConversationIDs []string
|
||||||
for _, conversation := range conversations {
|
for _, conversation := range conversations {
|
||||||
existConversations = append(existConversations, conversation)
|
|
||||||
existConversationIDs = append(existConversationIDs, conversation.ConversationID)
|
existConversationIDs = append(existConversationIDs, conversation.ConversationID)
|
||||||
}
|
}
|
||||||
log.ZDebug(ctx, "ClearConversationsMsg", "existConversationIDs", existConversationIDs)
|
log.ZDebug(ctx, "ClearConversationsMsg", "existConversationIDs", existConversationIDs)
|
||||||
@@ -152,7 +149,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str
|
|||||||
if err := m.MsgDatabase.SetMinSeqs(ctx, m.getMinSeqs(maxSeqs)); err != nil {
|
if err := m.MsgDatabase.SetMinSeqs(ctx, m.getMinSeqs(maxSeqs)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, conversation := range existConversations {
|
for _, conversation := range conversations {
|
||||||
tips := &sdkws.ClearConversationTips{UserID: userID, ConversationIDs: []string{conversation.ConversationID}}
|
tips := &sdkws.ClearConversationTips{UserID: userID, ConversationIDs: []string{conversation.ConversationID}}
|
||||||
m.notificationSender.NotificationWithSessionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips)
|
m.notificationSender.NotificationWithSessionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,8 +61,6 @@ type ConversationDatabase interface {
|
|||||||
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
|
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
|
||||||
// PageConversationIDs paginates through conversation IDs based on the specified pagination settings.
|
// PageConversationIDs paginates through conversation IDs based on the specified pagination settings.
|
||||||
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
|
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
|
||||||
// GetConversationsByConversationID retrieves conversations by their IDs.
|
|
||||||
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error)
|
|
||||||
// GetConversationIDsNeedDestruct fetches conversations that need to be destructed based on specific criteria.
|
// GetConversationIDsNeedDestruct fetches conversations that need to be destructed based on specific criteria.
|
||||||
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error)
|
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error)
|
||||||
// GetConversationNotReceiveMessageUserIDs gets user IDs for users in a conversation who have not received messages.
|
// GetConversationNotReceiveMessageUserIDs gets user IDs for users in a conversation who have not received messages.
|
||||||
@@ -375,10 +373,6 @@ func (c *conversationDatabase) PageConversationIDs(ctx context.Context, paginati
|
|||||||
return c.conversationDB.PageConversationIDs(ctx, pagination)
|
return c.conversationDB.PageConversationIDs(ctx, pagination)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error) {
|
|
||||||
return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) {
|
func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) {
|
||||||
return c.conversationDB.GetConversationIDsNeedDestruct(ctx)
|
return c.conversationDB.GetConversationIDsNeedDestruct(ctx)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -39,7 +39,6 @@ type Conversation interface {
|
|||||||
GetAllConversationIDs(ctx context.Context) ([]string, error)
|
GetAllConversationIDs(ctx context.Context) ([]string, error)
|
||||||
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
|
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
|
||||||
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
|
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
|
||||||
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error)
|
|
||||||
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
|
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
|
||||||
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
||||||
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
|
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
|
||||||
|
|||||||
@@ -47,6 +47,12 @@ func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
|
|||||||
},
|
},
|
||||||
Options: options.Index(),
|
Options: options.Index(),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Keys: bson.D{
|
||||||
|
{Key: "conversation_id", Value: 1},
|
||||||
|
},
|
||||||
|
Options: options.Index().SetUnique(true),
|
||||||
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errs.Wrap(err)
|
return nil, errs.Wrap(err)
|
||||||
@@ -232,10 +238,6 @@ func (c *ConversationMgo) PageConversationIDs(ctx context.Context, pagination pa
|
|||||||
return mongoutil.FindPageOnly[string](ctx, c.coll, bson.M{}, pagination, options.Find().SetProjection(bson.M{"conversation_id": 1}))
|
return mongoutil.FindPageOnly[string](ctx, c.coll, bson.M{}, pagination, options.Find().SetProjection(bson.M{"conversation_id": 1}))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationMgo) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) {
|
|
||||||
return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{"conversation_id": bson.M{"$in": conversationIDs}})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ConversationMgo) GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) {
|
func (c *ConversationMgo) GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) {
|
||||||
// "is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)"
|
// "is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)"
|
||||||
return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{
|
return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package rpcli
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/openimsdk/protocol/conversation"
|
"github.com/openimsdk/protocol/conversation"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
@@ -30,18 +31,6 @@ func (x *ConversationClient) SetConversations(ctx context.Context, ownerUserIDs
|
|||||||
return ignoreResp(x.ConversationClient.SetConversations(ctx, req))
|
return ignoreResp(x.ConversationClient.SetConversations(ctx, req))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *ConversationClient) GetConversationsByConversationIDs(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) {
|
|
||||||
if len(conversationIDs) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
req := &conversation.GetConversationsByConversationIDReq{ConversationIDs: conversationIDs}
|
|
||||||
return extractField(ctx, x.ConversationClient.GetConversationsByConversationID, req, (*conversation.GetConversationsByConversationIDResp).GetConversations)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *ConversationClient) GetConversationsByConversationID(ctx context.Context, conversationID string) (*conversation.Conversation, error) {
|
|
||||||
return firstValue(x.GetConversationsByConversationIDs(ctx, []string{conversationID}))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *ConversationClient) SetConversationMinSeq(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error {
|
func (x *ConversationClient) SetConversationMinSeq(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error {
|
||||||
if len(ownerUserIDs) == 0 {
|
if len(ownerUserIDs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Reference in New Issue
Block a user