# popenerr.py (source listing from the "aap" package)

# This is a copy of the popen2.py module of Python 1.5.2.
# It has been modified to pass stdin as file descriptor 2 instead of 0.
# This means the child process initially uses the existing stdin.  This is
# required for "su" to work on Linux, where it must be a tty.
# After passing "su" the executed command is expected to connect stdin to file
# descriptor 2.
#
# Some things that are not obvious:
# - Two different kinds of handle are involved:
#   1. file descriptors, the integer numbers that Unix uses
#   2. Python file objects, which wrap a file descriptor
# - The returned file objects must be closed, otherwise a stale handle causes
#   trouble for the next call.  It's not clear why.
# - One descriptor is used for both directions.  Closing it for one direction
#   also closes it for the other direction.  But there are two file objects
#   for it, and both must be closed.  The error raised when closing the
#   second one needs to be caught and ignored.
# - On some systems a bidirectional pipe is not possible.  stderr will then not
#   be connected.

import os
import sys

# Upper bound (exclusive) on the file descriptor numbers that the child
# process closes before exec.  256 is used when os.sysconf is not available
# or does not know 'SC_OPEN_MAX'.
MAXFD = 256
try:
    MAXFD = os.sysconf('SC_OPEN_MAX')
except (AttributeError, ValueError):
    pass

# Popen3 instances whose child process has not been reaped yet.
_active = []

def _cleanup():
    """Poll every instance that is still listed in _active."""
    # Walk a snapshot, because poll() removes an instance from _active
    # as soon as its child turns out to have finished.
    for child in list(_active):
        child.poll()

class Popen3:
    """Class representing a child process.  Normally instances are created
    by the factory function popen3().

    Unlike the standard popen2 module, the child keeps the file descriptor 0
    it inherited; the pipe written through 'tochild' is attached to the
    child's file descriptor 2 and the pipe read through 'fromchild' to its
    file descriptor 1.

    Attributes:
      pid        process id of the child
      tochild    file object for writing to the child (its descriptor 2)
      fromchild  file object for reading what the child writes to descriptor 1
      childerr   file object for reading from the same descriptor 'tochild'
                 uses, or None when that descriptor cannot be opened for
                 reading
      sts        -1 until the child has been reaped, then the status value
                 returned by os.waitpid()
    """

    def __init__(self, cmd, capturestderr=0, bufsize=-1):
        """The parameter 'cmd' is the shell command to execute in a
        sub-process; a string is run through '/bin/sh -c', any other value
        is passed to os.execvp() as the argument list.  The 'capturestderr'
        flag is accepted for compatibility but not used, capturing stderr
        separately is disabled.  If the 'bufsize' parameter is specified, it
        specifies the size of the I/O buffers to/from the child process."""
        _cleanup()
        if isinstance(cmd, str):
            cmd = ['/bin/sh', '-c', cmd]
        p2cread, p2cwrite = os.pipe()
        c2pread, c2pwrite = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child.  The "finally" makes sure the child never continues in
            # the caller's code, whether the setup or the exec fails.
            try:
                # Descriptor 0 is deliberately left alone; the pipe from the
                # parent takes the place of descriptor 2.
                os.close(2)
                n = os.dup(p2cread)
                if n != 2:
                    sys.stderr.write('popen2: bad read dup: %d\n' % n)
                os.close(1)
                n = os.dup(c2pwrite)
                if n != 1:
                    sys.stderr.write('popen2: bad write dup: %d\n' % n)
                # Close everything else; most numbers are not open at all.
                for i in range(3, MAXFD):
                    try:
                        os.close(i)
                    except OSError:
                        pass
                os.execvp(cmd[0], cmd)
            finally:
                os._exit(1)

        # Parent
        os.close(p2cread)
        self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
        os.close(c2pwrite)
        self.fromchild = os.fdopen(c2pread, 'r', bufsize)
        try:
            # Second file object on the descriptor used by 'tochild', for
            # reading what the child writes to its descriptor 2.
            self.childerr = os.fdopen(p2cwrite, 'r', bufsize)
        except (OSError, IOError, ValueError):
            # On some systems sockets are not bidirectional
            self.childerr = None

        self.sts = -1 # Child not completed yet
        _active.append(self)

    def poll(self):
        """Return the exit status of the child process if it has finished,
        or -1 if it hasn't finished yet.  An os.error raised by os.waitpid()
        is ignored and leaves the status unchanged."""
        if self.sts < 0:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self.sts = sts
                    _active.remove(self)
            except os.error:
                pass
        return self.sts

    def wait(self):
        """Wait for and return the exit status of the child process.  When
        the child was already reaped by an earlier poll() or wait() the
        stored status is returned right away."""
        if self.sts < 0:
            pid, sts = os.waitpid(self.pid, 0)
            if pid == self.pid:
                self.sts = sts
                _active.remove(self)
        # Always return the status, also when no waiting was needed.
        return self.sts

def popen3(cmd, bufsize=-1):
    """Execute 'cmd' in a sub-process with Popen3 and return the file
    objects (fromchild, tochild, childerr) of the new instance.  'bufsize'
    is passed on to Popen3.  The last item is None when Popen3 could not
    open it."""
    _cleanup()
    child = Popen3(cmd, capturestderr=1, bufsize=bufsize)
    return (child.fromchild, child.tochild, child.childerr)

def _test():
    """Self-test: run three 'cat' commands through popen3() and check what
    comes back on the returned file objects.  Expects /etc/group to exist."""
    teststr = "abc\n"

    def close_quietly(fobj):
        # The third file object shares its descriptor with the write side
        # (or is None), so any failure to close it is ignored.
        try:
            fobj.close()
        except:
            pass

    print("testing popen3: reading from stdout...")
    from_child, to_child, child_err = popen3('cat <&2')
    to_child.write(teststr)
    to_child.close()
    assert from_child.read() == teststr
    from_child.close()
    close_quietly(child_err)

    print("testing popen3: reading from stderr...")
    from_child, to_child, child_err = popen3('cat /etc/group >&2')
    assert child_err.read() == open("/etc/group").read()
    to_child.close()
    from_child.close()
    close_quietly(child_err)

    print("testing popen3: reading an error message...")
    from_child, to_child, child_err = popen3('cat -abcdefghi')
    err = child_err.read()
    print("Between the ----- lines should be an error message from 'cat':")
    print("-----\n%s-----" % err)
    assert err != ''
    to_child.close()
    from_child.close()
    close_quietly(child_err)

    # All children should have been reaped by now.
    _cleanup()
    assert not _active
    print("All OK")

# Run the self-test when this file is executed as a script.
if __name__ == '__main__':
    _test()

# Generated by Doxygen 1.6.0