fix Pb2String

This commit is contained in:
wangchuxiao
2023-01-11 16:23:16 +08:00
parent 46c3b5a6ea
commit c96ccfc7f5
35 changed files with 720 additions and 726 deletions
+14 -13
View File
@@ -8,6 +8,7 @@ import (
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/getcdv3"
cacheRpc "Open_IM/pkg/proto/cache"
pbConversation "Open_IM/pkg/proto/conversation"
pbChat "Open_IM/pkg/proto/msg"
@@ -129,7 +130,7 @@ func (rpc *rpcChat) messageVerification(ctx context.Context, data *pbChat.SendMs
}
log.NewDebug(data.OperationID, *config.Config.MessageVerify.FriendVerify)
reqGetBlackIDListFromCache := &cacheRpc.GetBlackIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID}
etcdConn, err := utils.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImCacheName)
etcdConn, err := getcdv3.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImCacheName)
if err != nil {
errMsg := data.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(data.OperationID, errMsg)
@@ -152,7 +153,7 @@ func (rpc *rpcChat) messageVerification(ctx context.Context, data *pbChat.SendMs
log.NewDebug(data.OperationID, *config.Config.MessageVerify.FriendVerify)
if *config.Config.MessageVerify.FriendVerify {
reqGetFriendIDListFromCache := &cacheRpc.GetFriendIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID}
etcdConn, err := utils.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImCacheName)
etcdConn, err := getcdv3.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImCacheName)
if err != nil {
errMsg := data.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(data.OperationID, errMsg)
@@ -176,7 +177,7 @@ func (rpc *rpcChat) messageVerification(ctx context.Context, data *pbChat.SendMs
return true, 0, "", nil
}
case constant.GroupChatType:
userIDList, err := utils2.GetGroupMemberUserIDList(data.MsgData.GroupID, data.OperationID)
userIDList, err := utils2.GetGroupMemberUserIDList(ctx, data.MsgData.GroupID, data.OperationID)
if err != nil {
errMsg := data.OperationID + err.Error()
log.NewError(data.OperationID, errMsg)
@@ -250,7 +251,7 @@ func (rpc *rpcChat) messageVerification(ctx context.Context, data *pbChat.SendMs
if groupInfo.GroupType == constant.SuperGroup {
return true, 0, "", nil
} else {
userIDList, err := utils2.GetGroupMemberUserIDList(data.MsgData.GroupID, data.OperationID)
userIDList, err := utils2.GetGroupMemberUserIDList(ctx, data.MsgData.GroupID, data.OperationID)
if err != nil {
errMsg := data.OperationID + err.Error()
log.NewError(data.OperationID, errMsg)
@@ -485,13 +486,13 @@ func (rpc *rpcChat) SendMsg(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat
wg.Add(1)
tmp := valueCopy(pb)
// go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg)
go rpc.sendMsgToGroupOptimization(v[i*split:(i+1)*split], tmp, k, &sendTag, &wg)
go rpc.sendMsgToGroupOptimization(ctx, v[i*split:(i+1)*split], tmp, k, &sendTag, &wg)
}
if remain > 0 {
wg.Add(1)
tmp := valueCopy(pb)
// go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg)
go rpc.sendMsgToGroupOptimization(v[split*(len(v)/split):], tmp, k, &sendTag, &wg)
go rpc.sendMsgToGroupOptimization(ctx, v[split*(len(v)/split):], tmp, k, &sendTag, &wg)
}
}
log.Debug(pb.OperationID, "send msg cost time22 ", time.Since(t1), pb.MsgData.ClientMsgID, "uidList : ", len(addUidList))
@@ -538,7 +539,7 @@ func (rpc *rpcChat) SendMsg(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat
conversationReq.UserIDList = pb.MsgData.AtUserIDList
conversation.GroupAtType = constant.AtMe
}
etcdConn, err := utils.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName)
etcdConn, err := getcdv3.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName)
if err != nil {
errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(pb.OperationID, errMsg)
@@ -615,7 +616,7 @@ func (rpc *rpcChat) SendMsg(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat
}
msgToMQSingle.MsgData = pb.MsgData
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
err1 := rpc.sendMsgToWriter(&msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus)
err1 := rpc.sendMsgToWriter(ctx, &msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus)
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String())
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
@@ -639,7 +640,7 @@ func (rpc *rpcChat) sendMsgToWriter(ctx context.Context, m *pbChat.MsgDataToMQ,
case constant.OnlineStatus:
if m.MsgData.ContentType == constant.SignalingNotification {
rpcPushMsg := pbPush.PushMsgReq{OperationID: m.OperationID, MsgData: m.MsgData, PushToUserID: key}
grpcConn, err := utils.GetConn(ctx, config.Config.RpcRegisterName.OpenImPushName)
grpcConn, err := getcdv3.GetConn(ctx, config.Config.RpcRegisterName.OpenImPushName)
if err != nil {
return err
}
@@ -1103,7 +1104,7 @@ func valueCopy(pb *pbChat.SendMsgReq) *pbChat.SendMsgReq {
return &pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &msgData}
}
func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) {
func (rpc *rpcChat) sendMsgToGroup(ctx context.Context, list []string, pb pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) {
// log.Debug(pb.OperationID, "split userID ", list)
offlinePushInfo := sdk_ws.OfflinePushInfo{}
if pb.MsgData.OfflinePushInfo != nil {
@@ -1126,7 +1127,7 @@ func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status s
if isSend {
msgToMQGroup.MsgData = groupPB.MsgData
// log.Debug(groupPB.OperationID, "sendMsgToWriter, ", v, groupID, msgToMQGroup.String())
err := rpc.sendMsgToWriter(&msgToMQGroup, v, status)
err := rpc.sendMsgToWriter(ctx, &msgToMQGroup, v, status)
if err != nil {
log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String())
} else {
@@ -1139,7 +1140,7 @@ func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status s
wg.Done()
}
func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) {
func (rpc *rpcChat) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) {
msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData}
tempOptions := make(map[string]bool, 1)
for k, v := range groupPB.MsgData.Options {
@@ -1158,7 +1159,7 @@ func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.Se
log.Error(msgToMQGroup.OperationID, "sendMsgToGroupOptimization userID nil ", msgToMQGroup.String())
continue
}
err := rpc.sendMsgToWriter(&msgToMQGroup, v, status)
err := rpc.sendMsgToWriter(ctx, &msgToMQGroup, v, status)
if err != nil {
log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String())
} else {