#!/usr/bin/env python # Copyright (C) 2001 Alexander S. Guy <a7r@andern.org> # Andern Research Labs # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2, or (at your option) # any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, # Boston, MA 02111-1307, USA. */ # # Copyright 2001, Russell Nelson <opkg.py@russnelson.com> # Added reading in of packages. # Added missing package information fields. # Changed render_control() to __repr__(). # # Current Issues: # The API doesn't validate package information fields. It should be # throwing exceptions in the right places. # Executions of tar could silently fail. # Executions of tar *do* fail, and loudly, because you have to specify a full filename, # and tar complains if any files are missing, and the opkg spec doesn't require # people to say "./control.tar.gz" or "./control" when they package files. # It would be much better to require ./control or disallow ./control (either) # rather than letting people pick. Some freedoms aren't worth their cost. import tempfile import os import sys import glob import hashlib import re import subprocess from stat import ST_SIZE import arfile import tarfile import textwrap class Version: """A class for holding parsed package version information.""" def __init__(self, epoch, version): self.epoch = epoch self.version = version def _versioncompare(self, selfversion, refversion): if not selfversion: selfversion = "" if not refversion: refversion = "" while 1: ## first look for non-numeric version component selfm = re.match('([^0-9]*)(.*)', selfversion) #print(('selfm', selfm.groups())) (selfalpha, selfversion) = selfm.groups() refm = re.match('([^0-9]*)(.*)', refversion) #print(('refm', refm.groups()) (refalpha, refversion) = refm.groups() if (selfalpha > refalpha): return 1 elif (selfalpha < refalpha): return -1 ## now look for numeric version component (selfnum, selfversion) = re.match('([0-9]*)(.*)', selfversion).groups() (refnum, refversion) = re.match('([0-9]*)(.*)', refversion).groups() #print(('selfnum', selfnum, selfversion) #print(('refnum', refnum, refversion) if (selfnum != ''): selfnum = int(selfnum) else: selfnum = -1 if (refnum != ''): refnum = int(refnum) else: refnum = -1 if (selfnum > refnum): return 1 elif (selfnum < refnum): return -1 if selfversion == '' and refversion == '': return 0 def compare(self, ref): if (self.epoch > ref.epoch): return 1 elif (self.epoch < ref.epoch): return -1 else: self_ver_comps = re.match(r"(.+?)(-r.+)?$", self.version) ref_ver_comps = re.match(r"(.+?)(-r.+)?$", ref.version) #print((self_ver_comps.group(1), self_ver_comps.group(2))) #print((ref_ver_comps.group(1), ref_ver_comps.group(2))) r = self._versioncompare(self_ver_comps.group(1), ref_ver_comps.group(1)) if r == 0: r = self._versioncompare(self_ver_comps.group(2), ref_ver_comps.group(2)) #print("compare: %s vs %s = %d" % (self, ref, r)) return r def __str__(self): return str(self.epoch) + ":" + self.version def parse_version(versionstr): epoch = 0 # check for epoch m = re.match('([0-9]*):(.*)', versionstr) if m: (epochstr, versionstr) = m.groups() epoch = int(epochstr) return Version(epoch, versionstr) class Package: """A class for creating objects to manipulate (e.g. create) opkg packages.""" # fn: Package file path # relpath: If this argument is set, the file path is given relative to this # path when a string representation of the Package object is created. If # this argument is not set, the basename of the file path is given. def __init__(self, fn=None, relpath=None): self.package = None self.version = 'none' self.parsed_version = None self.architecture = None self.maintainer = None self.source = None self.description = None self.depends = None self.provides = None self.replaces = None self.conflicts = None self.recommends = None self.suggests = None self.section = None self.filename_header = None self.file_list = [] # md5 and size is lazy attribute, computed on demand #self.md5 = None #self.size = None self.installed_size = None self.filename = None self.file_ext_opk = "ipk" self.homepage = None self.oe = None self.priority = None self.tags = None self.fn = fn self.license = None if fn: # see if it is deb format f = open(fn, "rb") if relpath: self.filename = os.path.relpath(fn, relpath) else: self.filename = os.path.basename(fn) ## sys.stderr.write(" extracting control.tar.gz from %s\n"% (fn,)) ar = arfile.ArFile(f, fn) tarStream = ar.open("control.tar.gz") tarf = tarfile.open("control.tar.gz", "r", tarStream) try: control = tarf.extractfile("control") except KeyError: control = tarf.extractfile("./control") try: self.read_control(control) except TypeError as e: sys.stderr.write("Cannot read control file '%s' - %s\n" % (fn, e)) control.close() self.scratch_dir = None self.file_dir = None self.meta_dir = None def __getattr__(self, name): if name == "md5": self._computeFileMD5() return self.md5 elif name == 'size': return self._get_file_size() else: raise AttributeError(name) def _computeFileMD5(self): # compute the MD5. if not self.fn: self.md5 = 'Unknown' else: f = open(self.fn, "rb") sum = hashlib.md5() while True: data = f.read(1024) if not data: break sum.update(data) f.close() self.md5 = sum.hexdigest() def _get_file_size(self): if not self.fn: self.size = 0; else: stat = os.stat(self.fn) self.size = stat[ST_SIZE] return int(self.size) def read_control(self, control): import os line = control.readline() while 1: if not line: break line = line.rstrip() lineparts = re.match(r'([\w-]*?):\s*(.*)', str(line)) if lineparts: name = lineparts.group(1).lower() value = lineparts.group(2) while 1: line = control.readline() if not line: break if line[0] != ' ': break value = value + '\n' + line if name == 'size': self.size = int(value) elif name == 'md5sum': self.md5 = value elif name in self.__dict__: self.__dict__[name] = value else: print("Lost field %s, %s" % (name,value)) pass if line and line[0] == '\n': return # consumes one blank line at end of package descriptoin else: line = control.readline() pass return def _setup_scratch_area(self): self.scratch_dir = "%s/%sopkg" % (tempfile.gettempdir(), tempfile.gettempprefix()) self.file_dir = "%s/files" % (self.scratch_dir) self.meta_dir = "%s/meta" % (self.scratch_dir) os.mkdir(self.scratch_dir) os.mkdir(self.file_dir) os.mkdir(self.meta_dir) def set_package(self, package): self.package = package def get_package(self): return self.package def set_version(self, version): self.version = version self.parsed_version = parse_version(version) def get_version(self): return self.version def set_architecture(self, architecture): self.architecture = architecture def get_architecture(self): return self.architecture def set_maintainer(self, maintainer): self.maintainer = maintainer def get_maintainer(self): return self.maintainer def set_source(self, source): self.source = source def get_source(self): return self.source def set_description(self, description): self.description = description def get_description(self): return self.description def set_depends(self, depends): self.depends = depends def get_depends(self, depends): return self.depends def set_provides(self, provides): self.provides = provides def get_provides(self, provides): return self.provides def set_replaces(self, replaces): self.replaces = replaces def get_replaces(self, replaces): return self.replaces def set_conflicts(self, conflicts): self.conflicts = conflicts def get_conflicts(self, conflicts): return self.conflicts def set_suggests(self, suggests): self.suggests = suggests def get_suggests(self, suggests): return self.suggests def set_section(self, section): self.section = section def get_section(self, section): return self.section def set_license(self, license): self.license = license def get_license(self, license): return self.license def get_file_list_dir(self, directory): def check_output(*popenargs, **kwargs): """Run command with arguments and return its output as a byte string. Backported from Python 2.7 as it's implemented as pure python on stdlib. >>> check_output(['/usr/bin/python', '--version']) Python 2.6.2 """ process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs) output, unused_err = process.communicate() retcode = process.poll() if retcode: cmd = kwargs.get("args") if cmd is None: cmd = popenargs[0] error = subprocess.CalledProcessError(retcode, cmd) error.output = output raise error return output if not self.fn: try: cmd = "find %s -name %s | head -n 1" % (directory, self.filename) rc = check_output(cmd, shell=True) if rc != "": newfn = str(rc).split()[0] # sys.stderr.write("Package '%s' with empty fn and filename is '%s' was found in '%s', updating fn\n" % (self.package, self.filename, newfn)) self.fn = newfn except OSError as e: sys.stderr.write("Cannot find current fn for package '%s' filename '%s' in dir '%s'\n(%s)\n" % (self.package, self.filename, directory, e)) except IOError as e: sys.stderr.write("Cannot find current fn for package '%s' filename '%s' in dir '%s'\n(%s)\n" % (self.package, self.filename, directory, e)) return self.get_file_list() def get_file_list(self): if not self.fn: sys.stderr.write("Package '%s' has empty fn, returning empty filelist\n" % (self.package)) return [] f = open(self.fn, "rb") ar = arfile.ArFile(f, self.fn) tarStream = ar.open("data.tar.gz") tarf = tarfile.open("data.tar.gz", "r", tarStream) self.file_list = tarf.getnames() self.file_list = map(lambda a: ["./", ""][a.startswith("./")] + a, self.file_list) f.close() return self.file_list def set_package_extension(self, ext="ipk"): self.file_ext_opk = ext def get_package_extension(self): return self.file_ext_opk def write_package(self, dirname): self._setup_scratch_area() file = open("%s/control" % self.meta_dir, 'w') file.write(str(self)) file.close() cmd = "cd %s ; tar cvz --format=gnu -f %s/control.tar.gz control" % (self.meta_dir, self.scratch_dir) cmd_out, cmd_in, cmd_err = os.popen3(cmd) while cmd_err.readline() != "": pass cmd_out.close() cmd_in.close() cmd_err.close() bits = "control.tar.gz" if self.file_list: cmd = "cd %s ; tar cvz --format=gnu -f %s/data.tar.gz" % (self.file_dir, self.scratch_dir) cmd_out, cmd_in, cmd_err = os.popen3(cmd) while cmd_err.readline() != "": pass cmd_out.close() cmd_in.close() cmd_err.close() bits = bits + " data.tar.gz" file = "%s_%s_%s.%s" % (self.package, self.version, self.architecture, self.get_package_extension()) cmd = "cd %s ; tar cvz --format=gnu -f %s/%s %s" % (self.scratch_dir, dirname, file, bits) cmd_out, cmd_in, cmd_err = os.popen3(cmd) while cmd_err.readline() != "": pass cmd_out.close() cmd_in.close() cmd_err.close() def compare_version(self, ref): """Compare package versions of self and ref""" if not self.version: print('No version for package %s' % self.package) if not ref.version: print('No version for package %s' % ref.package) if not self.parsed_version: self.parsed_version = parse_version(self.version) if not ref.parsed_version: ref.parsed_version = parse_version(ref.version) return self.parsed_version.compare(ref.parsed_version) def __str__(self): out = "" # XXX - Some checks need to be made, and some exceptions # need to be thrown. -- a7r if self.package: out = out + "Package: %s\n" % (self.package) if self.version: out = out + "Version: %s\n" % (self.version) if self.depends: out = out + "Depends: %s\n" % (self.depends) if self.provides: out = out + "Provides: %s\n" % (self.provides) if self.replaces: out = out + "Replaces: %s\n" % (self.replaces) if self.conflicts: out = out + "Conflicts: %s\n" % (self.conflicts) if self.suggests: out = out + "Suggests: %s\n" % (self.suggests) if self.recommends: out = out + "Recommends: %s\n" % (self.recommends) if self.section: out = out + "Section: %s\n" % (self.section) if self.architecture: out = out + "Architecture: %s\n" % (self.architecture) if self.maintainer: out = out + "Maintainer: %s\n" % (self.maintainer) if self.md5: out = out + "MD5Sum: %s\n" % (self.md5) if self.size: out = out + "Size: %d\n" % int(self.size) if self.installed_size: out = out + "InstalledSize: %d\n" % int(self.installed_size) if self.filename: out = out + "Filename: %s\n" % (self.filename) if self.source: out = out + "Source: %s\n" % (self.source) if self.description: printable_description = textwrap.dedent(self.description).strip() out = out + "Description: %s\n" % textwrap.fill(printable_description, width=74, initial_indent=' ', subsequent_indent=' ') if self.oe: out = out + "OE: %s\n" % (self.oe) if self.homepage: out = out + "HomePage: %s\n" % (self.homepage) if self.license: out = out + "License: %s\n" % (self.license) if self.priority: out = out + "Priority: %s\n" % (self.priority) if self.tags: out = out + "Tags: %s\n" % (self.tags) out = out + "\n" return out def __del__(self): # XXX - Why is the `os' module being yanked out before Package objects # are being destroyed? -- a7r pass class Packages: """A currently unimplemented wrapper around the opkg utility.""" def __init__(self): self.packages = {} return def add_package(self, pkg): package = pkg.package arch = pkg.architecture name = ("%s:%s" % (package, arch)) if (name not in self.packages): self.packages[name] = pkg if pkg.compare_version(self.packages[name]) >= 0: self.packages[name] = pkg return 0 else: return 1 def read_packages_file(self, fn): f = open(fn, "r") while True: pkg = Package() try: pkg.read_control(f) except TypeError as e: sys.stderr.write("Cannot read control file '%s' - %s\n" % (fn, e)) continue if pkg.get_package(): self.add_package(pkg) else: break f.close() return def write_packages_file(self, fn): f = open(fn, "w") names = list(self.packages.keys()) names.sort() for name in names: f.write(self.packages[name].__repr__()) return def keys(self): return list(self.packages.keys()) def __getitem__(self, key): return self.packages[key] if __name__ == "__main__": assert Version(0, "1.2.2-r1").compare(Version(0, "1.2.3-r0")) == -1 assert Version(0, "1.2.2-r0").compare(Version(0, "1.2.2+cvs20070308-r0")) == -1 assert Version(0, "1.2.2+cvs20070308").compare(Version(0, "1.2.2-r0")) == 1 assert Version(0, "1.2.2-r0").compare(Version(0, "1.2.2-r0")) == 0 assert Version(0, "1.2.2-r5").compare(Version(0, "1.2.2-r0")) == 1 package = Package() package.set_package("FooBar") package.set_version("0.1-fam1") package.set_architecture("arm") package.set_maintainer("Testing <testing@testing.testing>") package.set_depends("libc") package.set_description("A test of the APIs. And very long descriptions so often used in oe-core\nfoo\n\n\nbar") print("<") sys.stdout.write(str(package)) print(">") f = open("/tmp/control", "w") f.write(str(package)) f.close() f = open("/tmp/control", "r") package2 = Package() package2.read_control(f) print("<") sys.stdout.write(str(package2)) print(">") package.write_package("/tmp")