Skip to content

Commit

Permalink
Merge pull request lightninglabs#6 from gcash/txrelay
Browse files Browse the repository at this point in the history
Add option to download unconfirmed transactions
  • Loading branch information
cpacia authored Oct 28, 2018
2 parents 2b416e9 + 70e69d5 commit 1d1c002
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 12 deletions.
30 changes: 29 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 96 additions & 1 deletion blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"container/list"
"fmt"
"github.com/gcash/bchutil/bloom"
"github.com/go-errors/errors"
"math"
"math/big"
Expand Down Expand Up @@ -88,6 +89,14 @@ type txMsg struct {
peer *ServerPeer
}

// disableTxDownloadMsg tells the peer handler to send a match none
// bloom filter to all peers
type disableTxDownloadMsg struct{}

// enableTxDownloadMsg tells the peer handler to send a match all
//// bloom filter to all peers
type enableTxDownloadMsg struct{}

// blockManager provides a concurrency safe block manager for handling all
// incoming blocks.
type blockManager struct {
Expand Down Expand Up @@ -175,6 +184,8 @@ type blockManager struct {
minRetargetTimespan int64 // target timespan / adjustment factor
maxRetargetTimespan int64 // target timespan * adjustment factor
blocksPerRetarget int32 // target timespan / target time per block

requestedTxns map[chainhash.Hash]struct{}
}

// newBlockManager returns a new bitcoin block manager. Use Start to begin
Expand Down Expand Up @@ -203,6 +214,7 @@ func newBlockManager(s *ChainService) (*blockManager, error) {
blocksPerRetarget: int32(targetTimespan / targetTimePerBlock),
minRetargetTimespan: targetTimespan / adjustmentFactor,
maxRetargetTimespan: targetTimespan * adjustmentFactor,
requestedTxns: make(map[chainhash.Hash]struct{}),
}

// Next we'll create the two signals that goroutines will use to wait
Expand Down Expand Up @@ -1632,12 +1644,21 @@ out:
case *invMsg:
b.handleInvMsg(msg)

case *txMsg:
b.handleTxMsg(msg)

case *headersMsg:
b.handleHeadersMsg(msg)

case *donePeerMsg:
b.handleDonePeerMsg(candidatePeers, msg.peer)

case *disableTxDownloadMsg:
b.handleDisableTxDownloadMsg(candidatePeers)

case *enableTxDownloadMsg:
b.handleEnableTxDownloadMsg(candidatePeers)

default:
log.Warnf("Invalid message type in block "+
"handler: %T", msg)
Expand Down Expand Up @@ -1912,13 +1933,48 @@ func (b *blockManager) QueueInv(inv *wire.MsgInv, sp *ServerPeer) {
}
}

/// QueueTx adds the passed transaction message and peer to the block handling
// queue. Responds to the done channel argument after the tx message is
// processed.
func (b *blockManager) QueueTx(tx *bchutil.Tx, sp *ServerPeer) {
// No channel handling here because peers do not need to block on inv
// messages.
if atomic.LoadInt32(&b.shutdown) != 0 {
return
}

select {
case b.peerChan <- &txMsg{tx: tx, peer: sp}:
case <-b.quit:
return
}
}

// handleInvMsg handles inv messages from all peers.
// We examine the inventory advertised by the remote peer and act accordingly.
func (b *blockManager) handleInvMsg(imsg *invMsg) {
invVects := imsg.inv.InvList
if b.BlockHeadersSynced() {
gdmsg := wire.NewMsgGetData()
for _, iv := range invVects {
if iv.Type == wire.InvTypeTx {
if b.server.mempool.HaveTransaction(&iv.Hash) {
continue
}
if _, exists := b.requestedTxns[iv.Hash]; !exists {
b.requestedTxns[iv.Hash] = struct{}{}
gdmsg.AddInvVect(iv)
}
}
}
if len(gdmsg.InvList) > 0 {
imsg.peer.QueueMessage(gdmsg, nil)
}
}

// Attempt to find the final block in the inventory list. There may
// not be one.
lastBlock := -1
invVects := imsg.inv.InvList
for i := len(invVects) - 1; i >= 0; i-- {
if invVects[i].Type == wire.InvTypeBlock {
lastBlock = i
Expand Down Expand Up @@ -1996,6 +2052,41 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
}
}

// handleTxMsg handles transaction messages from all peers.
func (b *blockManager) handleTxMsg(tmsg *txMsg) {
txHash := tmsg.tx.Hash()
if _, exists := b.requestedTxns[*txHash]; !exists {
log.Warnf("Peer %s sent us a transaction we didn't request", tmsg.peer.Addr())
return
}
b.server.mempool.AddTransaction(tmsg.tx)
delete(b.requestedTxns, *txHash)
}

// handleDisableTxDownloadMsg sends a match none bloom filter to all peers
func (b *blockManager) handleDisableTxDownloadMsg(peers *list.List) {
for e := peers.Front(); e != nil; e = e.Next() {
sp, ok := e.Value.(*ServerPeer)
if !ok {
log.Error("handleDisableTxDownloadMsg error asserting type ServerPeer")
}
filter := bloom.NewFilter(0, 0, 0, wire.BloomUpdateNone)
sp.QueueMessage(filter.MsgFilterLoad(), nil)
}
}

// handleEnableTxDownloadMsg sends a match all bloom filter to all peers
func (b *blockManager) handleEnableTxDownloadMsg(peers *list.List) {
for e := peers.Front(); e != nil; e = e.Next() {
sp, ok := e.Value.(*ServerPeer)
if !ok {
log.Error("handleEnableTxDownloadMsg error asserting type ServerPeer")
}
filter := bloom.NewFilter(0, 0, 1, wire.BloomUpdateNone)
sp.QueueMessage(filter.MsgFilterLoad(), nil)
}
}

// QueueHeaders adds the passed headers message and peer to the block handling
// queue.
func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, sp *ServerPeer) {
Expand Down Expand Up @@ -2353,6 +2444,10 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
b.headerTipHash = *finalHash
b.newHeadersMtx.Unlock()
b.newHeadersSignal.Broadcast()

// Clear the mempool to free up memory. This may mean we might receive
// transactions we've previously downloaded but this is rather unlikely.
b.server.mempool.Clear()
}

// checkHeaderSanity checks the PoW, and timestamp of a block header.
Expand Down
73 changes: 73 additions & 0 deletions mempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package neutrino

import (
"github.com/gcash/bchd/btcjson"
"github.com/gcash/bchd/chaincfg/chainhash"
"github.com/gcash/bchutil"
"sync"
)

// Mempool is used when we are downloading unconfirmed transactions.
// We will use this object to track which transactions we've already
// downloaded so that we don't download them more than once.
type Mempool struct {
downloadedTxs map[chainhash.Hash]bool
mtx sync.RWMutex
callbacks []func(tx *bchutil.Tx, block *btcjson.BlockDetails)
watchedAddrs []bchutil.Address
}

// NewMempool returns an initialized mempool
func NewMempool() *Mempool {
return &Mempool{
downloadedTxs: make(map[chainhash.Hash]bool),
mtx: sync.RWMutex{},
}
}

// RegisterCallback will register a callback that will fire when a transaction
// matching a watched address enters the mempool.
func (mp *Mempool) RegisterCallback(onRecvTx func(tx *bchutil.Tx, block *btcjson.BlockDetails)) {
mp.mtx.Lock()
defer mp.mtx.Unlock()
mp.callbacks = append(mp.callbacks, onRecvTx)
}

// HaveTransaction returns whether or not the passed transaction already exists
// in the mempool.
func (mp *Mempool) HaveTransaction(hash *chainhash.Hash) bool {
mp.mtx.RLock()
defer mp.mtx.RUnlock()
return mp.downloadedTxs[*hash]
}

// AddTransaction adds a new transaction to the mempool and
// maybe calls back if it matches any watched addresses.
func (mp *Mempool) AddTransaction(tx *bchutil.Tx) {
mp.mtx.Lock()
defer mp.mtx.Unlock()
mp.downloadedTxs[*tx.Hash()] = true

ro := defaultRescanOptions()
WatchAddrs(mp.watchedAddrs...)(ro)
if ok, err := ro.paysWatchedAddr(tx); ok && err == nil {
for _, cb := range mp.callbacks {
cb(tx, nil)
}
}
}

// Clear will remove all transactions from the mempool. This
// should be done whenever a new block is accepted.
func (mp *Mempool) Clear() {
mp.mtx.Lock()
defer mp.mtx.Unlock()
mp.downloadedTxs = make(map[chainhash.Hash]bool)
}

// NotifyReceived stores addresses to watch
func (mp *Mempool) NotifyReceived(addrs []bchutil.Address) {
mp.mtx.Lock()
defer mp.mtx.Unlock()
mp.watchedAddrs = append(mp.watchedAddrs, addrs...)
}
Loading

0 comments on commit 1d1c002

Please sign in to comment.