mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-28 22:39:18 +08:00
Compare commits
8 Commits
v3.5.1
...
v3.5.0-rc.4
| Author | SHA1 | Date | |
|---|---|---|---|
| e1422ec8f4 | |||
| ad47590e13 | |||
| a42a44e0a3 | |||
| d8838ee6b8 | |||
| f480f52e2d | |||
| a23cbf13cf | |||
| 498e26a942 | |||
| e1990c179e |
@@ -48,6 +48,7 @@ jobs:
|
||||
images: openim/openim-server
|
||||
# generate Docker tags based on the following events/attributes
|
||||
tags: |
|
||||
type=ref,event=tag
|
||||
type=schedule
|
||||
type=ref,event=branch
|
||||
type=ref,event=pr
|
||||
@@ -90,6 +91,7 @@ jobs:
|
||||
images: registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server
|
||||
# generate Docker tags based on the following events/attributes
|
||||
tags: |
|
||||
type=ref,event=tag
|
||||
type=schedule
|
||||
type=ref,event=branch
|
||||
type=ref,event=pr
|
||||
@@ -133,6 +135,7 @@ jobs:
|
||||
images: ghcr.io/openimsdk/openim-server
|
||||
# generate Docker tags based on the following events/attributes
|
||||
tags: |
|
||||
type=ref,event=tag
|
||||
type=schedule
|
||||
type=ref,event=branch
|
||||
type=ref,event=pr
|
||||
|
||||
@@ -1,139 +0,0 @@
|
||||
# Copyright © 2023 OpenIM open source community. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
name: Build OpenIM Web Docker image
|
||||
|
||||
on:
|
||||
# schedule:
|
||||
# - cron: '30 3 * * *'
|
||||
push:
|
||||
branches:
|
||||
# - main
|
||||
- release-*
|
||||
tags:
|
||||
- v*
|
||||
workflow_dispatch:
|
||||
|
||||
env:
|
||||
# Common versions
|
||||
GO_VERSION: "1.20"
|
||||
|
||||
jobs:
|
||||
build-openim-web-dockerhub:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# docker.io/openim/openim-web:latest
|
||||
- name: Extract metadata (tags, labels) for Docker
|
||||
id: meta
|
||||
uses: docker/metadata-action@v5.0.0
|
||||
with:
|
||||
images: openim/openim-web
|
||||
# generate Docker tags based on the following events/attributes
|
||||
tags: |
|
||||
type=schedule
|
||||
type=ref,event=branch
|
||||
type=ref,event=pr
|
||||
type=semver,pattern={{version}}
|
||||
type=semver,pattern={{major}}.{{minor}}
|
||||
type=semver,pattern={{major}}
|
||||
type=sha
|
||||
|
||||
- name: Log in to Docker Hub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_USERNAME }}
|
||||
password: ${{ secrets.DOCKER_PASSWORD }}
|
||||
|
||||
- name: Build and push Docker image
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
file: ./build/images/openim-tools/openim-web/Dockerfile
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
|
||||
build-openim-web-aliyun:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
# registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-web:latest
|
||||
- name: Extract metadata (tags, labels) for Docker
|
||||
id: meta2
|
||||
uses: docker/metadata-action@v5.0.0
|
||||
with:
|
||||
images: registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-web
|
||||
|
||||
- name: Log in to AliYun Docker Hub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: registry.cn-hangzhou.aliyuncs.com
|
||||
username: ${{ secrets.ALIREGISTRY_USERNAME }}
|
||||
password: ${{ secrets.ALIREGISTRY_TOKEN }}
|
||||
|
||||
- name: Build and push Docker image
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
file: ./build/images/openim-tools/openim-web/Dockerfile
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ steps.meta2.outputs.tags }}
|
||||
labels: ${{ steps.meta2.outputs.labels }}
|
||||
|
||||
build-openim-web-ghcr:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
# ghcr.io/openimsdk/openim-web:latest
|
||||
- name: Extract metadata (tags, labels) for Docker
|
||||
id: meta2
|
||||
uses: docker/metadata-action@v5.0.0
|
||||
with:
|
||||
images: ghcr.io/openimsdk/openim-web
|
||||
|
||||
- name: Log in to GitHub Container Registry
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Build and push Docker image
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
file: ./build/images/openim-tools/openim-web/Dockerfile
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ steps.meta2.outputs.tags }}
|
||||
labels: ${{ steps.meta2.outputs.labels }}
|
||||
@@ -37,13 +37,20 @@ zookeeper:
|
||||
|
||||
###################### Mongo ######################
|
||||
# MongoDB configuration
|
||||
# If uri is not empty, it will be used directly
|
||||
#
|
||||
# MongoDB address for standalone setup, Mongos address for sharded cluster setup
|
||||
# Default MongoDB database name
|
||||
# Maximum connection pool size
|
||||
|
||||
# If uri is not empty, it will be used directly for the MongoDB connection.
|
||||
# This is a complete MongoDB URI string.
|
||||
# Example: mongodb://user:password@host1:port1,host2:port2/dbname?options
|
||||
mongo:
|
||||
uri: ${MONGO_URI}
|
||||
|
||||
# List of MongoDB server addresses.
|
||||
# Used for constructing the MongoDB URI if 'uri' above is empty.
|
||||
# For a standalone setup, specify the address of the single server.
|
||||
# For a sharded cluster, specify the addresses of the Mongos servers.
|
||||
# Example: [ '172.28.0.1:37017', '172.28.0.2:37017' ]
|
||||
# Default MongoDB database name
|
||||
# Maximum connection pool size
|
||||
address: [ ${MONGO_ADDRESS}:${MONGO_PORT} ]
|
||||
database: ${MONGO_DATABASE}
|
||||
username: ${MONGO_USERNAME}
|
||||
|
||||
@@ -34,7 +34,6 @@ services:
|
||||
ipv4_address: ${MONGO_NETWORK_ADDRESS:-172.28.0.2}
|
||||
|
||||
redis:
|
||||
# image: redis:7.0.0
|
||||
image: redis:${REDIS_IMAGE_VERSION:-7.0.0}
|
||||
container_name: redis
|
||||
ports:
|
||||
@@ -53,7 +52,6 @@ services:
|
||||
ipv4_address: ${REDIS_NETWORK_ADDRESS:-172.28.0.3}
|
||||
|
||||
zookeeper:
|
||||
# image: bitnami/zookeeper:3.8
|
||||
image: bitnami/zookeeper:${ZOOKEEPER_IMAGE_VERSION:-3.8}
|
||||
container_name: zookeeper
|
||||
ports:
|
||||
@@ -69,7 +67,6 @@ services:
|
||||
ipv4_address: ${ZOOKEEPER_NETWORK_ADDRESS:-172.28.0.5}
|
||||
|
||||
kafka:
|
||||
# image: 'bitnami/kafka:3.5.1'
|
||||
image: 'bitnami/kafka:${KAFKA_IMAGE_VERSION:-3.5.1}'
|
||||
container_name: kafka
|
||||
restart: always
|
||||
@@ -95,7 +92,6 @@ services:
|
||||
ipv4_address: ${KAFKA_NETWORK_ADDRESS:-172.28.0.4}
|
||||
|
||||
minio:
|
||||
# image: minio/minio
|
||||
image: minio/minio:${MINIO_IMAGE_VERSION:-latest}
|
||||
ports:
|
||||
- "${MINIO_PORT:-10005}:9000"
|
||||
@@ -114,7 +110,6 @@ services:
|
||||
ipv4_address: ${MINIO_NETWORK_ADDRESS:-172.28.0.6}
|
||||
|
||||
openim-web:
|
||||
# image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:latest
|
||||
image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:${OPENIM_WEB_IMAGE_VERSION:-latest}
|
||||
container_name: openim-web
|
||||
environment:
|
||||
|
||||
@@ -27,6 +27,7 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/go-playground/validator/v10"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
|
||||
|
||||
@@ -16,9 +16,10 @@ package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
"sync"
|
||||
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
|
||||
@@ -16,8 +16,10 @@ package friend
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
pbfriend "github.com/OpenIMSDK/protocol/friend"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
|
||||
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
|
||||
|
||||
@@ -16,6 +16,7 @@ package friend
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/tools/tx"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
|
||||
@@ -16,9 +16,10 @@ package group
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
"github.com/OpenIMSDK/protocol/group"
|
||||
"github.com/OpenIMSDK/protocol/wrapperspb"
|
||||
|
||||
@@ -17,13 +17,14 @@ package group
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||
|
||||
pbconversation "github.com/OpenIMSDK/protocol/conversation"
|
||||
"github.com/OpenIMSDK/protocol/wrapperspb"
|
||||
"github.com/OpenIMSDK/tools/tx"
|
||||
|
||||
@@ -16,16 +16,18 @@ package msg
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
utils2 "github.com/OpenIMSDK/tools/utils"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
"github.com/OpenIMSDK/protocol/msg"
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
|
||||
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||
)
|
||||
|
||||
func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (resp *msg.GetConversationsHasReadAndMaxSeqResp, err error) {
|
||||
@@ -173,7 +175,7 @@ func (m *msgServer) MarkConversationAsRead(
|
||||
m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
} else if conversation.ConversationType == constant.SuperGroupChatType ||
|
||||
conversation.ConversationType == constant.NotificationChatType {
|
||||
if req.HasReadSeq > hasReadSeq {
|
||||
@@ -222,4 +224,4 @@ func (m *msgServer) sendMarkAsReadNotification(
|
||||
log.ZWarn(ctx, "send has read Receipt err", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ package msg
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
@@ -24,6 +25,7 @@ import (
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
|
||||
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
|
||||
@@ -16,6 +16,7 @@ package user
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
pbuser "github.com/OpenIMSDK/protocol/user"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
|
||||
|
||||
+18
-18
@@ -16,11 +16,11 @@ package apistruct
|
||||
|
||||
type PictureBaseInfo struct {
|
||||
UUID string `mapstructure:"uuid"`
|
||||
Type string `mapstructure:"type" validate:"required"`
|
||||
Type string `mapstructure:"type" validate:"required"`
|
||||
Size int64 `mapstructure:"size"`
|
||||
Width int32 `mapstructure:"width" validate:"required"`
|
||||
Width int32 `mapstructure:"width" validate:"required"`
|
||||
Height int32 `mapstructure:"height" validate:"required"`
|
||||
Url string `mapstructure:"url" validate:"required"`
|
||||
Url string `mapstructure:"url" validate:"required"`
|
||||
}
|
||||
|
||||
type PictureElem struct {
|
||||
@@ -34,28 +34,28 @@ type SoundElem struct {
|
||||
SoundPath string `mapstructure:"soundPath"`
|
||||
SourceURL string `mapstructure:"sourceUrl" validate:"required"`
|
||||
DataSize int64 `mapstructure:"dataSize"`
|
||||
Duration int64 `mapstructure:"duration" validate:"required,min=1"`
|
||||
Duration int64 `mapstructure:"duration" validate:"required,min=1"`
|
||||
}
|
||||
type VideoElem struct {
|
||||
VideoPath string `mapstructure:"videoPath" `
|
||||
VideoPath string `mapstructure:"videoPath"`
|
||||
VideoUUID string `mapstructure:"videoUUID"`
|
||||
VideoURL string `mapstructure:"videoUrl" validate:"required"`
|
||||
VideoType string `mapstructure:"videoType" validate:"required"`
|
||||
VideoSize int64 `mapstructure:"videoSize" validate:"required"`
|
||||
Duration int64 `mapstructure:"duration" validate:"required"`
|
||||
VideoURL string `mapstructure:"videoUrl" validate:"required"`
|
||||
VideoType string `mapstructure:"videoType" validate:"required"`
|
||||
VideoSize int64 `mapstructure:"videoSize" validate:"required"`
|
||||
Duration int64 `mapstructure:"duration" validate:"required"`
|
||||
SnapshotPath string `mapstructure:"snapshotPath"`
|
||||
SnapshotUUID string `mapstructure:"snapshotUUID"`
|
||||
SnapshotSize int64 `mapstructure:"snapshotSize"`
|
||||
SnapshotURL string `mapstructure:"snapshotUrl" validate:"required"`
|
||||
SnapshotWidth int32 `mapstructure:"snapshotWidth" validate:"required"`
|
||||
SnapshotURL string `mapstructure:"snapshotUrl" validate:"required"`
|
||||
SnapshotWidth int32 `mapstructure:"snapshotWidth" validate:"required"`
|
||||
SnapshotHeight int32 `mapstructure:"snapshotHeight" validate:"required"`
|
||||
}
|
||||
type FileElem struct {
|
||||
FilePath string `mapstructure:"filePath" `
|
||||
FilePath string `mapstructure:"filePath"`
|
||||
UUID string `mapstructure:"uuid"`
|
||||
SourceURL string `mapstructure:"sourceUrl" validate:"required"`
|
||||
FileName string `mapstructure:"fileName" validate:"required"`
|
||||
FileSize int64 `mapstructure:"fileSize" validate:"required"`
|
||||
FileName string `mapstructure:"fileName" validate:"required"`
|
||||
FileSize int64 `mapstructure:"fileSize" validate:"required"`
|
||||
}
|
||||
type AtElem struct {
|
||||
Text string `mapstructure:"text"`
|
||||
@@ -63,9 +63,9 @@ type AtElem struct {
|
||||
IsAtSelf bool `mapstructure:"isAtSelf"`
|
||||
}
|
||||
type LocationElem struct {
|
||||
Description string `mapstructure:"description" `
|
||||
Longitude float64 `mapstructure:"longitude" validate:"required"`
|
||||
Latitude float64 `mapstructure:"latitude" validate:"required"`
|
||||
Description string `mapstructure:"description"`
|
||||
Longitude float64 `mapstructure:"longitude" validate:"required"`
|
||||
Latitude float64 `mapstructure:"latitude" validate:"required"`
|
||||
}
|
||||
type CustomElem struct {
|
||||
Data string `mapstructure:"data" validate:"required"`
|
||||
@@ -87,7 +87,7 @@ type OANotificationElem struct {
|
||||
NotificationType int32 `mapstructure:"notificationType" json:"notificationType" validate:"required"`
|
||||
Text string `mapstructure:"text" json:"text" validate:"required"`
|
||||
Url string `mapstructure:"url" json:"url"`
|
||||
MixType int32 `mapstructure:"mixType" json:"mixType" validate:"required"`
|
||||
MixType int32 `mapstructure:"mixType" json:"mixType" validate:"required"`
|
||||
PictureElem *PictureElem `mapstructure:"pictureElem" json:"pictureElem"`
|
||||
SoundElem *SoundElem `mapstructure:"soundElem" json:"soundElem"`
|
||||
VideoElem *VideoElem `mapstructure:"videoElem" json:"videoElem"`
|
||||
|
||||
@@ -16,6 +16,7 @@ package config
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
@@ -35,7 +35,7 @@ const (
|
||||
DefaultFolderPath = "../config/"
|
||||
)
|
||||
|
||||
// return absolude path join ../config/, this is k8s container config path
|
||||
// return absolude path join ../config/, this is k8s container config path.
|
||||
func GetDefaultConfigPath() string {
|
||||
b, err := filepath.Abs(os.Args[0])
|
||||
if err != nil {
|
||||
@@ -45,7 +45,7 @@ func GetDefaultConfigPath() string {
|
||||
return filepath.Join(filepath.Dir(b), "../config/")
|
||||
}
|
||||
|
||||
// getProjectRoot returns the absolute path of the project root directory
|
||||
// getProjectRoot returns the absolute path of the project root directory.
|
||||
func GetProjectRoot() string {
|
||||
b, _ := filepath.Abs(os.Args[0])
|
||||
|
||||
|
||||
@@ -15,9 +15,10 @@
|
||||
package convert
|
||||
|
||||
import (
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
|
||||
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
||||
)
|
||||
|
||||
|
||||
Vendored
+1
-1
@@ -39,7 +39,7 @@ const (
|
||||
groupMemberIDsKey = "GROUP_MEMBER_IDS:"
|
||||
groupMembersHashKey = "GROUP_MEMBERS_HASH2:"
|
||||
groupMemberInfoKey = "GROUP_MEMBER_INFO:"
|
||||
//groupOwnerInfoKey = "GROUP_OWNER_INFO:"
|
||||
//groupOwnerInfoKey = "GROUP_OWNER_INFO:".
|
||||
joinedGroupsKey = "JOIN_GROUPS_KEY:"
|
||||
groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
|
||||
groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:"
|
||||
|
||||
Vendored
+21
-3
@@ -18,6 +18,8 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
@@ -43,6 +45,9 @@ func NewRedis() (redis.UniversalClient, error) {
|
||||
return redisClient, nil
|
||||
}
|
||||
|
||||
// Read configuration from environment variables
|
||||
overrideConfigFromEnv()
|
||||
|
||||
if len(config.Config.Redis.Address) == 0 {
|
||||
return nil, errors.New("redis address is empty")
|
||||
}
|
||||
@@ -60,9 +65,9 @@ func NewRedis() (redis.UniversalClient, error) {
|
||||
rdb = redis.NewClient(&redis.Options{
|
||||
Addr: config.Config.Redis.Address[0],
|
||||
Username: config.Config.Redis.Username,
|
||||
Password: config.Config.Redis.Password, // no password set
|
||||
DB: 0, // use default DB
|
||||
PoolSize: 100, // connection pool size
|
||||
Password: config.Config.Redis.Password,
|
||||
DB: 0, // use default DB
|
||||
PoolSize: 100, // connection pool size
|
||||
MaxRetries: maxRetry,
|
||||
})
|
||||
}
|
||||
@@ -78,3 +83,16 @@ func NewRedis() (redis.UniversalClient, error) {
|
||||
redisClient = rdb
|
||||
return rdb, err
|
||||
}
|
||||
|
||||
// overrideConfigFromEnv overrides configuration fields with environment variables if present.
|
||||
func overrideConfigFromEnv() {
|
||||
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
|
||||
config.Config.Redis.Address = strings.Split(envAddr, ",") // Assuming addresses are comma-separated
|
||||
}
|
||||
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
|
||||
config.Config.Redis.Username = envUser
|
||||
}
|
||||
if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" {
|
||||
config.Config.Redis.Password = envPass
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,27 +1,13 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
|
||||
@@ -34,7 +20,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
maxRetry = 10 // number of retries
|
||||
maxRetry = 10 // number of retries
|
||||
mongoConnTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
type Mongo struct {
|
||||
@@ -44,90 +31,123 @@ type Mongo struct {
|
||||
// NewMongo Initialize MongoDB connection.
|
||||
func NewMongo() (*Mongo, error) {
|
||||
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
|
||||
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
|
||||
if config.Config.Mongo.Uri != "" {
|
||||
uri = config.Config.Mongo.Uri
|
||||
} else {
|
||||
mongodbHosts := ""
|
||||
for i, v := range config.Config.Mongo.Address {
|
||||
if i == len(config.Config.Mongo.Address)-1 {
|
||||
mongodbHosts += v
|
||||
} else {
|
||||
mongodbHosts += v + ","
|
||||
}
|
||||
}
|
||||
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
|
||||
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
|
||||
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
|
||||
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
|
||||
} else {
|
||||
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
|
||||
mongodbHosts, config.Config.Mongo.Database,
|
||||
config.Config.Mongo.MaxPoolSize)
|
||||
}
|
||||
}
|
||||
uri := buildMongoURI()
|
||||
fmt.Println("mongo:", uri)
|
||||
|
||||
var mongoClient *mongo.Client
|
||||
var err error = nil
|
||||
var err error
|
||||
|
||||
// Retry connecting to MongoDB
|
||||
for i := 0; i <= maxRetry; i++ {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout)
|
||||
defer cancel()
|
||||
mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
|
||||
if err == nil {
|
||||
return &Mongo{db: mongoClient}, nil
|
||||
}
|
||||
if cmdErr, ok := err.(mongo.CommandError); ok {
|
||||
if cmdErr.Code == 13 || cmdErr.Code == 18 {
|
||||
return nil, err
|
||||
} else {
|
||||
fmt.Printf("Failed to connect to MongoDB: %s\n", err)
|
||||
}
|
||||
if shouldRetry(err) {
|
||||
fmt.Printf("Failed to connect to MongoDB, retrying: %s\n", err)
|
||||
time.Sleep(time.Second) // exponential backoff could be implemented here
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func buildMongoURI() string {
|
||||
uri := os.Getenv("MONGO_URI")
|
||||
if uri != "" {
|
||||
return uri
|
||||
}
|
||||
|
||||
username := os.Getenv("MONGO_USERNAME")
|
||||
password := os.Getenv("MONGO_PASSWORD")
|
||||
address := os.Getenv("MONGO_ADDRESS")
|
||||
port := os.Getenv("MONGO_PORT")
|
||||
database := os.Getenv("MONGO_DATABASE")
|
||||
maxPoolSize := os.Getenv("MONGO_MAX_POOL_SIZE")
|
||||
|
||||
if username == "" {
|
||||
username = config.Config.Mongo.Username
|
||||
}
|
||||
if password == "" {
|
||||
password = config.Config.Mongo.Password
|
||||
}
|
||||
if address == "" {
|
||||
address = strings.Join(config.Config.Mongo.Address, ",")
|
||||
} else if port != "" {
|
||||
address = fmt.Sprintf("%s:%s", address, port)
|
||||
}
|
||||
if database == "" {
|
||||
database = config.Config.Mongo.Database
|
||||
}
|
||||
if maxPoolSize == "" {
|
||||
maxPoolSize = fmt.Sprint(config.Config.Mongo.MaxPoolSize)
|
||||
}
|
||||
|
||||
uriFormat := "mongodb://%s/%s?maxPoolSize=%s&authSource=admin"
|
||||
if username != "" && password != "" {
|
||||
uriFormat = "mongodb://%s:%s@%s/%s?maxPoolSize=%s&authSource=admin"
|
||||
return fmt.Sprintf(uriFormat, username, password, address, database, maxPoolSize)
|
||||
}
|
||||
return fmt.Sprintf(uriFormat, address, database, maxPoolSize)
|
||||
}
|
||||
|
||||
func shouldRetry(err error) bool {
|
||||
if cmdErr, ok := err.(mongo.CommandError); ok {
|
||||
return cmdErr.Code != 13 && cmdErr.Code != 18
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// GetClient returns the MongoDB client.
|
||||
func (m *Mongo) GetClient() *mongo.Client {
|
||||
return m.db
|
||||
}
|
||||
|
||||
// GetDatabase returns the specific database from MongoDB.
|
||||
func (m *Mongo) GetDatabase() *mongo.Database {
|
||||
return m.db.Database(config.Config.Mongo.Database)
|
||||
}
|
||||
|
||||
// CreateMsgIndex creates an index for messages in MongoDB.
|
||||
func (m *Mongo) CreateMsgIndex() error {
|
||||
return m.createMongoIndex(unrelation.Msg, true, "doc_id")
|
||||
}
|
||||
|
||||
// createMongoIndex creates an index in a MongoDB collection.
|
||||
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
|
||||
db := m.db.Database(config.Config.Mongo.Database).Collection(collection)
|
||||
db := m.GetDatabase().Collection(collection)
|
||||
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
|
||||
indexView := db.Indexes()
|
||||
keysDoc := bson.D{}
|
||||
// create composite indexes
|
||||
for _, key := range keys {
|
||||
if strings.HasPrefix(key, "-") {
|
||||
keysDoc = append(keysDoc, bson.E{Key: strings.TrimLeft(key, "-"), Value: -1})
|
||||
// keysDoc = keysDoc.Append(strings.TrimLeft(key, "-"), bsonx.Int32(-1))
|
||||
} else {
|
||||
keysDoc = append(keysDoc, bson.E{Key: key, Value: 1})
|
||||
// keysDoc = keysDoc.Append(key, bsonx.Int32(1))
|
||||
}
|
||||
}
|
||||
// create index
|
||||
|
||||
keysDoc := buildIndexKeys(keys)
|
||||
|
||||
index := mongo.IndexModel{
|
||||
Keys: keysDoc,
|
||||
}
|
||||
if isUnique {
|
||||
index.Options = options.Index().SetUnique(true)
|
||||
}
|
||||
result, err := indexView.CreateOne(
|
||||
context.Background(),
|
||||
index,
|
||||
opts,
|
||||
)
|
||||
|
||||
_, err := indexView.CreateOne(context.Background(), index, opts)
|
||||
if err != nil {
|
||||
return utils.Wrap(err, result)
|
||||
return utils.Wrap(err, "CreateIndex")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildIndexKeys builds the BSON document for index keys.
|
||||
func buildIndexKeys(keys []string) bson.D {
|
||||
keysDoc := bson.D{}
|
||||
for _, key := range keys {
|
||||
direction := 1 // default direction is ascending
|
||||
if strings.HasPrefix(key, "-") {
|
||||
direction = -1 // descending order for prefixed with "-"
|
||||
key = strings.TrimLeft(key, "-")
|
||||
}
|
||||
keysDoc = append(keysDoc, bson.E{Key: key, Value: direction})
|
||||
}
|
||||
return keysDoc
|
||||
}
|
||||
|
||||
@@ -1,93 +1,22 @@
|
||||
package discoveryregister
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
|
||||
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
)
|
||||
|
||||
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
|
||||
func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||
var client discoveryregistry.SvcDiscoveryRegistry
|
||||
var err error
|
||||
switch envType {
|
||||
case "zookeeper":
|
||||
client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
|
||||
openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword(
|
||||
config.Config.Zookeeper.Username,
|
||||
config.Config.Zookeeper.Password,
|
||||
), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))
|
||||
return zookeeper.NewZookeeperDiscoveryRegister()
|
||||
case "k8s":
|
||||
client, err = NewK8sDiscoveryRegister()
|
||||
return kubernetes.NewK8sDiscoveryRegister()
|
||||
default:
|
||||
client = nil
|
||||
err = errors.New("envType not correct")
|
||||
return nil, errors.New("envType not correct")
|
||||
}
|
||||
return client, err
|
||||
}
|
||||
|
||||
type K8sDR struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
|
||||
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||
return &K8sDR{}, nil
|
||||
}
|
||||
|
||||
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
|
||||
cli.rpcRegisterAddr = serviceName
|
||||
return nil
|
||||
}
|
||||
func (cli *K8sDR) UnRegister() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
||||
|
||||
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||
return []*grpc.ClientConn{conn}, err
|
||||
}
|
||||
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||
|
||||
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||
}
|
||||
func (cli *K8sDR) GetSelfConnTarget() string {
|
||||
|
||||
return cli.rpcRegisterAddr
|
||||
}
|
||||
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
|
||||
cli.options = append(cli.options, opts...)
|
||||
}
|
||||
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// do not use this method for call rpc
|
||||
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
|
||||
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
|
||||
return nil
|
||||
}
|
||||
func (cli *K8sDR) Close() {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1,407 +1,45 @@
|
||||
package discoveryregister
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"google.golang.org/grpc"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func setupTestEnvironment() {
|
||||
os.Setenv("ZOOKEEPER_SCHEMA", "openim")
|
||||
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181")
|
||||
os.Setenv("ZOOKEEPER_USERNAME", "")
|
||||
os.Setenv("ZOOKEEPER_PASSWORD", "")
|
||||
}
|
||||
|
||||
func TestNewDiscoveryRegister(t *testing.T) {
|
||||
type args struct {
|
||||
envType string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want discoveryregistry.SvcDiscoveryRegistry
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := NewDiscoveryRegister(tt.args.envType)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("NewDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("NewDiscoveryRegister() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
setupTestEnvironment()
|
||||
|
||||
func TestNewK8sDiscoveryRegister(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
want discoveryregistry.SvcDiscoveryRegistry
|
||||
wantErr bool
|
||||
envType string
|
||||
expectedError bool
|
||||
expectedResult bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
{"zookeeper", false, true},
|
||||
{"k8s", false, true}, // 假设 k8s 配置也已正确设置
|
||||
{"invalid", true, false},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := NewK8sDiscoveryRegister()
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("NewK8sDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("NewK8sDiscoveryRegister() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_Register(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
serviceName string
|
||||
host string
|
||||
port int
|
||||
opts []grpc.DialOption
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
if err := cli.Register(tt.args.serviceName, tt.args.host, tt.args.port, tt.args.opts...); (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.Register() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
for _, test := range tests {
|
||||
client, err := NewDiscoveryRegister(test.envType)
|
||||
|
||||
func TestK8sDR_UnRegister(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
if test.expectedError {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
if test.expectedResult {
|
||||
assert.Implements(t, (*discoveryregistry.SvcDiscoveryRegistry)(nil), client)
|
||||
} else {
|
||||
assert.Nil(t, client)
|
||||
}
|
||||
if err := cli.UnRegister(); (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.UnRegister() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_CreateRpcRootNodes(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
serviceNames []string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
if err := cli.CreateRpcRootNodes(tt.args.serviceNames); (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.CreateRpcRootNodes() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_RegisterConf2Registry(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
key string
|
||||
conf []byte
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
if err := cli.RegisterConf2Registry(tt.args.key, tt.args.conf); (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.RegisterConf2Registry() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_GetConfFromRegistry(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
key string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want []byte
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
got, err := cli.GetConfFromRegistry(tt.args.key)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.GetConfFromRegistry() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("K8sDR.GetConfFromRegistry() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_GetConns(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
serviceName string
|
||||
opts []grpc.DialOption
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want []*grpc.ClientConn
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
got, err := cli.GetConns(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.GetConns() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("K8sDR.GetConns() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_GetConn(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
serviceName string
|
||||
opts []grpc.DialOption
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want *grpc.ClientConn
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
got, err := cli.GetConn(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("K8sDR.GetConn() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("K8sDR.GetConn() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_GetSelfConnTarget(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
want string
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
if got := cli.GetSelfConnTarget(); got != tt.want {
|
||||
t.Errorf("K8sDR.GetSelfConnTarget() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_AddOption(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
opts []grpc.DialOption
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
cli.AddOption(tt.args.opts...)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_CloseConn(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
type args struct {
|
||||
conn *grpc.ClientConn
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
cli.CloseConn(tt.args.conn)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_GetClientLocalConns(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
want map[string][]*grpc.ClientConn
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
if got := cli.GetClientLocalConns(); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("K8sDR.GetClientLocalConns() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestK8sDR_Close(t *testing.T) {
|
||||
type fields struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cli := &K8sDR{
|
||||
options: tt.fields.options,
|
||||
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||
}
|
||||
cli.Close()
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,92 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
)
|
||||
|
||||
// K8sDR represents the Kubernetes service discovery and registration client.
|
||||
type K8sDR struct {
|
||||
options []grpc.DialOption
|
||||
rpcRegisterAddr string
|
||||
}
|
||||
|
||||
// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration.
|
||||
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||
|
||||
return &K8sDR{}, nil
|
||||
}
|
||||
|
||||
// Register registers a service with Kubernetes.
|
||||
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
|
||||
cli.rpcRegisterAddr = serviceName
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnRegister removes a service registration from Kubernetes.
|
||||
func (cli *K8sDR) UnRegister() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateRpcRootNodes creates root nodes for RPC in Kubernetes.
|
||||
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterConf2Registry registers a configuration to the registry.
|
||||
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetConfFromRegistry retrieves a configuration from the registry.
|
||||
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// GetConns returns a list of gRPC client connections for a given service.
|
||||
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
||||
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||
return []*grpc.ClientConn{conn}, err
|
||||
}
|
||||
|
||||
// GetConn returns a single gRPC client connection for a given service.
|
||||
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
|
||||
}
|
||||
|
||||
// GetSelfConnTarget returns the connection target of the client itself.
|
||||
func (cli *K8sDR) GetSelfConnTarget() string {
|
||||
return cli.rpcRegisterAddr
|
||||
}
|
||||
|
||||
// AddOption adds gRPC dial options to the client.
|
||||
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
|
||||
cli.options = append(cli.options, opts...)
|
||||
}
|
||||
|
||||
// CloseConn closes a given gRPC client connection.
|
||||
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// do not use this method for call rpc.
|
||||
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
|
||||
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the K8sDR client.
|
||||
func (cli *K8sDR) Close() {
|
||||
|
||||
// Close any open resources here (if applicable)
|
||||
return
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package zookeeper
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
)
|
||||
|
||||
// NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration.
|
||||
func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
|
||||
schema := getEnv("ZOOKEEPER_SCHEMA", config.Config.Zookeeper.Schema)
|
||||
zkAddr := getZkAddrFromEnv(config.Config.Zookeeper.ZkAddr)
|
||||
username := getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username)
|
||||
password := getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password)
|
||||
|
||||
return openkeeper.NewClient(
|
||||
zkAddr,
|
||||
schema,
|
||||
openkeeper.WithFreq(time.Hour),
|
||||
openkeeper.WithUserNameAndPassword(username, password),
|
||||
openkeeper.WithRoundRobin(),
|
||||
openkeeper.WithTimeout(10),
|
||||
openkeeper.WithLogger(log.NewZkLogger()),
|
||||
)
|
||||
}
|
||||
|
||||
// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
|
||||
func getEnv(key, fallback string) string {
|
||||
if value, exists := os.LookupEnv(key); exists {
|
||||
return value
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
|
||||
func getZkAddrFromEnv(fallback []string) []string {
|
||||
if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists {
|
||||
return strings.Split(value, ",")
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
@@ -1,17 +1,3 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package kafka
|
||||
|
||||
import (
|
||||
@@ -21,19 +7,18 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
log "github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
const (
|
||||
maxRetry = 10 // number of retries
|
||||
maxRetry = 10 // Maximum number of retries for producer creation
|
||||
)
|
||||
|
||||
var errEmptyMsg = errors.New("binary msg is empty")
|
||||
@@ -45,62 +30,85 @@ type Producer struct {
|
||||
producer sarama.SyncProducer
|
||||
}
|
||||
|
||||
// NewKafkaProducer Initialize kafka producer.
|
||||
// NewKafkaProducer initializes a new Kafka producer.
|
||||
func NewKafkaProducer(addr []string, topic string) *Producer {
|
||||
p := Producer{}
|
||||
p.config = sarama.NewConfig() // Instantiate a sarama Config
|
||||
p.config.Producer.Return.Successes = true // Whether to enable the successes channel to be notified after the message is sent successfully
|
||||
p := Producer{
|
||||
addr: addr,
|
||||
topic: topic,
|
||||
config: sarama.NewConfig(),
|
||||
}
|
||||
|
||||
// Set producer return flags
|
||||
p.config.Producer.Return.Successes = true
|
||||
p.config.Producer.Return.Errors = true
|
||||
p.config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
|
||||
|
||||
var producerAck = sarama.WaitForAll // default: WaitForAll
|
||||
switch strings.ToLower(config.Config.Kafka.ProducerAck) {
|
||||
case "no_response":
|
||||
producerAck = sarama.NoResponse
|
||||
case "wait_for_local":
|
||||
producerAck = sarama.WaitForLocal
|
||||
case "wait_for_all":
|
||||
producerAck = sarama.WaitForAll
|
||||
}
|
||||
p.config.Producer.RequiredAcks = producerAck
|
||||
// Set partitioner strategy
|
||||
p.config.Producer.Partitioner = sarama.NewHashPartitioner
|
||||
|
||||
var compress = sarama.CompressionNone // default: no compress
|
||||
_ = compress.UnmarshalText(bytes.ToLower([]byte(config.Config.Kafka.CompressType)))
|
||||
p.config.Producer.Compression = compress
|
||||
// Configure producer acknowledgement level
|
||||
configureProducerAck(&p, config.Config.Kafka.ProducerAck)
|
||||
|
||||
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
|
||||
// Configure message compression
|
||||
configureCompression(&p, config.Config.Kafka.CompressType)
|
||||
|
||||
// Get Kafka configuration from environment variables or fallback to config file
|
||||
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username)
|
||||
kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password)
|
||||
kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config
|
||||
|
||||
// Configure SASL authentication if credentials are provided
|
||||
if kafkaUsername != "" && kafkaPassword != "" {
|
||||
p.config.Net.SASL.Enable = true
|
||||
p.config.Net.SASL.User = config.Config.Kafka.Username
|
||||
p.config.Net.SASL.Password = config.Config.Kafka.Password
|
||||
p.config.Net.SASL.User = kafkaUsername
|
||||
p.config.Net.SASL.Password = kafkaPassword
|
||||
}
|
||||
p.addr = addr
|
||||
p.topic = topic
|
||||
|
||||
// Set the Kafka address
|
||||
p.addr = []string{kafkaAddr}
|
||||
|
||||
// Set up TLS configuration (if required)
|
||||
SetupTLSConfig(p.config)
|
||||
var producer sarama.SyncProducer
|
||||
|
||||
// Create the producer with retries
|
||||
var err error
|
||||
for i := 0; i <= maxRetry; i++ {
|
||||
producer, err = sarama.NewSyncProducer(p.addr, p.config) // Initialize the client
|
||||
p.producer, err = sarama.NewSyncProducer(p.addr, p.config)
|
||||
if err == nil {
|
||||
p.producer = producer
|
||||
return &p
|
||||
}
|
||||
//TODO If the password is wrong, exit directly
|
||||
//if packetErr, ok := err.(*sarama.PacketEncodingError); ok {
|
||||
//if _, ok := packetErr.Err.(sarama.AuthenticationError); ok {
|
||||
// fmt.Println("Kafka password is wrong.")
|
||||
//}
|
||||
//} else {
|
||||
// fmt.Printf("Failed to create Kafka producer: %v\n", err)
|
||||
//}
|
||||
time.Sleep(time.Duration(1) * time.Second)
|
||||
time.Sleep(1 * time.Second) // Wait before retrying
|
||||
}
|
||||
|
||||
// Panic if unable to create producer after retries
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
panic("Failed to create Kafka producer: " + err.Error())
|
||||
}
|
||||
p.producer = producer
|
||||
|
||||
return &p
|
||||
}
|
||||
|
||||
// configureProducerAck configures the producer's acknowledgement level.
|
||||
func configureProducerAck(p *Producer, ackConfig string) {
|
||||
switch strings.ToLower(ackConfig) {
|
||||
case "no_response":
|
||||
p.config.Producer.RequiredAcks = sarama.NoResponse
|
||||
case "wait_for_local":
|
||||
p.config.Producer.RequiredAcks = sarama.WaitForLocal
|
||||
case "wait_for_all":
|
||||
p.config.Producer.RequiredAcks = sarama.WaitForAll
|
||||
default:
|
||||
p.config.Producer.RequiredAcks = sarama.WaitForAll
|
||||
}
|
||||
}
|
||||
|
||||
// configureCompression configures the message compression type for the producer.
|
||||
func configureCompression(p *Producer, compressType string) {
|
||||
var compress sarama.CompressionCodec = sarama.CompressionNone
|
||||
compress.UnmarshalText(bytes.ToLower([]byte(compressType)))
|
||||
p.config.Producer.Compression = compress
|
||||
}
|
||||
|
||||
// GetMQHeaderWithContext extracts message queue headers from the context.
|
||||
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
|
||||
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
|
||||
if err != nil {
|
||||
@@ -111,22 +119,23 @@ func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error)
|
||||
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
|
||||
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
|
||||
{Key: []byte(constant.ConnID), Value: []byte(connID)},
|
||||
}, err
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetContextWithMQHeader creates a context from message queue headers.
|
||||
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
|
||||
var values []string
|
||||
for _, recordHeader := range header {
|
||||
values = append(values, string(recordHeader.Value))
|
||||
}
|
||||
return mcontext.WithMustInfoCtx(values) // TODO
|
||||
return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
|
||||
}
|
||||
|
||||
// SendMessage sends a message to the Kafka topic configured in the Producer.
|
||||
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
|
||||
log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key)
|
||||
kMsg := &sarama.ProducerMessage{}
|
||||
kMsg.Topic = p.topic
|
||||
kMsg.Key = sarama.StringEncoder(key)
|
||||
|
||||
// Marshal the protobuf message
|
||||
bMsg, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
return 0, 0, utils.Wrap(err, "kafka proto Marshal err")
|
||||
@@ -134,20 +143,33 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
|
||||
if len(bMsg) == 0 {
|
||||
return 0, 0, utils.Wrap(errEmptyMsg, "")
|
||||
}
|
||||
kMsg.Value = sarama.ByteEncoder(bMsg)
|
||||
|
||||
// Prepare Kafka message
|
||||
kMsg := &sarama.ProducerMessage{
|
||||
Topic: p.topic,
|
||||
Key: sarama.StringEncoder(key),
|
||||
Value: sarama.ByteEncoder(bMsg),
|
||||
}
|
||||
|
||||
// Validate message key and value
|
||||
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
|
||||
return 0, 0, utils.Wrap(errEmptyMsg, "")
|
||||
}
|
||||
kMsg.Metadata = ctx
|
||||
|
||||
// Attach context metadata as headers
|
||||
header, err := GetMQHeaderWithContext(ctx)
|
||||
if err != nil {
|
||||
return 0, 0, utils.Wrap(err, "")
|
||||
}
|
||||
kMsg.Headers = header
|
||||
|
||||
// Send the message
|
||||
partition, offset, err := p.producer.SendMessage(kMsg)
|
||||
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length())
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "p.producer.SendMessage error", err)
|
||||
return 0, 0, utils.Wrap(err, "")
|
||||
}
|
||||
return partition, offset, utils.Wrap(err, "")
|
||||
|
||||
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length())
|
||||
return partition, offset, nil
|
||||
}
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
@@ -33,3 +35,12 @@ func SetupTLSConfig(cfg *sarama.Config) {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// getEnvOrConfig returns the value of the environment variable if it exists,
|
||||
// otherwise, it returns the value from the configuration file.
|
||||
func getEnvOrConfig(envName string, configValue string) string {
|
||||
if value, exists := os.LookupEnv(envName); exists {
|
||||
return value
|
||||
}
|
||||
return configValue
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ import ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
|
||||
|
||||
/*
|
||||
labels := prometheus.Labels{"label_one": "any", "label_two": "value"}
|
||||
ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc()
|
||||
ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc().
|
||||
*/
|
||||
var (
|
||||
ApiCustomCnt = &ginProm.Metric{
|
||||
|
||||
@@ -15,7 +15,7 @@ package version
|
||||
// When releasing a new Kubernetes version, this file is updated by
|
||||
// build/mark_new_version.sh to reflect the new version, and then a
|
||||
// git annotated tag (using format vX.Y where X == Major version and Y
|
||||
// == Minor version) is created to point to the commit that updates
|
||||
// == Minor version) is created to point to the commit that updates.
|
||||
var (
|
||||
// TODO: Deprecate gitMajor and gitMinor, use only gitVersion
|
||||
// instead. First step in deprecation, keep the fields but make
|
||||
|
||||
@@ -25,7 +25,7 @@ func Get() Info {
|
||||
}
|
||||
}
|
||||
|
||||
// GetClientVersion returns the git version of the OpenIM client repository
|
||||
// GetClientVersion returns the git version of the OpenIM client repository.
|
||||
func GetClientVersion() (*OpenIMClientVersion, error) {
|
||||
clientVersion, err := getClientVersion()
|
||||
if err != nil {
|
||||
@@ -52,7 +52,7 @@ func getClientVersion() (string, error) {
|
||||
return ref.Hash().String(), nil
|
||||
}
|
||||
|
||||
// GetSingleVersion returns single version of sealer
|
||||
// GetSingleVersion returns single version of sealer.
|
||||
func GetSingleVersion() string {
|
||||
return gitVersion
|
||||
}
|
||||
|
||||
@@ -28,6 +28,8 @@ openim::log::info "\n# Use Docker to start all openim service"
|
||||
|
||||
trap 'openim::util::onCtrlC' INT
|
||||
|
||||
"${OPENIM_ROOT}"/scripts/init-config.sh --skip
|
||||
|
||||
"${OPENIM_ROOT}"/scripts/start-all.sh
|
||||
|
||||
sleep 5
|
||||
|
||||
+102
-69
@@ -1,17 +1,4 @@
|
||||
#!/usr/bin/env bash
|
||||
# Copyright © 2023 OpenIM. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# You may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# This script automatically initializes various configuration files and can generate example files.
|
||||
|
||||
@@ -48,7 +35,8 @@ declare -A EXAMPLES=(
|
||||
FORCE_OVERWRITE=false
|
||||
SKIP_EXISTING=false
|
||||
GENERATE_EXAMPLES=false
|
||||
CLEAN_ENV_EXAMPLES=false
|
||||
CLEAN_CONFIG=false
|
||||
CLEAN_EXAMPLES=false
|
||||
|
||||
# Function to display help information
|
||||
show_help() {
|
||||
@@ -58,59 +46,45 @@ show_help() {
|
||||
echo " --force Overwrite existing files without prompt"
|
||||
echo " --skip Skip generation if file exists"
|
||||
echo " --examples Generate example files"
|
||||
echo " --clean-env-examples Generate example files in a clean environment"
|
||||
echo " --clean-config Clean all configuration files"
|
||||
echo " --clean-examples Clean all example files"
|
||||
}
|
||||
|
||||
# Function to generate configuration files
|
||||
generate_config_files() {
|
||||
# Loop through each template in TEMPLATES
|
||||
for template in "${!TEMPLATES[@]}"; do
|
||||
# Read the corresponding output files for the template
|
||||
IFS=';' read -ra OUTPUT_FILES <<< "${TEMPLATES[$template]}"
|
||||
for output_file in "${OUTPUT_FILES[@]}"; do
|
||||
# Check if the output file already exists
|
||||
if [[ -f "${output_file}" ]]; then
|
||||
# Handle existing file based on command-line options
|
||||
if [[ "${FORCE_OVERWRITE}" == true ]]; then
|
||||
openim::log::info "Force overwriting ${output_file}."
|
||||
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
openim::log::info "Skipping generation of ${output_file} as it already exists."
|
||||
local output_file="${TEMPLATES[$template]}"
|
||||
if [[ -f "${output_file}" ]]; then
|
||||
if [[ "${FORCE_OVERWRITE}" == true ]]; then
|
||||
openim::log::info "Force overwriting ${output_file}."
|
||||
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
openim::log::info "Skipping generation of ${output_file} as it already exists."
|
||||
continue
|
||||
else
|
||||
echo -n "File ${output_file} already exists. Overwrite? (Y/N): "
|
||||
read -r -n 1 REPLY
|
||||
echo
|
||||
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
|
||||
openim::log::info "Skipping generation of ${output_file}."
|
||||
continue
|
||||
else
|
||||
# Ask user for confirmation to overwrite
|
||||
echo -n "File ${output_file} already exists. Overwrite? (Y/N): "
|
||||
read -r -n 1 REPLY
|
||||
echo
|
||||
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
|
||||
openim::log::info "Skipping generation of ${output_file}."
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
# Process the template file to generate the output file
|
||||
openim::log::info "⌚ Working with template file: ${template} to generate ${output_file}..."
|
||||
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
|
||||
openim::log::error "genconfig.sh script not found"
|
||||
exit 1
|
||||
else
|
||||
if [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
openim::log::info "Generating ${output_file} as it does not exist."
|
||||
fi
|
||||
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || {
|
||||
openim::log::error "Error processing template file ${template}"
|
||||
exit 1
|
||||
}
|
||||
sleep 0.5
|
||||
done
|
||||
done
|
||||
}
|
||||
|
||||
# Function to generate example files
|
||||
generate_example_files() {
|
||||
for template in "${!EXAMPLES[@]}"; do
|
||||
local example_file="${EXAMPLES[$template]}"
|
||||
if [[ ! -f "${example_file}" ]]; then
|
||||
openim::log::info "Generating example file: ${example_file} from ${template}..."
|
||||
cp "${template}" "${example_file}"
|
||||
fi
|
||||
|
||||
openim::log::info "⌚ Working with template file: ${template} to generate ${output_file}..."
|
||||
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
|
||||
openim::log::error "genconfig.sh script not found"
|
||||
exit 1
|
||||
fi
|
||||
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || {
|
||||
openim::log::error "Error processing template file ${template}"
|
||||
exit 1
|
||||
}
|
||||
sleep 0.5
|
||||
done
|
||||
}
|
||||
|
||||
@@ -120,7 +94,8 @@ declare -A env_vars=(
|
||||
["LOG_STORAGE_LOCATION"]="../logs/"
|
||||
)
|
||||
|
||||
generate_clean_environment_examples() {
|
||||
# Function to generate example files
|
||||
generate_example_files() {
|
||||
env_cmd="env -i"
|
||||
for var in "${!env_vars[@]}"; do
|
||||
env_cmd+=" $var='${env_vars[$var]}'"
|
||||
@@ -128,12 +103,56 @@ generate_clean_environment_examples() {
|
||||
|
||||
for template in "${!EXAMPLES[@]}"; do
|
||||
local example_file="${EXAMPLES[$template]}"
|
||||
openim::log::info "Generating example file: ${example_file} from ${template}..."
|
||||
if [[ -f "${example_file}" ]]; then
|
||||
if [[ "${FORCE_OVERWRITE}" == true ]]; then
|
||||
openim::log::info "Force overwriting example file: ${example_file}."
|
||||
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
openim::log::info "Skipping generation of example file: ${example_file} as it already exists."
|
||||
continue
|
||||
else
|
||||
echo -n "Example file ${example_file} already exists. Overwrite? (Y/N): "
|
||||
read -r -n 1 REPLY
|
||||
echo
|
||||
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
|
||||
openim::log::info "Skipping generation of example file: ${example_file}."
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
elif [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
openim::log::info "Generating example file: ${example_file} as it does not exist."
|
||||
fi
|
||||
|
||||
openim::log::info "⌚ Working with template file: ${template} to generate example file: ${example_file}..."
|
||||
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
|
||||
openim::log::error "genconfig.sh script not found"
|
||||
exit 1
|
||||
fi
|
||||
eval "$env_cmd ${OPENIM_ROOT}/scripts/genconfig.sh '${ENV_FILE}' '${template}' > '${example_file}'" || {
|
||||
openim::log::error "Error processing template file ${template}"
|
||||
exit 1
|
||||
}
|
||||
sleep 0.5
|
||||
done
|
||||
}
|
||||
|
||||
|
||||
# Function to clean configuration files
|
||||
clean_config_files() {
|
||||
for output_file in "${TEMPLATES[@]}"; do
|
||||
if [[ -f "${output_file}" ]]; then
|
||||
rm -f "${output_file}"
|
||||
openim::log::info "Removed configuration file: ${output_file}"
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
# Function to clean example files
|
||||
clean_example_files() {
|
||||
for example_file in "${EXAMPLES[@]}"; do
|
||||
if [[ -f "${example_file}" ]]; then
|
||||
rm -f "${example_file}"
|
||||
openim::log::info "Removed example file: ${example_file}"
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
@@ -155,8 +174,12 @@ while [[ $# -gt 0 ]]; do
|
||||
GENERATE_EXAMPLES=true
|
||||
shift
|
||||
;;
|
||||
--clean-env-examples)
|
||||
CLEAN_ENV_EXAMPLES=true
|
||||
--clean-config)
|
||||
CLEAN_CONFIG=true
|
||||
shift
|
||||
;;
|
||||
--clean-examples)
|
||||
CLEAN_EXAMPLES=true
|
||||
shift
|
||||
;;
|
||||
*)
|
||||
@@ -167,19 +190,29 @@ while [[ $# -gt 0 ]]; do
|
||||
esac
|
||||
done
|
||||
|
||||
# Clean configuration files if --clean-config option is provided
|
||||
if [[ "${CLEAN_CONFIG}" == true ]]; then
|
||||
clean_config_files
|
||||
fi
|
||||
|
||||
# Clean example files if --clean-examples option is provided
|
||||
if [[ "${CLEAN_EXAMPLES}" == true ]]; then
|
||||
clean_example_files
|
||||
fi
|
||||
|
||||
# Generate configuration files if requested
|
||||
if [[ "${FORCE_OVERWRITE}" == true || "${SKIP_EXISTING}" == false ]]; then
|
||||
if [[ "${FORCE_OVERWRITE}" == true || "${SKIP_EXISTING}" == false ]] && [[ "${CLEAN_CONFIG}" == false ]]; then
|
||||
generate_config_files
|
||||
fi
|
||||
|
||||
# Generate configuration files if requested
|
||||
if [[ "${SKIP_EXISTING}" == true ]]; then
|
||||
generate_config_files
|
||||
fi
|
||||
|
||||
# Generate example files if --examples option is provided
|
||||
if [[ "${GENERATE_EXAMPLES}" == true ]]; then
|
||||
if [[ "${GENERATE_EXAMPLES}" == true ]] && [[ "${CLEAN_EXAMPLES}" == false ]]; then
|
||||
generate_example_files
|
||||
fi
|
||||
|
||||
# Generate example files in a clean environment if --clean-env-examples option is provided
|
||||
if [[ "${CLEAN_ENV_EXAMPLES}" == true ]]; then
|
||||
generate_clean_environment_examples
|
||||
fi
|
||||
|
||||
openim::log::success "Configuration and example files generation complete!"
|
||||
openim::log::success "Configuration and example files operation complete!"
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// API endpoints and other constants
|
||||
// API endpoints and other constants.
|
||||
const (
|
||||
APIHost = "http://127.0.0.1:10002"
|
||||
UserTokenURL = APIHost + "/auth/user_token"
|
||||
@@ -18,27 +18,27 @@ const (
|
||||
OperationID = "1646445464564"
|
||||
)
|
||||
|
||||
// UserTokenRequest represents a request to get a user token
|
||||
// UserTokenRequest represents a request to get a user token.
|
||||
type UserTokenRequest struct {
|
||||
Secret string `json:"secret"`
|
||||
PlatformID int `json:"platformID"`
|
||||
UserID string `json:"userID"`
|
||||
}
|
||||
|
||||
// UserTokenResponse represents a response containing a user token
|
||||
// UserTokenResponse represents a response containing a user token.
|
||||
type UserTokenResponse struct {
|
||||
Token string `json:"token"`
|
||||
ErrCode int `json:"errCode"`
|
||||
}
|
||||
|
||||
// User represents user data for registration
|
||||
// User represents user data for registration.
|
||||
type User struct {
|
||||
UserID string `json:"userID"`
|
||||
Nickname string `json:"nickname"`
|
||||
FaceURL string `json:"faceURL"`
|
||||
}
|
||||
|
||||
// UserRegisterRequest represents a request to register a user
|
||||
// UserRegisterRequest represents a request to register a user.
|
||||
type UserRegisterRequest struct {
|
||||
Secret string `json:"secret"`
|
||||
Users []User `json:"users"`
|
||||
@@ -58,7 +58,7 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
// GetUserToken requests a user token from the API
|
||||
// GetUserToken requests a user token from the API.
|
||||
func GetUserToken(userID string) (string, error) {
|
||||
reqBody := UserTokenRequest{
|
||||
Secret: SecretKey,
|
||||
@@ -88,7 +88,7 @@ func GetUserToken(userID string) (string, error) {
|
||||
return tokenResp.Token, nil
|
||||
}
|
||||
|
||||
// RegisterUser registers a new user using the API
|
||||
// RegisterUser registers a new user using the API.
|
||||
func RegisterUser(token, userID, nickname, faceURL string) error {
|
||||
user := User{
|
||||
UserID: userID,
|
||||
|
||||
@@ -7,18 +7,18 @@ import (
|
||||
"github.com/openimsdk/open-im-server/v3/test/e2e/framework/config"
|
||||
)
|
||||
|
||||
// UserInfoRequest represents a request to get or update user information
|
||||
// UserInfoRequest represents a request to get or update user information.
|
||||
type UserInfoRequest struct {
|
||||
UserIDs []string `json:"userIDs,omitempty"`
|
||||
UserInfo *gettoken.User `json:"userInfo,omitempty"`
|
||||
}
|
||||
|
||||
// GetUsersOnlineStatusRequest represents a request to get users' online status
|
||||
// GetUsersOnlineStatusRequest represents a request to get users' online status.
|
||||
type GetUsersOnlineStatusRequest struct {
|
||||
UserIDs []string `json:"userIDs"`
|
||||
}
|
||||
|
||||
// GetUsersInfo retrieves detailed information for a list of user IDs
|
||||
// GetUsersInfo retrieves detailed information for a list of user IDs.
|
||||
func GetUsersInfo(token string, userIDs []string) error {
|
||||
|
||||
url := fmt.Sprintf("http://%s:%s/user/get_users_info", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
|
||||
@@ -29,7 +29,7 @@ func GetUsersInfo(token string, userIDs []string) error {
|
||||
return sendPostRequestWithToken(url, token, requestBody)
|
||||
}
|
||||
|
||||
// UpdateUserInfo updates the information for a user
|
||||
// UpdateUserInfo updates the information for a user.
|
||||
func UpdateUserInfo(token, userID, nickname, faceURL string) error {
|
||||
|
||||
url := fmt.Sprintf("http://%s:%s/user/update_user_info", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
|
||||
@@ -44,7 +44,7 @@ func UpdateUserInfo(token, userID, nickname, faceURL string) error {
|
||||
return sendPostRequestWithToken(url, token, requestBody)
|
||||
}
|
||||
|
||||
// GetUsersOnlineStatus retrieves the online status for a list of user IDs
|
||||
// GetUsersOnlineStatus retrieves the online status for a list of user IDs.
|
||||
func GetUsersOnlineStatus(token string, userIDs []string) error {
|
||||
|
||||
url := fmt.Sprintf("http://%s:%s/user/get_users_online_status", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
|
||||
|
||||
@@ -11,29 +11,29 @@ import (
|
||||
"github.com/openimsdk/open-im-server/v3/test/e2e/framework/config"
|
||||
)
|
||||
|
||||
// ForceLogoutRequest represents a request to force a user logout
|
||||
// ForceLogoutRequest represents a request to force a user logout.
|
||||
type ForceLogoutRequest struct {
|
||||
PlatformID int `json:"platformID"`
|
||||
UserID string `json:"userID"`
|
||||
}
|
||||
|
||||
// CheckUserAccountRequest represents a request to check a user account
|
||||
// CheckUserAccountRequest represents a request to check a user account.
|
||||
type CheckUserAccountRequest struct {
|
||||
CheckUserIDs []string `json:"checkUserIDs"`
|
||||
}
|
||||
|
||||
// GetUsersRequest represents a request to get a list of users
|
||||
// GetUsersRequest represents a request to get a list of users.
|
||||
type GetUsersRequest struct {
|
||||
Pagination Pagination `json:"pagination"`
|
||||
}
|
||||
|
||||
// Pagination specifies the page number and number of items per page
|
||||
// Pagination specifies the page number and number of items per page.
|
||||
type Pagination struct {
|
||||
PageNumber int `json:"pageNumber"`
|
||||
ShowNumber int `json:"showNumber"`
|
||||
}
|
||||
|
||||
// ForceLogout forces a user to log out
|
||||
// ForceLogout forces a user to log out.
|
||||
func ForceLogout(token, userID string, platformID int) error {
|
||||
|
||||
url := fmt.Sprintf("http://%s:%s/auth/force_logout", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
|
||||
@@ -45,7 +45,7 @@ func ForceLogout(token, userID string, platformID int) error {
|
||||
return sendPostRequestWithToken(url, token, requestBody)
|
||||
}
|
||||
|
||||
// CheckUserAccount checks if the user accounts exist
|
||||
// CheckUserAccount checks if the user accounts exist.
|
||||
func CheckUserAccount(token string, userIDs []string) error {
|
||||
|
||||
url := fmt.Sprintf("http://%s:%s/user/account_check", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
|
||||
@@ -56,7 +56,7 @@ func CheckUserAccount(token string, userIDs []string) error {
|
||||
return sendPostRequestWithToken(url, token, requestBody)
|
||||
}
|
||||
|
||||
// GetUsers retrieves a list of users with pagination
|
||||
// GetUsers retrieves a list of users with pagination.
|
||||
func GetUsers(token string, pageNumber, showNumber int) error {
|
||||
|
||||
url := fmt.Sprintf("http://%s:%s/user/account_check", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
|
||||
@@ -70,7 +70,7 @@ func GetUsers(token string, pageNumber, showNumber int) error {
|
||||
return sendPostRequestWithToken(url, token, requestBody)
|
||||
}
|
||||
|
||||
// sendPostRequestWithToken sends a POST request with a token in the header
|
||||
// sendPostRequestWithToken sends a POST request with a token in the header.
|
||||
func sendPostRequestWithToken(url, token string, body any) error {
|
||||
reqBytes, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// The default template version
|
||||
// The default template version.
|
||||
defaultTemplateVersion = "v1.3.0"
|
||||
)
|
||||
|
||||
@@ -84,7 +84,7 @@ func main() {
|
||||
select {}
|
||||
}
|
||||
|
||||
// getLatestVersion fetches the latest version number from a given URL
|
||||
// getLatestVersion fetches the latest version number from a given URL.
|
||||
func getLatestVersion(url string) (string, error) {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
@@ -102,7 +102,7 @@ func getLatestVersion(url string) (string, error) {
|
||||
return latestVersion, nil
|
||||
}
|
||||
|
||||
// downloadAndExtract downloads a file from a URL and extracts it to a destination directory
|
||||
// downloadAndExtract downloads a file from a URL and extracts it to a destination directory.
|
||||
func downloadAndExtract(url, destDir string) error {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
@@ -141,7 +141,7 @@ func downloadAndExtract(url, destDir string) error {
|
||||
return cmd.Run()
|
||||
}
|
||||
|
||||
// startProcess starts a process and prints any errors encountered
|
||||
// startProcess starts a process and prints any errors encountered.
|
||||
func startProcess(cmdPath string) {
|
||||
cmd := exec.Command(cmdPath)
|
||||
cmd.Stdout = os.Stdout
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
package pkg
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
mongoModel "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
||||
mysqlModel "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/v3"
|
||||
mongoModelRtc "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
mysqlModelRtc "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mysql"
|
||||
"time"
|
||||
)
|
||||
|
||||
type convert struct{}
|
||||
|
||||
@@ -2,13 +2,15 @@ package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/tools/mgoutil"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
)
|
||||
|
||||
func NewMeeting(db *mongo.Database) (table.MeetingInterface, error) {
|
||||
|
||||
@@ -2,14 +2,16 @@ package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/tools/mgoutil"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
)
|
||||
|
||||
func NewMeetingInvitation(db *mongo.Database) (table.MeetingInvitationInterface, error) {
|
||||
@@ -55,7 +57,12 @@ func (x *meetingInvitation) CreateMeetingInvitationInfo(ctx context.Context, roo
|
||||
|
||||
func (x *meetingInvitation) GetUserInvitedMeetingIDs(ctx context.Context, userID string) (meetingIDs []string, err error) {
|
||||
fiveDaysAgo := time.Now().AddDate(0, 0, -5)
|
||||
return mgoutil.Find[string](ctx, x.coll, bson.M{"user_id": userID, "create_time": bson.M{"$gte": fiveDaysAgo}}, options.Find().SetSort(bson.M{"create_time": -1}).SetProjection(bson.M{"_id": 0, "room_id": 1}))
|
||||
return mgoutil.Find[string](
|
||||
ctx,
|
||||
x.coll,
|
||||
bson.M{"user_id": userID, "create_time": bson.M{"$gte": fiveDaysAgo}},
|
||||
options.Find().SetSort(bson.M{"create_time": -1}).SetProjection(bson.M{"_id": 0, "room_id": 1}),
|
||||
)
|
||||
}
|
||||
|
||||
func (x *meetingInvitation) Delete(ctx context.Context, roomIDs []string) error {
|
||||
|
||||
@@ -2,10 +2,12 @@ package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/tools/mgoutil"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
)
|
||||
|
||||
func NewMeetingRecord(db *mongo.Database) (table.MeetingRecordInterface, error) {
|
||||
|
||||
@@ -2,13 +2,15 @@ package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/tools/mgoutil"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
)
|
||||
|
||||
func NewSignal(db *mongo.Database) (table.SignalInterface, error) {
|
||||
|
||||
@@ -2,14 +2,16 @@ package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/tools/mgoutil"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
)
|
||||
|
||||
func NewSignalInvitation(db *mongo.Database) (table.SignalInvitationInterface, error) {
|
||||
|
||||
@@ -2,8 +2,9 @@ package table
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
)
|
||||
|
||||
type MeetingInfo struct {
|
||||
|
||||
@@ -2,11 +2,12 @@ package table
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"time"
|
||||
)
|
||||
|
||||
type SignalModel struct {
|
||||
|
||||
@@ -16,7 +16,7 @@ type SignalModel struct {
|
||||
SessionType int32 `gorm:"column:sesstion_type"`
|
||||
InitiateTime time.Time `gorm:"column:initiate_time"`
|
||||
EndTime time.Time `gorm:"column:end_time"`
|
||||
FileURL string `gorm:"column:file_url" json:"-"`
|
||||
FileURL string `gorm:"column:file_url" json:"-"`
|
||||
|
||||
Title string `gorm:"column:title;size:128"`
|
||||
Desc string `gorm:"column:desc;size:1024"`
|
||||
|
||||
@@ -4,12 +4,13 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"gopkg.in/yaml.v3"
|
||||
"log"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
|
||||
+2
-1
@@ -2,9 +2,10 @@ package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
Reference in New Issue
Block a user