Skip to content

Commit

Permalink
Added semaphore implementation with lower contention (#298)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jun 24, 2020
1 parent 549874b commit cc08fd6
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 28 deletions.
92 changes: 73 additions & 19 deletions pulsar/internal/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,81 @@

package internal

// Semaphore is a channel of bool, used to receive a bool type semaphore.
type Semaphore chan bool
import (
"sync/atomic"

// Acquire a permit from this semaphore, blocking until one is available.
log "github.com/sirupsen/logrus"
)

// Acquire a permit, if one is available and returns immediately,
// reducing the number of available permits by one.
func (s Semaphore) Acquire() {
s <- true
type Semaphore interface {
// Acquire a permit, if one is available and returns immediately,
// reducing the number of available permits by one.
Acquire()

// Try to acquire a permit. The method will return immediately
// with a `true` if it was possible to acquire a permit and
// `false` otherwise.
TryAcquire() bool

// Release a permit, returning it to the semaphore.
// Release a permit, increasing the number of available permits by
// one. If any threads are trying to acquire a permit, then one is
// selected and given the permit that was just released. That thread
// is (re)enabled for thread scheduling purposes.
// There is no requirement that a thread that releases a permit must
// have acquired that permit by calling Acquire().
// Correct usage of a semaphore is established by programming convention
// in the application.
Release()
}

type semaphore struct {
maxPermits int32
permits int32
ch chan bool
}

func NewSemaphore(maxPermits int32) Semaphore {
if maxPermits <= 0 {
log.Fatal("Max permits for semaphore needs to be > 0")
}

return &semaphore{
maxPermits: maxPermits,
permits: 0,
ch: make(chan bool),
}
}

func (s *semaphore) Acquire() {
permits := atomic.AddInt32(&s.permits, 1)
if permits <= s.maxPermits {
return
}

// Block on the channel until a new permit is available
<-s.ch
}

func (s *semaphore) TryAcquire() bool {
for {
currentPermits := atomic.LoadInt32(&s.permits)
if currentPermits >= s.maxPermits {
// All the permits are already exhausted
return false
}

if atomic.CompareAndSwapInt32(&s.permits, currentPermits, currentPermits+1) {
// Successfully incremented counter
return true
}
}
}

// Release a permit, returning it to the semaphore.

// Release a permit, increasing the number of available permits by
// one. If any threads are trying to acquire a permit, then one is
// selected and given the permit that was just released. That thread
// is (re)enabled for thread scheduling purposes.
// There is no requirement that a thread that releases a permit must
// have acquired that permit by calling Acquire().
// Correct usage of a semaphore is established by programming convention
// in the application.
func (s Semaphore) Release() {
<-s
func (s *semaphore) Release() {
permits := atomic.AddInt32(&s.permits, -1)
if permits >= s.maxPermits {
// Unblock the next in line to acquire the semaphore
s.ch <- true
}
}
60 changes: 60 additions & 0 deletions pulsar/internal/semaphore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package internal

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestSemaphore(t *testing.T) {
s := NewSemaphore(3)

const n = 10

wg := sync.WaitGroup{}
wg.Add(n)

for i := 0; i < n; i++ {
go func() {
s.Acquire()
time.Sleep(100 * time.Millisecond)
s.Release()
wg.Done()
}()
}

wg.Wait()
}

func TestSemaphore_TryAcquire(t *testing.T) {
s := NewSemaphore(1)

s.Acquire()

assert.False(t, s.TryAcquire())

s.Release()

assert.True(t, s.TryAcquire())
assert.False(t, s.TryAcquire())
s.Release()
}
11 changes: 2 additions & 9 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
producerID: client.rpcClient.NewProducerID(),
eventsChan: make(chan interface{}, maxPendingMessages),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: make(internal.Semaphore, maxPendingMessages),
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
lastSequenceID: -1,
partitionIdx: partitionIdx,
Expand Down Expand Up @@ -387,14 +387,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes

func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error)) {
p.publishSemaphore.Acquire()
sr := &sendRequest{
ctx: ctx,
msg: msg,
callback: callback,
flushImmediately: false,
}
p.eventsChan <- sr
p.internalSendAsync(ctx, msg, callback, false)
}

func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
Expand Down

0 comments on commit cc08fd6

Please sign in to comment.