fix: graceful exit for kafka consumer of msgtransfer (#1483)

* fix: graceful exit for kafka consumer of msgtransfer

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

* Update init.go

* Update init.go

---------

Signed-off-by: rfyiamcool <rfyiamcool@163.com>
Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
This commit is contained in:
fengyun.rui
2024-02-05 10:37:53 +08:00
committed by GitHub
parent 0865eb65b1
commit 31381935f1
3 changed files with 128 additions and 39 deletions
@@ -19,6 +19,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@@ -431,16 +432,29 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
split := 1000
rwLock := new(sync.RWMutex)
messages := make([]*sarama.ConsumerMessage, 0, 1000)
ticker := time.NewTicker(time.Millisecond * 100)
var (
split = 1000
rwLock = new(sync.RWMutex)
messages = make([]*sarama.ConsumerMessage, 0, 1000)
ticker = time.NewTicker(time.Millisecond * 100)
wg = sync.WaitGroup{}
running = new(atomic.Bool)
)
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ticker.C:
// if the buffer is empty and running is false, return loop.
if len(messages) == 0 {
if !running.Load() {
return
}
continue
}
@@ -473,17 +487,35 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
}
}()
for msg := range claim.Messages() {
if len(msg.Value) == 0 {
continue
wg.Add(1)
go func() {
defer wg.Done()
for running.Load() {
select {
case msg, ok := <-claim.Messages():
if !ok {
running.Store(false)
return
}
if len(msg.Value) == 0 {
continue
}
rwLock.Lock()
messages = append(messages, msg)
rwLock.Unlock()
sess.MarkMessage(msg, "")
case <-sess.Context().Done():
running.Store(false)
return
}
}
}()
rwLock.Lock()
messages = append(messages, msg)
rwLock.Unlock()
sess.MarkMessage(msg, "")
}
wg.Wait()
return nil
}