Skip to content

Allow using an OpenSSL hashed directory for verification in X509Store #943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ Deprecations:
Changes:
^^^^^^^^

- Added ``OpenSSL.crypto.X509Store.load_locations`` to set trusted
certificate file bundles and/or directories for verification.
`#943 <https://github.com/pyca/pyopenssl/pull/943>`_
- Added ``Context.set_keylog_callback`` to log key material.
`#910 <https://github.com/pyca/pyopenssl/pull/910>`_
- Added ``OpenSSL.SSL.Connection.get_verified_chain`` to retrieve the
Expand Down
48 changes: 48 additions & 0 deletions src/OpenSSL/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
exception_from_error_queue as _exception_from_error_queue,
byte_string as _byte_string,
native as _native,
path_string as _path_string,
UNSPECIFIED as _UNSPECIFIED,
text_to_bytes_and_warn as _text_to_bytes_and_warn,
make_assert as _make_assert,
Expand Down Expand Up @@ -1674,6 +1675,53 @@ def set_time(self, vfy_time):
_lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime("%s")))
_openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)

def load_locations(self, cafile, capath=None):
"""
Let X509Store know where we can find trusted certificates for the
certificate chain. Note that the certificates have to be in PEM
format.

If *capath* is passed, it must be a directory prepared using the
``c_rehash`` tool included with OpenSSL. Either, but not both, of
*cafile* or *capath* may be ``None``.

.. note::

Both *cafile* and *capath* may be set simultaneously.

Call this method multiple times to add more than one location.
For example, CA certificates, and certificate revocation list bundles
may be passed in *cafile* in subsequent calls to this method.

.. versionadded:: 20.0

:param cafile: In which file we can find the certificates (``bytes`` or
``unicode``).
:param capath: In which directory we can find the certificates
(``bytes`` or ``unicode``).

:return: ``None`` if the locations were set successfully.

:raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None``
or the locations could not be set for any reason.

"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if both are NULL or both are provided?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When both are None, the X509_STORE_load_locations() call will return 0, so an OpenSSL.crypto.Error will be raised. It's not very helpful, a ValueError might have been better, by checking the args explicitly. I did not want to diverge from OpenSSL.SSL.Context.load_verify_locations(), which behaves similarly, so I kept it this way. But since OpenSSL.SSL.Error is different from OpenSSL.crypto.Error anyway, I could add the check and raise a ValueError.

When both are provided, both "locations" will be used. Even better, load_locations can be called multiple times, to add more and more locations. Internally, OpenSSL just adds them to the store as X509_LOOKUPs. Should I update the docstring with this info?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since pyOpenSSL is generally a thin wrapper around OpenSSL's API I'm fine with it raising crypto.Error. If you could update the docstring here then I think this is pretty much good to go. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated and also extended the docstring to mention that this method can be called multiple times, and added some tests.

One more question: I see that many methods have a ::versionadded line. Should I add one for load_locations()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(... and removed 2 empty lines in another force-push, sorry...)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes a .. versionadded:: 20.0 would be appropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if cafile is None:
cafile = _ffi.NULL
else:
cafile = _path_string(cafile)

if capath is None:
capath = _ffi.NULL
else:
capath = _path_string(capath)

load_result = _lib.X509_STORE_load_locations(
self._store, cafile, capath
)
if not load_result:
_raise_current_error()


class X509StoreContextError(Exception):
"""
Expand Down
136 changes: 135 additions & 1 deletion tests/test_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import base64
from subprocess import PIPE, Popen
from datetime import datetime, timedelta
import sys

import pytest

Expand Down Expand Up @@ -46,7 +47,14 @@
get_elliptic_curves,
)

from .util import EqualityTestsMixin, is_consistent_type, WARNING_TYPE_EXPECTED
from OpenSSL._util import ffi as _ffi, lib as _lib

from .util import (
EqualityTestsMixin,
is_consistent_type,
WARNING_TYPE_EXPECTED,
NON_ASCII,
)


def normalize_privatekey_pem(pem):
Expand Down Expand Up @@ -2228,6 +2236,68 @@ def test_add_cert_accepts_duplicate(self):
store.add_cert(cert)
store.add_cert(cert)

@pytest.mark.parametrize(
"cafile, capath, call_cafile, call_capath",
[
(
"/cafile" + NON_ASCII,
None,
b"/cafile" + NON_ASCII.encode(sys.getfilesystemencoding()),
_ffi.NULL,
),
(
b"/cafile" + NON_ASCII.encode("utf-8"),
None,
b"/cafile" + NON_ASCII.encode("utf-8"),
_ffi.NULL,
),
(
None,
"/capath" + NON_ASCII,
_ffi.NULL,
b"/capath" + NON_ASCII.encode(sys.getfilesystemencoding()),
),
(
None,
b"/capath" + NON_ASCII.encode("utf-8"),
_ffi.NULL,
b"/capath" + NON_ASCII.encode("utf-8"),
),
],
)
def test_load_locations_parameters(
self, cafile, capath, call_cafile, call_capath, monkeypatch
):
class LibMock(object):
def load_locations(self, store, cafile, capath):
self.cafile = cafile
self.capath = capath
return 1

lib_mock = LibMock()
monkeypatch.setattr(
_lib, "X509_STORE_load_locations", lib_mock.load_locations
)

store = X509Store()
store.load_locations(cafile=cafile, capath=capath)

assert call_cafile == lib_mock.cafile
assert call_capath == lib_mock.capath

def test_load_locations_fails_when_all_args_are_none(self):
store = X509Store()
with pytest.raises(Error):
store.load_locations(None, None)

def test_load_locations_raises_error_on_failure(self, tmpdir):
invalid_ca_file = tmpdir.join("invalid.pem")
invalid_ca_file.write("This is not a certificate")

store = X509Store()
with pytest.raises(Error):
store.load_locations(cafile=str(invalid_ca_file))


class TestPKCS12(object):
"""
Expand Down Expand Up @@ -3884,6 +3954,70 @@ def test_get_verified_chain_invalid_chain_no_root(self):
assert exc.value.args[0][2] == "unable to get issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate"

@pytest.fixture
def root_ca_file(self, tmpdir):
return self._create_ca_file(tmpdir, "root_ca_hash_dir", self.root_cert)

@pytest.fixture
def intermediate_ca_file(self, tmpdir):
return self._create_ca_file(
tmpdir, "intermediate_ca_hash_dir", self.intermediate_cert
)

@staticmethod
def _create_ca_file(base_path, hash_directory, cacert):
ca_hash = "{:08x}.0".format(cacert.subject_name_hash())
cafile = base_path.join(hash_directory, ca_hash)
cafile.write_binary(
dump_certificate(FILETYPE_PEM, cacert), ensure=True
)
return cafile

def test_verify_with_ca_file_location(self, root_ca_file):
store = X509Store()
store.load_locations(str(root_ca_file))

store_ctx = X509StoreContext(store, self.intermediate_cert)
store_ctx.verify_certificate()

def test_verify_with_ca_path_location(self, root_ca_file):
store = X509Store()
store.load_locations(None, str(root_ca_file.dirname))

store_ctx = X509StoreContext(store, self.intermediate_cert)
store_ctx.verify_certificate()

def test_verify_with_cafile_and_capath(
self, root_ca_file, intermediate_ca_file
):
store = X509Store()
store.load_locations(
cafile=str(root_ca_file), capath=str(intermediate_ca_file.dirname)
)

store_ctx = X509StoreContext(store, self.intermediate_server_cert)
store_ctx.verify_certificate()

def test_verify_with_multiple_ca_files(
self, root_ca_file, intermediate_ca_file
):
store = X509Store()
store.load_locations(str(root_ca_file))
store.load_locations(str(intermediate_ca_file))

store_ctx = X509StoreContext(store, self.intermediate_server_cert)
store_ctx.verify_certificate()

def test_verify_failure_with_empty_ca_directory(self, tmpdir):
store = X509Store()
store.load_locations(None, str(tmpdir))

store_ctx = X509StoreContext(store, self.intermediate_cert)
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()

assert exc.value.args[0][2] == "unable to get local issuer certificate"


class TestSignVerify(object):
"""
Expand Down