2010-03-02 9 views
8

Ich verwende httplib2, um eine Anfrage von meinem Server an einen anderen Webdienst zu stellen. Wir möchten die gegenseitige Zertifikatauthentifizierung verwenden. Ich sehe, wie man ein Zertifikat für die ausgehende Verbindung verwendet (h.set_certificate), aber wie überprüfe ich das vom Antwortserver verwendete Zertifikat?So führen Sie die gegenseitige Zertifikatauthentifizierung mit httplib2 durch

This ticket scheint darauf hinzuweisen, dass httplib2 es nicht selbst tut, und hat nur vage Vorschläge, wo zu suchen.

Ist es möglich? Muss ich auf einer niedrigeren Ebene hacken?

Antwort

5

Hier ist der Code mein Kollege Dave St. Germain schrieb das Problem zu lösen:

import ssl 
import socket 
from httplib2 import has_timeout 
import httplib2 
import socks 


class CertificateValidationError(httplib2.HttpLib2Error): 
    pass 


def validating_sever_factory(ca_cert_file): 
    # we need to define a closure here because we don't control 
    # the arguments this class is instantiated with 
    class ValidatingHTTPSConnection(httplib2.HTTPSConnectionWithTimeout): 

     def connect(self): 
      # begin copypasta from HTTPSConnectionWithTimeout 
      "Connect to a host on a given (SSL) port." 

      if self.proxy_info and self.proxy_info.isgood(): 
       sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM) 
       sock.setproxy(*self.proxy_info.astuple()) 
      else: 
       sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

      if has_timeout(self.timeout): 
       sock.settimeout(self.timeout) 
      sock.connect((self.host, self.port)) 
      # end copypasta 


      try: 
       self.sock = ssl.wrap_socket(sock, 
          self.key_file, 
          self.cert_file, 
          cert_reqs=ssl.CERT_REQUIRED, 
          ca_certs=ca_cert_file 
          ) 
      except ssl.SSLError: 
       # we have to capture the exception here and raise later because 
       # httplib2 tries to ignore exceptions on connect 
       import sys 
       self._exc_info = sys.exc_info() 
       raise 
      else: 
       self._exc_info = None 

       # this might be redundant 
       server_cert = self.sock.getpeercert() 
       if not server_cert: 
        raise CertificateValidationError(repr(server_cert)) 

     def getresponse(self): 
      if not self._exc_info: 
       return httplib2.HTTPSConnectionWithTimeout.getresponse(self) 
      else: 
       raise self._exc_info[1], None, self._exc_info[2] 
    return ValidatingHTTPSConnection 


def do_request(url, 
     method='GET', 
     body=None, 
     headers=None, 
     keyfile=None, 
     certfile=None, 
     ca_certs=None, 
     proxy_info=None, 
     timeout=30): 
    """ 
    makes an http/https request, with optional client certificate and server 
    certificate verification. 
    returns response, content 
    """ 
    kwargs = {} 
    h = httplib2.Http(proxy_info=proxy_info, timeout=timeout) 
    is_ssl = url.startswith('https') 
    if is_ssl and ca_certs: 
     kwargs['connection_type'] = validating_sever_factory(ca_certs) 

    if is_ssl and keyfile and certfile: 
     h.add_certificate(keyfile, certfile, '') 
    return h.request(url, method=method, body=body, headers=headers, **kwargs) 
3

vielleicht haben da Ihre Frage änderten sich die Dinge, ich bin in der Lage die gegenseitige Authentifizierung mit httplib2 v0.7 zu tun, wie unten:

import httplib2 

h=httplib2.Http(ca_certs='ca.crt') 
h.add_certificate(key='client_private_key.pem', cert='cert_client.pem', domain='') 
try: resp, cont = h.request('https://mytest.com/cgi-bin/test.cgi') 
except Exception as e: print e 

Notiere die domain='' Argument, das ist der einzige Weg, ich es

+2

nach [diesen Link] machen könnte funktionieren (http://nullege.com/codes/search/httplib2 .Http.add_certificate), 'domain' muss nur ein Hostname und Port sein. – arturo