#!/usr/bin/python3
#
# Domoticz Python Plugin for WeMo Insight by Belkin
# Modifications for WeMo Insight switch and rename to wemo-insight - huey 2018/04/16
# Added support for WeMo Insight switch - r3wt3d 2017/11/16
# Based on code from:
# https://gist.github.com/pruppert/af7d38cb7b7ca75584ef
# https://github.com/pdumoulin/blinky
#
# Author: mivo
#
"""
<plugin key="wemo-insight" name="WeMo Insight (plugin)" author="huey, with thx to mivo and r3wt3d" version="0.1" wikilink="http://www.domoticz.com/wiki/plugins" >
<description>
<h2>Belkin WeMo Insight plugin</h2><br/>
<h3>Devices</h3>
<ul style="list-style-type:square">
<li>Comes default with on/off switch and power</li>
<li>Optional additional selector switch for more convenient logging of standby status</li>
<li>Optional additional devices to log duration of last ontime and total for day in seconds</li>
<li>Optional additional device with summary details text</li>
<li>Optional additional device to reset standby threshold to default value of 8 or auto value</li>
</ul>
</description>
<params>
<param field="Address" label="IP Address" width="200px" required="true"/>
<param field="Port" label="Port" width="50px" required="true" default="49153"/>
<param field="Mode1" label="Standby status" width="75px">
<options>
<option label="True" value="YES" default="true"/>
<option label="False" value="NO"/>
</options>
</param>
<param field="Mode2" label="On for seconds" width="75px">
<options>
<option label="True" value="YES"/>
<option label="False" value="NO" default="true"/>
</options>
</param>
<param field="Mode3" label="Today seconds" width="75px">
<options>
<option label="True" value="YES"/>
<option label="False" value="NO" default="true"/>
</options>
</param>
<param field="Mode4" label="Summary details" width="75px">
<options>
<option label="True" value="YES"/>
<option label="False" value="NO" default="true"/>
</options>
</param>
<param field="Mode5" label="Standby reset" width="75px">
<options>
<option label="True" value="YES"/>
<option label="False" value="NO" default="true"/>
</options>
</param>
<param field="Mode6" label="Debug" width="75px">
<options>
<option label="True" value="Debug"/>
<option label="False" value="Normal" default="true"/>
</options>
</param>
</params>
</plugin>
"""
# The Domoticz module is only importable when this file is loaded by the
# Domoticz plugin framework.  When the file is run as a stand-alone script the
# import fails, so console-printing replacements are provided instead.
try:
    import Domoticz
except ImportError:
    def Log(s):
        """Print a log message to stdout."""
        print(s)

    def Debug(s):
        """Print a debug message to stdout."""
        print(s)

    def Error(s):
        """Print an error message to stdout."""
        print(s)

    class Domoticz:
        """Console stand-in for the Domoticz logging calls.

        The rest of this file logs through ``Domoticz.Log`` / ``Debug`` /
        ``Error``; without this namespace those calls would raise NameError
        whenever the real module is unavailable.  Only the three logging
        functions are provided.
        """

        Log = staticmethod(Log)
        Debug = staticmethod(Debug)
        Error = staticmethod(Error)
import sys
import urllib.request
import datetime
class wemoInsight:
    """Small SOAP client for a Belkin WeMo Insight switch.

    Requests are posted to ``http://<ip>:<port>/upnp/control/<service>1``.
    The candidate ports in ``ports`` are probed in order; after the first
    successful reply the instance keeps only the port that answered.
    """

    ip = None
    # Candidate control ports, tried in this order until one replies.
    ports = [49153, 49152, 49154, 49151, 49155]

    # action name -> (service, SOAP method prefix, state variable, value sent)
    _ACTIONS = {
        'on': ('basicevent', 'Set', 'BinaryState', 1),
        'off': ('basicevent', 'Set', 'BinaryState', 0),
        'status': ('basicevent', 'Get', 'BinaryState', None),
        'name': ('basicevent', 'Get', 'FriendlyName', None),
        'signal': ('basicevent', 'Get', 'SignalStrength', None),
        'params': ('insight', 'Get', 'InsightParams', None),
        'autopowerthreshold': ('insight', 'SetAuto', 'PowerThreshold', None),
        'resetpowerthreshold': ('insight', 'Reset', 'PowerThreshold', None),
    }

    def __init__(self, insight_ip=None, insight_port=None):
        """Remember the switch address.

        Args:
            insight_ip: host name or IP address of the switch.
            insight_port: when given, the only port that will be tried;
                otherwise the class-level candidate list is used.
        """
        self.ip = insight_ip
        if insight_port:
            self.ports = [insight_port]

    def action(self, action, value=None):
        """Run the named action and return the text the switch replied with.

        Args:
            action: one of the keys of ``_ACTIONS`` or 'setpowerthreshold'.
            value: threshold to send; only used by 'setpowerthreshold'.

        Returns:
            The extracted reply text, or None when ``action`` is unknown.

        Raises:
            ValueError: 'setpowerthreshold' was requested without a value.
            Exception: no candidate port produced a usable reply.
        """
        if action == 'setpowerthreshold':
            # Previously the literal text "None" was sent as the threshold.
            if value is None:
                raise ValueError("setpowerthreshold requires a threshold value")
            return self._send('insight', 'Set', 'PowerThreshold', value)
        try:
            event, method, obj, fixed_value = self._ACTIONS[action]
        except KeyError:
            return None
        return self._send(event, method, obj, fixed_value)

    def on(self):
        """Switch the outlet on."""
        return self.action('on')

    def off(self):
        """Switch the outlet off."""
        return self.action('off')

    def status(self):
        """Return the switch's BinaryState value."""
        return self.action('status')

    def name(self):
        """Return the switch's FriendlyName value."""
        return self.action('name')

    def signal(self):
        """Return the switch's SignalStrength value."""
        return self.action('signal')

    def params(self):
        """Return the raw InsightParams string."""
        return self.action('params')

    def autopowerthreshold(self):
        """Send the SetAutoPowerThreshold request."""
        return self.action('autopowerthreshold')

    def setpowerthreshold(self, value=None):
        """Send SetPowerThreshold with ``value``; raises ValueError if it is None."""
        return self.action('setpowerthreshold', value)

    def resetpowerthreshold(self):
        """Send the ResetPowerThreshold request."""
        return self.action('resetpowerthreshold')

    def _get_header_xml(self, event, method, obj):
        """Build the quoted SOAPACTION header value."""
        method = method + obj
        return '"urn:Belkin:service:%s:1#%s"' % (event, method)

    def _get_body_xml(self, event, method, obj, value=0):
        """Build the SOAP body element carrying ``value`` for ``obj``."""
        method = method + obj
        return '<u:%s xmlns:u="urn:Belkin:service:%s:1"><%s>%s</%s></u:%s>' % (method, event, obj, value, obj, method)

    def _send(self, event, method, obj, value=None):
        """Try the request on each candidate port and return the first reply.

        Raises:
            Exception: every port failed or returned no usable reply.
        """
        body_xml = self._get_body_xml(event, method, obj, value)
        header_xml = self._get_header_xml(event, method, obj)
        for port in self.ports:
            result = self._try_send(self.ip, port, body_xml, header_xml, obj, event)
            if result is not None:
                # Stick to the port that answered for all later requests.
                self.ports = [port]
                return result
        raise Exception("_send TimeoutOnAllPorts")

    def _try_send(self, ip, port, body, header, data, event):
        """Post one SOAP request; return the extracted reply or None on failure."""
        try:
            request = urllib.request.Request('http://%s:%s/upnp/control/%s1' % (ip, port, event))
            request.add_header('Content-type', 'text/xml; charset="utf-8"')
            request.add_header('SOAPACTION', header)
            request_body = '<?xml version="1.0" encoding="utf-8"?>'
            request_body += '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">'
            request_body += '<s:Body>%s</s:Body></s:Envelope>' % body
            request.data = request_body.encode()
            # The context manager closes the connection even if read() fails.
            with urllib.request.urlopen(request, timeout=3) as response:
                reply = response.read().decode()
            return self._extract(reply, data)
        except Exception as e:
            # Best effort: any failure just means "try the next port".
            print(str(e))
            return None

    def _extract(self, XML, tagName):
        """Return the text between ``<tagName>`` and ``</tagName>`` in ``XML``.

        Raises:
            Exception: the opening or closing tag is not present.
        """
        startTag = '<%s>' % (tagName)
        endTag = '</%s>' % (tagName)
        startPos = XML.find(startTag)
        endPos = XML.find(endTag, startPos+1)
        if startPos == -1 or endPos == -1:
            raise Exception("_extract" + "'"+tagName+"' not found in supplied XML")
        return XML[startPos+len(startTag):endPos]
# Shared client instance used by the plugin callbacks; onStart() assigns its
# address from the plugin parameters.
insight = wemoInsight()
def onStart():
    """Domoticz start callback: create devices, icons and configure the client.

    Units 1 (switch) and 2 (kWh) are created when the plugin has no devices
    yet.  Units 3-7 are created when missing and their Mode option is "YES".
    Finally the heartbeat interval is set and the shared ``insight`` client
    receives the configured address and port.
    """
    Domoticz.Log("* Plugin is starting *")
    if Parameters["Mode6"] == "Debug":
        Domoticz.Debugging(1)
        Domoticz.Debug("sys.exec: " + str(sys.executable))
        Domoticz.Debug("sys.path: " + str(sys.path))
        DumpConfigToLog()

    if 'WeMoInsight' not in Images:
        Domoticz.Debug("Create new images")
        Domoticz.Image('WeMoInsight.zip').Create()

    if len(Devices) == 0:
        Domoticz.Device(Name="Switch", Unit=1, TypeName="Switch", Used=1).Create()
        Domoticz.Log("Switch Device created.")
        Domoticz.Device(Name="Power", Unit=2, TypeName="kWh", Used=1).Create()
        Domoticz.Log("kWh Device created.")

    if 3 not in Devices and Parameters["Mode2"] == "YES":
        Domoticz.Device(Name="OnFor", Unit=3, TypeName="Custom", Options={"Custom": "1;s"}, Used=0).Create()
        Domoticz.Log("OnFor Custom Device created.")

    if 4 not in Devices and Parameters["Mode3"] == "YES":
        Domoticz.Device(Name="OnTodayFor", Unit=4, TypeName="Custom", Options={"Custom": "1;s"}, Used=0).Create()
        Domoticz.Log("OnToday Custom Device created.")

    if 5 not in Devices and Parameters["Mode4"] == "YES":
        Domoticz.Device(Name="Summary", Unit=5, TypeName="Text", Used=0).Create()
        Domoticz.Log("Summary Text Device created.")

    if 6 not in Devices and Parameters["Mode1"] == "YES":
        OptionsStatus = {"LevelActions": "||",
                         "LevelNames": "Off|On|Standby",
                         "LevelOffHidden": "false",
                         "SelectorStyle": "0"}
        Domoticz.Device(Name="Status", Unit=6, TypeName="Selector Switch", Switchtype=18, Image=12, Options=OptionsStatus, Used=0).Create()
        Domoticz.Log("Switch Status Device created.")

    if 7 not in Devices and Parameters["Mode5"] == "YES":
        OptionsStandby = {"LevelActions": "||",
                          "LevelNames": "Off|Default|Auto",
                          "LevelOffHidden": "true",
                          "SelectorStyle": "0"}
        Domoticz.Device(Name="Standby threshold", Unit=7, TypeName="Selector Switch", Switchtype=18, Image=12, Options=OptionsStandby, Used=0).Create()
        Domoticz.Log("Standby Reset Device created.")

    # Units 2 and 5 are left out on purpose: the original code never set the
    # custom icon for unit 2 and had the call for unit 5 commented out.
    for unit in (1, 3, 4, 6, 7):
        if unit in Devices:
            UpdateImage(unit, 'WeMoInsight')

    Domoticz.Heartbeat(60)

    insight.ip = Parameters["Address"]
    # The client reads ``ports`` (a list), not ``port``.  Put the configured
    # port first and keep the remaining default candidates as fallbacks.
    configured_port = Parameters["Port"]
    if configured_port:
        fallback_ports = [p for p in wemoInsight.ports if str(p) != str(configured_port)]
        insight.ports = [configured_port] + fallback_ports
def onStop():
    """Domoticz stop callback; only writes a log line."""
    stop_message = "* Plugin is stopping *"
    Domoticz.Log(stop_message)
def onCommand(Unit, Command, Level, Hue):
    """Domoticz command callback: translate UI commands into switch requests.

    * "On" (any unit) or unit 6 "Set Level" >= 10 switches the outlet on.
    * Unit 7 "Set Level" 10 resets the standby threshold, 20 selects auto.
    * Any other command on unit 7 is ignored.
    * Everything else switches the outlet off.

    Every failure is logged through Domoticz.Error and never propagates.
    """
    Domoticz.Debug("onCommand called, unit: " + str(Unit) + " command: " + str(Command) + " level: " + str(Level) + " hue: " + str(Hue))
    try:
        command = Command.upper()
        is_set_level = command == 'SET LEVEL'
        if command == 'ON' or (Unit == 6 and is_set_level and Level >= 10):
            updStatus(insight.on())
        elif Unit == 7 and is_set_level and Level == 10:
            insight.resetpowerthreshold()
            Devices[7].Update(1, "10")
            Domoticz.Debug("Standby: 'Default (8)'")
        elif Unit == 7 and is_set_level and Level == 20:
            insight.autopowerthreshold()
            Devices[7].Update(1, "20")
            Domoticz.Debug("Standby: 'Auto'")
        elif Unit == 7:
            # The threshold selector must not fall through to the branch
            # below and cut power to the outlet.
            Domoticz.Debug("Ignoring command for unit 7: " + str(Command) + " level: " + str(Level))
        else:
            updStatus(insight.off())
    except Exception as e:
        Domoticz.Error('Except onCommand: ' + str(e))
def onHeartbeat():
    """Domoticz heartbeat callback: poll InsightParams and refresh devices.

    The reply is a '|'-separated string.  As used below, field 0 is the
    state, 1 the last-change timestamp, 2/3/4 the on-for / on-today / on-total
    durations in seconds, 9 a total consumption figure and 10 the standby
    threshold (both divided by 1000 before display).

    Nothing is raised: communication and parsing problems are logged through
    Domoticz.Error.
    """
    Domoticz.Debug("* onHeartBeat called *")
    try:
        params = insight.params()
    except Exception as e:
        Domoticz.Error('Except onHeartbeat: ' + str(e))
        return
    Domoticz.Debug('DebugParams: ' + str(params))
    if params is None:
        Domoticz.Log('ERROR Params is undefined')
        return

    try:
        state = int(params[0:1])
    except ValueError:
        Domoticz.Error('Except onHeartbeat: ' + str(params))
        return
    Domoticz.Debug('DebugState : ' + str(state))

    updStatus(params)

    # Reuse the reply already fetched instead of querying the switch again.
    power = str(params).split('|')
    try:
        if Parameters["Mode2"] == "YES" and 3 in Devices:
            Domoticz.Debug("* Device 3 update called *")
            onfor = int(power[2])
            Domoticz.Debug('DebugOnFor: ' + str(onfor))
            Devices[3].Update(0, str(onfor))

        if Parameters["Mode3"] == "YES" and 4 in Devices:
            Domoticz.Debug("* Device 4 update called *")
            ontoday = int(power[3])
            Domoticz.Debug('DebugOnToday: ' + str(ontoday))
            Devices[4].Update(0, str(ontoday))

        want_summary = Parameters["Mode4"] == "YES" and 5 in Devices
        want_threshold = Parameters["Mode5"] == "YES" and 7 in Devices
        if want_summary or want_threshold:
            # Needed by both optional devices, so it is parsed independently
            # of which of the two options is enabled.
            threshold = int(power[10])/1000
            Domoticz.Debug('DebugThreshold: ' + str(threshold))

        if want_summary:
            Domoticz.Debug("* Device 5 update called *")
            lastchange = datetime.datetime.fromtimestamp(int(power[1]))
            Domoticz.Debug('DebugLastChange: ' + str(lastchange))
            onfortext = datetime.timedelta(seconds=int(power[2]))
            Domoticz.Debug('DebugOnForText: ' + str(onfortext))
            ontodaytext = datetime.timedelta(seconds=int(power[3]))
            Domoticz.Debug('DebugOnTodayText: ' + str(ontodaytext))
            ontotal = int(power[4])
            Domoticz.Debug('DebugOnTotal: ' + str(ontotal))
            ontotaltext = datetime.timedelta(seconds=ontotal)
            Domoticz.Debug('DebugOnTotalText: ' + str(ontotaltext))
            totalmw = int(float(power[9]))/1000
            Domoticz.Debug('DebugTotalkWh: ' + str(totalmw))
            Devices[5].Update(0, "On for: " + str(onfortext) + "<br>" + "On today: " + str(ontodaytext) + "<br>" + "On total: " + str(ontotaltext) + "<br>" + "Total kWh: " + str(totalmw) + "<br>" + "Standby threshold: " + str(threshold) + "<br>" + "Last change: " + str(lastchange))

        if want_threshold:
            Domoticz.Debug("* Device 7 update called *")
            # A raw value of 8000 is the default threshold, shown as 8.
            if threshold == 8000/1000:
                Devices[7].Update(1, "10")
                Domoticz.Debug("Standby: 'Default (8)'")
            else:
                Devices[7].Update(1, "20")
                Domoticz.Debug("Standby: 'Auto'")
    except (IndexError, ValueError, OverflowError, OSError) as e:
        # Short or malformed reply: report it instead of aborting the callback.
        Domoticz.Error('Except onHeartbeat: ' + str(e))
def updStatus(status):
    """Mirror the switch state (and power figures, if present) into Domoticz.

    Args:
        status: reply text whose first character is the state digit
            (1 = on, 0 = off, 8 = standby).  It is either a bare BinaryState
            value or a full '|'-separated InsightParams string.

    The switch (unit 1) and, when it exists, the status selector (unit 6) are
    updated according to the state.  The kWh device (unit 2) is updated only
    when ``status`` carries the power fields at index 7 and 8.
    """
    Domoticz.Debug("* updStatus called *")
    if not status:
        Domoticz.Error('False updStatus: ' + str(status))
        return
    try:
        istate = int(status[0:1])
    except ValueError:
        Domoticz.Error('Except updStatus: ' + str(status))
        return
    Domoticz.Debug('DebugStatus: ' + str(status) + ' iState: ' + str(istate))

    # state -> (switch nValue, switch sValue, selector level,
    #           switch nValues for which the update is applied)
    transitions = {
        1: (1, "On", 10, (0, 1)),
        0: (0, "Off", 0, (1,)),
        8: (1, "On", 20, (0, 1)),
    }
    if istate in transitions and 1 in Devices:
        n_value, s_value, level, applies_from = transitions[istate]
        if Devices[1].nValue in applies_from:
            Domoticz.Debug("* Device 1 update called *, nValue: " + str(Devices[1].nValue))
            Devices[1].Update(n_value, s_value)
            Domoticz.Debug("Device 1 updated to " + str(istate) + ", nValue: " + str(Devices[1].nValue) + " sValue: " + Devices[1].sValue)
            # Unit 6 only exists when the "Standby status" option is enabled.
            if 6 in Devices:
                Domoticz.Debug("* Device 6 update called *, nValue: " + str(Devices[6].nValue))
                Devices[6].Update(level, str(level))
                Domoticz.Debug("Device 6 updated to " + str(istate) + ", nValue: " + str(Devices[6].nValue) + " sValue: " + Devices[6].sValue)

    # NOTE(review): the power update is gated on Mode1 ("Standby status") as
    # in the original code; confirm that this coupling is intended.
    if Parameters["Mode1"] != "YES":
        return
    Domoticz.Debug("* Device 2 update called *")
    power = status.split('|')
    if len(power) < 9:
        # Bare on/off replies (e.g. from onCommand) carry no power fields.
        Domoticz.Debug("No power fields in status, device 2 not updated")
        return
    if 2 not in Devices:
        return
    try:
        currentmw = round(int(power[7])/1000)
        todaymw = int(power[8])/1000
    except ValueError:
        Domoticz.Error('Except updStatus: ' + str(status))
        return
    Domoticz.Debug('DebugkWh: ' + str(currentmw))
    Domoticz.Debug('DebugtoDay: ' + str(todaymw))
    Devices[2].Update(0, str(currentmw) + "; " + str(todaymw))
# Synchronise images to match required Logo
def UpdateImage(Unit, Logo):
    """Give device ``Unit`` the custom image ``Logo`` if it differs.

    Does nothing (apart from a debug line) when the unit or the image is not
    known to Domoticz, so a missing icon pack cannot raise KeyError here.
    """
    Domoticz.Debug("* updateImage called *, unit: " + str(Unit))
    if Unit not in Devices or Logo not in Images:
        Domoticz.Debug("Device " + str(Unit) + " or image " + str(Logo) + " not available, image not updated")
        return

    device = Devices[Unit]
    wanted_id = Images[Logo].ID
    Domoticz.Debug("Device current logo : " + str(device.Image))
    Domoticz.Debug("Device should have logo: " + str(wanted_id))
    if device.Image != wanted_id:
        # Re-send the current values so only the image changes.
        device.Update(nValue=device.nValue, sValue=str(device.sValue), Image=wanted_id)
        Domoticz.Debug("Device " + str(Unit) + " image updated to " + Logo)
# Generic helper functions
def LogMessage(Message):
    """Forward ``Message`` to the Domoticz debug log."""
    emit = Domoticz.Debug
    emit(Message)
def DumpConfigToLog():
    """Log every non-empty plugin parameter and the state of every device."""
    for key in Parameters:
        if Parameters[key] == "":
            continue
        LogMessage("'" + key + "':'" + str(Parameters[key]) + "'")

    LogMessage("Device count: " + str(len(Devices)))
    for unit in Devices:
        device = Devices[unit]
        LogMessage("Device: " + str(unit) + " - " + str(device))
        LogMessage("Internal ID: '" + str(device.ID) + "'")
        LogMessage("External ID: '" + str(device.DeviceID) + "'")
        LogMessage("Device Name: '" + device.Name + "'")
        LogMessage("Device nValue: " + str(device.nValue))
        LogMessage("Device sValue: '" + device.sValue + "'")
        LogMessage("Device LastLevel: " + str(device.LastLevel))
        LogMessage("Device Image ID: '" + str(device.Image) + "'")
def main():
    """Command-line entry point: run one action against a switch and print the reply.

    Usage: <script> ACTION IP [PORT]   (PORT defaults to 49153)
    """
    import argparse

    known_actions = [
        'on', 'off', 'status', 'name', 'signal', 'params',
        'autopowerthreshold', 'setpowerthreshold', 'resetpowerthreshold',
    ]
    cli = argparse.ArgumentParser(description='WeMo Insight control module for Python')
    cli.add_argument('action', choices=known_actions, help='Action')
    cli.add_argument('ip', help='IP address')
    cli.add_argument('port', nargs='?', default='49153', help='Port')
    options = cli.parse_args()

    switch = wemoInsight(options.ip, options.port)
    if options.action:
        reply = switch.action(options.action)
        print(reply)


if __name__ == '__main__':
    main()