Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create an UpdateDispatcher interface to decouple Dispatcher and Updater components #120

Merged
merged 5 commits into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ext/common_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ext

import (
"errors"

"github.com/PaulSonOfLars/gotgbot/v2"
)

Expand All @@ -21,11 +23,17 @@ func (d DummyHandler) Name() string {
return "dummy" + d.N
}

var ErrBadDispatcher = errors.New("can only inject updates if the dispatcher is of type *Dispatcher")

func (u *Updater) InjectUpdate(token string, upd gotgbot.Update) error {
bData, ok := u.botMapping.getBot(token)
if !ok {
return ErrNotFound
}

return u.Dispatcher.ProcessUpdate(bData.bot, &upd, nil)
d, ok := u.Dispatcher.(*Dispatcher)
if !ok {
return ErrBadDispatcher
}
return d.ProcessUpdate(bData.bot, &upd, nil)
}
19 changes: 16 additions & 3 deletions ext/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,18 @@ var (
ContinueGroups = errors.New("group iteration continued")
)

// The UpdateDispatcher interface is used to abstract away common Dispatcher implementations.
// It assumes that all incoming updates come through a JSON channel.
type UpdateDispatcher interface {
Start(b *gotgbot.Bot, updates <-chan json.RawMessage)
Stop()
}

// The Dispatcher struct is the default UpdateDispatcher implementation.
// It supports grouping of update handlers, allowing for powerful update handling flows.
// Customise the handling of updates by wrapping the Processor struct.
type Dispatcher struct {
// Processor defines how to process the raw updates being processed by the Dispatcher.
// Processor defines how to process the raw updates being handled by the Dispatcher.
// This can be extended to include additional error handling, metrics, etc.
Processor Processor

Expand Down Expand Up @@ -80,6 +90,9 @@ type Dispatcher struct {
waitGroup sync.WaitGroup
}

// Ensure compile-time type safety.
var _ UpdateDispatcher = &Dispatcher{}

// DispatcherOpts can be used to configure or override default Dispatcher behaviours.
type DispatcherOpts struct {
// Processor allows for providing custom Processor interfaces with different behaviours.
Expand Down Expand Up @@ -109,7 +122,7 @@ type DispatcherOpts struct {
MaxRoutines int
}

// NewDispatcher creates a new dispatcher, which process and handles incoming updates from the updates channel.
// NewDispatcher creates a new Dispatcher, which process and handles incoming updates from the updates channel.
func NewDispatcher(opts *DispatcherOpts) *Dispatcher {
var errHandler DispatcherErrorHandler
var panicHandler DispatcherPanicHandler
Expand Down Expand Up @@ -175,7 +188,7 @@ func (d *Dispatcher) MaxUsage() int {

// Start to handle incoming updates.
// This is a blocking method; it should be called as a goroutine, such that it can receive incoming updates.
func (d *Dispatcher) Start(b *gotgbot.Bot, updates chan json.RawMessage) {
func (d *Dispatcher) Start(b *gotgbot.Bot, updates <-chan json.RawMessage) {
// Listen to updates as they come in from the updater.
for upd := range updates {
d.waitGroup.Add(1)
Expand Down
17 changes: 5 additions & 12 deletions ext/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ type ErrorFunc func(error)

type Updater struct {
// Dispatcher is where all the incoming updates are sent to be processed.
Dispatcher *Dispatcher
// The Dispatcher runs in a separate goroutine, allowing for parallel update processing and dispatching.
// Once the Updater has received an update, it sends it to the Dispatcher over a JSON channel.
Dispatcher UpdateDispatcher

// UnhandledErrFunc provides more flexibility for dealing with previously unhandled errors, such as failures to get
// updates (when long-polling), or failures to unmarshal.
Expand Down Expand Up @@ -54,23 +56,14 @@ type UpdaterOpts struct {
// ErrorLog specifies an optional logger for unexpected behavior from handlers.
// If nil, logging is done via the log package's standard logger.
ErrorLog *log.Logger
// The dispatcher instance to be used by the updater.
Dispatcher *Dispatcher
}

// NewUpdater Creates a new Updater, as well as the necessary structures required for the associated Dispatcher.
func NewUpdater(opts *UpdaterOpts) *Updater {
// NewUpdater Creates a new Updater, as well as a Dispatcher and any optional updater configurations (via UpdaterOpts).
func NewUpdater(dispatcher UpdateDispatcher, opts *UpdaterOpts) *Updater {
var unhandledErrFunc ErrorFunc
var errLog *log.Logger

// Default dispatcher, no special settings.
dispatcher := NewDispatcher(nil)

if opts != nil {
if opts.Dispatcher != nil {
dispatcher = opts.Dispatcher
}

unhandledErrFunc = opts.UnhandledErrFunc
errLog = opts.ErrorLog
}
Expand Down
16 changes: 8 additions & 8 deletions ext/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestUpdaterThrowsErrorWhenSameWebhookAddedTwice(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.AddWebhook(b, "test", ext.WebhookOpts{})
if err != nil {
Expand All @@ -47,7 +47,7 @@ func TestUpdaterSupportsWebhookReAdding(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.AddWebhook(b, "test", ext.WebhookOpts{})
if err != nil {
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestUpdaterDisallowsEmptyWebhooks(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.AddWebhook(b, "", ext.WebhookOpts{})
if !errors.Is(err, ext.ErrEmptyPath) {
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestUpdaterAllowsWebhookDeletion(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.StartPolling(b, &ext.PollingOpts{
EnableWebhookDeletion: true,
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestUpdaterSupportsTwoPollingBots(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.StartPolling(b1, &ext.PollingOpts{
GetUpdatesOpts: &gotgbot.GetUpdatesOpts{
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestUpdaterThrowsErrorWhenSameLongPollAddedTwice(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.StartPolling(b, &ext.PollingOpts{
GetUpdatesOpts: &gotgbot.GetUpdatesOpts{
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestUpdaterSupportsLongPollReAdding(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.StartPolling(b, &ext.PollingOpts{
GetUpdatesOpts: &gotgbot.GetUpdatesOpts{RequestOpts: reqOpts},
Expand Down Expand Up @@ -303,7 +303,7 @@ func BenchmarkUpdaterMultibots(b *testing.B) {

func benchmarkUpdaterWithNBots(b *testing.B, numBot int) {
d := ext.NewDispatcher(nil)
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

wg := sync.WaitGroup{}
d.AddHandler(ext.DummyHandler{F: func(b *gotgbot.Bot, ctx *ext.Context) error {
Expand Down
18 changes: 8 additions & 10 deletions samples/callbackqueryBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// /start command to introduce the bot
dispatcher.AddHandler(handlers.NewCommand("start", start))
Expand Down
18 changes: 8 additions & 10 deletions samples/commandBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// /start command to introduce the bot
dispatcher.AddHandler(handlers.NewCommand("start", start))
Expand Down
18 changes: 8 additions & 10 deletions samples/conversationBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

dispatcher.AddHandler(handlers.NewConversation(
[]ext.Handler{handlers.NewCommand("start", start)},
Expand Down
18 changes: 8 additions & 10 deletions samples/echoBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// Add echo handler to reply to all text messages.
dispatcher.AddHandler(handlers.NewMessage(message.Text, echo))
Expand Down
19 changes: 8 additions & 11 deletions samples/echoMultiBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,15 @@ func main() {
webhookSecret := os.Getenv("WEBHOOK_SECRET")

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
ErrorLog: nil,
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// Add stop handler to stop all bots gracefully.
dispatcher.AddHandler(handlers.NewCommand("stop", func(b *gotgbot.Bot, ctx *ext.Context) error {
Expand Down
18 changes: 8 additions & 10 deletions samples/echoWebhookBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// Add echo handler to reply to all text messages.
dispatcher.AddHandler(handlers.NewMessage(message.Text, echo))
Expand Down
18 changes: 8 additions & 10 deletions samples/inlinequeryBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// Create an inline query handler to reply to all inline queries
dispatcher.AddHandler(handlers.NewInlineQuery(inlinequery.All, source))
Expand Down
2 changes: 1 addition & 1 deletion samples/metricsBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {
go monitorDispatcherBuffer(dispatcher)

// Create the updater with our customised dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: dispatcher})
updater := ext.NewUpdater(dispatcher, nil)

// Add echo handler to reply to all text messages.
dispatcher.AddHandler(handlers.NewMessage(message.Text, echo))
Expand Down
Loading