mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-28 06:19:20 +08:00
fix: prometheus discovery (#3408)
This commit is contained in:
@@ -39,7 +39,7 @@ type Config struct {
|
||||
Index conf.Index
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, service grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error {
|
||||
apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index))
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -6,35 +6,29 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/tools/apiresp"
|
||||
"github.com/openimsdk/tools/discovery"
|
||||
"github.com/openimsdk/tools/discovery/etcd"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
type PrometheusDiscoveryApi struct {
|
||||
config *Config
|
||||
client *clientv3.Client
|
||||
kv discovery.KeyValue
|
||||
}
|
||||
|
||||
func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *PrometheusDiscoveryApi {
|
||||
func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi {
|
||||
api := &PrometheusDiscoveryApi{
|
||||
config: config,
|
||||
}
|
||||
if config.Discovery.Enable == conf.ETCD {
|
||||
api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
|
||||
kv: client,
|
||||
}
|
||||
return api
|
||||
}
|
||||
|
||||
func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) {
|
||||
value, err := p.kv.GetKey(c, prommetrics.BuildDiscoveryKey(key))
|
||||
value, err := p.kv.GetKeyWithPrefix(c, prommetrics.BuildDiscoveryKeyPrefix(key))
|
||||
if err != nil {
|
||||
if errors.Is(err, discovery.ErrNotSupportedKeyValue) {
|
||||
if errors.Is(err, discovery.ErrNotSupported) {
|
||||
c.JSON(http.StatusOK, []struct{}{})
|
||||
return
|
||||
}
|
||||
@@ -46,10 +40,17 @@ func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) {
|
||||
return
|
||||
}
|
||||
var resp prommetrics.RespTarget
|
||||
if err := json.Unmarshal(value, &resp); err != nil {
|
||||
apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err"))
|
||||
return
|
||||
for i := range value {
|
||||
var tmp prommetrics.Target
|
||||
if err = json.Unmarshal(value[i], &tmp); err != nil {
|
||||
apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err"))
|
||||
return
|
||||
}
|
||||
|
||||
resp.Targets = append(resp.Targets, tmp.Target)
|
||||
resp.Labels = tmp.Labels // default label is fixed. See prommetrics.BuildDefaultTarget
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp})
|
||||
}
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ func prommetricsGin() gin.HandlerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin.Engine, error) {
|
||||
func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) {
|
||||
authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -39,7 +39,7 @@ type Config struct {
|
||||
}
|
||||
|
||||
// Start run ws server.
|
||||
func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(),
|
||||
"rpcPorts", conf.MsgGateway.RPC.Ports,
|
||||
"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
|
||||
|
||||
@@ -58,7 +58,7 @@ type Config struct {
|
||||
Index conf.Index
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
builder := mqbuild.NewBuilder(&config.KafkaConfig)
|
||||
|
||||
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts",
|
||||
|
||||
@@ -50,7 +50,7 @@ func (p pushServer) DelUserPushToken(ctx context.Context,
|
||||
return &pbpush.DelUserPushTokenResp{}, nil
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig)
|
||||
rdb, err := dbb.Redis(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -59,7 +59,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig)
|
||||
rdb, err := dbb.Redis(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -69,7 +69,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||
mgocli, err := dbb.Mongo(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -76,7 +76,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||
mgocli, err := dbb.Mongo(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -78,7 +78,7 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF
|
||||
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
builder := mqbuild.NewBuilder(&config.KafkaConfig)
|
||||
redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic)
|
||||
if err != nil {
|
||||
|
||||
@@ -66,7 +66,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||
mgocli, err := dbb.Mongo(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -64,7 +64,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||
mgocli, err := dbb.Mongo(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -79,7 +79,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||
mgocli, err := dbb.Mongo(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -25,7 +25,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, conf *Config, client discovery.Conn, service grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error {
|
||||
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords)
|
||||
if conf.CronTask.RetainChatRecords < 1 {
|
||||
log.ZInfo(ctx, "disable cron")
|
||||
|
||||
Reference in New Issue
Block a user