summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kernellib.py')
-rwxr-xr-xlib/kernellib.py640
1 files changed, 640 insertions, 0 deletions
diff --git a/lib/kernellib.py b/lib/kernellib.py
new file mode 100755
index 0000000..d320c20
--- /dev/null
+++ b/lib/kernellib.py
@@ -0,0 +1,640 @@
+#!/usr/bin/env python
+# kernel-check -- Kernel security information
+# Copyright 2009-2009 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from __future__ import with_statement
+from contextlib import closing
+import xml.etree.cElementTree as et
+import cStringIO
+import datetime
+import inspect
+import logging
+import mmap
+import os
+import portage
+import re
+import urllib
+
+
+ARCHES = [
+ 'all', 'alpha', 'amd64', 'amd64-fbsd', 'arm', 'hppa', 'ia64', 'm68k',
+ 'mips', 'ppc', 'ppc64', 's390', 'sh', 'sparc', 'sparc-fbsd', 'x86',
+ 'x86-fbsd'
+]
+
+BUGORDER = ['bugid', 'reporter', 'reported', 'status', 'arch', 'affected']
+CVEORDER = ['cve', 'published', 'desc', 'severity', 'vector', 'score', 'refs']
+
+REGEX = {
+ 'gp_version' : re.compile(r'(?<=K_GENPATCHES_VER\=\").+(?=\")'),
+ 'gp_want' : re.compile(r'(?<=K_WANT_GENPATCHES\=\").+(?=\")'),
+ 'k_version' : re.compile(r'^((?:\d{1,2}\.){0,4}\d{1,2})(-.*)?$'),
+ 'rc_kernel' : re.compile(r'^rc\d{1,3}$'),
+ 'git_kernel' : re.compile(r'^git(\d{1,3})$'),
+ 'r_kernel' : re.compile(r'^r\d{1,3}$')
+}
+
+SUPPORTED = ['gentoo', 'vanilla', 'hardened']
+
+KERNEL_TYPES = [
+ 'aa', 'acpi', 'ac', 'alpha', 'arm', 'as', 'cell', 'ck', 'compaq', 'crypto',
+ 'development', 'gaming','gentoo-dev', 'gentoo', 'gentoo-test', 'gfs',
+ 'git', 'grsec', 'gs', 'hardened-dev', 'hardened', 'hppa-dev', 'hppa',
+ 'ia64', 'kurobox', 'linux', 'lolo', 'mips-prepatch', 'mips', 'mjc', 'mm',
+ 'mosix', 'openblocks', 'openmosix','openvz', 'pac', 'pegasos-dev',
+ 'pegasos', 'pfeifer', 'planet-ccrma', 'ppc64', 'ppc-development',
+ 'ppc-dev', 'ppc', 'redhat', 'rsbac-dev', 'rsbac', 'selinux', 'sh',
+ 'sparc-dev', 'sparc', 'suspend2', 'systrace', 'tuxonice', 'uclinux',
+ 'usermode', 'vanilla-prepatch', 'vanilla', 'vanilla-tiny', 'vserver-dev',
+ 'vserver', 'win4lin', 'wolk-dev', 'wolk', 'xbox', 'xen', 'xfs'
+]
+
+VERSION = '0.3.9'
+NOCVE = 'GENERIC-MAP-NOMATCH'
+NOCVEDESC = 'This GENERIC identifier is not specific to any vulnerability. '\
+ 'GENERIC-MAP-NOMATCH is used by products, databases, and ' \
+ 'services to specify when a particular vulnerability element ' \
+ 'does not map to a corresponding CVE entry.'
+CVES = dict()
+DEBUG = False
+VERBOSE = False
+FORCE = False
+SKIP = False
+DELAY = 0
+FILEPATH = os.path.dirname(os.path.realpath(__file__))
+PORTDIR = portage.settings['PORTDIR']
+DIR = {
+ 'tmp' : os.path.join(FILEPATH, 'tmp'),
+ 'out' : os.path.join(PORTDIR, 'metadata', 'kernel'),
+ 'bug' : os.path.join(FILEPATH, 'tmp', 'bug'),
+ 'nvd' : os.path.join(FILEPATH, 'tmp', 'nvd')
+}
+
+def BUG_ON(msg):
+ if DEBUG:
+ print 'DEBUG line %s in %s(): %s' % (inspect.stack()[1][2],
+ inspect.stack()[1][3], msg)
+
+class Evaluation:
+ """Evaluation class
+
+ Provides information about the vulnerability of a kernel.
+ """
+
+ read = int()
+ arch = int()
+ affected = list()
+ unaffected = list()
+
+ def __init__(self):
+ self.affected = list()
+ self.unaffected = list()
+
+
+class Comparison:
+ """Comparison class
+ """
+
+ fixed = int()
+ new = list()
+ #TODO add more information
+
+ def __init__(self):
+ self.fixed = list()
+ self.new = list()
+
+
+class Cve:
+ """Common vulnerabilities and exposures class
+
+ Contains all important information about a cve.
+
+ Attributes:
+ cve: a string represeting the cve number of the class.
+ desc: a string providing a detailed description for the cve.
+ published: a string representing the original cve release date.
+ refs: a list of external references.
+ severity: a string representing the cve severity.
+ score: a floating point representing cvss base score.
+ vector: a string providing the cve access vector.
+ """
+
+ cve = str()
+ desc = str()
+ published = str()
+ refs = list()
+ severity = str()
+ score = float()
+ vector = str()
+
+ def __init__(self, cve):
+ self.cve = cve
+
+ def __eq__(self, other):
+ return (self.cve == other.cve) #FIXME is this enough?
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+
+class Genpatch:
+ 'Genpatch class'
+
+ base = bool()
+ extras = bool()
+ kernel = None
+ version = str()
+
+ def __init__(self, version):
+ self.version = version
+
+
+ def __repr__(self):
+ if self.base and self.extras:
+ return 'base extras'
+ if self.base:
+ return 'base'
+ if self.extras:
+ return 'extras'
+
+
+ def __eq__(self, other):
+ if self.kernel == other.kernel:
+ return (''.join((str(self.base), str(self.extras), self.version))
+ == ''.join((str(other.base), str(other.extras), other.version)))
+ else:
+ return False
+
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+
+class Kernel:
+ 'Kernel class'
+
+ revision = str()
+ source = str()
+ version = str()
+ genpatch = None
+
+ def __init__(self, source):
+ self.source = source
+
+
+ def __repr__(self):
+ return str(self.version + '-' + self.source + '-' + self.revision)
+
+
+ def __eq__(self, other):
+ return (''.join((self.revision, self.source, self.version,
+ str(self.genpatch))) == ''.join((other.revision,
+ other.source, other.version, str(other.genpatch))))
+
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+
+class Vulnerability:
+ 'Vulnerability class'
+
+ arch = str()
+ bugid = int()
+ cvelist = list()
+ cves = list()
+ affected = list()
+ reported = str()
+ reporter = str()
+ status = str()
+
+ def __init__(self, bugid):
+ self.bugid = bugid
+
+ def __eq__(self, other):
+ return (self.bugid == other.bugid) #FIXME is this enough?
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+
+class Interval:
+ """Interval class
+
+ Provides one interval entry for a vulnerability
+
+ Attributes:
+ name: a string representing the name of the kernel release
+ lower: a string representing the lower boundary of the interval
+ upper: a string representing the upper boundary of the interval
+ lower_i: a boolean indicating if the lower boundary is inclusive
+ upper_i: a boolean indicating if the upper boundary is inclusive
+ expand: a boolean indicating if the interval is shadowing other intervals
+ """
+
+ name = str()
+ lower = str()
+ upper = str()
+ lower_i = bool()
+ upper_i = bool()
+ expand = str()
+
+ def __init__(self, name, lower, upper, lower_i, upper_i, expand):
+ if name == 'linux' or name == 'genpatches':
+ pass
+ elif name == 'gp':
+ name = 'genpatches'
+
+ name = name.replace('-sources', '')
+
+ self.name = name
+ self.lower_i = lower_i
+ self.upper_i = upper_i
+ if name == 'genpatches':
+ if lower:
+ self.lower = lower.replace('-','.')
+ else:
+ self.lower = lower
+ if upper:
+ self.upper = upper.replace('-','.')
+ else:
+ self.upper = upper
+ else:
+ self.lower = lower
+ self.upper = upper
+
+ self.expand = expand
+
+
+ def __repr__(self):
+ interval = str(self.name)
+ if self.expand:
+ interval += '+'
+ interval += ' '
+ if self.lower and self.lower_i:
+ interval += '>=%s ' % (self.lower)
+ if self.lower and not self.lower_i:
+ interval += '>%s ' % (self.lower)
+ if self.upper and self.upper_i:
+ interval += '<=%s' % (self.upper)
+ if self.upper and not self.upper_i:
+ interval += '<%s' % (self.upper)
+
+ return interval
+
+
+def interval_from_xml(root):
+ 'Returns an interval from xml'
+
+ name = root.get('source')
+
+ lower = ''
+ upper = ''
+ lower_i = False
+ upper_i = False
+ expand = '' #TODO implement
+
+ if root.find('lower') is not None:
+ lower = root.find('lower').text
+ lower_i = (root.find('lower').get('inclusive') == 'true')
+
+ if root.find('upper') is not None:
+ upper = root.find('upper').text
+ upper_i = (root.find('upper').get('inclusive') == 'true')
+
+ return Interval(name, lower, upper, lower_i, upper_i, expand)
+
+
+#TODO Use exceptions
+def is_in_interval(interval, kernel, bugid=None):
+ 'Returns True if the given version is inside our specified interval'
+
+ version = str()
+
+ if interval.name == 'linux':
+ version = kernel.version
+
+ elif interval.name == 'genpatches':
+ version = kernel.version.replace('-', '.')
+
+ elif interval.name == 'hardened':
+ version = kernel.version #TODO is this correct?
+
+ elif interval.name == 'xen':
+ version = kernel.version #TODO is this correct?
+
+ elif interval.name == 'vserver':
+ return False
+
+ else:
+ BUG_ON(interval.name + ' ' + bugid.bugid)
+ return False
+
+ for item in ['lower', 'upper']:
+ if getattr(interval, item):
+ result = portage.versions.vercmp(version, getattr(interval, item))
+
+ if result == None:
+ BUG_ON('Could not compare %s and %s' %
+ (getattr(interval, item),version))
+
+ if result == 0 and not getattr(interval, item + '_i'):
+ return False
+
+ if result == 0 and getattr(interval, item + '_i'):
+ return True
+
+ if item == 'lower' and result < 0:
+ return False
+
+ if item == 'upper' and result > 0:
+ return False
+
+ return True
+
+#TODO Add inline get_genpatch
+def parse_genpatch_list(directory):
+ 'Returns a list containing all genpatches from portage'
+
+ patches = list()
+ directory = os.path.join(directory, 'sys-kernel')
+
+ for sources in os.listdir(directory):
+ if '-sources' in sources:
+ for ebuild in os.listdir(os.path.join(directory, sources)):
+ if '.ebuild' in ebuild:
+ genpatch = extract_genpatch(ebuild, directory, sources)
+
+ if genpatch is not None:
+ patches.append(genpatch)
+
+ return patches
+
+
+def get_genpatch(patches, kernel):
+ 'Returns the genpatch for a specific kernel'
+
+ for item in patches:
+ if item.kernel == kernel:
+ return item
+
+ return None
+
+
+def extract_genpatch(ebuild, directory, sources):
+ 'Returns a genpatch from an ebuild'
+
+ pkg = portage.versions.catpkgsplit('sys-kernel/%s' % ebuild[:-7])
+
+ with open(os.path.join(directory, sources, ebuild), 'r') as ebuild_file:
+ content = ebuild_file.read()
+
+ try:
+ genpatch_v = REGEX['gp_version'].findall(content)[0]
+ genpatch_w = REGEX['gp_want'].findall(content)[0]
+ except:
+ return None
+
+ kernel = Kernel(pkg[1].replace('-sources', ''))
+ kernel.version = pkg[2]
+ kernel.revision = pkg[3]
+
+ genpatch = Genpatch(pkg[2] + '-' + genpatch_v)
+ genpatch.kernel = kernel
+ genpatch.base = ('base' in genpatch_w)
+ genpatch.extras = ('extras' in genpatch_w)
+
+ return genpatch
+
+
+def parse_cve_files(directory):
+ 'Returns all bug files as list'
+
+ files = list()
+
+ if (os.path.exists(directory)):
+ for item in os.listdir(directory):
+ try:
+ cve_file = read_cve_file(directory, item[:-4])
+ if cve_file is not None:
+ files.append(cve_file)
+
+ except AttributeError:
+ pass
+
+ return files
+
+
+def find_cve(cve, directory):
+ 'Returns a bug containing the cve'
+
+ for item in parse_cve_files(directory):
+ for cves in item.cves:
+ if cve == cves.cve:
+ return item
+
+ return None
+
+
+def eval_cve_files(directory, kernel, arch):
+ 'Returns a vulnerabilty evaluation'
+
+ files = parse_cve_files(directory)
+
+ if not files:
+ return None
+
+ evaluation = Evaluation()
+
+ for item in files:
+ evaluation.read += 1
+
+ if item.arch not in ARCHES:
+ BUG_ON('[Error] Wrong architecture %s in bugid: %s' %
+ (item.arch, item.bugid))
+
+ if item.arch != arch and item.arch != 'all':
+ evaluation.unaffected.append(item)
+ else:
+ evaluation.arch += 1
+
+ if is_affected(item.affected, kernel, item):
+ evaluation.affected.append(item)
+ else:
+ evaluation.unaffected.append(item)
+
+ return evaluation
+
+#TODO Remove item
+def is_affected(interval_list, kernel, item):
+ 'Returns true if a kernel is affected'
+
+ kernel_gentoo = (kernel.source == 'gentoo' and kernel.genpatch is not None)
+ kernel_affected = False
+ kernel_linux_affected = False
+ kernel_gp_affected = False
+ linux_interval = False
+ gentoo_interval = False
+
+ for interval in interval_list:
+ if interval.name == 'genpatches':
+ gentoo_interval = True
+ if kernel_gentoo:
+ if is_in_interval(interval, kernel.genpatch, item):
+ kernel_gp_affected = True
+
+ elif interval.name == 'linux':
+ linux_interval = True
+ if is_in_interval(interval, kernel, item):
+ kernel_linux_affected = True
+
+ else:
+ pass #TODO
+
+ if linux_interval:
+ if kernel_linux_affected:
+ if gentoo_interval and kernel_gentoo:
+ if kernel_gp_affected:
+ kernel_affected = True
+ else:
+ kernel_affected = False
+ else:
+ kernel_affected = True
+ else:
+ kernel_affected = False
+ else:
+ if kernel_gentoo and gentoo_interval:
+ if kernel_gp_affected:
+ kernel_affected = True
+ else:
+ kernel_affected = False
+ #TODO Implement else for hardend/xen/expand
+
+ return kernel_affected
+
+
+def compare_evaluation(kernel, compare):
+ 'Creates a comparison out of two evaluation instances'
+
+ comparison = Comparison()
+
+ if kernel.read != compare.read or kernel.arch != compare.arch:
+ BUG_ON('Kernels do not match: %s %s' % (kernel1.read, kernel2.read))
+ return
+
+ for item in kernel.affected:
+ if item not in compare.affected:
+ comparison.fixed.append(item)
+
+ for item in compare.affected:
+ if item not in kernel.affected:
+ comparison.new.append(item)
+
+ return comparison
+
+
+def read_cve_file(directory, bugid):
+ 'Read a bug file created by collector'
+
+ cves = list()
+ affected = list()
+
+ filename = os.path.join(directory, bugid + '.xml')
+
+ try:
+ with open(filename, 'r+') as xml_data:
+ memory_map = mmap.mmap(xml_data.fileno(), 0)
+ root = et.parse(memory_map).getroot()
+ except IOError:
+ return None
+
+ bugroot = root.find('bug')
+
+ vul = Vulnerability(bugroot.find('bugid').text)
+ vul.arch = bugroot.find('arch').text
+ vul.reported = bugroot.find('reported').text
+ vul.reporter = bugroot.find('reporter').text
+ vul.status = bugroot.find('status').text
+
+ affectedroot = bugroot.find('affected')
+
+ for item in affectedroot:
+ interval = interval_from_xml(item)
+ affected.append(interval)
+
+ vul.affected = affected
+
+ for item in root:
+ if item.tag == 'cve':
+ cve = Cve(item.find('cve').text)
+ cve.desc = item.find('desc').text
+ cve.published = item.find('published').text
+ cve.refs = item.find('refs').text #FIXME
+ cve.severity = item.find('severity').text
+ cve.score = item.find('score').text
+ cve.vector = item.find('vector').text
+
+ cves.append(cve)
+ vul.cves = cves
+
+ return vul
+
+#TODO Use Exceptions
+def extract_version(release):
+ 'Extracts revision, source and version out of a release tag'
+
+ match = REGEX['k_version'].match(release)
+ if not match:
+ BUG_ON('[Error] Release %s does not contain any valid information' %
+ release)
+ return None
+
+ version, rest = match.groups()
+
+ kernel = Kernel('vanilla')
+ kernel.revision = 'r0'
+ kernel.version = version
+
+ for elem in (rest or '').split('-'):
+ if elem == 'sources':
+ pass
+ elif REGEX['rc_kernel'].match(elem):
+ kernel.version += '_' + elem
+ elif REGEX['git_kernel'].match(elem):
+ kernel.source = 'git'
+ kernel.revision = 'r' + REGEX['gitd'].match(elem).groups()[0]
+ elif REGEX['r_kernel'].match(elem):
+ kernel.revision = elem
+ elif elem in KERNEL_TYPES:
+ kernel.source = elem
+ elif elem != '':
+ BUG_ON('[Error] Dropping unknown version component \'%s\', \
+ probably local tag.' % elem)
+
+ return kernel
+
+
+def all_version(source):
+ """ Given a kernel source name (e.g. vanilla), returns a Kernel object
+ for the latest revision in the tree, or None if none exists. """
+
+ versions = list()
+
+ porttree = portage.db[portage.root]['porttree']
+ matches = porttree.dbapi.xmatch('match-all',
+ 'sys-kernel/%s-sources' % source)
+
+ for item in matches:
+ best = portage.versions.catpkgsplit(item)
+ if not best:
+ continue
+
+ kernel = Kernel(best[1].replace('-sources', ''))
+ kernel.version = best[2]
+ kernel.revision = best[3]
+
+ versions.append(kernel)
+
+ return versions
+