From d364fe77e20dbfcf8bf79f89391c83bfd89385ba Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Fri, 22 Nov 2024 18:45:33 +0100 Subject: [PATCH 1/8] feat: reduce token refresh --- arduino-iot-cloud.js | 8 +- utils/arduino-connection-manager.js | 197 ++++++++++++---------------- 2 files changed, 88 insertions(+), 117 deletions(-) diff --git a/arduino-iot-cloud.js b/arduino-iot-cloud.js index 315fe51..66ce37c 100644 --- a/arduino-iot-cloud.js +++ b/arduino-iot-cloud.js @@ -75,7 +75,7 @@ module.exports = function (RED) { try { if (config.thing !== "" && config.property !== "") { - this.arduinoRestClient = await connectionManager.getClientHttp(connectionConfig, this.organization); + this.arduinoRestClient = await connectionManager.getClientHttp(connectionConfig, config.organization); if (this.arduinoRestClient){ this.arduinoRestClient.openConnections++; this.organization = config.organization; @@ -152,7 +152,7 @@ module.exports = function (RED) { this.timeWindowUnit = config.timeWindowUnit; if (connectionConfig && config.thing !== "" && config.thing !== "0" && config.property !== "" && config.property !== "0") { try { - this.arduinoRestClient = await connectionManager.getClientHttp(connectionConfig, this.organization); + this.arduinoRestClient = await connectionManager.getClientHttp(connectionConfig, config.organization); if (this.arduinoRestClient){ this.arduinoRestClient.openConnections++; if (config.thing !== "" && config.property !== "") { @@ -251,7 +251,7 @@ module.exports = function (RED) { this.organization = config.organization; if (connectionConfig && config.thing !== "" && config.thing !== "0" && config.property !== "" && config.property !== "0") { try { - this.arduinoRestClient = await connectionManager.getClientHttp(connectionConfig, this.organization); + this.arduinoRestClient = await connectionManager.getClientHttp(connectionConfig, config.organization); if (this.arduinoRestClient){ this.arduinoRestClient.openConnections++; if (config.thing !== "" && config.property !== "") { @@ -340,7 +340,7 @@ module.exports = function (RED) { try { if (config.thing !== "" && config.property !== "") { - this.arduinoRestClient = await connectionManager.getClientHttp(connectionConfig, this.organization); + this.arduinoRestClient = await connectionManager.getClientHttp(connectionConfig, config.organization); if (this.arduinoRestClient){ this.arduinoRestClient.openConnections++; this.organization = config.organization; diff --git a/utils/arduino-connection-manager.js b/utils/arduino-connection-manager.js index de718f5..95da355 100644 --- a/utils/arduino-connection-manager.js +++ b/utils/arduino-connection-manager.js @@ -29,57 +29,33 @@ const Mutex = require('async-mutex').Mutex; * clientId: clientId, * connectionConfig: connectionConfig, * token: token, - * expires_token_ts: ts, * clientMqtt: clientMqttobj, * clientHttp: clientHttpobj, - * timeoutUpdateToken: timeout + * intervalUpdateToken: interval * } */ var connections = []; const getClientMutex = new Mutex(); -var numRetry=0; - -async function getToken(connectionConfig, organizationID) { - const dataToSend = { - grant_type: 'client_credentials', - client_id: connectionConfig.credentials.clientid, - client_secret: connectionConfig.credentials.clientsecret, - audience: accessTokenAudience - }; - - try { - var req = superagent +function getMqttOptions(clientId,token,RED){ + async function reconnect() { + const releaseMutex = await getClientMutex.acquire(); + let user = findUser(clientId); + if (user !== -1) { + let token = await getToken(connections[user].connectionConfig); + var req = superagent .post(accessTokenUri) .set('content-type', 'application/x-www-form-urlencoded') .set('accept', 'json') - if (organizationID) { - req.set('X-Organization', organizationID) - } - var res = await req.send(dataToSend); var token = res.body.access_token; - var expires_in = res.body.expires_in * 0.8; // needed to change the token before it expires - if (token !== undefined) { - return { token: token, expires_in: expires_in }; - } - } catch (err) { - if(err.response && err.response.res && err.response.request){ - console.log('statusCode: '+ err.response.res.statusCode +'\r'+ - 'statusMessage: ' + err.response.res.statusMessage + '\r' + - 'text: ' + err.response.res.text + '\r'+ - 'HTTP method: ' + err.response.request.method + '\r' + - 'URL request: ' + err.response.request.url - ); - }else{ - console.log(err); + var expires_in = res.body.expires_in * 0.8; // needed to change the token before it expiress[user].token = token.token; + await connections[user].clientMqtt.updateToken(token.token); } - + releaseMutex(); } -} -function getMqttOptions(clientId,token,RED){ return { host: arduinoIotCloudHost, token: token, @@ -92,8 +68,7 @@ function getMqttOptions(clientId,token,RED){ } }); - await reconnectMqtt(clientId); - + await reconnect(); }, onOffline: async () => { console.log(`connection lost for ${clientId}`); @@ -103,6 +78,8 @@ function getMqttOptions(clientId,token,RED){ node.status({ fill: "red", shape: "dot", text: "arduino-iot-cloud.status.offline" }); } }); + + await reconnect(); }, onConnected: () =>{ RED.nodes.eachNode((n)=>{ @@ -117,42 +94,35 @@ function getMqttOptions(clientId,token,RED){ } async function getClientMqtt(connectionConfig, RED) { - if (!connectionConfig || !connectionConfig.credentials) { throw new Error("Cannot find connection config or credentials."); } const releaseMutex = await getClientMutex.acquire(); try { - let user = findUser(connectionConfig.credentials.clientid); let clientMqtt; + let user = findUser(connectionConfig.credentials.clientid); if (user === -1) { + let token = await getToken(connectionConfig); clientMqtt = new ArduinoClientMqtt.ArduinoClientMqtt(); - const tokenInfo = await getToken(connectionConfig); - if (tokenInfo !== undefined) { - const ArduinoIotCloudOptions = getMqttOptions(connectionConfig.credentials.clientid,tokenInfo.token,RED) - const timeout = setTimeout(() => { updateToken(connectionConfig) }, tokenInfo.expires_in * 1000); - connections.push({ - clientId: connectionConfig.credentials.clientid, - connectionConfig: connectionConfig, - token: tokenInfo.token, - expires_token_ts: tokenInfo.expires_in, - clientMqtt: clientMqtt, - clientHttp: null, - timeoutUpdateToken: timeout - }); - await clientMqtt.connect(ArduinoIotCloudOptions); - } else { - clientMqtt = undefined; - } + connections.push({ + clientId: connectionConfig.credentials.clientid, + connectionConfig: connectionConfig, + clientMqtt: clientMqtt, + token: token.token, + clientHttp: null, + }); + await clientMqtt.connect( + getMqttOptions(connectionConfig.credentials.clientid, token.token, RED), + ); } else { if (connections[user].clientMqtt !== null) { clientMqtt = connections[user].clientMqtt; } else { clientMqtt = new ArduinoClientMqtt.ArduinoClientMqtt(); - const ArduinoIotCloudOptions = getMqttOptions(connectionConfig.credentials.clientid,connections[user].token,RED) connections[user].clientMqtt = clientMqtt; - await clientMqtt.connect(ArduinoIotCloudOptions); - + await clientMqtt.connect( + getMqttOptions(connectionConfig.credentials.clientid, connections[user].token, RED) + ); } } releaseMutex(); @@ -162,11 +132,9 @@ async function getClientMqtt(connectionConfig, RED) { console.log(err); releaseMutex(); } - } async function getClientHttp(connectionConfig, organizationID) { - if (!connectionConfig || !connectionConfig.credentials) { throw new Error("Cannot find cooonection config or credentials."); } @@ -180,19 +148,21 @@ async function getClientHttp(connectionConfig, organizationID) { if (tokenInfo !== undefined) { clientHttp = new ArduinoClientHttp.ArduinoClientHttp(tokenInfo.token); - var timeout = setTimeout(() => { updateToken(connectionConfig) }, tokenInfo.expires_in * 1000); + var interval = setInterval(async () => { + let id = findUser(connectionConfig.credentials.clientid); + if (id !== -1) { + connections[id].token = await getToken(connectionConfig, organizationID); + } + }, tokenInfo.expires_in * 1000); connections.push({ clientId: connectionConfig.credentials.clientid, connectionConfig: connectionConfig, token: tokenInfo.token, - expires_token_ts: tokenInfo.expires_in, clientMqtt: null, clientHttp: clientHttp, - timeoutUpdateToken: timeout + intervalUpdateToken: interval }); - } - } else { if (connections[user].clientHttp !== null) { clientHttp = connections[user].clientHttp; @@ -216,11 +186,8 @@ async function getClientHttp(connectionConfig, organizationID) { }else{ console.log(err); } - releaseMutex(); - } - } function findUser(clientId) { @@ -230,39 +197,56 @@ function findUser(clientId) { } } return -1; - } -async function updateToken(connectionConfig) { - try { - var user = findUser(connectionConfig.credentials.clientid); - if (user !== -1) { - var tokenInfo = await getToken(connectionConfig); - if (tokenInfo !== undefined) { - numRetry=0; - connections[user].token = tokenInfo.token; - connections[user].expires_token_ts = tokenInfo.expires_in; - if(connections[user].clientMqtt){ - connections[user].clientMqtt.updateToken(tokenInfo.token); - } - if(connections[user].clientHttp){ - connections[user].clientHttp.updateToken(tokenInfo.token); - } - connections[user].timeoutUpdateToken = setTimeout(() => { updateToken(connectionConfig) }, tokenInfo.expires_in * 1000); - } else { - /*Avoid too many requests addressed to server*/ - if(numRetry < 3){ - connections[user].timeoutUpdateToken = setTimeout(() => { updateToken(connectionConfig) }, 5000); - } - else{ - connections[user].timeoutUpdateToken = setTimeout(() => { updateToken(connectionConfig) }, 60000); - } +async function getToken(connectionConfig, organizationID) { + let token; + let delay = 200; + while (true) { + token = await _get(); + if (token) { + return token; + } + await new Promise((resolve) => setTimeout(resolve, delay)); + delay = Math.min(delay * 2, 5000); + } + + async function _get() { + const dataToSend = { + grant_type: 'client_credentials', + client_id: connectionConfig.credentials.clientid, + client_secret: connectionConfig.credentials.clientsecret, + audience: accessTokenAudience + }; + + try { + var req = superagent + .post(accessTokenUri) + .set('content-type', 'application/x-www-form-urlencoded') + .set('accept', 'json') - numRetry++; + if (organizationID) { + req.set('X-Organization', organizationID) + } + + var res = await req.send(dataToSend); + var token = res.body.access_token; + var expires_in = res.body.expires_in * 0.8; // needed to change the token before it expires + if (token !== undefined) { + return { token: token, expires_in: expires_in }; + } + } catch (err) { + if(err.response && err.response.res && err.response.request){ + console.log('statusCode: '+ err.response.res.statusCode +'\r'+ + 'statusMessage: ' + err.response.res.statusMessage + '\r' + + 'text: ' + err.response.res.text + '\r'+ + 'HTTP method: ' + err.response.request.method + '\r' + + 'URL request: ' + err.response.request.url + ); + }else{ + console.log(err); } } - } catch (err) { - console.log(err); } } @@ -277,11 +261,7 @@ async function deleteClientMqtt(clientId, thing, propertyName, nodeId) { await connections[user].clientMqtt.disconnect(); delete connections[user].clientMqtt; connections[user].clientMqtt = null; - if (connections[user].clientHttp === null) { - if (connections[user].timeoutUpdateToken) - clearTimeout(connections[user].timeoutUpdateToken); - connections.splice(user, 1); - } + connections.splice(user, 1); } } } @@ -299,23 +279,14 @@ async function deleteClientHttp(clientId) { } } if (connections[user].clientMqtt === null) { - if (connections[user].timeoutUpdateToken) - clearTimeout(connections[user].timeoutUpdateToken); + if (connections[user].intervalUpdateToken) + clearInterval(connections[user].intervalUpdateToken); connections.splice(user, 1); } } releaseMutex(); } -async function reconnectMqtt(clientId) { - var user = findUser(clientId); - if (user !== -1) { - if(connections[user].clientMqtt){ - await connections[user].clientMqtt.reconnect(); - } - } -} - exports.getClientMqtt = getClientMqtt; exports.getClientHttp = getClientHttp; exports.deleteClientMqtt = deleteClientMqtt; From e0908e1c2c0a9a8b79bd6d30db066f0701b337af Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Tue, 26 Nov 2024 11:57:17 +0100 Subject: [PATCH 2/8] improve refresh token http --- .../arduino-iot-client-mqtt.js | 6 +- utils/arduino-connection-manager.js | 183 +++++++----------- utils/arduino-iot-cloud-api-wrapper.js | 75 ++++--- 3 files changed, 124 insertions(+), 140 deletions(-) diff --git a/arduino-iot-client-mqtt/arduino-iot-client-mqtt.js b/arduino-iot-client-mqtt/arduino-iot-client-mqtt.js index edd9759..02e4933 100644 --- a/arduino-iot-client-mqtt/arduino-iot-client-mqtt.js +++ b/arduino-iot-client-mqtt/arduino-iot-client-mqtt.js @@ -230,7 +230,7 @@ class ArduinoClientMqtt { } async reconnect() { - await this.connection.reconnect(); + this.connection.reconnect(); }; async updateToken(token) { @@ -241,7 +241,7 @@ class ArduinoClientMqtt { try { if (this.connection) { // Disconnect to the connection that is using the old token - await this.connection.end(); + this.connection.end(); // Remove the connection this.connection = null; @@ -656,4 +656,4 @@ function toArrayBuffer(buf) { return ab; } -exports.ArduinoClientMqtt = ArduinoClientMqtt; \ No newline at end of file +exports.ArduinoClientMqtt = ArduinoClientMqtt; diff --git a/utils/arduino-connection-manager.js b/utils/arduino-connection-manager.js index 95da355..32bfac6 100644 --- a/utils/arduino-connection-manager.js +++ b/utils/arduino-connection-manager.js @@ -24,34 +24,33 @@ const accessTokenUri = process.env.NODE_RED_ACCESS_TOKEN_URI || 'https://api2.ar const accessTokenAudience = process.env.NODE_RED_ACCESS_TOKEN_AUDIENCE || 'https://api2.arduino.cc/iot'; const arduinoIotCloudHost = process.env.NODE_RED_MQTT_HOST || 'wss.iot.arduino.cc'; const Mutex = require('async-mutex').Mutex; -/** Connections elem struct + +const mqttMutex = new Mutex(); +/** mqttConnections elem struct * { * clientId: clientId, * connectionConfig: connectionConfig, - * token: token, * clientMqtt: clientMqttobj, + * } + */ +var mqttConnections = []; +const httpMutex = new Mutex(); +/** httpConnections elem struct + * { + * clientId: clientId, + * connectionConfig: connectionConfig, * clientHttp: clientHttpobj, - * intervalUpdateToken: interval * } */ -var connections = []; -const getClientMutex = new Mutex(); +var httpConnections = []; -function getMqttOptions(clientId,token,RED){ +function getMqttOptions(clientId, token, RED){ async function reconnect() { - const releaseMutex = await getClientMutex.acquire(); - let user = findUser(clientId); - if (user !== -1) { - let token = await getToken(connections[user].connectionConfig); - var req = superagent - .post(accessTokenUri) - .set('content-type', 'application/x-www-form-urlencoded') - .set('accept', 'json') - - var res = await req.send(dataToSend); - var token = res.body.access_token; - var expires_in = res.body.expires_in * 0.8; // needed to change the token before it expiress[user].token = token.token; - await connections[user].clientMqtt.updateToken(token.token); + const releaseMutex = await mqttMutex.acquire(); + let id = findUser(clientId); + if (id !== -1) { + let token = await getToken(mqttConnections[id].connectionConfig); + await mqttConnections[id].clientMqtt.updateToken(token); } releaseMutex(); } @@ -60,7 +59,6 @@ function getMqttOptions(clientId,token,RED){ host: arduinoIotCloudHost, token: token, onDisconnect: async () => { - console.log(`connection lost for ${clientId}`); RED.nodes.eachNode((n)=>{ if(n.type === "property in"){ const node = RED.nodes.getNode(n.id); @@ -68,10 +66,10 @@ function getMqttOptions(clientId,token,RED){ } }); + await new Promise((resolve) => setTimeout(resolve, 1000)); await reconnect(); }, onOffline: async () => { - console.log(`connection lost for ${clientId}`); RED.nodes.eachNode((n)=>{ if(n.type === "property in"){ const node = RED.nodes.getNode(n.id); @@ -79,6 +77,7 @@ function getMqttOptions(clientId,token,RED){ } }); + await new Promise((resolve) => setTimeout(resolve, 1000)); await reconnect(); }, onConnected: () =>{ @@ -97,33 +96,23 @@ async function getClientMqtt(connectionConfig, RED) { if (!connectionConfig || !connectionConfig.credentials) { throw new Error("Cannot find connection config or credentials."); } - const releaseMutex = await getClientMutex.acquire(); + const releaseMutex = await mqttMutex.acquire(); try { let clientMqtt; - let user = findUser(connectionConfig.credentials.clientid); - if (user === -1) { + let id = findUser(connectionConfig.credentials.clientid); + if (id === -1) { let token = await getToken(connectionConfig); clientMqtt = new ArduinoClientMqtt.ArduinoClientMqtt(); - connections.push({ + mqttConnections.push({ clientId: connectionConfig.credentials.clientid, connectionConfig: connectionConfig, clientMqtt: clientMqtt, - token: token.token, - clientHttp: null, }); await clientMqtt.connect( - getMqttOptions(connectionConfig.credentials.clientid, token.token, RED), + getMqttOptions(connectionConfig.credentials.clientid, token, RED), ); } else { - if (connections[user].clientMqtt !== null) { - clientMqtt = connections[user].clientMqtt; - } else { - clientMqtt = new ArduinoClientMqtt.ArduinoClientMqtt(); - connections[user].clientMqtt = clientMqtt; - await clientMqtt.connect( - getMqttOptions(connectionConfig.credentials.clientid, connections[user].token, RED) - ); - } + clientMqtt = mqttConnections[id].clientMqtt; } releaseMutex(); @@ -138,42 +127,22 @@ async function getClientHttp(connectionConfig, organizationID) { if (!connectionConfig || !connectionConfig.credentials) { throw new Error("Cannot find cooonection config or credentials."); } - const releaseMutex = await getClientMutex.acquire(); + const releaseMutex = await httpMutex.acquire(); try { - var user = findUser(connectionConfig.credentials.clientid); + var id = findUser(connectionConfig.credentials.clientid); var clientHttp; - if (user === -1) { - - var tokenInfo = await getToken(connectionConfig, organizationID); - if (tokenInfo !== undefined) { - clientHttp = new ArduinoClientHttp.ArduinoClientHttp(tokenInfo.token); - - var interval = setInterval(async () => { - let id = findUser(connectionConfig.credentials.clientid); - if (id !== -1) { - connections[id].token = await getToken(connectionConfig, organizationID); - } - }, tokenInfo.expires_in * 1000); - connections.push({ - clientId: connectionConfig.credentials.clientid, - connectionConfig: connectionConfig, - token: tokenInfo.token, - clientMqtt: null, - clientHttp: clientHttp, - intervalUpdateToken: interval - }); - } + if (id === -1) { + clientHttp = new ArduinoClientHttp.ArduinoClientHttp(async () => await getToken(connectionConfig, organizationID)); + httpConnections.push({ + clientId: connectionConfig.credentials.clientid, + connectionConfig: connectionConfig, + clientHttp: clientHttp, + }); } else { - if (connections[user].clientHttp !== null) { - clientHttp = connections[user].clientHttp; - } else { - clientHttp = new ArduinoClientHttp.ArduinoClientHttp(connections[user].token); - - connections[user].clientHttp = clientHttp; - } + clientHttp = httpConnections[id].clientHttp = clientHttp; } - releaseMutex(); + return clientHttp; } catch (err) { if(err.response && err.response.res && err.response.request){ @@ -190,9 +159,40 @@ async function getClientHttp(connectionConfig, organizationID) { } } +async function deleteClientMqtt(clientId, thing, propertyName, nodeId) { + const releaseMutex = await mqttMutex.acquire(); + var id = findUser(clientId); + if (id !== -1) { + if (mqttConnections[id].clientMqtt !== null) { + var ret = await mqttConnections[id].clientMqtt.removePropertyValueCallback(thing, propertyName,nodeId); + if (ret === 0) { + await mqttConnections[id].clientMqtt.disconnect(); + delete mqttConnections[id].clientMqtt; + mqttConnections[id].clientMqtt = null; + mqttConnections.splice(id, 1); + } + } + } + releaseMutex(); +} + +async function deleteClientHttp(clientId) { + const releaseMutex = await httpMutex.acquire(); + var id = findUser(clientId); + if (id !== -1) { + if (httpConnections[id].clientHttp !== null) { + httpConnections[id].clientHttp.openConnections--; + if (httpConnections[id].clientHttp.openConnections === 0) { + httpConnections.splice(id, 1); + } + } + } + releaseMutex(); +} + function findUser(clientId) { - for (var i = 0; i < connections.length; i++) { - if (connections[i].clientId === clientId) { + for (var i = 0; i < httpConnections.length; i++) { + if (httpConnections[i].clientId === clientId) { return i; } } @@ -200,12 +200,11 @@ function findUser(clientId) { } async function getToken(connectionConfig, organizationID) { - let token; let delay = 200; while (true) { - token = await _get(); + let token = await _get(); if (token) { - return token; + return token.token; } await new Promise((resolve) => setTimeout(resolve, delay)); delay = Math.min(delay * 2, 5000); @@ -249,44 +248,6 @@ async function getToken(connectionConfig, organizationID) { } } } - -async function deleteClientMqtt(clientId, thing, propertyName, nodeId) { - const releaseMutex = await getClientMutex.acquire(); - var user = findUser(clientId); - if (user !== -1) { - if (connections[user].clientMqtt !== null) { - var ret = await connections[user].clientMqtt.removePropertyValueCallback(thing, propertyName,nodeId); - - if (ret === 0) { - await connections[user].clientMqtt.disconnect(); - delete connections[user].clientMqtt; - connections[user].clientMqtt = null; - connections.splice(user, 1); - } - } - } - releaseMutex(); -} - -async function deleteClientHttp(clientId) { - const releaseMutex = await getClientMutex.acquire(); - var user = findUser(clientId); - if (user !== -1) { - if (connections[user].clientHttp !== null) { - connections[user].clientHttp.openConnections--; - if (connections[user].clientHttp.openConnections === 0) { - connections[user].clientHttp = null; - } - } - if (connections[user].clientMqtt === null) { - if (connections[user].intervalUpdateToken) - clearInterval(connections[user].intervalUpdateToken); - connections.splice(user, 1); - } - } - releaseMutex(); -} - exports.getClientMqtt = getClientMqtt; exports.getClientHttp = getClientHttp; exports.deleteClientMqtt = deleteClientMqtt; diff --git a/utils/arduino-iot-cloud-api-wrapper.js b/utils/arduino-iot-cloud-api-wrapper.js index 1fcaf10..3ce359d 100644 --- a/utils/arduino-iot-cloud-api-wrapper.js +++ b/utils/arduino-iot-cloud-api-wrapper.js @@ -30,46 +30,69 @@ const apiSeries = new ArduinoIotClient.SeriesV2Api(client); const apiThings = new ArduinoIotClient.ThingsV2Api(client); class ArduinoClientHttp { - constructor(token) { - this.token = token; + constructor(getToken) { this.openConnections=0; + oauth2.accessToken = ""; if(process.env.API_BASE_PATH){ client.basePath = process.env.API_BASE_PATH; } + + // Wrap the functions with refresh token logic + function withTokenRefresh(fn) { + return async (...args) => { + let delay = 0; + for (;;) { + try { + return await fn(...args); + } catch (e) { + if (e.status === 401) { + oauth2.accessToken = await getToken(); + await new Promise((resolve) => setTimeout(resolve, delay)); + delay = delay===0 ? 200 : Math.min(delay*2, 5000); + continue; + } + throw e; + } + } + }; + } + this.wrappedPropertiesV2Publish = withTokenRefresh(apiProperties.propertiesV2Publish.bind(apiProperties)); + this.wrappedThingsV2List = withTokenRefresh(apiThings.thingsV2List.bind(apiThings)); + this.wrappedThingsV2Show = withTokenRefresh(apiThings.thingsV2Show.bind(apiThings)); + this.wrappedPropertiesV2Show = withTokenRefresh(apiProperties.propertiesV2Show.bind(apiProperties)); + this.wrappedSeriesV2BatchQueryRaw = withTokenRefresh(apiSeries.seriesV2BatchQueryRaw.bind(apiSeries)); } - updateToken(token) { - this.token = token; - } - setProperty(thing_id, property_id, value, opts, device_id = undefined) { + + + async setProperty(thing_id, property_id, value, opts = {}, device_id = undefined) { const body = JSON.stringify({ value: value, - device_id : device_id + device_id: device_id }); - oauth2.accessToken = this.token; - return apiProperties.propertiesV2Publish(thing_id, property_id, body, opts); + return await this.wrappedPropertiesV2Publish(thing_id, property_id, body, opts); } - getThings(opts) { - oauth2.accessToken = this.token; - return apiThings.thingsV2List(opts); + + async getThings(opts = {}) { + return await this.wrappedThingsV2List(opts); } - getThing(thingId, opts) { - oauth2.accessToken = this.token; + + async getThing(thingId, opts = {}) { opts.showDeleted = false; - return apiThings.thingsV2Show(thingId, opts); + return await this.wrappedThingsV2Show(thingId, opts); } - getProperties(thingId, opts) { - oauth2.accessToken = this.token; + + async getProperties(thingId, opts = {}) { opts.showProperties = true; - const thing = apiThings.thingsV2Show(thingId, opts); - return thing.then(({properties}) => properties); + const { properties } = await this.wrappedThingsV2Show(thingId, opts); + return properties; } - getProperty(thingId, propertyId, opts) { - oauth2.accessToken = this.token; - return apiProperties.propertiesV2Show(thingId, propertyId, opts); + + async getProperty(thingId, propertyId, opts = {}) { + return await this.wrappedPropertiesV2Show(thingId, propertyId, opts); } - getSeries(thingId, propertyId, start, end, opts) { - const body = JSON.stringify({ + async getSeries(_thingId, propertyId, start, end, opts = {}) { + const body = JSON.stringify({ requests: [{ q: "property." + propertyId, from: start, @@ -79,8 +102,8 @@ class ArduinoClientHttp { }], resp_version: 1 }); - oauth2.accessToken = this.token; - return apiSeries.seriesV2BatchQueryRaw(body, opts); + return await this.wrappedSeriesV2BatchQueryRaw(body, opts); } } + exports.ArduinoClientHttp = ArduinoClientHttp; From aa5f38951fd950c07de8f34d3cf387532d498843 Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Tue, 26 Nov 2024 13:02:42 +0100 Subject: [PATCH 3/8] fix get properties --- .../arduino-iot-client-mqtt.js | 3 ++ arduino-iot-cloud.js | 2 +- utils/arduino-connection-manager.js | 32 +++++++++---------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/arduino-iot-client-mqtt/arduino-iot-client-mqtt.js b/arduino-iot-client-mqtt/arduino-iot-client-mqtt.js index 02e4933..cd92d5e 100644 --- a/arduino-iot-client-mqtt/arduino-iot-client-mqtt.js +++ b/arduino-iot-client-mqtt/arduino-iot-client-mqtt.js @@ -627,6 +627,9 @@ class ArduinoClientMqtt { node=nodeId; } const propOutputTopic = `/a/t/${thingId}/e/o`; + if (!this.propertyCallback[propOutputTopic] || !this.propertyCallback[propOutputTopic][name]) { + return Promise.resolve(this.numSubscriptions); + } var pos=-1; for(var i=0; i await getToken(connectionConfig, organizationID)); @@ -139,7 +139,7 @@ async function getClientHttp(connectionConfig, organizationID) { clientHttp: clientHttp, }); } else { - clientHttp = httpConnections[id].clientHttp = clientHttp; + clientHttp = httpConnections[id].clientHttp; } releaseMutex(); @@ -161,16 +161,14 @@ async function getClientHttp(connectionConfig, organizationID) { async function deleteClientMqtt(clientId, thing, propertyName, nodeId) { const releaseMutex = await mqttMutex.acquire(); - var id = findUser(clientId); + var id = findUser(mqttConnections, clientId); if (id !== -1) { - if (mqttConnections[id].clientMqtt !== null) { - var ret = await mqttConnections[id].clientMqtt.removePropertyValueCallback(thing, propertyName,nodeId); - if (ret === 0) { - await mqttConnections[id].clientMqtt.disconnect(); - delete mqttConnections[id].clientMqtt; - mqttConnections[id].clientMqtt = null; - mqttConnections.splice(id, 1); - } + var ret = await mqttConnections[id].clientMqtt.removePropertyValueCallback(thing, propertyName, nodeId); + if (ret === 0) { + await mqttConnections[id].clientMqtt.disconnect(); + delete mqttConnections[id].clientMqtt; + mqttConnections[id].clientMqtt = null; + mqttConnections.splice(id, 1); } } releaseMutex(); @@ -178,7 +176,7 @@ async function deleteClientMqtt(clientId, thing, propertyName, nodeId) { async function deleteClientHttp(clientId) { const releaseMutex = await httpMutex.acquire(); - var id = findUser(clientId); + var id = findUser(httpConnections, clientId); if (id !== -1) { if (httpConnections[id].clientHttp !== null) { httpConnections[id].clientHttp.openConnections--; @@ -190,9 +188,9 @@ async function deleteClientHttp(clientId) { releaseMutex(); } -function findUser(clientId) { - for (var i = 0; i < httpConnections.length; i++) { - if (httpConnections[i].clientId === clientId) { +function findUser(connections, clientId) { + for (var i = 0; i < connections.length; i++) { + if (connections[i].clientId === clientId) { return i; } } From 720883baa2e9b8ae4bf726b740a45d5597d79d97 Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Tue, 26 Nov 2024 18:37:26 +0100 Subject: [PATCH 4/8] fix errors --- utils/arduino-connection-manager.js | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/utils/arduino-connection-manager.js b/utils/arduino-connection-manager.js index 9bb2561..821acc9 100644 --- a/utils/arduino-connection-manager.js +++ b/utils/arduino-connection-manager.js @@ -46,6 +46,7 @@ var httpConnections = []; function getMqttOptions(clientId, token, RED){ async function reconnect() { + console.log('Reconnecting to MQTT'); const releaseMutex = await mqttMutex.acquire(); let id = findUser(mqttConnections, clientId); if (id !== -1) { @@ -66,6 +67,8 @@ function getMqttOptions(clientId, token, RED){ } }); + console.log('Disconnected from MQTT'); + await new Promise((resolve) => setTimeout(resolve, 1000)); await reconnect(); }, @@ -77,6 +80,8 @@ function getMqttOptions(clientId, token, RED){ } }); + console.log('Offline from MQTT'); + await new Promise((resolve) => setTimeout(resolve, 1000)); await reconnect(); }, @@ -87,6 +92,8 @@ function getMqttOptions(clientId, token, RED){ node.status({}); } }); + + console.log('Connected to MQTT'); }, useCloudProtocolV2: true }; @@ -114,11 +121,10 @@ async function getClientMqtt(connectionConfig, RED) { } else { clientMqtt = mqttConnections[id].clientMqtt; } - releaseMutex(); - return clientMqtt; } catch (err) { console.log(err); + } finally { releaseMutex(); } } @@ -141,20 +147,10 @@ async function getClientHttp(connectionConfig, organizationID) { } else { clientHttp = httpConnections[id].clientHttp; } - releaseMutex(); - return clientHttp; } catch (err) { - if(err.response && err.response.res && err.response.request){ - console.log('statusCode: '+ err.response.res.statusCode +'\r'+ - 'statusMessage: ' + err.response.res.statusMessage + '\r' + - 'text: ' + err.response.res.text + '\r'+ - 'HTTP method: ' + err.response.request.method + '\r' + - 'URL request: ' + err.response.request.url - ); - }else{ - console.log(err); - } + console.log(err); + } finally { releaseMutex(); } } From 8ef60ebb147029063a7d11d0cbd2bd996e7fea83 Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Tue, 3 Dec 2024 16:36:40 +0100 Subject: [PATCH 5/8] remove token refresh loop for http client --- utils/arduino-connection-manager.js | 72 ++++++++++++-------------- utils/arduino-iot-cloud-api-wrapper.js | 19 +++---- 2 files changed, 42 insertions(+), 49 deletions(-) diff --git a/utils/arduino-connection-manager.js b/utils/arduino-connection-manager.js index 821acc9..deefcc1 100644 --- a/utils/arduino-connection-manager.js +++ b/utils/arduino-connection-manager.js @@ -50,7 +50,7 @@ function getMqttOptions(clientId, token, RED){ const releaseMutex = await mqttMutex.acquire(); let id = findUser(mqttConnections, clientId); if (id !== -1) { - let token = await getToken(mqttConnections[id].connectionConfig); + let token = await waitForToken(mqttConnections[id].connectionConfig); await mqttConnections[id].clientMqtt.updateToken(token); } releaseMutex(); @@ -108,7 +108,7 @@ async function getClientMqtt(connectionConfig, RED) { let clientMqtt; let id = findUser(mqttConnections, connectionConfig.credentials.clientid); if (id === -1) { - let token = await getToken(connectionConfig); + let token = await waitForToken(connectionConfig); clientMqtt = new ArduinoClientMqtt.ArduinoClientMqtt(); mqttConnections.push({ clientId: connectionConfig.credentials.clientid, @@ -193,55 +193,51 @@ function findUser(connections, clientId) { return -1; } -async function getToken(connectionConfig, organizationID) { +async function waitForToken(connectionConfig, organizationID) { let delay = 200; while (true) { - let token = await _get(); + let token = await getToken(connectionConfig); if (token) { return token.token; } await new Promise((resolve) => setTimeout(resolve, delay)); delay = Math.min(delay * 2, 5000); } +} - async function _get() { - const dataToSend = { - grant_type: 'client_credentials', - client_id: connectionConfig.credentials.clientid, - client_secret: connectionConfig.credentials.clientsecret, - audience: accessTokenAudience - }; - - try { - var req = superagent - .post(accessTokenUri) - .set('content-type', 'application/x-www-form-urlencoded') - .set('accept', 'json') - - if (organizationID) { - req.set('X-Organization', organizationID) - } +async function getToken(connectionConfig) { + const dataToSend = { + grant_type: 'client_credentials', + client_id: connectionConfig.credentials.clientid, + client_secret: connectionConfig.credentials.clientsecret, + audience: accessTokenAudience + }; - var res = await req.send(dataToSend); - var token = res.body.access_token; - var expires_in = res.body.expires_in * 0.8; // needed to change the token before it expires - if (token !== undefined) { - return { token: token, expires_in: expires_in }; - } - } catch (err) { - if(err.response && err.response.res && err.response.request){ - console.log('statusCode: '+ err.response.res.statusCode +'\r'+ - 'statusMessage: ' + err.response.res.statusMessage + '\r' + - 'text: ' + err.response.res.text + '\r'+ - 'HTTP method: ' + err.response.request.method + '\r' + - 'URL request: ' + err.response.request.url - ); - }else{ - console.log(err); - } + try { + var req = superagent + .post(accessTokenUri) + .set('content-type', 'application/x-www-form-urlencoded') + .set('accept', 'json') + + if (organizationID) { + req.set('X-Organization', organizationID) + } + + var res = await req.send(dataToSend); + var token = res.body.access_token; + var expires_in = res.body.expires_in; + if (token !== undefined) { + return { token: token, expires_in: expires_in }; + } + } catch (err) { + if(err.response && err.response.res){ + console.log("cannot get token: " + err.response.res.statusCode + ' ' + err.response.res.statusMessage); + }else{ + console.log(err); } } } + exports.getClientMqtt = getClientMqtt; exports.getClientHttp = getClientHttp; exports.deleteClientMqtt = deleteClientMqtt; diff --git a/utils/arduino-iot-cloud-api-wrapper.js b/utils/arduino-iot-cloud-api-wrapper.js index 3ce359d..776b3cc 100644 --- a/utils/arduino-iot-cloud-api-wrapper.js +++ b/utils/arduino-iot-cloud-api-wrapper.js @@ -40,19 +40,16 @@ class ArduinoClientHttp { // Wrap the functions with refresh token logic function withTokenRefresh(fn) { return async (...args) => { - let delay = 0; - for (;;) { - try { - return await fn(...args); - } catch (e) { - if (e.status === 401) { - oauth2.accessToken = await getToken(); - await new Promise((resolve) => setTimeout(resolve, delay)); - delay = delay===0 ? 200 : Math.min(delay*2, 5000); - continue; + try { + return await fn(...args); + } catch (e) { + if (e.status === 401) { + oauth2.accessToken = await getToken(); + if (oauth2.accessToken) { + return await fn(...args); } - throw e; } + throw e; } }; } From c88b7728855b21fd4a2c5a06d55b746d9ae75c91 Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Tue, 3 Dec 2024 16:55:36 +0100 Subject: [PATCH 6/8] remove expiration field --- utils/arduino-connection-manager.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/utils/arduino-connection-manager.js b/utils/arduino-connection-manager.js index deefcc1..803effb 100644 --- a/utils/arduino-connection-manager.js +++ b/utils/arduino-connection-manager.js @@ -198,7 +198,7 @@ async function waitForToken(connectionConfig, organizationID) { while (true) { let token = await getToken(connectionConfig); if (token) { - return token.token; + return token; } await new Promise((resolve) => setTimeout(resolve, delay)); delay = Math.min(delay * 2, 5000); @@ -225,9 +225,8 @@ async function getToken(connectionConfig) { var res = await req.send(dataToSend); var token = res.body.access_token; - var expires_in = res.body.expires_in; if (token !== undefined) { - return { token: token, expires_in: expires_in }; + return token; } } catch (err) { if(err.response && err.response.res){ From 8d6dd6a4cc0c4bf9229e652890c2930e2040e026 Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Wed, 4 Dec 2024 10:15:31 +0100 Subject: [PATCH 7/8] make a single refresh token --- utils/arduino-iot-cloud-api-wrapper.js | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/utils/arduino-iot-cloud-api-wrapper.js b/utils/arduino-iot-cloud-api-wrapper.js index 776b3cc..f4b8624 100644 --- a/utils/arduino-iot-cloud-api-wrapper.js +++ b/utils/arduino-iot-cloud-api-wrapper.js @@ -37,14 +37,27 @@ class ArduinoClientHttp { client.basePath = process.env.API_BASE_PATH; } - // Wrap the functions with refresh token logic + // wrap the functions with refresh token logic + let refreshingToken = null; function withTokenRefresh(fn) { return async (...args) => { try { return await fn(...args); } catch (e) { if (e.status === 401) { - oauth2.accessToken = await getToken(); + // make sure only one refresh token is in progress + if (!refreshingToken) { + refreshingToken = (async () => { + try { + oauth2.accessToken = await getToken(); + } finally { + refreshingToken = null; + } + })(); + } + await refreshingToken; + + // eagerly retry the request if (oauth2.accessToken) { return await fn(...args); } From 0ead7d0bec8ecc628e84023c9d2570da04da2290 Mon Sep 17 00:00:00 2001 From: mirkokurt Date: Thu, 20 Feb 2025 14:29:39 -0100 Subject: [PATCH 8/8] fix: add organizationID to getToken method --- utils/arduino-connection-manager.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/arduino-connection-manager.js b/utils/arduino-connection-manager.js index 803effb..05ecf60 100644 --- a/utils/arduino-connection-manager.js +++ b/utils/arduino-connection-manager.js @@ -196,7 +196,7 @@ function findUser(connections, clientId) { async function waitForToken(connectionConfig, organizationID) { let delay = 200; while (true) { - let token = await getToken(connectionConfig); + let token = await getToken(connectionConfig, organizationID); if (token) { return token; } @@ -205,7 +205,7 @@ async function waitForToken(connectionConfig, organizationID) { } } -async function getToken(connectionConfig) { +async function getToken(connectionConfig, organizationID) { const dataToSend = { grant_type: 'client_credentials', client_id: connectionConfig.credentials.clientid,