a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync#!/usr/bin/env python
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync# -*- coding: utf-8 -*-
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync# pylint: disable=C0302
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync"""
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncVirtualBox Validation Kit - Guest Control Tests.
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync"""
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync__copyright__ = \
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync"""
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncCopyright (C) 2010-2014 Oracle Corporation
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncThis file is part of VirtualBox Open Source Edition (OSE), as
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncavailable from http://www.virtualbox.org. This file is free software;
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncyou can redistribute it and/or modify it under the terms of the GNU
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncGeneral Public License (GPL) as published by the Free Software
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncFoundation, in version 2 as it comes in the "COPYING" file of the
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncVirtualBox OSE distribution. VirtualBox OSE is distributed in the
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsynchope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncThe contents of this file may alternatively be used under the terms
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncof the Common Development and Distribution License Version 1.0
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync(CDDL) only, as it comes in the "COPYING.CDDL" file of the
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncVirtualBox OSE distribution, in which case the provisions of the
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncCDDL are applicable instead of those of the GPL.
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncYou may elect to license modified versions of this file under the
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncterms and conditions of either the GPL or the CDDL or both.
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync"""
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync__version__ = "$Revision$"
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync# Disable bitching about too many arguments per function.
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync# pylint: disable=R0913
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync## @todo Convert map() usage to a cleaner alternative Python now offers.
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync# pylint: disable=W0141
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync## @todo Convert the context/test classes into named tuples. Not in the mood right now, so
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync# disabling it.
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync# pylint: disable=R0903
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsync# Standard Python imports.
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncfrom array import array
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncimport errno
a5e7ae69e440f6816420fc99599f044e79e716b6vboxsyncimport os
import random
import string # pylint: disable=W0402
import struct
import sys
import time
# Only the main script needs to modify the path.
try: __file__
except: __file__ = sys.argv[0];
g_ksValidationKitDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))));
sys.path.append(g_ksValidationKitDir);
# Validation Kit imports.
from testdriver import reporter;
from testdriver import base;
from testdriver import vbox;
from testdriver import vboxcon;
from testdriver import vboxwrappers;
class GuestStream(bytearray):
"""
Class for handling a guest process input/output stream.
"""
def appendStream(self, stream, convertTo='<b'):
"""
Appends and converts a byte sequence to this object;
handy for displaying a guest stream.
"""
self.extend(struct.pack(convertTo, stream));
class tdCtxTest(object):
"""
Provides the actual test environment. Should be kept
as generic as possible.
"""
def __init__(self, oSession, oTxsSession, oTestVm): # pylint: disable=W0613
## The desired Main API result.
self.fRc = False;
## IGuest reference.
self.oGuest = oSession.o.console.guest;
# Rest not used (yet).
class tdCtxCreds(object):
"""
Provides credentials to pass to the guest.
"""
def __init__(self, sUser, sPassword, sDomain):
self.sUser = sUser;
self.sPassword = sPassword;
self.sDomain = sDomain;
class tdTestGuestCtrlBase(object):
"""
Base class for all guest control tests.
Note: This test ASSUMES that working Guest Additions
were installed and running on the guest to be tested.
"""
def __init__(self):
self.oTest = None;
self.oCreds = None;
self.timeoutMS = 30 * 1000; # 30s timeout
## IGuestSession reference or None.
self.oGuestSession = None;
def setEnvironment(self, oSession, oTxsSession, oTestVm):
"""
Sets the test environment required for this test.
"""
self.oTest = tdCtxTest(oSession, oTxsSession, oTestVm);
return self.oTest;
def createSession(self, sName):
"""
Creates (opens) a guest session.
Returns (True, IGuestSession) on success or (False, None) on failure.
"""
if self.oGuestSession is None:
if sName is None:
sName = "<untitled>";
try:
reporter.log('Creating session "%s" ...' % (sName,));
self.oGuestSession = self.oTest.oGuest.createSession(self.oCreds.sUser,
self.oCreds.sPassword,
self.oCreds.sDomain,
sName);
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('Creating a guest session "%s" failed; sUser="%s", pw="%s", sDomain="%s":'
% (sName, self.oCreds.sUser, self.oCreds.sPassword, self.oCreds.sDomain));
return (False, None);
try:
reporter.log('Waiting for session "%s" to start within %ldms...' % (sName, self.timeoutMS));
fWaitFor = [ vboxcon.GuestSessionWaitForFlag_Start ];
waitResult = self.oGuestSession.waitForArray(fWaitFor, self.timeoutMS);
#
# Be nice to Guest Additions < 4.3: They don't support session handling and
# therefore return WaitFlagNotSupported.
#
if waitResult != vboxcon.GuestSessionWaitResult_Start \
and waitResult != vboxcon.GuestSessionWaitResult_WaitFlagNotSupported:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.log('Session did not start successfully, returned wait result: %ld' \
% (waitResult));
return (False, None);
reporter.log('Session "%s" successfully started' % (sName,));
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('Waiting for guest session "%s" to start failed:' % (sName));
return (False, None);
else:
reporter.log('Warning: Session already set; this is probably not what you want');
return (True, self.oGuestSession);
def setSession(self, oGuestSession):
"""
Sets the current guest session and closes
an old one if necessary.
"""
if self.oGuestSession is not None:
self.closeSession();
self.oGuestSession = oGuestSession;
return self.oGuestSession;
def closeSession(self):
"""
Closes the guest session.
"""
if self.oGuestSession is not None:
sName = self.oGuestSession.name;
try:
reporter.log('Closing session "%s" ...' % (sName,));
self.oGuestSession.close();
self.oGuestSession = None;
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('Closing guest session "%s" failed:' % (sName,));
return False;
return True;
class tdTestCopyFrom(tdTestGuestCtrlBase):
"""
Test for copying files from the guest to the host.
"""
def __init__(self, sSrc = "", sDst = "", sUser = "", sPassword = "", aFlags = None):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.sSrc = sSrc;
self.sDst = sDst;
self.aFlags = aFlags;
class tdTestCopyTo(tdTestGuestCtrlBase):
"""
Test for copying files from the host to the guest.
"""
def __init__(self, sSrc = "", sDst = "", sUser = "", sPassword = "", aFlags = None):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.sSrc = sSrc;
self.sDst = sDst;
self.aFlags = aFlags;
class tdTestDirCreate(tdTestGuestCtrlBase):
"""
Test for directoryCreate call.
"""
def __init__(self, sDirectory = "", sUser = "", sPassword = "", fMode = 0, aFlags = None):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.sDirectory = sDirectory;
self.fMode = fMode;
self.aFlags = aFlags;
class tdTestDirCreateTemp(tdTestGuestCtrlBase):
"""
Test for the directoryCreateTemp call.
"""
def __init__(self, sDirectory = "", sTemplate = "", sUser = "", sPassword = "", fMode = 0, fSecure = False):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.sDirectory = sDirectory;
self.sTemplate = sTemplate;
self.fMode = fMode;
self.fSecure = fSecure;
class tdTestDirOpen(tdTestGuestCtrlBase):
"""
Test for the directoryOpen call.
"""
def __init__(self, sDirectory = "", sUser = "", sPassword = "",
sFilter = "", aFlags = None):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.sDirectory = sDirectory;
self.sFilter = sFilter;
self.aFlags = aFlags or [];
class tdTestDirRead(tdTestDirOpen):
"""
Test for the opening, reading and closing a certain directory.
"""
def __init__(self, sDirectory = "", sUser = "", sPassword = "",
sFilter = "", aFlags = None):
tdTestDirOpen.__init__(self, sDirectory, sUser, sPassword, sFilter, aFlags);
class tdTestExec(tdTestGuestCtrlBase):
"""
Specifies exactly one guest control execution test.
Has a default timeout of 5 minutes (for safety).
"""
def __init__(self, sCmd = "", aArgs = None, aEnv = None, \
aFlags = None, timeoutMS = 5 * 60 * 1000, \
sUser = "", sPassword = "", sDomain = "", \
fWaitForExit = True):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain);
self.sCmd = sCmd;
self.aArgs = aArgs;
self.aEnv = aEnv;
self.aFlags = aFlags or [];
self.timeoutMS = timeoutMS;
self.fWaitForExit = fWaitForExit;
self.uExitStatus = 0;
self.iExitCode = 0;
self.cbStdOut = 0;
self.cbStdErr = 0;
self.sBuf = '';
class tdTestFileExists(tdTestGuestCtrlBase):
"""
Test for the file exists API call (fileExists).
"""
def __init__(self, sFile = "", sUser = "", sPassword = ""):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.sFile = sFile;
class tdTestFileRemove(tdTestGuestCtrlBase):
"""
Test querying guest file information.
"""
def __init__(self, sFile = "", sUser = "", sPassword = ""):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.sFile = sFile;
class tdTestFileStat(tdTestGuestCtrlBase):
"""
Test querying guest file information.
"""
def __init__(self, sFile = "", sUser = "", sPassword = "", cbSize = 0, eFileType = 0):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.sFile = sFile;
self.cbSize = cbSize;
self.eFileType = eFileType;
class tdTestFileIO(tdTestGuestCtrlBase):
"""
Test for the IGuestFile object.
"""
def __init__(self, sFile = "", sUser = "", sPassword = ""):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.sFile = sFile;
class tdTestFileQuerySize(tdTestGuestCtrlBase):
"""
Test for the file size query API call (fileQuerySize).
"""
def __init__(self, sFile = "", sUser = "", sPassword = ""):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.sFile = sFile;
class tdTestFileReadWrite(tdTestGuestCtrlBase):
"""
Tests reading from guest files.
"""
def __init__(self, sFile = "", sUser = "", sPassword = "",
sOpenMode = "r", sDisposition = "",
sSharingMode = "",
lCreationMode = 0, cbOffset = 0, cbToReadWrite = 0,
aBuf = None):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.sFile = sFile;
self.sOpenMode = sOpenMode;
self.sDisposition = sDisposition;
self.sSharingMode = sSharingMode;
self.lCreationMode = lCreationMode;
self.cbOffset = cbOffset;
self.cbToReadWrite = cbToReadWrite;
self.aBuf = aBuf;
class tdTestSession(tdTestGuestCtrlBase):
"""
Test the guest session handling.
"""
def __init__(self, sUser = "", sPassword = "", sDomain = "", \
sSessionName = ""):
tdTestGuestCtrlBase.__init__(self);
self.sSessionName = sSessionName;
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain);
def getSessionCount(self, oVBoxMgr):
"""
Helper for returning the number of currently
opened guest sessions of a VM.
"""
if self.oTest.oGuest is None:
return 0;
aoSession = oVBoxMgr.getArray(self.oTest.oGuest, 'sessions')
return len(aoSession);
class tdTestSessionEnv(tdTestGuestCtrlBase):
"""
Test the guest session environment.
"""
def __init__(self, sUser = "", sPassword = "", aEnv = None):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain = "");
self.aEnv = aEnv or [];
class tdTestSessionFileRefs(tdTestGuestCtrlBase):
"""
Tests session file (IGuestFile) reference counting.
"""
def __init__(self, cRefs = 0):
tdTestGuestCtrlBase.__init__(self);
self.cRefs = cRefs;
class tdTestSessionDirRefs(tdTestGuestCtrlBase):
"""
Tests session directory (IGuestDirectory) reference counting.
"""
def __init__(self, cRefs = 0):
tdTestGuestCtrlBase.__init__(self);
self.cRefs = cRefs;
class tdTestSessionProcRefs(tdTestGuestCtrlBase):
"""
Tests session process (IGuestProcess) reference counting.
"""
def __init__(self, cRefs = 0):
tdTestGuestCtrlBase.__init__(self);
self.cRefs = cRefs;
class tdTestUpdateAdditions(tdTestGuestCtrlBase):
"""
Test updating the Guest Additions inside the guest.
"""
def __init__(self, sSrc = "", aArgs = None, aFlags = None,
sUser = "", sPassword = "", sDomain = ""):
tdTestGuestCtrlBase.__init__(self);
self.oCreds = tdCtxCreds(sUser, sPassword, sDomain);
self.sSrc = sSrc;
self.aArgs = aArgs;
self.aFlags = aFlags;
class tdTestResult(object):
"""
Base class for test results.
"""
def __init__(self, fRc = False):
## The overall test result.
self.fRc = fRc;
class tdTestResultDirRead(tdTestResult):
"""
Test result for reading guest directories.
"""
def __init__(self, fRc = False,
numFiles = 0, numDirs = 0):
tdTestResult.__init__(self, fRc = fRc);
self.numFiles = numFiles;
self.numDirs = numDirs;
class tdTestResultExec(tdTestResult):
"""
Holds a guest process execution test result,
including the exit code, status + aFlags.
"""
def __init__(self, fRc = False, \
uExitStatus = 500, iExitCode = 0, \
sBuf = None, cbBuf = 0, \
cbStdOut = 0, cbStdErr = 0):
tdTestResult.__init__(self);
## The overall test result.
self.fRc = fRc;
## Process exit stuff.
self.uExitStatus = uExitStatus;
self.iExitCode = iExitCode;
## Desired buffer length returned back from stdout/stderr.
self.cbBuf = cbBuf;
## Desired buffer result from stdout/stderr. Use with caution!
self.sBuf = sBuf;
self.cbStdOut = cbStdOut;
self.cbStdErr = cbStdErr;
class tdTestResultFileStat(tdTestResult):
"""
Test result for stat'ing guest files.
"""
def __init__(self, fRc = False,
cbSize = 0, eFileType = 0):
tdTestResult.__init__(self, fRc = fRc);
self.cbSize = cbSize;
self.eFileType = eFileType;
## @todo Add more information.
class tdTestResultFileReadWrite(tdTestResult):
"""
Test result for reading + writing guest directories.
"""
def __init__(self, fRc = False,
cbProcessed = 0, cbOffset = 0, aBuf = None):
tdTestResult.__init__(self, fRc = fRc);
self.cbProcessed = cbProcessed;
self.cbOffset = cbOffset;
self.aBuf = aBuf;
class tdTestResultSession(tdTestResult):
"""
Test result for guest session counts.
"""
def __init__(self, fRc = False, cNumSessions = 0):
tdTestResult.__init__(self, fRc = fRc);
self.cNumSessions = cNumSessions;
class tdTestResultSessionEnv(tdTestResult):
"""
Test result for guest session environment tests.
"""
def __init__(self, fRc = False, cNumVars = 0):
tdTestResult.__init__(self, fRc = fRc);
self.cNumVars = cNumVars;
class SubTstDrvAddGuestCtrl(base.SubTestDriverBase):
"""
Sub-test driver for executing guest control (VBoxService, IGuest) tests.
"""
def __init__(self, oTstDrv):
base.SubTestDriverBase.__init__(self, 'add-guest-ctrl', oTstDrv);
## @todo base.TestBase.
self.asTestsDef = \
[
'session_basic', 'session_env', 'session_file_ref', 'session_dir_ref', 'session_proc_ref', \
'exec_basic', 'exec_errorlevel', 'exec_timeout', \
'dir_create', 'dir_create_temp', 'dir_read', \
'file_remove', 'file_stat', 'file_read', 'file_write', \
'copy_to', 'copy_from', \
'update_additions'
];
self.asTests = self.asTestsDef;
def parseOption(self, asArgs, iArg): # pylint: disable=R0912,R0915
if asArgs[iArg] == '--add-guest-ctrl-tests':
iArg += 1;
if asArgs[iArg] == 'all': # Nice for debugging scripts.
self.asTests = self.asTestsDef;
return iArg + 1;
iNext = self.oTstDrv.requireMoreArgs(1, asArgs, iArg);
self.asTests = asArgs[iArg].split(':');
for s in self.asTests:
if s not in self.asTestsDef:
raise base.InvalidOption('The "--add-guest-ctrl-tests" value "%s" is not valid; valid values are: %s' \
% (s, ' '.join(self.asTestsDef)));
return iNext;
return iArg;
def showUsage(self):
base.SubTestDriverBase.showUsage(self);
reporter.log(' --add-guest-ctrl-tests <s1[:s2[:]]>');
reporter.log(' Default: %s (all)' % (':'.join(self.asTestsDef)));
return True;
def testIt(self, oTestVm, oSession, oTxsSession):
"""
Executes the test.
Returns fRc, oTxsSession. The latter may have changed.
"""
reporter.log("Active tests: %s" % (self.asTests,));
# Do the testing.
reporter.testStart('Session Basics');
fSkip = 'session_basic' not in self.asTests;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlSession(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
reporter.testStart('Session Environment');
fSkip = 'session_env' not in self.asTests or fRc is False;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlSessionEnvironment(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
reporter.testStart('Session File References');
fSkip = 'session_file_ref' not in self.asTests;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlSessionFileRefs(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
## @todo Implement this.
#reporter.testStart('Session Directory References');
#fSkip = 'session_dir_ref' not in self.asTests;
#if fSkip == False:
# fRc, oTxsSession = self.testGuestCtrlSessionDirRefs(oSession, oTxsSession, oTestVm);
#reporter.testDone(fSkip);
reporter.testStart('Session Process References');
fSkip = 'session_proc_ref' not in self.asTests or fRc is False;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlSessionProcRefs(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
reporter.testStart('Execution');
fSkip = 'exec_basic' not in self.asTests or fRc is False;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlExec(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
reporter.testStart('Execution Error Levels');
fSkip = 'exec_errorlevel' not in self.asTests or fRc is False;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlExecErrorLevel(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
reporter.testStart('Execution Timeouts');
fSkip = 'exec_timeout' not in self.asTests or fRc is False;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlExecTimeout(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
reporter.testStart('Creating directories');
fSkip = 'dir_create' not in self.asTests or fRc is False;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlDirCreate(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
reporter.testStart('Creating temporary directories');
fSkip = 'dir_create_temp' not in self.asTests or fRc is False;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlDirCreateTemp(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
reporter.testStart('Reading directories');
fSkip = 'dir_read' not in self.asTests or fRc is False;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlDirRead(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
# FIXME: Failing test.
# reporter.testStart('Copy to guest');
# fSkip = 'copy_to' not in self.asTests or fRc is False;
# if fSkip == False:
# fRc, oTxsSession = self.testGuestCtrlCopyTo(oSession, oTxsSession, oTestVm);
# reporter.testDone(fSkip);
reporter.testStart('Copy from guest');
fSkip = 'copy_from' not in self.asTests or fRc is False;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlCopyFrom(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
reporter.testStart('Removing files');
fSkip = 'file_remove' not in self.asTests or fRc is False;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlFileRemove(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
reporter.testStart('Querying file information (stat)');
fSkip = 'file_stat' not in self.asTests or fRc is False;
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlFileStat(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
# FIXME: Failing tests.
# reporter.testStart('File read');
# fSkip = 'file_read' not in self.asTests or fRc is False;
# if fSkip == False:
# fRc, oTxsSession = self.testGuestCtrlFileRead(oSession, oTxsSession, oTestVm);
# reporter.testDone(fSkip);
# reporter.testStart('File write');
# fSkip = 'file_write' not in self.asTests or fRc is False;
# if fSkip == False:
# fRc, oTxsSession = self.testGuestCtrlFileWrite(oSession, oTxsSession, oTestVm);
# reporter.testDone(fSkip);
reporter.testStart('Updating Guest Additions');
fSkip = 'update_additions' not in self.asTests or fRc is False;
# Skip test for updating Guest Additions if we run on a too old (Windows) guest.
fSkip = oTestVm.sKind in ('WindowsNT4', 'Windows2000', 'WindowsXP', 'Windows2003');
if fSkip == False:
fRc, oTxsSession = self.testGuestCtrlUpdateAdditions(oSession, oTxsSession, oTestVm);
reporter.testDone(fSkip);
return (fRc, oTxsSession);
def gctrlCopyFileFrom(self, oGuestSession, sSrc, sDst, aFlags):
"""
Helper function to copy a single file from the guest to the host.
"""
fRc = True; # Be optimistic.
try:
reporter.log2('Copying guest file "%s" to host "%s"' % (sSrc, sDst));
curProgress = oGuestSession.copyFrom(sSrc, sDst, aFlags);
if curProgress is not None:
oProgress = vboxwrappers.ProgressWrapper(curProgress, self.oTstDrv.oVBoxMgr, self, "gctrlCopyFrm");
try:
iRc = oProgress.waitForOperation(0, fIgnoreErrors = True);
if iRc != 0:
reporter.log('Waiting for copyFrom failed');
fRc = False;
except:
reporter.logXcpt('Copy from waiting exception for sSrc="%s", sDst="%s":' \
% (sSrc, sDst));
fRc = False;
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('Copy from exception for sSrc="%s", sDst="%s":' \
% (sSrc, sDst));
fRc = False;
return fRc;
def gctrlCopyFileTo(self, oGuestSession, sSrc, sDst, aFlags):
"""
Helper function to copy a single file from host to the guest.
"""
fRc = True; # Be optimistic.
try:
reporter.log2('Copying host file "%s" to guest "%s"' % (sSrc, sDst));
curProgress = oGuestSession.copyTo(sSrc, sDst, aFlags);
if curProgress is not None:
oProgress = vboxwrappers.ProgressWrapper(curProgress, self.oTstDrv.oVBoxMgr, self, "gctrlCopyTo");
try:
iRc = oProgress.waitForOperation(0, fIgnoreErrors = True);
if iRc != 0:
reporter.log('Waiting for copyTo failed');
fRc = False;
except:
reporter.logXcpt('Copy to waiting exception for sSrc="%s", sDst="%s":' \
% (sSrc, sDst));
fRc = False;
else:
reporter.error('No progress object returned');
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('Copy to exception for sSrc="%s", sDst="%s":' \
% (sSrc, sDst));
fRc = False;
return fRc;
def gctrlCopyFrom(self, oTest, oRes, oGuestSession, subDir = ''): # pylint: disable=R0914
"""
Copies files / directories to the host.
Source always contains the absolute path, subDir all paths
below it.
"""
## @todo r=bird: The use subDir and sSubDir in this method is extremely confusing!!
# Just follow the coding guidelines and ALWAYS use prefixes, please!
fRc = True; # Be optimistic.
sSrc = oTest.sSrc;
sDst = oTest.sDst;
aFlags = oTest.aFlags;
if sSrc == "" \
or os.path.isfile(sSrc):
return self.gctrlCopyFileFrom(oGuestSession, \
sSrc, sDst, aFlags);
sSrcAbs = sSrc;
if subDir != "":
sSrcAbs = os.path.join(sSrc, subDir);
sFilter = ""; # No filter.
sDstAbs = os.path.join(sDst, subDir);
reporter.log2('Copying guest directory "%s" to host "%s"' % (sSrcAbs, sDstAbs));
try:
#reporter.log2('Directory="%s", filter="%s", aFlags="%s"' % (sCurDir, sFilter, aFlags));
oCurDir = oGuestSession.directoryOpen(sSrcAbs, sFilter, aFlags);
while fRc:
try:
oFsObjInfo = oCurDir.read();
if oFsObjInfo.name == "." \
or oFsObjInfo.name == "..":
#reporter.log2('\tSkipping "%s"' % (oFsObjInfo.name,));
continue; # Skip "." and ".." entries.
if oFsObjInfo.type is vboxcon.FsObjType_Directory:
#reporter.log2('\tDirectory "%s"' % (oFsObjInfo.name,));
sDirCreate = sDst;
if subDir != "":
sDirCreate = os.path.join(sDirCreate, subDir);
sDirCreate = os.path.join(sDirCreate, oFsObjInfo.name);
try:
reporter.log2('\tCreating directory "%s"' % (sDirCreate,));
os.makedirs(sDirCreate);
sSubDir = oFsObjInfo.name;
if subDir != "":
sSubDir = os.path.join(subDir, oFsObjInfo.name);
self.gctrlCopyFrom(oTest, oRes, oGuestSession, sSubDir);
except (OSError) as e:
if e.errno == errno.EEXIST:
pass;
else:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('\tDirectory creation exception for directory="%s":' % (sSubDir,));
raise;
elif oFsObjInfo.type is vboxcon.FsObjType_File:
#reporter.log2('\tFile "%s"' % (oFsObjInfo.name,));
sSourceFile = os.path.join(sSrcAbs, oFsObjInfo.name);
sDestFile = os.path.join(sDstAbs, oFsObjInfo.name);
self.gctrlCopyFileFrom(oGuestSession, sSourceFile, sDestFile, aFlags);
elif oFsObjInfo.type is vboxcon.FsObjType_Symlink:
#reporter.log2('\tSymlink "%s" -- not tested yet' % oFsObjInfo.name);
pass;
else:
reporter.error('\tDirectory "%s" contains invalid directory entry "%s" (%d)' \
% (sSubDir, oFsObjInfo.name, oFsObjInfo.type));
fRc = False;
except Exception, oXcpt:
# No necessarily an error -- could be VBOX_E_OBJECT_NOT_FOUND. See reference.
if vbox.ComError.equal(oXcpt, vbox.ComError.VBOX_E_OBJECT_NOT_FOUND):
#reporter.log2('\tNo more directory entries for "%s"' % (sCurDir,));
break
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('\tDirectory read exception for directory="%s":' % (sSrcAbs,));
fRc = False;
break;
oCurDir.close();
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('\tDirectory open exception for directory="%s":' % (sSrcAbs,));
fRc = False;
return fRc;
def gctrlCopyTo(self, oTest, oGuestSession, subDir = ''): # pylint: disable=R0914
"""
Copies files / directories to the guest.
Source always contains the absolute path,
subDir all paths below it.
"""
fRc = True; # Be optimistic.
sSrc = oTest.sSrc;
sDst = oTest.sDst;
aFlags = oTest.aFlags;
reporter.log2('sSrc=%s, sDst=%s, aFlags=%s' % (sSrc, sDst, aFlags));
sSrcAbs = sSrc;
if subDir != "":
sSrcAbs = os.path.join(sSrc, subDir);
# Note: Current test might want to test copying empty sources
if not os.path.exists(sSrcAbs) \
or os.path.isfile(sSrcAbs):
return self.gctrlCopyFileTo(oGuestSession, \
sSrcAbs, sDst, aFlags);
sDstAbs = os.path.join(sDst, subDir);
reporter.log2('Copying host directory "%s" to guest "%s"' % (sSrcAbs, sDstAbs));
try:
# Note: Symlinks intentionally not considered here.
for (sDirCurrent, oDirs, oFiles) in os.walk(sSrcAbs, topdown=True):
for sFile in oFiles:
sCurFile = os.path.join(sDirCurrent, sFile);
reporter.log2('Processing host file "%s"' % (sCurFile,));
sFileDest = os.path.join(sDstAbs, os.path.relpath(sCurFile, sSrcAbs));
reporter.log2('Copying file to "%s"' % (sFileDest,));
fRc = self.gctrlCopyFileTo(oGuestSession, \
sCurFile, sFileDest, aFlags);
if fRc is False:
break;
if fRc is False:
break;
for sSubDir in oDirs:
sCurDir = os.path.join(sDirCurrent, sSubDir);
reporter.log2('Processing host dir "%s"' % (sCurDir,));
sDirDest = os.path.join(sDstAbs, os.path.relpath(sCurDir, sSrcAbs));
reporter.log2('Creating guest dir "%s"' % (sDirDest,));
oGuestSession.directoryCreate(sDirDest, \
700, [ vboxcon.DirectoryCreateFlag_Parents ]);
if fRc is False:
break;
if fRc is False:
break;
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('Copy to exception for sSrc="%s", sDst="%s":' \
% (sSrcAbs, sDst));
return False;
return fRc;
def gctrlCreateDir(self, oTest, oRes, oGuestSession):
"""
Helper function to create a guest directory specified in
the current test.
"""
fRc = True; # Be optimistic.
reporter.log2('Creating directory "%s"' % (oTest.sDirectory,));
try:
oGuestSession.directoryCreate(oTest.sDirectory, \
oTest.fMode, oTest.aFlags);
fDirExists = oGuestSession.directoryExists(oTest.sDirectory);
if fDirExists is False \
and oRes.fRc is True:
# Directory does not exist but we want it to.
fRc = False;
except:
reporter.logXcpt('Directory create exception for directory "%s":' % (oTest.sDirectory,));
if oRes.fRc is True:
# Just log, don't assume an error here (will be done in the main loop then).
fRc = False;
# Directory creation failed, which was the expected result.
return fRc;
def gctrlReadDir(self, oTest, oRes, oGuestSession, subDir = ''): # pylint: disable=R0914
"""
Helper function to read a guest directory specified in
the current test.
"""
sDir = oTest.sDirectory;
sFilter = oTest.sFilter;
aFlags = oTest.aFlags;
fRc = True; # Be optimistic.
cDirs = 0; # Number of directories read.
cFiles = 0; # Number of files read.
try:
sCurDir = os.path.join(sDir, subDir);
#reporter.log2('Directory="%s", filter="%s", aFlags="%s"' % (sCurDir, sFilter, aFlags));
oCurDir = oGuestSession.directoryOpen(sCurDir, sFilter, aFlags);
while fRc:
try:
oFsObjInfo = oCurDir.read();
if oFsObjInfo.name == "." \
or oFsObjInfo.name == "..":
#reporter.log2('\tSkipping "%s"' % oFsObjInfo.name);
continue; # Skip "." and ".." entries.
if oFsObjInfo.type is vboxcon.FsObjType_Directory:
#reporter.log2('\tDirectory "%s"' % oFsObjInfo.name);
cDirs += 1;
sSubDir = oFsObjInfo.name;
if subDir != "":
sSubDir = os.path.join(subDir, oFsObjInfo.name);
fRc, cSubDirs, cSubFiles = self.gctrlReadDir(oTest, oRes, oGuestSession, sSubDir);
cDirs += cSubDirs;
cFiles += cSubFiles;
elif oFsObjInfo.type is vboxcon.FsObjType_File:
#reporter.log2('\tFile "%s"' % oFsObjInfo.name);
cFiles += 1;
elif oFsObjInfo.type is vboxcon.FsObjType_Symlink:
#reporter.log2('\tSymlink "%s" -- not tested yet' % oFsObjInfo.name);
pass;
else:
reporter.error('\tDirectory "%s" contains invalid directory entry "%s" (type %d)' % \
(sCurDir, oFsObjInfo.name, oFsObjInfo.type));
fRc = False;
except Exception, oXcpt:
# No necessarily an error -- could be VBOX_E_OBJECT_NOT_FOUND. See reference.
if vbox.ComError.equal(oXcpt, vbox.ComError.VBOX_E_OBJECT_NOT_FOUND):
#reporter.log2('\tNo more directory entries for "%s"' % (sCurDir,));
break
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('\tDirectory open exception for directory="%s":' % (sCurDir,));
fRc = False;
break;
oCurDir.close();
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('\tDirectory open exception for directory="%s":' % (sCurDir,));
fRc = False;
return (fRc, cDirs, cFiles);
def gctrlExecDoTest(self, i, oTest, oRes, oGuestSession):
"""
Wrapper function around gctrlExecute to provide more sanity checking
when needed in actual execution tests.
"""
reporter.log('Testing #%d, cmd="%s" ...' % (i, oTest.sCmd));
fRc = self.gctrlExecute(oTest, oGuestSession);
if fRc is oRes.fRc:
if fRc is True:
# Compare exit status / code on successful process execution.
if oTest.uExitStatus != oRes.uExitStatus \
or oTest.iExitCode != oRes.iExitCode:
reporter.error('Test #%d failed: Got exit status + code %d,%d, expected %d,%d' % \
(i, oTest.uExitStatus, oTest.iExitCode, \
oRes.uExitStatus, oRes.iExitCode));
return False;
if fRc is True:
# Compare test / result buffers on successful process execution.
if oTest.sBuf is not None \
and oRes.sBuf is not None:
if bytes(oTest.sBuf) != bytes(oRes.sBuf):
reporter.error('Test #%d failed: Got buffer\n%s (%ld bytes), expected\n%s (%ld bytes)' %
(i, map(hex, map(ord, oTest.sBuf)), len(oTest.sBuf), \
map(hex, map(ord, oRes.sBuf)), len(oRes.sBuf)));
return False;
else:
reporter.log2('Test #%d passed: Buffers match (%ld bytes)' % (i, len(oRes.sBuf)));
elif oRes.sBuf is not None \
and len(oRes.sBuf):
reporter.error('Test #%d failed: Got no buffer data, expected\n%s (%dbytes)' %
(i, map(hex, map(ord, oRes.sBuf)), len(oRes.sBuf)));
return False;
elif oRes.cbStdOut > 0 \
and oRes.cbStdOut != oTest.cbStdOut:
reporter.error('Test #%d failed: Got %ld stdout data, expected %ld'
% (i, oTest.cbStdOut, oRes.cbStdOut));
return False;
else:
reporter.error('Test #%d failed: Got %s, expected %s' % (i, fRc, oRes.fRc));
return False;
return True;
def gctrlExecute(self, oTest, oGuestSession):
"""
Helper function to execute a program on a guest, specified in
the current test.
"""
fRc = True; # Be optimistic.
## @todo Compare execution timeouts!
#tsStart = base.timestampMilli();
reporter.log2('Using session user=%s, sDomain=%s, session name=%s, session timeout=%ld' \
% (oGuestSession.user, oGuestSession.domain, \
oGuestSession.name, oGuestSession.timeout));
reporter.log2('Executing cmd=%s, aFlags=%s, timeout=%ld, args=%s, env=%s' \
% (oTest.sCmd, oTest.aFlags, oTest.timeoutMS, \
oTest.aArgs, oTest.aEnv));
try:
curProc = oGuestSession.processCreate(oTest.sCmd, \
oTest.aArgs, oTest.aEnv, \
oTest.aFlags, oTest.timeoutMS);
if curProc is not None:
reporter.log2('Process start requested, waiting for start (%ldms) ...' % (oTest.timeoutMS,));
fWaitFor = [ vboxcon.ProcessWaitForFlag_Start ];
waitResult = curProc.waitForArray(fWaitFor, oTest.timeoutMS);
reporter.log2('Wait result returned: %d, current process status is: %ld' % (waitResult, curProc.status));
if curProc.status == vboxcon.ProcessStatus_Started:
fWaitFor = [ vboxcon.ProcessWaitForFlag_Terminate ];
if vboxcon.ProcessCreateFlag_WaitForStdOut in oTest.aFlags:
fWaitFor.append(vboxcon.ProcessWaitForFlag_StdOut);
if vboxcon.ProcessCreateFlag_WaitForStdErr in oTest.aFlags:
fWaitFor.append(vboxcon.ProcessWaitForFlag_StdErr);
## @todo Add vboxcon.ProcessWaitForFlag_StdIn.
reporter.log2('Process (PID %ld) started, waiting for termination (%dms), waitFlags=%s ...' \
% (curProc.PID, oTest.timeoutMS, fWaitFor));
while True:
waitResult = curProc.waitForArray(fWaitFor, oTest.timeoutMS);
reporter.log2('Wait returned: %d' % (waitResult,));
try:
# Try stdout.
if waitResult == vboxcon.ProcessWaitResult_StdOut \
or waitResult == vboxcon.ProcessWaitResult_WaitFlagNotSupported:
reporter.log2('Reading stdout ...');
buf = curProc.Read(1, 64 * 1024, oTest.timeoutMS);
if len(buf):
reporter.log2('Process (PID %ld) got %ld bytes of stdout data' % (curProc.PID, len(buf)));
oTest.cbStdOut += len(buf);
oTest.sBuf = buf; # Appending does *not* work atm, so just assign it. No time now.
# Try stderr.
if waitResult == vboxcon.ProcessWaitResult_StdErr \
or waitResult == vboxcon.ProcessWaitResult_WaitFlagNotSupported:
reporter.log2('Reading stderr ...');
buf = curProc.Read(2, 64 * 1024, oTest.timeoutMS);
if len(buf):
reporter.log2('Process (PID %ld) got %ld bytes of stderr data' % (curProc.PID, len(buf)));
oTest.cbStdErr += len(buf);
oTest.sBuf = buf; # Appending does *not* work atm, so just assign it. No time now.
# Use stdin.
if waitResult == vboxcon.ProcessWaitResult_StdIn \
or waitResult == vboxcon.ProcessWaitResult_WaitFlagNotSupported:
pass; #reporter.log2('Process (PID %ld) needs stdin data' % (curProc.pid,));
# Termination or error?
if waitResult == vboxcon.ProcessWaitResult_Terminate \
or waitResult == vboxcon.ProcessWaitResult_Error \
or waitResult == vboxcon.ProcessWaitResult_Timeout:
reporter.log2('Process (PID %ld) reported terminate/error/timeout: %ld, status: %ld' \
% (curProc.PID, waitResult, curProc.status));
break;
except:
# Just skip reads which returned nothing.
pass;
reporter.log2('Final process status (PID %ld) is: %ld' % (curProc.PID, curProc.status));
reporter.log2('Process (PID %ld) %ld stdout, %ld stderr' % (curProc.PID, oTest.cbStdOut, oTest.cbStdErr));
oTest.uExitStatus = curProc.status;
oTest.iExitCode = curProc.exitCode;
reporter.log2('Process (PID %ld) has exit code: %ld' % (curProc.PID, oTest.iExitCode));
except KeyboardInterrupt:
reporter.error('Process (PID %ld) execution interrupted' % (curProc.PID,));
if curProc is not None:
curProc.close();
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('Execution exception for command "%s":' % (oTest.sCmd,));
fRc = False;
return fRc;
def testGuestCtrlSessionEnvironment(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914
"""
Tests the guest session environment.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
aaTests = [
# No environment set.
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword),
tdTestResultSessionEnv(fRc = False) ],
# Invalid stuff.
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword, aEnv = [ '=FOO' ]),
tdTestResultSessionEnv(fRc = False) ],
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword, aEnv = [ '====' ]),
tdTestResultSessionEnv(fRc = False) ],
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword, aEnv = [ '=BAR' ]),
tdTestResultSessionEnv(fRc = False) ],
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword, aEnv = [ u'ß$%ß&' ]),
tdTestResultSessionEnv(fRc = False) ],
# Key only.
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword, aEnv = [ 'FOO=' ]),
tdTestResultSessionEnv(fRc = True, cNumVars = 1) ],
# Values.
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword, aEnv = [ 'FOO' ]),
tdTestResultSessionEnv(fRc = True, cNumVars = 1) ],
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword, aEnv = [ 'FOO=BAR' ]),
tdTestResultSessionEnv(fRc = True, cNumVars = 1) ],
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword, aEnv = [ 'FOO=BAR', 'BAR=BAZ' ]),
tdTestResultSessionEnv(fRc = True, cNumVars = 2) ],
# A bit more weird keys/values.
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword, aEnv = [ '$$$=' ]),
tdTestResultSessionEnv(fRc = True, cNumVars = 1) ],
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword, aEnv = [ '$$$=%%%' ]),
tdTestResultSessionEnv(fRc = True, cNumVars = 1) ],
# Same stuff.
[ tdTestSessionEnv(sUser = sUser, sPassword = sPassword, aEnv = [ 'FOO=BAR', 'FOO=BAR' ]),
tdTestResultSessionEnv(fRc = True, cNumVars = 1) ]
];
# Parameters.
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResult
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
reporter.log('Testing #%d, user="%s", sPassword="%s", env="%s" (%d)...' \
% (i, curTest.oCreds.sUser, curTest.oCreds.sPassword, curTest.aEnv, len(curTest.aEnv)));
curGuestSessionName = 'testGuestCtrlSessionEnvironment: Test #%d' % (i,);
fRc2, curGuestSession = curTest.createSession(curGuestSessionName);
if fRc2 is not True:
reporter.error('Test #%d failed: Session creation failed: Got %s, expected True' % (i, fRc2));
fRc = False;
break;
# Make sure environment is empty.
curEnv = self.oTstDrv.oVBoxMgr.getArray(curGuestSession, 'environment');
reporter.log2('Test #%d: Environment initially has %d elements' % (i, len(curEnv)));
if len(curEnv) != 0:
reporter.error('Test #%d failed: Initial session environment has %d vars, expected 0' % (i, len(curEnv)));
fRc = False;
break;
try:
for (_, aEnv) in enumerate(curTest.aEnv): # Enumerate only will work with a sequence (e.g > 1 entries).
aElems = aEnv.split('=');
strKey = '';
strValue = '';
if len(aElems) > 0:
strKey = aElems[0];
if len(aElems) == 2:
strValue = aElems[1];
reporter.log2('Test #%d: Single key="%s", value="%s" (%d) ...' \
% (i, strKey, strValue, len(aElems)));
try:
curGuestSession.environmentSet(strKey, strValue); # No return (e.g. boolean) value available thru wrapper.
except:
# Setting environment variables might fail (e.g. if empty name specified). Check.
reporter.logXcpt('Test #%d failed: Setting environment variable failed:' % (i,));
curEnv = self.oTstDrv.oVBoxMgr.getArray(curGuestSession, 'environment');
if len(curEnv) is not curRes.cNumVars:
reporter.error('Test #%d failed: Session environment has %d vars, expected %d' \
% (i, len(curEnv), curRes.cNumVars));
fRc = False;
break;
else:
reporter.log('Test #%d: API reported an error (single), good' % (i,));
reporter.log2('Getting key="%s" ...' % (strKey,));
try:
strValue2 = curGuestSession.environmentGet(strKey);
if strKey.isalnum() \
and strValue != strValue2:
reporter.error('Test #%d failed: Got environment variable "%s", expected "%s" (key: "%s")' \
% (i, strValue2, strValue, strKey));
fRc = False;
break;
# Getting back an empty value when specifying an invalid key is fine.
reporter.log2('Got key "%s=%s"' % (strKey, strValue2));
except UnicodeDecodeError: # Might happen on unusal values, fine.
if strValue != strValue2:
reporter.error('Test #%d failed: Got (undecoded) environment variable "%s", ' \
'expected "%s" (key: "%s")' \
% (i, strValue2, strValue, strKey));
fRc = False;
break;
except:
if strKey == "" \
or not strKey.isalnum():
reporter.log('Test #%d: API reported an error (invalid key "%s"), good' % (i, strKey));
else:
reporter.errorXcpt('Test #%d failed: Getting environment variable:' % (i));
if fRc is False:
continue;
# Set the same stuff again, this time all at once using the array.
if len(curTest.aEnv):
reporter.log('Test #%d: Array %s (%d)' % (i, curTest.aEnv, len(curTest.aEnv)));
try:
## @todo No return (e.g. boolean) value available thru wrapper.
#curGuestSession.environmentSetArray(curTest.aEnv);
pass;
except:
# Setting environment variables might fail (e.g. if empty name specified). Check.
curEnv = self.oTstDrv.oVBoxMgr.getArray(curGuestSession, 'environment');
if len(curEnv) is not curRes.cNumVars:
reporter.error('Test #%d failed: Session environment has %d vars, expected %d (array)' \
% (i, len(curEnv), curRes.cNumVars));
fRc = False;
break;
else:
reporter.log('Test #%d: API reported an error (array), good' % (i,));
## @todo Get current system environment and add it to curRes.cNumVars before comparing!
reporter.log('Test #%d: Environment size' % (i,));
curEnv = self.oTstDrv.oVBoxMgr.getArray(curGuestSession, 'environment');
reporter.log2('Test #%d: Environment (%d) -> %s' % (i, len(curEnv), curEnv));
if len(curEnv) != curRes.cNumVars:
reporter.error('Test #%d failed: Session environment has %d vars (%s), expected %d' \
% (i, len(curEnv), curEnv, curRes.cNumVars));
fRc = False;
break;
curGuestSession.environmentClear(); # No return (e.g. boolean) value available thru wrapper.
curEnv = self.oTstDrv.oVBoxMgr.getArray(curGuestSession, 'environment');
if len(curEnv) is not 0:
reporter.error('Test #%d failed: Session environment has %d vars, expected 0');
fRc = False;
break;
except:
reporter.errorXcpt('Test #%d failed:' % (i,));
fRc2 = curTest.closeSession();
if fRc2 is False:
reporter.error('Test #%d failed: Session could not be closed' % (i,));
fRc = False;
break;
return (fRc, oTxsSession);
def testGuestCtrlSession(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914
"""
Tests the guest session handling.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
aaTests = [
# Invalid parameters.
[ tdTestSession(),
tdTestResultSession(fRc = False) ],
[ tdTestSession(sUser = ''),
tdTestResultSession(fRc = False) ],
[ tdTestSession(sPassword = 'bar'),
tdTestResultSession(fRc = False) ],
[ tdTestSession(sDomain = 'boo'),
tdTestResultSession(fRc = False) ],
[ tdTestSession(sPassword = 'bar', sDomain = 'boo'),
tdTestResultSession(fRc = False) ],
# User account without a passwort - forbidden.
[ tdTestSession(sUser = sUser),
tdTestResultSession(fRc = False) ],
# Wrong credentials.
# Note: On Guest Additions < 4.3 this always succeeds because these don't
# support creating dedicated sessions. Instead, guest process creation
# then will fail. See note below.
[ tdTestSession(sUser = 'foo', sPassword = 'bar', sDomain = 'boo'),
tdTestResultSession(fRc = False) ],
# Correct credentials.
[ tdTestSession(sUser = sUser, sPassword = sPassword),
tdTestResultSession(fRc = True, cNumSessions = 1) ]
];
# Parameters.
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestSession, use an index, later.
curRes = aTest[1]; # tdTestResult
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
reporter.log('Testing #%d, user="%s", sPassword="%s", sDomain="%s" ...' \
% (i, curTest.oCreds.sUser, curTest.oCreds.sPassword, curTest.oCreds.sDomain));
curGuestSessionName = 'testGuestCtrlSession: Test #%d' % (i);
fRc2, curGuestSession = curTest.createSession(curGuestSessionName);
# See note about < 4.3 Guest Additions above.
if curGuestSession is not None \
and curGuestSession.protocolVersion >= 2 \
and fRc2 is not curRes.fRc:
reporter.error('Test #%d failed: Session creation failed: Got %s, expected %s' \
% (i, fRc2, curRes.fRc));
fRc = False;
if fRc2:
# On Guest Additions < 4.3 getSessionCount() always will return 1, so skip the
# check then.
if curGuestSession.protocolVersion >= 2:
curSessionCount = curTest.getSessionCount(self.oTstDrv.oVBoxMgr);
if curSessionCount is not curRes.cNumSessions:
reporter.error('Test #%d failed: Session count does not match: Got %d, expected %d' \
% (i, curSessionCount, curRes.cNumSessions));
fRc = False;
break;
if curGuestSession is not None \
and curGuestSession.name != curGuestSessionName:
reporter.error('Test #%d failed: Session name does not match: Got "%s", expected "%s"' \
% (i, curGuestSession.name, curGuestSessionName));
fRc = False;
break;
fRc2 = curTest.closeSession();
if fRc2 is False:
reporter.error('Test #%d failed: Session could not be closed' % (i,));
fRc = False;
break;
if fRc is False:
return (False, oTxsSession);
# Multiple sessions.
iMaxGuestSessions = 31; # Maximum number of concurrent guest session allowed.
# Actually, this is 32, but we don't test session 0.
multiSession = {};
reporter.log2('Opening multiple guest tsessions at once ...');
for i in range(iMaxGuestSessions + 1):
multiSession[i] = tdTestSession(sUser = sUser, sPassword = sPassword, sSessionName = 'MultiSession #%d' % (i,));
multiSession[i].setEnvironment(oSession, oTxsSession, oTestVm);
curSessionCount = multiSession[i].getSessionCount(self.oTstDrv.oVBoxMgr);
reporter.log2('MultiSession test #%d count is %d' % (i, curSessionCount));
if curSessionCount is not i:
reporter.error('MultiSession count #%d must be %d, got %d' % (i, i, curSessionCount));
fRc = False;
break;
fRc2, _ = multiSession[i].createSession('MultiSession #%d' % (i,));
if fRc2 is not True:
if i < iMaxGuestSessions:
reporter.error('MultiSession #%d test failed' % (i,));
fRc = False;
else:
reporter.log('MultiSession #%d exceeded concurrent guest session count, good' % (i,));
break;
curSessionCount = multiSession[i].getSessionCount(self.oTstDrv.oVBoxMgr);
if curSessionCount is not iMaxGuestSessions:
reporter.error('Final MultiSession count must be %d, got %d'
% (iMaxGuestSessions, curSessionCount));
return (False, oTxsSession);
reporter.log2('Closing MultiSessions ...');
iLastSession = iMaxGuestSessions - 1;
for i in range(iLastSession): # Close all but the last opened session.
fRc2 = multiSession[i].closeSession();
reporter.log2('MultiSession #%d count is %d' % (i, multiSession[i].getSessionCount(self.oTstDrv.oVBoxMgr),));
if fRc2 is False:
reporter.error('Closing MultiSession #%d failed' % (i,));
fRc = False;
break;
curSessionCount = multiSession[i].getSessionCount(self.oTstDrv.oVBoxMgr);
if curSessionCount is not 1:
reporter.error('Final MultiSession count #2 must be 1, got %d' % (curSessionCount,));
fRc = False;
try:
# Make sure that accessing the first opened guest session does not work anymore because we just removed (closed) it.
curSessionName = multiSession[0].oGuestSession.name;
reporter.error('Accessing first removed MultiSession should not be possible, got name="%s"' % (curSessionName,));
fRc = False;
except:
reporter.logXcpt('Could not access first removed MultiSession object, good:');
try:
# Try Accessing last opened session which did not get removed yet.
curSessionName = multiSession[iLastSession].oGuestSession.name;
reporter.log('Accessing last standing MultiSession worked, got name="%s"' % (curSessionName,));
multiSession[iLastSession].closeSession();
curSessionCount = multiSession[i].getSessionCount(self.oTstDrv.oVBoxMgr);
if curSessionCount is not 0:
reporter.error('Final MultiSession count #3 must be 0, got %d' % (curSessionCount,));
fRc = False;
except:
reporter.logXcpt('Could not access last standing MultiSession object:');
fRc = False;
## @todo Test session timeouts.
return (fRc, oTxsSession);
def testGuestCtrlSessionFileRefs(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914
"""
Tests the guest session file reference handling.
"""
if oTestVm.isWindows():
sUser = "Administrator";
sPassword = "password";
sDomain = "";
sFile = "C:\\windows\\system32\\kernel32.dll";
# Number of stale guest files to create.
cStaleFiles = 10;
fRc = True;
try:
oGuest = oSession.o.console.guest;
oGuestSession = oGuest.createSession(sUser, sPassword, sDomain, \
"testGuestCtrlSessionFileRefs");
fWaitFor = [ vboxcon.GuestSessionWaitForFlag_Start ];
waitResult = oGuestSession.waitForArray(fWaitFor, 30 * 1000);
#
# Be nice to Guest Additions < 4.3: They don't support session handling and
# therefore return WaitFlagNotSupported.
#
if waitResult != vboxcon.GuestSessionWaitResult_Start \
and waitResult != vboxcon.GuestSessionWaitResult_WaitFlagNotSupported:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.log('Session did not start successfully, returned wait result: %ld' \
% (waitResult));
return (False, oTxsSession);
reporter.log('Session successfully started');
#
# Open guest files and "forget" them (stale entries).
# For them we don't have any references anymore intentionally.
#
reporter.log2('Opening stale files');
for i in range(0, cStaleFiles):
try:
oGuestSession.fileOpen(sFile, "r", "oe", 0);
# Note: Use a timeout in the call above for not letting the stale processes
# hanging around forever. This can happen if the installed Guest Additions
# do not support terminating guest processes.
except:
reporter.errorXcpt('Opening stale file #%ld failed:' % (i,));
fRc = False;
break;
if fRc:
cFiles = len(self.oTstDrv.oVBoxMgr.getArray(oGuestSession, 'files'));
if cFiles != cStaleFiles:
reporter.error('Test failed: Got %ld stale files, expected %ld' % (cFiles, cStaleFiles));
fRc = False;
if fRc:
#
# Open non-stale files and close them again.
#
reporter.log2('Opening non-stale files');
aaFiles = [];
for i in range(0, cStaleFiles):
try:
oCurFile = oGuestSession.fileOpen(sFile, "r", "oe", 0);
aaFiles.append(oCurFile);
except:
reporter.errorXcpt('Opening non-stale file #%ld failed:' % (i,));
fRc = False;
break;
if fRc:
cFiles = len(self.oTstDrv.oVBoxMgr.getArray(oGuestSession, 'files'));
if cFiles != cStaleFiles * 2:
reporter.error('Test failed: Got %ld total files, expected %ld' % (cFiles, cStaleFiles * 2));
fRc = False;
if fRc:
reporter.log2('Closing all non-stale files again ...');
for i in range(0, cStaleFiles):
try:
aaFiles[i].close();
except:
reporter.errorXcpt('Waiting for non-stale file #%ld failed:' % (i,));
fRc = False;
break;
cFiles = len(self.oTstDrv.oVBoxMgr.getArray(oGuestSession, 'files'));
# Here we count the stale files (that is, files we don't have a reference
# anymore for) and the opened and then closed non-stale files (that we still keep
# a reference in aaFiles[] for).
if cFiles != cStaleFiles:
reporter.error('Test failed: Got %ld total files, expected %ld' \
% (cFiles, cStaleFiles));
fRc = False;
if fRc:
#
# Check if all (referenced) non-stale files now are in "closed" state.
#
reporter.log2('Checking statuses of all non-stale files ...');
for i in range(0, cStaleFiles):
try:
curFilesStatus = aaFiles[i].status;
if curFilesStatus != vboxcon.FileStatus_Closed:
reporter.error('Test failed: Non-stale file #%ld has status %ld, expected %ld' \
% (i, curFilesStatus, vboxcon.FileStatus_Closed));
fRc = False;
except:
reporter.errorXcpt('Checking status of file #%ld failed:' % (i,));
fRc = False;
break;
if fRc:
reporter.log2('All non-stale files closed');
cFiles = len(self.oTstDrv.oVBoxMgr.getArray(oGuestSession, 'files'));
reporter.log2('Final guest session file count: %ld' % (cFiles,));
# Now try to close the session and see what happens.
reporter.log2('Closing guest session ...');
oGuestSession.close();
except:
reporter.errorXcpt('Testing for stale processes failed:');
fRc = False;
return (fRc, oTxsSession);
#def testGuestCtrlSessionDirRefs(self, oSession, oTxsSession, oTestVm):
# """
# Tests the guest session directory reference handling.
# """
# fRc = True;
# return (fRc, oTxsSession);
def testGuestCtrlSessionProcRefs(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914
"""
Tests the guest session process reference handling.
"""
if oTestVm.isWindows():
sUser = "Administrator";
sPassword = "password";
sDomain = "";
sCmd = "C:\\windows\\system32\\cmd.exe";
sArgs = [];
# Number of stale guest processes to create.
cStaleProcs = 10;
fRc = True;
try:
oGuest = oSession.o.console.guest;
oGuestSession = oGuest.createSession(sUser, sPassword, sDomain, \
"testGuestCtrlSessionProcRefs");
fWaitFor = [ vboxcon.GuestSessionWaitForFlag_Start ];
waitResult = oGuestSession.waitForArray(fWaitFor, 30 * 1000);
#
# Be nice to Guest Additions < 4.3: They don't support session handling and
# therefore return WaitFlagNotSupported.
#
if waitResult != vboxcon.GuestSessionWaitResult_Start \
and waitResult != vboxcon.GuestSessionWaitResult_WaitFlagNotSupported:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.log('Session did not start successfully, returned wait result: %ld' \
% (waitResult));
return (False, oTxsSession);
reporter.log('Session successfully started');
#
# Fire off forever-running processes and "forget" them (stale entries).
# For them we don't have any references anymore intentionally.
#
reporter.log2('Starting stale processes');
for i in range(0, cStaleProcs):
try:
oGuestSession.processCreate(sCmd, \
sArgs, [], \
[ vboxcon.ProcessCreateFlag_WaitForStdOut ], \
30 * 1000);
# Note: Use a timeout in the call above for not letting the stale processes
# hanging around forever. This can happen if the installed Guest Additions
# do not support terminating guest processes.
except:
reporter.logXcpt('Creating stale process #%ld failed:' % (i,));
fRc = False;
break;
if fRc:
cProcs = len(self.oTstDrv.oVBoxMgr.getArray(oGuestSession, 'processes'));
if cProcs != cStaleProcs:
reporter.error('Test failed: Got %ld stale processes, expected %ld' % (cProcs, cStaleProcs));
fRc = False;
if fRc:
#
# Fire off non-stale processes and wait for termination.
#
if oTestVm.isWindows():
sArgs = [ '/C', 'dir', '/S', 'C:\\Windows\\system'];
reporter.log2('Starting non-stale processes');
aaProcs = [];
for i in range(0, cStaleProcs):
try:
oCurProc = oGuestSession.processCreate(sCmd, \
sArgs, [], \
[], \
0); # Infinite timeout.
aaProcs.append(oCurProc);
except:
reporter.logXcpt('Creating non-stale process #%ld failed:' % (i,));
fRc = False;
break;
if fRc:
reporter.log2('Waiting for non-stale processes to terminate');
for i in range(0, cStaleProcs):
try:
aaProcs[i].waitForArray([ vboxcon.ProcessWaitForFlag_Terminate ], 30 * 1000);
curProcStatus = aaProcs[i].status;
if aaProcs[i].status != vboxcon.ProcessStatus_TerminatedNormally:
reporter.error('Test failed: Waiting for non-stale processes #%ld'
' resulted in status %ld, expected %ld' \
% (i, curProcStatus, vboxcon.ProcessStatus_TerminatedNormally));
fRc = False;
except:
reporter.logXcpt('Waiting for non-stale process #%ld failed:' % (i,));
fRc = False;
break;
cProcs = len(self.oTstDrv.oVBoxMgr.getArray(oGuestSession, 'processes'));
# Here we count the stale processes (that is, processes we don't have a reference
# anymore for) and the started + terminated non-stale processes (that we still keep
# a reference in aaProcs[] for).
if cProcs != (cStaleProcs * 2):
reporter.error('Test failed: Got %ld total processes, expected %ld' \
% (cProcs, cStaleProcs));
fRc = False;
if fRc:
#
# Check if all (referenced) non-stale processes now are in "terminated" state.
#
for i in range(0, cStaleProcs):
curProcStatus = aaProcs[i].status;
if aaProcs[i].status != vboxcon.ProcessStatus_TerminatedNormally:
reporter.error('Test failed: Non-stale processes #%ld has status %ld, expected %ld' \
% (i, curProcStatus, vboxcon.ProcessStatus_TerminatedNormally));
fRc = False;
if fRc:
reporter.log2('All non-stale processes terminated');
# Fire off blocking processes which are terminated via terminate().
if oTestVm.isWindows():
sArgs = [ '/C', 'dir', '/S', 'C:\\Windows'];
reporter.log2('Starting blocking processes');
aaProcs = [];
for i in range(0, cStaleProcs):
try:
oCurProc = oGuestSession.processCreate(sCmd, \
sArgs, [], \
[], 30 * 1000);
# Note: Use a timeout in the call above for not letting the stale processes
# hanging around forever. This can happen if the installed Guest Additions
# do not support terminating guest processes.
aaProcs.append(oCurProc);
except:
reporter.logXcpt('Creating blocking process failed:');
fRc = False;
break;
if fRc:
reporter.log2('Terminating blocking processes');
for i in range(0, cStaleProcs):
try:
aaProcs[i].terminate();
except: # Termination might not be supported, just skip and log it.
reporter.logXcpt('Termination of blocking process failed, skipped:');
cProcs = len(self.oTstDrv.oVBoxMgr.getArray(oGuestSession, 'processes'));
if cProcs != (cStaleProcs * 2): # Still should be 20 processes because we terminated the 10 newest ones.
reporter.error('Test failed: Got %ld total processes, expected %ld' % (cProcs, cStaleProcs * 2));
fRc = False;
cProcs = len(self.oTstDrv.oVBoxMgr.getArray(oGuestSession, 'processes'));
reporter.log2('Final guest session processes count: %ld' % (cProcs,));
# Now try to close the session and see what happens.
reporter.log2('Closing guest session ...');
oGuestSession.close();
except:
reporter.logXcpt('Testing for stale processes failed:');
fRc = False;
return (fRc, oTxsSession);
def testGuestCtrlExec(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914,R0915
"""
Tests the basic execution feature.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
if oTestVm.isWindows():
# Outputting stuff.
sImageOut = "C:\\windows\\system32\\cmd.exe";
else:
reporter.error('Implement me!'); ## @todo Implement non-Windows bits.
return (False, oTxsSession);
aaInvalid = [
# Invalid parameters.
[ tdTestExec(sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = False) ],
# Non-existent / invalid image.
[ tdTestExec(sCmd = "non-existent", sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = False) ],
[ tdTestExec(sCmd = "non-existent2", sUser = sUser, sPassword = sPassword, fWaitForExit = True),
tdTestResultExec(fRc = False) ],
# Use an invalid format string.
[ tdTestExec(sCmd = "%$%%%&", sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = False) ],
# More stuff.
[ tdTestExec(sCmd = "ƒ‰‹ˆ÷‹¸", sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = False) ],
[ tdTestExec(sCmd = "???://!!!", sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = False) ],
[ tdTestExec(sCmd = "<>!\\", sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = False) ]
# Enable as soon as ERROR_BAD_DEVICE is implemented.
#[ tdTestExec(sCmd = "CON", sUser = sUser, sPassword = sPassword),
# tdTestResultExec(fRc = False) ]
];
if oTestVm.isWindows():
aaExec = [
# Basic executon.
[ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'dir', '/S', 'c:\\windows\\system32' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True) ],
[ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'dir', '/S', 'c:\\windows\\system32\\kernel32.dll' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True) ],
[ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'dir', '/S', 'c:\\windows\\system32\\nonexist.dll' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 1) ],
[ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'dir', '/S', '/wrongparam' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 1) ],
# Paths with spaces.
## @todo Get path of installed Guest Additions. Later.
[ tdTestExec(sCmd = "C:\\Program Files\\Oracle\\VirtualBox Guest Additions\\VBoxControl.exe",
aArgs = [ 'version' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True) ],
# StdOut.
[ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'dir', '/S', 'c:\\windows\\system32' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True) ],
[ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'dir', '/S', 'stdout-non-existing' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 1) ],
# StdErr.
[ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'dir', '/S', 'c:\\windows\\system32' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True) ],
[ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'dir', '/S', 'stderr-non-existing' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 1) ],
# StdOut + StdErr.
[ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'dir', '/S', 'c:\\windows\\system32' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True) ],
[ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'dir', '/S', 'stdouterr-non-existing' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 1) ]
# FIXME: Failing tests.
# Environment variables.
# [ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'set', 'TEST_NONEXIST' ],
# sUser = sUser, sPassword = sPassword),
# tdTestResultExec(fRc = True, iExitCode = 1) ]
# [ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'set', 'windir' ],
# sUser = sUser, sPassword = sPassword,
# aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdOut, vboxcon.ProcessCreateFlag_WaitForStdErr ]),
# tdTestResultExec(fRc = True, sBuf = 'windir=C:\\WINDOWS\r\n') ],
# [ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'set', 'TEST_FOO' ],
# sUser = sUser, sPassword = sPassword,
# aEnv = [ 'TEST_FOO=BAR' ],
# aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdOut, vboxcon.ProcessCreateFlag_WaitForStdErr ]),
# tdTestResultExec(fRc = True, sBuf = 'TEST_FOO=BAR\r\n') ],
# [ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'set', 'TEST_FOO' ],
# sUser = sUser, sPassword = sPassword,
# aEnv = [ 'TEST_FOO=BAR', 'TEST_BAZ=BAR' ],
# aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdOut, vboxcon.ProcessCreateFlag_WaitForStdErr ]),
# tdTestResultExec(fRc = True, sBuf = 'TEST_FOO=BAR\r\n') ]
## @todo Create some files (or get files) we know the output size of to validate output length!
## @todo Add task which gets killed at some random time while letting the guest output something.
];
# Manual test, not executed automatically.
aaManual = [
[ tdTestExec(sCmd = sImageOut, aArgs = [ '/C', 'dir /S C:\\Windows' ],
sUser = sUser, sPassword = sPassword,
aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdOut, vboxcon.ProcessCreateFlag_WaitForStdErr ]),
tdTestResultExec(fRc = True, cbStdOut = 497917) ] ];
else:
reporter.log('No OS-specific tests for non-Windows yet!');
# Build up the final test array for the first batch.
aaTests = [];
aaTests.extend(aaInvalid);
if aaExec is not None:
aaTests.extend(aaExec);
fRc = True;
#
# Single execution stuff. Nice for debugging.
#
fManual = False;
if fManual:
curTest = aaTests[1][0]; # tdTestExec, use an index, later.
curRes = aaTests[1][1]; # tdTestResultExec
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlExec: Single test 1');
if fRc is False:
reporter.error('Single test failed: Could not create session');
else:
fRc = self.gctrlExecDoTest(0, curTest, curRes, curGuestSession);
curTest.closeSession();
curTest = aaTests[2][0]; # tdTestExec, use an index, later.
curRes = aaTests[2][1]; # tdTestResultExec
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlExec: Single test 2');
if fRc is False:
reporter.error('Single test failed: Could not create session');
else:
fRc = self.gctrlExecDoTest(0, curTest, curRes, curGuestSession);
curTest.closeSession();
curTest = aaTests[3][0]; # tdTestExec, use an index, later.
curRes = aaTests[3][1]; # tdTestResultExec
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlExec: Single test 3');
if fRc is False:
reporter.error('Single test failed: Could not create session');
else:
fRc = self.gctrlExecDoTest(0, curTest, curRes, curGuestSession);
curTest.closeSession();
return (fRc, oTxsSession);
else:
aaManual = aaManual; # Workaround for pylint #W0612.
if fRc is False:
return (fRc, oTxsSession);
#
# First batch: One session per guest process.
#
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResultExec
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlExec: Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
fRc = self.gctrlExecDoTest(i, curTest, curRes, curGuestSession);
if fRc is False:
break;
fRc = curTest.closeSession();
if fRc is False:
break;
# No sessions left?
if fRc is True:
cSessions = len(self.oTstDrv.oVBoxMgr.getArray(oSession.o.console.guest, 'sessions'));
if cSessions is not 0:
reporter.error('Found %d stale session(s), expected 0' % (cSessions,));
fRc = False;
if fRc is False:
return (fRc, oTxsSession);
reporter.log('Now using one guest session for all tests ...');
#
# Second batch: One session for *all* guest processes.
#
oGuest = oSession.o.console.guest;
try:
reporter.log('Creating session for all tests ...');
curGuestSession = oGuest.createSession(sUser, sPassword, '', 'testGuestCtrlExec: One session for all tests');
try:
fWaitFor = [ vboxcon.GuestSessionWaitForFlag_Start ];
waitResult = curGuestSession.waitForArray(fWaitFor, 30 * 1000);
if waitResult != vboxcon.GuestSessionWaitResult_Start \
and waitResult != vboxcon.GuestSessionWaitResult_WaitFlagNotSupported:
reporter.error('Session did not start successfully, returned wait result: %ld' \
% (waitResult));
return (False, oTxsSession);
reporter.log('Session successfully started');
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('Waiting for guest session to start failed:');
return (False, oTxsSession);
# Note: Not waiting for the guest session to start here
# is intentional. This must be handled by the process execution
# call then.
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResultExec
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc = self.gctrlExecDoTest(i, curTest, curRes, curGuestSession);
if fRc is False:
break;
try:
reporter.log2('Closing guest session ...');
curGuestSession.close();
curGuestSession = None;
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('Closing guest session failed:');
fRc = False;
except:
reporter.logXcpt('Could not create one session:');
# No sessions left?
if fRc is True:
cSessions = len(self.oTstDrv.oVBoxMgr.getArray(oSession.o.console.guest, 'sessions'));
if cSessions is not 0:
reporter.error('Found %d stale session(s), expected 0' % (cSessions,));
fRc = False;
return (fRc, oTxsSession);
def testGuestCtrlExecErrorLevel(self, oSession, oTxsSession, oTestVm):
"""
Tests handling of error levels from started guest processes.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
if oTestVm.isWindows():
# Outputting stuff.
sImage = "C:\\windows\\system32\\cmd.exe";
else:
reporter.error('Implement me!'); ## @todo Implement non-Windows bits.
return (False, oTxsSession);
aaTests = [];
if oTestVm.isWindows():
aaTests.extend([
# Simple.
[ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'wrongcommand' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 1) ],
[ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'exit', '22' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 22) ],
[ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'set', 'ERRORLEVEL=234' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 0) ],
[ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'echo', '%WINDIR%' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 0) ],
[ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'set', 'ERRORLEVEL=0' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 0) ],
[ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\windows\\system32' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 0) ],
[ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\windows\\system32\\kernel32.dll' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 0) ],
[ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\nonexisting-file' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 1) ],
[ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\nonexisting-dir\\' ],
sUser = sUser, sPassword = sPassword),
tdTestResultExec(fRc = True, iExitCode = 1) ]
# FIXME: Failing tests.
# With stdout.
# [ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\windows\\system32' ],
# sUser = sUser, sPassword = sPassword, aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdOut ]),
# tdTestResultExec(fRc = True, iExitCode = 0) ],
# [ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\nonexisting-file' ],
# sUser = sUser, sPassword = sPassword, aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdOut ]),
# tdTestResultExec(fRc = True, iExitCode = 1) ],
# [ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\nonexisting-dir\\' ],
# sUser = sUser, sPassword = sPassword, aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdOut ]),
# tdTestResultExec(fRc = True, iExitCode = 1) ],
# With stderr.
# [ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\windows\\system32' ],
# sUser = sUser, sPassword = sPassword, aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdErr ]),
# tdTestResultExec(fRc = True, iExitCode = 0) ],
# [ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\nonexisting-file' ],
# sUser = sUser, sPassword = sPassword, aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdErr ]),
# tdTestResultExec(fRc = True, iExitCode = 1) ],
# [ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\nonexisting-dir\\' ],
# sUser = sUser, sPassword = sPassword, aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdErr ]),
# tdTestResultExec(fRc = True, iExitCode = 1) ],
# With stdout/stderr.
# [ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\windows\\system32' ],
# sUser = sUser, sPassword = sPassword,
# aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdOut, vboxcon.ProcessCreateFlag_WaitForStdErr ]),
# tdTestResultExec(fRc = True, iExitCode = 0) ],
# [ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\nonexisting-file' ],
# sUser = sUser, sPassword = sPassword,
# aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdOut, vboxcon.ProcessCreateFlag_WaitForStdErr ]),
# tdTestResultExec(fRc = True, iExitCode = 1) ],
# [ tdTestExec(sCmd = sImage, aArgs = [ '/C', 'dir', 'c:\\nonexisting-dir\\' ],
# sUser = sUser, sPassword = sPassword,
# aFlags = [ vboxcon.ProcessCreateFlag_WaitForStdOut, vboxcon.ProcessCreateFlag_WaitForStdErr ]),
# tdTestResultExec(fRc = True, iExitCode = 1) ]
## @todo Test stdin!
]);
else:
reporter.log('No OS-specific tests for non-Windows yet!');
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResult
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlExecErrorLevel: Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
fRc = self.gctrlExecDoTest(i, curTest, curRes, curGuestSession);
curTest.closeSession();
if fRc is False:
break;
return (fRc, oTxsSession);
def testGuestCtrlExecTimeout(self, oSession, oTxsSession, oTestVm):
"""
Tests handling of timeouts of started guest processes.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
sDomain = "";
if oTestVm.isWindows():
# Outputting stuff.
sImage = "C:\\windows\\system32\\cmd.exe";
else:
reporter.error('Implement me!'); ## @todo Implement non-Windows bits.
return (False, oTxsSession);
fRc = True;
try:
oGuest = oSession.o.console.guest;
oGuestSession = oGuest.createSession(sUser, sPassword, sDomain, "testGuestCtrlExecTimeout");
oGuestSession.waitForArray([ vboxcon.GuestSessionWaitForFlag_Start ], 30 * 1000);
# Create a process which never terminates and should timeout when
# waiting for termination.
try:
curProc = oGuestSession.processCreate(sImage, [], \
[], [], 30 * 1000);
reporter.log('Waiting for process 1 being started ...');
waitRes = curProc.waitForArray([ vboxcon.ProcessWaitForFlag_Start ], 30 * 1000);
if waitRes != vboxcon.ProcessWaitResult_Start:
reporter.error('Waiting for process 1 to start failed, got status %ld');
fRc = False;
if fRc:
reporter.log('Waiting for process 1 to time out within 1ms ...');
waitRes = curProc.waitForArray([ vboxcon.ProcessWaitForFlag_Terminate ], 1);
if waitRes != vboxcon.ProcessWaitResult_Timeout:
reporter.error('Waiting for process 1 did not time out when it should (1)');
fRc = False;
else:
reporter.log('Waiting for process 1 timed out (1), good');
if fRc:
reporter.log('Waiting for process 1 to time out within 5000ms ...');
waitRes = curProc.waitForArray([ vboxcon.ProcessWaitForFlag_Terminate ], 5000);
if waitRes != vboxcon.ProcessWaitResult_Timeout:
reporter.error('Waiting for process 1 did not time out when it should, got wait result %ld' % (waitRes,));
fRc = False;
else:
reporter.log('Waiting for process 1 timed out (5000), good');
## @todo Add curProc.terminate() as soon as it's implemented.
except:
reporter.errorXcpt('Exception for process 1:');
fRc = False;
# Create a lengthly running guest process which will be killed by VBoxService on the
# guest because it ran out of execution time (5 seconds).
if fRc:
try:
curProc = oGuestSession.processCreate(sImage, [], \
[], [], 5 * 1000);
reporter.log('Waiting for process 2 being started ...');
waitRes = curProc.waitForArray([ vboxcon.ProcessWaitForFlag_Start ], 30 * 1000);
if waitRes != vboxcon.ProcessWaitResult_Start:
reporter.error('Waiting for process 1 to start failed, got status %ld');
fRc = False;
if fRc:
reporter.log('Waiting for process 2 to get killed because it ran out of execution time ...');
waitRes = curProc.waitForArray([ vboxcon.ProcessWaitForFlag_Terminate ], 30 * 1000);
if waitRes != vboxcon.ProcessWaitResult_Timeout:
reporter.error('Waiting for process 2 did not time out when it should, got wait result %ld' \
% (waitRes,));
fRc = False;
if fRc:
reporter.log('Waiting for process 2 indicated an error, good');
if curProc.status != vboxcon.ProcessStatus_TimedOutKilled:
reporter.error('Status of process 2 wrong; excepted %ld, got %ld' \
% (vboxcon.ProcessStatus_TimedOutKilled, curProc.status));
fRc = False;
else:
reporter.log('Status of process 2 correct (%ld)' % (vboxcon.ProcessStatus_TimedOutKilled,));
## @todo Add curProc.terminate() as soon as it's implemented.
except:
reporter.errorXcpt('Exception for process 2:');
fRc = False;
oGuestSession.close();
except:
reporter.errorXcpt('Could not handle session:');
fRc = False;
return (fRc, oTxsSession);
def testGuestCtrlDirCreate(self, oSession, oTxsSession, oTestVm):
"""
Tests creation of guest directories.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
if oTestVm.isWindows():
sScratch = "C:\\Temp\\vboxtest\\testGuestCtrlDirCreate\\";
aaTests = [];
if oTestVm.isWindows():
aaTests.extend([
# Invalid stuff.
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword, sDirectory = '' ),
tdTestResult(fRc = False) ],
# More unusual stuff.
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword, sDirectory = '..\\..\\' ),
tdTestResult(fRc = False) ],
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword, sDirectory = '../../' ),
tdTestResult(fRc = False) ],
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword, sDirectory = 'z:\\' ),
tdTestResult(fRc = False) ],
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword, sDirectory = '\\\\uncrulez\\foo' ),
tdTestResult(fRc = False) ],
# Creating directories.
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword, sDirectory = sScratch ),
tdTestResult(fRc = False) ],
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword, sDirectory = os.path.join(sScratch, 'foo\\bar\\baz'),
aFlags = [ vboxcon.DirectoryCreateFlag_Parents ] ),
tdTestResult(fRc = True) ],
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword, sDirectory = os.path.join(sScratch, 'foo\\bar\\baz'),
aFlags = [ vboxcon.DirectoryCreateFlag_Parents ] ),
tdTestResult(fRc = True) ],
# Long (+ random) stuff.
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword,
sDirectory = os.path.join(sScratch,
"".join(random.choice(string.ascii_lowercase) for i in range(32))) ),
tdTestResult(fRc = True) ],
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword,
sDirectory = os.path.join(sScratch,
"".join(random.choice(string.ascii_lowercase) for i in range(128))) ),
tdTestResult(fRc = True) ],
# Following two should fail on Windows (paths too long). Both should timeout.
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword,
sDirectory = os.path.join(sScratch,
"".join(random.choice(string.ascii_lowercase) for i in range(255))) ),
tdTestResult(fRc = False) ],
[ tdTestDirCreate(sUser = sUser, sPassword = sPassword,
sDirectory = os.path.join(sScratch,
"".join(random.choice(string.ascii_lowercase) for i in range(1024))) ),
tdTestResult(fRc = False) ]
]);
else:
reporter.log('No OS-specific tests for non-Windows yet!');
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResult
reporter.log('Testing #%d, sDirectory="%s" ...' % (i, curTest.sDirectory));
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlDirCreate: Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
fRc = self.gctrlCreateDir(curTest, curRes, curGuestSession);
curTest.closeSession();
if fRc is False:
reporter.error('Test #%d failed' % (i,));
fRc = False;
break;
return (fRc, oTxsSession);
def testGuestCtrlDirCreateTemp(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914
"""
Tests creation of temporary directories.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
# if oTestVm.isWindows():
# sScratch = "C:\\Temp\\vboxtest\\testGuestCtrlDirCreateTemp\\";
aaTests = [];
if oTestVm.isWindows():
aaTests.extend([
# Invalid stuff.
[ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sDirectory = ''),
tdTestResult(fRc = False) ],
[ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sDirectory = 'C:\\Windows',
fMode = 1234),
tdTestResult(fRc = False) ],
[ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = '',
sDirectory = 'C:\\Windows', fMode = 1234),
tdTestResult(fRc = False) ],
[ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'xXx',
sDirectory = 'C:\\Windows', fMode = 0700),
tdTestResult(fRc = False) ],
[ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'xxx',
sDirectory = 'C:\\Windows', fMode = 0700),
tdTestResult(fRc = False) ],
# More unusual stuff.
[ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'foo',
sDirectory = 'z:\\'),
tdTestResult(fRc = False) ],
[ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'foo',
sDirectory = '\\\\uncrulez\\foo'),
tdTestResult(fRc = False) ],
# Non-existing stuff.
[ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'bar',
sDirectory = 'c:\\Apps\\nonexisting\\foo'),
tdTestResult(fRc = False) ],
# FIXME: Failing test. Non Windows path
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'bar',
# sDirectory = '/tmp/non/existing'),
# tdTestResult(fRc = False) ]
]);
else:
reporter.log('No OS-specific tests for non-Windows yet!');
# FIXME: Failing tests.
# aaTests.extend([
# Non-secure variants.
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'X',
# sDirectory = sScratch),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'X',
# sDirectory = sScratch),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch,
# fMode = 0700),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch,
# fMode = 0700),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch,
# fMode = 0755),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch,
# fMode = 0755),
# tdTestResult(fRc = True) ],
# Secure variants.
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch, fSecure = True),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch, fSecure = True),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch, fSecure = True),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch, fSecure = True),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch,
# fSecure = True, fMode = 0700),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch,
# fSecure = True, fMode = 0700),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch,
# fSecure = True, fMode = 0755),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = 'XXX',
# sDirectory = sScratch,
# fSecure = True, fMode = 0755),
# tdTestResult(fRc = True) ],
# Random stuff.
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword,
# sTemplate = "XXX-".join(random.choice(string.ascii_lowercase) for i in range(32)),
# sDirectory = sScratch,
# fSecure = True, fMode = 0755),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = "".join('X' for i in range(32)),
# sDirectory = sScratch,
# fSecure = True, fMode = 0755),
# tdTestResult(fRc = True) ],
# [ tdTestDirCreateTemp(sUser = sUser, sPassword = sPassword, sTemplate = "".join('X' for i in range(128)),
# sDirectory = sScratch,
# fSecure = True, fMode = 0755),
# tdTestResult(fRc = True) ]
# ]);
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResult
reporter.log('Testing #%d, sTemplate="%s", fMode=%#o, path="%s", secure="%s" ...' %
(i, curTest.sTemplate, curTest.fMode, curTest.sDirectory, curTest.fSecure));
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlDirCreateTemp: Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
sDirTemp = "";
try:
sDirTemp = curGuestSession.directoryCreateTemp(curTest.sTemplate, curTest.fMode,
curTest.sDirectory, curTest.fSecure);
except:
if curRes.fRc is True:
reporter.errorXcpt('Creating temp directory "%s" failed:' % (curTest.sDirectory,));
fRc = False;
break;
else:
reporter.logXcpt('Creating temp directory "%s" failed expectedly, skipping:' % (curTest.sDirectory,));
curTest.closeSession();
if sDirTemp != "":
reporter.log2('Temporary directory is: %s' % (sDirTemp,));
fExists = curGuestSession.directoryExists(sDirTemp);
if fExists is False:
reporter.error('Test #%d failed: Temporary directory "%s" does not exists' % (i, sDirTemp));
fRc = False;
break;
return (fRc, oTxsSession);
def testGuestCtrlDirRead(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914
"""
Tests opening and reading (enumerating) guest directories.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
aaTests = [];
if oTestVm.isWindows():
aaTests.extend([
# Invalid stuff.
[ tdTestDirRead(sUser = sUser, sPassword = sPassword, sDirectory = ''),
tdTestResultDirRead(fRc = False) ],
[ tdTestDirRead(sUser = sUser, sPassword = sPassword, sDirectory = 'C:\\Windows', aFlags = [ 1234 ]),
tdTestResultDirRead(fRc = False) ],
[ tdTestDirRead(sUser = sUser, sPassword = sPassword, sDirectory = 'C:\\Windows', sFilter = '*.foo'),
tdTestResultDirRead(fRc = False) ],
# More unusual stuff.
[ tdTestDirRead(sUser = sUser, sPassword = sPassword, sDirectory = 'z:\\'),
tdTestResultDirRead(fRc = False) ],
[ tdTestDirRead(sUser = sUser, sPassword = sPassword, sDirectory = '\\\\uncrulez\\foo'),
tdTestResultDirRead(fRc = False) ],
# Non-existing stuff.
[ tdTestDirRead(sUser = sUser, sPassword = sPassword, sDirectory = 'c:\\Apps\\nonexisting'),
tdTestResultDirRead(fRc = False) ],
[ tdTestDirRead(sUser = sUser, sPassword = sPassword, sDirectory = 'c:\\Apps\\testDirRead'),
tdTestResultDirRead(fRc = False) ]
]);
if oTestVm.sVmName == 'tst-xppro':
aaTests.extend([
# Reading directories.
[ tdTestDirRead(sUser = sUser, sPassword = sPassword, sDirectory = '../../Windows/Fonts'),
tdTestResultDirRead(fRc = True, numFiles = 191) ],
[ tdTestDirRead(sUser = sUser, sPassword = sPassword, sDirectory = 'c:\\Windows\\Help'),
tdTestResultDirRead(fRc = True, numDirs = 13, numFiles = 569) ],
[ tdTestDirRead(sUser = sUser, sPassword = sPassword, sDirectory = 'c:\\Windows\\Web'),
tdTestResultDirRead(fRc = True, numDirs = 3, numFiles = 55) ]
]);
else:
reporter.log('No OS-specific tests for non-Windows yet!');
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResult
reporter.log('Testing #%d, dir="%s" ...' % (i, curTest.sDirectory));
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlDirRead: Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
(fRc2, cDirs, cFiles) = self.gctrlReadDir(curTest, curRes, curGuestSession);
curTest.closeSession();
reporter.log2('Test #%d: Returned %ld directories, %ld files total' % (i, cDirs, cFiles));
if fRc2 is curRes.fRc:
if fRc2 is True:
if curRes.numFiles != cFiles:
reporter.error('Test #%d failed: Got %ld files, expected %ld' % (i, cFiles, curRes.numFiles));
fRc = False;
break;
if curRes.numDirs != cDirs:
reporter.error('Test #%d failed: Got %ld directories, expected %ld' % (i, cDirs, curRes.numDirs));
fRc = False;
break;
else:
reporter.error('Test #%d failed: Got %s, expected %s' % (i, fRc2, curRes.fRc));
fRc = False;
break;
return (fRc, oTxsSession);
def testGuestCtrlFileRemove(self, oSession, oTxsSession, oTestVm):
"""
Tests removing guest files.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
aaTests = [];
if oTestVm.isWindows():
aaTests.extend([
# Invalid stuff.
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = ''), tdTestResult(fRc = False) ],
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = 'C:\\Windows'), tdTestResult(fRc = False) ],
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = 'C:\\Windows'), tdTestResult(fRc = False) ],
# More unusual stuff.
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = 'z:\\'), tdTestResult(fRc = False) ],
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = '\\\\uncrulez\\foo'),
tdTestResult(fRc = False) ],
# Non-existing stuff.
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Apps\\nonexisting'),
tdTestResult(fRc = False) ],
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Apps\\testFileRemove'),
tdTestResult(fRc = False) ],
# Try to delete system files.
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = 'c:\\pagefile.sys'),
tdTestResult(fRc = False) ],
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Windows\\kernel32.sys'),
tdTestResult(fRc = False) ]
]);
if oTestVm.sVmName == 'tst-xppro':
aaTests.extend([
# Try delete some unimportant media stuff.
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Windows\\Media\\chimes.wav'),
tdTestResult(fRc = True) ],
# Second attempt should fail.
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Windows\\Media\\chimes.wav'),
tdTestResult(fRc = False) ],
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Windows\\Media\\chord.wav'),
tdTestResult(fRc = True) ],
[ tdTestFileRemove(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Windows\\Media\\chord.wav'),
tdTestResult(fRc = False) ]
]);
else:
reporter.log('No OS-specific tests for non-Windows yet!');
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResult
reporter.log('Testing #%d, file="%s" ...' % (i, curTest.sFile));
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlFileRemove: Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
try:
curGuestSession.fileRemove(curTest.sFile);
except:
if curRes.fRc is True:
reporter.errorXcpt('Removing file "%s" failed:' % (curTest.sFile,));
fRc = False;
break;
else:
reporter.logXcpt('Removing file "%s" failed expectedly, skipping:' % (curTest.sFile,));
curTest.closeSession();
return (fRc, oTxsSession);
def testGuestCtrlFileStat(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914
"""
Tests querying file information through stat.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
aaTests = [];
if oTestVm.isWindows():
aaTests.extend([
# Invalid stuff.
[ tdTestFileStat(sUser = sUser, sPassword = sPassword, sFile = ''),
tdTestResultFileStat(fRc = False) ],
[ tdTestFileStat(sUser = sUser, sPassword = sPassword, sFile = 'C:\\Windows'),
tdTestResultFileStat(fRc = False) ],
[ tdTestFileStat(sUser = sUser, sPassword = sPassword, sFile = 'C:\\Windows'),
tdTestResultFileStat(fRc = False) ],
# More unusual stuff.
[ tdTestFileStat(sUser = sUser, sPassword = sPassword, sFile = 'z:\\'),
tdTestResultFileStat(fRc = False) ],
[ tdTestFileStat(sUser = sUser, sPassword = sPassword, sFile = '\\\\uncrulez\\foo'),
tdTestResultFileStat(fRc = False) ],
# Non-existing stuff.
[ tdTestFileStat(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Apps\\nonexisting'),
tdTestResultFileStat(fRc = False) ],
[ tdTestFileStat(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Apps\\testDirRead'),
tdTestResultFileStat(fRc = False) ]
]);
if oTestVm.sVmName == 'tst-xppro':
aaTests.extend([
# Directories; should fail.
[ tdTestFileStat(sUser = sUser, sPassword = sPassword, sFile = '../../Windows/Fonts'),
tdTestResultFileStat(fRc = False) ],
[ tdTestFileStat(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Windows\\Help'),
tdTestResultFileStat(fRc = False) ],
[ tdTestFileStat(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Windows\\system32'),
tdTestResultFileStat(fRc = False) ],
# Regular files.
[ tdTestFileStat(sUser = sUser, sPassword = sPassword, sFile = 'c:\\Windows\\system32\\kernel32.dll'),
tdTestResultFileStat(fRc = False, cbSize = 926720, eFileType = vboxcon.FsObjType_File) ]
]);
else:
reporter.log('No OS-specific tests for non-Windows yet!');
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResult
reporter.log('Testing #%d, sFile="%s" ...' % (i, curTest.sFile));
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlFileStat: Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
fileObjInfo = None;
try:
fileObjInfo = curGuestSession.fileQueryInfo(curTest.sFile);
except:
if curRes.fRc is True:
reporter.errorXcpt('Querying file information for "%s" failed:' % (curTest.sFile,));
fRc = False;
break;
else:
reporter.logXcpt('Querying file information for "%s" failed expectedly, skipping:' % (curTest.sFile,));
curTest.closeSession();
if fileObjInfo is not None:
eFileType = fileObjInfo.type;
if curRes.eFileType != eFileType:
reporter.error('Test #%d failed: Got file type %ld, expected %ld' % (i, eFileType, curRes.eFileType));
fRc = False;
break;
cbFile = long(fileObjInfo.objectSize);
if curRes.cbSize != cbFile:
reporter.error('Test #%d failed: Got %ld bytes size, expected %ld bytes' % (i, cbFile, curRes.cbSize));
fRc = False;
break;
## @todo Add more checks later.
return (fRc, oTxsSession);
def testGuestCtrlFileRead(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914
"""
Tests reading from guest files.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
if oTxsSession.syncMkDir('${SCRATCH}/testGuestCtrlFileRead') is False:
reporter.error('Could not create scratch directory on guest');
return (False, oTxsSession);
aaTests = [];
aaTests.extend([
# Invalid stuff.
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, cbToReadWrite = 0),
tdTestResultFileReadWrite(fRc = False) ],
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = ''),
tdTestResultFileReadWrite(fRc = False) ],
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = 'non-existing.file'),
tdTestResultFileReadWrite(fRc = False) ],
# Wrong open mode.
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = 'non-existing.file', \
sOpenMode = 'rt', sDisposition = 'oe'),
tdTestResultFileReadWrite(fRc = False) ],
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = '\\\\uncrulez\\non-existing.file', \
sOpenMode = 'tr', sDisposition = 'oe'),
tdTestResultFileReadWrite(fRc = False) ],
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = '../../non-existing.file', \
sOpenMode = 'wr', sDisposition = 'oe'),
tdTestResultFileReadWrite(fRc = False) ],
# Wrong disposition.
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = 'non-existing.file', \
sOpenMode = 'r', sDisposition = 'e'),
tdTestResultFileReadWrite(fRc = False) ],
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = '\\\\uncrulez\\non-existing.file', \
sOpenMode = 'r', sDisposition = 'o'),
tdTestResultFileReadWrite(fRc = False) ],
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = '../../non-existing.file', \
sOpenMode = 'r', sDisposition = 'c'),
tdTestResultFileReadWrite(fRc = False) ],
# Opening non-existing file when it should exist.
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = 'non-existing.file', \
sOpenMode = 'r', sDisposition = 'oe'),
tdTestResultFileReadWrite(fRc = False) ],
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = '\\\\uncrulez\\non-existing.file', \
sOpenMode = 'r', sDisposition = 'oe'),
tdTestResultFileReadWrite(fRc = False) ],
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = '../../non-existing.file', \
sOpenMode = 'r', sDisposition = 'oe'),
tdTestResultFileReadWrite(fRc = False) ]
]);
if oTestVm.isWindows():
aaTests.extend([
# Create a file which must not exist (but it hopefully does).
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = 'C:\\Windows\\System32\\calc.exe', \
sOpenMode = 'w', sDisposition = 'ce'),
tdTestResultFileReadWrite(fRc = False) ],
# Open a file which must exist.
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = 'C:\\Windows\\System32\\kernel32.dll', \
sOpenMode = 'r', sDisposition = 'oe'),
tdTestResultFileReadWrite(fRc = True) ],
# Try truncating a file which already is opened with a different sharing mode (and thus should fail).
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = 'C:\\Windows\\System32\\kernel32.dll', \
sOpenMode = 'w', sDisposition = 'ot'),
tdTestResultFileReadWrite(fRc = False) ]
]);
if oTestVm.sKind == "WindowsXP":
aaTests.extend([
# Reading from beginning.
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = 'C:\\Windows\\System32\\eula.txt', \
sOpenMode = 'r', sDisposition = 'oe', cbToReadWrite = 33),
tdTestResultFileReadWrite(fRc = True, aBuf = 'Microsoft Windows XP Professional', \
cbProcessed = 33, cbOffset = 33) ],
# Reading from offset.
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = 'C:\\Windows\\System32\\eula.txt', \
sOpenMode = 'r', sDisposition = 'oe', cbOffset = 17782, cbToReadWrite = 26),
tdTestResultFileReadWrite(fRc = True, aBuf = 'LINKS TO THIRD PARTY SITES', \
cbProcessed = 26, cbOffset = 17782 + 26) ]
]);
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestFileReadWrite, use an index, later.
curRes = aTest[1]; # tdTestResult
reporter.log('Testing #%d, sFile="%s", cbToReadWrite=%d, sOpenMode="%s", sDisposition="%s", cbOffset=%ld ...' % \
(i, curTest.sFile, curTest.cbToReadWrite, curTest.sOpenMode, curTest.sDisposition, curTest.cbOffset));
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlFileRead: Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
try:
if curTest.cbOffset > 0:
curFile = curGuestSession.fileOpenEx(curTest.sFile, curTest.sOpenMode, curTest.sDisposition, \
curTest.sSharingMode, curTest.lCreationMode, curTest.cbOffset);
curOffset = long(curFile.offset);
resOffset = long(curTest.cbOffset);
if curOffset != resOffset:
reporter.error('Test #%d failed: Initial offset on open does not match: Got %ld, expected %ld' \
% (i, curOffset, resOffset));
fRc = False;
else:
curFile = curGuestSession.fileOpen(curTest.sFile, curTest.sOpenMode, curTest.sDisposition, \
curTest.lCreationMode);
if fRc \
and curTest.cbToReadWrite > 0:
## @todo Split this up in 64K reads. Later.
## @todo Test timeouts.
aBufRead = curFile.read(curTest.cbToReadWrite, 30 * 1000);
if curRes.cbProcessed > 0 \
and curRes.cbProcessed is not len(aBufRead):
reporter.error('Test #%d failed: Read buffer length does not match: Got %ld, expected %ld' \
% (i, len(aBufRead), curRes.cbProcessed));
fRc = False;
if fRc:
if curRes.aBuf is not None \
and bytes(curRes.aBuf) != bytes(aBufRead):
reporter.error('Test #%d failed: Got buffer\n%s (%ld bytes), expected\n%s (%ld bytes)' \
% (i, map(hex, map(ord, aBufRead)), len(aBufRead), \
map(hex, map(ord, curRes.aBuf)), len(curRes.aBuf)));
reporter.error('Test #%d failed: Got buffer\n%s, expected\n%s' \
% (i, aBufRead, curRes.aBuf));
fRc = False;
# Test final offset.
curOffset = long(curFile.offset);
resOffset = long(curRes.cbOffset);
if curOffset != resOffset:
reporter.error('Test #%d failed: Final offset does not match: Got %ld, expected %ld' \
% (i, curOffset, resOffset));
fRc = False;
curFile.close();
except:
reporter.logXcpt('Opening "%s" failed:' % (curTest.sFile,));
fRc = False;
curTest.closeSession();
if fRc != curRes.fRc:
reporter.error('Test #%d failed: Got %s, expected %s' % (i, fRc, curRes.fRc));
fRc = False;
break;
return (fRc, oTxsSession);
def testGuestCtrlFileWrite(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914
"""
Tests writing to guest files.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
if oTestVm.isWindows():
sScratch = "C:\\Temp\\vboxtest\\testGuestCtrlFileWrite\\";
if oTxsSession.syncMkDir('${SCRATCH}/testGuestCtrlFileWrite') is False:
reporter.error('Could not create scratch directory on guest');
return (False, oTxsSession);
aaTests = [];
cScratchBuf = 512;
aScratchBuf = array('b', [random.randint(-128, 127) for i in range(cScratchBuf)]);
aaTests.extend([
# Write to a non-existing file.
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = sScratch + 'testGuestCtrlFileWrite.txt', \
sOpenMode = 'w+', sDisposition = 'ce', cbToReadWrite = cScratchBuf,
aBuf = aScratchBuf),
tdTestResultFileReadWrite(fRc = True, aBuf = aScratchBuf, \
cbProcessed = cScratchBuf, cbOffset = cScratchBuf) ]
]);
aScratchBuf2 = array('b', [random.randint(-128, 127) for i in range(cScratchBuf)]);
aaTests.extend([
# Append the same amount of data to the just created file.
[ tdTestFileReadWrite(sUser = sUser, sPassword = sPassword, sFile = sScratch + 'testGuestCtrlFileWrite.txt', \
sOpenMode = 'w+', sDisposition = 'oa', cbToReadWrite = cScratchBuf,
cbOffset = cScratchBuf, aBuf = aScratchBuf2),
tdTestResultFileReadWrite(fRc = True, aBuf = aScratchBuf2, \
cbProcessed = cScratchBuf, cbOffset = cScratchBuf * 2) ],
]);
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestFileReadWrite, use an index, later.
curRes = aTest[1]; # tdTestResult
reporter.log('Testing #%d, sFile="%s", cbToReadWrite=%d, sOpenMode="%s", sDisposition="%s", cbOffset=%ld ...' % \
(i, curTest.sFile, curTest.cbToReadWrite, curTest.sOpenMode, curTest.sDisposition, curTest.cbOffset));
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlFileWrite: Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
try:
if curTest.cbOffset > 0:
curFile = curGuestSession.fileOpenEx(curTest.sFile, curTest.sOpenMode, curTest.sDisposition, \
curTest.sSharingMode, curTest.lCreationMode, curTest.cbOffset);
curOffset = long(curFile.offset);
resOffset = long(curTest.cbOffset);
if curOffset != resOffset:
reporter.error('Test #%d failed: Initial offset on open does not match: Got %ld, expected %ld' \
% (i, curOffset, resOffset));
fRc = False;
else:
curFile = curGuestSession.fileOpen(curTest.sFile, curTest.sOpenMode, curTest.sDisposition, \
curTest.lCreationMode);
if fRc \
and curTest.cbToReadWrite > 0:
## @todo Split this up in 64K writes. Later.
## @todo Test timeouts.
cBytesWritten = curFile.write(curTest.aBuf, 30 * 1000);
if curRes.cbProcessed > 0 \
and curRes.cbProcessed != cBytesWritten:
reporter.error('Test #%d failed: Written buffer length does not match: Got %ld, expected %ld' \
% (i, cBytesWritten, curRes.cbProcessed));
fRc = False;
if fRc:
# Verify written content by seeking back to the initial offset and
# re-read & compare the written data.
try:
curFile.seek(-(curTest.cbToReadWrite), vboxcon.FileSeekType_Current);
except:
reporter.logXcpt('Seeking back to initial write position failed:');
fRc = False;
if fRc \
and long(curFile.offset) != curTest.cbOffset:
reporter.error('Test #%d failed: Initial write position does not match current position, \
got %ld, expected %ld' \
% (i, long(curFile.offset), curTest.cbOffset));
fRc = False;
if fRc:
aBufRead = curFile.read(curTest.cbToReadWrite, 30 * 1000);
if len(aBufRead) != curTest.cbToReadWrite:
reporter.error('Test #%d failed: Got buffer length %ld, expected %ld' \
% (i, len(aBufRead), curTest.cbToReadWrite));
fRc = False;
if fRc \
and curRes.aBuf is not None \
and buffer(curRes.aBuf) != aBufRead:
reporter.error('Test #%d failed: Got buffer\n%s, expected\n%s' \
% (i, aBufRead, curRes.aBuf));
fRc = False;
# Test final offset.
curOffset = long(curFile.offset);
resOffset = long(curRes.cbOffset);
if curOffset != resOffset:
reporter.error('Test #%d failed: Final offset does not match: Got %ld, expected %ld' \
% (i, curOffset, resOffset));
fRc = False;
curFile.close();
except:
reporter.logXcpt('Opening "%s" failed:' % (curTest.sFile,));
fRc = False;
curTest.closeSession();
if fRc != curRes.fRc:
reporter.error('Test #%d failed: Got %s, expected %s' % (i, fRc, curRes.fRc));
fRc = False;
break;
return (fRc, oTxsSession);
def testGuestCtrlCopyTo(self, oSession, oTxsSession, oTestVm):
"""
Tests copying files from host to the guest.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
if oTestVm.isWindows():
sScratch = "C:\\Temp\\vboxtest\\testGuestCtrlCopyTo\\";
if oTxsSession.syncMkDir('${SCRATCH}/testGuestCtrlCopyTo') is False:
reporter.error('Could not create scratch directory on guest');
return (False, oTxsSession);
# Some stupid trickery to guess the location of the iso.
sVBoxValidationKitISO = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../VBoxValidationKit.iso'));
if not os.path.isfile(sVBoxValidationKitISO):
sVBoxValidationKitISO = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../VBoxTestSuite.iso'));
if not os.path.isfile(sVBoxValidationKitISO):
sCur = os.getcwd();
for i in range(0, 10):
sVBoxValidationKitISO = os.path.join(sCur, 'validationkit/VBoxValidationKit.iso');
if os.path.isfile(sVBoxValidationKitISO):
break;
sVBoxValidationKitISO = os.path.join(sCur, 'testsuite/VBoxTestSuite.iso');
if os.path.isfile(sVBoxValidationKitISO):
break;
sCur = os.path.abspath(os.path.join(sCur, '..'));
if i is None: pass; # shut up pychecker/pylint.
if os.path.isfile(sVBoxValidationKitISO):
reporter.log('Validation Kit .ISO found at: %s' % (sVBoxValidationKitISO,));
else:
reporter.log('Warning: Validation Kit .ISO not found -- some tests might fail');
aaTests = [];
if oTestVm.isWindows():
aaTests.extend([
# Destination missing.
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = ''), tdTestResult(fRc = False) ],
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = 'C:\\Windows',
aFlags = [ 1234 ] ), tdTestResult(fRc = False) ],
# Source missing.
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sDst = ''), tdTestResult(fRc = False) ],
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sDst = 'C:\\Windows',
aFlags = [ 1234 ] ), tdTestResult(fRc = False) ],
# Nothing to copy (source and/or destination is empty).
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = 'z:\\'), tdTestResult(fRc = False) ],
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = '\\\\uncrulez\\foo'),
tdTestResult(fRc = False) ],
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = 'non-exist',
sDst = os.path.join(sScratch, 'non-exist.dll')), tdTestResult(fRc = False) ],
# Copying single files.
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = sVBoxValidationKitISO,
sDst = 'C:\\non-exist\\'), tdTestResult(fRc = False) ],
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = sVBoxValidationKitISO,
sDst = 'C:\\non\\exist\\'), tdTestResult(fRc = False) ],
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = sVBoxValidationKitISO,
sDst = 'C:\\non\\exist\\renamedfile.dll'), tdTestResult(fRc = False) ],
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = sVBoxValidationKitISO,
sDst = os.path.join(sScratch, 'HostGuestAdditions.iso')), tdTestResult(fRc = True) ],
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = sVBoxValidationKitISO,
sDst = os.path.join(sScratch, 'HostGuestAdditions.iso')), tdTestResult(fRc = True) ],
# Destination is a directory, should fail.
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = sVBoxValidationKitISO,
sDst = sScratch), tdTestResult(fRc = False) ]
## @todo Add testing the CopyTo flags here!
]);
if self.oTstDrv.sHost == 'win':
## @todo Check for Windows (7) host.
aaTests.extend([
# Copying directories with contain files we don't have read access to.
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = 'C:\\Windows\\security',
sDst = sScratch), tdTestResult(fRc = False) ],
# Copying directories with regular files.
[ tdTestCopyTo(sUser = sUser, sPassword = sPassword, sSrc = 'C:\\Windows\\Help',
sDst = sScratch), tdTestResult(fRc = True) ]
]);
else:
reporter.log('No OS-specific tests for non-Windows yet!');
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResult
reporter.log('Testing #%d, sSrc="%s", sDst="%s", aFlags="%s" ...' % \
(i, curTest.sSrc, curTest.sDst, curTest.aFlags));
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlCopyTo: Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
fRc2 = self.gctrlCopyTo(curTest, curGuestSession);
curTest.closeSession();
if fRc2 is curRes.fRc:
## @todo Verify the copied results (size, checksum?).
pass;
else:
reporter.error('Test #%d failed: Got %s, expected %s' % (i, fRc2, curRes.fRc));
fRc = False;
break;
return (fRc, oTxsSession);
def testGuestCtrlCopyFrom(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914
"""
Tests copying files from guest to the host.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
sScratch = os.path.join(self.oTstDrv.sScratchPath, "testGctrlCopyFrom");
try:
os.makedirs(sScratch);
except OSError as e:
if e.errno != errno.EEXIST:
reporter.error('Failed: Unable to create scratch directory \"%s\"' % (sScratch,));
return (False, oTxsSession);
reporter.log('Scratch path is: %s' % (sScratch,));
aaTests = [];
if oTestVm.isWindows():
aaTests.extend([
# Destination missing.
[ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = ''),
tdTestResult(fRc = False) ],
[ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = 'C:\\Windows',
aFlags = [ 1234 ] ),
tdTestResult(fRc = False) ],
# Source missing.
[ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sDst = ''),
tdTestResult(fRc = False) ],
[ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sDst = 'C:\\Windows',
aFlags = [ 1234 ] ),
tdTestResult(fRc = False) ],
# Nothing to copy (sDst is empty / unreachable).
[ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = 'z:\\'),
tdTestResult(fRc = False) ],
[ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = '\\\\uncrulez\\foo'),
tdTestResult(fRc = False) ],
[ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = 'non-exist',
sDst = os.path.join(sScratch, 'non-exist.dll')),
tdTestResult(fRc = False) ]
## @todo Add testing the CopyFrom aFlags here!
]);
if self.oTstDrv.sHost == 'win':
aaTests.extend([
# FIXME: Failing test.
# Copying single files.
# [ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = 'C:\\Windows\\system32\\ole32.dll',
# sDst = 'C:\\non-exist\\'), tdTestResult(fRc = False) ],
# [ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = 'C:\\Windows\\system32\\ole32.dll',
# sDst = 'C:\\non\\exist\\'), tdTestResult(fRc = False) ],
# [ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = 'C:\\Windows\\system32\\ole32.dll',
# sDst = 'C:\\non\\exist\\renamedfile.dll'), tdTestResult(fRc = False) ],
# [ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = 'C:\\Windows\\system32\\ole32.dll',
# sDst = os.path.join(sScratch, 'renamedfile.dll')), tdTestResult(fRc = True) ],
# [ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = 'C:\\Windows\\system32\\ole32.dll',
# sDst = os.path.join(sScratch, 'renamedfile.dll')), tdTestResult(fRc = True) ],
# Destination is a directory, should fail.
[ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = 'C:\\Windows\\system32\\ole32.dll',
sDst = sScratch), tdTestResult(fRc = False) ],
# Copying directories.
[ tdTestCopyFrom(sUser = sUser, sPassword = sPassword, sSrc = 'C:\\Windows\\Web',
sDst = sScratch), tdTestResult(fRc = True) ]
## @todo Add testing the CopyFrom aFlags here!
]);
else:
reporter.log('No OS-specific tests for non-Windows yet!');
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResult
reporter.log('Testing #%d, sSrc="%s", sDst="%s", aFlags="%s" ...' % \
(i, curTest.sSrc, curTest.sDst, curTest.aFlags));
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, curGuestSession = curTest.createSession('testGuestCtrlCopyFrom: Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
fRc2 = self.gctrlCopyFrom(curTest, curRes, curGuestSession);
curTest.closeSession();
if fRc2 is curRes.fRc:
## @todo Verify the copied results (size, checksum?).
pass;
else:
reporter.error('Test #%d failed: Got %s, expected %s' % (i, fRc2, curRes.fRc));
fRc = False;
break;
return (fRc, oTxsSession);
def testGuestCtrlUpdateAdditions(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914
"""
Tests updating the Guest Additions inside the guest.
"""
if oTestVm.isWindows():
sUser = "Administrator";
else:
sUser = "vbox";
sPassword = "password";
# Some stupid trickery to guess the location of the iso.
sVBoxValidationKitISO = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../VBoxValidationKit.iso'));
if not os.path.isfile(sVBoxValidationKitISO):
sVBoxValidationKitISO = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../VBoxTestSuite.iso'));
if not os.path.isfile(sVBoxValidationKitISO):
sCur = os.getcwd();
for i in range(0, 10):
sVBoxValidationKitISO = os.path.join(sCur, 'validationkit/VBoxValidationKit.iso');
if os.path.isfile(sVBoxValidationKitISO):
break;
sVBoxValidationKitISO = os.path.join(sCur, 'testsuite/VBoxTestSuite.iso');
if os.path.isfile(sVBoxValidationKitISO):
break;
sCur = os.path.abspath(os.path.join(sCur, '..'));
if i is None: pass; # shut up pychecker/pylint.
if os.path.isfile(sVBoxValidationKitISO):
reporter.log('Validation Kit .ISO found at: %s' % (sVBoxValidationKitISO,));
else:
reporter.log('Warning: Validation Kit .ISO not found -- some tests might fail');
sScratch = os.path.join(self.oTstDrv.sScratchPath, "testGctrlUpdateAdditions");
try:
os.makedirs(sScratch);
except OSError as e:
if e.errno != errno.EEXIST:
reporter.error('Failed: Unable to create scratch directory \"%s\"' % (sScratch,));
return (False, oTxsSession);
reporter.log('Scratch path is: %s' % (sScratch,));
aaTests = [];
if oTestVm.isWindows():
aaTests.extend([
# Source is missing.
[ tdTestUpdateAdditions(sUser = sUser, sPassword = sPassword, sSrc = ''),
tdTestResult(fRc = False) ],
# Wrong aFlags.
[ tdTestUpdateAdditions(sUser = sUser, sPassword = sPassword, sSrc = self.oTstDrv.getGuestAdditionsIso(),
aFlags = [ 1234 ]),
tdTestResult(fRc = False) ],
# Non-existing .ISO.
[ tdTestUpdateAdditions(sUser = sUser, sPassword = sPassword, sSrc = "non-existing.iso"),
tdTestResult(fRc = False) ],
# Wrong .ISO.
[ tdTestUpdateAdditions(sUser = sUser, sPassword = sPassword, sSrc = sVBoxValidationKitISO),
tdTestResult(fRc = False) ],
# The real thing.
[ tdTestUpdateAdditions(sUser = sUser, sPassword = sPassword, sSrc = self.oTstDrv.getGuestAdditionsIso()),
tdTestResult(fRc = True) ],
# Test the (optional) installer arguments. This will extract the
# installer into our guest's scratch directory.
[ tdTestUpdateAdditions(sUser = sUser, sPassword = sPassword, sSrc = self.oTstDrv.getGuestAdditionsIso(),
aArgs = [ '/extract', '/D=' + sScratch ]),
tdTestResult(fRc = True) ]
# Some debg ISO. Only enable locally.
#[ tdTestUpdateAdditions(sUser = sUser, sPassword = sPassword,
# sSrc = "V:\\Downloads\\VBoxGuestAdditions-r80354.iso"),
# tdTestResult(fRc = True) ]
]);
else:
reporter.log('No OS-specific tests for non-Windows yet!');
fRc = True;
for (i, aTest) in enumerate(aaTests):
curTest = aTest[0]; # tdTestExec, use an index, later.
curRes = aTest[1]; # tdTestResult
reporter.log('Testing #%d, sSrc="%s", aFlags="%s" ...' % \
(i, curTest.sSrc, curTest.aFlags));
curTest.setEnvironment(oSession, oTxsSession, oTestVm);
fRc, _ = curTest.createSession('Test #%d' % (i,));
if fRc is False:
reporter.error('Test #%d failed: Could not create session' % (i,));
break;
try:
curProgress = curTest.oTest.oGuest.updateGuestAdditions(curTest.sSrc, curTest.aArgs, curTest.aFlags);
if curProgress is not None:
oProgress = vboxwrappers.ProgressWrapper(curProgress, self.oTstDrv.oVBoxMgr, self, "gctrlUpGA");
try:
iRc = oProgress.waitForOperation(0, fIgnoreErrors = True);
if iRc != 0:
reporter.log('Waiting for updating Guest Additions failed');
fRc = False;
except:
reporter.logXcpt('Updating Guest Additions waiting exception for sSrc="%s", aFlags="%s":' \
% (curTest.sSrc, curTest.aFlags));
fRc = False;
except:
# Just log, don't assume an error here (will be done in the main loop then).
reporter.logXcpt('Updating Guest Additions exception for sSrc="%s", aFlags="%s":' \
% (curTest.sSrc, curTest.aFlags));
fRc = False;
curTest.closeSession();
if fRc is curRes.fRc:
if fRc:
## @todo Verify if Guest Additions were really updated (build, revision, ...).
pass;
else:
reporter.error('Test #%d failed: Got %s, expected %s' % (i, fRc, curRes.fRc));
fRc = False;
break;
return (fRc, oTxsSession);
class tdAddGuestCtrl(vbox.TestDriver): # pylint: disable=R0902,R0904
"""
Guest control using VBoxService on the guest.
"""
def __init__(self):
vbox.TestDriver.__init__(self);
self.oTestVmSet = self.oTestVmManager.getStandardVmSet('nat');
self.fQuick = False; # Don't skip lengthly tests by default.
self.addSubTestDriver(SubTstDrvAddGuestCtrl(self));
#
# Overridden methods.
#
def showUsage(self):
"""
Shows the testdriver usage.
"""
rc = vbox.TestDriver.showUsage(self);
reporter.log('');
reporter.log('tdAddGuestCtrl Options:');
reporter.log(' --quick');
reporter.log(' Same as --virt-modes hwvirt --cpu-counts 1.');
return rc;
def parseOption(self, asArgs, iArg): # pylint: disable=R0912,R0915
"""
Parses the testdriver arguments from the command line.
"""
if asArgs[iArg] == '--quick':
self.parseOption(['--virt-modes', 'hwvirt'], 0);
self.parseOption(['--cpu-counts', '1'], 0);
self.fQuick = True;
else:
return vbox.TestDriver.parseOption(self, asArgs, iArg);
return iArg + 1;
def actionConfig(self):
if not self.importVBoxApi(): # So we can use the constant below.
return False;
eNic0AttachType = vboxcon.NetworkAttachmentType_NAT;
sGaIso = self.getGuestAdditionsIso();
return self.oTestVmSet.actionConfig(self, eNic0AttachType = eNic0AttachType, sDvdImage = sGaIso);
def actionExecute(self):
return self.oTestVmSet.actionExecute(self, self.testOneCfg);
#
# Test execution helpers.
#
def testOneCfg(self, oVM, oTestVm): # pylint: disable=R0915
"""
Runs the specified VM thru the tests.
Returns a success indicator on the general test execution. This is not
the actual test result.
"""
self.logVmInfo(oVM);
fRc = True;
oSession, oTxsSession = self.startVmAndConnectToTxsViaTcp(oTestVm.sVmName, fCdWait = False);
reporter.log("TxsSession: %s" % (oTxsSession,));
if oSession is not None:
self.addTask(oSession);
fManual = False; # Manual override for local testing. (Committed version shall be False.)
if not fManual:
fRc, oTxsSession = self.aoSubTstDrvs[0].testIt(oTestVm, oSession, oTxsSession);
else:
fRc, oTxsSession = self.testGuestCtrlManual(oSession, oTxsSession, oTestVm);
# Cleanup.
self.removeTask(oTxsSession);
if not fManual:
self.terminateVmBySession(oSession);
else:
fRc = False;
return fRc;
def gctrlReportError(self, progress):
"""
Helper function to report an error of a
given progress object.
"""
if progress is None:
reporter.log('No progress object to print error for');
else:
errInfo = progress.errorInfo;
if errInfo:
reporter.log('%s' % (errInfo.text,));
return False;
def gctrlGetRemainingTime(self, msTimeout, msStart):
"""
Helper function to return the remaining time (in ms)
based from a timeout value and the start time (both in ms).
"""
if msTimeout is 0:
return 0xFFFFFFFE; # Wait forever.
msElapsed = base.timestampMilli() - msStart;
if msElapsed > msTimeout:
return 0; # No time left.
return msTimeout - msElapsed;
def testGuestCtrlManual(self, oSession, oTxsSession, oTestVm): # pylint: disable=R0914,R0915,W0613,W0612
"""
For manually testing certain bits.
"""
reporter.log('Manual testing ...');
fRc = True;
sUser = 'Administrator';
sPassword = 'password';
oGuest = oSession.o.console.guest;
oGuestSession = oGuest.createSession(sUser,
sPassword,
"", "Manual Test");
aWaitFor = [ vboxcon.GuestSessionWaitForFlag_Start ];
_ = oGuestSession.waitForArray(aWaitFor, 30 * 1000);
sCmd = 'c:\\windows\\system32\\cmd.exe';
aArgs = [ '/C', 'dir', '/S', 'c:\\windows' ];
aEnv = [];
aFlags = [];
for _ in range(100):
oProc = oGuestSession.processCreate(sCmd,
aArgs, aEnv,
aFlags, 30 * 1000);
aWaitFor = [ vboxcon.ProcessWaitForFlag_Terminate ];
_ = oProc.waitForArray(aWaitFor, 30 * 1000);
oGuestSession.close();
oGuestSession = None;
time.sleep(5);
oSession.o.console.PowerDown();
return (fRc, oTxsSession);
if __name__ == '__main__':
sys.exit(tdAddGuestCtrl().main(sys.argv));