feat: optimize code and support running in single process mode (#3142)

* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* upgrading pkg tools

* fix

* fix

* optimize log output

* feat: support GetLastMessage

* feat: support GetLastMessage

* feat: s3 switch

* feat: s3 switch

* fix: GetUsersOnline

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: seq conversion failed without exiting

* monolithic

* fix: DeleteDoc crash

* fix: DeleteDoc crash

* fix: monolithic

* fix: monolithic

* fix: fill send time

* fix: fill send time

* fix: crash caused by withdrawing messages from users who have left the group

* fix: mq

* fix: mq

* fix: user msg timestamp

* fix: mq

* 1

* 1

* 1

* 1

* 1

* 1

* 1

* seq read config

* seq read config

* 1

* 1

* fix: the source message of the reference is withdrawn, and the referenced message is deleted

* 1

* 1

* 1

* 1

* 1

* 1

* 1

* 1

* 1

* 1

* 1

* 1

* 1

* 1
This commit is contained in:
chao
2025-02-14 16:18:27 +08:00
committed by OpenIM-Robot
parent 4b3a2b7483
commit f322ddca77
100 changed files with 3044 additions and 1857 deletions
+21 -3
View File
@@ -19,6 +19,7 @@ import (
"github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
@@ -32,7 +33,7 @@ type ApiCmd struct {
}
func NewApiCmd() *ApiCmd {
apiConfig := api.Config{AllConfig: &config.AllConfig{}}
var apiConfig api.Config
ret := &ApiCmd{apiConfig: &apiConfig}
ret.configMap = map[string]any{
config.DiscoveryConfigFilename: &apiConfig.Discovery,
@@ -61,7 +62,7 @@ func NewApiCmd() *ApiCmd {
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
apiConfig.ConfigPath = ret.configPath
apiConfig.ConfigPath = config.Path(ret.configPath)
return ret.runE()
}
return ret
@@ -72,5 +73,22 @@ func (a *ApiCmd) Exec() error {
}
func (a *ApiCmd) runE() error {
return api.Start(a.ctx, a.Index(), a.apiConfig)
a.apiConfig.Index = config.Index(a.Index())
prometheus := config.Prometheus{
Enable: a.apiConfig.API.Prometheus.Enable,
Ports: a.apiConfig.API.Prometheus.Ports,
}
return startrpc.Start(
a.ctx, &a.apiConfig.Discovery,
&prometheus,
a.apiConfig.API.Api.ListenIP, "",
a.apiConfig.API.Prometheus.AutoSetPorts,
nil, int(a.apiConfig.Index),
a.apiConfig.Discovery.RpcService.MessageGateway,
&a.apiConfig.Notification,
a.apiConfig,
[]string{},
[]string{},
api.Start,
)
}
+1
View File
@@ -38,6 +38,7 @@ func NewAuthRpcCmd() *AuthRpcCmd {
ret.configMap = map[string]any{
config.OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig,
config.RedisConfigFileName: &authConfig.RedisConfig,
config.MongodbConfigFileName: &authConfig.MongoConfig,
config.ShareFileName: &authConfig.Share,
config.DiscoveryConfigFilename: &authConfig.Discovery,
}
+18 -4
View File
@@ -17,8 +17,9 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/internal/tools/cron"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
@@ -28,11 +29,11 @@ type CronTaskCmd struct {
*RootCmd
ctx context.Context
configMap map[string]any
cronTaskConfig *tools.CronTaskConfig
cronTaskConfig *cron.Config
}
func NewCronTaskCmd() *CronTaskCmd {
var cronTaskConfig tools.CronTaskConfig
var cronTaskConfig cron.Config
ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig}
ret.configMap = map[string]any{
config.OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
@@ -52,5 +53,18 @@ func (a *CronTaskCmd) Exec() error {
}
func (a *CronTaskCmd) runE() error {
return tools.Start(a.ctx, a.cronTaskConfig)
var prometheus config.Prometheus
return startrpc.Start(
a.ctx, &a.cronTaskConfig.Discovery,
&prometheus,
"", "",
true,
nil, 0,
"",
nil,
a.cronTaskConfig,
[]string{},
[]string{},
cron.Start,
)
}
+17 -1
View File
@@ -19,6 +19,7 @@ import (
"github.com/openimsdk/open-im-server/v3/internal/msggateway"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
@@ -55,5 +56,20 @@ func (m *MsgGatewayCmd) Exec() error {
}
func (m *MsgGatewayCmd) runE() error {
return msggateway.Start(m.ctx, m.Index(), m.msgGatewayConfig)
m.msgGatewayConfig.Index = config.Index(m.Index())
rpc := m.msgGatewayConfig.MsgGateway.RPC
var prometheus config.Prometheus
return startrpc.Start(
m.ctx, &m.msgGatewayConfig.Discovery,
&prometheus,
rpc.ListenIP, rpc.RegisterIP,
rpc.AutoSetPorts,
rpc.Ports, int(m.msgGatewayConfig.Index),
m.msgGatewayConfig.Discovery.RpcService.MessageGateway,
nil,
m.msgGatewayConfig,
[]string{},
[]string{},
msggateway.Start,
)
}
+16 -1
View File
@@ -19,6 +19,7 @@ import (
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
@@ -56,5 +57,19 @@ func (m *MsgTransferCmd) Exec() error {
}
func (m *MsgTransferCmd) runE() error {
return msgtransfer.Start(m.ctx, m.Index(), m.msgTransferConfig)
m.msgTransferConfig.Index = config.Index(m.Index())
var prometheus config.Prometheus
return startrpc.Start(
m.ctx, &m.msgTransferConfig.Discovery,
&prometheus,
"", "",
true,
nil, int(m.msgTransferConfig.Index),
"",
nil,
m.msgTransferConfig,
[]string{},
[]string{},
msgtransfer.Start,
)
}
+2 -1
View File
@@ -38,6 +38,7 @@ func NewPushRpcCmd() *PushRpcCmd {
ret.configMap = map[string]any{
config.OpenIMPushCfgFileName: &pushConfig.RpcConfig,
config.RedisConfigFileName: &pushConfig.RedisConfig,
config.MongodbConfigFileName: &pushConfig.MongoConfig,
config.KafkaConfigFileName: &pushConfig.KafkaConfig,
config.ShareFileName: &pushConfig.Share,
config.NotificationFileName: &pushConfig.NotificationConfig,
@@ -48,7 +49,7 @@ func NewPushRpcCmd() *PushRpcCmd {
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
ret.pushConfig.FcmConfigPath = ret.ConfigPath()
ret.pushConfig.FcmConfigPath = config.Path(ret.ConfigPath())
return ret.runE()
}
return ret
+4 -10
View File
@@ -12,7 +12,6 @@ import (
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/runtimeenv"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)
@@ -86,14 +85,12 @@ func (r *RootCmd) initEtcd() error {
return err
}
disConfig := config.Discovery{}
env := runtimeenv.RuntimeEnvironment()
err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename],
env, &disConfig)
err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename], &disConfig)
if err != nil {
return err
}
if disConfig.Enable == config.ETCD {
discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env, nil)
discov, _ := kdisc.NewDiscoveryRegister(&disConfig, nil)
r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
}
return nil
@@ -125,18 +122,16 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err
return err
}
runtimeEnv := runtimeenv.RuntimeEnvironment()
// Load common configuration file
//opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share}
for configFileName, configStruct := range opts.configMap {
err := config.Load(configDirectory, configFileName, config.EnvPrefixMap[configFileName], runtimeEnv, configStruct)
err := config.Load(configDirectory, configFileName, config.EnvPrefixMap[configFileName], configStruct)
if err != nil {
return err
}
}
// Load common log configuration file
return config.Load(configDirectory, config.LogConfigFileName, config.EnvPrefixMap[config.LogConfigFileName], runtimeEnv, &r.log)
return config.Load(configDirectory, config.LogConfigFileName, config.EnvPrefixMap[config.LogConfigFileName], &r.log)
}
func (r *RootCmd) updateConfigFromEtcd(opts *CmdOpts) error {
@@ -208,7 +203,6 @@ func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {
func (r *RootCmd) initializeLogger(cmdOpts *CmdOpts) error {
err := log.InitLoggerFromConfig(
cmdOpts.loggerPrefixName,
r.processName,
"", "",
+411 -339
View File
@@ -28,382 +28,348 @@ import (
"github.com/openimsdk/tools/s3/oss"
)
const StructTagName = "yaml"
type Path string
type Index int
type CacheConfig struct {
Topic string `mapstructure:"topic"`
SlotNum int `mapstructure:"slotNum"`
SlotSize int `mapstructure:"slotSize"`
SuccessExpire int `mapstructure:"successExpire"`
FailedExpire int `mapstructure:"failedExpire"`
Topic string `yaml:"topic"`
SlotNum int `yaml:"slotNum"`
SlotSize int `yaml:"slotSize"`
SuccessExpire int `yaml:"successExpire"`
FailedExpire int `yaml:"failedExpire"`
}
type LocalCache struct {
User CacheConfig `mapstructure:"user"`
Group CacheConfig `mapstructure:"group"`
Friend CacheConfig `mapstructure:"friend"`
Conversation CacheConfig `mapstructure:"conversation"`
User CacheConfig `yaml:"user"`
Group CacheConfig `yaml:"group"`
Friend CacheConfig `yaml:"friend"`
Conversation CacheConfig `yaml:"conversation"`
}
type Log struct {
StorageLocation string `mapstructure:"storageLocation"`
RotationTime uint `mapstructure:"rotationTime"`
RemainRotationCount uint `mapstructure:"remainRotationCount"`
RemainLogLevel int `mapstructure:"remainLogLevel"`
IsStdout bool `mapstructure:"isStdout"`
IsJson bool `mapstructure:"isJson"`
IsSimplify bool `mapstructure:"isSimplify"`
WithStack bool `mapstructure:"withStack"`
StorageLocation string `yaml:"storageLocation"`
RotationTime uint `yaml:"rotationTime"`
RemainRotationCount uint `yaml:"remainRotationCount"`
RemainLogLevel int `yaml:"remainLogLevel"`
IsStdout bool `yaml:"isStdout"`
IsJson bool `yaml:"isJson"`
IsSimplify bool `yaml:"isSimplify"`
WithStack bool `yaml:"withStack"`
}
type Minio struct {
Bucket string `mapstructure:"bucket"`
AccessKeyID string `mapstructure:"accessKeyID"`
SecretAccessKey string `mapstructure:"secretAccessKey"`
SessionToken string `mapstructure:"sessionToken"`
InternalAddress string `mapstructure:"internalAddress"`
ExternalAddress string `mapstructure:"externalAddress"`
PublicRead bool `mapstructure:"publicRead"`
Bucket string `yaml:"bucket"`
AccessKeyID string `yaml:"accessKeyID"`
SecretAccessKey string `yaml:"secretAccessKey"`
SessionToken string `yaml:"sessionToken"`
InternalAddress string `yaml:"internalAddress"`
ExternalAddress string `yaml:"externalAddress"`
PublicRead bool `yaml:"publicRead"`
}
type Mongo struct {
URI string `mapstructure:"uri"`
Address []string `mapstructure:"address"`
Database string `mapstructure:"database"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
AuthSource string `mapstructure:"authSource"`
MaxPoolSize int `mapstructure:"maxPoolSize"`
MaxRetry int `mapstructure:"maxRetry"`
URI string `yaml:"uri"`
Address []string `yaml:"address"`
Database string `yaml:"database"`
Username string `yaml:"username"`
Password string `yaml:"password"`
AuthSource string `yaml:"authSource"`
MaxPoolSize int `yaml:"maxPoolSize"`
MaxRetry int `yaml:"maxRetry"`
}
type Kafka struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
ProducerAck string `mapstructure:"producerAck"`
CompressType string `mapstructure:"compressType"`
Address []string `mapstructure:"address"`
ToRedisTopic string `mapstructure:"toRedisTopic"`
ToMongoTopic string `mapstructure:"toMongoTopic"`
ToPushTopic string `mapstructure:"toPushTopic"`
ToOfflinePushTopic string `mapstructure:"toOfflinePushTopic"`
ToRedisGroupID string `mapstructure:"toRedisGroupID"`
ToMongoGroupID string `mapstructure:"toMongoGroupID"`
ToPushGroupID string `mapstructure:"toPushGroupID"`
ToOfflineGroupID string `mapstructure:"toOfflinePushGroupID"`
Username string `yaml:"username"`
Password string `yaml:"password"`
ProducerAck string `yaml:"producerAck"`
CompressType string `yaml:"compressType"`
Address []string `yaml:"address"`
ToRedisTopic string `yaml:"toRedisTopic"`
ToMongoTopic string `yaml:"toMongoTopic"`
ToPushTopic string `yaml:"toPushTopic"`
ToOfflinePushTopic string `yaml:"toOfflinePushTopic"`
ToRedisGroupID string `yaml:"toRedisGroupID"`
ToMongoGroupID string `yaml:"toMongoGroupID"`
ToPushGroupID string `yaml:"toPushGroupID"`
ToOfflineGroupID string `yaml:"toOfflinePushGroupID"`
Tls TLSConfig `mapstructure:"tls"`
Tls TLSConfig `yaml:"tls"`
}
type TLSConfig struct {
EnableTLS bool `mapstructure:"enableTLS"`
CACrt string `mapstructure:"caCrt"`
ClientCrt string `mapstructure:"clientCrt"`
ClientKey string `mapstructure:"clientKey"`
ClientKeyPwd string `mapstructure:"clientKeyPwd"`
InsecureSkipVerify bool `mapstructure:"insecureSkipVerify"`
EnableTLS bool `yaml:"enableTLS"`
CACrt string `yaml:"caCrt"`
ClientCrt string `yaml:"clientCrt"`
ClientKey string `yaml:"clientKey"`
ClientKeyPwd string `yaml:"clientKeyPwd"`
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
}
type API struct {
Api struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
CompressionLevel int `mapstructure:"compressionLevel"`
} `mapstructure:"api"`
ListenIP string `yaml:"listenIP"`
Ports []int `yaml:"ports"`
CompressionLevel int `yaml:"compressionLevel"`
} `yaml:"api"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
GrafanaURL string `mapstructure:"grafanaURL"`
} `mapstructure:"prometheus"`
Enable bool `yaml:"enable"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
GrafanaURL string `yaml:"grafanaURL"`
} `yaml:"prometheus"`
}
type CronTask struct {
CronExecuteTime string `mapstructure:"cronExecuteTime"`
RetainChatRecords int `mapstructure:"retainChatRecords"`
FileExpireTime int `mapstructure:"fileExpireTime"`
DeleteObjectType []string `mapstructure:"deleteObjectType"`
CronExecuteTime string `yaml:"cronExecuteTime"`
RetainChatRecords int `yaml:"retainChatRecords"`
FileExpireTime int `yaml:"fileExpireTime"`
DeleteObjectType []string `yaml:"deleteObjectType"`
}
type OfflinePushConfig struct {
Enable bool `mapstructure:"enable"`
Title string `mapstructure:"title"`
Desc string `mapstructure:"desc"`
Ext string `mapstructure:"ext"`
Enable bool `yaml:"enable"`
Title string `yaml:"title"`
Desc string `yaml:"desc"`
Ext string `yaml:"ext"`
}
type NotificationConfig struct {
IsSendMsg bool `mapstructure:"isSendMsg"`
ReliabilityLevel int `mapstructure:"reliabilityLevel"`
UnreadCount bool `mapstructure:"unreadCount"`
OfflinePush OfflinePushConfig `mapstructure:"offlinePush"`
IsSendMsg bool `yaml:"isSendMsg"`
ReliabilityLevel int `yaml:"reliabilityLevel"`
UnreadCount bool `yaml:"unreadCount"`
OfflinePush OfflinePushConfig `yaml:"offlinePush"`
}
type Notification struct {
GroupCreated NotificationConfig `mapstructure:"groupCreated"`
GroupInfoSet NotificationConfig `mapstructure:"groupInfoSet"`
JoinGroupApplication NotificationConfig `mapstructure:"joinGroupApplication"`
MemberQuit NotificationConfig `mapstructure:"memberQuit"`
GroupApplicationAccepted NotificationConfig `mapstructure:"groupApplicationAccepted"`
GroupApplicationRejected NotificationConfig `mapstructure:"groupApplicationRejected"`
GroupOwnerTransferred NotificationConfig `mapstructure:"groupOwnerTransferred"`
MemberKicked NotificationConfig `mapstructure:"memberKicked"`
MemberInvited NotificationConfig `mapstructure:"memberInvited"`
MemberEnter NotificationConfig `mapstructure:"memberEnter"`
GroupDismissed NotificationConfig `mapstructure:"groupDismissed"`
GroupMuted NotificationConfig `mapstructure:"groupMuted"`
GroupCancelMuted NotificationConfig `mapstructure:"groupCancelMuted"`
GroupMemberMuted NotificationConfig `mapstructure:"groupMemberMuted"`
GroupMemberCancelMuted NotificationConfig `mapstructure:"groupMemberCancelMuted"`
GroupMemberInfoSet NotificationConfig `mapstructure:"groupMemberInfoSet"`
GroupCreated NotificationConfig `yaml:"groupCreated"`
GroupInfoSet NotificationConfig `yaml:"groupInfoSet"`
JoinGroupApplication NotificationConfig `yaml:"joinGroupApplication"`
MemberQuit NotificationConfig `yaml:"memberQuit"`
GroupApplicationAccepted NotificationConfig `yaml:"groupApplicationAccepted"`
GroupApplicationRejected NotificationConfig `yaml:"groupApplicationRejected"`
GroupOwnerTransferred NotificationConfig `yaml:"groupOwnerTransferred"`
MemberKicked NotificationConfig `yaml:"memberKicked"`
MemberInvited NotificationConfig `yaml:"memberInvited"`
MemberEnter NotificationConfig `yaml:"memberEnter"`
GroupDismissed NotificationConfig `yaml:"groupDismissed"`
GroupMuted NotificationConfig `yaml:"groupMuted"`
GroupCancelMuted NotificationConfig `yaml:"groupCancelMuted"`
GroupMemberMuted NotificationConfig `yaml:"groupMemberMuted"`
GroupMemberCancelMuted NotificationConfig `yaml:"groupMemberCancelMuted"`
GroupMemberInfoSet NotificationConfig `yaml:"groupMemberInfoSet"`
GroupMemberSetToAdmin NotificationConfig `yaml:"groupMemberSetToAdmin"`
GroupMemberSetToOrdinary NotificationConfig `yaml:"groupMemberSetToOrdinaryUser"`
GroupInfoSetAnnouncement NotificationConfig `mapstructure:"groupInfoSetAnnouncement"`
GroupInfoSetName NotificationConfig `mapstructure:"groupInfoSetName"`
FriendApplicationAdded NotificationConfig `mapstructure:"friendApplicationAdded"`
FriendApplicationApproved NotificationConfig `mapstructure:"friendApplicationApproved"`
FriendApplicationRejected NotificationConfig `mapstructure:"friendApplicationRejected"`
FriendAdded NotificationConfig `mapstructure:"friendAdded"`
FriendDeleted NotificationConfig `mapstructure:"friendDeleted"`
FriendRemarkSet NotificationConfig `mapstructure:"friendRemarkSet"`
BlackAdded NotificationConfig `mapstructure:"blackAdded"`
BlackDeleted NotificationConfig `mapstructure:"blackDeleted"`
FriendInfoUpdated NotificationConfig `mapstructure:"friendInfoUpdated"`
UserInfoUpdated NotificationConfig `mapstructure:"userInfoUpdated"`
UserStatusChanged NotificationConfig `mapstructure:"userStatusChanged"`
ConversationChanged NotificationConfig `mapstructure:"conversationChanged"`
ConversationSetPrivate NotificationConfig `mapstructure:"conversationSetPrivate"`
GroupInfoSetAnnouncement NotificationConfig `yaml:"groupInfoSetAnnouncement"`
GroupInfoSetName NotificationConfig `yaml:"groupInfoSetName"`
FriendApplicationAdded NotificationConfig `yaml:"friendApplicationAdded"`
FriendApplicationApproved NotificationConfig `yaml:"friendApplicationApproved"`
FriendApplicationRejected NotificationConfig `yaml:"friendApplicationRejected"`
FriendAdded NotificationConfig `yaml:"friendAdded"`
FriendDeleted NotificationConfig `yaml:"friendDeleted"`
FriendRemarkSet NotificationConfig `yaml:"friendRemarkSet"`
BlackAdded NotificationConfig `yaml:"blackAdded"`
BlackDeleted NotificationConfig `yaml:"blackDeleted"`
FriendInfoUpdated NotificationConfig `yaml:"friendInfoUpdated"`
UserInfoUpdated NotificationConfig `yaml:"userInfoUpdated"`
UserStatusChanged NotificationConfig `yaml:"userStatusChanged"`
ConversationChanged NotificationConfig `yaml:"conversationChanged"`
ConversationSetPrivate NotificationConfig `yaml:"conversationSetPrivate"`
}
type Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
Enable bool `yaml:"enable"`
Ports []int `yaml:"ports"`
}
type MsgGateway struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
ListenIP string `mapstructure:"listenIP"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
ListenIP string `yaml:"listenIP"`
LongConnSvr struct {
Ports []int `mapstructure:"ports"`
WebsocketMaxConnNum int `mapstructure:"websocketMaxConnNum"`
WebsocketMaxMsgLen int `mapstructure:"websocketMaxMsgLen"`
WebsocketTimeout int `mapstructure:"websocketTimeout"`
} `mapstructure:"longConnSvr"`
Ports []int `yaml:"ports"`
WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"`
WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"`
WebsocketTimeout int `yaml:"websocketTimeout"`
} `yaml:"longConnSvr"`
}
type MsgTransfer struct {
Prometheus struct {
Enable bool `mapstructure:"enable"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
Enable bool `yaml:"enable"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
} `yaml:"prometheus"`
}
type Push struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
MaxConcurrentWorkers int `mapstructure:"maxConcurrentWorkers"`
Enable string `mapstructure:"enable"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
MaxConcurrentWorkers int `yaml:"maxConcurrentWorkers"`
Enable string `yaml:"enable"`
GeTui struct {
PushUrl string `mapstructure:"pushUrl"`
MasterSecret string `mapstructure:"masterSecret"`
AppKey string `mapstructure:"appKey"`
Intent string `mapstructure:"intent"`
ChannelID string `mapstructure:"channelID"`
ChannelName string `mapstructure:"channelName"`
} `mapstructure:"geTui"`
PushUrl string `yaml:"pushUrl"`
MasterSecret string `yaml:"masterSecret"`
AppKey string `yaml:"appKey"`
Intent string `yaml:"intent"`
ChannelID string `yaml:"channelID"`
ChannelName string `yaml:"channelName"`
} `yaml:"geTui"`
FCM struct {
FilePath string `mapstructure:"filePath"`
AuthURL string `mapstructure:"authURL"`
} `mapstructure:"fcm"`
FilePath string `yaml:"filePath"`
AuthURL string `yaml:"authURL"`
} `yaml:"fcm"`
JPush struct {
AppKey string `mapstructure:"appKey"`
MasterSecret string `mapstructure:"masterSecret"`
PushURL string `mapstructure:"pushURL"`
PushIntent string `mapstructure:"pushIntent"`
} `mapstructure:"jpush"`
AppKey string `yaml:"appKey"`
MasterSecret string `yaml:"masterSecret"`
PushURL string `yaml:"pushURL"`
PushIntent string `yaml:"pushIntent"`
} `yaml:"jpush"`
IOSPush struct {
PushSound string `mapstructure:"pushSound"`
BadgeCount bool `mapstructure:"badgeCount"`
Production bool `mapstructure:"production"`
} `mapstructure:"iosPush"`
FullUserCache bool `mapstructure:"fullUserCache"`
PushSound string `yaml:"pushSound"`
BadgeCount bool `yaml:"badgeCount"`
Production bool `yaml:"production"`
} `yaml:"iosPush"`
FullUserCache bool `yaml:"fullUserCache"`
}
type Auth struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
TokenPolicy struct {
Expire int64 `mapstructure:"expire"`
} `mapstructure:"tokenPolicy"`
Expire int64 `yaml:"expire"`
} `yaml:"tokenPolicy"`
}
type Conversation struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
}
type Friend struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
}
type Group struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"`
}
type Msg struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
FriendVerify bool `mapstructure:"friendVerify"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
FriendVerify bool `yaml:"friendVerify"`
}
type Third struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
Object struct {
Enable string `mapstructure:"enable"`
Cos Cos `mapstructure:"cos"`
Oss Oss `mapstructure:"oss"`
Kodo Kodo `mapstructure:"kodo"`
Aws Aws `mapstructure:"aws"`
} `mapstructure:"object"`
Enable string `yaml:"enable"`
Cos Cos `yaml:"cos"`
Oss Oss `yaml:"oss"`
Kodo Kodo `yaml:"kodo"`
Aws Aws `yaml:"aws"`
} `yaml:"object"`
}
type Cos struct {
BucketURL string `mapstructure:"bucketURL"`
SecretID string `mapstructure:"secretID"`
SecretKey string `mapstructure:"secretKey"`
SessionToken string `mapstructure:"sessionToken"`
PublicRead bool `mapstructure:"publicRead"`
BucketURL string `yaml:"bucketURL"`
SecretID string `yaml:"secretID"`
SecretKey string `yaml:"secretKey"`
SessionToken string `yaml:"sessionToken"`
PublicRead bool `yaml:"publicRead"`
}
type Oss struct {
Endpoint string `mapstructure:"endpoint"`
Bucket string `mapstructure:"bucket"`
BucketURL string `mapstructure:"bucketURL"`
AccessKeyID string `mapstructure:"accessKeyID"`
AccessKeySecret string `mapstructure:"accessKeySecret"`
SessionToken string `mapstructure:"sessionToken"`
PublicRead bool `mapstructure:"publicRead"`
Endpoint string `yaml:"endpoint"`
Bucket string `yaml:"bucket"`
BucketURL string `yaml:"bucketURL"`
AccessKeyID string `yaml:"accessKeyID"`
AccessKeySecret string `yaml:"accessKeySecret"`
SessionToken string `yaml:"sessionToken"`
PublicRead bool `yaml:"publicRead"`
}
type Kodo struct {
Endpoint string `mapstructure:"endpoint"`
Bucket string `mapstructure:"bucket"`
BucketURL string `mapstructure:"bucketURL"`
AccessKeyID string `mapstructure:"accessKeyID"`
AccessKeySecret string `mapstructure:"accessKeySecret"`
SessionToken string `mapstructure:"sessionToken"`
PublicRead bool `mapstructure:"publicRead"`
Endpoint string `yaml:"endpoint"`
Bucket string `yaml:"bucket"`
BucketURL string `yaml:"bucketURL"`
AccessKeyID string `yaml:"accessKeyID"`
AccessKeySecret string `yaml:"accessKeySecret"`
SessionToken string `yaml:"sessionToken"`
PublicRead bool `yaml:"publicRead"`
}
type Aws struct {
Endpoint string `mapstructure:"endpoint"`
Region string `mapstructure:"region"`
Bucket string `mapstructure:"bucket"`
AccessKeyID string `mapstructure:"accessKeyID"`
SecretAccessKey string `mapstructure:"secretAccessKey"`
SessionToken string `mapstructure:"sessionToken"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
AccessKeyID string `yaml:"accessKeyID"`
SecretAccessKey string `yaml:"secretAccessKey"`
SessionToken string `yaml:"sessionToken"`
PublicRead bool `yaml:"publicRead"`
}
type User struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
ListenIP string `mapstructure:"listenIP"`
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"`
}
type RPC struct {
RegisterIP string `yaml:"registerIP"`
ListenIP string `yaml:"listenIP"`
AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"`
}
type Redis struct {
Address []string `mapstructure:"address"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
ClusterMode bool `mapstructure:"clusterMode"`
DB int `mapstructure:"storage"`
MaxRetry int `mapstructure:"maxRetry"`
PoolSize int `mapstructure:"poolSize"`
Disable bool `yaml:"-"`
Address []string `yaml:"address"`
Username string `yaml:"username"`
Password string `yaml:"password"`
ClusterMode bool `yaml:"clusterMode"`
DB int `yaml:"storage"`
MaxRetry int `yaml:"maxRetry"`
PoolSize int `yaml:"poolSize"`
}
type BeforeConfig struct {
Enable bool `yaml:"enable"`
Timeout int `yaml:"timeout"`
FailedContinue bool `yaml:"failedContinue"`
DeniedTypes []int32 `yaml:"deniedTypes"`
Enable bool `yaml:"enable"`
Timeout int `yaml:"timeout"`
FailedContinue bool `yaml:"failedContinue"`
AllowedTypes []string `yaml:"allowedTypes"`
DeniedTypes []string `yaml:"deniedTypes"`
}
type AfterConfig struct {
Enable bool `yaml:"enable"`
Timeout int `yaml:"timeout"`
AttentionIds []string `yaml:"attentionIds"`
DeniedTypes []int32 `yaml:"deniedTypes"`
AllowedTypes []string `yaml:"allowedTypes"`
DeniedTypes []string `yaml:"deniedTypes"`
}
type Share struct {
Secret string `yaml:"secret"`
IMAdminUserID []string `yaml:"imAdminUserID"`
MultiLogin MultiLogin `yaml:"multiLogin"`
RPCMaxBodySize MaxRequestBody `yaml:"rpcMaxBodySize"`
}
type MaxRequestBody struct {
RequestMaxBodySize int `yaml:"requestMaxBodySize"`
ResponseMaxBodySize int `yaml:"responseMaxBodySize"`
Secret string `yaml:"secret"`
IMAdminUserID []string `yaml:"imAdminUserID"`
MultiLogin MultiLogin `yaml:"multiLogin"`
}
type MultiLogin struct {
Policy int `mapstructure:"policy"`
MaxNumOneEnd int `mapstructure:"maxNumOneEnd"`
Policy int `yaml:"policy"`
MaxNumOneEnd int `yaml:"maxNumOneEnd"`
}
type RpcRegisterName struct {
User string `mapstructure:"user"`
Friend string `mapstructure:"friend"`
Msg string `mapstructure:"msg"`
Push string `mapstructure:"push"`
MessageGateway string `mapstructure:"messageGateway"`
Group string `mapstructure:"group"`
Auth string `mapstructure:"auth"`
Conversation string `mapstructure:"conversation"`
Third string `mapstructure:"third"`
type RpcService struct {
User string `yaml:"user"`
Friend string `yaml:"friend"`
Msg string `yaml:"msg"`
Push string `yaml:"push"`
MessageGateway string `yaml:"messageGateway"`
Group string `yaml:"group"`
Auth string `yaml:"auth"`
Conversation string `yaml:"conversation"`
Third string `yaml:"third"`
}
func (r *RpcRegisterName) GetServiceNames() []string {
@@ -422,81 +388,73 @@ func (r *RpcRegisterName) GetServiceNames() []string {
// FullConfig stores all configurations for before and after events
type Webhooks struct {
URL string `mapstructure:"url"`
BeforeSendSingleMsg BeforeConfig `mapstructure:"beforeSendSingleMsg"`
BeforeUpdateUserInfoEx BeforeConfig `mapstructure:"beforeUpdateUserInfoEx"`
AfterUpdateUserInfoEx AfterConfig `mapstructure:"afterUpdateUserInfoEx"`
AfterSendSingleMsg AfterConfig `mapstructure:"afterSendSingleMsg"`
BeforeSendGroupMsg BeforeConfig `mapstructure:"beforeSendGroupMsg"`
BeforeMsgModify BeforeConfig `mapstructure:"beforeMsgModify"`
AfterSendGroupMsg AfterConfig `mapstructure:"afterSendGroupMsg"`
AfterUserOnline AfterConfig `mapstructure:"afterUserOnline"`
AfterUserOffline AfterConfig `mapstructure:"afterUserOffline"`
AfterUserKickOff AfterConfig `mapstructure:"afterUserKickOff"`
BeforeOfflinePush BeforeConfig `mapstructure:"beforeOfflinePush"`
BeforeOnlinePush BeforeConfig `mapstructure:"beforeOnlinePush"`
BeforeGroupOnlinePush BeforeConfig `mapstructure:"beforeGroupOnlinePush"`
BeforeAddFriend BeforeConfig `mapstructure:"beforeAddFriend"`
BeforeUpdateUserInfo BeforeConfig `mapstructure:"beforeUpdateUserInfo"`
AfterUpdateUserInfo AfterConfig `mapstructure:"afterUpdateUserInfo"`
BeforeCreateGroup BeforeConfig `mapstructure:"beforeCreateGroup"`
AfterCreateGroup AfterConfig `mapstructure:"afterCreateGroup"`
BeforeMemberJoinGroup BeforeConfig `mapstructure:"beforeMemberJoinGroup"`
BeforeSetGroupMemberInfo BeforeConfig `mapstructure:"beforeSetGroupMemberInfo"`
AfterSetGroupMemberInfo AfterConfig `mapstructure:"afterSetGroupMemberInfo"`
AfterQuitGroup AfterConfig `mapstructure:"afterQuitGroup"`
AfterKickGroupMember AfterConfig `mapstructure:"afterKickGroupMember"`
AfterDismissGroup AfterConfig `mapstructure:"afterDismissGroup"`
BeforeApplyJoinGroup BeforeConfig `mapstructure:"beforeApplyJoinGroup"`
AfterGroupMsgRead AfterConfig `mapstructure:"afterGroupMsgRead"`
AfterSingleMsgRead AfterConfig `mapstructure:"afterSingleMsgRead"`
BeforeUserRegister BeforeConfig `mapstructure:"beforeUserRegister"`
AfterUserRegister AfterConfig `mapstructure:"afterUserRegister"`
AfterTransferGroupOwner AfterConfig `mapstructure:"afterTransferGroupOwner"`
BeforeSetFriendRemark BeforeConfig `mapstructure:"beforeSetFriendRemark"`
AfterSetFriendRemark AfterConfig `mapstructure:"afterSetFriendRemark"`
AfterGroupMsgRevoke AfterConfig `mapstructure:"afterGroupMsgRevoke"`
AfterJoinGroup AfterConfig `mapstructure:"afterJoinGroup"`
BeforeInviteUserToGroup BeforeConfig `mapstructure:"beforeInviteUserToGroup"`
AfterSetGroupInfo AfterConfig `mapstructure:"afterSetGroupInfo"`
BeforeSetGroupInfo BeforeConfig `mapstructure:"beforeSetGroupInfo"`
AfterSetGroupInfoEx AfterConfig `mapstructure:"afterSetGroupInfoEx"`
BeforeSetGroupInfoEx BeforeConfig `mapstructure:"beforeSetGroupInfoEx"`
AfterRevokeMsg AfterConfig `mapstructure:"afterRevokeMsg"`
BeforeAddBlack BeforeConfig `mapstructure:"beforeAddBlack"`
AfterAddFriend AfterConfig `mapstructure:"afterAddFriend"`
BeforeAddFriendAgree BeforeConfig `mapstructure:"beforeAddFriendAgree"`
AfterAddFriendAgree AfterConfig `mapstructure:"afterAddFriendAgree"`
AfterDeleteFriend AfterConfig `mapstructure:"afterDeleteFriend"`
BeforeImportFriends BeforeConfig `mapstructure:"beforeImportFriends"`
AfterImportFriends AfterConfig `mapstructure:"afterImportFriends"`
AfterRemoveBlack AfterConfig `mapstructure:"afterRemoveBlack"`
URL string `yaml:"url"`
BeforeSendSingleMsg BeforeConfig `yaml:"beforeSendSingleMsg"`
BeforeUpdateUserInfoEx BeforeConfig `yaml:"beforeUpdateUserInfoEx"`
AfterUpdateUserInfoEx AfterConfig `yaml:"afterUpdateUserInfoEx"`
AfterSendSingleMsg AfterConfig `yaml:"afterSendSingleMsg"`
BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"`
BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"`
AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"`
AfterUserOnline AfterConfig `yaml:"afterUserOnline"`
AfterUserOffline AfterConfig `yaml:"afterUserOffline"`
AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"`
BeforeOfflinePush BeforeConfig `yaml:"beforeOfflinePush"`
BeforeOnlinePush BeforeConfig `yaml:"beforeOnlinePush"`
BeforeGroupOnlinePush BeforeConfig `yaml:"beforeGroupOnlinePush"`
BeforeAddFriend BeforeConfig `yaml:"beforeAddFriend"`
BeforeUpdateUserInfo BeforeConfig `yaml:"beforeUpdateUserInfo"`
AfterUpdateUserInfo AfterConfig `yaml:"afterUpdateUserInfo"`
BeforeCreateGroup BeforeConfig `yaml:"beforeCreateGroup"`
AfterCreateGroup AfterConfig `yaml:"afterCreateGroup"`
BeforeMemberJoinGroup BeforeConfig `yaml:"beforeMemberJoinGroup"`
BeforeSetGroupMemberInfo BeforeConfig `yaml:"beforeSetGroupMemberInfo"`
AfterSetGroupMemberInfo AfterConfig `yaml:"afterSetGroupMemberInfo"`
AfterQuitGroup AfterConfig `yaml:"afterQuitGroup"`
AfterKickGroupMember AfterConfig `yaml:"afterKickGroupMember"`
AfterDismissGroup AfterConfig `yaml:"afterDismissGroup"`
BeforeApplyJoinGroup BeforeConfig `yaml:"beforeApplyJoinGroup"`
AfterGroupMsgRead AfterConfig `yaml:"afterGroupMsgRead"`
AfterSingleMsgRead AfterConfig `yaml:"afterSingleMsgRead"`
BeforeUserRegister BeforeConfig `yaml:"beforeUserRegister"`
AfterUserRegister AfterConfig `yaml:"afterUserRegister"`
AfterTransferGroupOwner AfterConfig `yaml:"afterTransferGroupOwner"`
BeforeSetFriendRemark BeforeConfig `yaml:"beforeSetFriendRemark"`
AfterSetFriendRemark AfterConfig `yaml:"afterSetFriendRemark"`
AfterGroupMsgRevoke AfterConfig `yaml:"afterGroupMsgRevoke"`
AfterJoinGroup AfterConfig `yaml:"afterJoinGroup"`
BeforeInviteUserToGroup BeforeConfig `yaml:"beforeInviteUserToGroup"`
AfterSetGroupInfo AfterConfig `yaml:"afterSetGroupInfo"`
BeforeSetGroupInfo BeforeConfig `yaml:"beforeSetGroupInfo"`
AfterSetGroupInfoEx AfterConfig `yaml:"afterSetGroupInfoEx"`
BeforeSetGroupInfoEx BeforeConfig `yaml:"beforeSetGroupInfoEx"`
AfterRevokeMsg AfterConfig `yaml:"afterRevokeMsg"`
BeforeAddBlack BeforeConfig `yaml:"beforeAddBlack"`
AfterAddFriend AfterConfig `yaml:"afterAddFriend"`
BeforeAddFriendAgree BeforeConfig `yaml:"beforeAddFriendAgree"`
AfterAddFriendAgree AfterConfig `yaml:"afterAddFriendAgree"`
AfterDeleteFriend AfterConfig `yaml:"afterDeleteFriend"`
BeforeImportFriends BeforeConfig `yaml:"beforeImportFriends"`
AfterImportFriends AfterConfig `yaml:"afterImportFriends"`
AfterRemoveBlack AfterConfig `yaml:"afterRemoveBlack"`
}
type ZooKeeper struct {
Schema string `mapstructure:"schema"`
Address []string `mapstructure:"address"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Schema string `yaml:"schema"`
Address []string `yaml:"address"`
Username string `yaml:"username"`
Password string `yaml:"password"`
}
type Discovery struct {
Enable string `mapstructure:"enable"`
Etcd Etcd `mapstructure:"etcd"`
Kubernetes Kubernetes `mapstructure:"kubernetes"`
RpcService RpcService `mapstructure:"rpcService"`
Enable string `yaml:"enable"`
Etcd Etcd `yaml:"etcd"`
Kubernetes Kubernetes `yaml:"kubernetes"`
RpcService RpcService `yaml:"rpcService"`
}
type RpcService struct {
User string `mapstructure:"user"`
Friend string `mapstructure:"friend"`
Msg string `mapstructure:"msg"`
Push string `mapstructure:"push"`
MessageGateway string `mapstructure:"messageGateway"`
Group string `mapstructure:"group"`
Auth string `mapstructure:"auth"`
Conversation string `mapstructure:"conversation"`
Third string `mapstructure:"third"`
type Kubernetes struct {
Namespace string `yaml:"namespace"`
}
func (r *RpcService) GetServiceNames() []string {
@@ -517,10 +475,10 @@ type Kubernetes struct {
Namespace string `yaml:"namespace"`
}
type Etcd struct {
RootDirectory string `mapstructure:"rootDirectory"`
Address []string `mapstructure:"address"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
RootDirectory string `yaml:"rootDirectory"`
Address []string `yaml:"address"`
Username string `yaml:"username"`
Password string `yaml:"password"`
}
func (m *Mongo) Build() *mongoutil.Config {
@@ -923,3 +881,117 @@ func (a *AllConfig) GetConfigNames() []string {
a.Webhooks.GetConfigFileName(),
}
}
const (
FileName = "config.yaml"
DiscoveryConfigFilename = "discovery.yml"
KafkaConfigFileName = "kafka.yml"
LocalCacheConfigFileName = "local-cache.yml"
LogConfigFileName = "log.yml"
MinioConfigFileName = "minio.yml"
MongodbConfigFileName = "mongodb.yml"
NotificationFileName = "notification.yml"
OpenIMAPICfgFileName = "openim-api.yml"
OpenIMCronTaskCfgFileName = "openim-crontask.yml"
OpenIMMsgGatewayCfgFileName = "openim-msggateway.yml"
OpenIMMsgTransferCfgFileName = "openim-msgtransfer.yml"
OpenIMPushCfgFileName = "openim-push.yml"
OpenIMRPCAuthCfgFileName = "openim-rpc-auth.yml"
OpenIMRPCConversationCfgFileName = "openim-rpc-conversation.yml"
OpenIMRPCFriendCfgFileName = "openim-rpc-friend.yml"
OpenIMRPCGroupCfgFileName = "openim-rpc-group.yml"
OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml"
OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml"
OpenIMRPCUserCfgFileName = "openim-rpc-user.yml"
RedisConfigFileName = "redis.yml"
ShareFileName = "share.yml"
WebhooksConfigFileName = "webhooks.yml"
)
func (d *Discovery) GetConfigFileName() string {
return DiscoveryConfigFilename
}
func (k *Kafka) GetConfigFileName() string {
return KafkaConfigFileName
}
func (lc *LocalCache) GetConfigFileName() string {
return LocalCacheConfigFileName
}
func (l *Log) GetConfigFileName() string {
return LogConfigFileName
}
func (m *Minio) GetConfigFileName() string {
return MinioConfigFileName
}
func (m *Mongo) GetConfigFileName() string {
return MongodbConfigFileName
}
func (n *Notification) GetConfigFileName() string {
return NotificationFileName
}
func (a *API) GetConfigFileName() string {
return OpenIMAPICfgFileName
}
func (ct *CronTask) GetConfigFileName() string {
return OpenIMCronTaskCfgFileName
}
func (mg *MsgGateway) GetConfigFileName() string {
return OpenIMMsgGatewayCfgFileName
}
func (mt *MsgTransfer) GetConfigFileName() string {
return OpenIMMsgTransferCfgFileName
}
func (p *Push) GetConfigFileName() string {
return OpenIMPushCfgFileName
}
func (a *Auth) GetConfigFileName() string {
return OpenIMRPCAuthCfgFileName
}
func (c *Conversation) GetConfigFileName() string {
return OpenIMRPCConversationCfgFileName
}
func (f *Friend) GetConfigFileName() string {
return OpenIMRPCFriendCfgFileName
}
func (g *Group) GetConfigFileName() string {
return OpenIMRPCGroupCfgFileName
}
func (m *Msg) GetConfigFileName() string {
return OpenIMRPCMsgCfgFileName
}
func (t *Third) GetConfigFileName() string {
return OpenIMRPCThirdCfgFileName
}
func (u *User) GetConfigFileName() string {
return OpenIMRPCUserCfgFileName
}
func (r *Redis) GetConfigFileName() string {
return RedisConfigFileName
}
func (s *Share) GetConfigFileName() string {
return ShareFileName
}
func (w *Webhooks) GetConfigFileName() string {
return WebhooksConfigFileName
}
+4 -1
View File
@@ -14,13 +14,16 @@
package config
import "github.com/openimsdk/tools/utils/runtimeenv"
const ConfKey = "conf"
const (
MountConfigFilePath = "CONFIG_PATH"
DeploymentType = "DEPLOYMENT_TYPE"
KUBERNETES = "kubernetes"
KUBERNETES = runtimeenv.Kubernetes
ETCD = "etcd"
//Standalone = "standalone"
)
const (
+11
View File
@@ -0,0 +1,11 @@
package config
var standalone bool
func SetStandalone() {
standalone = true
}
func Standalone() bool {
return standalone
}
+4 -3
View File
@@ -7,11 +7,12 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/runtimeenv"
"github.com/spf13/viper"
)
func Load(configDirectory string, configFileName string, envPrefix string, runtimeEnv string, config any) error {
if runtimeEnv == KUBERNETES {
func Load(configDirectory string, configFileName string, envPrefix string, config any) error {
if runtimeenv.RuntimeEnvironment() == KUBERNETES {
mountPath := os.Getenv(MountConfigFilePath)
if mountPath == "" {
return errs.ErrArgs.WrapMsg(MountConfigFilePath + " env is empty")
@@ -35,7 +36,7 @@ func loadConfig(path string, envPrefix string, config any) error {
}
if err := v.Unmarshal(config, func(config *mapstructure.DecoderConfig) {
config.TagName = "mapstructure"
config.TagName = StructTagName
}); err != nil {
return errs.WrapMsg(err, "failed to unmarshal config", "path", path, "envPrefix", envPrefix)
}
+7 -2
View File
@@ -19,6 +19,8 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/standalone"
"github.com/openimsdk/tools/utils/runtimeenv"
"google.golang.org/grpc"
"github.com/openimsdk/tools/discovery/kubernetes"
@@ -28,8 +30,11 @@ import (
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
if runtimeEnv == config.KUBERNETES {
func NewDiscoveryRegister(discovery *config.Discovery, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
if config.Standalone() {
return standalone.GetSvcDiscoveryRegistry(), nil
}
if runtimeenv.RuntimeEnvironment() == config.KUBERNETES {
return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace,
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(1024*1024*20),
+7 -8
View File
@@ -1,10 +1,11 @@
package prommetrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net"
"strconv"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
@@ -24,6 +25,10 @@ var (
)
)
func RegistryApi() {
registry.MustRegister(apiCounter, httpCounter)
}
func ApiInit(listener net.Listener) error {
apiRegistry := prometheus.NewRegistry()
cs := append(
@@ -41,9 +46,3 @@ func APICall(path string, method string, apiCode int) {
func HttpCall(path string, method string, status int) {
httpCounter.With(prometheus.Labels{"path": path, "method": method, "status": strconv.Itoa(status)}).Inc()
}
//func ApiHandler() http.Handler {
// return promhttp.InstrumentMetricHandler(
// apiRegistry, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}),
// )
//}
-31
View File
@@ -1,31 +0,0 @@
package prommetrics
import "fmt"
const (
APIKeyName = "api"
MessageTransferKeyName = "message-transfer"
)
type Target struct {
Target string `json:"target"`
Labels map[string]string `json:"labels"`
}
type RespTarget struct {
Targets []string `json:"targets"`
Labels map[string]string `json:"labels"`
}
func BuildDiscoveryKey(name string) string {
return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name)
}
func BuildDefaultTarget(host string, ip int) Target {
return Target{
Target: fmt.Sprintf("%s:%d", host, ip),
Labels: map[string]string{
"namespace": "default",
},
}
}
-15
View File
@@ -1,15 +0,0 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prommetrics // import "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
+4
View File
@@ -24,3 +24,7 @@ var (
Help: "The number of user login",
})
)
func RegistryAuth() {
registry.MustRegister(UserLoginCounter)
}
+9
View File
@@ -36,3 +36,12 @@ var (
Help: "The number of group chat msg failed processed",
})
)
func RegistryMsg() {
registry.MustRegister(
SingleChatMsgProcessSuccessCounter,
SingleChatMsgProcessFailedCounter,
GroupChatMsgProcessSuccessCounter,
GroupChatMsgProcessFailedCounter,
)
}
@@ -24,3 +24,7 @@ var (
Help: "The number of online user num",
})
)
func RegistryMsgGateway() {
registry.MustRegister(OnlineUserGauge)
}
+7
View File
@@ -28,3 +28,10 @@ var (
Help: "The number of messages with a push time exceeding 10 seconds",
})
)
func RegistryPush() {
registry.MustRegister(
MsgOfflinePushFailedCounter,
MsgLoneTimePushCounter,
)
}
+4
View File
@@ -8,3 +8,7 @@ var (
Help: "The number of user login",
})
)
func RegistryUser() {
registry.MustRegister(UserRegisterCounter)
}
+75 -2
View File
@@ -15,14 +15,42 @@
package prommetrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"errors"
"fmt"
"net"
"net/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const commonPath = "/metrics"
var registry = &prometheusRegistry{prometheus.NewRegistry()}
type prometheusRegistry struct {
*prometheus.Registry
}
func (x *prometheusRegistry) MustRegister(cs ...prometheus.Collector) {
for _, c := range cs {
if err := x.Registry.Register(c); err != nil {
if errors.As(err, &prometheus.AlreadyRegisteredError{}) {
continue
}
panic(err)
}
}
}
func init() {
registry.MustRegister(
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
collectors.NewGoCollector(),
)
}
var (
baseCollector = []prometheus.Collector{
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
@@ -36,3 +64,48 @@ func Init(registry *prometheus.Registry, listener net.Listener, path string, han
srv.Handle(path, handler)
return http.Serve(listener, srv)
}
func RegistryAll() {
RegistryApi()
RegistryAuth()
RegistryMsg()
RegistryMsgGateway()
RegistryPush()
RegistryUser()
RegistryRpc()
RegistryTransfer()
}
func Start(listener net.Listener) error {
srv := http.NewServeMux()
srv.Handle(commonPath, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
return http.Serve(listener, srv)
}
const (
APIKeyName = "api"
MessageTransferKeyName = "message-transfer"
)
type Target struct {
Target string `json:"target"`
Labels map[string]string `json:"labels"`
}
type RespTarget struct {
Targets []string `json:"targets"`
Labels map[string]string `json:"labels"`
}
func BuildDiscoveryKey(name string) string {
return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name)
}
func BuildDefaultTarget(host string, ip int) Target {
return Target{
Target: fmt.Sprintf("%s:%d", host, ip),
Labels: map[string]string{
"namespace": "default",
},
}
}
@@ -14,6 +14,8 @@
package prommetrics
import "testing"
//func TestNewGrpcPromObj(t *testing.T) {
// // Create a custom metric to pass into the NewGrpcPromObj function.
// customMetric := prometheus.NewCounter(prometheus.CounterOpts{
@@ -67,3 +69,9 @@ package prommetrics
// })
// }
//}
func TestName(t *testing.T) {
RegistryApi()
RegistryApi()
}
+7 -2
View File
@@ -1,12 +1,13 @@
package prommetrics
import (
"net"
"strconv"
gp "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net"
"strconv"
)
const rpcPath = commonPath
@@ -22,6 +23,10 @@ var (
)
)
func RegistryRpc() {
registry.MustRegister(rpcCounter)
}
func RpcInit(cs []prometheus.Collector, listener net.Listener) error {
reg := prometheus.NewRegistry()
cs = append(append(
+12 -1
View File
@@ -15,9 +15,10 @@
package prommetrics
import (
"net"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net"
)
var (
@@ -43,6 +44,16 @@ var (
})
)
func RegistryTransfer() {
registry.MustRegister(
MsgInsertRedisSuccessCounter,
MsgInsertRedisFailedCounter,
MsgInsertMongoSuccessCounter,
MsgInsertMongoFailedCounter,
SeqSetFailedCounter,
)
}
func TransferInit(listener net.Listener) error {
reg := prometheus.NewRegistry()
cs := append(
-15
View File
@@ -1,15 +0,0 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redispubsub // import "github.com/openimsdk/open-im-server/v3/pkg/common/redispubsub"
-30
View File
@@ -1,30 +0,0 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redispubsub
import "github.com/redis/go-redis/v9"
type Publisher struct {
client redis.UniversalClient
channel string
}
func NewPublisher(client redis.UniversalClient, channel string) *Publisher {
return &Publisher{client: client, channel: channel}
}
func (p *Publisher) Publish(message string) error {
return p.client.Publish(ctx, p.channel, message).Err()
}
-49
View File
@@ -1,49 +0,0 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redispubsub
import (
"context"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
type Subscriber struct {
client redis.UniversalClient
channel string
}
func NewSubscriber(client redis.UniversalClient, channel string) *Subscriber {
return &Subscriber{client: client, channel: channel}
}
func (s *Subscriber) OnMessage(ctx context.Context, callback func(string)) error {
messageChannel := s.client.Subscribe(ctx, s.channel).Channel()
go func() {
for {
select {
case <-ctx.Done():
return
case msg := <-messageChannel:
callback(msg.Payload)
}
}
}()
return nil
}
-15
View File
@@ -1,15 +0,0 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package startrpc // import "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
+3 -94
View File
@@ -37,8 +37,7 @@ import (
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
grpccli "github.com/openimsdk/tools/mw/grpc/client"
grpcsrv "github.com/openimsdk/tools/mw/grpc/server"
"github.com/openimsdk/tools/mw"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
@@ -47,64 +46,6 @@ func init() {
prommetrics.RegistryAll()
}
func getConfigRpcMaxRequestBody(value reflect.Value) *conf.MaxRequestBody {
for value.Kind() == reflect.Pointer {
value = value.Elem()
}
if value.Kind() == reflect.Struct {
num := value.NumField()
for i := 0; i < num; i++ {
field := value.Field(i)
if !field.CanInterface() {
continue
}
for field.Kind() == reflect.Pointer {
field = field.Elem()
}
switch elem := field.Interface().(type) {
case conf.Share:
return &elem.RPCMaxBodySize
case conf.MaxRequestBody:
return &elem
}
if field.Kind() == reflect.Struct {
if elem := getConfigRpcMaxRequestBody(field); elem != nil {
return elem
}
}
}
}
return nil
}
func getConfigShare(value reflect.Value) *conf.Share {
for value.Kind() == reflect.Pointer {
value = value.Elem()
}
if value.Kind() == reflect.Struct {
num := value.NumField()
for i := 0; i < num; i++ {
field := value.Field(i)
if !field.CanInterface() {
continue
}
for field.Kind() == reflect.Pointer {
field = field.Elem()
}
switch elem := field.Interface().(type) {
case conf.Share:
return &elem
}
if field.Kind() == reflect.Struct {
if elem := getConfigShare(field); elem != nil {
return elem
}
}
}
}
return nil
}
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchServiceNames []string,
@@ -115,32 +56,7 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
conf.InitNotification(notification)
}
maxRequestBody := getConfigRpcMaxRequestBody(reflect.ValueOf(config))
shareConfig := getConfigShare(reflect.ValueOf(config))
log.ZDebug(ctx, "rpc start", "rpcMaxRequestBody", maxRequestBody, "rpcRegisterName", rpcRegisterName, "registerIP", registerIP, "listenIP", listenIP)
options = append(options,
grpcsrv.GrpcServerMetadataContext(),
grpcsrv.GrpcServerLogger(),
grpcsrv.GrpcServerErrorConvert(),
grpcsrv.GrpcServerRequestValidate(),
grpcsrv.GrpcServerPanicCapture(),
)
if shareConfig != nil && len(shareConfig.IMAdminUserID) > 0 {
options = append(options, grpcServerIMAdminUserID(shareConfig.IMAdminUserID))
}
var clientOptions []grpc.DialOption
if maxRequestBody != nil {
if maxRequestBody.RequestMaxBodySize > 0 {
options = append(options, grpc.MaxRecvMsgSize(maxRequestBody.RequestMaxBodySize))
clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxRequestBody.RequestMaxBodySize)))
}
if maxRequestBody.ResponseMaxBodySize > 0 {
options = append(options, grpc.MaxSendMsgSize(maxRequestBody.ResponseMaxBodySize))
clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRequestBody.ResponseMaxBodySize)))
}
}
options = append(options, mw.GrpcServer())
registerIP, err := network.GetRpcRegisterIP(registerIP)
if err != nil {
@@ -166,16 +82,9 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
defer client.Close()
client.AddOption(
grpc.WithTransportCredentials(insecure.NewCredentials()),
mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")),
grpccli.GrpcClientLogger(),
grpccli.GrpcClientContext(),
grpccli.GrpcClientErrorConvert(),
)
if len(clientOptions) > 0 {
client.AddOption(clientOptions...)
}
ctx, cancel := context.WithCancelCause(ctx)
+50
View File
@@ -0,0 +1,50 @@
package mcache
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/s3/minio"
)
func NewMinioCache(cache database.Cache) minio.Cache {
return &minioCache{
cache: cache,
expireTime: time.Hour * 24 * 7,
}
}
type minioCache struct {
cache database.Cache
expireTime time.Duration
}
func (g *minioCache) getObjectImageInfoKey(key string) string {
return cachekey.GetObjectImageInfoKey(key)
}
func (g *minioCache) getMinioImageThumbnailKey(key string, format string, width int, height int) string {
return cachekey.GetMinioImageThumbnailKey(key, format, width, height)
}
func (g *minioCache) DelObjectImageInfoKey(ctx context.Context, keys ...string) error {
ks := make([]string, 0, len(keys))
for _, key := range keys {
ks = append(ks, g.getObjectImageInfoKey(key))
}
return g.cache.Del(ctx, ks)
}
func (g *minioCache) DelImageThumbnailKey(ctx context.Context, key string, format string, width int, height int) error {
return g.cache.Del(ctx, []string{g.getMinioImageThumbnailKey(key, format, width, height)})
}
func (g *minioCache) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*minio.ImageInfo, error)) (*minio.ImageInfo, error) {
return getCache[*minio.ImageInfo](ctx, g.cache, g.getObjectImageInfoKey(key), g.expireTime, fn)
}
func (g *minioCache) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) {
return getCache[string](ctx, g.cache, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache)
}
+132
View File
@@ -0,0 +1,132 @@
package mcache
import (
"context"
"strconv"
"sync"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
)
var (
memMsgCache lru.LRU[string, *model.MsgInfoModel]
initMemMsgCache sync.Once
)
func NewMsgCache(cache database.Cache, msgDocDatabase database.Msg) cache.MsgCache {
initMemMsgCache.Do(func() {
memMsgCache = lru.NewLayLRU[string, *model.MsgInfoModel](1024*8, time.Hour, time.Second*10, localcache.EmptyTarget{}, nil)
})
return &msgCache{
cache: cache,
msgDocDatabase: msgDocDatabase,
memMsgCache: memMsgCache,
}
}
type msgCache struct {
cache database.Cache
msgDocDatabase database.Msg
memMsgCache lru.LRU[string, *model.MsgInfoModel]
}
func (x *msgCache) getSendMsgKey(id string) string {
return cachekey.GetSendMsgKey(id)
}
func (x *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return x.cache.Set(ctx, x.getSendMsgKey(id), strconv.Itoa(int(status)), time.Hour*24)
}
func (x *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
key := x.getSendMsgKey(id)
res, err := x.cache.Get(ctx, []string{key})
if err != nil {
return 0, err
}
val, ok := res[key]
if !ok {
return 0, errs.Wrap(redis.Nil)
}
status, err := strconv.Atoi(val)
if err != nil {
return 0, errs.WrapMsg(err, "GetSendMsgStatus strconv.Atoi error", "val", val)
}
return int32(status), nil
}
func (x *msgCache) getMsgCacheKey(conversationID string, seq int64) string {
return cachekey.GetMsgCacheKey(conversationID, seq)
}
func (x *msgCache) GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
if len(seqs) == 0 {
return nil, nil
}
keys := make([]string, 0, len(seqs))
keySeq := make(map[string]int64, len(seqs))
for _, seq := range seqs {
key := x.getMsgCacheKey(conversationID, seq)
keys = append(keys, key)
keySeq[key] = seq
}
res, err := x.memMsgCache.GetBatch(keys, func(keys []string) (map[string]*model.MsgInfoModel, error) {
findSeqs := make([]int64, 0, len(keys))
for _, key := range keys {
seq, ok := keySeq[key]
if !ok {
continue
}
findSeqs = append(findSeqs, seq)
}
res, err := x.msgDocDatabase.FindSeqs(ctx, conversationID, seqs)
if err != nil {
return nil, err
}
kv := make(map[string]*model.MsgInfoModel)
for i := range res {
msg := res[i]
if msg == nil || msg.Msg == nil || msg.Msg.Seq <= 0 {
continue
}
key := x.getMsgCacheKey(conversationID, msg.Msg.Seq)
kv[key] = msg
}
return kv, nil
})
if err != nil {
return nil, err
}
return datautil.Values(res), nil
}
func (x msgCache) DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error {
if len(seqs) == 0 {
return nil
}
for _, seq := range seqs {
x.memMsgCache.Del(x.getMsgCacheKey(conversationID, seq))
}
return nil
}
func (x *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error {
for i := range msgs {
msg := msgs[i]
if msg == nil || msg.Msg == nil || msg.Msg.Seq <= 0 {
continue
}
x.memMsgCache.Set(x.getMsgCacheKey(conversationID, msg.Msg.Seq), msg)
}
return nil
}
+82
View File
@@ -0,0 +1,82 @@
package mcache
import (
"context"
"sync"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
)
var (
globalOnlineCache cache.OnlineCache
globalOnlineOnce sync.Once
)
func NewOnlineCache() cache.OnlineCache {
globalOnlineOnce.Do(func() {
globalOnlineCache = &onlineCache{
user: make(map[string]map[int32]struct{}),
}
})
return globalOnlineCache
}
type onlineCache struct {
lock sync.RWMutex
user map[string]map[int32]struct{}
}
func (x *onlineCache) GetOnline(ctx context.Context, userID string) ([]int32, error) {
x.lock.RLock()
defer x.lock.RUnlock()
pSet, ok := x.user[userID]
if !ok {
return nil, nil
}
res := make([]int32, 0, len(pSet))
for k := range pSet {
res = append(res, k)
}
return res, nil
}
func (x *onlineCache) SetUserOnline(ctx context.Context, userID string, online, offline []int32) error {
x.lock.Lock()
defer x.lock.Unlock()
pSet, ok := x.user[userID]
if ok {
for _, p := range offline {
delete(pSet, p)
}
}
if len(online) > 0 {
if !ok {
pSet = make(map[int32]struct{})
x.user[userID] = pSet
}
for _, p := range online {
pSet[p] = struct{}{}
}
}
if len(pSet) == 0 {
delete(x.user, userID)
}
return nil
}
func (x *onlineCache) GetAllOnlineUsers(ctx context.Context, cursor uint64) (map[string][]int32, uint64, error) {
if cursor != 0 {
return nil, 0, nil
}
x.lock.RLock()
defer x.lock.RUnlock()
res := make(map[string][]int32)
for k, v := range x.user {
pSet := make([]int32, 0, len(v))
for p := range v {
pSet = append(pSet, p)
}
res[k] = pSet
}
return res, 0, nil
}
+79
View File
@@ -0,0 +1,79 @@
package mcache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
)
func NewSeqConversationCache(sc database.SeqConversation) cache.SeqConversationCache {
return &seqConversationCache{
sc: sc,
}
}
type seqConversationCache struct {
sc database.SeqConversation
}
func (x *seqConversationCache) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) {
return x.sc.Malloc(ctx, conversationID, size)
}
func (x *seqConversationCache) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
return x.sc.SetMinSeq(ctx, conversationID, seq)
}
func (x *seqConversationCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return x.sc.GetMinSeq(ctx, conversationID)
}
func (x *seqConversationCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
res := make(map[string]int64)
for _, conversationID := range conversationIDs {
seq, err := x.GetMinSeq(ctx, conversationID)
if err != nil {
return nil, err
}
res[conversationID] = seq
}
return res, nil
}
func (x *seqConversationCache) GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
res := make(map[string]database.SeqTime)
for _, conversationID := range conversationIDs {
seq, err := x.GetMinSeq(ctx, conversationID)
if err != nil {
return nil, err
}
res[conversationID] = database.SeqTime{Seq: seq}
}
return res, nil
}
func (x *seqConversationCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return x.sc.GetMaxSeq(ctx, conversationID)
}
func (x *seqConversationCache) GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) {
seq, err := x.GetMinSeq(ctx, conversationID)
if err != nil {
return database.SeqTime{}, err
}
return database.SeqTime{Seq: seq}, nil
}
func (x *seqConversationCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
for conversationID, seq := range seqs {
if err := x.sc.SetMinSeq(ctx, conversationID, seq); err != nil {
return err
}
}
return nil
}
func (x *seqConversationCache) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
return x.GetMaxSeqsWithTime(ctx, conversationIDs)
}
+98
View File
@@ -0,0 +1,98 @@
package mcache
import (
"context"
"strconv"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
)
func NewThirdCache(cache database.Cache) cache.ThirdCache {
return &thirdCache{
cache: cache,
}
}
type thirdCache struct {
cache database.Cache
}
func (c *thirdCache) getGetuiTokenKey() string {
return cachekey.GetGetuiTokenKey()
}
func (c *thirdCache) getGetuiTaskIDKey() string {
return cachekey.GetGetuiTaskIDKey()
}
func (c *thirdCache) getUserBadgeUnreadCountSumKey(userID string) string {
return cachekey.GetUserBadgeUnreadCountSumKey(userID)
}
func (c *thirdCache) getFcmAccountTokenKey(account string, platformID int) string {
return cachekey.GetFcmAccountTokenKey(account, platformID)
}
func (c *thirdCache) get(ctx context.Context, key string) (string, error) {
res, err := c.cache.Get(ctx, []string{key})
if err != nil {
return "", err
}
if val, ok := res[key]; ok {
return val, nil
}
return "", errs.Wrap(redis.Nil)
}
func (c *thirdCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
return errs.Wrap(c.cache.Set(ctx, c.getFcmAccountTokenKey(account, platformID), fcmToken, time.Duration(expireTime)*time.Second))
}
func (c *thirdCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
return c.get(ctx, c.getFcmAccountTokenKey(account, platformID))
}
func (c *thirdCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
return c.cache.Del(ctx, []string{c.getFcmAccountTokenKey(account, platformID)})
}
func (c *thirdCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
return c.cache.Incr(ctx, c.getUserBadgeUnreadCountSumKey(userID), 1)
}
func (c *thirdCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
return c.cache.Set(ctx, c.getUserBadgeUnreadCountSumKey(userID), strconv.Itoa(value), 0)
}
func (c *thirdCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
str, err := c.get(ctx, c.getUserBadgeUnreadCountSumKey(userID))
if err != nil {
return 0, err
}
val, err := strconv.Atoi(str)
if err != nil {
return 0, errs.WrapMsg(err, "strconv.Atoi", "str", str)
}
return val, nil
}
func (c *thirdCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return c.cache.Set(ctx, c.getGetuiTokenKey(), token, time.Duration(expireTime)*time.Second)
}
func (c *thirdCache) GetGetuiToken(ctx context.Context) (string, error) {
return c.get(ctx, c.getGetuiTokenKey())
}
func (c *thirdCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return c.cache.Set(ctx, c.getGetuiTaskIDKey(), taskID, time.Duration(expireTime)*time.Second)
}
func (c *thirdCache) GetGetuiTaskID(ctx context.Context) (string, error) {
return c.get(ctx, c.getGetuiTaskIDKey())
}
+1 -37
View File
@@ -27,6 +27,7 @@ type tokenCache struct {
func (x *tokenCache) getTokenKey(userID string, platformID int, token string) string {
return cachekey.GetTokenKey(userID, platformID) + ":" + token
}
func (x *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
@@ -56,14 +57,6 @@ func (x *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, p
return mm, nil
}
func (x *tokenCache) HasTemporaryToken(ctx context.Context, userID string, platformID int, token string) error {
key := cachekey.GetTemporaryTokenKey(userID, platformID, token)
if _, err := x.cache.Get(ctx, []string{key}); err != nil {
return err
}
return nil
}
func (x *tokenCache) GetAllTokensWithoutError(ctx context.Context, userID string) (map[int]map[string]int, error) {
prefix := cachekey.UidPidToken + userID + ":"
tokens, err := x.cache.Prefix(ctx, prefix)
@@ -135,32 +128,3 @@ func (x *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, pla
func (x *tokenCache) getExpireTime(t int64) time.Duration {
return time.Hour * 24 * time.Duration(t)
}
func (x *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, tokens map[int][]string) error {
keys := make([]string, 0, len(tokens))
for platformID, ts := range tokens {
for _, t := range ts {
keys = append(keys, x.getTokenKey(userID, platformID, t))
}
}
return x.cache.Del(ctx, keys)
}
func (x *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, platformID int, fields []string) error {
keys := make([]string, 0, len(fields))
for _, f := range fields {
keys = append(keys, x.getTokenKey(userID, platformID, f))
}
if err := x.cache.Del(ctx, keys); err != nil {
return err
}
for _, f := range fields {
k := cachekey.GetTemporaryTokenKey(userID, platformID, f)
if err := x.cache.Set(ctx, k, "", time.Minute*5); err != nil {
return errs.Wrap(err)
}
}
return nil
}
+63
View File
@@ -0,0 +1,63 @@
package mcache
import (
"context"
"encoding/json"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/log"
)
func getCache[V any](ctx context.Context, cache database.Cache, key string, expireTime time.Duration, fn func(ctx context.Context) (V, error)) (V, error) {
getDB := func() (V, bool, error) {
res, err := cache.Get(ctx, []string{key})
if err != nil {
var val V
return val, false, err
}
var val V
if str, ok := res[key]; ok {
if json.Unmarshal([]byte(str), &val) != nil {
return val, false, err
}
return val, true, nil
}
return val, false, nil
}
dbVal, ok, err := getDB()
if err != nil {
return dbVal, err
}
if ok {
return dbVal, nil
}
lockValue, err := cache.Lock(ctx, key, time.Minute)
if err != nil {
return dbVal, err
}
defer func() {
if err := cache.Unlock(ctx, key, lockValue); err != nil {
log.ZError(ctx, "unlock cache key", err, "key", key, "value", lockValue)
}
}()
dbVal, ok, err = getDB()
if err != nil {
return dbVal, err
}
if ok {
return dbVal, nil
}
val, err := fn(ctx)
if err != nil {
return val, err
}
data, err := json.Marshal(val)
if err != nil {
return val, err
}
if err := cache.Set(ctx, key, string(data), expireTime); err != nil {
return val, err
}
return val, nil
}
+50 -13
View File
@@ -3,28 +3,65 @@ package redis
import (
"context"
"encoding/json"
"time"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/singleflight"
"time"
"unsafe"
)
func getRocksCacheRedisClient(cli *rockscache.Client) redis.UniversalClient {
type Client struct {
rdb redis.UniversalClient
_ rockscache.Options
_ singleflight.Group
}
return (*Client)(unsafe.Pointer(cli)).rdb
// GetRocksCacheOptions returns the default configuration options for RocksCache.
func GetRocksCacheOptions() *rockscache.Options {
opts := rockscache.NewDefaultOptions()
opts.LockExpire = rocksCacheTimeout
opts.WaitReplicasTimeout = rocksCacheTimeout
opts.StrongConsistency = true
opts.RandomExpireAdjustment = 0.2
return &opts
}
func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscache.Client, expire time.Duration, ids []K, idKey func(id K) string, vId func(v *V) K, fn func(ctx context.Context, ids []K) ([]*V, error)) ([]*V, error) {
func newRocksCacheClient(rdb redis.UniversalClient) *rocksCacheClient {
if rdb == nil {
return &rocksCacheClient{}
}
rc := &rocksCacheClient{
rdb: rdb,
client: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
}
return rc
}
type rocksCacheClient struct {
rdb redis.UniversalClient
client *rockscache.Client
}
func (x *rocksCacheClient) GetClient() *rockscache.Client {
return x.client
}
func (x *rocksCacheClient) Disable() bool {
return x.client == nil
}
func (x *rocksCacheClient) GetRedis() redis.UniversalClient {
return x.rdb
}
func (x *rocksCacheClient) GetBatchDeleter(topics ...string) cache.BatchDeleter {
return NewBatchDeleterRedis(x, topics)
}
func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rocksCacheClient, expire time.Duration, ids []K, idKey func(id K) string, vId func(v *V) K, fn func(ctx context.Context, ids []K) ([]*V, error)) ([]*V, error) {
if len(ids) == 0 {
return nil, nil
}
if rcClient.Disable() {
return fn(ctx, ids)
}
findKeys := make([]string, 0, len(ids))
keyId := make(map[string]K)
for _, id := range ids {
@@ -35,13 +72,13 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscac
keyId[key] = id
findKeys = append(findKeys, key)
}
slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(rcClient), findKeys)
slotKeys, err := groupKeysBySlot(ctx, rcClient.GetRedis(), findKeys)
if err != nil {
return nil, err
}
result := make([]*V, 0, len(findKeys))
for _, keys := range slotKeys {
indexCache, err := rcClient.FetchBatch2(ctx, keys, expire, func(idx []int) (map[int]string, error) {
indexCache, err := rcClient.GetClient().FetchBatch2(ctx, keys, expire, func(idx []int) (map[int]string, error) {
queryIds := make([]K, 0, len(idx))
idIndex := make(map[K]int)
for _, index := range idx {
+23 -56
View File
@@ -1,23 +1,11 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
@@ -25,7 +13,6 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
"time"
)
const (
@@ -41,10 +28,10 @@ type BatchDeleterRedis struct {
}
// NewBatchDeleterRedis creates a new BatchDeleterRedis instance.
func NewBatchDeleterRedis(redisClient redis.UniversalClient, options *rockscache.Options, redisPubTopics []string) *BatchDeleterRedis {
func NewBatchDeleterRedis(rcClient *rocksCacheClient, redisPubTopics []string) *BatchDeleterRedis {
return &BatchDeleterRedis{
redisClient: redisClient,
rocksClient: rockscache.NewClient(redisClient, *options),
redisClient: rcClient.GetRedis(),
rocksClient: rcClient.GetClient(),
redisPubTopics: redisPubTopics,
}
}
@@ -107,21 +94,29 @@ func (c *BatchDeleterRedis) AddKeys(keys ...string) {
c.keys = append(c.keys, keys...)
}
// GetRocksCacheOptions returns the default configuration options for RocksCache.
func GetRocksCacheOptions() *rockscache.Options {
opts := rockscache.NewDefaultOptions()
opts.LockExpire = rocksCacheTimeout
opts.WaitReplicasTimeout = rocksCacheTimeout
opts.StrongConsistency = true
opts.RandomExpireAdjustment = 0.2
type disableBatchDeleter struct{}
return &opts
func (x disableBatchDeleter) ChainExecDel(ctx context.Context) error {
return nil
}
func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
func (x disableBatchDeleter) ExecDelWithKeys(ctx context.Context, keys []string) error {
return nil
}
func (x disableBatchDeleter) Clone() cache.BatchDeleter {
return x
}
func (x disableBatchDeleter) AddKeys(keys ...string) {}
func getCache[T any](ctx context.Context, rcClient *rocksCacheClient, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
if rcClient.Disable() {
return fn(ctx)
}
var t T
var write bool
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
v, err := rcClient.GetClient().Fetch2(ctx, key, expire, func() (s string, err error) {
t, err = fn(ctx)
if err != nil {
//log.ZError(ctx, "getCache query database failed", err, "key", key)
@@ -152,31 +147,3 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
return t, nil
}
//func batchGetCache[T any, K comparable](
// ctx context.Context,
// rcClient *rockscache.Client,
// expire time.Duration,
// keys []K,
// keyFn func(key K) string,
// fns func(ctx context.Context, key K) (T, error),
//) ([]T, error) {
// if len(keys) == 0 {
// return nil, nil
// }
// res := make([]T, 0, len(keys))
// for _, key := range keys {
// val, err := getCache(ctx, rcClient, keyFn(key), expire, func(ctx context.Context) (T, error) {
// return fns(ctx, key)
// })
// if err != nil {
// if errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err))) {
// continue
// }
// return nil, errs.Wrap(err)
// }
// res = append(res, val)
// }
//
// return res, nil
//}
+7 -24
View File
@@ -1,29 +1,14 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"time"
)
const (
@@ -33,18 +18,16 @@ const (
type BlackCacheRedis struct {
cache.BatchDeleter
expireTime time.Duration
rcClient *rockscache.Client
rcClient *rocksCacheClient
blackDB database.Black
}
func NewBlackCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, blackDB database.Black, options *rockscache.Options) cache.BlackCache {
batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.Friend.Topic})
b := localCache.Friend
log.ZDebug(context.Background(), "black local cache init", "Topic", b.Topic, "SlotNum", b.SlotNum, "SlotSize", b.SlotSize, "enable", b.Enable())
func NewBlackCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, blackDB database.Black) cache.BlackCache {
rc := newRocksCacheClient(rdb)
return &BlackCacheRedis{
BatchDeleter: batchHandler,
BatchDeleter: rc.GetBatchDeleter(localCache.Friend.Topic),
expireTime: blackExpireTime,
rcClient: rockscache.NewClient(rdb, *options),
rcClient: rc,
blackDB: blackDB,
}
}
+9 -26
View File
@@ -1,47 +1,30 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"math/big"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/encrypt"
"github.com/redis/go-redis/v9"
"math/big"
"strings"
"time"
)
const (
conversationExpireTime = time.Second * 60 * 60 * 12
)
func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCache, opts *rockscache.Options, db database.Conversation) cache.ConversationCache {
batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Conversation.Topic})
c := localCache.Conversation
log.ZDebug(context.Background(), "conversation local cache init", "Topic", c.Topic, "SlotNum", c.SlotNum, "SlotSize", c.SlotSize, "enable", c.Enable())
func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCache, db database.Conversation) cache.ConversationCache {
rc := newRocksCacheClient(rdb)
return &ConversationRedisCache{
BatchDeleter: batchHandler,
rcClient: rockscache.NewClient(rdb, *opts),
BatchDeleter: rc.GetBatchDeleter(localCache.Conversation.Topic),
rcClient: rc,
conversationDB: db,
expireTime: conversationExpireTime,
}
@@ -49,7 +32,7 @@ func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCac
type ConversationRedisCache struct {
cache.BatchDeleter
rcClient *rockscache.Client
rcClient *rocksCacheClient
conversationDB database.Conversation
expireTime time.Duration
}
-15
View File
@@ -1,15 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
+5 -24
View File
@@ -1,30 +1,14 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"time"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
)
@@ -38,21 +22,18 @@ type FriendCacheRedis struct {
cache.BatchDeleter
friendDB database.Friend
expireTime time.Duration
rcClient *rockscache.Client
rcClient *rocksCacheClient
syncCount int
}
// NewFriendCacheRedis creates a new instance of FriendCacheRedis.
func NewFriendCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, friendDB database.Friend,
options *rockscache.Options) cache.FriendCache {
batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.Friend.Topic})
f := localCache.Friend
log.ZDebug(context.Background(), "friend local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize, "enable", f.Enable())
func NewFriendCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, friendDB database.Friend) cache.FriendCache {
rc := newRocksCacheClient(rdb)
return &FriendCacheRedis{
BatchDeleter: batchHandler,
BatchDeleter: rc.GetBatchDeleter(localCache.Friend.Topic),
friendDB: friendDB,
expireTime: friendExpireTime,
rcClient: rockscache.NewClient(rdb, *options),
rcClient: rc,
}
}
+5 -33
View File
@@ -1,17 +1,3 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
@@ -19,7 +5,6 @@ import (
"fmt"
"time"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
@@ -36,34 +21,21 @@ const (
groupExpireTime = time.Second * 60 * 60 * 12
)
var errIndex = errs.New("err index")
type GroupCacheRedis struct {
cache.BatchDeleter
groupDB database.Group
groupMemberDB database.GroupMember
groupRequestDB database.GroupRequest
expireTime time.Duration
rcClient *rockscache.Client
rcClient *rocksCacheClient
groupHash cache.GroupHash
}
func NewGroupCacheRedis(
rdb redis.UniversalClient,
localCache *config.LocalCache,
groupDB database.Group,
groupMemberDB database.GroupMember,
groupRequestDB database.GroupRequest,
hashCode cache.GroupHash,
opts *rockscache.Options,
) cache.GroupCache {
batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Group.Topic})
g := localCache.Group
log.ZDebug(context.Background(), "group local cache init", "Topic", g.Topic, "SlotNum", g.SlotNum, "SlotSize", g.SlotSize, "enable", g.Enable())
func NewGroupCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, groupDB database.Group, groupMemberDB database.GroupMember, groupRequestDB database.GroupRequest, hashCode cache.GroupHash) cache.GroupCache {
rc := newRocksCacheClient(rdb)
return &GroupCacheRedis{
BatchDeleter: batchHandler,
rcClient: rockscache.NewClient(rdb, *opts),
BatchDeleter: rc.GetBatchDeleter(localCache.Group.Topic),
rcClient: rc,
expireTime: groupExpireTime,
groupDB: groupDB,
groupMemberDB: groupMemberDB,
+59
View File
@@ -0,0 +1,59 @@
package redis
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/s3/minio"
"github.com/redis/go-redis/v9"
)
func NewMinioCache(rdb redis.UniversalClient) minio.Cache {
rc := newRocksCacheClient(rdb)
return &minioCacheRedis{
BatchDeleter: rc.GetBatchDeleter(),
rcClient: rc,
expireTime: time.Hour * 24 * 7,
}
}
type minioCacheRedis struct {
cache.BatchDeleter
rcClient *rocksCacheClient
expireTime time.Duration
}
func (g *minioCacheRedis) getObjectImageInfoKey(key string) string {
return cachekey.GetObjectImageInfoKey(key)
}
func (g *minioCacheRedis) getMinioImageThumbnailKey(key string, format string, width int, height int) string {
return cachekey.GetMinioImageThumbnailKey(key, format, width, height)
}
func (g *minioCacheRedis) DelObjectImageInfoKey(ctx context.Context, keys ...string) error {
ks := make([]string, 0, len(keys))
for _, key := range keys {
ks = append(ks, g.getObjectImageInfoKey(key))
}
return g.BatchDeleter.ExecDelWithKeys(ctx, ks)
}
func (g *minioCacheRedis) DelImageThumbnailKey(ctx context.Context, key string, format string, width int, height int) error {
return g.BatchDeleter.ExecDelWithKeys(ctx, []string{g.getMinioImageThumbnailKey(key, format, width, height)})
}
func (g *minioCacheRedis) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*minio.ImageInfo, error)) (*minio.ImageInfo, error) {
info, err := getCache(ctx, g.rcClient, g.getObjectImageInfoKey(key), g.expireTime, fn)
if err != nil {
return nil, err
}
return info, nil
}
func (g *minioCacheRedis) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) {
return getCache(ctx, g.rcClient, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache)
}
+9 -11
View File
@@ -3,7 +3,8 @@ package redis
import (
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
@@ -11,7 +12,6 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
"time"
) //
// msgCacheTimeout is expiration time of message cache, 86400 seconds
@@ -19,15 +19,13 @@ const msgCacheTimeout = time.Hour * 24
func NewMsgCache(client redis.UniversalClient, db database.Msg) cache.MsgCache {
return &msgCache{
rdb: client,
rcClient: rockscache.NewClient(client, *GetRocksCacheOptions()),
rcClient: newRocksCacheClient(client),
msgDocDatabase: db,
}
}
type msgCache struct {
rdb redis.UniversalClient
rcClient *rockscache.Client
rcClient *rocksCacheClient
msgDocDatabase database.Msg
}
@@ -36,11 +34,11 @@ func (c *msgCache) getSendMsgKey(id string) string {
}
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err())
return errs.Wrap(c.rcClient.GetRedis().Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err())
}
func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
result, err := c.rdb.Get(ctx, c.getSendMsgKey(id)).Int()
result, err := c.rcClient.GetRedis().Get(ctx, c.getSendMsgKey(id)).Int()
return int32(result), errs.Wrap(err)
}
@@ -67,12 +65,12 @@ func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string,
keys := datautil.Slice(seqs, func(seq int64) string {
return cachekey.GetMsgCacheKey(conversationID, seq)
})
slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys)
slotKeys, err := groupKeysBySlot(ctx, c.rcClient.GetRedis(), keys)
if err != nil {
return err
}
for _, keys := range slotKeys {
if err := c.rcClient.TagAsDeletedBatch2(ctx, keys); err != nil {
if err := c.rcClient.GetClient().TagAsDeletedBatch2(ctx, keys); err != nil {
return err
}
}
@@ -88,7 +86,7 @@ func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string,
if err != nil {
return err
}
if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil {
if err := c.rcClient.GetClient().RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil {
return err
}
}
+9 -3
View File
@@ -3,18 +3,24 @@ package redis
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"strconv"
"strings"
"time"
)
func NewUserOnline(rdb redis.UniversalClient) cache.OnlineCache {
if rdb == nil || config.Standalone() {
return mcache.NewOnlineCache()
}
return &userOnline{
rdb: rdb,
expire: cachekey.OnlineExpire,
+82 -82
View File
@@ -2,7 +2,7 @@ package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
@@ -28,83 +28,83 @@ type Config struct {
// Option is a function type for configuring Config
type Option func(c *Config)
// NewRedisShardManager creates a new RedisShardManager instance
func NewRedisShardManager(redisClient redis.UniversalClient, opts ...Option) *RedisShardManager {
config := &Config{
batchSize: defaultBatchSize, // Default batch size is 50 keys
continueOnError: false,
concurrentLimit: defaultConcurrentLimit, // Default concurrent limit is 3
}
for _, opt := range opts {
opt(config)
}
rsm := &RedisShardManager{
redisClient: redisClient,
config: config,
}
return rsm
}
// WithBatchSize sets the number of keys to process per batch
func WithBatchSize(size int) Option {
return func(c *Config) {
c.batchSize = size
}
}
// WithContinueOnError sets whether to continue processing on error
func WithContinueOnError(continueOnError bool) Option {
return func(c *Config) {
c.continueOnError = continueOnError
}
}
// WithConcurrentLimit sets the concurrency limit
func WithConcurrentLimit(limit int) Option {
return func(c *Config) {
c.concurrentLimit = limit
}
}
// ProcessKeysBySlot groups keys by their Redis cluster hash slots and processes them using the provided function.
func (rsm *RedisShardManager) ProcessKeysBySlot(
ctx context.Context,
keys []string,
processFunc func(ctx context.Context, slot int64, keys []string) error,
) error {
// Group keys by slot
slots, err := groupKeysBySlot(ctx, rsm.redisClient, keys)
if err != nil {
return err
}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(rsm.config.concurrentLimit)
// Process keys in each slot using the provided function
for slot, singleSlotKeys := range slots {
batches := splitIntoBatches(singleSlotKeys, rsm.config.batchSize)
for _, batch := range batches {
slot, batch := slot, batch // Avoid closure capture issue
g.Go(func() error {
err := processFunc(ctx, slot, batch)
if err != nil {
log.ZWarn(ctx, "Batch processFunc failed", err, "slot", slot, "keys", batch)
if !rsm.config.continueOnError {
return err
}
}
return nil
})
}
}
if err := g.Wait(); err != nil {
return err
}
return nil
}
//// NewRedisShardManager creates a new RedisShardManager instance
//func NewRedisShardManager(redisClient redis.UniversalClient, opts ...Option) *RedisShardManager {
// config := &Config{
// batchSize: defaultBatchSize, // Default batch size is 50 keys
// continueOnError: false,
// concurrentLimit: defaultConcurrentLimit, // Default concurrent limit is 3
// }
// for _, opt := range opts {
// opt(config)
// }
// rsm := &RedisShardManager{
// redisClient: redisClient,
// config: config,
// }
// return rsm
//}
//
//// WithBatchSize sets the number of keys to process per batch
//func WithBatchSize(size int) Option {
// return func(c *Config) {
// c.batchSize = size
// }
//}
//
//// WithContinueOnError sets whether to continue processing on error
//func WithContinueOnError(continueOnError bool) Option {
// return func(c *Config) {
// c.continueOnError = continueOnError
// }
//}
//
//// WithConcurrentLimit sets the concurrency limit
//func WithConcurrentLimit(limit int) Option {
// return func(c *Config) {
// c.concurrentLimit = limit
// }
//}
//
//// ProcessKeysBySlot groups keys by their Redis cluster hash slots and processes them using the provided function.
//func (rsm *RedisShardManager) ProcessKeysBySlot(
// ctx context.Context,
// keys []string,
// processFunc func(ctx context.Context, slot int64, keys []string) error,
//) error {
//
// // Group keys by slot
// slots, err := groupKeysBySlot(ctx, rsm.redisClient, keys)
// if err != nil {
// return err
// }
//
// g, ctx := errgroup.WithContext(ctx)
// g.SetLimit(rsm.config.concurrentLimit)
//
// // Process keys in each slot using the provided function
// for slot, singleSlotKeys := range slots {
// batches := splitIntoBatches(singleSlotKeys, rsm.config.batchSize)
// for _, batch := range batches {
// slot, batch := slot, batch // Avoid closure capture issue
// g.Go(func() error {
// err := processFunc(ctx, slot, batch)
// if err != nil {
// log.ZWarn(ctx, "Batch processFunc failed", err, "slot", slot, "keys", batch)
// if !rsm.config.continueOnError {
// return err
// }
// }
// return nil
// })
// }
// }
//
// if err := g.Wait(); err != nil {
// return err
// }
// return nil
//}
// groupKeysBySlot groups keys by their Redis cluster hash slots.
func groupKeysBySlot(ctx context.Context, redisClient redis.UniversalClient, keys []string) (map[int64][]string, error) {
@@ -197,15 +197,15 @@ func ProcessKeysBySlot(
return nil
}
func DeleteCacheBySlot(ctx context.Context, rcClient *rockscache.Client, keys []string) error {
func DeleteCacheBySlot(ctx context.Context, rcClient *rocksCacheClient, keys []string) error {
switch len(keys) {
case 0:
return nil
case 1:
return rcClient.TagAsDeletedBatch2(ctx, keys)
return rcClient.GetClient().TagAsDeletedBatch2(ctx, keys)
default:
return ProcessKeysBySlot(ctx, getRocksCacheRedisClient(rcClient), keys, func(ctx context.Context, slot int64, keys []string) error {
return rcClient.TagAsDeletedBatch2(ctx, keys)
return ProcessKeysBySlot(ctx, rcClient.GetRedis(), keys, func(ctx context.Context, slot int64, keys []string) error {
return rcClient.GetClient().TagAsDeletedBatch2(ctx, keys)
})
}
}
+10 -76
View File
@@ -1,39 +1,23 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/s3"
"github.com/openimsdk/tools/s3/cont"
"github.com/openimsdk/tools/s3/minio"
"github.com/redis/go-redis/v9"
"time"
)
func NewObjectCacheRedis(rdb redis.UniversalClient, objDB database.ObjectInfo) cache.ObjectCache {
opts := rockscache.NewDefaultOptions()
batchHandler := NewBatchDeleterRedis(rdb, &opts, nil)
rc := newRocksCacheClient(rdb)
return &objectCacheRedis{
BatchDeleter: batchHandler,
rcClient: rockscache.NewClient(rdb, opts),
BatchDeleter: rc.GetBatchDeleter(),
rcClient: rc,
expireTime: time.Hour * 12,
objDB: objDB,
}
@@ -42,7 +26,7 @@ func NewObjectCacheRedis(rdb redis.UniversalClient, objDB database.ObjectInfo) c
type objectCacheRedis struct {
cache.BatchDeleter
objDB database.ObjectInfo
rcClient *rockscache.Client
rcClient *rocksCacheClient
expireTime time.Duration
}
@@ -76,11 +60,10 @@ func (g *objectCacheRedis) GetName(ctx context.Context, engine string, name stri
}
func NewS3Cache(rdb redis.UniversalClient, s3 s3.Interface) cont.S3Cache {
opts := rockscache.NewDefaultOptions()
batchHandler := NewBatchDeleterRedis(rdb, &opts, nil)
rc := newRocksCacheClient(rdb)
return &s3CacheRedis{
BatchDeleter: batchHandler,
rcClient: rockscache.NewClient(rdb, opts),
BatchDeleter: rc.GetBatchDeleter(),
rcClient: rc,
expireTime: time.Hour * 12,
s3: s3,
}
@@ -89,7 +72,7 @@ func NewS3Cache(rdb redis.UniversalClient, s3 s3.Interface) cont.S3Cache {
type s3CacheRedis struct {
cache.BatchDeleter
s3 s3.Interface
rcClient *rockscache.Client
rcClient *rocksCacheClient
expireTime time.Duration
}
@@ -110,52 +93,3 @@ func (g *s3CacheRedis) GetKey(ctx context.Context, engine string, name string) (
return g.s3.StatObject(ctx, name)
})
}
func NewMinioCache(rdb redis.UniversalClient) minio.Cache {
opts := rockscache.NewDefaultOptions()
batchHandler := NewBatchDeleterRedis(rdb, &opts, nil)
return &minioCacheRedis{
BatchDeleter: batchHandler,
rcClient: rockscache.NewClient(rdb, opts),
expireTime: time.Hour * 24 * 7,
}
}
type minioCacheRedis struct {
cache.BatchDeleter
rcClient *rockscache.Client
expireTime time.Duration
}
func (g *minioCacheRedis) getObjectImageInfoKey(key string) string {
return cachekey.GetObjectImageInfoKey(key)
}
func (g *minioCacheRedis) getMinioImageThumbnailKey(key string, format string, width int, height int) string {
return cachekey.GetMinioImageThumbnailKey(key, format, width, height)
}
func (g *minioCacheRedis) DelObjectImageInfoKey(ctx context.Context, keys ...string) error {
ks := make([]string, 0, len(keys))
for _, key := range keys {
ks = append(ks, g.getObjectImageInfoKey(key))
}
return g.BatchDeleter.ExecDelWithKeys(ctx, ks)
}
func (g *minioCacheRedis) DelImageThumbnailKey(ctx context.Context, key string, format string, width int, height int) error {
return g.BatchDeleter.ExecDelWithKeys(ctx, []string{g.getMinioImageThumbnailKey(key, format, width, height)})
}
func (g *minioCacheRedis) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*minio.ImageInfo, error)) (*minio.ImageInfo, error) {
info, err := getCache(ctx, g.rcClient, g.getObjectImageInfoKey(key), g.expireTime, fn)
if err != nil {
return nil, err
}
return info, nil
}
func (g *minioCacheRedis) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) {
return getCache(ctx, g.rcClient, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache)
}
+19 -17
View File
@@ -4,33 +4,35 @@ import (
"context"
"errors"
"fmt"
"github.com/dtm-labs/rockscache"
"strconv"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"strconv"
"time"
)
func NewSeqConversationCacheRedis(rdb redis.UniversalClient, mgo database.SeqConversation) cache.SeqConversationCache {
if rdb == nil {
return mcache.NewSeqConversationCache(mgo)
}
return &seqConversationCacheRedis{
rdb: rdb,
mgo: mgo,
lockTime: time.Second * 3,
dataTime: time.Hour * 24 * 365,
minSeqExpireTime: time.Hour,
rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
rcClient: newRocksCacheClient(rdb),
}
}
type seqConversationCacheRedis struct {
rdb redis.UniversalClient
mgo database.SeqConversation
rocks *rockscache.Client
rcClient *rocksCacheClient
lockTime time.Duration
dataTime time.Duration
minSeqExpireTime time.Duration
@@ -45,7 +47,7 @@ func (s *seqConversationCacheRedis) SetMinSeq(ctx context.Context, conversationI
}
func (s *seqConversationCacheRedis) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return getCache(ctx, s.rocks, s.getMinSeqKey(conversationID), s.minSeqExpireTime, func(ctx context.Context) (int64, error) {
return getCache(ctx, s.rcClient, s.getMinSeqKey(conversationID), s.minSeqExpireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMinSeq(ctx, conversationID)
})
}
@@ -68,7 +70,7 @@ func (s *seqConversationCacheRedis) getSingleMaxSeqWithTime(ctx context.Context,
func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]int64) error {
result := make([]*redis.StringCmd, len(keys))
pipe := s.rdb.Pipeline()
pipe := s.rcClient.GetRedis().Pipeline()
for i, key := range keys {
result[i] = pipe.HGet(ctx, key, "CURR")
}
@@ -99,7 +101,7 @@ func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []s
func (s *seqConversationCacheRedis) batchGetMaxSeqWithTime(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]database.SeqTime) error {
result := make([]*redis.SliceCmd, len(keys))
pipe := s.rdb.Pipeline()
pipe := s.rcClient.GetRedis().Pipeline()
for i, key := range keys {
result[i] = pipe.HMGet(ctx, key, "CURR", "TIME")
}
@@ -157,7 +159,7 @@ func (s *seqConversationCacheRedis) GetMaxSeqs(ctx context.Context, conversation
if len(keys) == 1 {
return s.getSingleMaxSeq(ctx, conversationIDs[0])
}
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys)
slotKeys, err := groupKeysBySlot(ctx, s.rcClient.GetRedis(), keys)
if err != nil {
return nil, err
}
@@ -190,7 +192,7 @@ func (s *seqConversationCacheRedis) GetMaxSeqsWithTime(ctx context.Context, conv
if len(keys) == 1 {
return s.getSingleMaxSeqWithTime(ctx, conversationIDs[0])
}
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys)
slotKeys, err := groupKeysBySlot(ctx, s.rcClient.GetRedis(), keys)
if err != nil {
return nil, err
}
@@ -234,7 +236,7 @@ redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq, "TIME", mallocTime)
redis.call("EXPIRE", key, dataSecond)
return 0
`
result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq, mill).Int64()
result, err := s.rcClient.GetRedis().Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq, mill).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
@@ -305,7 +307,7 @@ table.insert(result, last_seq)
table.insert(result, mallocTime)
return result
`
result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second), time.Now().UnixMilli()).Int64Slice()
result, err := s.rcClient.GetRedis().Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second), time.Now().UnixMilli()).Int64Slice()
if err != nil {
return nil, errs.Wrap(err)
}
@@ -438,7 +440,7 @@ func (s *seqConversationCacheRedis) SetMinSeqs(ctx context.Context, seqs map[str
return err
}
}
return DeleteCacheBySlot(ctx, s.rocks, keys)
return DeleteCacheBySlot(ctx, s.rcClient, keys)
}
// GetCacheMaxSeqWithTime only get the existing cache, if there is no cache, no cache will be generated
@@ -456,7 +458,7 @@ func (s *seqConversationCacheRedis) GetCacheMaxSeqWithTime(ctx context.Context,
key2conversationID[key] = conversationID
keys = append(keys, key)
}
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys)
slotKeys, err := groupKeysBySlot(ctx, s.rcClient.GetRedis(), keys)
if err != nil {
return nil, err
}
@@ -465,7 +467,7 @@ func (s *seqConversationCacheRedis) GetCacheMaxSeqWithTime(ctx context.Context,
if len(keys) == 0 {
continue
}
pipe := s.rdb.Pipeline()
pipe := s.rcClient.GetRedis().Pipeline()
cmds := make([]*redis.SliceCmd, 0, len(keys))
for _, key := range keys {
cmds = append(cmds, pipe.HMGet(ctx, key, "CURR", "TIME"))
+12 -11
View File
@@ -2,31 +2,29 @@ package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"strconv"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
"strconv"
"time"
)
func NewSeqUserCacheRedis(rdb redis.UniversalClient, mgo database.SeqUser) cache.SeqUser {
return &seqUserCacheRedis{
rdb: rdb,
mgo: mgo,
readSeqWriteRatio: 100,
expireTime: time.Hour * 24 * 7,
readExpireTime: time.Hour * 24 * 30,
rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
rocks: newRocksCacheClient(rdb),
}
}
type seqUserCacheRedis struct {
rdb redis.UniversalClient
mgo database.SeqUser
rocks *rockscache.Client
rocks *rocksCacheClient
expireTime time.Duration
readExpireTime time.Duration
readSeqWriteRatio int64
@@ -54,7 +52,7 @@ func (s *seqUserCacheRedis) SetUserMaxSeq(ctx context.Context, conversationID st
if err := s.mgo.SetUserMaxSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID))
return s.rocks.GetClient().TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID))
}
func (s *seqUserCacheRedis) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
@@ -74,12 +72,15 @@ func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID s
}
func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if s.rocks.GetRedis() == nil {
return s.SetUserReadSeqToDB(ctx, conversationID, userID, seq)
}
dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID)
if err != nil {
return err
}
if dbSeq < seq {
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
if err := s.rocks.GetClient().RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
}
}
@@ -109,12 +110,12 @@ func (s *seqUserCacheRedis) setUserRedisReadSeqs(ctx context.Context, userID str
keys = append(keys, key)
keySeq[key] = seq
}
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys)
slotKeys, err := groupKeysBySlot(ctx, s.rocks.GetRedis(), keys)
if err != nil {
return err
}
for _, keys := range slotKeys {
pipe := s.rdb.Pipeline()
pipe := s.rocks.GetRedis().Pipeline()
for _, key := range keys {
pipe.HSet(ctx, key, "value", strconv.FormatInt(keySeq[key], 10))
pipe.Expire(ctx, key, s.readExpireTime)
+2 -15
View File
@@ -1,26 +1,13 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
"time"
)
func NewThirdCache(rdb redis.UniversalClient) cache.ThirdCache {
+6 -22
View File
@@ -1,30 +1,16 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"time"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"time"
)
const (
@@ -38,19 +24,17 @@ type UserCacheRedis struct {
rdb redis.UniversalClient
userDB database.User
expireTime time.Duration
rcClient *rockscache.Client
rcClient *rocksCacheClient
}
func NewUserCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, userDB database.User, options *rockscache.Options) cache.UserCache {
batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.User.Topic})
u := localCache.User
log.ZDebug(context.Background(), "user local cache init", "Topic", u.Topic, "SlotNum", u.SlotNum, "SlotSize", u.SlotSize, "enable", u.Enable())
rc := newRocksCacheClient(rdb)
return &UserCacheRedis{
BatchDeleter: batchHandler,
BatchDeleter: rc.GetBatchDeleter(localCache.User.Topic),
rdb: rdb,
userDB: userDB,
expireTime: userExpireTime,
rcClient: rockscache.NewClient(rdb, *options),
rcClient: rc,
}
}
+1 -1
View File
@@ -140,7 +140,7 @@ func NewGroupDatabase(
groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB,
ctxTx: ctxTx,
cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()),
cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash),
}
}
+14 -17
View File
@@ -18,19 +18,21 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/tools/mq"
"github.com/openimsdk/tools/utils/jsonutil"
"google.golang.org/protobuf/proto"
"strconv"
"strings"
"time"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
@@ -102,22 +104,14 @@ type CommonMsgDatabase interface {
GetLastMessage(ctx context.Context, conversationIDS []string, userID string) (map[string]*sdkws.MsgData, error)
}
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
if err != nil {
return nil, err
}
producerToRedis, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToRedisTopic)
if err != nil {
return nil, err
}
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, producer mq.Producer) CommonMsgDatabase {
return &commonMsgDatabase{
msgDocDatabase: msgDocModel,
msgCache: msg,
seqUser: seqUser,
seqConversation: seqConversation,
producer: producerToRedis,
}, nil
producer: producer,
}
}
type commonMsgDatabase struct {
@@ -126,12 +120,15 @@ type commonMsgDatabase struct {
msgCache cache.MsgCache
seqConversation cache.SeqConversationCache
seqUser cache.SeqUser
producer *kafka.Producer
producer mq.Producer
}
func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error {
_, _, err := db.producer.SendMessage(ctx, key, msg2mq)
return err
data, err := proto.Marshal(msg2mq)
if err != nil {
return err
}
return db.producer.SendMessage(ctx, key, data)
}
func (db *commonMsgDatabase) batchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
+33 -25
View File
@@ -2,11 +2,13 @@ package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/mq"
"github.com/openimsdk/tools/utils/datautil"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
@@ -32,30 +34,30 @@ type MsgTransferDatabase interface {
SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error
// to mq
MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) error
MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
}
func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (MsgTransferDatabase, error) {
conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
if err != nil {
return nil, err
}
producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic)
if err != nil {
return nil, err
}
producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic)
if err != nil {
return nil, err
}
func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, mongoProducer, pushProducer mq.Producer) (MsgTransferDatabase, error) {
//conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
//if err != nil {
// return nil, err
//}
//producerToMongo, err := kafka.NewKafkaProducerV2(conf, kafkaConf.Address, kafkaConf.ToMongoTopic)
//if err != nil {
// return nil, err
//}
//producerToPush, err := kafka.NewKafkaProducerV2(conf, kafkaConf.Address, kafkaConf.ToPushTopic)
//if err != nil {
// return nil, err
//}
return &msgTransferDatabase{
msgDocDatabase: msgDocModel,
msgCache: msg,
seqUser: seqUser,
seqConversation: seqConversation,
producerToMongo: producerToMongo,
producerToPush: producerToPush,
producerToMongo: mongoProducer,
producerToPush: pushProducer,
}, nil
}
@@ -65,8 +67,8 @@ type msgTransferDatabase struct {
msgCache cache.MsgCache
seqConversation cache.SeqConversationCache
seqUser cache.SeqUser
producerToMongo *kafka.Producer
producerToPush *kafka.Producer
producerToMongo mq.Producer
producerToPush mq.Producer
}
func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
@@ -281,19 +283,25 @@ func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, conversati
return nil
}
func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) error {
data, err := proto.Marshal(&pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
if err != nil {
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
return 0, 0, err
return err
}
return partition, offset, nil
if err := db.producerToPush.SendMessage(ctx, key, data); err != nil {
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "conversationID", conversationID)
return err
}
return nil
}
func (db *msgTransferDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
if len(messages) > 0 {
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
data, err := proto.Marshal(&pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
if err != nil {
return err
}
if err := db.producerToMongo.SendMessage(ctx, key, data); err != nil {
log.ZError(ctx, "MsgToMongoMQ", err, "key", key, "conversationID", conversationID, "lastSeq", lastSeq)
return err
}
+12 -14
View File
@@ -17,12 +17,13 @@ package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
"github.com/openimsdk/protocol/push"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq"
"google.golang.org/protobuf/proto"
)
type PushDatabase interface {
@@ -32,21 +33,13 @@ type PushDatabase interface {
type pushDataBase struct {
cache cache.ThirdCache
producerToOfflinePush *kafka.Producer
producerToOfflinePush mq.Producer
}
func NewPushDatabase(cache cache.ThirdCache, kafkaConf *config.Kafka) PushDatabase {
conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
if err != nil {
return nil
}
producerToOfflinePush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToOfflinePushTopic)
if err != nil {
return nil
}
func NewPushDatabase(cache cache.ThirdCache, offlinePushProducer mq.Producer) PushDatabase {
return &pushDataBase{
cache: cache,
producerToOfflinePush: producerToOfflinePush,
producerToOfflinePush: offlinePushProducer,
}
}
@@ -55,7 +48,12 @@ func (p *pushDataBase) DelFcmToken(ctx context.Context, userID string, platformI
}
func (p *pushDataBase) MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error {
_, _, err := p.producerToOfflinePush.SendMessage(ctx, key, &push.PushMsgReq{MsgData: msg2mq, UserIDs: userIDs})
log.ZInfo(ctx, "message is push to offlinePush topic", "key", key, "userIDs", userIDs, "msg", msg2mq.String())
data, err := proto.Marshal(&push.PushMsgReq{MsgData: msg2mq, UserIDs: userIDs})
if err != nil {
return err
}
if err := p.producerToOfflinePush.SendMessage(ctx, key, data); err != nil {
log.ZError(ctx, "message is push to offlinePush topic", err, "key", key, "userIDs", userIDs, "msg", msg2mq.String())
}
return err
}
+83
View File
@@ -16,6 +16,7 @@ package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination"
)
@@ -29,3 +30,85 @@ type Black interface {
FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*model.Black, err error)
FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error)
}
var (
_ Black = (*mgoImpl)(nil)
_ Black = (*redisImpl)(nil)
)
type mgoImpl struct {
}
func (m *mgoImpl) Create(ctx context.Context, blacks []*model.Black) (err error) {
//TODO implement me
panic("implement me")
}
func (m *mgoImpl) Delete(ctx context.Context, blacks []*model.Black) (err error) {
//TODO implement me
panic("implement me")
}
func (m *mgoImpl) Find(ctx context.Context, blacks []*model.Black) (blackList []*model.Black, err error) {
//TODO implement me
panic("implement me")
}
func (m *mgoImpl) Take(ctx context.Context, ownerUserID, blockUserID string) (black *model.Black, err error) {
//TODO implement me
panic("implement me")
}
func (m *mgoImpl) FindOwnerBlacks(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, blacks []*model.Black, err error) {
//TODO implement me
panic("implement me")
}
func (m *mgoImpl) FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*model.Black, err error) {
//TODO implement me
panic("implement me")
}
func (m *mgoImpl) FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error) {
//TODO implement me
panic("implement me")
}
type redisImpl struct {
}
func (r *redisImpl) Create(ctx context.Context, blacks []*model.Black) (err error) {
//TODO implement me
panic("implement me")
}
func (r *redisImpl) Delete(ctx context.Context, blacks []*model.Black) (err error) {
//TODO implement me
panic("implement me")
}
func (r *redisImpl) Find(ctx context.Context, blacks []*model.Black) (blackList []*model.Black, err error) {
//TODO implement me
panic("implement me")
}
func (r *redisImpl) Take(ctx context.Context, ownerUserID, blockUserID string) (black *model.Black, err error) {
//TODO implement me
panic("implement me")
}
func (r *redisImpl) FindOwnerBlacks(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, blacks []*model.Black, err error) {
//TODO implement me
panic("implement me")
}
func (r *redisImpl) FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*model.Black, err error) {
//TODO implement me
panic("implement me")
}
func (r *redisImpl) FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error) {
//TODO implement me
panic("implement me")
}
+16
View File
@@ -0,0 +1,16 @@
package database
import (
"context"
"time"
)
type Cache interface {
Get(ctx context.Context, key []string) (map[string]string, error)
Prefix(ctx context.Context, prefix string) (map[string]string, error)
Set(ctx context.Context, key string, value string, expireAt time.Duration) error
Incr(ctx context.Context, key string, value int) (int, error)
Del(ctx context.Context, key []string) error
Lock(ctx context.Context, key string, duration time.Duration) (string, error)
Unlock(ctx context.Context, key string, value string) error
}
+1 -1
View File
@@ -127,7 +127,7 @@ func (x *CacheMgo) Del(ctx context.Context, key []string) error {
return nil
}
_, err := x.coll.DeleteMany(ctx, bson.M{"key": bson.M{"$in": key}})
return errs.Wrap(err)
return err
}
func (x *CacheMgo) lockKey(key string) string {
@@ -0,0 +1,133 @@
package mgo
import (
"context"
"strings"
"sync"
"testing"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func TestName1111(t *testing.T) {
coll := Mongodb().Collection("temp")
//updatePipeline := mongo.Pipeline{
// {
// {"$set", bson.M{
// "age": bson.M{
// "$toString": bson.M{
// "$add": bson.A{
// bson.M{"$toInt": "$age"},
// 1,
// },
// },
// },
// }},
// },
//}
pipeline := mongo.Pipeline{
{
{"$set", bson.M{
"value": bson.M{
"$toString": bson.M{
"$add": bson.A{
bson.M{"$toInt": "$value"},
1,
},
},
},
}},
},
}
opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After)
res, err := mongoutil.FindOneAndUpdate[model.Cache](context.Background(), coll, bson.M{"key": "123456"}, pipeline, opt)
if err != nil {
panic(err)
}
t.Log(res)
}
func TestName33333(t *testing.T) {
c, err := NewCacheMgo(Mongodb())
if err != nil {
panic(err)
}
if err := c.Set(context.Background(), "123456", "123456", time.Hour); err != nil {
panic(err)
}
if err := c.Set(context.Background(), "123666", "123666", time.Hour); err != nil {
panic(err)
}
res1, err := c.Get(context.Background(), []string{"123456"})
if err != nil {
panic(err)
}
t.Log(res1)
res2, err := c.Prefix(context.Background(), "123")
if err != nil {
panic(err)
}
t.Log(res2)
}
func TestName1111aa(t *testing.T) {
c, err := NewCacheMgo(Mongodb())
if err != nil {
panic(err)
}
var count int
key := "123456"
doFunc := func() {
value, err := c.Lock(context.Background(), key, time.Second*30)
if err != nil {
t.Log("Lock error", err)
return
}
tmp := count
tmp++
count = tmp
t.Log("count", tmp)
if err := c.Unlock(context.Background(), key, value); err != nil {
t.Log("Unlock error", err)
return
}
}
if _, err := c.Lock(context.Background(), key, time.Second*10); err != nil {
t.Log(err)
return
}
var wg sync.WaitGroup
for i := 0; i < 32; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
doFunc()
}
}()
}
wg.Wait()
}
func TestName111111a(t *testing.T) {
arr := strings.SplitN("1:testkakskdask:1111", ":", 2)
t.Log(arr)
}
+44 -8
View File
@@ -2,16 +2,17 @@ package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"math"
"math/rand"
"strconv"
"testing"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func TestName1(t *testing.T) {
@@ -93,7 +94,7 @@ func TestName3(t *testing.T) {
func TestName4(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
defer cancel()
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
msg, err := NewMsgMongo(cli.Database("openim_v3"))
if err != nil {
@@ -109,6 +110,41 @@ func TestName4(t *testing.T) {
}
func TestName5(t *testing.T) {
var v time.Time
t.Log(v.UnixMilli())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
defer cancel()
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
tmp, err := NewMsgMongo(cli.Database("openim_v3"))
if err != nil {
panic(err)
}
msg := tmp.(*MsgMgo)
ts := time.Now().Add(-time.Hour * 24 * 5).UnixMilli()
t.Log(ts)
var seqs []int64
for i := 1; i < 256; i++ {
seqs = append(seqs, int64(i))
}
res, err := msg.FindSeqs(ctx, "si_4924054191_9511766539", seqs)
if err != nil {
panic(err)
}
t.Log(res)
}
//func TestName6(t *testing.T) {
// ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
// defer cancel()
// cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
//
// tmp, err := NewMsgMongo(cli.Database("openim_v3"))
// if err != nil {
// panic(err)
// }
// msg := tmp.(*MsgMgo)
// seq, sendTime, err := msg.findBeforeSendTime(ctx, "si_4924054191_9511766539", 1144)
// if err != nil {
// panic(err)
// }
// t.Log(seq, sendTime)
//}
@@ -2,10 +2,11 @@ package mgo
import (
"context"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"testing"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func Result[V any](val V, err error) V {
@@ -19,7 +20,7 @@ func Mongodb() *mongo.Database {
return Result(
mongo.Connect(context.Background(),
options.Client().
ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").
ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100").
SetConnectTimeout(5*time.Second)),
).Database("openim_v3")
}
+2
View File
@@ -17,4 +17,6 @@ const (
UserName = "user"
SeqConversationName = "seq"
SeqUserName = "seq_user"
StreamMsgName = "stream_msg"
CacheName = "cache"
)
+9
View File
@@ -0,0 +1,9 @@
package model
import "time"
type Cache struct {
Key string `bson:"key"`
Value string `bson:"value"`
ExpireAt *time.Time `bson:"expire_at"`
}
+25
View File
@@ -0,0 +1,25 @@
package dbbuild
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/redis/go-redis/v9"
)
type Builder interface {
Mongo(ctx context.Context) (*mongoutil.Client, error)
Redis(ctx context.Context) (redis.UniversalClient, error)
}
func NewBuilder(mongoConf *config.Mongo, redisConf *config.Redis) Builder {
if config.Standalone() {
globalStandalone.setConfig(mongoConf, redisConf)
return globalStandalone
}
return &microservices{
mongo: mongoConf,
redis: redisConf,
}
}
+26
View File
@@ -0,0 +1,26 @@
package dbbuild
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/redis/go-redis/v9"
)
type microservices struct {
mongo *config.Mongo
redis *config.Redis
}
func (x *microservices) Mongo(ctx context.Context) (*mongoutil.Client, error) {
return mongoutil.NewMongoDB(ctx, x.mongo.Build())
}
func (x *microservices) Redis(ctx context.Context) (redis.UniversalClient, error) {
if x.redis.Disable {
return nil, nil
}
return redisutil.NewRedisClient(ctx, x.redis.Build())
}
+76
View File
@@ -0,0 +1,76 @@
package dbbuild
import (
"context"
"sync"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/redis/go-redis/v9"
)
const (
standaloneMongo = "mongo"
standaloneRedis = "redis"
)
var globalStandalone = &standalone{}
type standaloneConn[C any] struct {
Conn C
Err error
}
func (x *standaloneConn[C]) result() (C, error) {
return x.Conn, x.Err
}
type standalone struct {
lock sync.Mutex
mongo *config.Mongo
redis *config.Redis
conn map[string]any
}
func (x *standalone) setConfig(mongoConf *config.Mongo, redisConf *config.Redis) {
x.lock.Lock()
defer x.lock.Unlock()
x.mongo = mongoConf
x.redis = redisConf
}
func (x *standalone) Mongo(ctx context.Context) (*mongoutil.Client, error) {
x.lock.Lock()
defer x.lock.Unlock()
if x.conn == nil {
x.conn = make(map[string]any)
}
v, ok := x.conn[standaloneMongo]
if !ok {
var val standaloneConn[*mongoutil.Client]
val.Conn, val.Err = mongoutil.NewMongoDB(ctx, x.mongo.Build())
v = &val
x.conn[standaloneMongo] = v
}
return v.(*standaloneConn[*mongoutil.Client]).result()
}
func (x *standalone) Redis(ctx context.Context) (redis.UniversalClient, error) {
x.lock.Lock()
defer x.lock.Unlock()
if x.redis.Disable {
return nil, nil
}
if x.conn == nil {
x.conn = make(map[string]any)
}
v, ok := x.conn[standaloneRedis]
if !ok {
var val standaloneConn[redis.UniversalClient]
val.Conn, val.Err = redisutil.NewRedisClient(ctx, x.redis.Build())
v = &val
x.conn[standaloneRedis] = v
}
return v.(*standaloneConn[redis.UniversalClient]).result()
}
+60
View File
@@ -0,0 +1,60 @@
package mqbuild
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/mq"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/mq/simmq"
)
type Builder interface {
GetTopicProducer(ctx context.Context, topic string) (mq.Producer, error)
GetTopicConsumer(ctx context.Context, topic string) (mq.Consumer, error)
}
func NewBuilder(kafka *config.Kafka) Builder {
if config.Standalone() {
return standaloneBuilder{}
}
return &kafkaBuilder{
addr: kafka.Address,
config: kafka.Build(),
topicGroupID: map[string]string{
kafka.ToRedisTopic: kafka.ToRedisGroupID,
kafka.ToMongoTopic: kafka.ToMongoGroupID,
kafka.ToPushTopic: kafka.ToPushGroupID,
kafka.ToOfflinePushTopic: kafka.ToOfflineGroupID,
},
}
}
type standaloneBuilder struct{}
func (standaloneBuilder) GetTopicProducer(ctx context.Context, topic string) (mq.Producer, error) {
return simmq.GetTopicProducer(topic), nil
}
func (standaloneBuilder) GetTopicConsumer(ctx context.Context, topic string) (mq.Consumer, error) {
return simmq.GetTopicConsumer(topic), nil
}
type kafkaBuilder struct {
addr []string
config *kafka.Config
topicGroupID map[string]string
}
func (x *kafkaBuilder) GetTopicProducer(ctx context.Context, topic string) (mq.Producer, error) {
return kafka.NewKafkaProducerV2(x.config, x.addr, topic)
}
func (x *kafkaBuilder) GetTopicConsumer(ctx context.Context, topic string) (mq.Consumer, error) {
groupID, ok := x.topicGroupID[topic]
if !ok {
return nil, fmt.Errorf("topic %s groupID not found", topic)
}
return kafka.NewMConsumerGroupV2(ctx, x.config, groupID, []string{topic}, true)
}
+9 -7
View File
@@ -3,15 +3,16 @@ package rpccache
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/user"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
@@ -51,10 +52,11 @@ func NewOnlineCache(client *rpcli.UserClient, group *GroupLocalCache, rdb redis.
x.CurrentPhase.Store(DoSubscribeOver)
x.Cond.Broadcast()
}
go func() {
x.doSubscribe(ctx, rdb, fn)
}()
if rdb != nil {
go func() {
x.doSubscribe(ctx, rdb, fn)
}()
}
return x, nil
}
+6 -3
View File
@@ -3,11 +3,12 @@ package batcher
import (
"context"
"fmt"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/idutil"
"strings"
"sync"
"time"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/idutil"
)
var (
@@ -245,7 +246,9 @@ func (b *Batcher[T]) distributeMessage(messages map[string][]*T, totalCount int,
if b.config.syncWait {
b.counter.Wait()
}
b.OnComplete(lastMessage, totalCount)
if b.OnComplete != nil {
b.OnComplete(lastMessage, totalCount)
}
}
func (b *Batcher[T]) run(channelID int, ch <-chan *Msg[T]) {