Data model and low level protocol.

This commit is contained in:
Mikaël Ates 2013-05-24 17:04:37 +02:00
parent 079eb39eb0
commit fd0ea39922
1 changed files with 290 additions and 0 deletions

290
models.py Normal file
View File

@ -0,0 +1,290 @@
'''
librfid - RFID EPC Class 1 Gen2 / ISO/IEC 18000-6C compliant library
Copyright (C) 2013 Entr'ouvert
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Data model and low level protocol
'''
import utils
from datetime import datetime
from serial import (Serial, EIGHTBITS, PARITY_NONE, STOPBITS_ONE)
from _exceptions import (UnknownReaderTypeException, UnknownReaderException,
BadLengthMessageException, BadCRCException,
CommandResponseCodeMismatchException, UnknownReaderException,
StandardResponseError, UnknownResponseErrorType)
from commands_standard import COMMANDS as STANDARD_COMMANDS
from errors_standard import ERRORS as STANDARD_ERRORS
from reader_configs import READERS
class RFIDReader():
'''
RFIDReader Class
'''
def __init__(self, *args, **kwargs):
self.name = kwargs.pop('name', 'URL-W41-A-U04-5AA')
params, commands, errors = READERS[self.name]
self.commands = commands
self.errors = errors
self.reader_type = params.get('reader_type', 'SERIAL')
if self.reader_type == 'SERIAL':
self.port = params.get('port', '/dev/ttyUSB0')
self.baudrate = params.get('baudrate', 115200)
self.bytesize = params.get('bytesize', EIGHTBITS)
self.parity = params.get('parity', PARITY_NONE)
self.stopbits = params.get('stopbits', STOPBITS_ONE)
self.timeout = params.get('timeout', 2)
self.xonxoff = params.get('xonxoff', False)
self.rtscts = params.get('rtscts', False)
self.writeTimeout = params.get('writeTimeout', None)
self.dsrdtr = params.get('dsrdtr', False)
self.interCharTimeout = params.get('interCharTimeout', None)
else:
raise UnknownReaderTypeException()
def get_connection(self):
'''
Get connection handler on reader
'''
if self.reader_type == 'SERIAL':
return Serial(port=self.port, baudrate=self.baudrate,
bytesize=self.bytesize, parity=self.parity,
stopbits=self.stopbits, timeout=self.timeout,
xonxoff=self.xonxoff, rtscts=self.rtscts,
writeTimeout=self.writeTimeout, dsrdtr=self.dsrdtr,
interCharTimeout=self.interCharTimeout)
else:
raise UnknownReaderTypeException()
def send(self, command):
'''
Send command and read response
'''
try:
connection = self.get_connection()
message = command.get_message()
connection.flush()
command.time = datetime.today()
connection.write(message)
response = ''
while(1):
'''First byte reading, timeout is 2s.
Then reading waits max 0.001s between each byte
and after the last byte.'''
read = connection.read(1)
if not read:
break
connection.timeout = 0.001
response += read
end = datetime.today()
connection.close()
response = Response(command, response, reader=self)
response.time = end
return response
except Exception, err:
print str(err)
return None
def send_only(self, command):
'''
Send command without reading response
'''
try:
connection = self.get_connection()
message = command.get_message()
connection.flush()
connection.write(message)
connection.close()
return True
except:
return False
class Command():
'''
Command encapsulated in a message
build_message() does the encapsulation
Command fields:
rfu_cmd: 1 byte
type_cmd: 1 byte
code_cmd: 2 bytes
reserved: 2 bytes AAh 55h
data_length (Lout): 2 bytes
data : Lout bytes
'''
def __init__(self, *args, **kwargs):
self.command = kwargs.pop('command', '')
# Message parameters
self.crc_type = kwargs.pop('crc_type', 'CRC-16-CCITT')
self.mode = kwargs.pop('mode', 'unsecure')
self.address = kwargs.pop('mode', 'rs232')
self.command_name = kwargs.pop('command_name', '')
self.time = None
if self.command_name:
self.rfu = '\x00'
self.type_cmd = STANDARD_COMMANDS[self.command_name]['type']
self.code_cmd = STANDARD_COMMANDS[self.command_name]['code']
self.reserved = '\xaa\x55'
self.data = kwargs.pop('data', '')
self.lout = utils.get_length_on_2_bytes(self.data)
self.command = self.rfu + self.type_cmd + self.code_cmd + \
self.reserved + self.lout + self.data
def set_command(self, string):
self.command = string
def build_message(self):
self.sof = '\x02'
self.length = utils.get_length_on_2_bytes(self.command)
self.ctrl_addr = '\x00' # RS232 only
self.ctrl_mode = '\x00' # Unsecure mode
message = self.length + self.ctrl_addr + self.ctrl_mode + self.command
self.crc = utils.get_crc_16_CCITT(message)
self.message = self.sof + message + self.crc
def get_message(self):
self.build_message()
return self.message
def pretty_print(self):
self.build_message()
display = self.message[:5].encode('hex') + ' ' \
+ self.message[5:-2].encode('hex') + ' ' \
+ self.message[-2:].encode('hex')
if self.time:
display = str(self.time) + '\t' + display
display = 'COMMAND\t\t' + display
print display
class ReaderCommand(Command):
'''
Command encapsulated in a message
build_message() does the encapsulation
Command fields:
rfu_cmd: 1 byte
type_cmd: 1 byte
code_cmd: 2 bytes
reserved: 2 bytes AAh 55h
data_length (Lout): 2 bytes
data : Lout bytes
'''
def __init__(self, *args, **kwargs):
self.reader = kwargs.pop('reader', '')
if not self.reader:
raise UnknownReaderException('Reader not given to command')
self.command = kwargs.pop('command', '')
# Message parameters
self.crc_type = kwargs.pop('crc_type', 'CRC-16-CCITT')
self.mode = kwargs.pop('mode', 'unsecure')
self.address = kwargs.pop('mode', 'rs232')
self.command_name = kwargs.pop('command_name', '')
self.time = None
if self.command_name:
self.rfu = '\x00'
self.type_cmd = self.reader.commands[self.command_name]['type']
self.code_cmd = self.reader.commands[self.command_name]['code']
self.reserved = '\xaa\x55'
self.data = kwargs.pop('data', '')
self.lout = utils.get_length_on_2_bytes(self.data)
self.command = self.rfu + self.type_cmd + self.code_cmd + \
self.reserved + self.lout + self.data
class Response():
def __init__(self, command, message, reader=None):
self.message = message
if len(message) < 13:
raise BadLengthMessageException(
'Message must be at least 13 byte long (message: %s).'
% self.message.encode('hex'))
self.sof = message[0] # '\x02'
self.message_length = int(message[1:3].encode('hex'), 16)
if self.message_length != (len(message) - 7):
raise BadLengthMessageException(
'Length field says %d and the length computed '
'is %d (message: %s).'
% (self.message_length, len(message) - 7,
self.message.encode('hex')))
self.ctrl_addr = message[3] # '\x00' # RS232 only
self.ctrl_mode = message[4] # '\x00' # Unsecure mode
self.response = message[5:-2]
self.crc = message[-2:]
str_to_check = self.message[1:-2] # [Len ... Commande]
crc = utils.get_crc_16_CCITT(str_to_check)
if self.crc != crc:
raise BadCRCException('CRC checking failed')
self.code = self.response[0:2]
if command.code_cmd != self.code:
raise CommandResponseCodeMismatchException(
'Command code %s mistmatch with response '
'code %s (message: %s).'
% (command.code_cmd.encode('hex'),
self.code.encode('hex'),
self.message.encode('hex')))
self.lin = int(self.response[2:4].encode('hex'), 16)
self.data = None
if self.lin:
self.data = self.response[4:-2]
if len(self.data) != self.lin:
raise BadLengthMessageException(
'Data length %d mismatch mength field %d (message: %s).'
% (len(self.data), self.lin,
self.message.encode('hex')))
self.status_type = self.response[-2] # Reader 00 or standard 08
'''if command.type_cmd != self.status_type:
raise CommandResponseTypeMismatchException(\
'Command type %s mismatch with response '
'status type %s (message: %s).' \
% (command.type_cmd.encode('hex'),
self.status_type.encode('hex'),
self.message.encode('hex')))'''
self.status_code = self.response[-1]
if self.status_code != '\x00':
if self.status_type == '\x00':
self.reader = reader
if not self.reader:
raise UnknownReaderException('Reader error returned but '
'no reader given to response to interpret error code')
raise ReaderResponseError('%s'
% self.reader.errors[self.status_code])
if self.status_type == '\x08':
raise StandardResponseError('%s'
% STANDARD_ERRORS[self.status_code])
raise UnknownResponseErrorType('Unknown type %s'
% self.status_type.encode('hex'))
self.time = None
def pretty_print(self):
display = None
if not self.message:
display = 'NO RESPONSE'
else:
display = self.message[:5].encode('hex') + ' ' \
+ self.message[5:-2].encode('hex') + ' ' \
+ self.message[-2:].encode('hex')
if self.time:
display = str(self.time) + '\t' + display
display = 'RESPONSE\t' + display
print display