Skip to content

Commit

Permalink
Revert "Use a separate gorutine to handle the logic of reconnect" (#700)
Browse files Browse the repository at this point in the history
* Revert "Use a separate gorutine to handle the logic of reconnect (#691)"

This reverts commit 39e13ac.

* add closeCh for go rutine leak

Signed-off-by: xiaolongran <[email protected]>
  • Loading branch information
wolfstudy authored Jan 6, 2022
1 parent 6d543c6 commit ff7a962
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ type partitionProducer struct {
batchFlushTicker *time.Ticker

// Channel where app is posting messages to be published
connectClosedCh chan connectionClosed
eventsChan chan interface{}
closeCh chan struct{}
connectClosedCh chan connectionClosed

publishSemaphore internal.Semaphore
pendingQueue internal.BlockingQueue
Expand Down Expand Up @@ -115,8 +116,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
log: logger,
options: options,
producerID: client.rpcClient.NewProducerID(),
eventsChan: make(chan interface{}, maxPendingMessages),
connectClosedCh: make(chan connectionClosed, 10),
eventsChan: make(chan interface{}, maxPendingMessages+20),
closeCh: make(chan struct{}),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
Expand Down Expand Up @@ -369,13 +371,13 @@ func (p *partitionProducer) reconnectToBroker() {
}

func (p *partitionProducer) runEventsLoop() {

go func() {
for {
for range p.connectClosedCh {
p.log.Info("runEventsLoop will reconnect in producer")
p.reconnectToBroker()
}
select {
case <-p.closeCh:
return
case <-p.connectClosedCh:
p.log.Info("runEventsLoop will reconnect in producer")
p.reconnectToBroker()
}
}()

Expand Down Expand Up @@ -872,6 +874,8 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
p.setProducerState(producerClosed)
p.cnx.UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()

close(p.closeCh)
}

func (p *partitionProducer) LastSequenceID() int64 {
Expand Down

0 comments on commit ff7a962

Please sign in to comment.