Merge remote-tracking branch 'origin/errcode' into errcode

# Conflicts:
#	internal/rpc/msg/delete.go
#	internal/rpc/msg/send_pull.go
#	pkg/common/db/cache/msg.go
#	pkg/common/db/cache/redis.go
#	pkg/common/db/cache/redis_test.go
This commit is contained in:
withchao
2023-02-24 13:55:53 +08:00
212 changed files with 1788 additions and 2590 deletions
+1 -1
View File
@@ -1,7 +1,7 @@
package apistruct
import (
sdkws "Open_IM/pkg/proto/sdkws"
sdkws "OpenIM/pkg/proto/sdkws"
)
type CommResp struct {
+2 -2
View File
@@ -1,8 +1,8 @@
package apistruct
import (
pbRelay "Open_IM/pkg/proto/relay"
sdkws "Open_IM/pkg/proto/sdkws"
pbRelay "OpenIM/pkg/proto/relay"
sdkws "OpenIM/pkg/proto/sdkws"
)
type DeleteUsersReq struct {
+2 -2
View File
@@ -1,8 +1,8 @@
package apistruct
import (
"Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws"
"OpenIM/pkg/proto/msg"
sdkws "OpenIM/pkg/proto/sdkws"
)
type DelMsgReq struct {
+1 -1
View File
@@ -1,7 +1,7 @@
package apistruct
import (
sdkws "Open_IM/pkg/proto/sdkws"
sdkws "OpenIM/pkg/proto/sdkws"
)
type GetUsersInfoReq struct {
+1 -1
View File
@@ -1,7 +1,7 @@
package callbackstruct
import (
"Open_IM/pkg/common/constant"
"OpenIM/pkg/common/constant"
"fmt"
)
+2 -2
View File
@@ -1,8 +1,8 @@
package callbackstruct
import (
"Open_IM/pkg/apistruct"
common "Open_IM/pkg/proto/sdkws"
"OpenIM/pkg/apistruct"
common "OpenIM/pkg/proto/sdkws"
)
type CallbackCommand string
+2 -2
View File
@@ -1,8 +1,8 @@
package callbackstruct
import (
"Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws"
"OpenIM/pkg/proto/msg"
sdkws "OpenIM/pkg/proto/sdkws"
)
type CallbackBeforeSendSingleMsgReq struct {
+1 -1
View File
@@ -1,6 +1,6 @@
package callbackstruct
import common "Open_IM/pkg/proto/sdkws"
import common "OpenIM/pkg/proto/sdkws"
type CallbackBeforePushReq struct {
UserStatusBatchCallbackReq
+217 -328
View File
@@ -16,9 +16,8 @@ var (
Root = filepath.Join(filepath.Dir(b), "../../..")
)
const ConfName = "openIMConf"
var Config config
var NotificationConfig notificationConfig
type CallBackConfig struct {
Enable bool `yaml:"enable"`
@@ -26,6 +25,21 @@ type CallBackConfig struct {
CallbackFailedContinue *bool `yaml:"callbackFailedContinue"`
}
type PConversation struct {
ReliabilityLevel int `yaml:"reliabilityLevel"`
UnreadCount bool `yaml:"unreadCount"`
}
type POfflinePush struct {
PushSwitch bool `yaml:"switch"`
Title string `yaml:"title"`
Desc string `yaml:"desc"`
Ext string `yaml:"ext"`
}
type PDefaultTips struct {
Tips string `yaml:"tips"`
}
type config struct {
ServerIP string `yaml:"serverip"`
@@ -263,7 +277,6 @@ type config struct {
BadgeCount bool `yaml:"badgeCount"`
Production bool `yaml:"production"`
}
Callback struct {
CallbackUrl string `yaml:"callbackUrl"`
CallbackBeforeSendSingleMsg CallBackConfig `yaml:"callbackBeforeSendSingleMsg"`
@@ -282,191 +295,8 @@ type config struct {
CallbackBeforeMemberJoinGroup CallBackConfig `yaml:"callbackBeforeMemberJoinGroup"`
CallbackBeforeSetGroupMemberInfo CallBackConfig `yaml:"callbackBeforeSetGroupMemberInfo"`
} `yaml:"callback"`
Notification struct {
///////////////////////group/////////////////////////////
GroupCreated struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupCreated"`
GroupInfoSet struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupInfoSet"`
JoinGroupApplication struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"joinGroupApplication"`
MemberQuit struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"memberQuit"`
GroupApplicationAccepted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupApplicationAccepted"`
GroupApplicationRejected struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupApplicationRejected"`
GroupOwnerTransferred struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupOwnerTransferred"`
MemberKicked struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"memberKicked"`
MemberInvited struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"memberInvited"`
MemberEnter struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"memberEnter"`
GroupDismissed struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupDismissed"`
GroupMuted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMuted"`
GroupCancelMuted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupCancelMuted"`
GroupMemberMuted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMemberMuted"`
GroupMemberCancelMuted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMemberCancelMuted"`
GroupMemberInfoSet struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMemberInfoSet"`
GroupMemberSetToAdmin struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMemberSetToAdmin"`
GroupMemberSetToOrdinary struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMemberSetToOrdinaryUser"`
////////////////////////user///////////////////////
UserInfoUpdated struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"userInfoUpdated"`
//////////////////////friend///////////////////////
FriendApplication struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendApplicationAdded"`
FriendApplicationApproved struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendApplicationApproved"`
FriendApplicationRejected struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendApplicationRejected"`
FriendAdded struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendAdded"`
FriendDeleted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendDeleted"`
FriendRemarkSet struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendRemarkSet"`
BlackAdded struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"blackAdded"`
BlackDeleted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"blackDeleted"`
FriendInfoUpdated struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendInfoUpdated"`
ConversationOptUpdate struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"conversationOptUpdate"`
ConversationSetPrivate struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips struct {
OpenTips string `yaml:"openTips"`
CloseTips string `yaml:"closeTips"`
} `yaml:"defaultTips"`
} `yaml:"conversationSetPrivate"`
Signal struct {
OfflinePush struct {
Title string `yaml:"title"`
} `yaml:"offlinePush"`
} `yaml:"signal"`
}
Rtc struct {
Notification Notification `yaml:"notification"`
Rtc struct {
SignalTimeout string `yaml:"signalTimeout"`
} `yaml:"rtc"`
@@ -484,88 +314,207 @@ type config struct {
MessageTransferPrometheusPort []int `yaml:"messageTransferPrometheusPort"`
} `yaml:"prometheus"`
}
type PConversation struct {
ReliabilityLevel int `yaml:"reliabilityLevel"`
UnreadCount bool `yaml:"unreadCount"`
type notificationConfig struct {
Notification Notification `yaml:"notification"`
}
type POfflinePush struct {
PushSwitch bool `yaml:"switch"`
Title string `yaml:"title"`
Desc string `yaml:"desc"`
Ext string `yaml:"ext"`
}
type PDefaultTips struct {
Tips string `yaml:"tips"`
type Notification struct {
GroupCreated struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupCreated"`
GroupInfoSet struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupInfoSet"`
JoinGroupApplication struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"joinGroupApplication"`
MemberQuit struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"memberQuit"`
GroupApplicationAccepted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupApplicationAccepted"`
GroupApplicationRejected struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupApplicationRejected"`
GroupOwnerTransferred struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupOwnerTransferred"`
MemberKicked struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"memberKicked"`
MemberInvited struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"memberInvited"`
MemberEnter struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"memberEnter"`
GroupDismissed struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupDismissed"`
GroupMuted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMuted"`
GroupCancelMuted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupCancelMuted"`
GroupMemberMuted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMemberMuted"`
GroupMemberCancelMuted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMemberCancelMuted"`
GroupMemberInfoSet struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMemberInfoSet"`
GroupMemberSetToAdmin struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMemberSetToAdmin"`
GroupMemberSetToOrdinary struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"groupMemberSetToOrdinaryUser"`
////////////////////////user///////////////////////
UserInfoUpdated struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"userInfoUpdated"`
//////////////////////friend///////////////////////
FriendApplication struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendApplicationAdded"`
FriendApplicationApproved struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendApplicationApproved"`
FriendApplicationRejected struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendApplicationRejected"`
FriendAdded struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendAdded"`
FriendDeleted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendDeleted"`
FriendRemarkSet struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendRemarkSet"`
BlackAdded struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"blackAdded"`
BlackDeleted struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"blackDeleted"`
FriendInfoUpdated struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"friendInfoUpdated"`
ConversationOptUpdate struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips PDefaultTips `yaml:"defaultTips"`
} `yaml:"conversationOptUpdate"`
ConversationSetPrivate struct {
Conversation PConversation `yaml:"conversation"`
OfflinePush POfflinePush `yaml:"offlinePush"`
DefaultTips struct {
OpenTips string `yaml:"openTips"`
CloseTips string `yaml:"closeTips"`
} `yaml:"defaultTips"`
} `yaml:"conversationSetPrivate"`
Signal struct {
OfflinePush struct {
Title string `yaml:"title"`
} `yaml:"offlinePush"`
} `yaml:"signal"`
}
type usualConfig struct {
Etcd struct {
UserName string `yaml:"userName"`
Password string `yaml:"password"`
Secret string `yaml:"secret"`
} `yaml:"etcd"`
Mysql struct {
DBUserName string `yaml:"dbMysqlUserName"`
DBPassword string `yaml:"dbMysqlPassword"`
} `yaml:"mysql"`
Mongo struct {
DBUserName string `yaml:"dbUserName"`
DBPassword string `yaml:"dbPassword"`
} `yaml:"mongo"`
Redis struct {
DBUserName string `yaml:"dbUserName"`
DBPassword string `yaml:"dbPassWord"`
} `yaml:"redis"`
Kafka struct {
SASLUserName string `yaml:"SASLUserName"`
SASLPassword string `yaml:"SASLPassword"`
} `yaml:"kafka"`
Credential struct {
Minio struct {
AccessKeyID string `yaml:"accessKeyID"`
SecretAccessKey string `yaml:"secretAccessKey"`
Endpoint string `yaml:"endpoint"`
} `yaml:"minio"`
} `yaml:"credential"`
Secret string `yaml:"secret"`
Tokenpolicy struct {
AccessSecret string `yaml:"accessSecret"`
AccessExpire int64 `yaml:"accessExpire"`
} `yaml:"tokenpolicy"`
Messageverify struct {
FriendVerify bool `yaml:"friendVerify"`
} `yaml:"messageverify"`
Push struct {
Getui struct {
PushUrl string `yaml:"pushUrl"`
MasterSecret string `yaml:"masterSecret"`
AppKey string `yaml:"appKey"`
Enable bool `yaml:"enable"`
} `yaml:"getui"`
} `yaml:"push"`
}
var UsualConfig usualConfig
func unmarshalConfig(config interface{}, configPath string) {
func unmarshalConfig(config interface{}, configPath string) error {
bytes, err := ioutil.ReadFile(configPath)
if err != nil {
panic(err.Error() + configPath)
return err
}
if err = yaml.Unmarshal(bytes, config); err != nil {
panic(err.Error())
return err
}
}
func initConfig(config interface{}, configName, configPath string) {
func initConfig(config interface{}, configName, configPath string) error {
if configPath == "" {
var env string
var env = "CONFIG_NAME"
if configName == "config.yaml" {
env = "CONFIG_NAME"
} else if configName == "usualConfig.yaml" {
@@ -584,78 +533,18 @@ func initConfig(config interface{}, configName, configPath string) {
configPath = fmt.Sprintf("../config/%s", configName)
}
}
unmarshalConfig(config, configPath)
return unmarshalConfig(config, configPath)
}
func InitConfig(configPath string) {
initConfig(&Config, "config.yaml", configPath)
initConfig(&UsualConfig, "usualConfig.yaml", configPath)
if Config.Mysql.DBUserName == "" {
Config.Mysql.DBUserName = UsualConfig.Mysql.DBUserName
func InitConfig(configPath string) error {
err := initConfig(&Config, "config.yaml", configPath)
if err != nil {
return err
}
if Config.Mysql.DBPassword == "" {
Config.Mysql.DBPassword = UsualConfig.Mysql.DBPassword
}
if Config.Redis.DBUserName == "" {
Config.Redis.DBUserName = UsualConfig.Redis.DBUserName
}
if Config.Redis.DBPassWord == "" {
Config.Redis.DBPassWord = UsualConfig.Redis.DBPassword
}
if Config.Mongo.DBUserName == "" {
Config.Mongo.DBUserName = UsualConfig.Mongo.DBUserName
}
if Config.Mongo.DBPassword == "" {
Config.Mongo.DBPassword = UsualConfig.Mongo.DBPassword
}
if Config.Kafka.SASLUserName == "" {
Config.Kafka.SASLUserName = UsualConfig.Kafka.SASLUserName
}
if Config.Kafka.SASLPassword == "" {
Config.Kafka.SASLPassword = UsualConfig.Kafka.SASLPassword
}
if Config.Credential.Minio.AccessKeyID == "" {
Config.Credential.Minio.AccessKeyID = UsualConfig.Credential.Minio.AccessKeyID
}
if Config.Credential.Minio.SecretAccessKey == "" {
Config.Credential.Minio.SecretAccessKey = UsualConfig.Credential.Minio.SecretAccessKey
}
if Config.Credential.Minio.Endpoint == "" {
Config.Credential.Minio.Endpoint = UsualConfig.Credential.Minio.Endpoint
}
if Config.MessageVerify.FriendVerify == nil {
Config.MessageVerify.FriendVerify = &UsualConfig.Messageverify.FriendVerify
}
if Config.Push.Getui.MasterSecret == "" {
Config.Push.Getui.MasterSecret = UsualConfig.Push.Getui.MasterSecret
}
if Config.Push.Getui.AppKey == "" {
Config.Push.Getui.AppKey = UsualConfig.Push.Getui.AppKey
}
if Config.Push.Getui.PushUrl == "" {
Config.Push.Getui.PushUrl = UsualConfig.Push.Getui.PushUrl
}
if Config.Push.Getui.Enable == nil {
Config.Push.Getui.Enable = &UsualConfig.Push.Getui.Enable
}
if Config.Secret == "" {
Config.Secret = UsualConfig.Secret
}
if Config.TokenPolicy.AccessExpire == 0 {
Config.TokenPolicy.AccessExpire = UsualConfig.Tokenpolicy.AccessExpire
}
if Config.TokenPolicy.AccessSecret == "" {
Config.TokenPolicy.AccessSecret = UsualConfig.Tokenpolicy.AccessSecret
err = initConfig(&NotificationConfig, "notification.yaml", configPath)
if err != nil {
return err
}
}
func init() {
InitConfig("")
Config.Notification = NotificationConfig.Notification
return nil
}
+3 -3
View File
@@ -1,9 +1,9 @@
package cache
import (
"Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
+4 -4
View File
@@ -1,10 +1,10 @@
package cache
import (
"Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/relation"
relationTb "OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
+3 -3
View File
@@ -1,9 +1,9 @@
package cache
import (
"Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"github.com/dtm-labs/rockscache"
"time"
+4 -4
View File
@@ -1,10 +1,10 @@
package cache
import (
"Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/relation"
relationTb "OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
+4 -4
View File
@@ -1,10 +1,10 @@
package cache
import (
relationTb "Open_IM/pkg/common/db/table/relation"
unrelation2 "Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
relationTb "OpenIM/pkg/common/db/table/relation"
unrelation2 "OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
-475
View File
@@ -1,475 +0,0 @@
package cache
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/tracelog"
pbChat "Open_IM/pkg/proto/msg"
pbRtc "Open_IM/pkg/proto/rtc"
"Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
"context"
"errors"
"fmt"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"strconv"
"time"
"github.com/go-redis/redis/v8"
)
const (
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
appleDeviceToken = "DEVICE_TOKEN"
userMinSeq = "REDIS_USER_MIN_SEQ:"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
messageCache = "MESSAGE_CACHE:"
signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:"
FcmToken = "FCM_TOKEN:"
groupUserMinSeq = "GROUP_USER_MIN_SEQ:"
groupMaxSeq = "GROUP_MAX_SEQ:"
groupMinSeq = "GROUP_MIN_SEQ:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:"
uidPidToken = "UID_PID_TOKEN_STATUS:"
SignalListCache = "SIGNAL_LIST_CACHE:"
SignalCache = "SIGNAL_CACHE:"
)
type MsgCache interface {
IncrUserSeq(ctx context.Context, userID string) (int64, error)
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error)
GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
DelUserSignalList(ctx context.Context, userID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
DelFcmToken(ctx context.Context, account string, platformID int) error
IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error
GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
}
func NewMsgCache(client redis.UniversalClient) MsgCache {
return &msgCache{rdb: client}
}
type msgCache struct {
rdb redis.UniversalClient
}
func (m *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64())
}
func (m *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64())
}
func (m *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error {
return utils.Wrap1(m.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err())
}
func (m *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return utils.Wrap1(m.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err())
}
func (m *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, userMinSeq+userID).Int64())
}
func (m *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err())
}
func (m *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64())
}
func (m *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMaxSeq+groupID).Int64())
}
func (m *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64())
}
func (m *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := groupMaxSeq + groupID
seq, err := m.rdb.Incr(ctx, key).Uint64()
return int64(seq), utils.Wrap1(err)
}
func (m *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error {
key := groupMaxSeq + groupID
return utils.Wrap1(m.rdb.Set(ctx, key, maxSeq, 0).Err())
}
func (m *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
key := groupMinSeq + groupID
return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err())
}
func (m *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return utils.Wrap1(m.rdb.HSet(ctx, key, token, flag).Err())
}
func (m *msgCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) {
key := uidPidToken + userID + ":" + platformID
r, err := m.rdb.HGetAll(ctx, key).Result()
if err != nil {
return nil, utils.Wrap1(err)
}
mm := make(map[string]int)
for k, v := range r {
mm[k] = utils.StringToInt(v)
}
return mm, nil
}
func (m *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, r map[string]int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
mm := make(map[string]interface{})
for k, v := range r {
mm[k] = v
}
return utils.Wrap1(m.rdb.HSet(ctx, key, mm).Err())
}
func (m *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return utils.Wrap1(m.rdb.HDel(ctx, key, fields...).Err())
}
func (m *msgCache) GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) {
var errResult error
for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v))
result, err := m.rdb.Get(ctx, key).Result()
if err != nil {
errResult = err
failedSeqList = append(failedSeqList, v)
} else {
msg := sdkws.MsgData{}
err = jsonpb.UnmarshalString(result, &msg)
if err != nil {
errResult = err
failedSeqList = append(failedSeqList, v)
} else {
seqMsg = append(seqMsg, &msg)
}
}
}
return seqMsg, failedSeqList, errResult
}
func (m *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) {
pipe := m.rdb.Pipeline()
var failedList []pbChat.MsgDataToMQ
for _, msg := range msgList {
key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq))
s, err := utils.Pb2String(msg.MsgData)
if err != nil {
return 0, utils.Wrap1(err)
}
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
if err != nil {
return 0, utils.Wrap1(err)
}
}
if len(failedList) != 0 {
return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, tracelog.GetOperationID(ctx)))
}
_, err := pipe.Exec(ctx)
return 0, err
}
func (m *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error {
for _, msg := range msgList {
if err := m.rdb.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(msg.MsgData.Seq))).Err(); err != nil {
return utils.Wrap1(err)
}
}
return nil
}
func (m *msgCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
key := messageCache + userID + "_" + "*"
vals, err := m.rdb.Keys(ctx, key).Result()
if err == redis.Nil {
return nil
}
if err != nil {
return utils.Wrap1(err)
}
for _, v := range vals {
if err := m.rdb.Del(ctx, v).Err(); err != nil {
return utils.Wrap1(err)
}
}
return nil
}
func (m *msgCache) HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) {
req := &pbRtc.SignalReq{}
if err := proto.Unmarshal(msg.Content, req); err != nil {
return false, utils.Wrap1(err)
}
var inviteeUserIDList []string
var isInviteSignal bool
switch signalInfo := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList
isInviteSignal = true
case *pbRtc.SignalReq_InviteInGroup:
inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList
isInviteSignal = true
if !utils.Contain(pushToUserID, inviteeUserIDList...) {
return false, nil
}
case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept:
return false, utils.Wrap1(errors.New("signalInfo do not need offlinePush"))
default:
return false, nil
}
if isInviteSignal {
for _, userID := range inviteeUserIDList {
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
if err != nil {
return false, utils.Wrap1(err)
}
keyList := SignalListCache + userID
err = m.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err()
if err != nil {
return false, utils.Wrap1(err)
}
err = m.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, utils.Wrap1(err)
}
key := SignalCache + msg.ClientMsgID
err = m.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, utils.Wrap1(err)
}
}
}
return true, nil
}
func (m *msgCache) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
bytes, err := m.rdb.Get(ctx, SignalCache+clientMsgID).Bytes()
if err != nil {
return nil, utils.Wrap1(err)
}
req := &pbRtc.SignalReq{}
if err = proto.Unmarshal(bytes, req); err != nil {
return nil, utils.Wrap1(err)
}
invitationInfo = &pbRtc.SignalInviteReq{}
switch req2 := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
invitationInfo.Invitation = req2.Invite.Invitation
invitationInfo.OpUserID = req2.Invite.OpUserID
case *pbRtc.SignalReq_InviteInGroup:
invitationInfo.Invitation = req2.InviteInGroup.Invitation
invitationInfo.OpUserID = req2.InviteInGroup.OpUserID
}
return invitationInfo, nil
}
func (m *msgCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
key, err := m.rdb.LPop(ctx, SignalListCache+userID).Result()
if err != nil {
return nil, utils.Wrap1(err)
}
invitationInfo, err = m.GetSignalInfoFromCacheByClientMsgID(ctx, key)
if err != nil {
return nil, err
}
return invitationInfo, m.DelUserSignalList(ctx, userID)
}
func (m *msgCache) DelUserSignalList(ctx context.Context, userID string) error {
return utils.Wrap1(m.rdb.Del(ctx, SignalListCache+userID).Err())
}
func (m *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error {
for _, seq := range seqList {
key := messageCache + userID + "_" + strconv.Itoa(int(seq))
result, err := m.rdb.Get(ctx, key).Result()
if err != nil {
if err == redis.Nil {
continue
}
return utils.Wrap1(err)
}
var msg sdkws.MsgData
if err := jsonpb.UnmarshalString(result, &msg); err != nil {
return err
}
msg.Status = constant.MsgDeleted
s, err := utils.Pb2String(&msg)
if err != nil {
return utils.Wrap1(err)
}
if err := m.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return utils.Wrap1(err)
}
}
return nil
}
func (m *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return utils.Wrap1(m.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err())
}
func (m *msgCache) GetGetuiToken(ctx context.Context) (string, error) {
return utils.Wrap2(m.rdb.Get(ctx, getuiToken).Result())
}
func (m *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return utils.Wrap1(m.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err())
}
func (m *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) {
return utils.Wrap2(m.rdb.Get(ctx, getuiTaskID).Result())
}
func (m *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return utils.Wrap1(m.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err())
}
func (m *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
result, err := m.rdb.Get(ctx, sendMsgFailedFlag+id).Int()
return int32(result), utils.Wrap1(err)
}
func (m *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
return utils.Wrap1(m.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
}
func (m *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
return utils.Wrap2(m.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result())
}
func (m *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
return utils.Wrap1(m.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err())
}
func (m *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
seq, err := m.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result()
return int(seq), utils.Wrap1(err)
}
func (m *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
return utils.Wrap1(m.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err())
}
func (m *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
return utils.Wrap2(m.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int())
}
func (m *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return utils.Wrap1(m.rdb.SetNX(ctx, key, 1, time.Minute).Err())
}
func (m *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return utils.Wrap1(m.rdb.Del(ctx, key).Err())
}
func (m *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
switch sessionType {
case constant.SingleChatType:
return "EX_SINGLE_" + clientMsgID
case constant.GroupChatType:
return "EX_GROUP_" + clientMsgID
case constant.SuperGroupChatType:
return "EX_SUPER_GROUP_" + clientMsgID
case constant.NotificationChatType:
return "EX_NOTIFICATION" + clientMsgID
}
return ""
}
func (m *msgCache) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
n, err := m.rdb.Exists(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
if err != nil {
return false, utils.Wrap(err, "")
}
return n > 0, nil
}
func (m *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
return utils.Wrap1(m.rdb.HSet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
}
func (m *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
return utils.Wrap2(m.rdb.Expire(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result())
}
func (m *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
return utils.Wrap2(m.rdb.HGet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result())
}
func (m *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
return utils.Wrap2(m.rdb.HGetAll(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result())
}
func (m *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return utils.Wrap1(m.rdb.HDel(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
}
+225 -291
View File
@@ -1,21 +1,47 @@
package cache
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
pbChat "Open_IM/pkg/proto/msg"
pbRtc "Open_IM/pkg/proto/rtc"
"Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/tracelog"
pbChat "OpenIM/pkg/proto/msg"
pbRtc "OpenIM/pkg/proto/rtc"
"OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"context"
"errors"
"fmt"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"strconv"
"time"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
)
const (
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
appleDeviceToken = "DEVICE_TOKEN"
userMinSeq = "REDIS_USER_MIN_SEQ:"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
messageCache = "MESSAGE_CACHE:"
signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:"
FcmToken = "FCM_TOKEN:"
groupUserMinSeq = "GROUP_USER_MIN_SEQ:"
groupMaxSeq = "GROUP_MAX_SEQ:"
groupMinSeq = "GROUP_MIN_SEQ:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:"
uidPidToken = "UID_PID_TOKEN_STATUS:"
SignalListCache = "SIGNAL_LIST_CACHE:"
SignalCache = "SIGNAL_CACHE:"
)
type Cache interface {
@@ -28,6 +54,7 @@ type Cache interface {
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
@@ -38,7 +65,7 @@ type Cache interface {
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
@@ -53,8 +80,8 @@ type Cache interface {
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
SetSendMsgStatus(ctx context.Context, status int32) error
GetSendMsgStatus(ctx context.Context) (int, error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
DelFcmToken(ctx context.Context, account string, platformID int) error
@@ -71,298 +98,227 @@ type Cache interface {
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
}
// native redis operate
//func NewRedis() *RedisClient {
// o := &RedisClient{}
// o.InitRedis()
// return o
//}
func NewRedis() (*RedisClient, error) {
var rdb redis.UniversalClient
if config.Config.Redis.EnableCluster {
rdb = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: config.Config.Redis.DBAddress,
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
PoolSize: 50,
})
//if err := rdb.Ping(ctx).Err();err != nil {
// return nil, fmt.Errorf("redis ping %w", err)
//}
//return &RedisClient{rdb: rdb}, nil
} else {
rdb = redis.NewClient(&redis.Options{
Addr: config.Config.Redis.DBAddress[0],
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
DB: 0, // use default DB
PoolSize: 100, // 连接池大小
})
//err := rdb.Ping(ctx).Err()
//if err != nil {
// panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
//}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
err := rdb.Ping(ctx).Err()
if err != nil {
return nil, fmt.Errorf("redis ping %w", err)
}
return &RedisClient{rdb: rdb}, nil
func NewMsgCache(client redis.UniversalClient) Cache {
return &msgCache{rdb: client}
}
type RedisClient struct {
type msgCache struct {
rdb redis.UniversalClient
}
func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
return &RedisClient{rdb: rdb}
func (m *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64())
}
func (r *RedisClient) GetClient() redis.UniversalClient {
return r.rdb
func (m *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64())
}
// Perform seq auto-increment operation of user messages
func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (int64, error) {
key := userIncrSeq + uid
seq, err := r.rdb.Incr(ctx, key).Result()
return seq, err
func (m *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error {
return utils.Wrap1(m.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err())
}
// Get the largest Seq
func (r *RedisClient) GetUserMaxSeq(ctx context.Context, uid string) (int64, error) {
key := userIncrSeq + uid
seq, err := r.rdb.Get(ctx, key).Result()
return int64(utils.StringToInt(seq)), err
func (m *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return utils.Wrap1(m.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err())
}
// set the largest Seq
func (r *RedisClient) SetUserMaxSeq(ctx context.Context, uid string, maxSeq int64) error {
key := userIncrSeq + uid
return r.rdb.Set(ctx, key, maxSeq, 0).Err()
func (m *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, userMinSeq+userID).Int64())
}
// Set the user's minimum seq
func (r *RedisClient) SetUserMinSeq(ctx context.Context, uid string, minSeq int64) (err error) {
key := userMinSeq + uid
return r.rdb.Set(ctx, key, minSeq, 0).Err()
}
// Get the smallest Seq
func (r *RedisClient) GetUserMinSeq(ctx context.Context, uid string) (int64, error) {
key := userMinSeq + uid
seq, err := r.rdb.Get(ctx, key).Result()
return int64(utils.StringToInt(seq)), err
}
func (r *RedisClient) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
func (m *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
return r.rdb.Set(ctx, key, minSeq, 0).Err()
}
func (r *RedisClient) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
seq, err := r.rdb.Get(ctx, key).Result()
return int64(utils.StringToInt(seq)), err
return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err())
}
func (r *RedisClient) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
func (m *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64())
}
func (m *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMaxSeq+groupID).Int64())
}
func (m *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64())
}
func (m *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := groupMaxSeq + groupID
seq, err := r.rdb.Get(ctx, key).Result()
return int64(utils.StringToInt(seq)), err
seq, err := m.rdb.Incr(ctx, key).Uint64()
return int64(seq), utils.Wrap1(err)
}
func (r *RedisClient) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
func (m *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error {
key := groupMaxSeq + groupID
seq, err := r.rdb.Incr(ctx, key).Result()
return seq, err
return utils.Wrap1(m.rdb.Set(ctx, key, maxSeq, 0).Err())
}
func (r *RedisClient) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error {
key := groupMaxSeq + groupID
return r.rdb.Set(ctx, key, maxSeq, 0).Err()
}
func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
func (m *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
key := groupMinSeq + groupID
return r.rdb.Set(ctx, key, minSeq, 0).Err()
return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err())
}
// Store userid and platform class to redis
func (r *RedisClient) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
func (m *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return r.rdb.HSet(ctx, key, token, flag).Err()
return utils.Wrap1(m.rdb.HSet(ctx, key, token, flag).Err())
}
//key:userID+platform-> <token, flag>
func (r *RedisClient) GetTokenMapByUidPid(ctx context.Context, userID string, platformID int) (map[string]int, error) {
key := uidPidToken + userID + ":" + strconv.Itoa(platformID)
m, err := r.rdb.HGetAll(ctx, key).Result()
mm := make(map[string]int)
for k, v := range m {
mm[k] = utils.StringToInt(v)
}
return mm, err
}
func (r *RedisClient) GetTokensWithoutError(ctx context.Context, userID, platform string) (map[string]int, error) {
key := uidPidToken + userID + ":" + platform
m, err := r.rdb.HGetAll(ctx, key).Result()
if err != nil && err == redis.Nil {
return nil, nil
func (m *msgCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) {
key := uidPidToken + userID + ":" + platformID
m, err := m.rdb.HGetAll(ctx, key).Result()
if err != nil {
return nil, utils.Wrap1(err)
}
mm := make(map[string]int)
for k, v := range m {
mm[k] = utils.StringToInt(v)
}
return mm, utils.Wrap(err, "")
return mm, nil
}
func (r *RedisClient) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error {
key := uidPidToken + userID + ":" + platform
func (m *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
mm := make(map[string]interface{})
for k, v := range m {
mm[k] = v
}
return r.rdb.HSet(ctx, key, mm).Err()
return utils.Wrap1(m.rdb.HSet(ctx, key, mm).Err())
}
func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error {
key := uidPidToken + userID + ":" + platform
return r.rdb.HDel(ctx, key, fields...).Err()
func (m *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return utils.Wrap1(m.rdb.HDel(ctx, key, fields...).Err())
}
func (r *RedisClient) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err2 error) {
func (m *msgCache) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) {
var errResult error
for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v))
result, err := r.rdb.Get(ctx, key).Result()
result, err := m.rdb.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {
err2 = err
}
failedSeqs = append(failedSeqs, v)
errResult = err
failedSeqList = append(failedSeqList, v)
} else {
msg := sdkws.MsgData{}
err = jsonpb.UnmarshalString(result, &msg)
if err != nil {
err2 = err
failedSeqs = append(failedSeqs, v)
errResult = err
failedSeqList = append(failedSeqList, v)
} else {
seqMsgs = append(seqMsgs, &msg)
seqMsg = append(seqMsg, &msg)
}
}
}
return seqMsgs, failedSeqs, err2
return seqMsg, failedSeqList, errResult
}
func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ, uid string) (int, error) {
pipe := r.rdb.Pipeline()
var failedMsgs []pbChat.MsgDataToMQ
for _, msg := range msgs {
key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq))
func (m *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) {
pipe := m.rdb.Pipeline()
var failedList []pbChat.MsgDataToMQ
for _, msg := range msgList {
key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq))
s, err := utils.Pb2String(msg.MsgData)
if err != nil {
continue
return 0, utils.Wrap1(err)
}
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
//err = r.rdb.HMSet(ctx, "12", map[string]interface{}{"1": 2, "343": false}).Err()
if err != nil {
failedMsgs = append(failedMsgs, *msg)
return 0, utils.Wrap1(err)
}
}
if len(failedMsgs) != 0 {
return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %v", failedMsgs))
if len(failedList) != 0 {
return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, tracelog.GetOperationID(ctx)))
}
_, err := pipe.Exec(ctx)
return 0, err
}
func (r *RedisClient) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ) error {
for _, msg := range msgs {
key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq))
err := r.rdb.Del(ctx, key).Err()
if err != nil {
func (m *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error {
for _, msg := range msgList {
if err := m.rdb.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(msg.MsgData.Seq))).Err(); err != nil {
return utils.Wrap1(err)
}
}
return nil
}
func (r *RedisClient) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
func (m *msgCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
key := messageCache + userID + "_" + "*"
vals, err := r.rdb.Keys(ctx, key).Result()
vals, err := m.rdb.Keys(ctx, key).Result()
if err == redis.Nil {
return nil
}
if err != nil {
return utils.Wrap(err, "")
return utils.Wrap1(err)
}
for _, v := range vals {
err = r.rdb.Del(ctx, v).Err()
if err := m.rdb.Del(ctx, v).Err(); err != nil {
return utils.Wrap1(err)
}
}
return nil
}
func (r *RedisClient) HandleSignalInfo(ctx context.Context, operationID string, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) {
func (m *msgCache) HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) {
req := &pbRtc.SignalReq{}
if err := proto.Unmarshal(msg.Content, req); err != nil {
return false, err
return false, utils.Wrap1(err)
}
var inviteeUserIDs []string
var inviteeUserIDList []string
var isInviteSignal bool
switch signalInfo := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
inviteeUserIDs = signalInfo.Invite.Invitation.InviteeUserIDList
inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList
isInviteSignal = true
case *pbRtc.SignalReq_InviteInGroup:
inviteeUserIDs = signalInfo.InviteInGroup.Invitation.InviteeUserIDList
inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList
isInviteSignal = true
if !utils.IsContain(pushToUserID, inviteeUserIDs) {
if !utils.Contain(pushToUserID, inviteeUserIDList...) {
return false, nil
}
case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept:
return false, nil
return false, utils.Wrap1(errors.New("signalInfo do not need offlinePush"))
default:
return false, nil
}
if isInviteSignal {
for _, userID := range inviteeUserIDs {
for _, userID := range inviteeUserIDList {
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
if err != nil {
return false, err
return false, utils.Wrap1(err)
}
keyList := signalListCache + userID
err = r.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err()
keyList := SignalListCache + userID
err = m.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err()
if err != nil {
return false, err
return false, utils.Wrap1(err)
}
err = r.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err()
err = m.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, err
return false, utils.Wrap1(err)
}
key := signalCache + msg.ClientMsgID
err = r.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err()
key := SignalCache + msg.ClientMsgID
err = m.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, err
return false, utils.Wrap1(err)
}
}
}
return true, nil
}
func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
key := signalCache + clientMsgID
invitationInfo = &pbRtc.SignalInviteReq{}
bytes, err := r.rdb.Get(ctx, key).Bytes()
func (m *msgCache) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
bytes, err := m.rdb.Get(ctx, SignalCache+clientMsgID).Bytes()
if err != nil {
return nil, err
return nil, utils.Wrap1(err)
}
req := &pbRtc.SignalReq{}
if err = proto.Unmarshal(bytes, req); err != nil {
return nil, err
return nil, utils.Wrap1(err)
}
invitationInfo = &pbRtc.SignalInviteReq{}
switch req2 := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
invitationInfo.Invitation = req2.Invite.Invitation
@@ -371,162 +327,112 @@ func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, c
invitationInfo.Invitation = req2.InviteInGroup.Invitation
invitationInfo.OpUserID = req2.InviteInGroup.OpUserID
}
return invitationInfo, err
}
func (r *RedisClient) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
keyList := signalListCache + userID
result := r.rdb.LPop(ctx, keyList)
if err = result.Err(); err != nil {
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
}
key, err := result.Result()
if err != nil {
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
}
invitationInfo, err = r.GetSignalInfoFromCacheByClientMsgID(ctx, key)
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
err = r.DelUserSignalList(ctx, userID)
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
return invitationInfo, nil
}
func (r *RedisClient) DelUserSignalList(ctx context.Context, userID string) error {
keyList := signalListCache + userID
err := r.rdb.Del(ctx, keyList).Err()
return err
func (m *msgCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
key, err := m.rdb.LPop(ctx, SignalListCache+userID).Result()
if err != nil {
return nil, utils.Wrap1(err)
}
invitationInfo, err = m.GetSignalInfoFromCacheByClientMsgID(ctx, key)
if err != nil {
return nil, err
}
return invitationInfo, m.DelUserSignalList(ctx, userID)
}
func (r *RedisClient) DelMsgFromCache(ctx context.Context, uid string, seqList []int64, operationID string) {
func (m *msgCache) DelUserSignalList(ctx context.Context, userID string) error {
return utils.Wrap1(m.rdb.Del(ctx, SignalListCache+userID).Err())
}
func (m *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error {
for _, seq := range seqList {
key := messageCache + uid + "_" + strconv.Itoa(int(seq))
result, err := r.rdb.Get(ctx, key).Result()
key := messageCache + userID + "_" + strconv.Itoa(int(seq))
result, err := m.rdb.Get(ctx, key).Result()
if err != nil {
if err == redis.Nil {
} else {
continue
}
continue
return utils.Wrap1(err)
}
var msg sdkws.MsgData
if err := utils.String2Pb(result, &msg); err != nil {
continue
if err := jsonpb.UnmarshalString(result, &msg); err != nil {
return err
}
msg.Status = constant.MsgDeleted
s, err := utils.Pb2String(&msg)
if err != nil {
continue
return utils.Wrap1(err)
}
if err := r.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
if err := m.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return utils.Wrap1(err)
}
}
return nil
}
func (r *RedisClient) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return r.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err()
func (m *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return utils.Wrap1(m.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err())
}
func (r *RedisClient) GetGetuiToken(ctx context.Context) (string, error) {
result, err := r.rdb.Get(ctx, getuiToken).Result()
return result, err
func (m *msgCache) GetGetuiToken(ctx context.Context) (string, error) {
return utils.Wrap2(m.rdb.Get(ctx, getuiToken).Result())
}
func (r *RedisClient) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return r.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()
func (m *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return utils.Wrap1(m.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err())
}
func (r *RedisClient) GetGetuiTaskID(ctx context.Context) (string, error) {
result, err := r.rdb.Get(ctx, getuiTaskID).Result()
return result, err
func (m *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) {
return utils.Wrap2(m.rdb.Get(ctx, getuiTaskID).Result())
}
func (r *RedisClient) SetSendMsgStatus(ctx context.Context, status int32, operationID string) error {
return r.rdb.Set(ctx, sendMsgFailedFlag+operationID, status, time.Hour*24).Err()
func (m *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return utils.Wrap1(m.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err())
}
func (r *RedisClient) GetSendMsgStatus(ctx context.Context, operationID string) (int, error) {
result, err := r.rdb.Get(ctx, sendMsgFailedFlag+operationID).Result()
if err != nil {
return 0, err
}
status, err := strconv.Atoi(result)
return status, err
func (m *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
result, err := m.rdb.Get(ctx, sendMsgFailedFlag+id).Int()
return int32(result), utils.Wrap1(err)
}
func (r *RedisClient) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
return r.rdb.Set(ctx, key, fcmToken, time.Duration(expireTime)*time.Second).Err()
func (m *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
return utils.Wrap1(m.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
}
func (r *RedisClient) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
return r.rdb.Get(ctx, key).Result()
}
func (r *RedisClient) DelFcmToken(ctx context.Context, account string, platformID int) error {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
return r.rdb.Del(ctx, key).Err()
}
func (r *RedisClient) IncrUserBadgeUnreadCountSum(ctx context.Context, uid string) (int, error) {
key := userBadgeUnreadCountSum + uid
seq, err := r.rdb.Incr(ctx, key).Result()
return int(seq), err
}
func (r *RedisClient) SetUserBadgeUnreadCountSum(ctx context.Context, uid string, value int) error {
key := userBadgeUnreadCountSum + uid
return r.rdb.Set(ctx, key, value, 0).Err()
}
func (r *RedisClient) GetUserBadgeUnreadCountSum(ctx context.Context, uid string) (int, error) {
key := userBadgeUnreadCountSum + uid
seq, err := r.rdb.Get(ctx, key).Result()
return utils.StringToInt(seq), err
}
func (r *RedisClient) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType)
n, err := r.rdb.Exists(ctx, key).Result()
return n > 0, err
func (m *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
return utils.Wrap2(m.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result())
}
func (r *RedisClient) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.HGetAll(ctx, key).Result()
}
func (r *RedisClient) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.HDel(ctx, key, subKey).Err()
}
func (r *RedisClient) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.Expire(ctx, key, expiration).Result()
func (m *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
return utils.Wrap1(m.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err())
}
func (r *RedisClient) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType)
result, err := r.rdb.HGet(ctx, key, typeKey).Result()
return result, err
func (m *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
seq, err := m.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result()
return int(seq), utils.Wrap1(err)
}
func (r *RedisClient) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
key := r.getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.HSet(ctx, key, typeKey, value).Err()
func (m *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
return utils.Wrap1(m.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err())
}
func (r *RedisClient) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
func (m *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
return utils.Wrap2(m.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int())
}
func (m *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return r.rdb.SetNX(ctx, key, 1, time.Minute).Err()
return utils.Wrap1(m.rdb.SetNX(ctx, key, 1, time.Minute).Err())
}
func (r *RedisClient) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
func (m *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return r.rdb.Del(ctx, key).Err()
return utils.Wrap1(m.rdb.Del(ctx, key).Err())
}
func (r *RedisClient) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
func (m *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
switch sessionType {
case constant.SingleChatType:
return "EX_SINGLE_" + clientMsgID
@@ -539,3 +445,31 @@ func (r *RedisClient) getMessageReactionExPrefix(clientMsgID string, sessionType
}
return ""
}
func (m *msgCache) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
n, err := m.rdb.Exists(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
if err != nil {
return false, utils.Wrap(err, "")
}
return n > 0, nil
}
func (m *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
return utils.Wrap1(m.rdb.HSet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
}
func (m *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
return utils.Wrap2(m.rdb.Expire(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result())
}
func (m *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
return utils.Wrap2(m.rdb.HGet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result())
}
func (m *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
return utils.Wrap2(m.rdb.HGetAll(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result())
}
func (m *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return utils.Wrap1(m.rdb.HDel(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
}
+103 -103
View File
@@ -1,107 +1,107 @@
package cache
//import (
// "Open_IM/pkg/common/constant"
// pbChat "Open_IM/pkg/proto/msg"
// common "Open_IM/pkg/proto/sdkws"
// "context"
// "flag"
// "fmt"
// "github.com/stretchr/testify/assert"
// "testing"
//)
//
//var DB RedisClient
//
//func Test_SetTokenMapByUidPid(t *testing.T) {
// //m := make(map[string]int, 0)
// //m["test1"] = 1
// //m["test2"] = 2
// //m["2332"] = 4
// //err := DB.SetTokenMapByUidPid("1234", 2, m)
// //assert.Nil(t, err)
//
//}
//func Test_GetTokenMapByUidPid(t *testing.T) {
// //m, err := DB.GetTokenMapByUidPid("1234", "Android")
// //assert.Nil(t, err)
// //fmt.Println(m)
//}
//
////func TestDataBases_GetMultiConversationMsgOpt(t *testing.T) {
//// m, err := DB.GetMultiConversationMsgOpt("fg", []string{"user", "age", "color"})
//// assert.Nil(t, err)
//// fmt.Println(m)
////}
//func Test_GetKeyTTL(t *testing.T) {
// ctx := context.Background()
// key := flag.String("key", "key", "key value")
// flag.Parse()
// ttl, err := DB.GetClient().TTL(ctx, *key).Result()
import (
"OpenIM/pkg/common/constant"
pbChat "OpenIM/pkg/proto/msg"
common "OpenIM/pkg/proto/sdkws"
"context"
"flag"
"fmt"
"github.com/stretchr/testify/assert"
"testing"
)
var DB RedisClient
func Test_SetTokenMapByUidPid(t *testing.T) {
m := make(map[string]int, 0)
m["test1"] = 1
m["test2"] = 2
m["2332"] = 4
err := DB.SetTokenMapByUidPid("1234", 2, m)
assert.Nil(t, err)
}
func Test_GetTokenMapByUidPid(t *testing.T) {
m, err := DB.GetTokenMapByUidPid("1234", "Android")
assert.Nil(t, err)
fmt.Println(m)
}
//func TestDataBases_GetMultiConversationMsgOpt(t *testing.T) {
// m, err := DB.GetMultiConversationMsgOpt("fg", []string{"user", "age", "color"})
// assert.Nil(t, err)
// fmt.Println(ttl)
//}
//func Test_HGetAll(t *testing.T) {
// ctx := context.Background()
// key := flag.String("key", "key", "key value")
// flag.Parse()
// ttl, err := DB.GetClient().TTL(ctx, *key).Result()
// assert.Nil(t, err)
// fmt.Println(ttl)
//}
//
//func Test_NewSetMessageToCache(t *testing.T) {
// var msg pbChat.MsgDataToMQ
// m := make(map[string]bool)
// var offlinePush common.OfflinePushInfo
// offlinePush.Title = "3"
// offlinePush.Ex = "34"
// offlinePush.IOSPushSound = "+1"
// offlinePush.IOSBadgeCount = true
// m[constant.IsPersistent] = true
// m[constant.IsHistory] = true
// var data common.MsgData
// uid := "test_uid"
// data.Seq = 11
// data.ClientMsgID = "23jwhjsdf"
// data.SendID = "111"
// data.RecvID = "222"
// data.Content = []byte{1, 2, 3, 4, 5, 6, 7}
// data.Seq = 1212
// data.Options = m
// data.OfflinePushInfo = &offlinePush
// data.AtUserIDList = []string{"1212", "23232"}
// msg.MsgData = &data
// messageList := []*pbChat.MsgDataToMQ{&msg}
// err, _ := DB.SetMessageToCache(messageList, uid, "cacheTest")
// assert.Nil(t, err)
//
//}
//func Test_NewGetMessageListBySeq(t *testing.T) {
// var msg pbChat.MsgDataToMQ
// var data common.MsgData
// uid := "test_uid"
// data.Seq = 11
// data.ClientMsgID = "23jwhjsdf"
// msg.MsgData = &data
//
// seqMsg, failedSeqList, err := DB.GetMessageListBySeq(uid, []uint32{1212}, "cacheTest")
// assert.Nil(t, err)
// fmt.Println(seqMsg, failedSeqList)
//
//}
//
//func Test_SetFcmToken(t *testing.T) {
// uid := "test_uid"
// token := "dfnWBtOjSj-XIZnUvDlegv:APA91bG09XTtiXfpE6U7gUVMOhnKcUkNCv4WHn0UZr2clUi-tS1jEH-HiCEW8GIAhjLIGcfUJ6NIKteC023ZxDH7J0PJ5sTxoup3fHDUPLU7KgQoZS4tPyFqCbZ6bRB7esDPEnD1n_s0"
// platformID := 2
// err := DB.SetFcmToken(uid, platformID, token, 0)
// assert.Nil(t, err)
//}
//func Test_GetFcmToken(t *testing.T) {
// uid := "test_uid"
// platformID := 2
// token, err := DB.GetFcmToken(uid, platformID)
// assert.Nil(t, err)
// fmt.Println("token is :", token)
// fmt.Println(m)
//}
func Test_GetKeyTTL(t *testing.T) {
ctx := context.Background()
key := flag.String("key", "key", "key value")
flag.Parse()
ttl, err := DB.GetClient().TTL(ctx, *key).Result()
assert.Nil(t, err)
fmt.Println(ttl)
}
func Test_HGetAll(t *testing.T) {
ctx := context.Background()
key := flag.String("key", "key", "key value")
flag.Parse()
ttl, err := DB.GetClient().TTL(ctx, *key).Result()
assert.Nil(t, err)
fmt.Println(ttl)
}
func Test_NewSetMessageToCache(t *testing.T) {
var msg pbChat.MsgDataToMQ
m := make(map[string]bool)
var offlinePush common.OfflinePushInfo
offlinePush.Title = "3"
offlinePush.Ex = "34"
offlinePush.IOSPushSound = "+1"
offlinePush.IOSBadgeCount = true
m[constant.IsPersistent] = true
m[constant.IsHistory] = true
var data common.MsgData
uid := "test_uid"
data.Seq = 11
data.ClientMsgID = "23jwhjsdf"
data.SendID = "111"
data.RecvID = "222"
data.Content = []byte{1, 2, 3, 4, 5, 6, 7}
data.Seq = 1212
data.Options = m
data.OfflinePushInfo = &offlinePush
data.AtUserIDList = []string{"1212", "23232"}
msg.MsgData = &data
messageList := []*pbChat.MsgDataToMQ{&msg}
err, _ := DB.SetMessageToCache(messageList, uid, "cacheTest")
assert.Nil(t, err)
}
func Test_NewGetMessagesBySeq(t *testing.T) {
var msg pbChat.MsgDataToMQ
var data common.MsgData
uid := "test_uid"
data.Seq = 11
data.ClientMsgID = "23jwhjsdf"
msg.MsgData = &data
seqMsg, failedSeqList, err := DB.GetMessagesBySeq(uid, []uint32{1212}, "cacheTest")
assert.Nil(t, err)
fmt.Println(seqMsg, failedSeqList)
}
func Test_SetFcmToken(t *testing.T) {
uid := "test_uid"
token := "dfnWBtOjSj-XIZnUvDlegv:APA91bG09XTtiXfpE6U7gUVMOhnKcUkNCv4WHn0UZr2clUi-tS1jEH-HiCEW8GIAhjLIGcfUJ6NIKteC023ZxDH7J0PJ5sTxoup3fHDUPLU7KgQoZS4tPyFqCbZ6bRB7esDPEnD1n_s0"
platformID := 2
err := DB.SetFcmToken(uid, platformID, token, 0)
assert.Nil(t, err)
}
func Test_GetFcmToken(t *testing.T) {
uid := "test_uid"
platformID := 2
token, err := DB.GetFcmToken(uid, platformID)
assert.Nil(t, err)
fmt.Println("token is :", token)
}
+1 -1
View File
@@ -1,7 +1,7 @@
package cache
import (
"Open_IM/pkg/utils"
"OpenIM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
+3 -3
View File
@@ -1,9 +1,9 @@
package cache
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/tokenverify"
"OpenIM/pkg/utils"
"context"
"github.com/golang-jwt/jwt/v4"
)
+4 -4
View File
@@ -1,10 +1,10 @@
package cache
import (
"Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/relation"
relationTb "OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
+1 -1
View File
@@ -1,7 +1,7 @@
package controller
import (
"Open_IM/pkg/common/db/cache"
"OpenIM/pkg/common/db/cache"
"context"
"github.com/go-redis/redis/v8"
)
+2 -2
View File
@@ -1,8 +1,8 @@
package controller
import (
"Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/utils"
"context"
"errors"
"gorm.io/gorm"
+8 -31
View File
@@ -1,50 +1,27 @@
package controller
import (
"Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation"
pbMsg "Open_IM/pkg/proto/msg"
"gorm.io/gorm"
relationTb "OpenIM/pkg/common/db/table/relation"
pbMsg "OpenIM/pkg/proto/msg"
)
type ChatLogInterface interface {
type ChatLogDatabase interface {
CreateChatLog(msg pbMsg.MsgDataToMQ) error
GetChatLog(chatLog *relationTb.ChatLogModel, pageNumber, showNumber int32, contentTypeList []int32) (int64, []relationTb.ChatLogModel, error)
}
func NewChatLogController(db *gorm.DB) ChatLogInterface {
return &ChatLogController{database: NewChatLogDataBase(db)}
}
type ChatLogController struct {
database ChatLogDataBaseInterface
}
func (c *ChatLogController) CreateChatLog(msg pbMsg.MsgDataToMQ) error {
return c.database.CreateChatLog(msg)
}
func (c *ChatLogController) GetChatLog(chatLog *relationTb.ChatLogModel, pageNumber, showNumber int32, contentTypeList []int32) (int64, []relationTb.ChatLogModel, error) {
return c.database.GetChatLog(chatLog, pageNumber, showNumber, contentTypeList)
}
type ChatLogDataBaseInterface interface {
CreateChatLog(msg pbMsg.MsgDataToMQ) error
GetChatLog(chatLog *relationTb.ChatLogModel, pageNumber, showNumber int32, contentTypeList []int32) (int64, []relationTb.ChatLogModel, error)
func NewChatLogDatabase(chatLogModelInterface relationTb.ChatLogModelInterface) ChatLogDatabase {
return &ChatLogDataBase{chatLogModel: chatLogModelInterface}
}
type ChatLogDataBase struct {
chatLogDB relationTb.ChatLogModelInterface
}
func NewChatLogDataBase(db *gorm.DB) ChatLogDataBaseInterface {
return &ChatLogDataBase{chatLogDB: relation.NewChatLog(db)}
chatLogModel relationTb.ChatLogModelInterface
}
func (c *ChatLogDataBase) CreateChatLog(msg pbMsg.MsgDataToMQ) error {
return c.chatLogDB.Create(msg)
return c.chatLogModel.Create(msg)
}
func (c *ChatLogDataBase) GetChatLog(chatLog *relationTb.ChatLogModel, pageNumber, showNumber int32, contentTypeList []int32) (int64, []relationTb.ChatLogModel, error) {
return c.chatLogDB.GetChatLog(chatLog, pageNumber, showNumber, contentTypeList)
return c.chatLogModel.GetChatLog(chatLog, pageNumber, showNumber, contentTypeList)
}
+6 -6
View File
@@ -1,12 +1,12 @@
package controller
import (
"Open_IM/internal/tx"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/cache"
"OpenIM/pkg/common/db/relation"
relationTb "OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/db/tx"
"OpenIM/pkg/utils"
"context"
"encoding/json"
)
+2 -2
View File
@@ -1,8 +1,8 @@
package controller
import (
unRelationTb "Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/proto/sdkws"
unRelationTb "OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/proto/sdkws"
"context"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/mongo"
+4 -4
View File
@@ -1,10 +1,10 @@
package controller
import (
"Open_IM/internal/tx"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/db/tx"
"OpenIM/pkg/utils"
"context"
"errors"
"gorm.io/gorm"
+6 -6
View File
@@ -1,12 +1,12 @@
package controller
import (
"Open_IM/internal/tx"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache"
relationTb "Open_IM/pkg/common/db/table/relation"
unRelationTb "Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/cache"
relationTb "OpenIM/pkg/common/db/table/relation"
unRelationTb "OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/common/db/tx"
"OpenIM/pkg/utils"
"context"
"fmt"
"github.com/dtm-labs/rockscache"
+84 -84
View File
@@ -1,21 +1,21 @@
package controller
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache"
unRelationTb "Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/common/db/unrelation"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/prome"
"Open_IM/pkg/common/tracelog"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/cache"
unRelationTb "OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/common/db/unrelation"
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/prome"
"OpenIM/pkg/common/tracelog"
"fmt"
"github.com/gogo/protobuf/sortkeys"
"sync"
"time"
pbMsg "Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
pbMsg "OpenIM/pkg/proto/msg"
"OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"context"
"errors"
"github.com/go-redis/redis/v8"
@@ -24,7 +24,7 @@ import (
"github.com/golang/protobuf/proto"
)
type MsgDatabaseInterface interface {
type MsgDatabase interface {
// 批量插入消息
BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
// 刪除redis中消息缓存
@@ -50,6 +50,8 @@ type MsgDatabaseInterface interface {
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
// 设置群用户最小seq 直接调用cache
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
// 设置用户最小seq 直接调用cache
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
@@ -71,22 +73,21 @@ type MsgDatabaseInterface interface {
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error)
}
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface {
return &MsgDatabase{}
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabase {
return &msgDatabase{}
}
type MsgDatabase struct {
type msgDatabase struct {
mgo unRelationTb.MsgDocModelInterface
cache cache.MsgCache
cache cache.Cache
msg unRelationTb.MsgDocModel
ExtendMsg unRelationTb.ExtendMsgSetModelInterface
rdb redis.Client
}
func (db *MsgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]unRelationTb.KeyValueModel {
func (db *msgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]unRelationTb.KeyValueModel {
r := make(map[string]unRelationTb.KeyValueModel)
for key, value := range reactionExtensionList {
r[key] = unRelationTb.KeyValueModel{
@@ -98,35 +99,35 @@ func (db *MsgDatabase) reactionExtensionList(reactionExtensionList map[string]*s
return r
}
func (db *MsgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
func (db *msgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
return db.cache.JudgeMessageReactionEXISTS(ctx, clientMsgID, sessionType)
}
func (db *MsgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
func (db *msgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
return db.cache.SetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey, value)
}
func (db *MsgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
func (db *msgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
return db.cache.SetMessageReactionExpire(ctx, clientMsgID, sessionType, expiration)
}
func (db *MsgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
func (db *msgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
return db.cache.GetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey)
}
func (db *MsgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
func (db *msgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
return db.cache.GetOneMessageAllReactionList(ctx, clientMsgID, sessionType)
}
func (db *MsgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
func (db *msgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return db.cache.DeleteOneMessageKey(ctx, clientMsgID, sessionType, subKey)
}
func (db *MsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
func (db *msgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
return db.ExtendMsg.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList))
}
func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
extendMsgSet, err := db.ExtendMsg.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime)
if err != nil {
return nil, err
@@ -154,45 +155,40 @@ func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessio
}, nil
}
func (db *MsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
func (db *msgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
return db.ExtendMsg.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList))
}
func (db *MsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
func (db *msgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return db.cache.SetSendMsgStatus(ctx, id, status)
}
func (db *MsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
func (db *msgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
return db.cache.GetSendMsgStatus(ctx, id)
}
func (db *MsgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error {
func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error {
//TODO implement me
panic("implement me")
}
func (db *MsgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
return db.cache.GetUserMaxSeq(ctx, userID)
}
func (db *MsgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
func (db *msgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
return db.cache.GetUserMinSeq(ctx, userID)
}
func (db *MsgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
func (db *msgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
return db.cache.GetGroupMaxSeq(ctx, groupID)
}
func (db *MsgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
func (db *msgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
return db.cache.GetGroupMinSeq(ctx, groupID)
}
func (db *MsgDatabase) GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error) {
seqMsg, _, err := db.cache.GetMessageListBySeq(ctx, userID, seqs)
return seqMsg, err
}
func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
//newTime := utils.GetCurrentTimestampByMill()
if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() {
return errors.New("too large")
@@ -246,18 +242,18 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
doc.DocID = docID
doc.Msg = msgsToMongo
if err = db.mgo.Create(ctx, doc); err != nil {
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
prome.Inc(prome.MsgInsertMongoFailedCounter)
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "")
}
prome.PromeInc(prome.MsgInsertMongoSuccessCounter)
prome.Inc(prome.MsgInsertMongoSuccessCounter)
} else {
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
prome.Inc(prome.MsgInsertMongoFailedCounter)
//log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
return utils.Wrap(err, "")
}
} else {
prome.PromeInc(prome.MsgInsertMongoSuccessCounter)
prome.Inc(prome.MsgInsertMongoSuccessCounter)
}
}
if docIDNext != "" {
@@ -266,21 +262,21 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
nextDoc.Msg = msgsToMongoNext
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
if err = db.mgo.Create(ctx, nextDoc); err != nil {
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
prome.Inc(prome.MsgInsertMongoFailedCounter)
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "")
}
prome.PromeInc(prome.MsgInsertMongoSuccessCounter)
prome.Inc(prome.MsgInsertMongoSuccessCounter)
}
//log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList))
return nil
}
func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error {
func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error {
return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
}
func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
//newTime := utils.GetCurrentTimestampByMill()
lenList := len(msgList)
if int64(lenList) > db.msg.GetSingleGocMsgNum() {
@@ -300,10 +296,10 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
//log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
}
if err != nil && err != redis.Nil {
prome.PromeInc(prome.SeqGetFailedCounter)
prome.Inc(prome.SeqGetFailedCounter)
return 0, utils.Wrap(err, "")
}
prome.PromeInc(prome.SeqGetSuccessCounter)
prome.Inc(prome.SeqGetSuccessCounter)
lastMaxSeq := currentMaxSeq
for _, m := range msgList {
currentMaxSeq++
@@ -313,10 +309,10 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
//log.Debug(operationID, "SetMessageToCache ", sourceID, len(msgList))
failedNum, err := db.cache.SetMessageToCache(ctx, sourceID, msgList)
if err != nil {
prome.PromeAdd(prome.MsgInsertRedisFailedCounter, failedNum)
prome.Add(prome.MsgInsertRedisFailedCounter, failedNum)
//log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), sourceID)
} else {
prome.PromeInc(prome.MsgInsertRedisSuccessCounter)
prome.Inc(prome.MsgInsertRedisSuccessCounter)
}
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, sourceID, len(msgList))
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
@@ -325,14 +321,14 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
err = db.cache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq)
}
if err != nil {
prome.PromeInc(prome.SeqSetFailedCounter)
prome.Inc(prome.SeqSetFailedCounter)
} else {
prome.PromeInc(prome.SeqSetSuccessCounter)
prome.Inc(prome.SeqSetSuccessCounter)
}
return lastMaxSeq, utils.Wrap(err, "")
}
func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
sortkeys.Int64s(seqs)
docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs)
lock := sync.Mutex{}
@@ -353,7 +349,7 @@ func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []i
return totalUnExistSeqs, nil
}
func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
return nil, err
@@ -366,7 +362,7 @@ func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, s
return unExistSeqs, nil
}
func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
doc, err := db.mgo.FindOneByDocID(ctx, docID)
if err != nil {
return nil, nil, nil, err
@@ -397,7 +393,7 @@ func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s
return seqMsgs, indexes, unExistSeqs, nil
}
func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.mgo.GetNewestMsg(ctx, sourceID)
if err != nil {
return nil, err
@@ -405,7 +401,7 @@ func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb
return db.unmarshalMsg(msgInfo)
}
func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.mgo.GetOldestMsg(ctx, sourceID)
if err != nil {
return nil, err
@@ -413,7 +409,7 @@ func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb
return db.unmarshalMsg(msgInfo)
}
func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) {
func (db *msgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) {
msgPb = &sdkws.MsgData{}
err = proto.Unmarshal(msgInfo.Msg, msgPb)
if err != nil {
@@ -422,7 +418,7 @@ func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *
return msgPb, nil
}
func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) {
func (db *msgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsgs []*sdkws.MsgData, err error) {
var hasSeqs []int64
singleCount := 0
m := db.msg.GetDocIDSeqsMap(sourceID, seqs)
@@ -440,7 +436,7 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
return nil, err
}
if utils.Contain(msgPb.Seq, value...) {
seqMsg = append(seqMsg, msgPb)
seqMsgs = append(seqMsgs, msgPb)
hasSeqs = append(hasSeqs, msgPb.Seq)
singleCount++
if singleCount == len(value) {
@@ -458,54 +454,54 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
} else if diffusionType == constant.ReadDiffusion {
exceptionMsg = db.msg.GenExceptionSuperGroupMessageBySeqs(diff, sourceID)
}
seqMsg = append(seqMsg, exceptionMsg...)
seqMsgs = append(seqMsgs, exceptionMsg...)
}
return seqMsg, nil
return seqMsgs, nil
}
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, userID, seqs)
func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs)
if err != nil {
if err != redis.Nil {
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
}
}
prome.PromeAdd(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion)
if err != nil {
prome.PromeAdd(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return nil, err
}
prome.PromeAdd(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
successMsgs = append(successMsgs, mongoMsgs...)
}
return successMsgs, nil
}
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, groupID, seqs)
func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs)
if err != nil {
if err != redis.Nil {
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
}
}
prome.PromeAdd(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion)
if err != nil {
prome.PromeAdd(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return nil, err
}
prome.PromeAdd(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
successMsgs = append(successMsgs, mongoMsgs...)
}
return successMsgs, nil
}
func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error {
func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error {
err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0)
if err != nil {
return err
@@ -514,7 +510,7 @@ func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error
return utils.Wrap(err, "")
}
func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
var delStruct delMsgRecursionStruct
minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime)
if err != nil {
@@ -542,7 +538,7 @@ func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context,
return nil
}
func (db *MsgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
func (db *msgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
var delStruct delMsgRecursionStruct
minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime)
if err != nil {
@@ -568,7 +564,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
// seq 70
// set minSeq 21
// recursion 删除list并且返回设置的最小seq
func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
// find from oldest list
msgs, err := db.mgo.GetMsgsByIndex(ctx, sourceID, index)
if err != nil || msgs.DocID == "" {
@@ -633,7 +629,7 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
return seq, utils.Wrap(err, "deleteMsg failed")
}
func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
func (db *msgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
@@ -650,7 +646,7 @@ func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, user
return
}
func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID)
if err != nil {
return 0, 0, 0, err
@@ -662,7 +658,7 @@ func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context
return
}
func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
oldestMsgMongo, err := db.mgo.GetOldestMsg(ctx, sourceID)
if err != nil {
return 0, 0, err
@@ -684,10 +680,14 @@ func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (
return
}
func (db *MsgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
func (db *MsgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
}
func (db *msgDatabase) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
return db.cache.GetGroupUserMinSeq(ctx, groupID, userID)
}
+2 -2
View File
@@ -1,7 +1,7 @@
package controller
import (
"Open_IM/pkg/common/db/cache"
"OpenIM/pkg/common/db/cache"
"context"
)
@@ -10,7 +10,7 @@ type PushInterface interface {
}
type PushDataBase struct {
cache cache.MsgCache
cache cache.Cache
}
func (p *PushDataBase) DelFcmToken(ctx context.Context, userID string, platformID int) error {
+3 -3
View File
@@ -1,9 +1,9 @@
package controller
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/utils"
"context"
)
+3 -3
View File
@@ -1,9 +1,9 @@
package localcache
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/discoveryregistry"
"Open_IM/pkg/proto/conversation"
"OpenIM/pkg/common/config"
"OpenIM/pkg/discoveryregistry"
"OpenIM/pkg/proto/conversation"
"context"
"sync"
)
+4 -4
View File
@@ -1,10 +1,10 @@
package localcache
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/discoveryregistry"
"Open_IM/pkg/proto/group"
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/discoveryregistry"
"OpenIM/pkg/proto/group"
"context"
"sync"
)
+3 -3
View File
@@ -1,9 +1,9 @@
package relation
import (
"Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"gorm.io/gorm"
)
+6 -6
View File
@@ -1,11 +1,11 @@
package relation
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/table/relation"
pbMsg "Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/table/relation"
pbMsg "OpenIM/pkg/proto/msg"
sdkws "OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"fmt"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
@@ -17,7 +17,7 @@ type ChatLogGorm struct {
DB *gorm.DB
}
func NewChatLog(db *gorm.DB) *ChatLogGorm {
func NewChatLogGorm(db *gorm.DB) *ChatLogGorm {
return &ChatLogGorm{DB: db}
}
+4 -4
View File
@@ -1,10 +1,10 @@
package relation
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"gorm.io/gorm"
)
+3 -3
View File
@@ -1,9 +1,9 @@
package relation
import (
"Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"gorm.io/gorm"
)
@@ -1,9 +1,9 @@
package relation
import (
"Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"gorm.io/gorm"
)
+4 -4
View File
@@ -1,10 +1,10 @@
package relation
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"gorm.io/gorm"
)
+3 -3
View File
@@ -1,9 +1,9 @@
package relation
import (
"Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"gorm.io/gorm"
)
@@ -1,9 +1,9 @@
package relation
import (
"Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"gorm.io/gorm"
)
+1 -1
View File
@@ -1,7 +1,7 @@
package relation
import (
"Open_IM/pkg/common/config"
"OpenIM/pkg/common/config"
"fmt"
"gorm.io/driver/mysql"
"time"
+3 -3
View File
@@ -1,9 +1,9 @@
package relation
import (
"Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"gorm.io/gorm"
)
+1 -1
View File
@@ -1,7 +1,7 @@
package relation
import (
"Open_IM/pkg/utils"
"OpenIM/pkg/utils"
"gorm.io/gorm"
)
+1 -1
View File
@@ -4,7 +4,7 @@ import (
"gorm.io/gorm"
"time"
pbMsg "Open_IM/pkg/proto/msg"
pbMsg "OpenIM/pkg/proto/msg"
)
const (
@@ -1,30 +0,0 @@
package relation
import "time"
// these two is virtual table just for cms
type ActiveGroup struct {
Name string
ID string `gorm:"column:recv_id"`
MessageNum int `gorm:"column:message_num"`
}
type ActiveUser struct {
Name string
ID string `gorm:"column:send_id"`
MessageNum int `gorm:"column:message_num"`
}
type StatisticsInterface interface {
GetActiveUserNum(from, to time.Time) (num int64, err error)
GetIncreaseUserNum(from, to time.Time) (num int64, err error)
GetTotalUserNum() (num int64, err error)
GetTotalUserNumByDate(to time.Time) (num int64, err error)
GetSingleChatMessageNum(from, to time.Time) (num int64, err error)
GetGroupMessageNum(from, to time.Time) (num int64, err error)
GetIncreaseGroupNum(from, to time.Time) (num int64, err error)
GetTotalGroupNum() (num int64, err error)
GetGroupNum(to time.Time) (num int64, err error)
GetActiveGroups(from, to time.Time, limit int) ([]*ActiveGroup, error)
GetActiveUsers(from, to time.Time, limit int) (activeUsers []*ActiveUser, err error)
}
@@ -1,6 +1,7 @@
package unrelation
import (
"OpenIM/pkg/proto/sdkws"
"context"
"strconv"
"strings"
@@ -40,8 +41,8 @@ type ExtendMsgSetModelInterface interface {
GetAllExtendMsgSet(ctx context.Context, sourceID string, opts *GetAllExtendMsgSetOpts) (sets []*ExtendMsgSetModel, err error)
GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64) (*ExtendMsgSetModel, error)
InsertExtendMsg(ctx context.Context, sourceID string, sessionType int32, msg *ExtendMsgModel) error
InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]KeyValueModel) error
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]KeyValueModel) error
InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
TakeExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *ExtendMsgModel, err error)
}
+2 -2
View File
@@ -1,8 +1,8 @@
package unrelation
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/proto/sdkws"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/proto/sdkws"
"context"
"strconv"
"strings"
+19
View File
@@ -0,0 +1,19 @@
package tx
import (
"gorm.io/gorm"
)
func NewGorm(db *gorm.DB) Tx {
return &_Gorm{tx: db}
}
type _Gorm struct {
tx *gorm.DB
}
func (g *_Gorm) Transaction(fn func(tx any) error) error {
return g.tx.Transaction(func(tx *gorm.DB) error {
return fn(tx)
})
}
+31
View File
@@ -0,0 +1,31 @@
package tx
import (
"OpenIM/pkg/utils"
"context"
"go.mongodb.org/mongo-driver/mongo"
)
func NewMongo(client *mongo.Client) CtxTx {
return &_Mongo{
client: client,
}
}
type _Mongo struct {
client *mongo.Client
}
func (m *_Mongo) Transaction(ctx context.Context, fn func(ctx context.Context) error) error {
sess, err := m.client.StartSession()
if err != nil {
return err
}
sCtx := mongo.NewSessionContext(ctx, sess)
defer sess.EndSession(sCtx)
if err := fn(sCtx); err != nil {
_ = sess.AbortTransaction(sCtx)
return err
}
return utils.Wrap(sess.CommitTransaction(sCtx), "")
}
+11
View File
@@ -0,0 +1,11 @@
package tx
import "context"
type Tx interface {
Transaction(fn func(tx any) error) error
}
type CtxTx interface {
Transaction(ctx context.Context, fn func(ctx context.Context) error) error
}
+3 -3
View File
@@ -1,9 +1,9 @@
package unrelation
import (
unRelationTb "Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
unRelationTb "OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"context"
"errors"
"fmt"
+3 -3
View File
@@ -1,9 +1,9 @@
package unrelation
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/utils"
"context"
"fmt"
"go.mongodb.org/mongo-driver/mongo"
+3 -3
View File
@@ -1,9 +1,9 @@
package unrelation
import (
table "Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
table "OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"context"
"errors"
"fmt"
+24 -36
View File
@@ -1,9 +1,9 @@
package unrelation
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/db/table/unrelation"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/utils"
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
@@ -57,7 +57,6 @@ func (s *SuperGroupMongoDriver) FindSuperGroup(ctx context.Context, groupIDs []s
return nil, err
}
defer cursor.Close(ctx)
if err := cursor.All(ctx, &groups); err != nil {
return nil, utils.Wrap(err, "")
}
@@ -65,43 +64,32 @@ func (s *SuperGroupMongoDriver) FindSuperGroup(ctx context.Context, groupIDs []s
}
func (s *SuperGroupMongoDriver) AddUserToSuperGroup(ctx context.Context, groupID string, userIDs []string) error {
opts := options.Session().SetDefaultReadConcern(readconcern.Majority())
return s.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error {
_, err := s.superGroupCollection.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDs}}})
_, err := s.superGroupCollection.UpdateOne(ctx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDs}}})
if err != nil {
return err
}
upsert := true
opts := &options.UpdateOptions{
Upsert: &upsert,
}
for _, userID := range userIDs {
_, err = s.userToSuperGroupCollection.UpdateOne(ctx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts)
if err != nil {
_ = sCtx.AbortTransaction(ctx)
return err
return utils.Wrap(err, "transaction failed")
}
upsert := true
opts := &options.UpdateOptions{
Upsert: &upsert,
}
for _, userID := range userIDs {
_, err = s.userToSuperGroupCollection.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts)
if err != nil {
_ = sCtx.AbortTransaction(ctx)
return utils.Wrap(err, "transaction failed")
}
}
return sCtx.CommitTransaction(ctx)
})
}
}
func (s *SuperGroupMongoDriver) RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDs []string) error {
opts := options.Session().SetDefaultReadConcern(readconcern.Majority())
return s.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error {
_, err := s.superGroupCollection.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDs}}})
if err != nil {
_ = sCtx.AbortTransaction(ctx)
return err
}
err = s.RemoveGroupFromUser(sCtx, groupID, userIDs)
if err != nil {
_ = sCtx.AbortTransaction(ctx)
return err
}
return sCtx.CommitTransaction(ctx)
})
_, err := s.superGroupCollection.UpdateOne(ctx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDs}}})
if err != nil {
return err
}
err = s.RemoveGroupFromUser(ctx, groupID, userIDs)
if err != nil {
return err
}
return nil
}
func (s *SuperGroupMongoDriver) GetSuperGroupByUserID(ctx context.Context, userID string) (*unrelation.UserToSuperGroupModel, error) {
+3 -3
View File
@@ -7,9 +7,9 @@
package http
import (
"Open_IM/pkg/callbackstruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"OpenIM/pkg/callbackstruct"
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
"bytes"
"encoding/json"
"io/ioutil"
+1 -1
View File
@@ -1,7 +1,7 @@
package kafka
import (
"Open_IM/pkg/common/config"
"OpenIM/pkg/common/config"
"sync"
"github.com/Shopify/sarama"
+5 -5
View File
@@ -1,15 +1,15 @@
package kafka
import (
"Open_IM/pkg/common/config"
log "Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/config"
log "OpenIM/pkg/common/log"
"OpenIM/pkg/utils"
"errors"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
prome "Open_IM/pkg/common/prome"
prome "OpenIM/pkg/common/prome"
)
type Producer struct {
@@ -66,7 +66,7 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string)
partition, offset, err := p.producer.SendMessage(kMsg)
log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer)
if err == nil {
prome.PromeInc(prome.SendMsgCounter)
prome.Inc(prome.SendMsgCounter)
}
return partition, offset, utils.Wrap(err, "")
}
+1 -1
View File
@@ -7,7 +7,7 @@
package log
import (
"Open_IM/pkg/common/config"
"OpenIM/pkg/common/config"
"context"
"fmt"
elasticV7 "github.com/olivere/elastic/v7"
+1 -1
View File
@@ -7,7 +7,7 @@
package log
import (
"Open_IM/pkg/utils"
"OpenIM/pkg/utils"
"github.com/sirupsen/logrus"
"runtime"
"strings"
+2 -2
View File
@@ -1,8 +1,8 @@
package log
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/tracelog"
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/tracelog"
"bufio"
"context"
//"bufio"
+4 -4
View File
@@ -1,10 +1,10 @@
package middleware
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"fmt"
"google.golang.org/grpc"
+1 -1
View File
@@ -5,7 +5,7 @@ import (
"encoding/json"
"time"
"Open_IM/pkg/common/log"
"OpenIM/pkg/common/log"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
+10 -10
View File
@@ -1,7 +1,7 @@
package prome
import (
"Open_IM/pkg/common/config"
"OpenIM/pkg/common/config"
"bytes"
"net/http"
"strconv"
@@ -11,7 +11,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func StartPromeSrv(prometheusPort int) error {
func StartPrometheusSrv(prometheusPort int) error {
if config.Config.Prometheus.Enable {
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil)
@@ -37,19 +37,19 @@ func (r responseBodyWriter) Write(b []byte) (int, error) {
return r.ResponseWriter.Write(b)
}
func PromeTheusMiddleware(c *gin.Context) {
PromeInc(ApiRequestCounter)
func PrometheusMiddleware(c *gin.Context) {
Inc(ApiRequestCounter)
w := &responseBodyWriter{body: &bytes.Buffer{}, ResponseWriter: c.Writer}
c.Writer = w
c.Next()
if c.Writer.Status() == http.StatusOK {
PromeInc(ApiRequestSuccessCounter)
Inc(ApiRequestSuccessCounter)
} else {
PromeInc(ApiRequestFailedCounter)
Inc(ApiRequestFailedCounter)
}
}
func PromeInc(counter prometheus.Counter) {
func Inc(counter prometheus.Counter) {
if config.Config.Prometheus.Enable {
if counter != nil {
counter.Inc()
@@ -57,7 +57,7 @@ func PromeInc(counter prometheus.Counter) {
}
}
func PromeAdd(counter prometheus.Counter, add int) {
func Add(counter prometheus.Counter, add int) {
if config.Config.Prometheus.Enable {
if counter != nil {
counter.Add(float64(add))
@@ -65,7 +65,7 @@ func PromeAdd(counter prometheus.Counter, add int) {
}
}
func PromeGaugeInc(gauges prometheus.Gauge) {
func GaugeInc(gauges prometheus.Gauge) {
if config.Config.Prometheus.Enable {
if gauges != nil {
gauges.Inc()
@@ -73,7 +73,7 @@ func PromeGaugeInc(gauges prometheus.Gauge) {
}
}
func PromeGaugeDec(gauges prometheus.Gauge) {
func GaugeDec(gauges prometheus.Gauge) {
if config.Config.Prometheus.Enable {
if gauges != nil {
gauges.Dec()
+4 -4
View File
@@ -1,10 +1,10 @@
package tokenverify
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"github.com/golang-jwt/jwt/v4"
"time"
+1 -1
View File
@@ -1,7 +1,7 @@
package tracelog
import (
"Open_IM/pkg/utils"
"OpenIM/pkg/utils"
"context"
"github.com/sirupsen/logrus"
"runtime"
+1 -1
View File
@@ -1,7 +1,7 @@
package errs
import (
"Open_IM/pkg/utils"
"OpenIM/pkg/utils"
"fmt"
"github.com/pkg/errors"
"strings"
+2 -2
View File
@@ -1,12 +1,12 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: auth/auth.proto
package pbAuth // import "Open_IM/pkg/proto/auth"
package pbAuth // import "OpenIM/pkg/proto/auth"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import _ "Open_IM/pkg/proto/sdkws"
import _ "OpenIM/pkg/proto/sdkws"
import (
context "golang.org/x/net/context"
+1 -1
View File
@@ -1,7 +1,7 @@
syntax = "proto3";
import "Open-IM-Server/pkg/proto/sdkws/ws.proto";
package pbAuth;
option go_package = "Open_IM/pkg/proto/auth;pbAuth";
option go_package = "OpenIM/pkg/proto/auth;pbAuth";
message userTokenReq {
+4 -4
View File
@@ -11,7 +11,7 @@ echo "proto file generate success"
j=0
for file in $(find ./Open_IM -name "*.go"); do # Not recommended, will break on whitespace
for file in $(find ./OpenIM -name "*.go"); do # Not recommended, will break on whitespace
filelist[j]=$file
j=`expr $j + 1`
done
@@ -19,10 +19,10 @@ done
for ((i = 0; i < ${#filelist[*]}; i++)); do
proto=${filelist[$i]}
cp $proto ${proto#*./Open_IM/pkg/proto/}
cp $proto ${proto#*./OpenIM/pkg/proto/}
done
rm Open_IM -rf
#find ./ -type f -path "*.pb.go"|xargs sed -i 's/\".\/sdkws\"/\"Open_IM\/pkg\/proto\/sdkws\"/g'
rm OpenIM -rf
#find ./ -type f -path "*.pb.go"|xargs sed -i 's/\".\/sdkws\"/\"OpenIM\/pkg\/proto\/sdkws\"/g'
+1 -1
View File
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: conversation/conversation.proto
package conversation // import "Open_IM/pkg/proto/conversation"
package conversation // import "OpenIM/pkg/proto/conversation"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
+1 -1
View File
@@ -1,5 +1,5 @@
syntax = "proto3";
option go_package = "Open_IM/pkg/proto/conversation;conversation";
option go_package = "OpenIM/pkg/proto/conversation;conversation";
package conversation;
message Conversation{
+1 -1
View File
@@ -1,5 +1,5 @@
syntax = "proto3";
option go_package = "Open_IM/pkg/proto/file;file";
option go_package = "OpenIM/pkg/proto/file;file";
package file;
+2 -2
View File
@@ -1,12 +1,12 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: friend/friend.proto
package friend // import "Open_IM/pkg/proto/friend"
package friend // import "OpenIM/pkg/proto/friend"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import sdkws "Open_IM/pkg/proto/sdkws"
import sdkws "OpenIM/pkg/proto/sdkws"
import (
context "golang.org/x/net/context"
+1 -1
View File
@@ -1,6 +1,6 @@
syntax = "proto3";
import "Open-IM-Server/pkg/proto/sdkws/ws.proto";
option go_package = "Open_IM/pkg/proto/friend;friend";
option go_package = "OpenIM/pkg/proto/friend;friend";
package friend;
message getPaginationFriendsReq{
+2 -2
View File
@@ -1,12 +1,12 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: group/group.proto
package group // import "Open_IM/pkg/proto/group"
package group // import "OpenIM/pkg/proto/group"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import sdkws "Open_IM/pkg/proto/sdkws"
import sdkws "OpenIM/pkg/proto/sdkws"
import wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
import (
+1 -1
View File
@@ -1,7 +1,7 @@
syntax = "proto3";
import "Open-IM-Server/pkg/proto/sdkws/ws.proto";
import "Open-IM-Server/pkg/proto/sdkws/wrappers.proto";
option go_package = "Open_IM/pkg/proto/group;group";
option go_package = "OpenIM/pkg/proto/group;group";
package group;
+2 -2
View File
@@ -1,12 +1,12 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: msg/msg.proto
package msg // import "Open_IM/pkg/proto/msg"
package msg // import "OpenIM/pkg/proto/msg"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import sdkws "Open_IM/pkg/proto/sdkws"
import sdkws "OpenIM/pkg/proto/sdkws"
import wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
import (
+1 -1
View File
@@ -1,7 +1,7 @@
syntax = "proto3";
import "Open-IM-Server/pkg/proto/sdkws/ws.proto";
import "Open-IM-Server/pkg/proto/sdkws/wrappers.proto";
option go_package = "Open_IM/pkg/proto/msg;msg";
option go_package = "OpenIM/pkg/proto/msg;msg";
package msg;
message MsgDataToMQ{
+2 -2
View File
@@ -1,12 +1,12 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: relay/relay.proto
package pbRelay // import "Open_IM/pkg/proto/relay"
package pbRelay // import "OpenIM/pkg/proto/relay"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import sdkws "Open_IM/pkg/proto/sdkws"
import sdkws "OpenIM/pkg/proto/sdkws"
import (
context "golang.org/x/net/context"
+1 -1
View File
@@ -1,6 +1,6 @@
syntax = "proto3";
import "Open-IM-Server/pkg/proto/sdkws/ws.proto";
option go_package = "Open_IM/pkg/proto/relay;pbRelay";
option go_package = "OpenIM/pkg/proto/relay;pbRelay";
package relay;
message OnlinePushMsgReq {
+2 -2
View File
@@ -1,12 +1,12 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: push/push.proto
package pbPush // import "Open_IM/pkg/proto/push"
package pbPush // import "OpenIM/pkg/proto/push"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import sdkws "Open_IM/pkg/proto/sdkws"
import sdkws "OpenIM/pkg/proto/sdkws"
import (
context "golang.org/x/net/context"
+1 -1
View File
@@ -1,6 +1,6 @@
syntax = "proto3";
import "Open-IM-Server/pkg/proto/sdkws/ws.proto";
option go_package = "Open_IM/pkg/proto/push;pbPush";
option go_package = "OpenIM/pkg/proto/push;pbPush";
package push;
message PushMsgReq {
+1 -1
View File
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: rtc/rtc.proto
package rtc // import "Open_IM/pkg/proto/rtc"
package rtc // import "OpenIM/pkg/proto/rtc"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
+1 -1
View File
@@ -1,5 +1,5 @@
syntax = "proto3";
option go_package = "Open_IM/pkg/proto/rtc;rtc";
option go_package = "OpenIM/pkg/proto/rtc;rtc";
package proto;
message CommonResp{
+1 -1
View File
@@ -1,7 +1,7 @@
syntax = "proto3";
import "Open-IM-Server/pkg/proto/sdkws/ws.proto";
import "Open-IM-Server/pkg/proto/sdkws/wrappers.proto";
option go_package = "Open_IM/pkg/proto/group;group";
option go_package = "OpenIM/pkg/proto/group;group";
package group;
+1 -1
View File
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: sdkws/ws.proto
package sdkws // import "Open_IM/pkg/proto/sdkws"
package sdkws // import "OpenIM/pkg/proto/sdkws"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
+1 -1
View File
@@ -1,6 +1,6 @@
syntax = "proto3";
import "Open-IM-Server/pkg/proto/sdkws/wrappers.proto";
option go_package = "Open_IM/pkg/proto/sdkws;sdkws";
option go_package = "OpenIM/pkg/proto/sdkws;sdkws";
package sdkws;
+3 -3
View File
@@ -1,13 +1,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: user/user.proto
package user // import "Open_IM/pkg/proto/user"
package user // import "OpenIM/pkg/proto/user"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import conversation "Open_IM/pkg/proto/conversation"
import sdkws "Open_IM/pkg/proto/sdkws"
import conversation "OpenIM/pkg/proto/conversation"
import sdkws "OpenIM/pkg/proto/sdkws"
import (
context "golang.org/x/net/context"
+1 -1
View File
@@ -1,7 +1,7 @@
syntax = "proto3";
import "Open-IM-Server/pkg/proto/sdkws/ws.proto";
import "Open-IM-Server/pkg/proto/conversation/conversation.proto";
option go_package = "Open_IM/pkg/proto/user;user";
option go_package = "OpenIM/pkg/proto/user;user";
package user;
+1 -1
View File
@@ -1,7 +1,7 @@
package statistics
import (
"Open_IM/pkg/common/log"
"OpenIM/pkg/common/log"
"time"
)
+2 -2
View File
@@ -1,8 +1,8 @@
package utils
import (
"Open_IM/pkg/common/constant"
sdkws "Open_IM/pkg/proto/sdkws"
"OpenIM/pkg/common/constant"
sdkws "OpenIM/pkg/proto/sdkws"
"github.com/golang/protobuf/proto"
)
+1 -1
View File
@@ -1,7 +1,7 @@
package utils
import (
"Open_IM/pkg/common/constant"
"OpenIM/pkg/common/constant"
"fmt"
"math/rand"
"os"
+3 -3
View File
@@ -1,9 +1,9 @@
package utils
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/tokenverify"
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/tokenverify"
"testing"
"time"
+1 -1
View File
@@ -1,7 +1,7 @@
package utils
import (
"Open_IM/pkg/common/constant"
"OpenIM/pkg/common/constant"
"testing"
"github.com/stretchr/testify/assert"
+1 -1
View File
@@ -7,7 +7,7 @@
package utils
import (
"Open_IM/pkg/common/constant"
"OpenIM/pkg/common/constant"
"encoding/json"
"math/rand"
"strconv"