#!/usr/bin/env python
# kernel-check -- Gentoo Kernel Security
# Copyright 2009-2010 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

"""Cron job that builds the per-bug XML files used by kernel-check.

The script downloads the NVD CVE feeds and the list of Gentoo bugzilla
bugs filed against the Kernel component, combines both, and writes one
XML file per bug into ``<PORTDIR>/metadata/kernel``.  XML files in that
directory which were not regenerated during the run are removed.
"""

from contextlib import closing
import xml.etree.cElementTree as et
import cStringIO
import datetime
import hashlib
import logging
import mmap
import os
import portage
import re
import shutil
import sys
import time
import urllib


class CronError(Exception):
    """Problem that affects a single bug only.

    ``main`` catches this exception, logs ``value`` and continues with the
    next bug.
    """

    def __init__(self, value):
        Exception.__init__(self, value)
        self.value = value


CONST = {
    'minyear': 2002,
    'maxyear': 2012,
    'nvdurl': 'http://nvd.nist.gov/',
    'bzurl': 'https://bugs.gentoo.org/',
    'state': ['NEW', 'ASSIGNED', 'REOPENED', 'RESOLVED', 'VERIFIED',
              'CLOSED'],
    'resolut': ['FIXED', 'LATER', 'TEST-REQUEST', 'UPSTREAM', '---'],
    # Order of the child elements written below <bug> and <cve>.
    'bugorder': ['bugid', 'reporter', 'reported', 'status', 'arch',
                 'affected'],
    'cveorder': ['cve', 'published', 'desc', 'severity', 'vector', 'score',
                 'refs'],
    'filepath': os.path.dirname(os.path.realpath(__file__)),
    'portdir': portage.settings['PORTDIR'],
}

# Placeholder values for a CVE that has no entry in the NVD feeds.  There is
# deliberately no 'cve' key: the identifier of the CVE itself is used.
PENDING = {
    'published': '0000-00-00',
    'desc': 'This PENDING identifier specifies all vulnerabilities '
            'which are not approved yet. PENDING is used by products, '
            'databases, and services to specify when a particular '
            'vulnerability element has been proposed as CVE entry.',
    'severity': 'Low',
    'vector': '()',
    'score': '0.0',
    'refs': et.Element('refs'),
}

# Values written for bugs whose summary carries GENERIC-MAP-NOMATCH.
NOMATCH = {
    'cve': 'GENERIC-MAP-NOMATCH',
    'published': '0000-00-00',
    'desc': 'This GENERIC identifier is not specific to any '
            'vulnerability. GENERIC-MAP-NOMATCH is used by products, '
            'databases, and services to specify when a particular '
            'vulnerability element does not map to a corresponding '
            'CVE entry.',
    'severity': 'Low',
    'vector': '()',
    'score': '0.0',
    'refs': et.Element('refs'),
}

PARAM = {
    'delay': 0.2,
    'skip': True,
    # False logs to the console; use
    # os.path.join(CONST['filepath'], 'cron.log') to log to a file instead.
    'logfile': False,
    'tmpdir': os.path.join(CONST['filepath'], 'tmp'),
    'bugdir': os.path.join(CONST['filepath'], 'tmp', 'bug'),
    'nvddir': os.path.join(CONST['filepath'], 'tmp', 'nvd'),
    'outdir': os.path.join(CONST['portdir'], 'metadata', 'kernel'),
}

REGEX = {
    'bugzilla': re.compile(r'(?<=bug.cgi\?id=)\d*'),
    'grp_all': re.compile(
        r'(?<=\()[ (]*CVE-(\d{4})([-,(){}|, \d]+)(?=\))'),
    'grp_split': re.compile(r'(?<=\D)(\d{4})(?=\D|$)'),
    'm_nomatch': re.compile(r'.*GENERIC-MAP-NOMATCH.*'),
    'wb_match': re.compile(
        r'\s*\[\s*([^ +<=>]+)\s*([<=>]{1,2})'
        r'\s*([^ <=>\]]+)\s*(?:([<=>]{1,2})'
        r'\s*([^ \]]+))?\s*\]\s*(.*)'),
    'wb_vers': re.compile(
        r'^(?:\d{1,2}\.){0,3}\d{1,2}'
        r'(?:[-_](?:r|rc)?\d{1,2})*$'),
}

# Maps every CVE identifier seen so far to the id of the bug that claimed it.
CVES = dict()

logging.basicConfig(format='[%(asctime)s] %(levelname)-6s : %(message)s',
                    datefmt='%H:%M:%S', filename=PARAM['logfile'],
                    level=logging.DEBUG)


def main(argv):
    """Run the complete cron job.

    ``argv`` is accepted for the command line entry point but is not
    evaluated.

    Steps: hash the XML files already present in the output directory,
    download the NVD feeds and the bugzilla bug list, regenerate one XML
    file per bug and finally remove every XML file that was not regenerated.
    """
    logging.info('Running cron')
    for item in sorted(PARAM):
        logging.info("Parameter %-8s = '%s'" % (item, PARAM[item]))

    current_year = datetime.datetime.now().year
    if not CONST['minyear'] <= current_year <= CONST['maxyear']:
        current_year = CONST['maxyear']

    for directory in ('tmpdir', 'bugdir', 'nvddir', 'outdir'):
        if not os.path.isdir(PARAM[directory]):
            os.makedirs(PARAM[directory])

    logging.info('Creating a filetable for all xml files')
    filetable = _hash_directory(PARAM['outdir'])

    logging.info('Receiving the latest xml file from the nvd')
    receive_file(PARAM['nvddir'], [CONST['nvdurl'], 'download/'],
                 'nvdcve-recent.xml')

    if not PARAM['skip']:
        logging.info('Receiving earlier xml files from the nvd')
        for year in xrange(CONST['minyear'], current_year + 1):
            receive_file(PARAM['nvddir'], [CONST['nvdurl'], 'download/'],
                         'nvdcve-%s.xml' % str(year))

    logging.info('Receiving the kernel vulnerability list from bugzilla')
    url = [CONST['bzurl'], 'buglist.cgi?query_format=advanced'
                           '&component=Kernel']
    for item in CONST['state']:
        url.append('&bug_status=' + item)
    for item in CONST['resolut']:
        url.append('&resolution=' + item)
    # receive_file appends the file name to the url; the '#' turns that
    # name into a fragment so it does not become part of the query.
    url.append('#')
    receive_file(PARAM['tmpdir'], url, 'bugzilla.xml')

    filename = os.path.join(PARAM['tmpdir'], 'bugzilla.xml')
    try:
        with open(filename, 'r') as buglist_file:
            found = REGEX['bugzilla'].findall(buglist_file.read())
    except IOError:
        # Without a bug list nothing can be regenerated.  Stop here so the
        # clean-up below does not wipe the existing output.
        logging.error('File %s - Not readable!' % filename)
        return
    # The pattern uses '\d*' and may therefore yield empty ids.
    buglist = [bugid for bugid in found if bugid]
    logging.info('Found %i kernel vulnerabilities' % len(buglist))

    logging.info('Creating the nvd dictionary')
    nvd_dict = parse_nvd_dict(PARAM['nvddir'])

    logging.info('Creating the xml files')
    created_files = 0
    for item in buglist:
        try:
            receive_file(PARAM['bugdir'],
                         [CONST['bzurl'], 'show_bug.cgi?ctype=xml&id='],
                         item)
            vul = parse_bz_dict(PARAM['bugdir'], item)
            _attach_nvd_entries(vul, nvd_dict)
            write_xml_file(PARAM['outdir'], vul, filetable)
            created_files += 1
            time.sleep(PARAM['delay'])
        except CronError as e:
            logging.error('[%s] %s' % (item, e.value))
    logging.info('Created %i xml files' % created_files)

    # write_xml_file drops every regenerated file from the table, so the
    # remaining entries are files that no bug produced during this run.
    for key in list(filetable):
        os.remove(key)
        logging.info('File removed %s' % key)


def _hash_directory(directory):
    """Return a dict mapping each file path in *directory* to its md5."""
    filetable = dict()
    for xmlfile in os.listdir(directory):
        xmlfile = os.path.join(directory, xmlfile)
        if os.path.isfile(xmlfile):
            with open(xmlfile, 'rb') as data:
                filetable[xmlfile] = hashlib.md5(data.read()).hexdigest()
        else:
            logging.error('Invalid directory: %s' % xmlfile)
    return filetable


def _attach_nvd_entries(vul, nvd_dict):
    """Fill ``vul['cves']`` from ``vul['cvelist']``.

    A CVE known to *nvd_dict* is stored as its NVD dictionary.  An unknown
    CVE is stored as its bare identifier and marks the bug as pending.
    GENERIC-MAP-NOMATCH resets the list to that single marker.
    """
    for cve in vul['cvelist']:
        if cve == NOMATCH['cve']:
            vul['cves'] = [NOMATCH['cve']]
            if len(vul['cvelist']) > 1:
                logging.error('\'Nomatch\' and valid cve: ' + vul['bugid'])
        elif cve in nvd_dict:
            vul['cves'].append(nvd_dict[cve])
        else:
            logging.error('No Nvd entry: ' + cve)
            vul['cves'].append(cve)
            vul['pending'] = True


def receive_file(directory, url, xml_file):
    """Download a file and store it as ``directory/xml_file``.

    *url* is a list of string fragments; the address requested is the
    concatenation of these fragments followed by *xml_file*.  The list is
    not modified.  A failed download is logged and otherwise ignored.
    """
    filename = os.path.join(directory, xml_file)
    address = ''.join(list(url) + [xml_file])

    try:
        # Fetch everything before opening the target, so that a failed
        # download does not truncate a file from an earlier run.
        with closing(urllib.urlopen(address)) as resource:
            payload = resource.read()
        with open(filename, 'wb') as output:
            output.write(payload)
    except IOError:
        logging.error('File %s - Download failed!' % filename)
        return

    logging.debug('File %s - %sKB received' %
                  (filename, os.path.getsize(filename) // 1024))


def parse_nvd_dict(directory):
    """Return a dictionary built from the NVD feed files in *directory*.

    Keys are CVE identifiers.  Each value is a dict with the keys ``cve``,
    ``published``, ``severity``, ``vector``, ``score``, ``desc`` and
    ``refs`` (an element tree node).  Entries without severity, CVSS vector
    or CVSS score are skipped, and so are files that cannot be parsed.
    """
    nvd = dict()

    for nvdfile in os.listdir(directory):
        filename = os.path.join(directory, nvdfile)
        try:
            root = et.parse(filename).getroot()
        except (IOError, SyntaxError):
            # cElementTree reports malformed XML as a SyntaxError.
            logging.error('File %s - Parsing failed!' % filename)
            continue

        # The root tag is '{namespace}nvd'; cut off the local name.
        namespace = root.tag[:-3]

        for tree in root:
            if tree.get('severity') is None \
                    or tree.get('CVSS_vector') is None \
                    or tree.get('CVSS_score') is None:
                continue

            cve = {
                'cve': tree.get('name'),
                'published': tree.get('published'),
                'severity': tree.get('severity'),
                'vector': tree.get('CVSS_vector'),
                'score': tree.get('CVSS_score'),
                # write_xml_file reads every key of CONST['cveorder'].
                'desc': '',
            }

            desc = tree.find('%sdesc/%sdescript' % (namespace, namespace))
            if desc is not None and desc.text is not None:
                cve['desc'] = desc.text

            reftree = tree.find(namespace + 'refs')
            if reftree is None:
                reftree = et.Element('refs')
            else:
                # Strip the namespace so the nodes can be written verbatim.
                reftree.tag = reftree.tag.replace(namespace, '')
                for elem in reftree.findall('.//*'):
                    elem.tag = elem.tag.replace(namespace, '')

            bugref = et.SubElement(reftree, 'ref')
            bugref.set('source', 'GENTOO')
            bugref.set('url', '%sshow_bug.cgi?id=%s' %
                       (CONST['bzurl'], cve['cve']))
            bugref.text = 'Gentoo %s' % cve['cve']

            cve['refs'] = reftree
            nvd[cve['cve']] = cve

    return nvd


def _required_text(root, tag):
    """Return the text of child *tag* or raise CronError if it is absent."""
    node = root.find(tag)
    if node is None or node.text is None:
        raise CronError('Missing field: ' + tag)
    return node.text


def parse_bz_dict(directory, bugid):
    """Return a dictionary describing the bug stored in ``directory/bugid``.

    The result has the keys ``bugid``, ``cvelist``, ``cves`` (empty list),
    ``arch``, ``reporter``, ``reported``, ``status``, ``pending`` (False)
    and ``affected`` (list of intervals from the status whiteboard).

    Raises CronError if the file cannot be parsed, the summary names no
    CVE, a field is missing, a CVE was already claimed by another bug, or
    the whiteboard is empty or invalid.
    """
    filename = os.path.join(directory, bugid)

    try:
        root = et.parse(filename).getroot()[0]
    except (IOError, SyntaxError, IndexError):
        raise CronError('Invalid bug file: ' + filename)

    node = root.find('short_desc')
    if node is None or node.text is None:
        raise CronError('No Cve')
    string = node.text.replace('CAN', 'CVE')

    cvelist = list()
    if string in REGEX['m_nomatch'].findall(string):
        cvelist = [NOMATCH['cve']]

    # The summary lists CVEs as '(CVE-<year>-<id>[,<id>...])'.
    for (year, split_cves) in REGEX['grp_all'].findall(string):
        for cve in REGEX['grp_split'].findall(split_cves):
            cvelist.append('CVE-%s-%s' % (year, cve))

    if not cvelist:
        raise CronError('No Cve')

    vul = {
        'bugid': bugid,
        'cvelist': cvelist,
        'cves': list(),
        'arch': _required_text(root, 'rep_platform').lower(),
        'reporter': _required_text(root, 'reporter').lower(),
        'reported': _required_text(root, 'creation_ts'),
        'status': _required_text(root, 'bug_status').lower(),
        'pending': False,
    }

    for item in cvelist:
        if item == NOMATCH['cve']:
            continue
        owner = CVES.setdefault(item, bugid)
        if owner != bugid:
            raise CronError('Duplicate: ' + owner)

    node = root.find('status_whiteboard')
    if node is None or node.text is None:
        raise CronError('Empty whiteboard')
    whiteboard = node.text

    vul['affected'] = interval_from_wb(whiteboard)
    if vul['affected'] is None:
        raise CronError('Invalid whiteboard: ' + whiteboard)

    return vul


def interval_from_wb(whiteboard):
    """Return the list of version intervals found in a whiteboard string.

    The whiteboard is a sequence of brackets such as ``[name >= 1 < 2]`` or
    ``[name == 1]``.  Each bracket yields a dict with the keys ``name``,
    ``lower``, ``upper``, ``lower_inc`` and ``upper_inc``; a bound that is
    not given stays None.  Returns None if the string cannot be parsed or a
    version number is malformed.
    """
    affected = list()

    while whiteboard.strip():
        match = REGEX['wb_match'].match(whiteboard)
        if not match:
            return None

        name, comp1, vers1, comp2, vers2, whiteboard = match.groups()

        # Every bracket starts from a clean slate so that a bound of the
        # previous bracket cannot leak into this one.
        interval = {
            'name': name,
            'lower': None,
            'upper': None,
            'lower_inc': None,
            'upper_inc': None,
        }

        if comp1 in ('=', '=='):
            if not REGEX['wb_vers'].match(vers1):
                return None
            interval['lower'] = interval['upper'] = vers1
            interval['lower_inc'] = interval['upper_inc'] = True
        else:
            for (char, version) in ((comp1, vers1), (comp2, vers2)):
                if char == '<':
                    interval['upper_inc'] = False
                    interval['upper'] = version
                elif char in ('<=', '=<'):
                    interval['upper_inc'] = True
                    interval['upper'] = version
                elif char == '>':
                    interval['lower_inc'] = False
                    interval['lower'] = version
                elif char in ('>=', '=>'):
                    interval['lower_inc'] = True
                    interval['lower'] = version
                elif char:
                    # Unsupported operator, e.g. '<>' or a second '='.
                    return None

                if version and not REGEX['wb_vers'].match(version):
                    return None

        affected.append(interval)

    return affected


def _cve_fields(cve):
    """Return the mapping that provides the <cve> child values for *cve*.

    *cve* is either the GENERIC-MAP-NOMATCH marker, an NVD dictionary, or
    the bare identifier of a CVE without NVD entry.
    """
    if cve == NOMATCH['cve']:
        return NOMATCH
    if isinstance(cve, dict):
        return cve
    fields = dict(PENDING)
    fields['cve'] = cve
    return fields


def write_xml_file(directory, vul, filetable):
    """Write ``directory/<bugid>.xml`` for the bug described by *vul*.

    The document is first written to ``hash.xml`` in the temporary
    directory and only moved into place if the target is new or its md5
    differs from the one recorded in *filetable*.  The entry of an existing
    target is removed from *filetable*.  Errors while writing are logged.
    """
    filename = os.path.join(directory, vul['bugid'] + '.xml')

    root = et.Element('vulnerability')
    bugroot = et.SubElement(root, 'bug')

    for element in CONST['bugorder']:
        if element != 'affected':
            et.SubElement(bugroot, element).text = vul[element]
            continue

        affectedroot = et.SubElement(bugroot, 'affected')
        for item in vul['affected']:
            intnode = et.SubElement(affectedroot, 'interval')
            intnode.set('source', item['name'])
            for bound in ('lower', 'upper'):
                if item[bound]:
                    node = et.SubElement(intnode, bound)
                    node.text = item[bound]
                    node.set('inclusive',
                             str(item[bound + '_inc']).lower())

    # Whether a CVE is pending is decided per entry: a bug may mix CVEs
    # that have an NVD entry with CVEs that have none.
    for cve in vul['cves']:
        cveroot = et.SubElement(root, 'cve')
        fields = _cve_fields(cve)
        for element in CONST['cveorder']:
            if element == 'refs':
                cveroot.append(fields[element])
            else:
                et.SubElement(cveroot, element).text = fields[element]

    hashfile = os.path.join(PARAM['tmpdir'], 'hash.xml')

    try:
        __indent__(root)
        with open(hashfile, 'wb') as xmlout:
            et.ElementTree(root).write(xmlout, encoding='utf-8')

        with open(hashfile, 'rb') as xmlout:
            xmlout_hash = hashlib.md5(xmlout.read()).hexdigest()

        if filename in filetable:
            if filetable[filename] != xmlout_hash:
                shutil.move(hashfile, filename)
                logging.debug('File changed %s' % filename)
            # The file belongs to a live bug; keep main from removing it.
            del filetable[filename]
        else:
            shutil.move(hashfile, filename)
            logging.debug('File new %s' % filename)

    except Exception as e:
        logging.error("File creation failed: %s" % e)


def __indent__(node, level=0):
    """Indent the tree below *node* in place, four spaces per level."""
    i = '\n' + level * ' ' * 4

    if len(node):
        if not node.text or not node.text.strip():
            node.text = i + ' ' * 4
        if not node.tail or not node.tail.strip():
            node.tail = i
        child = None
        for child in node:
            __indent__(child, level + 1)
        # The tail of the last child positions the closing tag of *node*.
        if not child.tail or not child.tail.strip():
            child.tail = i
    elif level and (not node.tail or not node.tail.strip()):
        node.tail = i


if __name__ == '__main__':
    main(sys.argv[1:])