mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-17 07:19:02 +08:00
Merge remote-tracking branch 'origin/main' into pre-release-v3.8.4
# Conflicts: # .env # .github/workflows/docker-build-and-release-services-images.yml # .github/workflows/merge-from-milestone.yml # .github/workflows/update-version-file-on-release.yml # README.md # README_zh_CN.md # cmd/main.go # config/discovery.yml # config/notification.yml # config/openim-api.yml # config/openim-msggateway.yml # config/openim-msgtransfer.yml # config/openim-push.yml # config/openim-rpc-auth.yml # config/openim-rpc-conversation.yml # config/openim-rpc-friend.yml # config/openim-rpc-group.yml # config/openim-rpc-msg.yml # config/openim-rpc-third.yml # config/openim-rpc-user.yml # config/share.yml # config/webhooks.yml # deployments/templates/config.yaml # docker-compose.yml # go.mod # go.sum # internal/api/init.go # internal/api/jssdk/tools.go # internal/api/msg.go # internal/api/prometheus_discovery.go # internal/api/router.go # internal/msggateway/init.go # internal/msggateway/ws_server.go # internal/msgtransfer/init.go # internal/msgtransfer/online_history_msg_handler.go # internal/msgtransfer/online_msg_to_mongo_handler.go # internal/push/push.go # internal/rpc/auth/auth.go # internal/rpc/conversation/conversation.go # internal/rpc/group/group.go # internal/rpc/msg/callback.go # internal/rpc/msg/server.go # internal/rpc/relation/friend.go # internal/rpc/relation/notification.go # internal/rpc/third/third.go # internal/rpc/user/user.go # internal/tools/cron/cron_task.go # magefile.go # pkg/common/cmd/api.go # pkg/common/cmd/msg_transfer.go # pkg/common/config/config.go # pkg/common/discovery/discoveryregister.go # pkg/common/prommetrics/prommetrics.go # pkg/common/startrpc/start.go # pkg/common/storage/database/mgo/cache.go # pkg/common/storage/database/mgo/msg_test.go # pkg/rpcli/auth.go # pkg/rpcli/tool.go # pkg/rpcli/user.go # test/stress-test-v2/main.go # test/stress-test/main.go # tools/seq/internal/seq.go # version/version
This commit is contained in:
+1
-1
@@ -26,7 +26,7 @@ func TestName111111(t *testing.T) {
|
||||
"172.16.8.124:7005",
|
||||
"172.16.8.124:7006",
|
||||
},
|
||||
ClusterMode: true,
|
||||
RedisMode: "cluster",
|
||||
Password: "passwd123",
|
||||
//Address: []string{"localhost:16379"},
|
||||
//Password: "openIM123",
|
||||
|
||||
@@ -671,12 +671,18 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.Searc
|
||||
func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
|
||||
totalMsgs := make(map[string]*sdkws.MsgData)
|
||||
for _, conversationID := range conversationIDs {
|
||||
seq := seqs[conversationID]
|
||||
seq, ok := seqs[conversationID]
|
||||
if !ok {
|
||||
log.ZWarn(ctx, "seq not found for conversationID", errs.New("seq not found for conversation"), "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
docID := db.msgTable.GetDocID(conversationID, seq)
|
||||
msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
log.ZWarn(ctx, "FindOneByDocID failed", err, "conversationID", conversationID, "docID", docID, "seq", seq)
|
||||
continue
|
||||
}
|
||||
|
||||
index := db.msgTable.GetMsgIndex(seq)
|
||||
totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg)
|
||||
}
|
||||
|
||||
@@ -97,16 +97,34 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error
|
||||
}
|
||||
|
||||
// Determine which users are missing from the database.
|
||||
missingUsers := datautil.SliceAnySub(users, existingUsers, func(e *model.User) string {
|
||||
var (
|
||||
missing, update []*model.User
|
||||
)
|
||||
existMap := datautil.SliceToMap(existingUsers, func(e *model.User) string {
|
||||
return e.UserID
|
||||
})
|
||||
orgMap := datautil.SliceToMap(users, func(e *model.User) string { return e.UserID })
|
||||
for k, u1 := range orgMap {
|
||||
if u2, ok := existMap[k]; !ok {
|
||||
missing = append(missing, u1)
|
||||
} else if u1.Nickname != u2.Nickname {
|
||||
update = append(update, u1)
|
||||
}
|
||||
}
|
||||
|
||||
// Create records for missing users.
|
||||
if len(missingUsers) > 0 {
|
||||
if err := u.userDB.Create(ctx, missingUsers); err != nil {
|
||||
if len(missing) > 0 {
|
||||
if err := u.userDB.Create(ctx, missing); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(update) > 0 {
|
||||
for i := range update {
|
||||
if err := u.userDB.UpdateByMap(ctx, update[i].UserID, map[string]any{"nickname": update[i].Nickname}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -127,7 +127,7 @@ func (x *CacheMgo) Del(ctx context.Context, key []string) error {
|
||||
return nil
|
||||
}
|
||||
_, err := x.coll.DeleteMany(ctx, bson.M{"key": bson.M{"$in": key}})
|
||||
return err
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
|
||||
func (x *CacheMgo) lockKey(key string) string {
|
||||
|
||||
@@ -321,7 +321,12 @@ func (m *MsgMgo) searchMessageIndex(ctx context.Context, filter any, nextID prim
|
||||
}
|
||||
|
||||
func (m *MsgMgo) searchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []searchMessageIndex, error) {
|
||||
filter := bson.M{}
|
||||
filter := bson.M{
|
||||
"msgs.msg": bson.M{
|
||||
"$exists": true,
|
||||
"$type": "object",
|
||||
},
|
||||
}
|
||||
if req.RecvID != "" {
|
||||
filter["$or"] = bson.A{
|
||||
bson.M{"msgs.msg.recv_id": req.RecvID},
|
||||
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/db/mongoutil"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
@@ -148,3 +150,29 @@ func TestName5(t *testing.T) {
|
||||
// }
|
||||
// t.Log(seq, sendTime)
|
||||
//}
|
||||
|
||||
func TestSearchMessage(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
|
||||
defer cancel()
|
||||
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
||||
|
||||
msgMongo, err := NewMsgMongo(cli.Database("openim_v3"))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
ts := time.Now().Add(-time.Hour * 24 * 5).UnixMilli()
|
||||
t.Log(ts)
|
||||
req := &msg.SearchMessageReq{
|
||||
//SendID: "yjz",
|
||||
//RecvID: "aibot",
|
||||
Pagination: &sdkws.RequestPagination{
|
||||
PageNumber: 1,
|
||||
ShowNumber: 20,
|
||||
},
|
||||
}
|
||||
count, resp, err := msgMongo.SearchMessage(ctx, req)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
t.Log(resp, count)
|
||||
}
|
||||
|
||||
@@ -120,15 +120,11 @@ func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string {
|
||||
|
||||
func (m *MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
|
||||
t := make(map[string][]int64)
|
||||
for i := 0; i < len(seqs); i++ {
|
||||
docID := m.GetDocID(conversationID, seqs[i])
|
||||
if value, ok := t[docID]; !ok {
|
||||
var temp []int64
|
||||
t[docID] = append(temp, seqs[i])
|
||||
} else {
|
||||
t[docID] = append(value, seqs[i])
|
||||
}
|
||||
for _, seq := range seqs {
|
||||
docID := m.GetDocID(conversationID, seq)
|
||||
t[docID] = append(t[docID], seq)
|
||||
}
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user