import multiprocessing
import random
import string
from dataclasses import dataclass
from typing import List, Optional

import pytest
from pkgcheck import addons, base, sources
from pkgcheck.addons.caches import CachedAddon, CacheDisabled
from pkgcheck.checks import AsyncCheck, SkipCheck
from pkgcore.ebuild import domain, repo_objs
from pkgcore.ebuild.atom import atom
from pkgcore.ebuild.cpv import VersionedCPV
from pkgcore.ebuild.eapi import get_eapi
from pkgcore.ebuild.ebuild_src import package
from pkgcore.ebuild.misc import ChunkedDataDict, chunked_data
from pkgcore.package.metadata import factory
from pkgcore.repository import prototype
from pkgcore.repository.util import SimpleTree
from snakeoil.cli import arghparse
from snakeoil.data_source import text_data_source
from snakeoil.osutils import pjoin
from snakeoil.sequences import split_negations


@dataclass
class Profile:
    """Profile record used to create profiles in a repository."""

    path: str
    arch: str
    status: str = "stable"
    deprecated: bool = False
    defaults: Optional[List[str]] = None
    eapi: str = "5"
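
# A usage sketch (hypothetical values): a stable amd64 profile record could be
# written as Profile("default/linux/amd64", "amd64"), while a development
# profile might override the defaults, e.g.
# Profile("default/linux/arm64", "arm64", status="dev", eapi="8").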


# TODO: merge this with the pkgcore-provided equivalent
class FakePkg(package):
    """Fake ebuild package object for use in tests."""

    def __init__(self, cpvstr, data=None, parent=None, ebuild="", **kwargs):
        if data is None:
            data = {}

        for x in ("DEPEND", "RDEPEND", "PDEPEND", "IUSE", "LICENSE"):
            data.setdefault(x, "")

        cpv = VersionedCPV(cpvstr)
        # TODO: make pkgcore generate empty shared pkg data when None is passed
        mxml = repo_objs.LocalMetadataXml("")
        shared = repo_objs.SharedPkgData(metadata_xml=mxml, manifest=None)
        super().__init__(shared, parent, cpv.category, cpv.package, cpv.fullver)
        object.__setattr__(self, "data", data)
        object.__setattr__(self, "_ebuild", ebuild)

        # custom attributes
        for attr, value in kwargs.items():
            object.__setattr__(self, attr, value)

    @property
    def eapi(self):
        return get_eapi(self.data.get("EAPI", "0"))

    @property
    def ebuild(self):
        return text_data_source(self._ebuild)
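
# A usage sketch (hypothetical metadata): checks that operate on individual
# packages can be fed instances such as
#   FakePkg("dev-util/diffball-0.5", data={"EAPI": "7", "RDEPEND": "dev-libs/foo"})
# where any extra keyword arguments become attributes on the fake package.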


class FakeTimedPkg(package):
    """Fake package object carrying an explicit modification time."""

    __slots__ = "_mtime_"

    def __init__(self, cpvstr, mtime, data=None, shared=None, repo=None):
        if data is None:
            data = {}
        cpv = VersionedCPV(cpvstr)
        super().__init__(shared, factory(repo), cpv.category, cpv.package, cpv.fullver)
        object.__setattr__(self, "data", data)
        object.__setattr__(self, "_mtime_", mtime)


class FakeFilesDirPkg(package):
    """Fake package object exposing the filesystem path of its ebuild."""

    __slots__ = ("path",)

    def __init__(self, cpvstr, repo, data=None, shared=None):
        if data is None:
            data = {}
        cpv = VersionedCPV(cpvstr)
        super().__init__(shared, factory(repo), cpv.category, cpv.package, cpv.fullver)
        object.__setattr__(self, "data", data)
        object.__setattr__(
            self,
            "path",
            pjoin(repo.location, cpv.category, cpv.package, f"{cpv.package}-{cpv.fullver}.ebuild"),
        )


class ReportTestCase:
    """Base class for verifying report generation."""

    def _assertReportSanity(self, *reports):
        for report in reports:
            assert report.__class__ in self.check_kls.known_results
            # pull desc to force a render for basic sanity checks
            assert report.desc

    def _run_check(self, check, data):
        if isinstance(data, sources.Source):
            source = data
            options = source.options
        elif isinstance(data, prototype.tree):
            options = arghparse.Namespace(target_repo=data)
            source = sources.RepoSource(options)
        else:
            if not isinstance(data, tuple):
                data = [data]
            options = arghparse.Namespace()
            source = sources.Source(options, data)

        results = []
        runner = check.runner_cls(options, source, [check])
        results.extend(runner.run())
        return results

    def assertNoReport(self, check, data, msg=""):
        if msg:
            msg = f"{msg}: "
        results = self._run_check(check, data)
        assert results == [], f"{msg}{list(report.desc for report in results)}"

    def assertReports(self, check, data):
        results = self._run_check(check, data)
        assert results, f"must get a report from {check} {data}, got none"
        self._assertReportSanity(*results)
        return results

    def assertReport(self, check, data):
        results = self.assertReports(check, data)
        results_str = "\n".join(map(str, results))
        assert len(results) == 1, f"expected one report, got {len(results)}:\n{results_str}"
        self._assertReportSanity(*results)
        result = results[0]
        return result
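
# A usage sketch (hypothetical check and result names): test classes mix in
# ReportTestCase, set check_kls, and pass a check instance plus data to the
# assertion helpers, e.g.
#
#   class TestMyCheck(ReportTestCase):
#       check_kls = MyCheck
#
#       def test_pkg(self):
#           result = self.assertReport(MyCheck(arghparse.Namespace()), FakePkg("cat/pkg-1"))
#           assert isinstance(result, MyResult)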


class FakeProfile:
    """Fake profile object providing the attributes checks expect from a profile."""

    def __init__(
        self,
        masked_use={},
        stable_masked_use={},
        forced_use={},
        stable_forced_use={},
        pkg_use={},
        provides={},
        iuse_effective=[],
        use=[],
        masks=[],
        unmasks=[],
        arch="x86",
        name="none",
    ):
        self.provides_repo = SimpleTree(provides)

        self.masked_use = ChunkedDataDict()
        self.masked_use.update_from_stream(
            chunked_data(atom(k), *split_negations(v)) for k, v in masked_use.items()
        )
        self.masked_use.freeze()

        self.stable_masked_use = ChunkedDataDict()
        self.stable_masked_use.update_from_stream(
            chunked_data(atom(k), *split_negations(v)) for k, v in stable_masked_use.items()
        )
        self.stable_masked_use.freeze()

        self.forced_use = ChunkedDataDict()
        self.forced_use.update_from_stream(
            chunked_data(atom(k), *split_negations(v)) for k, v in forced_use.items()
        )
        self.forced_use.freeze()

        self.stable_forced_use = ChunkedDataDict()
        self.stable_forced_use.update_from_stream(
            chunked_data(atom(k), *split_negations(v)) for k, v in stable_forced_use.items()
        )
        self.stable_forced_use.freeze()

        self.pkg_use = ChunkedDataDict()
        self.pkg_use.update_from_stream(
            chunked_data(atom(k), *split_negations(v)) for k, v in pkg_use.items()
        )
        self.pkg_use.freeze()

        self.masks = tuple(map(atom, masks))
        self.unmasks = tuple(map(atom, unmasks))
        self.iuse_effective = set(iuse_effective)
        self.use = set(use)
        self.key = arch
        self.name = name

        vfilter = domain.generate_filter(self.masks, self.unmasks)
        self.visible = vfilter.match
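
# A usage sketch (hypothetical atom and USE flag): a profile masking the "doc"
# flag on a single package might be built as
#   FakeProfile(masked_use={"dev-libs/foo": ("doc",)}, arch="amd64", name="fake")
# which also provides a visible() matcher derived from its masks and unmasks.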


# TODO: move to snakeoil.test or somewhere more generic
class Tmpdir:
    """Provide access to a temporary directory across all test methods."""

    @pytest.fixture(autouse=True)
    def _create_tmpdir(self, tmpdir):
        self.dir = str(tmpdir)


def random_str(length=10):
    """Generate a random string of ASCII characters of a given length."""
    return "".join(random.choice(string.ascii_letters) for _ in range(length))


# TODO: combine this with pkgcheck.checks.init_checks()
def init_check(check_cls, options):
    """Initialize an individual check."""
    addons_map = {}
    enabled_addons = base.get_addons([check_cls])
    results_q = multiprocessing.SimpleQueue()

    # initialize required caches before other addons
    enabled_addons = sorted(enabled_addons, key=lambda x: not issubclass(x, CachedAddon))

    # check class is guaranteed to be last in the list
    try:
        for cls in enabled_addons:
            if issubclass(cls, AsyncCheck):
                addon = addons.init_addon(cls, options, addons_map, results_q=results_q)
            else:
                addon = addons.init_addon(cls, options, addons_map)

        source = sources.init_source(addon.source, options, addons_map)
    except CacheDisabled as e:
        raise SkipCheck(cls, e)

    required_addons = {base.param_name(x): addons_map[x] for x in addon.required_addons}
    return addon, required_addons, source
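
# A usage sketch (hypothetical check class): given a parsed options namespace for a
# scan, a single check can be prepared for direct testing with
#   check, required_addons, source = init_check(MyCheck, options)
# where SkipCheck is raised if a cache the check relies on has been disabled.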