Synchronous Client Extended ExampleΒΆ

#!/usr/bin/env python
'''
Pymodbus Synchronous Client Extended Examples
--------------------------------------------------------------------------

The following is an example of how to use the synchronous modbus client
implementation from pymodbus to perform the extended portions of the
modbus protocol.
'''
#---------------------------------------------------------------------------# 
# import the various server implementations
#---------------------------------------------------------------------------# 
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
#from pymodbus.client.sync import ModbusUdpClient as ModbusClient
#from pymodbus.client.sync import ModbusSerialClient as ModbusClient

#---------------------------------------------------------------------------# 
# configure the client logging
#---------------------------------------------------------------------------# 
import logging
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)

#---------------------------------------------------------------------------# 
# choose the client you want
#---------------------------------------------------------------------------# 
# make sure to start an implementation to hit against. For this
# you can use an existing device, the reference implementation in the tools
# directory, or start a pymodbus server.
#
# It should be noted that you can supply an ipv4 or an ipv6 host address for
# both the UDP and TCP clients.
#---------------------------------------------------------------------------# 
client = ModbusClient('127.0.0.1')
client.connect()

#---------------------------------------------------------------------------# 
# import the extended messages to perform
#---------------------------------------------------------------------------# 
from pymodbus.diag_message import *
from pymodbus.file_message import *
from pymodbus.other_message import *
from pymodbus.mei_message import *

#---------------------------------------------------------------------------# 
# extra requests
#---------------------------------------------------------------------------# 
# If you are performing a request that is not available in the client
# mixin, you have to perform the request like this instead::
#
# from pymodbus.diag_message import ClearCountersRequest
# from pymodbus.diag_message import ClearCountersResponse
#
# request  = ClearCountersRequest()
# response = client.execute(request)
# if isinstance(response, ClearCountersResponse):
#     ... do something with the response
#
#
# What follows is a listing of all the supported methods. Feel free to
# comment, uncomment, or modify each result set to match with your reference.
#---------------------------------------------------------------------------# 

#---------------------------------------------------------------------------# 
# information requests
#---------------------------------------------------------------------------# 
rq = ReadDeviceInformationRequest()
rr = client.execute(rq)
#assert(rr == None)                             # not supported by reference
assert(rr.function_code < 0x80)                 # test that we are not an error
assert(rr.information[0] == 'proconX Pty Ltd')  # test the vendor name
assert(rr.information[1] == 'FT-MBSV')          # test the product code
assert(rr.information[2] == 'EXPERIMENTAL')     # test the code revision

rq = ReportSlaveIdRequest()
rr = client.execute(rq)
assert(rr == None)                              # not supported by reference
#assert(rr.function_code < 0x80)                # test that we are not an error
#assert(rr.identifier  == 0x00)                 # test the slave identifier
#assert(rr.status  == 0x00)                     # test that the status is ok

rq = ReadExceptionStatusRequest()
rr = client.execute(rq)
#assert(rr == None)                             # not supported by reference
assert(rr.function_code < 0x80)                 # test that we are not an error
assert(rr.status == 0x55)                       # test the status code

rq = GetCommEventCounterRequest()
rr = client.execute(rq)
assert(rr == None)                              # not supported by reference
#assert(rr.function_code < 0x80)                # test that we are not an error
#assert(rr.status == True)                      # test the status code
#assert(rr.count == 0x00)                       # test the status code

rq = GetCommEventLogRequest()
rr = client.execute(rq)
#assert(rr == None)                             # not supported by reference
#assert(rr.function_code < 0x80)                # test that we are not an error
#assert(rr.status == True)                      # test the status code
#assert(rr.event_count == 0x00)                 # test the number of events
#assert(rr.message_count == 0x00)               # test the number of messages
#assert(len(rr.events) == 0x00)                 # test the number of events

#---------------------------------------------------------------------------# 
# diagnostic requests
#---------------------------------------------------------------------------# 
rq = ReturnQueryDataRequest()
rr = client.execute(rq)
assert(rr == None)                             # not supported by reference
#assert(rr.message[0] == 0x0000)               # test the resulting message

rq = RestartCommunicationsOptionRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference
#assert(rr.message == 0x0000)                  # test the resulting message

rq = ReturnDiagnosticRegisterRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = ChangeAsciiInputDelimiterRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = ForceListenOnlyModeRequest()
client.execute(rq)                             # does not send a response

rq = ClearCountersRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = ReturnBusCommunicationErrorCountRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = ReturnBusExceptionErrorCountRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = ReturnSlaveMessageCountRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = ReturnSlaveNoResponseCountRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = ReturnSlaveNAKCountRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = ReturnSlaveBusyCountRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = ReturnSlaveBusCharacterOverrunCountRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = ReturnIopOverrunCountRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = ClearOverrunCountRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

rq = GetClearModbusPlusRequest()
rr = client.execute(rq)
#assert(rr == None)                            # not supported by reference

#---------------------------------------------------------------------------# 
# close the client
#---------------------------------------------------------------------------# 
client.close()