#
# Copyright (C) 2002 Lars Gustaebel <lars@gustaebel.de>
# All rights reserved.
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
"""Read from and write to cpio format archives.
"""
#
#
#---------
# Imports
#---------
import sys
import os
import stat
import time
import struct
# cpio magic numbers
"""Base exception."""
pass
"""General exception for extract errors."""
pass
"""Exception for unreadble cpio archives."""
pass
"""Exception for unavailable compression methods."""
pass
"""Exception for unsupported operations on stream-like CpioFiles."""
pass
#---------------------------
# internal stream interface
#---------------------------
class _LowLevelFile:
"""Low-level file object. Supports reading and writing.
It is used instead of a regular file object for streaming
access.
"""
mode = {
}[mode]
class _Stream:
"""Class that serves as an adapter between CpioFile and
a stream-like object. The stream-like object only
needs to have a read() or write() method and is accessed
blockwise. Use of gzip or bzip2 compression is possible.
A stream-like object could be for example: sys.stdin,
sys.stdout, a socket, a tape device etc.
_Stream is intended to be used only internally.
"""
"""Construct a _Stream object.
"""
if fileobj is None:
if type == "gz":
try:
import zlib
except ImportError:
raise CompressionError("zlib module is not available")
if mode == "r":
else:
if type == "bz2":
try:
import bz2
except ImportError:
raise CompressionError("bz2 module is not available")
if mode == "r":
else:
"""Initialize for writing with gzip compression.
"""
"""Write string s to the stream.
"""
"""Write string s to the stream if a whole new block
is ready to be written.
"""
"""Close the _Stream object. No operation should be
done on it afterwards.
"""
return
long(0xffffFFFF)))
if not self._extfileobj:
"""Initialize for reading a gzip compressed fileobj.
"""
# taken from gzip.GzipFile with some alterations
raise ReadError("not a gzip file")
raise CompressionError("unsupported compression method")
if flag & 4:
if flag & 8:
while True:
if not s or s == NUL:
break
if flag & 16:
while True:
if not s or s == NUL:
break
if flag & 2:
"""Return the stream's file pointer position.
"""
"""Set the stream's file pointer to pos. Negative seeking
is forbidden.
"""
else:
raise StreamError("seeking backwards is not allowed")
"""Return the next size number of bytes from the stream.
If size is not defined, return all bytes of the stream
up to EOF.
"""
if size is None:
t = []
while True:
if not buf:
break
else:
# print("reading {0} bytes to {1} ({2})".format(size, self.pos, self.fileobj.tell()))
return buf
"""Return size bytes from the stream.
"""
while c < size:
if not buf:
break
t = "".join(t)
return t[:size]
"""Return size bytes from stream. If internal buffer is empty,
read another block from the stream.
"""
while c < size:
if not buf:
break
t = "".join(t)
return t[:size]
# class _Stream
#------------------------
# Extraction file object
#------------------------
"""File-like object for reading an archive member.
Is returned by CpioFile.extractfile().
"""
raise ValueError("file is closed")
if size is None:
else:
"""Read a line with approx. size. If size is negative,
read a whole line. readline() and read() must not
be mixed up (!).
"""
if size < 0:
if nl >= 0:
else:
if not buf:
break
if nl == -1:
s = self.linebuffer
return s
return buf + "\n"
"""Return a list with all (following) lines.
"""
result = []
while True:
if not line: break
return result
"""Return the current file position.
"""
"""Seek to a position in the file.
"""
if whence == 0:
elif whence == 1:
if pos < 0:
else:
elif whence == 2:
"""Close the file object.
"""
#class ExFileObject
#------------------
# Exported Classes
#------------------
"""Informational class which holds the details about an
archive member given by a cpio header block.
CpioInfo objects are returned by CpioFile.getmember(),
CpioFile.getmembers() and CpioFile.getcpioinfo() and are
usually created internally.
"""
"""Construct a CpioInfo object. name is the optional name
of the member.
"""
return "<{0} {1!r} at {2:#x}>".format(
"""Construct a CpioInfo object from a buffer. The buffer should
be at least 6 octets long to determine the type of archive. The
rest of the data will be read in on demand.
"""
# Read enough for the ASCII magic
hdrtype = "CMS_ASC"
hdrtype = "CMS_CHR"
hdrtype = "CMS_CRC"
else:
if b == CMN_ASC:
hdrtype = "CMN_ASC"
elif b == CMN_BIN:
hdrtype = "CMN_BIN"
elif b == CMN_BBS:
hdrtype = "CMN_BBS"
elif b == CMN_CRC:
hdrtype = "CMN_CRC"
else:
raise ValueError("invalid cpio header")
if hdrtype == "CMN_BIN":
# Header is padded to halfword boundaries
elif hdrtype == "CMS_ASC":
# Pad to the nearest 4 byte block, 0-3 bytes.
else:
raise ValueError("unsupported cpio header")
return cpioinfo
# This isn't in tarfile, but it's too useful. It's required
# modifications to frombuf(), as well as CpioFile.next() to pass the
# CpioFile object in. I'm not sure that isn't poor OO style.
"""Return a file-like object which can be read to extract the contents.
"""
else:
return None
"""The CpioFile Class provides an interface to cpio archives.
"""
"""Open an (uncompressed) cpio archive `name'. `mode' is either 'r' to
read from an existing archive, 'a' to append data to an existing
file or 'w' to create a new file overwriting an existing one. `mode'
defaults to 'r'.
If `fileobj' is given, it is used for reading or writing data. If it
can be determined, `mode' is overridden by `fileobj's mode.
`fileobj' is not closed, when CpioFile is closed.
"""
raise ValueError("mode must be 'r', 'a' or 'w'")
else:
# Copy constructor: just copy fileobj over and reset the
# _Stream object's idea of where we are back to the
# beginning. Everything else will be reset normally.
# XXX clear closed flag?
if cfobj:
# Init datastructures
self.firstmember = None
# Move to the end of the archive,
# before the first empty block.
self.firstmember = None
while True:
try:
except ReadError:
break
if cpioinfo is None:
break
#--------------------------------------------------------------------------
# Below are the classmethods which act as alternate constructors to the
# CpioFile class. The open() method is the only one that is needed for
# public use; it is the "super"-constructor and is able to select an
# adequate "sub"-constructor for a particular compression using the mapping
# from OPEN_METH.
#
# This concept allows one to subclass CpioFile without losing the comfort of
# the super-constructor. A sub-constructor is registered and made available
# by adding it to the mapping in OPEN_METH.
"""Open a cpio archive for reading, writing or appending. Return
an appropriate CpioFile class.
mode:
'r' open for reading with transparent compression
'r:' open for reading exclusively uncompressed
'r:gz' open for reading with gzip compression
'r:bz2' open for reading with bzip2 compression
'a' or 'a:' open for appending
'w' or 'w:' open for writing without compression
'w:gz' open for writing with gzip compression
'w:bz2' open for writing with bzip2 compression
'r|' open an uncompressed stream of cpio blocks for reading
'r|gz' open a gzip compressed stream of cpio blocks
'r|bz2' open a bzip2 compressed stream of cpio blocks
'w|' open an uncompressed stream for writing
'w|gz' open a gzip compressed stream for writing
'w|bz2' open a bzip2 compressed stream for writing
"""
raise ValueError("nothing to open")
if ":" in mode:
# Select the *open() function according to
# given compression.
else:
elif "|" in mode:
if filemode not in "rw":
raise ValueError("mode must be 'r' or 'w'")
t._extfileobj = False
return t
elif mode == "r":
# Find out which *open() is appropriate for opening the file.
try:
except (ReadError, CompressionError):
continue
raise ReadError("file could not be opened successfully")
elif mode in "aw":
raise ValueError("undiscernible mode")
"""Open uncompressed cpio archive name for reading or writing.
"""
raise ValueError("mode must be 'r', 'a' or 'w'")
"""Open gzip compressed cpio archive name for reading or writing.
Appending is not allowed.
"""
raise ValueError("mode must be 'r' or 'w'")
try:
import gzip
except (ImportError, AttributeError):
raise CompressionError("gzip module is not available")
if ext == ".gz":
ext = ""
if fileobj is None:
if mode != "r":
try:
fileobj))
except IOError:
raise ReadError("not a gzip file")
t._extfileobj = False
return t
"""Open bzip2 compressed cpio archive name for reading or writing.
Appending is not allowed.
"""
raise ValueError("mode must be 'r' or 'w'.")
try:
import bz2
except ImportError:
raise CompressionError("bz2 module is not available")
if ext == ".bz2":
ext = ""
if fileobj is not None:
raise ValueError("no support for external file objects")
try:
except IOError:
raise ReadError("not a bzip2 file")
t._extfileobj = False
return t
"""Open 7z compressed cpio archive name for reading, writing.
Appending is not allowed
"""
raise ValueError("mode must be 'r' or 'w'.")
if ext == ".7z":
ext = ""
try:
# To extract: 7z e -so <fname>
# To create an archive: 7z a -si <fname>
name)
if mode == "w":
comptype = "cpio"
except IOError:
t._extfileobj = False
return t
# All *open() methods are registered here.
OPEN_METH = {
"cpio": "cpioopen", # uncompressed
"gz": "gzopen", # gzip compressed
"bz2": "bz2open", # bzip2 compressed
"p7z": "p7zopen" # 7z compressed
}
"""Return a CpioInfo object for member `name'. If `name' can not be
found in the archive, KeyError is raised. If a member occurs more
than once in the archive, its last occurence is assumed to be the
most up-to-date version.
"""
if cpioinfo is None:
return cpioinfo
"""Return the members of the archive as a list of CpioInfo objects. The
list has the same order as the members in the archive.
"""
# scan the whole archive.
if self.firstmember is not None:
m = self.firstmember
self.firstmember = None
return m
while True:
# Read in enough for frombuf() to be able to determine
# what kind of archive it is. It will have to read the
# rest of the header.
if not buf:
return None
try:
except ValueError as e:
raise ReadError("empty, unreadable or compressed file")
return None
break
# if cpioinfo.chksum != calc_chksum(buf):
# self._dbg(1, "cpiofile: Bad Checksum {0!r}".format(cpioinfo.name))
return None
return cpioinfo
else:
# XXX deal with other types
else:
return None
if remainder:
blocks += 1
if cpioinfo is None:
else:
return members[i]
while True:
if cpioinfo is None:
break
else:
"""Find the next cpio archive glommed on to the end of the current one.
Some applications, like Solaris package datastreams, concatenate
multiple cpio archives together, separated by a bit of padding.
This routine puts all the file pointers in position to start
reading from the next archive, which can be done by creating a
new CpioFile object given the original one as an argument (after
this routine is called).
"""
bytes = 0
"""Return the next cpio archive glommed on to the end of the current one.
Return the CpioFile object based on the repositioning done by
find_next_archive().
"""
class CpioIter:
return self
if not cpioinfo:
raise StopIteration
else:
try:
except IndexError:
raise StopIteration
return cpioinfo
return True
return True
return False
if __name__ == "__main__":
print("cpiofile is:", cf)
print("cpioinfo is:", ci)
# f = cf.extractfile(ci)
# for l in f.readlines():
# print(l, end=" ")
# f.close()