This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
librfid/models.py

335 lines
12 KiB
Python

'''
librfid - RFID EPC Class 1 Gen2 / ISO/IEC 18000-6C compliant library
Copyright (C) 2013 Entr'ouvert
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Data model and low level protocol.
The low level protocol currently supported is the 5AA protocol from STID.
'''
from datetime import datetime
from serial import (Serial, EIGHTBITS, PARITY_NONE, STOPBITS_ONE)
import librfid.core_standard
from librfid.utils import get_length_on_2_bytes, get_crc_16_CCITT
from librfid._exceptions import (UnknownReaderTypeException, UnknownReaderException,
BadLengthMessageException, BadCRCException,
CommandResponseCodeMismatchException, UnknownReaderException,
StandardResponseError, UnknownResponseErrorType,
ReaderResponseError)
from librfid.commands_standard import COMMANDS as STANDARD_COMMANDS
from librfid.errors_standard import ERRORS as STANDARD_ERRORS
from librfid.reader_configs import READERS
class RFIDReader():
'''
RFIDReader Class
'''
def __init__(self, *args, **kwargs):
self.name = kwargs.pop('name', 'URL-W41-A-U04-5AA')
params, commands, errors, core = READERS[self.name]
self.commands = commands
self.errors = errors
self.core = core
self.reader_type = params.get('reader_type', 'SERIAL')
if self.reader_type == 'SERIAL':
self.port = params.get('port', '/dev/ttyUSB0')
self.baudrate = params.get('baudrate', 115200)
self.bytesize = params.get('bytesize', EIGHTBITS)
self.parity = params.get('parity', PARITY_NONE)
self.stopbits = params.get('stopbits', STOPBITS_ONE)
self.timeout = params.get('timeout', 5)
self.xonxoff = params.get('xonxoff', False)
self.rtscts = params.get('rtscts', False)
self.writeTimeout = params.get('writeTimeout', None)
self.dsrdtr = params.get('dsrdtr', False)
self.interCharTimeout = params.get('interCharTimeout', None)
else:
raise UnknownReaderTypeException()
def get_connection(self):
'''
Get connection handler on reader
'''
if self.reader_type == 'SERIAL':
return Serial(port=self.port, baudrate=self.baudrate,
bytesize=self.bytesize, parity=self.parity,
stopbits=self.stopbits, timeout=self.timeout,
xonxoff=self.xonxoff, rtscts=self.rtscts,
writeTimeout=self.writeTimeout, dsrdtr=self.dsrdtr,
interCharTimeout=self.interCharTimeout)
else:
raise UnknownReaderTypeException()
def send(self, command):
'''
Send command and read response
'''
try:
connection = self.get_connection()
message = command.get_message()
connection.flush()
command.time = datetime.today()
connection.write(message)
response = ''
while(1):
'''First byte reading, timeout is 2s.
Then reading waits max 0.001s between each byte
and after the last byte.'''
read = connection.read(1)
if not read:
break
connection.timeout = 0.001
response += read
end = datetime.today()
connection.close()
response = Response(command, response, reader=self)
response.time = end
return response
except Exception, err:
print str(err)
return None
def send_only(self, command):
'''
Send command without reading response
'''
try:
connection = self.get_connection()
message = command.get_message()
connection.flush()
connection.write(message)
connection.close()
return True
except:
return False
class Command():
'''
Command encapsulated in a message
build_message() does the encapsulation
Command fields:
rfu_cmd: 1 byte
type_cmd: 1 byte
code_cmd: 2 bytes
reserved: 2 bytes AAh 55h
data_length (Lout): 2 bytes
data : Lout bytes
'''
def __init__(self, *args, **kwargs):
self.command = kwargs.pop('command', '')
# Message parameters
self.crc_type = kwargs.pop('crc_type', 'CRC-16-CCITT')
self.mode = kwargs.pop('mode', 'unsecure')
self.address = kwargs.pop('mode', 'rs232')
self.command_name = kwargs.pop('command_name', '')
self.time = None
self.data = ''
if self.command_name:
self.rfu = '\x00'
self.type_cmd = STANDARD_COMMANDS[self.command_name]['type']
self.code_cmd = STANDARD_COMMANDS[self.command_name]['code']
self.reserved = '\xaa\x55'
self.raw_data = kwargs.pop('raw_data', '')
self.data = kwargs.pop('data', '')
if self.data:
self.process_data()
self.lout = get_length_on_2_bytes(self.raw_data)
self.command = self.rfu + self.type_cmd + self.code_cmd + \
self.reserved + self.lout + self.raw_data
def set_command(self, string):
self.command = string
def process_data(self):
try:
getattr(librfid.core_standard, self.command_name.lower())(self)
except:
pass
def build_message(self):
self.sof = '\x02'
self.length = get_length_on_2_bytes(self.command)
self.ctrl_addr = '\x00' # RS232 only
self.ctrl_mode = '\x00' # Unsecure mode
message = self.length + self.ctrl_addr + self.ctrl_mode + self.command
self.crc = get_crc_16_CCITT(message)
self.message = self.sof + message + self.crc
def get_message(self):
self.build_message()
return self.message
def pretty_print(self):
self.build_message()
display = self.message[:5].encode('hex') + ' ' \
+ self.message[5:-2].encode('hex') + ' ' \
+ self.message[-2:].encode('hex')
if self.time:
display = str(self.time) + '\t' + display
display = 'COMMAND\t\t' + display
print display
class ReaderCommand(Command):
'''
Command encapsulated in a message
build_message() does the encapsulation
Command fields:
rfu_cmd: 1 byte
type_cmd: 1 byte
code_cmd: 2 bytes
reserved: 2 bytes AAh 55h
data_length (Lout): 2 bytes
data : Lout bytes
'''
def __init__(self, *args, **kwargs):
self.reader = kwargs.pop('reader', '')
if not self.reader:
raise UnknownReaderException('Reader not given to command')
self.command = kwargs.pop('command', '')
# Message parameters
self.crc_type = kwargs.pop('crc_type', 'CRC-16-CCITT')
self.mode = kwargs.pop('mode', 'unsecure')
self.address = kwargs.pop('mode', 'rs232')
self.command_name = kwargs.pop('command_name', '')
self.time = None
self.data = ''
if self.command_name:
self.rfu = '\x00'
self.type_cmd = self.reader.commands[self.command_name]['type']
self.code_cmd = self.reader.commands[self.command_name]['code']
self.reserved = '\xaa\x55'
self.raw_data = kwargs.pop('raw_data', '')
self.data = kwargs.pop('data', '')
if self.data:
self.process_data()
self.lout = get_length_on_2_bytes(self.raw_data)
self.command = self.rfu + self.type_cmd + self.code_cmd + \
self.reserved + self.lout + self.raw_data
def process_data(self):
try:
getattr(self.reader.core, self.command_name.lower())(self)
except:
pass
class Response():
def __init__(self, command, message, reader=None):
self.message = message
self.command = command
if len(message) < 13:
raise BadLengthMessageException(
'Message must be at least 13 byte long (message: %s).'
% self.message.encode('hex'))
self.sof = message[0] # '\x02'
self.message_length = int(message[1:3].encode('hex'), 16)
if self.message_length != (len(message) - 7):
raise BadLengthMessageException(
'Length field says %d and the length computed '
'is %d (message: %s).'
% (self.message_length, len(message) - 7,
self.message.encode('hex')))
self.ctrl_addr = message[3] # '\x00' # RS232 only
self.ctrl_mode = message[4] # '\x00' # Unsecure mode
self.response = message[5:-2]
self.crc = message[-2:]
str_to_check = self.message[1:-2] # [Len ... Commande]
crc = get_crc_16_CCITT(str_to_check)
if self.crc != crc:
raise BadCRCException('CRC checking failed')
self.code = self.response[0:2]
if command.code_cmd != self.code:
raise CommandResponseCodeMismatchException(
'Command code %s mistmatch with response '
'code %s (message: %s).'
% (command.code_cmd.encode('hex'),
self.code.encode('hex'),
self.message.encode('hex')))
self.lin = int(self.response[2:4].encode('hex'), 16)
self.raw_data = None
if self.lin:
self.raw_data = self.response[4:-2]
if len(self.raw_data) != self.lin:
raise BadLengthMessageException(
'Data length %d mismatch mength field %d (message: %s).'
% (len(self.raw_data), self.lin,
self.message.encode('hex')))
self.status_type = self.response[-2] # Reader 00 or standard 08
'''if command.type_cmd != self.status_type:
raise CommandResponseTypeMismatchException(\
'Command type %s mismatch with response '
'status type %s (message: %s).' \
% (command.type_cmd.encode('hex'),
self.status_type.encode('hex'),
self.message.encode('hex')))'''
self.status_code = self.response[-1]
if self.status_code != '\x00':
if self.status_type == '\x00':
self.reader = reader
if not self.reader:
raise UnknownReaderException('Reader error returned but '
'no reader given to response to interpret error code')
raise ReaderResponseError('%s'
% self.reader.errors[self.status_code])
if self.status_type == '\x08':
raise StandardResponseError('%s'
% STANDARD_ERRORS[self.status_code])
raise UnknownResponseErrorType('Unknown type %s'
% self.status_type.encode('hex'))
self.time = None
self.data = None
if self.raw_data:
self.process_data()
def process_data(self):
try:
core_method_name = self.command.command_name.lower()
module = None
if isinstance(self.command, ReaderCommand):
module = self.command.reader.core
else:
module = librfid.core_standard
getattr(module, core_method_name)(self)
except Exception, e:
pass
def pretty_print(self):
display = None
if not self.message:
display = 'NO RESPONSE'
else:
display = self.message[:5].encode('hex') + ' ' \
+ self.message[5:-2].encode('hex') + ' ' \
+ self.message[-2:].encode('hex')
if self.time:
display = str(self.time) + '\t' + display
display = 'RESPONSE\t' + display
print display