Skip to content

Commit

Permalink
Mqtt js upgrade (#562)
Browse files Browse the repository at this point in the history
Co-authored-by: Bret Ambrose <[email protected]>
Co-authored-by: Vera Xia <[email protected]>
  • Loading branch information
3 people authored Aug 28, 2024
1 parent d2f7e6b commit 10b0239
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 158 deletions.
20 changes: 16 additions & 4 deletions lib/browser/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/

import * as mqtt from "mqtt";
import * as mqtt_packet from "mqtt-packet";
import * as WebsocketUtils from "./ws";
import * as auth from "./auth";
import { Trie, TrieOp, Node as TrieNode } from "./trie";
Expand All @@ -36,7 +37,7 @@ import {
OnConnectionFailedResult,
OnConnectionClosedResult
} from "../common/mqtt";
import { normalize_payload } from "../common/mqtt_shared";
import {normalize_payload, normalize_payload_to_buffer} from "../common/mqtt_shared";

export {
QoS, Payload, MqttRequest, MqttSubscribeRequest, MqttWill, OnMessageCallback, MqttConnectionConnected, MqttConnectionDisconnected,
Expand Down Expand Up @@ -310,7 +311,7 @@ export class MqttClientConnection extends BufferedEventEmitter {

const will = this.config.will ? {
topic: this.config.will.topic,
payload: normalize_payload(this.config.will.payload),
payload: normalize_payload_to_buffer(this.config.will.payload),
qos: this.config.will.qos,
retain: this.config.will.retain,
} : undefined;
Expand Down Expand Up @@ -576,7 +577,18 @@ export class MqttClientConnection extends BufferedEventEmitter {
return this.on_error(error);
}
const sub = (packet as mqtt.ISubscriptionGrant[])[0];
resolve({ topic: sub.topic, qos: sub.qos });

/*
* 128 is not modeled in QoS, either on our side nor mqtt-js's side.
* We have always passed this 128 to the user and it is not reasonable to extend
* our output type with 128 since it's also our input type and we don't want anyone
* to pass 128 to us.
*
* The 5 client solves this by making the output type a completely separate enum.
*
* By doing this cast, we make the type checker ignore this edge case.
*/
resolve({ topic: sub.topic, qos: sub.qos as QoS });
});
});
}
Expand All @@ -603,7 +615,7 @@ export class MqttClientConnection extends BufferedEventEmitter {
}
resolve({
packet_id: packet
? (packet as mqtt.IUnsubackPacket).messageId
? (packet as mqtt_packet.IUnsubackPacket).messageId
: undefined,
});
});
Expand Down
4 changes: 2 additions & 2 deletions lib/browser/mqtt5.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import url from "url";
import {HttpsProxyAgent} from "https-proxy-agent";
import * as auth from "./auth";

jest.setTimeout(10000);
jest.setTimeout(1000000);

function createBrowserSpecificTestConfig (testType: test_utils.SuccessfulConnectionTestType) : mqtt5.Mqtt5ClientConfig {

Expand Down Expand Up @@ -405,7 +405,7 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir
await test_utils.testNegotiatedSettings(forcedRejoinClient, true);
});

test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Sub - Pub QoS 0 - Unsub', async () => {
test('Sub - Pub QoS 0 - Unsub', async () => {
let topic : string = `test/${uuid()}`;
let testPayload : Buffer = Buffer.from("Derp", "utf-8");

Expand Down
19 changes: 12 additions & 7 deletions lib/browser/mqtt5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import {BufferedEventEmitter} from "../common/event";
import * as mqtt from "mqtt"; /* The mqtt-js external dependency */
import * as mqtt_packet from "mqtt-packet";
import * as mqtt5 from "../common/mqtt5";
import {OutboundTopicAliasBehaviorType} from "../common/mqtt5";
import * as mqtt5_packet from "../common/mqtt5_packet"
Expand Down Expand Up @@ -445,15 +446,19 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
let subMap: mqtt.ISubscriptionMap = mqtt_utils.transform_crt_subscribe_to_mqtt_js_subscription_map(packet);
let subOptions: mqtt.IClientSubscribeOptions = mqtt_utils.transform_crt_subscribe_to_mqtt_js_subscribe_options(packet);

// @ts-ignore
this.browserClient.subscribe(subMap, subOptions, (error, grants) => {
this.browserClient.subscribe(subMap, subOptions, (error, grants, suback) => {
if (error) {
reject(error);
return;
}

const suback: mqtt5_packet.SubackPacket = mqtt_utils.transform_mqtt_js_subscription_grants_to_crt_suback(grants);
resolve(suback);
if (suback) {
const crtSubackFromMqttjsSuback = mqtt_utils.transform_mqtt_js_suback_to_crt_suback(suback);
resolve(crtSubackFromMqttjsSuback);
} else {
const crtSubackFromGrants: mqtt5_packet.SubackPacket = mqtt_utils.transform_mqtt_js_subscription_grants_to_crt_suback(grants ?? []);
resolve(crtSubackFromGrants);
}
});
} catch (err) {
reject(err);
Expand Down Expand Up @@ -505,7 +510,7 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
};
resolve(unsuback);
} else {
const unsuback: mqtt5_packet.UnsubackPacket = mqtt_utils.transform_mqtt_js_unsuback_to_crt_unsuback(packet as mqtt.IUnsubackPacket);
const unsuback: mqtt5_packet.UnsubackPacket = mqtt_utils.transform_mqtt_js_unsuback_to_crt_unsuback(packet as mqtt_packet.IUnsubackPacket);
resolve(unsuback);
}
});
Expand Down Expand Up @@ -607,7 +612,7 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
})
}

const puback: mqtt5_packet.PubackPacket = mqtt_utils.transform_mqtt_js_puback_to_crt_puback(completionPacket as mqtt.IPubackPacket);
const puback: mqtt5_packet.PubackPacket = mqtt_utils.transform_mqtt_js_puback_to_crt_puback(completionPacket as mqtt_packet.IPubackPacket);
resolve(puback);
break;

Expand Down Expand Up @@ -889,4 +894,4 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
this.emit(Mqtt5Client.MESSAGE_RECEIVED, messageReceivedEvent);
}, 0);
}
}
}
73 changes: 55 additions & 18 deletions lib/browser/mqtt5_utils.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
*/

import * as mqtt from "mqtt";
import * as mqtt_packet from "mqtt-packet";
import * as mqtt5 from "./mqtt5";
import {InboundTopicAliasBehaviorType, OutboundTopicAliasBehaviorType} from "./mqtt5";
import * as mqtt5_utils from "./mqtt5_utils";
import * as mqtt_shared from "../common/mqtt_shared";
import {normalize_payload_to_buffer} from "../common/mqtt_shared";


test('MQTT.JS User Properties to CRT User Properties undefined', async () => {
Expand All @@ -17,7 +19,7 @@ test('MQTT.JS User Properties to CRT User Properties undefined', async () => {
});

test('MQTT.JS User Properties to CRT User Properties single', async () => {
let mqttJsUserProperties : mqtt.UserProperties = {
let mqttJsUserProperties : mqtt_packet.UserProperties = {
prop1 : "value1",
prop2 : "value2"
}
Expand All @@ -40,7 +42,7 @@ test('MQTT.JS User Properties to CRT User Properties single', async () => {
});

test('MQTT.JS User Properties to CRT User Properties multi', async () => {
let mqttJsUserProperties : mqtt.UserProperties = {
let mqttJsUserProperties : mqtt_packet.UserProperties = {
prop1 : "value1",
prop2 : ["value2_1", "value2_2", "value2_3"]
}
Expand Down Expand Up @@ -71,7 +73,7 @@ test('MQTT.JS User Properties to CRT User Properties multi', async () => {
});

test('CRT User Properties to MQTT.js User Properties undefined', async () => {
let mqttJsUserProperties : mqtt.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(undefined);
let mqttJsUserProperties : mqtt_packet.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(undefined);

expect(mqttJsUserProperties).toBeUndefined();
});
Expand All @@ -82,7 +84,7 @@ test('CRT User Properties to MQTT.js User Properties single', async () => {
{ name : "prop2", value: "value2"}
]

let mqttJsUserProperties : mqtt.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(crtUserProperties);
let mqttJsUserProperties : mqtt_packet.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(crtUserProperties);

expect(mqttJsUserProperties).toEqual(
{
Expand All @@ -99,9 +101,9 @@ test('CRT User Properties to MQTT.js User Properties single', async () => {
{ name : "prop2", value: "value2_3"}
]

let mqttJsUserProperties : mqtt.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(crtUserProperties);
let mqttJsUserProperties : mqtt_packet.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(crtUserProperties);
expect(mqttJsUserProperties).toBeDefined();
let definedProperties : mqtt.UserProperties = mqttJsUserProperties ?? {};
let definedProperties : mqtt_packet.UserProperties = mqttJsUserProperties ?? {};

const {prop1 : propOne, prop2: propTwo, ...rest} = definedProperties;

Expand Down Expand Up @@ -415,7 +417,7 @@ test('create_mqtt_js_client_config_from_crt_client_config maximal, minimal will'
expectedOptions["password"] = myPassword;
expectedOptions["will"] = {
topic : "Ohno",
payload : "",
payload : normalize_payload_to_buffer(""),
qos : mqtt5.QoS.AtLeastOnce,
retain : false
}
Expand Down Expand Up @@ -638,8 +640,6 @@ test('transform_crt_subscribe_to_mqtt_js_subscription_map', async() => {
});
});

//function transform_crt_subscribe_to_mqtt_js_subscribe_options(subscribe: mqtt5.SubscribePacket) : mqtt.IClientSubscribeOptions

test('transform_crt_subscribe_to_mqtt_js_subscribe_options minimal', async() => {
let subscribe : mqtt5.SubscribePacket = {
subscriptions: [
Expand Down Expand Up @@ -692,7 +692,7 @@ test('transform_mqtt_js_subscription_grants_to_crt_suback', async() => {
},
{
topic: "a/different/topic",
qos: mqtt5.SubackReasonCode.NotAuthorized,
qos: mqtt5.SubackReasonCode.UnspecifiedError,
nl: true,
rap: true,
rh: 2
Expand All @@ -703,7 +703,45 @@ test('transform_mqtt_js_subscription_grants_to_crt_suback', async() => {

expect(suback).toEqual({
type: mqtt5.PacketType.Suback,
reasonCodes: [2, mqtt5.SubackReasonCode.NotAuthorized]
reasonCodes: [2, mqtt5.SubackReasonCode.UnspecifiedError]
});
});

test('transform_mqtt_js_suback_to_crt_suback - minimal', async() => {
let mqttJsSuback : mqtt_packet.ISubackPacket = {
cmd: "suback",
granted: [1]
};

let suback : mqtt5.SubackPacket = mqtt5_utils.transform_mqtt_js_suback_to_crt_suback(mqttJsSuback);

expect(suback).toEqual({
type: mqtt5.PacketType.Suback,
reasonCodes: [mqtt5.SubackReasonCode.GrantedQoS1]
});
});

test('transform_mqtt_js_suback_to_crt_suback - maximal', async() => {
let mqttJsSuback : mqtt_packet.ISubackPacket = {
cmd: "suback",
granted: [2, 128],
properties : {
reasonString: "Misadventure",
userProperties: {
world: ["hello"]
}
}
};

let suback : mqtt5.SubackPacket = mqtt5_utils.transform_mqtt_js_suback_to_crt_suback(mqttJsSuback);

expect(suback).toEqual({
type: mqtt5.PacketType.Suback,
reasonCodes: [mqtt5.SubackReasonCode.GrantedQoS2, mqtt5.SubackReasonCode.UnspecifiedError],
reasonString: "Misadventure",
userProperties: [
{name: "world", value: "hello"}
]
});
});

Expand Down Expand Up @@ -827,7 +865,7 @@ test('transform_mqtt_js_publish_to_crt_publish maximal', async() => {
});

test('transform_mqtt_js_puback_to_crt_puback minimal', async() => {
let mqttJsPuback : mqtt.IPubackPacket = {
let mqttJsPuback : mqtt_packet.IPubackPacket = {
cmd: 'puback'
};

Expand All @@ -840,7 +878,7 @@ test('transform_mqtt_js_puback_to_crt_puback minimal', async() => {
});

test('transform_mqtt_js_puback_to_crt_puback maximal', async() => {
let mqttJsPuback : mqtt.IPubackPacket = {
let mqttJsPuback : mqtt_packet.IPubackPacket = {
cmd: 'puback',
reasonCode: mqtt5.PubackReasonCode.NotAuthorized,
properties: {
Expand Down Expand Up @@ -893,9 +931,9 @@ test('transform_crt_unsubscribe_to_mqtt_js_unsubscribe_options maximal', async()
});

test('transform_mqtt_js_unsuback_to_crt_unsuback minimal', async() => {
let mqttJsUnsuback : mqtt.IUnsubackPacket = {
let mqttJsUnsuback : mqtt_packet.IUnsubackPacket = {
cmd: 'unsuback',
reasonCode: mqtt5.UnsubackReasonCode.NoSubscriptionExisted
granted: [mqtt5.UnsubackReasonCode.NoSubscriptionExisted]
};

let crtUnsuback : mqtt5.UnsubackPacket = mqtt5_utils.transform_mqtt_js_unsuback_to_crt_unsuback(mqttJsUnsuback);
Expand All @@ -907,10 +945,9 @@ test('transform_mqtt_js_unsuback_to_crt_unsuback minimal', async() => {
});

test('transform_mqtt_js_unsuback_to_crt_unsuback maximal', async() => {
let mqttJsUnsuback : mqtt.IUnsubackPacket = {
let mqttJsUnsuback : mqtt_packet.IUnsubackPacket = {
cmd: 'unsuback',
// @ts-ignore
reasonCode: [mqtt5.UnsubackReasonCode.NoSubscriptionExisted, mqtt5.UnsubackReasonCode.ImplementationSpecificError],
granted: [mqtt5.UnsubackReasonCode.NoSubscriptionExisted, mqtt5.UnsubackReasonCode.ImplementationSpecificError],
properties: {
reasonString: "Dunno",
userProperties: {
Expand Down
Loading

0 comments on commit 10b0239

Please sign in to comment.