Skip to content

Commit

Permalink
fix(Node): fix initialization race conditions
Browse files Browse the repository at this point in the history
Would break when closing during initialization.
  • Loading branch information
mappum committed Jun 26, 2015
1 parent bb8ca7e commit 86ce8f3
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 51 deletions.
35 changes: 22 additions & 13 deletions lib/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ var Node = module.exports = function (opts, cb) {
util.inherits(Node, EventEmitter)

Node.prototype.start = function (cb) {
if (cb) this.on('ready', cb)

this.peers.connect()
this.chain.sync()
cb = cb || function () {}

this.peers.on('peerconnect', this._onPeerConnect.bind(this))

async.parallel([
this.peers.connect.bind(this.peers),
this.chain.sync.bind(this.chain)
], cb)

this.emit('ready')
}

Expand All @@ -61,20 +63,27 @@ Node.prototype.close = function (cb) {
var self = this

this.closing = true
this.peers.disconnect()

var wallets = []
for (var id in this.wallets) wallets.push(this.wallets[id])
async.each(wallets, function (wallet, cb) { wallet.close(cb) }, function (err) {
if (err) return cb(err)
var tasks = [ this.peers.disconnect.bind(this.peers) ]

self.chain.close(function (err) {
tasks.push(function (cb) {
var wallets = []
for (var id in this.wallets) wallets.push(this.wallets[id])
async.each(wallets, function (wallet, cb) { wallet.close(cb) }, function (err) {
if (err) return cb(err)
this.closed = true
self.emit('end')
cb(null)
self.chain.close(function (err) {
if (err) return cb(err)
this.closed = true
cb(null)
})
})
})

async.parallel(tasks, function (err) {
if (err) return (self._error || cb)(err)
self.emit('end')
cb(null)
})
}

Node.prototype.createBlockStream = function (opts) {
Expand Down
6 changes: 6 additions & 0 deletions lib/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ Peer.prototype._sendVersion = function () {
this.sendMessage(message)
}

Peer.prototype.sendMessage = function () {
if (this.status === Peer.STATUS.DISCONNECTED) return
var args = Array.prototype.slice.call(arguments, 0)
p2p.Peer.prototype.sendMessage.apply(this, args)
}

Peer.prototype.getFilter = function () {
return this.filter
}
Expand Down
113 changes: 75 additions & 38 deletions lib/peerGroup.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var EventEmitter = require('events').EventEmitter
var util = require('util')
var async = require('async')
var bitcore = require('bitcore')
var Networks = bitcore.Networks
var Transaction = bitcore.Transaction
Expand Down Expand Up @@ -43,7 +44,12 @@ var PeerGroup = module.exports = function (opts) {

this.webSeeds = []

this.connecting = false
this.connected = false
this.disconnecting = false
this.connectedTCP = false
this.connectedWeb = false

this.peers = []
this.peers.tcp = []
this.peers.web = []
Expand All @@ -65,6 +71,11 @@ PeerGroup.prototype._error = function (err) {

PeerGroup.prototype._onPeerConnect = function (peer) {
var self = this

if (!(peer instanceof Peer)) {
peer = new Peer(peer)
}

peer.on('ready', function () {
self._onPeerReady(peer)
self.emit('peer', peer)
Expand Down Expand Up @@ -102,7 +113,6 @@ PeerGroup.prototype._onPeerReady = function (peer) {
// TODO: handle types other than transactions
var txMessage = peer.messages.Transaction(item.value)
peer.sendMessage(txMessage)
console.log(item.value.hash)
})
})

Expand Down Expand Up @@ -140,38 +150,48 @@ PeerGroup.prototype.connect = function (opts, cb) {
opts = {}
}
opts = opts || {}
if (cb) this.once('connect', cb)

this.on('peerconnect', function onPeerConnect () {
if (self.numberConnected() >= (self.tcpCount + self.webCount) / 2) {
self.removeListener('peerconnect', onPeerConnect)
self.connected = true
self.emit('connect')
}
})
if (cb) {
this.once('ready', function () { cb(null) })
}

this.connecting = true

var tasks = []

if (!process.browser) {
// first check if a peer is running on the local machine
this._connectToLocalhost(function (err, peer) {
if (err || !peer) {
// if we couldn't connect to the local peer, connect to DNS seed peers
return self._connectToTCPPeers()
}
// if we eventually disconnect from local peer, connect to DNS seed peers
peer.on('disconnect', self._connectToTCPPeers.bind(self))
tasks.push(function (cb) {
// first check if a peer is running on the local machine
self._connectToLocalhost(function (err, peer) {
if (err || !peer) {
// if we couldn't connect to the local peer, connect to DNS seed peers
return self._connectToTCPPeers(cb)
}
// if we eventually disconnect from local peer, connect to DNS seed peers
peer.on('disconnect', self._connectToTCPPeers.bind(self))
cb(null)
})
})
}

if (supportsWebRTC) {
webSeeds.forEach(function (uri) {
var client = new PeerhubClient(uri, function () {
self.webSeeds.push(client)
self.emit('seedconnect', client)
self._connectToWebPeers()
if (self.acceptWeb) self._acceptFromPeerhub(client)
})
tasks.push(function (cb) {
async.each(webSeeds, function (uri, cb) {
var client = new PeerhubClient(uri, function () {
self.webSeeds.push(client)
self.emit('seedconnect', client)
self._connectToWebPeers()
if (self.acceptWeb) self._acceptFromPeerhub(client)
cb(null)
})
}, cb)
})
}

async.parallel(tasks, function (err) {
if (err) return (cb || self._error)(err)
self.connecting = false
self.emit('ready')
})
}

PeerGroup.prototype._connectToLocalhost = function (cb) {
Expand Down Expand Up @@ -199,19 +219,23 @@ PeerGroup.prototype._connectToLocalhost = function (cb) {
localPeer.connect()
}

PeerGroup.prototype._connectToTCPPeers = function () {
PeerGroup.prototype._connectToTCPPeers = function (cb) {
if (process.browser) throw new Error('Not supported in the browser')
if (!cb) cb = function () {}

var self = this
if (this.tcpCount <= 0) return
if (this.tcpCount <= 0) return cb(null)
// FIXME: temporary hack to fix intermittent connection problems:
// (reconnect pool if no peers connect in 4 seconds)
var timeout = setTimeout(function () {
// (reconnect pool every 5 seconds if no peers connect)
var interval = setInterval(function () {
self.pool.connect()
}, 4000)
this.pool.on('peerconnect', function () {
clearTimeout(timeout)
})
}, 5000)
function onPeerConnect () {
self.pool.removeListener('peerconnect', onPeerConnect)
clearInterval(interval)
cb(null)
}
if (!this.connected) this.pool.on('peerconnect', onPeerConnect)
this.pool.connect()
if (this.acceptTcp) this.pool.listen()
}
Expand Down Expand Up @@ -241,6 +265,7 @@ PeerGroup.prototype._onWebPeerConnect = function (conn, incoming) {
var peer = new WebPeer(conn, { incoming: !!incoming, getTip: this.getTip })
peer.connect()
this._onPeerConnect(peer)
this.emit('webpeer', peer)
}

PeerGroup.prototype.acceptWebPeers = function () {
Expand All @@ -254,15 +279,27 @@ PeerGroup.prototype._acceptFromPeerhub = function (client) {
client.accept(function (id, peer) { self._onWebPeerConnect(peer, true) })
}

PeerGroup.prototype.disconnect = function () {
if (this.pool) this.pool.disconnect()
this.peers.forEach(function (peer) {
PeerGroup.prototype.disconnect = function (cb) {
var self = this

this.disconnecting = true

if (this.connecting) {
return this.on('ready', function () {
self.disconnect(cb)
})
}

self.peers.forEach(function (peer) {
peer.disconnect()
})
this.webSeeds.forEach(function (client) {
self.webSeeds.forEach(function (client) {
client.disconnect()
})
this.emit('disconnect')

if (this.pool) this.pool.disconnect()
self.emit('disconnect')
if (cb) cb(null)
}

PeerGroup.prototype.setFilter = function (filter) {
Expand Down

0 comments on commit 86ce8f3

Please sign in to comment.