Skip to content

Commit

Permalink
Client needs to generate random request IDs. Fixes #114
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Jun 24, 2018
1 parent ce0cbc9 commit 3c5e24a
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 21 deletions.
88 changes: 75 additions & 13 deletions aat/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aat

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -42,34 +43,95 @@ func TestRPCRegisterAndCall(t *testing.T) {
t.Fatal("Failed to connect client:", err)
}

// Test calling the procedure.
callArgs := wamp.List{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
ctx := context.Background()
result, err := caller.Call(ctx, procName, nil, callArgs, nil, "")
// Connect second caller session.
caller2, err := connectClient()
if err != nil {
t.Fatal("Failed to call procedure:", err)
t.Fatal("Failed to connect client:", err)
}
sum, ok := wamp.AsInt64(result.Arguments[0])
if !ok {
t.Fatal("Could not convert result to int64")

// Connect third caller session.
caller3, err := connectClient()
if err != nil {
t.Fatal("Failed to connect client:", err)
}
if sum != 55 {
t.Fatal("Wrong result:", sum)

// Test calling the procedure.
callArgs := wamp.List{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
var result1, result2, result3 *wamp.Result
var err1, err2, err3 error
var ready, allDone sync.WaitGroup
release := make(chan struct{})
ready.Add(3)
allDone.Add(3)
go func() {
defer allDone.Done()
ready.Done()
<-release
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
result1, err1 = caller.Call(ctx, procName, nil, callArgs, nil, "")
}()
go func() {
defer allDone.Done()
// Call it with caller2.
ready.Done()
<-release
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
result2, err2 = caller2.Call(ctx, procName, nil, callArgs, nil, "")
}()
go func() {
// Call it with caller3.
defer allDone.Done()
ready.Done()
<-release
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
result3, err3 = caller3.Call(ctx, procName, nil, callArgs, nil, "")
}()

ready.Wait()
close(release)
allDone.Wait()

errs := []error{err1, err2, err3}
results := []*wamp.Result{result1, result2, result3}
for i := 0; i < 3; i++ {
if errs[i] != nil {
t.Error("Caller", i, "failed to call procedure:", errs[i])
} else {
sum, ok := wamp.AsInt64(results[i].Arguments[0])
if !ok {
t.Error("Could not convert result", i, "to int64")
} else if sum != 55 {
t.Errorf("Wrong result %d: %d", i, sum)
}
}
}

// Test unregister.
if err = callee.Unregister(procName); err != nil {
t.Fatal("Failed to unregister procedure:", err)
t.Error("Failed to unregister procedure:", err)
}

err = caller.Close()
if err != nil {
t.Fatal("Failed to disconnect client:", err)
t.Error("Failed to disconnect client:", err)
}

err = caller2.Close()
if err != nil {
t.Error("Failed to disconnect client:", err)
}

err = caller3.Close()
if err != nil {
t.Error("Failed to disconnect client:", err)
}

err = callee.Close()
if err != nil {
t.Fatal("Failed to disconnect client:", err)
t.Error("Failed to disconnect client:", err)
}
}

Expand Down
14 changes: 6 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ type Client struct {
progGate map[context.Context]wamp.ID

actionChan chan func()
idGen *wamp.SyncIDGen

stopping chan struct{}
activeInvHandlers sync.WaitGroup
Expand Down Expand Up @@ -191,7 +190,6 @@ func NewClient(p wamp.Peer, cfg Config) (*Client, error) {
progGate: map[context.Context]wamp.ID{},

actionChan: make(chan func()),
idGen: new(wamp.SyncIDGen),
stopping: make(chan struct{}),
done: make(chan struct{}),

Expand Down Expand Up @@ -252,7 +250,7 @@ func (c *Client) Subscribe(topic string, fn EventHandler, options wamp.Dict) err
if options == nil {
options = wamp.Dict{}
}
id := c.idGen.Next()
id := wamp.GlobalID()
c.expectReply(id)
c.sess.Send(&wamp.Subscribe{
Request: id,
Expand Down Expand Up @@ -322,7 +320,7 @@ func (c *Client) Unsubscribe(topic string) error {
return err
}

id := c.idGen.Next()
id := wamp.GlobalID()
c.expectReply(id)
c.sess.Send(&wamp.Unsubscribe{
Request: id,
Expand Down Expand Up @@ -382,7 +380,7 @@ func (c *Client) Publish(topic string, options wamp.Dict, args wamp.List, kwargs
// Check if the client is asking for a PUBLISHED response.
pubAck, _ := options[wamp.OptAcknowledge].(bool)

id := c.idGen.Next()
id := wamp.GlobalID()
if pubAck {
c.expectReply(id)
}
Expand Down Expand Up @@ -443,7 +441,7 @@ type InvocationHandler func(context.Context, wamp.List, wamp.Dict, wamp.Dict) (r
//
// NOTE: Use consts defined in wamp/options.go instead of raw strings.
func (c *Client) Register(procedure string, fn InvocationHandler, options wamp.Dict) error {
id := c.idGen.Next()
id := wamp.GlobalID()
c.expectReply(id)
c.sess.Send(&wamp.Register{
Request: id,
Expand Down Expand Up @@ -517,7 +515,7 @@ func (c *Client) Unregister(procedure string) error {
return err
}

id := c.idGen.Next()
id := wamp.GlobalID()
c.expectReply(id)
c.sess.Send(&wamp.Unregister{
Request: id,
Expand Down Expand Up @@ -657,7 +655,7 @@ func (c *Client) CallProgress(ctx context.Context, procedure string, options wam
}()
}

id := c.idGen.Next()
id := wamp.GlobalID()
c.expectReply(id)
c.sess.Send(&wamp.Call{
Request: id,
Expand Down

0 comments on commit 3c5e24a

Please sign in to comment.