diff options
Diffstat (limited to 'sys/src/cmd/hg/mercurial/util.py')
-rw-r--r-- | sys/src/cmd/hg/mercurial/util.py | 1284 |
1 files changed, 0 insertions, 1284 deletions
diff --git a/sys/src/cmd/hg/mercurial/util.py b/sys/src/cmd/hg/mercurial/util.py deleted file mode 100644 index 02ff43d7f..000000000 --- a/sys/src/cmd/hg/mercurial/util.py +++ /dev/null @@ -1,1284 +0,0 @@ -# util.py - Mercurial utility functions and platform specfic implementations -# -# Copyright 2005 K. Thananchayan <thananck@yahoo.com> -# Copyright 2005-2007 Matt Mackall <mpm@selenic.com> -# Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com> -# -# This software may be used and distributed according to the terms of the -# GNU General Public License version 2, incorporated herein by reference. - -"""Mercurial utility functions and platform specfic implementations. - -This contains helper routines that are independent of the SCM core and -hide platform-specific details from the core. -""" - -from i18n import _ -import error, osutil -import cStringIO, errno, re, shutil, sys, tempfile, traceback -import os, stat, time, calendar, random, textwrap -import imp - -# Python compatibility - -def sha1(s): - return _fastsha1(s) - -def _fastsha1(s): - # This function will import sha1 from hashlib or sha (whichever is - # available) and overwrite itself with it on the first call. - # Subsequent calls will go directly to the imported function. - try: - from hashlib import sha1 as _sha1 - except ImportError: - from sha import sha as _sha1 - global _fastsha1, sha1 - _fastsha1 = sha1 = _sha1 - return _sha1(s) - -import subprocess -closefds = os.name == 'posix' -def popen2(cmd): - # Setting bufsize to -1 lets the system decide the buffer size. - # The default for bufsize is 0, meaning unbuffered. This leads to - # poor performance on Mac OS X: http://bugs.python.org/issue4194 - p = subprocess.Popen(cmd, shell=True, bufsize=-1, - close_fds=closefds, - stdin=subprocess.PIPE, stdout=subprocess.PIPE) - return p.stdin, p.stdout -def popen3(cmd): - p = subprocess.Popen(cmd, shell=True, bufsize=-1, - close_fds=closefds, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - return p.stdin, p.stdout, p.stderr - -def version(): - """Return version information if available.""" - try: - import __version__ - return __version__.version - except ImportError: - return 'unknown' - -# used by parsedate -defaultdateformats = ( - '%Y-%m-%d %H:%M:%S', - '%Y-%m-%d %I:%M:%S%p', - '%Y-%m-%d %H:%M', - '%Y-%m-%d %I:%M%p', - '%Y-%m-%d', - '%m-%d', - '%m/%d', - '%m/%d/%y', - '%m/%d/%Y', - '%a %b %d %H:%M:%S %Y', - '%a %b %d %I:%M:%S%p %Y', - '%a, %d %b %Y %H:%M:%S', # GNU coreutils "/bin/date --rfc-2822" - '%b %d %H:%M:%S %Y', - '%b %d %I:%M:%S%p %Y', - '%b %d %H:%M:%S', - '%b %d %I:%M:%S%p', - '%b %d %H:%M', - '%b %d %I:%M%p', - '%b %d %Y', - '%b %d', - '%H:%M:%S', - '%I:%M:%SP', - '%H:%M', - '%I:%M%p', -) - -extendeddateformats = defaultdateformats + ( - "%Y", - "%Y-%m", - "%b", - "%b %Y", - ) - -def cachefunc(func): - '''cache the result of function calls''' - # XXX doesn't handle keywords args - cache = {} - if func.func_code.co_argcount == 1: - # we gain a small amount of time because - # we don't need to pack/unpack the list - def f(arg): - if arg not in cache: - cache[arg] = func(arg) - return cache[arg] - else: - def f(*args): - if args not in cache: - cache[args] = func(*args) - return cache[args] - - return f - -def lrucachefunc(func): - '''cache most recent results of function calls''' - cache = {} - order = [] - if func.func_code.co_argcount == 1: - def f(arg): - if arg not in cache: - if len(cache) > 20: - del cache[order.pop(0)] - cache[arg] = func(arg) - else: - order.remove(arg) - order.append(arg) - return cache[arg] - else: - def f(*args): - if args not in cache: - if len(cache) > 20: - del cache[order.pop(0)] - cache[args] = func(*args) - else: - order.remove(args) - order.append(args) - return cache[args] - - return f - -class propertycache(object): - def __init__(self, func): - self.func = func - self.name = func.__name__ - def __get__(self, obj, type=None): - result = self.func(obj) - setattr(obj, self.name, result) - return result - -def pipefilter(s, cmd): - '''filter string S through command CMD, returning its output''' - p = subprocess.Popen(cmd, shell=True, close_fds=closefds, - stdin=subprocess.PIPE, stdout=subprocess.PIPE) - pout, perr = p.communicate(s) - return pout - -def tempfilter(s, cmd): - '''filter string S through a pair of temporary files with CMD. - CMD is used as a template to create the real command to be run, - with the strings INFILE and OUTFILE replaced by the real names of - the temporary files generated.''' - inname, outname = None, None - try: - infd, inname = tempfile.mkstemp(prefix='hg-filter-in-') - fp = os.fdopen(infd, 'wb') - fp.write(s) - fp.close() - outfd, outname = tempfile.mkstemp(prefix='hg-filter-out-') - os.close(outfd) - cmd = cmd.replace('INFILE', inname) - cmd = cmd.replace('OUTFILE', outname) - code = os.system(cmd) - if sys.platform == 'OpenVMS' and code & 1: - code = 0 - if code: raise Abort(_("command '%s' failed: %s") % - (cmd, explain_exit(code))) - return open(outname, 'rb').read() - finally: - try: - if inname: os.unlink(inname) - except: pass - try: - if outname: os.unlink(outname) - except: pass - -filtertable = { - 'tempfile:': tempfilter, - 'pipe:': pipefilter, - } - -def filter(s, cmd): - "filter a string through a command that transforms its input to its output" - for name, fn in filtertable.iteritems(): - if cmd.startswith(name): - return fn(s, cmd[len(name):].lstrip()) - return pipefilter(s, cmd) - -def binary(s): - """return true if a string is binary data""" - return bool(s and '\0' in s) - -def increasingchunks(source, min=1024, max=65536): - '''return no less than min bytes per chunk while data remains, - doubling min after each chunk until it reaches max''' - def log2(x): - if not x: - return 0 - i = 0 - while x: - x >>= 1 - i += 1 - return i - 1 - - buf = [] - blen = 0 - for chunk in source: - buf.append(chunk) - blen += len(chunk) - if blen >= min: - if min < max: - min = min << 1 - nmin = 1 << log2(blen) - if nmin > min: - min = nmin - if min > max: - min = max - yield ''.join(buf) - blen = 0 - buf = [] - if buf: - yield ''.join(buf) - -Abort = error.Abort - -def always(fn): return True -def never(fn): return False - -def pathto(root, n1, n2): - '''return the relative path from one place to another. - root should use os.sep to separate directories - n1 should use os.sep to separate directories - n2 should use "/" to separate directories - returns an os.sep-separated path. - - If n1 is a relative path, it's assumed it's - relative to root. - n2 should always be relative to root. - ''' - if not n1: return localpath(n2) - if os.path.isabs(n1): - if os.path.splitdrive(root)[0] != os.path.splitdrive(n1)[0]: - return os.path.join(root, localpath(n2)) - n2 = '/'.join((pconvert(root), n2)) - a, b = splitpath(n1), n2.split('/') - a.reverse() - b.reverse() - while a and b and a[-1] == b[-1]: - a.pop() - b.pop() - b.reverse() - return os.sep.join((['..'] * len(a)) + b) or '.' - -def canonpath(root, cwd, myname): - """return the canonical path of myname, given cwd and root""" - if root == os.sep: - rootsep = os.sep - elif endswithsep(root): - rootsep = root - else: - rootsep = root + os.sep - name = myname - if not os.path.isabs(name): - name = os.path.join(root, cwd, name) - name = os.path.normpath(name) - audit_path = path_auditor(root) - if name != rootsep and name.startswith(rootsep): - name = name[len(rootsep):] - audit_path(name) - return pconvert(name) - elif name == root: - return '' - else: - # Determine whether `name' is in the hierarchy at or beneath `root', - # by iterating name=dirname(name) until that causes no change (can't - # check name == '/', because that doesn't work on windows). For each - # `name', compare dev/inode numbers. If they match, the list `rel' - # holds the reversed list of components making up the relative file - # name we want. - root_st = os.stat(root) - rel = [] - while True: - try: - name_st = os.stat(name) - except OSError: - break - if samestat(name_st, root_st): - if not rel: - # name was actually the same as root (maybe a symlink) - return '' - rel.reverse() - name = os.path.join(*rel) - audit_path(name) - return pconvert(name) - dirname, basename = os.path.split(name) - rel.append(basename) - if dirname == name: - break - name = dirname - - raise Abort('%s not under root' % myname) - -_hgexecutable = None - -def main_is_frozen(): - """return True if we are a frozen executable. - - The code supports py2exe (most common, Windows only) and tools/freeze - (portable, not much used). - """ - return (hasattr(sys, "frozen") or # new py2exe - hasattr(sys, "importers") or # old py2exe - imp.is_frozen("__main__")) # tools/freeze - -def hgexecutable(): - """return location of the 'hg' executable. - - Defaults to $HG or 'hg' in the search path. - """ - if _hgexecutable is None: - hg = os.environ.get('HG') - if hg: - set_hgexecutable(hg) - elif main_is_frozen(): - set_hgexecutable(sys.executable) - else: - set_hgexecutable(find_exe('hg') or 'hg') - return _hgexecutable - -def set_hgexecutable(path): - """set location of the 'hg' executable""" - global _hgexecutable - _hgexecutable = path - -def system(cmd, environ={}, cwd=None, onerr=None, errprefix=None): - '''enhanced shell command execution. - run with environment maybe modified, maybe in different dir. - - if command fails and onerr is None, return status. if ui object, - print error message and return status, else raise onerr object as - exception.''' - def py2shell(val): - 'convert python object into string that is useful to shell' - if val is None or val is False: - return '0' - if val is True: - return '1' - return str(val) - oldenv = {} - for k in environ: - oldenv[k] = os.environ.get(k) - if cwd is not None: - oldcwd = os.getcwd() - origcmd = cmd - if os.name == 'nt': - cmd = '"%s"' % cmd - try: - for k, v in environ.iteritems(): - os.environ[k] = py2shell(v) - os.environ['HG'] = hgexecutable() - if cwd is not None and oldcwd != cwd: - os.chdir(cwd) - rc = os.system(cmd) - if sys.platform == 'OpenVMS' and rc & 1: - rc = 0 - if rc and onerr: - errmsg = '%s %s' % (os.path.basename(origcmd.split(None, 1)[0]), - explain_exit(rc)[0]) - if errprefix: - errmsg = '%s: %s' % (errprefix, errmsg) - try: - onerr.warn(errmsg + '\n') - except AttributeError: - raise onerr(errmsg) - return rc - finally: - for k, v in oldenv.iteritems(): - if v is None: - del os.environ[k] - else: - os.environ[k] = v - if cwd is not None and oldcwd != cwd: - os.chdir(oldcwd) - -def checksignature(func): - '''wrap a function with code to check for calling errors''' - def check(*args, **kwargs): - try: - return func(*args, **kwargs) - except TypeError: - if len(traceback.extract_tb(sys.exc_info()[2])) == 1: - raise error.SignatureError - raise - - return check - -# os.path.lexists is not available on python2.3 -def lexists(filename): - "test whether a file with this name exists. does not follow symlinks" - try: - os.lstat(filename) - except: - return False - return True - -def rename(src, dst): - """forcibly rename a file""" - try: - os.rename(src, dst) - except OSError, err: # FIXME: check err (EEXIST ?) - - # On windows, rename to existing file is not allowed, so we - # must delete destination first. But if a file is open, unlink - # schedules it for delete but does not delete it. Rename - # happens immediately even for open files, so we rename - # destination to a temporary name, then delete that. Then - # rename is safe to do. - # The temporary name is chosen at random to avoid the situation - # where a file is left lying around from a previous aborted run. - # The usual race condition this introduces can't be avoided as - # we need the name to rename into, and not the file itself. Due - # to the nature of the operation however, any races will at worst - # lead to the rename failing and the current operation aborting. - - def tempname(prefix): - for tries in xrange(10): - temp = '%s-%08x' % (prefix, random.randint(0, 0xffffffff)) - if not os.path.exists(temp): - return temp - raise IOError, (errno.EEXIST, "No usable temporary filename found") - - temp = tempname(dst) - os.rename(dst, temp) - os.unlink(temp) - os.rename(src, dst) - -def unlink(f): - """unlink and remove the directory if it is empty""" - os.unlink(f) - # try removing directories that might now be empty - try: - os.removedirs(os.path.dirname(f)) - except OSError: - pass - -def copyfile(src, dest): - "copy a file, preserving mode and atime/mtime" - if os.path.islink(src): - try: - os.unlink(dest) - except: - pass - os.symlink(os.readlink(src), dest) - else: - try: - shutil.copyfile(src, dest) - shutil.copystat(src, dest) - except shutil.Error, inst: - raise Abort(str(inst)) - -def copyfiles(src, dst, hardlink=None): - """Copy a directory tree using hardlinks if possible""" - - if hardlink is None: - hardlink = (os.stat(src).st_dev == - os.stat(os.path.dirname(dst)).st_dev) - - if os.path.isdir(src): - os.mkdir(dst) - for name, kind in osutil.listdir(src): - srcname = os.path.join(src, name) - dstname = os.path.join(dst, name) - copyfiles(srcname, dstname, hardlink) - else: - if hardlink: - try: - os_link(src, dst) - except (IOError, OSError): - hardlink = False - shutil.copy(src, dst) - else: - shutil.copy(src, dst) - -class path_auditor(object): - '''ensure that a filesystem path contains no banned components. - the following properties of a path are checked: - - - under top-level .hg - - starts at the root of a windows drive - - contains ".." - - traverses a symlink (e.g. a/symlink_here/b) - - inside a nested repository''' - - def __init__(self, root): - self.audited = set() - self.auditeddir = set() - self.root = root - - def __call__(self, path): - if path in self.audited: - return - normpath = os.path.normcase(path) - parts = splitpath(normpath) - if (os.path.splitdrive(path)[0] - or parts[0].lower() in ('.hg', '.hg.', '') - or os.pardir in parts): - raise Abort(_("path contains illegal component: %s") % path) - if '.hg' in path.lower(): - lparts = [p.lower() for p in parts] - for p in '.hg', '.hg.': - if p in lparts[1:]: - pos = lparts.index(p) - base = os.path.join(*parts[:pos]) - raise Abort(_('path %r is inside repo %r') % (path, base)) - def check(prefix): - curpath = os.path.join(self.root, prefix) - try: - st = os.lstat(curpath) - except OSError, err: - # EINVAL can be raised as invalid path syntax under win32. - # They must be ignored for patterns can be checked too. - if err.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL): - raise - else: - if stat.S_ISLNK(st.st_mode): - raise Abort(_('path %r traverses symbolic link %r') % - (path, prefix)) - elif (stat.S_ISDIR(st.st_mode) and - os.path.isdir(os.path.join(curpath, '.hg'))): - raise Abort(_('path %r is inside repo %r') % - (path, prefix)) - parts.pop() - prefixes = [] - while parts: - prefix = os.sep.join(parts) - if prefix in self.auditeddir: - break - check(prefix) - prefixes.append(prefix) - parts.pop() - - self.audited.add(path) - # only add prefixes to the cache after checking everything: we don't - # want to add "foo/bar/baz" before checking if there's a "foo/.hg" - self.auditeddir.update(prefixes) - -def nlinks(pathname): - """Return number of hardlinks for the given file.""" - return os.lstat(pathname).st_nlink - -if hasattr(os, 'link'): - os_link = os.link -else: - def os_link(src, dst): - raise OSError(0, _("Hardlinks not supported")) - -def lookup_reg(key, name=None, scope=None): - return None - -if os.name == 'nt': - from windows import * -else: - from posix import * - -def makelock(info, pathname): - try: - return os.symlink(info, pathname) - except OSError, why: - if why.errno == errno.EEXIST: - raise - except AttributeError: # no symlink in os - pass - - ld = os.open(pathname, os.O_CREAT | os.O_WRONLY | os.O_EXCL) - os.write(ld, info) - os.close(ld) - -def readlock(pathname): - try: - return os.readlink(pathname) - except OSError, why: - if why.errno not in (errno.EINVAL, errno.ENOSYS): - raise - except AttributeError: # no symlink in os - pass - return posixfile(pathname).read() - -def fstat(fp): - '''stat file object that may not have fileno method.''' - try: - return os.fstat(fp.fileno()) - except AttributeError: - return os.stat(fp.name) - -# File system features - -def checkcase(path): - """ - Check whether the given path is on a case-sensitive filesystem - - Requires a path (like /foo/.hg) ending with a foldable final - directory component. - """ - s1 = os.stat(path) - d, b = os.path.split(path) - p2 = os.path.join(d, b.upper()) - if path == p2: - p2 = os.path.join(d, b.lower()) - try: - s2 = os.stat(p2) - if s2 == s1: - return False - return True - except: - return True - -_fspathcache = {} -def fspath(name, root): - '''Get name in the case stored in the filesystem - - The name is either relative to root, or it is an absolute path starting - with root. Note that this function is unnecessary, and should not be - called, for case-sensitive filesystems (simply because it's expensive). - ''' - # If name is absolute, make it relative - if name.lower().startswith(root.lower()): - l = len(root) - if name[l] == os.sep or name[l] == os.altsep: - l = l + 1 - name = name[l:] - - if not os.path.exists(os.path.join(root, name)): - return None - - seps = os.sep - if os.altsep: - seps = seps + os.altsep - # Protect backslashes. This gets silly very quickly. - seps.replace('\\','\\\\') - pattern = re.compile(r'([^%s]+)|([%s]+)' % (seps, seps)) - dir = os.path.normcase(os.path.normpath(root)) - result = [] - for part, sep in pattern.findall(name): - if sep: - result.append(sep) - continue - - if dir not in _fspathcache: - _fspathcache[dir] = os.listdir(dir) - contents = _fspathcache[dir] - - lpart = part.lower() - for n in contents: - if n.lower() == lpart: - result.append(n) - break - else: - # Cannot happen, as the file exists! - result.append(part) - dir = os.path.join(dir, lpart) - - return ''.join(result) - -def checkexec(path): - """ - Check whether the given path is on a filesystem with UNIX-like exec flags - - Requires a directory (like /foo/.hg) - """ - - # VFAT on some Linux versions can flip mode but it doesn't persist - # a FS remount. Frequently we can detect it if files are created - # with exec bit on. - - try: - EXECFLAGS = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH - fh, fn = tempfile.mkstemp("", "", path) - try: - os.close(fh) - m = os.stat(fn).st_mode & 0777 - new_file_has_exec = m & EXECFLAGS - os.chmod(fn, m ^ EXECFLAGS) - exec_flags_cannot_flip = ((os.stat(fn).st_mode & 0777) == m) - finally: - os.unlink(fn) - except (IOError, OSError): - # we don't care, the user probably won't be able to commit anyway - return False - return not (new_file_has_exec or exec_flags_cannot_flip) - -def checklink(path): - """check whether the given path is on a symlink-capable filesystem""" - # mktemp is not racy because symlink creation will fail if the - # file already exists - name = tempfile.mktemp(dir=path) - try: - os.symlink(".", name) - os.unlink(name) - return True - except (OSError, AttributeError): - return False - -def needbinarypatch(): - """return True if patches should be applied in binary mode by default.""" - return os.name == 'nt' - -def endswithsep(path): - '''Check path ends with os.sep or os.altsep.''' - return path.endswith(os.sep) or os.altsep and path.endswith(os.altsep) - -def splitpath(path): - '''Split path by os.sep. - Note that this function does not use os.altsep because this is - an alternative of simple "xxx.split(os.sep)". - It is recommended to use os.path.normpath() before using this - function if need.''' - return path.split(os.sep) - -def gui(): - '''Are we running in a GUI?''' - return os.name == "nt" or os.name == "mac" or os.environ.get("DISPLAY") - -def mktempcopy(name, emptyok=False, createmode=None): - """Create a temporary file with the same contents from name - - The permission bits are copied from the original file. - - If the temporary file is going to be truncated immediately, you - can use emptyok=True as an optimization. - - Returns the name of the temporary file. - """ - d, fn = os.path.split(name) - fd, temp = tempfile.mkstemp(prefix='.%s-' % fn, dir=d) - os.close(fd) - # Temporary files are created with mode 0600, which is usually not - # what we want. If the original file already exists, just copy - # its mode. Otherwise, manually obey umask. - try: - st_mode = os.lstat(name).st_mode & 0777 - except OSError, inst: - if inst.errno != errno.ENOENT: - raise - st_mode = createmode - if st_mode is None: - st_mode = ~umask - st_mode &= 0666 - os.chmod(temp, st_mode) - if emptyok: - return temp - try: - try: - ifp = posixfile(name, "rb") - except IOError, inst: - if inst.errno == errno.ENOENT: - return temp - if not getattr(inst, 'filename', None): - inst.filename = name - raise - ofp = posixfile(temp, "wb") - for chunk in filechunkiter(ifp): - ofp.write(chunk) - ifp.close() - ofp.close() - except: - try: os.unlink(temp) - except: pass - raise - return temp - -class atomictempfile(object): - """file-like object that atomically updates a file - - All writes will be redirected to a temporary copy of the original - file. When rename is called, the copy is renamed to the original - name, making the changes visible. - """ - def __init__(self, name, mode, createmode): - self.__name = name - self._fp = None - self.temp = mktempcopy(name, emptyok=('w' in mode), - createmode=createmode) - self._fp = posixfile(self.temp, mode) - - def __getattr__(self, name): - return getattr(self._fp, name) - - def rename(self): - if not self._fp.closed: - self._fp.close() - rename(self.temp, localpath(self.__name)) - - def __del__(self): - if not self._fp: - return - if not self._fp.closed: - try: - os.unlink(self.temp) - except: pass - self._fp.close() - -def makedirs(name, mode=None): - """recursive directory creation with parent mode inheritance""" - try: - os.mkdir(name) - if mode is not None: - os.chmod(name, mode) - return - except OSError, err: - if err.errno == errno.EEXIST: - return - if err.errno != errno.ENOENT: - raise - parent = os.path.abspath(os.path.dirname(name)) - makedirs(parent, mode) - makedirs(name, mode) - -class opener(object): - """Open files relative to a base directory - - This class is used to hide the details of COW semantics and - remote file access from higher level code. - """ - def __init__(self, base, audit=True): - self.base = base - if audit: - self.audit_path = path_auditor(base) - else: - self.audit_path = always - self.createmode = None - - @propertycache - def _can_symlink(self): - return checklink(self.base) - - def _fixfilemode(self, name): - if self.createmode is None: - return - os.chmod(name, self.createmode & 0666) - - def __call__(self, path, mode="r", text=False, atomictemp=False): - self.audit_path(path) - f = os.path.join(self.base, path) - - if not text and "b" not in mode: - mode += "b" # for that other OS - - nlink = -1 - if mode not in ("r", "rb"): - try: - nlink = nlinks(f) - except OSError: - nlink = 0 - d = os.path.dirname(f) - if not os.path.isdir(d): - makedirs(d, self.createmode) - if atomictemp: - return atomictempfile(f, mode, self.createmode) - if nlink > 1: - rename(mktempcopy(f), f) - fp = posixfile(f, mode) - if nlink == 0: - self._fixfilemode(f) - return fp - - def symlink(self, src, dst): - self.audit_path(dst) - linkname = os.path.join(self.base, dst) - try: - os.unlink(linkname) - except OSError: - pass - - dirname = os.path.dirname(linkname) - if not os.path.exists(dirname): - makedirs(dirname, self.createmode) - - if self._can_symlink: - try: - os.symlink(src, linkname) - except OSError, err: - raise OSError(err.errno, _('could not symlink to %r: %s') % - (src, err.strerror), linkname) - else: - f = self(dst, "w") - f.write(src) - f.close() - self._fixfilemode(dst) - -class chunkbuffer(object): - """Allow arbitrary sized chunks of data to be efficiently read from an - iterator over chunks of arbitrary size.""" - - def __init__(self, in_iter): - """in_iter is the iterator that's iterating over the input chunks. - targetsize is how big a buffer to try to maintain.""" - self.iter = iter(in_iter) - self.buf = '' - self.targetsize = 2**16 - - def read(self, l): - """Read L bytes of data from the iterator of chunks of data. - Returns less than L bytes if the iterator runs dry.""" - if l > len(self.buf) and self.iter: - # Clamp to a multiple of self.targetsize - targetsize = max(l, self.targetsize) - collector = cStringIO.StringIO() - collector.write(self.buf) - collected = len(self.buf) - for chunk in self.iter: - collector.write(chunk) - collected += len(chunk) - if collected >= targetsize: - break - if collected < targetsize: - self.iter = False - self.buf = collector.getvalue() - if len(self.buf) == l: - s, self.buf = str(self.buf), '' - else: - s, self.buf = self.buf[:l], buffer(self.buf, l) - return s - -def filechunkiter(f, size=65536, limit=None): - """Create a generator that produces the data in the file size - (default 65536) bytes at a time, up to optional limit (default is - to read all data). Chunks may be less than size bytes if the - chunk is the last chunk in the file, or the file is a socket or - some other type of file that sometimes reads less data than is - requested.""" - assert size >= 0 - assert limit is None or limit >= 0 - while True: - if limit is None: nbytes = size - else: nbytes = min(limit, size) - s = nbytes and f.read(nbytes) - if not s: break - if limit: limit -= len(s) - yield s - -def makedate(): - lt = time.localtime() - if lt[8] == 1 and time.daylight: - tz = time.altzone - else: - tz = time.timezone - return time.mktime(lt), tz - -def datestr(date=None, format='%a %b %d %H:%M:%S %Y %1%2'): - """represent a (unixtime, offset) tuple as a localized time. - unixtime is seconds since the epoch, and offset is the time zone's - number of seconds away from UTC. if timezone is false, do not - append time zone to string.""" - t, tz = date or makedate() - if "%1" in format or "%2" in format: - sign = (tz > 0) and "-" or "+" - minutes = abs(tz) // 60 - format = format.replace("%1", "%c%02d" % (sign, minutes // 60)) - format = format.replace("%2", "%02d" % (minutes % 60)) - s = time.strftime(format, time.gmtime(float(t) - tz)) - return s - -def shortdate(date=None): - """turn (timestamp, tzoff) tuple into iso 8631 date.""" - return datestr(date, format='%Y-%m-%d') - -def strdate(string, format, defaults=[]): - """parse a localized time string and return a (unixtime, offset) tuple. - if the string cannot be parsed, ValueError is raised.""" - def timezone(string): - tz = string.split()[-1] - if tz[0] in "+-" and len(tz) == 5 and tz[1:].isdigit(): - sign = (tz[0] == "+") and 1 or -1 - hours = int(tz[1:3]) - minutes = int(tz[3:5]) - return -sign * (hours * 60 + minutes) * 60 - if tz == "GMT" or tz == "UTC": - return 0 - return None - - # NOTE: unixtime = localunixtime + offset - offset, date = timezone(string), string - if offset != None: - date = " ".join(string.split()[:-1]) - - # add missing elements from defaults - for part in defaults: - found = [True for p in part if ("%"+p) in format] - if not found: - date += "@" + defaults[part] - format += "@%" + part[0] - - timetuple = time.strptime(date, format) - localunixtime = int(calendar.timegm(timetuple)) - if offset is None: - # local timezone - unixtime = int(time.mktime(timetuple)) - offset = unixtime - localunixtime - else: - unixtime = localunixtime + offset - return unixtime, offset - -def parsedate(date, formats=None, defaults=None): - """parse a localized date/time string and return a (unixtime, offset) tuple. - - The date may be a "unixtime offset" string or in one of the specified - formats. If the date already is a (unixtime, offset) tuple, it is returned. - """ - if not date: - return 0, 0 - if isinstance(date, tuple) and len(date) == 2: - return date - if not formats: - formats = defaultdateformats - date = date.strip() - try: - when, offset = map(int, date.split(' ')) - except ValueError: - # fill out defaults - if not defaults: - defaults = {} - now = makedate() - for part in "d mb yY HI M S".split(): - if part not in defaults: - if part[0] in "HMS": - defaults[part] = "00" - else: - defaults[part] = datestr(now, "%" + part[0]) - - for format in formats: - try: - when, offset = strdate(date, format, defaults) - except (ValueError, OverflowError): - pass - else: - break - else: - raise Abort(_('invalid date: %r ') % date) - # validate explicit (probably user-specified) date and - # time zone offset. values must fit in signed 32 bits for - # current 32-bit linux runtimes. timezones go from UTC-12 - # to UTC+14 - if abs(when) > 0x7fffffff: - raise Abort(_('date exceeds 32 bits: %d') % when) - if offset < -50400 or offset > 43200: - raise Abort(_('impossible time zone offset: %d') % offset) - return when, offset - -def matchdate(date): - """Return a function that matches a given date match specifier - - Formats include: - - '{date}' match a given date to the accuracy provided - - '<{date}' on or before a given date - - '>{date}' on or after a given date - - """ - - def lower(date): - d = dict(mb="1", d="1") - return parsedate(date, extendeddateformats, d)[0] - - def upper(date): - d = dict(mb="12", HI="23", M="59", S="59") - for days in "31 30 29".split(): - try: - d["d"] = days - return parsedate(date, extendeddateformats, d)[0] - except: - pass - d["d"] = "28" - return parsedate(date, extendeddateformats, d)[0] - - date = date.strip() - if date[0] == "<": - when = upper(date[1:]) - return lambda x: x <= when - elif date[0] == ">": - when = lower(date[1:]) - return lambda x: x >= when - elif date[0] == "-": - try: - days = int(date[1:]) - except ValueError: - raise Abort(_("invalid day spec: %s") % date[1:]) - when = makedate()[0] - days * 3600 * 24 - return lambda x: x >= when - elif " to " in date: - a, b = date.split(" to ") - start, stop = lower(a), upper(b) - return lambda x: x >= start and x <= stop - else: - start, stop = lower(date), upper(date) - return lambda x: x >= start and x <= stop - -def shortuser(user): - """Return a short representation of a user name or email address.""" - f = user.find('@') - if f >= 0: - user = user[:f] - f = user.find('<') - if f >= 0: - user = user[f+1:] - f = user.find(' ') - if f >= 0: - user = user[:f] - f = user.find('.') - if f >= 0: - user = user[:f] - return user - -def email(author): - '''get email of author.''' - r = author.find('>') - if r == -1: r = None - return author[author.find('<')+1:r] - -def ellipsis(text, maxlength=400): - """Trim string to at most maxlength (default: 400) characters.""" - if len(text) <= maxlength: - return text - else: - return "%s..." % (text[:maxlength-3]) - -def walkrepos(path, followsym=False, seen_dirs=None, recurse=False): - '''yield every hg repository under path, recursively.''' - def errhandler(err): - if err.filename == path: - raise err - if followsym and hasattr(os.path, 'samestat'): - def _add_dir_if_not_there(dirlst, dirname): - match = False - samestat = os.path.samestat - dirstat = os.stat(dirname) - for lstdirstat in dirlst: - if samestat(dirstat, lstdirstat): - match = True - break - if not match: - dirlst.append(dirstat) - return not match - else: - followsym = False - - if (seen_dirs is None) and followsym: - seen_dirs = [] - _add_dir_if_not_there(seen_dirs, path) - for root, dirs, files in os.walk(path, topdown=True, onerror=errhandler): - if '.hg' in dirs: - yield root # found a repository - qroot = os.path.join(root, '.hg', 'patches') - if os.path.isdir(os.path.join(qroot, '.hg')): - yield qroot # we have a patch queue repo here - if recurse: - # avoid recursing inside the .hg directory - dirs.remove('.hg') - else: - dirs[:] = [] # don't descend further - elif followsym: - newdirs = [] - for d in dirs: - fname = os.path.join(root, d) - if _add_dir_if_not_there(seen_dirs, fname): - if os.path.islink(fname): - for hgname in walkrepos(fname, True, seen_dirs): - yield hgname - else: - newdirs.append(d) - dirs[:] = newdirs - -_rcpath = None - -def os_rcpath(): - '''return default os-specific hgrc search path''' - path = system_rcpath() - path.extend(user_rcpath()) - path = [os.path.normpath(f) for f in path] - return path - -def rcpath(): - '''return hgrc search path. if env var HGRCPATH is set, use it. - for each item in path, if directory, use files ending in .rc, - else use item. - make HGRCPATH empty to only look in .hg/hgrc of current repo. - if no HGRCPATH, use default os-specific path.''' - global _rcpath - if _rcpath is None: - if 'HGRCPATH' in os.environ: - _rcpath = [] - for p in os.environ['HGRCPATH'].split(os.pathsep): - if not p: continue - if os.path.isdir(p): - for f, kind in osutil.listdir(p): - if f.endswith('.rc'): - _rcpath.append(os.path.join(p, f)) - else: - _rcpath.append(p) - else: - _rcpath = os_rcpath() - return _rcpath - -def bytecount(nbytes): - '''return byte count formatted as readable string, with units''' - - units = ( - (100, 1<<30, _('%.0f GB')), - (10, 1<<30, _('%.1f GB')), - (1, 1<<30, _('%.2f GB')), - (100, 1<<20, _('%.0f MB')), - (10, 1<<20, _('%.1f MB')), - (1, 1<<20, _('%.2f MB')), - (100, 1<<10, _('%.0f KB')), - (10, 1<<10, _('%.1f KB')), - (1, 1<<10, _('%.2f KB')), - (1, 1, _('%.0f bytes')), - ) - - for multiplier, divisor, format in units: - if nbytes >= divisor * multiplier: - return format % (nbytes / float(divisor)) - return units[-1][2] % nbytes - -def drop_scheme(scheme, path): - sc = scheme + ':' - if path.startswith(sc): - path = path[len(sc):] - if path.startswith('//'): - path = path[2:] - return path - -def uirepr(s): - # Avoid double backslash in Windows path repr() - return repr(s).replace('\\\\', '\\') - -def termwidth(): - if 'COLUMNS' in os.environ: - try: - return int(os.environ['COLUMNS']) - except ValueError: - pass - try: - import termios, array, fcntl - for dev in (sys.stdout, sys.stdin): - try: - try: - fd = dev.fileno() - except AttributeError: - continue - if not os.isatty(fd): - continue - arri = fcntl.ioctl(fd, termios.TIOCGWINSZ, '\0' * 8) - return array.array('h', arri)[1] - except ValueError: - pass - except ImportError: - pass - return 80 - -def wrap(line, hangindent, width=None): - if width is None: - width = termwidth() - 2 - padding = '\n' + ' ' * hangindent - return padding.join(textwrap.wrap(line, width=width - hangindent)) - -def iterlines(iterator): - for chunk in iterator: - for line in chunk.splitlines(): - yield line |