Announcement

Collapse
No announcement yet.

Not all updates are published to ascii interface

Collapse
X
 
  • Filter
  • Time
  • Show
Clear All
new posts

    Not all updates are published to ascii interface

    Hi,

    I'm running several devices over MQTT, and am using the ASCII interface to push update to influxdb/telegraf.

    However when multiple devices publish in quick succession, or simultaneously, some of the updates do not make it to ascii interface.

    Is this behavior known or am I missing something to enable rapid ascii interface updates? I am seeing the updates reflected in the dashboard

    For instance:

    MQTT log:
    Code:
    Oct 18 12:22:27 livingroom/environmental/firefly1/Temperature 23.37
    Oct 18 12:22:32 livingroom/environmental/firefly1/Humidity 58.47
    Oct 18 12:22:37 livingroom/environmental/firefly1/Pressure 1000.76
    Oct 18 12:22:42 livingroom/environmental/firefly1/MotionSensor 1
    Oct 18 12:22:47 livingroom/environmental/firefly1/Lux 283.38
    Oct 18 12:25:01 masterbed/environmental/firefly3/Temperature 22.94
    Oct 18 12:25:06 masterbed/environmental/firefly3/Humidity 52.67
    Oct 18 12:25:11 masterbed/environmental/firefly3/Pressure 1000.74
    Oct 18 12:25:16 masterbed/environmental/firefly3/MotionSensor 1
    Oct 18 12:25:21 masterbed/environmental/firefly3/Lux 172.85
    Oct 18 12:25:36 sensor/status/firefly3 Connected
    Oct 18 12:27:27 livingroom/environmental/firefly1/Temperature 23.52
    Oct 18 12:27:32 livingroom/environmental/firefly1/Humidity 53.83
    Oct 18 12:27:37 livingroom/environmental/firefly1/Pressure 1000.78
    Oct 18 12:27:42 livingroom/environmental/firefly1/MotionSensor 1
    Oct 18 12:27:47 livingroom/environmental/firefly1/Lux 443.03
    Resuling ASCII interface from telnet:

    Code:
    DC,204,23.37,23.3
    DC,205,58.47,52.53
    DC,206,1000.76,1000.85
    DC,207,1,0
    DC,208,283.38,259.8
    DC,204,23.52,23.37
    DC,205,53.83,58.47
    DC,206,1000.78,1000.76
    DC,208,443.03,283.38
    Several of the MQTT messages are missing

    #2
    I never did stress testing. I will take a look. For my application the concern was just the opposite as I did not want to exceed the link budget of the LoRa interface.

    Comment


      #3
      Thanks Michael.

      If there's any other information I can provide please let me know.

      Basically I've started setting up Wemos D1's with BME280, TSL2651, SDS011 and PIR, and have them report on 5 minute intervals.

      I should also mention that this isn't super stressing the system now. If you look at the timestamps in the first set, I've gone as far as spacing out each update by 5 seconds to try to eliminate overwhelming the API.

      I'm currently working on adding jitter to the updates so there isn't recurring time collisions (though in this case that isn't a problem).

      Edit: Also let me get a different example, I realized during testing Firefly3 MQTT messages weren't associated. Working on that now.

      Comment


        #4
        I looked at the code and there is a queue managed to stack up messages and then send them one at a time, but no flow control was implemented. I can make that selection available. Does your client use hardware or xon/xoff protocol for flow control? Are you on Linux on Windows. Are you using HS4 or HS3 version of the plugin.

        Why are you using serial rather than IP interface to get your data to influxdb?

        Comment


          #5
          Here's a better example:

          Code:
          Oct 18 14:00:45 livingroom/environmental/firefly1/Temperature 23.17
          Oct 18 14:00:50 livingroom/environmental/firefly1/Humidity 54.39
          Oct 18 14:00:55 livingroom/environmental/firefly1/Pressure 1000.67
          Oct 18 14:01:00 livingroom/environmental/firefly1/MotionSensor 1
          Oct 18 14:01:00 masterbed/environmental/firefly3/Temperature 22.76
          Oct 18 14:01:05 livingroom/environmental/firefly1/Lux 57.48
          Oct 18 14:01:05 masterbed/environmental/firefly3/Humidity 54.67
          Oct 18 14:01:10 masterbed/environmental/firefly3/Pressure 1000.67
          Oct 18 14:01:15 masterbed/environmental/firefly3/MotionSensor 1
          Oct 18 14:01:20 sensor/status/firefly1 Connected
          Oct 18 14:01:20 masterbed/environmental/firefly3/Lux 172.85
          Oct 18 14:01:35 sensor/status/firefly3 Connected
          You can see below, that MotionSensor 1 from Firefly1 was not published on the ascii interface. Nor was the Temperature from Firefly3 which came at the same time.

          Code:
          DC,204,23.17,23.16
          DC,205,54.39,54.36
          DC,206,1000.67,1000.68
          DC,208,57.48,61.33
          DC,219,54.67,54.66
          DC,220,1000.67,1000.7
          DC,216,172.85,184.57

          Comment


            #6
            Originally posted by Michael McSharry View Post
            I looked at the code and there is a queue managed to stack up messages and then send them one at a time, but no flow control was implemented. I can make that selection available. Does your client use hardware or xon/xoff protocol for flow control? Are you on Linux on Windows. Are you using HS4 or HS3 version of the plugin.

            Why are you using serial rather than IP interface to get your data to influxdb?
            Ah, uh, I'm not sure what xon/xoff is, let me look it up.

            I am running this on Linux via docker

            Full setup: 4.1.0.5 running via docker on Linux LTS 20.04. So also HS4. Version of the plugin is 5.5.2.0, so HS4 version.

            There is no flow control on either the sensor side or the ascii client side.

            What is the IP interface? Got any documentation that I can ref? I'm only aware of the ASCII and JSON interfaces. And the JSON interafce doesn't publish device updates.

            It's been a long time since my days of being a software engineer so I'm scraping my way through this.

            Comment


              #7
              Arduino Code:

              Code:
              #include <Wire.h>
              #include "SPI.h"
              #include <Adafruit_Sensor.h>
              #include <Adafruit_BME280.h>
              #include <Streaming.h>
              #include <ESP8266WiFi.h>
              #include <PubSubClient.h>
              #include <malloc.h>
              #include "SdsDustSensor.h"
              #include <SparkFunTSL2561.h>
              #include "Adafruit_TSL2591.h"
              #include <ArduinoHttpClient.h>
              #include <ArduinoJson.h>
              
              // Enables
              const bool ENABLE_BME280   = true; 
              const bool ENABLE_TEMT6000 = false;
              const bool ENABLE_PIR      = false;
              const bool ENABLE_SDS011   = false;
              const bool ENABLE_TSL2561  = false;
              const bool ENABLE_TSL2591  = false;
              
              // Configuration Options
              //char* SENSOR_ID = "firefly1"; const char* LOCATION2 = "livingroom"; const char* LOCATION1 = "home"; const char* MEASUREMENT = "environmental";
              char* SENSOR_ID = "firefly2"; const char* LOCATION2 = "serverroom"; const char* LOCATION1 = "home"; const char* MEASUREMENT = "environmental";
              //char* SENSOR_ID = "firefly3"; const char* LOCATION2 = "masterbed"; const char* LOCATION1 = "home"; const char* MEASUREMENT = "environmental";
              //char* SENSOR_ID = "test"; const char* LOCATION2 = "test"; const char* LOCATION1 = "test"; const char* MEASUREMENT = "environmental";
              
              #define SENSORREADDELAY 300000 // Read BME280 once per 5 minutes (300000)
              #define ALTREADDELAY 8640000 // Read once a day
              
              const char* MQTT_SERVER = "mqtt.lan";
              const uint16_t MQTT_SERVER_PORT = 1883;
              const char* MQTT_USER = NULL;
              const char* MQTT_PASS = NULL;
              
              // WiFi definitions
              const char* WIFI_SSID = "SSID";
              const char* WIFI_PASS = "SSIDPWD";
              
              // MQTT Definitions
              char* MQTT_TOPIC_BASE = "sensor/environmental";
              char* MQTT_TOPIC_CONFIG_BASE = "sensor/config";
              
              // Variables
              long nextSensorRead = 0; // Track when the last sensor read occured
              long altNextReadTime = 0;
              char* mqttSensorTopic;
              
              // Constructors
              void mqttCallback(char* topic, byte* payload, unsigned int length);
              
              // BME280
              void bme280Init();
              void bme280Read();
              float bme280Temp();
              float bme280Humidity();
              float bme280Pressure();
              float bme280Altitude();
              int getLocalPressure();
              
              // TEMT6000
              float temt6000Read(int lumiPin, int &rawVal);
              
              // MQTT & WiFi
              void mqttInit();
              void mqttLoop();
              void mqttPublish(char *topic, float payload);
              void mqttPublish(char *topic, int payload);
              void mqttPublish(char *topic, char* payload);
              void mqttPublish(char *topic, double payload);
              void wifiInit();
              
              // System & Helpers
              void printI2CError(byte error);
              byte commaSplit(const char *input_buffer);
              char *concatenate(const char *a, const char *b, const char *c);
              char *concatenate(const char *a, const char *b);  
              
              // TSL2561 and TSL2591
              void tsl2561Init();
              int tsl2561Read(double &lux);
              void tsl2591Init();
              void tsl2591Read(double &lux);
              
              // SDS011 Air quality sensor
              void sdsInit();
              void sdsLoop(float &pm25, float &pm10, bool validRead);
              
              /**************************
               * Setup
               **************************/
              void setup() 
              {
                Serial.begin(9600); //Begin serial communication at 9600bps
                while (!Serial);
                delay(1000);
                Serial.println("Starting up...");
              
                // /////////////////////
                // Wifi and MQTT setup
                // /////////////////////
                wifiInit();  
                //mqttClient.setServer("192.168.1.10", MQTT_SERVER_PORT);
                mqttInit();
                if (ENABLE_BME280) { bme280Init(); }
                if (ENABLE_TSL2561) { tsl2561Init(); }
                if (ENABLE_TSL2591) { tsl2591Init(); }
                if (ENABLE_SDS011) { sdsInit(); }
              
                Serial.println("=================================");
                Serial.print("BME280 Enabled: ");
                Serial.println(ENABLE_BME280);
                Serial.print("TEMT6000 Enabled: ");
                Serial.println(ENABLE_TEMT6000);
                Serial.print("PIR Enabled: ");
                Serial.println(ENABLE_PIR);
                Serial.print("SDS011 Enabled: ");
                Serial.println(ENABLE_SDS011);
                Serial.print("TSL2561 Enabled: ");
                Serial.println(ENABLE_TSL2561);
                Serial.println("=================================");
              }
              
              /**************************
               * LOOP
               **************************/
              void loop() 
              {
                // Get the current time
                unsigned long now = millis();
              
                mqttLoop();
              
                // Read sensors every SENSORREADDELAY
                if (now > nextSensorRead)
                {
                  Serial.println("\nAttempting to read BME280...");
                  nextSensorRead = now + SENSORREADDELAY;
              
                  if (ENABLE_PIR) { pirRead(); }
                  if (ENABLE_BME280) { bme280Read(); }
                  double tslLux;
                  if (ENABLE_TSL2561) { tsl2561Read(tslLux); }
                  if (ENABLE_TSL2591) { tsl2591Read(tslLux); }
                  if (ENABLE_TEMT6000)
                  {
                    int lumiRaw;
                    float lux;
                    lux = temt6000Read(lumiRaw);
                  }
                }
              
                if (ENABLE_BME280 && now > altNextReadTime)
                {
                  Serial.println();
                  float alt = bme280Altitude();
                  altNextReadTime = now + ALTREADDELAY;
                }
              
                if (ENABLE_SDS011) 
                { 
                  float pm25, pm10;
                  bool sdsValidRead = false;
                  sdsLoop(pm25, pm10, sdsValidRead); 
                }
              
              
                Serial.print(".");
                delay(5000);
              }
              
              
              
              void printI2CError(byte error)
                // If there's an I2C error, this function will
                // print out an explanation.
              {
                Serial.print("I2C error: ");
                Serial.print(error,DEC);
                Serial.print(", ");
              
                switch(error)
                {
                  case 0:
                    Serial.println("success");
                    break;
                  case 1:
                    Serial.println("data too long for transmit buffer");
                    break;
                  case 2:
                    Serial.println("received NACK on address (disconnected?)");
                    break;
                  case 3:
                    Serial.println("received NACK on data");
                    break;
                  case 4:
                    Serial.println("other error");
                    break;
                  default:
                    Serial.println("unknown error");
                }
              }
              Code:
              char* MQTT_TOPIC_STATUS_BASE = "sensor/status";
              #define MQTT_VERSION MQTT_VERSION_3_1_1
              const int MQTT_QOS = 2;
              const char* MQTT_WILL_MSG = "offline";
              
              PubSubClient mqttClient(MQTT_SERVER, MQTT_SERVER_PORT, mqttCallback, wifiClient);
              
              void mqttInit()
              {
                mqttSubscribe();
                mqttSensorTopic = mqttAssembleTopic(MQTT_TOPIC_BASE, SENSOR_ID);
              }
              
              void mqttLoop()
              {
                mqttReconnect();
                mqttClient.loop(); // This is required by the mqtt library
              }
              
              
              /**************************
               * MQTT Callback, runs when mqtt message received
               **************************/
              void mqttSubscribe()
              {
                char* mqttSubscribeStatus = mqttAssembleTopic(MQTT_TOPIC_STATUS_BASE, SENSOR_ID);
              
                mqttClient.subscribe(mqttSubscribeStatus, 1);
                Serial.print("Subscribed to topic: ");
                Serial.println(mqttSubscribeStatus);
              }
              
              void mqttCallback(char* topic, byte* payload, unsigned int length)
              {
                Serial.print("Callback recieved: ");
                Serial.print(topic);
                Serial.print(" ");
                for (int i=0;i<length;i++) {
                  Serial.println((char)payload[i]);
                }
              
                mqttPublish("YAR", "Blah");
              }
              
              /**************************
               * MQTT Reconnect
               **************************/
              void mqttReconnect()
              {
                if (!mqttClient.connected())
                {
                  Serial.print("\nreconnecting to mqtt broker...");
              
                  if (mqttClient.connect(SENSOR_ID, MQTT_USER, MQTT_PASS)) {
                    Serial.println("Connected");
                    char* topic = mqttAssembleTopic(MQTT_TOPIC_STATUS_BASE, SENSOR_ID);
                    mqttPublish(topic, "Connected");
                    delay(2000); // Delay for 2 seconds for good measure
                  }
                  else
                  {
                    Serial.print("connection failed. Reason: ");
                    Serial.println(mqttClient.state());
                    Serial.println("Trying again in 5 seconds");
                    delay(5000);
                  }
                }
              }
              
              /**************************
               * MQTT Publish
               **************************/
              void mqttPublish(char *topic, double payload)
              {
                mqttPublishFinal(topic, String(payload).c_str());
              }
              void mqttPublish(char *topic, float payload)
              {
                mqttPublishFinal(topic, String(payload).c_str());
              }
              void mqttPublish(char *topic, int payload)
              {
                mqttPublishFinal(topic, String(payload).c_str());
              }
              void mqttPublish(char *topic, char* payload)
              {
                mqttPublishFinal(topic, String(payload).c_str());
              }
              void mqttPublishFinal(char *topic, const char* payload) 
              {
                Serial.print(topic);
                Serial.print(": ");
                Serial.println(payload);
              
                mqttClient.publish(topic, payload, true);
              }
              
              /**************************
               * Appends a topic to a base topic
               **************************/
              char *mqttAssembleTopic(char* baseTopic, char* appendTopic)
              {
                return concatenate(baseTopic, "/", appendTopic);
              }

              Comment


                #8
                Try to use the message rate throttle so messages are limited to the rate that is supported by the remainder of the system. This is setup at the same place as the baud rate for the port. The manual should describe it.

                Comment


                  #9
                  (realized I misread your question about IP vs serial input to influx DB, its because I'm using telegraf to translate to influxDB)

                  Sorry, need a little more clarification on where to throttle messages. Are you talking about from the Wemos D1/Arduino side? Not quite sure how to do that. Will do some searching.

                  Is 5 second spacing of messages not enough spacing to avoid getting rates throttled? And the MQTT messages are getting published correctly, its just not getting translated to the ASCII interface?

                  Or a I totally misunderstanding what you're suggesting?

                  Comment


                    #10
                    Section 12.4 or the mcsMQTT manual describes the Serial support. Figure 59 shows the setup with Min Send Interval. If you put something like 100 in the text box then mcsMQTT will send messages at a rate no faster than 100 milliseconds per message. @9600 baud there can be 960 bytes/second. If your lines are 30 characters per line then 960/30=32 messages/second = 31 milliseconds/message. 100 milliseconds then gives you a 300% margin. If you baud rate or message characteristics are different then adjust apporpriately.

                    Comment


                      #11
                      Okay, I'll give it a shot. Thanks. Will report back.

                      Comment

                      Working...
                      X