merge all branch and change project structure

This commit is contained in:
Gordon
2021-11-10 15:24:59 +08:00
91 changed files with 3612 additions and 1225 deletions
+5 -1
View File
@@ -1,10 +1,10 @@
package apiAuth
import (
pbAuth "Open_IM/pkg/proto/auth"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbAuth "Open_IM/pkg/proto/auth"
"context"
"github.com/gin-gonic/gin"
"net/http"
@@ -49,6 +49,10 @@ func UserRegister(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
return
}
if params.Secret != config.Config.Secret {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": "not authorized"})
return
}
pbData := newUserRegisterReq(&params)
log.Info("", "", "api user_register is server, [data: %s]", pbData.String())
+5 -1
View File
@@ -1,10 +1,10 @@
package apiAuth
import (
pbAuth "Open_IM/pkg/proto/auth"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbAuth "Open_IM/pkg/proto/auth"
"context"
"github.com/gin-gonic/gin"
"net/http"
@@ -37,6 +37,10 @@ func UserToken(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
return
}
if params.Secret != config.Config.Secret {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": "not authorized"})
return
}
pbData := newUserTokenReq(&params)
log.Info("", "", "api user_token is server, [data: %s]", pbData.String())
@@ -19,7 +19,7 @@ type paramsUserNewestSeq struct {
MsgIncr int `json:"msgIncr" binding:"required"`
}
func UserNewestSeq(c *gin.Context) {
func UserGetSeq(c *gin.Context) {
params := paramsUserNewestSeq{}
if err := c.BindJSON(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
@@ -31,7 +31,7 @@ func UserNewestSeq(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err"})
return
}
pbData := pbMsg.GetNewSeqReq{}
pbData := pbMsg.GetMaxAndMinSeqReq{}
pbData.UserID = params.SendID
pbData.OperationID = params.OperationID
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
@@ -40,7 +40,7 @@ func UserNewestSeq(c *gin.Context) {
log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", params)
}
msgClient := pbMsg.NewChatClient(grpcConn)
reply, err := msgClient.GetNewSeq(context.Background(), &pbData)
reply, err := msgClient.GetMaxAndMinSeq(context.Background(), &pbData)
if err != nil {
log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String())
return
@@ -52,7 +52,8 @@ func UserNewestSeq(c *gin.Context) {
"msgIncr": params.MsgIncr,
"reqIdentifier": params.ReqIdentifier,
"data": gin.H{
"seq": reply.Seq,
"maxSeq": reply.MaxSeq,
"minSeq": reply.MinSeq,
},
})
+55
View File
@@ -70,3 +70,58 @@ func UserPullMsg(c *gin.Context) {
})
}
type paramsUserPullMsgBySeqList struct {
ReqIdentifier int `json:"reqIdentifier" binding:"required"`
SendID string `json:"sendID" binding:"required"`
OperationID string `json:"operationID" binding:"required"`
SeqList []int64 `json:"seqList"`
}
func UserPullMsgBySeqList(c *gin.Context) {
params := paramsUserPullMsgBySeqList{}
if err := c.BindJSON(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
return
}
token := c.Request.Header.Get("token")
if !utils.VerifyToken(token, params.SendID) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err"})
return
}
pbData := pbChat.PullMessageBySeqListReq{}
pbData.UserID = params.SendID
pbData.OperationID = params.OperationID
pbData.SeqList = params.SeqList
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
msgClient := pbChat.NewChatClient(grpcConn)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &pbData)
if err != nil {
log.ErrorByKv("PullMessageBySeqList error", pbData.OperationID, "err", err.Error())
return
}
log.InfoByKv("rpc call success to PullMessageBySeqList", pbData.OperationID, "ReplyArgs", reply.String(), "maxSeq", reply.GetMaxSeq(),
"MinSeq", reply.GetMinSeq(), "singLen", len(reply.GetSingleUserMsg()), "groupLen", len(reply.GetGroupUserMsg()))
msg := make(map[string]interface{})
if v := reply.GetSingleUserMsg(); v != nil {
msg["single"] = v
} else {
msg["single"] = []pbChat.GatherFormat{}
}
if v := reply.GetGroupUserMsg(); v != nil {
msg["group"] = v
} else {
msg["group"] = []pbChat.GatherFormat{}
}
msg["maxSeq"] = reply.GetMaxSeq()
msg["minSeq"] = reply.GetMinSeq()
c.JSON(http.StatusOK, gin.H{
"errCode": reply.ErrCode,
"errMsg": reply.ErrMsg,
"reqIdentifier": params.ReqIdentifier,
"data": msg,
})
}
+1 -1
View File
@@ -67,7 +67,7 @@ func UserSendMsg(c *gin.Context) {
token := c.Request.Header.Get("token")
log.InfoByKv("Ws call success to sendMsgReq", params.OperationID, "Parameters", params)
log.InfoByKv("api call success to sendMsgReq", params.OperationID, "Parameters", params)
pbData := newUserSendMsgReq(token, &params)
log.Info("", "", "api UserSendMsg call start..., [data: %s]", pbData.String())
+1 -1
View File
@@ -1,10 +1,10 @@
package friend
import (
pbFriend "Open_IM/pkg/proto/friend"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbFriend "Open_IM/pkg/proto/friend"
"context"
"github.com/gin-gonic/gin"
"net/http"
+9 -6
View File
@@ -35,7 +35,7 @@ type paramsManagementSendMsg struct {
SessionType int32 `json:"sessionType" binding:"required"`
}
func newUserSendMsgReq(token string, params *paramsManagementSendMsg) *pbChat.UserSendMsgReq {
func newUserSendMsgReq(params *paramsManagementSendMsg) *pbChat.UserSendMsgReq {
var newContent string
switch params.ContentType {
case constant.Text:
@@ -53,7 +53,6 @@ func newUserSendMsgReq(token string, params *paramsManagementSendMsg) *pbChat.Us
}
pbData := pbChat.UserSendMsgReq{
ReqIdentifier: constant.WSSendMsg,
Token: token,
SendID: params.SendID,
SenderNickName: params.SenderNickName,
SenderFaceURL: params.SenderFaceURL,
@@ -103,15 +102,19 @@ func ManagementSendMsg(c *gin.Context) {
}
token := c.Request.Header.Get("token")
if !utils.IsContain(params.SendID, config.Config.Manager.AppManagerUid) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "not appManager", "sendTime": 0, "MsgID": ""})
claims, err := utils.ParseToken(token)
if err != nil {
log.NewError(params.OperationID, "parse token failed", err.Error())
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "parse token failed", "sendTime": 0, "MsgID": ""})
}
if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "not authorized", "sendTime": 0, "MsgID": ""})
return
}
log.InfoByKv("Ws call success to ManagementSendMsgReq", params.OperationID, "Parameters", params)
pbData := newUserSendMsgReq(token, &params)
pbData := newUserSendMsgReq(&params)
log.Info("", "", "api ManagementSendMsg call start..., [data: %s]", pbData.String())
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
+200 -113
View File
@@ -6,62 +6,67 @@ import (
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbChat "Open_IM/pkg/proto/chat"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"bytes"
"context"
"encoding/gob"
"encoding/json"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"runtime"
"strings"
)
func (ws *WServer) msgParse(conn *websocket.Conn, jsonMsg []byte) {
func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
//ws online debug data
//{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0}
//{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6}
//{"ReqIdentifier":1003,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b",
//"RecvID":"a87ff679a2f3e71d9181a67b7542122c","ClientMsgID":"2343","Time":"147878787","OperationID":
//"123","MsgIncr":0,"SubMsgType":101,"MsgType":100,"MsgFrom":1,"Content":"sdfsdf"}
b := bytes.NewBuffer(binaryMsg)
m := Req{}
if err := json.Unmarshal(jsonMsg, &m); err != nil {
dec := gob.NewDecoder(b)
err := dec.Decode(&m)
if err != nil {
log.ErrorByKv("ws json Unmarshal err", "", "err", err.Error())
ws.sendErrMsg(conn, 200, err.Error())
ws.sendErrMsg(conn, 200, err.Error(), constant.WSDataError, "", "")
err = conn.Close()
if err != nil {
log.NewError("", "ws close err", err.Error())
}
return
}
if err := validate.Struct(m); err != nil {
log.ErrorByKv("ws args validate err", "", "err", err.Error())
ws.sendErrMsg(conn, 201, err.Error())
ws.sendErrMsg(conn, 201, err.Error(), m.ReqIdentifier, m.MsgIncr, m.OperationID)
return
}
if !utils.VerifyToken(m.Token, m.SendID) {
ws.sendErrMsg(conn, 202, "token validate err")
return
}
log.InfoByKv("Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID)
//if !utils.VerifyToken(m.Token, m.SendID) {
// ws.sendErrMsg(conn, 202, "token validate err", m.ReqIdentifier, m.MsgIncr,m.OperationID)
// return
//}
log.InfoByKv("Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID, "msgIncr", m.MsgIncr)
switch m.ReqIdentifier {
case constant.WSGetNewestSeq:
ws.newestSeqReq(conn, &m)
go ws.getSeqReq(conn, &m)
case constant.WSPullMsg:
ws.pullMsgReq(conn, &m)
go ws.pullMsgReq(conn, &m)
case constant.WSSendMsg:
ws.sendMsgReq(conn, &m)
sendTime := utils.GetCurrentTimestampByNano()
go ws.sendMsgReq(conn, &m, sendTime)
case constant.WSPullMsgBySeqList:
go ws.pullMsgBySeqListReq(conn, &m)
default:
}
log.NewInfo("", "goroutine num is ", runtime.NumGoroutine())
}
func (ws *WServer) newestSeqResp(conn *websocket.Conn, m *Req, pb *pbChat.GetNewSeqResp) {
mReply := make(map[string]interface{})
mData := make(map[string]interface{})
mReply["reqIdentifier"] = m.ReqIdentifier
mReply["msgIncr"] = m.MsgIncr
mReply["errCode"] = pb.GetErrCode()
mReply["errMsg"] = pb.GetErrMsg()
mData["seq"] = pb.GetSeq()
mReply["data"] = mData
ws.sendMsg(conn, mReply)
}
func (ws *WServer) newestSeqReq(conn *websocket.Conn, m *Req) {
log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m)
pbData := pbChat.GetNewSeqReq{}
func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier)
pbData := pbChat.GetMaxAndMinSeqReq{}
nReply := new(pbChat.GetMaxAndMinSeqResp)
pbData.UserID = m.SendID
pbData.OperationID = m.OperationID
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
@@ -69,44 +74,35 @@ func (ws *WServer) newestSeqReq(conn *websocket.Conn, m *Req) {
log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", m)
}
msgClient := pbChat.NewChatClient(grpcConn)
reply, err := msgClient.GetNewSeq(context.Background(), &pbData)
reply, err := msgClient.GetMaxAndMinSeq(context.Background(), &pbData)
if err != nil {
log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String())
return
log.ErrorByKv("rpc call failed to getSeqReq", pbData.OperationID, "err", err, "pbData", pbData.String())
nReply.ErrCode = 200
nReply.ErrMsg = err.Error()
ws.getSeqResp(conn, m, nReply)
} else {
log.InfoByKv("rpc call success to getSeqReq", pbData.OperationID, "replyData", reply.String())
ws.getSeqResp(conn, m, reply)
}
log.InfoByKv("rpc call success to getNewSeq", pbData.OperationID, "replyData", reply.String())
ws.newestSeqResp(conn, m, reply)
}
func (ws *WServer) pullMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.PullMessageResp) {
mReply := make(map[string]interface{})
msg := make(map[string]interface{})
mReply["reqIdentifier"] = m.ReqIdentifier
mReply["msgIncr"] = m.MsgIncr
mReply["errCode"] = pb.GetErrCode()
mReply["errMsg"] = pb.GetErrMsg()
//空切片
if v := pb.GetSingleUserMsg(); v != nil {
msg["single"] = v
} else {
msg["single"] = []pbChat.GatherFormat{}
func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *pbChat.GetMaxAndMinSeqResp) {
var mReplyData open_im_sdk.GetMaxAndMinSeqResp
mReplyData.MaxSeq = pb.GetMaxSeq()
mReplyData.MinSeq = pb.GetMinSeq()
b, _ := proto.Marshal(&mReplyData)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID,
Data: b,
}
if v := pb.GetGroupUserMsg(); v != nil {
msg["group"] = v
} else {
msg["group"] = []pbChat.GatherFormat{}
}
msg["maxSeq"] = pb.GetMaxSeq()
msg["minSeq"] = pb.GetMinSeq()
mReply["data"] = msg
ws.sendMsg(conn, mReply)
}
func (ws *WServer) pullMsgReq(conn *websocket.Conn, m *Req) {
log.InfoByKv("Ws call success to pullMsgReq", m.OperationID, "Parameters", m)
reply := new(pbChat.PullMessageResp)
func (ws *WServer) pullMsgReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to pullMsgReq", m.ReqIdentifier, m.MsgIncr, m.SendID)
nReply := new(pbChat.PullMessageResp)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsg)
if isPass {
pbData := pbChat.PullMessageReq{}
@@ -119,78 +115,169 @@ func (ws *WServer) pullMsgReq(conn *websocket.Conn, m *Req) {
reply, err := msgClient.PullMessage(context.Background(), &pbData)
if err != nil {
log.ErrorByKv("PullMessage error", pbData.OperationID, "err", err.Error())
return
nReply.ErrCode = 200
nReply.ErrMsg = err.Error()
ws.pullMsgResp(conn, m, nReply)
} else {
log.InfoByKv("rpc call success to pullMsgRep", pbData.OperationID, "ReplyArgs", reply.String(), "maxSeq", reply.GetMaxSeq(),
"MinSeq", reply.GetMinSeq(), "singLen", len(reply.GetSingleUserMsg()), "groupLen", len(reply.GetGroupUserMsg()))
ws.pullMsgResp(conn, m, reply)
}
log.InfoByKv("rpc call success to pullMsgRep", pbData.OperationID, "ReplyArgs", reply.String(), "maxSeq", reply.GetMaxSeq(),
"MinSeq", reply.GetMinSeq(), "singLen", len(reply.GetSingleUserMsg()), "groupLen", len(reply.GetGroupUserMsg()))
ws.pullMsgResp(conn, m, reply)
} else {
reply.ErrCode = errCode
reply.ErrMsg = errMsg
ws.pullMsgResp(conn, m, reply)
nReply.ErrCode = errCode
nReply.ErrMsg = errMsg
ws.pullMsgResp(conn, m, nReply)
}
}
func (ws *WServer) pullMsgResp(conn *UserConn, m *Req, pb *pbChat.PullMessageResp) {
log.NewInfo(m.OperationID, "pullMsgResp come here ", pb.String())
var mReplyData open_im_sdk.PullMessageBySeqListResp
a, err := json.Marshal(pb.SingleUserMsg)
if err != nil {
log.NewError(m.OperationID, "GetSingleUserMsg,json marshal,err", err.Error())
}
log.NewInfo(m.OperationID, "pullMsgResp json is ", len(pb.SingleUserMsg))
err = json.Unmarshal(a, &mReplyData.SingleUserMsg)
if err != nil {
log.NewError(m.OperationID, "SingleUserMsg,json Unmarshal,err", err.Error())
}
b, err := json.Marshal(pb.GroupUserMsg)
if err != nil {
log.NewError(m.OperationID, "mReplyData,json marshal,err", err.Error())
}
err = json.Unmarshal(b, &mReplyData.GroupUserMsg)
if err != nil {
log.NewError(m.OperationID, "test SingleUserMsg,json Unmarshal,err", err.Error())
}
c, err := proto.Marshal(&mReplyData)
log.NewInfo(m.OperationID, "test info is ", len(mReplyData.SingleUserMsg), mReplyData.SingleUserMsg)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID,
Data: c,
}
log.NewInfo(m.OperationID, "pullMsgResp all data is ", mReply.ReqIdentifier, mReply.MsgIncr, mReply.ErrCode, mReply.ErrMsg,
len(mReply.Data))
func (ws *WServer) sendMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.UserSendMsgResp) {
mReply := make(map[string]interface{})
mReplyData := make(map[string]interface{})
mReply["reqIdentifier"] = m.ReqIdentifier
mReply["msgIncr"] = m.MsgIncr
mReply["errCode"] = pb.GetErrCode()
mReply["errMsg"] = pb.GetErrMsg()
mReplyData["clientMsgID"] = pb.GetClientMsgID()
mReplyData["serverMsgID"] = pb.GetServerMsgID()
mReply["data"] = mReplyData
ws.sendMsg(conn, mReply)
}
func (ws *WServer) sendMsgReq(conn *websocket.Conn, m *Req) {
log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m)
reply := new(pbChat.UserSendMsgResp)
}
func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq start", m.SendID, m.ReqIdentifier, m.MsgIncr)
nReply := new(pbChat.PullMessageResp)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList)
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq middle", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(open_im_sdk.PullMessageBySeqListReq).SeqList)
if isPass {
pbData := pbChat.PullMessageBySeqListReq{}
pbData.SeqList = data.(open_im_sdk.PullMessageBySeqListReq).SeqList
pbData.UserID = m.SendID
pbData.OperationID = m.OperationID
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
msgClient := pbChat.NewChatClient(grpcConn)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &pbData)
if err != nil {
log.NewError(pbData.OperationID, "pullMsgBySeqListReq err", err.Error())
nReply.ErrCode = 200
nReply.ErrMsg = err.Error()
ws.pullMsgResp(conn, m, nReply)
} else {
log.NewInfo(pbData.OperationID, "rpc call success to pullMsgBySeqListReq", reply.String(), reply.GetMaxSeq(), reply.GetMinSeq(), len(reply.GetSingleUserMsg()), len(reply.GetGroupUserMsg()))
ws.pullMsgResp(conn, m, reply)
}
} else {
nReply.ErrCode = errCode
nReply.ErrMsg = errMsg
ws.pullMsgResp(conn, m, nReply)
}
}
func (ws *WServer) sendMsgReq(conn *UserConn, m *Req, sendTime int64) {
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, sendTime)
nReply := new(pbChat.UserSendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
if isPass {
data := pData.(MsgData)
data := pData.(open_im_sdk.UserSendMsgReq)
pbData := pbChat.UserSendMsgReq{
ReqIdentifier: m.ReqIdentifier,
Token: m.Token,
SendID: m.SendID,
OperationID: m.OperationID,
PlatformID: data.PlatformID,
SessionType: data.SessionType,
MsgFrom: data.MsgFrom,
ContentType: data.ContentType,
RecvID: data.RecvID,
ForceList: data.ForceList,
Content: data.Content,
Options: utils.MapToJsonString(data.Options),
ClientMsgID: data.ClientMsgID,
OffLineInfo: utils.MapToJsonString(data.OfflineInfo),
ReqIdentifier: m.ReqIdentifier,
Token: m.Token,
SendID: m.SendID,
OperationID: m.OperationID,
PlatformID: data.PlatformID,
SessionType: data.SessionType,
MsgFrom: data.MsgFrom,
ContentType: data.ContentType,
RecvID: data.RecvID,
ForceList: data.ForceList,
SenderNickName: data.SenderNickName,
SenderFaceURL: data.SenderFaceURL,
Content: data.Content,
Options: utils.MapIntToJsonString(data.Options),
ClientMsgID: data.ClientMsgID,
SendTime: sendTime,
}
log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m)
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq middle", m.ReqIdentifier, m.SendID, m.MsgIncr, data)
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
client := pbChat.NewChatClient(etcdConn)
log.Info("", "", "api UserSendMsg call, api call rpc...")
reply, _ := client.UserSendMsg(context.Background(), &pbData)
log.Info("", "", "api UserSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), reply.String())
ws.sendMsgResp(conn, m, reply)
reply, err := client.UserSendMsg(context.Background(), &pbData)
if err != nil {
log.NewError(pbData.OperationID, "UserSendMsg err", err.Error())
nReply.ErrCode = 200
nReply.ErrMsg = err.Error()
ws.sendMsgResp(conn, m, nReply, sendTime)
} else {
log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String())
ws.sendMsgResp(conn, m, reply, sendTime)
}
} else {
reply.ErrCode = errCode
reply.ErrMsg = errMsg
ws.sendMsgResp(conn, m, reply)
nReply.ErrCode = errCode
nReply.ErrMsg = errMsg
ws.sendMsgResp(conn, m, nReply, sendTime)
}
}
func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.UserSendMsgResp, sendTime int64) {
// := make(map[string]interface{})
func (ws *WServer) sendMsg(conn *websocket.Conn, mReply map[string]interface{}) {
bMsg, _ := json.Marshal(mReply)
err := ws.writeMsg(conn, websocket.TextMessage, bMsg)
if err != nil {
log.ErrorByKv("WS WriteMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err, "mReply", mReply)
var mReplyData open_im_sdk.UserSendMsgResp
mReplyData.ClientMsgID = pb.GetClientMsgID()
mReplyData.ServerMsgID = pb.GetServerMsgID()
mReplyData.SendTime = sendTime
b, _ := proto.Marshal(&mReplyData)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID,
Data: b,
}
ws.sendMsg(conn, mReply)
}
func (ws *WServer) sendMsg(conn *UserConn, mReply interface{}) {
var b bytes.Buffer
enc := gob.NewEncoder(&b)
err := enc.Encode(mReply)
if err != nil {
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "Encode Msg error", conn.RemoteAddr().String(), ws.getUserUid(conn), err.Error())
return
}
err = ws.writeMsg(conn, websocket.BinaryMessage, b.Bytes())
if err != nil {
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "WS WriteMsg error", conn.RemoteAddr().String(), ws.getUserUid(conn), err.Error())
}
}
func (ws *WServer) sendErrMsg(conn *UserConn, errCode int32, errMsg string, reqIdentifier int32, msgIncr string, operationID string) {
mReply := Resp{
ReqIdentifier: reqIdentifier,
MsgIncr: msgIncr,
ErrCode: errCode,
ErrMsg: errMsg,
OperationID: operationID,
}
}
func (ws *WServer) sendErrMsg(conn *websocket.Conn, errCode int32, errMsg string) {
mReply := make(map[string]interface{})
mReply["errCode"] = errCode
mReply["errMsg"] = errMsg
ws.sendMsg(conn, mReply)
}
+61 -75
View File
@@ -6,14 +6,18 @@ import (
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbRelay "Open_IM/pkg/proto/relay"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"bytes"
"context"
"encoding/json"
"encoding/gob"
"fmt"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
"github.com/golang/protobuf/proto"
"net"
"strings"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
)
type RPCServer struct {
@@ -55,105 +59,87 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
log.InfoByKv("PushMsgToUser is arriving", in.OperationID, "args", in.String())
var resp []*pbRelay.SingleMsgToUser
var RecvID string
msg := make(map[string]interface{})
mReply := make(map[string]interface{})
mReply["reqIdentifier"] = constant.WSPushMsg
mReply["errCode"] = 0
mReply["errMsg"] = ""
msg["sendID"] = in.SendID
msg["recvID"] = in.RecvID
msg["msgFrom"] = in.MsgFrom
msg["contentType"] = in.ContentType
msg["sessionType"] = in.SessionType
msg["senderNickName"] = in.SenderNickName
msg["senderFaceUrl"] = in.SenderFaceURL
msg["clientMsgID"] = in.ClientMsgID
msg["serverMsgID"] = in.ServerMsgID
msg["content"] = in.Content
msg["seq"] = in.RecvSeq
msg["sendTime"] = in.SendTime
msg["senderPlatformID"] = in.PlatformID
mReply["data"] = msg
bMsg, _ := json.Marshal(mReply)
msg := open_im_sdk.MsgData{
SendID: in.SendID,
RecvID: in.RecvID,
MsgFrom: in.MsgFrom,
ContentType: in.ContentType,
SessionType: in.SessionType,
SenderNickName: in.SenderNickName,
SenderFaceURL: in.SenderFaceURL,
ClientMsgID: in.ClientMsgID,
ServerMsgID: in.ServerMsgID,
Content: in.Content,
Seq: in.RecvSeq,
SendTime: in.SendTime,
SenderPlatformID: in.PlatformID,
}
msgBytes, _ := proto.Marshal(&msg)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: in.OperationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply)
if err != nil {
log.NewError(in.OperationID, "data encode err", err.Error())
}
switch in.GetSessionType() {
case constant.SingleChatType:
RecvID = in.GetRecvID()
case constant.GroupChatType:
RecvID = strings.Split(in.GetRecvID(), " ")[0]
}
log.InfoByKv("test", in.OperationID, "wsUserToConn", ws.wsUserToConn)
for key, conn := range ws.wsUserToConn {
UIDAndPID := strings.Split(key, " ")
if UIDAndPID[0] == RecvID {
resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
var tag bool
var UIDAndPID []string
userIDList := genUidPlatformArray(RecvID)
for _, v := range userIDList {
UIDAndPID = strings.Split(v, " ")
if conn := ws.getUserConn(v); conn != nil {
tag = true
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, UIDAndPID[1], UIDAndPID[0])
temp := &pbRelay.SingleMsgToUser{
ResultCode: resultCode,
RecvID: UIDAndPID[0],
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
}
resp = append(resp, temp)
} else {
temp := &pbRelay.SingleMsgToUser{
ResultCode: -1,
RecvID: UIDAndPID[0],
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
}
resp = append(resp, temp)
}
}
//switch in.GetContentType() {
//case constant.SyncSenderMsg:
// log.InfoByKv("come sync", in.OperationID, "args", in.String())
// RecvID = in.GetSendID()
// if in.MsgFrom != constant.SysMsgType {
// for key, conn := range ws.wsUserToConn {
// UIDAndPID := strings.Split(key, " ")
// if UIDAndPID[0] == RecvID && utils.PlatformIDToName(in.GetPlatformID()) != UIDAndPID[1] {
// resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
// temp := &pbRelay.SingleMsgToUser{
// ResultCode: resultCode,
// RecvID: UIDAndPID[0],
// RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
// }
// resp = append(resp, temp)
// }
//
// }
// }
//default:
// log.InfoByKv("not come sync", in.OperationID, "args", in.String())
// switch in.SessionType {
// case constant.SingleChatType:
// log.InfoByKv("come single", in.OperationID, "args", in.String())
// RecvID = in.GetRecvID()
// case constant.GroupChatType:
// RecvID = strings.Split(in.GetRecvID(), " ")[0]
// default:
// }
// log.InfoByKv("come for range", in.OperationID, "args", in.String())
//
// for key, conn := range ws.wsUserToConn {
// UIDAndPID := strings.Split(key, " ")
// if UIDAndPID[0] == RecvID {
// resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
// temp := &pbRelay.SingleMsgToUser{
// ResultCode: resultCode,
// RecvID: UIDAndPID[0],
// RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
// }
// resp = append(resp, temp)
// }
// }
//}
if !tag {
log.NewError(in.OperationID, "push err ,no matched ws conn not in map", in.String())
}
return &pbRelay.MsgToUserResp{
Resp: resp,
}, nil
}
func sendMsgToUser(conn *websocket.Conn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.TextMessage, bMsg)
func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
if err != nil {
log.ErrorByKv("PushMsgToUser is failed By Ws", "", "Addr", conn.RemoteAddr().String(),
"error", err, "senderPlatform", utils.PlatformIDToName(in.PlatformID), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
ResultCode = -2
return ResultCode
} else {
log.InfoByKv("PushMsgToUser is success By Ws", in.OperationID, "args", in.String())
log.InfoByKv("PushMsgToUser is success By Ws", in.OperationID, "args", in.String(), "recvPlatForm", RecvPlatForm, "recvID", RecvID)
ResultCode = 0
return ResultCode
}
}
func genUidPlatformArray(uid string) (array []string) {
for i := 1; i <= utils.LinuxPlatformID; i++ {
array = append(array, uid+" "+utils.PlatformIDToName(int32(i)))
}
return array
}
+64 -21
View File
@@ -9,17 +9,27 @@ package gate
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"github.com/mitchellh/mapstructure"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"github.com/golang/protobuf/proto"
)
type Req struct {
ReqIdentifier int32 `json:"reqIdentifier" validate:"required"`
Token string `json:"token" validate:"required"`
SendID string `json:"sendID" validate:"required"`
OperationID string `json:"operationID" validate:"required"`
MsgIncr int32 `json:"msgIncr" validate:"required"`
Data map[string]interface{} `json:"data"`
ReqIdentifier int32 `json:"reqIdentifier" validate:"required"`
Token string `json:"token" `
SendID string `json:"sendID" validate:"required"`
OperationID string `json:"operationID" validate:"required"`
MsgIncr string `json:"msgIncr" validate:"required"`
Data []byte `json:"data"`
}
type Resp struct {
ReqIdentifier int32 `json:"reqIdentifier"`
MsgIncr string `json:"msgIncr"`
OperationID string `json:"operationID"`
ErrCode int32 `json:"errCode"`
ErrMsg string `json:"errMsg"`
Data []byte `json:"data"`
}
type SeqData struct {
SeqBegin int64 `mapstructure:"seqBegin" validate:"required"`
SeqEnd int64 `mapstructure:"seqEnd" validate:"required"`
@@ -30,31 +40,64 @@ type MsgData struct {
MsgFrom int32 `mapstructure:"msgFrom" validate:"required"`
ContentType int32 `mapstructure:"contentType" validate:"required"`
RecvID string `mapstructure:"recvID" validate:"required"`
ForceList []string `mapstructure:"forceList" validate:"required"`
ForceList []string `mapstructure:"forceList"`
Content string `mapstructure:"content" validate:"required"`
Options map[string]interface{} `mapstructure:"options" validate:"required"`
ClientMsgID string `mapstructure:"clientMsgID" validate:"required"`
OfflineInfo map[string]interface{} `mapstructure:"offlineInfo" validate:"required"`
Ext map[string]interface{} `mapstructure:"ext"`
}
type MaxSeqResp struct {
MaxSeq int64 `json:"maxSeq"`
}
type PullMessageResp struct {
}
type SeqListData struct {
SeqList []int64 `mapstructure:"seqList" validate:"required"`
}
func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, data interface{}) {
func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, returnData interface{}) {
switch r {
case constant.WSPullMsg:
data = SeqData{}
case constant.WSSendMsg:
data = MsgData{}
data := open_im_sdk.UserSendMsgReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.ErrorByKv("Decode Data struct err", "", "err", err.Error(), "reqIdentifier", r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.ErrorByKv("data args validate err", "", "err", err.Error(), "reqIdentifier", r)
return false, 204, err.Error(), nil
}
return true, 0, "", data
case constant.WSPullMsgBySeqList:
data := open_im_sdk.PullMessageBySeqListReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.ErrorByKv("Decode Data struct err", "", "err", err.Error(), "reqIdentifier", r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.ErrorByKv("data args validate err", "", "err", err.Error(), "reqIdentifier", r)
return false, 204, err.Error(), nil
}
return true, 0, "", data
default:
}
if err := mapstructure.WeakDecode(m.Data, &data); err != nil {
log.ErrorByKv("map to Data struct err", "", "err", err.Error(), "reqIdentifier", r)
return false, 203, err.Error(), nil
} else if err := validate.Struct(data); err != nil {
log.ErrorByKv("data args validate err", "", "err", err.Error(), "reqIdentifier", r)
return false, 204, err.Error(), nil
} else {
return true, 0, "", data
}
return false, 204, "args err", nil
//b := bytes.NewBuffer(m.Data)
//dec := gob.NewDecoder(b)
//err := dec.Decode(&data)
//if err != nil {
// log.ErrorByKv("Decode Data struct err", "", "err", err.Error(), "reqIdentifier", r)
// return false, 203, err.Error(), nil
//}
//if err := mapstructure.WeakDecode(m.Data, &data); err != nil {
// log.ErrorByKv("map to Data struct err", "", "err", err.Error(), "reqIdentifier", r)
// return false, 203, err.Error(), nil
//} else
}
+27 -18
View File
@@ -6,23 +6,28 @@ import (
"Open_IM/pkg/utils"
"github.com/gorilla/websocket"
"net/http"
"sync"
"time"
)
type UserConn struct {
*websocket.Conn
w *sync.Mutex
}
type WServer struct {
wsAddr string
wsMaxConnNum int
wsUpGrader *websocket.Upgrader
wsConnToUser map[*websocket.Conn]string
wsUserToConn map[string]*websocket.Conn
wsConnToUser map[*UserConn]string
wsUserToConn map[string]*UserConn
}
func (ws *WServer) onInit(wsPort int) {
ip := utils.ServerIP
ws.wsAddr = ip + ":" + utils.IntToString(wsPort)
ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum
ws.wsConnToUser = make(map[*websocket.Conn]string)
ws.wsUserToConn = make(map[string]*websocket.Conn)
ws.wsConnToUser = make(map[*UserConn]string)
ws.wsUserToConn = make(map[string]*UserConn)
ws.wsUpGrader = &websocket.Upgrader{
HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second,
ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen,
@@ -49,35 +54,39 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
//Connection mapping relationship,
//userID+" "+platformID->conn
SendID := query["sendID"][0] + " " + utils.PlatformIDToName(int32(utils.StringToInt64(query["platformID"][0])))
ws.addUserConn(SendID, conn)
go ws.readMsg(conn)
//Initialize a lock for each user
newConn := &UserConn{conn, new(sync.Mutex)}
ws.addUserConn(SendID, newConn)
go ws.readMsg(newConn)
}
}
}
func (ws *WServer) readMsg(conn *websocket.Conn) {
func (ws *WServer) readMsg(conn *UserConn) {
for {
msgType, msg, err := conn.ReadMessage()
messageType, msg, err := conn.ReadMessage()
if messageType == websocket.PingMessage {
log.NewInfo("", "this is a pingMessage")
}
if err != nil {
log.ErrorByKv("WS ReadMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err)
ws.delUserConn(conn)
return
} else {
log.ErrorByKv("test", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn))
//log.ErrorByKv("test", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn))
}
ws.msgParse(conn, msg)
//ws.writeMsg(conn, 1, chat)
}
}
func (ws *WServer) writeMsg(conn *websocket.Conn, a int, msg []byte) error {
rwLock.Lock()
defer rwLock.Unlock()
func (ws *WServer) writeMsg(conn *UserConn, a int, msg []byte) error {
conn.w.Lock()
defer conn.w.Unlock()
return conn.WriteMessage(a, msg)
}
func (ws *WServer) addUserConn(uid string, conn *websocket.Conn) {
func (ws *WServer) addUserConn(uid string, conn *UserConn) {
rwLock.Lock()
defer rwLock.Unlock()
if oldConn, ok := ws.wsUserToConn[uid]; ok {
@@ -95,7 +104,7 @@ func (ws *WServer) addUserConn(uid string, conn *websocket.Conn) {
}
func (ws *WServer) delUserConn(conn *websocket.Conn) {
func (ws *WServer) delUserConn(conn *UserConn) {
rwLock.Lock()
defer rwLock.Unlock()
var uidPlatform string
@@ -111,12 +120,12 @@ func (ws *WServer) delUserConn(conn *websocket.Conn) {
}
err := conn.Close()
if err != nil {
log.ErrorByKv("close err", "", "uid", uidPlatform, "conn", conn)
log.ErrorByKv("close err", "", "uid", uidPlatform)
}
}
func (ws *WServer) getUserConn(uid string) *websocket.Conn {
func (ws *WServer) getUserConn(uid string) *UserConn {
rwLock.RLock()
defer rwLock.RUnlock()
if conn, ok := ws.wsUserToConn[uid]; ok {
@@ -124,7 +133,7 @@ func (ws *WServer) getUserConn(uid string) *websocket.Conn {
}
return nil
}
func (ws *WServer) getUserUid(conn *websocket.Conn) string {
func (ws *WServer) getUserUid(conn *UserConn) string {
rwLock.RLock()
defer rwLock.RUnlock()
+6 -1
View File
@@ -1,17 +1,22 @@
package logic
import (
pbMsg "Open_IM/pkg/proto/chat"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/chat"
"Open_IM/pkg/utils"
)
func saveUserChat(uid string, pbMsg *pbMsg.MsgSvrToPushSvrChatMsg) error {
time := utils.GetCurrentTimestampByMill()
seq, err := db.DB.IncrUserSeq(uid)
if err != nil {
log.NewError(pbMsg.OperationID, "data insert to redis err", err.Error(), pbMsg.String())
return err
}
pbMsg.RecvSeq = seq
log.NewInfo(pbMsg.OperationID, "IncrUserSeq cost time", utils.GetCurrentTimestampByMill()-time)
return db.DB.SaveUserChat(uid, pbMsg.SendTime, pbMsg)
}
@@ -33,6 +33,7 @@ func (mc *HistoryConsumerHandler) Init() {
func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
log.InfoByKv("chat come mongo!!!", "", "chat", string(msg))
time := utils.GetCurrentTimestampByNano()
pbData := pbMsg.WSToMsgSvrChatMsg{}
err := proto.Unmarshal(msg, &pbData)
if err != nil {
@@ -58,48 +59,53 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
isHistory := utils.GetSwitchFromOptions(Options, "history")
//Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(Options, "persistent")
if pbData.SessionType == constant.SingleChatType {
log.Info("", "", "msg_transfer chat type = SingleChatType", isHistory, isPersist)
switch pbData.SessionType {
case constant.SingleChatType:
log.NewDebug(pbSaveData.OperationID, "msg_transfer chat type = SingleChatType", isHistory, isPersist)
if isHistory {
if msgKey == pbSaveData.RecvID {
err := saveUserChat(pbData.RecvID, &pbSaveData)
if err != nil {
log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error())
log.NewError(pbSaveData.OperationID, "single data insert to mongo err", err.Error(), pbSaveData.String())
return
}
} else if msgKey == pbSaveData.SendID {
err := saveUserChat(pbData.SendID, &pbSaveData)
if err != nil {
log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error())
log.NewError(pbSaveData.OperationID, "single data insert to mongo err", err.Error(), pbSaveData.String())
return
}
//if isSenderSync {
// pbSaveData.ContentType = constant.SyncSenderMsg
// log.WarnByKv("SyncSenderMsg come here", pbData.OperationID, pbSaveData.String())
// sendMessageToPush(&pbSaveData)
//}
}
log.NewDebug(pbSaveData.OperationID, "saveUserChat cost time ", utils.GetCurrentTimestampByNano()-time)
}
if msgKey == pbSaveData.RecvID {
pbSaveData.Options = pbData.Options
pbSaveData.OfflineInfo = pbData.OfflineInfo
sendMessageToPush(&pbSaveData)
go sendMessageToPush(&pbSaveData)
log.NewDebug(pbSaveData.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time)
}
log.InfoByKv("msg_transfer handle topic success...", "", "")
} else if pbData.SessionType == constant.GroupChatType {
log.Info("", "", "msg_transfer chat type = GroupChatType")
case constant.GroupChatType:
log.NewDebug(pbSaveData.OperationID, "msg_transfer chat type = GroupChatType", isHistory, isPersist)
if isHistory {
uidAndGroupID := strings.Split(pbData.RecvID, " ")
saveUserChat(uidAndGroupID[0], &pbSaveData)
err := saveUserChat(uidAndGroupID[0], &pbSaveData)
if err != nil {
log.NewError(pbSaveData.OperationID, "group data insert to mongo err", pbSaveData.String(), uidAndGroupID[0], err.Error())
return
}
}
pbSaveData.Options = pbData.Options
pbSaveData.OfflineInfo = pbData.OfflineInfo
sendMessageToPush(&pbSaveData)
log.InfoByKv("msg_transfer handle topic success...", "", "")
} else {
log.Error("", "", "msg_transfer recv chat err, chat.MsgFrom = %d", pbData.SessionType)
go sendMessageToPush(&pbSaveData)
default:
log.NewError(pbSaveData.OperationID, "SessionType error", pbSaveData.String())
return
}
log.NewDebug(pbSaveData.OperationID, "msg_transfer handle topic data to database success...", pbSaveData.String())
}
func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
+13
View File
@@ -0,0 +1,13 @@
package common
import (
"encoding/base64"
"fmt"
)
func GetAuthorization(Appkey string, MasterSecret string) string {
str := fmt.Sprintf("%s:%s", Appkey, MasterSecret)
buf := []byte(str)
Authorization := fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString(buf))
return Authorization
}
+55
View File
@@ -0,0 +1,55 @@
package push
import (
"Open_IM/internal/push/jpush/common"
"Open_IM/internal/push/jpush/requestBody"
"Open_IM/pkg/common/config"
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
)
type JPushResp struct {
}
func JGAccountListPush(accounts []string, content, detailContent, platform string) ([]byte, error) {
var pf requestBody.Platform
_ = pf.SetPlatform(platform)
var au requestBody.Audience
au.SetAlias(accounts)
var no requestBody.Notification
no.SetAlert(content)
var me requestBody.Message
me.SetMsgContent(detailContent)
var po requestBody.PushObj
po.SetPlatform(&pf)
po.SetAudience(&au)
po.SetNotification(&no)
po.SetMessage(&me)
con, err := json.Marshal(po)
if err != nil {
return nil, err
}
client := &http.Client{}
req, err := http.NewRequest("POST", config.Config.Push.Jpns.PushUrl, bytes.NewBuffer(con))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", common.GetAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret))
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
result, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return result, nil
}
@@ -0,0 +1,53 @@
package requestBody
const (
TAG = "tag"
TAG_AND = "tag_and"
TAG_NOT = "tag_not"
ALIAS = "alias"
REGISTRATION_ID = "registration_id"
SEGMENT = "segment"
ABTEST = "abtest"
)
type Audience struct {
Object interface{}
audience map[string][]string
}
func (a *Audience) set(key string, v []string) {
if a.audience == nil {
a.audience = make(map[string][]string)
a.Object = a.audience
}
//v, ok = this.audience[key]
//if ok {
// return
//}
a.audience[key] = v
}
func (a *Audience) SetTag(tags []string) {
a.set(TAG, tags)
}
func (a *Audience) SetTagAnd(tags []string) {
a.set(TAG_AND, tags)
}
func (a *Audience) SetTagNot(tags []string) {
a.set(TAG_NOT, tags)
}
func (a *Audience) SetAlias(alias []string) {
a.set(ALIAS, alias)
}
func (a *Audience) SetRegistrationId(ids []string) {
a.set(REGISTRATION_ID, ids)
}
func (a *Audience) SetAll() {
a.Object = "all"
}
@@ -0,0 +1,27 @@
package requestBody
type Message struct {
MsgContent string `json:"msg_content"`
Title string `json:"title,omitempty"`
ContentType string `json:"content_type,omitempty"`
Extras map[string]interface{} `json:"extras,omitempty"`
}
func (m *Message) SetMsgContent(c string) {
m.MsgContent = c
}
func (m *Message) SetTitle(t string) {
m.Title = t
}
func (m *Message) SetContentType(c string) {
m.ContentType = c
}
func (m *Message) SetExtras(key string, value interface{}) {
if m.Extras == nil {
m.Extras = make(map[string]interface{})
}
m.Extras[key] = value
}
@@ -0,0 +1,17 @@
package requestBody
type Notification struct {
Alert string `json:"alert,omitempty"`
Android *Android `json:"android,omitempty"`
IOS *Ios `json:"ios,omitempty"`
}
type Android struct {
}
type Ios struct {
}
func (n *Notification) SetAlert(alert string) {
n.Alert = alert
}
@@ -0,0 +1,83 @@
package requestBody
import (
"Open_IM/pkg/utils"
"errors"
)
const (
ANDROID = "android"
IOS = "ios"
QUICKAPP = "quickapp"
WINDOWSPHONE = "winphone"
ALL = "all"
)
type Platform struct {
Os interface{}
osArry []string
}
func (p *Platform) Set(os string) error {
if p.Os == nil {
p.osArry = make([]string, 0, 4)
} else {
switch p.Os.(type) {
case string:
return errors.New("platform is all")
default:
}
}
for _, value := range p.osArry {
if os == value {
return nil
}
}
switch os {
case IOS:
fallthrough
case ANDROID:
fallthrough
case QUICKAPP:
fallthrough
case WINDOWSPHONE:
p.osArry = append(p.osArry, os)
p.Os = p.osArry
default:
return errors.New("unknow platform")
}
return nil
}
func (p *Platform) SetPlatform(platform string) error {
switch platform {
case utils.AndroidPlatformStr:
return p.SetAndroid()
case utils.IOSPlatformStr:
return p.SetIOS()
default:
return errors.New("platform err")
}
}
func (p *Platform) SetIOS() error {
return p.Set(IOS)
}
func (p *Platform) SetAndroid() error {
return p.Set(ANDROID)
}
func (p *Platform) SetQuickApp() error {
return p.Set(QUICKAPP)
}
func (p *Platform) SetWindowsPhone() error {
return p.Set(WINDOWSPHONE)
}
func (p *Platform) SetAll() {
p.Os = ALL
}
@@ -0,0 +1,24 @@
package requestBody
type PushObj struct {
Platform interface{} `json:"platform"`
Audience interface{} `json:"audience"`
Notification interface{} `json:"notification,omitempty"`
Message interface{} `json:"message,omitempty"`
}
func (p *PushObj) SetPlatform(pf *Platform) {
p.Platform = pf.Os
}
func (p *PushObj) SetAudience(ad *Audience) {
p.Audience = ad.Object
}
func (p *PushObj) SetNotification(no *Notification) {
p.Notification = no
}
func (p *PushObj) SetMessage(m *Message) {
p.Message = m
}
+1 -1
View File
@@ -24,7 +24,7 @@ func Init(rpcPort int) {
log.NewPrivateLog(config.Config.ModuleName.PushName)
rpcServer.Init(rpcPort)
pushCh.Init()
pushTerminal = []int32{utils.IOSPlatformID}
pushTerminal = []int32{utils.IOSPlatformID, utils.AndroidPlatformID}
}
func init() {
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
+60 -98
View File
@@ -7,33 +7,35 @@
package logic
import (
push "Open_IM/internal/push/jpush"
rpcChat "Open_IM/internal/rpc/chat"
"Open_IM/internal/rpc/user/internal_service"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbChat "Open_IM/pkg/proto/chat"
pbGroup "Open_IM/pkg/proto/group"
pbRelay "Open_IM/pkg/proto/relay"
pbGetInfo "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
"context"
"encoding/json"
"fmt"
"strings"
)
type EChatContent struct {
SessionType int `json:"chatType"`
type OpenIMContent struct {
SessionType int `json:"sessionType"`
From string `json:"from"`
To string `json:"to"`
Seq int64 `json:"seq"`
}
type AtContent struct {
Text string `json:"text"`
AtUserList []string `json:"atUserList"`
IsAtSelf bool `json:"isAtSelf"`
}
func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) {
var wsResult []*pbRelay.SingleMsgToUser
//isShouldOfflinePush := true
MOptions := utils.JsonStringToMap(Options) //Control whether to push message to sender's other terminal
//isSenderSync := utils.GetSwitchFromOptions(MOptions, "senderSync")
isOfflinePush := utils.GetSwitchFromOptions(MOptions, "offlinePush")
@@ -51,101 +53,61 @@ func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) {
wsResult = append(wsResult, reply.Resp...)
}
}
log.InfoByKv("push_result", sendPbData.OperationID, "result", wsResult)
if isOfflinePush {
for _, v := range wsResult {
if v.ResultCode == 0 {
continue
}
//supported terminal
for _, t := range pushTerminal {
if v.RecvPlatFormID == t {
//Use offline push messaging
var UIDList []string
UIDList = append(UIDList, sendPbData.RecvID)
var sendUIDList []string
sendUIDList = append(sendUIDList, sendPbData.SendID)
userInfo, err := internal_service.GetUserInfoClient(&pbGetInfo.GetUserInfoReq{UserIDList: sendUIDList, OperationID: sendPbData.OperationID})
if err != nil {
log.ErrorByArgs(fmt.Sprintf("err=%v,call GetUserInfoClient rpc server failed", err))
return
}
customContent := EChatContent{
SessionType: int(sendPbData.SessionType),
From: sendPbData.SendID,
To: sendPbData.RecvID,
Seq: sendPbData.RecvSeq,
}
bCustomContent, _ := json.Marshal(customContent)
jsonCustomContent := string(bCustomContent)
switch sendPbData.ContentType {
case constant.Text:
IOSAccountListPush(UIDList, userInfo.Data[0].Name, sendPbData.Content, jsonCustomContent)
case constant.Picture:
IOSAccountListPush(UIDList, userInfo.Data[0].Name, constant.ContentType2PushContent[constant.Picture], jsonCustomContent)
case constant.Voice:
IOSAccountListPush(UIDList, userInfo.Data[0].Name, constant.ContentType2PushContent[constant.Voice], jsonCustomContent)
case constant.Video:
IOSAccountListPush(UIDList, userInfo.Data[0].Name, constant.ContentType2PushContent[constant.Video], jsonCustomContent)
case constant.File:
IOSAccountListPush(UIDList, userInfo.Data[0].Name, constant.ContentType2PushContent[constant.File], jsonCustomContent)
default:
}
}
}
}
/*for _, t := range pushTerminal {
log.InfoByKv("push_result", sendPbData.OperationID, "result", wsResult, "sendData", sendPbData)
if sendPbData.ContentType != constant.Typing && sendPbData.ContentType != constant.HasReadReceipt {
if isOfflinePush {
for _, v := range wsResult {
if v.RecvPlatFormID == t && v.ResultCode == 0 {
isShouldOfflinePush = false
break
if v.ResultCode == 0 {
continue
}
//supported terminal
for _, t := range pushTerminal {
if v.RecvPlatFormID == t {
//Use offline push messaging
var UIDList []string
UIDList = append(UIDList, v.RecvID)
customContent := OpenIMContent{
SessionType: int(sendPbData.SessionType),
From: sendPbData.SendID,
To: sendPbData.RecvID,
Seq: sendPbData.RecvSeq,
}
bCustomContent, _ := json.Marshal(customContent)
jsonCustomContent := string(bCustomContent)
var content string
switch sendPbData.ContentType {
case constant.Text:
content = constant.ContentType2PushContent[constant.Text]
case constant.Picture:
content = constant.ContentType2PushContent[constant.Picture]
case constant.Voice:
content = constant.ContentType2PushContent[constant.Voice]
case constant.Video:
content = constant.ContentType2PushContent[constant.Video]
case constant.File:
content = constant.ContentType2PushContent[constant.File]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(sendPbData.Content, &a)
if utils.IsContain(v.RecvID, a.AtUserList) {
content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else {
content = constant.ContentType2PushContent[constant.GroupMsg]
}
default:
}
pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, utils.PlatformIDToName(t))
if err != nil {
log.NewError(sendPbData.OperationID, "offline push error", sendPbData.String(), err.Error(), t)
} else {
log.NewDebug(sendPbData.OperationID, "offline push return result is ", string(pushResult), sendPbData, t)
}
}
}
}
if isShouldOfflinePush {
//Use offline push messaging
var UIDList []string
UIDList = append(UIDList, sendPbData.RecvID)
var sendUIDList []string
sendUIDList = append(sendUIDList, sendPbData.SendID)
userInfo, err := internal_service.GetUserInfoClient(&pbGetInfo.GetUserInfoReq{UserIDList: sendUIDList, OperationID: sendPbData.OperationID})
if err != nil {
log.ErrorByArgs(fmt.Sprintf("err=%v,call GetUserInfoClient rpc server failed", err))
return
}
customContent := EChatContent{
SessionType: int(sendPbData.SessionType),
From: sendPbData.SendID,
To: sendPbData.RecvID,
Seq: sendPbData.RecvSeq,
}
bCustomContent, _ := json.Marshal(customContent)
jsonCustomContent := string(bCustomContent)
switch sendPbData.ContentType {
case constant.Text:
IOSAccountListPush(UIDList, userInfo.Data[0].Name, sendPbData.Content, jsonCustomContent)
case constant.Picture:
IOSAccountListPush(UIDList, userInfo.Data[0].Name, constant.ContentType2PushContent[constant.Picture], jsonCustomContent)
case constant.Voice:
IOSAccountListPush(UIDList, userInfo.Data[0].Name, constant.ContentType2PushContent[constant.Voice], jsonCustomContent)
case constant.Video:
IOSAccountListPush(UIDList, userInfo.Data[0].Name, constant.ContentType2PushContent[constant.Video], jsonCustomContent)
case constant.File:
IOSAccountListPush(UIDList, userInfo.Data[0].Name, constant.ContentType2PushContent[constant.File], jsonCustomContent)
default:
}
} else {
isShouldOfflinePush = true
}
}*/
}
}
}
+3
View File
@@ -10,6 +10,9 @@ import (
func (rpc *rpcAuth) UserRegister(_ context.Context, pb *pbAuth.UserRegisterReq) (*pbAuth.UserRegisterResp, error) {
log.Info("", "", "rpc user_register start, [data: %s]", pb.String())
//if len(pb.UID) == 0 {
// pb.UID = utils.GenID()
//}
if err := im_mysql_model.UserRegister(pb); err != nil {
log.Error("", "", "rpc user_register error, [data: %s] [err: %s]", pb.String(), err.Error())
return &pbAuth.UserRegisterResp{Success: false}, err
+1 -1
View File
@@ -18,7 +18,7 @@ func (rpc *rpcAuth) UserToken(_ context.Context, pb *pbAuth.UserTokenReq) (*pbAu
}
log.Info("", "", "rpc user_token call..., im_mysql_model.AppServerFindFromUserByUserID")
tokens, expTime, err := utils.CreateToken(pb.UID, "", pb.Platform)
tokens, expTime, err := utils.CreateToken(pb.UID, pb.Platform)
if err != nil {
log.Error("", "", "rpc user_token call..., utils.CreateToken fail [uid: %s] [err: %s]", pb.UID, err.Error())
return &pbAuth.UserTokenResp{ErrCode: 500, ErrMsg: err.Error()}, err
+48 -20
View File
@@ -13,35 +13,40 @@ import (
pbMsg "Open_IM/pkg/proto/chat"
)
func (rpc *rpcChat) GetNewSeq(_ context.Context, in *pbMsg.GetNewSeqReq) (*pbMsg.GetNewSeqResp, error) {
log.InfoByKv("rpc getNewSeq is arriving", in.OperationID, in.String())
func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeqReq) (*pbMsg.GetMaxAndMinSeqResp, error) {
log.InfoByKv("rpc getMaxAndMinSeq is arriving", in.OperationID, in.String())
//seq, err := model.GetBiggestSeqFromReceive(in.UserID)
seq, err := commonDB.DB.GetUserSeq(in.UserID)
resp := new(pbMsg.GetNewSeqResp)
if err == nil {
resp.Seq = seq
resp.ErrCode = 0
resp.ErrMsg = ""
return resp, err
maxSeq, err1 := commonDB.DB.GetUserMaxSeq(in.UserID)
minSeq, err2 := commonDB.DB.GetUserMinSeq(in.UserID)
resp := new(pbMsg.GetMaxAndMinSeqResp)
if err1 == nil {
resp.MaxSeq = maxSeq
} else if err1 == redis.ErrNil {
resp.MaxSeq = 0
} else {
if err == redis.ErrNil {
resp.Seq = 0
} else {
log.ErrorByKv("getSeq from redis error", in.OperationID, "args", in.String(), "err", err.Error())
resp.Seq = -1
}
resp.ErrCode = 0
resp.ErrMsg = ""
return resp, nil
log.NewError(in.OperationID, "getMaxSeq from redis error", in.String(), err1.Error())
resp.MaxSeq = -1
resp.ErrCode = 200
resp.ErrMsg = "redis get err"
}
if err2 == nil {
resp.MinSeq = minSeq
} else if err2 == redis.ErrNil {
resp.MinSeq = 0
} else {
log.NewError(in.OperationID, "getMaxSeq from redis error", in.String(), err2.Error())
resp.MinSeq = -1
resp.ErrCode = 201
resp.ErrMsg = "redis get err"
}
return resp, nil
}
func (rpc *rpcChat) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*pbMsg.PullMessageResp, error) {
log.InfoByKv("rpc pullMessage is arriving", in.OperationID, "args", in.String())
resp := new(pbMsg.PullMessageResp)
var respSingleMsgFormat []*pbMsg.GatherFormat
var respGroupMsgFormat []*pbMsg.GatherFormat
SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetUserChat(in.UserID, in.SeqBegin, in.SeqEnd)
SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetMsgBySeqRange(in.UserID, in.SeqBegin, in.SeqEnd)
if err != nil {
log.ErrorByKv("pullMsg data error", in.OperationID, in.String())
resp.ErrCode = 1
@@ -59,6 +64,29 @@ func (rpc *rpcChat) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*p
GroupUserMsg: respGroupMsgFormat,
}, nil
}
func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *pbMsg.PullMessageBySeqListReq) (*pbMsg.PullMessageResp, error) {
log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String())
resp := new(pbMsg.PullMessageResp)
var respSingleMsgFormat []*pbMsg.GatherFormat
var respGroupMsgFormat []*pbMsg.GatherFormat
SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetMsgBySeqList(in.UserID, in.SeqList)
if err != nil {
log.ErrorByKv("PullMessageBySeqList data error", in.OperationID, in.String())
resp.ErrCode = 1
resp.ErrMsg = err.Error()
return resp, nil
}
respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID)
respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat)
return &pbMsg.PullMessageResp{
ErrCode: 0,
ErrMsg: "",
MaxSeq: MaxSeq,
MinSeq: MinSeq,
SingleUserMsg: respSingleMsgFormat,
GroupUserMsg: respGroupMsgFormat,
}, nil
}
func singleMsgHandleByUser(allMsg []*pbMsg.MsgFormat, ownerId string) []*pbMsg.GatherFormat {
var userid string
var respMsgFormat []*pbMsg.GatherFormat
+66 -71
View File
@@ -42,10 +42,9 @@ type MsgCallBackResp struct {
func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) {
replay := pbChat.UserSendMsgResp{}
log.InfoByKv("sendMsg", pb.OperationID, "args", pb.String())
if !utils.VerifyToken(pb.Token, pb.SendID) {
return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)
}
log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String())
//if !utils.VerifyToken(pb.Token, pb.SendID) {
// return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)
serverMsgID := GetMsgID(pb.SendID)
pbData := pbChat.WSToMsgSvrChatMsg{}
pbData.MsgFrom = pb.MsgFrom
@@ -64,7 +63,11 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
pbData.MsgID = serverMsgID
pbData.OperationID = pb.OperationID
pbData.Token = pb.Token
pbData.SendTime = utils.GetCurrentTimestampByNano()
if pb.SendTime == 0 {
pbData.SendTime = utils.GetCurrentTimestampByNano()
} else {
pbData.SendTime = pb.SendTime
}
m := MsgCallBackResp{}
if config.Config.MessageCallBack.CallbackSwitch {
bMsg, err := http2.Post(config.Config.MessageCallBack.CallbackUrl, MsgCallBackReq{
@@ -88,85 +91,77 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
return returnMsg(&replay, pb, m.ResponseErrCode, m.ErrMsg, "", 0)
} else {
pbData.Content = m.ResponseResult.ModifiedMsg
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
if err1 != nil || err2 != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
}
}
} else {
switch pbData.SessionType {
case constant.SingleChatType:
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
if err1 != nil || err2 != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
case constant.GroupChatType:
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
client := pbGroup.NewGroupClient(etcdConn)
req := &pbGroup.GetGroupAllMemberReq{
GroupID: pbData.RecvID,
Token: pbData.Token,
OperationID: pbData.OperationID,
}
reply, err := client.GetGroupAllMember(context.Background(), req)
}
switch pbData.SessionType {
case constant.SingleChatType:
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
if err1 != nil || err2 != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
case constant.GroupChatType:
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
client := pbGroup.NewGroupClient(etcdConn)
req := &pbGroup.GetGroupAllMemberReq{
GroupID: pbData.RecvID,
Token: pbData.Token,
OperationID: pbData.OperationID,
}
reply, err := client.GetGroupAllMember(context.Background(), req)
if err != nil {
log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", err.Error())
return returnMsg(&replay, pb, 201, err.Error(), "", 0)
}
if reply.ErrorCode != 0 {
log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", reply.ErrorMsg)
return returnMsg(&replay, pb, reply.ErrorCode, reply.ErrorMsg, "", 0)
}
var addUidList []string
switch pbData.ContentType {
case constant.KickGroupMemberTip:
var notification content_struct.NotificationContent
var kickContent group.KickGroupMemberReq
err := utils.JsonStringToStruct(pbData.Content, &notification)
if err != nil {
log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", err.Error())
return returnMsg(&replay, pb, 201, err.Error(), "", 0)
}
if reply.ErrorCode != 0 {
log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", reply.ErrorMsg)
return returnMsg(&replay, pb, reply.ErrorCode, reply.ErrorMsg, "", 0)
}
var addUidList []string
switch pbData.ContentType {
case constant.KickGroupMemberTip:
var notification content_struct.NotificationContent
var kickContent group.KickGroupMemberReq
err := utils.JsonStringToStruct(pbData.Content, &notification)
log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error())
return returnMsg(&replay, pb, 200, err.Error(), "", 0)
} else {
err := utils.JsonStringToStruct(notification.Detail, &kickContent)
if err != nil {
log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error())
return returnMsg(&replay, pb, 200, err.Error(), "", 0)
} else {
err := utils.JsonStringToStruct(notification.Detail, &kickContent)
if err != nil {
log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error())
return returnMsg(&replay, pb, 200, err.Error(), "", 0)
}
for _, v := range kickContent.UidListInfo {
addUidList = append(addUidList, v.UserId)
}
}
case constant.QuitGroupTip:
addUidList = append(addUidList, pbData.SendID)
default:
}
groupID := pbData.RecvID
for i, v := range reply.MemberList {
pbData.RecvID = v.UserId + " " + groupID
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i))
if err != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
for _, v := range kickContent.UidListInfo {
addUidList = append(addUidList, v.UserId)
}
}
for i, v := range addUidList {
pbData.RecvID = v + " " + groupID
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1))
if err != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
case constant.QuitGroupTip:
addUidList = append(addUidList, pbData.SendID)
default:
}
groupID := pbData.RecvID
for i, v := range reply.MemberList {
pbData.RecvID = v.UserId + " " + groupID
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i))
if err != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
for i, v := range addUidList {
pbData.RecvID = v + " " + groupID
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1))
if err != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
default:
return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)
}
return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)
}
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) error {
+19 -11
View File
@@ -55,25 +55,33 @@ func (s *friendServer) AddFriend(ctx context.Context, req *pbFriend.AddFriendReq
func (s *friendServer) ImportFriend(ctx context.Context, req *pbFriend.ImportFriendReq) (*pbFriend.ImportFriendResp, error) {
log.Info(req.Token, req.OperationID, "ImportFriend come here,args=%s", req.String())
var resp pbFriend.ImportFriendResp
var c pbFriend.CommonResp
//Parse token, to find current user information
claims, err := utils.ParseToken(req.Token)
if err != nil {
log.Error(req.Token, req.OperationID, "err=%s,parse token failed", err.Error())
return &pbFriend.ImportFriendResp{CommonResp: &pbFriend.CommonResp{ErrorCode: config.ErrAddFriend.ErrCode, ErrorMsg: config.ErrParseToken.ErrMsg}, FailedUidList: req.UidList}, nil
c.ErrorCode = config.ErrAddFriend.ErrCode
c.ErrorMsg = config.ErrParseToken.ErrMsg
return &pbFriend.ImportFriendResp{CommonResp: &c, FailedUidList: req.UidList}, nil
}
if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) {
log.Error(req.Token, req.OperationID, "not magager uid", claims.UID)
return &pbFriend.ImportFriendResp{CommonResp: &pbFriend.CommonResp{ErrorCode: config.ErrAddFriend.ErrCode, ErrorMsg: "not authorized"}, FailedUidList: req.UidList}, nil
log.Error(req.Token, req.OperationID, "not manager uid", claims.UID)
c.ErrorCode = config.ErrAddFriend.ErrCode
c.ErrorMsg = "not authorized"
return &pbFriend.ImportFriendResp{CommonResp: &c, FailedUidList: req.UidList}, nil
}
if _, err = im_mysql_model.FindUserByUID(req.OwnerUid); err != nil {
log.Error(req.Token, req.OperationID, "this user not exists,cant not add friend", req.OwnerUid)
return &pbFriend.ImportFriendResp{CommonResp: &pbFriend.CommonResp{ErrorCode: config.ErrAddFriend.ErrCode, ErrorMsg: "this user not exists,cant not add friend"}, FailedUidList: req.UidList}, nil
c.ErrorCode = config.ErrAddFriend.ErrCode
c.ErrorMsg = "this user not exists,cant not add friend"
return &pbFriend.ImportFriendResp{CommonResp: &c, FailedUidList: req.UidList}, nil
}
for _, v := range req.UidList {
if _, err = im_mysql_model.FindUserByUID(v); err != nil {
resp.CommonResp.ErrorMsg = "some uid establish failed"
resp.CommonResp.ErrorCode = 408
if _, fErr := im_mysql_model.FindUserByUID(v); fErr != nil {
c.ErrorMsg = "some uid establish failed"
c.ErrorCode = 408
resp.CommonResp = &c
resp.FailedUidList = append(resp.FailedUidList, v)
} else {
if _, err = im_mysql_model.FindFriendRelationshipFromFriend(req.OwnerUid, v); err != nil {
@@ -81,18 +89,18 @@ func (s *friendServer) ImportFriend(ctx context.Context, req *pbFriend.ImportFri
err1 := im_mysql_model.InsertToFriend(req.OwnerUid, v, 1)
if err1 != nil {
resp.FailedUidList = append(resp.FailedUidList, v)
log.Error(req.Token, req.OperationID, "err=%s,create friendship failed", err.Error())
log.NewError(req.OperationID, "err1,create friendship failed", req.OwnerUid, v, err1.Error())
}
err2 := im_mysql_model.InsertToFriend(v, req.OwnerUid, 1)
if err2 != nil {
log.Error(req.Token, req.OperationID, "err=%s,create friendship failed", err.Error())
log.NewError(req.OperationID, "err2,create friendship failed", v, req.OwnerUid, err2.Error())
}
if err1 == nil && err2 == nil {
var name, faceUrl string
n := content_struct.NotificationContent{1, constant.FriendAcceptTip, ""}
n := content_struct.NotificationContent{IsDisplay: 1, DefaultTips: constant.FriendAcceptTip}
r, err := im_mysql_model.FindUserByUID(v)
if err != nil {
log.ErrorByKv("get info failed", req.OperationID, "err", err.Error(), "req", req.String())
log.NewError(req.OperationID, "get info failed", err.Error(), v)
}
if r != nil {
name, faceUrl = r.Name, r.Icon
+14 -8
View File
@@ -70,6 +70,12 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
return &pbGroup.InviteUserToGroupResp{ErrorCode: config.ErrAccess.ErrCode, ErrorMsg: config.ErrAccess.ErrMsg}, nil
}
groupInfoFromMysql, err := imdb.FindGroupInfoByGroupId(req.GroupID)
if err != nil || groupInfoFromMysql == nil {
log.NewError(req.OperationID, "get group info error", req.GroupID, req.UidList)
return &pbGroup.InviteUserToGroupResp{ErrorCode: config.ErrAccess.ErrCode, ErrorMsg: config.ErrAccess.ErrMsg}, nil
}
//
//from User: invite: applicant
//to user: invite: invited
@@ -157,13 +163,13 @@ func (c *inviteUserToGroupReq) ContentToString() string {
}
func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGroupAllMemberReq) (*pbGroup.GetGroupAllMemberResp, error) {
claims, err := utils.ParseToken(req.Token)
if err != nil {
log.Error(req.Token, req.OperationID, "err=%s,parse token failed", err.Error())
if req.Token != config.Config.Secret {
return &pbGroup.GetGroupAllMemberResp{ErrorCode: config.ErrParseToken.ErrCode, ErrorMsg: config.ErrParseToken.ErrMsg}, nil
}
}
//claims, err := utils.ParseToken(req.Token)
//if err != nil {
// log.Error(req.Token, req.OperationID, "err=%s,parse token failed", err.Error())
// if req.Token != config.Config.Secret {
// return &pbGroup.GetGroupAllMemberResp{ErrorCode: config.ErrParseToken.ErrCode, ErrorMsg: config.ErrParseToken.ErrMsg}, nil
// }
//}
var resp pbGroup.GetGroupAllMemberResp
resp.ErrorCode = 0
@@ -171,7 +177,7 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGro
if err != nil {
resp.ErrorCode = config.ErrDb.ErrCode
resp.ErrorMsg = err.Error()
log.Error(claims.UID, req.OperationID, "FindGroupMemberListByGroupId failed, ", err.Error(), "params: ", req.GroupID)
log.NewError(req.OperationID, "FindGroupMemberListByGroupId failed,", err.Error(), req.GroupID)
return &resp, nil
}
+68
View File
@@ -0,0 +1,68 @@
package utils
import (
"Open_IM/pkg/utils"
"net/http"
"net/http/httptest"
"testing"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/assert"
)
func init() {
gin.SetMode(gin.TestMode)
}
func performRequest(r http.Handler, method, origin string) *httptest.ResponseRecorder {
return performRequestWithHeaders(r, method, origin, http.Header{})
}
func performRequestWithHeaders(r http.Handler, method, origin string, header http.Header) *httptest.ResponseRecorder {
req, _ := http.NewRequest(method, "/", nil)
// From go/net/http/request.go:
// For incoming requests, the Host header is promoted to the
// Request.Host field and removed from the Header map.
req.Host = header.Get("Host")
header.Del("Host")
if len(origin) > 0 {
header.Set("Origin", origin)
}
req.Header = header
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
return w
}
func newTestRouter() *gin.Engine {
router := gin.New()
router.Use(utils.CorsHandler())
router.GET("/", func(c *gin.Context) {
c.String(http.StatusOK, "get")
})
router.POST("/", func(c *gin.Context) {
c.String(http.StatusOK, "post")
})
router.PATCH("/", func(c *gin.Context) {
c.String(http.StatusOK, "patch")
})
return router
}
func Test_CorsHandler(t *testing.T) {
router := newTestRouter()
// no CORS request, origin == ""
w := performRequest(router, "GET", "")
assert.Equal(t, "get", w.Body.String())
assert.Equal(t, w.Header().Get("Access-Control-Allow-Origin"), "*")
assert.Equal(t, w.Header().Get("Access-Control-Allow-Methods"), "*")
assert.Equal(t, w.Header().Get("Access-Control-Allow-Headers"), "*")
assert.Equal(t, w.Header().Get("Access-Control-Expose-Headers"), "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar")
assert.Equal(t, w.Header().Get("Access-Control-Max-Age"), "172800")
assert.Equal(t, w.Header().Get("Access-Control-Allow-Credentials"), "false")
assert.Equal(t, w.Header().Get("content-type"), "application/json")
w = performRequest(router, "OPTIONS", "")
assert.Equal(t, w.Body.String(), "\"Options Request!\"")
}
+13
View File
@@ -0,0 +1,13 @@
package utils
import (
"Open_IM/pkg/utils"
"net"
"testing"
)
func TestServerIP(t *testing.T) {
if net.ParseIP(utils.ServerIP) == nil {
t.Fail()
}
}
+28
View File
@@ -0,0 +1,28 @@
package utils
import (
"github.com/bwmarrin/snowflake"
)
func init() {
var err error
idGenerator, err = snowflake.NewNode(getNodeNum())
if err != nil {
panic(err)
}
}
func getNodeNum() int64 {
return 1
}
var idGenerator *snowflake.Node
func GenID() string {
return idGenerator.Generate().String()
}
func GenIDs(count int) []string {
//impl
return []string{}
}
+15
View File
@@ -0,0 +1,15 @@
package utils
import "testing"
func TestGenID(t *testing.T) {
m := map[string]struct{}{}
for i := 0; i < 2000; i++ {
got := GenID()
if _, ok := m[got]; !ok {
m[got] = struct{}{}
} else {
t.Error("id generate error", got)
}
}
}
+28
View File
@@ -0,0 +1,28 @@
package utils
import (
"Open_IM/pkg/utils"
"path/filepath"
"runtime"
"testing"
"github.com/stretchr/testify/assert"
)
var (
_, b, _, _ = runtime.Caller(0)
// Root folder of this project
Root = filepath.Join(filepath.Dir(b), "../..")
)
func Test_GenSmallImage(t *testing.T) {
println(Root)
err := utils.GenSmallImage(Root+"/docs/open-im-logo.png", Root+"/out-test/open-im-logo-test.png")
assert.Nil(t, err)
err = utils.GenSmallImage(Root+"/docs/open-im-logo.png", "out-test/open-im-logo-test.png")
assert.Nil(t, err)
err = utils.GenSmallImage(Root+"/docs/Architecture.jpg", "out-test/Architecture-test.jpg")
assert.Nil(t, err)
}
+82
View File
@@ -0,0 +1,82 @@
package utils
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/utils"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func Test_BuildClaims(t *testing.T) {
uid := "1"
platform := "PC"
ttl := int64(-1)
claim := utils.BuildClaims(uid, platform, ttl)
now := time.Now().Unix()
assert.Equal(t, claim.UID, uid, "uid should equal")
assert.Equal(t, claim.Platform, platform, "platform should equal")
assert.Equal(t, claim.RegisteredClaims.ExpiresAt, int64(-1), "StandardClaims.ExpiresAt should be equal")
// time difference within 1s
assert.Equal(t, claim.RegisteredClaims.IssuedAt, now, "StandardClaims.IssuedAt should be equal")
assert.Equal(t, claim.RegisteredClaims.NotBefore, now, "StandardClaims.NotBefore should be equal")
ttl = int64(60)
now = time.Now().Unix()
claim = utils.BuildClaims(uid, platform, ttl)
// time difference within 1s
assert.Equal(t, claim.RegisteredClaims.ExpiresAt, int64(60)+now, "StandardClaims.ExpiresAt should be equal")
assert.Equal(t, claim.RegisteredClaims.IssuedAt, now, "StandardClaims.IssuedAt should be equal")
assert.Equal(t, claim.RegisteredClaims.NotBefore, now, "StandardClaims.NotBefore should be equal")
}
func Test_CreateToken(t *testing.T) {
uid := "1"
platform := int32(1)
now := time.Now().Unix()
tokenString, expiresAt, err := utils.CreateToken(uid, platform)
assert.NotEmpty(t, tokenString)
assert.Equal(t, expiresAt, 604800+now)
assert.Nil(t, err)
}
func Test_VerifyToken(t *testing.T) {
uid := "1"
platform := int32(1)
tokenString, _, _ := utils.CreateToken(uid, platform)
result := utils.VerifyToken(tokenString, uid)
assert.True(t, result)
result = utils.VerifyToken(tokenString, "2")
assert.False(t, result)
}
func Test_ParseRedisInterfaceToken(t *testing.T) {
uid := "1"
platform := int32(1)
tokenString, _, _ := utils.CreateToken(uid, platform)
claims, err := utils.ParseRedisInterfaceToken([]uint8(tokenString))
assert.Nil(t, err)
assert.Equal(t, claims.UID, uid)
// timeout
config.Config.TokenPolicy.AccessExpire = -80
tokenString, _, _ = utils.CreateToken(uid, platform)
claims, err = utils.ParseRedisInterfaceToken([]uint8(tokenString))
assert.Equal(t, err, utils.TokenExpired)
assert.Nil(t, claims)
}
func Test_ParseToken(t *testing.T) {
uid := "1"
platform := int32(1)
tokenString, _, _ := utils.CreateToken(uid, platform)
claims, err := utils.ParseToken(tokenString)
if err == nil {
assert.Equal(t, claims.UID, uid)
}
}
+16
View File
@@ -0,0 +1,16 @@
package utils
import (
"Open_IM/pkg/utils"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_Md5(t *testing.T) {
result := utils.Md5("go")
assert.Equal(t, result, "34d1f91fb2e514b8576fab1a75a89a6b")
result2 := utils.Md5("go")
assert.Equal(t, result, result2)
}
@@ -0,0 +1,46 @@
package utils
import (
"Open_IM/pkg/utils"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_PlatformIDToName(t *testing.T) {
assert.Equal(t, utils.PlatformIDToName(1), "IOS")
assert.Equal(t, utils.PlatformIDToName(2), "Android")
assert.Equal(t, utils.PlatformIDToName(3), "Windows")
assert.Equal(t, utils.PlatformIDToName(4), "OSX")
assert.Equal(t, utils.PlatformIDToName(5), "Web")
assert.Equal(t, utils.PlatformIDToName(6), "MiniWeb")
assert.Equal(t, utils.PlatformIDToName(7), "Linux")
assert.Equal(t, utils.PlatformIDToName(0), "")
}
func Test_PlatformNameToID(t *testing.T) {
assert.Equal(t, utils.PlatformNameToID("IOS"), int32(1))
assert.Equal(t, utils.PlatformNameToID("Android"), int32(2))
assert.Equal(t, utils.PlatformNameToID("Windows"), int32(3))
assert.Equal(t, utils.PlatformNameToID("OSX"), int32(4))
assert.Equal(t, utils.PlatformNameToID("Web"), int32(5))
assert.Equal(t, utils.PlatformNameToID("MiniWeb"), int32(6))
assert.Equal(t, utils.PlatformNameToID("Linux"), int32(7))
assert.Equal(t, utils.PlatformNameToID("UnknownDevice"), int32(0))
assert.Equal(t, utils.PlatformNameToID(""), int32(0))
}
func Test_PlatformNameToClass(t *testing.T) {
assert.Equal(t, utils.PlatformNameToClass("IOS"), "Mobile")
assert.Equal(t, utils.PlatformNameToClass("Android"), "Mobile")
assert.Equal(t, utils.PlatformNameToClass("OSX"), "PC")
assert.Equal(t, utils.PlatformNameToClass("Windows"), "PC")
assert.Equal(t, utils.PlatformNameToClass("Web"), "PC")
assert.Equal(t, utils.PlatformNameToClass("MiniWeb"), "Mobile")
assert.Equal(t, utils.PlatformNameToClass("Linux"), "PC")
assert.Equal(t, utils.PlatformNameToClass("UnknownDevice"), "")
assert.Equal(t, utils.PlatformNameToClass(""), "")
}