Skip to content
This repository was archived by the owner on Feb 11, 2020. It is now read-only.

Pass options to backend #26

Merged
merged 2 commits into from
May 24, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 42 additions & 28 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,17 @@ Server.prototype.close = function(callback) {
that.closeConn(that.clients[id], cb);
};
}), function() {
that.once("closed", callback);
try {
that.server.close(function() {
debug("closed");
that.emit("closed");
});
} catch (exception) {
callback(exception);
}
that.ascoltatore.close(function () {
that.once("closed", callback);
try {
that.server.close(function() {
debug("closed");
that.emit("closed");
});
} catch (exception) {
callback(exception);
}
});
});
};

Expand Down Expand Up @@ -219,15 +221,16 @@ Server.prototype.serve = function(client) {
}
};

var forward = function(topic, payload, retry) {
var forward = function(topic, payload, options) {
debug("delivering message on " + topic + " to " + client.id);

var qos = client.subscriptions[topic];
var qos = Math.min((options && options.qos) || 0,
client.subscriptions[topic]);
var packet = {
topic: topic,
payload: payload,
qos: qos,
messageId: client.nextId++
messageId: (options && options.messageId) || client.nextId++
};

actualSend(packet, 0);
Expand Down Expand Up @@ -327,11 +330,15 @@ Server.prototype.serve = function(client) {
return;
}

that.ascoltatore.subscribe(s.topic.replace("#", "*"), forward, function() {
debug("subscribed " + client.id + " to " + s.topic);
client.subscriptions[s.topic] = s.qos;
cb();
});
that.ascoltatore.subscribe(
s.topic.replace("#", "*"),
{ qos: s.qos },
forward,
function() {
debug("subscribed " + client.id + " to " + s.topic);
client.subscriptions[s.topic] = s.qos;
cb();
});
});
};
}), function(err) {
Expand All @@ -353,17 +360,21 @@ Server.prototype.serve = function(client) {
return;
}

that.ascoltatore.publish(packet.topic, packet.payload, function() {
debug("client " + client.id + " published packet to topic " + packet.topic);

if (packet.qos === 1) {
client.puback({
messageId: packet.messageId
});
}
that.ascoltatore.publish(
packet.topic,
packet.payload,
{ qos: packet.qos, clientId: client.id, messageId: packet.messageId },
function() {
debug("client " + client.id + " published packet to topic " + packet.topic);

if (packet.qos === 1) {
client.puback({
messageId: packet.messageId
});
}

that.emit("published", packet, client);
});
that.emit("published", packet, client);
});
});
});

Expand All @@ -386,7 +397,10 @@ Server.prototype.serve = function(client) {

if (client.will) {
debug("delivering last will for client " + client.id + " to topic " + client.will.topic);
that.ascoltatore.publish(client.will.topic, client.will.payload);
that.ascoltatore.publish(
client.will.topic,
client.will.payload,
{ qos: client.will.qos, clientId: client.id });
}

unsubAndClose();
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"dependencies": {
"mqtt": "~0.2.6",
"async": "~0.2.4",
"ascoltatori": "~0.4.1",
"ascoltatori": "~0.5.0",
"debug": "~0.7.2",
"commander": "~1.1.1",
"minimatch": "~0.2.11"
Expand Down
87 changes: 87 additions & 0 deletions test/server_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,93 @@ describe("mosca.Server", function() {
});
});

client.on("suback", function(packet) {
client.publish({
topic: "hello",
qos: 1,
messageId: 24
});
});

var subscriptions = [{
topic: "hello",
qos: 1
}
];

client.subscribe({
subscriptions: subscriptions,
messageId: 42
});
});
});

it("should receive published messageId", function(done) {
buildAndConnect(done, function(client) {

client.once("publish", function(packet) {
expect(packet.messageId).to.be.equal(24);
client.disconnect();
});

client.on("suback", function(packet) {
client.publish({
topic: "hello",
qos: 1,
messageId: 24
});
});

var subscriptions = [{
topic: "hello",
qos: 1
}
];

client.subscribe({
subscriptions: subscriptions,
messageId: 42
});
});
});

it("should receive all messages at QoS 0 if a subscription is done with QoS 0", function(done) {
buildAndConnect(done, function(client) {

client.once("publish", function(packet) {
expect(packet.qos).to.be.equal(0);
client.disconnect();
});

client.on("suback", function(packet) {
client.publish({
topic: "hello",
qos: 1,
messageId: 24
});
});

var subscriptions = [{
topic: "hello",
qos: 0
}
];

client.subscribe({
subscriptions: subscriptions,
messageId: 42
});
});
});

it("should receive at QoS 0 all messages published at QoS 0 even if subscribed at QoS 1", function(done) {
buildAndConnect(done, function(client) {

client.once("publish", function(packet) {
expect(packet.qos).to.be.equal(0);
client.disconnect();
});

client.on("suback", function(packet) {
client.publish({
topic: "hello",
Expand Down