Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
Moderator: leecollings
-
- Posts: 25
- Joined: Sunday 18 June 2017 7:28
- Target OS: Linux
- Domoticz version: V3.7976
- Location: Netherlands
- Contact:
This is the topic for developing a plugin to communicate with the WiFi stick of GAIA grid-tie inverters and with Growatt inverters via a TCP RS232 converter.
http://www.ebay.com/itm/2000W-Solar-on- ... F5Ow8R3kEw
http://m.ebay.com/itm/USR-TCP232-302-RS ... iid%253A24
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
I still have no luck with the datalogger and don't know what's wrong.
1/ On my WiFi plug I added the IP address of my machine (on the same local network).
2/ On my machine I installed netcat and ran it with:
Code: Select all
sudo ncat -e /bin/cat -k -t -l 50
But there is no response in the terminal (or is the log stored in a file somewhere?).
When I click the test button on the admin page, the status is YES.
BTW, my plug is WiFi and not Ethernet, but that's not important.
Here is the link:
https://www.aliexpress.com/item/Wifi-Po ... .34.JHzPYW
Last edited by devros on Wednesday 21 June 2017 8:45, edited 2 times in total.
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
Maybe the wifistick is waiting for some commands.
There is a tcp modbus library for python...
https://pypi.python.org/pypi/pyModbusTCP/0.1.1
It can also act as modbus server
http://pythonhosted.org/pyModbusTCP/exa ... erver.html
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
jeroenst wrote:Maybe the wifistick is waiting for some commands.
There is a tcp modbus library for python...
https://pypi.python.org/pypi/pyModbusTCP/0.1.1
It can also act as modbus server
http://pythonhosted.org/pyModbusTCP/exa ... erver.html
I'm a little lost. I installed pyModbusTCP on my Python 3 and created server.py from your example. It is running on port 502 (and I changed the port to 502 in the WiFi stick admin), but there is still no response.
Any idea how to send commands?
I don't know if it is useful, but the default connection is to
http://shinemonitor.com:502/
When I paste that into a browser, a file with this content is downloaded:
24af 0001 000a ff01 1106 1416 1103 0023
I also found this link with protocol info: http://www.rtaautomation.com/technologies/modbus-tcpip/
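The 16 bytes quoted in this post can be held against the standard Modbus/TCP framing, which starts with a 7-byte MBAP header (transaction id, protocol id, length, unit id). The sketch below is only a guess at the layout: it assumes the dump lists the bytes in wire order and that the stick uses an MBAP-like header. The function name `parse_mbap` is made up for this example.

```python
import struct

# The 16 bytes exactly as posted, assumed to be in wire (network) order.
RAW = bytes.fromhex("24af 0001 000a ff01 1106 1416 1103 0023")


def parse_mbap(frame):
    """Split a frame into MBAP-style header fields and the bytes after them."""
    if len(frame) < 7:
        raise ValueError("need at least 7 bytes for an MBAP-style header")
    # Big-endian: 2-byte transaction id, 2-byte protocol id, 2-byte length, 1-byte unit id
    transaction_id, protocol_id, length, unit_id = struct.unpack(">HHHB", frame[:7])
    body = frame[7:]
    return {
        "transaction_id": transaction_id,
        "protocol_id": protocol_id,
        "length": length,
        "unit_id": unit_id,
        "body": body,
        # In Modbus/TCP the length field counts the unit id plus everything after it.
        "length_consistent": length == 1 + len(body),
    }


fields = parse_mbap(RAW)
for name, value in fields.items():
    print(name, value.hex() if isinstance(value, bytes) else value)
```

Under these assumptions the length field (10) matches the number of bytes that follow it, which suggests the framing is at least MBAP-like. The protocol id comes out as 1, whereas plain Modbus/TCP uses 0, so a stock Modbus library may not accept these frames unchanged.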
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
Is it also possible to connect to the wifi module on a specific port to communicate with modbus?
The python modbus module is not meant for sending out commands, but for receiving commands afaik...
If it is possible to connect to the wifi stick try this (change the ip and port to match your wifi stick) :
Code: Select all
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# read_register
# read 100 input registers and print the result on stdout
# you can use the tiny modbus server "mbserverd" to test this code
# mbserverd is here: https://github.com/sourceperl/mbserverd
# the command line modbus client mbtget can also be useful
# mbtget is here: https://github.com/sourceperl/mbtget
from pyModbusTCP.client import ModbusClient
import time

SERVER_HOST = "localhost"  ######## IP OF YOUR WIFI STICK #######
SERVER_PORT = 502  ####### MODBUS PORT OF YOUR WIFI STICK ########

c = ModbusClient()
# uncomment this line to see debug message
#c.debug(True)
# define modbus server host, port
c.host(SERVER_HOST)
c.port(SERVER_PORT)

while True:
    # open or reconnect TCP to server
    if not c.is_open():
        if not c.open():
            print("unable to connect to "+SERVER_HOST+":"+str(SERVER_PORT))
    # if open() is ok, read input registers (modbus function 0x04)
    if c.is_open():
        # read 100 registers at address 0, store result in regs list
        regs = c.read_input_registers(0, 100)
        # if success display registers
        if regs:
            print("reg ad #0 to 99: "+str(regs))
    # sleep 2s before next polling
    time.sleep(2)
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
jeroenst wrote:Is it also possible to connect to the wifi module on a specific port to communicate with modbus?
The python modbus module is not meant for sending out commands, but for receiving commands afaik...
Thanks, but I can't find the right port; it looks like only 80/tcp is open.
Nmap scan report for 192.168.2.212 Host ... (Unknown)
I can give you SSH access via PM if you want to try...
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
Can you do a scan from port 1 to 65535?
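A full TCP scan like the one asked for here is normally done with nmap, but it can also be sketched with the standard socket module. This is a minimal, slow, sequential sketch; the helper name `scan_tcp_ports` and the 0.5 s timeout are assumptions for illustration.

```python
import socket


def scan_tcp_ports(host, ports, timeout=0.5):
    """Return the ports in `ports` that accept a TCP connection on `host`."""
    open_ports = []
    for port in ports:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex returns 0 on success instead of raising an exception
            if sock.connect_ex((host, port)) == 0:
                open_ports.append(port)
    return open_ports
```

For the stick in this thread the call would look like scan_tcp_ports("192.168.2.212", range(1, 65536)). With one connection at a time this can take a long while if the device silently drops packets.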
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
jeroenst wrote:Can you do a scan from port 1 to 65535?
I tried tshark and finally found communication with the server:
Code: Select all
61 25.818115000 ba:ba:ac:9c:fc:00 -> ARP 44 Who has 192.168.2.212? Tell 192.168.2.61
62 25.837099000 Shanghai_28:2a:fe -> ARP 44 192.168.2.212 is at f0:fe:6b:28:2a:fe
Code: Select all
Frame 62: 44 bytes on wire (352 bits), 44 bytes captured (352 bits) on interface 0
Interface id: 0 (any)
Encapsulation type: Linux cooked-mode capture (25)
Arrival Time: Jun 21, 2017 16:11:30.993331000 CEST
[Time shift for this packet: 0.000000000 seconds]
Epoch Time: 1498054290.993331000 seconds
[Time delta from previous captured frame: 0.018984000 seconds]
[Time delta from previous displayed frame: 0.018984000 seconds]
[Time since reference or first frame: 25.837099000 seconds]
Frame Number: 62
Frame Length: 44 bytes (352 bits)
Capture Length: 44 bytes (352 bits)
[Frame is marked: False]
[Frame is ignored: False]
[Protocols in frame: sll:ethertype:arp]
Linux cooked capture
Packet type: Unicast to us (0)
Link-layer address type: 1
Link-layer address length: 6
Source: Shanghai_28:2a:fe (f0:fe:6b:28:2a:fe)
Protocol: ARP (0x0806)
Address Resolution Protocol (reply)
Hardware type: Ethernet (1)
Protocol type: IP (0x0800)
Hardware size: 6
Protocol size: 4
Opcode: reply (2)
Sender MAC address: Shanghai_28:2a:fe (f0:fe:6b:28:2a:fe)
Sender IP address: 192.168.2.212 (192.168.2.212)
Target MAC address: ba:ba:ac:9c:fc:00 (ba:ba:ac:9c:fc:00)
Target IP address: 192.168.2.61 (192.168.2.61)
Frame 63: 60 bytes on wire (480 bits), 60 bytes captured (480 bits) on interface 0
Interface id: 0 (any)
Encapsulation type: Linux cooked-mode capture (25)
Arrival Time: Jun 21, 2017 16:11:31.956889000 CEST
[Time shift for this packet: 0.000000000 seconds]
Epoch Time: 1498054291.956889000 seconds
[Time delta from previous captured frame: 0.963558000 seconds]
[Time delta from previous displayed frame: 0.963558000 seconds]
[Time since reference or first frame: 26.800657000 seconds]
Frame Number: 63
Frame Length: 60 bytes (480 bits)
Capture Length: 60 bytes (480 bits)
[Frame is marked: False]
[Frame is ignored: False]
[Protocols in frame: sll:ethertype:ip:tcp]
Linux cooked capture
Packet type: Unicast to us (0)
Link-layer address type: 1
Link-layer address length: 6
Source: Shanghai_28:2a:fe (f0:fe:6b:28:2a:fe)
Protocol: IP (0x0800)
Internet Protocol Version 4, Src: 192.168.2.212 (192.168.2.212), Dst: 192.168.2.61 (192.168.2.61)
Version: 4
Header Length: 20 bytes
Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00: Not-ECT (Not ECN-Capable Transport))
0000 00.. = Differentiated Services Codepoint: Default (0x00)
.... ..00 = Explicit Congestion Notification: Not-ECT (Not ECN-Capable Transport) (0x00)
Total Length: 44
Identification: 0xae7c (44668)
Flags: 0x00
0... .... = Reserved bit: Not set
.0.. .... = Don't fragment: Not set
..0. .... = More fragments: Not set
Fragment offset: 0
Time to live: 255
Protocol: TCP (6)
Header checksum: 0x86ed [validation disabled]
[Good: False]
[Bad: False]
Source: 192.168.2.212 (192.168.2.212)
Destination: 192.168.2.61 (192.168.2.61)
[Source GeoIP: Unknown]
[Destination GeoIP: Unknown]
Transmission Control Protocol, Src Port: 1601 (1601), Dst Port: 502 (502), Seq: 0, Len: 0
Source Port: 1601 (1601)
Destination Port: 502 (502)
[Stream index: 29]
[TCP Segment Len: 0]
Sequence number: 0 (relative sequence number)
Acknowledgment number: 0
Header Length: 24 bytes
.... 0000 0000 0010 = Flags: 0x002 (SYN)
000. .... .... = Reserved: Not set
...0 .... .... = Nonce: Not set
.... 0... .... = Congestion Window Reduced (CWR): Not set
.... .0.. .... = ECN-Echo: Not set
.... ..0. .... = Urgent: Not set
.... ...0 .... = Acknowledgment: Not set
.... .... 0... = Push: Not set
.... .... .0.. = Reset: Not set
.... .... ..1. = Syn: Set
[Expert Info (Chat/Sequence): Connection establish request (SYN): server port 502]
[Connection establish request (SYN): server port 502]
[Severity level: Chat]
[Group: Sequence]
.... .... ...0 = Fin: Not set
Window size value: 1500
[Calculated window size: 1500]
Checksum: 0x901d [validation disabled]
[Good Checksum: False]
[Bad Checksum: False]
Urgent pointer: 0
Options: (4 bytes), Maximum segment size
Maximum segment size: 1400 bytes
Kind: Maximum Segment Size (2)
Length: 4
MSS Value: 1400
[SEQ/ACK analysis]
[TCP Analysis Flags]
[Expert Info (Note/Sequence): A new tcp session is started with the same ports as an earlier session in this trace]
[A new tcp session is started with the same ports as an earlier session in this trace]
[Severity level: Note]
[Group: Sequence]
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
Did you also do a portscan from port 1 to port 65535?
It would be best if we can use the python library...
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
jeroenst wrote:Did you also do a portscan from port 1 to port 65535?
It would be best if we can use the python library...
I just did a full scan; only port 80 is open...

Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
Okay, then it is not easy for me to write a plugin because the lineaire doesn't support it.
Maybe you can buy the rs232 ethernet converter I found?
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
jeroenst wrote:Okay, then it is not easy for me to write a plugin because the lineaire doesn't support it.
Maybe you can buy the rs232 ethernet converter I found?
OK, thanks for the help. I'm using the JSON from the Shine website; it's not perfect, but it somehow works with my limited Python skills.
But I noticed that the WiFi plug is sending data every 5 minutes (once to shinemonitor.com and once to my Domoticz computer), so the multiple-server setting in the WiFi plug admin works.
So it is sending data... but how to decode it?
Code: Select all
Source: 192.168.2.212 (192.168.2.212) WIFI PLUG ADRESS
Destination: 192.168.2.61 (192.168.2.61) MY MACHINE
[Source GeoIP: Unknown]
[Destination GeoIP: Unknown]
Transmission Control Protocol, Src Port: 1601 (1601), Dst Port: 502 (502), Seq: 0, Len: 0
Source Port: 1601 (1601)
Destination Port: 502 (502)
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
That's indeed the question, how to decode..
I can create a server which dumps received data in hex, maybe we can discover the modbus rtu protocol in it...
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
jeroenst wrote:That's indeed the question, how to decode..
I can create a server which dumps received data in hex, maybe we can discover the modbus rtu protocol in it...
Sure, if you create a server I can add it to my WiFi plug admin page.
I checked an original Growatt WiFi plug and it all looks almost the same as mine...
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
Try this:
Code: Select all
#!/usr/bin/env php
<?php
// Set the ip and port we will listen on
$address = '0.0.0.0';
$port = 502;
$max_clients = 1;
// Array that will hold client information
$clients = array();
// Create a TCP Stream socket
$master_socket = socket_create(AF_INET, SOCK_STREAM, 0);
// Bind the socket to an address/port
socket_bind($master_socket, $address, $port);
// Start listening for connections
socket_listen($master_socket);
// master loop
while (true) {
    // Setup clients listen socket for reading
    $read = array();
    $read[] = $master_socket;
    $write = NULL;
    $error = NULL;
    // Add clients to the $read array
    foreach ($clients as $client) {
        $read[] = $client;
    }
    // Set up a blocking call to socket_select()
    $ready = socket_select($read, $write, $error, null);
    if ($ready == 0) {
        continue;
    }
    // if a new connection is being made add it to the client array
    if (in_array($master_socket, $read)) {
        if (count($clients) <= $max_clients) {
            echo "\n\nClient connected...\n\n";
            $clients[] = socket_accept($master_socket);
        } else {
            echo "\n\nMax clients reached...\n\n";
        }
        // remove master socket from the read array
        $key = array_search($master_socket, $read);
        unset($read[$key]);
    }
    // If a client is trying to write - handle it now
    foreach ($read as $client) {
        $input = socket_read($client, 1);
        // Empty string or false means the client disconnected
        if ($input === false || $input === '') {
            echo "\n\nClient disconnected...\n\n";
            $key = array_search($client, $clients);
            unset($clients[$key]);
            socket_close($client);
            continue;
        }
        // Print the byte in hex (strict checks above so the byte "0" is not skipped)
        echo sprintf('0x%02x', ord($input[0])) . " ";
    }
} // eo master loop
// Close the master sockets
socket_close($master_socket);
?>
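Since the plugin itself is meant to be written in Python, the same hex-dump idea can be sketched there too. This is a minimal sketch that handles a single connection and then returns. The function names `make_listener` and `dump_one_connection` are made up for this example and are not part of any library used in this thread.

```python
import socket


def make_listener(address="0.0.0.0", port=502):
    """Create a listening TCP socket; port 502 needs root on most Linux systems."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((address, port))
    listener.listen(1)
    return listener


def dump_one_connection(listener):
    """Accept one client and return everything it sends as a list of '0x..' strings."""
    conn, addr = listener.accept()
    print("Client connected:", addr)
    dumped = []
    with conn:
        while True:
            data = conn.recv(1024)
            if not data:  # empty bytes means the peer closed the connection
                break
            for byte in data:
                dumped.append("0x%02x" % byte)
    print(" ".join(dumped))
    return dumped
```

Running dump_one_connection(make_listener()) as root and pointing the WiFi plug at this machine should print whatever the plug sends, provided it sends anything before closing the connection.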
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
Nothing sorry....
its strange
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
devros wrote:Nothing sorry....
its strange
Then the WiFi stick is not sending any data to your Domoticz computer, although in an earlier message you said it was sending data every 5 minutes...
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
I was still thinking about something.
I tried this Python script to open a specific port:
Code: Select all
#!/usr/bin/python
# Demo server listening on port 502
# Modified from Python tutorial docs
import socket

HOST = '192.168.2.61'  # my IP address
PORT = 502  # defined port for server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(1)
conn, addr = s.accept()
print('Connected by %s' % (addr,))
while 1:
    data = conn.recv(1024)
    if not data:
        break
    conn.send(data)
conn.close()
The output is: Connected by ('192.168.2.212', 1351)
So it looks like it is using port 1351...
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
devros wrote:I was still thinking about something
i tried this python script to open specific port and output is Connected by ('192.168.2.212', 1351)
so its using 1351 port it looks...
The nmap info about that port is:
Code: Select all
PORT STATE SERVICE
1351/tcp closed equationbuilder
So it looks like it is waiting for some command.
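One thing worth checking before reading too much into port 1351: the address that accept() returns describes the peer's end of the connection, and its port is normally a temporary source port chosen by the client's network stack, not a port the client listens on. The small loopback sketch below illustrates this, with a local client standing in for the WiFi stick. Whether the stick also listens on anything besides port 80 is not something this shows.

```python
import socket

# Listen on an OS-chosen free port on the loopback interface.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
listen_port = listener.getsockname()[1]

# The client plays the role of the WiFi stick connecting out to the server.
client = socket.create_connection(("127.0.0.1", listen_port))
conn, addr = listener.accept()

# addr[1] is the client's source port, picked by its OS for this one connection.
client_port = client.getsockname()[1]
print("server listens on", listen_port)
print("accept() reports peer", addr)
print("client's own source port", client_port)

for s in (conn, client, listener):
    s.close()
```

This matches the tshark capture earlier in the thread, where the stick connects from source port 1601 to destination port 502. It would also explain why nmap reports 1351/tcp as closed.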
Re: Python Plugin: Solar Inverter via TCP Shine v2 protocol (Growatt, Gaia)
Try this:
Code: Select all
# -*- coding: utf-8 -*-
# Python module: ModbusClient class (Client ModBus/TCP)

VERSION = '0.1.1'
## ModBus/TCP
MODBUS_PORT = 502
## Modbus mode
MODBUS_TCP = 1
MODBUS_RTU = 2
## Modbus function code
# standard
READ_COILS = 0x01
READ_DISCRETE_INPUTS = 0x02
READ_HOLDING_REGISTERS = 0x03
READ_INPUT_REGISTERS = 0x04
WRITE_SINGLE_COIL = 0x05
WRITE_SINGLE_REGISTER = 0x06
WRITE_MULTIPLE_COILS = 0x0F
WRITE_MULTIPLE_REGISTERS = 0x10
MODBUS_ENCAPSULATED_INTERFACE = 0x2B
## Modbus except code
EXP_NONE = 0x00
EXP_ILLEGAL_FUNCTION = 0x01
EXP_DATA_ADDRESS = 0x02
EXP_DATA_VALUE = 0x03
EXP_SLAVE_DEVICE_FAILURE = 0x04
EXP_ACKNOWLEDGE = 0x05
EXP_SLAVE_DEVICE_BUSY = 0x06
EXP_MEMORY_PARITY_ERROR = 0x08
EXP_GATEWAY_PATH_UNAVAILABLE = 0x0A
EXP_GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = 0x0B
## Module error codes
MB_NO_ERR = 0
MB_RESOLVE_ERR = 1
MB_CONNECT_ERR = 2
MB_SEND_ERR = 3
MB_RECV_ERR = 4
MB_TIMEOUT_ERR = 5
MB_FRAME_ERR = 6
MB_EXCEPT_ERR = 7
MB_CRC_ERR = 8


######################
# compute CRC of frame
######################
def crc16(frame):
    """Compute CRC16

    :param frame: frame
    :type frame: str (Python2) or class bytes (Python3)
    :returns: CRC16
    :rtype: int
    """
    crc = 0xFFFF
    for index, item in enumerate(bytearray(frame)):
        next_byte = item
        crc ^= next_byte
        for i in range(8):
            lsb = crc & 1
            crc >>= 1
            if lsb:
                crc ^= 0xA001
    return crc


def set_bit(value, offset):
    """Set a bit at offset position

    :param value: value of integer where set the bit
    :type value: int
    :param offset: bit offset (0 is lsb)
    :type offset: int
    :returns: value of integer with bit set
    :rtype: int
    """
    mask = 1 << offset
    return int(value | mask)


import re
import socket
import select
import struct
import random
import sys
import time
class ModbusClient:
    """Modbus TCP client"""

    def __init__(self, host=None, port=None, unit_id=None, timeout=None,
                 debug=None, auto_open=None, auto_close=None):
        """Constructor

        Modbus server params (host, port) can be set here or with host(), port()
        functions. Same for debug option.

        Use functions avoid to launch ValueError except if params is incorrect.

        :param host: hostname or IPv4/IPv6 address server address (optional)
        :type host: str
        :param port: TCP port number (optional)
        :type port: int
        :param unit_id: unit ID (optional)
        :type unit_id: int
        :param timeout: socket timeout in seconds (optional)
        :type timeout: float
        :param debug: debug state (optional)
        :type debug: bool
        :param auto_open: auto TCP connect (optional)
        :type auto_open: bool
        :param auto_close: auto TCP close (optional)
        :type auto_close: bool
        :return: Object ModbusClient
        :rtype: ModbusClient
        :raises ValueError: if a set parameter value is incorrect
        """
        # object vars
        self.__hostname = '127.0.0.1'
        self.__port = MODBUS_PORT
        self.__unit_id = 1
        self.__timeout = 30.0  # socket timeout
        self.__debug = False  # debug trace on/off
        self.__auto_open = False  # auto TCP connect
        self.__auto_close = False  # auto TCP close
        self.__mode = MODBUS_TCP  # default is Modbus/TCP
        self.__sock = None  # socket handle
        self.__connection = None  # client handle
        self.__hd_tr_id = 0  # store transaction ID
        self.__version = VERSION  # version number
        self.__last_error = MB_NO_ERR  # last error code
        self.__last_except = 0  # last expect code
        # set host
        if host:
            if not self.host(host):
                raise ValueError('host value error')
        # set port
        if port:
            if not self.port(port):
                raise ValueError('port value error')
        # set unit_id
        if unit_id is not None:
            if self.unit_id(unit_id) is None:
                raise ValueError('unit_id value error')
        # set timeout
        if timeout:
            if not self.timeout(timeout):
                raise ValueError('timeout value error')
        # set debug
        if debug:
            if not self.debug(debug):
                raise ValueError('debug value error')
        # set auto_open
        if auto_open:
            if not self.auto_open(auto_open):
                raise ValueError('auto_open value error')
        # set auto_close
        if auto_close:
            if not self.auto_close(auto_close):
                raise ValueError('auto_close value error')

    def version(self):
        """Get package version

        :return: current version of the package (like "0.0.1")
        :rtype: str
        """
        return self.__version

    def last_error(self):
        """Get last error code

        :return: last error code
        :rtype: int
        """
        return self.__last_error

    def last_except(self):
        """Get last except code

        :return: last except code
        :rtype: int
        """
        return self.__last_except

    def host(self, hostname=None):
        """Get or set host (IPv4/IPv6 or hostname like 'plc.domain.net')

        :param hostname: hostname or IPv4/IPv6 address or None for get value
        :type hostname: str or None
        :returns: hostname or None if set fail
        :rtype: str or None
        """
        if (hostname is None) or (hostname is self.__hostname):
            return self.__hostname
        # when hostname change ensure old socket is close
        self.close()
        # IPv4 ?
        try:
            socket.inet_pton(socket.AF_INET, hostname)
            self.__hostname = hostname
            return self.__hostname
        except socket.error:
            pass
        # IPv6 ?
        try:
            socket.inet_pton(socket.AF_INET6, hostname)
            self.__hostname = hostname
            return self.__hostname
        except socket.error:
            pass
        # DNS name ?
        if re.match('^[a-z][a-z0-9\.\-]+$', hostname):
            self.__hostname = hostname
            return self.__hostname
        else:
            return None

    def port(self, port=None):
        """Get or set TCP port

        :param port: TCP port number or None for get value
        :type port: int or None
        :returns: TCP port or None if set fail
        :rtype: int or None
        """
        if (port is None) or (port is self.__port):
            return self.__port
        # when port change ensure old socket is close
        self.close()
        # valid port ?
        if 0 < int(port) < 65536:
            self.__port = int(port)
            return self.__port
        else:
            return None

    def unit_id(self, unit_id=None):
        """Get or set unit ID field

        :param unit_id: unit ID (0 to 255) or None for get value
        :type unit_id: int or None
        :returns: unit ID or None if set fail
        :rtype: int or None
        """
        if unit_id is None:
            return self.__unit_id
        if 0 <= int(unit_id) < 256:
            self.__unit_id = int(unit_id)
            return self.__unit_id
        else:
            return None

    def timeout(self, timeout=None):
        """Get or set timeout field

        :param timeout: socket timeout in seconds or None for get value
        :type timeout: float or None
        :returns: timeout or None if set fail
        :rtype: float or None
        """
        if timeout is None:
            return self.__timeout
        if 0 < float(timeout) < 3600:
            self.__timeout = float(timeout)
            return self.__timeout
        else:
            return None

    def debug(self, state=None):
        """Get or set debug mode

        :param state: debug state or None for get value
        :type state: bool or None
        :returns: debug state or None if set fail
        :rtype: bool or None
        """
        if state is None:
            return self.__debug
        self.__debug = bool(state)
        return self.__debug

    def auto_open(self, state=None):
        """Get or set automatic TCP connect mode

        :param state: auto_open state or None for get value
        :type state: bool or None
        :returns: auto_open state or None if set fail
        :rtype: bool or None
        """
        if state is None:
            return self.__auto_open
        self.__auto_open = bool(state)
        return self.__auto_open

    def auto_close(self, state=None):
        """Get or set automatic TCP close mode (after each request)

        :param state: auto_close state or None for get value
        :type state: bool or None
        :returns: auto_close state or None if set fail
        :rtype: bool or None
        """
        if state is None:
            return self.__auto_close
        self.__auto_close = bool(state)
        return self.__auto_close

    def mode(self, mode=None):
        """Get or set modbus mode (TCP or RTU)

        :param mode: mode (MODBUS_TCP/MODBUS_RTU) to set or None for get value
        :type mode: int
        :returns: mode or None if set fail
        :rtype: int or None
        """
        if mode is None:
            return self.__mode
        if mode == MODBUS_TCP or mode == MODBUS_RTU:
            self.__mode = mode
            return self.__mode
        else:
            return None

    def open(self):
        """Wait for an incoming TCP connection (listen on host/port and accept)

        :returns: connect status (True if open)
        :rtype: bool
        """
        # restart TCP if already open
        if self.is_open():
            self.close()
        # init socket and connect
        # list available sockets on the target host/port
        # AF_xxx : AF_INET -> IPv4, AF_INET6 -> IPv6,
        #          AF_UNSPEC -> IPv6 (priority on some system) or 4
        # list available socket on target host
        for res in socket.getaddrinfo(self.__hostname, self.__port,
                                      socket.AF_UNSPEC, socket.SOCK_STREAM):
            af, sock_type, proto, canon_name, sa = res
            try:
                self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            except socket.error:
                self.__sock = None
                # sys.stderr.write() works on both Python 2 and Python 3
                sys.stderr.write('error opening server socket\n')
                continue
            try:
                self.__sock.settimeout(self.__timeout)
                server_address = (self.__hostname, self.__port)
                self.__sock.bind(server_address)
                self.__sock.listen(1)
                # Wait for a connection
                sys.stderr.write('waiting for a connection\n')
                self.__connection, client_address = self.__sock.accept()
            except socket.error:
                self.__sock.close()
                self.__sock = None
                sys.stderr.write('error setting server socket to listen mode\n')
                continue
            break
        # check connect status
        if self.__sock is not None:
            return True
        else:
            self.__last_error = MB_CONNECT_ERR
            self.__debug_msg('connect error')
            return False

    def is_open(self):
        """Get status of TCP connection

        :returns: status (True for open)
        :rtype: bool
        """
        return self.__sock is not None

    def close(self):
        """Close TCP connection

        :returns: close status (True for close/None if already close)
        :rtype: bool or None
        """
        if self.__sock:
            self.__sock.close()
            self.__sock = None
            return True
        else:
            return None

    def read_coils(self, bit_addr, bit_nb=1):
        """Modbus function READ_COILS (0x01)

        :param bit_addr: bit address (0 to 65535)
        :type bit_addr: int
        :param bit_nb: number of bits to read (1 to 2000)
        :type bit_nb: int
        :returns: bits list or None if error
        :rtype: list of bool or None
        """
        # check params
        if not (0 <= int(bit_addr) <= 65535):
            self.__debug_msg('read_coils(): bit_addr out of range')
            return None
        if not (1 <= int(bit_nb) <= 2000):
            self.__debug_msg('read_coils(): bit_nb out of range')
            return None
        if (int(bit_addr) + int(bit_nb)) > 65536:
            self.__debug_msg('read_coils(): read after ad 65535')
            return None
        # build frame
        tx_buffer = self._mbus_frame(READ_COILS, struct.pack('>HH', bit_addr, bit_nb))
        # send request
        s_send = self._send_mbus(tx_buffer)
        # check error
        if not s_send:
            return None
        # receive
        f_body = self._recv_mbus()
        # check error
        if not f_body:
            return None
        # check min frame body size
        if len(f_body) < 2:
            self.__last_error = MB_RECV_ERR
            self.__debug_msg('read_coils(): rx frame under min size')
            self.close()
            return None
        # extract field "byte count"
        rx_byte_count = struct.unpack("B", f_body[0:1])[0]
        # frame with bits value -> bits[] list
        f_bits = bytearray(f_body[1:])
        # check rx_byte_count: match nb of bits request and check buffer size
        if not ((rx_byte_count == int((bit_nb + 7) / 8)) and
                (rx_byte_count == len(f_bits))):
            self.__last_error = MB_RECV_ERR
            self.__debug_msg('read_coils(): rx byte count mismatch')
            self.close()
            return None
        # allocate a bit_nb size list
        bits = [None] * bit_nb
        # fill bits list with bit items
        for i, item in enumerate(bits):
            bits[i] = bool(f_bits[int(i / 8)] >> (i % 8) & 0x01)
        # return bits list
        return bits

    def read_discrete_inputs(self, bit_addr, bit_nb=1):
        """Modbus function READ_DISCRETE_INPUTS (0x02)

        :param bit_addr: bit address (0 to 65535)
        :type bit_addr: int
        :param bit_nb: number of bits to read (1 to 2000)
        :type bit_nb: int
        :returns: bits list or None if error
        :rtype: list of bool or None
        """
        # check params
        if not (0 <= int(bit_addr) <= 65535):
            self.__debug_msg('read_discrete_inputs(): bit_addr out of range')
            return None
        if not (1 <= int(bit_nb) <= 2000):
            self.__debug_msg('read_discrete_inputs(): bit_nb out of range')
            return None
        if (int(bit_addr) + int(bit_nb)) > 65536:
            self.__debug_msg('read_discrete_inputs(): read after ad 65535')
            return None
        # build frame
        tx_buffer = self._mbus_frame(READ_DISCRETE_INPUTS, struct.pack('>HH', bit_addr, bit_nb))
        # send request
        s_send = self._send_mbus(tx_buffer)
        # check error
        if not s_send:
            return None
        # receive
        f_body = self._recv_mbus()
        # check error
        if not f_body:
            return None
        # check min frame body size
        if len(f_body) < 2:
            self.__last_error = MB_RECV_ERR
            self.__debug_msg('read_discrete_inputs(): rx frame under min size')
            self.close()
            return None
        # extract field "byte count"
        rx_byte_count = struct.unpack("B", f_body[0:1])[0]
        # frame with bits value -> bits[] list
        f_bits = bytearray(f_body[1:])
        # check rx_byte_count: match nb of bits request and check buffer size
        if not ((rx_byte_count == int((bit_nb + 7) / 8)) and
                (rx_byte_count == len(f_bits))):
            self.__last_error = MB_RECV_ERR
            self.__debug_msg('read_discrete_inputs(): rx byte count mismatch')
            self.close()
            return None
        # allocate a bit_nb size list
        bits = [None] * bit_nb
        # fill bits list with bit items
        for i, item in enumerate(bits):
            bits[i] = bool(f_bits[int(i / 8)] >> (i % 8) & 0x01)
        # return bits list
        return bits
def read_holding_registers(self, reg_addr, reg_nb=1):
"""Modbus function READ_HOLDING_REGISTERS (0x03)
:param reg_addr: register address (0 to 65535)
:type reg_addr: int
:param reg_nb: number of registers to read (1 to 125)
:type reg_nb: int
:returns: registers list or None if fail
:rtype: list of int or None
"""
# check params
if not (0 <= int(reg_addr) <= 65535):
self.__debug_msg('read_holding_registers(): reg_addr out of range')
return None
if not (1 <= int(reg_nb) <= 125):
self.__debug_msg('read_holding_registers(): reg_nb out of range')
return None
if (int(reg_addr) + int(reg_nb)) > 65536:
self.__debug_msg('read_holding_registers(): read after ad 65535')
return None
# build frame
tx_buffer = self._mbus_frame(READ_HOLDING_REGISTERS, struct.pack('>HH', reg_addr, reg_nb))
# send request
s_send = self._send_mbus(tx_buffer)
# check error
if not s_send:
return None
# receive
f_body = self._recv_mbus()
# check error
if not f_body:
return None
# check min frame body size
if len(f_body) < 2:
self.__last_error = MB_RECV_ERR
self.__debug_msg('read_holding_registers(): rx frame under min size')
self.close()
return None
# extract field "byte count"
rx_byte_count = struct.unpack('B', f_body[0:1])[0]
# frame with regs value
f_regs = f_body[1:]
# check rx_byte_count: match nb of registers requested and check buffer size
if not ((rx_byte_count == 2 * reg_nb) and
(rx_byte_count == len(f_regs))):
self.__last_error = MB_RECV_ERR
self.__debug_msg('read_holding_registers(): rx byte count mismatch')
self.close()
return None
# allocate a reg_nb size list
registers = [None] * reg_nb
# fill registers list with register items
for i, item in enumerate(registers):
registers[i] = struct.unpack('>H', f_regs[i * 2:i * 2 + 2])[0]
# return registers list
return registers
def read_input_registers(self, reg_addr, reg_nb=1):
"""Modbus function READ_INPUT_REGISTERS (0x04)
:param reg_addr: register address (0 to 65535)
:type reg_addr: int
:param reg_nb: number of registers to read (1 to 125)
:type reg_nb: int
:returns: registers list or None if fail
:rtype: list of int or None
"""
# check params
if not (0x0000 <= int(reg_addr) <= 0xffff):
self.__debug_msg('read_input_registers(): reg_addr out of range')
return None
if not (0x0001 <= int(reg_nb) <= 0x007d):
self.__debug_msg('read_input_registers(): reg_nb out of range')
return None
if (int(reg_addr) + int(reg_nb)) > 0x10000:
self.__debug_msg('read_input_registers(): read after ad 65535')
return None
# build frame
tx_buffer = self._mbus_frame(READ_INPUT_REGISTERS, struct.pack('>HH', reg_addr, reg_nb))
# send request
s_send = self._send_mbus(tx_buffer)
# check error
if not s_send:
return None
# receive
f_body = self._recv_mbus()
# check error
if not f_body:
return None
# check min frame body size
if len(f_body) < 2:
self.__last_error = MB_RECV_ERR
self.__debug_msg('read_input_registers(): rx frame under min size')
self.close()
return None
# extract field "byte count"
rx_byte_count = struct.unpack('B', f_body[0:1])[0]
# frame with regs value
f_regs = f_body[1:]
# check rx_byte_count: match nb of registers requested and check buffer size
if not ((rx_byte_count == 2 * reg_nb) and
(rx_byte_count == len(f_regs))):
self.__last_error = MB_RECV_ERR
self.__debug_msg('read_input_registers(): rx byte count mismatch')
self.close()
return None
# allocate a reg_nb size list
registers = [None] * reg_nb
# fill registers list with register items
for i, item in enumerate(registers):
registers[i] = struct.unpack('>H', f_regs[i * 2:i * 2 + 2])[0]
# return registers list
return registers
def write_single_coil(self, bit_addr, bit_value):
"""Modbus function WRITE_SINGLE_COIL (0x05)
:param bit_addr: bit address (0 to 65535)
:type bit_addr: int
:param bit_value: bit value to write
:type bit_value: bool
:returns: True if write ok or None if fail
:rtype: bool or None
"""
# check params
if not (0 <= int(bit_addr) <= 65535):
self.__debug_msg('write_single_coil(): bit_addr out of range')
return None
# build frame
bit_value = 0xFF if bit_value else 0x00
tx_buffer = self._mbus_frame(WRITE_SINGLE_COIL, struct.pack('>HBB', bit_addr, bit_value, 0))
# send request
s_send = self._send_mbus(tx_buffer)
# check error
if not s_send:
return None
# receive
f_body = self._recv_mbus()
# check error
if not f_body:
return None
# check fix frame size
if len(f_body) != 4:
self.__last_error = MB_RECV_ERR
self.__debug_msg('write_single_coil(): rx frame size error')
self.close()
return None
# register extract
(rx_bit_addr, rx_bit_value, rx_padding) = struct.unpack('>HBB', f_body[:4])
# check bit write
is_ok = (rx_bit_addr == bit_addr) and (rx_bit_value == bit_value)
return True if is_ok else None
def write_single_register(self, reg_addr, reg_value):
"""Modbus function WRITE_SINGLE_REGISTER (0x06)
:param reg_addr: register address (0 to 65535)
:type reg_addr: int
:param reg_value: register value to write
:type reg_value: int
:returns: True if write ok or None if fail
:rtype: bool or None
"""
# check params
if not (0 <= int(reg_addr) <= 65535):
self.__debug_msg('write_single_register(): reg_addr out of range')
return None
if not (0 <= int(reg_value) <= 65535):
self.__debug_msg('write_single_register(): reg_value out of range')
return None
# build frame
tx_buffer = self._mbus_frame(WRITE_SINGLE_REGISTER,
struct.pack('>HH', reg_addr, reg_value))
# send request
s_send = self._send_mbus(tx_buffer)
# check error
if not s_send:
return None
# receive
f_body = self._recv_mbus()
# check error
if not f_body:
return None
# check fix frame size
if len(f_body) != 4:
self.__last_error = MB_RECV_ERR
self.__debug_msg('write_single_register(): rx frame size error')
self.close()
return None
# register extract
rx_reg_addr, rx_reg_value = struct.unpack('>HH', f_body)
# check register write
is_ok = (rx_reg_addr == reg_addr) and (rx_reg_value == reg_value)
return True if is_ok else None
def write_multiple_coils(self, bits_addr, bits_value):
"""Modbus function WRITE_MULTIPLE_COILS (0x0F)
:param bits_addr: bits address (0 to 65535)
:type bits_addr: int
:param bits_value: bits values to write
:type bits_value: list
:returns: True if write ok or None if fail
:rtype: bool or None
"""
# number of bits to write
bits_nb = len(bits_value)
# check params
if not (0x0000 <= int(bits_addr) <= 0xffff):
self.__debug_msg('write_multiple_coils(): bits_addr out of range')
return None
if not (0x0001 <= int(bits_nb) <= 0x07b0):
self.__debug_msg('write_multiple_coils(): number of bits out of range')
return None
if (int(bits_addr) + int(bits_nb)) > 0x10000:
self.__debug_msg('write_multiple_coils(): write after ad 65535')
return None
# build frame
# format bits value string
bits_val_str = b''
# allocate bytes list
b_size = int(bits_nb / 8)
b_size += 1 if (bits_nb % 8) else 0
bytes_l = [0] * b_size
# populate bytes list with bits_value
for i, item in enumerate(bits_value):
if item:
byte_i = int(i/8)
bytes_l[byte_i] = set_bit(bytes_l[byte_i], i % 8)
# format bits_val_str
for byte in bytes_l:
bits_val_str += struct.pack('B', byte)
bytes_nb = len(bits_val_str)
# format modbus frame body
body = struct.pack('>HHB', bits_addr, bits_nb, bytes_nb) + bits_val_str
tx_buffer = self._mbus_frame(WRITE_MULTIPLE_COILS, body)
# send request
s_send = self._send_mbus(tx_buffer)
# check error
if not s_send:
return None
# receive
f_body = self._recv_mbus()
# check error
if not f_body:
return None
# check fix frame size
if len(f_body) != 4:
self.__last_error = MB_RECV_ERR
self.__debug_msg('write_multiple_coils(): rx frame size error')
self.close()
return None
# register extract
(rx_bit_addr, rx_bit_nb) = struct.unpack('>HH', f_body[:4])
# check regs write
is_ok = (rx_bit_addr == bits_addr)
return True if is_ok else None
def write_multiple_registers(self, regs_addr, regs_value):
"""Modbus function WRITE_MULTIPLE_REGISTERS (0x10)
:param regs_addr: registers address (0 to 65535)
:type regs_addr: int
:param regs_value: registers values to write
:type regs_value: list
:returns: True if write ok or None if fail
:rtype: bool or None
"""
# number of registers to write
regs_nb = len(regs_value)
# check params
if not (0x0000 <= int(regs_addr) <= 0xffff):
self.__debug_msg('write_multiple_registers(): regs_addr out of range')
return None
if not (0x0001 <= int(regs_nb) <= 0x007b):
self.__debug_msg('write_multiple_registers(): number of registers out of range')
return None
if (int(regs_addr) + int(regs_nb)) > 0x10000:
self.__debug_msg('write_multiple_registers(): write after ad 65535')
return None
# build frame
# format reg value string
regs_val_str = b""
for reg in regs_value:
# check current register value
if not (0 <= int(reg) <= 0xffff):
self.__debug_msg('write_multiple_registers(): regs_value out of range')
return None
# pack register for build frame
regs_val_str += struct.pack('>H', reg)
bytes_nb = len(regs_val_str)
# format modbus frame body
body = struct.pack('>HHB', regs_addr, regs_nb, bytes_nb) + regs_val_str
tx_buffer = self._mbus_frame(WRITE_MULTIPLE_REGISTERS, body)
# send request
s_send = self._send_mbus(tx_buffer)
# check error
if not s_send:
return None
# receive
f_body = self._recv_mbus()
# check error
if not f_body:
return None
# check fix frame size
if len(f_body) != 4:
self.__last_error = MB_RECV_ERR
self.__debug_msg('write_multiple_registers(): rx frame size error')
self.close()
return None
# register extract
(rx_reg_addr, rx_reg_nb) = struct.unpack('>HH', f_body[:4])
# check regs write
is_ok = (rx_reg_addr == regs_addr)
return True if is_ok else None
def _can_read(self):
"""Wait data available for socket read
:returns: True if data available or None if timeout or socket error
:rtype: bool or None
"""
if self.__sock is None:
return None
if select.select([self.__sock], [], [], self.__timeout)[0]:
return True
else:
self.__last_error = MB_TIMEOUT_ERR
self.__debug_msg('timeout error')
self.close()
return None
def _send(self, data):
"""Send data over current socket
:param data: data to send
:type data: str (Python2) or class bytes (Python3)
:returns: True if send ok or None if error
:rtype: bool or None
"""
# check link
if self.__sock is None:
self.__debug_msg('call _send on close socket')
return None
# send
data_l = len(data)
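try:
# sendall() returns None on success and raises socket.error on failure,
# so record the full length ourselves once it has completed
self.__connection.sendall(data)
send_l = data_l
except socket.error:
send_l = None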
# handle send error
if (send_l is None) or (send_l != data_l):
self.__last_error = MB_SEND_ERR
self.__debug_msg('_send error')
self.close()
return None
else:
return send_l
def _recv(self, max_size):
"""Receive data over current socket
:param max_size: number of bytes to receive
:type max_size: int
:returns: receive data or None if error
:rtype: str (Python2) or class bytes (Python3) or None
"""
# wait for read
if not self._can_read():
self.close()
return None
# recv
try:
r_buffer = self.__connection.recv(max_size)
except socket.error:
r_buffer = None
# handle recv error
if not r_buffer:
self.__last_error = MB_RECV_ERR
self.__debug_msg('_recv error')
self.close()
return None
return r_buffer
def _send_mbus(self, frame):
"""Send modbus frame
:param frame: modbus frame to send (with MBAP for TCP/CRC for RTU)
:type frame: str (Python2) or class bytes (Python3)
:returns: number of bytes sent or None if error
:rtype: int or None
"""
# for auto_open mode, check TCP and open if need
if self.__auto_open and not self.is_open():
self.open()
# send request
bytes_send = self._send(frame)
if bytes_send:
if self.__debug:
self._pretty_dump('Tx', frame)
return bytes_send
else:
return None
def _recv_mbus(self):
"""Receive a modbus frame
:returns: modbus frame body or None if error
:rtype: str (Python2) or class bytes (Python3) or None
"""
# receive
# modbus TCP receive
if self.__mode == MODBUS_TCP:
# 7 bytes header (mbap)
rx_buffer = self._recv(7)
# check recv
if not (rx_buffer and len(rx_buffer) == 7):
self.__last_error = MB_RECV_ERR
self.__debug_msg('_recv MBAP error')
self.close()
return None
rx_frame = rx_buffer
# decode header
(rx_hd_tr_id, rx_hd_pr_id,
rx_hd_length, rx_hd_unit_id) = struct.unpack('>HHHB', rx_frame)
# check header
if not ((rx_hd_tr_id == self.__hd_tr_id) and
(rx_hd_pr_id == 0) and
(rx_hd_length < 256) and
(rx_hd_unit_id == self.__unit_id)):
self.__last_error = MB_RECV_ERR
self.__debug_msg('MBAP format error')
if self.__debug:
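# _recv() returns None on error, so only append what was actually read
rx_extra = self._recv(rx_hd_length - 1)
rx_frame += rx_extra if rx_extra else b''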
self._pretty_dump('Rx', rx_frame)
self.close()
return None
# end of frame
rx_buffer = self._recv(rx_hd_length - 1)
if not (rx_buffer and
(len(rx_buffer) == rx_hd_length - 1) and
(len(rx_buffer) >= 2)):
self.__last_error = MB_RECV_ERR
self.__debug_msg('_recv frame body error')
self.close()
return None
rx_frame += rx_buffer
# dump frame
if self.__debug:
self._pretty_dump('Rx', rx_frame)
# body decode
rx_bd_fc = struct.unpack('B', rx_buffer[0:1])[0]
f_body = rx_buffer[1:]
# modbus RTU receive
elif self.__mode == MODBUS_RTU:
# receive modbus RTU frame (max size is 256 bytes)
rx_buffer = self._recv(256)
# on _recv error
if not rx_buffer:
return None
rx_frame = rx_buffer
# dump frame
if self.__debug:
self._pretty_dump('Rx', rx_frame)
# RTU frame min size is 5 bytes
if len(rx_buffer) < 5:
self.__last_error = MB_RECV_ERR
self.__debug_msg('short frame error')
self.close()
return None
# check CRC
if not self._crc_is_ok(rx_frame):
self.__last_error = MB_CRC_ERR
self.__debug_msg('CRC error')
self.close()
return None
# body decode
(rx_unit_id, rx_bd_fc) = struct.unpack("BB", rx_frame[:2])
# check
if not (rx_unit_id == self.__unit_id):
self.__last_error = MB_RECV_ERR
self.__debug_msg('unit ID mismatch error')
self.close()
return None
# format f_body: remove unit ID, function code and CRC 2 last bytes
f_body = rx_frame[2:-2]
# for auto_close mode, close socket after each request
if self.__auto_close:
self.close()
# check except
if rx_bd_fc > 0x80:
# except code
exp_code = struct.unpack('B', f_body[0:1])[0]
self.__last_error = MB_EXCEPT_ERR
self.__last_except = exp_code
self.__debug_msg('except (code ' + str(exp_code) + ')')
return None
else:
# return
return f_body
def _mbus_frame(self, fc, body):
"""Build modbus frame (add MBAP for Modbus/TCP, slave AD + CRC for RTU)
:param fc: modbus function code
:type fc: int
:param body: modbus frame body
:type body: str (Python2) or class bytes (Python3)
:returns: modbus frame
:rtype: str (Python2) or class bytes (Python3)
"""
# build frame body
f_body = struct.pack('B', fc) + body
# modbus/TCP
if self.__mode == MODBUS_TCP:
# build frame ModBus Application Protocol header (mbap)
self.__hd_tr_id = random.randint(0, 65535)
tx_hd_pr_id = 0
tx_hd_length = len(f_body) + 1
f_mbap = struct.pack('>HHHB', self.__hd_tr_id, tx_hd_pr_id,
tx_hd_length, self.__unit_id)
return f_mbap + f_body
# modbus RTU
elif self.__mode == MODBUS_RTU:
# format [slave addr(unit_id)]frame_body[CRC16]
slave_ad = struct.pack('B', self.__unit_id)
return self._add_crc(slave_ad + f_body)
def _pretty_dump(self, label, data):
"""Print modbus/TCP frame ('[header]body')
or RTU ('body[CRC]') on stdout
:param label: label printed before the dump (e.g. 'Tx' or 'Rx')
:type label: str
:param data: modbus frame
:type data: str (Python2) or class bytes (Python3)
"""
# split data string items to a list of hex value
dump = ['%02X' % c for c in bytearray(data)]
# format for TCP or RTU
if self.__mode == MODBUS_TCP:
if len(dump) > 6:
# [MBAP] ...
dump[0] = '[' + dump[0]
dump[6] += ']'
elif self.__mode == MODBUS_RTU:
if len(dump) > 4:
# ... [CRC]
dump[-2] = '[' + dump[-2]
dump[-1] += ']'
# print result
print(label)
s = ''
for i in dump:
s += i + ' '
print(s)
def _add_crc(self, frame):
"""Add CRC to modbus frame (for RTU mode)
:param frame: modbus RTU frame
:type frame: str (Python2) or class bytes (Python3)
:returns: modbus RTU frame with CRC
:rtype: str (Python2) or class bytes (Python3)
"""
crc = struct.pack('<H', crc16(frame))
return frame + crc
def _crc_is_ok(self, frame):
"""Check the CRC of modbus RTU frame
:param frame: modbus RTU frame with CRC
:type frame: str (Python2) or class bytes (Python3)
:returns: status CRC (True for valid)
:rtype: bool
"""
return crc16(frame) == 0
def __debug_msg(self, msg):
"""Print debug message if debug mode is on
:param msg: debug message
:type msg: str
"""
if self.__debug:
print(msg)
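To make the RTU helpers above a bit more concrete: `_add_crc()` and `_crc_is_ok()` rely on a `crc16()` function that is not shown in this post. Below is a minimal sketch of the standard Modbus CRC-16 (initial value 0xFFFF, reflected polynomial 0xA001). The names `modbus_crc16`, `add_crc` and `crc_is_ok` are my own, chosen for illustration, and are not the library's functions. The point is only to show why checking `crc16(frame) == 0` works when the CRC is appended low byte first.

```python
import struct


def modbus_crc16(frame):
    """Compute the Modbus RTU CRC-16 of a bytes object (illustrative helper)."""
    crc = 0xFFFF
    for byte in bytearray(frame):
        crc ^= byte
        for _ in range(8):
            if crc & 0x0001:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


def add_crc(frame):
    """Append the CRC low byte first, as Modbus RTU expects."""
    return frame + struct.pack('<H', modbus_crc16(frame))


def crc_is_ok(frame):
    """A frame with a valid trailing CRC gives a CRC of 0 over the whole frame."""
    return modbus_crc16(frame) == 0


if __name__ == '__main__':
    # unit 1, function 0x03, address 0, quantity 1
    request = add_crc(bytes(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x01])))
    print(' '.join('%02X' % b for b in bytearray(request)))
    print(crc_is_ok(request))
```

For the request in the example the appended CRC bytes should be 84 0A, which is handy to compare against what a serial sniffer shows.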
import time

c = ModbusClient(host="127.0.0.1", port=502)
c.open()
while True:
    # read 100 input registers starting at address 0, store result in regs list
    regs = c.read_input_registers(0, 100)
    # if success display registers
    if regs:
        print("reg ad #0 to 99: " + str(regs))
    # sleep 2s before next polling
    time.sleep(2)
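For anyone sniffing the traffic with ncat or Wireshark, it may help to see what a read_input_registers() call puts on the wire in Modbus/TCP mode. This is a minimal standalone sketch based on `_mbus_frame()` and `_recv_mbus()` from the code above. The function names `build_read_input_request` and `parse_registers_response` are made up for illustration, the response bytes are hand-made rather than captured from an inverter, and the socket handling is left out entirely.

```python
import struct

READ_INPUT_REGISTERS = 0x04


def build_read_input_request(tr_id, unit_id, reg_addr, reg_nb):
    """Build a Modbus/TCP request: 7-byte MBAP header + function code + body."""
    body = struct.pack('>BHH', READ_INPUT_REGISTERS, reg_addr, reg_nb)
    # the MBAP length field counts the unit id byte plus the body
    mbap = struct.pack('>HHHB', tr_id, 0, len(body) + 1, unit_id)
    return mbap + body


def parse_registers_response(frame, tr_id, unit_id, reg_nb):
    """Return the list of registers in a response frame, or None if invalid."""
    if len(frame) < 9:
        return None
    rx_tr_id, rx_pr_id, rx_length, rx_unit_id = struct.unpack('>HHHB', frame[:7])
    if (rx_tr_id, rx_pr_id, rx_unit_id) != (tr_id, 0, unit_id):
        return None
    # length field = number of bytes that follow it (unit id + PDU)
    if rx_length != len(frame) - 6:
        return None
    fc, byte_count = struct.unpack('BB', frame[7:9])
    if fc != READ_INPUT_REGISTERS:
        # a function code of fc + 0x80 signals a Modbus exception
        return None
    payload = frame[9:]
    if byte_count != 2 * reg_nb or byte_count != len(payload):
        return None
    return list(struct.unpack('>%dH' % reg_nb, payload))


if __name__ == '__main__':
    request = build_read_input_request(tr_id=1, unit_id=1, reg_addr=0, reg_nb=2)
    print(' '.join('%02X' % b for b in bytearray(request)))
    # hand-made response carrying two registers, 0x1234 and 0x0001
    response = bytes(bytearray([0x00, 0x01, 0x00, 0x00, 0x00, 0x07, 0x01,
                                0x04, 0x04, 0x12, 0x34, 0x00, 0x01]))
    print(parse_registers_response(response, tr_id=1, unit_id=1, reg_nb=2))
```

If the wifistick never answers with a frame shaped like this, it is probably not speaking plain Modbus/TCP on that port, which would fit the idea that it is waiting for some other command first.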