# vim: set sw=4 sts=4 et :
# Copyright: 2008-2009 Gentoo Foundation
# Author(s): Nirbheek Chauhan
# License: GPL-3
#

import subprocess
import os

from .. import config


class Crypto(object):
    """
    Data Encrypter/Decrypter

    Thin wrapper around the `gpg` command line tool. Every gpg invocation
    is made with an argument list (no shell), and payloads are written to
    gpg's stdin, so caller-supplied strings are never interpreted by a shell.
    """

    def __init__(self, gpghome=config.GPGHOME):
        """
        @param gpghome: Home directory for GPG
        @type gpghome: string
        """
        self.gpghome = gpghome
        # Shell-style command prefix. No longer executed; retained because
        # it is a public attribute and export_pubkey() echoes it.
        self.gpgcmd = 'gpg -a --keyid-format long --trust-model always '
        self.gpgcmd += '--homedir="%s" ' % self.gpghome

    def _gpg_argv(self, *args):
        """
        Build the gpg argument vector: the common options (same ones as in
        self.gpgcmd) followed by `args`.

        @param args: Extra command line arguments for gpg
        @type args: strings
        returns: list of strings suitable for subprocess
        """
        return ['gpg', '-a', '--keyid-format', 'long',
                '--trust-model', 'always',
                '--homedir', self.gpghome] + list(args)

    def _run(self, gpg_args, stdin_data=None, stdout=None, stderr=None):
        """
        Run gpg, optionally feeding `stdin_data` to its stdin.

        communicate() is used so that stdin, stdout and stderr are serviced
        together; reading one pipe to EOF before the other can deadlock
        once the unread pipe fills up.

        @param gpg_args: Arguments appended to the common gpg options
        @type gpg_args: sequence of strings
        @param stdin_data: Data written to gpg's stdin (None: inherit stdin)
        @type stdin_data: string
        @param stdout: Passed to subprocess.Popen (e.g. subprocess.PIPE)
        @param stderr: Passed to subprocess.Popen (e.g. subprocess.PIPE)
        returns: (returncode, stdout_data, stderr_data); the data items are
                 None for streams that were not piped
        """
        stdin = None
        if stdin_data is not None:
            stdin = subprocess.PIPE
        process = subprocess.Popen(self._gpg_argv(*gpg_args), stdin=stdin,
                                   stdout=stdout, stderr=stderr)
        out, err = process.communicate(stdin_data)
        return (process.returncode, out, err)

    def _check_run(self, gpg_args, stdin_data):
        """
        Run gpg with `stdin_data` on stdin and raise
        subprocess.CalledProcessError on a non-zero exit status (the same
        contract as subprocess.check_call).
        """
        returncode = self._run(gpg_args, stdin_data)[0]
        if returncode != 0:
            raise subprocess.CalledProcessError(returncode,
                                                self._gpg_argv(*gpg_args))

    def _validate_gpghome(self):
        """
        Check that the secret keyring inside the GPG home can be opened.

        Raises Exception if `<gpghome>/secring.gpg` cannot be opened.
        """
        try:
            secring = open(self.gpghome+'/secring.gpg')
        except (IOError, OSError):
            raise Exception('"%s": Unable to use GPG homedir' % self.gpghome)
        # Only reached when open() succeeded, so the handle always exists
        # here (closing it in a `finally` would hit an unbound name on
        # failure and mask the exception above).
        secring.close()

    def _get_fp_from_keyid(self, keyid):
        """
        Look up the fingerprint for a key ID.

        @param keyid: Key ID to look up
        @type keyid: string
        returns: the fingerprint from the first `fpr` record printed by
                 gpg, or None if there is no such record
        """
        output = self._run(('--with-colons', '--fingerprint',
                            '--list-keys', keyid),
                           stdout=subprocess.PIPE)[1]
        for line in output.splitlines():
            # Fingerprint line
            if line.startswith('fpr'):
                # Fingerprint
                return line.split(':')[-2]
        return None

    def export_pubkey(self, file, which):
        """
        Export a public key (ASCII armored) into a file.

        @param file: Path of the file to write; truncated if it exists
        @type file: string
        @param which: Key to export, as understood by `gpg --export`
        @type which: string

        Raises subprocess.CalledProcessError if gpg exits non-zero.
        """
        self._validate_gpghome()
        # Echo the equivalent shell command line (unchanged output).
        print(self.gpgcmd + '--export "%s" > "%s"' % (which, file))
        with open(file, 'wb') as keyfile:
            subprocess.check_call(self._gpg_argv('--export', which),
                                  stdout=keyfile)

    def import_pubkey(self, pubkey):
        """
        Import a public key into the keyring.

        @param pubkey: Public key data
        @type pubkey: string

        Raises subprocess.CalledProcessError if gpg exits non-zero.
        """
        self._validate_gpghome()
        # The trailing newline matches what the previous shell here-string
        # fed to gpg.
        self._check_run(('--import',), pubkey + '\n')

    def init_gpghome(self, name='Test AutotuA Slave',
                     email='test_slave@test.org', expire='5y', length='4096'):
        """
        Initialize a GnuPG home by generating keys

        @param name: Real name for the key's user ID
        @type name: string
        @param email: Email address for the key's user ID
        @type email: string
        @param expire: Expiry date for the key
        @type expire: string
        @param length: Length of the encryption subkey
        @type length: string

        Raises subprocess.CalledProcessError if gpg exits non-zero.
        """
        params = (('Key-Type', 'DSA'),
                  ('Key-Length', '1024'),
                  ('Subkey-Type', 'ELG-E'),
                  ('Subkey-Length', length),
                  ('Name-Real', name),
                  ('Name-Email', email),
                  ('Expire-Date', expire),)
        # Batch mode: the key parameters are read by gpg from stdin
        lines = ['%echo Generating keys.. '
                 '[Worship the gods of randomness :p]']
        lines.extend('%s: %s' % param for param in params)
        self._check_run(('--batch', '--gen-key'), '\n'.join(lines) + '\n')
        print('Done.')

    def encrypt(self, data, recipient='Autotua Master'):
        """
        Encrypt and sign data for a recipient.

        @param data: Data to be encrypted
        @type data: string
        @param recipient: Recipient for the data
        @type recipient: string

        returns: encrypted_data

        Raises Exception if gpg exits with a non-zero status.
        """
        self._validate_gpghome()
        # A newline is appended to the plaintext, exactly as the previous
        # shell here-string did; decrypt() strips it again. Keeping it
        # keeps messages interchangeable with the older implementation.
        returncode, encrypted, _ = self._run(
            ('--encrypt', '--sign', '--recipient', recipient),
            data + '\n', stdout=subprocess.PIPE)
        # Any non-zero status is a failure (a negative status only covers
        # death by signal).
        if returncode != 0:
            raise Exception('Unable to encrypt, something went wrong :(')
        return encrypted

    def decrypt(self, data):
        """
        Decrypt data and identify who signed it.

        @param data: Data to be decrypted
        @type data: string

        returns: (decrypted_data, sender) where sender is the fingerprint
                 of the signing key, or None if it could not be determined

        Raises Exception if gpg exits with a non-zero status or produces
        no output.
        """
        self._validate_gpghome()
        returncode, ddata, gpg_err = self._run(
            ('--decrypt',), data + '\n',
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        ddata = ddata[:-1] # Extra \n at the end, added by encrypt()
        if returncode != 0 or not ddata:
            raise Exception('Unable to decrypt, something went wrong')
        # Get the line with the DSA long key ID
        keyid = None
        for line in gpg_err.splitlines():
            if line.find('using DSA key') != -1:
                # Get the long key ID
                keyid = line.split()[-1]
                break
        if keyid is None:
            # No signature line found: there is nothing to look up
            return (ddata, None)
        # Get the fingerprint
        sender = self._get_fp_from_keyid(keyid)
        return (ddata, sender)