new feature: add batch send msg (#560)

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg
This commit is contained in:
WangchuXiao
2023-07-14 14:49:28 +08:00
committed by GitHub
parent 3b438d58a9
commit 70d8ae4c19
27 changed files with 142 additions and 1299 deletions
+76 -114
View File
@@ -26,7 +26,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
@@ -35,12 +34,13 @@ import (
)
type MessageApi struct {
rpcclient.Message
validate *validator.Validate
*rpcclient.Message
validate *validator.Validate
userRpcClient *rpcclient.UserRpcClient
}
func NewMessageApi(discov discoveryregistry.SvcDiscoveryRegistry) MessageApi {
return MessageApi{Message: *rpcclient.NewMessage(discov), validate: validator.New()}
func NewMessageApi(msgRpcClient *rpcclient.Message, userRpcClient *rpcclient.User) MessageApi {
return MessageApi{Message: msgRpcClient, validate: validator.New(), userRpcClient: rpcclient.NewUserRpcClientByUser(userRpcClient)}
}
func (MessageApi) SetOptions(options map[string]bool, value bool) {
@@ -50,9 +50,10 @@ func (MessageApi) SetOptions(options map[string]bool, value bool) {
utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, value)
}
func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.ManagementSendMsgReq) *msg.SendMsgReq {
func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.SendMsg) *msg.SendMsgReq {
var newContent string
var err error
options := make(map[string]bool, 5)
switch params.ContentType {
case constant.Text:
newContent = params.Content["text"].(string)
@@ -70,11 +71,9 @@ func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.Manageme
fallthrough
case constant.CustomOnlineOnly:
fallthrough
case constant.Revoke:
newContent = params.Content["revokeMsgClientID"].(string)
default:
newContent = utils.StructToJsonString(params.Content)
}
options := make(map[string]bool, 5)
if params.IsOnlineOnly {
m.SetOptions(options, false)
}
@@ -98,7 +97,6 @@ func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.Manageme
MsgFrom: constant.SysMsgType,
ContentType: params.ContentType,
Content: []byte(newContent),
RecvID: params.RecvID,
CreateTime: utils.GetCurrentTimestampByMill(),
Options: options,
OfflinePushInfo: params.OfflinePushInfo,
@@ -163,19 +161,9 @@ func (m *MessageApi) DeleteMsgPhysical(c *gin.Context) {
a2r.Call(msg.MsgClient.DeleteMsgPhysical, m.Client, c)
}
func (m *MessageApi) SendMessage(c *gin.Context) {
params := apistruct.ManagementSendMsgReq{}
if err := c.BindJSON(&params); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
if !tokenverify.IsAppManagerUid(c) {
apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message"))
return
}
func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendMsgReq *msg.SendMsgReq, err error) {
var data interface{}
switch params.ContentType {
switch req.ContentType {
case constant.Text:
data = apistruct.TextElem{}
case constant.Picture:
@@ -192,25 +180,44 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
data = apistruct.RevokeElem{}
case constant.OANotification:
data = apistruct.OANotificationElem{}
params.SessionType = constant.NotificationChatType
req.SessionType = constant.NotificationChatType
case constant.CustomNotTriggerConversation:
data = apistruct.CustomElem{}
case constant.CustomOnlineOnly:
data = apistruct.CustomElem{}
default:
apiresp.GinError(c, errs.ErrArgs.WithDetail("not support err contentType").Wrap())
return nil, errs.ErrArgs.WithDetail("not support err contentType")
}
if err := mapstructure.WeakDecode(req.Content, &data); err != nil {
return nil, err
}
log.ZDebug(c, "getSendMsgReq", "data", data)
if err := m.validate.Struct(data); err != nil {
return nil, err
}
return m.newUserSendMsgReq(c, &req), nil
}
func (m *MessageApi) SendMessage(c *gin.Context) {
req := apistruct.SendMsgReq{}
if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
if err := mapstructure.WeakDecode(params.Content, &data); err != nil {
apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error()))
return
} else if err := m.validate.Struct(params); err != nil {
apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error()))
log.ZInfo(c, "SendMessage", "req", req)
if !tokenverify.IsAppManagerUid(c) {
apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message"))
return
}
pbReq := m.newUserSendMsgReq(c, &params)
sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg)
if err != nil {
log.ZError(c, "decodeData failed", err)
apiresp.GinError(c, err)
}
sendMsgReq.MsgData.RecvID = req.RecvID
var status int
respPb, err := m.Client.SendMsg(c, pbReq)
respPb, err := m.Client.SendMsg(c, sendMsgReq)
if err != nil {
status = constant.MsgSendFailed
apiresp.GinError(c, err)
@@ -226,107 +233,62 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
apiresp.GinSuccess(c, respPb)
}
func (m *MessageApi) ManagementBatchSendMsg(c *gin.Context) {
params := apistruct.ManagementBatchSendMsgReq{}
resp := apistruct.ManagementBatchSendMsgResp{}
var msgSendFailedFlag bool
if err := c.BindJSON(&params); err != nil {
func (m *MessageApi) BatchSendMsg(c *gin.Context) {
var (
req apistruct.BatchSendMsgReq
resp apistruct.BatchSendMsgResp
)
if err := c.BindJSON(&req); err != nil {
log.ZError(c, "BatchSendMsg BindJSON failed", err)
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
if !tokenverify.IsAppManagerUid(c) {
log.ZInfo(c, "BatchSendMsg", "req", req)
if err := tokenverify.CheckAdmin(c); err != nil {
apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message"))
return
}
var data interface{}
switch params.ContentType {
case constant.Text:
data = apistruct.TextElem{}
case constant.Picture:
data = apistruct.PictureElem{}
case constant.Voice:
data = apistruct.SoundElem{}
case constant.Video:
data = apistruct.VideoElem{}
case constant.File:
data = apistruct.FileElem{}
case constant.Custom:
data = apistruct.CustomElem{}
case constant.Revoke:
data = apistruct.RevokeElem{}
case constant.OANotification:
data = apistruct.OANotificationElem{}
params.SessionType = constant.NotificationChatType
case constant.CustomNotTriggerConversation:
data = apistruct.CustomElem{}
case constant.CustomOnlineOnly:
data = apistruct.CustomElem{}
default:
apiresp.GinError(c, errs.ErrArgs.WithDetail("not support err contentType").Wrap())
return
}
if err := mapstructure.WeakDecode(params.Content, &data); err != nil {
apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error()))
return
} else if err := m.validate.Struct(params); err != nil {
apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error()))
return
}
t := &apistruct.ManagementSendMsgReq{
SendID: params.SendID,
GroupID: params.GroupID,
SenderNickname: params.SenderNickname,
SenderFaceURL: params.SenderFaceURL,
SenderPlatformID: params.SenderPlatformID,
Content: params.Content,
ContentType: params.ContentType,
SessionType: params.SessionType,
IsOnlineOnly: params.IsOnlineOnly,
NotOfflinePush: params.NotOfflinePush,
OfflinePushInfo: params.OfflinePushInfo,
}
pbReq := m.newUserSendMsgReq(c, t)
var recvList []string
if params.IsSendAll {
// req2 := &user.GetAllUserIDReq{}
// resp2, err := m.Message.GetAllUserID(c, req2)
// if err != nil {
// apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error()))
// return
// }
// recvList = resp2.UserIDs
var recvIDs []string
var err error
if req.IsSendAll {
pageNumber := 1
showNumber := 500
for {
recvIDsPart, err := m.userRpcClient.GetAllUserIDs(c, int32(pageNumber), int32(showNumber))
if err != nil {
log.ZError(c, "GetAllUserIDs failed", err)
apiresp.GinError(c, err)
}
if len(recvIDsPart) < showNumber {
recvIDs = append(recvIDs, recvIDsPart...)
break
}
pageNumber++
}
} else {
recvList = params.RecvIDList
recvIDs = req.RecvIDs
}
for _, recvID := range recvList {
pbReq.MsgData.RecvID = recvID
rpcResp, err := m.Client.SendMsg(c, pbReq)
log.ZDebug(c, "BatchSendMsg nums", "nums ", len(recvIDs))
sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg)
if err != nil {
log.ZError(c, "decodeData failed", err)
apiresp.GinError(c, err)
}
for _, recvID := range recvIDs {
sendMsgReq.MsgData.RecvID = recvID
rpcResp, err := m.Client.SendMsg(c, sendMsgReq)
if err != nil {
resp.Data.FailedIDList = append(resp.Data.FailedIDList, recvID)
msgSendFailedFlag = true
resp.FailedIDs = append(resp.FailedIDs, recvID)
continue
}
resp.Data.ResultList = append(resp.Data.ResultList, &apistruct.SingleReturnResult{
resp.Results = append(resp.Results, &apistruct.SingleReturnResult{
ServerMsgID: rpcResp.ServerMsgID,
ClientMsgID: rpcResp.ClientMsgID,
SendTime: rpcResp.SendTime,
RecvID: recvID,
})
}
var status int32
if msgSendFailedFlag {
status = constant.MsgSendFailed
} else {
status = constant.MsgSendSuccessed
}
_, err := m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{Status: status})
if err != nil {
apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error()))
return
}
apiresp.GinSuccess(c, resp)
}