Code: Select all
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This software has been designed to work with Nibe Uplink API https://api.nibeuplink.com/Home/About
# It's aimed for Domoticz running on Raspberry Pi
#
# This software licensed under the GNU General Public License v3.0
import json
import os
import plistlib
import requests
from urllib import urlencode
import sys, getopt
from datetime import datetime
import time
import logging
# Global (module) namespace variables
redirect_uri = "https://www.marshflattsfarm.org.uk/nibeuplink/oauth2callback" # Don't alter this or anything else!
baseEndPointUrl = 'https://api.nibeuplink.com'
authorizeEndpointUrl = baseEndPointUrl + '/oauth/authorize'
tokenEndpointUrl = baseEndPointUrl + '/oauth/token'
# The config file lives next to the script (sys.path[0] is the script's directory)
cfgFile = sys.path[0] + '/config.json'
configChanged = False   # Set to True by the functions that alter the configuration
newDevicesList = [] # This is a python list
PROGRAMNAME = 'Domoticz RPC for NIBE Uplink'
VERSION = '2.0.0'
# Message categories used as a prefix when logging to Domoticz
MSG_ERROR = 'Error'
MSG_INFO = 'Info'
MSG_EXEC = 'Exec info'
# True when the script is run from an interactive console, False otherwise (e.g. from cron).
# stdin may be closed, missing or replaced by an object without a file descriptor;
# in all those cases the script is treated as non-interactive instead of crashing at start-up.
try:
    tty = bool(sys.stdin.isatty())
except (AttributeError, ValueError):
    tty = False
isDebug = False
isVerbose = False
def query_yes_no(question, default="no"):
    """
    Put a yes/no question on the console and return the answer as a boolean.

    "question" is the text shown to the user.
    "default" is the answer assumed when the user just hits <Enter>:
    "yes", "no" (the default) or None (the user must type an answer).

    Returns True for "yes" and False for "no".
    Raises ValueError when "default" is none of the values above.
    """
    answers = {"yes": True, "y": True, "ye": True,
               "no": False, "n": False}
    if default is None:
        hint = " [y/n] "
    elif default == "yes":
        hint = " [Y/n] "
    elif default == "no":
        hint = " [y/N] "
    else:
        raise ValueError("invalid default answer: '%s'" % default)

    # Keep asking until the reply is recognised
    while True:
        sys.stdout.write(question + hint)
        reply = raw_input().lower()
        if reply == '' and default is not None:
            return answers[default]
        if reply in answers:
            return answers[reply]
        sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
def connected_to_internet(url='http://www.google.com/', timeout=5):
    """
    Check for internet access by sending a HEAD request to "url".

    "timeout" is the number of seconds to wait for the request.
    Returns True when the request completes. Returns False (after printing a
    message) when it fails for any reason reported by requests, including
    timeouts, which are not all subclasses of requests.ConnectionError.
    """
    try:
        requests.head(url, timeout=timeout)
    except requests.RequestException:
        print('No internet connection available.')
        return False
    return True
def default_input(message, defaultVal):
    """
    Prompt the user with "message" and return what was typed.

    When "defaultVal" is truthy it is shown in square brackets and returned
    (unchanged, so not necessarily as a string) if the user enters nothing.
    When "defaultVal" is falsy the raw reply is returned as is.
    """
    if not defaultVal:
        return raw_input("%s :" % message)
    reply = raw_input("%s [%s]:" % (message, defaultVal))
    return reply if reply else defaultVal
def _detectLocalIp():
    """Return the local address used for outbound traffic, or '127.0.0.1' if it can't be determined."""
    import socket
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        probe.connect(('8.8.8.8', 0)) # connecting to a UDP address doesn't send packets
        return probe.getsockname()[0]
    except socket.error:
        # No route or no network: the value is only a suggested default for the prompt
        return '127.0.0.1'
    finally:
        probe.close()


def _askUntilValid(message, defaultVal, isValid, complaints):
    """Prompt with default_input() until isValid(answer) is true; print "complaints" after each bad answer."""
    while True:
        answer = default_input(message, defaultVal)
        if isValid(answer):
            return answer
        for line in complaints:
            print(line)


def _findDomoticzIdx(listType, name, mustBeEnabled):
    """Return the 'idx' of the Domoticz object of kind "listType" called "name", or '0' when absent."""
    r = domoticzAPI({'type': listType})
    # 'result' is missing from the reply when the list is empty
    for item in r.get('result', []):
        if item['Name'] != name:
            continue
        if mustBeEnabled and item['Enabled'] != 'true':
            continue
        return item['idx']
    return '0'


def _ensureDomoticzObject(listType, name, mustBeEnabled, createPayload, notFoundMessage):
    """Look up a Domoticz object by name, create it with "createPayload" if missing, and return its idx as int."""
    idx = _findDomoticzIdx(listType, name, mustBeEnabled)
    if idx == '0':
        domoticzAPI(createPayload)
        # Domoticz doesn't hand back the new idx, so go fishing for it
        idx = _findDomoticzIdx(listType, name, mustBeEnabled)
    if idx == '0':
        print(notFoundMessage)
        sys.exit(0)
    return int(idx)


def create_config():
    """
    Interactively build a new configuration, store it in cfgFile and return it.

    The user is asked for the Domoticz connection details, a folder for the log
    and token files and the Nibe Uplink application credentials. A Domoticz
    hardware device and a room plan, both named 'NIBEUplink', are looked up and
    created if they don't exist. The result is also stored in the module global
    "cfg" (domoticzAPI() reads the connection details from there).

    The script exits if the newly created hardware device or room plan can't be
    found afterwards.
    """
    global cfg
    cfg = {}

    # --- Domoticz connection ---
    domo = cfg['domoticz'] = {}
    domo['hostName'] = default_input('Domoticz web service IP address (or host name)', _detectLocalIp())
    domo['portNumber'] = default_input('Domoticz web service port number', 8080)
    domo['protocol'] = _askUntilValid(
        'Domoticz web service communication protocol (http or https)', 'http',
        lambda answer: answer in ('http', 'https'),
        ['Invalid value given for Domoticz web service communication protocol. It must be \'http\' or \'https\''])
    domo['httpBasicAuth'] = {}
    domo['httpBasicAuth']['userName'] = \
        default_input('Domoticz web service user name (leave blank if no username is needed)', '')
    domo['httpBasicAuth']['passWord'] = \
        default_input('Domoticz web service password (leave blank if no passord is needed)', '')
    domo['devices'] = {'device': []}

    # --- Local folder for the log file and the token file ---
    cfg['system'] = {}
    tmpdir = '/var/tmp' if os.path.isdir('/var/tmp') else '/tmp'
    cfg['system']['tmpFolder'] = _askUntilValid(
        'Directory for app logging and storing access tokens', tmpdir,
        os.path.isdir,
        ['That isn\'t a valid directory name on Your system! Please try again.'])

    # --- Nibe Uplink application credentials ---
    creds = cfg['oAuth2ClientCredentials'] = {}
    creds['authorizationCode'] = 'xxxx' # Placeholder, replaced once the user has authorized the application
    creds['clientId'] = _askUntilValid(
        'Your Nibe UpLink Application\'s Identifier', '',
        lambda answer: len(answer) == 32,
        ['That doesn\'t look like a valid Identifier. Have a look at https://api.nibeuplink.com/Applications',
         'Please try again'])
    creds['clientSecret'] = _askUntilValid(
        'Your Nibe UpLink Application\'s secret', '',
        lambda answer: len(answer) == 44,
        ['That doesn\'t look like a valid secret. Have a look at https://api.nibeuplink.com/Applications',
         'Please try again'])

    # --- Domoticz hardware device (must be enabled) ---
    HWNAME = 'NIBEUplink'
    domo['virtualHwDeviceIdx'] = _ensureDomoticzObject(
        'hardware', HWNAME, True,
        # NOTE(review): htype 15 is presumably Domoticz' virtual ("dummy") hardware type - verify
        {'type': 'command', 'param': 'addhardware', 'htype': 15, 'port': 1,
         'name': HWNAME, 'enabled': 'true', 'datatimeout': 0},
        'Can not find the newly created virtual hardware device.')

    # --- Domoticz room plan ---
    ROOMPLAN = 'NIBEUplink'
    domo['roomPlan'] = _ensureDomoticzObject(
        'plans', ROOMPLAN, False,
        {'type': 'command', 'param': 'addplan', 'name': ROOMPLAN},
        'Can not find the newly created room plan.')

    with open(cfgFile, 'w') as outfile:
        json.dump(cfg, outfile, indent=2, sort_keys=True, separators=(',', ':'))
    return cfg
def load_config():
    """
    Read the configuration from cfgFile and return it as a dictionary.

    When the file doesn't exist, a new configuration is created interactively
    if the script runs on a console; otherwise the script exits.
    The script also exits, after printing a message, when the file exists but
    can't be read or doesn't contain valid JSON.
    """
    import errno
    try:
        with open(cfgFile) as json_data_file:
            return json.load(json_data_file)
    except IOError as err:
        if err.errno != errno.ENOENT:
            # The file is there but unreadable: don't walk the user through
            # creating a new config that would replace it
            print('%s %s' % ('Can not open the config file ' + cfgFile, err))
            sys.exit(0)
        if not tty:
            sys.exit(0)
        # Create a new config file
        return create_config()
    except ValueError as err:
        # json.load() reports malformed content as ValueError (or a subclass)
        print('%s %s' % ('Can not open the config file ' + cfgFile, err))
        sys.exit(0)
def domoticzAPI(payload):
    """
    Send one request to the Domoticz JSON API and return the decoded reply.

    "payload" is a dictionary with the query parameters for json.htm.
    The connection details are read from the global "cfg".
    Returns the decoded JSON reply, whose 'status' is 'OK'.
    The script exits, after printing a message, when Domoticz can't be reached,
    answers with an HTTP status other than 200, sends something that isn't JSON
    or reports a status other than 'OK'.
    """
    domo = cfg['domoticz']
    url = domo['protocol'] + '://' + domo['hostName'] + ':' + str(domo['portNumber']) + '/json.htm'
    try:
        # NOTE(review): verify=False switches off TLS certificate checking
        # (presumably for self-signed Domoticz certificates).
        # The timeout keeps the script from hanging forever when Domoticz doesn't answer.
        r = requests.get(url,
                         verify=False,
                         auth=(domo['httpBasicAuth']['userName'], domo['httpBasicAuth']['passWord']),
                         params=payload,
                         timeout=30)
    except requests.RequestException as err:
        print('Can not open domoticz URL: \'' + url + '\' ' + str(err))
        sys.exit(0)
    if r.status_code != 200:
        print('Unexpected status code from Domoticz: ' + str(r.status_code))
        sys.exit(0)
    try:
        rJsonDecoded = r.json()
    except ValueError as err:
        print('Can\'t Json decode response from Domoticz. ' + str(err))
        sys.exit(0)
    status = rJsonDecoded.get('status')
    if status != 'OK':
        print('Unexpected response from Domoticz: ' + str(status))
        sys.exit(0)
    return rJsonDecoded
def logToDomoticz(messageType, logMessage):
    """
    Add a line to the Domoticz log and return the reply from domoticzAPI().

    The line has the form '(<messageType>) <script name>: <logMessage>'.
    """
    scriptName = os.path.basename(sys.argv[0])
    text = '(' + messageType + ') ' + scriptName + ': ' + logMessage
    return domoticzAPI({'type': 'command', 'param': 'addlogmessage', 'message': text})
def truncate(x, d):
    """Cut "x" off after "d" decimal places (rounding toward zero) and return the result as a float."""
    scale = 10.0 ** d
    return int(x * scale) / scale


def findTruncatedValue(rawValue, displayValue):
    """
    Guess the factor that turns "rawValue" into the number shown in "displayValue".

    Handles cases where the displayed number has been truncated, like these:
        "displayValue": "1339.3kWh", "rawValue": 133931
        "displayValue": "0.314kWh",  "rawValue": 314931
        "displayValue": "-245DM",    "rawValue": -2456
        "displayValue": "1341.6kWh", "rawValue": 134157

    Signs are ignored. Returns 1 when the numbers are equal or either is zero,
    otherwise the first matching factor (0.1, 0.01, ... down to 0.000001), or
    '' when "displayValue" holds no usable number or no factor matches.
    """
    # Keep only ASCII digits and the decimal point ("1339.3kWh" becomes "1339.3").
    # str.isdigit() is not used here: it also accepts characters such as the
    # superscript digits found in units like m3 written with a superscript, which
    # float() rejects.
    digits = ''.join(ch for ch in displayValue if ch in '0123456789.')
    if not digits.replace('.', '', 1).isdigit():
        return ''
    displayValueNum = float(digits)
    rawValue = abs(rawValue) # Now a positive number
    if displayValueNum == rawValue or displayValueNum == 0 or rawValue == 0:
        return 1

    # displayValueNum is now a number and it's not 0
    if displayValueNum >= 1:
        # Compare the whole-number parts only
        wholePart = int(displayValueNum)
        for factor, divisor in ((0.1, 10), (0.01, 100), (0.001, 1000)):
            if int(rawValue / divisor) == wholePart:
                return factor
        return ''

    # Displayed number is below 1: compare with the raw value cut off at the
    # same number of decimals. The decimals are counted in the text itself,
    # because str() of a small float uses exponent notation ('1e-05'), which has
    # no decimal point to split on.
    numDecimalPlaces = len(digits.partition('.')[2].rstrip('0'))
    rawValueFloat = float(rawValue)
    candidates = ((0.1, 10), (0.01, 100), (0.001, 1000),
                  (0.0001, 10000), (0.00001, 100000), (0.000001, 1000000))
    for factor, divisor in candidates:
        if truncate(rawValueFloat / divisor, numDecimalPlaces) == displayValueNum:
            return factor
    return ''
def redefineDomoDevice(nibeSystem, nibeResource, nibeCategoryName):
    """
    Ask whether the configured device for this Nibe resource shall be redefined.

    Every device in the config that matches the system id and parameter id is
    presented to the user. A device that shall be kept is appended to the global
    newDevicesList; a device that shall be redefined is left out of that list
    and the global configChanged is set to True.
    "nibeCategoryName" is accepted but not used.
    """
    global newDevicesList
    global configChanged
    for device in cfg['domoticz']['devices']['device']:
        if device['nibeSystemId'] != nibeSystem['systemId']:
            continue
        if device['nibeParameterId'] != nibeResource['parameterId']:
            continue
        nameParts = [nibeSystem['productName'], nibeResource['title'], nibeResource['designation']]
        devNameLong = ' '.join(nameParts).title().strip()
        if not device['enabled']:
            devNameLong += ' (Currently not enabled)'
        elif device['domoticzIdx'] > 0:
            devNameLong += ' (Enabled and current Domoticz idx: ' + str(device['domoticzIdx']) + ')'
        if not query_yes_no('Redefine : ' + devNameLong, 'no'):
            newDevicesList.append(device)
            continue
        # Redefine it: do not append to newDevicesList
        configChanged = True
# Domoticz sensor type for each Nibe unit
_UNIT_SENSOR_TYPES = {
    '%': 2,         # Percentage
    u'\xb0C': 80,   # Temp (degree sign followed by C)
    'h': 113,       # Counter sensor (Ordinary)
    's': 113,       # Counter sensor (Ordinary)
    'Hz': 1004,     # Custom sensor
    'kWh': 113,     # Counter sensor (Ordinary)
    'kW': 248,      # Usage Electric
    'A': 19,        # (Ampere 1-Phase)
    'DM': 1004,     # Degree minutes, Custom sensor
}


def _shortenNibeTitle(nibeResource):
    """Return the resource title with long words abbreviated and cut to the length used for its unit."""
    title = nibeResource['title'].replace('compressor', 'compr').replace('compr.', 'compr')
    if len(title) > 17:
        title = title.replace('operating', 'op')
    unit = nibeResource['unit']
    if unit == 'kWh':
        # Counter incremental has very little space in the title
        if len(title) > 17:
            title = title.replace('factor', 'fc')
        if len(title) > 21:
            title = title[0:21].strip()
    elif unit == 'A':
        # Current meter, Add phase number to name
        title = title + ' (Phase-' + nibeResource['designation'][-1:] + ')'
    elif len(title) > 36:
        title = title[0:36].strip()
    # Avoid duplicate device names for 'heat medium flow'
    if title == 'heat medium flow':
        title = title + ' ' + nibeResource['designation']
    return title


def _guessSensorType(nibeResource):
    """Return the Domoticz sensor type to use for the resource, or 0 (after printing a note) if unknown."""
    parameterId = nibeResource['parameterId']
    unit = nibeResource['unit']
    display = nibeResource['displayValue']
    raw = nibeResource['rawValue']
    if (display == 'no' and raw == 0) or (display == 'yes' and raw == 1):
        return 6 # Switch
    if parameterId in (9999999, 47412): # Hard coded for alarm
        return 7 # Alert
    if parameterId == 43416: # Hard coded for compressor starts, unit is ''
        return 113 # Counter sensor (Ordinary)
    if parameterId == 10069: # Hard coded for Smart Price Adaption
        return 1004 # Custom sensor
    if unit in _UNIT_SENSOR_TYPES:
        return _UNIT_SENSOR_TYPES[unit]
    if parameterId in (40050, 43124): # Hard coded for some unit-less devices
        return 1004 # Custom sensor
    if unit == '':
        return 5 # Text
    print(u'Unknown Domoticz type: \'%s\'' % unit)
    return 0


def _guessValueFactor(nibeResource):
    """Return the factor by which rawValue is multiplied to get the number shown in displayValue."""
    parameterId = nibeResource['parameterId']
    display = nibeResource['displayValue']
    raw = nibeResource['rawValue']
    if parameterId == 43084: # This guy may be 0 for the moment making it hard to guess the factor
        return 0.01
    if parameterId in (40016, 40079, 40081, 40083): # Brine out and the currents should have 0.1
        return 0.1
    if display == '---':
        return 1
    if display in ('no', 'yes') or raw == -32768:
        return 0
    if raw == 0:
        # Only the number of decimals shown gives a hint
        for marker, factor in (('0.000', 0.001), ('0.00', 0.01), ('0.0', 0.1)): # 0.1 may not always be correct
            if marker in display:
                return factor
        return 1
    for multiplier in (1000, 100, 10, 1):
        if str(raw * multiplier) in display:
            return multiplier
    # Integer (floor) division, which is what '/' does for integers in Python 2
    if str(raw // 10) in display:
        return 0.1
    if str(raw // 100) in display:
        return 0.01
    truncated = findTruncatedValue(raw, display)
    return truncated if truncated != '' else 1


def _createDomoticzDevice(entry, devName, devNameLong, nibeResource):
    """Create the Domoticz virtual device for "entry", add it to the room plan and store its idx in "entry"."""
    hwIdx = cfg['domoticz']['virtualHwDeviceIdx']
    sensorType = entry['domoticzSensorType']
    payload = {'type': 'createvirtualsensor', 'idx': hwIdx,
               'sensorname': devName, 'sensortype': sensorType}
    if sensorType == 1004:
        payload['sensoroptions'] = '1;' + nibeResource['unit']
    domoticzAPI(payload)

    # Now go fishing for the newly created device idx (newest devices are searched first).
    # The sentinel is the string '0' because Domoticz reports idx values as strings.
    devIdx = '0'
    r = domoticzAPI({'type': 'devices'})
    for dev in reversed(r.get('result', [])):
        if dev['Name'] == devName and dev['HardwareID'] == hwIdx:
            devIdx = dev['idx']
            break
    if devIdx == '0':
        print('Error: Can not find the newly created virtual device.')
        sys.exit(0)
    entry['domoticzIdx'] = int(devIdx)
    entry['enabled'] = True
    print('Created Domoticz virtual device (idx) : ' + str(devIdx))

    # Add the device to the Domoticz room plan
    domoticzAPI({'type': 'command', 'param': 'addplanactivedevice', 'idx': cfg['domoticz']['roomPlan'],
                 'activetype': 0, 'activeidx': devIdx})

    # When there is a counter created, there is a possibility to change the units and set the offset value
    if sensorType == 113:
        unit = nibeResource['unit']
        domoticzAPI({'type': 'setused', 'idx': devIdx,
                     'description': 'Virtual sensor device for ' + devNameLong,
                     'switchtype': 0 if unit == 'kWh' else 3,
                     'addjvalue': nibeResource['rawValue'] * entry['valueFactor'] * -1,
                     'used': 'true', 'name': devName,
                     # The options string is 'ValueQuantity:Time;ValueUnits:h;' that has been URL encoded + Base64 encoded
                     'options': 'VmFsdWVRdWFudGl0eSUzQVRpbWUlM0JWYWx1ZVVuaXRzJTNBaCUzQg==' if unit == 'h' else ''})


def getDomoDevice(nibeSystem, nibeResource, nibeCategoryName):
    """
    Return the config entry that links a Nibe resource to a Domoticz device.

    Returns {} for unsupported parameter ids. Returns the existing entry when
    the config already has one for this system id and parameter id. Otherwise,
    when not running on a console, returns {}; when running on a console, the
    user is asked whether a Domoticz virtual device shall be created, and a new
    entry (enabled or not) is added to the config, saved to cfgFile and returned.

    Side effects: nibeResource['title'] is stripped and abbreviated in place,
    and the global configChanged is set to True when an entry is added.
    The script exits if a newly created Domoticz device can't be found.
    """
    global configChanged
    unsupportedDevices = (0, 47214, 48745)
    if nibeResource['parameterId'] in unsupportedDevices:
        return {}
    for c in cfg['domoticz']['devices']['device']:
        if c['nibeSystemId'] == nibeSystem['systemId'] and c['nibeParameterId'] == nibeResource['parameterId']:
            return c
    if not tty:
        # Nobody to ask
        return {}

    # Build the long name from the untouched title, then abbreviate the title for the device name
    nibeResource['title'] = nibeResource['title'].strip()
    devNameLong = (nibeSystem['productName'] + ' ' + nibeResource['title'] + ' ' + nibeResource['designation']).title().strip()
    nibeResource['title'] = _shortenNibeTitle(nibeResource)
    productName = nibeSystem['productName']
    productNameShort = productName[5:] if productName[0:5] == 'NIBE ' else productName
    devName = (productNameShort + ' ' + nibeResource['title']).title() # Capitalize the first letter of each word

    answer = default_input('\n\nFound a new Nibe Uplink resource: \'' + devNameLong + '\'\nShall I enable it and create a Domoticz Virtual Device for it? : Y/N', 'N')
    createDev = answer.upper() in ('Y', 'YES')

    entry = {}
    entry['enabled'] = False
    entry['domoticzIdx'] = 0
    entry['domoticzSensorType'] = _guessSensorType(nibeResource)
    entry['nibeSystemId'] = nibeSystem['systemId']
    entry['nibeParameterId'] = nibeResource['parameterId']
    entry['nibeTitle'] = nibeResource['title']
    entry['nibeDesignation'] = nibeResource['designation']
    entry['nibeCategoryName'] = nibeCategoryName.strip()
    entry['valueFactor'] = _guessValueFactor(nibeResource)

    if createDev and entry['domoticzSensorType'] > 0:
        _createDomoticzDevice(entry, devName, devNameLong, nibeResource)

    # The entry is stored even when no device was created, so the question isn't asked again
    cfg['domoticz']['devices']['device'].append(entry)
    with open(cfgFile, 'w') as outfile:
        json.dump(cfg, outfile, indent=2, sort_keys=True, separators=(',', ':'))
    configChanged = True
    return entry
def updateDomoDevice(domoDevice, nibeResource):
    """
    Bring one Domoticz device up to date with the value reported by Nibe.

    "domoDevice" is the config entry for the device, "nibeResource" the
    resource as reported by Nibe Uplink. Nothing happens for devices that are
    not enabled. The device is only updated if the new value differs from the
    device value or if Domoticz flags the device as timed out and it has not
    been updated for 3000 seconds or more.
    Returns None.
    """
    if not domoDevice['enabled']:
        return
    r = domoticzAPI({'type': 'devices', 'rid': domoDevice['domoticzIdx']})

    # Domoticz has another surprise going on when it comes to counters ... Energy (kWh) Gas (m3) Water (m3)
    # The counter will be treated with the divider which is defined in the parameters in the application settings.
    # For example if the counter is set to "Water" and the value is passed as liters, the divider must set to 1000
    # (as the unit is m3).
    # It should be used both when reading and writing values
    domoSurpriseFc = 1000 if nibeResource['unit'] == 'kWh' else 1
    # TODO check more cases

    if not r.get('result'):
        errMess = 'Failure getting data for domoticz device idx: ' + str(domoDevice['domoticzIdx'])
        print(errMess)
        logToDomoticz(MSG_ERROR, errMess)
        return
    domoState = r['result'][0]

    # Pull the number out of the device's data text ("-1339.3 kWh" becomes "-1339.3");
    # anything that doesn't come out as one number counts as 0
    xTest = ''.join(ch for ch in domoState['Data'] if ch in '0123456789.-')
    try:
        domoValueNum = float(xTest) * domoSurpriseFc # This contains the Domoticz device's value as a number
    except ValueError:
        domoValueNum = 0

    # Now, looking for a reason to update the sensor
    # Does the Domoticz sensor need an update in order not to time out?
    sensorTimedOut = False
    if domoState.get('HaveTimeout'):
        lastUpdate = datetime.strptime(domoState['LastUpdate'], '%Y-%m-%d %H:%M:%S')
        # total_seconds() covers the whole interval; the 'seconds' attribute leaves out whole days
        sensorTimedOut = (datetime.now() - lastUpdate).total_seconds() >= 3000

    # Does the value reported from Nibe differ from the Domoticz device's value?
    sensorType = domoDevice['domoticzSensorType']
    rawValue = nibeResource['rawValue']
    if sensorType == 7: # Handling the ALERT Device type
        domoCompareValue = domoState['Level']
        if domoState['Data'] == 'Everything seems to be fine':
            domoCompareValue = '9' # Old version used 'Everything seems to be fine', it's to long to fit. This test can be removed in next version.
        nibeCompareValue = 4 if rawValue != 0 else 1
    elif sensorType == 6: # Handling the SWITCH Device type
        domoCompareValue = domoState['Status']
        nibeCompareValue = 'On' if rawValue == 1 else 'Off'
    elif sensorType == 113: # These guys use an offset value that we need to deal with
        # Comparing floats in Python is not as simple as it sounds, using str() as a workaround
        if nibeResource['unit'] == 'kWh':
            nibeCompareValue = str(float(rawValue * domoDevice['valueFactor'] * domoSurpriseFc))
            domoCompareValue = str((domoValueNum / domoSurpriseFc - domoState['AddjValue']) * domoSurpriseFc)
        else:
            # Don't use fractionals
            nibeCompareValue = int(rawValue * domoDevice['valueFactor'])
            domoCompareValue = int(domoValueNum / domoSurpriseFc - domoState['AddjValue'])
    elif sensorType == 248 and nibeResource['unit'] == 'kW': # Usage Electric
        nibeCompareValue = str(float(rawValue * domoDevice['valueFactor'] * 1000))
        domoCompareValue = str(domoValueNum)
    else:
        nibeCompareValue = str(float(rawValue * domoDevice['valueFactor']))
        domoCompareValue = str(domoValueNum)
    valueChanged = nibeCompareValue != domoCompareValue

    if isDebug:
        print(domoState['Name'])
        print('N: ' + str(nibeCompareValue))
        print('D: ' + str(domoCompareValue))
        print('')
    elif isVerbose and (valueChanged or sensorTimedOut):
        sayThis = 'Updating Domoticz device \'' + domoState['Name'] + '\' idx: ' + str(domoDevice['domoticzIdx']) + ' due to:'
        if valueChanged:
            sayThis += ' <value changed>. New value is: ' + str(nibeCompareValue) + \
                       '. Old value was: ' + str(domoCompareValue) + '.'
        if sensorTimedOut:
            sayThis += ' <time condition>'
        print(sayThis)

    if not valueChanged and not sensorTimedOut:
        return

    if sensorType == 7: # Handling the ALERT Device type
        payload = {'type': 'command', 'param': 'udevice', 'idx': domoDevice['domoticzIdx'],
                   'nvalue': nibeCompareValue, 'svalue': 'OK' if nibeCompareValue == 1 else 'Alert!'}
    elif sensorType == 6: # Handling the SWITCH Device type
        payload = {'type': 'command', 'param': 'switchlight', 'idx': domoDevice['domoticzIdx'],
                   'switchcmd': nibeCompareValue}
    else: # All other sensor types
        payload = {'type': 'command', 'param': 'udevice', 'idx': domoDevice['domoticzIdx'],
                   'nvalue': 0, 'svalue': nibeCompareValue}
    domoticzAPI(payload)
# Retrieve the authorization code
# It will only run if the variable cfg['oAuth2ClientCredentials']['authorizationCode'] has not been set
def retrieve_authorization_code():
    """
    Guide the user through obtaining a Nibe Uplink Authorization Code.

    An authorization request is sent to Nibe Uplink without following the
    redirect; the URL from the 'location' header of the reply is printed for
    the user to open in a web browser. The user is then asked for the
    Authorization Code until the input is 401 characters long, and the code is
    saved in cfgFile.
    The script exits, after printing a message, when the reply has no
    'location' header.
    """
    authorization_code_req = {
        "response_type": 'code',
        "client_id": cfg['oAuth2ClientCredentials']['clientId'],
        "state": 'xyz',
        "access_type": 'offline',
        "redirect_uri": redirect_uri,
        "scope": 'READSYSTEM WRITESYSTEM'
    }
    # The timeout keeps the script from hanging forever when the server doesn't answer
    r = requests.get(authorizeEndpointUrl + "?%s" % urlencode(authorization_code_req),
                     allow_redirects=False, timeout=30)
    url = r.headers.get('location')
    if not url:
        print('Nibe Uplink did not return a \'location\' header for the authorization request (HTTP status ' +
              str(r.status_code) + '). Can not continue.')
        sys.exit(0)
    # The header is expected to hold a path on the Nibe server; an absolute URL is used as it is
    if not url.lower().startswith(('http://', 'https://')):
        url = baseEndPointUrl + url
    print('\nAuthorization Code retrieval\n==========================\nCopy the URL below and paste into the adress bar of a web browser. After granting access on Nibe Uplink, Your browser will show You the Authorization Code. Then enter that code below .\n')
    print(url + '\n')

    creds = cfg['oAuth2ClientCredentials']
    while len(creds['authorizationCode']) != 401:
        creds['authorizationCode'] = default_input('Authorization Code', '')
        if len(creds['authorizationCode']) != 401:
            print('That doesn\'t look like a valid Authorization Code. Please try again.')
    with open(cfgFile, 'w') as outfile:
        json.dump(cfg, outfile, indent=2, sort_keys=True, separators=(',', ':'))
    # NOTE: the global configChanged is left alone on purpose. It also decides
    # whether list_systems() replaces the device list when devices are redefined.
    return
# Request new OAuth2 tokens
def requestTokens(grant_type, refreshToken):
    """
    Request new tokens from Nibe Uplink, store them and return the access token.

    "grant_type" is 'authorization_code' (the Authorization Code from the
    config is used) or 'refresh_token' (the given "refreshToken" is used).
    The access token, the refresh token and the expiration time are written to
    tokenDictionaryFile, and the tokens are also written to logFile.
    The script exits, after logging the reason, when the request fails or the
    token file can't be written. When the server refuses the request and the
    script runs on a console, the Authorization Code in the config is cleared so
    that a new one is requested on the next run.
    """
    logToDomoticz(MSG_INFO, 'Requesting acess tokens using the ' + grant_type + ' grant type')
    logging.basicConfig(filename=logFile, level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s', filemode='w')
    data = {}
    try:
        creds = cfg['oAuth2ClientCredentials']
        if grant_type == 'authorization_code':
            data = {'grant_type': grant_type, 'code': creds['authorizationCode'],
                    'client_id': creds['clientId'], 'client_secret': creds['clientSecret'],
                    'redirect_uri': redirect_uri}
        elif grant_type == 'refresh_token':
            logging.info('Using Refresh Token: %s' % refreshToken)
            data = {'grant_type': grant_type, 'refresh_token': refreshToken,
                    'client_id': creds['clientId'], 'client_secret': creds['clientSecret']}
        # The timeout keeps the script from hanging forever when the server doesn't answer
        getTokens = requests.post(tokenEndpointUrl, data, timeout=30)
        getTokens.raise_for_status()
        newTokens = getTokens.json()
        accessToken = newTokens['access_token']
        expiresIn = newTokens['expires_in']
        expiration = int(time.time()) + expiresIn
        refreshToken = newTokens['refresh_token']
        # NOTE: writePlist() is the Python 2 API of plistlib
        plistlib.writePlist({'Access Token': accessToken, 'Refresh Token': refreshToken, 'Expiration': expiration}, tokenDictionaryFile)
        logging.info('Got Access Token: %s' % accessToken)
        logging.info('The Access Token is valid for : %s seconds' % expiresIn)
        logging.info('Got Refresh Token: %s' % refreshToken)
    except requests.exceptions.RequestException as e:
        logMessage = 'Can\'t generate tokens: %s' % e
        logging.error('========== ' + logMessage + ' ==========')
        logToDomoticz(MSG_ERROR, logMessage)
        if tty:
            print(logMessage)
            # Only an answer from the server (an HTTP error status) says anything about
            # the Authorization Code. After a network failure the code is kept.
            if isinstance(e, requests.exceptions.HTTPError):
                print('The Authorization Code might be too old. Clearing it out so that You can request a new. Please run this script again.')
                cfg['oAuth2ClientCredentials']['authorizationCode'] = 'xxxx'
                with open(cfgFile, 'w') as outfile:
                    json.dump(cfg, outfile, indent=2, sort_keys=True, separators=(',', ':'))
            # NOTE(review): this dump shows the client secret on the console
            print('\n\nBelow is some debugging help:\n==============================')
            for k, v in data.items():
                print('%s  =  %s' % (k, v))
            print('\n\n')
        sys.exit(0)
    except Exception as e:
        # Anything else: unexpected content in the reply or failure to write the token file
        logMessage = 'Can\'t create the token dictionary file'
        logging.error('========== ' + logMessage + ' ==========')
        logging.error('Cause: %r' % (e,))
        logToDomoticz(MSG_ERROR, logMessage)
        if tty:
            print(logMessage)
        sys.exit(0)
    return accessToken
# Validate the OAuth2 tokens
def validateTokens():
    """
    Return an access token that is valid for more than 30 seconds.

    The tokens are read from tokenDictionaryFile. A stored access token with
    more than 30 seconds left is returned as it is; otherwise the stored refresh
    token is used to obtain a new access token and a new refresh token.
    When the token file can't be read, new tokens are requested with the
    Authorization Code if the script runs on a console; otherwise an error is
    logged to Domoticz and the script exits.
    """
    # First let's read the Token Dictionary plist.
    # Only the reading is guarded, so that an exit requested by logToDomoticz()
    # further down is not mistaken for a missing token file.
    try:
        tokenPlist = plistlib.readPlist(tokenDictionaryFile)
        refreshToken = tokenPlist["Refresh Token"]
        accessToken = tokenPlist["Access Token"]
        secondsLeft = tokenPlist["Expiration"] - time.time()
    except Exception:
        if not tty:
            logToDomoticz(MSG_ERROR, 'You need to run \'python ' + os.path.realpath(__file__) + '\' from a console to obtain a new Authorization Code')
            sys.exit(0)
        # No file!? Man that's bad. Maybe this is the first time the script runs. Let's make an Authorization Request
        errorText = 'There is no dictionary file ' + tokenDictionaryFile + '' \
                    + ' (That is perfectly normal if this is the first time that the script runs)'
        logToDomoticz(MSG_ERROR, errorText)
        print(errorText)
        return requestTokens('authorization_code', '')

    if secondsLeft > 30:
        if isVerbose:
            logToDomoticz(MSG_INFO, 'Current access token valid for ' + str(int(secondsLeft)) + ' seconds')
        return accessToken
    # The old refresh token is used to obtain a new access token and a new refresh token
    return requestTokens('refresh_token', refreshToken)
def _nibeApiGet(accessToken, path, fatal):
    """
    GET "path" from the Nibe Uplink API with the access token as bearer and return the response.

    A status other than 200 is reported on the console; the script exits in that case if "fatal" is true.
    """
    authorization_header = {"Authorization": "Bearer %s" % accessToken}
    # The timeout keeps the script from hanging forever when the server doesn't answer
    r = requests.get(baseEndPointUrl + path, headers=authorization_header, timeout=30)
    if r.status_code != requests.codes.ok:
        print("Nibe server responded with an error code:  %s" % r.status_code)
        if fatal:
            sys.exit(0)
    return r


def get_system_list(accessToken):
    """Return the list of systems connected to the account as JSON text; exits on an error status."""
    r = _nibeApiGet(accessToken, "/api/v1/systems", True)
    if isDebug:
        print('HTTP/1.1 200 OK')
        print("get_system_list:  %s \n" % r.text)
    return r.text


def get_system_status(accessToken, systemid):
    """Return the status of the system "systemid" as JSON text (whatever the server answered)."""
    r = _nibeApiGet(accessToken, "/api/v1/systems/" + str(systemid) + "/status/system", False)
    if isDebug:
        print("get_system_status:  %s \n" % r.text)
    return r.text


def get_system_config(accessToken, systemid):
    """Return the configuration of the system "systemid" as JSON text (whatever the server answered)."""
    r = _nibeApiGet(accessToken, "/api/v1/systems/" + str(systemid) + "/config", False)
    if isDebug:
        print("get_system_config:  %s \n" % r.text)
    return r.text


def get_system_unit_status(accessToken, systemid, systemUnitId):
    """Return the status of unit "systemUnitId" of the system "systemid" as JSON text (whatever the server answered)."""
    r = _nibeApiGet(accessToken, "/api/v1/systems/" + str(systemid) + "/status/systemUnit/" + str(systemUnitId), False)
    if isDebug:
        print("get_system_unit_status:  %s \n" % r.text)
    return r.text


def get_serviceinfoCategories(accessToken, systemid):
    """Return the service info categories of the system "systemid", parameters included, as JSON text; exits on an error status."""
    # The caller depends on this reply, so an error status ends the script here
    # instead of failing later on an unexpected reply
    r = _nibeApiGet(accessToken, "/api/v1/systems/" + str(systemid) + "/serviceinfo/categories?parameters=true", True)
    if isDebug:
        print("get_serviceinfoCategories:  %s\n" % r.text)
    return r.text
def list_systems(accessToken):
    """
    Go through all Nibe systems of the account and update their Domoticz devices.

    For every system the service info categories are fetched, and a resource
    for the system's alarm state (parameter id 9999999) is added to the STATUS
    category. When devices are being redefined (global redefineDevices), the
    user is first asked about each configured device and the changed device
    list is saved. Then every resource is matched to its Domoticz device, which
    is updated. A message is logged at the end if the config file was changed.
    """
    systemlist = json.loads(get_system_list(accessToken))
    if isVerbose:
        print('Number of systems: ' + str(systemlist['numItems']))
        print('')
    for nibeSystem in systemlist['objects']:
        systemId = nibeSystem['systemId']
        if isVerbose:
            print('Product Name: ' + nibeSystem['productName'])
            print('Serial number: ' + nibeSystem['serialNumber'])
            print('System ID: ' + str(systemId))
            print('Has alarmed: ' + str(nibeSystem['hasAlarmed']))
            print('')
        if isDebug:
            # Called only for the debug output they print; the replies are not used.
            # Unit 0 is climate system 1
            get_system_unit_status(accessToken, systemId, '0')
            get_system_status(accessToken, systemId)
        serviceinfoCategories = json.loads(get_serviceinfoCategories(accessToken, systemId))

        # Append the hasAlarmed to the serviceinfoCategories parameters
        hasAlarmed = int(nibeSystem['hasAlarmed'])
        has_alarmed_dict = {'parameterId': 9999999,
                            'name': '9999999',
                            'title': 'system alarm',
                            'designation': '',
                            'unit': '',
                            'displayValue': str(hasAlarmed),
                            'rawValue': hasAlarmed
                            }
        for s in serviceinfoCategories:
            if s['categoryId'] == 'STATUS':
                s['parameters'].append(has_alarmed_dict)

        if redefineDevices:
            for s in serviceinfoCategories:
                for nibeResource in s['parameters']:
                    redefineDomoDevice(nibeSystem, nibeResource, s['name'])
            if configChanged:
                # Replace this system's devices with the ones the user chose to keep.
                # Devices of other systems stay as they are, and a device that was
                # offered more than once is stored once.
                devices = cfg['domoticz']['devices']['device']
                updated = [d for d in devices if d['nibeSystemId'] != systemId]
                for d in newDevicesList:
                    if d['nibeSystemId'] == systemId and d not in updated:
                        updated.append(d)
                cfg['domoticz']['devices']['device'] = updated
                with open(cfgFile, 'w') as outfile:
                    json.dump(cfg, outfile, indent=2, sort_keys=True, separators=(',', ':'))
                print('\n\n\n')

        for s in serviceinfoCategories:
            for nibeResource in s['parameters']:
                domoDevice = getDomoDevice(nibeSystem, nibeResource, s['name'])
                if domoDevice:
                    updateDomoDevice(domoDevice, nibeResource)

    if configChanged:
        logMessage = 'Updated the config file at ' + cfgFile
        logToDomoticz(MSG_INFO, logMessage)
        if isVerbose:
            print(logMessage)
def print_help(argv):
    """Print the usage text for the command line. "argv" is accepted but not used."""
    helpLines = [
        'usage: ' + os.path.basename(__file__) + ' [option] [-C domoticzDeviceidx|all] \nOptions and arguments',
        '-d : debug output (also --debug)',
        '-h : print this help message and exit (also --help)',
        '-v : verbose',
        '-V : print the version number and exit (also --version)',
        '-R : redefines devices in the config file (also --redefine)',
    ]
    for line in helpLines:
        print(line)
def main(argv):
    """
    Run the script: parse the options in "argv", load the config, make sure
    there is a valid access token and update the Domoticz devices.

    Exits with status 2 for unknown options and with status 0 in all other cases.
    """
    global isDebug
    global isVerbose
    global redefineDevices
    global cfg
    global logFile
    global tokenDictionaryFile
    redefineDevices = False
    try:
        opts, args = getopt.getopt(argv, 'dhvVR', ['help', 'debug', 'version', 'redefine'])
    except getopt.GetoptError:
        print_help(argv)
        sys.exit(2)
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            print_help(argv)
            sys.exit(0)
        elif opt in ('-d', '--debug'):
            isDebug = True
        elif opt == '-v':
            # A plain comparison: ('-v') without a trailing comma is a string, not a tuple
            isVerbose = True
        elif opt in ('-V', '--version'):
            print(PROGRAMNAME + ' ' + VERSION)
            sys.exit(0)
        elif opt in ('-R', '--redefine'):
            redefineDevices = True
    if isDebug:
        print('Debug is on')
    if not tty:
        time.sleep(5) # Give Domoticz some time to settle down from other commands running exactly at the 00 sec

    cfg = load_config()
    logFile = os.path.join(cfg['system']['tmpFolder'], os.path.basename(sys.argv[0]) + '.log')
    tokenDictionaryFile = os.path.join(cfg['system']['tmpFolder'], 'nibeUpLink.plist')
    if not connected_to_internet():
        logToDomoticz(MSG_ERROR, 'No internet connection available')
        sys.exit(0)

    msgProgInfo = PROGRAMNAME + ' Version ' + VERSION
    msgProgInfo += ' running on TTY console...' if tty else ' running as a CRON job...'
    if isVerbose:
        print(msgProgInfo)
        logToDomoticz(MSG_EXEC, msgProgInfo)

    # A valid Authorization Code is 401 characters long
    if len(cfg['oAuth2ClientCredentials']['authorizationCode']) != 401:
        if not tty:
            # Nobody to ask. Leave a trace in the Domoticz log instead of giving up silently
            logToDomoticz(MSG_ERROR, 'You need to run \'python ' + os.path.realpath(__file__) + '\' from a console to obtain a new Authorization Code')
            sys.exit(0)
        retrieve_authorization_code()
    accessToken = validateTokens()

    if redefineDevices and tty:
        print('\nYou have requested to redefine the devices. By doing that, for each device found in your Nibe system, you will be asked if you want to redefine it. After going through the complete list, for any device that You answered \'Yes\' You will be asked if You want to create a Domoticz device for it.\n')
        if not query_yes_no('Are You sure that you\'d like to redefine the devices', 'no'):
            sys.exit(0)
    list_systems(accessToken)
    sys.exit(0)


if __name__ == '__main__':
    main(sys.argv[1:])