Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: [Issue 218] Add Seek functions to Reader #222

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion pulsar/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package pulsar

import "context"
import (
"context"
"time"
)

// ReaderMessage package Reader and Message as a struct to use
type ReaderMessage struct {
Expand Down Expand Up @@ -88,4 +91,21 @@ type Reader interface {

// Close the reader and stop the broker to push more messages
Close()

// Reset the subscription associated with this reader to a specific message id.
// The message id can either be a specific message or represent the first or last messages in the topic.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
// seek() on the individual partitions.
Seek(MessageID) error

// Reset the subscription associated with this reader to a specific message publish time.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
// the individual partitions.
//
// @param timestamp
// the message publish time where to reposition the subscription
//
SeekByTime(time time.Time) error
}
31 changes: 31 additions & 0 deletions pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pulsar

import (
"context"
"time"

log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -150,3 +151,33 @@ func (r *reader) hasMoreMessages() bool {
func (r *reader) Close() {
r.pc.Close()
}

func (r *reader) messageID(msgID MessageID) (*messageID, bool) {
mid, ok := msgID.(*messageID)
if !ok {
r.log.Warnf("invalid message id type")
return nil, false
}

partition := mid.partitionIdx
// did we receive a valid partition index?
if partition != 0 {
r.log.Warnf("invalid partition index %d expected 0", partition)
return nil, false
}

return mid, true
}
Comment on lines +155 to +170
Copy link
Contributor

@nitishv nitishv Jul 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the method signature, the purpose of this method is to convert MessageID to get *messageID
In doing so, a failure in type assertion is not really a failure. The given msgID can still be converted. Refer to a this fix I just made #305
I think that the mentioned PR change can be re-factored into this unexported method for reader, as it being used in couple of places now with this feature.

Also checking for non-zero partition index does not seem to be the purpose of this method, and should not be included here. That check is only relevant to Seek APIs, and should be moved there.


func (r *reader) Seek(msgID MessageID) error {
mid, ok := r.messageID(msgID)
if !ok {
return nil
}

return r.pc.Seek(mid)
}

func (r *reader) SeekByTime(time time.Time) error {
return r.pc.SeekByTime(time)
}
108 changes: 108 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/apache/pulsar-client-go/pulsar/internal"
)

func TestReaderConfigErrors(t *testing.T) {
Expand Down Expand Up @@ -362,3 +365,108 @@ func TestReaderHasNext(t *testing.T) {

assert.Equal(t, 10, i)
}

func TestReaderSeek(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can be an added test case which does a Seek on a custom MessageID refer to #305

client, err := NewClient(ClientOptions{
URL: lookupURL,
})
require.Nil(t, err)
defer client.Close()

topicName := newTopicName()
ctx := context.Background()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
require.Nil(t, err)
defer producer.Close()

reader, err := client.CreateReader(ReaderOptions{
Topic: topicName,
StartMessageID: EarliestMessageID(),
})
require.Nil(t, err)
defer reader.Close()

const N = 10
var seekID MessageID
for i := 0; i < N; i++ {
id, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
require.Nil(t, err)

if i == 4 {
seekID = id
}
}
err = producer.Flush()
assert.NoError(t, err)

for i := 0; i < N; i++ {
msg, err := reader.Next(context.Background())
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
}

err = reader.Seek(seekID)
require.Nil(t, err)

msg, err := reader.Next(context.Background())
assert.Nil(t, err)
assert.Equal(t, "hello-4", string(msg.Payload()))
}

func TestReaderSeekByTime(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
require.Nil(t, err)
defer client.Close()

topicName := newTopicName()
ctx := context.Background()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBatching: false,
})
require.Nil(t, err)
defer producer.Close()

reader, err := client.CreateReader(ReaderOptions{
Topic: topicName,
StartMessageID: LatestMessageID(),
})
require.Nil(t, err)
defer reader.Close()

const N = 10
resetTimeStr := "100s"
retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
assert.Nil(t, err)

for i := 0; i < N; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
require.Nil(t, err)
}

for i := 0; i < N; i++ {
msg, err := reader.Next(context.Background())
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
}

currentTimestamp := time.Now()
err = reader.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond))
require.Nil(t, err)

for i := 0; i < N; i++ {
msg, err := reader.Next(context.Background())
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
}
}