#!/usr/bin/python3
# -*- coding: utf-8 -*-

# Copyright (c) 2010-2013 Roger Light
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Distribution License v1.0
# which accompanies this distribution.
#
# The Eclipse Distribution License is available at
#   http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
#    Roger Light - initial implementation
# Copyright (c) 2010,2011 Roger Light
# All rights reserved.

# This shows a simple example of an MQTT subscriber.

"""Forward sensor events received over MQTT to an InfluxDB database.

The script subscribes to ``<prefix>/events`` on the local MQTT broker, decodes
each message as a JSON object and writes the temperature, humidity and battery
readings it contains to the ``DBNAME`` database.  A sensor's readings are only
written when at least one of them differs from the last written value, and at
most once every ``DELAYRECORD`` seconds per sensor.
"""

import datetime
import json
import signal
import sys

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient

# Topic prefix; events are expected on "<prefix>/events".
prefix = 'home/rtl_433'
# Alternative prefix: 'rtl_433/fedora'

# Event keys copied verbatim into the tags of every point.
TAGS_KEYS = ['type', 'model', 'subtype', 'channel', 'id']
DBNAME = "rtl433"
# Minimum number of seconds between two writes for the same sensor.
DELAYRECORD = 300

# Per-sensor state, keyed by _sensor_key().  Each entry holds "lastsend" (epoch
# seconds of the last write, 0 if never written) plus the last *written* value
# of every measurement.
recorddb = {}

# Measurement name -> converter applied to the raw JSON value.  The insertion
# order is the order in which points are emitted.
_FIELD_CONVERTERS = {
    'temperature_C': float,
    'humidity': int,
    'battery_ok': lambda raw: raw == 1,
}

# Event keys that together identify one physical sensor.
_IDENTITY_KEYS = ('model', 'channel', 'id')


def signal_handler(sig, frame):
    """Close both client connections and exit when SIGINT is received."""
    influxc.close()
    mqttc.disconnect()
    sys.exit(0)


def on_connect(mqttc, obj, flags, rc):
    """Report the connection result and (re)subscribe to the events topic.

    Subscribing here rather than once at start-up means the subscription is
    renewed every time the client reconnects to the broker.
    """
    print("rc: " + str(rc))
    if rc == 0:
        mqttc.subscribe(prefix + "/events", 0)


def _sensor_key(event):
    """Return a hashable identity for the sensor that produced *event*.

    A tuple is used instead of a concatenated string so that, for example,
    channel 1 / id 23 and channel 12 / id 3 do not map to the same key.
    """
    return tuple(str(event[key]) if key in event else None for key in _IDENTITY_KEYS)


def _extract_values(event):
    """Return ``{measurement: converted value}`` for the readings in *event*.

    Raises:
        TypeError, ValueError: if a reading cannot be converted.
    """
    return {
        name: convert(event[name])
        for name, convert in _FIELD_CONVERTERS.items()
        if name in event
    }


def on_message(mqttc, obj, msg):
    """Write the readings carried by an MQTT message to InfluxDB.

    The payload must be a JSON object.  Malformed payloads are reported and
    ignored so that one bad message does not stop the subscriber.  Nothing is
    written when no reading differs from the last written one, or when the
    sensor was already written less than ``DELAYRECORD`` seconds ago.
    """
    now = datetime.datetime.now(datetime.UTC)

    try:
        event = json.loads(msg.payload)
        if not isinstance(event, dict):
            raise TypeError("expected a JSON object, got " + type(event).__name__)
        values = _extract_values(event)
    except (TypeError, ValueError) as err:
        print("Ignoring malformed message: " + str(err))
        return

    state = recorddb.setdefault(_sensor_key(event), {"lastsend": 0})

    changed = any(
        name not in state or state[name] != value
        for name, value in values.items()
    )
    if not changed:
        return

    # Throttle: skip while inside the delay window.  A timestamp earlier than
    # the last write (clock moved backwards) is not considered throttled.
    current_ts = now.timestamp()
    lastsend = state["lastsend"]
    if lastsend <= current_ts <= lastsend + DELAYRECORD:
        return

    tags = {key: event[key] for key in TAGS_KEYS if key in event}
    dateinflux = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    points = [
        {
            'measurement': name,
            'tags': tags,
            'time': dateinflux,
            'fields': {'value': value},
        }
        for name, value in values.items()
    ]

    print(points)
    influxc.write_points(points)

    # Remember the values only once they have been written: a change that was
    # throttled must still count as a change when the window expires.
    state.update(values)
    state["lastsend"] = current_ts


def on_subscribe(mqttc, obj, mid, granted_qos):
    """Report the broker's acknowledgement of a subscription."""
    print("Subscribed: " + str(mid) + " " + str(granted_qos))


# If you want to use a specific client id, use
# mqttc = mqtt.Client("client-id")
# but note that the client id must be unique on the broker. Leaving the client
# id parameter empty will generate a random id for you.
mqttc = mqtt.Client()
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_subscribe = on_subscribe
# To enable debug messages, define an on_log callback and uncomment:
# mqttc.on_log = on_log
mqttc.connect("localhost", 1883, 60)

influxc = InfluxDBClient(host="localhost", port=8428)

# Registered only now that both clients exist, because the handler uses them.
signal.signal(signal.SIGINT, signal_handler)

# Create the database on first run, then select it for all writes.
existing_dbs = {db.get("name") for db in influxc.get_list_database()}
if DBNAME not in existing_dbs:
    influxc.create_database(DBNAME)
influxc.switch_database(DBNAME)

mqttc.loop_forever()