diff --git a/internal/rpc/msg/extend_msg.go b/internal/rpc/msg/extend_msg.go index ea0f02088..47afbbdaa 100644 --- a/internal/rpc/msg/extend_msg.go +++ b/internal/rpc/msg/extend_msg.go @@ -35,7 +35,7 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S log.Debug(req.OperationID, "redis handle firstly", req.String()) rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() for k, v := range req.ReactionExtensionList { - err := lockMessageTypeKey(req.ClientMsgID, k) + err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k) if err != nil { setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) continue @@ -54,14 +54,40 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String()) } } else { - //mongo处理 + for k, v := range req.ReactionExtensionList { + err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k) + if err != nil { + setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) + continue + } + redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k) + if err != nil && err != go_redis.Nil { + setKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, k, v) + continue + } + temp := new(server_api_params.KeyValue) + utils.JsonStringToStruct(redisValue, temp) + if v.LatestUpdateTime != temp.LatestUpdateTime { + setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp) + continue + } else { + v.LatestUpdateTime = utils.GetCurrentTimestampByMill() + newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) + if newerr != nil { + setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, temp) + continue + } + setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v) + } + + } } } else { log.Debug(req.OperationID, "redis handle secondly", req.String()) for k, v := range req.ReactionExtensionList { - err := lockMessageTypeKey(req.ClientMsgID, k) + err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k) if err != nil { setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) continue @@ -178,7 +204,7 @@ func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *ms if isExists { log.Debug(req.OperationID, "redis handle this delete", req.String()) for _, v := range req.ReactionExtensionList { - err := lockMessageTypeKey(req.ClientMsgID, v.TypeKey) + err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey) if err != nil { setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v) continue @@ -210,16 +236,3 @@ func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *ms log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String()) return &rResp, nil } -func lockMessageTypeKey(clientMsgID, typeKey string) (err error) { - for i := 0; i < 3; i++ { - err = db.DB.LockMessageTypeKey(clientMsgID, typeKey) - if err != nil { - time.Sleep(time.Millisecond * 100) - continue - } else { - break - } - } - return err - -} diff --git a/internal/rpc/msg/lock.go b/internal/rpc/msg/lock.go new file mode 100644 index 000000000..4f40b62a0 --- /dev/null +++ b/internal/rpc/msg/lock.go @@ -0,0 +1,32 @@ +package msg + +import ( + "Open_IM/pkg/common/db" + "time" +) + +type MessageLocker interface { + LockMessageTypeKey(clientMsgID, typeKey string) (err error) + UnLockMessageTypeKey(clientMsgID string, typeKey string) error +} +type LockerMessage struct{} + +func NewLockerMessage() *LockerMessage { + return &LockerMessage{} +} +func (l *LockerMessage) LockMessageTypeKey(clientMsgID, typeKey string) (err error) { + for i := 0; i < 3; i++ { + err = db.DB.LockMessageTypeKey(clientMsgID, typeKey) + if err != nil { + time.Sleep(time.Millisecond * 100) + continue + } else { + break + } + } + return err + +} +func (l *LockerMessage) UnLockMessageTypeKey(clientMsgID string, typeKey string) error { + return db.DB.UnLockMessageTypeKey(clientMsgID, typeKey) +} diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index f1329d47a..470325deb 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -31,7 +31,8 @@ type rpcChat struct { etcdAddr []string messageWriter MessageWriter //offlineProducer *kafka.Producer - delMsgCh chan deleteMsg + delMsgCh chan deleteMsg + dMessageLocker MessageLocker } type deleteMsg struct { @@ -48,6 +49,7 @@ func NewRpcChatServer(port int) *rpcChat { rpcRegisterName: config.Config.RpcRegisterName.OpenImMsgName, etcdSchema: config.Config.Etcd.EtcdSchema, etcdAddr: config.Config.Etcd.EtcdAddr, + dMessageLocker: NewLockerMessage(), } rc.messageWriter = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) //rc.offlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.Ws2mschatOffline.Topic)