Skip to content

Commit

Permalink
query + neutrino: Clone job before sending to workmanager
Browse files Browse the repository at this point in the history
Adds CloneReq function field to query.Request struct. Jobs are
cloned in the worker before sending to the workmanager. This would
be useful in coming commits where a job's request is modified
according to the response it gets. Such as in the case of block
header fetching. A CloneReq function is defined in the instance of
GetCFilter, GetCFHeader and GetData requests in this commit as well.

Signed-off-by: Maureen Ononiwu <[email protected]>
  • Loading branch information
Chinwendu20 committed Sep 5, 2023
1 parent f168538 commit dd02e22
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 19 deletions.
22 changes: 21 additions & 1 deletion blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ func (c *checkpointedCFHeadersQuery) requests() []*query.Request {
Req: m,
HandleResp: c.handleResponse,
SendQuery: sendQueryMessageWithEncoding,
CloneReq: cloneMsgCFHeaders,
}
}
return reqs
Expand Down Expand Up @@ -957,12 +958,31 @@ func sendQueryMessageWithEncoding(peer query.Peer, req query.ReqMessage) error {
if !ok {
return errors.New("invalid request type")
}

sp.QueueMessageWithEncoding(request.message, nil, request.encoding)

return nil
}

// cloneMsgCFHeaders clones query.ReqMessage that contains the MsgGetCFHeaders message.
func cloneMsgCFHeaders(req query.ReqMessage) query.ReqMessage {
oldReq, ok := req.(*encodedQuery)
if !ok {
log.Errorf("request not of type *encodedQuery")
}
oldReqMessage, ok := oldReq.message.(*wire.MsgGetCFHeaders)
if !ok {
log.Errorf("request not of type *wire.MsgGetCFHeaders")
}
newReq := &encodedQuery{
message: wire.NewMsgGetCFHeaders(
oldReqMessage.FilterType, oldReqMessage.StartHeight, &oldReqMessage.StopHash,
),
encoding: oldReq.encoding,
priorityIndex: oldReq.priorityIndex,
}
return newReq
}

// getCheckpointedCFHeaders catches a filter header store up with the
// checkpoints we got from the network. It assumes that the filter header store
// matches the checkpoints up to the tip of the store.
Expand Down
35 changes: 35 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,24 @@ func (q *cfiltersQuery) request() *query.Request {
Req: msg,
HandleResp: q.handleResponse,
SendQuery: sendQueryMessageWithEncoding,
CloneReq: func(req query.ReqMessage) query.ReqMessage {
oldReq, ok := req.(*encodedQuery)
if !ok {
log.Errorf("request not of type *encodedQuery")
}
oldReqMessage, ok := oldReq.message.(*wire.MsgGetCFilters)
if !ok {
log.Errorf("request not of type *wire.MsgGetCFilters")
}
newReq := &encodedQuery{
message: wire.NewMsgGetCFilters(
oldReqMessage.FilterType, oldReqMessage.StartHeight, &oldReqMessage.StopHash,
),
encoding: oldReq.encoding,
priorityIndex: oldReq.priorityIndex,
}
return newReq
},
}
}

Expand Down Expand Up @@ -926,6 +944,23 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
Req: msg,
HandleResp: handleResp,
SendQuery: sendQueryMessageWithEncoding,
CloneReq: func(req query.ReqMessage) query.ReqMessage {
newMsg := wire.NewMsgGetData()
_ = newMsg.AddInvVect(inv)

oldReq, ok := req.(*encodedQuery)
if !ok {
log.Errorf("request not of type *encodedQuery")
}

newReq := &encodedQuery{
message: newMsg,
encoding: oldReq.encoding,
priorityIndex: oldReq.priorityIndex,
}

return newReq
},
}

// Prepare the query options.
Expand Down
3 changes: 3 additions & 0 deletions query/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ type Request struct {
// SendQuery handles sending request to the worker's peer. It returns an error,
// if one is encountered while sending the request.
SendQuery func(peer Peer, request ReqMessage) error

// CloneReq clones the message.
CloneReq func(message ReqMessage) ReqMessage
}

// ReqMessage is an interface which all structs containing information
Expand Down
22 changes: 21 additions & 1 deletion query/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,31 @@ nexJobLoop:
// Stop to allow garbage collection.
timeout.Stop()

// This is necessary to avoid a situation where future changes to the job's request affect the current job.
// For example: suppose we want to fetch headers between checkpoints 0 and 20,000. The maximum number of headers
// that a peer can send in one message is 2000. When we receive 2000 headers for one request,
// we update the job's request, changing its startheight and blocklocator to match the next batch of headers
// that we want to fetch. Since we are not done with fetching our target of 20,000 headers,
// we will have to make more changes to the job's request in the future. This could alter previous requests,
// resulting in unwanted behaviour.
resultJob := &queryJob{
index: job.Index(),
Request: &Request{
Req: job.CloneReq(job.Req),
HandleResp: job.Request.HandleResp,
CloneReq: job.Request.CloneReq,
SendQuery: job.Request.SendQuery,
},
cancelChan: job.cancelChan,
tries: job.tries,
timeout: job.timeout,
}

// We have a result ready for the query, hand it off before
// getting a new job.
select {
case results <- &jobResult{
job: job,
job: resultJob,
peer: peer,
err: jobErr,
}:
Expand Down
77 changes: 60 additions & 17 deletions query/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ func makeJob() *queryJob {
m.requests <- req.Message()
return nil
},
CloneReq: func(req ReqMessage) ReqMessage {
oldReq := req.(*mockQueryEncoded)

newMsg := &wire.MsgGetData{
InvList: oldReq.message.InvList,
}

clone := &mockQueryEncoded{
message: newMsg,
}

return clone
},
}

return &queryJob{
Expand Down Expand Up @@ -209,9 +222,15 @@ func TestWorkerIgnoreMsgs(t *testing.T) {
t.Fatalf("response error: %v", result.err)
}

// Make sure the result was given for the intended job.
if result.job != task {
t.Fatalf("got result for unexpected job")
// Make sure the QueryJob instance in the result is different from the initial one
// supplied to the worker
if result.job == task {
t.Fatalf("result's job should be different from the task's")
}

// Make sure we are receiving the corresponding result for the given task.
if result.job.Index() != task.Index() {
t.Fatalf("result's job index should not be different from task's")
}

// And the correct peer.
Expand Down Expand Up @@ -264,9 +283,15 @@ func TestWorkerTimeout(t *testing.T) {
t.Fatalf("expected timeout, got: %v", result.err)
}

// Make sure the result was given for the intended job.
if result.job != task {
t.Fatalf("got result for unexpected job")
// Make sure the QueryJob instance in the result is different from the initial one
// supplied to the worker
if result.job == task {
t.Fatalf("result's job should be different from the task's")
}

// Make sure we are receiving the corresponding result for the given task.
if result.job.Index() != task.Index() {
t.Fatalf("result's job index should not be different from task's")
}

// And the correct peer.
Expand Down Expand Up @@ -323,9 +348,15 @@ func TestWorkerDisconnect(t *testing.T) {
t.Fatalf("expected peer disconnect, got: %v", result.err)
}

// Make sure the result was given for the intended job.
if result.job != task {
t.Fatalf("got result for unexpected job")
// Make sure the QueryJob instance in the result is different from the initial one
// supplied to the worker
if result.job == task {
t.Fatalf("result's job should be different from the task's")
}

// Make sure we are receiving the corresponding result for the given task.
if result.job.Index() != task.Index() {
t.Fatalf("result's job index should not be different from task's")
}

// And the correct peer.
Expand Down Expand Up @@ -411,9 +442,15 @@ func TestWorkerProgress(t *testing.T) {
t.Fatalf("expected no error, got: %v", result.err)
}

// Make sure the result was given for the intended task.
if result.job != task {
t.Fatalf("got result for unexpected job")
// Make sure the QueryJob instance in the result is different from the initial one
// supplied to the worker
if result.job == task {
t.Fatalf("result's job should be different from the task's")
}

// Make sure we are receiving the corresponding result for the given task.
if result.job.Index() != task.Index() {
t.Fatalf("result's job index should not be different from task's")
}

// And the correct peer.
Expand Down Expand Up @@ -484,9 +521,15 @@ func TestWorkerJobCanceled(t *testing.T) {
t.Fatalf("expected job canceled, got: %v", result.err)
}

// Make sure the result was given for the intended task.
if result.job != task {
t.Fatalf("got result for unexpected job")
// Make sure the QueryJob instance in the result is different from the initial one
// supplied to the worker
if result.job == task {
t.Fatalf("result's job should be different from the task's")
}

// Make sure we are receiving the corresponding result for the given task.
if result.job.Index() != task.Index() {
t.Fatalf("result's job index should not be different from task's")
}

// And the correct peer.
Expand Down Expand Up @@ -546,9 +589,9 @@ func TestWorkerSendQueryErr(t *testing.T) {
ctx.peer.err, result.err)
}

// Make sure the result was given for the intended task.
// Make sure the QueryJob instance in the result is same as the taskJob's.
if result.job != taskJob {
t.Fatalf("got result for unexpected job")
t.Fatalf("result's job should be same as the taskJob's")
}

// And the correct peer.
Expand Down

0 comments on commit dd02e22

Please sign in to comment.