Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
bpo-18233: Add SSLSocket.getpeercertchain()
Based on the patch provided by Christian Heimes (christian.heimes) and updated by Mariusz Masztalerczuk (mmasztalerczuk).
  • Loading branch information
chrisburr committed Dec 9, 2020
commit 7008f61a1b93b321905d1ecefa10f5fa12d46b28
15 changes: 15 additions & 0 deletions Doc/library/ssl.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,21 @@ SSL sockets also have the following additional methods and attributes:
.. versionchanged:: 3.9
IPv6 address strings no longer have a trailing new line.

.. method:: SSLSocket.getpeercertchain(binary_form=False, validate=True)

Returns certificate chain for the peer. If no chain is provided, returns
None. Otherwise returns a tuple of dicts containing information about the
certificates. The chain starts with the leaf certificate and ends with the
root certificate. If called on the client side, the leaf certificate is the
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not necessarily correct. SSL_get_peer_cert_chain() returns whatever the client sends. It may just be the EE cert (end entity), EE+intermediates, EE+intermediate+root, EE with intermediates for primary and alternative chains, or whatever the admin for the site has configured. I guess the peer chain can even include unrelated junk.

It's also worth mentioning that the chain is not available with TLS session resumption.

peer's certificate.

If the optional argument *binary_form* is True, return a list of *binary_form*-encoded copies
of the certificates.
If the optional argument *validate* is False, return the peer's cert chain
without any validation and without the root CA cert.");

.. versionadded:: 3.9

.. method:: SSLSocket.cipher()

Returns a three-value tuple containing the name of the cipher being used, the
Expand Down
13 changes: 13 additions & 0 deletions Lib/ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,13 @@ def getpeercert(self, binary_form=False):
"""
return self._sslobj.getpeercert(binary_form)

def getpeercertchain(self, binary_form=False, validate=True):
""""Returns the certificate chain of the SSL connection
as tuple of dicts.

Return None if no chain is provieded."""
return self._sslobj.getpeercertchain(binary_form, validate)

def selected_npn_protocol(self):
"""Return the currently selected NPN protocol as a string, or ``None``
if a next protocol was not negotiated or if NPN is not supported by one
Expand Down Expand Up @@ -1123,6 +1130,12 @@ def getpeercert(self, binary_form=False):
self._check_connected()
return self._sslobj.getpeercert(binary_form)

@_sslcopydoc
def getpeercertchain(self, binary_form=False, validate=True):
self._checkClosed()
self._check_connected()
return self._sslobj.getpeercertchain(binary_form, validate)

@_sslcopydoc
def selected_npn_protocol(self):
self._checkClosed()
Expand Down
30 changes: 30 additions & 0 deletions Lib/test/test_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2160,6 +2160,36 @@ def test_get_ca_certs_capath(self):
self.assertTrue(cert)
self.assertEqual(len(ctx.get_ca_certs()), 1)

def test_getpeercertchain(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=CAPATH)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(self.server_addr)
try:
peer_cert = s.getpeercert()
peer_cert_bin = s.getpeercert(True)
chain = s.getpeercertchain()
chain_bin = s.getpeercertchain(True)
chain_no_validate = s.getpeercertchain(validate=False)
chain_bin_no_validate = s.getpeercertchain(True, False)
finally:
self.assertTrue(peer_cert)
self.assertEqual(len(chain), 2)
self.assertTrue(peer_cert_bin)
self.assertEqual(len(chain_bin), 2)

# ca cert
ca_certs = ctx.get_ca_certs()
self.assertEqual(len(ca_certs), 1)
test_get_ca_certsert = ca_certs[0]
ca_cert_bin = ctx.get_ca_certs(True)[0]

self.assertEqual(chain, (peer_cert, test_get_ca_certsert))
self.assertEqual(chain_bin, (peer_cert_bin, ca_cert_bin))
self.assertEqual(chain_no_validate, (peer_cert,))
self.assertEqual(chain_bin_no_validate, (peer_cert_bin,))

@needs_sni
def test_context_setget(self):
# Check that the context of a connected socket can be replaced.
Expand Down
128 changes: 128 additions & 0 deletions Modules/_ssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -2096,6 +2096,133 @@ _ssl__SSLSocket_cipher_impl(PySSLSocket *self)
return cipher_to_tuple(current);
}

/*[clinic input]
_ssl._SSLSocket.getpeercertchain
der as binary_mode: bool = False
validate: bool = True
[clinic start generated code]*/

static PyObject *
_ssl__SSLSocket_getpeercertchain_impl(PySSLSocket *self, int binary_mode,
int validate)
/*[clinic end generated code: output=8094e6d78d27eb9a input=f4dcd181d0d163eb]*/
{
int len, i;
PyObject *retval = NULL, *ci=NULL;
STACK_OF(X509) *peer_chain; /* reference */

assert((self->ctx != NULL) && (self->ctx->ctx != NULL));
if (self->ssl == NULL) {
Py_RETURN_NONE;
}

/* The peer just transmits the intermediate cert chain EXCLUDING the root
* CA certificate as this side is suppose to have a copy of the root
* certificate for verification. */
if (validate) {
#ifdef OPENSSL_VERSION_1_1
peer_chain = SSL_get0_verified_chain(self->ssl);
long ret = SSL_get_verify_result(self->ssl);
if (ret != X509_V_OK) {
#ifdef SSL_R_CERTIFICATE_VERIFY_FAILED
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SSL_R_CERTIFICATE_VERIFY_FAILED is always defined on 1.1.0+.

long e = ERR_PACK(ERR_LIB_SSL, 0, SSL_R_CERTIFICATE_VERIFY_FAILED);
#else
long e = ERR_PACK(ERR_LIB_SSL, 0, 134);
#endif
fill_and_set_sslerror(self, PySSLCertVerificationErrorObject, PY_SSL_ERROR_SSL, NULL, __LINE__, e);
return NULL;
}
#else
X509 *peer_cert = SSL_get_peer_certificate(self->ssl);
if (peer_cert == NULL)
Py_RETURN_NONE;

STACK_OF(X509) *chain = SSL_get_peer_cert_chain(self->ssl);
if (chain == NULL) {
X509_free(peer_cert);
Py_RETURN_NONE;
}
X509_STORE_CTX *store_ctx;

/* Initialize a store context with store (for root CA certs), the
* peer's cert and the peer's chain with intermediate CA certs. */
if ((store_ctx = X509_STORE_CTX_new()) == NULL) {
X509_free(peer_cert);
_setSSLError(NULL, 0, __FILE__, __LINE__);
return NULL;
}

if (!X509_STORE_CTX_init(store_ctx,
SSL_CTX_get_cert_store(self->ctx->ctx),
peer_cert, chain)) {
#ifdef SSL_R_CERTIFICATE_VERIFY_FAILED
long e = ERR_PACK(ERR_LIB_SSL, 0, SSL_R_CERTIFICATE_VERIFY_FAILED);
#else
long e = ERR_PACK(ERR_LIB_SSL, 0, 134);
#endif
fill_and_set_sslerror(self, PySSLCertVerificationErrorObject, PY_SSL_ERROR_SSL, NULL, __LINE__, e);
X509_free(peer_cert);
X509_STORE_CTX_free(store_ctx);
goto end;
}
X509_free(peer_cert);

/* Validate peer cert using its intermediate CA certs and the
* context's root CA certs. */
if (X509_verify_cert(store_ctx) <= 0) {
// _setX509StoreContextError(self, store_ctx, __FILE__, __LINE__);
#ifdef SSL_R_CERTIFICATE_VERIFY_FAILED
long e = ERR_PACK(ERR_LIB_SSL, 0, SSL_R_CERTIFICATE_VERIFY_FAILED);
#else
long e = ERR_PACK(ERR_LIB_SSL, 0, 134);
#endif
fill_and_set_sslerror(self, PySSLCertVerificationErrorObject, PY_SSL_ERROR_SSL, NULL, __LINE__, e);
X509_STORE_CTX_free(store_ctx);
goto end;
}

/* Get chain from store context */
peer_chain = X509_STORE_CTX_get1_chain(store_ctx);
X509_STORE_CTX_free(store_ctx);
#endif
} else {
peer_chain = SSL_get_peer_cert_chain(self->ssl);
}

if (peer_chain == NULL) {
Py_RETURN_NONE;
}

len = sk_X509_num(peer_chain);

if ((retval = PyTuple_New(len)) == NULL) {
return NULL;
}

for (i = 0; i < len; i++){
X509 *cert = sk_X509_value(peer_chain, i);
if (binary_mode) {
ci = _certificate_to_der(cert);
} else {
ci = _decode_certificate(cert);
}

if (ci == NULL) {
Py_CLEAR(retval);
goto end;
}
PyTuple_SET_ITEM(retval, i, ci);
}

end:
#ifndef OPENSSL_VERSION_1_1
if (validate && (peer_chain != NULL)) {
sk_X509_pop_free(peer_chain, X509_free);
}
#endif
return retval;
}

/*[clinic input]
_ssl._SSLSocket.version
[clinic start generated code]*/
Expand Down Expand Up @@ -3000,6 +3127,7 @@ static PyMethodDef PySSLMethods[] = {
_SSL__SSLSOCKET_GET_CHANNEL_BINDING_METHODDEF
_SSL__SSLSOCKET_CIPHER_METHODDEF
_SSL__SSLSOCKET_SHARED_CIPHERS_METHODDEF
_SSL__SSLSOCKET_GETPEERCERTCHAIN_METHODDEF
_SSL__SSLSOCKET_VERSION_METHODDEF
_SSL__SSLSOCKET_SELECTED_NPN_PROTOCOL_METHODDEF
_SSL__SSLSOCKET_SELECTED_ALPN_PROTOCOL_METHODDEF
Expand Down
52 changes: 51 additions & 1 deletion Modules/clinic/_ssl.c.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.