I have continued to improve my script from my Jan 4, 2024 post.
A number of changes and improvements made include:
- the time period of the warning is now also visible
- the script runs in an infinite loop and does not need to be started periodically
- A KNMI file is only downloaded if a new file is available
- Receives a message via MQTT from the KNMI that a new file is available
All this has a price. Due to the security at the KNMI, multiple keys are required, which you can obtain from the KNMI.
Domoticz's security also requires a key
EXPLANATION
In the script, a number of variables are defined in the initialize global variables section.
Some of these require a personal value. The script explains how this value can be obtained.
These variables have been given the value "Create your own" in the script. change this to your own value
The URL for Domoticz must also be specified in the variable "URL_DOMOTICZ"
The log file is placed in the same folder in which the script is running.
If you want to put it in the default folder for log files, change OUTPUTFILE to: OUTPUTFILE = f"/var/log/{SCRPT_NAME}.log"
To start the script on every reboot, add the following line to the crontab file with the command: crontab -e
@reboot sleep 120 && /usr/bin/python3 -u PATH TO SCRIPT
(example path to script :/home/pi/knmi_alert/WeatherAlertToDomoticz.py)
The MQTT modules are not always installed in Python3, this can be done manually with "pip install paho-mqtt"
In domoticz a device of the alert type is expected

- alert_device.jpg (14 KiB) Viewed 1883 times
After the program has started, a message appears stating that no data has been found. This message will remain until a new file is issued by the KNMI and can take up to 5 hours
Code: Select all
import os
import sys
import ssl
import json
import logging
import requests
from time import sleep
from queue import Queue
from datetime import datetime
import xml.etree.ElementTree as ET
import paho.mqtt.client as mqtt_client
import paho.mqtt.properties as properties
from logging.handlers import RotatingFileHandler
# =============================================================================
# ===================== initialize global variabelen ==========================
# =============================================================================
# global variable in use for mqtt
BROKER_DOMAIN = "mqtt.dataplatform.knmi.nl"
# Client ID should be made static, it is used to identify your session, so that
# missed events can be replayed after a disconnect
# https://www.uuidgenerator.net/version4
CLIENT_ID = "Create your own"
# Obtain your token at: https://developer.dataplatform.knmi.nl/notification-service
TOKEN = "Create your own"
# This will listen to both file creation and update events of this dataset:
# https://dataplatform.knmi.nl/dataset/waarschuwingen-nederland-48h-1-0
# This topic has at least 1 event every 5 hours. Usually more
TOPIC = "dataplatform/file/v1/waarschuwingen_nederland_48h/1.0/#"
# Version 3.1.1 also supported
PROTOCOL = mqtt_client.MQTTv5
# global variable in use when retrieving xml file
# Obtain your token at: https://developer.dataplatform.knmi.nl/open-data-api#token
API_KEY = "Create your own"
DATASET_NAME = "waarschuwingen_nederland_48h"
DATASET_VERSION = "1.0"
# variables related to domoticz
# The authorization requirements can be found at
# https://www.domoticz.com/wiki/Domoticz_API/JSON_URL%27s#Authorization
# user: domoticz pasw: class AUTORISATIE_TOKEN => "Basic ZG9tb3RpY3o6Y2xhc3M="
AUTORISATIE_TOKEN = "Create your own"
DEVICE_ID="Create your own" # number of the device in domoticz
PROVINCE= "ZH" # default value. You can choose between: WAE, GR, FR, DR, NH, FL, OV, GL, UT, ZH, ZE, NB, LB, WAB
WEEKDAYS=["ma","di","wo","do","vr","za","zo"] # list to determine weekdays
URL_DOMOTICZ = "http://xxx.xxx.xxx.xxx:8080"
# other global variables
# create a queue for the available files
File_Q = Queue()
# script name. Used for log file and temporary file names
SCRPT_NAME = os.path.basename(sys.argv[0])[:-3]
OUTPUTFILE = f"{SCRPT_NAME}.log"
# waiting time between script executions
WAIT_EXECUTION = 300 # 5 MIN
# number of wait cycles before domoticz is refreshed (3600/wait time)
WAIT_CYCLES = int(3600/WAIT_EXECUTION)-1
# =============================================================================
# *****************************************************************************
# ****************** don't change anything below this line ********************
# *****************************************************************************
# =============================================================================
# =============================================================================
# ========================== initialize logging ===============================
# =============================================================================
logging.basicConfig(
format='%(asctime)s: %(levelname)-8s: %(funcName)-12s: %(lineno)-4d: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level = logging.INFO,
)
logger = logging.getLogger(__name__)
# Define a handler witch writes INFO messages or higher to a file and rotate at max bytes
fh = RotatingFileHandler(OUTPUTFILE,maxBytes=102400, backupCount=2) # max 0.1 mb
fh.setLevel(logging.INFO)
# set a format which is for File use
ff = logging.Formatter('%(asctime)s: %(levelname)-8s: %(funcName)-12s: %(message)s','%Y-%m-%d %H:%M:%S',)
fh.setFormatter(ff)
# add the handler to the root logger
logger.addHandler(fh)
logger.info ("="*52)
logger.info ("="*18 + " Script started " + "="*18)
logger.info ("="*52)
# =============================================================================
# ================================ Classes ====================================
# =============================================================================
class Domoticz:
"""
Class that simplifies communication with domoticz.
The authorization requirements can be found at
https://www.domoticz.com/wiki/Domoticz_API/JSON_URL%27s#Authorization
__get_data provides the communication with domoticz. This function is called by
the other functions that configure the appropriate parameters
Functions:
Status => requests the status of a device with a specified ID
update => update the value of the specified device with the specified values
which value is expected depends on the device and can be found at:
https://www.domoticz.com/wiki/Domoticz_API/JSON_URL%27s#Update_devices.2Fsensors
log => put a message in the domoticz log
variable=> requests the value of a domoticz variable with the specified ID
"""
def __init__(self, url:str = None , id:int = None) -> None:
self.base_url = f"{URL_DOMOTICZ}/json.htm" if not url else url
self.headers = {"Authorization":AUTORISATIE_TOKEN}
self.id=id
self.type='command'
def __get_data(self, url:str, params:dict=None):
try:
with requests.get(url, headers=self.headers, params=params) as r:
logger.info(f"=> Domoticz: {r.url}")
return r.json()
except Exception as err:
logger.error()(f"Error handeling domoticz \n{err}")
return"ERROR"
def status(self,id:int=None):
id = self.id if not id else id
return self.__get_data(self.base_url,dict(
type =self.type,
param ='getdevices',
rid = id
))
def update(self,id:int= None, nvalue:float=0, svalue:str=""):
id = self.id if not id else id
return self.__get_data(self.base_url,dict(
type =self.type,
param ='udevice',
idx = id,
nvalue = nvalue,
svalue = svalue
))
def log (self, mess, level:int = 2):
return (self.__get_data(self.base_url,dict(
type =self.type,
param ='addlogmessage',
message = mess,
level = level
)))
def variabele (self, id:int=1):
return self.__get_data(self.base_url,dict(
type =self.type,
param ='getuservariable',
idx = id
))
class OpenDataAPI:
def __init__(self, api_token: str):
self.base_url = "https://api.dataplatform.knmi.nl/open-data/v1"
self.headers = {"Authorization": api_token}
def __get_data(self, url, params=None):
return requests.get(url, headers=self.headers, params=params).json()
def list_files(self, DATASET_NAME: str, DATASET_VERSION: str, params: dict):
return self.__get_data(
f"{self.base_url}/datasets/{DATASET_NAME}/versions/{DATASET_VERSION}/files",
params=params,
)
def get_file_url(self, temp_url: str):
return self.__get_data(
f"{temp_url}"
)
# =============================================================================
# ============================== Functions ====================================
# =============================================================================
def connect_mqtt() -> mqtt_client:
def on_connect(c: mqtt_client, userdata, flags, rc, reason_code, props=None):
logger.info(f"Connected using client ID: {str(c._client_id)}")
logger.info(f"Session present: {str(flags['session present'])}")
logger.info(f"Connection result: {str(rc)}")
# Subscribe here so it is automatically done after disconnect
subscribe(c, TOPIC)
client = mqtt_client.Client(client_id=CLIENT_ID, protocol=PROTOCOL, transport="websockets")
client.tls_set(tls_version=ssl.PROTOCOL_TLS)
connect_properties = properties.Properties(properties.PacketTypes.CONNECT)
# Maximum is 3600
connect_properties.SessionExpiryInterval = 3600
# The MQTT username is not used for authentication, only the token
username = "token"
client.username_pw_set(username, TOKEN)
client.on_connect = on_connect
client.connect(host=BROKER_DOMAIN, port=443, keepalive=60, clean_start=False, properties=connect_properties) # set the clean_start to false or true. to delete previous sessions
return client
def subscribe(client: mqtt_client, topic: str):
def on_message(c: mqtt_client,userdata , message):
# NOTE: Do NOT do slow processing in this function, as this will interfere with PUBACK messages for QoS=1.
# A couple of seconds seems fine, a minute is definitely too long.
#logger.info(f"Received message on topic {message.topic}: {str(message.payload)}")
load = json.loads(message.payload)
if 'data'in load:
if 'xml' in load['data']['filename']:
# Place the available file in a queue
File_Q.put(load['data']['url'])
else:
logger.info(f"Received message on topic: {message.topic}: {str(message.payload)}")
def on_subscribe(c: mqtt_client, userdata, mid, granted_qos, *other):
logger.info(f"Subscribed to topic '{topic}'")
client.on_subscribe = on_subscribe
client.on_message = on_message
# A qos=1 will replay missed events when reconnecting with the same client ID. Use qos=0 to disable
client.subscribe(topic, qos=1)
def get_knmi_files():
logger.info(f"Fetching latest file of {DATASET_NAME} version {DATASET_VERSION}")
#download_url='https://api.dataplatform.knmi.nl/open-data/v1/datasets/waarschuwingen_nederland_48h/versions/1.0/files/knmi_waarschuwingen_202402120747.xml/url'
download_url= File_Q.get()
api = OpenDataAPI(api_token=API_KEY)
try:
response = api.get_file_url(download_url)
with requests.get(response['temporaryDownloadUrl'], stream=True) as r:
r.raise_for_status()
with open(f"{SCRPT_NAME}.xml", "wb") as f:
for chunk in r.iter_content(chunk_size=8192): # kan naar 1024 voor minder mem gebruik
f.write(chunk)
except Exception:
logger.error("Unable to download file using download URL")
sys.exit('system error. Program terminated')
def parse_XML_file(File):
# Check if data is available
if not os.path.isfile(File): return (0, "No data available \nThis can take up to 5 hours")
#get data from file
doc= ET.parse(File)
root = doc.getroot()
#set local variabelen
warningStatus = 0
WarningList = []
WarningText =""
status=False
startTime=""
endTime=""
# loop through the selected data and process the outcome
for timeslot in root.findall(".//data/cube/timeslice//"):
# save the time of the timeslot being processed
if timeslot.tag == "timeslice_id":
TimeslotTime = timeslot.text
# process all phenomena in the timeslot
if timeslot.tag == "phenomenon":
for phenomenon in timeslot:
# process all locations in phenomena
if phenomenon.tag == "location":
for locations in phenomenon:
# proces wanted location in locations
if locations.tag == "location_id" and locations.text != PROVINCE: break # if not Wanted area then exit for loop
if locations.tag =="location_warning_status" and int(locations.text) > 0: # if a warning is active
status= True # flag to mark whether text needs to be processed
# sets the highest warning level in warningStatus
if warningStatus < int(locations.text): warningStatus= int(locations.text)
# proces the warning text
if status and locations.tag == "text": # process text
status = False
logger.debug(f"Time slot with warning: {TimeslotTime}")
# manage start and end time
if startTime == "": startTime= datetime.fromisoformat(TimeslotTime)
endTime = datetime.fromisoformat(TimeslotTime)
# Get the desired text
for txt in locations:
if txt.tag == "text_data" and txt.text != None : WarningList.append(txt.text) # put the text in the list
# loop through the selected data and process the outcome
if startTime : # if a time is available, include it in the WarningText
WarningText= f"Van {WEEKDAYS[datetime.weekday(startTime)]} {startTime.hour}:00 tot {WEEKDAYS[datetime.weekday(endTime)]} {endTime.hour + 1}:00 \n"
# remove duplicates from the WarningList list
WarningList = list(dict.fromkeys(WarningList))
#transform list to a string
if WarningList: WarningText += "\n".join(WarningList)
if not WarningText: WarningText ="Geen waarschuwingen"
return warningStatus,WarningText
def run():
wait_cycles = 1
client = connect_mqtt()
client.loop_start()
wwd=Domoticz(id=DEVICE_ID)
Wstatus,Wtext = parse_XML_file(f"{SCRPT_NAME}.xml")
wwd.update(nvalue=Wstatus,svalue=Wtext) # send to domoticz
while True:
while not File_Q.empty(): # execute when new files are available
get_knmi_files()
wait_cycles = WAIT_CYCLES+1
if wait_cycles >= WAIT_CYCLES: # run once an hour or when there are new files
Wstatus,Wtext = parse_XML_file(f"{SCRPT_NAME}.xml")
#convert the status to the domoticz requirements (add 1 to the status)
Wstatus += 1
wwd.update(nvalue=Wstatus,svalue=Wtext) # send to domoticz
wwd.log(f'{SCRPT_NAME}: Has updated device {DEVICE_ID}')
wait_cycles = 0
else:
wait_cycles += 1
sleep(WAIT_EXECUTION) # time for a cup of coffee.....or a nap
if __name__ == "__main__":
run()
If you have any comments or wishes, please let me know.
Kind regards, HansH