WiFi devices presence check
Posted: Tuesday 11 February 2014 19:57
This weekend I was busy with my DD-WRT WiFi routers (a TP-Link and a WRT54G) and a Raspberry Pi, trying to determine whether somebody is at home by detecting the WiFi of cell phones. See also http://www.domoticz.com/forum/viewtopic ... snmp#p6999. All credit goes to SweetPants (http://www.domoticz.com/forum/memberlis ... ofile&u=96)! I have only changed the Python file so that it can read a JSON file listing all phones, tablets, etc.
I made a bash file which is run every minute by a cron job (sudo crontab -e):
* * * * * /home/pi/script/wifi_presence_check.sh
The bash file looks like (wifi_presence_check.sh):
My json file looks like (wifi_devices.json):
My python script looks like (wifi_presence_check.py):
PS. SNMP must be enabled on your router!
I made a bash file which is run every minute by a cron job (sudo crontab -e):
* * * * * /home/pi/script/wifi_presence_check.sh
The bash file looks like (wifi_presence_check.sh):
Code: Select all
#!/bin/bash
#
# Runs wifi_presence_check.py in the background against the router and
# appends its output to a log file.
#
# Usage template:
#$HOME/signalplus.py -i <ip_router> -c <community_string> -f <JSON_file_with_MAC_addresses> --verbose >> /tmp/wifi_presence_check.log &

script_dir=/home/pi/script
router_ip=192.168.178.254
community=public
log_file=/tmp/wifi_presence_check.log

sudo python "${script_dir}/wifi_presence_check.py" \
    -i "${router_ip}" \
    -c "${community}" \
    -f "${script_dir}/wifi_devices.json" \
    --verbose >> "${log_file}" &
Code: Select all
{
"83:63:07:05:38:29": {
"Vendor": "BlackBerry",
"Type": "Phone",
"Model": "Torch",
"Owner": "Pietje",
"Idx": 83
},
.......
"F8:FA:51:67:C2:91": {
"Vendor": "Apple",
"Type": "Tablet",
"Model": "iPad Mini",
"Owner": "Frits",
"Idx": 89
}
}
Code: Select all
#!/usr/bin/python
import sys
import argparse
import netsnmp
import json
import httplib
from datetime import datetime
from pprint import pprint
__author__ = 'Harry Soeterbroek & Jan N'
__version__ = '0.2'
def cli_options():
    """Parse the command line and return the query settings as a dict.

    Built-in defaults are applied first and then overridden by whatever
    was given on the command line.  ``-i/--ipaddress`` and
    ``-f/--jsonmacaddressfile`` are mandatory.

    Returns:
        dict with the keys ``ipaddress``, ``community``, ``version``,
        ``secname``, ``seclevel``, ``authpassword``, ``authprotocol``,
        ``privpassword``, ``privprotocol``, ``port``,
        ``jsonmacaddressfile`` and ``verbose``.

    Exits with status 2 when the command line is invalid, the SNMP
    version is not 1, 2 or 3, SNMPv3 is requested without a secname, or
    no community string is available for SNMPv1/2.
    """
    cli_args = {
        'ipaddress': '192.168.178.1',
        'community': 'public',
        'version': 2,
        'secname': None,
        'seclevel': 'AuthNoPriv',
        'authpassword': None,
        'authprotocol': 'MD5',
        'privpassword': None,
        'privprotocol': 'DES',
        'port': 161,
        'jsonmacaddressfile': None,
        'verbose': False,
    }

    # Parse the CLI
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--version', help='SNMP version', type=int)
    parser.add_argument('-c', '--community', help='SNMPv1/2 community string')
    parser.add_argument('-i', '--ipaddress', help='IP address of device to query', required=True)
    parser.add_argument('-u', '--secname', help='SNMPv3 secname')
    parser.add_argument('-l', '--seclevel', help='SNMPv3 security level (NoAuthNoPriv, AuthNoPriv, AuthPriv)')
    parser.add_argument('-A', '--authpassword', help='SNMPv3 authpassword')
    parser.add_argument('-a', '--authprotocol', help='SNMPv3 authprotocol (MD5, SHA)')
    parser.add_argument('-X', '--privpassword', help='SNMPv3 privpassword')
    parser.add_argument('-x', '--privprotocol', help='SNMPv3 privprotocol (DES, 3DES, AES128)')
    parser.add_argument('-p', '--port', help='SNMP UDP port', type=int)
    parser.add_argument('-f', '--jsonmacaddressfile', help='JSON File with WiFi device MAC addresses', required=True)
    parser.add_argument('--verbose', help='Verbose mode', action='store_true', default=False)

    # argparse prints its own usage message and exits with status 2 on a
    # bad command line, so no extra error handling is needed here.
    args = parser.parse_args()

    # Assign and verify SNMP arguments.  The integer options are tested
    # against None so that an explicit 0 is validated instead of being
    # silently replaced by the default.
    if args.version is not None:
        cli_args['version'] = args.version
    if args.community:
        cli_args['community'] = args.community
    if args.ipaddress:
        cli_args['ipaddress'] = args.ipaddress
    if cli_args['version'] not in (1, 2, 3):
        print('{0} ERROR: Only SNMPv1, SNMPv2 and SNMPv3 are supported'.format(date_time()))
        sys.exit(2)

    if args.secname:
        cli_args['secname'] = args.secname
    # Only override the default security level when one was really given;
    # keying this on secname used to replace the default with None.
    if args.seclevel:
        cli_args['seclevel'] = args.seclevel
    if (not cli_args['secname']) and (cli_args['version'] == 3):
        print('{0} ERROR: SNMPv3 must specify a secname'.format(date_time()))
        sys.exit(2)

    if args.authpassword:
        cli_args['authpassword'] = args.authpassword
    if args.authprotocol:
        cli_args['authprotocol'] = args.authprotocol.upper()
    if args.privpassword:
        cli_args['privpassword'] = args.privpassword
    if args.privprotocol:
        cli_args['privprotocol'] = args.privprotocol.upper()
    if args.port is not None:
        cli_args['port'] = args.port
    if args.jsonmacaddressfile:
        cli_args['jsonmacaddressfile'] = args.jsonmacaddressfile

    if (cli_args['version'] != 3) and (not cli_args['community']):
        print('{0} ERROR: SNMP community string not defined'.format(date_time()))
        sys.exit(2)

    if args.verbose:
        cli_args['verbose'] = args.verbose
    return cli_args
def is_number(val):
    """Return True when *val* can be converted with ``float()``.

    Values that ``float()`` rejects because of their type (for example
    ``None``) are reported as non-numeric instead of raising.
    """
    try:
        float(val)
    except (TypeError, ValueError):
        return False
    return True
def snmp_walk(cli_args, oid):
    """Walk *oid* on the configured device and return the values found.

    Args:
        cli_args: settings dict as produced by ``cli_options()``.
        oid: the OID to walk.

    Returns:
        dict mapping ``"<tag>.<iid>"`` of every returned varbind to its
        value; values accepted by ``is_number()`` are stored as float,
        everything else is stored unchanged.

    Exits with status 2 when the walk raises or the session reports an
    error string.
    """
    try:
        session = netsnmp.Session(
            DestHost=cli_args['ipaddress'], Version=cli_args['version'], Community=cli_args['community'],
            # Honour the -p/--port option, which was parsed but never used.
            RemotePort=cli_args.get('port', 161),
            SecLevel=cli_args['seclevel'], AuthProto=cli_args['authprotocol'], AuthPass=cli_args['authpassword'],
            PrivProto=cli_args['privprotocol'], PrivPass=cli_args['privpassword'], SecName=cli_args['secname'], UseNumeric=True)
        results_objs = netsnmp.VarList(netsnmp.Varbind(oid))
        session.walk(results_objs)
    except Exception as e:
        print("{0} ERROR: Occurred during SNMPget for OID {1} from {2}: ({3})".format(date_time(), oid, cli_args['ipaddress'], e))
        sys.exit(2)

    # Crash on error
    if session.ErrorStr:
        print("{0} ERROR: Occurred during SNMPget for OID {1} from {2}: ({3}) ErrorNum: {4}, ErrorInd: {5}".format(date_time(), oid, cli_args['ipaddress'], session.ErrorStr, session.ErrorNum, session.ErrorInd))
        sys.exit(2)

    # Construct the results to return
    return_results = {}
    for result in results_objs:
        key = '%s.%s' % (result.tag, result.iid)
        if is_number(result.val):
            return_results[key] = float(result.val)
        else:
            return_results[key] = result.val

    # Print each MAC address in verbose mode
    if cli_args['verbose']:
        print("{0} DEBUG: MAC address presence:".format(date_time()))
        for value in return_results.values():
            if isinstance(value, float):
                # Already converted to a number above; bin_to_mac() cannot
                # iterate a float, so show the value as it is.
                print("{0}".format(value))
            else:
                print("{0}".format(bin_to_mac(value).upper()))  # Convert binary MAC to HEX
    return return_results
def date_time():
    """Return the current local time formatted as ``YYYY/MM/DD HH:MM:SS``."""
    now = datetime.now()
    return now.strftime('%Y/%m/%d %H:%M:%S')
def bin_to_mac(octet):
    """Format a binary string as colon-separated lower-case hex bytes.

    Accepts a byte string, a bytearray or a text string (each character
    of a text string is taken as one byte value).  Does not rely on the
    Python-2-only ``"hex"`` string codec.

    Returns:
        str such as ``"83:63:07:05:38:29"``; empty input gives ``""``.
    """
    if not isinstance(octet, (bytes, bytearray)):
        # Text string: map each character 1:1 onto a byte value.
        octet = octet.encode('latin-1')
    return ":".join("%02x" % byte for byte in bytearray(octet))
def mac_table(cli_parms, oid):
    """Return whatever ``snmp_walk()`` yields for *oid* with *cli_parms*."""
    return snmp_walk(cli_parms, oid)
def mac_in_table(searched_mac, mac_results, oid):
    """Tell whether *searched_mac* occurs among the walked MAC addresses.

    Args:
        searched_mac: MAC address as colon-separated hex text; compared
            case-insensitively.
        mac_results: dict as returned by ``snmp_walk()``; only its values
            are inspected.
        oid: unused, kept so existing callers keep working.

    Returns:
        True if any value of *mac_results* formats to *searched_mac*,
        otherwise False.
    """
    wanted = searched_mac.upper()
    # Loop through every MAC address found
    for raw_mac in mac_results.values():
        if isinstance(raw_mac, float):
            # snmp_walk() stores numeric-looking values as float; those
            # cannot be formatted as a MAC address, so skip them.
            continue
        if bin_to_mac(raw_mac).upper() == wanted:  # Convert binary MAC to HEX
            return True
    return False
# This class is derived from Pymoticz, modified to use httplib
class Domoticz:
    """Small client for the Domoticz JSON HTTP interface.

    This class is derived from Pymoticz, modified to use httplib.
    """

    def __init__(self, domoticz_host='127.0.0.1:8080'):  # Default on localhost
        """Remember the ``host:port`` of the Domoticz server to talk to."""
        self.host = domoticz_host

    def _request(self, url):
        """GET *url* from the server and return the decoded JSON reply.

        Raises:
            Exception: when the server answers with a status other than 200.
        """
        (ip, port) = self.host.split(":")
        http = httplib.HTTPConnection(ip, int(port), timeout=2)
        try:
            http.request("GET", url)
            result = http.getresponse()
            if result.status != 200:
                raise Exception("Domoticz returned HTTP status {0} for {1}".format(result.status, url))
            # Read the body before the connection is closed.
            body = result.read()
        finally:
            # Close the connection on the error paths as well.
            http.close()
        return json.loads(body)

    def list(self):
        """Return the server's reply listing the devices in use."""
        url = '/json.htm?type=devices&used=true'
        return self._request(url)

    def turn_on(self, _id):
        """Send switch command ``On`` to the switch with index *_id*."""
        return self.turn_on_off(_id, 'On')

    def turn_off(self, _id):
        """Send switch command ``Off`` to the switch with index *_id*."""
        return self.turn_on_off(_id, 'Off')

    def turn_on_if_off(self, _id):
        """Turn the switch on only if it reports "Off".

        Returns True when a command was sent, otherwise False.
        """
        if self.get_switch_status(_id) == "Off":
            self.turn_on(_id)
            return True
        return False

    def turn_off_if_on(self, _id):
        """Turn the switch off only if it reports "On".

        Returns True when a command was sent, otherwise False.
        """
        if self.get_switch_status(_id) == "On":
            self.turn_off(_id)
            return True
        return False

    def turn_on_off(self, _id, _state):
        """Send switch command *_state* to the switch with index *_id*."""
        # The query string needs a literal "&param="; the previous "¶m="
        # was an HTML-entity mangling of it.
        url = '/json.htm?type=command&param=switchlight&idx=%s&switchcmd=%s' % (_id, _state)
        return self._request(url)

    def get_switch_status(self, _id):
        """Return the ``Status`` field of device *_id*, or None if unknown."""
        device = self.get_device(_id)
        if device is None:
            return None
        return device.get('Status')

    def get_device(self, _id):
        """Return the first ``result`` entry for device *_id*.

        Returns None when the request fails or the reply holds no result.
        """
        url = '/json.htm?type=devices&rid=%s' % (_id)
        try:
            return self._request(url)['result'][0]
        except Exception:
            return None
def main():
    """Switch the Domoticz device of every listed MAC address on or off.

    Reads the JSON device file named on the command line, walks the
    router's MAC table over SNMP, and for every entry of the file turns
    the Domoticz switch given by its ``Idx`` on when the MAC address was
    found and off when it was not.  A switch is only commanded when its
    current state differs.

    Exits with status 2 when the JSON file cannot be read or parsed.
    """
    # OID walked for the MAC addresses.
    # NOTE(review): presumably the router's wireless client table - confirm.
    mac_oid = '.1.3.6.1.4.1.2021.255.3.54.1.3.32.1.4'

    # Parse the CLI options
    cli_parms = cli_options()

    # Read JSON file
    try:
        with open(cli_parms['jsonmacaddressfile']) as json_data:
            data = json.load(json_data)
    except Exception as e:
        print("{0} ERROR: Occurred during reading file {1}: ({2})".format(date_time(), cli_parms['jsonmacaddressfile'], e))
        sys.exit(2)

    found_macs = mac_table(cli_parms, mac_oid)

    # Get instance of Domoticz class optional ip:port, default = 127.0.0.1:8080
    d = Domoticz()

    for key, value in data.items():
        # Switch by Index (idx)
        switch_idx = value["Idx"]

        # Get Name of switch from Domoticz; a device that cannot be read
        # must not abort the handling of the remaining devices.
        device = d.get_device(switch_idx)
        if device is None:
            print("{0} ERROR: Could not read switch idx {1} from Domoticz".format(date_time(), switch_idx))
            continue
        switch_name = device['Name']

        if mac_in_table(key, found_macs, mac_oid):
            if d.turn_on_if_off(switch_idx) and cli_parms['verbose']:  # Turn switch On only if Off
                print("{0} DEBUG: Switching {1}: {2}".format(date_time(), "On", switch_name))
        else:
            if d.turn_off_if_on(switch_idx) and cli_parms['verbose']:  # Turn switch Off only if On
                print("{0} DEBUG: Switching {1}: {2}".format(date_time(), "Off", switch_name))


if __name__ == "__main__":
    main()