#
# CDDL HEADER START
#
# The contents of this file are subject to the terms of the
# Common Development and Distribution License (the "License").
# You may not use this file except in compliance with the License.
#
# You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
# See the License for the specific language governing permissions
# and limitations under the License.
#
# When distributing Covered Code, include this CDDL HEADER in each
# file and include the License file at usr/src/OPENSOLARIS.LICENSE.
# If applicable, add the following below this CDDL HEADER, with the
# fields enclosed by brackets "[]" replaced with your own identifying
# information: Portions Copyright [yyyy] [name of copyright owner]
#
# CDDL HEADER END
#
"""
Misc utility functions used by the packaging system.
"""
import calendar
import collections
import datetime
import errno
import fnmatch
import getopt
import locale
import os
import platform
import re
import resource
import shutil
import signal
import six
import socket
import struct
import sys
import threading
import time
import traceback
import urllib
import zlib
# ungrouped-imports: pylint: disable=C0412
# Redefining built-in 'range'; pylint: disable=W0622
# Module 'urllib' has no 'parse' member; pylint: disable=E1101
# Default path where the temporary directories will be created.
# Minimum number of days to issue warning before a certificate expires
# Constant string used across many modules as a property name.
# Bug URI Constants (deprecated)
# Line too long; pylint: disable=C0301
# pylint: enable=C0301
# Comparison types
# Traceback message.
def get_traceback_message():
"""This function returns the standard traceback message. A function
is necessary since the _() call must be done at runtime after locale
setup."""
return _("""\n
This is an internal error in pkg(7) version {version}. Please log a
Service Request about this issue including the information above and this
def time_to_timestamp(t):
"""convert seconds since epoch to %Y%m%dT%H%M%SZ format"""
# XXX optimize?; pylint: disable=W0511
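    # One way to perform the conversion described above (a sketch, not
    # necessarily the original body; assumes 't' is UTC seconds since the
    # epoch):
    return time.strftime("%Y%m%dT%H%M%SZ", time.gmtime(t))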
"""convert %Y%m%dT%H%M%SZ format to seconds since epoch"""
# XXX optimize?; pylint: disable=W0511
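    # The reverse conversion (a sketch; the parameter name 'ts' is an
    # assumption, since the original signature is not shown here):
    return calendar.timegm(time.strptime(ts, "%Y%m%dT%H%M%SZ"))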
"""convert %Y%m%dT%H%M%SZ format to a datetime object"""
"""copy a file, preserving attributes, ownership, etc. where possible"""
try:
except OSError as e:
raise
"""Rewrite of shutil.copytree() that can handle special files such as
FIFOs, sockets, and device nodes. It re-creates all symlinks rather
than copying the data behind them, and supports neither the 'symlinks'
nor the 'ignore' keyword arguments of the shutil version.
"""
problem = None
# os.mknod doesn't work correctly in 64-bit mode.
# The s11 fcs version of python doesn't have os.mknod()
# but sock.bind has a path length limitation that we can
# hit when archiving the test suite.
# E1101 Module '{0}' has no '{1}' member
# pylint: disable=E1101
else:
try:
# Store original exception so that the
# real cause of failure can be raised if
# this fails.
continue
# the s11 fcs version of python doesn't have os.mknod()
# E1101 Module '{0}' has no '{1}' member
# pylint: disable=E1101
pass
pass
else:
if problem:
"""Rewrite of shutil.move() that uses our copy of copytree()."""
# If dst is a directory, then we try to move src into it.
try:
except EnvironmentError as e:
else:
else:
raise
"""given a set of directories, return expanded set that includes
all components"""
for d in dirs:
p = d
while p != "":
return out
def url_affix_trailing_slash(u):
"""if 'u' donesn't have a trailing '/', append one."""
if u[-1] != '/':
u = u + '/'
return u
"""Return a string that can use to identify the client."""
else:
return useragent
# A valid hostname can be a HOSTNAME, an IPv4 address, or an IPv6 address.
|(?:\d{1,3}\.){3}\d{1,3}
|\[([a-fA-F0-9\.]*:){,7}[a-fA-F0-9\.]+\])$""", re.X)
"""Verify that the publisher prefix only contains valid characters."""
if not prefix:
return False
# This is a workaround for the hostname_re being slow when
# it comes to finding invalid characters in the prefix string.
# prefix bad chars
return False
return True
return False
"""Verify that the publisher URL contains only valid characters.
If 'proxy' is set to True, some checks are relaxed."""
if not url:
return False
# First split the URL and check if the scheme is one we support
if not o[0] in _valid_proto:
return False
if o[0] == "file":
return False
# No further validation to be done.
return True
# Next verify that the network location is valid
else:
if proxy:
# We may have authentication details in the proxy URI, which
# we must ignore when checking for hostname validity.
return False
return True
return False
"""Decompress a gzipped input stream into an output stream.
The argument 'gz' is an input stream of a gzipped file and 'outfile'
is an output stream. gunzip_from_stream() decompresses data from
'gz' and writes it to 'outfile', and returns the hexadecimal SHA sum
of that data using the hash_func supplied.
'hash_funcs', if supplied, is a list of hash functions which we should
use to compute the hash. If 'hash_funcs' is supplied, a list of
hexadecimal digests computed using those functions is returned. The
returned list is in the same order as 'hash_funcs'.
If 'ignore_hash' is True, we do not compute a hash when decompressing
the content and do not return any value.
"""
FHCRC = 2
FEXTRA = 4
FNAME = 8
FCOMMENT = 16
raise ValueError("no hash functions for gunzip_from_stream")
# Read the header
if magic != b"\037\213":
if method != 8:
# Discard an extra field
# Discard a null-terminated filename
while True:
if not s or s == b"\000":
break
# Discard a null-terminated comment
while True:
if not s or s == b"\000":
break
# Discard a 16-bit CRC
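    # Illustrative decompressor setup (a sketch): the gzip member body is raw
    # deflate data, so a negative wbits value is passed to zlib; the variable
    # name 'dcobj' is an assumption.
    dcobj = zlib.decompressobj(-zlib.MAX_WBITS)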
if ignore_hash:
pass
elif hash_funcs:
shasums = []
for f in hash_funcs:
else:
while True:
if buf == b"":
if ignore_hash:
pass
elif hash_funcs:
else:
break
if ignore_hash:
pass
elif hash_funcs:
else:
if ignore_hash:
return
elif hash_funcs:
hexdigests = []
return hexdigests
""" Pipe exception. """
""" Emit a message. """
try:
except IOError as e:
raise PipeError(e)
raise
""" Emit a message to sys.stderr. """
try:
except IOError as e:
raise PipeError(e)
raise
"""Wraps locale.setlocale(), falling back to the C locale if the desired
locale is broken or unavailable. The 'printer' parameter should be a
function which takes a string and displays it. If 'None' (the default),
setlocale() will print the message to stderr."""
if printer is None:
try:
# Because of Python bug 813449, getdefaultlocale may fail
# with a ValueError even if setlocale succeeds. So we call
# it here to prevent having this error raised if it is
# called later by other non-pkg(7) code.
try:
except ValueError:
dl = ""
printer("Unable to set locale{0}; locale package may be broken "
"""Return its argument; used to mark strings for localization when
their use is delayed by the program."""
return message
"""Returns a human-formatted string representing the number of bytes
in the largest unit possible.
If provided, 'fmt' should be a string which can be formatted
with a dictionary containing a float 'num' and strings 'unit' and
'shortunit'. The default format prints, for example, '3.23 MB' """
units = [
(_("B"), _("B"), 2**10),
(_("kB"), _("k"), 2**20),
(_("MB"), _("M"), 2**30),
(_("GB"), _("G"), 2**40),
(_("TB"), _("T"), 2**50),
(_("PB"), _("P"), 2**60),
(_("EB"), _("E"), 2**70)
]
# pylint is picky about this message:
# old-division; pylint: disable=W1619
# Try the next largest unit of measure unless this is
# the largest or if the byte size is within the current
# unit of measure's range.
continue
if "{num:d}" in fmt:
)
else:
)
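# Example use of the formatter above (illustrative; the function name
# bytes_to_str is an assumption):
#   bytes_to_str(3385000)                          # -> roughly "3.23 MB"
#   bytes_to_str(3385000, "{num:d}{shortunit}")    # -> roughly "3M"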
"""Calculate the depth of the current request path relative to our
base uri. path_info always ends with a '/' -- so ignore it when
calculating depth."""
if pub:
"""Takes a file action and returns the over-the-wire size of
a package as an integer. The OTW size is the compressed size,
pkg.csize. If that value isn't available, it returns pkg.size.
If pkg.size isn't available, return zero."""
if size is None:
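        # Illustrative fallback (a sketch; assumes 'size' above was read from
        # the "pkg.csize" attribute of the file action):
        size = action.attrs.get("pkg.size", 0)
    return int(size)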
"""Returns a tuple of ({hash attribute name: hash value}, content)
or a tuple of (hash value, content) if hash_attrs has only one element.
'data' should be a file-like object or a pathname to a file.
'length' should be an integer value representing the size of
the contents of data in bytes.
'return_content' is a boolean value indicating whether the
second tuple value should contain the content of 'data' or
if the content should be discarded during processing.
'hash_attrs' is a list of keys describing the hashes we want to compute
for this data. The keys must be present in 'hash_algs', a dictionary
mapping keys to the factory methods that are used to create objects
to compute them. The factory method must take no parameters, and must
return an object that has 'update()' and 'hexdigest()' methods.
'hash_func' is provided as a convenience to simply hash the data with
a single hash algorithm. The value of 'hash_func' should be the factory
method used to compute that hash value, as described in the previous
paragraph.
"""
else:
f = data
if length is None:
# Setup our results dictionary so that each attribute maps to a
# new hash object.
if hash_func:
else:
if hash_algs is None or hash_attrs is None:
assert False, "get_data_digest without hash_attrs/algs"
hash_results = {}
for attr in hash_attrs:
# "pkg.content-hash" is provided by default and doesn't
# indicate the hash_alg to be used, so when we want to
# calculate the content hash, we'll specify the
# hash_attrs explicitly, such as "file:sha512t_256".
if attr != "pkg.content-hash":
# Read the data in chunks and compute the SHA hashes as the data comes
# in. A large read on some platforms (e.g. Windows XP) may fail.
while length > 0:
if return_content:
if hash_func:
else:
# update each hash with this data
for attr in hash_attrs:
if attr != "pkg.content-hash":
data) # pylint: disable=E1101
if l == 0:
break
length -= l
if closefobj:
f.close()
if hash_func:
# The returned dictionary can now be populated with the hexdigests
# instead of the hash objects themselves.
for attr in hash_results:
"""Used by compute_compressed_attrs to calculate data size and compute
hashes as the data is written instead of having to read the written data
again later."""
"""If path is None, the data will be discarded immediately after
computing size and hashes."""
if path:
else:
"""Close the file."""
"""Flush the file."""
"""Return the size of the file."""
"""Write data to the file and compute the hashes of the data."""
data) # pylint: disable=E1101
"""Returns the size and one or more hashes of the compressed data. If
the file located at file_path doesn't exist or isn't gzipped, it creates
a file in compress_dir named fname. If compress_dir is None, the
attributes are calculated but no data will be written.
'chash_attrs' is a list of the chash attributes we should compute, with
'chash_algs' being a dictionary that maps the attribute names to the
algorithms used to compute them.
"""
if chash_attrs is None:
if chash_algs is None:
chashes = {}
for chash_attr in chash_attrs:
# "pkg.content-hash" is provided by default and doesn't
# indicate the hash_alg to be used, so when we want to
# calculate the content hash, we'll specify the
# hash_attrs explicitly, such as "gzip:sha512t_256".
if chash_attr == "pkg.content-hash":
else:
#
# This check prevents compressing a file which is already compressed.
# This takes CPU load off the depot on large imports of mostly-the-same
# stuff. And in general it saves disk bandwidth, and on ZFS in
# particular it saves us space in differential snapshots. We also need
# to check that the destination is in the same compression format as
# the source, as we must have properly formed files for the chash/csize
# properties to work right.
#
if file_path:
if fileneeded:
if compress_dir:
else:
opath = None
l = n * bufsz
h = (n + 1) * bufsz
if attr == "pkg.content-hash":
else:
# Compute the SHA hash of the compressed file. In order for this to
# work correctly, we have to use the PkgGzipFile class. It omits
# filename and timestamp information from the gzip header, allowing us
# to generate deterministic hashes for different files with identical
# content.
while True:
# cdata is bytes
if cdata == b"":
break
for chash_attr in chashes:
cdata) # pylint: disable=E1101
# The returned dictionary can now be populated with the hexdigests
# instead of the hash objects themselves.
if attr == "pkg.content-hash":
else:
"""This class is used as an interface to procfs."""
# Detect whether python is running in 32-bit or 64-bit
# environment based on pointer size.
"long": "l",
"uintptr_t": "I",
"ulong": "L"
},
64: {
"long": "q",
"uintptr_t": "Q",
"ulong": "Q"
}}
_ctype_formats = {
# This dictionary maps basic c types into python format characters
# that can be used with struct.unpack(). The format of this
# dictionary is:
# <ctype>: (<repeat count>, <format char>)
# basic c types (repeat count should always be 1)
# char[] is used to encode character arrays
"char": (1, "c"),
"char[]": (1, "s"),
"int": (1, "i"),
"ushort_t": (1, "H"),
# other simple types (repeat count should always be 1)
"ctid_t": (1, "i"), # ctid_t -> id_t -> int
# dev_t -> ulong_t
"gid_t": (1, "I"), # gid_t -> uid_t -> uint_t
"pid_t": (1, "i"), # pid_t -> int
"poolid_t": (1, "i"), # poolid_t -> id_t -> int
"projid_t": (1, "i"), # projid_t -> id_t -> int
# size_t -> ulong_t
"taskid_t": (1, "i"), # taskid_t -> id_t -> int
# time_t -> long
"uid_t": (1, "I"), # uid_t -> uint_t
"zoneid_t": (1, "i"), # zoneid_t -> id_t -> int
"id_t": (1, "i"), # id_t -> int
# structures must be represented as character arrays
# sizeof (timestruc_t) = 8 in 32-bit process, and = 16 in 64-bit.
}
_timestruct_desc = [
# this list describes a timestruc_t structure
# the entry format is (<ctype>, <repeat count>, <name>)
("time_t", 1, "tv_sec"),
("long", 1, "tv_nsec"),
]
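# Illustrative expansion (not part of the original module): using the type
# tables above, this description yields the struct format "ll" (8 bytes) in a
# 32-bit process and "qq" (16 bytes) in a 64-bit process, matching the
# timestruc_t sizes noted earlier.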
_psinfo_desc = [
# this list describes a psinfo_t structure
# the entry format is: (<ctype>, <repeat count>, <name>)
("int", 1, "pr_flag"),
("int", 1, "pr_nlwp"),
("pid_t", 1, "pr_pid"),
("pid_t", 1, "pr_ppid"),
("pid_t", 1, "pr_pgid"),
("pid_t", 1, "pr_sid"),
("uid_t", 1, "pr_uid"),
("uid_t", 1, "pr_euid"),
("gid_t", 1, "pr_gid"),
("gid_t", 1, "pr_egid"),
("uintptr_t", 1, "pr_addr"),
("size_t", 1, "pr_size"),
("size_t", 1, "pr_rssize"),
("size_t", 1, "pr_pad1"),
("dev_t", 1, "pr_ttydev"),
("ushort_t", 1, "pr_pctcpu"),
("ushort_t", 1, "pr_pctmem"),
("timestruc_t", 1, "pr_start"),
("timestruc_t", 1, "pr_time"),
("timestruc_t", 1, "pr_ctime"),
("char[]", 16, "pr_fname"),
("char[]", 80, "pr_psargs"),
("int", 1, "pr_wstat"),
("int", 1, "pr_argc"),
("uintptr_t", 1, "pr_argv"),
("uintptr_t", 1, "pr_envp"),
("char", 1, "pr_dmodel"),
("char[]", 3, "pr_pad2"),
("taskid_t", 1, "pr_taskid"),
("projid_t", 1, "pr_projid"),
("int", 1, "pr_nzomb"),
("poolid_t", 1, "pr_poolid"),
("zoneid_t", 1, "pr_zoneid"),
("id_t", 1, "pr_contract"),
("int", 1, "pr_filler"),
]
# For a 64-bit process, the alignment is off by 4 bytes starting at the
# pr_pctmem field, so add an additional pad here.
if _running_bit == 64:
# this list contains all the known structure description lists
# the entry format is: <structure name>: \
# [ <description>, <format string>, <namedtuple> ]
#
# Note that <format string> and <namedtuple> should be assigned
# None in this table, and then they will get pre-populated
# automatically when this class is instantiated
#
"psinfo_t": [_psinfo_desc, None, None],
"timestruc_t": [_timestruct_desc, None, None],
}
# fill in <format string> and <namedtuple> in _struct_descriptions
# update _struct_descriptions with a format string
v[1] = ""
# update _struct_descriptions with a named tuple
[ i[2] for i in desc ])
"""Unpack 'data' using struct.unpack(). 'name' is the name of
the data we're unpacking and is used to lookup a description
of the data (which in turn is used to build a format string to
decode the data)."""
# lookup the description of the data to unpack
# unpack the data into a list
# check for any nested data that needs unpacking
ctype = v[0]
continue
# return the data in a named tuple
def psinfo():
"""Read the psinfo file and return its contents."""
# This works only on Solaris, in 32-bit or 64-bit mode. It may
# not work on older or newer versions than 5.11. Ideally, we
# would use libproc, or check sbrk(0), but this is expedient.
# In most cases (there's a small chance the file will decode,
# but incorrectly), failure will raise an exception, and we'll
# fail safe.
psinfo_size = 232
psinfo_size = 288
try:
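            # Illustrative read (a sketch): the psinfo file for the current
            # process lives under proc(5); the variable name is an assumption.
            psinfo_data = open("/proc/self/psinfo", mode="rb").read()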
# Catch "Exception"; pylint: disable=W0703
except Exception:
return None
# make sure we got the expected amount of data, otherwise
# unpacking it will fail.
return None
def __getvmusage():
"""Return the amount of virtual memory in bytes currently in use."""
if psinfo is None:
return None
def _prstart():
"""Return the process start time expressed as a floating point number
in seconds since the epoch, in UTC."""
if psinfo is None:
return 0.0
def out_of_memory():
"""Return an out of memory message, for use in a MemoryError handler."""
# figure out how much memory we're using (note that we could run out
# of memory while doing this, so check for that.)
vsz = None
try:
vmusage = __getvmusage()
if vmusage is not None:
raise
if vsz is not None:
error = """\
There is not enough memory to complete the requested operation. At least
{vsz} of virtual memory was in use by this command before it ran out of memory.
You must add more memory (swap or physical) or allow the system to access more
existing memory, or quit other programs that may be consuming memory, and try
the operation again."""
else:
error = """\
There is not enough memory to complete the requested operation. You must
add more memory (swap or physical) or allow the system to access more existing
memory, or quit other programs that may be consuming memory, and try the
operation again."""
# EmptyI for argument defaults
# ImmutableDict for argument defaults
# Missing docstring; pylint: disable=C0111
# Unused argument; pylint: disable=W0613
return ImmutableDict()
def __oops():
raise TypeError("Item assignment to ImmutableDict")
# A way to have a dictionary be a property
# Missing docstring; pylint: disable=C0111
raise AttributeError("unreadable attribute")
raise AttributeError("can't set attribute")
raise AttributeError("can't delete attribute")
if self.__iteritems is None:
raise AttributeError("can't iterate over items")
# for Python 3 compatibility
raise AttributeError("can't iterate over keys")
raise AttributeError("can't iterate over "
"values")
if self.__fgetdefault is None:
raise AttributeError("can't use get")
if self.__fsetdefault is None:
raise AttributeError("can't use setdefault")
raise AttributeError("can't use update")
raise AttributeError("can't use pop")
raise AttributeError("can't iterate")
# Unused argument; pylint: disable=W0613
if obj is None:
return self
"""Take the file given in path, open it, and use it to create
an X509 certificate object.
'uri' is an optional value indicating the uri associated with or that
requires the certificate for access.
'pub' is an optional string value containing the name (prefix) of a
related publisher."""
try:
except EnvironmentError as e:
raise
try:
# OpenSSL.crypto.Error
"""Validates the indicated certificate and returns a pyOpenSSL object
representing it if it is valid."""
if cert.has_expired():
# strptime's first argument must be str
# PyOpenSSL's has_expired() doesn't validate the notBefore
# time on the certificate. Don't ask me why.
if diff <= MIN_WARN_DAYS:
return cert
def binary_to_hex(s):
"""Converts a string of bytes to a hexadecimal representation.
"""
def hex_to_binary(s):
"""Converts a string of hex digits to the binary representation.
"""
return unhexlify(s)
def config_temp_root():
"""Examine the environment. If the environment has set TMPDIR, TEMP,
or TMP, return None. This tells tempfile to use the environment
settings when creating temporary files/directories. Otherwise,
return a path that the caller should pass to tempfile instead."""
# In Python's tempfile module, the default temp directory
# includes some paths that are suboptimal for holding large numbers
# of files. If the user hasn't set TMPDIR, TEMP, or TMP in the
# environment, override the default directory for creating a tempfile.
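    # Illustrative lookup of the variables named in the docstring (a sketch,
    # not necessarily the original loop):
    for var in ("TMPDIR", "TEMP", "TMP"):
        env_val = os.getenv(var)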
if env_val:
return None
return DEFAULT_TEMP_PATH
def get_temp_root_path():
"""Return the directory path where the temporary directories or
files should be created. If the environment has set TMPDIR
or TEMP or TMP then return the corresponding value else return the
default value."""
if env_val:
return env_val
return DEFAULT_TEMP_PATH
"""Parse the repository location provided and attempt to transform it
into a valid repository URI.
'cwd' is the working directory to use to turn paths into an absolute
path. If not provided, the current working directory is used.
"""
# Convert the file path to a URI.
if not cwd:
if scheme == "file":
# During urlunparsing below, ensure that the path starts with
# only one '/' character, if any are present.
# Rebuild the URI with the sanitized components.
"""Create a directory at the specified location if it does not
already exist (including any parent directories) re-raising any
unexpected exceptions as ApiExceptions.
"""
try:
except EnvironmentError as e:
return
raise api_errors.PermissionsException(
e.filename)
e.filename)
raise
"""This has the same external interface as threading.Lock,
but performs no locking. This is a placeholder object for situations
where we want to be able to do locking, but don't always need a
lock object present. The object has a 'held' value that is used
for _is_owned. This is informational and doesn't actually
provide mutual exclusion in any way whatsoever."""
# Missing docstring; pylint: disable=C0111
# Unused argument; pylint: disable=W0613
return True
return
"""Set __metaclass__ to Singleton to create a singleton.
**kw)
# Setting the python file buffer size to 128k gives substantial performance
# gains on certain files.
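# Illustrative definition for the buffer size mentioned above (the constant
# name is an assumption, not taken from the original module):
PKG_FILE_BUFSIZ = 128 * 1024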
"""Version of relpath to workaround python bug:
"""
"""Change the ownership of all files under directory d to uid:gid."""
"""Generic table-based options parsing function. Returns a tuple
consisting of a list of parsed options in the form (option, argument)
and the remaining unparsed options. The parsed-option list may contain
duplicates if an option is passed multiple times.
'op' is the operation being performed.
'args' is the arguments that should be parsed.
'opts_table' is a list of options the operation supports.
The format of the list entries should be a tuple containing the
option and its default value:
(option, default_value, [valid values], [json schema])
It is valid to have other entries in the list when they are required
for additional option processing elsewhere. These are ignored here. If
the list entry is a tuple, it must conform to the format outlined above.
The default value not only represents the default value assigned to the
option, but it also implicitly determines how the option is parsed. If
the default value is True or False, the option doesn't take any
arguments, can only be specified once, and if specified it inverts the
default value. If the default value is 0, the option doesn't take any
arguments, can be specified multiple times, and if specified its value
will be the number of times it was seen. If the default value is
None, the option requires an argument, can only be specified once, and
if specified its value will be its argument string. If the default
value is an empty list, the option requires an argument, may be
specified multiple times, and if specified its value will be a list
with all the specified argument values.
'opts_mapping' is a dict containing a mapping between the option name
and the short and long CLI specifier for that option in the form
{ option : (short, long), ... }
An example of a short opt is "f", which maps to a "-f" option. An
example of a long opt is "foo", which maps to a "--foo" option. Option
is the value of this option in the parsed option dictionary.
'usage_cb' is a function pointer that should display usage information
and will be invoked if invalid arguments are detected.
'use_cli_opts' indicates whether the options are CLI options or
key-value pair options.
'opts_kv' is the set of user-provided options that should be parsed. It is a
dictionary keyed by option name, with the option argument as the value.
"""
if use_cli_opts:
# list for getopt long options
opts_l_list = []
# getopt str for short options
opts_s_str = ""
# dict to map options returned by getopt to keys
else:
opts_name_mapping = {}
for entry in opts_table:
# option table contains functions for verification, ignore here
continue
if use_cli_opts:
assert opt in opts_mapping
# make sure an option was specified
if lopt != "":
else:
if sopt != "":
else:
else:
# Add itself as a mapping for validation.
if opt in opts_mapping:
if optn:
# Parse options.
if use_cli_opts:
try:
except getopt.GetoptError as e:
else:
"""Find the default value for a given option from opts_table."""
for x in opts_table:
continue
if len(x) == 2:
elif len(x) == 3:
elif len(x) == 4:
dummy_schema = x
return default
"""Process option values."""
# Determine required option type based on the default value.
if use_cli_opts:
# Handle duplicates for integer and list types.
else:
return
else:
return
# Boolean and string types can't be repeated.
raise api_errors.InvalidOptionError(
# For boolean options we have to toggle the default value
# when in CLI mode.
if use_cli_opts:
else:
else:
# Assemble the options dictionary by passing in the right data types
# and take care of duplicates.
opt_dict = {}
if use_cli_opts:
for x in opts:
if cli_opt in opts_name_mapping:
else:
raise api_errors.InvalidOptionError(
[cli_opt])
return opt_dict
def api_cmdpath():
"""Returns the path to the executable that is invoking the api client
interfaces."""
cmdpath = None
# DebugValues is a singleton, hence no 'self' arg; pylint: disable=E1120
return cmdpath
def api_pkgcmd():
"""When running a pkg(1) command from within a packaging module, try
to use the same pkg(1) path as our current invocation. If we're
running pkg(1) from some other command (like the gui updater) then
assume that pkg(1) is in the default path."""
pkg_bin = "pkg"
cmdpath = api_cmdpath()
try:
# check if the currently running pkg command
# exists and is accessible.
except OSError:
pass
# propagate debug options
return pkg_cmd
def liveroot():
"""Return path to the current live root image, i.e. the image
that we are running from."""
# DebugValues is a singleton, hence no 'self' arg; pylint: disable=E1120
if not live_root:
live_root = "/"
return live_root
"""Find out how much space is available at the specified path if
it exists; return -1 if path doesn't exist"""
try:
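        # Illustrative body (a sketch; the argument name 'path' is an
        # assumption):
        vfs = os.statvfs(path)
        return vfs.f_frsize * vfs.f_bavail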
except OSError:
return -1
"""Return the size (in bytes) of a directory and all of its contents."""
try:
return sum(
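            # Illustrative walk over the tree (a sketch; the argument name
            # 'path' is an assumption):
            os.path.getsize(os.path.join(d, fname))
            for d, dnames, fnames in os.walk(path)
            for fname in fnames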
)
except EnvironmentError as e:
# Access to protected member; pylint: disable=W0212
raise api_errors._convert_error(e)
"""Returns a string containing a listing defined by provided values
in the specified output format.
'desired_field_order' is the list of the fields to show in the order
they should be output left to right.
'field_data' is a dictionary of lists of the form:
{
field_name1: {
[(output formats), field header, initial field value]
},
field_nameN: {
[(output formats), field header, initial field value]
}
}
'field_values' is a generator or list of dictionaries of the form:
{
field_name1: field_value,
field_nameN: field_value
}
'out_format' is the format to use for output. Currently 'default',
'tsv', 'json', and 'json-formatted' are supported. The first is
intended for columnar, human-readable output, and the others for
parsable output.
'def_fmt' is the default Python formatting string to use for the
'default' human-readable output. It must match the fields defined
in 'field_data'.
'omit_headers' is a boolean specifying whether headers should be
included in the listing. (If applicable to the specified output
format.)
'escape_output' is an optional boolean indicating whether shell
metacharacters or embedded control sequences should be escaped
before display. (If applicable to the specified output format.)
"""
# Missing docstring; pylint: disable=C0111
# Custom key function for preserving field ordering
# Functions for manipulating field_data records
return record[1]
return record[2]
if out_format == "tsv":
# Expand tabs if tsv output requested.
# Escape bourne shell metacharacters.
for c in ("\\", " ", "\t", "\n", "'", "`", ";", "&", "(", ")",
"|", "^", "<", ">"):
return nval
elif val == "":
return
elif val is None:
return
else:
nval = []
for v in val:
if v == "":
# Indicate empty string value using "".
elif v is None:
# Indicate no value using empty string.
elif escape_output:
# Otherwise, escape the value to be displayed.
else:
# Caller requested value not be escaped.
nval = None
if multi_value:
if out_format == "default":
# Create a formatting string for the default output
# format.
elif out_format == "tsv":
# Create a formatting string for the tsv output
# format.
num_fields = sum(
1 for k in field_data
if filter_tsv(field_data[k])
)
if out_format == "json-formatted":
# 'json' formats always include any extra fields returned;
# any explicitly named fields are only included if 'json'
# is explicitly listed.
def fmt_val(v):
return v
return [fmt_val(e) for e in v]
if isinstance(v, dict):
v[k] = fmt_val(e)
return v
return str(v)
dict(
for k in entry
)
for entry in field_values
], **args)
if out_format == "json-formatted":
# Include a trailing newline for readability.
return output + "\n"
return output
# Extract the list of headers from the field_data dictionary. Ensure
# they are extracted in the desired order by using the custom sort
# function.
# Output a header if desired.
output = ""
if not omit_headers:
output += "\n"
for entry in field_values:
# In Python 3, map() returns an iterator and will not process
# elements until it is consumed, so we turn it into a list to
# force it to process elements.
(field_data[f], v)
if f in field_data
)))
output += "\n"
return output
"""Truncate the specified file."""
try:
except IOError:
pass
except OSError as e:
# Access to protected member; pylint: disable=W0212
raise api_errors._convert_error(e)
def flush_output():
"""flush stdout and stderr"""
try:
except IOError:
pass
except OSError as e:
# Access to protected member; pylint: disable=W0212
raise api_errors._convert_error(e)
try:
except IOError:
pass
except OSError as e:
# Access to protected member; pylint: disable=W0212
raise api_errors._convert_error(e)
# valid json types
type(None))
"""A generic json encoder.
'name' a descriptive name of the data we're encoding. If encoding a
class, this would normally be the class name. 'name' is used when
displaying errors to identify the data that caused the errors.
'data' data to encode.
'desc' a description of the data to encode.
'commonize' a list of objects that should be cached by reference.
this is used when encoding objects which may contain multiple
references to a single object. In this case, each reference will be
replaced with a unique id, and the object that was pointed to will
only be encoded once. This ensures that upon decoding we can restore
the original object and all references to it."""
# debugging
if je_state is None and json_debug:
# we don't encode None
if data is None:
return None
# initialize parameters to default
if commonize is None:
if je_state is None:
# this is the first invocation of this function, so "data"
# points to the top-level object that we want to encode. this
# means that if we're commonizing any objects we should
# finalize the object cache when we're done encoding this
# object.
# initialize recursion state
obj_id = [0]
obj_cache = {}
else:
# we're being invoked recursively, do not finalize the object
# cache (since that will be done by a previous invocation of
# this function).
# get recursion state
# check if we're changing the set of objects to commonize
if not commonize:
else:
# update the set of objects to commonize
# make a copy so we don't update our callers state
# verify state
"""if necessary, finalize the object cache and merge it into
the state data.
while encoding, the object cache is a dictionary which
contains tuples consisting of an assigned unique object id
(obj_id) and an encoded object. these tuples are hashed by
the python object id of the original un-encoded python object.
so the hash contains:
{ id(<obj>): ( <obj_id>, <obj_state> ) }
when we finish the object cache we update it so that it
contains just encoded objects hashed by their assigned object
id (obj_id). so the hash contains:
{ str(<obj_id>): <obj_state> }
then we merge the state data and object cache into a single
dictionary and return that.
"""
# Unused argument; pylint: disable=W0613
if not finish:
return data
# json.dump converts integer dictionary keys into strings, so
# we'll convert the object id keys (which are integers) into strings.
obj_cache2 = {}
# Value 'DebugValues' is unsubscriptable;
# pylint: disable=E1136
if DebugValues["plandesc_validate"]:
# debugging
if json_debug:
print("json_encode finished name: ", name,
print("json_encode finished data: ", data,
return data
# check if the description is a type object
else:
# get the expected data type from the description
# get the data type
# sanity check that the data type matches the description
"unexpected {0} for {1}, expected: {2}, value: {3}".format(
# The following situation is only true for Python 2.
# We should not see unicode strings getting passed in. The assert is
# necessary since we use the PkgDecoder hook function during json_decode
# to convert unicode objects back into escaped str objects, which would
# otherwise do that conversion unintentionally.
# we don't need to do anything for basic types
for t in json_types_immediates:
if issubclass(desc_type, t):
# encode elements nested in a dictionary like object
# return elements in a dictionary
# we always return a new dictionary
rv = {}
# check if we're not encoding nested elements
# lookup the first descriptor to see if we have
# generic type description.
# if the key in the first type pair is a type then we
# have a generic type description that applies to all
# keys and values in the dictionary.
# check if the description is a type object
# there can only be one generic type desc
# encode all key / value pairs
# encode the key
# encode the value
# save the result
# we have element specific value type descriptions.
# encode the specific values.
# check for the specific key
continue
# encode the value
# encode elements nested in a list like object
# return elements in a list
# we always return a new list
rv = []
# check for an empty list since we use izip_longest (zip_longest
# in Python 3)
# check if we're not encoding nested elements
# don't accidentally generate data via izip_longest (zip_longest
# in Python 3)
i = 0
i += 1
# if we're commonizing this object and it's already been encoded then
# just return its encoded object id.
# find an encoder for this class, which should be:
# <class>.getstate(obj, je_state)
# encode the data
# if we're commonizing this object, then assign it an object id and
# save that object id and the encoded object into the object cache
# (which is indexed by the python id for the object).
# return the encoded element
"""A generic json decoder.
'name' a descriptive name of the data. (used to identify unexpected
data errors.)
'desc' a programmatic description of data types.
'data' data to decode."""
# debugging
if jd_state is None and json_debug:
# we don't decode None
if data is None:
return data
# initialize parameters to default
if commonize is None:
if jd_state is None:
# this is the first invocation of this function, so when we
# return we're done decoding data.
# first time here, initialize recursion state
if not commonize:
# no common state
obj_cache = {}
else:
# load commonized state
else:
# we're being invoked recursively.
# check if the first object using commonization
if not commonize_old and commonize:
# merge in any new commonize requests
# check if we're updating the set of objects to commonize
if not commonize:
else:
# update the set of objects to commonize
# make a copy so we don't update our callers state.
if je_state_changed:
# verify state
"""Check if we're done decoding data."""
# Unused argument; pylint: disable=W0613
# check if the description is a type object
else:
# get the expected data type from the description
# get the data type
# sanity check that the data type matches the description
"unexpected {0} for {1}, expected: {2}, value: {3}".format(
if not finish:
return data
# debugging
if json_debug:
print("json_decode finished name: ", name,
print("json_decode finished data: ", data,
return data
# check if the description is a type object
else:
# get the expected data type from the description
# we don't need to do anything for basic types
for t in json_types_immediates:
if issubclass(desc_type, t):
# decode elements nested in a dictionary
# return elements in the specified dictionary like object
# allocate the return object. we don't just use
# type(desc) because that won't work for things like
# collections.defaultdict types.
# check if we're not decoding nested elements
# lookup the first descriptor to see if we have
# generic type description.
# if the key in the descriptor is a type then we have
# a generic type description that applies to all keys
# and values in the dictionary.
# check if the description is a type object
# there can only be one generic type desc
# decode all key / value pairs
# decode the key
# decode the value
# save the result
# we have element specific value type descriptions.
# copy all data and then decode the specific values
# check for the specific key
continue
# decode the value
# decode elements nested in a list
# return elements in the specified list like object
# get the return type
# check for an empty list since we use izip_longest (zip_longest
# in Python 3)
# check if we're not decoding nested elements
# don't accidentally generate data via izip_longest (zip_longest
# in Python 3)
rv = []
i = 0
i += 1
# find a decoder for this data, which should be:
# <class>.fromstate(state, jd_state)
# if this object was commonized then get a reference to it from the
# object cache.
# json.dump converts integer dictionary keys into strings, so
# obj_cache was indexed by integer strings.
# get the data type
# this commonized object hasn't been decoded yet
# decode it and update the cache with the decoded obj
else:
# decode the data
"""Validate that a named piece of data can be represented in json and
that the data can be passed directly to json.dump(). If the data
can't be represented as json we'll trigger an assert.
'name' is the name of the data to validate
'data' is the data to validate
'recurse' is an optional integer that controls recursion. if it's a
negative number (the default) we recursively check any nested lists or
dictionaries. if it's a positive integer then we only recurse to
the specified depth."""
"invalid json type \"{0}\" for \"{1}\", value: {2}".format(
for k in data:
# json.dump converts integer dictionary keys into
# strings, which is a bit unexpected. so make sure we
# don't have any of those.
"integer dictionary keys detected for: {0}".format(
name)
# validate the key and the value
"""Compare two json encoded objects to make sure they are
identical, assert() if they are not."""
def dbg():
"""dump debug info for json_diff"""
def d(s):
"""dbg helper"""
"Json dictionary keys differ for \"{0}\":\n"
"dict 1 missing: {1}\n"
for k in d0:
"Json list lengths differ for \"{0}\":\n"
"list 1 length: {1}\n"
"""Hook routine used by the JSON module to ensure that unicode objects
are converted to bytes objects in Python 2, and that bytes
objects are converted to str objects in Python 3."""
rvdct = {}
k = force_str(k)
v = force_str(v)
rvdct[k] = v
return rvdct
"""A class which can be used for measuring process times (user,
system, and wait)."""
# we initialize our time values to account for all time used
# since the start of the process. (user and system time are
# obtained relative to process start time, but wall time is an
# absolute time value so here we initialize our initial wall
# time value to the time our process was started.)
"""Return True if a number is zero (up to a certain level of
precision)."""
"""Return True if all the passed in values are zero."""
continue
s += "]\n"
return s
"""Update saved times to current process values."""
def __get_time():
"""Get current user, system, and wait times for this
process."""
"""Record the difference between the previously saved process
time values and the current values. Then update the saved
values to match the current values"""
if logger:
return rv
"""Exception class for AsyncCall() errors.
Any exceptions caught by the async call thread get bundled into this
Exception because otherwise we'll lose the stack trace associated with
the original exception."""
self.e = e
"""Class which can be used to call a function asynchronously.
The call is performed via a dedicated thread."""
self.e = None
# keep track of what's been done
# internal state
# pre-allocate an exception that we'll used in case everything
# goes horribly wrong.
Exception("AsyncCall Internal Error"))
"""Dedicated call thread.
'dummy' is a dummy parameter that is not used. This is done
because the threading module (which invokes this function)
inspects the first argument of "args" to check if it's
iterable, and that may cause bizarre failures if cb is a
dynamically bound class (like xmlrpclib._Method).
We need to be careful here and catch all exceptions. Since
we're executing in our own thread, any exceptions we don't
catch get dumped to the console."""
# Catch "Exception"; pylint: disable=W0703
try:
# Value 'DebugValues' is unsubscriptable;
# pylint: disable=E1136
if DebugValues["async_thread_error"]:
raise Exception("async_thread_error")
rv = e = None
try:
except Exception as e:
self.e.e = e
return
except Exception as e:
# if we raise an exception here, we're hosed
self.e.e = e
try:
# Value 'DebugValues' is unsubscriptable;
# pylint: disable=E1136
if DebugValues["async_thread_error"]:
raise Exception("async_thread_error")
except Exception:
pass
"""Start a call to an rpc server."""
# prepare the arguments for the thread
if args:
else:
# initialize and return the thread
"""Wait for an rpc call to finish."""
"""Check if an rpc call is done."""
"""Finish a call to an rpc server."""
# wait for the async call thread to exit
if self.e:
# if the calling thread hit an exception, re-raise it
# Raising NoneType; pylint: disable=E0702
raise self.e
"""Given a proxy string and a URI we want to access using it, determine
whether any OS environment variables should override that value.
The special value "-" is returned when a no_proxy environment variable
was found which should apply to this URI, indicating that no proxy
should be used at runtime."""
# There is no upper case version of http_proxy, according to curl(1)
if no_proxy:
if no_proxy_upper:
# Give precedence to protocol-specific proxies, and lowercase versions.
elif environ_all_proxy:
elif environ_all_proxy_upper:
if no_proxy or no_proxy_upper:
# SplitResult has a netloc member; pylint: disable=E1103
return "-"
return "-"
if not runtime_proxy:
return
return runtime_proxy
def decode(s):
"""convert non-ascii strings to unicode;
replace non-convertible chars"""
return s
try:
# this will fail if any 8 bit chars in string
# this is a nop if string is ascii.
s = s.encode("ascii")
except ValueError:
# this will encode 8 bit strings into unicode
return s
"""Helper function for yielding items that match one of the provided
patterns."""
if patterns:
# Normalize patterns and determine whether to glob.
npatterns = []
for p in patterns:
if pat_prefix:
pat = pat_prefix + p
else:
pat = p
if "*" in p or "?" in p:
else:
npatterns = None
if glob_match:
break
break
else:
if patterns:
continue
# No patterns or matched at least one.
yield item
"""convert signal number to name(s)"""
if not sigdict:
"""Produces a list of n tuples (where n is the length of attrs)
containing the relevant information about the actions.
The "actionlist" parameter is a list of tuples which contain the fmri
of the package that's the source of the action, the action, and the
publisher the action's package came from. If the actionlist was
generated by searching, the last two pieces, "match" and "match_type"
contain information about why this action was selected.
The "attrs" parameter is a list of the attributes of the action that
should be displayed.
The "show_all" parameter determines whether an action that lacks one
or more of the desired attributes will be displayed or not.
The "remove_consec_dup_lines" parameter determines whether consecutive
duplicate lines should be removed from the results.
The "last_res" parameter is a seed to compare the first result against
for duplicate removal.
"""
# Assert that if last_res is set, we should be removing duplicate
# lines.
assert remove_consec_dup_lines or not last_res
line = []
elif attr == "action.name":
elif attr == "action.key":
elif attr == "action.raw":
a = action
elif attr == "pkg.name":
elif attr == "pkg.fmri":
elif attr == "pkg.shortfmri":
a = pfmri.get_short_fmri()
elif attr == "pkg.publisher":
a = pfmri.get_publisher()
if a is None:
a = pub
if a is None:
a = ""
elif attr == "search.match":
a = match
elif attr == "search.match_type":
a = match_type
else:
a = ""
# Too many boolean expressions in if statement;
# pylint: disable=R0916
and (not remove_consec_dup_lines or last_line is None or
yield line
"""Calculate the minimal edit distance for converting word1 to word2,
based on the Wagner-Fischer algorithm."""
# dp[i][j] stands for the edit distance between two strings with
# length i and j, i.e., word1[0,...,i-1] and word2[0,...,j-1]
ins_cost = 1.0
del_cost = 1.0
rep_cost = 1.0
for i in range(m+1):
for i in range(n+1):
else:
return dp[m][n]
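# A compact, self-contained sketch equivalent to the routine above (the
# helper name is illustrative, not part of the original module):
def _edit_distance_sketch(word1, word2, ins_cost=1.0, del_cost=1.0,
    rep_cost=1.0):
    m, n = len(word1), len(word2)
    dp = [[0.0] * (n + 1) for _ in range(m + 1)]
    for i in range(m + 1):
        dp[i][0] = i * del_cost
    for j in range(n + 1):
        dp[0][j] = j * ins_cost
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if word1[i - 1] == word2[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]
            else:
                dp[i][j] = min(dp[i - 1][j] + del_cost,
                    dp[i][j - 1] + ins_cost,
                    dp[i - 1][j - 1] + rep_cost)
    return dp[m][n]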
"""Given a text, a list of known_words, suggest some correct
candidates from known_words."""
candidates = []
if not text:
return candidates
# We are confident enough to suggest a candidate if the text is part
# of one of the known words.
for known in known_words:
# If the text's length is short, treat it as a prefix.
# Otherwise check if the text is part of the known
# words or vice versa.
if candidates:
return candidates
else:
# Give up suggestions if there are too many candidates.
return
# If there are no candidates from the "contains" check, use the edit
# distance algorithm to seek further.
for known in known_words:
# Sort the candidates by their distance, and return the words only.
def smallest_diff_key(a, b):
"""Return the smallest key 'k' in 'a' such that a[k] != b[k]."""
if not keys:
return None
def dict_cmp(a, b):
"""cmp method for dictionary, translated from the source code
adiff = smallest_diff_key(a, b)
bdiff = smallest_diff_key(b, a)
return 0
def cmp(a, b):
"""Implementaion for Python 2.7's built-in function cmp(), which is
removed in Python 3."""
return dict_cmp(a, b)
try:
if a == b:
return 0
elif a < b:
return -1
else:
return 1
except TypeError:
if a is None and b:
return -1
if a and b is None:
return 1
return NotImplemented
"""Limit memory consumption of current process to 'bytes'."""
if allow_override:
try:
except (KeyError, ValueError):
pass
try:
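        # Illustrative call (a sketch of the limit described in the
        # docstring; 'bytes' names the requested limit in the docstring):
        resource.setrlimit(resource.RLIMIT_DATA, (bytes, bytes))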
except AttributeError:
# If platform doesn't support RLIMIT_DATA, just ignore it.
pass
except ValueError:
# An unprivileged user cannot raise a previously set limit;
# if that ever happens, just ignore it.
pass
"""Force the string into bytes."""
if isinstance(s, bytes):
return s
try:
# this case is: unicode in Python 2 and str in Python 3
# type not a string and Python 3's bytes() requires
# a string argument
# type not a string
s = bytes(s)
except UnicodeEncodeError:
raise
return s
"""Force the string into text."""
return s
try:
# this case is: str(bytes) in Python 2 and bytes in
# Python 3
else:
# type not a string
except UnicodeDecodeError as e:
return s
# force_str minimizes the work for compatible string handling between Python
# 2 and 3 because we will have the native string type in its runtime, that is,
# bytes in Python 2 and unicode string in Python 3.
else:
"""Safely open files that ensures that the path we'are accessing resides
within a specified image root.
'root' is a directory that the path must reside in.
"""
try:
except EnvironmentError as e:
# Access to protected member; pylint: disable=W0212
raise api_errors._convert_error(e)
# If it is a symbolic link, fall back to ar_open. ar_open interprets
# 'path' as relative to 'root', that is, 'root' will be prepended to
# 'path', so we need to call os.path.relpath here.
"""Check if 'cert' is a proper CA. For this the BasicConstraints need to
identify it as a CA cert and it needs to have the CertSign
(key_cert_sign in Cryptography) KeyUsage flag. Based loosely on
OpenSSL's check_ca()"""
bconst_ca = None
kuse_sign = None
for e in cert.extensions:
"""Set the open file descriptor soft limit."""
if printer is None:
try:
except (OSError, ValueError) as e:
printer(_("unable to set open file limit to {0}; please "
"increase the open file limit using 'ulimit -n'"
" and try the requested operation again: {1}")\
name cannot contain whitespace"""