Sorry but i don't understand how works the python scripts to be run automatically when a device changed (script_device_xxx.py)
Could you for example give me an example of script_device_test_py reading a value and log it into domoticz.
I think with that kind of example i will understand how it works.
Thank you.
Looking after script in python
Moderator: leecollings
-
- Posts: 408
- Joined: Sunday 15 January 2017 11:06
- Target OS: Linux
- Domoticz version: beta
- Location: Norway
- Contact:
Re: Looking after script in python
@YellowSky,
to use the python scripts, domoticz must be compiled from source, and you need to use the ENABLE_PYTHON flag. Currently python scripts requires python2.x, while the plugins-system uses python3.x, so you can compile with support for either scripts or plugins, but not both at the same time.
There are some work underway upgrading the event-system to python3, but it's not ready yet.
M
to use the python scripts, domoticz must be compiled from source, and you need to use the ENABLE_PYTHON flag. Currently python scripts requires python2.x, while the plugins-system uses python3.x, so you can compile with support for either scripts or plugins, but not both at the same time.
There are some work underway upgrading the event-system to python3, but it's not ready yet.
M
-
- Posts: 65
- Joined: Tuesday 12 May 2015 12:09
- Target OS: Raspberry Pi / ODroid
- Domoticz version: Last
- Location: Belgium
- Contact:
Re: Looking after script in python
Oh, i understand why i can't use this kind of scripts.
But i am not able to do that (compile the source), therefore i pass.
Too bad for me but thank you for this answer
But i am not able to do that (compile the source), therefore i pass.
Too bad for me but thank you for this answer
-
- Posts: 408
- Joined: Sunday 15 January 2017 11:06
- Target OS: Linux
- Domoticz version: beta
- Location: Norway
- Contact:
Re: Looking after script in python
@YelloSky,
Perhaps/hopefully python scripts will become "official" someday, and included in the precompiled binaries. In the meantime, you can achieve quite a lot using python scripts triggered from a device's on and off actions, and using the JSON api to read and manipulate devices, sensors etc.
You'll find quite a lot of documentation in the wiki, and probably find quite a lot of examples, I have the scripts I use posted on github, if you'd like to take a look...
https://github.com/moroen/domoticz-scripts
M
Perhaps/hopefully python scripts will become "official" someday, and included in the precompiled binaries. In the meantime, you can achieve quite a lot using python scripts triggered from a device's on and off actions, and using the JSON api to read and manipulate devices, sensors etc.
You'll find quite a lot of documentation in the wiki, and probably find quite a lot of examples, I have the scripts I use posted on github, if you'd like to take a look...
https://github.com/moroen/domoticz-scripts
M
- Westcott
- Posts: 423
- Joined: Tuesday 09 December 2014 17:04
- Target OS: Raspberry Pi / ODroid
- Domoticz version: Beta
- Location: UK - Glos
- Contact:
Re: Looking after script in python
Just out of interest, I'm using a Python version of Egregius' Pass2php.
On every relevant device change, a JSON encoded data set of devicechanged, otherdevices, otherdevices_idx, and otherdevices_lastupdate is sent to a Python listener script.
If any Domoticz updates are required, these go back as JSON calls.
Because the Python script is running continuously, it can store persistent values, and have its own scheduling.
On every relevant device change, a JSON encoded data set of devicechanged, otherdevices, otherdevices_idx, and otherdevices_lastupdate is sent to a Python listener script.
If any Domoticz updates are required, these go back as JSON calls.
Because the Python script is running continuously, it can store persistent values, and have its own scheduling.
Zwave - Sigma Z+ stick, Fibaro, Horstmann, Neo Coolcam, EUROtronic
RFlink - IR detectors and temperatures
Wifi - YeeLights, ESP32s, Anoop sockets
Zigbee - lots with zigbee2mqtt and ZbBridge
RFlink - IR detectors and temperatures
Wifi - YeeLights, ESP32s, Anoop sockets
Zigbee - lots with zigbee2mqtt and ZbBridge
- G3rard
- Posts: 669
- Joined: Wednesday 04 March 2015 22:15
- Target OS: -
- Domoticz version: No
- Location: The Netherlands
- Contact:
Re: Looking after script in python
Can you share that Pass2Python version?
Not using Domoticz anymore
- Westcott
- Posts: 423
- Joined: Tuesday 09 December 2014 17:04
- Target OS: Raspberry Pi / ODroid
- Domoticz version: Beta
- Location: UK - Glos
- Contact:
Re: Looking after script in python
Hi G3rard,
Here you go.
Note that several extra libraries are needed.
I used luarocks to get them for Lua and pip3 for Python3.
Lua-
This is all my Python 3 code, with functions to schedule the tasks of:
Deleting old unused devices,
Checking battery levels,
Checking if certain local network devices are really online, and
Running internet speed check.
Here you go.
Note that several extra libraries are needed.
I used luarocks to get them for Lua and pip3 for Python3.
Lua-
Code: Select all
function SendToPython(device, data)
--print("Start")
if not JSON then
JSON = require "cjson"
end
if device and data then
d = {}
d[device] = data
changed = JSON.encode(d)
elseif devicechanged then
changed = JSON.encode(devicechanged)
else
changed = ""
end
devices = JSON.encode(otherdevices)
idx = JSON.encode(otherdevices_idx)
lastupdate = JSON.encode(otherdevices_lastupdate)
data = changed..'|'..devices..'|'..idx..'|'..lastupdate
package.loadlib("core.so", "*")
local Socket = require "socket"
local tcp = Socket.connect('127.0.0.1', 8088)
if not tcp then
print("Python socket connect failed")
return
else
tcp:settimeout(2)
i, status = tcp:send(data);
if not i then
print("Python send failed - "..status)
end
end
tcp:close()
--print("End")
end
Deleting old unused devices,
Checking battery levels,
Checking if certain local network devices are really online, and
Running internet speed check.
Code: Select all
#!/usr/local/bin/python3
import socket
import sys
import os
import json
import urllib
import threading
import signal
import time
import requests
import sched
import re
import speedtest
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from urllib.request import urlopen
scheduler = BackgroundScheduler()
HOST = '' # Symbolic name meaning all available interfaces
PORT = 8088 # Arbitrary non-privileged port
URL_DOMOTICZ = 'http://localhost:8080/json.htm?type='
f_log = open('/home/pi/domoticz/domoticz-helper.log','w')
def DTstr():
return datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
def Log(msg):
msg = DTstr() + " " + msg
print(msg)
print(msg, file=f_log, flush=True)
#Bind socket to local host and port
try:
skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
skt.bind((HOST, PORT))
#Start listening on socket
skt.listen(5)
Log('Socket now listening')
except OSError as msg:
print('Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1])
sys.exit()
except socket.error as msg:
print('Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1])
sys.exit()
#print 'Socket bind complete'
def SigHandler(signum = None, frame = None):
try:
Log("Shutdown..")
skt.close()
scheduler.shutdown()
print("")
time.sleep(1) #here check if process is done
SendMessage("Python server stopped", "darkred", "mistyrose")
raise SystemExit
exit()
finally:
Log("Stopped")
os._exit(0)
for sig in [signal.SIGTERM, signal.SIGINT, signal.SIGHUP, signal.SIGQUIT]:
signal.signal(sig, SigHandler)
def DomoticzData(cmd, get=False):
try:
url = URL_DOMOTICZ + cmd
result = requests.get(url)
jsonData = result.json()
if get == True:
return jsonData
elif jsonData["status"] != "OK":
Log("Status " + jsonData["status"] + " for " + cmd)
except(requests.ConnectionError, e):
Log('Request Failed %s - %s' % (e, url) )
def SendMessage(msg, colour="Black", bg="LemonChiffon"):
data = urllib.parse.quote('<font Size="2" COLOR="' + colour + '" style="BACKGROUND-COLOR: ' + bg + '">' + msg + '</font>')
cmd = "command¶m=addlogmessage&message=" + data
threadReqHome = threading.Thread(target=DomoticzData, args=(cmd,))
threadReqHome.start()
svalues = {}
idxes = {}
lastupd = {}
started = False
def StartOnline():
CheckOnline()
scheduler.add_job(CheckOnline, 'interval', seconds = 30)
def StartBattery():
CheckBatteryLevels()
scheduler.add_job(CheckBatteryLevels, 'interval', minutes = 10)
def StartDeleted():
DeleteOldDevices()
scheduler.add_job(DeleteOldDevices, 'interval', minutes = 30)
def StartSTthread():
threadReqHome = threading.Thread(target=RunSpeedTest)
threadReqHome.start()
def ProcessData(data):
global svalues, idxes, lastupd
jchanged, jdevices, jidx, jlastupdates = data.split("|")
changed = json.loads(jchanged)
svalues = json.loads(jdevices)
idxes = json.loads(jidx)
lastupd = json.loads(jlastupdates)
Log(list(changed)[0] + " = " + list(changed.values())[0])
global started
if started == False:
started = True
scheduler.start()
scheduler.add_job(StartSTthread, 'interval', minutes = 60)
sch = sched.scheduler(time.time, time.sleep)
sch.enter(5, 1, StartOnline)
sch.enter(10, 1, StartBattery)
sch.enter(15, 1, StartDeleted)
sch.run()
Log("Schedulers started")
def UpdateDevice(device, cmd):
idx = str(idxes[device])
data = 'command¶m=udevice&idx=PARAM_IDX&svalue=PARAM_CMD'
data = data.replace('PARAM_IDX', idx)
data = data.replace('PARAM_CMD', str(cmd))
threadReqHome = threading.Thread(target=DomoticzData, args=(data,))
threadReqHome.start()
def DeleteDevice(idx):
data = 'deletedevice&idx=PARAM_IDX'
data = data.replace('PARAM_IDX',str(idx))
threadReqHome = threading.Thread(target=DomoticzData, args=(data,))
threadReqHome.start()
def LastUpdated(dd):
DTobj = datetime.strptime(dd['LastUpdate'], '%Y-%m-%d %H:%M:%S')
return datetime.now() - DTobj
def DeleteOldDevices():
jdata = DomoticzData("devices&used=false", True)
if jdata["status"] == "OK":
for dd in jdata['result']:
# print( dd['Name'] + ' ' + dd['LastUpdate'] )
DTdiff = LastUpdated(dd)
if DTdiff.days > 0:
DeleteDevice(dd['idx'])
msg = "Old " + dd['HardwareName'] + ' ' + str(dd['idx']) + ' ' + dd['Type'] + ' ' + dd['SubType']
SendMessage(msg, "darkred")
Log(msg)
# Log("Delete done")
else:
Log("Delete get failed " + jdata["status"])
def CheckBatteryLevels():
if len(svalues) == 0:
return
jdata = DomoticzData("devices&filter=temp&used=true", True)
if jdata["status"] == "OK":
for dd in jdata['result']:
desc = dd['Description']
if len(desc) > 1 and desc in svalues:
batt = int(dd['BatteryLevel'])
prev = int(svalues[desc])
DTdiff = int(LastUpdated(dd).total_seconds())
if batt >= 0 and batt <= 100 and prev >= 0 and prev <= 100 and batt != prev:
msg = dd['Name'] + ' (' + str(idxes[dd['Name']]) + ') ' + str(prev) + ' -> ' + str(batt) + ' ' + str(DTdiff)
SendMessage(msg)
Log(msg)
svalues[desc] = batt
# Log("Batteries done")
else:
Log("Battery get failed " + jdata["status"])
def CheckOnline():
if len(svalues) == 0:
return
jdata = DomoticzData("devices&filter=utility&used=true&order=Name&plan=2", True)
if jdata["status"] == "OK":
devices = {}
macaddr = {}
ipp = re.compile('\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
macp = re.compile('([0-9a-f]{2}(?::[0-9a-f]{2}){5})', re.IGNORECASE)
for dd in jdata['result']:
desc = dd['Description']
macg = re.search(macp, desc)
if macg != None:
devices[dd['Name']] = macg.group()
# Log(dd['Name'] + ' ' + mac.group())
else:
Log("Online get failed " + jdata["status"])
return
with open('/proc/net/arp') as f:
for line in f:
# print(line)
ipg = re.search(ipp, line)
macg = re.search(macp, line)
if ipg != None and macg != None:
# print(ip.group() + ' = ' + mac.group(0))
macaddr[macg.group()] = ipg.group()
for name, mac in devices.items():
stat = svalues[name]
if mac in macaddr:
macip = macaddr[mac]
else:
macip = 'Offline'
if stat != macip:
UpdateDevice(name, macip)
svalues[name] = macip
msg = name + ' is now ' + str(macip)
SendMessage(msg)
Log(msg)
# Log("Online done")
def RunSpeedTest():
Log("Speedtest...")
try:
sp = speedtest.Speedtest()
servers = sp.get_best_server()
ping = str(int(servers['latency']))
UpdateDevice("Ping", ping)
dnld = sp.download()
dwn = "{:.2f}".format(dnld/1000000)
UpdateDevice("Download", dwn)
upld = sp.upload()
up = "{:.2f}".format(upld/1000000)
Log(up + " " + dwn + " " + ping + "ms " + servers['name'])
except:
Log("Failed")
SendMessage("Speedtest failed", "darkred")
SendMessage("Python server ready...")
Log("Started")
#Function for handling connections. This will be used to create threads
def ClientThread(conn):
data = ""
BUFF_SIZE = 4096
while True: # Read all of the sent data
#Receiving from client
part = conn.recv(BUFF_SIZE)
data += part.decode('utf-8')
if len(part) < BUFF_SIZE:
break
conn.close()
# reply = 'OK...' + data
# conn.sendall(reply)
if len(data) > 0:
ProcessData(data)
#now keep talking with the client
while True:
#wait to accept a connection - blocking call
conn, addr = skt.accept()
# print 'Connected with ' + addr[0] + ':' + str(addr[1])
#start new thread takes 1st argument as a function name to be run, second is the tuple of arguments to the function.
threading.Thread(target=ClientThread, args=(conn,)).start()
skt.close()
Zwave - Sigma Z+ stick, Fibaro, Horstmann, Neo Coolcam, EUROtronic
RFlink - IR detectors and temperatures
Wifi - YeeLights, ESP32s, Anoop sockets
Zigbee - lots with zigbee2mqtt and ZbBridge
RFlink - IR detectors and temperatures
Wifi - YeeLights, ESP32s, Anoop sockets
Zigbee - lots with zigbee2mqtt and ZbBridge
Who is online
Users browsing this forum: No registered users and 1 guest