t_pkg_depotd.py revision 689
581N/A#!/usr/bin/python2.4
581N/A#
581N/A# CDDL HEADER START
581N/A#
581N/A# The contents of this file are subject to the terms of the
581N/A# Common Development and Distribution License (the "License").
581N/A# You may not use this file except in compliance with the License.
581N/A#
581N/A# You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
581N/A# or http://www.opensolaris.org/os/licensing.
581N/A# See the License for the specific language governing permissions
581N/A# and limitations under the License.
581N/A#
581N/A# When distributing Covered Code, include this CDDL HEADER in each
581N/A# file and include the License file at usr/src/OPENSOLARIS.LICENSE.
581N/A# If applicable, add the following below this CDDL HEADER, with the
581N/A# fields enclosed by brackets "[]" replaced with your own identifying
581N/A# information: Portions Copyright [yyyy] [name of copyright owner]
581N/A#
581N/A# CDDL HEADER END
581N/A#
581N/A
581N/A# Copyright 2008 Sun Microsystems, Inc. All rights reserved.
581N/A# Use is subject to license terms.
581N/A
581N/Aimport testutils
if __name__ == "__main__":
    # This must run before the pkg.* imports further down.
    # NOTE(review): presumably it points the import path at the proto
    # area so the freshly built pkg modules are found -- confirm
    # against testutils.setup_environment.
    testutils.setup_environment("../../../proto")
581N/A
581N/Aimport unittest
581N/Aimport os
689N/Aimport pkg.misc as misc
581N/Aimport shutil
581N/A
581N/Aimport pkg.depotcontroller as dc
581N/A
class TestPkgDepot(testutils.SingleDepotTestCase):
    """ Tests run against a single depot server: liveness checks,
    repeated start/stop, publication across depot restarts, and
    regression tests for individual bugs. """

    # Only start/stop the depot once (instead of for every test)
    persistent_depot = True

    # Package with a directory and two files; the file payloads are
    # the scratch files listed in misc_files.
    quux10 = """
        open quux@1.0,5.11-0
        add dir mode=0755 owner=root group=bin path=/bin
        add file /tmp/cat mode=0555 owner=root group=bin path=/bin/cat
        add file /tmp/libc.so.1 mode=0555 owner=root group=bin path=/lib/libc.so.1
        close """

    # Empty package used by the /info regression test.
    info10 = """
        open info@1.0,5.11-0
        close """

    # Scratch files created in setUp() and removed in tearDown().
    misc_files = [ "/tmp/libc.so.1", "/tmp/cat" ]

    def setUp(self):
        """ Run the base-class setup, then create every file named in
        misc_files. """
        testutils.SingleDepotTestCase.setUp(self)
        for p in self.misc_files:
            f = open(p, "w")
            try:
                # write the name of the file into the file, so
                # that all files have differing contents
                f.write(p)
            finally:
                # Actually call close(); a bare "f.close" only
                # names the method and leaves the data unflushed
                # until the file object is garbage collected.
                f.close()
            self.debug("wrote %s" % p)

    def tearDown(self):
        """ Run the base-class teardown, then delete the scratch files
        created by setUp(). """
        testutils.SingleDepotTestCase.tearDown(self)
        for p in self.misc_files:
            os.remove(p)

    def test_depot_ping(self):
        """ Ping the depot several times """

        for dummy in range(4):
            self.assert_(self.dc.is_alive())

    def testStartStop(self):
        """ Start and stop the depot several times """
        self.dc.stop()
        try:
            for i in range(0, 5):
                self.dc.start()
                self.assert_(self.dc.is_alive())
                self.dc.stop()
                self.assert_(not self.dc.is_alive())
        finally:
            # The depot is shared by every test in this class
            # (persistent_depot), so bring it back up even when an
            # assertion above failed; otherwise every later test
            # would fail against a stopped depot.
            if not self.dc.is_alive():
                self.dc.start()

    def test_bug_1876(self):
        """ Send package quux@1.0 an action at a time, restarting the
        depot server after each one is sent, to ensure that
        transactions work across depot restart. Then verify that
        the package was successfully added by performing some
        basic operations. """

        durl = self.dc.get_depot_url()

        for line in self.quux10.split("\n"):
            line = line.strip()
            if not line:
                continue

            try:
                self.pkgsend(durl, line, exit = 0)
            except:
                # Deliberately catch everything: abandon the open
                # transaction, then re-raise the original error
                # unchanged.
                self.pkgsend(durl, "close -A", exit = 0)
                raise

            # Nothing is left to carry across a restart once the
            # transaction has been closed.
            if line != "close":
                self.restart_depots()

        self.image_create(durl)

        self.pkg("list -a")
        # Nothing is installed yet, so a plain "list" must fail.
        self.pkg("list", exit=1)

        self.pkg("install quux")

        self.pkg("list")
        self.pkg("verify")

        self.pkg("uninstall quux")
        self.pkg("verify")

    def test_bad_fmris(self):
        """ Opening a transaction with a malformed FMRI must fail. """
        durl = self.dc.get_depot_url()
        for bad_fmri in ("foo@", "foo@x.y", "foo@1.0,-2.0"):
            self.pkgsend(durl, "open %s" % bad_fmri, exit=1)

    def test_bug_3365(self):
        """ Plant empty search.dir and search.pag files in the
        repository directory, restart the depot, publish a package,
        and verify that both files are gone afterwards. """
        durl = self.dc.get_depot_url()
        depotpath = self.dc.get_repodir()

        dir_file = os.path.join(depotpath, "search.dir")
        pag_file = os.path.join(depotpath, "search.pag")
        stale_files = (dir_file, pag_file)

        for path in stale_files:
            self.assert_(not os.path.exists(path))

        # Create both files empty.
        for path in stale_files:
            f = open(path, "w")
            f.close()
        for path in stale_files:
            self.assert_(os.path.exists(path))

        self.dc.stop()
        self.dc.start()
        self.pkgsend_bulk(durl, self.quux10)
        for path in stale_files:
            self.assert_(not os.path.exists(path))

    def test_bug_4489(self):
        """Publish a package and then verify that the depot /info
        operation doesn't fail."""
        depot_url = self.dc.get_depot_url()
        plist = self.pkgsend_bulk(depot_url, self.info10)
        # The test passes as long as this call does not raise.
        misc.versioned_urlopen(depot_url, "info", [0], plist[0])
581N/A
class TestDepotController(testutils.CliTestCase):
    """ Tests of the DepotController class itself: repeated
    start/stop cycles and rejection of conflicting option
    combinations. """

    # rwxr-xr-x; written via int() so the octal value parses on any
    # Python version.
    _REPO_DIR_MODE = int("755", 8)

    def setUp(self):
        """ Create a DepotController pointed at the proto-area depot
        binary and content root, with a repository directory and log
        path under the test prefix. """
        testutils.CliTestCase.setUp(self)

        self.__dc = dc.DepotController()
        self.__pid = os.getpid()
        self.__dc.set_depotd_path(testutils.g_proto_area +
            "/usr/lib/pkg.depotd")
        self.__dc.set_depotd_content_root(testutils.g_proto_area +
            "/usr/share/lib/pkg")

        depotpath = os.path.join(self.get_test_prefix(), "depot")
        logpath = os.path.join(self.get_test_prefix(), self.id())

        try:
            os.makedirs(depotpath, self._REPO_DIR_MODE)
        except OSError:
            # An already existing directory is fine; any other
            # failure (permissions, a plain file in the way, ...)
            # must surface here instead of as a confusing depot
            # error later on.
            if not os.path.isdir(depotpath):
                raise

        self.__dc.set_repodir(depotpath)
        self.__dc.set_logpath(logpath)

    def tearDown(self):
        """ Kill the depot, then remove its repository directory and
        log file. """
        testutils.CliTestCase.tearDown(self)

        self.__dc.kill()
        shutil.rmtree(self.__dc.get_repodir())
        os.remove(self.__dc.get_logpath())

    def testStartStop(self):
        """ Start and stop the depot five times on port 12000,
        checking liveness after every transition. """
        self.__dc.set_port(12000)
        for i in range(0, 5):
            self.__dc.start()
            self.assert_(self.__dc.is_alive())
            self.__dc.stop()
            self.assert_(not self.__dc.is_alive())

    def testBadArgs(self):
        """ Each option combination below must make the depot refuse
        to start. """
        # Setter names are applied in order. Settings are not reset
        # between rows, exactly as if the calls were written out one
        # after another.
        bad_combinations = (
            ("set_readonly", "set_rebuild", "set_norefresh_index"),
            ("set_readonly", "set_norebuild", "set_refresh_index"),
            ("set_readonly", "set_rebuild", "set_refresh_index"),
            ("set_readwrite", "set_rebuild", "set_refresh_index"),
            ("set_mirror", "set_rebuild", "set_norefresh_index"),
            ("set_mirror", "set_norebuild", "set_refresh_index"),
        )

        for setters in bad_combinations:
            for setter in setters:
                getattr(self.__dc, setter)()

            self.assert_(self.__dc.start_expected_fail(),
                "depot started despite: %s" % ", ".join(setters))
581N/A
if __name__ == "__main__":
    # Executed directly rather than imported: hand control to
    # unittest's command-line runner.
    unittest.main()