diff --git a/lib/server.js b/lib/server.js index 00b6899..2e03c42 100644 --- a/lib/server.js +++ b/lib/server.js @@ -155,15 +155,17 @@ Server.prototype.close = function(callback) { that.closeConn(that.clients[id], cb); }; }), function() { - that.once("closed", callback); - try { - that.server.close(function() { - debug("closed"); - that.emit("closed"); - }); - } catch (exception) { - callback(exception); - } + that.ascoltatore.close(function () { + that.once("closed", callback); + try { + that.server.close(function() { + debug("closed"); + that.emit("closed"); + }); + } catch (exception) { + callback(exception); + } + }); }); }; @@ -219,15 +221,16 @@ Server.prototype.serve = function(client) { } }; - var forward = function(topic, payload, retry) { + var forward = function(topic, payload, options) { debug("delivering message on " + topic + " to " + client.id); - var qos = client.subscriptions[topic]; + var qos = Math.min((options && options.qos) || 0, + client.subscriptions[topic]); var packet = { topic: topic, payload: payload, qos: qos, - messageId: client.nextId++ + messageId: (options && options.messageId) || client.nextId++ }; actualSend(packet, 0); @@ -327,11 +330,15 @@ Server.prototype.serve = function(client) { return; } - that.ascoltatore.subscribe(s.topic.replace("#", "*"), forward, function() { - debug("subscribed " + client.id + " to " + s.topic); - client.subscriptions[s.topic] = s.qos; - cb(); - }); + that.ascoltatore.subscribe( + s.topic.replace("#", "*"), + { qos: s.qos }, + forward, + function() { + debug("subscribed " + client.id + " to " + s.topic); + client.subscriptions[s.topic] = s.qos; + cb(); + }); }); }; }), function(err) { @@ -353,17 +360,21 @@ Server.prototype.serve = function(client) { return; } - that.ascoltatore.publish(packet.topic, packet.payload, function() { - debug("client " + client.id + " published packet to topic " + packet.topic); - - if (packet.qos === 1) { - client.puback({ - messageId: packet.messageId - }); - } + that.ascoltatore.publish( + packet.topic, + packet.payload, + { qos: packet.qos, clientId: client.id, messageId: packet.messageId }, + function() { + debug("client " + client.id + " published packet to topic " + packet.topic); + + if (packet.qos === 1) { + client.puback({ + messageId: packet.messageId + }); + } - that.emit("published", packet, client); - }); + that.emit("published", packet, client); + }); }); }); @@ -386,7 +397,10 @@ Server.prototype.serve = function(client) { if (client.will) { debug("delivering last will for client " + client.id + " to topic " + client.will.topic); - that.ascoltatore.publish(client.will.topic, client.will.payload); + that.ascoltatore.publish( + client.will.topic, + client.will.payload, + { qos: client.will.qos, clientId: client.id }); } unsubAndClose(); diff --git a/package.json b/package.json index 8558326..a353cf7 100644 --- a/package.json +++ b/package.json @@ -46,7 +46,7 @@ "dependencies": { "mqtt": "~0.2.6", "async": "~0.2.4", - "ascoltatori": "~0.4.1", + "ascoltatori": "~0.5.0", "debug": "~0.7.2", "commander": "~1.1.1", "minimatch": "~0.2.11" diff --git a/test/server_spec.js b/test/server_spec.js index aee166b..649e138 100644 --- a/test/server_spec.js +++ b/test/server_spec.js @@ -688,6 +688,93 @@ describe("mosca.Server", function() { }); }); + client.on("suback", function(packet) { + client.publish({ + topic: "hello", + qos: 1, + messageId: 24 + }); + }); + + var subscriptions = [{ + topic: "hello", + qos: 1 + } + ]; + + client.subscribe({ + subscriptions: subscriptions, + messageId: 42 + }); + }); + }); + + it("should receive published messageId", function(done) { + buildAndConnect(done, function(client) { + + client.once("publish", function(packet) { + expect(packet.messageId).to.be.equal(24); + client.disconnect(); + }); + + client.on("suback", function(packet) { + client.publish({ + topic: "hello", + qos: 1, + messageId: 24 + }); + }); + + var subscriptions = [{ + topic: "hello", + qos: 1 + } + ]; + + client.subscribe({ + subscriptions: subscriptions, + messageId: 42 + }); + }); + }); + + it("should receive all messages at QoS 0 if a subscription is done with QoS 0", function(done) { + buildAndConnect(done, function(client) { + + client.once("publish", function(packet) { + expect(packet.qos).to.be.equal(0); + client.disconnect(); + }); + + client.on("suback", function(packet) { + client.publish({ + topic: "hello", + qos: 1, + messageId: 24 + }); + }); + + var subscriptions = [{ + topic: "hello", + qos: 0 + } + ]; + + client.subscribe({ + subscriptions: subscriptions, + messageId: 42 + }); + }); + }); + + it("should receive at QoS 0 all messages published at QoS 0 even if subscribed at QoS 1", function(done) { + buildAndConnect(done, function(client) { + + client.once("publish", function(packet) { + expect(packet.qos).to.be.equal(0); + client.disconnect(); + }); + client.on("suback", function(packet) { client.publish({ topic: "hello",