mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-28 06:19:20 +08:00
fix: Fix Bug related issues with OpenIM actions PR (#921)
* feat: add test changelog file Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add test changelog file Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add test changelog file Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add test changelog file Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add test changelog file Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add test changelog file Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add test changelog file Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add scripts test Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add scripts test and format Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add scripts test and format Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add scripts test and format Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add scripts test and format Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add scripts test and format Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add scripts test and format Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add scripts test and format Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add scripts test and format Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add scripts test and format Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> --------- Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com>
This commit is contained in:
@@ -27,14 +27,14 @@ import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/convert"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
unrelationtb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
||||
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
|
||||
pbMsg "github.com/OpenIMSDK/protocol/msg"
|
||||
pbmsg "github.com/OpenIMSDK/protocol/msg"
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
)
|
||||
@@ -48,7 +48,7 @@ type CommonMsgDatabase interface {
|
||||
// 批量插入消息
|
||||
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
|
||||
// 撤回消息
|
||||
RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error
|
||||
RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unrelationtb.RevokeModel) error
|
||||
// mark as read
|
||||
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error
|
||||
// 刪除redis中消息缓存
|
||||
@@ -93,7 +93,7 @@ type CommonMsgDatabase interface {
|
||||
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
||||
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
||||
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
|
||||
SearchMessage(ctx context.Context, req *pbMsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error)
|
||||
SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error)
|
||||
|
||||
// to mq
|
||||
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
|
||||
@@ -109,7 +109,7 @@ type CommonMsgDatabase interface {
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, users []*unRelationTb.UserCount, dateCount map[string]int64, err error)
|
||||
) (msgCount int64, userCount int64, users []*unrelationtb.UserCount, dateCount map[string]int64, err error)
|
||||
RangeGroupSendCount(
|
||||
ctx context.Context,
|
||||
start time.Time,
|
||||
@@ -117,11 +117,11 @@ type CommonMsgDatabase interface {
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error)
|
||||
) (msgCount int64, userCount int64, groups []*unrelationtb.GroupCount, dateCount map[string]int64, err error)
|
||||
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
|
||||
}
|
||||
|
||||
func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase {
|
||||
func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase {
|
||||
return &commonMsgDatabase{
|
||||
msgDocDatabase: msgDocModel,
|
||||
cache: cacheModel,
|
||||
@@ -139,8 +139,8 @@ func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database)
|
||||
}
|
||||
|
||||
type commonMsgDatabase struct {
|
||||
msgDocDatabase unRelationTb.MsgDocModelInterface
|
||||
msg unRelationTb.MsgDocModel
|
||||
msgDocDatabase unrelationtb.MsgDocModelInterface
|
||||
msg unrelationtb.MsgDocModel
|
||||
cache cache.MsgModel
|
||||
producer *kafka.Producer
|
||||
producerToMongo *kafka.Producer
|
||||
@@ -155,14 +155,14 @@ func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sd
|
||||
|
||||
func (db *commonMsgDatabase) MsgToModifyMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData) error {
|
||||
if len(messages) > 0 {
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, key, &pbMsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages})
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, key, &pbmsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages})
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
||||
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbMsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
|
||||
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
|
||||
if err != nil {
|
||||
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
|
||||
return 0, 0, err
|
||||
@@ -172,7 +172,7 @@ func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationI
|
||||
|
||||
func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
|
||||
if len(messages) > 0 {
|
||||
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
|
||||
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -188,13 +188,13 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
|
||||
var ok bool
|
||||
switch key {
|
||||
case updateKeyMsg:
|
||||
var msg *unRelationTb.MsgDataModel
|
||||
msg, ok = field.(*unRelationTb.MsgDataModel)
|
||||
var msg *unrelationtb.MsgDataModel
|
||||
msg, ok = field.(*unrelationtb.MsgDataModel)
|
||||
if msg != nil && msg.Seq != firstSeq+int64(i) {
|
||||
return errs.ErrInternalServer.Wrap("seq is invalid")
|
||||
}
|
||||
case updateKeyRevoke:
|
||||
_, ok = field.(*unRelationTb.RevokeModel)
|
||||
_, ok = field.(*unrelationtb.RevokeModel)
|
||||
default:
|
||||
return errs.ErrInternalServer.Wrap("key is invalid")
|
||||
}
|
||||
@@ -234,9 +234,9 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
|
||||
continue // 匹配到了,继续下一个(不一定修改)
|
||||
}
|
||||
}
|
||||
doc := unRelationTb.MsgDocModel{
|
||||
doc := unrelationtb.MsgDocModel{
|
||||
DocID: db.msg.GetDocID(conversationID, seq),
|
||||
Msg: make([]*unRelationTb.MsgInfoModel, num),
|
||||
Msg: make([]*unrelationtb.MsgInfoModel, num),
|
||||
}
|
||||
var insert int // 插入的数量
|
||||
for j := i; j < len(fields); j++ {
|
||||
@@ -247,18 +247,18 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
|
||||
insert++
|
||||
switch key {
|
||||
case updateKeyMsg:
|
||||
doc.Msg[db.msg.GetMsgIndex(seq)] = &unRelationTb.MsgInfoModel{
|
||||
Msg: fields[j].(*unRelationTb.MsgDataModel),
|
||||
doc.Msg[db.msg.GetMsgIndex(seq)] = &unrelationtb.MsgInfoModel{
|
||||
Msg: fields[j].(*unrelationtb.MsgDataModel),
|
||||
}
|
||||
case updateKeyRevoke:
|
||||
doc.Msg[db.msg.GetMsgIndex(seq)] = &unRelationTb.MsgInfoModel{
|
||||
Revoke: fields[j].(*unRelationTb.RevokeModel),
|
||||
doc.Msg[db.msg.GetMsgIndex(seq)] = &unrelationtb.MsgInfoModel{
|
||||
Revoke: fields[j].(*unrelationtb.RevokeModel),
|
||||
}
|
||||
}
|
||||
}
|
||||
for i, model := range doc.Msg {
|
||||
if model == nil {
|
||||
model = &unRelationTb.MsgInfoModel{}
|
||||
model = &unrelationtb.MsgInfoModel{}
|
||||
doc.Msg[i] = model
|
||||
}
|
||||
if model.DelList == nil {
|
||||
@@ -288,9 +288,9 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio
|
||||
if msg == nil {
|
||||
continue
|
||||
}
|
||||
var offlinePushModel *unRelationTb.OfflinePushModel
|
||||
var offlinePushModel *unrelationtb.OfflinePushModel
|
||||
if msg.OfflinePushInfo != nil {
|
||||
offlinePushModel = &unRelationTb.OfflinePushModel{
|
||||
offlinePushModel = &unrelationtb.OfflinePushModel{
|
||||
Title: msg.OfflinePushInfo.Title,
|
||||
Desc: msg.OfflinePushInfo.Desc,
|
||||
Ex: msg.OfflinePushInfo.Ex,
|
||||
@@ -298,7 +298,7 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio
|
||||
IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount,
|
||||
}
|
||||
}
|
||||
msgs[i] = &unRelationTb.MsgDataModel{
|
||||
msgs[i] = &unrelationtb.MsgDataModel{
|
||||
SendID: msg.SendID,
|
||||
RecvID: msg.RecvID,
|
||||
GroupID: msg.GroupID,
|
||||
@@ -325,7 +325,7 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio
|
||||
return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error {
|
||||
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unrelationtb.RevokeModel) error {
|
||||
return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq)
|
||||
}
|
||||
|
||||
@@ -413,7 +413,7 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat
|
||||
return totalMsgs, nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, err error) {
|
||||
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, seqs []int64) (totalMsgs []*unrelationtb.MsgInfoModel, err error) {
|
||||
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs)
|
||||
for _, msg := range msgs {
|
||||
if msg.IsRead {
|
||||
@@ -927,7 +927,7 @@ func (db *commonMsgDatabase) RangeUserSendCount(
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, users []*unRelationTb.UserCount, dateCount map[string]int64, err error) {
|
||||
) (msgCount int64, userCount int64, users []*unrelationtb.UserCount, dateCount map[string]int64, err error) {
|
||||
return db.msgDocDatabase.RangeUserSendCount(ctx, start, end, group, ase, pageNumber, showNumber)
|
||||
}
|
||||
|
||||
@@ -938,11 +938,11 @@ func (db *commonMsgDatabase) RangeGroupSendCount(
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error) {
|
||||
) (msgCount int64, userCount int64, groups []*unrelationtb.GroupCount, dateCount map[string]int64, err error) {
|
||||
return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbMsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) {
|
||||
func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) {
|
||||
var totalMsgs []*sdkws.MsgData
|
||||
total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user