utils.py revision f5f22d30238f9adf5a42a510ff1683bca461600c
# -*- coding: utf-8 -*-
# $Id$
# pylint: disable=C0302
"""
Common Utility Functions.
"""
__copyright__ = \
"""
Copyright (C) 2012-2014 Oracle Corporation
This file is part of VirtualBox Open Source Edition (OSE), as
available from http://www.virtualbox.org. This file is free software;
General Public License (GPL) as published by the Free Software
Foundation, in version 2 as it comes in the "COPYING" file of the
VirtualBox OSE distribution. VirtualBox OSE is distributed in the
hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
The contents of this file may alternatively be used under the terms
of the Common Development and Distribution License Version 1.0
(CDDL) only, as it comes in the "COPYING.CDDL" file of the
VirtualBox OSE distribution, in which case the provisions of the
CDDL are applicable instead of those of the GPL.
You may elect to license modified versions of this file under the
terms and conditions of either the GPL or the CDDL or both.
"""
__version__ = "$Revision$"
# Standard Python imports.
import datetime;
import os;
import platform;
import re;
import stat;
import subprocess;
import sys;
import tarfile;
import time;
import traceback;
import unittest;
import zipfile
import win32api; # pylint: disable=F0401
import win32con; # pylint: disable=F0401
import win32console; # pylint: disable=F0401
import win32process; # pylint: disable=F0401
else:
import signal;
# Python 3 hacks:
#
# Host OS and CPU.
#
def getHostOs():
"""
Gets the host OS name (short).
See the KBUILD_OSES variable in kBuild/header.kmk for possible return values.

Returns the short name as a string. Of the mappings visible here, a
platform name of 'Windows' is reported as 'win' and 'SunOS' as 'solaris'.
"""
# NOTE(review): the initial assignment of sPlatform and the leading 'if'
# branch of this chain are not visible here - confirm against the full file.
elif sPlatform == 'Windows':
sPlatform = 'win';
elif sPlatform == 'SunOS':
sPlatform = 'solaris';
else:
return sPlatform;
g_sHostArch = None;
def getHostArch():
"""
Gets the host CPU architecture.
See the KBUILD_ARCHES variable in kBuild/header.kmk for possible return values.

The result is cached in the module-level g_sHostArch; detection is only
entered while that cache is still None.
"""
global g_sHostArch;
# Only detect when nothing has been cached yet.
if g_sHostArch is None:
sArch = 'x86';
sArch = 'amd64';
sArch = 'amd64';
else:
try:
except:
pass;
if sArch != 'amd64':
sArch = 'x86';
else:
# Remember the result for subsequent calls.
g_sHostArch = sArch;
return g_sHostArch;
def getHostOsDotArch():
"""
Gets the 'os.arch' for the host.
"""
"""
Validates the OS name.
"""
if sOs in ('darwin', 'dos', 'dragonfly', 'freebsd', 'haiku', 'l4', 'linux', 'netbsd', 'nt', 'openbsd', \
'os2', 'solaris', 'win', 'os-agnostic'):
return True;
return False;
def isValidArch(sArch):
"""
Validates the CPU architecture name.
"""
'mips32', 'mips64', 'ia64', 'hppa32', 'hppa64', 'arm', 'alpha'):
return True;
return False;
def isValidOsDotArch(sOsDotArch):
"""
Validates the 'os.arch' string.
"""
return False;
def getHostOsVersion():
"""
Returns the host OS version. This is platform.release with additional
distro indicator on Linux.

Returns the version as a string (sVersion).
"""
if sOs == 'linux':
sDist = '';
try:
# try /etc/lsb-release first to distinguish between Debian and Ubuntu
if oMatch is not None:
except:
pass;
if sDist:
else:
# Fallback table of distro release files to probe. The second element of
# each pair is presumably a prefix for the file's content - TODO confirm.
asFiles = \
[
[ '/etc/debian_version', 'Debian v'],
[ '/etc/gentoo-release', '' ],
[ '/etc/redhat-release', '' ],
[ '/etc/SuSE-release', '' ],
];
try:
except:
continue;
break;
elif sOs == 'solaris':
try:
except:
pass;
elif sOs == 'darwin':
# Marketing names keyed by a number string; presumably the OS X minor
# version - TODO confirm against the code building the lookup key.
"5": "Leopard",
"6": "Snow Leopard",
"7": "Lion",
"8": "Mountain Lion",
"9": "Mavericks",
"10": "Yosemite"}
return sVersion;
#
# File system.
#
"""
Wrapper around open() that tries its best to make sure the file isn't
inherited by child processes.
This is a best effort thing at the moment as it doesn't synchronize with
child process spawning in any way. Thus it can be subject to races in
multithreaded programs.
"""
try:
except:
#try:
#except:
# pass;
return oFile;
"""
No exceptions os.readlink wrapper.
"""
try:
except:
return sRet;
"""
Reads the entire file.
"""
return sRet;
"""
No exceptions common.readFile wrapper.
"""
try:
except:
return sRet;
"""
No exceptions os.rmdir wrapper.
"""
try:
except:
return oRet;
"""
No exceptions os.remove wrapper.
"""
try:
except:
return oRet;
#
# SubProcess.
#
"""
If the "executable" is a python script, insert the python interpreter at
the head of the argument list so that it will work on systems which don't
support hash-bang scripts.
"""
if asArgs is None:
else:
# paranoia...
else:
return None;
"""
Wrapper around subprocess.call to deal with its absence in older
python versions.
Returns process exit code (see subprocess.poll).
"""
"""
Wrapper around subprocess.check_output to deal with its absence in older
python versions.
"""
if iExitCode is not 0:
if asArgs is None:
print(sOutput);
g_fOldSudo = None;
"""
Adds 'sudo' (or similar) to the args parameter, wherever it is.
"""
# Are we root?
try:
except:
pass;
# If not, prepend sudo (non-interactive, simulate initial login).
if asArgs is None:
# Detect old sudo.
global g_fOldSudo;
if g_fOldSudo is None:
try:
except:
sVersion = '1.7.0';
if not g_fOldSudo:
if fInitialEnv and not g_fOldSudo:
# paranoia...
else:
return None;
"""
sudo (or similar) + subprocess.call
"""
"""
sudo (or similar) + subprocess.check_output.
"""
"""
sudo (or similar) + subprocess.check_output, except '-i' isn't used.
"""
"""
sudo (or similar) + subprocess.Popen.
"""
#
# Generic process stuff.
#
def processInterrupt(uPid):
"""
Sends a SIGINT or equivalent to interrupt the specified process.
uPid identifies the process to interrupt.
Returns True on success, False on failure.
On Windows hosts this may not work unless the process happens to be a
process group leader.
"""
# NOTE(review): the bare 'except:' clauses below also catch
# KeyboardInterrupt and SystemExit; the guarded calls are not visible here.
try:
except:
else:
try:
except:
return fRc;
def sendUserSignal1(uPid):
"""
Sends a SIGUSR1 or equivalent to nudge the process into shutting down
(VBoxSVC) or something.
uPid identifies the process to signal.
Returns True on success, False on failure or if not supported (win).
On Windows hosts this may not work unless the process happens to be a
process group leader.
"""
# NOTE(review): the leading 'if' of this branch is not visible here;
# presumably it is the Windows (unsupported) case - confirm.
else:
try:
except:
return fRc;
def processTerminate(uPid):
"""
Terminates the process in a nice manner (SIGTERM or equivalent).
uPid identifies the process to terminate.
Returns True on success, False on failure.
"""
# Each attempt below is best effort: exceptions are swallowed and the
# outcome is reported solely through fRc.
try:
except:
pass;
else:
try:
except:
pass;
else:
try:
except:
pass;
return fRc;
def processKill(uPid):
"""
Terminates the process with extreme prejudice (SIGKILL).
uPid identifies the process to kill.
Returns True on success, False on failure.
"""
# NOTE(review): the leading 'if' of this branch is not visible here -
# confirm which platform it covers.
else:
try:
except:
return fRc;
"""
Like processKill(), but checks if the process name matches before killing
it. This is intended for killing using potentially stale pid values.
Returns True on success, False on failure.
"""
return False;
return processKill(uPid);
def processExists(uPid):
"""
Checks if the specified process exists.
uPid identifies the process to look for.
Returns True if it positively exists, False otherwise.
"""
try:
except:
pass;
else:
else:
try:
except:
return fRc;
"""
Checks if a process PID and NAME matches.
"""
return False;
try:
for oProcess in aoProcesses:
#reporter.log2('uPid=%s sName=%s sCurName=%s' % (uPid, sName, sCurName));
break;
except:
#reporter.logXcpt('uPid=%s sName=%s' % (uPid, sName));
pass;
else:
else:
asPsCmd = None;
if asPsCmd is not None:
try:
except:
#reporter.logXcpt();
return False;
# ps fails with non-zero exit code if the pid wasn't found.
if iExitCode is not 0:
return False;
if sCurName is None:
return False;
if sCurName is '':
return False;
return False;
return fRc;
class ProcessInfo(object):
"""
Process info.

Attributes visible here, all initialised to None: iParentPid, iProcGroup
and iSessionId.
"""
self.iParentPid = None;
self.iProcGroup = None;
self.iSessionId = None;
"""Load all the info."""
# Dispatches on the host OS name held in sOs; OSes without a dedicated
# branch fall through to the 'pass' below.
if sOs == 'linux':
elif sOs == 'solaris':
else:
pass;
"""Windows specific loadAll."""
# Each lookup is individually guarded so that one failure does not stop
# the remaining ones.
except: pass;
except: pass;
except: pass;
except: pass;
except: pass;
def getBaseImageName(self):
"""
Gets the base image name if available, using the process name if it is not.
"""
if sRet is None:
if sRet is None:
return None;
return None;
"""
Same as getBaseImageName, except any '.exe' or similar suffix is stripped.
"""
return sRet;
def processListAll(): # pylint: disable=R0914
"""
Return a list of ProcessInfo objects for all the processes in the system
that the current user can see.

The list is built up in asProcesses, which is what gets returned.
"""
asProcesses = [];
if sOs == 'win':
for oProcess in aoProcesses:
try:
except:
continue;
try:
except:
asDirs = [];
elif sOs == 'darwin':
# Try our best to parse ps output. (Not perfect but does the job most of the time.)
try:
'-o', 'pid=',
'-o', 'ppid=',
'-o', 'pgid=',
'-o', 'sess=',
'-o', 'uid=',
'-o', 'gid=',
'-o', 'comm=' ]);
except:
# Presumably the ps invocation failed; hand back what we have so far.
return asProcesses;
continue;
iField = 0;
off = 0;
# One slot per requested ps column: pid, ppid, pgid, sess, uid, gid, comm.
aoFields = [None, None, None, None, None, None, None];
while iField < 7:
# Eat whitespace.
off += 1;
# Final field / EOL.
if iField == 6:
break;
break;
# Generic field parsing.
off += 1;
off += 1;
try:
# Field 3 is the 'sess=' column and is handled separately from the rest.
if iField != 3:
else:
except:
pass;
iField += 1;
# Slot 0 is the pid column; a line without one is not usable.
if aoFields[0] is not None:
return asProcesses;
"""
Looks for information regarding the demise of the given process.
"""
if sOs == 'darwin':
#
# On darwin we look for crash and diagnostic reports.
#
asLogDirs = [
u'/Library/Logs/DiagnosticReports/',
u'/Library/Logs/CrashReporter/',
u'~/Library/Logs/DiagnosticReports/',
u'~/Library/Logs/CrashReporter/',
];
continue;
try:
except:
continue;
for sEntry in asDirEntries:
# Only interested in .crash files.
if sSuff != '.crash':
continue;
# The pid can be found at the end of the first line.
try:
except:
continue;
continue;
offPid -= 1;
except: continue;
# Does the pid we found match?
if uReportPid == uPid:
elif sOs == 'win':
#
# Getting WER reports would be great, however we have trouble matching the
# PID to those as they seem not to mention it in the brief reports.
# Instead we'll just look for crash dumps in C:\CrashDumps (our custom
# location - see the windows readme for the testbox script) and what
# the MSDN article lists for now.
#
# It's been observed on Windows server 2012 that the dump files take
# the form: <processimage>.<decimal-pid>.dmp
#
asDmpDirs = [
u'%SystemDrive%/CrashDumps/', # Testboxes.
u'%LOCALAPPDATA%/CrashDumps/', # MSDN example.
u'%WINDIR%/ServiceProfiles/LocalServices/', # Local and network service.
u'%WINDIR%/ServiceProfiles/NetworkSerices/',
u'%WINDIR%/ServiceProfiles/',
u'%WINDIR%/System32/Config/SystemProfile/', # System services.
];
continue;
try:
except:
continue;
for sEntry in asDirEntries:
else:
pass; ## TODO
return None;
#
# Time.
#
def timestampNano():
"""
Gets a nanosecond timestamp.
"""
def timestampMilli():
"""
Gets a millisecond timestamp.
"""
def timestampSecond():
"""
Gets a second timestamp.
"""
def getTimePrefix():
"""
Returns a timestamp prefix, typically used for logging. UTC.

Never raises: if producing the timestamp fails, the literal string
'getTimePrefix-exception' is returned instead.
"""
try:
except:
# Fall back to a recognisable marker rather than failing the caller.
sTs = 'getTimePrefix-exception';
return sTs;
def getTimePrefixAndIsoTimestamp():
"""
Returns current UTC as log prefix and iso timestamp.
"""
try:
except:
def formatIsoTimestamp(oNow):
"""Formats the datetime object as an ISO timestamp."""
return sTs;
def getIsoTimestamp():
"""Returns the current UTC timestamp as a string."""
def getLocalHourOfWeek():
""" Local hour of week (0 based). """
""" Format a seconds interval into a nice 01h 00m 22s string """
# Two simple special cases.
if cSeconds < 60:
return '%ss' % (cSeconds,);
if cSeconds < 3600:
if cSecs == 0:
return '%sm' % (cMins,);
# Generic and a bit slower.
cSeconds %= 86400;
cSeconds %= 3600;
sRet = '';
if cDays > 0:
if cHours > 0:
if cMins > 0:
if cSecs > 0:
return sRet[:-1];
"""
Flexible input version of formatIntervalSeconds for use in WUI forms where
data is usually already string form.
"""
return formatIntervalSeconds(oSeconds);
try:
except:
pass;
else:
if lSeconds >= 0:
return formatIntervalSeconds2(lSeconds);
return oSeconds;
def parseIntervalSeconds(sString):
"""
Reverse of formatIntervalSeconds.
Returns (cSeconds, sError), where sError is None on success.
"""
# We might be given non-strings, just return them without any fuss.
return (sString, None);
# Strip it and make sure it's not empty.
return (0, 'Empty interval string.');
#
# Split up the input into a list of 'valueN, unitN, ...'.
#
# Don't want to spend too much time trying to make re.split do exactly what
# I need here, so please forgive the extra pass I'm making here.
#
asParts = [];
for sPart in asRawParts:
return (0, 'Empty interval string or something?');
#
# Process them one or two at a time.
#
cSeconds = 0;
asErrors = [];
i = 0;
i += 1;
# Presumably the default unit when a number is not followed by one - confirm.
sUnit = 's';
i += 1;
pass;
# Scale to seconds: 60 per minute, 3600 per hour, 86400 per day. The unit
# checks selecting between these lines are not visible here.
iNumber *= 60;
iNumber *= 3600;
iNumber *= 86400;
else:
else:
def formatIntervalHours(cHours):
""" Formats an interval given in hours into a nice 1w 2d 1h string. """
# Simple special cases.
if cHours < 24:
return '%sh' % (cHours,);
# Generic and a bit slower.
cHours %= 24;
sRet = '';
if cWeeks > 0:
if cDays > 0:
if cHours > 0:
# Drop the last character of the assembled string (presumably a trailing
# separator - confirm against the append code).
return sRet[:-1];
def parseIntervalHours(sString):
"""
Reverse of formatIntervalHours.
Returns (cHours, sError), where sError is None on success.
"""
# We might be given non-strings, just return them without any fuss.
return (sString, None);
# Strip it and make sure it's not empty.
return (0, 'Empty interval string.');
#
# Split up the input into a list of 'valueN, unitN, ...'.
#
# Don't want to spend too much time trying to make re.split do exactly what
# I need here, so please forgive the extra pass I'm making here.
#
asParts = [];
for sPart in asRawParts:
return (0, 'Empty interval string or something?');
#
# Process them one or two at a time.
#
cHours = 0;
asErrors = [];
i = 0;
i += 1;
# Presumably the default unit when a number is not followed by one - confirm.
sUnit = 'h';
i += 1;
pass;
# Scale to hours: 24 per day. The unit check guarding this line is not
# visible here.
iNumber *= 24;
else:
else:
#
# Introspection.
#
"""
Returns the name of the caller's caller.
"""
if oFrame is None:
try:
raise Exception();
except:
while iFrame > 1:
if oFrame is not None:
if oFrame is not None:
return sName;
return "unknown";
"""
Gets text detailing the exception. (Good for logging.)
Returns list of info strings.
"""
#
# Try get exception info.
#
try:
except:
if oType is not None:
#
# Try format the info
#
asRet = [];
try:
try:
else:
except:
except:
else:
asRet = ['Couldn\'t find exception traceback.'];
return asRet;
#
# TestSuite stuff.
#
"""
Checks if we're running from the SVN checkout or not.
"""
try:
cScriptDepth = 1;
except:
while cScriptDepth >= 0:
return True;
cScriptDepth -= 1;
return False;
#
# Bourne shell argument fun.
#
"""
Given a bourne shell command line invocation, split it up into arguments
assuming IFS is space.
Returns None on syntax error.
"""
## @todo bourne shell argument parsing!
def argsGetFirst(sCmdLine):
"""
Given a bourne shell command line invocation, return the first argument
assuming IFS is space.
Returns None on invalid syntax, otherwise the parsed and unescaped argv[0] string.
"""
return None;
# The first element of the split argument list is argv[0].
return asArgs[0];
#
# String helpers.
#
"""
Compares two strings in a case insensitive fashion.
Python doesn't seem to have any way of doing this correctly, so this is just
an approximation using lower.
"""
return 0;
return 0;
return -1;
return 1;
#
# Misc.
#
"""
Compares two version strings in a fashion similar to RTStrVersionCompare.
"""
## @todo implement me!!
return 0;
return -1;
return 1;
"""
Formats a decimal number with pretty separators.
"""
while off > 0:
off -= 3;
return sRet;
def formatNumberNbsp(lNum):
"""
Formats a decimal number with pretty separators.
"""
"""
Checks if the object is a string object, hiding difference between python 2 and 3.
Returns True if it's a string of some kind.
Returns False if not.
"""
def hasNonAsciiCharacters(sText):
"""
Returns True if the specified string has non-ASCII characters.
"""
def chmodPlusX(sFile):
"""
Makes the specified file or directory executable.
Returns success indicator, no exceptions: True when both steps below
succeed, False as soon as either of them raises.
Note! Symbolic links are followed and the target will be changed.
"""
# First guarded step; any exception is reported as failure.
try:
except:
return False;
# Second guarded step; any exception is reported as failure.
try:
except:
return False;
return True;
"""
Unpacks the given file if it has a known archive extension, otherwise does
nothing.
Returns list of the extracted files (full path) on success.
Returns empty list if not a supported archive format.
Returns None on failure. Raises no exceptions.
"""
if fnError is None:
asMembers = [];
try:
else:
return None;
try:
return None;
else:
return [];
#
# Change asMembers to local slashes and prefix with path.
#
asMembersRet = [];
return asMembersRet;
def getDiskUsage(sPath):
"""
Get free space of a partition that corresponds to specified sPath in MB.
Returns partition free space value in MB.

Note! Despite the function name, the value reported is the free space
(cMbFreeSpace), not the amount in use.
"""
# NOTE(review): ctypes is imported locally; presumably it is only needed on
# one platform branch (the guarding 'if' is not visible here) - confirm.
import ctypes
else:
# Convert to MB
return cMbFreeSpace;
#
# Unit testing.
#
# pylint: disable=C0111
def testIntervalSeconds(self):
if __name__ == '__main__':
# not reached.