Merge branch 'errcode' of github.com:OpenIMSDK/Open-IM-Server into errcode

 Conflicts:
	cmd/api/main.go
	go.mod
	internal/push/fcm/push.go
	internal/push/getui/push.go
	internal/push/logic/init.go
	internal/push/logic/push_to_client.go
	internal/push/push_rpc_server.go
	internal/rpc/conversation/conversaion.go
	pkg/common/db/controller/group.go
	pkg/proto/push/push.pb.go
This commit is contained in:
wangchuxiao
2023-02-22 19:52:53 +08:00
134 changed files with 3751 additions and 3309 deletions
+1 -1
View File
@@ -1,7 +1,7 @@
package apiAuth
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
+1 -2
View File
@@ -1,10 +1,9 @@
package conversation
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/getcdv3"
pbConversation "Open_IM/pkg/proto/conversation"
pbUser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
+1 -1
View File
@@ -2,7 +2,7 @@ package friend
//import (
// jsonData "Open_IM/internal/utils"
// api "Open_IM/pkg/api_struct"
// api "Open_IM/pkg/apistruct"
// "Open_IM/pkg/common/config"
// "Open_IM/pkg/common/log"
// "Open_IM/pkg/common/tokenverify"
+2 -2
View File
@@ -1,8 +1,8 @@
package friend
import (
common "Open_IM/internal/api_to_rpc"
api "Open_IM/pkg/api_struct"
common "Open_IM/internal/api2rpc"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
rpc "Open_IM/pkg/proto/friend"
"github.com/gin-gonic/gin"
+1 -1
View File
@@ -2,7 +2,7 @@ package group
//import (
// common "Open_IM/internal/api_to_rpc"
// api "Open_IM/pkg/api_struct"
// api "Open_IM/pkg/apistruct"
// "Open_IM/pkg/common/config"
// "Open_IM/pkg/common/constant"
// "Open_IM/pkg/common/log"
+4 -4
View File
@@ -1,7 +1,7 @@
package group
import (
"Open_IM/pkg/api_struct"
"Open_IM/pkg/apistruct"
"Open_IM/pkg/proto/group"
"context"
"errors"
@@ -69,10 +69,10 @@ func KickGroupMember(c *gin.Context) {
// 默认 全部自动
NewRpc(NewApiBind[apistruct.KickGroupMemberReq, apistruct.KickGroupMemberResp](c), "", group.NewGroupClient, group.GroupClient.KickGroupMember).Execute()
// 可以自定义编辑请求和响应
a := NewRpc(NewApiBind[api_struct.KickGroupMemberReq, api_struct.KickGroupMemberResp](c), "", group.NewGroupClient, group.GroupClient.KickGroupMember)
a.Before(func(apiReq *api_struct.KickGroupMemberReq, rpcReq *group.KickGroupMemberReq, bind func() error) error {
a := NewRpc(NewApiBind[apistruct.KickGroupMemberReq, apistruct.KickGroupMemberResp](c), "", group.NewGroupClient, group.GroupClient.KickGroupMember)
a.Before(func(apiReq *apistruct.KickGroupMemberReq, rpcReq *group.KickGroupMemberReq, bind func() error) error {
return bind()
}).After(func(rpcResp *group.KickGroupMemberResp, apiResp *api_struct.KickGroupMemberResp, bind func() error) error {
}).After(func(rpcResp *group.KickGroupMemberResp, apiResp *apistruct.KickGroupMemberResp, bind func() error) error {
return bind()
}).Execute()
}
+1 -1
View File
@@ -2,7 +2,7 @@ package group
//import (
// jsonData "Open_IM/internal/utils"
// api "Open_IM/pkg/api_struct"
// api "Open_IM/pkg/apistruct"
// "Open_IM/pkg/common/config"
// "Open_IM/pkg/common/log"
// "Open_IM/pkg/common/token_verify"
+1 -2
View File
@@ -7,10 +7,9 @@
package manage
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
pbChat "Open_IM/pkg/proto/msg"
+1 -2
View File
@@ -7,12 +7,11 @@
package manage
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
pbRelay "Open_IM/pkg/proto/relay"
rpc "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
+1 -2
View File
@@ -1,12 +1,11 @@
package msg
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
rpc "Open_IM/pkg/proto/msg"
pbCommon "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
+1 -2
View File
@@ -1,12 +1,11 @@
package msg
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
rpc "Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
"context"
-1
View File
@@ -4,7 +4,6 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
pbChat "Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws"
"context"
-1
View File
@@ -4,7 +4,6 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/getcdv3"
"Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
-1
View File
@@ -7,7 +7,6 @@ import (
sdkws "Open_IM/pkg/proto/sdkws"
"context"
"Open_IM/pkg/getcdv3"
"github.com/gin-gonic/gin"
"net/http"
"strings"
+1 -1
View File
@@ -1,7 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
+1 -1
View File
@@ -1,7 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
+1 -2
View File
@@ -1,8 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
"Open_IM/pkg/common/db"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils"
@@ -1,10 +1,9 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
_ "Open_IM/pkg/common/tokenverify"
+1 -2
View File
@@ -1,8 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
"Open_IM/pkg/common/db"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils"
+1 -2
View File
@@ -1,8 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
"Open_IM/pkg/common/db"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/utils"
@@ -1,7 +1,7 @@
package apiThird
import (
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
+2 -3
View File
@@ -1,17 +1,16 @@
package user
import (
jsonData "Open_IM/internal/utils"
api "Open_IM/pkg/api_struct"
api "Open_IM/pkg/apistruct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tokenverify"
cacheRpc "Open_IM/pkg/proto/cache"
pbRelay "Open_IM/pkg/proto/relay"
sdkws "Open_IM/pkg/proto/sdkws"
rpc "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
jsonData "Open_IM/pkg/utils"
"context"
"net/http"
"strings"
+1 -1
View File
@@ -17,5 +17,5 @@ func NewBlackChecker(zk discoveryRegistry.SvcDiscoveryRegistry) *BlackChecker {
// possibleBlackUserID是否被userID拉黑,也就是是否在userID的黑名单中
func (b *BlackChecker) IsBlocked(ctx context.Context, possibleBlackUserID, userID string) (bool, error) {
return false, nil
}
+1 -1
View File
@@ -31,5 +31,5 @@ func (c *ConversationChecker) getConn() (*grpc.ClientConn, error) {
}
func (c *ConversationChecker) GetSingleConversationRecvMsgOpt(ctx context.Context, userID, conversationID string) (int32, error) {
panic("implement me")
}
+35 -1
View File
@@ -2,6 +2,7 @@ package check
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
discoveryRegistry "Open_IM/pkg/discoveryregistry"
"Open_IM/pkg/proto/friend"
sdkws "Open_IM/pkg/proto/sdkws"
@@ -24,7 +25,7 @@ func (f *FriendChecker) GetFriendsInfo(ctx context.Context, ownerUserID, friendU
if err != nil {
return nil, err
}
r, err := friend.NewFriendClient(cc).GetPaginationFriends(ctx, &friend.GetPaginationFriendsReq{OwnerUserID: ownerUserID, FriendUserIDs: []string{friendUserID}})
r, err := friend.NewFriendClient(cc).GetDesignatedFriends(ctx, &friend.GetDesignatedFriendsReq{OwnerUserID: ownerUserID, FriendUserIDs: []string{friendUserID}})
if err != nil {
return nil, err
}
@@ -37,5 +38,38 @@ func (f *FriendChecker) getConn() (*grpc.ClientConn, error) {
// possibleFriendUserID是否在userID的好友中
func (f *FriendChecker) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (bool, error) {
cc, err := f.getConn()
if err != nil {
return false, err
}
resp, err := friend.NewFriendClient(cc).IsFriend(ctx, &friend.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID})
if err != nil {
return false, err
}
return resp.InUser1Friends, nil
}
func (f *FriendChecker) GetAllPageFriends(ctx context.Context, ownerUserID string) (resp []*sdkws.FriendInfo, err error) {
cc, err := f.getConn()
if err != nil {
return nil, err
}
page := int32(0)
req := friend.GetPaginationFriendsReq{UserID: ownerUserID}
for {
req.Pagination = &sdkws.RequestPagination{PageNumber: page, ShowNumber: constant.ShowNumber}
tmp, err := friend.NewFriendClient(cc).GetPaginationFriends(ctx, &req)
if err != nil {
return nil, err
}
if len(tmp.FriendsInfo) == 0 {
if tmp.Total == int32(len(resp)) {
return resp, nil
}
return nil, constant.ErrData.Wrap("The total number of results and expectations are different, but result is nil")
}
resp = append(resp, tmp.FriendsInfo...)
page++
}
}
+8 -5
View File
@@ -12,10 +12,6 @@ import (
"strings"
)
//func GetUsersInfo(ctx context.Context, args ...interface{}) ([]*sdkws.UserInfo, error) {
// return nil, errors.New("TODO:GetUserInfo")
//}
func NewUserCheck(zk discoveryRegistry.SvcDiscoveryRegistry) *UserCheck {
return &UserCheck{
zk: zk,
@@ -104,5 +100,12 @@ func (u *UserCheck) GetPublicUserInfoMap(ctx context.Context, userIDs []string,
}
func (u *UserCheck) GetUserGlobalMsgRecvOpt(ctx context.Context, userID string) (int32, error) {
cc, err := u.getConn()
if err != nil {
return 0, err
}
resp, err := user.NewUserClient(cc).GetGlobalRecvMessageOpt(ctx, &user.GetGlobalRecvMessageOptReq{
UserID: userID,
})
return resp.GlobalRecvMsgOpt, err
}
+2 -2
View File
@@ -8,8 +8,8 @@ import (
"github.com/golang/protobuf/proto"
)
func (c *Check) DeleteMessageNotification(ctx context.Context, userID string, seqList []uint32, operationID string) {
DeleteMessageTips := sdkws.DeleteMessageTips{UserID: userID, SeqList: seqList}
func (c *Check) DeleteMessageNotification(ctx context.Context, userID string, seqs []int64, operationID string) {
DeleteMessageTips := sdkws.DeleteMessageTips{UserID: userID, Seqs: seqs}
c.MessageNotification(ctx, userID, userID, constant.DeleteMessageNotification, &DeleteMessageTips)
}
-1
View File
@@ -2,7 +2,6 @@ package msggateway
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
pbChat "Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws"
+1 -1
View File
@@ -8,7 +8,7 @@ import (
"fmt"
"sync"
prome "Open_IM/pkg/common/prometheus"
prome "Open_IM/pkg/common/prome"
"github.com/go-playground/validator/v10"
)
-1
View File
@@ -3,7 +3,6 @@ package msggateway
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/prome"
pbChat "Open_IM/pkg/proto/msg"
+34 -18
View File
@@ -3,14 +3,12 @@ package new
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/utils"
"bytes"
"context"
"errors"
"fmt"
"github.com/go-playground/validator/v10"
"runtime/debug"
"sync"
"time"
)
const (
@@ -35,35 +33,52 @@ const (
type Client struct {
w *sync.Mutex
conn LongConn
PlatformID int32
PushedMaxSeq uint32
IsCompress bool
platformID int
isCompress bool
userID string
IsBackground bool
token string
isBackground bool
connID string
onlineAt int64 // 上线时间戳(毫秒)
handler MessageHandler
unregisterChan chan *Client
compressor Compressor
encoder Encoder
userContext UserConnContext
validate *validator.Validate
closed bool
}
func newClient(conn LongConn, isCompress bool, userID string, isBackground bool, token string,
connID string, onlineAt int64, handler MessageHandler, unregisterChan chan *Client) *Client {
func newClient(ctx *UserConnContext, conn LongConn, isCompress bool, compressor Compressor, encoder Encoder,
handler MessageHandler, unregisterChan chan *Client, validate *validator.Validate) *Client {
return &Client{
conn: conn,
IsCompress: isCompress,
userID: userID, IsBackground: isBackground, token: token,
connID: connID,
onlineAt: onlineAt,
w: new(sync.Mutex),
conn: conn,
platformID: utils.StringToInt(ctx.GetPlatformID()),
isCompress: isCompress,
userID: ctx.GetUserID(),
compressor: compressor,
encoder: encoder,
connID: ctx.GetConnID(),
onlineAt: utils.GetCurrentTimestampByMill(),
handler: handler,
unregisterChan: unregisterChan,
validate: validate,
}
}
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isCompress bool, compressor Compressor, encoder Encoder,
handler MessageHandler, unregisterChan chan *Client, validate *validator.Validate) {
c.w = new(sync.Mutex)
c.conn = conn
c.platformID = utils.StringToInt(ctx.GetPlatformID())
c.isCompress = isCompress
c.userID = ctx.GetUserID()
c.compressor = compressor
c.encoder = encoder
c.connID = ctx.GetConnID()
c.onlineAt = utils.GetCurrentTimestampByMill()
c.handler = handler
c.unregisterChan = unregisterChan
c.validate = validate
}
func (c *Client) readMessage() {
defer func() {
if r := recover(); r != nil {
@@ -77,7 +92,7 @@ func (c *Client) readMessage() {
if returnErr != nil {
break
}
if c.closed == true {
if c.closed == true { //连接刚置位已经关闭,但是协程还没退出的场景
break
}
switch messageType {
@@ -119,7 +134,8 @@ func (c *Client) handleMessage(message []byte) error {
return errors.New("exception conn userID not same to req userID")
}
ctx := context.Background()
ctx = context.WithValue(ctx, "operationID", binaryReq.OperationID)
ctx = context.WithValue(ctx, c.connID, binaryReq.OperationID)
ctx = context.WithValue(ctx, OPERATION_ID, binaryReq.OperationID)
ctx = context.WithValue(ctx, "userID", binaryReq.SendID)
var messageErr error
var resp []byte
@@ -173,7 +189,7 @@ func (c *Client) writeMsg(resp Resp) error {
return utils.Wrap(err, "")
}
_ = c.conn.SetWriteTimeout(60)
if c.IsCompress {
if c.isCompress {
var compressErr error
resultBuf, compressErr = c.compressor.Compress(encodeBuf)
if compressErr != nil {
-1
View File
@@ -4,7 +4,6 @@ import (
"bytes"
"compress/gzip"
"io/ioutil"
"open_im_sdk/pkg/utils"
)
type Compressor interface {
+10
View File
@@ -0,0 +1,10 @@
package new
const (
USERID = "sendID"
PLATFORM_ID = "platformID"
TOKEN = "token"
OPERATION_ID = "operationID"
COMPRESSION = "compression"
GZIP_COMPRESSION_PROTOCAL = "gzip"
)
+32 -5
View File
@@ -1,6 +1,10 @@
package new
import "net/http"
import (
"Open_IM/pkg/utils"
"net/http"
"strconv"
)
type UserConnContext struct {
RespWriter http.ResponseWriter
@@ -19,9 +23,32 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont
RemoteAddr: req.RemoteAddr,
}
}
func (c *UserConnContext) Query(key string) string {
return c.Req.URL.Query().Get(key)
func (c *UserConnContext) Query(key string) (string, bool) {
var value string
if value = c.Req.URL.Query().Get(key); value == "" {
return value, false
}
return value, true
}
func (c *UserConnContext) GetHeader(key string) string {
return c.Req.Header.Get(key)
func (c *UserConnContext) GetHeader(key string) (string, bool) {
var value string
if value = c.Req.Header.Get(key); value == "" {
return value, false
}
return value, true
}
func (c *UserConnContext) SetHeader(key, value string) {
c.RespWriter.Header().Set(key, value)
}
func (c *UserConnContext) ErrReturn(error string, code int) {
http.Error(c.RespWriter, error, code)
}
func (c *UserConnContext) GetConnID() string {
return c.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))
}
func (c *UserConnContext) GetUserID() string {
return c.Req.URL.Query().Get(USERID)
}
func (c *UserConnContext) GetPlatformID() string {
return c.Req.URL.Query().Get(PLATFORM_ID)
}
-1
View File
@@ -3,7 +3,6 @@ package new
import (
"bytes"
"encoding/gob"
"open_im_sdk/pkg/utils"
)
type Encoder interface {
+44
View File
@@ -0,0 +1,44 @@
package new
import (
"Open_IM/pkg/common/constant"
"errors"
"net/http"
)
func httpError(ctx *UserConnContext, err error) {
code := http.StatusUnauthorized
ctx.SetHeader("Sec-Websocket-Version", "13")
ctx.SetHeader("ws_err_msg", err.Error())
if errors.Is(err, constant.ErrTokenExpired) {
code = int(constant.ErrTokenExpired.ErrCode)
}
if errors.Is(err, constant.ErrTokenInvalid) {
code = int(constant.ErrTokenInvalid.ErrCode)
}
if errors.Is(err, constant.ErrTokenMalformed) {
code = int(constant.ErrTokenMalformed.ErrCode)
}
if errors.Is(err, constant.ErrTokenNotValidYet) {
code = int(constant.ErrTokenNotValidYet.ErrCode)
}
if errors.Is(err, constant.ErrTokenUnknown) {
code = int(constant.ErrTokenUnknown.ErrCode)
}
if errors.Is(err, constant.ErrTokenKicked) {
code = int(constant.ErrTokenKicked.ErrCode)
}
if errors.Is(err, constant.ErrTokenDifferentPlatformID) {
code = int(constant.ErrTokenDifferentPlatformID.ErrCode)
}
if errors.Is(err, constant.ErrTokenDifferentUserID) {
code = int(constant.ErrTokenDifferentUserID.ErrCode)
}
if errors.Is(err, constant.ErrConnOverMaxNumLimit) {
code = int(constant.ErrConnOverMaxNumLimit.ErrCode)
}
if errors.Is(err, constant.ErrConnArgsErr) {
code = int(constant.ErrConnArgsErr.ErrCode)
}
ctx.ErrReturn(err.Error(), code)
}
+22 -4
View File
@@ -26,19 +26,37 @@ type LongConn interface {
SetConnNil()
//Check the connection of the current and when it was sent are the same
CheckSendConnDiffNow() bool
//
GenerateLongConn(w http.ResponseWriter, r *http.Request) error
}
type GWebSocket struct {
protocolType int
conn *websocket.Conn
protocolType int
conn *websocket.Conn
handshakeTimeout time.Duration
readBufferSize, WriteBufferSize int
}
func NewDefault(protocolType int) *GWebSocket {
return &GWebSocket{protocolType: protocolType}
func newGWebSocket(protocolType int, handshakeTimeout time.Duration, readBufferSize int) *GWebSocket {
return &GWebSocket{protocolType: protocolType, handshakeTimeout: handshakeTimeout, readBufferSize: readBufferSize}
}
func (d *GWebSocket) Close() error {
return d.conn.Close()
}
func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) error {
upgrader := &websocket.Upgrader{
HandshakeTimeout: d.handshakeTimeout,
ReadBufferSize: d.readBufferSize,
CheckOrigin: func(r *http.Request) bool { return true },
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return err
}
d.conn = conn
return nil
}
func (d *GWebSocket) WriteMessage(messageType int, message []byte) error {
d.setSendConn(d.conn)
return d.conn.WriteMessage(messageType, message)
+9 -1
View File
@@ -1,6 +1,9 @@
package new
import "context"
import (
"Open_IM/internal/common/check"
"context"
)
type Req struct {
ReqIdentifier int32 `json:"reqIdentifier" validate:"required"`
@@ -30,6 +33,11 @@ type MessageHandler interface {
var _ MessageHandler = (*GrpcHandler)(nil)
type GrpcHandler struct {
msg *check.MsgCheck
}
func NewGrpcHandler(msg *check.MsgCheck) *GrpcHandler {
return &GrpcHandler{msg: msg}
}
func (g GrpcHandler) GetSeq(context context.Context, data Req) ([]byte, error) {
+121 -53
View File
@@ -1,22 +1,24 @@
package new
import (
"bytes"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/utils"
"errors"
"fmt"
"github.com/go-playground/validator/v10"
"github.com/gorilla/websocket"
"net/http"
"open_im_sdk/pkg/utils"
"sync"
"sync/atomic"
"time"
)
var bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1000)
},
}
type LongConnServer interface {
Run() error
}
@@ -28,17 +30,21 @@ type Server struct {
rpcServer *RpcServer
}
type WsServer struct {
port int
wsMaxConnNum int
wsUpGrader *websocket.Upgrader
registerChan chan *Client
unregisterChan chan *Client
clients *UserMap
clientPool sync.Pool
onlineUserNum int64
onlineUserConnNum int64
compressor Compressor
handler MessageHandler
port int
wsMaxConnNum int64
wsUpGrader *websocket.Upgrader
registerChan chan *Client
unregisterChan chan *Client
clients *UserMap
clientPool sync.Pool
onlineUserNum int64
onlineUserConnNum int64
gzipCompressor Compressor
encoder Encoder
handler MessageHandler
handshakeTimeout time.Duration
readBufferSize, WriteBufferSize int
validate *validator.Validate
}
func newWsServer(opts ...Option) (*WsServer, error) {
@@ -51,18 +57,18 @@ func newWsServer(opts ...Option) (*WsServer, error) {
}
return &WsServer{
port: config.port,
wsMaxConnNum: config.maxConnNum,
wsUpGrader: &websocket.Upgrader{
HandshakeTimeout: config.handshakeTimeout,
ReadBufferSize: config.messageMaxMsgLength,
CheckOrigin: func(r *http.Request) bool { return true },
},
port: config.port,
wsMaxConnNum: config.maxConnNum,
handshakeTimeout: config.handshakeTimeout,
readBufferSize: config.messageMaxMsgLength,
clientPool: sync.Pool{
New: func() interface{} {
return new(Client)
},
},
validate: validator.New(),
clients: newUserMap(),
handler: NewGrpcHandler(),
}, nil
}
func (ws *WsServer) Run() error {
@@ -72,53 +78,115 @@ func (ws *WsServer) Run() error {
select {
case client = <-ws.registerChan:
ws.registerClient(client)
case client = <-h.unregisterChan:
h.unregisterClient(client)
case msg = <-h.readChan:
h.messageHandler(msg)
case client = <-ws.unregisterChan:
ws.unregisterClient(client)
}
}
}()
http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) //Start listening
}
func (ws *WsServer) registerClient(client *Client) {
var (
ok bool
userOK bool
clientOK bool
cli *Client
)
if cli, ok = h.clients.Get(client.key); ok == false {
h.clients.Set(client.key, client)
atomic.AddInt64(&h.onlineConnections, 1)
fmt.Println("R在线用户数量:", h.onlineConnections)
return
cli, userOK,clientOK = ws.clients.Get(client.userID,client.platformID)
if !userOK {
ws.clients.Set(client.userID,client)
atomic.AddInt64(&ws.onlineUserNum, 1)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
}else{
if clientOK {//已经有同平台的连接存在
ws.clients.Set(client.userID,client)
ws.multiTerminalLoginChecker(cli)
}else{
ws.clients.Set(client.userID,client)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
}
}
if client.onlineAt > cli.onlineAt {
h.clients.Set(client.key, client)
h.close(cli)
return
}
h.close(client)
}
http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) //Start listening
func (ws *WsServer) multiTerminalLoginChecker(client *Client) {
}
func (ws *WsServer) unregisterClient(client *Client) {
isDeleteUser:=ws.clients.delete(client.userID,client.platformID)
if isDeleteUser {
atomic.AddInt64(&ws.onlineUserNum, -1)
}
atomic.AddInt64(&ws.onlineUserConnNum, -1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
}
}
func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
context := newContext(w, r)
if isPass, compression := ws.headerCheck(w, r, operationID); isPass {
conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator
if err != nil {
log.Error(operationID, "upgrade http conn err", err.Error(), query)
return
} else {
newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, compression, query["sendID"][0], false, query["token"][0], conn.RemoteAddr().String() + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))}
userCount++
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], newConn.connID, operationID)
go ws.readMsg(newConn)
}
} else {
log.Error(operationID, "headerCheck failed ")
if ws.onlineUserConnNum >= ws.wsMaxConnNum {
httpError(context, constant.ErrConnOverMaxNumLimit)
return
}
var (
token string
userID string
platformID string
exists bool
compression bool
compressor Compressor
)
token, exists = context.Query(TOKEN)
if !exists {
httpError(context, constant.ErrConnArgsErr)
return
}
userID, exists = context.Query(USERID)
if !exists {
httpError(context, constant.ErrConnArgsErr)
return
}
platformID, exists = context.Query(PLATFORM_ID)
if !exists {
httpError(context, constant.ErrConnArgsErr)
return
}
err := tokenverify.WsVerifyToken(token, userID, platformID)
if err != nil {
httpError(context, err)
return
}
wsLongConn:=newGWebSocket(constant.WebSocket,ws.handshakeTimeout,ws.readBufferSize)
err = wsLongConn.GenerateLongConn(w, r)
if err != nil {
httpError(context, err)
return
}
compressProtoc, exists := context.Query(COMPRESSION)
if exists {
if compressProtoc==GZIP_COMPRESSION_PROTOCAL{
compression = true
compressor = ws.gzipCompressor
}
}
compressProtoc, exists = context.GetHeader(COMPRESSION)
if exists {
if compressProtoc==GZIP_COMPRESSION_PROTOCAL {
compression = true
compressor = ws.gzipCompressor
}
}
client:=ws.clientPool.Get().(*Client)
client.ResetClient(context,wsLongConn,compression,compressor,ws.encoder,ws.handler,ws.unregisterChan,ws.validate)
ws.registerChan <- client
go client.readMessage()
}
+2 -2
View File
@@ -7,7 +7,7 @@ type configs struct {
//长连接监听端口
port int
//长连接允许最大链接数
maxConnNum int
maxConnNum int64
//连接握手超时时间
handshakeTimeout time.Duration
//允许消息最大长度
@@ -19,7 +19,7 @@ func WithPort(port int) Option {
opt.port = port
}
}
func WithMaxConnNum(num int) Option {
func WithMaxConnNum(num int64) Option {
return func(opt *configs) {
opt.maxConnNum = num
}
+16 -15
View File
@@ -9,24 +9,24 @@ type UserMap struct {
func newUserMap() *UserMap {
return &UserMap{}
}
func (u *UserMap) GetAll(key string) []*Client {
func (u *UserMap) GetAll(key string) ([]*Client, bool) {
allClients, ok := u.m.Load(key)
if ok {
return allClients.([]*Client)
return allClients.([]*Client), ok
}
return nil
return nil, ok
}
func (u *UserMap) Get(key string, platformID int32) (*Client, bool) {
allClients, existed := u.m.Load(key)
if existed {
func (u *UserMap) Get(key string, platformID int) (*Client, bool, bool) {
allClients, userExisted := u.m.Load(key)
if userExisted {
for _, client := range allClients.([]*Client) {
if client.PlatformID == platformID {
return client, existed
if client.platformID == platformID {
return client, userExisted, true
}
}
return nil, false
return nil, userExisted, false
}
return nil, existed
return nil, userExisted, false
}
func (u *UserMap) Set(key string, v *Client) {
allClients, existed := u.m.Load(key)
@@ -40,24 +40,25 @@ func (u *UserMap) Set(key string, v *Client) {
u.m.Store(key, clients)
}
}
func (u *UserMap) delete(key string, platformID int32) {
func (u *UserMap) delete(key string, platformID int) (isDeleteUser bool) {
allClients, existed := u.m.Load(key)
if existed {
oldClients := allClients.([]*Client)
a := make([]*Client, len(oldClients))
a := make([]*Client, 3)
for _, client := range oldClients {
if client.PlatformID != platformID {
if client.platformID != platformID {
a = append(a, client)
}
}
if len(a) == 0 {
u.m.Delete(key)
return true
} else {
u.m.Store(key, a)
return false
}
}
return existed
}
func (u *UserMap) DeleteAll(key string) {
u.m.Delete(key)
+4 -6
View File
@@ -12,15 +12,13 @@ import (
"bytes"
"context"
"encoding/gob"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"net"
"strconv"
"strings"
"github.com/golang/protobuf/proto"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
)
type RPCServer struct {
+1 -2
View File
@@ -3,9 +3,8 @@ package msggateway
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
prome "Open_IM/pkg/common/prometheus"
prome "Open_IM/pkg/common/prome"
"Open_IM/pkg/common/tokenverify"
pbRelay "Open_IM/pkg/proto/relay"
"Open_IM/pkg/utils"
+235
View File
@@ -0,0 +1,235 @@
package objstorage
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"math/rand"
"path"
"strconv"
"time"
)
func NewController(i Interface, kv KV) (*Controller, error) {
if err := i.Init(); err != nil {
return nil, err
}
return &Controller{
i: i,
kv: kv,
}, nil
}
type Controller struct {
i Interface
//i *minioImpl
kv KV
}
func (c *Controller) key(v string) string {
return "OBJECT_STORAGE:" + c.i.Name() + ":" + v
}
func (c *Controller) putKey(v string) string {
return c.key("put:" + v)
}
func (c *Controller) pathKey(v string) string {
return c.key("path:" + v)
}
func (c *Controller) ApplyPut(ctx context.Context, args *FragmentPutArgs) (*PutAddr, error) {
if data, err := c.kv.Get(ctx, c.pathKey(args.Hash)); err == nil {
// 服务器已存在
var src BucketFile
if err := json.Unmarshal([]byte(data), &src); err != nil {
return nil, err
}
var bucket string
if args.ClearTime <= 0 {
bucket = c.i.PermanentBucket()
} else {
bucket = c.i.ClearBucket()
}
dst := &BucketFile{
Bucket: bucket,
Name: args.Name,
}
// 直接拷贝一份
err := c.i.CopyObjetInfo(ctx, &src, dst)
if err == nil {
info, err := c.i.GetObjectInfo(ctx, dst)
if err != nil {
return nil, err
}
return &PutAddr{
ResourceURL: info.URL,
}, nil
} else if !c.i.IsNotFound(err) {
return nil, err
}
} else if !c.kv.IsNotFound(err) {
return nil, err
}
// 上传逻辑
name := args.Name
effective := time.Now().Add(args.EffectiveTime)
prefix := c.Prefix(&args.PutArgs)
if minSize := c.i.MinMultipartSize(); args.FragmentSize > 0 && args.FragmentSize < minSize {
args.FragmentSize = minSize
}
var pack int64
if args.FragmentSize <= 0 || args.Size <= args.FragmentSize {
pack = 1
} else {
pack = args.Size / args.FragmentSize
if args.Size%args.FragmentSize > 0 {
pack++
}
}
p := path.Join(path.Dir(args.Name), time.Now().Format("20060102"))
info := putInfo{
Bucket: c.i.UploadBucket(),
Fragments: make([]string, 0, pack),
FragmentSize: args.FragmentSize,
Name: name,
Hash: args.Hash,
Size: args.Size,
}
if args.ClearTime > 0 {
t := time.Now().Add(args.ClearTime).UnixMilli()
info.ClearTime = &t
}
putURLs := make([]string, 0, pack)
for i := int64(1); i <= pack; i++ {
name := prefix + "_" + strconv.FormatInt(i, 10) + path.Ext(args.Name)
name = path.Join(p, name)
info.Fragments = append(info.Fragments, name)
args.Name = name
put, err := c.i.ApplyPut(ctx, &ApplyPutArgs{
Bucket: info.Bucket,
Name: name,
Effective: args.EffectiveTime,
Header: args.Header,
})
if err != nil {
return nil, err
}
putURLs = append(putURLs, put.URL)
}
data, err := json.Marshal(&info)
if err != nil {
return nil, err
}
if err := c.kv.Set(ctx, c.putKey(prefix), string(data), args.EffectiveTime); err != nil {
return nil, err
}
var fragmentSize int64
if pack == 1 {
fragmentSize = args.Size
} else {
fragmentSize = args.FragmentSize
}
return &PutAddr{
PutURLs: putURLs,
FragmentSize: fragmentSize,
PutID: prefix,
EffectiveTime: effective,
}, nil
}
func (c *Controller) ConfirmPut(ctx context.Context, putID string) (*ObjectInfo, error) {
data, err := c.kv.Get(ctx, c.putKey(putID))
if err != nil {
return nil, err
}
var info putInfo
if err := json.Unmarshal([]byte(data), &info); err != nil {
return nil, err
}
var total int64
src := make([]BucketFile, len(info.Fragments))
for i, fragment := range info.Fragments {
state, err := c.i.GetObjectInfo(ctx, &BucketFile{
Bucket: info.Bucket,
Name: fragment,
})
if err != nil {
return nil, err
}
total += state.Size
src[i] = BucketFile{
Bucket: info.Bucket,
Name: fragment,
}
}
if total != info.Size {
return nil, fmt.Errorf("incomplete upload %d/%d", total, info.Size)
}
var dst *BucketFile
if info.ClearTime == nil {
dst = &BucketFile{
Bucket: c.i.PermanentBucket(),
Name: info.Name,
}
} else {
dst = &BucketFile{
Bucket: c.i.ClearBucket(),
Name: info.Name,
}
}
if err := c.i.MergeObjectInfo(ctx, src, dst); err != nil { // SourceInfo 0 is too small (2) and it is not the last part
return nil, err
}
obj, err := c.i.GetObjectInfo(ctx, dst)
if err != nil {
return nil, err
}
go func() {
err := c.kv.Del(ctx, c.putKey(putID))
if err != nil {
log.Println("del key:", err)
}
for _, b := range src {
err = c.i.DeleteObjetInfo(ctx, &b)
if err != nil {
log.Println("del obj:", err)
}
}
}()
return obj, nil
}
func (c *Controller) Prefix(args *PutArgs) string {
buf := bytes.NewBuffer(nil)
buf.WriteString(args.Name)
buf.WriteString("~~~@~@~~~")
buf.WriteString(strconv.FormatInt(args.Size, 10))
buf.WriteString(",")
buf.WriteString(args.Hash)
buf.WriteString(",")
buf.WriteString(strconv.FormatInt(int64(args.ClearTime), 10))
buf.WriteString(",")
buf.WriteString(strconv.FormatInt(int64(args.EffectiveTime), 10))
buf.WriteString(",")
buf.WriteString(c.i.Name())
r := make([]byte, 16)
rand.Read(r)
buf.Write(r)
md5v := md5.Sum(buf.Bytes())
return hex.EncodeToString(md5v[:])
}
type putInfo struct {
Bucket string
Fragments []string
FragmentSize int64
Size int64
Name string
Hash string
ClearTime *int64
}
+49
View File
@@ -0,0 +1,49 @@
package objstorage
import (
"context"
"github.com/go-redis/redis/v8"
"log"
"time"
)
type KV interface {
Get(ctx context.Context, key string) (string, error)
Set(ctx context.Context, key string, val string, expiration time.Duration) error
Del(ctx context.Context, key string) error
IsNotFound(err error) bool
}
func NewKV() KV {
rdb := redis.NewClient(&redis.Options{
Addr: "",
Username: "",
Password: "",
})
return &redisImpl{
rdb: rdb,
}
}
type redisImpl struct {
rdb *redis.Client
}
func (r *redisImpl) Del(ctx context.Context, key string) error {
log.Println("redis del", key)
return r.rdb.Del(ctx, key).Err()
}
func (r *redisImpl) Get(ctx context.Context, key string) (string, error) {
log.Println("redis get", key)
return r.rdb.Get(ctx, key).Result()
}
func (r *redisImpl) Set(ctx context.Context, key string, val string, expiration time.Duration) error {
log.Println("redis set", key, val, expiration.String())
return r.rdb.Set(ctx, key, val, expiration).Err()
}
func (r *redisImpl) IsNotFound(err error) bool {
return err == redis.Nil
}
+112
View File
@@ -0,0 +1,112 @@
package objstorage
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"log"
"net/http"
"path"
"time"
)
func HttpPut(url string, body io.Reader) error {
req, err := http.NewRequest(http.MethodPut, url, body)
if err != nil {
return err
}
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
data, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("http [%s] %s", resp.Status, data)
}
if len(data) > 0 {
log.Println("[http body]", string(data))
}
return nil
}
func Md5(p []byte) string {
t := md5.Sum(p)
return hex.EncodeToString(t[:])
}
func Main() {
ctx := context.Background()
c, err := NewController(&minioImpl{}, NewKV())
if err != nil {
log.Fatalln(err)
}
name := "hello.txt"
data := []byte("hello world")
userID := "10000"
name = path.Join("user_"+userID, name)
addr, err := c.ApplyPut(ctx, &FragmentPutArgs{
PutArgs: PutArgs{
Name: name,
Size: int64(len(data)),
Hash: Md5(data),
EffectiveTime: time.Second * 60 * 60,
},
FragmentSize: 2,
})
if err != nil {
log.Fatalln(err)
}
fmt.Println()
fmt.Println()
if addr.ResourceURL != "" {
log.Println("服务器已经存在")
return
}
var (
start int
end = int(addr.FragmentSize)
)
for _, u := range addr.PutURLs {
if start >= len(data) {
break
}
if end > len(data) {
end = len(data)
}
_ = u
page := data[start:end]
fmt.Print(string(page))
start += int(addr.FragmentSize)
end += int(addr.FragmentSize)
err = HttpPut(u, bytes.NewReader(page))
if err != nil {
log.Fatalln(err)
}
}
fmt.Println()
fmt.Println()
fmt.Println("[PUT_ID]", addr.PutID)
info, err := c.ConfirmPut(ctx, addr.PutID)
if err != nil {
log.Fatalln(err)
}
log.Printf("%+v\n", info)
log.Println("success")
}
+147
View File
@@ -0,0 +1,147 @@
package objstorage
import (
"context"
"errors"
"fmt"
"github.com/minio/minio-go"
"net/url"
"time"
)
func NewMinio() Interface {
return &minioImpl{}
}
type minioImpl struct {
uploadBucket string // 上传桶
permanentBucket string // 永久桶
clearBucket string // 自动清理桶
client *minio.Client
}
func (m *minioImpl) Init() error {
client, err := minio.New("127.0.0.1:9000", "minioadmin", "minioadmin", false)
if err != nil {
return fmt.Errorf("minio client error: %w", err)
}
m.client = client
m.uploadBucket = "upload"
m.permanentBucket = "permanent"
m.clearBucket = "clear"
return nil
}
func (m *minioImpl) Name() string {
return "minio"
}
func (m *minioImpl) MinMultipartSize() int64 {
return 1024 * 1024 * 5 // minio.absMinPartSize
}
func (m *minioImpl) UploadBucket() string {
return m.uploadBucket
}
func (m *minioImpl) PermanentBucket() string {
return m.permanentBucket
}
func (m *minioImpl) ClearBucket() string {
return m.clearBucket
}
func (m *minioImpl) urlReplace(u *url.URL) {
}
func (m *minioImpl) ApplyPut(ctx context.Context, args *ApplyPutArgs) (*PutRes, error) {
if args.Effective <= 0 {
return nil, errors.New("EffectiveTime <= 0")
}
_, err := m.GetObjectInfo(ctx, &BucketFile{
Bucket: m.uploadBucket,
Name: args.Name,
})
if err == nil {
return nil, fmt.Errorf("minio bucket %s name %s already exists", args.Bucket, args.Name)
} else if !m.IsNotFound(err) {
return nil, err
}
effective := time.Now().Add(args.Effective)
u, err := m.client.PresignedPutObject(m.uploadBucket, args.Name, args.Effective)
if err != nil {
return nil, fmt.Errorf("minio apply error: %w", err)
}
m.urlReplace(u)
return &PutRes{
URL: u.String(),
Bucket: m.uploadBucket,
Name: args.Name,
EffectiveTime: effective,
}, nil
}
func (m *minioImpl) GetObjectInfo(ctx context.Context, args *BucketFile) (*ObjectInfo, error) {
info, err := m.client.StatObject(args.Bucket, args.Name, minio.StatObjectOptions{})
if err != nil {
return nil, err
}
return &ObjectInfo{
URL: "", // todo
Size: info.Size,
Hash: info.ETag,
}, nil
}
func (m *minioImpl) CopyObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error {
destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil)
if err != nil {
return err
}
return m.client.CopyObject(destination, minio.NewSourceInfo(src.Bucket, src.Name, nil))
}
func (m *minioImpl) DeleteObjetInfo(ctx context.Context, info *BucketFile) error {
return m.client.RemoveObject(info.Bucket, info.Name)
}
func (m *minioImpl) MoveObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error {
if err := m.CopyObjetInfo(ctx, src, dst); err != nil {
return err
}
return m.DeleteObjetInfo(ctx, src)
}
func (m *minioImpl) MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error {
switch len(src) {
case 0:
return errors.New("src empty")
case 1:
return m.CopyObjetInfo(ctx, &src[0], dst)
}
destination, err := minio.NewDestinationInfo(dst.Bucket, dst.Name, nil, nil)
if err != nil {
return err
}
sources := make([]minio.SourceInfo, len(src))
for i, s := range src {
sources[i] = minio.NewSourceInfo(s.Bucket, s.Name, nil)
}
return m.client.ComposeObject(destination, sources) // todo
}
func (m *minioImpl) IsNotFound(err error) bool {
if err == nil {
return false
}
switch e := err.(type) {
case minio.ErrorResponse:
return e.StatusCode == 404 && e.Code == "NoSuchKey"
case *minio.ErrorResponse:
return e.StatusCode == 404 && e.Code == "NoSuchKey"
default:
return false
}
}
+19
View File
@@ -0,0 +1,19 @@
package objstorage
import "context"
type Interface interface {
Init() error
Name() string
MinMultipartSize() int64
UploadBucket() string
PermanentBucket() string
ClearBucket() string
ApplyPut(ctx context.Context, args *ApplyPutArgs) (*PutRes, error)
GetObjectInfo(ctx context.Context, args *BucketFile) (*ObjectInfo, error)
CopyObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error
DeleteObjetInfo(ctx context.Context, info *BucketFile) error
MoveObjetInfo(ctx context.Context, src *BucketFile, dst *BucketFile) error
MergeObjectInfo(ctx context.Context, src []BucketFile, dst *BucketFile) error
IsNotFound(err error) bool
}
+69
View File
@@ -0,0 +1,69 @@
package objstorage
import (
"net/http"
"time"
)
type PutRes struct {
URL string
Bucket string
Name string
EffectiveTime time.Time
}
type FragmentPutArgs struct {
PutArgs
FragmentSize int64 // 分片大小
}
type PutArgs struct {
Name string // 文件名
Size int64 // 大小
Hash string // md5
Prefix string // 前缀
ClearTime time.Duration // 自动清理时间
EffectiveTime time.Duration // 申请有效时间
Header http.Header // header
}
type BucketFile struct {
Bucket string `json:"bucket"`
Name string `json:"name"`
}
type ObjectInfo struct {
URL string
Size int64
Hash string
}
//type PutSpace struct {
// URL string
// EffectiveTime time.Time
//}
type PutAddr struct {
ResourceURL string
PutID string
FragmentSize int64
EffectiveTime time.Time
PutURLs []string
}
type KVData struct {
Bucket string `json:"bucket"`
Name string `json:"name"`
}
type PutResp struct {
URL string
Time *time.Time
}
type ApplyPutArgs struct {
Bucket string
Name string
Effective time.Duration // 申请有效时间
Header http.Header // header
}
-3
View File
@@ -1,10 +1,7 @@
package push
import (
tpns "Open_IM/internal/push/sdk/tpns-server-sdk-go/go"
"Open_IM/internal/push/sdk/tpns-server-sdk-go/go/auth"
"Open_IM/internal/push/sdk/tpns-server-sdk-go/go/common"
"Open_IM/internal/push/sdk/tpns-server-sdk-go/go/req"
"Open_IM/pkg/common/config"
)
+36 -119
View File
@@ -2,137 +2,54 @@ package conversation
import (
"Open_IM/internal/common/check"
chat "Open_IM/internal/rpc/msg"
"Open_IM/internal/common/notification"
"Open_IM/internal/tx"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation"
tableRelation "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/prome"
pbConversation "Open_IM/pkg/proto/conversation"
pbUser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
"context"
"github.com/OpenIMSDK/openKeeper"
"github.com/dtm-labs/rockscache"
"net"
"strconv"
"strings"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"Open_IM/pkg/common/config"
"google.golang.org/grpc"
)
type conversationServer struct {
rpcPort int
rpcRegisterName string
etcdSchema string
etcdAddr []string
groupChecker *check.GroupChecker
controller.ConversationInterface
groupChecker *check.GroupChecker
controller.ConversationDataBaseInterface
notify *notification.Check
}
func NewConversationServer(port int) *conversationServer {
log.NewPrivateLog(constant.LogFileName)
c := conversationServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImConversationName,
etcdSchema: config.Config.Etcd.EtcdSchema,
etcdAddr: config.Config.Etcd.EtcdAddr,
groupChecker: check.NewGroupChecker(),
}
var cDB relation.Conversation
var cCache cache.ConversationCache
//mysql init
var mysql relation.Mysql
err := mysql.InitConn().AutoMigrateModel(&tableRelation.ConversationModel{})
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
db, err := relation.NewGormDB()
if err != nil {
panic("db init err:" + err.Error())
return err
}
if mysql.GormConn() != nil {
//get gorm model
cDB = relation.NewConversationGorm(mysql.GormConn())
} else {
panic("db init err:" + "conn is nil")
if err := db.AutoMigrate(&tableRelation.ConversationModel{}); err != nil {
return err
}
//redis init
var redis cache.RedisClient
redis.InitRedis()
rcClient := rockscache.NewClient(redis.GetClient(), rockscache.Options{
RandomExpireAdjustment: 0.2,
DisableCacheRead: false,
DisableCacheDelete: false,
StrongConsistency: true,
redis, err := cache.NewRedis()
if err != nil {
return err
}
pbConversation.RegisterConversationServer(server, &conversationServer{
groupChecker: check.NewGroupChecker(client),
ConversationDataBaseInterface: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(redis.GetClient(), rockscache.Options{
RandomExpireAdjustment: 0.2,
DisableCacheRead: false,
DisableCacheDelete: false,
StrongConsistency: true,
}), tx.NewGorm(db)),
})
cCache = cache.NewConversationRedis(rcClient)
database := controller.NewConversationDataBase(cDB, cCache)
c.ConversationInterface = controller.NewConversationController(database)
return &c
}
func (c *conversationServer) Run() {
log.NewInfo("0", "rpc conversation start...")
listenIP := ""
if config.Config.ListenIP == "" {
listenIP = "0.0.0.0"
} else {
listenIP = config.Config.ListenIP
}
address := listenIP + ":" + strconv.Itoa(c.rpcPort)
listener, err := net.Listen("tcp", address)
if err != nil {
panic("listening err:" + err.Error() + c.rpcRegisterName)
}
log.NewInfo("0", "listen network success, ", address, listener)
//grpc server
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, []grpc.ServerOption{
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
}...)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//service registers with etcd
pbConversation.RegisterConversationServer(srv, c)
rpcRegisterIP := config.Config.RpcRegisterIP
if config.Config.RpcRegisterIP == "" {
rpcRegisterIP, err = utils.GetLocalIP()
if err != nil {
log.Error("", "GetLocalIP failed ", err.Error())
}
}
log.NewInfo("", "rpcRegisterIP", rpcRegisterIP)
err = rpc.RegisterEtcd(c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName, 10, "")
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error(),
c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName)
panic(utils.Wrap(err, "register conversation module rpc to etcd err"))
}
log.NewInfo("0", "RegisterConversationServer ok ", c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName)
err = srv.Serve(listener)
if err != nil {
log.NewError("0", "Serve failed ", err.Error())
return
}
log.NewInfo("0", "rpc conversation ok")
return nil
}
func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) {
resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}}
conversations, err := c.ConversationInterface.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID})
conversations, err := c.ConversationDataBaseInterface.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID})
if err != nil {
return nil, err
}
@@ -147,7 +64,7 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) {
resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}}
conversations, err := c.ConversationInterface.GetUserAllConversation(ctx, req.OwnerUserID)
conversations, err := c.ConversationDataBaseInterface.GetUserAllConversation(ctx, req.OwnerUserID)
if err != nil {
return nil, err
}
@@ -159,7 +76,7 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbCon
func (c *conversationServer) GetConversations(ctx context.Context, req *pbConversation.GetConversationsReq) (*pbConversation.GetConversationsResp, error) {
resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}}
conversations, err := c.ConversationInterface.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs)
conversations, err := c.ConversationDataBaseInterface.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs)
if err != nil {
return nil, err
}
@@ -175,11 +92,11 @@ func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbC
if err := utils.CopyStructFields(&conversations, req.Conversations); err != nil {
return nil, err
}
err := c.ConversationInterface.SetUserConversations(ctx, req.OwnerUserID, conversations)
err := c.ConversationDataBaseInterface.SetUserConversations(ctx, req.OwnerUserID, conversations)
if err != nil {
return nil, err
}
chat.ConversationChangeNotification(ctx, req.OwnerUserID)
c.notify.ConversationChangeNotification(ctx, req.OwnerUserID)
return resp, nil
}
@@ -196,7 +113,7 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
var err error
isSyncConversation := true
if req.Conversation.ConversationType == constant.GroupChatType {
groupInfo, err := c.groupChecker.GetGroupInfo(req.Conversation.GroupID)
groupInfo, err := c.groupChecker.GetGroupInfo(ctx, req.Conversation.GroupID)
if err != nil {
return nil, err
}
@@ -209,14 +126,14 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
return nil, err
}
if req.FieldType == constant.FieldIsPrivateChat {
err := c.ConversationInterface.SyncPeerUserPrivateConversationTx(ctx, &conversation)
err := c.ConversationDataBaseInterface.SyncPeerUserPrivateConversationTx(ctx, &conversation)
if err != nil {
return nil, err
}
chat.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
c.notify.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
return resp, nil
}
//haveUserID, err := c.ConversationInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID)
//haveUserID, err := c.ConversationDataBaseInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID)
//if err != nil {
// return nil, err
//}
@@ -240,18 +157,18 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
case constant.FieldBurnDuration:
filedMap["burn_duration"] = req.Conversation.BurnDuration
}
err = c.ConversationInterface.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap)
err = c.ConversationDataBaseInterface.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap)
if err != nil {
return nil, err
}
if isSyncConversation {
for _, v := range req.UserIDList {
chat.ConversationChangeNotification(ctx, v)
c.notify.ConversationChangeNotification(ctx, v)
}
} else {
for _, v := range req.UserIDList {
chat.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime)
c.notify.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime)
}
}
return resp, nil
+4 -4
View File
@@ -14,7 +14,7 @@ func (s *friendServer) GetPaginationBlacks(ctx context.Context, req *pbFriend.Ge
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
return nil, err
}
blacks, total, err := s.BlackInterface.FindOwnerBlacks(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
blacks, total, err := s.BlackDatabase.FindOwnerBlacks(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err
}
@@ -28,7 +28,7 @@ func (s *friendServer) GetPaginationBlacks(ctx context.Context, req *pbFriend.Ge
func (s *friendServer) IsBlack(ctx context.Context, req *pbFriend.IsBlackReq) (*pbFriend.IsBlackResp, error) {
resp := &pbFriend.IsBlackResp{}
in1, in2, err := s.BlackInterface.CheckIn(ctx, req.UserID1, req.UserID2)
in1, in2, err := s.BlackDatabase.CheckIn(ctx, req.UserID1, req.UserID2)
if err != nil {
return nil, err
}
@@ -42,7 +42,7 @@ func (s *friendServer) RemoveBlack(ctx context.Context, req *pbFriend.RemoveBlac
if err := s.userCheck.Access(ctx, req.OwnerUserID); err != nil {
return nil, err
}
if err := s.BlackInterface.Delete(ctx, []*relation.BlackModel{{OwnerUserID: req.OwnerUserID, BlockUserID: req.BlackUserID}}); err != nil {
if err := s.BlackDatabase.Delete(ctx, []*relation.BlackModel{{OwnerUserID: req.OwnerUserID, BlockUserID: req.BlackUserID}}); err != nil {
return nil, err
}
s.notification.BlackDeletedNotification(ctx, req)
@@ -55,7 +55,7 @@ func (s *friendServer) AddBlack(ctx context.Context, req *pbFriend.AddBlackReq)
return nil, err
}
black := relation.BlackModel{OwnerUserID: req.OwnerUserID, BlockUserID: req.BlackUserID, OperatorUserID: tracelog.GetOpUserID(ctx)}
if err := s.BlackInterface.Create(ctx, []*relation.BlackModel{&black}); err != nil {
if err := s.BlackDatabase.Create(ctx, []*relation.BlackModel{&black}); err != nil {
return nil, err
}
s.notification.BlackAddedNotification(ctx, req)
+35 -31
View File
@@ -4,13 +4,14 @@ import (
"Open_IM/internal/common/check"
"Open_IM/internal/common/convert"
"Open_IM/internal/common/notification"
"Open_IM/internal/tx"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation"
tablerelation "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/common/tracelog"
discoveryRegistry "Open_IM/pkg/discoveryregistry"
registry "Open_IM/pkg/discoveryregistry"
pbfriend "Open_IM/pkg/proto/friend"
"Open_IM/pkg/utils"
"context"
@@ -19,27 +20,27 @@ import (
)
type friendServer struct {
controller.FriendInterface
controller.BlackInterface
controller.FriendDatabase
controller.BlackDatabase
notification *notification.Check
userCheck *check.UserCheck
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
RegisterCenter registry.SvcDiscoveryRegistry
}
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
mysql, err := relation.NewGormDB()
db, err := relation.NewGormDB()
if err != nil {
return err
}
if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil {
if err := db.AutoMigrate(&tablerelation.FriendModel{}, &tablerelation.FriendRequestModel{}, &tablerelation.BlackModel{}); err != nil {
return err
}
pbfriend.RegisterFriendServer(server, &friendServer{
FriendInterface: controller.NewFriendController(mysql),
BlackInterface: controller.NewBlackController(mysql),
notification: notification.NewCheck(client),
userCheck: check.NewUserCheck(client),
RegisterCenter: client,
FriendDatabase: controller.NewFriendDatabase(relation.NewFriendGorm(db), relation.NewFriendRequestGorm(db), tx.NewGorm(db)),
BlackDatabase: controller.NewBlackDatabase(relation.NewBlackGorm(db)),
notification: notification.NewCheck(client),
userCheck: check.NewUserCheck(client),
RegisterCenter: client,
})
return nil
}
@@ -59,14 +60,14 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply
if _, err := s.userCheck.GetUsersInfoMap(ctx, []string{req.ToUserID, req.FromUserID}, true); err != nil {
return nil, err
}
in1, in2, err := s.FriendInterface.CheckIn(ctx, req.FromUserID, req.ToUserID)
in1, in2, err := s.FriendDatabase.CheckIn(ctx, req.FromUserID, req.ToUserID)
if err != nil {
return nil, err
}
if in1 && in2 {
return nil, constant.ErrRelationshipAlready.Wrap()
}
if err = s.FriendInterface.AddFriendRequest(ctx, req.FromUserID, req.ToUserID, req.ReqMsg, req.Ex); err != nil {
if err = s.FriendDatabase.AddFriendRequest(ctx, req.FromUserID, req.ToUserID, req.ReqMsg, req.Ex); err != nil {
return nil, err
}
s.notification.FriendApplicationAddNotification(ctx, req)
@@ -90,7 +91,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFr
return nil, constant.ErrArgs.Wrap("friend userID repeated")
}
if err := s.FriendInterface.BecomeFriends(ctx, req.OwnerUserID, req.FriendUserIDs, constant.BecomeFriendByImport, tracelog.GetOpUserID(ctx)); err != nil {
if err := s.FriendDatabase.BecomeFriends(ctx, req.OwnerUserID, req.FriendUserIDs, constant.BecomeFriendByImport, tracelog.GetOpUserID(ctx)); err != nil {
return nil, err
}
return resp, nil
@@ -102,7 +103,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.Res
if err := s.userCheck.Access(ctx, req.ToUserID); err != nil {
return nil, err
}
friendRequest := relationTb.FriendRequestModel{FromUserID: req.FromUserID, ToUserID: req.ToUserID, HandleMsg: req.HandleMsg, HandleResult: req.HandleResult}
friendRequest := tablerelation.FriendRequestModel{FromUserID: req.FromUserID, ToUserID: req.ToUserID, HandleMsg: req.HandleMsg, HandleResult: req.HandleResult}
if req.HandleResult == constant.FriendResponseAgree {
err := s.AgreeFriendRequest(ctx, &friendRequest)
if err != nil {
@@ -132,7 +133,7 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbfriend.DeleteFri
if err != nil {
return nil, err
}
if err := s.FriendInterface.Delete(ctx, req.OwnerUserID, []string{req.FriendUserID}); err != nil {
if err := s.FriendDatabase.Delete(ctx, req.OwnerUserID, []string{req.FriendUserID}); err != nil {
return nil, err
}
s.notification.FriendDeletedNotification(ctx, req)
@@ -149,7 +150,7 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFri
if err != nil {
return nil, err
}
if err := s.FriendInterface.UpdateRemark(ctx, req.OwnerUserID, req.FriendUserID, req.Remark); err != nil {
if err := s.FriendDatabase.UpdateRemark(ctx, req.OwnerUserID, req.FriendUserID, req.Remark); err != nil {
return nil, err
}
s.notification.FriendRemarkSetNotification(ctx, req.OwnerUserID, req.FriendUserID)
@@ -158,20 +159,21 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFri
// ok
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbfriend.GetDesignatedFriendsReq) (resp *pbfriend.GetDesignatedFriendsResp, err error) {
resp = &pbfriend.GetDesignatedFriendsResp{}
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
return nil, err
if utils.Duplicate(req.FriendUserIDs) {
return nil, constant.ErrArgs.Wrap("friend userID repeated")
}
friends, total, err := s.FriendInterface.PageOwnerFriends(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
friends, err := s.FriendDatabase.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
if err != nil {
return nil, err
}
resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends)
if err != nil {
if resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends); err != nil {
return nil, err
}
resp.Total = int32(total)
return resp, nil
}
// ok 获取接收到的好友申请(即别人主动申请的)
@@ -180,7 +182,7 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbf
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
return nil, err
}
friendRequests, total, err := s.FriendInterface.PageFriendRequestToMe(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
friendRequests, total, err := s.FriendDatabase.PageFriendRequestToMe(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err
}
@@ -198,7 +200,7 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *p
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
return nil, err
}
friendRequests, total, err := s.FriendInterface.PageFriendRequestFromMe(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
friendRequests, total, err := s.FriendDatabase.PageFriendRequestFromMe(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err
}
@@ -213,7 +215,7 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *p
// ok
func (s *friendServer) IsFriend(ctx context.Context, req *pbfriend.IsFriendReq) (resp *pbfriend.IsFriendResp, err error) {
resp = &pbfriend.IsFriendResp{}
resp.InUser1Friends, resp.InUser2Friends, err = s.FriendInterface.CheckIn(ctx, req.UserID1, req.UserID2)
resp.InUser1Friends, resp.InUser2Friends, err = s.FriendDatabase.CheckIn(ctx, req.UserID1, req.UserID2)
if err != nil {
return nil, err
}
@@ -223,15 +225,17 @@ func (s *friendServer) IsFriend(ctx context.Context, req *pbfriend.IsFriendReq)
// ok
func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbfriend.GetPaginationFriendsReq) (resp *pbfriend.GetPaginationFriendsResp, err error) {
resp = &pbfriend.GetPaginationFriendsResp{}
if utils.Duplicate(req.FriendUserIDs) {
return nil, constant.ErrArgs.Wrap("friend userID repeated")
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
return nil, err
}
friends, err := s.FriendInterface.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
friends, total, err := s.FriendDatabase.PageOwnerFriends(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err
}
if resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends); err != nil {
resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends)
if err != nil {
return nil, err
}
resp.Total = int32(total)
return resp, nil
}
+89 -80
View File
@@ -3,6 +3,7 @@ package group
import (
"Open_IM/internal/common/check"
"Open_IM/internal/common/notification"
"Open_IM/internal/tx"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/controller"
@@ -44,7 +45,15 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
return err
}
pbGroup.RegisterGroupServer(server, &groupServer{
GroupInterface: controller.NewGroupInterface(controller.NewGroupDatabase(db, redis.GetClient(), mongo.GetClient())),
GroupDatabase: controller.NewGroupDatabase(
relation.NewGroupDB(db),
relation.NewGroupMemberDB(db),
relation.NewGroupRequest(db),
tx.NewGorm(db),
tx.NewMongo(mongo.GetClient()),
unrelation.NewSuperGroupMongoDriver(mongo.GetClient()),
redis.GetClient(),
),
UserCheck: check.NewUserCheck(client),
ConversationChecker: check.NewConversationChecker(client),
})
@@ -52,7 +61,7 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
}
type groupServer struct {
GroupInterface controller.GroupInterface
GroupDatabase controller.GroupDatabase
UserCheck *check.UserCheck
Notification *notification.Check
ConversationChecker *check.ConversationChecker
@@ -60,7 +69,7 @@ type groupServer struct {
func (s *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error {
if !tokenverify.IsAppManagerUid(ctx) {
groupMember, err := s.GroupInterface.TakeGroupMember(ctx, groupID, tracelog.GetOpUserID(ctx))
groupMember, err := s.GroupDatabase.TakeGroupMember(ctx, groupID, tracelog.GetOpUserID(ctx))
if err != nil {
return err
}
@@ -90,7 +99,7 @@ func (s *groupServer) IsNotFound(err error) bool {
func (s *groupServer) GenGroupID(ctx context.Context, groupID *string) error {
if *groupID != "" {
_, err := s.GroupInterface.TakeGroup(ctx, *groupID)
_, err := s.GroupDatabase.TakeGroup(ctx, *groupID)
if err == nil {
return constant.ErrGroupIDExisted.Wrap("group id existed " + *groupID)
} else if s.IsNotFound(err) {
@@ -104,7 +113,7 @@ func (s *groupServer) GenGroupID(ctx context.Context, groupID *string) error {
bi := big.NewInt(0)
bi.SetString(id[0:8], 16)
id = bi.String()
_, err := s.GroupInterface.TakeGroup(ctx, id)
_, err := s.GroupDatabase.TakeGroup(ctx, id)
if err == nil {
continue
} else if s.IsNotFound(err) {
@@ -159,7 +168,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
return nil, err
}
if req.GroupInfo.GroupType == constant.SuperGroup {
if err := s.GroupInterface.CreateSuperGroup(ctx, group.GroupID, userIDs); err != nil {
if err := s.GroupDatabase.CreateSuperGroup(ctx, group.GroupID, userIDs); err != nil {
return nil, err
}
} else {
@@ -174,7 +183,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
}
}
}
if err := s.GroupInterface.CreateGroup(ctx, []*relationTb.GroupModel{group}, groupMembers); err != nil {
if err := s.GroupDatabase.CreateGroup(ctx, []*relationTb.GroupModel{group}, groupMembers); err != nil {
return nil, err
}
resp.GroupInfo = DbToPbGroupInfo(group, req.OwnerUserID, uint32(len(userIDs)))
@@ -196,7 +205,7 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo
if err := tokenverify.CheckAccessV3(ctx, req.FromUserID); err != nil {
return nil, err
}
total, members, err := s.GroupInterface.PageGroupMember(ctx, nil, []string{req.FromUserID}, nil, req.Pagination.PageNumber, req.Pagination.ShowNumber)
total, members, err := s.GroupDatabase.PageGroupMember(ctx, nil, []string{req.FromUserID}, nil, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err
}
@@ -207,15 +216,15 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo
groupIDs := utils.Slice(members, func(e *relationTb.GroupMemberModel) string {
return e.GroupID
})
groups, err := s.GroupInterface.FindGroup(ctx, groupIDs)
groups, err := s.GroupDatabase.FindGroup(ctx, groupIDs)
if err != nil {
return nil, err
}
groupMemberNum, err := s.GroupInterface.MapGroupMemberNum(ctx, groupIDs)
groupMemberNum, err := s.GroupDatabase.MapGroupMemberNum(ctx, groupIDs)
if err != nil {
return nil, err
}
owners, err := s.GroupInterface.FindGroupMember(ctx, groupIDs, nil, []int32{constant.GroupOwner})
owners, err := s.GroupDatabase.FindGroupMember(ctx, groupIDs, nil, []int32{constant.GroupOwner})
if err != nil {
return nil, err
}
@@ -238,14 +247,14 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
if utils.Duplicate(req.InvitedUserIDs) {
return nil, constant.ErrArgs.Wrap("userID duplicate")
}
group, err := s.GroupInterface.TakeGroup(ctx, req.GroupID)
group, err := s.GroupDatabase.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
if group.Status == constant.GroupStatusDismissed {
return nil, constant.ErrDismissedAlready.Wrap()
}
members, err := s.GroupInterface.FindGroupMember(ctx, []string{group.GroupID}, nil, nil)
members, err := s.GroupDatabase.FindGroupMember(ctx, []string{group.GroupID}, nil, nil)
if err != nil {
return nil, err
}
@@ -276,7 +285,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
InviterUserID: opUserID,
})
}
if err := s.GroupInterface.CreateGroupRequest(ctx, requests); err != nil {
if err := s.GroupDatabase.CreateGroupRequest(ctx, requests); err != nil {
return nil, err
}
for _, request := range requests {
@@ -292,7 +301,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
}
}
if group.GroupType == constant.SuperGroup {
if err := s.GroupInterface.CreateSuperGroupMember(ctx, req.GroupID, req.InvitedUserIDs); err != nil {
if err := s.GroupDatabase.CreateSuperGroupMember(ctx, req.GroupID, req.InvitedUserIDs); err != nil {
return nil, err
}
for _, userID := range req.InvitedUserIDs {
@@ -314,7 +323,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
}
groupMembers = append(groupMembers, member)
}
if err := s.GroupInterface.CreateGroup(ctx, nil, groupMembers); err != nil {
if err := s.GroupDatabase.CreateGroup(ctx, nil, groupMembers); err != nil {
return nil, err
}
s.Notification.MemberInvitedNotification(ctx, req.GroupID, req.Reason, req.InvitedUserIDs)
@@ -324,14 +333,14 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGroupAllMemberReq) (*pbGroup.GetGroupAllMemberResp, error) {
resp := &pbGroup.GetGroupAllMemberResp{}
group, err := s.GroupInterface.TakeGroup(ctx, req.GroupID)
group, err := s.GroupDatabase.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
if group.GroupType == constant.SuperGroup {
return nil, constant.ErrArgs.Wrap("unsupported super group")
}
members, err := s.GroupInterface.FindGroupMember(ctx, []string{req.GroupID}, nil, nil)
members, err := s.GroupDatabase.FindGroupMember(ctx, []string{req.GroupID}, nil, nil)
if err != nil {
return nil, err
}
@@ -352,7 +361,7 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGro
func (s *groupServer) GetGroupMemberList(ctx context.Context, req *pbGroup.GetGroupMemberListReq) (*pbGroup.GetGroupMemberListResp, error) {
resp := &pbGroup.GetGroupMemberListResp{}
total, members, err := s.GroupInterface.PageGroupMember(ctx, []string{req.GroupID}, nil, utils.If(req.Filter >= 0, []int32{req.Filter}, nil), req.Pagination.PageNumber, req.Pagination.ShowNumber)
total, members, err := s.GroupDatabase.PageGroupMember(ctx, []string{req.GroupID}, nil, utils.If(req.Filter >= 0, []int32{req.Filter}, nil), req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err
}
@@ -374,7 +383,7 @@ func (s *groupServer) GetGroupMemberList(ctx context.Context, req *pbGroup.GetGr
func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGroupMemberReq) (*pbGroup.KickGroupMemberResp, error) {
resp := &pbGroup.KickGroupMemberResp{}
group, err := s.GroupInterface.TakeGroup(ctx, req.GroupID)
group, err := s.GroupDatabase.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
@@ -389,7 +398,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
return nil, constant.ErrArgs.Wrap("opUserID in KickedUserIDs")
}
if group.GroupType == constant.SuperGroup {
if err := s.GroupInterface.DeleteSuperGroupMember(ctx, req.GroupID, req.KickedUserIDs); err != nil {
if err := s.GroupDatabase.DeleteSuperGroupMember(ctx, req.GroupID, req.KickedUserIDs); err != nil {
return nil, err
}
go func() {
@@ -398,7 +407,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
}
}()
} else {
members, err := s.GroupInterface.FindGroupMember(ctx, []string{req.GroupID}, append(req.KickedUserIDs, opUserID), nil)
members, err := s.GroupDatabase.FindGroupMember(ctx, []string{req.GroupID}, append(req.KickedUserIDs, opUserID), nil)
if err != nil {
return nil, err
}
@@ -431,7 +440,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
return nil, constant.ErrNoPermission.Wrap("opUserID is OrdinaryUser")
}
}
if err := s.GroupInterface.DeleteGroupMember(ctx, group.GroupID, req.KickedUserIDs); err != nil {
if err := s.GroupDatabase.DeleteGroupMember(ctx, group.GroupID, req.KickedUserIDs); err != nil {
return nil, err
}
s.Notification.MemberKickedNotification(ctx, req, req.KickedUserIDs)
@@ -447,7 +456,7 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG
if req.GroupID == "" {
return nil, constant.ErrArgs.Wrap("groupID empty")
}
members, err := s.GroupInterface.FindGroupMember(ctx, []string{req.GroupID}, req.Members, nil)
members, err := s.GroupDatabase.FindGroupMember(ctx, []string{req.GroupID}, req.Members, nil)
if err != nil {
return nil, err
}
@@ -468,7 +477,7 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG
func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup.GetGroupApplicationListReq) (*pbGroup.GetGroupApplicationListResp, error) {
resp := &pbGroup.GetGroupApplicationListResp{}
total, groupRequests, err := s.GroupInterface.PageGroupRequestUser(ctx, req.FromUserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
total, groupRequests, err := s.GroupDatabase.PageGroupRequestUser(ctx, req.FromUserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err
}
@@ -490,7 +499,7 @@ func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup.
if err != nil {
return nil, err
}
groups, err := s.GroupInterface.FindGroup(ctx, utils.Distinct(groupIDs))
groups, err := s.GroupDatabase.FindGroup(ctx, utils.Distinct(groupIDs))
if err != nil {
return nil, err
}
@@ -500,11 +509,11 @@ func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup.
if ids := utils.Single(utils.Keys(groupMap), groupIDs); len(ids) > 0 {
return nil, constant.ErrGroupIDNotFound.Wrap(strings.Join(ids, ","))
}
groupMemberNumMap, err := s.GroupInterface.MapGroupMemberNum(ctx, groupIDs)
groupMemberNumMap, err := s.GroupDatabase.MapGroupMemberNum(ctx, groupIDs)
if err != nil {
return nil, err
}
owners, err := s.GroupInterface.FindGroupMember(ctx, groupIDs, nil, []int32{constant.GroupOwner})
owners, err := s.GroupDatabase.FindGroupMember(ctx, groupIDs, nil, []int32{constant.GroupOwner})
if err != nil {
return nil, err
}
@@ -522,15 +531,15 @@ func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbGroup.GetGroupsI
if len(req.GroupIDs) == 0 {
return nil, constant.ErrArgs.Wrap("groupID is empty")
}
groups, err := s.GroupInterface.FindGroup(ctx, req.GroupIDs)
groups, err := s.GroupDatabase.FindGroup(ctx, req.GroupIDs)
if err != nil {
return nil, err
}
groupMemberNumMap, err := s.GroupInterface.MapGroupMemberNum(ctx, req.GroupIDs)
groupMemberNumMap, err := s.GroupDatabase.MapGroupMemberNum(ctx, req.GroupIDs)
if err != nil {
return nil, err
}
owners, err := s.GroupInterface.FindGroupMember(ctx, req.GroupIDs, nil, []int32{constant.GroupOwner})
owners, err := s.GroupDatabase.FindGroupMember(ctx, req.GroupIDs, nil, []int32{constant.GroupOwner})
if err != nil {
return nil, err
}
@@ -538,7 +547,7 @@ func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbGroup.GetGroupsI
return e.GroupID
})
resp.GroupInfos = utils.Slice(groups, func(e *relationTb.GroupModel) *sdkws.GroupInfo {
return DbToPbGroupInfo(e, ownerMap[e.GroupID].UserID, uint32(groupMemberNumMap[e.GroupID]))
return DbToPbGroupInfo(e, ownerMap[e.GroupID].UserID, groupMemberNumMap[e.GroupID])
})
return resp, nil
}
@@ -549,7 +558,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup
return nil, constant.ErrArgs.Wrap("HandleResult unknown")
}
if !tokenverify.IsAppManagerUid(ctx) {
groupMember, err := s.GroupInterface.TakeGroupMember(ctx, req.GroupID, req.FromUserID)
groupMember, err := s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, req.FromUserID)
if err != nil {
return nil, err
}
@@ -557,11 +566,11 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup
return nil, constant.ErrNoPermission.Wrap("no group owner or admin")
}
}
group, err := s.GroupInterface.TakeGroup(ctx, req.GroupID)
group, err := s.GroupDatabase.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
groupRequest, err := s.GroupInterface.TakeGroupRequest(ctx, req.GroupID, req.FromUserID)
groupRequest, err := s.GroupDatabase.TakeGroupRequest(ctx, req.GroupID, req.FromUserID)
if err != nil {
return nil, err
}
@@ -569,7 +578,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup
return nil, constant.ErrArgs.Wrap("group request already processed")
}
var join bool
if _, err = s.GroupInterface.TakeGroupMember(ctx, req.GroupID, req.FromUserID); err == nil {
if _, err = s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, req.FromUserID); err == nil {
join = true // 已经在群里了
} else if !s.IsNotFound(err) {
return nil, err
@@ -596,7 +605,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup
return nil, err
}
}
if err := s.GroupInterface.HandlerGroupRequest(ctx, req.GroupID, req.FromUserID, req.HandledMsg, req.HandleResult, member); err != nil {
if err := s.GroupDatabase.HandlerGroupRequest(ctx, req.GroupID, req.FromUserID, req.HandledMsg, req.HandleResult, member); err != nil {
return nil, err
}
if !join {
@@ -615,7 +624,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
if _, err := s.UserCheck.GetPublicUserInfo(ctx, tracelog.GetOpUserID(ctx)); err != nil {
return nil, err
}
group, err := s.GroupInterface.TakeGroup(ctx, req.GroupID)
group, err := s.GroupDatabase.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
@@ -626,7 +635,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
if group.GroupType == constant.SuperGroup {
return nil, constant.ErrGroupTypeNotSupport.Wrap()
}
user, err := relation.GetUserByUserID(tracelog.GetOpUserID(ctx))
user, err := s.UserCheck.GetUsersInfo(ctx, tracelog.GetOpUserID(ctx))
if err != nil {
return nil, err
}
@@ -639,7 +648,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil {
return nil, err
}
if err := s.GroupInterface.CreateGroup(ctx, nil, []*relationTb.GroupMemberModel{groupMember}); err != nil {
if err := s.GroupDatabase.CreateGroup(ctx, nil, []*relationTb.GroupMemberModel{groupMember}); err != nil {
return nil, err
}
s.Notification.MemberEnterDirectlyNotification(ctx, req.GroupID, tracelog.GetOpUserID(ctx), tracelog.GetOperationID(ctx))
@@ -652,7 +661,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
JoinSource: req.JoinSource,
ReqTime: time.Now(),
}
if err := s.GroupInterface.CreateGroupRequest(ctx, []*relationTb.GroupRequestModel{&groupRequest}); err != nil {
if err := s.GroupDatabase.CreateGroupRequest(ctx, []*relationTb.GroupRequestModel{&groupRequest}); err != nil {
return nil, err
}
s.Notification.JoinGroupApplicationNotification(ctx, req)
@@ -661,17 +670,17 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq) (*pbGroup.QuitGroupResp, error) {
resp := &pbGroup.QuitGroupResp{}
group, err := s.GroupInterface.TakeGroup(ctx, req.GroupID)
group, err := s.GroupDatabase.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
if group.GroupType == constant.SuperGroup {
if err := s.GroupInterface.DeleteSuperGroupMember(ctx, req.GroupID, []string{tracelog.GetOpUserID(ctx)}); err != nil {
if err := s.GroupDatabase.DeleteSuperGroupMember(ctx, req.GroupID, []string{tracelog.GetOpUserID(ctx)}); err != nil {
return nil, err
}
s.Notification.SuperGroupNotification(ctx, tracelog.GetOpUserID(ctx), tracelog.GetOpUserID(ctx))
} else {
_, err := s.GroupInterface.TakeGroupMember(ctx, req.GroupID, tracelog.GetOpUserID(ctx))
_, err := s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, tracelog.GetOpUserID(ctx))
if err != nil {
return nil, err
}
@@ -683,7 +692,7 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq)
func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInfoReq) (*pbGroup.SetGroupInfoResp, error) {
resp := &pbGroup.SetGroupInfoResp{}
if !tokenverify.IsAppManagerUid(ctx) {
groupMember, err := s.GroupInterface.TakeGroupMember(ctx, req.GroupInfoForSet.GroupID, tracelog.GetOpUserID(ctx))
groupMember, err := s.GroupDatabase.TakeGroupMember(ctx, req.GroupInfoForSet.GroupID, tracelog.GetOpUserID(ctx))
if err != nil {
return nil, err
}
@@ -691,14 +700,14 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf
return nil, constant.ErrNoPermission.Wrap("no group owner or admin")
}
}
group, err := s.GroupInterface.TakeGroup(ctx, req.GroupInfoForSet.GroupID)
group, err := s.GroupDatabase.TakeGroup(ctx, req.GroupInfoForSet.GroupID)
if err != nil {
return nil, err
}
if group.Status == constant.GroupStatusDismissed {
return nil, utils.Wrap(constant.ErrDismissedAlready, "")
}
userIDs, err := s.GroupInterface.FindGroupMemberUserID(ctx, group.GroupID)
userIDs, err := s.GroupDatabase.FindGroupMemberUserID(ctx, group.GroupID)
if err != nil {
return nil, err
}
@@ -706,10 +715,10 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf
if len(data) > 0 {
return resp, nil
}
if err := s.GroupInterface.UpdateGroup(ctx, group.GroupID, data); err != nil {
if err := s.GroupDatabase.UpdateGroup(ctx, group.GroupID, data); err != nil {
return nil, err
}
group, err = s.GroupInterface.TakeGroup(ctx, req.GroupInfoForSet.GroupID)
group, err = s.GroupDatabase.TakeGroup(ctx, req.GroupInfoForSet.GroupID)
if err != nil {
return nil, err
}
@@ -734,7 +743,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf
func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbGroup.TransferGroupOwnerReq) (*pbGroup.TransferGroupOwnerResp, error) {
resp := &pbGroup.TransferGroupOwnerResp{}
group, err := s.GroupInterface.TakeGroup(ctx, req.GroupID)
group, err := s.GroupDatabase.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
@@ -744,7 +753,7 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbGroup.Trans
if req.OldOwnerUserID == req.NewOwnerUserID {
return nil, constant.ErrArgs.Wrap("OldOwnerUserID == NewOwnerUserID")
}
members, err := s.GroupInterface.FindGroupMember(ctx, []string{req.GroupID}, []string{req.OldOwnerUserID, req.NewOwnerUserID}, nil)
members, err := s.GroupDatabase.FindGroupMember(ctx, []string{req.GroupID}, []string{req.OldOwnerUserID, req.NewOwnerUserID}, nil)
if err != nil {
return nil, err
}
@@ -759,7 +768,7 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbGroup.Trans
oldOwner := memberMap[req.OldOwnerUserID]
if tokenverify.IsAppManagerUid(ctx) {
if oldOwner == nil {
oldOwner, err = s.GroupInterface.TakeGroupOwner(ctx, req.OldOwnerUserID)
oldOwner, err = s.GroupDatabase.TakeGroupOwner(ctx, req.OldOwnerUserID)
if err != nil {
return nil, err
}
@@ -772,7 +781,7 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbGroup.Trans
return nil, constant.ErrNoPermission.Wrap(fmt.Sprintf("user %s no permission transfer group owner", tracelog.GetOpUserID(ctx)))
}
}
if err := s.GroupInterface.TransferGroupOwner(ctx, req.GroupID, req.OldOwnerUserID, req.NewOwnerUserID, newOwner.RoleLevel); err != nil {
if err := s.GroupDatabase.TransferGroupOwner(ctx, req.GroupID, req.OldOwnerUserID, req.NewOwnerUserID, newOwner.RoleLevel); err != nil {
return nil, err
}
s.Notification.GroupOwnerTransferredNotification(ctx, req)
@@ -786,10 +795,10 @@ func (s *groupServer) GetGroups(ctx context.Context, req *pbGroup.GetGroupsReq)
err error
)
if req.GroupID != "" {
groups, err = s.GroupInterface.FindGroup(ctx, []string{req.GroupID})
groups, err = s.GroupDatabase.FindGroup(ctx, []string{req.GroupID})
resp.Total = uint32(len(groups))
} else {
resp.Total, groups, err = s.GroupInterface.SearchGroup(ctx, req.GroupName, req.Pagination.PageNumber, req.Pagination.ShowNumber)
resp.Total, groups, err = s.GroupDatabase.SearchGroup(ctx, req.GroupName, req.Pagination.PageNumber, req.Pagination.ShowNumber)
}
if err != nil {
return nil, err
@@ -797,7 +806,7 @@ func (s *groupServer) GetGroups(ctx context.Context, req *pbGroup.GetGroupsReq)
groupIDs := utils.Slice(groups, func(e *relationTb.GroupModel) string {
return e.GroupID
})
ownerMembers, err := s.GroupInterface.FindGroupMember(ctx, groupIDs, nil, []int32{constant.GroupOwner})
ownerMembers, err := s.GroupDatabase.FindGroupMember(ctx, groupIDs, nil, []int32{constant.GroupOwner})
if err != nil {
return nil, err
}
@@ -807,7 +816,7 @@ func (s *groupServer) GetGroups(ctx context.Context, req *pbGroup.GetGroupsReq)
if ids := utils.Single(groupIDs, utils.Keys(ownerMemberMap)); len(ids) > 0 {
return nil, constant.ErrDB.Wrap("group not owner " + strings.Join(ids, ","))
}
groupMemberNumMap, err := s.GroupInterface.MapGroupMemberNum(ctx, groupIDs)
groupMemberNumMap, err := s.GroupDatabase.MapGroupMemberNum(ctx, groupIDs)
if err != nil {
return nil, err
}
@@ -820,7 +829,7 @@ func (s *groupServer) GetGroups(ctx context.Context, req *pbGroup.GetGroupsReq)
func (s *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbGroup.GetGroupMembersCMSReq) (*pbGroup.GetGroupMembersCMSResp, error) {
resp := &pbGroup.GetGroupMembersCMSResp{}
total, members, err := s.GroupInterface.SearchGroupMember(ctx, req.UserName, []string{req.GroupID}, nil, nil, req.Pagination.PageNumber, req.Pagination.ShowNumber)
total, members, err := s.GroupDatabase.SearchGroupMember(ctx, req.UserName, []string{req.GroupID}, nil, nil, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err
}
@@ -846,7 +855,7 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbGrou
if err != nil {
return nil, err
}
total, requests, err := s.GroupInterface.PageGroupRequestUser(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
total, requests, err := s.GroupDatabase.PageGroupRequestUser(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err
}
@@ -857,7 +866,7 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbGrou
groupIDs := utils.Distinct(utils.Slice(requests, func(e *relationTb.GroupRequestModel) string {
return e.GroupID
}))
groups, err := s.GroupInterface.FindGroup(ctx, groupIDs)
groups, err := s.GroupDatabase.FindGroup(ctx, groupIDs)
if err != nil {
return nil, err
}
@@ -867,7 +876,7 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbGrou
if ids := utils.Single(groupIDs, utils.Keys(groupMap)); len(ids) > 0 {
return nil, constant.ErrGroupIDNotFound.Wrap(strings.Join(ids, ","))
}
owners, err := s.GroupInterface.FindGroupMember(ctx, groupIDs, nil, []int32{constant.GroupOwner})
owners, err := s.GroupDatabase.FindGroupMember(ctx, groupIDs, nil, []int32{constant.GroupOwner})
if err != nil {
return nil, err
}
@@ -877,7 +886,7 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbGrou
if ids := utils.Single(groupIDs, utils.Keys(ownerMap)); len(ids) > 0 {
return nil, constant.ErrData.Wrap("group no owner", strings.Join(ids, ","))
}
groupMemberNum, err := s.GroupInterface.MapGroupMemberNum(ctx, groupIDs)
groupMemberNum, err := s.GroupDatabase.MapGroupMemberNum(ctx, groupIDs)
if err != nil {
return nil, err
}
@@ -892,18 +901,18 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou
if err := s.CheckGroupAdmin(ctx, req.GroupID); err != nil {
return nil, err
}
group, err := s.GroupInterface.TakeGroup(ctx, req.GroupID)
group, err := s.GroupDatabase.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
if group.Status == constant.GroupStatusDismissed {
return nil, constant.ErrArgs.Wrap("group status is dismissed")
}
if err := s.GroupInterface.DismissGroup(ctx, req.GroupID); err != nil {
if err := s.GroupDatabase.DismissGroup(ctx, req.GroupID); err != nil {
return nil, err
}
if group.GroupType == constant.SuperGroup {
if err := s.GroupInterface.DeleteSuperGroup(ctx, group.GroupID); err != nil {
if err := s.GroupDatabase.DeleteSuperGroup(ctx, group.GroupID); err != nil {
return nil, err
}
} else {
@@ -914,12 +923,12 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou
func (s *groupServer) MuteGroupMember(ctx context.Context, req *pbGroup.MuteGroupMemberReq) (*pbGroup.MuteGroupMemberResp, error) {
resp := &pbGroup.MuteGroupMemberResp{}
member, err := s.GroupInterface.TakeGroupMember(ctx, req.GroupID, req.UserID)
member, err := s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, req.UserID)
if err != nil {
return nil, err
}
if !(tracelog.GetOpUserID(ctx) == req.UserID || tokenverify.IsAppManagerUid(ctx)) {
opMember, err := s.GroupInterface.TakeGroupMember(ctx, req.GroupID, req.UserID)
opMember, err := s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, req.UserID)
if err != nil {
return nil, err
}
@@ -928,7 +937,7 @@ func (s *groupServer) MuteGroupMember(ctx context.Context, req *pbGroup.MuteGrou
}
}
data := UpdateGroupMemberMutedTimeMap(time.Now().Add(time.Second * time.Duration(req.MutedSeconds)))
if err := s.GroupInterface.UpdateGroupMember(ctx, member.GroupID, member.UserID, data); err != nil {
if err := s.GroupDatabase.UpdateGroupMember(ctx, member.GroupID, member.UserID, data); err != nil {
return nil, err
}
s.Notification.GroupMemberMutedNotification(ctx, req.GroupID, req.UserID, req.MutedSeconds)
@@ -937,12 +946,12 @@ func (s *groupServer) MuteGroupMember(ctx context.Context, req *pbGroup.MuteGrou
func (s *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbGroup.CancelMuteGroupMemberReq) (*pbGroup.CancelMuteGroupMemberResp, error) {
resp := &pbGroup.CancelMuteGroupMemberResp{}
member, err := s.GroupInterface.TakeGroupMember(ctx, req.GroupID, req.UserID)
member, err := s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, req.UserID)
if err != nil {
return nil, err
}
if !(tracelog.GetOpUserID(ctx) == req.UserID || tokenverify.IsAppManagerUid(ctx)) {
opMember, err := s.GroupInterface.TakeGroupMember(ctx, req.GroupID, tracelog.GetOpUserID(ctx))
opMember, err := s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, tracelog.GetOpUserID(ctx))
if err != nil {
return nil, err
}
@@ -951,7 +960,7 @@ func (s *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbGroup.Ca
}
}
data := UpdateGroupMemberMutedTimeMap(time.Unix(0, 0))
if err := s.GroupInterface.UpdateGroupMember(ctx, member.GroupID, member.UserID, data); err != nil {
if err := s.GroupDatabase.UpdateGroupMember(ctx, member.GroupID, member.UserID, data); err != nil {
return nil, err
}
s.Notification.GroupMemberCancelMutedNotification(ctx, req.GroupID, req.UserID)
@@ -963,7 +972,7 @@ func (s *groupServer) MuteGroup(ctx context.Context, req *pbGroup.MuteGroupReq)
if err := s.CheckGroupAdmin(ctx, req.GroupID); err != nil {
return nil, err
}
if err := s.GroupInterface.UpdateGroup(ctx, req.GroupID, UpdateGroupStatusMap(constant.GroupStatusMuted)); err != nil {
if err := s.GroupDatabase.UpdateGroup(ctx, req.GroupID, UpdateGroupStatusMap(constant.GroupStatusMuted)); err != nil {
return nil, err
}
s.Notification.GroupMutedNotification(ctx, req.GroupID)
@@ -975,7 +984,7 @@ func (s *groupServer) CancelMuteGroup(ctx context.Context, req *pbGroup.CancelMu
if err := s.CheckGroupAdmin(ctx, req.GroupID); err != nil {
return nil, err
}
if err := s.GroupInterface.UpdateGroup(ctx, req.GroupID, UpdateGroupStatusMap(constant.GroupOk)); err != nil {
if err := s.GroupDatabase.UpdateGroup(ctx, req.GroupID, UpdateGroupStatusMap(constant.GroupOk)); err != nil {
return nil, err
}
s.Notification.GroupCancelMutedNotification(ctx, req.GroupID)
@@ -1001,7 +1010,7 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr
}
groupIDs := utils.Keys(groupIDMap)
userIDs := utils.Keys(userIDMap)
members, err := s.GroupInterface.FindGroupMember(ctx, groupIDs, append(userIDs, tracelog.GetOpUserID(ctx)), nil)
members, err := s.GroupDatabase.FindGroupMember(ctx, groupIDs, append(userIDs, tracelog.GetOpUserID(ctx)), nil)
if err != nil {
return nil, err
}
@@ -1044,7 +1053,7 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr
return nil, err
}
}
err = s.GroupInterface.UpdateGroupMembers(ctx, utils.Slice(req.Members, func(e *pbGroup.SetGroupMemberInfo) *relationTb.BatchUpdateGroupMember {
err = s.GroupDatabase.UpdateGroupMembers(ctx, utils.Slice(req.Members, func(e *pbGroup.SetGroupMemberInfo) *relationTb.BatchUpdateGroupMember {
return &relationTb.BatchUpdateGroupMember{
GroupID: e.GroupID,
UserID: e.UserID,
@@ -1068,7 +1077,7 @@ func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbGroup.Get
if utils.Duplicate(req.GroupIDs) {
return nil, constant.ErrArgs.Wrap("groupIDs duplicate")
}
groups, err := s.GroupInterface.FindGroup(ctx, req.GroupIDs)
groups, err := s.GroupDatabase.FindGroup(ctx, req.GroupIDs)
if err != nil {
return nil, err
}
@@ -1077,7 +1086,7 @@ func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbGroup.Get
})); len(ids) > 0 {
return nil, constant.ErrGroupIDNotFound.Wrap("not found group " + strings.Join(ids, ","))
}
groupUserMap, err := s.GroupInterface.MapGroupMemberUserID(ctx, req.GroupIDs)
groupUserMap, err := s.GroupDatabase.MapGroupMemberUserID(ctx, req.GroupIDs)
if err != nil {
return nil, err
}
@@ -1096,7 +1105,7 @@ func (s *groupServer) GetUserInGroupMembers(ctx context.Context, req *pbGroup.Ge
if len(req.GroupIDs) == 0 {
return nil, constant.ErrArgs.Wrap("groupIDs empty")
}
members, err := s.GroupInterface.FindGroupMember(ctx, []string{req.UserID}, req.GroupIDs, nil)
members, err := s.GroupDatabase.FindGroupMember(ctx, []string{req.UserID}, req.GroupIDs, nil)
if err != nil {
return nil, err
}
@@ -1118,7 +1127,7 @@ func (s *groupServer) GetUserInGroupMembers(ctx context.Context, req *pbGroup.Ge
func (s *groupServer) GetGroupMemberUserID(ctx context.Context, req *pbGroup.GetGroupMemberUserIDReq) (*pbGroup.GetGroupMemberUserIDResp, error) {
resp := &pbGroup.GetGroupMemberUserIDResp{}
var err error
resp.UserIDs, err = s.GroupInterface.FindGroupMemberUserID(ctx, req.GroupID)
resp.UserIDs, err = s.GroupDatabase.FindGroupMemberUserID(ctx, req.GroupID)
if err != nil {
return nil, err
}
@@ -1130,7 +1139,7 @@ func (s *groupServer) GetGroupMemberRoleLevel(ctx context.Context, req *pbGroup.
if len(req.RoleLevels) == 0 {
return nil, constant.ErrArgs.Wrap("RoleLevels empty")
}
members, err := s.GroupInterface.FindGroupMember(ctx, []string{req.GroupID}, nil, req.RoleLevels)
members, err := s.GroupDatabase.FindGroupMember(ctx, []string{req.GroupID}, nil, req.RoleLevels)
if err != nil {
return nil, err
}
+7 -7
View File
@@ -14,14 +14,14 @@ import (
func (s *groupServer) GetJoinedSuperGroupList(ctx context.Context, req *pbGroup.GetJoinedSuperGroupListReq) (*pbGroup.GetJoinedSuperGroupListResp, error) {
resp := &pbGroup.GetJoinedSuperGroupListResp{}
joinSuperGroup, err := s.GroupInterface.FindJoinSuperGroup(ctx, req.UserID)
joinSuperGroup, err := s.GroupDatabase.FindJoinSuperGroup(ctx, req.UserID)
if err != nil {
return nil, err
}
if len(joinSuperGroup.GroupIDs) == 0 {
return resp, nil
}
owners, err := s.GroupInterface.FindGroupMember(ctx, joinSuperGroup.GroupIDs, nil, []int32{constant.GroupOwner})
owners, err := s.GroupDatabase.FindGroupMember(ctx, joinSuperGroup.GroupIDs, nil, []int32{constant.GroupOwner})
if err != nil {
return nil, err
}
@@ -31,7 +31,7 @@ func (s *groupServer) GetJoinedSuperGroupList(ctx context.Context, req *pbGroup.
if ids := utils.Single(joinSuperGroup.GroupIDs, utils.Keys(ownerMap)); len(ids) > 0 {
return nil, constant.ErrData.Wrap(fmt.Sprintf("super group %s not owner", strings.Join(ids, ",")))
}
groups, err := s.GroupInterface.FindGroup(ctx, joinSuperGroup.GroupIDs)
groups, err := s.GroupDatabase.FindGroup(ctx, joinSuperGroup.GroupIDs)
if err != nil {
return nil, err
}
@@ -41,7 +41,7 @@ func (s *groupServer) GetJoinedSuperGroupList(ctx context.Context, req *pbGroup.
if ids := utils.Single(joinSuperGroup.GroupIDs, utils.Keys(groupMap)); len(ids) > 0 {
return nil, constant.ErrData.Wrap(fmt.Sprintf("super group info %s not found", strings.Join(ids, ",")))
}
superGroupMembers, err := s.GroupInterface.FindSuperGroup(ctx, joinSuperGroup.GroupIDs)
superGroupMembers, err := s.GroupDatabase.FindSuperGroup(ctx, joinSuperGroup.GroupIDs)
if err != nil {
return nil, err
}
@@ -59,18 +59,18 @@ func (s *groupServer) GetSuperGroupsInfo(ctx context.Context, req *pbGroup.GetSu
if len(req.GroupIDs) == 0 {
return nil, constant.ErrArgs.Wrap("groupIDs empty")
}
groups, err := s.GroupInterface.FindGroup(ctx, req.GroupIDs)
groups, err := s.GroupDatabase.FindGroup(ctx, req.GroupIDs)
if err != nil {
return nil, err
}
superGroupMembers, err := s.GroupInterface.FindSuperGroup(ctx, req.GroupIDs)
superGroupMembers, err := s.GroupDatabase.FindSuperGroup(ctx, req.GroupIDs)
if err != nil {
return nil, err
}
superGroupMemberMap := utils.SliceToMapAny(superGroupMembers, func(e *unrelation.SuperGroupModel) (string, []string) {
return e.GroupID, e.MemberIDs
})
owners, err := s.GroupInterface.FindGroupMember(ctx, req.GroupIDs, nil, []int32{constant.GroupOwner})
owners, err := s.GroupDatabase.FindGroupMember(ctx, req.GroupIDs, nil, []int32{constant.GroupOwner})
if err != nil {
return nil, err
}
+1 -1
View File
@@ -32,7 +32,7 @@ func toCommonCallback(ctx context.Context, msg *pbChat.SendMsgReq, command strin
AtUserIDList: msg.MsgData.AtUserIDList,
SenderFaceURL: msg.MsgData.SenderFaceURL,
Content: utils.GetContent(msg.MsgData),
Seq: msg.MsgData.Seq,
Seq: uint32(msg.MsgData.Seq),
Ex: msg.MsgData.Ex,
}
}
+13 -7
View File
@@ -9,7 +9,7 @@ import (
func (m *msgServer) DelMsgList(ctx context.Context, req *common.DelMsgListReq) (*common.DelMsgListResp, error) {
resp := &common.DelMsgListResp{}
if err := m.MsgInterface.DelMsgFromCache(ctx, req.UserID, req.SeqList); err != nil {
if _, err := m.MsgInterface.DelMsgBySeqs(ctx, req.UserID, req.SeqList); err != nil {
return nil, err
}
DeleteMessageNotification(ctx, req.UserID, req.SeqList)
@@ -21,11 +21,14 @@ func (m *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroup
if err := tokenverify.CheckAdmin(ctx); err != nil {
return nil, err
}
maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, req.GroupID)
if err != nil {
return nil, err
}
if err := m.MsgInterface.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil {
//maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, req.GroupID)
//if err != nil {
// return nil, err
//}
//if err := m.MsgInterface.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil {
// return nil, err
//}
if err := m.MsgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, req.GroupID, req.UserID, 0); err != nil {
return nil, err
}
return resp, nil
@@ -36,8 +39,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.Cl
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err
}
if err := m.MsgInterface.DelUserAllSeq(ctx, req.UserID); err != nil {
if err := m.MsgInterface.CleanUpUserMsg(ctx, req.UserID); err != nil {
return nil, err
}
//if err := m.MsgInterface.DelUserAllSeq(ctx, req.UserID); err != nil {
// return nil, err
//}
return resp, nil
}
-1
View File
@@ -3,7 +3,6 @@ package msg
import (
"Open_IM/internal/common/notification"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws"
+35 -50
View File
@@ -5,10 +5,8 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/http"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
"context"
)
@@ -29,7 +27,7 @@ func CallbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMe
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
}
resp := &cbapi.CallbackBeforeSetMessageReactionExtResp{}
return http.CallBackPostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
}
func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) error {
@@ -48,52 +46,39 @@ func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReacti
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
}
resp := &cbapi.CallbackDeleteMessageReactionExtResp{}
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp)
return http.CallBackPostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
}
//func CallbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) error {
// if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
// return nil
// }
// req := cbapi.CallbackGetMessageListReactionExtReq{
// OperationID: getReq.OperationID,
// CallbackCommand: constant.CallbackGetMessageListReactionExtensionsCommand,
// SourceID: getReq.SourceID,
// OpUserID: getReq.OpUserID,
// SessionType: getReq.SessionType,
// TypeKeyList: getReq.TypeKeyList,
// MessageKeyList: getReq.MessageReactionKeyList,
// }
// resp := &cbApi.CallbackGetMessageListReactionExtResp{CommonCallbackResp: &callbackResp}
// defer log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), req, *resp)
// if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackGetMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
// callbackResp.ErrCode = http2.StatusInternalServerError
// callbackResp.ErrMsg = err.Error()
// }
// return resp
//}
//func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cb.CallbackAddMessageReactionExtResp {
// callbackResp := cbapi.CommonCallbackResp{OperationID: setReq.OperationID}
// log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
// req := cbapi.CallbackAddMessageReactionExtReq{
// OperationID: setReq.OperationID,
// CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand,
// SourceID: setReq.SourceID,
// OpUserID: setReq.OpUserID,
// SessionType: setReq.SessionType,
// ReactionExtensionList: setReq.ReactionExtensionList,
// ClientMsgID: setReq.ClientMsgID,
// IsReact: setReq.IsReact,
// IsExternalExtensions: setReq.IsExternalExtensions,
// MsgFirstModifyTime: setReq.MsgFirstModifyTime,
// }
// resp := &cbapi.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp}
// defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp, *resp.CommonCallbackResp, resp.IsReact, resp.MsgFirstModifyTime)
// if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAddMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
// callbackResp.ErrCode = http2.StatusInternalServerError
// callbackResp.ErrMsg = err.Error()
// }
// return resp
//
//}
func CallbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) error {
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
return nil
}
req := &cbapi.CallbackGetMessageListReactionExtReq{
OperationID: getReq.OperationID,
CallbackCommand: constant.CallbackGetMessageListReactionExtensionsCommand,
SourceID: getReq.SourceID,
OpUserID: getReq.OpUserID,
SessionType: getReq.SessionType,
TypeKeyList: getReq.TypeKeyList,
MessageKeyList: getReq.MessageReactionKeyList,
}
resp := &cbapi.CallbackGetMessageListReactionExtResp{}
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
}
func CallbackAddMessageReactionExtensions(setReq *msg.ModifyMessageReactionExtensionsReq) error {
req := &cbapi.CallbackAddMessageReactionExtReq{
OperationID: setReq.OperationID,
CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand,
SourceID: setReq.SourceID,
OpUserID: setReq.OpUserID,
SessionType: setReq.SessionType,
ReactionExtensionList: setReq.ReactionExtensionList,
ClientMsgID: setReq.ClientMsgID,
IsReact: setReq.IsReact,
IsExternalExtensions: setReq.IsExternalExtensions,
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
}
resp := &cbapi.CallbackAddMessageReactionExtResp{}
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
}
-1
View File
@@ -1,7 +1,6 @@
package msg
import (
"Open_IM/pkg/common/db"
"time"
)
+5 -5
View File
@@ -165,14 +165,14 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
}
if revokeMessage.RevokerID != revokeMessage.SourceMessageSendID {
resp, err := m.MsgInterface.GetSuperGroupMsg(ctx, data.MsgData.GroupID, revokeMessage.Seq)
resp, err := m.MsgInterface.GetSuperGroupMsgBySeqs(ctx, data.MsgData.GroupID, []int64{int64(revokeMessage.Seq)})
if err != nil {
return nil, err
}
if resp.ClientMsgID == revokeMessage.ClientMsgID && resp.Seq == revokeMessage.Seq {
revokeMessage.SourceMessageSendTime = resp.SendTime
revokeMessage.SourceMessageSenderNickname = resp.SenderNickname
revokeMessage.SourceMessageSendID = resp.SendID
if resp[0].ClientMsgID == revokeMessage.ClientMsgID && resp[0].Seq == int64(revokeMessage.Seq) {
revokeMessage.SourceMessageSendTime = resp[0].SendTime
revokeMessage.SourceMessageSenderNickname = resp[0].SenderNickname
revokeMessage.SourceMessageSendID = resp[0].SendID
data.MsgData.Content = []byte(utils.StructToJsonString(revokeMessage))
} else {
return nil, constant.ErrData.Wrap("MsgData")
+2 -2
View File
@@ -2,7 +2,7 @@ package msg
import (
"Open_IM/pkg/common/constant"
promePkg "Open_IM/pkg/common/prometheus"
promePkg "Open_IM/pkg/common/prome"
pbConversation "Open_IM/pkg/proto/conversation"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws"
@@ -67,7 +67,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq)
if err != nil {
return nil, err
}
isSend, err := modifyMessageByUserMessageReceiveOpt(req.MsgData.RecvID, req.MsgData.SendID, constant.SingleChatType, req)
isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, req.MsgData.SendID, constant.SingleChatType, req)
if err != nil {
return nil, err
}
+1 -1
View File
@@ -9,7 +9,7 @@ import (
discoveryRegistry "Open_IM/pkg/discoveryregistry"
"github.com/OpenIMSDK/openKeeper"
promePkg "Open_IM/pkg/common/prometheus"
promePkg "Open_IM/pkg/common/prome"
"Open_IM/pkg/proto/msg"
"google.golang.org/grpc"
)
+30 -23
View File
@@ -4,13 +4,14 @@ import (
"Open_IM/internal/common/check"
"Open_IM/internal/common/convert"
"Open_IM/internal/common/notification"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation"
tablerelation "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/common/tracelog"
discoveryRegistry "Open_IM/pkg/discoveryregistry"
registry "Open_IM/pkg/discoveryregistry"
"Open_IM/pkg/proto/sdkws"
pbuser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
@@ -20,27 +21,37 @@ import (
)
type userServer struct {
controller.UserInterface
controller.UserDatabase
notification *notification.Check
userCheck *check.UserCheck
ConversationChecker *check.ConversationChecker
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
RegisterCenter registry.SvcDiscoveryRegistry
friendCheck *check.FriendChecker
}
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
mysql, err := relation.NewGormDB()
gormDB, err := relation.NewGormDB()
if err != nil {
return err
}
if err := mysql.AutoMigrate(&tablerelation.UserModel{}); err != nil {
if err := gormDB.AutoMigrate(&tablerelation.UserModel{}); err != nil {
return err
}
pbuser.RegisterUserServer(server, &userServer{
UserInterface: controller.NewUserController(mysql),
u := &userServer{
UserDatabase: controller.NewUserDatabase(relation.NewUserGorm(gormDB)),
notification: notification.NewCheck(client),
userCheck: check.NewUserCheck(client),
RegisterCenter: client,
})
}
pbuser.RegisterUserServer(server, u)
users := make([]*tablerelation.UserModel, 0)
if len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname) {
return constant.ErrConfig.Wrap("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)")
}
for k, v := range config.Config.Manager.AppManagerUid {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k]})
}
u.UserDatabase.InitOnce(context.Background(), users)
return nil
}
@@ -58,10 +69,6 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesig
return resp, nil
}
func (s *userServer) GetAllPageFriends(ctx context.Context, ownerUserID string) (resp []*sdkws.FriendInfo, err error) {
return
}
// ok
func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) {
resp = &pbuser.UpdateUserInfoResp{}
@@ -69,14 +76,6 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err != nil {
return nil, err
}
//oldNickname := ""
//if req.UserInfo.Nickname != "" {
// u, err := s.FindWithError(ctx, []string{req.UserInfo.UserID})
// if err != nil {
// return nil, err
// }
// oldNickname = u[0].Nickname
//}
user, err := convert.NewPBUser(req.UserInfo).Convert()
if err != nil {
return nil, err
@@ -85,7 +84,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err != nil {
return nil, err
}
friends, err := s.GetAllPageFriends(ctx, req.UserInfo.UserID)
friends, err := s.friendCheck.GetAllPageFriends(ctx, req.UserInfo.UserID)
if err != nil {
return nil, err
}
@@ -94,9 +93,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
s.notification.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v.FriendUser.UserID, tracelog.GetOpUserID(ctx))
}
}()
s.notification.UserInfoUpdatedNotification(ctx, tracelog.GetOpUserID(ctx), req.UserInfo.UserID)
return resp, nil
}
@@ -184,3 +181,13 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
}
return resp, nil
}
func (s *userServer) GetGlobalRecvMessageOpt(ctx context.Context, req *pbuser.GetGlobalRecvMessageOptReq) (resp *pbuser.GetGlobalRecvMessageOptResp, err error) {
resp = &pbuser.GetGlobalRecvMessageOptResp{}
user, err := s.FindWithError(ctx, []string{req.UserID})
if err != nil {
return nil, err
}
resp.GlobalRecvMsgOpt = user[0].GlobalRecvMsgOpt
return resp, nil
}
+1 -1
View File
@@ -6,7 +6,7 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/middleware"
promePkg "Open_IM/pkg/common/prometheus"
promePkg "Open_IM/pkg/common/prome"
"flag"
"fmt"
"github.com/OpenIMSDK/openKeeper"
+19
View File
@@ -0,0 +1,19 @@
package tx
import (
"gorm.io/gorm"
)
func NewGorm(db *gorm.DB) Tx {
return &_Gorm{tx: db}
}
type _Gorm struct {
tx *gorm.DB
}
func (g *_Gorm) Transaction(fn func(tx any) error) error {
return g.tx.Transaction(func(tx *gorm.DB) error {
return fn(tx)
})
}
+31
View File
@@ -0,0 +1,31 @@
package tx
import (
"Open_IM/pkg/utils"
"context"
"go.mongodb.org/mongo-driver/mongo"
)
func NewMongo(client *mongo.Client) CtxTx {
return &_Mongo{
client: client,
}
}
type _Mongo struct {
client *mongo.Client
}
func (m *_Mongo) Transaction(ctx context.Context, fn func(ctx context.Context) error) error {
sess, err := m.client.StartSession()
if err != nil {
return err
}
sCtx := mongo.NewSessionContext(ctx, sess)
defer sess.EndSession(sCtx)
if err := fn(sCtx); err != nil {
_ = sess.AbortTransaction(sCtx)
return err
}
return utils.Wrap(sess.CommitTransaction(sCtx), "")
}
+11
View File
@@ -0,0 +1,11 @@
package tx
import "context"
type Tx interface {
Transaction(fn func(tx any) error) error
}
type CtxTx interface {
Transaction(ctx context.Context, fn func(ctx context.Context) error) error
}