fixed TOCTOU error when validating peer's certificate
By TOCTOU it's meant time-of-check-time-of-use. Up to now, pywbem made
two connections for one request (applies just to ssl). The first one
made the verification (without the hostname check) and the second one
was used for request. No verification was done for the latter, which
could be abused.
Peer's certificate is now validated when connecting over ssl. To
prevent man-in-the-middle attack, verification of hostname is also
added. Peer's hostname must match the commonName of its certificate.
Or it must be contained in subjectAltName (list of aliases). M2Crypto
package is used for that purpose. Thanks to it both security
enhancements could be implemented quiete easily. Downside is a new
dependency added to pywbem. Verification can be skipped if
no_verification is set to False.
Certificate trust store can now be specified by user. Some default
paths, valid for several distributions, were added.
Authored by: miminar 2014-01-17
NOTE: The code and patches are littered with whitespace issues.
Generation of patches needs to be done carefully.
--- pywbem-0.7.0/cim_http.py.orig 2008-11-05 17:01:51.000000000 -0800
+++ pywbem-0.7.0/cim_http.py 2014-01-17 06:11:05.000000000 -0800
@@ -4,8 +4,7 @@
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
-# published by the Free Software Foundation; either version 2 of the
-# License.
+# published by the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -29,7 +28,8 @@ being transferred is XML. It is up to t
data and interpret the result.
'''
-import sys, string, re, os, socket, pwd
+from M2Crypto import SSL, Err
+import sys, string, re, os, socket, getpass
from stat import S_ISSOCK
import cim_obj
from types import StringTypes
@@ -59,6 +59,15 @@ def parse_url(url):
if m:
host = url[len(m.group(0)):]
+ # IPv6 with/without port
+ m = re.match("^\[?([0-9A-Fa-f:]*)\]?(:([0-9]*))?$", host)
+ if m:
+ host = m.group(1)
+ port_tmp = m.group(3)
+ if port_tmp:
+ port = int(port_tmp)
+ return host, port, ssl
+
s = string.split(host, ":") # Set port number
if len(s) != 1:
host = s[0]
@@ -66,8 +75,26 @@ def parse_url(url):
return host, port, ssl
+def get_default_ca_certs():
+ """
+ Try to find out system path with ca certificates. This path is cached and
+ returned. If no path is found out, None is returned.
+ """
+ if not hasattr(get_default_ca_certs, '_path'):
+ for path in (
+ '/etc/ssl/certs',
+ '/etc/ssl/certificates'):
+ if os.path.exists(path):
+ get_default_ca_certs._path = path
+ break
+ else:
+ get_default_ca_certs._path = None
+ return get_default_ca_certs._path
+
def wbem_request(url, data, creds, headers = [], debug = 0, x509 = None,
- verify_callback = None):
+ verify_callback = None, ca_certs = None,
+ no_verification = False):
"""Send XML data over HTTP to the specified url. Return the
response in XML. Uses Python's build-in httplib. x509 may be a
dictionary containing the location of the SSL certificate and key
@@ -97,9 +124,48 @@ def wbem_request(url, data, creds, heade
class HTTPSConnection(HTTPBaseConnection, httplib.HTTPSConnection):
def __init__(self, host, port=None, key_file=None, cert_file=None,
- strict=None):
+ strict=None, ca_certs=None, verify_callback=None):
httplib.HTTPSConnection.__init__(self, host, port, key_file,
cert_file, strict)
+ self.ca_certs = ca_certs
+ self.verify_callback = verify_callback
+
+ def connect(self):
+ "Connect to a host on a given (SSL) port."
+ if self._tunnel_host:
+ self.sock = sock
+ self._tunnel()
+ ctx = SSL.Context('sslv23')
+ if self.cert_file:
+ if self.ca_certs:
+ depth=9, callback=verify_callback)
+ if os.path.isdir(self.ca_certs):
+ ctx.load_verify_locations(capath=self.ca_certs)
+ else:
+ ctx.load_verify_locations(cafile=self.ca_certs)
+ try:
+ # Below is a body of SSL.Connection.connect() method
+ # except for the first line (socket connection). We want to preserve
+ # tunneling ability.
+ ret = self.sock.connect_ssl()
+ if self.ca_certs:
+ check = getattr(self.sock, 'postConnectionCheck',
+ if check is not None:
+ if not check(self.sock.get_peer_cert(), self.host):
+ raise Error('SSL error: post connection check failed')
+ return ret
+ , SSL.Checker.WrongHost), arg:
+ raise Error("SSL error: %s" % arg)
class FileHTTPConnection(HTTPBaseConnection, httplib.HTTPConnection):
def __init__(self, uds_path):
@@ -109,47 +175,36 @@ def wbem_request(url, data, creds, heade
- host, port, ssl = parse_url(url)
+ host, port, use_ssl = parse_url(url)
key_file = None
cert_file = None
- if ssl:
-
- if x509 is not None:
+ if use_ssl and x509 is not None:
cert_file = x509.get('cert_file')
key_file = x509.get('key_file')
- if verify_callback is not None:
- try:
- from OpenSSL import SSL
- ctx = SSL.Context(SSL.SSLv3_METHOD)
- ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
- # Add the key and certificate to the session
- if cert_file is not None and key_file is not None:
- ctx.use_certificate_file(cert_file)
- ctx.use_privatekey_file(key_file)
- s.connect((host, port))
- s.do_handshake()
- s.shutdown()
- s.close()
- except socket.error, arg:
- raise Error("Socket error: %s" % (arg,))
- except socket.sslerror, arg:
- raise Error("SSL error: %s" % (arg,))
-
numTries = 0
localAuthHeader = None
tryLimit = 5
+ if isinstance(data, unicode):
+ data = data.encode('utf-8')
data = '<?xml version="1.0" encoding="utf-8" ?>\n' + data
+ if not no_verification and ca_certs is None:
+ ca_certs = get_default_ca_certs()
+ elif no_verification:
+ ca_certs = None
+
local = False
- if ssl:
- h = HTTPSConnection(host, port = port, key_file = key_file,
- cert_file = cert_file)
+ if use_ssl:
+ h = HTTPSConnection(host,
+ port = port,
+ key_file = key_file,
+ cert_file = cert_file,
+ ca_certs = ca_certs,
+ verify_callback = verify_callback)
else:
if url.startswith('http'):
h = HTTPConnection(host, port = port)
@@ -167,12 +222,12 @@ def wbem_request(url, data, creds, heade
raise Error('Invalid URL')
locallogin = None
- if host in ('localhost', '127.0.0.1'):
+ if host in ('localhost', 'localhost6', '127.0.0.1', '::1'):
local = True
if local:
uid = os.getuid()
try:
- locallogin = pwd.getpwuid(uid)[0]
+ locallogin = getpass.getuser()
except KeyError:
locallogin = None
while numTries < tryLimit:
@@ -191,6 +246,8 @@ def wbem_request(url, data, creds, heade
h.putheader('PegasusAuthorization', 'Local "%s"' % locallogin)
for hdr in headers:
+ if isinstance(hdr, unicode):
+ hdr = hdr.encode('utf-8')
s = map(lambda x: string.strip(x), string.split(hdr, ":", 1))
--- pywbem-0.7.0/cim_operations.py.orig 2008-12-12 09:40:22.000000000 -0800
+++ pywbem-0.7.0/cim_operations.py 2014-01-17 06:11:05.000000000 -0800
@@ -4,8 +4,7 @@
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
-# published by the Free Software Foundation; either version 2 of the
-# License.
+# published by the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -28,7 +27,7 @@ import sys, string
from types import StringTypes
from xml.dom import minidom
import cim_obj, cim_xml, cim_http, cim_types
-from cim_obj import CIMClassName, CIMInstanceName, CIMInstance, CIMClass
+from cim_obj import CIMClassName, CIMInstanceName, CIMInstance, CIMClass, NocaseDict
from datetime import datetime, timedelta
from tupletree import dom_to_tupletree, xml_to_tupletree
from tupleparse import parse_cim
@@ -79,12 +78,12 @@ class WBEMConnection(object):
the request before it is sent, and the reply before it is
unpacked.
- verify_callback is used to verify the server certificate.
- It is passed to OpenSSL.SSL.set_verify, and is called during the SSL
- handshake. verify_callback should take five arguments: A Connection
- object, an X509 object, and three integer variables, which are in turn
- potential error number, error depth and return code. verify_callback
- should return True if verification passes and False otherwise.
+ verify_callback is used to verify the server certificate. It is passed to
+ M2Crypto.SSL.Context.set_verify, and is called during the SSL handshake.
+ verify_callback should take five arguments: An SSL Context object, an X509
+ object, and three integer variables, which are in turn potential error
+ number, error depth and return code. verify_callback should return True if
+ verification passes and False otherwise.
The value of the x509 argument is used only when the url contains
'https'. x509 must be a dictionary containing the keys 'cert_file'
@@ -92,14 +91,27 @@ class WBEMConnection(object):
filename of an certificate and the value of 'key_file' must consist
of a filename containing the private key belonging to the public key
that is part of the certificate in cert_file.
+
+ ca_certs specifies where CA certificates for verification purposes are
+ located. These are trusted certificates. Note that the certificates have to
+ be in PEM format. Either it is a directory prepared using the c_rehash tool
+ included with OpenSSL or an pemfile. If None, default system path will be
+ used.
+
+ no_verification allows to disable peer's verification. This is insecure and
+ should be avoided. If True, peer's certificate is not verified and ca_certs
+ argument is ignored.
"""
def __init__(self, url, creds = None, default_namespace = DEFAULT_NAMESPACE,
- x509 = None, verify_callback = None):
+ x509 = None, verify_callback = None, ca_certs = None,
+ no_verification = False):
self.url = url
self.creds = creds
self.x509 = x509
self.verify_callback = verify_callback
+ self.ca_certs = ca_certs
+ self.no_verification = no_verification
self.last_request = self.last_reply = ''
self.default_namespace = default_namespace
self.debug = False
@@ -165,7 +177,9 @@ class WBEMConnection(object):
self.creds, headers,
x509 = self.x509,
- verify_callback = self.verify_callback)
+ verify_callback = self.verify_callback,
+ ca_certs = self.ca_certs,
+ no_verification = self.no_verification)
except cim_http.AuthError:
raise
except cim_http.Error, arg:
@@ -322,7 +336,9 @@ class WBEMConnection(object):
self.creds, headers,
x509 = self.x509,
- verify_callback = self.verify_callback)
+ verify_callback = self.verify_callback,
+ ca_certs = self.ca_certs,
+ no_verification = self.no_verification)
except cim_http.Error, arg:
# Convert cim_http exceptions to CIMError exceptions
raise CIMError(0, str(arg))
@@ -812,7 +828,7 @@ class WBEMConnection(object):
# Convert zero or more PARAMVALUE elements into dictionary
- output_params = {}
+ output_params = NocaseDict()
for p in result:
if p[1] == 'reference':