Merge remote-tracking branch 'origin/errcode' into errcode
# Conflicts: # internal/api/auth.go
This commit is contained in:
+4
-13
@@ -35,14 +35,10 @@ func NewGinRouter() *gin.Engine {
|
||||
userRouterGroup := r.Group("/user")
|
||||
{
|
||||
u := NewUser(zk)
|
||||
userRouterGroup.POST("/user_register", u.UserRegister)
|
||||
userRouterGroup.POST("/update_user_info", u.UpdateUserInfo) //1
|
||||
userRouterGroup.POST("/set_global_msg_recv_opt", u.SetGlobalRecvMessageOpt)
|
||||
userRouterGroup.POST("/get_users_info", u.GetUsersPublicInfo) //1
|
||||
userRouterGroup.POST("/get_self_user_info", u.GetSelfUserInfo) //1
|
||||
userRouterGroup.POST("/get_users_online_status", u.GetUsersOnlineStatus) //1
|
||||
userRouterGroup.POST("/get_users_info_from_cache", u.GetUsersInfoFromCache)
|
||||
userRouterGroup.POST("/get_user_friend_from_cache", u.GetFriendIDListFromCache)
|
||||
userRouterGroup.POST("/get_black_list_from_cache", u.GetBlackIDListFromCache)
|
||||
userRouterGroup.POST("/get_users_info", u.GetUsersPublicInfo) //1
|
||||
//userRouterGroup.POST("/get_all_users_uid", manage.GetAllUsersUid) // todo
|
||||
//userRouterGroup.POST("/account_check", manage.AccountCheck) // todo
|
||||
userRouterGroup.POST("/get_users", u.GetUsers)
|
||||
@@ -100,21 +96,16 @@ func NewGinRouter() *gin.Engine {
|
||||
authRouterGroup := r.Group("/auth")
|
||||
{
|
||||
a := NewAuth(zk)
|
||||
authRouterGroup.POST("/user_register", a.UserRegister) //1
|
||||
u := NewUser(zk)
|
||||
authRouterGroup.POST("/user_register", u.UserRegister) //1
|
||||
authRouterGroup.POST("/user_token", a.UserToken) //1
|
||||
authRouterGroup.POST("/parse_token", a.ParseToken) //1
|
||||
authRouterGroup.POST("/force_logout", a.ForceLogout) //1
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
*/
|
||||
|
||||
////Third service
|
||||
thirdGroup := r.Group("/third")
|
||||
{
|
||||
t := NewThird(zk)
|
||||
|
||||
thirdGroup.POST("/get_rtc_invitation_info", t.GetSignalInvitationInfo)
|
||||
thirdGroup.POST("/get_rtc_invitation_start_app", t.GetSignalInvitationInfoStartApp)
|
||||
thirdGroup.POST("/fcm_update_token", t.FcmUpdateToken)
|
||||
|
||||
+4
-20
@@ -27,6 +27,10 @@ func (o *User) client() (user.UserClient, error) {
|
||||
return user.NewUserClient(conn), nil
|
||||
}
|
||||
|
||||
func (o *User) UserRegister(c *gin.Context) {
|
||||
a2r.Call(user.UserClient.UserRegister, o.client, c)
|
||||
}
|
||||
|
||||
func (o *User) UpdateUserInfo(c *gin.Context) {
|
||||
a2r.Call(user.UserClient.UpdateUserInfo, o.client, c)
|
||||
}
|
||||
@@ -39,26 +43,6 @@ func (o *User) GetUsersPublicInfo(c *gin.Context) {
|
||||
a2r.Call(user.UserClient.GetDesignateUsers, o.client, c)
|
||||
}
|
||||
|
||||
func (o *User) GetSelfUserInfo(c *gin.Context) {
|
||||
a2r.Call(user.UserClient.GetSelfUserInfo, o.client, c)
|
||||
}
|
||||
|
||||
func (o *User) GetUsersOnlineStatus(c *gin.Context) {
|
||||
a2r.Call(user.UserClient.GetUsersOnlineStatus, o.client, c)
|
||||
}
|
||||
|
||||
func (o *User) GetUsersInfoFromCache(c *gin.Context) {
|
||||
a2r.Call(user.UserClient.GetUsersInfoFromCache, o.client, c)
|
||||
}
|
||||
|
||||
func (o *User) GetFriendIDListFromCache(c *gin.Context) {
|
||||
a2r.Call(user.UserClient.GetFriendIDListFromCache, o.client, c)
|
||||
}
|
||||
|
||||
func (o *User) GetBlackIDListFromCache(c *gin.Context) {
|
||||
a2r.Call(user.UserClient.GetBlackIDListFromCache, o.client, c)
|
||||
}
|
||||
|
||||
//func (u *User) GetAllUsersUid(c *gin.Context) {
|
||||
// a2r.Call(user.UserClient.GetAllUsersUid, u.client, c)
|
||||
//}
|
||||
|
||||
@@ -1,52 +0,0 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"OpenIM/internal/api2rpc"
|
||||
"OpenIM/pkg/proto/group"
|
||||
"fmt"
|
||||
"github.com/gin-gonic/gin"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type AReq struct {
|
||||
}
|
||||
|
||||
type AResp struct {
|
||||
}
|
||||
|
||||
func KickGroupMember(c *gin.Context) {
|
||||
// 默认 全部自动
|
||||
api2rpc.NewRpc(api2rpc.NewGin[AReq, AResp](c), group.NewGroupClient, group.GroupClient.KickGroupMember).Call()
|
||||
|
||||
//// 可以自定义编辑请求和响应
|
||||
//a := NewRpc(NewGin[apistruct.KickGroupMemberReq, apistruct.KickGroupMemberResp](c), group.NewGroupClient, group.GroupClient.KickGroupMember)
|
||||
//a.Before(func(apiReq *apistruct.KickGroupMemberReq, rpcReq *group.KickGroupMemberReq, bind func() error) error {
|
||||
// return bind()
|
||||
//}).After(func(rpcResp *group.KickGroupMemberResp, apiResp *apistruct.KickGroupMemberResp, bind func() error) error {
|
||||
// return bind()
|
||||
//}).Name("group").Call()
|
||||
}
|
||||
|
||||
//func getInterfaceName(handler PackerHandler) string {
|
||||
// funcInfo := runtime.FuncForPC(reflect.ValueOf(handler).Pointer())
|
||||
// name := funcInfo.Name()
|
||||
// names := strings.Split(name, "/")
|
||||
// if len(names) == 0 {
|
||||
// return ""
|
||||
// }
|
||||
//
|
||||
// return names[len(names)-1]
|
||||
//}
|
||||
|
||||
func TestName(t *testing.T) {
|
||||
n := 100000000
|
||||
start := time.Now().UnixNano()
|
||||
for i := 0; i < n; i++ {
|
||||
var val group.GroupClient
|
||||
reflect.TypeOf(&val).Elem().String()
|
||||
}
|
||||
end := time.Now().UnixNano()
|
||||
fmt.Println(time.Duration(end-start) / time.Duration(n))
|
||||
}
|
||||
@@ -3,8 +3,8 @@ package check
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
discoveryRegistry "OpenIM/pkg/discoveryregistry"
|
||||
sdkws "OpenIM/pkg/proto/sdkws"
|
||||
"OpenIM/pkg/discoveryregistry"
|
||||
"OpenIM/pkg/proto/sdkws"
|
||||
"OpenIM/pkg/proto/user"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
@@ -12,18 +12,18 @@ import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
func NewUserCheck(zk discoveryRegistry.SvcDiscoveryRegistry) *UserCheck {
|
||||
func NewUserCheck(client discoveryregistry.SvcDiscoveryRegistry) *UserCheck {
|
||||
return &UserCheck{
|
||||
zk: zk,
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
type UserCheck struct {
|
||||
zk discoveryRegistry.SvcDiscoveryRegistry
|
||||
client discoveryregistry.SvcDiscoveryRegistry
|
||||
}
|
||||
|
||||
func (u *UserCheck) getConn() (*grpc.ClientConn, error) {
|
||||
return u.zk.GetConn(config.Config.RpcRegisterName.OpenImUserName)
|
||||
return u.client.GetConn(config.Config.RpcRegisterName.OpenImUserName)
|
||||
}
|
||||
|
||||
func (u *UserCheck) GetUsersInfos(ctx context.Context, userIDs []string, complete bool) ([]*sdkws.UserInfo, error) {
|
||||
@@ -107,5 +107,8 @@ func (u *UserCheck) GetUserGlobalMsgRecvOpt(ctx context.Context, userID string)
|
||||
resp, err := user.NewUserClient(cc).GetGlobalRecvMessageOpt(ctx, &user.GetGlobalRecvMessageOptReq{
|
||||
UserID: userID,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return resp.GlobalRecvMsgOpt, err
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ func (c *Check) ExtendMessageUpdatedNotification(ctx context.Context, sendID str
|
||||
req *msg.SetMessageReactionExtensionsReq, resp *msg.SetMessageReactionExtensionsResp, isHistory bool, isReactionFromCache bool) {
|
||||
var m apistruct.ReactionMessageModifierNotification
|
||||
m.SourceID = req.SourceID
|
||||
m.OpUserID = req.OpUserID
|
||||
m.OpUserID = tracelog.GetOpUserID(ctx)
|
||||
m.SessionType = req.SessionType
|
||||
keyMap := make(map[string]*sdkws.KeyValue)
|
||||
for _, valueResp := range resp.Result {
|
||||
@@ -33,7 +33,7 @@ func (c *Check) ExtendMessageUpdatedNotification(ctx context.Context, sendID str
|
||||
c.messageReactionSender(ctx, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory, isReactionFromCache)
|
||||
}
|
||||
func (c *Check) ExtendMessageDeleteNotification(ctx context.Context, sendID string, sourceID string, sessionType int32,
|
||||
req *msg.DeleteMessageListReactionExtensionsReq, resp *msg.DeleteMessageListReactionExtensionsResp, isHistory bool, isReactionFromCache bool) {
|
||||
req *msg.DeleteMessagesReactionExtensionsReq, resp *msg.DeleteMessagesReactionExtensionsResp, isHistory bool, isReactionFromCache bool) {
|
||||
var m apistruct.ReactionMessageDeleteNotification
|
||||
m.SourceID = req.SourceID
|
||||
m.OpUserID = req.OpUserID
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
package rpcserver
|
||||
|
||||
import (
|
||||
"OpenIM/internal/common/network"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/log"
|
||||
discoveryRegistry "OpenIM/pkg/discoveryregistry"
|
||||
"github.com/OpenIMSDK/openKeeper"
|
||||
"net"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type RpcServer struct {
|
||||
Port int
|
||||
RegisterName string
|
||||
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
|
||||
}
|
||||
|
||||
func NewRpcServer(registerIPInConfig string, port int, registerName string, zkServers []string, zkRoot string) (*RpcServer, error) {
|
||||
log.NewPrivateLog(constant.LogFileName)
|
||||
s := &RpcServer{
|
||||
Port: port,
|
||||
RegisterName: registerName,
|
||||
}
|
||||
|
||||
zkClient, err := openKeeper.NewClient(zkServers, zkRoot, 10, "", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
registerIP, err := network.GetRpcRegisterIP(registerIPInConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.RegisterCenter = zkClient
|
||||
err = s.RegisterCenter.Register(s.RegisterName, registerIP, s.Port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func GetTcpListen(listenIPInConfig string, port int) (net.Listener, string, error) {
|
||||
address := network.GetListenIP(listenIPInConfig) + ":" + strconv.Itoa(port)
|
||||
listener, err := net.Listen("tcp", address)
|
||||
return listener, address, err
|
||||
}
|
||||
@@ -4,10 +4,10 @@ import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/log"
|
||||
prome "OpenIM/pkg/common/prome"
|
||||
"OpenIM/pkg/common/prome"
|
||||
"OpenIM/pkg/common/tokenverify"
|
||||
"OpenIM/pkg/proto/msggateway"
|
||||
sdkws "OpenIM/pkg/proto/sdkws"
|
||||
"OpenIM/pkg/proto/sdkws"
|
||||
"OpenIM/pkg/utils"
|
||||
"bytes"
|
||||
"context"
|
||||
@@ -44,12 +44,11 @@ func initPrometheus() {
|
||||
|
||||
func (r *RPCServer) onInit(rpcPort int) {
|
||||
r.rpcPort = rpcPort
|
||||
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImRelayName
|
||||
r.etcdSchema = config.Config.Etcd.EtcdSchema
|
||||
r.etcdAddr = config.Config.Etcd.EtcdAddr
|
||||
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImMessageGatewayName
|
||||
r.platformList = genPlatformArray()
|
||||
r.pushTerminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID}
|
||||
}
|
||||
|
||||
func (r *RPCServer) run() {
|
||||
listenIP := ""
|
||||
if config.Config.ListenIP == "" {
|
||||
@@ -76,7 +75,7 @@ func (r *RPCServer) run() {
|
||||
}
|
||||
srv := grpc.NewServer(grpcOpts...)
|
||||
defer srv.GracefulStop()
|
||||
msggateway.RegisterRelayServer(srv, r)
|
||||
msggateway.RegisterMsgGatewayServer(srv, r)
|
||||
|
||||
rpcRegisterIP := config.Config.RpcRegisterIP
|
||||
if config.Config.RpcRegisterIP == "" {
|
||||
@@ -97,7 +96,7 @@ func (r *RPCServer) run() {
|
||||
return
|
||||
}
|
||||
}
|
||||
func (r *RPCServer) OnlinePushMsg(_ context.Context, in *msggateway.OnlinePushMsgReq) (*msggateway.OnlinePushMsgResp, error) {
|
||||
func (r *RPCServer) OnlinePushMsg(ctx context.Context, in *msggateway.OnlinePushMsgReq) (*msggateway.OnlinePushMsgResp, error) {
|
||||
log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String())
|
||||
var resp []*msggateway.SingleMsgToUserPlatform
|
||||
msgBytes, _ := proto.Marshal(in.MsgData)
|
||||
|
||||
@@ -70,8 +70,8 @@ func (ws *WServer) run() {
|
||||
func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
operationID := ""
|
||||
if len(query["operationID"]) != 0 {
|
||||
operationID = query["operationID"][0]
|
||||
if len(query[constant.OperationID]) != 0 {
|
||||
operationID = query[constant.OperationID][0]
|
||||
} else {
|
||||
operationID = utils.OperationIDGenerator()
|
||||
}
|
||||
|
||||
@@ -2,27 +2,54 @@ package msgtransfer
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
"OpenIM/pkg/common/db/relation"
|
||||
relationTb "OpenIM/pkg/common/db/table/relation"
|
||||
"OpenIM/pkg/common/db/unrelation"
|
||||
"OpenIM/pkg/common/prome"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type MsgTransfer struct {
|
||||
persistentCH PersistentConsumerHandler // 聊天记录持久化到mysql的消费者 订阅的topic: ws2ms_chat
|
||||
historyCH OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化
|
||||
historyMongoCH OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
|
||||
modifyCH ModifyMsgConsumerHandler // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify
|
||||
persistentCH *PersistentConsumerHandler // 聊天记录持久化到mysql的消费者 订阅的topic: ws2ms_chat
|
||||
historyCH *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化
|
||||
historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
|
||||
modifyCH *ModifyMsgConsumerHandler // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify
|
||||
}
|
||||
|
||||
func NewMsgTransfer() *MsgTransfer {
|
||||
msgTransfer := &MsgTransfer{}
|
||||
msgTransfer.persistentCH.Init()
|
||||
msgTransfer.historyCH.Init()
|
||||
msgTransfer.historyMongoCH.Init()
|
||||
msgTransfer.modifyCH.Init()
|
||||
if config.Config.Prometheus.Enable {
|
||||
msgTransfer.initPrometheus()
|
||||
func StartTransfer(prometheusPort int) error {
|
||||
db, err := relation.NewGormDB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return msgTransfer
|
||||
if err := db.AutoMigrate(&relationTb.ChatLogModel{}); err != nil {
|
||||
return err
|
||||
}
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mongo, err := unrelation.NewMongo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cacheModel := cache.NewCacheModel(rdb)
|
||||
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
|
||||
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
|
||||
|
||||
chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db))
|
||||
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel)
|
||||
msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel)
|
||||
|
||||
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase)
|
||||
msgTransfer.initPrometheus()
|
||||
return msgTransfer.Start(prometheusPort)
|
||||
}
|
||||
|
||||
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase, extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.MsgDatabase) *MsgTransfer {
|
||||
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase),
|
||||
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
|
||||
}
|
||||
|
||||
func (m *MsgTransfer) initPrometheus() {
|
||||
@@ -36,19 +63,18 @@ func (m *MsgTransfer) initPrometheus() {
|
||||
prome.NewMsgInsertMongoFailedCounter()
|
||||
}
|
||||
|
||||
func (m *MsgTransfer) Run(promePort int) {
|
||||
func (m *MsgTransfer) Start(prometheusPort int) error {
|
||||
if config.Config.ChatPersistenceMysql {
|
||||
go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&m.persistentCH)
|
||||
go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
|
||||
} else {
|
||||
fmt.Println("msg transfer not start mysql consumer")
|
||||
}
|
||||
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&m.historyCH)
|
||||
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&m.historyMongoCH)
|
||||
go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(&m.modifyCH)
|
||||
go func() {
|
||||
err := prome.StartPrometheusSrv(promePort)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
|
||||
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH)
|
||||
go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
|
||||
err := prome.StartPrometheusSrv(prometheusPort)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -25,10 +25,13 @@ type ModifyMsgConsumerHandler struct {
|
||||
extendMsgDatabase controller.ExtendMsgDatabase
|
||||
}
|
||||
|
||||
func (mmc *ModifyMsgConsumerHandler) Init() {
|
||||
mmc.modifyMsgConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToModify.Topic},
|
||||
config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.ConsumerGroupID.MsgToModify)
|
||||
func NewModifyMsgConsumerHandler(database controller.ExtendMsgDatabase) *ModifyMsgConsumerHandler {
|
||||
return &ModifyMsgConsumerHandler{
|
||||
modifyMsgConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToModify.Topic},
|
||||
config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.ConsumerGroupID.MsgToModify),
|
||||
extendMsgDatabase: database,
|
||||
}
|
||||
}
|
||||
|
||||
func (ModifyMsgConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
@@ -38,7 +41,8 @@ func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi
|
||||
for msg := range claim.Messages() {
|
||||
log.NewDebug("", "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
||||
if len(msg.Value) != 0 {
|
||||
mmc.ModifyMsg(msg, string(msg.Key), sess)
|
||||
ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg)
|
||||
mmc.ModifyMsg(ctx, msg, string(msg.Key), sess)
|
||||
} else {
|
||||
log.Error("", "msg get from kafka but is nil", msg.Key)
|
||||
}
|
||||
@@ -47,29 +51,30 @@ func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
log.NewInfo("msg come here ModifyMsg!!!", "", "msg", string(cMsg.Value), msgKey)
|
||||
msgFromMQ := pbMsg.MsgDataToModifyByMQ{}
|
||||
operationID := tracelog.GetOperationID(ctx)
|
||||
err := proto.Unmarshal(cMsg.Value, &msgFromMQ)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "msg_transfer Unmarshal msg err", "msg", string(cMsg.Value), "err", err.Error())
|
||||
return
|
||||
}
|
||||
log.Debug(msgFromMQ.TriggerID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
|
||||
for _, msgDataToMQ := range msgFromMQ.MessageList {
|
||||
for _, msgDataToMQ := range msgFromMQ.Messages {
|
||||
isReactionFromCache := utils.GetSwitchFromOptions(msgDataToMQ.MsgData.Options, constant.IsReactionFromCache)
|
||||
if !isReactionFromCache {
|
||||
continue
|
||||
}
|
||||
ctx := context.Background()
|
||||
tracelog.SetOperationID(ctx, msgDataToMQ.OperationID)
|
||||
tracelog.SetOperationID(ctx, operationID)
|
||||
if msgDataToMQ.MsgData.ContentType == constant.ReactionMessageModifier {
|
||||
notification := &apistruct.ReactionMessageModifierNotification{}
|
||||
if err := json.Unmarshal(msgDataToMQ.MsgData.Content, notification); err != nil {
|
||||
continue
|
||||
}
|
||||
if notification.IsExternalExtensions {
|
||||
log.NewInfo(msgDataToMQ.OperationID, "msg:", notification, "this is external extensions")
|
||||
log.NewInfo(operationID, "msg:", notification, "this is external extensions")
|
||||
continue
|
||||
}
|
||||
if !notification.IsReact {
|
||||
@@ -89,7 +94,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg
|
||||
}
|
||||
|
||||
if err := mmc.extendMsgDatabase.InsertExtendMsg(ctx, notification.SourceID, notification.SessionType, &extendMsg); err != nil {
|
||||
log.NewError(msgDataToMQ.OperationID, "MsgFirstModify InsertExtendMsg failed", notification.SourceID, notification.SessionType, extendMsg, err.Error())
|
||||
log.NewError(operationID, "MsgFirstModify InsertExtendMsg failed", notification.SourceID, notification.SessionType, extendMsg, err.Error())
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
@@ -103,7 +108,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg
|
||||
}
|
||||
// is already modify
|
||||
if err := mmc.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, reactionExtensionList); err != nil {
|
||||
log.NewError(msgDataToMQ.OperationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
||||
log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
||||
}
|
||||
}
|
||||
} else if msgDataToMQ.MsgData.ContentType == constant.ReactionMessageDeleter {
|
||||
@@ -112,7 +117,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg
|
||||
continue
|
||||
}
|
||||
if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, notification.SuccessReactionExtensionList); err != nil {
|
||||
log.NewError(msgDataToMQ.OperationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
||||
log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,13 +3,11 @@ package msgtransfer
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
"OpenIM/pkg/common/kafka"
|
||||
"OpenIM/pkg/common/log"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
pbMsg "OpenIM/pkg/proto/msg"
|
||||
pbPush "OpenIM/pkg/proto/push"
|
||||
"OpenIM/pkg/statistics"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
@@ -57,10 +55,11 @@ type OnlineHistoryRedisConsumerHandler struct {
|
||||
producerToMongo *kafka.Producer
|
||||
|
||||
msgDatabase controller.MsgDatabase
|
||||
cache cache.Cache
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) Init() {
|
||||
func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase) *OnlineHistoryRedisConsumerHandler {
|
||||
var och OnlineHistoryRedisConsumerHandler
|
||||
och.msgDatabase = database
|
||||
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
|
||||
go och.MessagesDistributionHandle()
|
||||
for i := 0; i < ChannelNum; i++ {
|
||||
@@ -73,8 +72,8 @@ func (och *OnlineHistoryRedisConsumerHandler) Init() {
|
||||
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
|
||||
|
||||
statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
|
||||
return &och
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
||||
@@ -242,17 +241,17 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Context, message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
||||
mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
|
||||
pid, offset, err := och.producerToPush.SendMessage(ctx, &mqPushMsg, mqPushMsg.PushToUserID)
|
||||
mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: message.MsgData, SourceID: pushToUserID}
|
||||
pid, offset, err := och.producerToPush.SendMessage(ctx, &mqPushMsg, mqPushMsg.SourceID)
|
||||
if err != nil {
|
||||
log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||
log.Error(tracelog.GetOperationID(ctx), "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) {
|
||||
if len(messages) > 0 {
|
||||
pid, offset, err := och.producerToModify.SendMessage(ctx, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID)
|
||||
pid, offset, err := och.producerToModify.SendMessage(ctx, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}, aggregationID)
|
||||
if err != nil {
|
||||
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
|
||||
}
|
||||
@@ -261,7 +260,7 @@ func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) {
|
||||
if len(messages) > 0 {
|
||||
pid, offset, err := och.producerToMongo.SendMessage(ctx, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID)
|
||||
pid, offset, err := och.producerToMongo.SendMessage(ctx, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}, aggregationID)
|
||||
if err != nil {
|
||||
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package msgtransfer
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
kfk "OpenIM/pkg/common/kafka"
|
||||
"OpenIM/pkg/common/log"
|
||||
@@ -19,66 +18,68 @@ import (
|
||||
type OnlineHistoryMongoConsumerHandler struct {
|
||||
historyConsumerGroup *kfk.MConsumerGroup
|
||||
msgDatabase controller.MsgDatabase
|
||||
cache cache.Cache
|
||||
}
|
||||
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) Init() {
|
||||
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
|
||||
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgDatabase) *OnlineHistoryMongoConsumerHandler {
|
||||
mc := &OnlineHistoryMongoConsumerHandler{
|
||||
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
|
||||
msgDatabase: database,
|
||||
}
|
||||
return mc
|
||||
}
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, session sarama.ConsumerGroupSession) {
|
||||
msg := cMsg.Value
|
||||
msgFromMQ := pbMsg.MsgDataToMongoByMQ{}
|
||||
operationID := tracelog.GetOperationID(ctx)
|
||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
||||
if err != nil {
|
||||
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
|
||||
return
|
||||
}
|
||||
log.Info(msgFromMQ.TriggerID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
|
||||
ctx := context.Background()
|
||||
tracelog.SetOperationID(ctx, msgFromMQ.TriggerID)
|
||||
log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
|
||||
//err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
|
||||
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.LastSeq)
|
||||
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages, msgFromMQ.LastSeq)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
||||
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
||||
}
|
||||
//err = db.DB.DeleteMessageFromCache(msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.GetTriggerID())
|
||||
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.MessageList)
|
||||
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "remove cache msg from redis err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
||||
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
||||
}
|
||||
for _, v := range msgFromMQ.MessageList {
|
||||
for _, v := range msgFromMQ.Messages {
|
||||
if v.MsgData.ContentType == constant.DeleteMessageNotification {
|
||||
tips := sdkws.TipsComm{}
|
||||
DeleteMessageTips := sdkws.DeleteMessageTips{}
|
||||
err := proto.Unmarshal(v.MsgData.Content, &tips)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "tips unmarshal err:", err.Error(), v.String())
|
||||
log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String())
|
||||
continue
|
||||
}
|
||||
err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
|
||||
log.NewError(operationID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
|
||||
continue
|
||||
}
|
||||
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, DeleteMessageTips.UserID, DeleteMessageTips.Seqs); err != nil {
|
||||
log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
|
||||
log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", DeleteMessageTips.UserID, DeleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
|
||||
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
|
||||
for msg := range claim.Messages() {
|
||||
log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
||||
if len(msg.Value) != 0 {
|
||||
mc.handleChatWs2Mongo(msg, string(msg.Key), sess)
|
||||
ctx := mc.historyConsumerGroup.GetContextFromMsg(msg)
|
||||
mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess)
|
||||
} else {
|
||||
log.Error("", "mongo msg get from kafka but is nil", msg.Key)
|
||||
}
|
||||
|
||||
@@ -12,8 +12,10 @@ import (
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
kfk "OpenIM/pkg/common/kafka"
|
||||
"OpenIM/pkg/common/log"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
pbMsg "OpenIM/pkg/proto/msg"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/golang/protobuf/proto"
|
||||
@@ -21,26 +23,30 @@ import (
|
||||
|
||||
type PersistentConsumerHandler struct {
|
||||
persistentConsumerGroup *kfk.MConsumerGroup
|
||||
chatLogInterface controller.ChatLogDatabase
|
||||
chatLogDatabase controller.ChatLogDatabase
|
||||
}
|
||||
|
||||
func (pc *PersistentConsumerHandler) Init() {
|
||||
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
|
||||
func NewPersistentConsumerHandler(database controller.ChatLogDatabase) *PersistentConsumerHandler {
|
||||
return &PersistentConsumerHandler{
|
||||
persistentConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql),
|
||||
chatLogDatabase: database,
|
||||
}
|
||||
}
|
||||
|
||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
msg := cMsg.Value
|
||||
operationID := tracelog.GetOperationID(ctx)
|
||||
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey)
|
||||
var tag bool
|
||||
msgFromMQ := pbMsg.MsgDataToMQ{}
|
||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.OperationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error())
|
||||
log.NewError(operationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error())
|
||||
return
|
||||
}
|
||||
log.Debug(msgFromMQ.OperationID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
|
||||
log.Debug(operationID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
|
||||
//Control whether to store history messages (mysql)
|
||||
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
|
||||
//Only process receiver data
|
||||
@@ -58,23 +64,22 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes
|
||||
tag = true
|
||||
}
|
||||
if tag {
|
||||
log.NewInfo(msgFromMQ.OperationID, "msg_transfer msg persisting", string(msg))
|
||||
log.NewInfo(operationID, "msg_transfer msg persisting", string(msg))
|
||||
if err = pc.chatLogInterface.CreateChatLog(msgFromMQ); err != nil {
|
||||
log.NewError(msgFromMQ.OperationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
|
||||
log.NewError(operationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error {
|
||||
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
||||
for msg := range claim.Messages() {
|
||||
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
||||
if len(msg.Value) != 0 {
|
||||
pc.handleChatWs2Mysql(msg, string(msg.Key), sess)
|
||||
ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg)
|
||||
pc.handleChatWs2Mysql(ctx, msg, string(msg.Key), sess)
|
||||
} else {
|
||||
log.Error("", "msg get from kafka but is nil", msg.Key)
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ func (m *minioImpl) GetObjectInfo(ctx context.Context, args *BucketFile) (*Objec
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *minioImpl) CopyObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error {
|
||||
func (m *minioImpl) CopyObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error {
|
||||
destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -103,15 +103,15 @@ func (m *minioImpl) CopyObjetInfo(ctx context.Context, src *BucketFile, dst *Buc
|
||||
return m.client.CopyObject(destination, minio.NewSourceInfo(src.Bucket, src.Name, nil))
|
||||
}
|
||||
|
||||
func (m *minioImpl) DeleteObjetInfo(ctx context.Context, info *BucketFile) error {
|
||||
func (m *minioImpl) DeleteObjectInfo(ctx context.Context, info *BucketFile) error {
|
||||
return m.client.RemoveObject(info.Bucket, info.Name)
|
||||
}
|
||||
|
||||
func (m *minioImpl) MoveObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error {
|
||||
if err := m.CopyObjetInfo(ctx, src, dst); err != nil {
|
||||
func (m *minioImpl) MoveObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error {
|
||||
if err := m.CopyObjectInfo(ctx, src, dst); err != nil {
|
||||
return err
|
||||
}
|
||||
return m.DeleteObjetInfo(ctx, src)
|
||||
return m.DeleteObjectInfo(ctx, src)
|
||||
}
|
||||
|
||||
func (m *minioImpl) MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error {
|
||||
@@ -119,7 +119,7 @@ func (m *minioImpl) MergeObjectInfo(ctx context.Context, src []BucketFile, dst *
|
||||
case 0:
|
||||
return errors.New("src empty")
|
||||
case 1:
|
||||
return m.CopyObjetInfo(ctx, &src[0], dst)
|
||||
return m.CopyObjectInfo(ctx, &src[0], dst)
|
||||
}
|
||||
destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil)
|
||||
if err != nil {
|
||||
|
||||
@@ -11,9 +11,9 @@ type Interface interface {
|
||||
ClearBucket() string
|
||||
ApplyPut(ctx context.Context, args *ApplyPutArgs) (*PutRes, error)
|
||||
GetObjectInfo(ctx context.Context, args *BucketFile) (*ObjectInfo, error)
|
||||
CopyObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error
|
||||
DeleteObjetInfo(ctx context.Context, info *BucketFile) error
|
||||
MoveObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error
|
||||
CopyObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error
|
||||
DeleteObjectInfo(ctx context.Context, info *BucketFile) error
|
||||
MoveObjectInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error
|
||||
MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error
|
||||
IsNotFound(err error) bool
|
||||
}
|
||||
|
||||
@@ -107,143 +107,3 @@ func callbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//func callbackOfflinePush(operationID string, userIDList []string, msg *common.MsgData, offlinePushUserIDList *[]string) cbApi.CommonCallbackResp {
|
||||
// callbackResp := cbapi.CommonCallbackResp{OperationID: operationID}
|
||||
// if !config.Config.Callback.CallbackOfflinePush.Enable {
|
||||
// return callbackResp
|
||||
// }
|
||||
// req := cbApi.CallbackBeforePushReq{
|
||||
// UserStatusBatchCallbackReq: cbApi.UserStatusBatchCallbackReq{
|
||||
// UserStatusBaseCallback: cbApi.UserStatusBaseCallback{
|
||||
// CallbackCommand: constant.CallbackOfflinePushCommand,
|
||||
// OperationID: operationID,
|
||||
// PlatformID: msg.SenderPlatformID,
|
||||
// Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
|
||||
// },
|
||||
// UserIDList: userIDList,
|
||||
// },
|
||||
// OfflinePushInfo: msg.OfflinePushInfo,
|
||||
// ClientMsgID: msg.ClientMsgID,
|
||||
// SendID: msg.SendID,
|
||||
// GroupID: msg.GroupID,
|
||||
// ContentType: msg.ContentType,
|
||||
// SessionType: msg.SessionType,
|
||||
// AtUserIDList: msg.AtUserIDList,
|
||||
// Content: callback.GetContent(msg),
|
||||
// }
|
||||
// resp := &cbApi.CallbackBeforePushResp{CommonCallbackResp: &callbackResp}
|
||||
// if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackOfflinePushCommand, req, resp, config.Config.Callback.CallbackOfflinePush.CallbackTimeOut); err != nil {
|
||||
// callbackResp.ErrCode = http2.StatusInternalServerError
|
||||
// callbackResp.ErrMsg = err.Error()
|
||||
// if !*config.Config.Callback.CallbackOfflinePush.CallbackFailedContinue {
|
||||
// callbackResp.ActionCode = constant.ActionForbidden
|
||||
// return callbackResp
|
||||
// } else {
|
||||
// callbackResp.ActionCode = constant.ActionAllow
|
||||
// return callbackResp
|
||||
// }
|
||||
// }
|
||||
// if resp.ErrCode == constant.CallbackHandleSuccess && resp.ActionCode == constant.ActionAllow {
|
||||
// if len(resp.UserIDList) != 0 {
|
||||
// *offlinePushUserIDList = resp.UserIDList
|
||||
// }
|
||||
// if resp.OfflinePushInfo != nil {
|
||||
// msg.OfflinePushInfo = resp.OfflinePushInfo
|
||||
// }
|
||||
// }
|
||||
// log.NewDebug(operationID, utils.GetSelfFuncName(), offlinePushUserIDList, resp.UserIDList)
|
||||
// return callbackResp
|
||||
//}
|
||||
//
|
||||
//func callbackOnlinePush(operationID string, userIDList []string, msg *common.MsgData) cbApi.CommonCallbackResp {
|
||||
// callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
|
||||
// if !config.Config.Callback.CallbackOnlinePush.Enable || utils.IsContain(msg.SendID, userIDList) {
|
||||
// return callbackResp
|
||||
// }
|
||||
// req := cbApi.CallbackBeforePushReq{
|
||||
// UserStatusBatchCallbackReq: cbApi.UserStatusBatchCallbackReq{
|
||||
// UserStatusBaseCallback: cbApi.UserStatusBaseCallback{
|
||||
// CallbackCommand: constant.CallbackOnlinePushCommand,
|
||||
// OperationID: operationID,
|
||||
// PlatformID: msg.SenderPlatformID,
|
||||
// Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
|
||||
// },
|
||||
// UserIDList: userIDList,
|
||||
// },
|
||||
// //OfflinePushInfo: msg.OfflinePushInfo,
|
||||
// ClientMsgID: msg.ClientMsgID,
|
||||
// SendID: msg.SendID,
|
||||
// GroupID: msg.GroupID,
|
||||
// ContentType: msg.ContentType,
|
||||
// SessionType: msg.SessionType,
|
||||
// AtUserIDList: msg.AtUserIDList,
|
||||
// Content: callback.GetContent(msg),
|
||||
// }
|
||||
// resp := &cbApi.CallbackBeforePushResp{CommonCallbackResp: &callbackResp}
|
||||
// if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackOnlinePushCommand, req, resp, config.Config.Callback.CallbackOnlinePush.CallbackTimeOut); err != nil {
|
||||
// callbackResp.ErrCode = http2.StatusInternalServerError
|
||||
// callbackResp.ErrMsg = err.Error()
|
||||
// if !config.Config.Callback.CallbackOnlinePush.CallbackFailedContinue {
|
||||
// callbackResp.ActionCode = constant.ActionForbidden
|
||||
// return callbackResp
|
||||
// } else {
|
||||
// callbackResp.ActionCode = constant.ActionAllow
|
||||
// return callbackResp
|
||||
// }
|
||||
// }
|
||||
// if resp.ErrCode == constant.CallbackHandleSuccess && resp.ActionCode == constant.ActionAllow {
|
||||
// //if resp.OfflinePushInfo != nil {
|
||||
// // msg.OfflinePushInfo = resp.OfflinePushInfo
|
||||
// //}
|
||||
// }
|
||||
// return callbackResp
|
||||
//}
|
||||
//
|
||||
//func callbackBeforeSuperGroupOnlinePush(operationID string, groupID string, msg *common.MsgData, pushToUserList *[]string) cbApi.CommonCallbackResp {
|
||||
// log.Debug(operationID, utils.GetSelfFuncName(), groupID, msg.String(), pushToUserList)
|
||||
// callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
|
||||
// if !config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable {
|
||||
// return callbackResp
|
||||
// }
|
||||
// req := cbApi.CallbackBeforeSuperGroupOnlinePushReq{
|
||||
// UserStatusBaseCallback: cbApi.UserStatusBaseCallback{
|
||||
// CallbackCommand: constant.CallbackSuperGroupOnlinePushCommand,
|
||||
// OperationID: operationID,
|
||||
// PlatformID: msg.SenderPlatformID,
|
||||
// Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
|
||||
// },
|
||||
// //OfflinePushInfo: msg.OfflinePushInfo,
|
||||
// ClientMsgID: msg.ClientMsgID,
|
||||
// SendID: msg.SendID,
|
||||
// GroupID: groupID,
|
||||
// ContentType: msg.ContentType,
|
||||
// SessionType: msg.SessionType,
|
||||
// AtUserIDList: msg.AtUserIDList,
|
||||
// Content: callback.GetContent(msg),
|
||||
// Seq: msg.Seq,
|
||||
// }
|
||||
// resp := &cbApi.CallbackBeforeSuperGroupOnlinePushResp{CommonCallbackResp: &callbackResp}
|
||||
// if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackSuperGroupOnlinePushCommand, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.CallbackTimeOut); err != nil {
|
||||
// callbackResp.ErrCode = http2.StatusInternalServerError
|
||||
// callbackResp.ErrMsg = err.Error()
|
||||
// if !config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.CallbackFailedContinue {
|
||||
// callbackResp.ActionCode = constant.ActionForbidden
|
||||
// return callbackResp
|
||||
// } else {
|
||||
// callbackResp.ActionCode = constant.ActionAllow
|
||||
// return callbackResp
|
||||
// }
|
||||
// }
|
||||
// if resp.ErrCode == constant.CallbackHandleSuccess && resp.ActionCode == constant.ActionAllow {
|
||||
// if len(resp.UserIDList) != 0 {
|
||||
// *pushToUserList = resp.UserIDList
|
||||
// }
|
||||
// //if resp.OfflinePushInfo != nil {
|
||||
// // msg.OfflinePushInfo = resp.OfflinePushInfo
|
||||
// //}
|
||||
// }
|
||||
// log.NewDebug(operationID, utils.GetSelfFuncName(), pushToUserList, resp.UserIDList)
|
||||
// return callbackResp
|
||||
//
|
||||
//}
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
/*
|
||||
** description("").
|
||||
** copyright('open-im,www.open-im.io').
|
||||
** author("fg,Gordon@open-im.io").
|
||||
** time(2021/3/22 15:33).
|
||||
*/
|
||||
package push
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/prome"
|
||||
"OpenIM/pkg/statistics"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type Consumer struct {
|
||||
pushCh ConsumerHandler
|
||||
successCount uint64
|
||||
}
|
||||
|
||||
func NewConsumer(pusher *Pusher) *Consumer {
|
||||
return &Consumer{
|
||||
pushCh: *NewConsumerHandler(pusher),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) initPrometheus() {
|
||||
prome.NewMsgOfflinePushSuccessCounter()
|
||||
prome.NewMsgOfflinePushFailedCounter()
|
||||
}
|
||||
|
||||
func (c *Consumer) Start() {
|
||||
statistics.NewStatistics(&c.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
|
||||
go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&c.pushCh)
|
||||
}
|
||||
@@ -19,10 +19,10 @@ var Terminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constan
|
||||
|
||||
type Fcm struct {
|
||||
fcmMsgCli *messaging.Client
|
||||
cache cache.Cache
|
||||
cache cache.Model
|
||||
}
|
||||
|
||||
func NewClient(cache cache.Cache) *Fcm {
|
||||
func NewClient(cache cache.Model) *Fcm {
|
||||
opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount))
|
||||
fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
|
||||
if err != nil {
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
)
|
||||
|
||||
func Test_Push(t *testing.T) {
|
||||
var redis cache.Cache
|
||||
var redis cache.Model
|
||||
offlinePusher := NewClient(redis)
|
||||
err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &push.Opts{})
|
||||
assert.Nil(t, err)
|
||||
|
||||
@@ -38,12 +38,12 @@ const (
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
cache cache.Cache
|
||||
cache cache.Model
|
||||
tokenExpireTime int64
|
||||
taskIDTTL int64
|
||||
}
|
||||
|
||||
func NewClient(cache cache.Cache) *Client {
|
||||
func NewClient(cache cache.Model) *Client {
|
||||
return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,63 +0,0 @@
|
||||
/*
|
||||
** description("").
|
||||
** copyright('open-im,www.open-im.io').
|
||||
** author("fg,Gordon@open-im.io").
|
||||
** time(2021/3/22 15:33).
|
||||
*/
|
||||
package push
|
||||
|
||||
import (
|
||||
fcm "OpenIM/internal/push/fcm"
|
||||
"OpenIM/internal/push/getui"
|
||||
jpush "OpenIM/internal/push/jpush"
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"OpenIM/pkg/common/prome"
|
||||
"OpenIM/pkg/statistics"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type Push struct {
|
||||
rpcServer PushServer
|
||||
pushCh ConsumerHandler
|
||||
offlinePusher OfflinePusher
|
||||
successCount uint64
|
||||
}
|
||||
|
||||
func (p *Push) Init(rpcPort int) error {
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var cacheInterface cache.Cache
|
||||
p.rpcServer.Init(rpcPort, cacheInterface)
|
||||
p.pushCh.Init()
|
||||
statistics.NewStatistics(&p.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
|
||||
if *config.Config.Push.Getui.Enable {
|
||||
p.offlinePusher = getui.NewClient(cacheInterface)
|
||||
}
|
||||
if config.Config.Push.Jpns.Enable {
|
||||
p.offlinePusher = jpush.NewClient()
|
||||
}
|
||||
if config.Config.Push.Fcm.Enable {
|
||||
p.offlinePusher = fcm.NewClient(cacheInterface)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Push) initPrometheus() {
|
||||
prome.NewMsgOfflinePushSuccessCounter()
|
||||
prome.NewMsgOfflinePushFailedCounter()
|
||||
}
|
||||
|
||||
func (p *Push) Run(prometheusPort int) {
|
||||
go p.rpcServer.run()
|
||||
go p.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&p.pushCh)
|
||||
go func() {
|
||||
err := prome.StartPrometheusSrv(prometheusPort)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package push
|
||||
package jpush
|
||||
|
||||
import (
|
||||
"OpenIM/internal/push"
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
package push
|
||||
|
||||
import (
|
||||
"OpenIM/internal/push/fcm"
|
||||
"OpenIM/internal/push/getui"
|
||||
"OpenIM/internal/push/jpush"
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"context"
|
||||
)
|
||||
|
||||
type OfflinePusher interface {
|
||||
Push(ctx context.Context, userIDs []string, title, content string, opts *Opts) error
|
||||
}
|
||||
|
||||
func NewOfflinePusher(cache cache.Model) OfflinePusher {
|
||||
var offlinePusher OfflinePusher
|
||||
if config.Config.Push.Getui.Enable {
|
||||
offlinePusher = getui.NewClient(cache)
|
||||
}
|
||||
if config.Config.Push.Fcm.Enable {
|
||||
offlinePusher = fcm.NewClient(cache)
|
||||
}
|
||||
if config.Config.Push.Jpns.Enable {
|
||||
offlinePusher = jpush.NewClient()
|
||||
}
|
||||
return offlinePusher
|
||||
}
|
||||
|
||||
type Opts struct {
|
||||
Signal *Signal
|
||||
IOSPushSound string
|
||||
IOSBadgeCount bool
|
||||
Ex string
|
||||
}
|
||||
|
||||
type Signal struct {
|
||||
ClientMsgID string
|
||||
}
|
||||
@@ -22,16 +22,19 @@ import (
|
||||
|
||||
type ConsumerHandler struct {
|
||||
pushConsumerGroup *kfk.MConsumerGroup
|
||||
pusher Pusher
|
||||
pusher *Pusher
|
||||
}
|
||||
|
||||
func (c *ConsumerHandler) Init() {
|
||||
c.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
func NewConsumerHandler(pusher *Pusher) *ConsumerHandler {
|
||||
var consumerHandler ConsumerHandler
|
||||
consumerHandler.pusher = pusher
|
||||
consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr,
|
||||
config.Config.Kafka.ConsumerGroupID.MsgToPush)
|
||||
return &consumerHandler
|
||||
}
|
||||
|
||||
func (c *ConsumerHandler) handleMs2PsChat(msg []byte) {
|
||||
func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
|
||||
log.NewDebug("", "msg come from kafka And push!!!", "msg", string(msg))
|
||||
msgFromMQ := pbChat.PushMsgDataToMQ{}
|
||||
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
|
||||
@@ -40,14 +43,13 @@ func (c *ConsumerHandler) handleMs2PsChat(msg []byte) {
|
||||
}
|
||||
pbData := &pbPush.PushMsgReq{
|
||||
MsgData: msgFromMQ.MsgData,
|
||||
SourceID: msgFromMQ.PushToUserID,
|
||||
SourceID: msgFromMQ.SourceID,
|
||||
}
|
||||
sec := msgFromMQ.MsgData.SendTime / 1000
|
||||
nowSec := utils.GetCurrentTimestampBySecond()
|
||||
if nowSec-sec > 10 {
|
||||
return
|
||||
}
|
||||
ctx := context.Background()
|
||||
tracelog.SetOperationID(ctx, "")
|
||||
var err error
|
||||
switch msgFromMQ.MsgData.SessionType {
|
||||
@@ -66,7 +68,8 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error {
|
||||
for msg := range claim.Messages() {
|
||||
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
||||
c.handleMs2PsChat(msg.Value)
|
||||
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
|
||||
c.handleMs2PsChat(ctx, msg.Value)
|
||||
sess.MarkMessage(msg, "")
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
package push
|
||||
|
||||
import "context"
|
||||
|
||||
type OfflinePusher interface {
|
||||
Push(ctx context.Context, userIDs []string, title, content string, opts *Opts) error
|
||||
}
|
||||
|
||||
type Opts struct {
|
||||
Signal *Signal
|
||||
IOSPushSound string
|
||||
IOSBadgeCount bool
|
||||
Ex string
|
||||
}
|
||||
|
||||
type Signal struct {
|
||||
ClientMsgID string
|
||||
}
|
||||
@@ -1,24 +1,47 @@
|
||||
package push
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
"OpenIM/pkg/common/db/localcache"
|
||||
"OpenIM/pkg/discoveryregistry"
|
||||
pbPush "OpenIM/pkg/proto/push"
|
||||
"context"
|
||||
"github.com/OpenIMSDK/openKeeper"
|
||||
"google.golang.org/grpc"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type pushServer struct {
|
||||
pusher *Pusher
|
||||
}
|
||||
|
||||
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cacheModel := cache.NewCacheModel(rdb)
|
||||
|
||||
pbPush.RegisterPushMsgServiceServer(server, &pushServer{
|
||||
pusher: NewPusher(),
|
||||
})
|
||||
offlinePusher := NewOfflinePusher(cacheModel)
|
||||
database := controller.NewPushDatabase(cacheModel)
|
||||
pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(client), localcache.NewConversationLocalCache(client))
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
pbPush.RegisterPushMsgServiceServer(server, &pushServer{
|
||||
pusher: pusher,
|
||||
})
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
consumer := NewConsumer(pusher)
|
||||
consumer.initPrometheus()
|
||||
consumer.Start()
|
||||
}()
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *pushServer) PushMsg(ctx context.Context, pbData *pbPush.PushMsgReq) (resp *pbPush.PushMsgResp, err error) {
|
||||
|
||||
@@ -27,13 +27,13 @@ type Pusher struct {
|
||||
database controller.PushDatabase
|
||||
client discoveryregistry.SvcDiscoveryRegistry
|
||||
offlinePusher OfflinePusher
|
||||
groupLocalCache localcache.GroupLocalCache
|
||||
conversationLocalCache localcache.ConversationLocalCache
|
||||
groupLocalCache *localcache.GroupLocalCache
|
||||
conversationLocalCache *localcache.ConversationLocalCache
|
||||
successCount int
|
||||
}
|
||||
|
||||
func NewPusher(client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher, database controller.PushDatabase,
|
||||
groupLocalCache localcache.GroupLocalCache, conversationLocalCache localcache.ConversationLocalCache) *Pusher {
|
||||
groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache) *Pusher {
|
||||
return &Pusher{
|
||||
database: database,
|
||||
client: client,
|
||||
@@ -162,7 +162,7 @@ func (p *Pusher) MsgToSuperGroupUser(ctx context.Context, groupID string, msg *s
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResultList, err error) {
|
||||
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
|
||||
conns, err := p.client.GetConns(config.Config.RpcRegisterName.OpenImMessageGatewayName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -170,7 +170,7 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
|
||||
//Online push message
|
||||
for _, v := range conns {
|
||||
msgClient := msggateway.NewMsgGatewayClient(v)
|
||||
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{OperationID: tracelog.GetOperationID(ctx), MsgData: msg, PushToUserIDList: pushToUserIDs})
|
||||
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs})
|
||||
if err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), msg, len(pushToUserIDs), "err", err)
|
||||
continue
|
||||
|
||||
@@ -6,8 +6,6 @@ import (
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
"OpenIM/pkg/common/db/relation"
|
||||
relationTb "OpenIM/pkg/common/db/table/relation"
|
||||
"OpenIM/pkg/common/log"
|
||||
"OpenIM/pkg/common/tokenverify"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
@@ -20,19 +18,12 @@ import (
|
||||
)
|
||||
|
||||
type authServer struct {
|
||||
controller.AuthDatabase
|
||||
authDatabase controller.AuthDatabase
|
||||
userCheck *check.UserCheck
|
||||
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
|
||||
}
|
||||
|
||||
func Start(client discoveryRegistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
mysql, err := relation.NewGormDB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil {
|
||||
return err
|
||||
}
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -40,7 +31,7 @@ func Start(client discoveryRegistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
||||
pbAuth.RegisterAuthServer(server, &authServer{
|
||||
userCheck: check.NewUserCheck(client),
|
||||
RegisterCenter: client,
|
||||
AuthDatabase: controller.NewAuthDatabase(cache.NewCache(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire),
|
||||
authDatabase: controller.NewAuthDatabase(cache.NewCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
@@ -50,7 +41,7 @@ func (s *authServer) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*
|
||||
if _, err := s.userCheck.GetUsersInfo(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
token, err := s.CreateToken(ctx, req.UserID, constant.PlatformIDToName(int(req.PlatformID)))
|
||||
token, err := s.authDatabase.CreateToken(ctx, req.UserID, constant.PlatformIDToName(int(req.PlatformID)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -64,7 +55,7 @@ func (s *authServer) parseToken(ctx context.Context, tokensString string) (claim
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "")
|
||||
}
|
||||
m, err := s.GetTokensWithoutError(ctx, claims.UID, claims.Platform)
|
||||
m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UID, claims.Platform)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -108,13 +99,13 @@ func (s *authServer) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq
|
||||
}
|
||||
|
||||
func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error {
|
||||
grpcCons, err := s.RegisterCenter.GetConns(config.Config.RpcRegisterName.OpenImMessageGatewayName)
|
||||
conns, err := s.RegisterCenter.GetConns(config.Config.RpcRegisterName.OpenImMessageGatewayName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, v := range grpcCons {
|
||||
for _, v := range conns {
|
||||
client := msggateway.NewMsgGatewayClient(v)
|
||||
kickReq := &msggateway.KickUserOfflineReq{OperationID: operationID, KickUserIDList: []string{userID}, PlatformID: platformID}
|
||||
kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID}
|
||||
log.NewInfo(operationID, "KickUserOffline ", client, kickReq.String())
|
||||
_, err := client.KickUserOffline(ctx, kickReq)
|
||||
return utils.Wrap(err, "")
|
||||
|
||||
@@ -9,10 +9,10 @@ import (
|
||||
"OpenIM/pkg/common/db/relation"
|
||||
tableRelation "OpenIM/pkg/common/db/table/relation"
|
||||
"OpenIM/pkg/common/db/tx"
|
||||
"OpenIM/pkg/discoveryregistry"
|
||||
pbConversation "OpenIM/pkg/proto/conversation"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
"github.com/OpenIMSDK/openKeeper"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
@@ -23,7 +23,7 @@ type conversationServer struct {
|
||||
notify *notification.Check
|
||||
}
|
||||
|
||||
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
db, err := relation.NewGormDB()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -31,13 +31,14 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||
if err := db.AutoMigrate(&tableRelation.ConversationModel{}); err != nil {
|
||||
return err
|
||||
}
|
||||
redis, err := cache.NewRedis()
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pbConversation.RegisterConversationServer(server, &conversationServer{
|
||||
groupChecker: check.NewGroupChecker(client),
|
||||
ConversationDatabase: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(redis.GetClient(), rockscache.Options{
|
||||
ConversationDatabase: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, rockscache.Options{
|
||||
RandomExpireAdjustment: 0.2,
|
||||
DisableCacheRead: false,
|
||||
DisableCacheDelete: false,
|
||||
@@ -176,11 +177,11 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
|
||||
|
||||
// 获取超级大群开启免打扰的用户ID
|
||||
func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbConversation.GetRecvMsgNotNotifyUserIDsReq) (*pbConversation.GetRecvMsgNotNotifyUserIDsResp, error) {
|
||||
resp := &pbConversation.GetRecvMsgNotNotifyUserIDsResp{}
|
||||
userIDs, err := c.ConversationDatabase.FindRecvMsgNotNotifyUserIDs(ctx, req.GroupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := &pbConversation.GetRecvMsgNotNotifyUserIDsResp{}
|
||||
resp.UserIDs = userIDs
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@@ -12,13 +12,13 @@ import (
|
||||
"OpenIM/pkg/common/db/unrelation"
|
||||
"OpenIM/pkg/common/tokenverify"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
"OpenIM/pkg/discoveryregistry"
|
||||
pbConversation "OpenIM/pkg/proto/conversation"
|
||||
pbGroup "OpenIM/pkg/proto/group"
|
||||
"OpenIM/pkg/proto/sdkws"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/OpenIMSDK/openKeeper"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"google.golang.org/grpc"
|
||||
"gorm.io/gorm"
|
||||
@@ -29,7 +29,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
db, err := relation.NewGormDB()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -46,17 +46,7 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||
return err
|
||||
}
|
||||
pbGroup.RegisterGroupServer(server, &groupServer{
|
||||
GroupDatabase: controller.NewGroupDatabase(
|
||||
relation.NewGroupDB(db),
|
||||
relation.NewGroupMemberDB(db),
|
||||
relation.NewGroupRequest(db),
|
||||
tx.NewGorm(db),
|
||||
tx.NewMongo(mongo.GetClient()),
|
||||
unrelation.NewSuperGroupMongoDriver(mongo.GetClient()),
|
||||
cache.NewGroupCacheRedis(rdb, relation.NewGroupDB(db), relation.NewGroupMemberDB(db), relation.NewGroupRequest(db), unrelation.NewSuperGroupMongoDriver(mongo.GetClient()), rockscache.Options{
|
||||
StrongConsistency: true,
|
||||
}),
|
||||
),
|
||||
GroupDatabase: controller.InitGroupDatabase(db, rdb, mongo.GetDatabase()),
|
||||
UserCheck: check.NewUserCheck(client),
|
||||
ConversationChecker: check.NewConversationChecker(client),
|
||||
})
|
||||
|
||||
@@ -3,16 +3,15 @@ package msg
|
||||
import (
|
||||
"OpenIM/pkg/common/tokenverify"
|
||||
"OpenIM/pkg/proto/msg"
|
||||
"OpenIM/pkg/proto/sdkws"
|
||||
"context"
|
||||
)
|
||||
|
||||
func (m *msgServer) DelMsgList(ctx context.Context, req *sdkws.DelMsgListReq) (*sdkws.DelMsgListResp, error) {
|
||||
resp := &sdkws.DelMsgListResp{}
|
||||
if _, err := m.MsgDatabase.DelMsgBySeqs(ctx, req.UserID, req.SeqList); err != nil {
|
||||
func (m *msgServer) DelMsgs(ctx context.Context, req *msg.DelMsgsReq) (*msg.DelMsgsResp, error) {
|
||||
resp := &msg.DelMsgsResp{}
|
||||
if _, err := m.MsgDatabase.DelMsgBySeqs(ctx, req.UserID, req.Seqs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
DeleteMessageNotification(ctx, req.UserID, req.SeqList)
|
||||
DeleteMessageNotification(ctx, req.UserID, req.Seqs)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@@ -21,13 +20,6 @@ func (m *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroup
|
||||
if err := tokenverify.CheckAdmin(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
//maxSeq, err := m.MsgDatabase.GetGroupMaxSeq(ctx, req.GroupID)
|
||||
//if err != nil {
|
||||
// return nil, err
|
||||
//}
|
||||
//if err := m.MsgDatabase.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil {
|
||||
// return nil, err
|
||||
//}
|
||||
if err := m.MsgDatabase.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, req.GroupID, []string{req.UserID}, 0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -42,8 +34,5 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.Cl
|
||||
if err := m.MsgDatabase.CleanUpUserMsg(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
//if err := m.MsgDatabase.DelUserAllSeq(ctx, req.UserID); err != nil {
|
||||
// return nil, err
|
||||
//}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S
|
||||
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, !req.IsReact, false)
|
||||
return resp, nil
|
||||
}
|
||||
isExists, err := m.MsgDatabase.JudgeMessageReactionEXISTS(ctx, req.ClientMsgID, req.SessionType)
|
||||
isExists, err := m.MsgDatabase.JudgeMessageReactionExist(ctx, req.ClientMsgID, req.SessionType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -35,7 +35,7 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S
|
||||
if !isExists {
|
||||
if !req.IsReact {
|
||||
resp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
|
||||
for k, v := range req.ReactionExtensionList {
|
||||
for k, v := range req.ReactionExtensions {
|
||||
err := m.MessageLocker.LockMessageTypeKey(ctx, req.ClientMsgID, k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -60,10 +60,10 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S
|
||||
return nil, err
|
||||
}
|
||||
setValue := make(map[string]*sdkws.KeyValue)
|
||||
for k, v := range req.ReactionExtensionList {
|
||||
for k, v := range req.ReactionExtensions {
|
||||
|
||||
temp := new(sdkws.KeyValue)
|
||||
if vv, ok := mongoValue.ReactionExtensionList[k]; ok {
|
||||
if vv, ok := mongoValue.ReactionExtensions[k]; ok {
|
||||
utils.CopyStructFields(temp, &vv)
|
||||
if v.LatestUpdateTime != vv.LatestUpdateTime {
|
||||
setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp)
|
||||
@@ -158,14 +158,14 @@ func setDeleteKeyResultInfo(r *msg.DeleteMessageListReactionExtensionsResp, errC
|
||||
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
|
||||
}
|
||||
|
||||
func (m *msgServer) GetMessageListReactionExtensions(ctx context.Context, req *msg.GetMessageListReactionExtensionsReq) (resp *msg.GetMessageListReactionExtensionsResp, err error) {
|
||||
func (m *msgServer) GetMessagesReactionExtensions(ctx context.Context, req *msg.GetMessagesReactionExtensionsReq) (resp *msg.GetMessagesReactionExtensionsResp, err error) {
|
||||
log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
|
||||
var rResp msg.GetMessageListReactionExtensionsResp
|
||||
for _, messageValue := range req.MessageReactionKeyList {
|
||||
var oneMessage msg.SingleMessageExtensionResult
|
||||
oneMessage.ClientMsgID = messageValue.ClientMsgID
|
||||
|
||||
isExists, err := db.DB.JudgeMessageReactionEXISTS(messageValue.ClientMsgID, req.SessionType)
|
||||
isExists, err := db.DB.JudgeMessageReactionExist(messageValue.ClientMsgID, req.SessionType)
|
||||
if err != nil {
|
||||
rResp.ErrCode = 100
|
||||
rResp.ErrMsg = err.Error()
|
||||
@@ -218,9 +218,9 @@ func (m *msgServer) AddMessageReactionExtensions(ctx context.Context, req *msg.M
|
||||
return
|
||||
}
|
||||
|
||||
func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessageListReactionExtensionsReq) (resp *msg.DeleteMessageListReactionExtensionsResp, err error) {
|
||||
func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessagesReactionExtensionsReq) (resp *msg.DeleteMessagesReactionExtensionsResp, err error) {
|
||||
log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
|
||||
var rResp msg.DeleteMessageListReactionExtensionsResp
|
||||
var rResp msg.DeleteMessagesReactionExtensionsResp
|
||||
callbackResp := notification.callbackDeleteMessageReactionExtensions(req)
|
||||
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
|
||||
rResp.ErrCode = int32(callbackResp.ErrCode)
|
||||
@@ -241,7 +241,7 @@ func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *ms
|
||||
return &rResp, nil
|
||||
|
||||
}
|
||||
for _, v := range callbackResp.ResultReactionExtensionList {
|
||||
for _, v := range callbackResp.ResultReactionExtensions {
|
||||
if v.ErrCode != 0 {
|
||||
func(req *[]*sdkws.KeyValue, typeKey string) {
|
||||
for i := 0; i < len(*req); i++ {
|
||||
@@ -253,7 +253,7 @@ func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *ms
|
||||
rResp.Result = append(rResp.Result, v)
|
||||
}
|
||||
}
|
||||
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
|
||||
isExists, err := db.DB.JudgeMessageReactionExist(req.ClientMsgID, req.SessionType)
|
||||
if err != nil {
|
||||
rResp.ErrCode = 100
|
||||
rResp.ErrMsg = err.Error()
|
||||
|
||||
@@ -18,9 +18,9 @@ func CallbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMe
|
||||
OperationID: tracelog.GetOperationID(ctx),
|
||||
CallbackCommand: constant.CallbackBeforeSetMessageReactionExtensionCommand,
|
||||
SourceID: setReq.SourceID,
|
||||
OpUserID: setReq.OpUserID,
|
||||
OpUserID: tracelog.GetOpUserID(ctx),
|
||||
SessionType: setReq.SessionType,
|
||||
ReactionExtensionList: setReq.ReactionExtensionList,
|
||||
ReactionExtensionList: setReq.ReactionExtensions,
|
||||
ClientMsgID: setReq.ClientMsgID,
|
||||
IsReact: setReq.IsReact,
|
||||
IsExternalExtensions: setReq.IsExternalExtensions,
|
||||
@@ -34,7 +34,7 @@ func CallbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMe
|
||||
return nil
|
||||
}
|
||||
|
||||
func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) error {
|
||||
func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessagesReactionExtensionsReq) error {
|
||||
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
@@ -44,7 +44,7 @@ func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReacti
|
||||
SourceID: setReq.SourceID,
|
||||
OpUserID: setReq.OpUserID,
|
||||
SessionType: setReq.SessionType,
|
||||
ReactionExtensionList: setReq.ReactionExtensionList,
|
||||
ReactionExtensionList: setReq.ReactionExtensions,
|
||||
ClientMsgID: setReq.ClientMsgID,
|
||||
IsExternalExtensions: setReq.IsExternalExtensions,
|
||||
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
|
||||
@@ -53,31 +53,31 @@ func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReacti
|
||||
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
|
||||
}
|
||||
|
||||
func CallbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) error {
|
||||
func CallbackGetMessageListReactionExtensions(ctx context.Context, getReq *msg.GetMessagesReactionExtensionsReq) error {
|
||||
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
req := &cbapi.CallbackGetMessageListReactionExtReq{
|
||||
OperationID: getReq.OperationID,
|
||||
OperationID: tracelog.GetOperationID(ctx),
|
||||
CallbackCommand: constant.CallbackGetMessageListReactionExtensionsCommand,
|
||||
SourceID: getReq.SourceID,
|
||||
OpUserID: getReq.OpUserID,
|
||||
OpUserID: tracelog.GetOperationID(ctx),
|
||||
SessionType: getReq.SessionType,
|
||||
TypeKeyList: getReq.TypeKeyList,
|
||||
MessageKeyList: getReq.MessageReactionKeyList,
|
||||
TypeKeyList: getReq.TypeKeys,
|
||||
MessageKeyList: getReq.MessageReactionKeys,
|
||||
}
|
||||
resp := &cbapi.CallbackGetMessageListReactionExtResp{}
|
||||
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
|
||||
}
|
||||
|
||||
func CallbackAddMessageReactionExtensions(setReq *msg.ModifyMessageReactionExtensionsReq) error {
|
||||
func CallbackAddMessageReactionExtensions(ctx context.Context, setReq *msg.ModifyMessageReactionExtensionsReq) error {
|
||||
req := &cbapi.CallbackAddMessageReactionExtReq{
|
||||
OperationID: setReq.OperationID,
|
||||
OperationID: tracelog.GetOperationID(ctx),
|
||||
CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand,
|
||||
SourceID: setReq.SourceID,
|
||||
OpUserID: setReq.OpUserID,
|
||||
OpUserID: tracelog.GetOperationID(ctx),
|
||||
SessionType: setReq.SessionType,
|
||||
ReactionExtensionList: setReq.ReactionExtensionList,
|
||||
ReactionExtensionList: setReq.ReactionExtensions,
|
||||
ClientMsgID: setReq.ClientMsgID,
|
||||
IsReact: setReq.IsReact,
|
||||
IsExternalExtensions: setReq.IsExternalExtensions,
|
||||
|
||||
+15
-12
@@ -1,6 +1,7 @@
|
||||
package msg
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
@@ -13,14 +14,16 @@ type MessageLocker interface {
|
||||
LockGlobalMessage(ctx context.Context, clientMsgID string) (err error)
|
||||
UnLockGlobalMessage(ctx context.Context, clientMsgID string) (err error)
|
||||
}
|
||||
type LockerMessage struct{}
|
||||
|
||||
func NewLockerMessage() *LockerMessage {
|
||||
return &LockerMessage{}
|
||||
type LockerMessage struct {
|
||||
cache cache.Model
|
||||
}
|
||||
func (l *LockerMessage) LockMessageTypeKey(clientMsgID, typeKey string) (err error) {
|
||||
|
||||
func NewLockerMessage(cache cache.Model) *LockerMessage {
|
||||
return &LockerMessage{cache: cache}
|
||||
}
|
||||
func (l *LockerMessage) LockMessageTypeKey(ctx context.Context, clientMsgID, typeKey string) (err error) {
|
||||
for i := 0; i < 3; i++ {
|
||||
err = db.DB.LockMessageTypeKey(clientMsgID, typeKey)
|
||||
err = l.cache.LockMessageTypeKey(ctx, clientMsgID, typeKey)
|
||||
if err != nil {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
@@ -31,9 +34,9 @@ func (l *LockerMessage) LockMessageTypeKey(clientMsgID, typeKey string) (err err
|
||||
return err
|
||||
|
||||
}
|
||||
func (l *LockerMessage) LockGlobalMessage(clientMsgID string) (err error) {
|
||||
func (l *LockerMessage) LockGlobalMessage(ctx context.Context, clientMsgID string) (err error) {
|
||||
for i := 0; i < 3; i++ {
|
||||
err = db.DB.LockMessageTypeKey(clientMsgID, GlOBLLOCK)
|
||||
err = l.cache.LockMessageTypeKey(ctx, clientMsgID, GlOBLLOCK)
|
||||
if err != nil {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
@@ -44,9 +47,9 @@ func (l *LockerMessage) LockGlobalMessage(clientMsgID string) (err error) {
|
||||
return err
|
||||
|
||||
}
|
||||
func (l *LockerMessage) UnLockMessageTypeKey(clientMsgID string, typeKey string) error {
|
||||
return db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
|
||||
func (l *LockerMessage) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, typeKey string) error {
|
||||
return l.cache.UnLockMessageTypeKey(ctx, clientMsgID, typeKey)
|
||||
}
|
||||
func (l *LockerMessage) UnLockGlobalMessage(clientMsgID string) error {
|
||||
return db.DB.UnLockMessageTypeKey(clientMsgID, GlOBLLOCK)
|
||||
func (l *LockerMessage) UnLockGlobalMessage(ctx context.Context, clientMsgID string) error {
|
||||
return l.cache.UnLockMessageTypeKey(ctx, clientMsgID, GlOBLLOCK)
|
||||
}
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
package msg
|
||||
|
||||
import "context"
|
||||
|
||||
func DeleteMessageNotification(ctx context.Context, userID string, seqs []uint32) {
|
||||
panic("todo")
|
||||
}
|
||||
@@ -3,7 +3,6 @@ package msg
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
"OpenIM/pkg/proto/msg"
|
||||
"OpenIM/pkg/proto/sdkws"
|
||||
"OpenIM/pkg/utils"
|
||||
@@ -34,27 +33,6 @@ type MessageRevoked struct {
|
||||
SessionType int32 `json:"sessionType"`
|
||||
Seq uint32 `json:"seq"`
|
||||
}
|
||||
type MsgCallBackReq struct {
|
||||
SendID string `json:"sendID"`
|
||||
RecvID string `json:"recvID"`
|
||||
Content string `json:"content"`
|
||||
SendTime int64 `json:"sendTime"`
|
||||
MsgFrom int32 `json:"msgFrom"`
|
||||
ContentType int32 `json:"contentType"`
|
||||
SessionType int32 `json:"sessionType"`
|
||||
PlatformID int32 `json:"senderPlatformID"`
|
||||
MsgID string `json:"msgID"`
|
||||
IsOnlineOnly bool `json:"isOnlineOnly"`
|
||||
}
|
||||
type MsgCallBackResp struct {
|
||||
ErrCode int32 `json:"errCode"`
|
||||
ErrMsg string `json:"errMsg"`
|
||||
ResponseErrCode int32 `json:"responseErrCode"`
|
||||
ResponseResult struct {
|
||||
ModifiedMsg string `json:"modifiedMsg"`
|
||||
Ext string `json:"ext"`
|
||||
}
|
||||
}
|
||||
|
||||
func (m *msgServer) userIsMuteAndIsAdminInGroup(ctx context.Context, groupID, userID string) (isMute bool, err error) {
|
||||
groupMemberInfo, err := m.Group.GetGroupMemberInfo(ctx, groupID, userID)
|
||||
@@ -331,7 +309,7 @@ func valueCopy(pb *msg.SendMsgReq) *msg.SendMsgReq {
|
||||
}
|
||||
|
||||
func (m *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *msg.SendMsgReq, wg *sync.WaitGroup) error {
|
||||
msgToMQGroup := msg.MsgDataToMQ{OperationID: tracelog.GetOperationID(ctx), MsgData: groupPB.MsgData}
|
||||
msgToMQGroup := msg.MsgDataToMQ{MsgData: groupPB.MsgData}
|
||||
tempOptions := make(map[string]bool, 1)
|
||||
for k, v := range groupPB.MsgData.Options {
|
||||
tempOptions[k] = v
|
||||
|
||||
@@ -52,10 +52,11 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
resp.SendTime = msgToMQSingle.MsgData.SendTime
|
||||
resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
|
||||
resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
|
||||
resp = &msg.SendMsgResp{
|
||||
ServerMsgID: msgToMQSingle.MsgData.ServerMsgID,
|
||||
ClientMsgID: msgToMQSingle.MsgData.ClientMsgID,
|
||||
SendTime: msgToMQSingle.MsgData.SendTime,
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@@ -90,9 +91,11 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq)
|
||||
return nil, err
|
||||
}
|
||||
promePkg.Inc(promePkg.SingleChatMsgProcessSuccessCounter)
|
||||
resp.SendTime = msgToMQSingle.MsgData.SendTime
|
||||
resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
|
||||
resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
|
||||
resp = &msg.SendMsgResp{
|
||||
ServerMsgID: msgToMQSingle.MsgData.ServerMsgID,
|
||||
ClientMsgID: msgToMQSingle.MsgData.ClientMsgID,
|
||||
SendTime: msgToMQSingle.MsgData.SendTime,
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@@ -265,9 +268,9 @@ func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMin
|
||||
}
|
||||
resp.MaxSeq = maxSeq
|
||||
resp.MinSeq = minSeq
|
||||
if len(req.GroupIDList) > 0 {
|
||||
if len(req.GroupIDs) > 0 {
|
||||
resp.GroupMaxAndMinSeq = make(map[string]*sdkws.MaxAndMinSeq)
|
||||
for _, groupID := range req.GroupIDList {
|
||||
for _, groupID := range req.GroupIDs {
|
||||
maxSeq, err := m.MsgDatabase.GetGroupMaxSeq(ctx, groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -285,19 +288,19 @@ func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMin
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *msgServer) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) {
|
||||
resp := &sdkws.PullMessageBySeqListResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)}
|
||||
func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
|
||||
resp := &sdkws.PullMessageBySeqsResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)}
|
||||
msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, req.Seqs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp.List = msgs
|
||||
for userID, list := range req.GroupSeqList {
|
||||
msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, userID, req.Seqs)
|
||||
for groupID, list := range req.GroupSeqs {
|
||||
msgs, err := m.MsgDatabase.GetSuperGroupMsgBySeqs(ctx, groupID, list.Seqs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp.GroupMsgDataList[userID] = &sdkws.MsgDataList{
|
||||
resp.GroupMsgDataList[groupID] = &sdkws.MsgDataList{
|
||||
MsgDataList: msgs,
|
||||
}
|
||||
}
|
||||
|
||||
+30
-18
@@ -2,10 +2,10 @@ package msg
|
||||
|
||||
import (
|
||||
"OpenIM/internal/common/check"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
"OpenIM/pkg/common/db/localcache"
|
||||
"OpenIM/pkg/common/db/relation"
|
||||
relationTb "OpenIM/pkg/common/db/table/relation"
|
||||
"OpenIM/pkg/common/db/unrelation"
|
||||
"OpenIM/pkg/common/prome"
|
||||
"OpenIM/pkg/discoveryregistry"
|
||||
"OpenIM/pkg/proto/msg"
|
||||
@@ -13,34 +13,46 @@ import (
|
||||
)
|
||||
|
||||
type msgServer struct {
|
||||
RegisterCenter discoveryregistry.SvcDiscoveryRegistry
|
||||
MsgDatabase controller.MsgDatabase
|
||||
Group *check.GroupChecker
|
||||
User *check.UserCheck
|
||||
Conversation *check.ConversationChecker
|
||||
friend *check.FriendChecker
|
||||
RegisterCenter discoveryregistry.SvcDiscoveryRegistry
|
||||
MsgDatabase controller.MsgDatabase
|
||||
ExtendMsgDatabase controller.ExtendMsgDatabase
|
||||
Group *check.GroupChecker
|
||||
User *check.UserCheck
|
||||
Conversation *check.ConversationChecker
|
||||
friend *check.FriendChecker
|
||||
*localcache.GroupLocalCache
|
||||
black *check.BlackChecker
|
||||
MessageLocker MessageLocker
|
||||
}
|
||||
|
||||
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
mysql, err := relation.NewGormDB()
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mysql.AutoMigrate(&relationTb.UserModel{}); err != nil {
|
||||
mongo, err := unrelation.NewMongo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cacheModel := cache.NewCacheModel(rdb)
|
||||
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
|
||||
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
|
||||
|
||||
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel)
|
||||
msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel)
|
||||
|
||||
s := &msgServer{
|
||||
Conversation: check.NewConversationChecker(client),
|
||||
User: check.NewUserCheck(client),
|
||||
Group: check.NewGroupChecker(client),
|
||||
//MsgDatabase: controller.MsgDatabase(),
|
||||
RegisterCenter: client,
|
||||
GroupLocalCache: localcache.NewGroupMemberIDsLocalCache(client),
|
||||
black: check.NewBlackChecker(client),
|
||||
friend: check.NewFriendChecker(client),
|
||||
Conversation: check.NewConversationChecker(client),
|
||||
User: check.NewUserCheck(client),
|
||||
Group: check.NewGroupChecker(client),
|
||||
MsgDatabase: msgDatabase,
|
||||
ExtendMsgDatabase: extendMsgDatabase,
|
||||
RegisterCenter: client,
|
||||
GroupLocalCache: localcache.NewGroupLocalCache(client),
|
||||
black: check.NewBlackChecker(client),
|
||||
friend: check.NewFriendChecker(client),
|
||||
MessageLocker: NewLockerMessage(cacheModel),
|
||||
}
|
||||
s.initPrometheus()
|
||||
msg.RegisterMsgServer(server, s)
|
||||
|
||||
@@ -1 +1,28 @@
|
||||
package third
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/proto/third"
|
||||
"context"
|
||||
)
|
||||
|
||||
func (t *thirdServer) GetSignalInvitationInfo(ctx context.Context, req *third.GetSignalInvitationInfoReq) (resp *third.GetSignalInvitationInfoResp, err error) {
|
||||
signalReq, err := t.thirdDatabase.GetSignalInvitationInfoByClientMsgID(ctx, req.ClientMsgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp = &third.GetSignalInvitationInfoResp{}
|
||||
resp.InvitationInfo = signalReq.Invitation
|
||||
resp.OfflinePushInfo = signalReq.OfflinePushInfo
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (t *thirdServer) GetSignalInvitationInfoStartApp(ctx context.Context, req *third.GetSignalInvitationInfoStartAppReq) (resp *third.GetSignalInvitationInfoStartAppResp, err error) {
|
||||
signalReq, err := t.thirdDatabase.GetAvailableSignalInvitationInfo(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp = &third.GetSignalInvitationInfoStartAppResp{}
|
||||
resp.InvitationInfo = signalReq.Invitation
|
||||
resp.OfflinePushInfo = signalReq.OfflinePushInfo
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@@ -7,13 +7,13 @@ import (
|
||||
"OpenIM/pkg/common/db/obj"
|
||||
"OpenIM/pkg/common/db/relation"
|
||||
relationTb "OpenIM/pkg/common/db/table/relation"
|
||||
"OpenIM/pkg/discoveryregistry"
|
||||
"OpenIM/pkg/proto/third"
|
||||
"context"
|
||||
"github.com/OpenIMSDK/openKeeper"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -30,7 +30,7 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||
return err
|
||||
}
|
||||
third.RegisterThirdServer(server, &thirdServer{
|
||||
thirdDatabase: controller.NewThirdDatabase(cache.NewCache(rdb)),
|
||||
thirdDatabase: controller.NewThirdDatabase(cache.NewCacheModel(rdb)),
|
||||
userCheck: check.NewUserCheck(client),
|
||||
s3dataBase: controller.NewS3Database(o, relation.NewObjectHash(db), relation.NewObjectInfo(db), relation.NewObjectPut(db)),
|
||||
})
|
||||
@@ -43,28 +43,6 @@ type thirdServer struct {
|
||||
userCheck *check.UserCheck
|
||||
}
|
||||
|
||||
func (t *thirdServer) GetSignalInvitationInfo(ctx context.Context, req *third.GetSignalInvitationInfoReq) (resp *third.GetSignalInvitationInfoResp, err error) {
|
||||
signalReq, err := t.thirdDatabase.GetSignalInvitationInfoByClientMsgID(ctx, req.ClientMsgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp = &third.GetSignalInvitationInfoResp{}
|
||||
resp.InvitationInfo = signalReq.Invitation
|
||||
resp.OfflinePushInfo = signalReq.OfflinePushInfo
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (t *thirdServer) GetSignalInvitationInfoStartApp(ctx context.Context, req *third.GetSignalInvitationInfoStartAppReq) (resp *third.GetSignalInvitationInfoStartAppResp, err error) {
|
||||
signalReq, err := t.thirdDatabase.GetAvailableSignalInvitationInfo(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp = &third.GetSignalInvitationInfoStartAppResp{}
|
||||
resp.InvitationInfo = signalReq.Invitation
|
||||
resp.OfflinePushInfo = signalReq.OfflinePushInfo
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
|
||||
err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime)
|
||||
if err != nil {
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
pbuser "OpenIM/pkg/proto/user"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
"github.com/OpenIMSDK/openKeeper"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
@@ -30,11 +29,11 @@ type userServer struct {
|
||||
}
|
||||
|
||||
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
gormDB, err := relation.NewGormDB()
|
||||
db, err := relation.NewGormDB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := gormDB.AutoMigrate(&tablerelation.UserModel{}); err != nil {
|
||||
if err := db.AutoMigrate(&tablerelation.UserModel{}); err != nil {
|
||||
return err
|
||||
}
|
||||
users := make([]*tablerelation.UserModel, 0)
|
||||
@@ -45,14 +44,13 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k]})
|
||||
}
|
||||
u := &userServer{
|
||||
UserDatabase: controller.NewUserDatabase(relation.NewUserGorm(gormDB)),
|
||||
UserDatabase: controller.NewUserDatabase(relation.NewUserGorm(db)),
|
||||
notification: notification.NewCheck(client),
|
||||
userCheck: check.NewUserCheck(client),
|
||||
RegisterCenter: client,
|
||||
}
|
||||
pbuser.RegisterUserServer(server, u)
|
||||
u.UserDatabase.InitOnce(context.Background(), users)
|
||||
return nil
|
||||
return u.UserDatabase.InitOnce(context.Background(), users)
|
||||
}
|
||||
|
||||
// ok
|
||||
|
||||
@@ -1,178 +0,0 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
"OpenIM/pkg/common/log"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"math"
|
||||
)
|
||||
|
||||
type msgTool struct {
|
||||
msgInterface controller.MsgDatabase
|
||||
userInterface controller.UserDatabase
|
||||
groupDatabase controller.GroupDatabase
|
||||
}
|
||||
|
||||
func (c *msgTool) getCronTaskOperationID() string {
|
||||
return cronTaskOperationID + utils.OperationIDGenerator()
|
||||
}
|
||||
|
||||
func (c *msgTool) ClearMsgAndFixSeq() {
|
||||
operationID := c.getCronTaskOperationID()
|
||||
ctx := context.Background()
|
||||
tracelog.SetOperationID(ctx, operationID)
|
||||
log.NewInfo(operationID, "============================ start del cron task ============================")
|
||||
var err error
|
||||
userIDList, err := c.userInterface.GetAllUserID(ctx)
|
||||
if err == nil {
|
||||
c.ClearUsersMsg(ctx, userIDList)
|
||||
} else {
|
||||
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
|
||||
}
|
||||
// working group msg clear
|
||||
superGroupIDList, err := c.groupDatabase.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
|
||||
if err == nil {
|
||||
c.ClearSuperGroupMsg(ctx, superGroupIDList)
|
||||
} else {
|
||||
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
|
||||
}
|
||||
log.NewInfo(operationID, "============================ start del cron finished ============================")
|
||||
}
|
||||
|
||||
func (c *msgTool) ClearUsersMsg(ctx context.Context, userIDs []string) {
|
||||
for _, userID := range userIDs {
|
||||
if err := c.msgInterface.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), userID)
|
||||
}
|
||||
_, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
|
||||
if err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", userID)
|
||||
continue
|
||||
}
|
||||
c.FixUserSeq(ctx, userID, minSeqCache, maxSeqCache)
|
||||
c.CheckMaxSeqWithMongo(ctx, userID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *msgTool) ClearSuperGroupMsg(ctx context.Context, superGroupIDs []string) {
|
||||
for _, groupID := range superGroupIDs {
|
||||
userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID)
|
||||
if err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "FindGroupMemberUserID", err.Error(), groupID)
|
||||
continue
|
||||
}
|
||||
if err := c.msgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "DeleteUserSuperGroupMsgsAndSetMinSeq failed", groupID, userIDs, config.Config.Mongo.DBRetainChatRecords)
|
||||
}
|
||||
_, maxSeqMongo, maxSeqCache, err := c.msgInterface.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
|
||||
if err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", groupID)
|
||||
continue
|
||||
}
|
||||
for _, userID := range userIDs {
|
||||
minSeqCache, err := c.msgInterface.GetGroupUserMinSeq(ctx, groupID, userID)
|
||||
if err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), "GetGroupUserMinSeq failed", groupID, userID)
|
||||
continue
|
||||
}
|
||||
c.FixGroupUserSeq(ctx, userID, groupID, minSeqCache, maxSeqCache)
|
||||
|
||||
}
|
||||
c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *msgTool) FixUserSeq(ctx context.Context, userID string, minSeqCache, maxSeqCache int64) {
|
||||
if minSeqCache > maxSeqCache {
|
||||
if err := c.msgInterface.SetUserMinSeq(ctx, userID, maxSeqCache); err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), "SetUserMinSeq failed", userID, minSeqCache, maxSeqCache)
|
||||
} else {
|
||||
log.NewWarn(tracelog.GetOperationID(ctx), "SetUserMinSeq success", userID, minSeqCache, maxSeqCache)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *msgTool) FixGroupUserSeq(ctx context.Context, userID string, groupID string, minSeqCache, maxSeqCache int64) {
|
||||
if minSeqCache > maxSeqCache {
|
||||
if err := c.msgInterface.SetGroupUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), "SetGroupUserMinSeq failed", userID, minSeqCache, maxSeqCache)
|
||||
} else {
|
||||
log.NewWarn(tracelog.GetOperationID(ctx), "SetGroupUserMinSeq success", userID, minSeqCache, maxSeqCache)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *msgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) {
|
||||
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
|
||||
log.NewWarn(tracelog.GetOperationID(ctx), "cache max seq and mongo max seq is diff > 10", sourceID, maxSeqCache, maxSeqMongo, diffusionType)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *msgTool) ShowUserSeqs(ctx context.Context, userID string) {
|
||||
|
||||
}
|
||||
|
||||
func (c *msgTool) ShowSuperGroupSeqs(ctx context.Context, groupID string) {
|
||||
|
||||
}
|
||||
|
||||
func (c *msgTool) FixAllSeq(ctx context.Context) {
|
||||
userIDs, err := c.userInterface.GetAllUserID(ctx)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
for _, userID := range userIDs {
|
||||
userCurrentMinSeq, err := c.msgInterface.GetUserMinSeq(ctx, userID)
|
||||
if err != nil && err != redis.Nil {
|
||||
continue
|
||||
}
|
||||
userCurrentMaxSeq, err := c.msgInterface.GetUserMaxSeq(ctx, userID)
|
||||
if err != nil && err != redis.Nil {
|
||||
continue
|
||||
}
|
||||
if userCurrentMinSeq > userCurrentMaxSeq {
|
||||
if err = c.msgInterface.SetUserMinSeq(ctx, userID, userCurrentMaxSeq); err != nil {
|
||||
fmt.Println("SetUserMinSeq failed", userID, userCurrentMaxSeq)
|
||||
}
|
||||
fmt.Println("fix", userID, userCurrentMaxSeq)
|
||||
}
|
||||
}
|
||||
fmt.Println("fix users seq success")
|
||||
|
||||
groupIDs, err := c.groupDatabase.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
for _, groupID := range groupIDs {
|
||||
maxSeq, err := c.msgInterface.GetGroupMaxSeq(ctx, groupID)
|
||||
if err != nil {
|
||||
fmt.Println("GetGroupMaxSeq failed", groupID)
|
||||
continue
|
||||
}
|
||||
userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID)
|
||||
if err != nil {
|
||||
fmt.Println("get groupID", groupID, "failed, try again later")
|
||||
continue
|
||||
}
|
||||
for _, userID := range userIDs {
|
||||
userMinSeq, err := c.msgInterface.GetGroupUserMinSeq(ctx, groupID, userID)
|
||||
if err != nil && err != redis.Nil {
|
||||
fmt.Println("GetGroupUserMinSeq failed", groupID, userID)
|
||||
continue
|
||||
}
|
||||
if userMinSeq > maxSeq {
|
||||
if err = c.msgInterface.SetGroupUserMinSeq(ctx, groupID, userID, maxSeq); err != nil {
|
||||
fmt.Println("SetGroupUserMinSeq failed", err.Error(), groupID, userID, maxSeq)
|
||||
}
|
||||
fmt.Println("fix", groupID, userID, maxSeq, userMinSeq)
|
||||
}
|
||||
}
|
||||
}
|
||||
fmt.Println("fix all seq finished")
|
||||
}
|
||||
@@ -1,56 +0,0 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/log"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
const cronTaskOperationID = "cronTaskOperationID-"
|
||||
const moduleName = "cron"
|
||||
|
||||
func StartCronTask() error {
|
||||
log.NewPrivateLog(moduleName)
|
||||
log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime)
|
||||
fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime)
|
||||
clearCronTask := msgTool{}
|
||||
ctx := context.Background()
|
||||
operationID := clearCronTask.getCronTaskOperationID()
|
||||
tracelog.SetOperationID(ctx, operationID)
|
||||
c := cron.New()
|
||||
_, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, clearCronTask.ClearMsgAndFixSeq)
|
||||
if err != nil {
|
||||
fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime)
|
||||
return err
|
||||
}
|
||||
c.Start()
|
||||
fmt.Println("start cron task success")
|
||||
for {
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func FixSeq(userID, superGroupID string, fixAllSeq bool) {
|
||||
log.NewPrivateLog(moduleName)
|
||||
log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime)
|
||||
clearCronTask := msgTool{}
|
||||
ctx := context.Background()
|
||||
operationID := clearCronTask.getCronTaskOperationID()
|
||||
tracelog.SetOperationID(ctx, operationID)
|
||||
if userID != "" {
|
||||
clearCronTask.ClearUsersMsg(ctx, []string{userID})
|
||||
}
|
||||
if superGroupID != "" {
|
||||
clearCronTask.ClearSuperGroupMsg(ctx, []string{superGroupID})
|
||||
}
|
||||
if fixAllSeq {
|
||||
clearCronTask.FixAllSeq(ctx)
|
||||
}
|
||||
fmt.Println("fix seq finished")
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package tools
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/log"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/robfig/cron/v3"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const cronTaskOperationID = "cronTaskOperationID-"
|
||||
const moduleName = "cron"
|
||||
|
||||
func StartCronTask() error {
|
||||
log.NewPrivateLog(moduleName)
|
||||
log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime)
|
||||
fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime)
|
||||
msgTool, err := InitMsgTool()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx := context.Background()
|
||||
operationID := msgTool.getCronTaskOperationID()
|
||||
tracelog.SetOperationID(ctx, operationID)
|
||||
c := cron.New()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
_, err = c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, msgTool.AllUserClearMsgAndFixSeq)
|
||||
if err != nil {
|
||||
fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime)
|
||||
return err
|
||||
}
|
||||
c.Start()
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,235 @@
|
||||
package tools
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/common/config"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/db/cache"
|
||||
"OpenIM/pkg/common/db/controller"
|
||||
"OpenIM/pkg/common/db/relation"
|
||||
"OpenIM/pkg/common/db/unrelation"
|
||||
"OpenIM/pkg/common/log"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"math"
|
||||
)
|
||||
|
||||
type MsgTool struct {
|
||||
msgDatabase controller.MsgDatabase
|
||||
userDatabase controller.UserDatabase
|
||||
groupDatabase controller.GroupDatabase
|
||||
}
|
||||
|
||||
func NewMsgTool(msgDatabase controller.MsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase) *MsgTool {
|
||||
return &MsgTool{
|
||||
msgDatabase: msgDatabase,
|
||||
userDatabase: userDatabase,
|
||||
groupDatabase: groupDatabase,
|
||||
}
|
||||
}
|
||||
|
||||
func InitMsgTool() (*MsgTool, error) {
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mongo, err := unrelation.NewMongo()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
db, err := relation.NewGormDB()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msgDatabase := controller.InitMsgDatabase(rdb, mongo.GetDatabase())
|
||||
userDatabase := controller.NewUserDatabase(relation.NewUserGorm(db))
|
||||
groupDatabase := controller.InitGroupDatabase(db, rdb, mongo.GetDatabase())
|
||||
msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase)
|
||||
return msgTool, nil
|
||||
}
|
||||
|
||||
func (c *MsgTool) getCronTaskOperationID() string {
|
||||
return cronTaskOperationID + utils.OperationIDGenerator()
|
||||
}
|
||||
|
||||
func (c *MsgTool) AllUserClearMsgAndFixSeq() {
|
||||
operationID := c.getCronTaskOperationID()
|
||||
ctx := context.Background()
|
||||
tracelog.SetOperationID(ctx, operationID)
|
||||
log.NewInfo(operationID, "============================ start del cron task ============================")
|
||||
var err error
|
||||
userIDList, err := c.userDatabase.GetAllUserID(ctx)
|
||||
if err == nil {
|
||||
c.ClearUsersMsg(ctx, userIDList)
|
||||
} else {
|
||||
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
|
||||
}
|
||||
// working group msg clear
|
||||
superGroupIDList, err := c.groupDatabase.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
|
||||
if err == nil {
|
||||
c.ClearSuperGroupMsg(ctx, superGroupIDList)
|
||||
} else {
|
||||
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
|
||||
}
|
||||
log.NewInfo(operationID, "============================ start del cron finished ============================")
|
||||
}
|
||||
|
||||
func (c *MsgTool) ClearUsersMsg(ctx context.Context, userIDs []string) {
|
||||
for _, userID := range userIDs {
|
||||
if err := c.msgDatabase.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), userID)
|
||||
}
|
||||
maxSeqCache, maxSeqMongo, err := c.GetAndFixUserSeqs(ctx, userID)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
c.CheckMaxSeqWithMongo(ctx, userID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *MsgTool) ClearSuperGroupMsg(ctx context.Context, superGroupIDs []string) {
|
||||
for _, groupID := range superGroupIDs {
|
||||
userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID)
|
||||
if err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "FindGroupMemberUserID", err.Error(), groupID)
|
||||
continue
|
||||
}
|
||||
if err := c.msgDatabase.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "DeleteUserSuperGroupMsgsAndSetMinSeq failed", groupID, userIDs, config.Config.Mongo.DBRetainChatRecords)
|
||||
}
|
||||
if err := c.fixGroupSeq(ctx, groupID, userIDs); err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), groupID, userIDs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *MsgTool) FixGroupSeq(ctx context.Context, groupID string) error {
|
||||
userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.fixGroupSeq(ctx, groupID, userIDs)
|
||||
}
|
||||
|
||||
func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []string) error {
|
||||
_, maxSeqMongo, maxSeqCache, err := c.msgDatabase.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID)
|
||||
if err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", groupID)
|
||||
return err
|
||||
}
|
||||
for _, userID := range userIDs {
|
||||
if _, err := c.GetAndFixGroupUserSeq(ctx, userID, groupID, maxSeqCache); err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), "GetAndFixGroupUserSeq failed", groupID, userID, maxSeqCache)
|
||||
continue
|
||||
}
|
||||
}
|
||||
c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqCache, maxSeqMongo int64, err error) {
|
||||
_, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, userID)
|
||||
if err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", userID)
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
if minSeqCache > maxSeqCache {
|
||||
if err := c.msgDatabase.SetUserMinSeq(ctx, userID, maxSeqCache); err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), "SetUserMinSeq failed", userID, minSeqCache, maxSeqCache)
|
||||
} else {
|
||||
log.NewWarn(tracelog.GetOperationID(ctx), "SetUserMinSeq success", userID, minSeqCache, maxSeqCache)
|
||||
}
|
||||
}
|
||||
return maxSeqCache, maxSeqMongo, nil
|
||||
}
|
||||
|
||||
func (c *MsgTool) GetAndFixGroupUserSeq(ctx context.Context, userID string, groupID string, maxSeqCache int64) (minSeqCache int64, err error) {
|
||||
minSeqCache, err = c.msgDatabase.GetGroupUserMinSeq(ctx, groupID, userID)
|
||||
if err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), "GetGroupUserMinSeq failed", groupID, userID)
|
||||
return 0, err
|
||||
}
|
||||
if minSeqCache > maxSeqCache {
|
||||
if err := c.msgDatabase.SetGroupUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil {
|
||||
log.NewError(tracelog.GetOperationID(ctx), "SetGroupUserMinSeq failed", userID, minSeqCache, maxSeqCache)
|
||||
} else {
|
||||
log.NewWarn(tracelog.GetOperationID(ctx), "SetGroupUserMinSeq success", userID, minSeqCache, maxSeqCache)
|
||||
}
|
||||
}
|
||||
return minSeqCache, nil
|
||||
}
|
||||
|
||||
func (c *MsgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) {
|
||||
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
|
||||
log.NewWarn(tracelog.GetOperationID(ctx), "cache max seq and mongo max seq is diff > 10", sourceID, maxSeqCache, maxSeqMongo, diffusionType)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *MsgTool) ShowUserSeqs(ctx context.Context, userID string) {
|
||||
|
||||
}
|
||||
|
||||
func (c *MsgTool) ShowSuperGroupSeqs(ctx context.Context, groupID string) {
|
||||
|
||||
}
|
||||
|
||||
func (c *MsgTool) ShowSuperGroupUserSeqs(ctx context.Context, groupID, userID string) {
|
||||
|
||||
}
|
||||
|
||||
func (c *MsgTool) FixAllSeq(ctx context.Context) {
|
||||
userIDs, err := c.userDatabase.GetAllUserID(ctx)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
for _, userID := range userIDs {
|
||||
userCurrentMinSeq, err := c.msgDatabase.GetUserMinSeq(ctx, userID)
|
||||
if err != nil && err != redis.Nil {
|
||||
continue
|
||||
}
|
||||
userCurrentMaxSeq, err := c.msgDatabase.GetUserMaxSeq(ctx, userID)
|
||||
if err != nil && err != redis.Nil {
|
||||
continue
|
||||
}
|
||||
if userCurrentMinSeq > userCurrentMaxSeq {
|
||||
if err = c.msgDatabase.SetUserMinSeq(ctx, userID, userCurrentMaxSeq); err != nil {
|
||||
fmt.Println("SetUserMinSeq failed", userID, userCurrentMaxSeq)
|
||||
}
|
||||
fmt.Println("fix", userID, userCurrentMaxSeq)
|
||||
}
|
||||
}
|
||||
fmt.Println("fix users seq success")
|
||||
groupIDs, err := c.groupDatabase.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
for _, groupID := range groupIDs {
|
||||
maxSeq, err := c.msgDatabase.GetGroupMaxSeq(ctx, groupID)
|
||||
if err != nil {
|
||||
fmt.Println("GetGroupMaxSeq failed", groupID)
|
||||
continue
|
||||
}
|
||||
userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID)
|
||||
if err != nil {
|
||||
fmt.Println("get groupID", groupID, "failed, try again later")
|
||||
continue
|
||||
}
|
||||
for _, userID := range userIDs {
|
||||
userMinSeq, err := c.msgDatabase.GetGroupUserMinSeq(ctx, groupID, userID)
|
||||
if err != nil && err != redis.Nil {
|
||||
fmt.Println("GetGroupUserMinSeq failed", groupID, userID)
|
||||
continue
|
||||
}
|
||||
if userMinSeq > maxSeq {
|
||||
if err = c.msgDatabase.SetGroupUserMinSeq(ctx, groupID, userID, maxSeq); err != nil {
|
||||
fmt.Println("SetGroupUserMinSeq failed", err.Error(), groupID, userID, maxSeq)
|
||||
}
|
||||
fmt.Println("fix", groupID, userID, maxSeq, userMinSeq)
|
||||
}
|
||||
}
|
||||
}
|
||||
fmt.Println("fix all seq finished")
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package task
|
||||
package tools
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/common/constant"
|
||||
@@ -16,12 +16,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
redisClient *redis.Client
|
||||
mongoClient *mongo.Collection
|
||||
)
|
||||
|
||||
func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *mongo2.UserChat {
|
||||
func GenUserChat(startSeq, stopSeq, delSeq, index int64, userID string) *mongo2.UserChat {
|
||||
chat := &mongo2.UserChat{UID: userID + strconv.Itoa(int(index))}
|
||||
for i := startSeq; i <= stopSeq; i++ {
|
||||
msg := sdkws.MsgData{
|
||||
@@ -36,8 +31,8 @@ func GenUserChat(startSeq, stopSeq, delSeq, index uint32, userID string) *mongo2
|
||||
SessionType: 1,
|
||||
MsgFrom: 100,
|
||||
ContentType: 101,
|
||||
Content: []byte("testFaceURL"),
|
||||
Seq: uint32(i),
|
||||
Content: []byte("testFaceURL.com"),
|
||||
Seq: i,
|
||||
SendTime: time.Now().Unix(),
|
||||
CreateTime: time.Now().Unix(),
|
||||
Status: 1,
|
||||
@@ -89,15 +84,4 @@ func TestDeleteUserMsgsAndSetMinSeq(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error("err is not nil", testUID1, err.Error())
|
||||
}
|
||||
// testWorkingGroupIDList := []string{"test_del_id1", "test_del_id2", "test_del_id3", "test_del_id4", "test_del_id5"}
|
||||
// for _, groupID := range testWorkingGroupIDList {
|
||||
// operationID = groupID + "-" + operationID
|
||||
// log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", testUserIDList)
|
||||
// if err := DeleteUserSuperGroupMsgsAndSetMinSeq(operationID, groupID, testUserIDList); err != nil {
|
||||
// t.Error("checkMaxSeqWithMongo failed", groupID)
|
||||
// }
|
||||
// if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil {
|
||||
// t.Error("checkMaxSeqWithMongo failed", groupID)
|
||||
// }
|
||||
// }
|
||||
}
|
||||
Reference in New Issue
Block a user