feat: new features merged (#2409)

* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* mage

* optimization version log

* optimization version log

* sync

* sync

* sync

* group sync

* sync option

* sync option

* refactor: replace `friend` package with `realtion`.

* refactor: update lastest commit to relation.

* sync option

* sync option

* sync option

* sync

* sync

* go.mod

* seq

* update: go mod

* refactor: change incremental to full

* feat: get full friend user ids

* feat: api and config

* seq

* group version

* merge

* seq

* seq

* seq

* fix: sort by id avoid unstable sort friends.

* group

* group

* group

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* user version

* seq

* seq

* seq user

* user online

* implement minio expire delete.

* user online

* config

* fix

* fix

* implement minio expire delete logic.

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* feat: implement scheduled delete outdated object in minio.

* update gomake version

* update gomake version

* implement FindExpires pagination.

* remove unnesseary incr.

* fix uncorrect args call.

* online push

* online push

* online push

* resolving conflicts

* resolving conflicts

* test

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* rpc prommetrics

* rpc prommetrics

* online status

* online status

* online status

* online status

* sub

* conversation version incremental

* merge seq

* merge online

* merge online

* merge online

* merge seq

* GetOwnerConversation

* fix: change incremental syncer router name.

* rockscache batch get

* rockscache seq batch get

* fix: GetMsgDocModelByIndex bug

* update go.mod

* update go.mod

* merge

* feat: prometheus

* feat: prometheus

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: icey-yu <1186114839@qq.com>
This commit is contained in:
chao
2024-07-16 10:46:21 +08:00
committed by GitHub
parent 5f52fa19bd
commit 4aaf496086
97 changed files with 3743 additions and 1587 deletions
+94
View File
@@ -0,0 +1,94 @@
package redis
import (
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/singleflight"
"time"
"unsafe"
)
func getRocksCacheRedisClient(cli *rockscache.Client) redis.UniversalClient {
type Client struct {
rdb redis.UniversalClient
_ rockscache.Options
_ singleflight.Group
}
return (*Client)(unsafe.Pointer(cli)).rdb
}
func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscache.Client, expire time.Duration, ids []K, idKey func(id K) string, vId func(v *V) K, fn func(ctx context.Context, ids []K) ([]*V, error)) ([]*V, error) {
if len(ids) == 0 {
return nil, nil
}
findKeys := make([]string, 0, len(ids))
keyId := make(map[string]K)
for _, id := range ids {
key := idKey(id)
if _, ok := keyId[key]; ok {
continue
}
keyId[key] = id
findKeys = append(findKeys, key)
}
slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(rcClient), findKeys)
if err != nil {
return nil, err
}
result := make([]*V, 0, len(findKeys))
for _, keys := range slotKeys {
indexCache, err := rcClient.FetchBatch2(ctx, keys, expire, func(idx []int) (map[int]string, error) {
queryIds := make([]K, 0, len(idx))
idIndex := make(map[K]int)
for _, index := range idx {
id := keyId[keys[index]]
idIndex[id] = index
queryIds = append(queryIds, id)
}
values, err := fn(ctx, queryIds)
if err != nil {
return nil, err
}
if len(values) == 0 {
return map[int]string{}, nil
}
cacheIndex := make(map[int]string)
for _, value := range values {
id := vId(value)
index, ok := idIndex[id]
if !ok {
continue
}
bs, err := json.Marshal(value)
if err != nil {
return nil, err
}
cacheIndex[index] = string(bs)
}
return cacheIndex, nil
})
if err != nil {
return nil, err
}
for index, data := range indexCache {
if data == "" {
continue
}
var value V
if err := json.Unmarshal([]byte(data), &value); err != nil {
return nil, err
}
if cb, ok := any(&value).(BatchCacheCallback[K]); ok {
cb.BatchCache(keyId[keys[index]])
}
result = append(result, &value)
}
}
return result, nil
}
type BatchCacheCallback[K comparable] interface {
BatchCache(id K)
}
+27 -28
View File
@@ -23,7 +23,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw/specialerror"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
"time"
@@ -147,30 +146,30 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
return t, nil
}
func batchGetCache[T any, K comparable](
ctx context.Context,
rcClient *rockscache.Client,
expire time.Duration,
keys []K,
keyFn func(key K) string,
fns func(ctx context.Context, key K) (T, error),
) ([]T, error) {
if len(keys) == 0 {
return nil, nil
}
res := make([]T, 0, len(keys))
for _, key := range keys {
val, err := getCache(ctx, rcClient, keyFn(key), expire, func(ctx context.Context) (T, error) {
return fns(ctx, key)
})
if err != nil {
if errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err))) {
continue
}
return nil, errs.Wrap(err)
}
res = append(res, val)
}
return res, nil
}
//func batchGetCache[T any, K comparable](
// ctx context.Context,
// rcClient *rockscache.Client,
// expire time.Duration,
// keys []K,
// keyFn func(key K) string,
// fns func(ctx context.Context, key K) (T, error),
//) ([]T, error) {
// if len(keys) == 0 {
// return nil, nil
// }
// res := make([]T, 0, len(keys))
// for _, key := range keys {
// val, err := getCache(ctx, rcClient, keyFn(key), expire, func(ctx context.Context) (T, error) {
// return fns(ctx, key)
// })
// if err != nil {
// if errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err))) {
// continue
// }
// return nil, errs.Wrap(err)
// }
// res = append(res, val)
// }
//
// return res, nil
//}
+55
View File
@@ -0,0 +1,55 @@
package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"testing"
)
func TestName(t *testing.T) {
//var rocks rockscache.Client
//rdb := getRocksCacheRedisClient(&rocks)
//t.Log(rdb == nil)
ctx := context.Background()
rdb, err := redisutil.NewRedisClient(ctx, (&config.Redis{
Address: []string{"172.16.8.48:16379"},
Password: "openIM123",
DB: 3,
}).Build())
if err != nil {
panic(err)
}
mgocli, err := mongoutil.NewMongoDB(ctx, (&config.Mongo{
Address: []string{"172.16.8.48:37017"},
Database: "openim_v3",
Username: "openIM",
Password: "openIM123",
MaxPoolSize: 100,
MaxRetry: 1,
}).Build())
if err != nil {
panic(err)
}
//userMgo, err := mgo.NewUserMongo(mgocli.GetDB())
//if err != nil {
// panic(err)
//}
//rock := rockscache.NewClient(rdb, rockscache.NewDefaultOptions())
mgoSeqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB())
if err != nil {
panic(err)
}
seqUser := NewSeqUserCacheRedis(rdb, mgoSeqUser)
res, err := seqUser.GetReadSeqs(ctx, "2110910952", []string{"sg_2920732023", "sg_345762580"})
if err != nil {
panic(err)
}
t.Log(res)
}
+5 -3
View File
@@ -164,10 +164,12 @@ func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversati
}
func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*model.Conversation, error) {
return batchGetCache(ctx, c.rcClient, c.expireTime, conversationIDs, func(conversationID string) string {
return batchGetCache2(ctx, c.rcClient, c.expireTime, conversationIDs, func(conversationID string) string {
return c.getConversationKey(ownerUserID, conversationID)
}, func(ctx context.Context, conversationID string) (*model.Conversation, error) {
return c.conversationDB.Take(ctx, ownerUserID, conversationID)
}, func(conversation *model.Conversation) string {
return conversation.ConversationID
}, func(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) {
return c.conversationDB.Find(ctx, ownerUserID, conversationIDs)
})
}
-25
View File
@@ -70,10 +70,6 @@ func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string {
return cachekey.GetFriendIDsKey(ownerUserID)
}
//func (f *FriendCacheRedis) getFriendSyncSortUserIDsKey(ownerUserID string) string {
// return cachekey.GetFriendSyncSortUserIDsKey(ownerUserID, f.syncCount)
//}
func (f *FriendCacheRedis) getFriendMaxVersionKey(ownerUserID string) string {
return cachekey.GetFriendMaxVersionKey(ownerUserID)
}
@@ -107,16 +103,6 @@ func (f *FriendCacheRedis) DelFriendIDs(ownerUserIDs ...string) cache.FriendCach
return newFriendCache
}
//func (f *FriendCacheRedis) DelSortFriendUserIDs(ownerUserIDs ...string) cache.FriendCache {
// newGroupCache := f.CloneFriendCache()
// keys := make([]string, 0, len(ownerUserIDs))
// for _, userID := range ownerUserIDs {
// keys = append(keys, f.getFriendSyncSortUserIDsKey(userID))
// }
// newGroupCache.AddKeys(keys...)
// return newGroupCache
//}
// GetTwoWayFriendIDs retrieves two-way friend IDs from the cache.
func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) {
friendIDs, err := f.GetFriendIDs(ctx, ownerUserID)
@@ -193,17 +179,6 @@ func (f *FriendCacheRedis) DelMaxFriendVersion(ownerUserIDs ...string) cache.Fri
return newFriendCache
}
//func (f *FriendCacheRedis) FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error) {
// userIDs, err := f.GetFriendIDs(ctx, ownerUserID)
// if err != nil {
// return nil, err
// }
// if len(userIDs) > f.syncCount {
// userIDs = userIDs[:f.syncCount]
// }
// return userIDs, nil
//}
func (f *FriendCacheRedis) FindMaxFriendVersion(ctx context.Context, ownerUserID string) (*model.VersionLog, error) {
return getCache(ctx, f.rcClient, f.getFriendMaxVersionKey(ownerUserID), f.expireTime, func(ctx context.Context) (*model.VersionLog, error) {
return f.friendDB.FindIncrVersion(ctx, ownerUserID, 0, 0)
+15 -75
View File
@@ -118,34 +118,12 @@ func (g *GroupCacheRedis) getJoinGroupMaxVersionKey(userID string) string {
return cachekey.GetJoinGroupMaxVersionKey(userID)
}
func (g *GroupCacheRedis) GetGroupIndex(group *model.Group, keys []string) (int, error) {
key := g.getGroupInfoKey(group.GroupID)
for i, _key := range keys {
if _key == key {
return i, nil
}
}
return 0, errIndex
}
func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *model.GroupMember, keys []string) (int, error) {
key := g.getGroupMemberInfoKey(groupMember.GroupID, groupMember.UserID)
for i, _key := range keys {
if _key == key {
return i, nil
}
}
return 0, errIndex
func (g *GroupCacheRedis) getGroupID(group *model.Group) string {
return group.GroupID
}
func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*model.Group, err error) {
return batchGetCache(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
return g.getGroupInfoKey(groupID)
}, func(ctx context.Context, groupID string) (*model.Group, error) {
return g.groupDB.Take(ctx, groupID)
})
return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, g.getGroupInfoKey, g.getGroupID, g.groupDB.Find)
}
func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *model.Group, err error) {
@@ -233,19 +211,6 @@ func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string)
})
}
func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (map[string][]string, error) {
m := make(map[string][]string)
for _, groupID := range groupIDs {
userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
m[groupID] = userIDs
}
return m, nil
}
func (g *GroupCacheRedis) DelGroupMemberIDs(groupID string) cache.GroupCache {
cache := g.CloneGroupCache()
cache.AddKeys(g.getGroupMemberIDsKey(groupID))
@@ -285,10 +250,12 @@ func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userI
}
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]*model.GroupMember, error) {
return batchGetCache(ctx, g.rcClient, g.expireTime, userIDs, func(userID string) string {
return batchGetCache2(ctx, g.rcClient, g.expireTime, userIDs, func(userID string) string {
return g.getGroupMemberInfoKey(groupID, userID)
}, func(ctx context.Context, userID string) (*model.GroupMember, error) {
return g.groupMemberDB.Take(ctx, groupID, userID)
}, func(member *model.GroupMember) string {
return member.UserID
}, func(ctx context.Context, userIDs []string) ([]*model.GroupMember, error) {
return g.groupMemberDB.Find(ctx, groupID, userIDs)
})
}
@@ -301,14 +268,6 @@ func (g *GroupCacheRedis) GetAllGroupMembersInfo(ctx context.Context, groupID st
return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs)
}
func (g *GroupCacheRedis) GetAllGroupMemberInfo(ctx context.Context, groupID string) ([]*model.GroupMember, error) {
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
return g.GetGroupMembersInfo(ctx, groupID, groupMemberIDs)
}
func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string) cache.GroupCache {
keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
@@ -388,42 +347,23 @@ func (g *GroupCacheRedis) GetGroupRolesLevelMemberInfo(ctx context.Context, grou
return g.GetGroupMembersInfo(ctx, groupID, userIDs)
}
func (g *GroupCacheRedis) FindGroupMemberUser(ctx context.Context, groupIDs []string, userID string) (_ []*model.GroupMember, err error) {
func (g *GroupCacheRedis) FindGroupMemberUser(ctx context.Context, groupIDs []string, userID string) ([]*model.GroupMember, error) {
if len(groupIDs) == 0 {
var err error
groupIDs, err = g.GetJoinedGroupIDs(ctx, userID)
if err != nil {
return nil, err
}
}
return batchGetCache(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
return g.getGroupMemberInfoKey(groupID, userID)
}, func(ctx context.Context, groupID string) (*model.GroupMember, error) {
return g.groupMemberDB.Take(ctx, groupID, userID)
}, func(member *model.GroupMember) string {
return member.GroupID
}, func(ctx context.Context, groupIDs []string) ([]*model.GroupMember, error) {
return g.groupMemberDB.FindInGroup(ctx, userID, groupIDs)
})
}
//func (g *GroupCacheRedis) FindSortGroupMemberUserIDs(ctx context.Context, groupID string) ([]string, error) {
// userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
// if err != nil {
// return nil, err
// }
// if len(userIDs) > g.syncCount {
// userIDs = userIDs[:g.syncCount]
// }
// return userIDs, nil
//}
//
//func (g *GroupCacheRedis) FindSortJoinGroupIDs(ctx context.Context, userID string) ([]string, error) {
// groupIDs, err := g.GetJoinedGroupIDs(ctx, userID)
// if err != nil {
// return nil, err
// }
// if len(groupIDs) > g.syncCount {
// groupIDs = groupIDs[:g.syncCount]
// }
// return groupIDs, nil
//}
func (g *GroupCacheRedis) DelMaxGroupMemberVersion(groupIDs ...string) cache.GroupCache {
keys := make([]string, 0, len(groupIDs))
for _, groupID := range groupIDs {
-1
View File
@@ -183,5 +183,4 @@ func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string,
return nil, nil, err
}
return seqMsgs, failedSeqs, nil
}
+89
View File
@@ -0,0 +1,89 @@
package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
"strconv"
"time"
)
func NewUserOnline(rdb redis.UniversalClient) cache.OnlineCache {
return &userOnline{
rdb: rdb,
expire: cachekey.OnlineExpire,
channelName: cachekey.OnlineChannel,
}
}
type userOnline struct {
rdb redis.UniversalClient
expire time.Duration
channelName string
}
func (s *userOnline) getUserOnlineKey(userID string) string {
return cachekey.GetOnlineKey(userID)
}
func (s *userOnline) GetOnline(ctx context.Context, userID string) ([]int32, error) {
members, err := s.rdb.ZRangeByScore(ctx, s.getUserOnlineKey(userID), &redis.ZRangeBy{
Min: strconv.FormatInt(time.Now().Unix(), 10),
Max: "+inf",
}).Result()
if err != nil {
return nil, errs.Wrap(err)
}
platformIDs := make([]int32, 0, len(members))
for _, member := range members {
val, err := strconv.Atoi(member)
if err != nil {
return nil, errs.Wrap(err)
}
platformIDs = append(platformIDs, int32(val))
}
return platformIDs, nil
}
func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, offline []int32) error {
script := `
local key = KEYS[1]
local score = ARGV[3]
local num1 = redis.call("ZCARD", key)
redis.call("ZREMRANGEBYSCORE", key, "-inf", ARGV[2])
for i = 5, tonumber(ARGV[4])+4 do
redis.call("ZREM", key, ARGV[i])
end
local num2 = redis.call("ZCARD", key)
for i = 5+tonumber(ARGV[4]), #ARGV do
redis.call("ZADD", key, score, ARGV[i])
end
redis.call("EXPIRE", key, ARGV[1])
local num3 = redis.call("ZCARD", key)
local change = (num1 ~= num2) or (num2 ~= num3)
if change then
local members = redis.call("ZRANGE", key, 0, -1)
table.insert(members, KEYS[2])
redis.call("PUBLISH", KEYS[3], table.concat(members, ":"))
return 1
else
return 0
end
`
now := time.Now()
argv := make([]any, 0, 2+len(online)+len(offline))
argv = append(argv, int32(s.expire/time.Second), now.Unix(), now.Add(s.expire).Unix(), int32(len(offline)))
for _, platformID := range offline {
argv = append(argv, platformID)
}
for _, platformID := range online {
argv = append(argv, platformID)
}
keys := []string{s.getUserOnlineKey(userID), userID, s.channelName}
if err := s.rdb.Eval(ctx, script, keys, argv).Err(); err != nil {
return err
}
return nil
}
+15 -1
View File
@@ -2,6 +2,7 @@ package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
@@ -109,7 +110,7 @@ func (rsm *RedisShardManager) ProcessKeysBySlot(
func groupKeysBySlot(ctx context.Context, redisClient redis.UniversalClient, keys []string) (map[int64][]string, error) {
slots := make(map[int64][]string)
clusterClient, isCluster := redisClient.(*redis.ClusterClient)
if isCluster {
if isCluster && len(keys) > 1 {
pipe := clusterClient.Pipeline()
cmds := make([]*redis.IntCmd, len(keys))
for i, key := range keys {
@@ -195,3 +196,16 @@ func ProcessKeysBySlot(
}
return nil
}
func DeleteCacheBySlot(ctx context.Context, rcClient *rockscache.Client, keys []string) error {
switch len(keys) {
case 0:
return nil
case 1:
return rcClient.TagAsDeletedBatch2(ctx, keys)
default:
return ProcessKeysBySlot(ctx, getRocksCacheRedisClient(rcClient), keys, func(ctx context.Context, slot int64, keys []string) error {
return rcClient.TagAsDeletedBatch2(ctx, keys)
})
}
}
-200
View File
@@ -1,200 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
"sync"
)
func NewSeqCache(rdb redis.UniversalClient) cache.SeqCache {
return &seqCache{rdb: rdb}
}
type seqCache struct {
rdb redis.UniversalClient
}
func (c *seqCache) getMaxSeqKey(conversationID string) string {
return cachekey.GetMaxSeqKey(conversationID)
}
func (c *seqCache) getMinSeqKey(conversationID string) string {
return cachekey.GetMinSeqKey(conversationID)
}
func (c *seqCache) getHasReadSeqKey(conversationID string, userID string) string {
return cachekey.GetHasReadSeqKey(conversationID, userID)
}
func (c *seqCache) getConversationUserMinSeqKey(conversationID, userID string) string {
return cachekey.GetConversationUserMinSeqKey(conversationID, userID)
}
func (c *seqCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
}
func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
m = make(map[string]int64, len(items))
var (
reverseMap = make(map[string]string, len(items))
keys = make([]string, len(items))
lock sync.Mutex
)
for i, v := range items {
keys[i] = getkey(v)
reverseMap[getkey(v)] = v
}
manager := NewRedisShardManager(c.rdb)
if err = manager.ProcessKeysBySlot(ctx, keys, func(ctx context.Context, _ int64, keys []string) error {
res, err := c.rdb.MGet(ctx, keys...).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return errs.Wrap(err)
}
// len(res) <= len(items)
for i := range res {
strRes, ok := res[i].(string)
if !ok {
continue
}
val := stringutil.StringToInt64(strRes)
if val != 0 {
lock.Lock()
m[reverseMap[keys[i]]] = val
lock.Unlock()
}
}
return nil
}); err != nil {
return nil, err
}
return m, nil
}
func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
}
func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey)
}
func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
}
func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey)
}
func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
for conversationID, seq := range seqs {
if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return c.setSeqs(ctx, seqs, c.getMinSeqKey)
}
func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey)
}
func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMinSeqKey)
}
func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *seqCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, userIDs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
}
func (c *seqCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(conversationID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
}
func (c *seqCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(userID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, err
}
return val, nil
}
+333
View File
@@ -0,0 +1,333 @@
package redis
import (
"context"
"errors"
"fmt"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"time"
)
func NewSeqConversationCacheRedis(rdb redis.UniversalClient, mgo database.SeqConversation) cache.SeqConversationCache {
return &seqConversationCacheRedis{
rdb: rdb,
mgo: mgo,
lockTime: time.Second * 3,
dataTime: time.Hour * 24 * 365,
minSeqExpireTime: time.Hour,
rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
}
}
type seqConversationCacheRedis struct {
rdb redis.UniversalClient
mgo database.SeqConversation
rocks *rockscache.Client
lockTime time.Duration
dataTime time.Duration
minSeqExpireTime time.Duration
}
func (s *seqConversationCacheRedis) getMinSeqKey(conversationID string) string {
return cachekey.GetMallocMinSeqKey(conversationID)
}
func (s *seqConversationCacheRedis) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
return s.SetMinSeqs(ctx, map[string]int64{conversationID: seq})
}
func (s *seqConversationCacheRedis) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return getCache(ctx, s.rocks, s.getMinSeqKey(conversationID), s.minSeqExpireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMinSeq(ctx, conversationID)
})
}
func (s *seqConversationCacheRedis) getSingleMaxSeq(ctx context.Context, conversationID string) (map[string]int64, error) {
seq, err := s.GetMaxSeq(ctx, conversationID)
if err != nil {
return nil, err
}
return map[string]int64{conversationID: seq}, nil
}
func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]int64) error {
result := make([]*redis.StringCmd, len(keys))
pipe := s.rdb.Pipeline()
for i, key := range keys {
result[i] = pipe.HGet(ctx, key, "CURR")
}
if _, err := pipe.Exec(ctx); err != nil {
return errs.Wrap(err)
}
var notFoundKey []string
for i, r := range result {
req, err := r.Int64()
if err == nil {
seqs[keyConversationID[keys[i]]] = req
} else if errors.Is(err, redis.Nil) {
notFoundKey = append(notFoundKey, keys[i])
} else {
return errs.Wrap(err)
}
}
if len(notFoundKey) > 0 {
conversationID := keyConversationID[notFoundKey[0]]
seq, err := s.GetMaxSeq(ctx, conversationID)
if err != nil {
return err
}
seqs[conversationID] = seq
}
return nil
}
func (s *seqConversationCacheRedis) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
switch len(conversationIDs) {
case 0:
return map[string]int64{}, nil
case 1:
return s.getSingleMaxSeq(ctx, conversationIDs[0])
}
keys := make([]string, 0, len(conversationIDs))
keyConversationID := make(map[string]string, len(conversationIDs))
for _, conversationID := range conversationIDs {
key := s.getSeqMallocKey(conversationID)
if _, ok := keyConversationID[key]; ok {
continue
}
keys = append(keys, key)
keyConversationID[key] = conversationID
}
if len(keys) == 1 {
return s.getSingleMaxSeq(ctx, conversationIDs[0])
}
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys)
if err != nil {
return nil, err
}
seqs := make(map[string]int64, len(conversationIDs))
for _, keys := range slotKeys {
if err := s.batchGetMaxSeq(ctx, keys, keyConversationID, seqs); err != nil {
return nil, err
}
}
return seqs, nil
}
func (s *seqConversationCacheRedis) getSeqMallocKey(conversationID string) string {
return cachekey.GetMallocSeqKey(conversationID)
}
func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) (int64, error) {
if lastSeq < currSeq {
return 0, errs.New("lastSeq must be greater than currSeq")
}
// 0: success
// 1: success the lock has expired, but has not been locked by anyone else
// 2: already locked, but not by yourself
script := `
local key = KEYS[1]
local lockValue = ARGV[1]
local dataSecond = ARGV[2]
local curr_seq = tonumber(ARGV[3])
local last_seq = tonumber(ARGV[4])
if redis.call("EXISTS", key) == 0 then
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq)
redis.call("EXPIRE", key, dataSecond)
return 1
end
if redis.call("HGET", key, "LOCK") ~= lockValue then
return 2
end
redis.call("HDEL", key, "LOCK")
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq)
redis.call("EXPIRE", key, dataSecond)
return 0
`
result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return result, nil
}
// malloc size=0 is to get the current seq size>0 is to allocate seq
func (s *seqConversationCacheRedis) malloc(ctx context.Context, key string, size int64) ([]int64, error) {
// 0: success
// 1: need to obtain and lock
// 2: already locked
// 3: exceeded the maximum value and locked
script := `
local key = KEYS[1]
local size = tonumber(ARGV[1])
local lockSecond = ARGV[2]
local dataSecond = ARGV[3]
local result = {}
if redis.call("EXISTS", key) == 0 then
local lockValue = math.random(0, 999999999)
redis.call("HSET", key, "LOCK", lockValue)
redis.call("EXPIRE", key, lockSecond)
table.insert(result, 1)
table.insert(result, lockValue)
return result
end
if redis.call("HEXISTS", key, "LOCK") == 1 then
table.insert(result, 2)
return result
end
local curr_seq = tonumber(redis.call("HGET", key, "CURR"))
local last_seq = tonumber(redis.call("HGET", key, "LAST"))
if size == 0 then
redis.call("EXPIRE", key, dataSecond)
table.insert(result, 0)
table.insert(result, curr_seq)
table.insert(result, last_seq)
return result
end
local max_seq = curr_seq + size
if max_seq > last_seq then
local lockValue = math.random(0, 999999999)
redis.call("HSET", key, "LOCK", lockValue)
redis.call("HSET", key, "CURR", last_seq)
redis.call("EXPIRE", key, lockSecond)
table.insert(result, 3)
table.insert(result, curr_seq)
table.insert(result, last_seq)
table.insert(result, lockValue)
return result
end
redis.call("HSET", key, "CURR", max_seq)
redis.call("EXPIRE", key, dataSecond)
table.insert(result, 0)
table.insert(result, curr_seq)
table.insert(result, last_seq)
return result
`
result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second)).Int64Slice()
if err != nil {
return nil, errs.Wrap(err)
}
return result, nil
}
func (s *seqConversationCacheRedis) wait(ctx context.Context) error {
timer := time.NewTimer(time.Second / 4)
defer timer.Stop()
select {
case <-timer.C:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) {
for i := 0; i < 10; i++ {
state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq)
if err != nil {
log.ZError(ctx, "set seq cache failed", err, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq, "count", i+1)
if err := s.wait(ctx); err != nil {
return
}
continue
}
switch state {
case 0: // ideal state
case 1:
log.ZWarn(ctx, "set seq cache lock not found", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq)
case 2:
log.ZWarn(ctx, "set seq cache lock to be held by someone else", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq)
default:
log.ZError(ctx, "set seq cache lock unknown state", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq)
}
return
}
log.ZError(ctx, "set seq cache retrying still failed", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq)
}
func (s *seqConversationCacheRedis) getMallocSize(conversationID string, size int64) int64 {
if size == 0 {
return 0
}
var basicSize int64
if msgprocessor.IsGroupConversationID(conversationID) {
basicSize = 100
} else {
basicSize = 50
}
basicSize += size
return basicSize
}
func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) {
if size < 0 {
return 0, errs.New("size must be greater than 0")
}
key := s.getSeqMallocKey(conversationID)
for i := 0; i < 10; i++ {
states, err := s.malloc(ctx, key, size)
if err != nil {
return 0, err
}
switch states[0] {
case 0: // success
return states[1], nil
case 1: // not found
mallocSize := s.getMallocSize(conversationID, size)
seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize)
if err != nil {
return 0, err
}
s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize)
return seq, nil
case 2: // locked
if err := s.wait(ctx); err != nil {
return 0, err
}
continue
case 3: // exceeded cache max value
currSeq := states[1]
lastSeq := states[2]
mallocSize := s.getMallocSize(conversationID, size)
seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize)
if err != nil {
return 0, err
}
if lastSeq == seq {
s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize)
return currSeq, nil
} else {
log.ZWarn(ctx, "malloc seq not equal cache last seq", nil, "conversationID", conversationID, "currSeq", currSeq, "lastSeq", lastSeq, "mallocSeq", seq)
s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize)
return seq, nil
}
default:
log.ZError(ctx, "malloc seq unknown state", nil, "state", states[0], "conversationID", conversationID, "size", size)
return 0, errs.New(fmt.Sprintf("unknown state: %d", states[0]))
}
}
log.ZError(ctx, "malloc seq retrying still failed", nil, "conversationID", conversationID, "size", size)
return 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size)
}
func (s *seqConversationCacheRedis) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return s.Malloc(ctx, conversationID, 0)
}
func (s *seqConversationCacheRedis) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
for conversationID, seq := range seqs {
keys = append(keys, s.getMinSeqKey(conversationID))
if err := s.mgo.SetMinSeq(ctx, conversationID, seq); err != nil {
return err
}
}
return DeleteCacheBySlot(ctx, s.rocks, keys)
}
+109
View File
@@ -0,0 +1,109 @@
package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
)
func newTestSeq() *seqConversationCacheRedis {
mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))
if err != nil {
panic(err)
}
model, err := mgo.NewSeqConversationMongo(mgocli.Database("openim_v3"))
if err != nil {
panic(err)
}
opt := &redis.Options{
Addr: "172.16.8.48:16379",
Password: "openIM123",
DB: 1,
}
rdb := redis.NewClient(opt)
if err := rdb.Ping(context.Background()).Err(); err != nil {
panic(err)
}
return NewSeqConversationCacheRedis(rdb, model).(*seqConversationCacheRedis)
}
func TestSeq(t *testing.T) {
ts := newTestSeq()
var (
wg sync.WaitGroup
speed atomic.Int64
)
const count = 128
wg.Add(count)
for i := 0; i < count; i++ {
index := i + 1
go func() {
defer wg.Done()
var size int64 = 10
cID := strconv.Itoa(index * 1)
for i := 1; ; i++ {
//first, err := ts.mgo.Malloc(context.Background(), cID, size) // mongo
first, err := ts.Malloc(context.Background(), cID, size) // redis
if err != nil {
t.Logf("[%d-%d] %s %s", index, i, cID, err)
return
}
speed.Add(size)
_ = first
//t.Logf("[%d] %d -> %d", i, first+1, first+size)
}
}()
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
ticker := time.NewTicker(time.Second)
for {
select {
case <-done:
ticker.Stop()
return
case <-ticker.C:
value := speed.Swap(0)
t.Logf("speed: %d/s", value)
}
}
}
func TestDel(t *testing.T) {
ts := newTestSeq()
for i := 1; i < 100; i++ {
var size int64 = 100
first, err := ts.Malloc(context.Background(), "100", size)
if err != nil {
t.Logf("[%d] %s", i, err)
return
}
t.Logf("[%d] %d -> %d", i, first+1, first+size)
time.Sleep(time.Second)
}
}
func TestSeqMalloc(t *testing.T) {
ts := newTestSeq()
t.Log(ts.GetMaxSeq(context.Background(), "100"))
}
func TestMinSeq(t *testing.T) {
ts := newTestSeq()
t.Log(ts.GetMinSeq(context.Background(), "10000000"))
}
+185
View File
@@ -0,0 +1,185 @@
package redis
import (
"context"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
"strconv"
"time"
)
func NewSeqUserCacheRedis(rdb redis.UniversalClient, mgo database.SeqUser) cache.SeqUser {
return &seqUserCacheRedis{
rdb: rdb,
mgo: mgo,
readSeqWriteRatio: 100,
expireTime: time.Hour * 24 * 7,
readExpireTime: time.Hour * 24 * 30,
rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
}
}
type seqUserCacheRedis struct {
rdb redis.UniversalClient
mgo database.SeqUser
rocks *rockscache.Client
expireTime time.Duration
readExpireTime time.Duration
readSeqWriteRatio int64
}
func (s *seqUserCacheRedis) getSeqUserMaxSeqKey(conversationID string, userID string) string {
return cachekey.GetSeqUserMaxSeqKey(conversationID, userID)
}
func (s *seqUserCacheRedis) getSeqUserMinSeqKey(conversationID string, userID string) string {
return cachekey.GetSeqUserMinSeqKey(conversationID, userID)
}
func (s *seqUserCacheRedis) getSeqUserReadSeqKey(conversationID string, userID string) string {
return cachekey.GetSeqUserReadSeqKey(conversationID, userID)
}
func (s *seqUserCacheRedis) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserMaxSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if err := s.mgo.SetMaxSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID))
}
func (s *seqUserCacheRedis) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserMinSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.SetMinSeqs(ctx, userID, map[string]int64{conversationID: seq})
}
func (s *seqUserCacheRedis) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserReadSeqKey(conversationID, userID), s.readExpireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
}
return nil
}
func (s *seqUserCacheRedis) SetMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
for conversationID, seq := range seqs {
if err := s.mgo.SetMinSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
keys = append(keys, s.getSeqUserMinSeqKey(conversationID, userID))
}
return DeleteCacheBySlot(ctx, s.rocks, keys)
}
func (s *seqUserCacheRedis) setRedisReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
keySeq := make(map[string]int64)
for conversationID, seq := range seqs {
key := s.getSeqUserReadSeqKey(conversationID, userID)
keys = append(keys, key)
keySeq[key] = seq
}
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys)
if err != nil {
return err
}
for _, keys := range slotKeys {
pipe := s.rdb.Pipeline()
for _, key := range keys {
pipe.HSet(ctx, key, "value", strconv.FormatInt(keySeq[key], 10))
pipe.Expire(ctx, key, s.readExpireTime)
}
if _, err := pipe.Exec(ctx); err != nil {
return err
}
}
return nil
}
func (s *seqUserCacheRedis) SetReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
if len(seqs) == 0 {
return nil
}
if err := s.setRedisReadSeqs(ctx, userID, seqs); err != nil {
return err
}
for conversationID, seq := range seqs {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
}
return nil
}
func (s *seqUserCacheRedis) GetReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
res, err := batchGetCache2(ctx, s.rocks, s.readExpireTime, conversationIDs, func(conversationID string) string {
return s.getSeqUserReadSeqKey(conversationID, userID)
}, func(v *readSeqModel) string {
return v.ConversationID
}, func(ctx context.Context, conversationIDs []string) ([]*readSeqModel, error) {
seqs, err := s.mgo.GetReadSeqs(ctx, userID, conversationIDs)
if err != nil {
return nil, err
}
res := make([]*readSeqModel, 0, len(seqs))
for conversationID, seq := range seqs {
res = append(res, &readSeqModel{ConversationID: conversationID, Seq: seq})
}
return res, nil
})
if err != nil {
return nil, err
}
data := make(map[string]int64)
for _, v := range res {
data[v.ConversationID] = v.Seq
}
return data, nil
}
var _ BatchCacheCallback[string] = (*readSeqModel)(nil)
type readSeqModel struct {
ConversationID string
Seq int64
}
func (r *readSeqModel) BatchCache(conversationID string) {
r.ConversationID = conversationID
}
func (r *readSeqModel) UnmarshalJSON(bytes []byte) (err error) {
r.Seq, err = strconv.ParseInt(string(bytes), 10, 64)
return
}
func (r *readSeqModel) MarshalJSON() ([]byte, error) {
return []byte(strconv.FormatInt(r.Seq, 10)), nil
}
+79
View File
@@ -0,0 +1,79 @@
package redis
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/redis/go-redis/v9"
"log"
"strconv"
"sync/atomic"
"testing"
"time"
)
func newTestOnline() *userOnline {
opt := &redis.Options{
Addr: "172.16.8.48:16379",
Password: "openIM123",
DB: 0,
}
rdb := redis.NewClient(opt)
if err := rdb.Ping(context.Background()).Err(); err != nil {
panic(err)
}
return &userOnline{rdb: rdb, expire: time.Hour, channelName: "user_online"}
}
func TestOnline(t *testing.T) {
ts := newTestOnline()
var count atomic.Int64
for i := 0; i < 64; i++ {
go func(userID string) {
var err error
for i := 0; ; i++ {
if i%2 == 0 {
err = ts.SetUserOnline(context.Background(), userID, []int32{5, 6}, []int32{7, 8, 9})
} else {
err = ts.SetUserOnline(context.Background(), userID, []int32{1, 2, 3}, []int32{4, 5, 6})
}
if err != nil {
panic(err)
}
count.Add(1)
}
}(strconv.Itoa(10000 + i))
}
ticker := time.NewTicker(time.Second)
for range ticker.C {
t.Log(count.Swap(0))
}
}
func TestGetOnline(t *testing.T) {
ts := newTestOnline()
ctx := context.Background()
pIDs, err := ts.GetOnline(ctx, "10000")
if err != nil {
panic(err)
}
t.Log(pIDs)
}
func TestRecvOnline(t *testing.T) {
ts := newTestOnline()
ctx := context.Background()
pubsub := ts.rdb.Subscribe(ctx, cachekey.OnlineChannel)
_, err := pubsub.Receive(ctx)
if err != nil {
log.Fatalf("Could not subscribe: %v", err)
}
ch := pubsub.Channel()
for msg := range ch {
fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
}
}
+3 -185
View File
@@ -16,21 +16,14 @@ package redis
import (
"context"
"encoding/json"
"errors"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"hash/crc32"
"strconv"
"time"
)
@@ -61,8 +54,8 @@ func NewUserCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache,
}
}
func (u *UserCacheRedis) getOnlineStatusKey(modKey string) string {
return cachekey.GetOnlineStatusKey(modKey)
func (u *UserCacheRedis) getUserID(user *model.User) string {
return user.UserID
}
func (u *UserCacheRedis) CloneUserCache() cache.UserCache {
@@ -90,11 +83,7 @@ func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userIn
}
func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*model.User, error) {
return batchGetCache(ctx, u.rcClient, u.expireTime, userIDs, func(userID string) string {
return u.getUserInfoKey(userID)
}, func(ctx context.Context, userID string) (*model.User, error) {
return u.userDB.Take(ctx, userID)
})
return batchGetCache2(ctx, u.rcClient, u.expireTime, userIDs, u.getUserInfoKey, u.getUserID, u.userDB.Find)
}
func (u *UserCacheRedis) DelUsersInfo(userIDs ...string) cache.UserCache {
@@ -130,174 +119,3 @@ func (u *UserCacheRedis) DelUsersGlobalRecvMsgOpt(userIDs ...string) cache.UserC
return cache
}
// GetUserStatus get user status.
func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) {
userStatus := make([]*user.OnlineStatus, 0, len(userIDs))
for _, userID := range userIDs {
UserIDNum := crc32.ChecksumIEEE([]byte(userID))
modKey := strconv.Itoa(int(UserIDNum % statusMod))
var onlineStatus user.OnlineStatus
key := u.getOnlineStatusKey(modKey)
result, err := u.rdb.HGet(ctx, key, userID).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
// key or field does not exist
userStatus = append(userStatus, &user.OnlineStatus{
UserID: userID,
Status: constant.Offline,
PlatformIDs: nil,
})
continue
} else {
return nil, errs.Wrap(err)
}
}
err = json.Unmarshal([]byte(result), &onlineStatus)
if err != nil {
return nil, errs.Wrap(err)
}
onlineStatus.UserID = userID
onlineStatus.Status = constant.Online
userStatus = append(userStatus, &onlineStatus)
}
return userStatus, nil
}
// SetUserStatus Set the user status and save it in redis.
func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, status, platformID int32) error {
UserIDNum := crc32.ChecksumIEEE([]byte(userID))
modKey := strconv.Itoa(int(UserIDNum % statusMod))
key := u.getOnlineStatusKey(modKey)
log.ZDebug(ctx, "SetUserStatus args", "userID", userID, "status", status, "platformID", platformID, "modKey", modKey, "key", key)
isNewKey, err := u.rdb.Exists(ctx, key).Result()
if err != nil {
return errs.Wrap(err)
}
if isNewKey == 0 {
if status == constant.Online {
onlineStatus := user.OnlineStatus{
UserID: userID,
Status: constant.Online,
PlatformIDs: []int32{platformID},
}
jsonData, err := json.Marshal(&onlineStatus)
if err != nil {
return errs.Wrap(err)
}
_, err = u.rdb.HSet(ctx, key, userID, string(jsonData)).Result()
if err != nil {
return errs.Wrap(err)
}
u.rdb.Expire(ctx, key, userOlineStatusExpireTime)
return nil
}
}
isNil := false
result, err := u.rdb.HGet(ctx, key, userID).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
isNil = true
} else {
return errs.Wrap(err)
}
}
if status == constant.Offline {
err = u.refreshStatusOffline(ctx, userID, status, platformID, isNil, err, result, key)
if err != nil {
return err
}
} else {
err = u.refreshStatusOnline(ctx, userID, platformID, isNil, err, result, key)
if err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (u *UserCacheRedis) refreshStatusOffline(ctx context.Context, userID string, status, platformID int32, isNil bool, err error, result, key string) error {
if isNil {
log.ZWarn(ctx, "this user not online,maybe trigger order not right",
err, "userStatus", status)
return nil
}
var onlineStatus user.OnlineStatus
err = json.Unmarshal([]byte(result), &onlineStatus)
if err != nil {
return errs.Wrap(err)
}
var newPlatformIDs []int32
for _, val := range onlineStatus.PlatformIDs {
if val != platformID {
newPlatformIDs = append(newPlatformIDs, val)
}
}
if newPlatformIDs == nil {
_, err = u.rdb.HDel(ctx, key, userID).Result()
if err != nil {
return errs.Wrap(err)
}
} else {
onlineStatus.PlatformIDs = newPlatformIDs
newjsonData, err := json.Marshal(&onlineStatus)
if err != nil {
return errs.Wrap(err)
}
_, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result()
if err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (u *UserCacheRedis) refreshStatusOnline(ctx context.Context, userID string, platformID int32, isNil bool, err error, result, key string) error {
var onlineStatus user.OnlineStatus
if !isNil {
err := json.Unmarshal([]byte(result), &onlineStatus)
if err != nil {
return errs.Wrap(err)
}
onlineStatus.PlatformIDs = RemoveRepeatedElementsInList(append(onlineStatus.PlatformIDs, platformID))
} else {
onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, platformID)
}
onlineStatus.Status = constant.Online
onlineStatus.UserID = userID
newjsonData, err := json.Marshal(&onlineStatus)
if err != nil {
return errs.WrapMsg(err, "json.Marshal failed")
}
_, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result()
if err != nil {
return errs.Wrap(err)
}
return nil
}
type Comparable interface {
~int | ~string | ~float64 | ~int32
}
func RemoveRepeatedElementsInList[T Comparable](slc []T) []T {
var result []T
tempMap := map[T]struct{}{}
for _, e := range slc {
if _, found := tempMap[e]; !found {
tempMap[e] = struct{}{}
result = append(result, e)
}
}
return result
}