Skip to content

Commit

Permalink
[Issue 781][add consumer seek by time on partitioned topic] (#782)
Browse files Browse the repository at this point in the history
* seek and every partition of topic and check for error

* use array to store errors instead of channles

* add test case to test seek by time on partitioned topic

* wrap errors

* refactor method

Co-authored-by: PGarule <[email protected]>
  • Loading branch information
GPrabhudas and PGarule authored Jun 23, 2022
1 parent 2ae909e commit 0f7041f
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 4 deletions.
13 changes: 9 additions & 4 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
pkgerrors "github.com/pkg/errors"
)

const defaultNackRedeliveryDelay = 1 * time.Minute
Expand Down Expand Up @@ -589,11 +590,15 @@ func (c *consumer) Seek(msgID MessageID) error {
func (c *consumer) SeekByTime(time time.Time) error {
c.Lock()
defer c.Unlock()
if len(c.consumers) > 1 {
return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
var errs error
// run SeekByTime on every partition of topic
for _, cons := range c.consumers {
if err := cons.SeekByTime(time); err != nil {
msg := fmt.Sprintf("unable to SeekByTime for topic=%s subscription=%s", c.topic, c.Subscription())
errs = pkgerrors.Wrap(newError(SeekFailed, err.Error()), msg)
}
}

return c.consumers[0].SeekByTime(time)
return errs
}

var r = &random{
Expand Down
73 changes: 73 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3052,3 +3052,76 @@ func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) {
assert.NotNil(t, msg)
consumer.Ack(msg)
}

// TestConsumerSeekByTimeOnPartitionedTopic test seek by time on partitioned topic.
// It is based on existing test case [TestConsumerSeekByTime] but for partitioned topic.
func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()

// Create topic with 5 partitions
topicAdminURL := "admin/v2/persistent/public/default/TestSeekByTimeOnPartitionedTopic/partitions"
err = httpPut(topicAdminURL, 5)
defer httpDelete(topicAdminURL)
assert.Nil(t, err)

topicName := "persistent://public/default/TestSeekByTimeOnPartitionedTopic"

partitions, err := client.TopicPartitions(topicName)
assert.Nil(t, err)
assert.Equal(t, len(partitions), 5)
for i := 0; i < 5; i++ {
assert.Equal(t, partitions[i],
fmt.Sprintf("%s-partition-%d", topicName, i))
}

ctx := context.Background()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)
defer consumer.Close()

// Use value bigger than 1000 to full-fill queue channel with size 1000 and message channel with size 10
const N = 1100
resetTimeStr := "100s"
retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
assert.Nil(t, err)

for i := 0; i < N; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}

// Don't consume all messages so some stay in queues
for i := 0; i < N-20; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
consumer.Ack(msg)
}

currentTimestamp := time.Now()
err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond))
assert.Nil(t, err)

// should be able to consume all messages once again
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
consumer.Ack(msg)
}
}

0 comments on commit 0f7041f

Please sign in to comment.