You can publish your own data to EdgePi's backend by publishing to the running MQTT broker on your EdgePi.
All you need to do is to follow a certain JSON message format and pubish the message to the localhost
MQTT.
{
'value': <value-to-publish>,
'timestamp_ms': <unix-epoch-in-ms>,
'metric': 'aux/<atrribute-name>',
'tags': {
'data_unit': '<data-unit-name>',
'data_type': '<int|float|bool>',
'read_status': {0 | 1 | 2}, // 0: ok, 1: com err, 2: read err
'device_name': '<device-name>'
}
}
import time
import json
import random
import paho.mqtt.client as mqtt
from datetime import datetime
def publish_data(mqtt_client):
value = random.uniform(0, 100)
timestamp_ms= int(datetime.timestamp(datetime.now()) * 10**3)
data_id = '1'
attribute_name = 'UV Index'
device_name = 'Sensor 01'
data_unit = 'mW/m²'
data_type = 'float'
read_status = 0 # Translates to 'OK'
data = {
'value': value,
'timestamp_ms': timestamp_ms,
'metric': f'aux/{attribute_name}',
'tags': {
'data_unit': data_unit,
'data_type': data_type,
'read_status': read_status,
'device_name': device_name
}
}
print(f'Publishing data {data}')
mqtt_client.publish(f'edgepi/data/aux/{data_id}', payload=json.dumps(data), qos=1)
mqtt_client = mqtt.Client(protocol=mqtt.MQTTv5, transport='tcp')
mqtt_client.connect('localhost')
mqtt_client.loop_start()
try:
while True:
publish_data(mqtt_client)
time.sleep(2)
finally:
mqtt_client.disconnect()