mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-20 08:49:01 +08:00
Merge branch 'errcode' of github.com:OpenIMSDK/Open-IM-Server into errcode
Conflicts: cmd/rpc/auth/main.go cmd/rpc/friend/main.go cmd/rpc/group/main.go cmd/rpc/msg/main.go cmd/rpc/user/main.go internal/rpc/auth/auth.go internal/rpc/conversation/conversaion.go internal/rpc/friend/friend.go internal/rpc/group/callback.go internal/rpc/group/group.go internal/rpc/msg/pull_message.go internal/rpc/msg/query_msg.go internal/rpc/msg/rpc_chat.go internal/rpc/msg/send_msg.go internal/rpc/user/user.go pkg/common/db/cache/redis.go pkg/common/db/controller/msg.go pkg/common/http/http_client.go
This commit is contained in:
Vendored
+40
-4
@@ -20,21 +20,25 @@ const (
|
||||
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
|
||||
conversationExpireTime = time.Second * 60 * 60 * 12
|
||||
)
|
||||
type FuncDB func() (string, error)
|
||||
|
||||
// arg fn will exec when no data in cache
|
||||
type ConversationCache interface {
|
||||
// get user's conversationIDs from cache
|
||||
GetUserConversationIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) ([]string, error)) ([]string, error)
|
||||
GetUserConversationIDs(ctx context.Context, userID string, fn FuncDB) ([]string, error)
|
||||
// del user's conversationIDs from cache, call when a user add or reduce a conversation
|
||||
DelUserConversationIDs(ctx context.Context, userID string) error
|
||||
DelUsersConversationIDs(ctx context.Context,userIDList []string)error
|
||||
// get one conversation from cache
|
||||
GetConversation(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error)) (*relationTb.ConversationModel, error)
|
||||
GetConversation(ctx context.Context, ownerUserID, conversationID string, fn FuncDB) (*relationTb.ConversationModel, error)
|
||||
// get one conversation from cache
|
||||
GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn func(ctx context.Context, ownerUserID, conversationIDs []string) ([]*relationTb.ConversationModel, error)) ([]*relationTb.ConversationModel, error)
|
||||
GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB)([]*relationTb.ConversationModel, error)
|
||||
// get one user's all conversations from cache
|
||||
GetUserAllConversations(ctx context.Context, ownerUserID string, fn func(ctx context.Context, ownerUserIDs string) ([]*relationTb.ConversationModel, error)) ([]*relationTb.ConversationModel, error)
|
||||
GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB ) ([]*relationTb.ConversationModel, error)
|
||||
// del one conversation from cache, call when one user's conversation Info changed
|
||||
DelConversation(ctx context.Context, ownerUserID, conversationID string) error
|
||||
DelUserConversations(ctx context.Context, ownerUserID string, conversationIDList []string) error
|
||||
DelUsersConversation(ctx context.Context, ownerUserIDList []string, conversationID string) error
|
||||
// get user conversation recv msg from cache
|
||||
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)) (opt int, err error)
|
||||
// del user recv msg opt from cache, call when user's conversation recv msg opt changed
|
||||
@@ -51,6 +55,38 @@ type ConversationRedis struct {
|
||||
rcClient *rockscache.Client
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) ([]string, error)) ([]string, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) DelUsersConversationIDs(ctx context.Context, userIDList []string) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error)) (*relationTb.ConversationModel, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB) ([]*relationTb.ConversationModel, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB) ([]*relationTb.ConversationModel, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) DelUsersConversation(ctx context.Context, ownerUserIDList []string, conversationID string) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID string, conversationID string) (opt int, err error)) (opt int, err error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (userIDs []string, err error)) (userIDs []string, err error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func NewConversationRedis(rcClient *rockscache.Client) *ConversationRedis {
|
||||
return &ConversationRedis{rcClient: rcClient}
|
||||
}
|
||||
|
||||
Vendored
+126
-115
@@ -1,17 +1,14 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/db/relation"
|
||||
relationTb "Open_IM/pkg/common/db/table/relation"
|
||||
"Open_IM/pkg/common/db/unrelation"
|
||||
"Open_IM/pkg/common/tracelog"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"math/big"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
@@ -157,6 +154,27 @@ func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID strin
|
||||
})
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) GetGroupMemberHash1(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
|
||||
// todo
|
||||
mapGroupUserIDs, err := g.groupMember.FindJoinUserID(ctx, groupIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res := make(map[string]*relationTb.GroupSimpleUserID)
|
||||
for _, groupID := range groupIDs {
|
||||
userIDs := mapGroupUserIDs[groupID]
|
||||
users := &relationTb.GroupSimpleUserID{}
|
||||
if len(userIDs) > 0 {
|
||||
utils.Sort(userIDs, true)
|
||||
bi := big.NewInt(0)
|
||||
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
|
||||
users.Hash = bi.Uint64()
|
||||
}
|
||||
res[groupID] = users
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID string) (err error) {
|
||||
defer func() {
|
||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
||||
@@ -178,111 +196,104 @@ func (g *GroupCacheRedis) DelGroupMemberIDs(ctx context.Context, groupID string)
|
||||
return g.rcClient.TagAsDeleted(g.getGroupMemberIDsKey(groupID))
|
||||
}
|
||||
|
||||
// JoinedGroups
|
||||
func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
|
||||
getJoinedGroupIDList := func() (string, error) {
|
||||
joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
bytes, err := json.Marshal(joinedGroupList)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
return string(bytes), nil
|
||||
}
|
||||
defer func() {
|
||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedGroupIDs", joinedGroupIDs)
|
||||
}()
|
||||
joinedGroupIDListStr, err := g.rcClient.Fetch(g.getJoinedGroupsKey(userID), time.Second*30*60, getJoinedGroupIDList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = json.Unmarshal([]byte(joinedGroupIDListStr), &joinedGroupIDs)
|
||||
return joinedGroupIDs, utils.Wrap(err, "")
|
||||
}
|
||||
//// JoinedGroups
|
||||
//func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
|
||||
// getJoinedGroupIDList := func() (string, error) {
|
||||
// joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID)
|
||||
// if err != nil {
|
||||
// return "", err
|
||||
// }
|
||||
// bytes, err := json.Marshal(joinedGroupList)
|
||||
// if err != nil {
|
||||
// return "", utils.Wrap(err, "")
|
||||
// }
|
||||
// return string(bytes), nil
|
||||
// }
|
||||
// defer func() {
|
||||
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedGroupIDs", joinedGroupIDs)
|
||||
// }()
|
||||
// joinedGroupIDListStr, err := g.rcClient.Fetch(g.getJoinedGroupsKey(userID), time.Second*30*60, getJoinedGroupIDList)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// err = json.Unmarshal([]byte(joinedGroupIDListStr), &joinedGroupIDs)
|
||||
// return joinedGroupIDs, utils.Wrap(err, "")
|
||||
//}
|
||||
|
||||
func (g *GroupCacheRedis) DelJoinedGroupIDs(ctx context.Context, userID string) (err error) {
|
||||
func (g *GroupCacheRedis) DelJoinedGroupID(ctx context.Context, userID string) (err error) {
|
||||
defer func() {
|
||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
|
||||
}()
|
||||
return g.rcClient.TagAsDeleted(g.getJoinedGroupsKey(userID))
|
||||
}
|
||||
|
||||
// GetGroupMemberInfo
|
||||
func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relation.GroupMember, err error) {
|
||||
getGroupMemberInfo := func() (string, error) {
|
||||
groupMemberInfo, err := relation.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
bytes, err := json.Marshal(groupMemberInfo)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
return string(bytes), nil
|
||||
}
|
||||
defer func() {
|
||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID, "groupMember", *groupMember)
|
||||
}()
|
||||
groupMemberInfoStr, err := g.rcClient.Fetch(g.getGroupMemberInfoKey(groupID, userID), time.Second*30*60, getGroupMemberInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
groupMember = &relation.GroupMember{}
|
||||
err = json.Unmarshal([]byte(groupMemberInfoStr), groupMember)
|
||||
return groupMember, utils.Wrap(err, "")
|
||||
//func (g *GroupCacheRedis) DelJoinedGroupIDs(ctx context.Context, userIDs []string) (err error) {
|
||||
// defer func() {
|
||||
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
|
||||
// }()
|
||||
// for _, userID := range userIDs {
|
||||
// if err := g.DelJoinedGroupID(ctx, userID); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
// return nil
|
||||
//}
|
||||
|
||||
func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
|
||||
return GetCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*relationTb.GroupMemberModel, error) {
|
||||
return g.groupMember.Take(ctx, groupID, userID)
|
||||
})
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID, userIDs []string) (groupMember *relationTb.GroupMemberModel, err error) {
|
||||
//func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID, userIDs []string) (groupMember *relationTb.GroupMemberModel, err error) {
|
||||
//
|
||||
// return nil, err
|
||||
//}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, count, offset int32, groupID string) (groupMembers []*relation.GroupMember, err error) {
|
||||
defer func() {
|
||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "count", count, "offset", offset, "groupID", groupID, "groupMember", groupMembers)
|
||||
}()
|
||||
groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if count < 0 || offset < 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var groupMemberList []*relation.GroupMember
|
||||
var start, stop int32
|
||||
start = offset
|
||||
stop = offset + count
|
||||
l := int32(len(groupMemberIDList))
|
||||
if start > stop {
|
||||
return nil, nil
|
||||
}
|
||||
if start >= l {
|
||||
return nil, nil
|
||||
}
|
||||
if count != 0 {
|
||||
if stop >= l {
|
||||
stop = l
|
||||
}
|
||||
groupMemberIDList = groupMemberIDList[start:stop]
|
||||
} else {
|
||||
if l < 1000 {
|
||||
stop = l
|
||||
} else {
|
||||
stop = 1000
|
||||
}
|
||||
groupMemberIDList = groupMemberIDList[start:stop]
|
||||
}
|
||||
for _, userID := range groupMemberIDList {
|
||||
groupMember, err := g.GetGroupMemberInfo(ctx, groupID, userID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
groupMembers = append(groupMembers, groupMember)
|
||||
}
|
||||
return groupMemberList, nil
|
||||
}
|
||||
//func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, count, offset int32, groupID string) (groupMembers []*relation.GroupMember, err error) {
|
||||
// defer func() {
|
||||
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "count", count, "offset", offset, "groupID", groupID, "groupMember", groupMembers)
|
||||
// }()
|
||||
// groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// if count < 0 || offset < 0 {
|
||||
// return nil, nil
|
||||
// }
|
||||
// var groupMemberList []*relation.GroupMember
|
||||
// var start, stop int32
|
||||
// start = offset
|
||||
// stop = offset + count
|
||||
// l := int32(len(groupMemberIDList))
|
||||
// if start > stop {
|
||||
// return nil, nil
|
||||
// }
|
||||
// if start >= l {
|
||||
// return nil, nil
|
||||
// }
|
||||
// if count != 0 {
|
||||
// if stop >= l {
|
||||
// stop = l
|
||||
// }
|
||||
// groupMemberIDList = groupMemberIDList[start:stop]
|
||||
// } else {
|
||||
// if l < 1000 {
|
||||
// stop = l
|
||||
// } else {
|
||||
// stop = 1000
|
||||
// }
|
||||
// groupMemberIDList = groupMemberIDList[start:stop]
|
||||
// }
|
||||
// for _, userID := range groupMemberIDList {
|
||||
// groupMember, err := g.GetGroupMemberInfo(ctx, groupID, userID)
|
||||
// if err != nil {
|
||||
// return
|
||||
// }
|
||||
// groupMembers = append(groupMembers, groupMember)
|
||||
// }
|
||||
// return groupMemberList, nil
|
||||
//}
|
||||
|
||||
func (g *GroupCacheRedis) DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) {
|
||||
defer func() {
|
||||
@@ -292,23 +303,23 @@ func (g *GroupCacheRedis) DelGroupMemberInfo(ctx context.Context, groupID, userI
|
||||
}
|
||||
|
||||
// groupMemberNum
|
||||
func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) {
|
||||
getGroupMemberNum := func() (string, error) {
|
||||
num, err := relation.GetGroupMemberNumByGroupID(groupID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return strconv.Itoa(int(num)), nil
|
||||
}
|
||||
defer func() {
|
||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "num", num)
|
||||
}()
|
||||
groupMember, err := g.rcClient.Fetch(g.getGroupMemberNumKey(groupID), time.Second*30*60, getGroupMemberNum)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return strconv.Atoi(groupMember)
|
||||
}
|
||||
//func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) {
|
||||
// getGroupMemberNum := func() (string, error) {
|
||||
// num, err := relation.GetGroupMemberNumByGroupID(groupID)
|
||||
// if err != nil {
|
||||
// return "", err
|
||||
// }
|
||||
// return strconv.Itoa(int(num)), nil
|
||||
// }
|
||||
// defer func() {
|
||||
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "num", num)
|
||||
// }()
|
||||
// groupMember, err := g.rcClient.Fetch(g.getGroupMemberNumKey(groupID), time.Second*30*60, getGroupMemberNum)
|
||||
// if err != nil {
|
||||
// return 0, err
|
||||
// }
|
||||
// return strconv.Atoi(groupMember)
|
||||
//}
|
||||
|
||||
func (g *GroupCacheRedis) DelGroupMemberNum(ctx context.Context, groupID string) (err error) {
|
||||
defer func() {
|
||||
|
||||
Vendored
+23
-20
@@ -88,14 +88,14 @@ type Cache interface {
|
||||
|
||||
// native redis operate
|
||||
|
||||
type RedisClient struct {
|
||||
rdb redis.UniversalClient
|
||||
}
|
||||
//func NewRedis() *RedisClient {
|
||||
// o := &RedisClient{}
|
||||
// o.InitRedis()
|
||||
// return o
|
||||
//}
|
||||
|
||||
func (r *RedisClient) InitRedis() error {
|
||||
func NewRedis() (*RedisClient, error) {
|
||||
var rdb redis.UniversalClient
|
||||
var err error
|
||||
ctx := context.Background()
|
||||
if config.Config.Redis.EnableCluster {
|
||||
rdb = redis.NewClusterClient(&redis.ClusterOptions{
|
||||
Addrs: config.Config.Redis.DBAddress,
|
||||
@@ -103,11 +103,10 @@ func (r *RedisClient) InitRedis() error {
|
||||
Password: config.Config.Redis.DBPassWord, // no password set
|
||||
PoolSize: 50,
|
||||
})
|
||||
_, err = rdb.Ping(ctx).Result()
|
||||
if err != nil {
|
||||
fmt.Println("redis cluster failed address ", config.Config.Redis.DBAddress, config.Config.Redis.DBUserName, config.Config.Redis.DBPassWord)
|
||||
return err
|
||||
}
|
||||
//if err := rdb.Ping(ctx).Err();err != nil {
|
||||
// return nil, fmt.Errorf("redis ping %w", err)
|
||||
//}
|
||||
//return &RedisClient{rdb: rdb}, nil
|
||||
} else {
|
||||
rdb = redis.NewClient(&redis.Options{
|
||||
Addr: config.Config.Redis.DBAddress[0],
|
||||
@@ -116,18 +115,22 @@ func (r *RedisClient) InitRedis() error {
|
||||
DB: 0, // use default DB
|
||||
PoolSize: 100, // 连接池大小
|
||||
})
|
||||
_, err = rdb.Ping(ctx).Result()
|
||||
if err != nil {
|
||||
fmt.Println(" redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
|
||||
return err
|
||||
}
|
||||
//err := rdb.Ping(ctx).Err()
|
||||
//if err != nil {
|
||||
// panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
|
||||
//}
|
||||
}
|
||||
r.rdb = rdb
|
||||
return nil
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
err := rdb.Ping(ctx).Err()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("redis ping %w", err)
|
||||
}
|
||||
return &RedisClient{rdb: rdb}, nil
|
||||
}
|
||||
|
||||
func (r *RedisClient) GetClient() redis.UniversalClient {
|
||||
return r.rdb
|
||||
type RedisClient struct {
|
||||
rdb redis.UniversalClient
|
||||
}
|
||||
|
||||
func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
|
||||
|
||||
Reference in New Issue
Block a user