summaryrefslogtreecommitdiff
path: root/detection/awdb/decoder.py
diff options
context:
space:
mode:
Diffstat (limited to 'detection/awdb/decoder.py')
-rw-r--r--detection/awdb/decoder.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/detection/awdb/decoder.py b/detection/awdb/decoder.py
new file mode 100644
index 0000000..1b43fc8
--- /dev/null
+++ b/detection/awdb/decoder.py
@@ -0,0 +1,168 @@
+from __future__ import unicode_literals
+
+import struct
+
+from awdb.compat import byte_from_int, int_from_byte, int_from_bytes
+from awdb.errors import InvalidDatabaseError
+
+
+class Decoder(object):
+ def __init__(self, database_buffer, pointer_base=0, pointer_test=False):
+ self._pointer_test = pointer_test
+ self._buffer = database_buffer
+ self._pointer_base = pointer_base
+
+ def _decode_array(self, size, offset):
+ array = []
+ for _ in range(size):
+ (value, offset) = self.decode(offset)
+ array.append(value)
+ return array, offset
+
+ def _decode_boolean(self, size, offset):
+ return size != 0, offset
+
+ def _decode_bytes(self, size, offset):
+ new_offset = offset + size
+ return self._buffer[offset:new_offset], new_offset
+
+ def _decode_double(self, size, offset):
+ self._verify_size(size, 8)
+ new_offset = offset + size
+ packed_bytes = self._buffer[offset:new_offset]
+ (value, ) = struct.unpack(b'!d', packed_bytes)
+ return value, new_offset
+
+ def _decode_float(self, size, offset):
+ self._verify_size(size, 4)
+ new_offset = offset + size
+ packed_bytes = self._buffer[offset:new_offset]
+ (value, ) = struct.unpack(b'!f', packed_bytes)
+ return value, new_offset
+
+ def _decode_int32(self, size, offset):
+ if size == 0:
+ return 0, offset
+ new_offset = offset + size
+ packed_bytes = self._buffer[offset:new_offset]
+
+ if size != 4:
+ packed_bytes = packed_bytes.rjust(4, b'\x00')
+ (value, ) = struct.unpack(b'!i', packed_bytes)
+ return value, new_offset
+
+ def _decode_map(self, size, offset):
+ container = {}
+ for _ in range(size):
+ (key, offset) = self.decode(offset)
+ (value, offset) = self.decode(offset)
+ if key == value:
+ container[key] = bytes(value, 'utf-8')
+ else:
+ container[key] = value
+ # print("###")
+ # print(container)
+ # print("###")
+ return container, offset
+
+ def _decode_pointer(self, size, offset):
+ pointer_size = (size >> 3) + 1
+
+ buf = self._buffer[offset:offset + pointer_size]
+ new_offset = offset + pointer_size
+
+ if pointer_size == 1:
+ buf = byte_from_int(size & 0x7) + buf
+ pointer = struct.unpack(b'!H', buf)[0] + self._pointer_base
+ elif pointer_size == 2:
+ buf = b'\x00' + byte_from_int(size & 0x7) + buf
+ pointer = struct.unpack(b'!I', buf)[0] + 2048 + self._pointer_base
+ elif pointer_size == 3:
+ buf = byte_from_int(size & 0x7) + buf
+ pointer = struct.unpack(b'!I',
+ buf)[0] + 526336 + self._pointer_base
+ else:
+ pointer = struct.unpack(b'!I', buf)[0] + self._pointer_base
+
+ if self._pointer_test:
+ return pointer, new_offset
+ (value, _) = self.decode(pointer)
+ return value, new_offset
+
+ def _decode_uint(self, size, offset):
+ new_offset = offset + size
+ uint_bytes = self._buffer[offset:new_offset]
+ return int_from_bytes(uint_bytes), new_offset
+
+ def _decode_utf8_string(self, size, offset):
+ new_offset = offset + size
+ return self._buffer[offset:new_offset].decode('utf-8'), new_offset
+
+ _type_decoder = {
+ 1: _decode_pointer,
+ 2: _decode_utf8_string,
+ 3: _decode_double,
+ 4: _decode_bytes,
+ 5: _decode_uint,
+ 6: _decode_uint,
+ 7: _decode_map,
+ 8: _decode_int32,
+ 9: _decode_uint,
+ 10: _decode_uint,
+ 11: _decode_array,
+ 14: _decode_boolean,
+ 15: _decode_float,
+ }
+
+ def decode(self, offset):
+ new_offset = offset + 1
+ ctrl_byte = int_from_byte(self._buffer[offset])
+ type_num = ctrl_byte >> 5
+ if not type_num:
+ (type_num, new_offset) = self._read_extended(new_offset)
+
+ try:
+ decoder = self._type_decoder[type_num]
+ except KeyError:
+ raise InvalidDatabaseError('Unexpected type number ({type}) '
+ 'encountered'.format(type=type_num))
+
+ (size, new_offset) = self._size_from_ctrl_byte(ctrl_byte, new_offset,
+ type_num)
+ return decoder(self, size, new_offset)
+
+ def _read_extended(self, offset):
+ next_byte = int_from_byte(self._buffer[offset])
+ type_num = next_byte + 7
+ if type_num < 7:
+ raise InvalidDatabaseError(
+ 'Something went horribly wrong in the decoder. An '
+ 'extended type resolved to a type number < 8 '
+ '({type})'.format(type=type_num))
+ return type_num, offset + 1
+
+ def _verify_size(self, expected, actual):
+ if expected != actual:
+ raise InvalidDatabaseError(
+ 'The AW DB file\'s data section contains bad data '
+ '(unknown data type or corrupt data)')
+
+ def _size_from_ctrl_byte(self, ctrl_byte, offset, type_num):
+ size = ctrl_byte & 0x1f
+ if type_num == 1 or size < 29:
+ return size, offset
+
+ if size == 29:
+ size = 29 + int_from_byte(self._buffer[offset])
+ return size, offset + 1
+
+ if size == 30:
+ new_offset = offset + 2
+ size_bytes = self._buffer[offset:new_offset]
+ size = 285 + struct.unpack(b'!H', size_bytes)[0]
+ return size, new_offset
+
+ new_offset = offset + 3
+ size_bytes = self._buffer[offset:new_offset]
+ size = struct.unpack(b'!I', b'\x00' + size_bytes)[0] + 65821
+ return size, new_offset