mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-28 06:19:20 +08:00
feat: compatible autoSetPorts (#2929)
This commit is contained in:
@@ -16,6 +16,7 @@ package startrpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
@@ -27,7 +28,6 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/internal/tools/addr"
|
||||
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/tools/discovery"
|
||||
@@ -41,11 +41,60 @@ import (
|
||||
)
|
||||
|
||||
// Start rpc server.
|
||||
func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP string,
|
||||
index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context,
|
||||
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
|
||||
func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP,
|
||||
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context,
|
||||
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
|
||||
|
||||
var (
|
||||
rpcTcpAddr string
|
||||
netDone = make(chan struct{}, 2)
|
||||
netErr error
|
||||
)
|
||||
|
||||
if !autoSetPorts {
|
||||
rpcPort, err := datautil.GetElemByIndex(rpcPorts, index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort))
|
||||
} else {
|
||||
rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), "0")
|
||||
}
|
||||
|
||||
// var reg *prometheus.Registry
|
||||
// var metric *grpcprometheus.ServerMetrics
|
||||
if prometheusConfig.Enable {
|
||||
// cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
|
||||
// reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
|
||||
// options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
|
||||
// grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
|
||||
options = append(
|
||||
options, mw.GrpcServer(),
|
||||
prommetricsUnaryInterceptor(rpcRegisterName),
|
||||
prommetricsStreamInterceptor(rpcRegisterName),
|
||||
)
|
||||
prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
|
||||
go func() {
|
||||
if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort))
|
||||
netDone <- struct{}{}
|
||||
}
|
||||
// metric.InitializeMetrics(srv)
|
||||
// Create a HTTP server for prometheus.
|
||||
// httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
|
||||
// if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
// netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr)
|
||||
// netDone <- struct{}{}
|
||||
// }
|
||||
}()
|
||||
} else {
|
||||
options = append(options, mw.GrpcServer())
|
||||
}
|
||||
|
||||
rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), "0")
|
||||
listener, err := net.Listen(
|
||||
"tcp",
|
||||
rpcTcpAddr,
|
||||
@@ -54,8 +103,8 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
|
||||
return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr)
|
||||
}
|
||||
|
||||
h, portStr, _ := net.SplitHostPort(listener.Addr().String())
|
||||
host, _ := addr.Extract(h)
|
||||
_, portStr, _ := net.SplitHostPort(listener.Addr().String())
|
||||
registerIP = network.GetListenIP(registerIP)
|
||||
port, _ := strconv.Atoi(portStr)
|
||||
|
||||
log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", portStr,
|
||||
@@ -70,22 +119,6 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
|
||||
defer client.Close()
|
||||
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
|
||||
|
||||
// var reg *prometheus.Registry
|
||||
// var metric *grpcprometheus.ServerMetrics
|
||||
if prometheusConfig.Enable {
|
||||
// cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
|
||||
// reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
|
||||
// options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
|
||||
// grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
|
||||
options = append(
|
||||
options, mw.GrpcServer(),
|
||||
prommetricsUnaryInterceptor(rpcRegisterName),
|
||||
prommetricsStreamInterceptor(rpcRegisterName),
|
||||
)
|
||||
} else {
|
||||
options = append(options, mw.GrpcServer())
|
||||
}
|
||||
|
||||
srv := grpc.NewServer(options...)
|
||||
|
||||
err = rpcFn(ctx, config, client, srv)
|
||||
@@ -95,7 +128,7 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
|
||||
|
||||
err = client.Register(
|
||||
rpcRegisterName,
|
||||
host,
|
||||
registerIP,
|
||||
port,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
@@ -103,33 +136,6 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
netDone = make(chan struct{}, 2)
|
||||
netErr error
|
||||
)
|
||||
if prometheusConfig.Enable {
|
||||
go func() {
|
||||
prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index)
|
||||
if err != nil {
|
||||
netErr = err
|
||||
netDone <- struct{}{}
|
||||
return
|
||||
}
|
||||
cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
|
||||
if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && err != http.ErrServerClosed {
|
||||
netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort))
|
||||
netDone <- struct{}{}
|
||||
}
|
||||
// metric.InitializeMetrics(srv)
|
||||
// Create a HTTP server for prometheus.
|
||||
// httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
|
||||
// if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
// netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr)
|
||||
// netDone <- struct{}{}
|
||||
// }
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
err := srv.Serve(listener)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user