diff --git a/src/productcomposer/cli.py b/src/productcomposer/cli.py
index 92e94ec..d77236a 100644
--- a/src/productcomposer/cli.py
+++ b/src/productcomposer/cli.py
@@ -9,11 +9,13 @@
from argparse import ArgumentParser
from xml.etree import ElementTree as ET
-import rpm
import yaml
from . import __version__
from .core.logger import logger
+from .rpm import split_nevra
+from .solv import Pool
+from .updateinfo import Updateinfo
from .wrappers import CreaterepoWrapper
from .wrappers import ModifyrepoWrapper
@@ -24,9 +26,6 @@
ET_ENCODING = "unicode"
-local_rpms = {} # hased via name
-local_updateinfos = {} # sorted by updateinfo_id
-
tree_report = {} # hashed via file name
# hardcoded defaults for now
@@ -112,7 +111,13 @@ def build(args):
if args.filename.startswith('/'):
directory = os.path.dirname(args.filename)
reposdir = args.reposdir if args.reposdir else directory + "/repos"
- scan_rpms(reposdir, yml)
+
+ pool = Pool()
+ pool.repo.add_rpms(reposdir)
+ pool.internalize()
+
+ updateinfo = Updateinfo()
+ updateinfo.add_xmls(reposdir)
if args.clean and os.path.exists(args.out):
shutil.rmtree(args.out)
@@ -120,8 +125,7 @@ def build(args):
product_base_dir = get_product_dir(yml, flavor, archlist, args.release)
kwdfile = args.filename.removesuffix('.productcompose') + '.kwd'
- create_tree(args.out, product_base_dir, yml, kwdfile, flavor, archlist)
-
+ create_tree(args.out, product_base_dir, yml, pool, updateinfo, kwdfile, flavor, archlist)
def verify(args):
parse_yaml(args.filename, args.flavor)
@@ -182,7 +186,7 @@ def run_helper(args, cwd=None, stdout=None, failmsg=None):
die("Failed to run" + args[0], details=output)
return popen.stdout.read() if stdout == subprocess.PIPE else ''
-def create_tree(outdir, product_base_dir, yml, kwdfile, flavor, archlist):
+def create_tree(outdir, product_base_dir, yml, pool, updateinfo, kwdfile, flavor, archlist):
if not os.path.exists(outdir):
os.mkdir(outdir)
@@ -210,11 +214,12 @@ def create_tree(outdir, product_base_dir, yml, kwdfile, flavor, archlist):
else:
die("Bad debug option, must be either 'split' or 'drop'")
+ included_rpms = set()
for arch in archlist:
- link_rpms_to_tree(rpmdir, yml, arch, flavor, debugdir, sourcedir)
+ included_rpms |= link_rpms_to_tree(rpmdir, yml, pool, arch, flavor, debugdir, sourcedir)
for arch in archlist:
- unpack_meta_rpms(rpmdir, yml, arch, flavor, medium=1) # only for first medium am
+ unpack_meta_rpms(rpmdir, yml, pool, arch, flavor, medium=1) # only for first medium
post_createrepo(rpmdir, yml['name'])
if debugdir:
@@ -268,8 +273,24 @@ def create_tree(outdir, product_base_dir, yml, kwdfile, flavor, archlist):
'-d', rpmdir ]
run_helper(args)
- if local_updateinfos:
- process_updateinfos(rpmdir, yml, archlist, flavor, debugdir, sourcedir)
+ _, unmatched = updateinfo.filter_packages(included_rpms, archlist)
+ if unmatched and not "ignore_missing_packages" in yml["build_options"]:
+ die('Abort due to missing packages: ' + ", ".join(sorted([str(i) for i in unmatched])))
+
+ if updateinfo:
+ # TODO: use a temp dir
+ updateinfo_path = os.path.join(rpmdir, "updateinfo.xml")
+
+ with open(updateinfo_path, "w") as f:
+ f.write(updateinfo.to_string())
+
+ mr = ModifyrepoWrapper(
+ file=updateinfo_path,
+ directory=rpmdir,
+ )
+ mr.run_cmd()
+
+ os.unlink(updateinfo_path)
# Add License File and create extra .license directory
if os.path.exists(rpmdir + "/license.tar.gz"):
@@ -286,7 +307,7 @@ def create_tree(outdir, product_base_dir, yml, kwdfile, flavor, archlist):
mr = ModifyrepoWrapper(
file=os.path.join(rpmdir, "license.tar"),
- directory=os.path.join(rpmdir, "repodata"),
+ directory=rpmdir,
)
mr.run_cmd()
os.unlink(rpmdir + '/license.tar')
@@ -369,106 +390,6 @@ def create_checksums_file(maindir):
relname = os.path.relpath(root + '/' + name, maindir)
run_helper([chksums_tool, relname], cwd=maindir, stdout=chksums_file)
-# create a fake entry from an updateinfo package spec
-def create_updateinfo_entry(pkgentry):
- tags = {}
- for tag in 'name', 'epoch', 'version', 'release', 'arch':
- tags[tag] = pkgentry.get(tag)
- return { "tags": tags }
-
-def create_updateinfo_packagefilter(yml, archlist, flavor):
- package_filter = {}
- for arch in archlist:
- for package in create_package_list(yml['packages'], arch, flavor):
- name = package
- match = re.match(r'([^><=]*)([><=]=?)(.*)', name.replace(' ', ''))
- if match:
- name = match.group(1)
- if name not in package_filter:
- package_filter[name] = [ package ]
- else:
- package_filter[name].append(package)
- return package_filter
-
-def entry_matches_updateinfo_packagefilter(entry, package_filter):
- name = entry['tags']['name']
- if name in package_filter:
- for pfspec in package_filter[name]:
- pfname, pfop, pfepoch, pfversion, pfrelease = split_package_spec(pfspec)
- if entry_qualifies(entry, None, pfname, pfop, pfepoch, pfversion, pfrelease):
- return True
- return False
-
-# Add updateinfo.xml to metadata
-def process_updateinfos(rpmdir, yml, archlist, flavor, debugdir, sourcedir):
- missing_package = False
- package_filter = create_updateinfo_packagefilter(yml, archlist, flavor)
- uitemp = None
-
- for ufn, u in sorted(local_updateinfos.items()):
- note("Add updateinfo " + ufn)
- for update in u.findall('update'):
- needed = False
- parent = update.findall('pkglist')[0].findall('collection')[0]
-
- for pkgentry in parent.findall('package'):
- src = pkgentry.get('src')
- if os.path.exists(rpmdir + '/' + src):
- needed = True
- continue
- if debugdir and os.path.exists(debugdir + '/' + src):
- needed = True
- continue
- if sourcedir and os.path.exists(sourcedir + '/' + src):
- needed = True
- continue
- name = pkgentry.get('name')
- pkgarch = pkgentry.get('arch')
-
- # do not insist on debuginfo or source packages
- if pkgarch == 'src' or pkgarch == 'nosrc':
- parent.remove(pkgentry)
- continue
- if name.endswith('-debuginfo') or name.endswith('-debugsource'):
- parent.remove(pkgentry)
- continue
- # ignore unwanted architectures
- if pkgarch != 'noarch' and pkgarch not in archlist:
- parent.remove(pkgentry)
- continue
-
- # check if we should have this package
- if name in package_filter:
- entry = create_updateinfo_entry(pkgentry)
- if entry_matches_updateinfo_packagefilter(entry, package_filter):
- warn("package " + entry_nvra(entry) + " not found")
- missing_package = True
-
- parent.remove(pkgentry)
-
- if not needed:
- continue
-
- if not uitemp:
- uitemp = open(rpmdir + '/updateinfo.xml', 'x')
- uitemp.write("\n ")
- uitemp.write(ET.tostring(update, encoding=ET_ENCODING))
-
- if uitemp:
- uitemp.write("\n")
- uitemp.close()
-
- mr = ModifyrepoWrapper(
- file=os.path.join(rpmdir, "updateinfo.xml"),
- directory=os.path.join(rpmdir, "repodata"),
- )
- mr.run_cmd()
-
- os.unlink(rpmdir + '/updateinfo.xml')
-
- if missing_package and not 'ignore_missing_packages' in yml['build_options']:
- die('Abort due to missing packages')
-
def post_createrepo(rpmdir, product_name, content=None):
# FIXME
@@ -487,22 +408,26 @@ def post_createrepo(rpmdir, product_name, content=None):
cr.run_cmd(cwd=rpmdir, stdout=subprocess.PIPE)
-def unpack_meta_rpms(rpmdir, yml, arch, flavor, medium):
+def unpack_meta_rpms(rpmdir, yml, pool, arch, flavor, medium):
if not yml['unpack_packages']:
return
missing_package = False
- for package in create_package_list(yml['unpack_packages'], arch, flavor):
- name, op, epoch, version, release = split_package_spec(package)
- rpm = lookup_rpm(arch, name, op, epoch, version, release)
- if not rpm:
- warn("package " + package + " not found")
+ for pattern in create_package_list(yml['unpack_packages'], arch, flavor):
+ print(f"Unpacking {pattern}...")
+ rpms = pool.match(pattern, arch=arch, latest=True)
+ assert len(rpms) <= 1
+
+ if not rpms:
+ warn(f"package {pattern} not found")
missing_package = True
continue
+ rpm = rpms[0]
+
tempdir = rpmdir + "/temp"
os.mkdir(tempdir)
- run_helper(['unrpm', '-q', rpm['filename']], cwd=tempdir, failmsg=("extract " + rpm['filename']))
+ run_helper(['unrpm', '-q', rpm.location], cwd=tempdir, failmsg=f"extract {rpm.location}")
skel_dir = tempdir + "/usr/lib/skelcd/CD" + str(medium)
if os.path.exists(skel_dir):
@@ -529,77 +454,60 @@ def create_package_list(yml, arch, flavor):
return packages
-def split_package_spec(pkgspec):
- name = pkgspec
- match = re.match(r'([^><=]*)([><=]=?)(.*)', name.replace(' ', ''))
- if match:
- name = match.group(1)
- op = match.group(2)
- epoch = '0'
- version = match.group(3)
- release = None
- if ':' in version:
- (epoch, version) = version.split(':', 2)
- if '-' in version:
- (version, release) = version.rsplit('-', 2)
- return (name, op, epoch, version, release)
- return (name, None, None, None, None)
-
-
-def link_rpms_to_tree(rpmdir, yml, arch, flavor, debugdir=None, sourcedir=None):
+
+def link_rpms_to_tree(rpmdir, yml, pool, arch, flavor, debugdir=None, sourcedir=None):
+ result = set()
+
singlemode = True
if 'take_all_available_versions' in yml['build_options']:
singlemode = False
missing_package = None
for package in create_package_list(yml['packages'], arch, flavor):
- name, op, epoch, version, release = split_package_spec(package)
- if singlemode:
- rpm = lookup_rpm(arch, name, op, epoch, version, release)
- rpms = [rpm] if rpm else []
- else:
- rpms = lookup_all_rpms(arch, name, op, epoch, version, release)
+ rpms = pool.match(package, arch=arch, latest=singlemode)
if not rpms:
- warn("package " + package + " not found for " + arch)
+ warn(f"package '{package}' not found for architecture '{arch}'")
missing_package = True
continue
for rpm in rpms:
+ assert rpm.arch in [arch, "noarch"]
+
link_entry_into_dir(rpm, rpmdir)
+ result.add(rpm)
- match = re.match(r'^(.*)-([^-]*)-([^-]*)\.([^\.]*)\.rpm$', rpm['tags']['sourcerpm'])
- if not match:
- warn("package " + entry_nvra(rpm) + " does not have a source rpm")
- continue
+ srpm_name, srpm_epoch, srpm_version, srpm_release, srpm_arch = split_nevra(rpm.sourcerpm)
- source_package_name = match.group(1)
- # no chance to get a epoch from file name
- source_package_version = match.group(2)
- source_package_release = match.group(3)
- source_package_arch = match.group(4)
if sourcedir:
# so we need to add also the src rpm
- srpm = lookup_rpm(source_package_arch, source_package_name, '=', None, source_package_version, source_package_release)
- if srpm:
- link_entry_into_dir(srpm, sourcedir)
- else:
- details=" required by " + entry_nvra(rpm)
- warn("source rpm package " + source_package_name + "-" + source_package_version + '-' + 'source_package_release' + '.' + source_package_arch + " not found", details=details)
+ pattern = f"{srpm_name} = {srpm_version}-{srpm_release}"
+ source_rpms = pool.match(pattern, arch="src")
+
+ if not source_rpms:
+ details=f" required by {rpm}"
+ warn(f"source rpm package {rpm.sourcerpm} not found", details=details)
missing_package = True
+ for i in source_rpms:
+ link_entry_into_dir(i, sourcedir)
+
if debugdir:
- drpm = lookup_rpm(arch, source_package_name + "-debugsource", '=', None, source_package_version, source_package_release)
- if drpm:
- link_entry_into_dir(drpm, debugdir)
+ pattern = f"{srpm_name}-debugsource = {srpm_version}-{srpm_release}"
+ debugsource_rpms = pool.match(pattern, arch=arch)
+ for i in debugsource_rpms:
+ link_entry_into_dir(i, debugdir)
- drpm = lookup_rpm(arch, rpm['tags']['name'] + "-debuginfo", '=', rpm['tags']['epoch'], rpm['tags']['version'], rpm['tags']['release'])
- if drpm:
- link_entry_into_dir(drpm, debugdir)
+ pattern = f"{rpm.name}-debuginfo = {rpm.version}-{rpm.release}"
+ debuginfo_rpms = pool.match(pattern, arch=arch)
+ for i in debuginfo_rpms:
+ link_entry_into_dir(i, debugdir)
if missing_package and not 'ignore_missing_packages' in yml['build_options']:
die('Abort due to missing packages')
+ return result
+
def link_file_into_dir(filename, directory):
if not os.path.exists(directory):
os.mkdir(directory)
@@ -607,164 +515,60 @@ def link_file_into_dir(filename, directory):
if not os.path.exists(outname):
os.link(filename, outname)
+def link_entry_into_dir(rpm, directory):
+ # TODO: store list of linked files instead of maintaining `tree_report`
+ link_file_into_dir(rpm.location, directory + '/' + rpm.arch)
+ add_entry_to_report(rpm, directory)
-def link_entry_into_dir(entry, directory):
- link_file_into_dir(entry['filename'], directory + '/' + entry['tags']['arch'])
- add_entry_to_report(entry, directory)
-
-def add_entry_to_report(entry, directory):
- outname = directory + '/' + entry['tags']['arch'] + '/' + os.path.basename(entry['filename'])
+def add_entry_to_report(rpm, directory):
+ outname = directory + '/' + rpm.arch + '/' + os.path.basename(rpm.location)
# first one wins, see link_file_into_dir
if outname not in tree_report:
- tree_report[outname] = entry
+ tree_report[outname] = {
+ # TODO: set proper value to origin
+ "origin": os.path.basename(rpm.location),
+ "rpm": rpm,
+ }
def write_report_file(directory, outfile):
root = ET.Element('report')
if not directory.endswith('/'):
directory += '/'
+
+ # TODO: properly order by package nevra?
for fn, entry in sorted(tree_report.items()):
if not fn.startswith(directory):
continue
+
binary = ET.SubElement(root, 'binary')
- binary.text = 'obs://' + entry['origin']
- tags = entry['tags']
- for tag in 'name', 'epoch', 'version', 'release', 'arch', 'buildtime', 'disturl', 'license':
- if tags[tag] is None or tags[tag] == '':
- continue
- if tag == 'epoch' and tags[tag] == 0:
+ binary.text = f"obs://{entry['origin']}"
+
+ rpm = entry["rpm"]
+
+ for tag in ("name", "epoch", "version", "release", "arch", "buildtime", "disturl", "license"):
+ value = getattr(rpm, tag)
+
+ if not value:
continue
- if tag == 'arch':
- binary.set('binaryarch', str(tags[tag]))
- else:
- binary.set(tag, str(tags[tag]))
- if tags['name'].endswith('-release'):
- cpeid = read_cpeid(entry['filename'])
- if cpeid:
- binary.set('cpeid', cpeid)
- tree = ET.ElementTree(root)
- tree.write(outfile)
-def entry_qualifies(entry, arch, name, op, epoch, version, release):
- tags = entry['tags']
-
- if name and tags['name'] != name:
- return False
-
- if arch and tags['arch'] != arch:
- if arch == 'src' or arch == 'nosrc' or tags['arch'] != 'noarch':
- return False
-
- if op:
- # We must not hand over the release when the release is not required by the user
- # or the equal case will never be true.
- tepoch = tags['epoch'] if epoch is not None else None
- trelease = tags['release'] if release is not None else None
- cmp = rpm.labelCompare((tepoch, tags['version'], trelease), (epoch, version, release))
- if cmp > 0:
- return op[0] == '>'
- if cmp < 0:
- return op[0] == '<'
- return '=' in op
-
- return True
-
-def entry_nvra(entry):
- tags = entry['tags']
- return tags['name'] + "-" + tags['version'] + "-" + tags['release'] + "." + tags['arch']
-
-def lookup_all_rpms(arch, name, op=None, epoch=None, version=None, release=None):
- if not name in local_rpms:
- return []
-
- rpms = []
- for lrpm in local_rpms[name]:
- if entry_qualifies(lrpm, arch, name, op, epoch, version, release):
- rpms.append(lrpm)
- return rpms
-
-def lookup_rpm(arch, name, op=None, epoch=None, version=None, release=None):
- if not name in local_rpms:
- return None
-
- candidate = None
- for lrpm in local_rpms[name]:
- if not entry_qualifies(lrpm, arch, name, op, epoch, version, release):
- continue
- if candidate:
- # version compare
- tags = lrpm['tags']
- ctags = candidate['tags']
- if rpm.labelCompare((tags['epoch'], tags['version'], tags['release']), (ctags['epoch'], ctags['version'], ctags['release'])) <= 0:
+ if isinstance(value, int):
+ value = str(value)
+
+ if tag == "epoch" and value == "0":
continue
- candidate = lrpm
- return candidate
+ if tag == "arch":
+ tag = "binaryarch"
-def scan_rpms(directory, yml):
- # This function scans all local available rpms and builds up the
- # query database
- ts = rpm.TransactionSet()
- ts.setVSFlags(rpm._RPMVSF_NOSIGNATURES)
+ binary.set(tag, value)
+
+ if rpm.product_cpeid:
+ binary.set("cpeid", rpm.product_cpeid)
+
+ tree = ET.ElementTree(root)
+ ET.indent(root)
+ tree.write(outfile)
- for dirpath, dirs, files in os.walk(directory):
- reldirpath = os.path.relpath(dirpath, directory)
- subdirs = reldirpath.split('/')
- if len(subdirs) < 1:
- continue
- arch = subdirs[-1]
- note("scanning: " + reldirpath)
- for filename in files:
- fname = os.path.join(dirpath, filename)
- if arch == 'updateinfo':
- local_updateinfos[fname] = ET.parse(fname).getroot()
- continue
- if filename.endswith('.rpm'):
- fd = os.open(fname, os.O_RDONLY)
- h = ts.hdrFromFdno(fd)
- os.close(fd)
- tags = {}
- for tag in 'name', 'epoch', 'version', 'release', 'arch', 'sourcerpm', 'nosource', 'nopatch', 'buildtime', 'disturl', 'license':
- tags[tag] = h[tag]
-
- if not tags['sourcerpm']:
- tags['arch'] = 'nosrc' if tags['nosource'] or tags['nopatch'] else 'src'
-
- item = { 'filename': fname, 'origin':os.path.join(reldirpath, filename), 'tags': tags }
-
- # add entry to local_rpms hash
- name = tags['name']
- if not name in local_rpms:
- local_rpms[name] = []
- local_rpms[name].append(item)
-
-def cpeid_hexdecode(p):
- pout = ''
- while True:
- match = re.match(r'^(.*?)%([0-9a-fA-F][0-9a-fA-F])(.*)', p)
- if not match:
- return pout + p
- pout = pout + match.group(1) + chr(int(match.group(2), 16))
- p = match.group(3)
-
-def read_cpeid(fname):
- ts = rpm.TransactionSet()
- ts.setVSFlags(rpm._RPMVSF_NOSIGNATURES)
- fd = os.open(fname, os.O_RDONLY)
- h = ts.hdrFromFdno(fd)
- os.close(fd)
- pn = h['providename']
- pf = h['provideflags']
- pv = h['provideversion']
- if pn and pf and pv:
- idx = 0
- for p in h['providename']:
- if p == 'product-cpeid()':
- f = pf[idx]
- v = pv[idx]
- if v and f and (f & 14) == 8:
- return cpeid_hexdecode(v)
- idx = idx + 1
- return None
if __name__ == "__main__":
try:
@@ -777,4 +581,5 @@ def read_cpeid(fname):
else:
raise SystemExit(status)
+
# vim: sw=4 et
diff --git a/src/productcomposer/rpm.py b/src/productcomposer/rpm.py
new file mode 100644
index 0000000..6dbe71a
--- /dev/null
+++ b/src/productcomposer/rpm.py
@@ -0,0 +1,155 @@
+import abc
+import functools
+import os
+
+import rpm
+
+
+# TODO: document how we should deal with epoch '' and '0' - are they equal?
+
+
+def create_rpm_ts():
+ ts = rpm.TransactionSet()
+ ts.setKeyring(rpm.keyring())
+ ts.setVSFlags(rpm._RPMVSF_NOSIGNATURES | rpm._RPMVSF_NODIGESTS)
+ return ts
+
+
+def get_rpm_hdr(path, ts=None):
+ if ts is None:
+ ts = create_rpm_ts()
+ fd = os.open(path, os.O_RDONLY)
+ hdr = ts.hdrFromFdno(fd)
+ os.close(fd)
+ return hdr
+
+
+def split_nevr(nevr):
+ epoch = ""
+ name, version, release = nevr.rsplit("-", 2)
+ if ":" in version:
+ epoch, version = version.split(":")
+
+ return name, epoch, version, release
+
+
+def split_nevra(nevra):
+ # strip path
+ nevra = os.path.basename(nevra)
+
+ # strip .rpm suffix
+ if nevra.endswith(".rpm"):
+ nevra = nevra[:-4]
+
+ nevr, arch = nevra.rsplit(".", 1)
+ name, epoch, version, release = split_nevr(nevr)
+ return name, epoch, version, release, arch
+
+
+@functools.total_ordering
+class NevraBase(abc.ABC):
+
+ def __str__(self):
+ return self.nevra
+
+ def __repr__(self):
+ result = super().__repr__()
+ result = f"{result}({self.__str__()})"
+ return result
+
+ def __eq__(self, other):
+ one = (self.name, self.epoch, self.version, self.release, self.arch)
+ two = (other.name, other.epoch, other.version, other.release, other.arch)
+ return one == two
+
+ def __lt__(self, other):
+ if self.name != other.name:
+ return self.name < other.name
+ if rpm.labelCompare((self.epoch, self.version, self.release), (other.epoch, other.version, other.release)) == -1:
+ return True
+ if self.arch != other.arch:
+ return self.arch < other.arch
+ return False
+
+ @property
+ @abc.abstractmethod
+ def name(self) -> str:
+ pass
+
+ @property
+ @abc.abstractmethod
+ def epoch(self) -> str:
+ pass
+
+ @property
+ @abc.abstractmethod
+ def version(self) -> str:
+ pass
+
+ @property
+ @abc.abstractmethod
+ def release(self) -> str:
+ pass
+
+ @property
+ @abc.abstractmethod
+ def arch(self) -> str:
+ pass
+
+ @property
+ def nevra(self) -> str:
+ return f"{self.name}-{self.evr}.{self.arch}"
+
+ @property
+ def evr(self) -> str:
+ if self.epoch:
+ return f"{self.epoch}:{self.version}-{self.release}"
+ return f"{self.version}-{self.release}"
+
+ @property
+ def is_debuginfo(self):
+ return self.name.endswith(("-debuginfo", "-debugsource"))
+
+ @property
+ def is_source(self):
+ return self.arch in ["src", "nosrc"]
+
+
+class Nevra(NevraBase):
+ @classmethod
+ def from_string(cls, nevra):
+ values = split_nevra(nevra)
+ return cls(*values)
+
+ @classmethod
+ def from_dict(cls, nevra):
+ keys = ["name", "epoch", "version", "release", "arch"]
+ values = [nevra[i] for i in keys]
+ return cls(*values)
+
+ def __init__(self, name, epoch, version, release, arch):
+ self._name = name
+ self._epoch = epoch
+ self._version = version
+ self._release = release
+ self._arch = arch
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def epoch(self) -> str:
+ return self._epoch
+
+ @property
+ def version(self) -> str:
+ return self._version
+
+ @property
+ def release(self) -> str:
+ return self._release
+
+ @property
+ def arch(self) -> str:
+ return self._arch
diff --git a/src/productcomposer/solv.py b/src/productcomposer/solv.py
new file mode 100644
index 0000000..a8c36f6
--- /dev/null
+++ b/src/productcomposer/solv.py
@@ -0,0 +1,177 @@
+import functools
+import os
+import re
+import urllib.parse
+
+import solv
+
+from . import rpm
+
+
+SOLVABLE_DISTURL = "solvable:disturl"
+SOLVABLE_PRODUCT_CPEID = "solvable:product_cpeid"
+
+
+class CmdlineRepo:
+ def __init__(self, pool):
+ self.pool = pool
+ self.name = "@commandline"
+ self.solv_repo = pool.solv_pool.add_repo(self.name)
+ self.solv_repo.appdata = self
+ self.solv_repodata = self.solv_repo.add_repodata(solv.Repo.REPO_REUSE_REPODATA)
+ self.rpm_ts = rpm.create_rpm_ts()
+
+ def add_rpm(self, path):
+ path = os.path.abspath(path)
+ solvable = self.solv_repo.add_rpm(path, solv.Repo.REPO_REUSE_REPODATA | solv.Repo.REPO_NO_INTERNALIZE)
+ assert solvable
+ self._add_rpm_postprocess(path, solvable)
+ return solvable
+
+ def _add_rpm_postprocess(self, path, solvable):
+ # Package.disturl
+ if path:
+ # HACK: libsolv currently doesn't support loading rpm headers in the python bindings
+ hdr = rpm.get_rpm_hdr(path, ts=self.rpm_ts)
+
+ if hdr["disturl"] is not None:
+ self.solv_repodata.set_str(solvable.id, self.pool.solv_pool.str2id(SOLVABLE_DISTURL), hdr["disturl"])
+
+ # Package.product_cpeid
+ if solvable.name.endswith("-release"):
+ product_cpeid = None
+ cpeid_prefix = "product-cpeid() = "
+ for dep in solvable.lookup_deparray(solv.SOLVABLE_PROVIDES):
+ dep_str = dep.str()
+ if dep_str.startswith(cpeid_prefix):
+ product_cpeid = dep_str[len(cpeid_prefix):]
+ continue
+ if product_cpeid:
+ product_cpeid = urllib.parse.unquote(product_cpeid)
+ self.solv_repodata.set_str(solvable.id, self.pool.solv_pool.str2id(SOLVABLE_PRODUCT_CPEID), product_cpeid)
+
+ def add_rpms(self, topdir):
+ for root, dirs, files in os.walk(topdir):
+ for fn in files:
+ if not fn.endswith(".rpm"):
+ continue
+ path = os.path.join(root, fn)
+ self.add_rpm(path)
+
+
+class Pool:
+ def __init__(self):
+ self.solv_pool = solv.Pool()
+ self.repo = CmdlineRepo(self)
+
+ def internalize(self):
+ self.repo.solv_repo.internalize()
+
+ def match(self, pattern, arch, latest=False):
+ if re.search(r"[/]", pattern) is not None:
+ raise ValueError(f"Invalid package pattern: {pattern}")
+
+ if arch == "src":
+ # selects both arch and noarch packages
+ selection_flag = solv.Selection.SELECTION_SOURCE_ONLY | solv.Selection.SELECTION_REL
+ else:
+ selection_flag = solv.Selection.SELECTION_NAME | solv.Selection.SELECTION_GLOB | solv.Selection.SELECTION_REL
+
+ sel = self.solv_pool.matchdeps(pattern, selection_flag, solv.SOLVABLE_NAME)
+
+ result = []
+ for s in sel.solvables():
+ package = Package(s)
+ # for binary packages match given arch and "noarch"
+ if arch and arch != "src" and package.arch not in [arch, "noarch"]:
+ continue
+ result.append(package)
+
+ if latest and result:
+ # return the latest package for each arch
+ result_by_arch = {}
+ for package in result:
+ result_by_arch.setdefault(package.arch, []).append(package)
+ result = [sorted(i)[-1] for i in result_by_arch.values()]
+
+ return result
+
+
+class Package(rpm.NevraBase):
+ def __init__(self, solvable):
+ self.solvable = solvable
+
+ def __hash__(self):
+ return hash((self.solvable.id, self.nevra))
+
+ def __str__(self):
+ return self.nevra
+
+ def __repr__(self):
+ result = super().__repr__()
+ result = f"{result}({self.__str__()})"
+ return result
+
+ @property
+ def location(self):
+ result = self.solvable.lookup_location()
+ result = result[0]
+ return result
+
+ @property
+ def name(self):
+ return self.solvable.name
+
+ @property
+ def evr(self):
+ return self.solvable.evr
+
+ def get_parsed_evr(self):
+ epoch = ""
+ version, release = self.evr.split("-")
+ if ":" in version:
+ epoch, version = version.split(":")
+ return epoch, version, release
+
+ @property
+ def epoch(self):
+ return self.get_parsed_evr()[0]
+
+ @property
+ def version(self):
+ return self.get_parsed_evr()[1]
+
+ @property
+ def release(self):
+ return self.get_parsed_evr()[2]
+
+ @property
+ def arch(self):
+ return self.solvable.arch
+
+ @property
+ def sourcerpm(self):
+ return self.solvable.lookup_sourcepkg()
+
+ @property
+ def disturl(self):
+ return self.solvable.lookup_str(self.solvable.pool.str2id(SOLVABLE_DISTURL))
+
+ @property
+ def license(self):
+ return self.solvable.lookup_str(solv.SOLVABLE_LICENSE)
+
+ @property
+ def buildtime(self):
+ return self.solvable.lookup_num(solv.SOLVABLE_BUILDTIME)
+
+ @property
+ def product_cpeid(self):
+ return self.solvable.lookup_str(self.solvable.pool.str2id(SOLVABLE_PRODUCT_CPEID))
+
+ @property
+ def provides(self):
+ result = []
+ for dep in self.solvable.lookup_deparray(solv.SOLVABLE_PROVIDES):
+ result.append(dep.str())
+ return result
diff --git a/src/productcomposer/updateinfo.py b/src/productcomposer/updateinfo.py
new file mode 100644
index 0000000..a665987
--- /dev/null
+++ b/src/productcomposer/updateinfo.py
@@ -0,0 +1,129 @@
+import os
+from xml.etree import ElementTree as ET
+
+from . import rpm
+
+
+"""
+Files must match the following pattern: *updateinfo*.xml
+Compressed files are out of scope.
+
+TODO: join entries into one resulting updateinfo.xml
+TODO: sort the resulting updateinfo.xml
+TODO: pretty-print (ET.indent() is not enough, we need to strip some spaces as well)
+TODO: deduplicate records in __ior__() / __or__()
+"""
+
+
+class Updateinfo:
+ def __init__(self, path=None):
+ if path:
+ self.path = os.path.abspath(path)
+ self.root = ET.parse(path).getroot()
+ else:
+ self.path = None
+ self.root = ET.Element("updates")
+ self.remove_elements_without_packages = True
+
+ def __ior__(self, other):
+ self.root.extend(other.root)
+ return self
+
+ def __or__(self, other):
+ result = Updateinfo()
+ result |= self
+ result |= other
+ return result
+
+ def __bool__(self):
+ return len(self.root) > 0
+
+ def indent(self):
+ def strip_text(node):
+ if node.text and not node.text.strip():
+ node.text = None
+ for child in node:
+ strip_text(child)
+
+ strip_text(self.root)
+ ET.indent(self.root)
+
+ def to_string(self):
+ self.indent()
+ return ET.tostring(self.root).decode("utf-8")
+
+ def add_xmls(self, topdir):
+ for root, dirs, files in os.walk(topdir):
+ for fn in files:
+ if not fn.endswith(".xml"):
+ continue
+ if not "updateinfo" in fn:
+ continue
+ path = os.path.join(root, fn)
+ self |= Updateinfo(path)
+
+ def filter_packages(self, nevra_list, arch_list):
+ nevra_by_name_arch = {}
+ for nevra in nevra_list:
+ if isinstance(nevra, str):
+ nevra = rpm.Nevra.from_string(nevra)
+ key = (nevra.name, nevra.arch)
+ nevra_by_name_arch.setdefault(key, []).append(nevra)
+
+ matched = []
+ unmatched = []
+
+ for update in self.root.findall("update"):
+ for pkglist in update.findall("pkglist"):
+ for collection in pkglist.findall("collection"):
+ for package in collection.findall("package"):
+ updateinfo_nevra = rpm.Nevra.from_dict(package.attrib)
+
+ if updateinfo_nevra.is_debuginfo:
+ # remove debuginfo packages from updateinfo
+ collection.remove(package)
+ continue
+
+ if updateinfo_nevra.is_source:
+ # remove source packages from updateinfo
+ collection.remove(package)
+ continue
+
+ if updateinfo_nevra.arch not in arch_list:
+ # remove packages that do not match the provided arch list
+ collection.remove(package)
+ continue
+
+ # TODO: arch->noarch and noarch->arch transitions
+ keep = False
+ key = (updateinfo_nevra.name, updateinfo_nevra.arch)
+ nevra_list = nevra_by_name_arch.get(key, [])
+ for nevra in nevra_list:
+ # it's safe to compare, because name & arch are identical due to previous grouping and we're comparing evr only
+ if nevra >= updateinfo_nevra:
+ keep = True
+ break
+
+ if keep:
+ matched.append(updateinfo_nevra)
+ else:
+ # remove package that doesn't match any provided nevra from the nevra_list
+ collection.remove(package)
+ unmatched.append(updateinfo_nevra)
+
+ # remove <collection> elements that have no <package> children from <pkglist>
+ if self.remove_elements_without_packages and not collection.findall("package"):
+ pkglist.remove(collection)
+
+ # remove <pkglist> elements that have no <collection> children from <update>
+ if self.remove_elements_without_packages and not pkglist.findall("collection"):
+ update.remove(pkglist)
+
+ # remove <update> elements that have no <pkglist> children from <updates>
+ if self.remove_elements_without_packages and not update.findall("pkglist"):
+ self.root.remove(update)
+
+ return matched, unmatched
+
+ def sort(self):
+ self.root[:] = sorted(self.root, key=lambda child: child.find("id").text)
diff --git a/src/productcomposer/wrappers/modifyrepo.py b/src/productcomposer/wrappers/modifyrepo.py
index 99626f8..3da887d 100644
--- a/src/productcomposer/wrappers/modifyrepo.py
+++ b/src/productcomposer/wrappers/modifyrepo.py
@@ -1,3 +1,5 @@
+import os
+
from pydantic.types import DirectoryPath
from pydantic.types import FilePath
@@ -14,8 +16,8 @@ class ModifyrepoWrapper(BaseWrapper):
mdtype: str | None = Field()
def get_cmd(self):
- directory = os.path.join(self.directory, "repodata")
- cmd = ["modify", self.file, directory]
+ directory = self.directory / "repodata"
+ cmd = ["modifyrepo", self.file.as_posix(), directory.as_posix()]
cmd.append("--unique-md-filenames")
cmd.append(f"--checksum={self.checksum_type}")
diff --git a/tests/unit/test_solv.py b/tests/unit/test_solv.py
new file mode 100644
index 0000000..91b6554
--- /dev/null
+++ b/tests/unit/test_solv.py
@@ -0,0 +1,135 @@
+import pytest
+
+import solv
+
+from productcomposer.solv import Package
+from productcomposer.solv import Pool
+
+
+def add_package(pool, name, evr, arch, provides=None):
+ s = pool.repo.solv_repo.add_solvable()
+ s.name = name
+ s.evr = evr
+ s.arch = arch
+
+ provides = provides or []
+ for name, flag, evr in provides:
+ rel = pool.solv_pool.rel2id(pool.solv_pool.str2id(name), pool.solv_pool.str2id(evr), flag)
+ dep = solv.Dep(pool.solv_pool, rel)
+ s.add_provides(dep)
+
+ pool.repo._add_rpm_postprocess(path=None, solvable=s)
+
+
+@pytest.fixture
+def pool(request):
+ p = Pool()
+
+ add_package(p, "foo", "2-0", "noarch")
+ add_package(p, "foo", "2-0", "src")
+ add_package(p, "foo", "1-0", "noarch")
+ add_package(p, "foo", "1-0", "nosrc")
+ add_package(p, "bar", "1-0", "noarch")
+ add_package(p, "bar", "1-0", "src")
+
+ add_package(p, "arch-pkg", "1-0", "x86_64")
+ add_package(p, "arch-pkg", "1-0", "i586")
+ add_package(p, "arch-pkg", "1-0", "aarch64")
+ add_package(p, "arch-pkg", "1-0", "src")
+
+ add_package(p, "arch-pkg", "2-0", "x86_64")
+ add_package(p, "arch-pkg", "2-0", "i586")
+ add_package(p, "arch-pkg", "2-0", "aarch64")
+ add_package(p, "arch-pkg", "2-0", "src")
+
+ add_package(
+ p, "example-release", "1-0", "x86_64",
+ provides=[
+ ("product-cpeid()", solv.REL_EQ, "cpe%3A/o%3Avendor%3Aproduct%3Aversion%3Aupdate"),
+ ],
+ )
+
+ p.internalize()
+ return p
+
+
+def test_match_simple(pool):
+ packages = pool.match("foo", arch="x86_64")
+ assert len(packages) == 2
+ assert packages[0].nevra == "foo-2-0.noarch"
+ assert packages[1].nevra == "foo-1-0.noarch"
+
+
+def test_match_flag(pool):
+ packages = pool.match("foo > 1", arch="x86_64")
+ assert len(packages) == 1
+ assert packages[0].nevra == "foo-2-0.noarch"
+
+
+def test_match_flag_without_spaces(pool):
+ packages = pool.match("foo>1", arch="x86_64")
+ assert len(packages) == 1
+ assert packages[0].nevra == "foo-2-0.noarch"
+
+
+def test_match_invalid_characters(pool):
+ with pytest.raises(ValueError):
+ packages = pool.match("/foo", arch="x86_64")
+
+
+def test_match_latest(pool):
+ packages = pool.match("foo", arch="x86_64", latest=True)
+ assert len(packages) == 1
+ assert packages[0].nevra == "foo-2-0.noarch"
+
+
+def test_match_nosrc(pool):
+ packages = pool.match("foo = 1-0", arch="src")
+ assert len(packages) == 1
+ assert packages[0].nevra == "foo-1-0.nosrc"
+
+
+def test_match_src(pool):
+ packages = pool.match("foo = 2-0", arch="src")
+ assert len(packages) == 1
+ assert packages[0].nevra == "foo-2-0.src"
+
+
+def test_match_arch_pkg_latest(pool):
+ packages = pool.match("arch-pkg", arch="x86_64", latest=True)
+ assert len(packages) == 1
+ assert packages[0].nevra == "arch-pkg-2-0.x86_64"
+
+
+def test_match_arch_pkg_flag_latest(pool):
+ packages = pool.match("arch-pkg < 2", arch="x86_64", latest=True)
+ assert len(packages) == 1
+ assert packages[0].nevra == "arch-pkg-1-0.x86_64"
+
+
+def test_match_arch_pkg_flag_latest_arch_None(pool):
+ packages = pool.match("arch-pkg < 2", arch=None, latest=True)
+ assert len(packages) == 3
+ assert packages[0].nevra == "arch-pkg-1-0.x86_64"
+ assert packages[1].nevra == "arch-pkg-1-0.i586"
+ assert packages[2].nevra == "arch-pkg-1-0.aarch64"
+
+
+def test_match_glob(pool):
+ packages = pool.match("fo*", arch="x86_64", latest=True)
+ assert len(packages) == 1
+ assert packages[0].nevra == "foo-2-0.noarch"
+
+
+def test_product_cpeid(pool):
+ packages = pool.match("*-release", arch="x86_64")
+ assert len(packages) == 1
+ assert packages[0].nevra == "example-release-1-0.x86_64"
+ assert packages[0].product_cpeid == "cpe:/o:vendor:product:version:update"
+
+
+def test_package_cmp(pool):
+ p1 = pool.match("foo = 1-0", arch="x86_64")[0]
+ p2 = pool.match("foo = 2-0", arch="x86_64")[0]
+ assert p1 < p2
+ assert p1 != p2