Skip to main content
added 1 character in body
Source Link
Veedrac
  • 9.8k
  • 23
  • 38

I personally don't think Header should be calling seek on the input fil;file; it should trust the user to give it an appropriately seeked file, even if that may be offset.

It doesn't seem like Header needs fhandle at all, actually; it should be able to recievereceive just a string. FWIW, header_buffer isn't a buffer.

I personally don't think Header should be calling seek on the input fil; it should trust the user to give it an appropriately seeked file, even if that may be offset.

It doesn't seem like Header needs fhandle at all, actually; it should be able to recieve just a string. FWIW, header_buffer isn't a buffer.

I personally don't think Header should be calling seek on the input file; it should trust the user to give it an appropriately seeked file, even if that may be offset.

It doesn't seem like Header needs fhandle at all, actually; it should be able to receive just a string. FWIW, header_buffer isn't a buffer.

Source Link
Veedrac
  • 9.8k
  • 23
  • 38

The comment

# Format info: http://lclevy.free.fr/cr2/
# The Cr2 class loads a CR2 file from disk. It is currently read-only.

would be better in Cr2's docstring. "It is currently read-only" doesn't mean much since everything is public. To mark things "private" the convention is to prefix them with underscores (eg. self._header).

I wouldn't do that, though; I'd get rid of the getters and just have attributes. If you really want getters in order to make things read-only, use properties.

Since you're requiring Python 3, remove (object) from the inheritance list; it was only required while transitioning from 2.x.

I would separate the embedded classes; sticking them together seems relatively pointless.

IMHO, [endianness] = ... is nicer than (endianness,) = .... Feel free to ignore this point.

I personally don't think Header should be calling seek on the input fil; it should trust the user to give it an appropriately seeked file, even if that may be offset.

Your assert not fhandle.closed isn't needed; read will error just fine if it's closed (with a more descriptive error, too).

It doesn't seem like Header needs fhandle at all, actually; it should be able to recieve just a string. FWIW, header_buffer isn't a buffer.

I would use a dictionary to calculate endian_flag.

endian_flags = {
    0x4949: '<', # Intel
    0x4D4D: '>', # Motorola
}

# Default to native
endian_flag = endian_flags.get(endianness, "@")

Header looks like it could be a namedtuple.

You use unpack_from a lot where unpack alone should work. Prefer the later.

This all gives:

from collections import namedtuple

# Mapping from manufacturer to associated endianness as accepted by struct
endian_flags = {
    0x4949: '<', # Intel
    0x4D4D: '>', # Motorola
}

_HeaderFields = namedtuple("HeaderFields", [
    "endian_flag", "raw_header", "tiff_magic_word", "tiff_offset",
    "magic_word", "major_version", "minor_version", "raw_ifd_offset"
])

class Header(_HeaderFields):
    __slots__ = ()

    def __new__(cls, header_bytes):
        [endianness] = struct.unpack_from('H', header_bytes)

        # Default to native
        endian_flag = endian_flags.get(endianness, "@")
        raw_header = struct.unpack(endian_flag + 'HHLHBBL', header_bytes)

        return super().__new__(cls, endian_flag, raw_header, *raw_header[1:])

In IfdEntry, you have

tag_types[self.tag_type][0] == '*'

which would be more clear as

tag_types[self.tag_type].startswith('*')

In get_value you do

tag_fmt = tag_types[self.tag_type][1:]

but I don't see a reason to use a slice. In fact, I would do the even stricter

_, tag_fmt = tag_types[self.tag_type]

get_value would be better as a property.

I would change self.tag_type to be the result from after translating with tag_types.

I would do a single read from fhandle in __init__ and use unpack_from's offset parameter to read parts. This emphasised how repetitive the unpacking is, so I'd make a local function to reduce the repetition:

def __init__(self, num, parent):
    self.parent = parent
    parent.fhandle.seek(parent.offset + 2 + (12 * num))
    entry_bytes = parent.fhandle.read(12)

    def unpack_at(type, offset):
        return struct.unpack_from(parent.endian_flag + type, entry_bytes, offset)

    self.tag_id, tag_type, self.value_len = unpack_at('HHL', 0)
    self.tag_type = tag_types[tag_type]

    if self.tag_type.startswith('*'):
        [self.raw_value] = unpack_at('L', 8)
        self._value = -1
    else:
        [self.raw_value] = unpack_at(self.tag_type, 8)
        self._value = self.raw_value

Use None instead of -1 as a "missing value" sentinel; that's what it's for, after all.

Looking further, I got confused as to why you delayed it anyway; value_len never seems to be big so it makes sense just to do it eagerly. This pointed out what seems to be a bug: self.parent.fhandle.read(self.value_len) is too few bytes when tag_fmt represents a value of > 1 bytes. This kind of error and repetition suggests that you should really have a class to encapsulate reading data from the file. Cr2 would work great for that.

You also don't check for insufficient read sizes, but I'll let that be for now.

Being non-lazy, you could make IfdEntry another immutable namedtuple:

_IfdEntryFields = namedtuple("IfdEntryFields", [
    "image", "tag_id", "tag_type", "value_len", "raw_value", "value"
])

class IfdEntry(_IfdEntryFields):
    __slots__ = ()

    def __new__(cls, offset, image):
        tag_id, tag_type_key, value_len = image.read_at('HHL', offset)
        tag_type = tag_types[tag_type_key]

        if tag_type.startswith('*'):
            _, tag_type = tag_type

            [raw_value] = image.read_at('L', offset + 8)
            [value] = image.read_at('{}{}'.format(value_len, tag_type), raw_value)

            if tag_type == "s":
                value = value.rstrip(b'\0').decode("utf-8")

        else:
            [raw_value] = image.read_at(tag_type, offset + 8)
            value = raw_value

        return super().__new__(cls, image, tag_id, tag_type, value_len, raw_value, value)

The same goes for Ifd:

_IfdFields = namedtuple("IfdFields", [
    "image", "offset", "num_entries", "next_ifd_offset"
])

class Ifd(_IfdFields):
    __slots__ = ()

    def __new__(cls, offset, image):
        [num_entries] = image.read_at('H', offset)
        [next_ifd_offset] = image.read_at('H', offset + (2 + 12 * num_entries))
        return super().__new__(cls, image, offset, num_entries, next_ifd_offset)

find_entry should just iterate over range(self.num_entries); the 0 is optional.

You probably shouldn't return -1 on failure; I suggest None as before.

In Cr2, you have self.ifd. It makes sense to call this self.ifds. I would also do the loop as:

self.ifds = [Idf(16, self)]
for i in range(3):
    self.ifds.append(Ifd(self.ifds[-1].next_ifd_offset, self))

You should be more careful about how you handle errors in self.__init__; if the constructor closes the file should also be left closed.

Your first read to find the endianness uses the default, platform-specific endianness. I would imagine you want a more sensible default like '>'.

Ifd remakes its entries every call to find_entry. It makes more sense IMHO to generate once on creation.

All in all, I get something like

from collections import namedtuple
from struct import Struct

tags = {
    'image_width': 0x0100,
    'image_length': 0x0101,
    'bits_per_sample': 0x0102,
    'compression': 0x0103,
    'make': 0x010f,
    'model': 0x0110,
    'strip_offset': 0x0111,
    'orientation': 0x0112,
    'strip_byte_counts': 0x0117,
    'x_resolution': 0x011a,
    'y_resolution': 0x011b,
    'resolution_unit': 0x0128,
    'datetime': 0x0132,
    'exif': 0x8769,
    'gps_data': 0x8825
}

# Mapping of tag types to format strings.
# Format strings that start with a * are too long to fit in the IFD entry and
# are actually a pointer to somewhere else in the file.
tag_types = {
    0x1: 'B',  # Unsigned char
    0x2: '*s',  # String (with ending 0)
    0x3: 'H',  # Unsigned short
    0x4: 'L',  # Unsigned long
    0x5: '*Q',  # Unsigned rational
    0x6: 'b',  # Signed char
    0x7: '*p',  # Byte sequence
    0x8: 'h',  # Signed short
    0x9: 'l',  # Signed long
    0xA: '*q',  # Signed rational
    0xB: '*f',  # Float (IEEE)
    0xC: '*d',  # Double (IEEE)
}

# Mapping from manufacturer to associated endianness as accepted by struct
endian_flags = {
    0x4949: '<', # Intel
    0x4D4D: '>', # Motorola
}


_HeaderFields = namedtuple("HeaderFields", [
    "endian_flag", "raw_header", "tiff_magic_word", "tiff_offset", "magic_word",
    "major_version", "minor_version", "raw_ifd_offset"
])

class Header(_HeaderFields):
    __slots__ = ()

    def __new__(cls, image):
        # Default to native
        [endian_flag_key] = image.read_at('H', 0, endian_flag='>')
        endian_flag = endian_flags[endian_flag_key]

        raw_header = image.read_at('HHLHBBL', 0, endian_flag=endian_flag)
        return super().__new__(cls, endian_flag, raw_header, *raw_header[1:])


_IfdEntryFields = namedtuple("IfdEntryFields", ["tag_id", "tag_type", "value"])

class IfdEntry(_IfdEntryFields):
    __slots__ = ()

    def __new__(cls, offset, image):
        tag_id, tag_type_key, value_len = image.read_at('HHL', offset)
        tag_type = tag_types[tag_type_key]

        if tag_type.startswith('*'):
            _, tag_type = tag_type

            [pointer] = image.read_at('L', offset + 8)
            [value] = image.read_at(str(value_len) + tag_type, pointer)

            if tag_type == "s":
                value = value.rstrip(b'\0').decode("utf-8")

        else:
            [value] = image.read_at(tag_type, offset + 8)

        return super().__new__(cls, tag_id, tag_type, value)


_IfdFields = namedtuple("IfdFields", ["image", "entries", "offset", "num_entries", "next_ifd_offset"])

class Ifd(_IfdFields):
    __slots__ = ()

    def __new__(cls, ifd_offset, image):
        [num_entries]     = image.read_at('H', ifd_offset)
        offsets = [ifd_offset + 2 + (12 * i) for i in range(num_entries+1)]

        [next_ifd_offset] = image.read_at('H', offsets.pop())
        entries = [IfdEntry(entry_offset, image) for entry_offset in offsets]

        return super().__new__(cls, image, entries, ifd_offset, num_entries, next_ifd_offset)

    def find_entry(self, name):
        filtered = (entry for entry in self.entries if entry.tag_id == tags[name])
        return next(filtered, None)


class Cr2:
    """
    Load a CR2 file from disk. It is currently read-only.
    Format info: http://lclevy.free.fr/cr2/
    """
    def __init__(self, file_path):
        self.name = file_path
        self._fhandle = open(file_path, "rb")

        try:
            self.header = Header(self)

            self.ifds = [Ifd(16, self)]
            for i in range(3):
                self.ifds.append(Ifd(self.ifds[-1].next_ifd_offset, self))

        except:
            self._fhandle.close()
            raise

    def read_at(self, fmt, offset, endian_flag=None):
        if endian_flag is None:
            endian_flag = self.header.endian_flag

        reader = Struct(endian_flag + fmt)
        self._fhandle.seek(offset)
        data = self._fhandle.read(reader.size)
        return reader.unpack_from(data)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self._fhandle.close()