feat: add a function for business info change to update related conve… (#3225)

* feat: add a function for business info change to update related conversation's ex info.

* feat: add a function for business info change to update related conversation's ex info.

* feat: add a function for business info change to update related conversation's ex info.

* feat: add a function for business info change to update related conversation's ex info.
This commit is contained in:
OpenIM-Gordon
2025-03-21 15:10:31 +08:00
committed by GitHub
parent b969827b9a
commit 11044eac58
17 changed files with 95 additions and 284 deletions
@@ -46,6 +46,9 @@ type ConversationDatabase interface {
// SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is
// transactional.
SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error
// UpdateUserConversations updates all conversations related to a specified user.
// This function does NOT update the user's own conversations but rather the conversations where this user is involved (e.g., other users' conversations referencing this user).
UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error
// CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs.
CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversations *relationtb.Conversation) error
// GetConversationIDs retrieves conversation IDs for a given user.
@@ -145,6 +148,18 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
})
}
func (c *conversationDatabase) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error {
conversations, err := c.conversationDB.UpdateUserConversations(ctx, userID, args)
if err != nil {
return err
}
cache := c.cache.CloneConversationCache()
for _, conversation := range conversations {
cache = cache.DelUsersConversation(conversation.ConversationID, conversation.OwnerUserID).DelConversationVersionUserIDs(conversation.OwnerUserID)
}
return cache.ChainExecDel(ctx)
}
func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error {
_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args)
if err != nil {
@@ -1,34 +0,0 @@
package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
)
type StreamMsgDatabase interface {
CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error
AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error
GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error)
}
func NewStreamMsgDatabase(db database.StreamMsg) StreamMsgDatabase {
return &streamMsgDatabase{db: db}
}
type streamMsgDatabase struct {
db database.StreamMsg
}
func (m *streamMsgDatabase) CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error {
return m.db.CreateStreamMsg(ctx, model)
}
func (m *streamMsgDatabase) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error {
return m.db.AppendStreamMsg(ctx, clientMsgID, startIndex, packets, end, deadlineTime)
}
func (m *streamMsgDatabase) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) {
return m.db.GetStreamMsg(ctx, clientMsgID)
}
@@ -24,6 +24,7 @@ import (
type Conversation interface {
Create(ctx context.Context, conversations []*model.Conversation) (err error)
UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error)
UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error)
Update(ctx context.Context, conversation *model.Conversation) (err error)
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error)
FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error)
@@ -21,23 +21,32 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
coll := db.Collection(database.ConversationName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "owner_user_id", Value: 1},
{Key: "conversation_id", Value: 1},
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{
{Key: "owner_user_id", Value: 1},
{Key: "conversation_id", Value: 1},
},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{
{Key: "user_id", Value: 1},
},
Options: options.Index(),
},
Options: options.Index().SetUnique(true),
})
if err != nil {
return nil, errs.Wrap(err)
@@ -101,6 +110,38 @@ func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, con
return rows, nil
}
func (c *ConversationMgo) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) {
if len(args) == 0 {
return nil, nil
}
filter := bson.M{
"user_id": userID,
}
conversations, err := mongoutil.Find[*model.Conversation](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1, "conversation_id": 1}))
if err != nil {
return nil, err
}
err = mongoutil.IncrVersion(func() error {
_, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args})
if err != nil {
return err
}
return nil
}, func() error {
for _, conversation := range conversations {
if err := c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate); err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, err
}
return conversations, nil
}
func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) {
return mongoutil.IncrVersion(func() error {
return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true)
@@ -1,60 +0,0 @@
package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)
func NewStreamMsgMongo(db *mongo.Database) (*StreamMsgMongo, error) {
coll := db.Collection(database.StreamMsgName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "client_msg_id", Value: 1},
},
Options: options.Index().SetUnique(true),
})
if err != nil {
return nil, errs.Wrap(err)
}
return &StreamMsgMongo{coll: coll}, nil
}
type StreamMsgMongo struct {
coll *mongo.Collection
}
func (m *StreamMsgMongo) CreateStreamMsg(ctx context.Context, val *model.StreamMsg) error {
if val.Packets == nil {
val.Packets = []string{}
}
return mongoutil.InsertMany(ctx, m.coll, []*model.StreamMsg{val})
}
func (m *StreamMsgMongo) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error {
update := bson.M{
"$set": bson.M{
"end": end,
"deadline_time": deadlineTime,
},
}
if len(packets) > 0 {
update["$push"] = bson.M{
"packets": bson.M{
"$each": packets,
"$position": startIndex,
},
}
}
return mongoutil.UpdateOne(ctx, m.coll, bson.M{"client_msg_id": clientMsgID, "end": false}, update, true)
}
func (m *StreamMsgMongo) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) {
return mongoutil.FindOne[*model.StreamMsg](ctx, m.coll, bson.M{"client_msg_id": clientMsgID})
}
-13
View File
@@ -1,13 +0,0 @@
package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
)
type StreamMsg interface {
CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error
AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error
GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error)
}
-21
View File
@@ -1,21 +0,0 @@
package model
import (
"time"
)
const (
StreamMsgStatusWait = 0
StreamMsgStatusDone = 1
StreamMsgStatusFail = 2
)
type StreamMsg struct {
ClientMsgID string `bson:"client_msg_id"`
ConversationID string `bson:"conversation_id"`
UserID string `bson:"user_id"`
Packets []string `bson:"packets"`
End bool `bson:"end"`
CreateTime time.Time `bson:"create_time"`
DeadlineTime time.Time `bson:"deadline_time"`
}