mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-28 06:19:20 +08:00
feat: Integrate Comprehensive E2E Testing for GoChat (#1906)
* feat: create e2e test readme Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com> * feat: fix markdown file * feat: add openim make lint * feat: add git chglog pull request * feat: add git chglog pull request * fix: fix openim api err code * fix: fix openim api err code * fix: fix openim api err code * feat: Improve CICD * feat: Combining GitHub and Google Workspace for Effective Project Management' * feat: fix openim tools error code * feat: fix openim tools error code * feat: add openim error handle * feat: add openim error handle * feat: optimize tim white prom code return err * feat: fix openim tools error code * style: format openim server code style * feat: add openim optimize commit code * feat: add openim optimize commit code * feat: add openim auto format code * feat: add openim auto format code * feat: add openim auto format code * feat: add openim auto format code * feat: add openim auto format code * feat: format openim code * feat: Some of the notes were translated * feat: Some of the notes were translated * feat: update openim server code * feat: optimize openim reset code * feat: optimize openim reset code --------- Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
This commit is contained in:
@@ -20,19 +20,16 @@ import (
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
)
|
||||
|
||||
// RequiredIf validates if the specified field is required based on the session type.
|
||||
func RequiredIf(fl validator.FieldLevel) bool {
|
||||
sessionType := fl.Parent().FieldByName("SessionType").Int()
|
||||
|
||||
switch sessionType {
|
||||
case constant.SingleChatType, constant.NotificationChatType:
|
||||
if fl.FieldName() == "RecvID" {
|
||||
return fl.Field().String() != ""
|
||||
}
|
||||
return fl.FieldName() != "RecvID" || fl.Field().String() != ""
|
||||
case constant.GroupChatType, constant.SuperGroupChatType:
|
||||
if fl.FieldName() == "GroupID" {
|
||||
return fl.Field().String() != ""
|
||||
}
|
||||
return fl.FieldName() != "GroupID" || fl.Field().String() != ""
|
||||
default:
|
||||
return true
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
+2
-7
@@ -210,7 +210,6 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
|
||||
sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg)
|
||||
if err != nil {
|
||||
// Log and respond with an error if preparation fails.
|
||||
log.ZError(c, "decodeData failed", err)
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
@@ -226,7 +225,6 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
|
||||
if err != nil {
|
||||
// Set the status to failed and respond with an error if sending fails.
|
||||
status = constant.MsgSendFailed
|
||||
log.ZError(c, "send message err", err)
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
@@ -240,7 +238,8 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
|
||||
})
|
||||
if err != nil {
|
||||
// Log the error if updating the status fails.
|
||||
log.ZError(c, "SetSendMsgStatus failed", err)
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Respond with a success message and the response payload.
|
||||
@@ -299,7 +298,6 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
|
||||
resp apistruct.BatchSendMsgResp
|
||||
)
|
||||
if err := c.BindJSON(&req); err != nil {
|
||||
log.ZError(c, "BatchSendMsg BindJSON failed", err)
|
||||
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
|
||||
return
|
||||
}
|
||||
@@ -310,14 +308,12 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
|
||||
}
|
||||
|
||||
var recvIDs []string
|
||||
var err error
|
||||
if req.IsSendAll {
|
||||
pageNumber := 1
|
||||
showNumber := 500
|
||||
for {
|
||||
recvIDsPart, err := m.userRpcClient.GetAllUserIDs(c, int32(pageNumber), int32(showNumber))
|
||||
if err != nil {
|
||||
log.ZError(c, "GetAllUserIDs failed", err)
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
@@ -333,7 +329,6 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
|
||||
log.ZDebug(c, "BatchSendMsg nums", "nums ", len(recvIDs))
|
||||
sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg)
|
||||
if err != nil {
|
||||
log.ZError(c, "decodeData failed", err)
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ import (
|
||||
)
|
||||
|
||||
func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.UniversalClient) *gin.Engine {
|
||||
discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) // 默认RPC中间件
|
||||
discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) // Default RPC middleware
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
r := gin.New()
|
||||
if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
|
||||
@@ -225,6 +225,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
|
||||
return r
|
||||
}
|
||||
|
||||
// GinParseToken is a middleware that parses the token in the request header and verifies it.
|
||||
func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc {
|
||||
dataBase := controller.NewAuthDatabase(
|
||||
cache.NewMsgCacheModel(rdb),
|
||||
@@ -250,13 +251,11 @@ func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc {
|
||||
}
|
||||
m, err := dataBase.GetTokensWithoutError(c, claims.UserID, claims.PlatformID)
|
||||
if err != nil {
|
||||
log.ZWarn(c, "cache get token error", errs.ErrTokenNotExist.Wrap())
|
||||
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
|
||||
c.Abort()
|
||||
return
|
||||
}
|
||||
if len(m) == 0 {
|
||||
log.ZWarn(c, "cache do not exist token error", errs.ErrTokenNotExist.Wrap())
|
||||
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
|
||||
c.Abort()
|
||||
return
|
||||
@@ -265,12 +264,10 @@ func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc {
|
||||
switch v {
|
||||
case constant.NormalToken:
|
||||
case constant.KickedToken:
|
||||
log.ZWarn(c, "cache kicked token error", errs.ErrTokenKicked.Wrap())
|
||||
apiresp.GinError(c, errs.ErrTokenKicked.Wrap())
|
||||
c.Abort()
|
||||
return
|
||||
default:
|
||||
log.ZWarn(c, "cache unknown token error", errs.ErrTokenUnknown.Wrap())
|
||||
apiresp.GinError(c, errs.ErrTokenUnknown.Wrap())
|
||||
c.Abort()
|
||||
return
|
||||
@@ -286,3 +283,10 @@ func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// // handleGinError logs and returns an error response through Gin context.
|
||||
// func handleGinError(c *gin.Context, logMessage string, errType errs.CodeError, detail string) {
|
||||
// wrappedErr := errType.Wrap(detail)
|
||||
// apiresp.GinError(c, wrappedErr)
|
||||
// c.Abort()
|
||||
// }
|
||||
|
||||
@@ -68,7 +68,7 @@ func (u *UserApi) GetUsers(c *gin.Context) {
|
||||
func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) {
|
||||
var req msggateway.GetUsersOnlineStatusReq
|
||||
if err := c.BindJSON(&req); err != nil {
|
||||
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
conns, err := u.Discov.GetConns(c, config.Config.RpcRegisterName.OpenImMessageGatewayName)
|
||||
@@ -86,7 +86,7 @@ func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) {
|
||||
msgClient := msggateway.NewMsgGatewayClient(v)
|
||||
reply, err := msgClient.GetUsersOnlineStatus(c, &req)
|
||||
if err != nil {
|
||||
log.ZWarn(c, "GetUsersOnlineStatus rpc err", err)
|
||||
log.ZDebug(c, "GetUsersOnlineStatus rpc error", err)
|
||||
|
||||
parseError := apiresp.ParseError(err)
|
||||
if parseError.ErrCode == errs.NoPermissionError {
|
||||
|
||||
@@ -91,13 +91,7 @@ type Client struct {
|
||||
// }
|
||||
|
||||
// ResetClient updates the client's state with new connection and context information.
|
||||
func (c *Client) ResetClient(
|
||||
ctx *UserConnContext,
|
||||
conn LongConn,
|
||||
isBackground, isCompress bool,
|
||||
longConnServer LongConnServer,
|
||||
token string,
|
||||
) {
|
||||
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isBackground, isCompress bool, longConnServer LongConnServer, token string) {
|
||||
c.w = new(sync.Mutex)
|
||||
c.conn = conn
|
||||
c.PlatformID = utils.StringToInt(ctx.GetPlatformID())
|
||||
@@ -112,9 +106,11 @@ func (c *Client) ResetClient(
|
||||
c.token = token
|
||||
}
|
||||
|
||||
// pingHandler handles ping messages and sends pong responses.
|
||||
func (c *Client) pingHandler(_ string) error {
|
||||
_ = c.conn.SetReadDeadline(pongWait)
|
||||
if err := c.conn.SetReadDeadline(pongWait); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.writePongMsg()
|
||||
}
|
||||
|
||||
@@ -141,7 +137,8 @@ func (c *Client) readMessage() {
|
||||
}
|
||||
|
||||
log.ZDebug(c.ctx, "readMessage", "messageType", messageType)
|
||||
if c.closed.Load() { // 连接刚置位已经关闭,但是协程还没退出的场景
|
||||
if c.closed.Load() {
|
||||
// The scenario where the connection has just been closed, but the coroutine has not exited
|
||||
c.closedErr = ErrConnClosed
|
||||
return
|
||||
}
|
||||
@@ -185,11 +182,11 @@ func (c *Client) handleMessage(message []byte) error {
|
||||
|
||||
err := c.longConnServer.Decode(message, binaryReq)
|
||||
if err != nil {
|
||||
return errs.Wrap(err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.longConnServer.Validate(binaryReq); err != nil {
|
||||
return errs.Wrap(err)
|
||||
return err
|
||||
}
|
||||
|
||||
if binaryReq.SendID != c.UserID {
|
||||
@@ -239,7 +236,7 @@ func (c *Client) setAppBackgroundStatus(ctx context.Context, req *Req) ([]byte,
|
||||
}
|
||||
|
||||
c.IsBackground = isBackground
|
||||
// todo callback
|
||||
// TODO: callback
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@@ -273,7 +270,7 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re
|
||||
}
|
||||
|
||||
if binaryReq.ReqIdentifier == WsLogoutMsg {
|
||||
return errors.New("user logout")
|
||||
return errs.Wrap(errors.New("user logout"))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -316,17 +313,21 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
|
||||
|
||||
encodedBuf, err := c.longConnServer.Encode(resp)
|
||||
if err != nil {
|
||||
return errs.Wrap(err)
|
||||
return err
|
||||
}
|
||||
|
||||
c.w.Lock()
|
||||
defer c.w.Unlock()
|
||||
|
||||
_ = c.conn.SetWriteDeadline(writeWait)
|
||||
err = c.conn.SetWriteDeadline(writeWait)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.IsCompress {
|
||||
resultBuf, compressErr := c.longConnServer.CompressWithPool(encodedBuf)
|
||||
if compressErr != nil {
|
||||
return errs.Wrap(compressErr)
|
||||
return compressErr
|
||||
}
|
||||
return c.conn.WriteMessage(MessageBinary, resultBuf)
|
||||
}
|
||||
@@ -344,7 +345,7 @@ func (c *Client) writePongMsg() error {
|
||||
|
||||
err := c.conn.SetWriteDeadline(writeWait)
|
||||
if err != nil {
|
||||
return errs.Wrap(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return c.conn.WriteMessage(PongMessage, nil)
|
||||
|
||||
@@ -17,7 +17,6 @@ package msggateway
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
@@ -46,12 +45,15 @@ func NewGzipCompressor() *GzipCompressor {
|
||||
func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) {
|
||||
gzipBuffer := bytes.Buffer{}
|
||||
gz := gzip.NewWriter(&gzipBuffer)
|
||||
|
||||
if _, err := gz.Write(rawData); err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
return nil, errs.Wrap(err, "GzipCompressor.Compress: writing to gzip writer failed")
|
||||
}
|
||||
|
||||
if err := gz.Close(); err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
return nil, errs.Wrap(err, "GzipCompressor.Compress: closing gzip writer failed")
|
||||
}
|
||||
|
||||
return gzipBuffer.Bytes(), nil
|
||||
}
|
||||
|
||||
@@ -63,10 +65,10 @@ func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) {
|
||||
gz.Reset(&gzipBuffer)
|
||||
|
||||
if _, err := gz.Write(rawData); err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
return nil, errs.Wrap(err, "GzipCompressor.CompressWithPool: error writing data")
|
||||
}
|
||||
if err := gz.Close(); err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
return nil, errs.Wrap(err, "GzipCompressor.CompressWithPool: error closing gzip writer")
|
||||
}
|
||||
return gzipBuffer.Bytes(), nil
|
||||
}
|
||||
@@ -75,32 +77,36 @@ func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error) {
|
||||
buff := bytes.NewBuffer(compressedData)
|
||||
reader, err := gzip.NewReader(buff)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err, "NewReader failed")
|
||||
return nil, errs.Wrap(err, "GzipCompressor.DeCompress: NewReader creation failed")
|
||||
}
|
||||
compressedData, err = io.ReadAll(reader)
|
||||
decompressedData, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err, "ReadAll failed")
|
||||
return nil, errs.Wrap(err, "GzipCompressor.DeCompress: reading from gzip reader failed")
|
||||
}
|
||||
_ = reader.Close()
|
||||
return compressedData, nil
|
||||
if err = reader.Close(); err != nil {
|
||||
// Even if closing the reader fails, we've successfully read the data,
|
||||
// so we return the decompressed data and an error indicating the close failure.
|
||||
return decompressedData, errs.Wrap(err, "GzipCompressor.DeCompress: closing gzip reader failed")
|
||||
}
|
||||
return decompressedData, nil
|
||||
}
|
||||
|
||||
func (g *GzipCompressor) DecompressWithPool(compressedData []byte) ([]byte, error) {
|
||||
reader := gzipReaderPool.Get().(*gzip.Reader)
|
||||
if reader == nil {
|
||||
return nil, errs.Wrap(errors.New("NewReader failed"))
|
||||
}
|
||||
defer gzipReaderPool.Put(reader)
|
||||
|
||||
err := reader.Reset(bytes.NewReader(compressedData))
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err, "NewReader failed")
|
||||
return nil, errs.Wrap(err, "GzipCompressor.DecompressWithPool: resetting gzip reader failed")
|
||||
}
|
||||
|
||||
compressedData, err = io.ReadAll(reader)
|
||||
decompressedData, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err, "ReadAll failed")
|
||||
return nil, errs.Wrap(err, "GzipCompressor.DecompressWithPool: reading from pooled gzip reader failed")
|
||||
}
|
||||
_ = reader.Close()
|
||||
return compressedData, nil
|
||||
if err = reader.Close(); err != nil {
|
||||
// Similar to DeCompress, return the data and error for close failure.
|
||||
return decompressedData, errs.Wrap(err, "GzipCompressor.DecompressWithPool: closing pooled gzip reader failed")
|
||||
}
|
||||
return decompressedData, nil
|
||||
}
|
||||
|
||||
@@ -37,10 +37,16 @@ func TestCompressDecompress(t *testing.T) {
|
||||
|
||||
// compress
|
||||
dest, err := compressor.CompressWithPool(src)
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
// decompress
|
||||
res, err := compressor.DecompressWithPool(dest)
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
// check
|
||||
@@ -60,10 +66,16 @@ func TestCompressDecompressWithConcurrency(t *testing.T) {
|
||||
|
||||
// compress
|
||||
dest, err := compressor.CompressWithPool(src)
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
// decompress
|
||||
res, err := compressor.DecompressWithPool(dest)
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
// check
|
||||
@@ -99,6 +111,7 @@ func BenchmarkDecompress(b *testing.B) {
|
||||
|
||||
compressor := NewGzipCompressor()
|
||||
comdata, err := compressor.Compress(src)
|
||||
|
||||
assert.Equal(b, nil, err)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
|
||||
@@ -37,7 +37,7 @@ func (g *GobEncoder) Encode(data any) ([]byte, error) {
|
||||
enc := gob.NewEncoder(&buff)
|
||||
err := enc.Encode(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errs.Wrap(err, "GobEncoder.Encode failed")
|
||||
}
|
||||
return buff.Bytes(), nil
|
||||
}
|
||||
@@ -47,7 +47,7 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
|
||||
dec := gob.NewDecoder(buff)
|
||||
err := dec.Decode(decodeData)
|
||||
if err != nil {
|
||||
return errs.Wrap(err)
|
||||
return errs.Wrap(err, "GobEncoder.Decode failed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -23,14 +23,7 @@ import (
|
||||
|
||||
// RunWsAndServer run ws server.
|
||||
func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
|
||||
fmt.Println(
|
||||
"start rpc/msg_gateway server, port: ",
|
||||
rpcPort,
|
||||
wsPort,
|
||||
prometheusPort,
|
||||
", OpenIM version: ",
|
||||
config.Version,
|
||||
)
|
||||
fmt.Println("start rpc/msg_gateway server, port: ", rpcPort, wsPort, prometheusPort, ", OpenIM version: ", config.Version)
|
||||
longServer, err := NewWsServer(
|
||||
WithPort(wsPort),
|
||||
WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)),
|
||||
|
||||
@@ -15,9 +15,11 @@
|
||||
package msggateway
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
@@ -72,7 +74,8 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er
|
||||
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
// The upgrader.Upgrade method usually returns enough error messages to diagnose problems that may occur during the upgrade
|
||||
return errs.Wrap(err, "GenerateLongConn: WebSocket upgrade failed")
|
||||
}
|
||||
d.conn = conn
|
||||
return nil
|
||||
@@ -96,7 +99,16 @@ func (d *GWebSocket) SetReadDeadline(timeout time.Duration) error {
|
||||
}
|
||||
|
||||
func (d *GWebSocket) SetWriteDeadline(timeout time.Duration) error {
|
||||
return d.conn.SetWriteDeadline(time.Now().Add(timeout))
|
||||
// TODO add error
|
||||
if timeout <= 0 {
|
||||
return errs.Wrap(errors.New("timeout must be greater than 0"))
|
||||
}
|
||||
|
||||
// TODO SetWriteDeadline Future add error handling
|
||||
if err := d.conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
|
||||
return errs.Wrap(err, "GWebSocket.SetWriteDeadline failed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Response, error) {
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
|
||||
"github.com/OpenIMSDK/protocol/push"
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
|
||||
"github.com/go-playground/validator/v10"
|
||||
"google.golang.org/protobuf/proto"
|
||||
@@ -119,10 +120,10 @@ func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDi
|
||||
func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error) {
|
||||
req := sdkws.GetMaxSeqReq{}
|
||||
if err := proto.Unmarshal(data.Data, &req); err != nil {
|
||||
return nil, err
|
||||
return nil, errs.Wrap(err, "GetSeq: error unmarshaling request")
|
||||
}
|
||||
if err := g.validate.Struct(&req); err != nil {
|
||||
return nil, err
|
||||
return nil, errs.Wrap(err, "GetSeq: validation failed")
|
||||
}
|
||||
resp, err := g.msgRpcClient.GetMaxSeq(context, &req)
|
||||
if err != nil {
|
||||
@@ -130,28 +131,37 @@ func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error)
|
||||
}
|
||||
c, err := proto.Marshal(resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errs.Wrap(err, "GetSeq: error marshaling response")
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (g GrpcHandler) SendMessage(context context.Context, data *Req) ([]byte, error) {
|
||||
msgData := sdkws.MsgData{}
|
||||
// SendMessage handles the sending of messages through gRPC. It unmarshals the request data,
|
||||
// validates the message, and then sends it using the message RPC client.
|
||||
func (g GrpcHandler) SendMessage(ctx context.Context, data *Req) ([]byte, error) {
|
||||
// Unmarshal the message data from the request.
|
||||
var msgData sdkws.MsgData
|
||||
if err := proto.Unmarshal(data.Data, &msgData); err != nil {
|
||||
return nil, err
|
||||
return nil, errs.Wrap(err, "error unmarshalling message data")
|
||||
}
|
||||
|
||||
// Validate the message data structure.
|
||||
if err := g.validate.Struct(&msgData); err != nil {
|
||||
return nil, err
|
||||
return nil, errs.Wrap(err, "message data validation failed")
|
||||
}
|
||||
|
||||
req := msg.SendMsgReq{MsgData: &msgData}
|
||||
resp, err := g.msgRpcClient.SendMsg(context, &req)
|
||||
|
||||
resp, err := g.msgRpcClient.SendMsg(ctx, &req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := proto.Marshal(resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errs.Wrap(err, "error marshaling response")
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
@@ -162,7 +172,7 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
|
||||
}
|
||||
c, err := proto.Marshal(resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errs.Wrap(err, "error marshaling response")
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
@@ -170,7 +180,7 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
|
||||
func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([]byte, error) {
|
||||
req := sdkws.PullMessageBySeqsReq{}
|
||||
if err := proto.Unmarshal(data.Data, &req); err != nil {
|
||||
return nil, err
|
||||
return nil, errs.Wrap(err, "error unmarshaling request")
|
||||
}
|
||||
if err := g.validate.Struct(data); err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -88,6 +88,7 @@ type WsServer struct {
|
||||
Encoder
|
||||
MessageHandler
|
||||
}
|
||||
|
||||
type kickHandler struct {
|
||||
clientOK bool
|
||||
oldClients []*Client
|
||||
@@ -129,7 +130,9 @@ func (ws *WsServer) UnRegister(c *Client) {
|
||||
}
|
||||
|
||||
func (ws *WsServer) Validate(s any) error {
|
||||
//?question?
|
||||
if s == nil {
|
||||
return errs.Wrap(errors.New("input cannot be nil"))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -276,7 +279,7 @@ func (ws *WsServer) registerClient(client *Client) {
|
||||
log.ZDebug(client.ctx, "user exist", "userID", client.UserID, "platformID", client.PlatformID)
|
||||
if clientOK {
|
||||
ws.clients.Set(client.UserID, client)
|
||||
// 已经有同平台的连接存在
|
||||
// There is already a connection to the platform
|
||||
log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients))
|
||||
ws.onlineUserConnNum.Add(1)
|
||||
} else {
|
||||
|
||||
@@ -19,15 +19,15 @@ import "time"
|
||||
type (
|
||||
Option func(opt *configs)
|
||||
configs struct {
|
||||
// 长连接监听端口
|
||||
// Long connection listening port
|
||||
port int
|
||||
// 长连接允许最大链接数
|
||||
// Maximum number of connections allowed for long connection
|
||||
maxConnNum int64
|
||||
// 连接握手超时时间
|
||||
// Connection handshake timeout
|
||||
handshakeTimeout time.Duration
|
||||
// 允许消息最大长度
|
||||
// Maximum length allowed for messages
|
||||
messageMaxMsgLength int
|
||||
// websocket write buffer, default: 4096, 4kb.
|
||||
// Websocket write buffer, default: 4096, 4kb.
|
||||
writeBufferSize int
|
||||
}
|
||||
)
|
||||
|
||||
@@ -45,8 +45,13 @@ import (
|
||||
)
|
||||
|
||||
type MsgTransfer struct {
|
||||
historyCH *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化
|
||||
historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
|
||||
// This consumer aggregated messages, subscribed to the topic:ws2ms_chat,
|
||||
// the modification notification is sent to msg_to_modify topic, the message is stored in redis, Incr Redis,
|
||||
// and then the message is sent to ms2pschat topic for push, and the message is sent to msg_to_mongo topic for persistence
|
||||
historyCH *OnlineHistoryRedisConsumerHandler
|
||||
// mongoDB batch insert, delete messages in redis after success,
|
||||
// and handle the deletion notification message deleted subscriptions topic: msg_to_mongo
|
||||
historyMongoCH *OnlineHistoryMongoConsumerHandler
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
@@ -65,6 +70,7 @@ func StartTransfer(prometheusPort int) error {
|
||||
if err = mongo.CreateMsgIndex(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -73,6 +79,7 @@ func StartTransfer(prometheusPort int) error {
|
||||
if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
|
||||
msgModel := cache.NewMsgCacheModel(rdb)
|
||||
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
|
||||
@@ -106,7 +113,7 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli
|
||||
}
|
||||
|
||||
func (m *MsgTransfer) Start(prometheusPort int) error {
|
||||
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
|
||||
fmt.Println("Start msg transfer", "prometheusPort:", prometheusPort)
|
||||
if prometheusPort <= 0 {
|
||||
return errs.Wrap(errors.New("prometheusPort not correct"))
|
||||
}
|
||||
|
||||
@@ -155,21 +155,13 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
||||
notStorageNotificationList,
|
||||
)
|
||||
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil {
|
||||
log.ZError(
|
||||
ctx,
|
||||
"msg to modify mq error",
|
||||
err,
|
||||
"uniqueKey",
|
||||
msgChannelValue.uniqueKey,
|
||||
"modifyMsgList",
|
||||
modifyMsgList,
|
||||
)
|
||||
log.ZError(ctx, "msg to modify mq error", err, "uniqueKey", msgChannelValue.uniqueKey, "modifyMsgList", modifyMsgList)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,.
|
||||
// Get messages/notifications stored message list, not stored and pushed message list.
|
||||
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(
|
||||
totalMsgs []*ContextMsg,
|
||||
) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
|
||||
@@ -190,7 +182,7 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(
|
||||
// clone msg from notificationMsg
|
||||
if options.IsSendMsg() {
|
||||
msg := proto.Clone(v.message).(*sdkws.MsgData)
|
||||
// 消息
|
||||
// message
|
||||
if v.message.Options != nil {
|
||||
msg.Options = msgprocessor.NewMsgOptions()
|
||||
}
|
||||
|
||||
@@ -31,12 +31,7 @@ func url() string {
|
||||
return config.Config.Callback.CallbackUrl
|
||||
}
|
||||
|
||||
func callbackOfflinePush(
|
||||
ctx context.Context,
|
||||
userIDs []string,
|
||||
msg *sdkws.MsgData,
|
||||
offlinePushUserIDs *[]string,
|
||||
) error {
|
||||
func callbackOfflinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
|
||||
if !config.Config.Callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing {
|
||||
return nil
|
||||
}
|
||||
@@ -59,10 +54,12 @@ func callbackOfflinePush(
|
||||
AtUserIDs: msg.AtUserIDList,
|
||||
Content: GetContent(msg),
|
||||
}
|
||||
|
||||
resp := &callbackstruct.CallbackBeforePushResp{}
|
||||
if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(resp.UserIDs) != 0 {
|
||||
*offlinePushUserIDs = resp.UserIDs
|
||||
}
|
||||
|
||||
@@ -39,20 +39,22 @@ type Fcm struct {
|
||||
cache cache.MsgModel
|
||||
}
|
||||
|
||||
// NewClient initializes a new FCM client using the Firebase Admin SDK.
|
||||
// It requires the FCM service account credentials file located within the project's configuration directory.
|
||||
func NewClient(cache cache.MsgModel) *Fcm {
|
||||
projectRoot := config.GetProjectRoot()
|
||||
projectRoot, _ := config.GetProjectRoot()
|
||||
credentialsFilePath := filepath.Join(projectRoot, "config", config.Config.Push.Fcm.ServiceAccount)
|
||||
opt := option.WithCredentialsFile(credentialsFilePath)
|
||||
fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
fcmMsgClient, err := fcmApp.Messaging(ctx)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &Fcm{fcmMsgCli: fcmMsgClient, cache: cache}
|
||||
}
|
||||
|
||||
|
||||
@@ -229,7 +229,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
||||
}(groupID, kickedUsers)
|
||||
pushToUserIDs = append(pushToUserIDs, kickedUsers...)
|
||||
case constant.GroupDismissedNotification:
|
||||
if msgprocessor.IsNotification(msgprocessor.GetConversationIDByMsg(msg)) { // 消息先到,通知后到
|
||||
// Messages arrive first, notifications arrive later
|
||||
if msgprocessor.IsNotification(msgprocessor.GetConversationIDByMsg(msg)) {
|
||||
var tips sdkws.GroupDismissedTips
|
||||
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
|
||||
return err
|
||||
|
||||
@@ -310,7 +310,7 @@ func (c *conversationServer) SetConversations(ctx context.Context,
|
||||
unequal++
|
||||
}
|
||||
}
|
||||
if err := c.conversationDatabase.SetUsersConversationFiledTx(ctx, req.UserIDs, &conversation, m); err != nil {
|
||||
if err := c.conversationDatabase.SetUsersConversationFieldTx(ctx, req.UserIDs, &conversation, m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if unequal > 0 {
|
||||
@@ -321,7 +321,7 @@ func (c *conversationServer) SetConversations(ctx context.Context,
|
||||
return &pbconversation.SetConversationsResp{}, nil
|
||||
}
|
||||
|
||||
// 获取超级大群开启免打扰的用户ID.
|
||||
// Get user IDs with "Do Not Disturb" enabled in super large groups.
|
||||
func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) {
|
||||
//userIDs, err := c.conversationDatabase.FindRecvMsgNotNotifyUserIDs(ctx, req.GroupID)
|
||||
//if err != nil {
|
||||
@@ -378,7 +378,7 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r
|
||||
}
|
||||
|
||||
func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) {
|
||||
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, req.OwnerUserID, req.ConversationID,
|
||||
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID,
|
||||
map[string]any{"max_seq": req.MaxSeq}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -278,6 +278,7 @@ func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbfriend.G
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// Get the list of friend requests sent out proactively.
|
||||
func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context,
|
||||
req *pbfriend.GetDesignatedFriendsApplyReq) (resp *pbfriend.GetDesignatedFriendsApplyResp, err error) {
|
||||
friendRequests, err := s.friendDatabase.FindBothFriendRequests(ctx, req.FromUserID, req.ToUserID)
|
||||
@@ -292,7 +293,7 @@ func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context,
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// ok 获取接收到的好友申请(即别人主动申请的).
|
||||
// Get received friend requests (i.e., those initiated by others).
|
||||
func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyToReq) (resp *pbfriend.GetPaginationFriendsApplyToResp, err error) {
|
||||
defer log.ZInfo(ctx, utils.GetFuncName()+" Return")
|
||||
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
|
||||
@@ -311,7 +312,6 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbf
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// ok 获取主动发出去的好友申请列表.
|
||||
func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyFromReq) (resp *pbfriend.GetPaginationFriendsApplyFromResp, err error) {
|
||||
defer log.ZInfo(ctx, utils.GetFuncName()+" Return")
|
||||
resp = &pbfriend.GetPaginationFriendsApplyFromResp{}
|
||||
|
||||
@@ -765,8 +765,8 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
|
||||
return nil, errs.ErrGroupRequestHandled.Wrap("group request already processed")
|
||||
}
|
||||
var inGroup bool
|
||||
if _, err := s.db.TakeGroupMember(ctx, req.GroupID, req.FromUserID); err == nil {
|
||||
inGroup = true // 已经在群里了
|
||||
if _, takeErr := s.db.TakeGroupMember(ctx, req.GroupID, req.FromUserID); takeErr == nil {
|
||||
inGroup = true // Already in group
|
||||
} else if !s.IsNotFound(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// 根据配置文件策略选择 oss 方式
|
||||
// Select based on the configuration file strategy
|
||||
enable := config.Config.Object.Enable
|
||||
var o s3.Interface
|
||||
switch config.Config.Object.Enable {
|
||||
|
||||
@@ -58,9 +58,9 @@ import (
|
||||
// continue
|
||||
// }
|
||||
// if len(seqs) > 0 {
|
||||
// if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err
|
||||
// if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err
|
||||
// != nil {
|
||||
// log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||
// log.ZError(ctx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||
// continue
|
||||
// }
|
||||
// if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
|
||||
@@ -139,8 +139,8 @@ func (c *MsgTool) ConversationsDestructMsgs() {
|
||||
continue
|
||||
}
|
||||
if len(seqs) > 0 {
|
||||
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]any{"latest_msg_destruct_time": now}); err != nil {
|
||||
log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]any{"latest_msg_destruct_time": now}); err != nil {
|
||||
log.ZError(ctx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||
continue
|
||||
}
|
||||
if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
|
||||
|
||||
@@ -32,7 +32,7 @@ import (
|
||||
)
|
||||
|
||||
func StartTask() error {
|
||||
fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime)
|
||||
fmt.Println("Cron task start, config:", config.Config.ChatRecordsClearTime)
|
||||
|
||||
msgTool, err := InitMsgTool()
|
||||
if err != nil {
|
||||
@@ -48,16 +48,16 @@ func StartTask() error {
|
||||
|
||||
// register cron tasks
|
||||
var crontab = cron.New()
|
||||
fmt.Println("start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime)
|
||||
fmt.Printf("Start chatRecordsClearTime cron task, cron config: %s\n", config.Config.ChatRecordsClearTime)
|
||||
_, err = crontab.AddFunc(config.Config.ChatRecordsClearTime, cronWrapFunc(rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq))
|
||||
if err != nil {
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
|
||||
fmt.Println("start msgDestruct cron task", "cron config", config.Config.MsgDestructTime)
|
||||
fmt.Printf("Start msgDestruct cron task, cron config: %s\n", config.Config.MsgDestructTime)
|
||||
_, err = crontab.AddFunc(config.Config.MsgDestructTime, cronWrapFunc(rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
|
||||
if err != nil {
|
||||
return errs.Wrap(err)
|
||||
return errs.Wrap(err, "cron_conversations_destruct_msgs")
|
||||
}
|
||||
|
||||
// start crontab
|
||||
|
||||
@@ -197,7 +197,8 @@ func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID strin
|
||||
return err
|
||||
}
|
||||
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
|
||||
log.ZError(ctx, "cache max seq and mongo max seq is diff > 10", nil, "maxSeqMongo", maxSeqMongo, "minSeqMongo", minSeqMongo, "maxSeqCache", maxSeqCache, "conversationID", conversationID)
|
||||
err = fmt.Errorf("cache max seq and mongo max seq is diff > 10, maxSeqMongo:%d,minSeqMongo:%d,maxSeqCache:%d,conversationID:%s", maxSeqMongo, minSeqMongo, maxSeqCache, conversationID)
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -219,7 +220,6 @@ func (c *MsgTool) checkMaxSeq(ctx context.Context, conversationID string) error
|
||||
func (c *MsgTool) FixAllSeq(ctx context.Context) error {
|
||||
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "GetAllConversationIDs failed", err)
|
||||
return err
|
||||
}
|
||||
for _, conversationID := range conversationIDs {
|
||||
|
||||
Reference in New Issue
Block a user