[Fix] Fix dispatcher() getting stuck caused by availablePermitsCh #875

Merged 2 commits on Oct 31, 2022
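Why dispatcher() could get stuck: as the diff below shows, dispatcher() was the only goroutine that received from availablePermitsCh, yet it also sent to it — after dispatching a message it pushed permitsInc, and the permitsInc handler, increasePermitsAndRequestMoreIfNeed(), pushed permitsReset back into the same channel, while the chunked-message and discard paths pushed permitsInc from other goroutines. Once the 10-slot buffer filled while the dispatcher was blocked on its own send, nothing could drain the channel and the consumer hung. The fix removes the channel and tracks permits with an atomic counter.

A minimal, runnable sketch of that self-deadlock shape (illustrative names only, not the Pulsar client code):

```go
package main

import "time"

// A single goroutine is the only receiver of a small buffered channel, but
// its handler also performs a blocking send into the same channel. Once the
// other senders fill the buffer first, the receiver blocks on its own send
// and nothing can ever drain the channel again.
func main() {
	permitsCh := make(chan int, 1) // stand-in for the 10-slot availablePermitsCh

	// Other goroutines (the message and ack paths in the consumer) keep
	// pushing permit events.
	go func() {
		for {
			permitsCh <- 1
		}
	}()

	// The lone "dispatcher" loop.
	for range permitsCh {
		// Give the producer time to refill the buffer, then send a
		// "reset" event into the channel we alone drain. The buffer is
		// full, so this send blocks forever; the producer is blocked too,
		// and Go reports: all goroutines are asleep - deadlock!
		time.Sleep(10 * time.Millisecond)
		permitsCh <- 0
	}
}
```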
84 changes: 40 additions & 44 deletions pulsar/consumer_partition.go
@@ -25,6 +25,7 @@ import (
 	"math"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/gogo/protobuf/proto"
@@ -83,15 +84,6 @@ const (
 	noMessageEntry = -1
 )

-type permitsReq int32
-
-const (
-	// reset the availablePermits of pc
-	permitsReset permitsReq = iota
-	// increase the availablePermits
-	permitsInc
-)
-
 type partitionConsumerOpts struct {
 	topic        string
 	consumerName string
@@ -141,8 +133,7 @@ type partitionConsumer struct {
 	messageCh chan ConsumerMessage

 	// the number of message slots available
-	availablePermits   int32
-	availablePermitsCh chan permitsReq
+	availablePermits *availablePermits

 	// the size of the queue channel for buffering messages
 	queueSize int32
@@ -170,6 +161,37 @@ type partitionConsumer struct {
 	unAckChunksTracker *unAckChunksTracker
 }

+type availablePermits struct {
+	permits int32
+	pc      *partitionConsumer
+}
+
+func (p *availablePermits) inc() {
+	// atomic add availablePermits
+	ap := atomic.AddInt32(&p.permits, 1)
+
+	// TODO implement a better flow controller
+	// send more permits if needed
+	flowThreshold := int32(math.Max(float64(p.pc.queueSize/2), 1))
+	if ap >= flowThreshold {
+		availablePermits := ap
+		requestedPermits := ap
+		// check if permits changed
+		if !atomic.CompareAndSwapInt32(&p.permits, ap, 0) {
+			return
+		}
+
+		p.pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
+		if err := p.pc.internalFlow(uint32(requestedPermits)); err != nil {
+			p.pc.log.WithError(err).Error("unable to send permits")
+		}
+	}
+}
+
+func (p *availablePermits) reset() {
+	atomic.StoreInt32(&p.permits, 0)
+}
+
 type schemaInfoCache struct {
 	lock  sync.RWMutex
 	cache map[string]Schema
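A note on the CAS in inc() above: when CompareAndSwapInt32 fails, the flow request is skipped rather than retried, which appears safe — a failed CAS means another goroutine either reset the counter (no flow needed) or bumped it further, in which case that goroutine's own inc() sees a value at or above the threshold and issues the flow itself. No permits are lost; they stay in the counter until some inc() claims them.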
@@ -241,8 +263,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 		dlq:             dlq,
 		metrics:         metrics,
 		schemaInfoCache: newSchemaInfoCache(client, options.topic),
-		availablePermitsCh: make(chan permitsReq, 10),
 	}
+	pc.availablePermits = &availablePermits{pc: pc}
 	pc.chunkedMsgCtxMap = newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
 	pc.unAckChunksTracker = newUnAckChunksTracker(pc)
 	pc.setConsumerState(consumerInit)
@@ -931,14 +953,14 @@ func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buff
 			"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
 			msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
 		pc.chunkedMsgCtxMap.remove(uuid)
-		pc.availablePermitsCh <- permitsInc
+		pc.availablePermits.inc()
 		return nil
 	}

 	ctx.append(chunkID, msgID, compressedPayload)

 	if msgMeta.GetChunkId() != msgMeta.GetNumChunksFromMsg()-1 {
-		pc.availablePermitsCh <- permitsInc
+		pc.availablePermits.inc()
 		return nil
 	}

@@ -1075,7 +1097,7 @@ func (pc *partitionConsumer) dispatcher() {
 			messages = nil

 			// reset available permits
-			pc.availablePermitsCh <- permitsReset
+			pc.availablePermits.reset()
 			initialPermits := uint32(pc.queueSize)

 			pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
@@ -1098,15 +1120,7 @@
 			messages[0] = nil
 			messages = messages[1:]

-			pc.availablePermitsCh <- permitsInc
-
-		case pr := <-pc.availablePermitsCh:
-			switch pr {
-			case permitsInc:
-				pc.increasePermitsAndRequestMoreIfNeed()
-			case permitsReset:
-				pc.availablePermits = 0
-			}
+			pc.availablePermits.inc()

 		case clearQueueCb := <-pc.clearQueueCh:
 			// drain the message queue on any new connection by sending a
Expand Down Expand Up @@ -1136,7 +1150,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil

// reset available permits
pc.availablePermitsCh <- permitsReset
pc.availablePermits.reset()
initialPermits := uint32(pc.queueSize)

pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
@@ -1576,25 +1590,7 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
 	if err != nil {
 		pc.log.Error("Connection was closed when request ack cmd")
 	}
-	pc.availablePermitsCh <- permitsInc
-}
-
-func (pc *partitionConsumer) increasePermitsAndRequestMoreIfNeed() {
-	// TODO implement a better flow controller
-	// send more permits if needed
-	flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
-	pc.availablePermits++
-	ap := pc.availablePermits
-	if ap >= flowThreshold {
-		availablePermits := ap
-		requestedPermits := ap
-		pc.availablePermitsCh <- permitsReset
-
-		pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
-		if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
-			pc.log.WithError(err).Error("unable to send permits")
-		}
-	}
-}
+	pc.availablePermits.inc()
 }

 // _setConn sets the internal connection field of this partition consumer atomically.
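For reference, the claim-once pattern in isolation — a runnable sketch mirroring the shape of the availablePermits type added above. The permitCounter name and its flush callback are illustrative assumptions, not the Pulsar client API:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// permitCounter accumulates permits atomically and flushes them in a batch
// once a threshold is crossed (illustrative; not the client's actual type).
type permitCounter struct {
	permits   int32
	threshold int32
	flush     func(n int32) // stand-in for internalFlow sending a FLOW command
}

func (c *permitCounter) inc() {
	ap := atomic.AddInt32(&c.permits, 1)
	if ap < c.threshold {
		return
	}
	// Claim the accumulated permits exactly once; on a lost race, leave
	// them for whichever concurrent inc() or reset() won.
	if atomic.CompareAndSwapInt32(&c.permits, ap, 0) {
		c.flush(ap)
	}
}

func (c *permitCounter) reset() {
	atomic.StoreInt32(&c.permits, 0)
}

func main() {
	var flushed int32
	c := &permitCounter{
		threshold: 500,
		flush:     func(n int32) { atomic.AddInt32(&flushed, n) },
	}

	// Unlike a buffered channel, concurrent callers can never block here,
	// so a stuck dispatcher is impossible by construction.
	var wg sync.WaitGroup
	for g := 0; g < 4; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				c.inc()
			}
		}()
	}
	wg.Wait()
	fmt.Println("flushed:", atomic.LoadInt32(&flushed), "pending:", atomic.LoadInt32(&c.permits))
}
```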