test_memory_cache.py revision b28f5fb097e06a97a45e0ae348e506d9d1432cc8
#
# LDAP integration test
#
# Copyright (c) 2015 Red Hat, Inc.
# Author: Lukas Slebodnik <lslebodn@redhat.com>
#
# This is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import stat
import ent
import grp
import pwd
import config
import signal
import subprocess
import time
import pytest
import ds_openldap
import ldap_ent
import sssd_id
from util import unindent
LDAP_BASE_DN = "dc=example,dc=com"
@pytest.fixture(scope="module")
def ds_inst(request):
"""LDAP server instance fixture"""
ds_inst = ds_openldap.DSOpenLDAP(
config.PREFIX, 10389, LDAP_BASE_DN,
"cn=admin", "Secret123")
try:
ds_inst.setup()
except:
ds_inst.teardown()
raise
request.addfinalizer(lambda: ds_inst.teardown())
return ds_inst
@pytest.fixture(scope="module")
def ldap_conn(request, ds_inst):
"""LDAP server connection fixture"""
ldap_conn = ds_inst.bind()
ldap_conn.ds_inst = ds_inst
request.addfinalizer(lambda: ldap_conn.unbind_s())
return ldap_conn
def create_ldap_fixture(request, ldap_conn, ent_list):
"""Add LDAP entries and add teardown for removing them"""
for entry in ent_list:
ldap_conn.add_s(entry[0], entry[1])
def teardown():
for entry in ent_list:
ldap_conn.delete_s(entry[0])
request.addfinalizer(teardown)
def create_conf_fixture(request, contents):
"""Generate sssd.conf and add teardown for removing it"""
conf = open(config.CONF_PATH, "w")
conf.write(contents)
conf.close()
os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR)
request.addfinalizer(lambda: os.unlink(config.CONF_PATH))
def stop_sssd():
pid_file = open(config.PIDFILE_PATH, "r")
pid = int(pid_file.read())
os.kill(pid, signal.SIGTERM)
while True:
try:
os.kill(pid, signal.SIGCONT)
except:
break
time.sleep(1)
def create_sssd_fixture(request):
"""Start sssd and add teardown for stopping it and removing state"""
if subprocess.call(["sssd", "-D", "-f"]) != 0:
raise Exception("sssd start failed")
def teardown():
try:
stop_sssd()
except:
pass
subprocess.call(["sss_cache", "-E"])
for path in os.listdir(config.DB_PATH):
os.unlink(config.DB_PATH + "/" + path)
for path in os.listdir(config.MCACHE_PATH):
os.unlink(config.MCACHE_PATH + "/" + path)
request.addfinalizer(teardown)
def load_data_to_ldap(request, ldap_conn):
ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn)
ent_list.add_user("user1", 1001, 2001)
ent_list.add_user("user2", 1002, 2002)
ent_list.add_user("user3", 1003, 2003)
ent_list.add_user("user11", 1011, 2001)
ent_list.add_user("user12", 1012, 2002)
ent_list.add_user("user13", 1013, 2003)
ent_list.add_user("user21", 1021, 2001)
ent_list.add_user("user22", 1022, 2002)
ent_list.add_user("user23", 1023, 2003)
ent_list.add_group("group1", 2001, ["user1", "user11", "user21"])
ent_list.add_group("group2", 2002, ["user2", "user12", "user22"])
ent_list.add_group("group3", 2003, ["user3", "user13", "user23"])
ent_list.add_group("group0x", 2000, ["user1", "user2", "user3"])
ent_list.add_group("group1x", 2010, ["user11", "user12", "user13"])
ent_list.add_group("group2x", 2020, ["user21", "user22", "user23"])
create_ldap_fixture(request, ldap_conn, ent_list)
@pytest.fixture
def sanity_rfc2307(request, ldap_conn):
load_data_to_ldap(request, ldap_conn)
conf = unindent("""\
[sssd]
domains = LDAP
services = nss
[nss]
[domain/LDAP]
ldap_auth_disable_tls_never_use_in_production = true
ldap_schema = rfc2307
id_provider = ldap
auth_provider = ldap
sudo_provider = ldap
ldap_uri = {ldap_conn.ds_inst.ldap_url}
ldap_search_base = {ldap_conn.ds_inst.base_dn}
""").format(**locals())
create_conf_fixture(request, conf)
create_sssd_fixture(request)
return None
@pytest.fixture
def fqname_rfc2307(request, ldap_conn):
load_data_to_ldap(request, ldap_conn)
conf = unindent("""\
[sssd]
domains = LDAP
services = nss
[nss]
[domain/LDAP]
ldap_auth_disable_tls_never_use_in_production = true
ldap_schema = rfc2307
id_provider = ldap
auth_provider = ldap
sudo_provider = ldap
ldap_uri = {ldap_conn.ds_inst.ldap_url}
ldap_search_base = {ldap_conn.ds_inst.base_dn}
use_fully_qualified_names = true
""").format(**locals())
create_conf_fixture(request, conf)
create_sssd_fixture(request)
return None
@pytest.fixture
def fqname_case_insensitive_rfc2307(request, ldap_conn):
load_data_to_ldap(request, ldap_conn)
conf = unindent("""\
[sssd]
domains = LDAP
services = nss
[nss]
[domain/LDAP]
ldap_auth_disable_tls_never_use_in_production = true
ldap_schema = rfc2307
id_provider = ldap
auth_provider = ldap
sudo_provider = ldap
ldap_uri = {ldap_conn.ds_inst.ldap_url}
ldap_search_base = {ldap_conn.ds_inst.base_dn}
use_fully_qualified_names = true
case_sensitive = false
""").format(**locals())
create_conf_fixture(request, conf)
create_sssd_fixture(request)
return None
def test_getpwnam(ldap_conn, sanity_rfc2307):
ent.assert_passwd_by_name(
'user1',
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1001,
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
ent.assert_passwd_by_name(
'user2',
dict(name='user2', passwd='*', uid=1002, gid=2002,
gecos='1002', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1002,
dict(name='user2', passwd='*', uid=1002, gid=2002,
gecos='1002', shell='/bin/bash'))
ent.assert_passwd_by_name(
'user3',
dict(name='user3', passwd='*', uid=1003, gid=2003,
gecos='1003', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1003,
dict(name='user3', passwd='*', uid=1003, gid=2003,
gecos='1003', shell='/bin/bash'))
ent.assert_passwd_by_name(
'user11',
dict(name='user11', passwd='*', uid=1011, gid=2001,
gecos='1011', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1011,
dict(name='user11', passwd='*', uid=1011, gid=2001,
gecos='1011', shell='/bin/bash'))
ent.assert_passwd_by_name(
'user12',
dict(name='user12', passwd='*', uid=1012, gid=2002,
gecos='1012', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1012,
dict(name='user12', passwd='*', uid=1012, gid=2002,
gecos='1012', shell='/bin/bash'))
ent.assert_passwd_by_name(
'user13',
dict(name='user13', passwd='*', uid=1013, gid=2003,
gecos='1013', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1013,
dict(name='user13', passwd='*', uid=1013, gid=2003,
gecos='1013', shell='/bin/bash'))
ent.assert_passwd_by_name(
'user21',
dict(name='user21', passwd='*', uid=1021, gid=2001,
gecos='1021', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1021,
dict(name='user21', passwd='*', uid=1021, gid=2001,
gecos='1021', shell='/bin/bash'))
ent.assert_passwd_by_name(
'user22',
dict(name='user22', passwd='*', uid=1022, gid=2002,
gecos='1022', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1022,
dict(name='user22', passwd='*', uid=1022, gid=2002,
gecos='1022', shell='/bin/bash'))
ent.assert_passwd_by_name(
'user23',
dict(name='user23', passwd='*', uid=1023, gid=2003,
gecos='1023', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1023,
dict(name='user23', passwd='*', uid=1023, gid=2003,
gecos='1023', shell='/bin/bash'))
def test_getpwnam_with_mc(ldap_conn, sanity_rfc2307):
test_getpwnam(ldap_conn, sanity_rfc2307)
stop_sssd()
test_getpwnam(ldap_conn, sanity_rfc2307)
def test_getgrnam_simple(ldap_conn, sanity_rfc2307):
ent.assert_group_by_name("group1", dict(name="group1", gid=2001))
ent.assert_group_by_gid(2001, dict(name="group1", gid=2001))
ent.assert_group_by_name("group2", dict(name="group2", gid=2002))
ent.assert_group_by_gid(2002, dict(name="group2", gid=2002))
ent.assert_group_by_name("group3", dict(name="group3", gid=2003))
ent.assert_group_by_gid(2003, dict(name="group3", gid=2003))
ent.assert_group_by_name("group0x", dict(name="group0x", gid=2000))
ent.assert_group_by_gid(2000, dict(name="group0x", gid=2000))
ent.assert_group_by_name("group1x", dict(name="group1x", gid=2010))
ent.assert_group_by_gid(2010, dict(name="group1x", gid=2010))
ent.assert_group_by_name("group2x", dict(name="group2x", gid=2020))
ent.assert_group_by_gid(2020, dict(name="group2x", gid=2020))
def test_getgrnam_simple_with_mc(ldap_conn, sanity_rfc2307):
test_getgrnam_simple(ldap_conn, sanity_rfc2307)
stop_sssd()
test_getgrnam_simple(ldap_conn, sanity_rfc2307)
def test_getgrnam_membership(ldap_conn, sanity_rfc2307):
ent.assert_group_by_name(
"group1",
dict(mem=ent.contains_only("user1", "user11", "user21")))
ent.assert_group_by_gid(
2001,
dict(mem=ent.contains_only("user1", "user11", "user21")))
ent.assert_group_by_name(
"group2",
dict(mem=ent.contains_only("user2", "user12", "user22")))
ent.assert_group_by_gid(
2002,
dict(mem=ent.contains_only("user2", "user12", "user22")))
ent.assert_group_by_name(
"group3",
dict(mem=ent.contains_only("user3", "user13", "user23")))
ent.assert_group_by_gid(
2003,
dict(mem=ent.contains_only("user3", "user13", "user23")))
ent.assert_group_by_name(
"group0x",
dict(mem=ent.contains_only("user1", "user2", "user3")))
ent.assert_group_by_gid(
2000,
dict(mem=ent.contains_only("user1", "user2", "user3")))
ent.assert_group_by_name(
"group1x",
dict(mem=ent.contains_only("user11", "user12", "user13")))
ent.assert_group_by_gid(
2010,
dict(mem=ent.contains_only("user11", "user12", "user13")))
ent.assert_group_by_name(
"group2x",
dict(mem=ent.contains_only("user21", "user22", "user23")))
ent.assert_group_by_gid(
2020,
dict(mem=ent.contains_only("user21", "user22", "user23")))
def test_getgrnam_membership_with_mc(ldap_conn, sanity_rfc2307):
test_getgrnam_membership(ldap_conn, sanity_rfc2307)
stop_sssd()
test_getgrnam_membership(ldap_conn, sanity_rfc2307)
def assert_user_gids_equal(user, expected_gids):
(res, errno, gids) = sssd_id.get_user_gids(user)
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user %s, %d" % (user, errno)
assert sorted(gids) == sorted(expected_gids), \
"result: %s\n expected %s" % (
", ".join(["%s" % s for s in sorted(gids)]),
", ".join(["%s" % s for s in sorted(expected_gids)])
)
def test_initgroups(ldap_conn, sanity_rfc2307):
assert_user_gids_equal('user1', [2000, 2001])
assert_user_gids_equal('user2', [2000, 2002])
assert_user_gids_equal('user3', [2000, 2003])
assert_user_gids_equal('user11', [2010, 2001])
assert_user_gids_equal('user12', [2010, 2002])
assert_user_gids_equal('user13', [2010, 2003])
assert_user_gids_equal('user21', [2020, 2001])
assert_user_gids_equal('user22', [2020, 2002])
assert_user_gids_equal('user23', [2020, 2003])
def test_initgroups_with_mc(ldap_conn, sanity_rfc2307):
test_initgroups(ldap_conn, sanity_rfc2307)
stop_sssd()
test_initgroups(ldap_conn, sanity_rfc2307)
def test_initgroups_fqname_with_mc(ldap_conn, fqname_rfc2307):
assert_user_gids_equal('user1@LDAP', [2000, 2001])
stop_sssd()
assert_user_gids_equal('user1@LDAP', [2000, 2001])
def assert_initgroups_equal(user, primary_gid, expected_gids):
(res, errno, gids) = sssd_id.call_sssd_initgroups(user, primary_gid)
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user %s, %d" % (user, errno)
assert sorted(gids) == sorted(expected_gids), \
"result: %s\n expected %s" % (
", ".join(["%s" % s for s in sorted(gids)]),
", ".join(["%s" % s for s in sorted(expected_gids)])
)
def assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last,
primary_gid, expected_gids):
assert_initgroups_equal(user1_case1, primary_gid, expected_gids)
assert_initgroups_equal(user1_case2, primary_gid, expected_gids)
assert_initgroups_equal(user1_case_last, primary_gid, expected_gids)
stop_sssd()
user = user1_case1
(res, errno, _) = sssd_id.call_sssd_initgroups(user, primary_gid)
assert res == sssd_id.NssReturnCode.UNAVAIL, \
"Initgroups for user shoudl fail user %s, %d, %d" % (user, res, errno)
user = user1_case2
(res, errno, _) = sssd_id.call_sssd_initgroups(user, primary_gid)
assert res == sssd_id.NssReturnCode.UNAVAIL, \
"Initgroups for user shoudl fail user %s, %d, %d" % (user, res, errno)
# Just last invocation of initgroups shoudl PASS
# Otherwise, we would not be able to invalidate it
assert_initgroups_equal(user1_case_last, primary_gid, expected_gids)
def test_initgroups_case_insensitive_with_mc1(ldap_conn,
fqname_case_insensitive_rfc2307):
user1_case1 = 'User1@LDAP'
user1_case2 = 'uSer1@LDAP'
user1_case_last = 'usEr1@LDAP'
primary_gid = 2001
expected_gids = [2000, 2001]
assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last,
primary_gid, expected_gids)
def test_initgroups_case_insensitive_with_mc2(ldap_conn,
fqname_case_insensitive_rfc2307):
user1_case1 = 'usEr1@LDAP'
user1_case2 = 'User1@LDAP'
user1_case_last = 'uSer1@LDAP'
primary_gid = 2001
expected_gids = [2000, 2001]
assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last,
primary_gid, expected_gids)
def test_initgroups_case_insensitive_with_mc3(ldap_conn,
fqname_case_insensitive_rfc2307):
user1_case1 = 'uSer1@LDAP'
user1_case2 = 'usEr1@LDAP'
user1_case_last = 'User1@LDAP'
primary_gid = 2001
expected_gids = [2000, 2001]
assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last,
primary_gid, expected_gids)
def run_simple_test_with_initgroups():
ent.assert_passwd_by_name(
'user1',
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1001,
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
ent.assert_group_by_name(
"group1",
dict(mem=ent.contains_only("user1", "user11", "user21")))
ent.assert_group_by_gid(
2001,
dict(mem=ent.contains_only("user1", "user11", "user21")))
# unrelated group to user1
ent.assert_group_by_name(
"group2",
dict(mem=ent.contains_only("user2", "user12", "user22")))
ent.assert_group_by_gid(
2002,
dict(mem=ent.contains_only("user2", "user12", "user22")))
assert_initgroups_equal("user1", 2001, [2000, 2001])
def test_invalidation_of_gids_after_initgroups(ldap_conn, sanity_rfc2307):
# the sssd cache was empty and not all user's group were
# resolved with getgr{nm,gid}. Therefore there is a change in
# group membership => user groups should be invalidated
run_simple_test_with_initgroups()
assert_initgroups_equal("user1", 2001, [2000, 2001])
stop_sssd()
ent.assert_passwd_by_name(
'user1',
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1001,
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
# unrelated group to user1 must be returned
ent.assert_group_by_name(
"group2",
dict(mem=ent.contains_only("user2", "user12", "user22")))
ent.assert_group_by_gid(
2002,
dict(mem=ent.contains_only("user2", "user12", "user22")))
assert_initgroups_equal("user1", 2001, [2000, 2001])
# user groups must be invalidated
for group in ["group1", "group0x"]:
with pytest.raises(KeyError):
grp.getgrnam(group)
for gid in [2000, 2001]:
with pytest.raises(KeyError):
grp.getgrgid(gid)
def test_initgroups_without_change_in_membership(ldap_conn, sanity_rfc2307):
# the sssd cache was empty and not all user's group were
# resolved with getgr{nm,gid}. Therefore there is a change in
# group membership => user groups should be invalidated
run_simple_test_with_initgroups()
# invalidate cache
subprocess.call(["sss_cache", "-E"])
# all users and groups will be just refreshed from LDAP
# but there will not be a change in group membership
# user groups should not be invlaidated
run_simple_test_with_initgroups()
stop_sssd()
# everything should be in memory cache
run_simple_test_with_initgroups()
def assert_mc_records_for_user1():
ent.assert_passwd_by_name(
'user1',
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1001,
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
ent.assert_group_by_name(
"group1",
dict(mem=ent.contains_only("user1", "user11", "user21")))
ent.assert_group_by_gid(
2001,
dict(mem=ent.contains_only("user1", "user11", "user21")))
ent.assert_group_by_name(
"group0x",
dict(mem=ent.contains_only("user1", "user2", "user3")))
ent.assert_group_by_gid(
2000,
dict(mem=ent.contains_only("user1", "user2", "user3")))
assert_initgroups_equal("user1", 2001, [2000, 2001])
def assert_missing_mc_records_for_user1():
with pytest.raises(KeyError):
pwd.getpwnam("user1")
with pytest.raises(KeyError):
pwd.getpwuid(1001)
for gid in [2000, 2001]:
with pytest.raises(KeyError):
grp.getgrgid(gid)
for group in ["group0x", "group1"]:
with pytest.raises(KeyError):
grp.getgrnam(group)
(res, err, _) = sssd_id.call_sssd_initgroups("user1", 2001)
assert res == sssd_id.NssReturnCode.UNAVAIL, \
"Initgroups should not find anything after invalidation of mc.\n" \
"User user1, errno:%d" % err
def test_invalidate_user_before_stop(ldap_conn, sanity_rfc2307):
# initialize cache with full ID
(res, errno, _) = sssd_id.get_user_groups("user1")
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user1, %d" % errno
assert_mc_records_for_user1()
subprocess.call(["sss_cache", "-u", "user1"])
stop_sssd()
assert_missing_mc_records_for_user1()
def test_invalidate_user_after_stop(ldap_conn, sanity_rfc2307):
# initialize cache with full ID
(res, errno, _) = sssd_id.get_user_groups("user1")
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user1, %d" % errno
assert_mc_records_for_user1()
stop_sssd()
subprocess.call(["sss_cache", "-u", "user1"])
assert_missing_mc_records_for_user1()
def test_invalidate_users_before_stop(ldap_conn, sanity_rfc2307):
# initialize cache with full ID
(res, errno, _) = sssd_id.get_user_groups("user1")
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user1, %d" % errno
assert_mc_records_for_user1()
subprocess.call(["sss_cache", "-U"])
stop_sssd()
assert_missing_mc_records_for_user1()
def test_invalidate_users_after_stop(ldap_conn, sanity_rfc2307):
# initialize cache with full ID
(res, errno, _) = sssd_id.get_user_groups("user1")
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user1, %d" % errno
assert_mc_records_for_user1()
stop_sssd()
subprocess.call(["sss_cache", "-U"])
assert_missing_mc_records_for_user1()
def test_invalidate_group_before_stop(ldap_conn, sanity_rfc2307):
# initialize cache with full ID
(res, errno, _) = sssd_id.get_user_groups("user1")
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user1, %d" % errno
assert_mc_records_for_user1()
subprocess.call(["sss_cache", "-g", "group1"])
stop_sssd()
assert_missing_mc_records_for_user1()
def test_invalidate_group_after_stop(ldap_conn, sanity_rfc2307):
# initialize cache with full ID
(res, errno, _) = sssd_id.get_user_groups("user1")
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user1, %d" % errno
assert_mc_records_for_user1()
stop_sssd()
subprocess.call(["sss_cache", "-g", "group1"])
assert_missing_mc_records_for_user1()
def test_invalidate_groups_before_stop(ldap_conn, sanity_rfc2307):
# initialize cache with full ID
(res, errno, _) = sssd_id.get_user_groups("user1")
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user1, %d" % errno
assert_mc_records_for_user1()
subprocess.call(["sss_cache", "-G"])
stop_sssd()
assert_missing_mc_records_for_user1()
def test_invalidate_groups_after_stop(ldap_conn, sanity_rfc2307):
# initialize cache with full ID
(res, errno, _) = sssd_id.get_user_groups("user1")
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user1, %d" % errno
assert_mc_records_for_user1()
stop_sssd()
subprocess.call(["sss_cache", "-G"])
assert_missing_mc_records_for_user1()
def test_invalidate_everything_before_stop(ldap_conn, sanity_rfc2307):
# initialize cache with full ID
(res, errno, _) = sssd_id.get_user_groups("user1")
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user1, %d" % errno
assert_mc_records_for_user1()
subprocess.call(["sss_cache", "-E"])
stop_sssd()
assert_missing_mc_records_for_user1()
def test_invalidate_everything_after_stop(ldap_conn, sanity_rfc2307):
# initialize cache with full ID
(res, errno, _) = sssd_id.get_user_groups("user1")
assert res == sssd_id.NssReturnCode.SUCCESS, \
"Could not find groups for user1, %d" % errno
assert_mc_records_for_user1()
stop_sssd()
subprocess.call(["sss_cache", "-E"])
assert_missing_mc_records_for_user1()
def test_removed_mc(ldap_conn, sanity_rfc2307):
"""
Regression test for ticket:
https://fedorahosted.org/sssd/ticket/2726
"""
ent.assert_passwd_by_name(
'user1',
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1001,
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
ent.assert_group_by_name("group1", dict(name="group1", gid=2001))
ent.assert_group_by_gid(2001, dict(name="group1", gid=2001))
stop_sssd()
# remove cache without invalidation
for path in os.listdir(config.MCACHE_PATH):
os.unlink(config.MCACHE_PATH + "/" + path)
# sssd is stopped; so the memory cache should not be used
# in long living clients (py.test in this case)
with pytest.raises(KeyError):
pwd.getpwnam('user1')
with pytest.raises(KeyError):
pwd.getpwuid(1001)
with pytest.raises(KeyError):
grp.getgrnam('group1')
with pytest.raises(KeyError):
grp.getgrgid(2001)