#!/usr/bin/python3
# -*- coding: utf-8 -*-

# Copyright (c) 2010-2013 Roger Light
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Distribution License v1.0
# which accompanies this distribution.
#
# The Eclipse Distribution License is available at
#   http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
#    Roger Light - initial implementation
# Copyright (c) 2010,2011 Roger Light
# All rights reserved.

# This shows a simple example of an MQTT subscriber.
"""Forward sensor readings received over MQTT to VictoriaMetrics.

The script subscribes to ``<prefix>/events``, decodes each message as a JSON
object and posts temperature, humidity and battery readings to the
``/api/v1/import`` endpoint of the VictoriaMetrics host.  A reading is only
sent when its value changed, and at most once every ``DELAYRECORD`` seconds
per sensor.
"""

import signal
import sys
import json
import datetime

import paho.mqtt.client as mqtt
import requests

# MQTT topic prefix; the script listens on "<prefix>/events".
# Alternative prefix used previously: 'rtl_433/fedora'
prefix = 'home/rtl_433'
vmhost = 'localhost'
mqtthost = 'localhost'

# Message fields copied (as strings) into the metric labels when present.
TAGS_KEYS = ['type', 'model', 'subtype', 'channel', 'id']
# Minimum number of seconds between two submissions for the same sensor.
DELAYRECORD = 300
# Nothing is recorded while the clock reads at or before this epoch value.
# 1735686000 is 2024-12-31 23:00:00 UTC, i.e. 01/01/2025 00:00:00 at UTC+1.
MINTS = 1735686000
# Seconds to wait for VictoriaMetrics before giving up on a submission, so a
# stalled server cannot block the MQTT network loop forever.
HTTP_TIMEOUT = 10
# (message field, metric name / cache key, value converter)
MEASURED_FIELDS = (
    ('temperature_C', 'temperature', float),
    ('humidity', 'humidity', int),
    ('battery_ok', 'battery_ok', int),
)
# Per-sensor state: last submission time ("lastsend") and last sent values.
recorddb = {}


def signal_handler(sig, frame):
    """Disconnect from the broker and exit when SIGINT is received."""
    mqttc.disconnect()
    sys.exit(0)


def on_connect(mqttc, obj, flags, rc):
    """Report the connection result and subscribe to the events topic.

    Subscribing here rather than once at start-up renews the subscription
    every time the client reconnects to the broker.
    """
    print("rc: " + str(rc))
    if rc == 0:
        mqttc.subscribe(prefix + "/events", 0)


def create_measure(name, timestamp, tags, value):
    """Return one newline-terminated JSON line describing a single sample.

    Args:
        name: Metric name, stored under the ``__name__`` label.
        timestamp: Sample time in seconds; written in milliseconds.
        tags: Extra labels merged into the metric description.
        value: Sample value.
    """
    metric = {"__name__": name}
    metric.update(tags)
    jsonmeasure = {
        "metric": metric,
        "values": [value],
        "timestamps": [timestamp * 1000],
    }
    return json.dumps(jsonmeasure) + '\n'


def on_message(mqttc, obj, msg):
    """Handle one MQTT message and forward changed readings.

    Payloads that are not a JSON object, and field values that cannot be
    converted to a number, are reported and skipped instead of raising inside
    the MQTT callback.  The per-sensor cache is only updated after the
    submission succeeded, so a failed submission is retried on the next
    message rather than being silently lost.
    """
    currentts = int(datetime.datetime.now(datetime.timezone.utc).timestamp())

    try:
        jsonin = json.loads(msg.payload)
    except ValueError as exc:  # invalid JSON or undecodable bytes
        print("Ignoring invalid payload on {}: {}".format(msg.topic, exc))
        return
    if not isinstance(jsonin, dict):
        print("Ignoring non-object payload on {}".format(msg.topic))
        return

    # A tuple key keeps the three identifying fields separate, so e.g.
    # channel "1" / id "23" cannot be mistaken for channel "12" / id "3".
    ref = tuple(
        str(jsonin[key]) if key in jsonin else None
        for key in ("model", "channel", "id")
    )
    record = recorddb.setdefault(ref, {"lastsend": 0})
    lastsend = record["lastsend"]

    if currentts <= MINTS:
        return
    # Throttle, unless the clock went backwards since the last submission.
    if lastsend <= currentts <= lastsend + DELAYRECORD:
        return

    tags = {t: str(jsonin[t]) for t in TAGS_KEYS if t in jsonin}

    changed = {}
    lines = []
    for field, metric, convert in MEASURED_FIELDS:
        if field not in jsonin:
            continue
        try:
            value = convert(jsonin[field])
        except (TypeError, ValueError, OverflowError):
            print("Ignoring invalid {} value: {!r}".format(field, jsonin[field]))
            continue
        if metric not in record or record[metric] != value:
            lines.append(create_measure(metric, currentts, tags, value))
            changed[metric] = value

    if not lines:
        return

    try:
        response = requests.post(
            "http://{}:8428/api/v1/import".format(vmhost),
            data="".join(lines),
            timeout=HTTP_TIMEOUT,
        )
        response.raise_for_status()
    except requests.RequestException as exc:
        # Only request failures are handled here; SystemExit raised by the
        # SIGINT handler must be allowed to propagate.
        print("VictoriaMetrics communication error: {}".format(exc))
        return

    record.update(changed)
    record["lastsend"] = currentts


def on_subscribe(mqttc, obj, mid, granted_qos):
    """Report that the broker acknowledged a subscription."""
    print("Subscribed: " + str(mid) + " " + str(granted_qos))


# If you want to use a specific client id, pass it to mqtt.Client(), but note
# that the client id must be unique on the broker. Leaving the client id
# parameter empty will generate a random id for you.
if hasattr(mqtt, "CallbackAPIVersion"):
    # paho-mqtt 2.x: explicitly select the callback signatures used above.
    mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
else:
    mqttc = mqtt.Client()
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_subscribe = on_subscribe
# Uncomment to enable debug messages
# mqttc.on_log = on_log

# Installed once the client exists, because the handler uses it.
signal.signal(signal.SIGINT, signal_handler)

mqttc.connect(mqtthost, 1883, 60)
mqttc.loop_forever()