Testing subprocesses

When using the subprocess module there are two approaches to testing:

  • Have your tests exercise the real processes being instantiated and used.

  • Mock out use of the subprocess module and provide expected output, while recording interactions with the module to make sure they are as expected.

While the first of these should be preferred, it means that you need to have all the external software available everywhere you wish to run tests. Your tests will also need to make sure any dependencies of that software on an external environment are met. If that external software takes a long time to run, your tests will also take a long time to run.
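As a minimal sketch of the first approach, the test below launches a real process. It assumes the Python interpreter itself (sys.executable) as the external program, since that is the one executable certain to be present wherever the tests run. The names run_child and TestRealProcess are hypothetical and used only for illustration.

```python
import sys
from subprocess import Popen, PIPE
from unittest import TestCase


def run_child(code):
    # Hypothetical helper: run a Python snippet in a real child process
    # and return whatever it wrote to stdout.
    process = Popen([sys.executable, '-c', code], stdout=PIPE, stderr=PIPE)
    out, err = process.communicate()
    if process.returncode:
        raise RuntimeError('something bad happened')
    return out


class TestRealProcess(TestCase):

    def test_output(self):
        # strip() removes the platform-specific line ending added by print
        self.assertEqual(run_child("print('o')").strip(), b'o')

    def test_bad_returncode(self):
        with self.assertRaises(RuntimeError):
            run_child("import sys; sys.exit(1)")
```

Even this small test starts two interpreters, which hints at the cost of the approach once the external software is slower or harder to install.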

These challenges often make the second approach the more pragmatic one, provided the mock accurately simulates the behaviour of a subprocess. MockPopen is an attempt to provide just such a mock.

Warning

Previous versions of this mock made use of mock_calls. Although mock_calls appears adequate, it cannot record some of the information that is important when using this mock, so please switch to making assertions about all_calls and calls instead.

Example usage

As an example, suppose you have code such as the following that you need to test:

from subprocess import Popen, PIPE


def my_func():
    process = Popen(['svn', 'ls', '-R', 'foo'], stdout=PIPE, stderr=PIPE)
    out, err = process.communicate()
    if process.returncode:
        raise RuntimeError('something bad happened')
    return out

Tests that exercise this code using MockPopen could be written as follows:

from subprocess import PIPE
from unittest import TestCase

from testfixtures.mock import call
from testfixtures import Replacer, ShouldRaise, compare, SequenceComparison
from testfixtures.popen import MockPopen, PopenBehaviour


class TestMyFunc(TestCase):

    def setUp(self):
        self.Popen = MockPopen()
        self.r = Replacer()
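        # dotted_path is the dotted import path of the Popen name used by
        # the code under test; it is not defined in this snippet.
        self.r.replace(dotted_path, self.Popen)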
        self.addCleanup(self.r.restore)

    def test_example(self):
        # set up
        self.Popen.set_command('svn ls -R foo', stdout=b'o', stderr=b'e')

        # testing of results
        compare(my_func(), b'o')

        # testing calls were in the right order and with the correct parameters:
        process = call.Popen(['svn', 'ls', '-R', 'foo'], stderr=PIPE, stdout=PIPE)
        compare(self.Popen.all_calls, expected=[
            process,
            process.communicate()
        ])

    def test_example_bad_returncode(self):
        # set up
        self.Popen.set_command('svn ls -R foo', stdout=b'o', stderr=b'e',
                               returncode=1)

        # testing of error
        with ShouldRaise(RuntimeError('something bad happened')):
            my_func()

Passing input to processes

If the code under test passes input to the subprocess, you can check the input passed to the communicate() method when you check the calls on the mock, as shown in this example:

def test_communicate_with_input(self):
    # setup
    Popen = MockPopen()
    Popen.set_command('a command')
    # usage
    process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
    out, err = process.communicate('foo')
    # test call list
    compare(Popen.all_calls, expected=[
        process.root_call,
        process.root_call.communicate('foo'),
    ])

Note

Accessing .stdin isn’t currently supported by this mock.
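For context, here is a sketch of the kind of code such a test targets: a function that feeds input to a child process through communicate(). The function name shout is hypothetical, and the child is assumed to be the Python interpreter (sys.executable) so that the sketch needs no external software.

```python
import sys
from subprocess import Popen, PIPE


def shout(data):
    # Hypothetical function under test: sends `data` (bytes) to the child
    # on stdin and returns what the child writes to stdout.
    child = 'import sys; sys.stdout.write(sys.stdin.read().upper())'
    process = Popen([sys.executable, '-c', child],
                    stdin=PIPE, stdout=PIPE, stderr=PIPE)
    out, err = process.communicate(data)
    return out
```

When Popen is replaced by a mock, asserting on the communicate(...) entry in the recorded calls verifies that the expected input was sent.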

Reading from stdout and stderr

The stdout and stderr attributes of the mock returned by MockPopen will be file-like objects as with the real Popen and can be read as shown in this example:

def test_read_from_stdout_and_stderr(self):
    # setup
    Popen = MockPopen()
    Popen.set_command('a command', stdout=b'foo', stderr=b'bar')
    # usage
    process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
    compare(process.stdout.read(), expected=b'foo')
    compare(process.stderr.read(), expected=b'bar')

Warning

While these streams behave a lot like the streams of a real Popen object, they do not exhibit the deadlocking behaviour that can occur when the two streams are read as in the example above. Be very careful when reading stdout and stderr and consider using communicate() instead.
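The safer pattern against a real process can be sketched as follows. The child is assumed to be the Python interpreter (sys.executable) writing a few bytes to each stream; with such small outputs no deadlock would occur either way, so this only illustrates the shape of the call.

```python
import sys
from subprocess import Popen, PIPE

# Child that writes to both streams.
child = "import sys; sys.stdout.write('foo'); sys.stderr.write('bar')"

process = Popen([sys.executable, '-c', child], stdout=PIPE, stderr=PIPE)
# communicate() drains stdout and stderr together, so neither pipe can
# fill up and block the child while the parent waits on the other one.
out, err = process.communicate()
```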

Writing to stdin

If you set stdin=PIPE in your call to Popen then the stdin attribute of the mock returned by MockPopen will be a mock and you can then examine the write calls to it as shown in this example:

def test_write_to_stdin(self):
    # setup
    Popen = MockPopen()
    Popen.set_command('a command')
    # usage
    process = Popen('a command', stdin=PIPE, shell=True)
    process.stdin.write('some text')
    process.stdin.close()
    # test call list
    compare(Popen.all_calls, expected=[
        process.root_call,
        process.root_call.stdin.write('some text'),
        process.root_call.stdin.close(),
    ])

Specifying the return code

Often code will need to behave differently depending on the return code of the launched process. Specifying a simulated return code, along with testing for the correct usage of wait(), can be seen in the following example:

def test_wait_and_return_code(self):
    # setup
    Popen = MockPopen()
    Popen.set_command('a command', returncode=3)
    # usage
    process = Popen('a command')
    compare(process.returncode, expected=None)
    # result checking
    compare(process.wait(), expected=3)
    compare(process.returncode, expected=3)
    # test call list
    compare(Popen.all_calls, expected=[
        call.Popen('a command'),
        call.Popen('a command').wait(),
    ])
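For comparison, the same sequence against a real process is sketched below. The child is assumed to be the Python interpreter (sys.executable) exiting with status 3. As with the mock, returncode is None until wait() has been called.

```python
import sys
from subprocess import Popen

process = Popen([sys.executable, '-c', 'import sys; sys.exit(3)'])
# returncode stays None until wait(), poll() or communicate() has
# observed that the child exited.
before = process.returncode
after = process.wait()
```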

Checking for signal sending

Calls to send_signal(), terminate() and kill() are all recorded by the mock returned by MockPopen. However, other than being recorded, these calls do nothing. The following example doesn’t make sense for a real test of subprocess usage but does show how the mock behaves:

def test_send_signal(self):
    # setup
    Popen = MockPopen()
    Popen.set_command('a command')
    # usage
    process = Popen('a command', stdout=PIPE, stderr=PIPE, shell=True)
    process.send_signal(0)
    # result checking
    compare(Popen.all_calls, expected=[
        process.root_call,
        process.root_call.send_signal(0),
    ])

Polling a process

The poll() method is often used in a loop in order to do other work while waiting for a subprocess to complete. The mock returned by MockPopen supports this through the poll_count parameter, which controls how many times poll() must be called before the returncode is set, as shown in the following example:

def test_poll_until_result(self):
    # setup
    Popen = MockPopen()
    Popen.set_command('a command', returncode=3, poll_count=2)
    # example usage
    process = Popen('a command')
    while process.poll() is None:
        # you'd probably have a sleep here, or go off and
        # do some other work.
        pass
    # result checking
    compare(process.returncode, expected=3)
    compare(Popen.all_calls, expected=[
        process.root_call,
        process.root_call.poll(),
        process.root_call.poll(),
        process.root_call.poll(),
    ])
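The loop being exercised above might live in a helper such as the following sketch. The name wait_for and its interval parameter are hypothetical. It relies only on poll() and returncode, so in principle it can be handed either a real process or one created by a mock.

```python
import sys
import time
from subprocess import Popen


def wait_for(process, interval=0.05):
    # Hypothetical helper: poll until the process reports a return code,
    # counting how many polls found it still running.
    polls = 0
    while process.poll() is None:
        polls += 1
        time.sleep(interval)
    return process.returncode, polls


# Real child, assumed to be the Python interpreter exiting with status 3.
process = Popen([sys.executable, '-c', 'import sys; sys.exit(3)'])
returncode, polls = wait_for(process)
```

With a real process the number of polls depends on timing, which is exactly what a fixed poll_count removes from a test.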

Different behaviour on sequential processes

If your code needs to call the same command but have different behaviour on each call, then you can pass a callable behaviour like this:

def test_multiple_responses(self):
    # set up
    behaviours = [
        PopenBehaviour(stderr=b'e', returncode=1),
        PopenBehaviour(stdout=b'o'),
    ]

    def behaviour(command, stdin):
        return behaviours.pop(0)

    self.Popen.set_command('svn ls -R foo', behaviour=behaviour)

    # testing of error:
    with ShouldRaise(RuntimeError('something bad happened')):
        my_func()
    # testing of second call:
    compare(my_func(), b'o')

If you need to keep state across calls, such as accumulating stdin or failing for a configurable number of calls, then wrap that behaviour up into a class:

class CustomBehaviour(object):

    def __init__(self, fail_count=1):
        self.fail_count = fail_count

    def __call__(self, command, stdin):
        if self.fail_count > 0:
            self.fail_count -= 1
            return PopenBehaviour(stderr=b'e', returncode=1)
        return PopenBehaviour(stdout=b'o')

This can then be used like this:

def test_count_down(self):
    # set up
    self.Popen.set_command('svn ls -R foo', behaviour=CustomBehaviour())
    # testing of error:
    with ShouldRaise(RuntimeError('something bad happened')):
        my_func()
    # testing of second call:
    compare(my_func(), b'o')

Using default behaviour

If you’re testing something that needs to make many calls to many different commands that all behave the same, it can be tedious to specify the behaviour of each with set_command. For this case, MockPopen has the set_default method which can be used to set the behaviour of any command that has not been specified with set_command as shown in the following example:

def test_default_behaviour(self):
    # set up
    self.Popen.set_default(stdout=b'o', stderr=b'e')

    # testing of results
    compare(my_func(), b'o')

    # testing calls were in the right order and with the correct parameters:
    root_call = call.Popen(['svn', 'ls', '-R', 'foo'],
                           stderr=PIPE, stdout=PIPE)
    compare(self.Popen.all_calls, expected=[
        root_call,
        root_call.communicate()
    ])

Tracking multiple simultaneous processes

Conversely, if you’re testing something that spins up multiple subprocesses and manages their simultaneous execution, you will want to explicitly define the behaviour of each process using set_command and then make assertions about each process using all_calls.

For example, suppose we wanted to test this function:

def process_in_batches(n):
    processes = []
    for i in range(n):
        processes.append(Popen('process --batch='+str(i),
                               stdout=PIPE, stderr=PIPE, shell=True))
    total = 0
    for process in processes:
        out, err = process.communicate()
        total += int(out)
    return total

Then you could test it as follows:

def test_multiple_processes(self):
    # set up
    self.Popen.set_command('process --batch=0', stdout=b'42')
    self.Popen.set_command('process --batch=1', stdout=b'13')

    # testing of results
    compare(process_in_batches(2), expected=55)

    # testing of process management:
    p1 = call.Popen('process --batch=0', shell=True, stderr=PIPE, stdout=PIPE)
    p2 = call.Popen('process --batch=1', shell=True, stderr=PIPE, stdout=PIPE)
    compare(self.Popen.all_calls, expected=[
        p1,
        p2,
        p1.communicate(),
        p2.communicate(),
    ])

Note that the order of all calls is explicitly recorded. If the order of these calls is non-deterministic due to your method of process management, you may wish to use a SequenceComparison:

def test_multiple_processes_unordered(self):
    # set up
    self.Popen.set_command('process --batch=0', stdout=b'42')
    self.Popen.set_command('process --batch=1', stdout=b'13')

    # testing of results
    compare(process_in_batches(2), expected=55)

    # testing of process management:
    p1 = call.Popen('process --batch=0', shell=True, stderr=PIPE, stdout=PIPE)
    p2 = call.Popen('process --batch=1', shell=True, stderr=PIPE, stdout=PIPE)
    compare(self.Popen.all_calls, expected=SequenceComparison(
        p2,
        p2.communicate(),
        p1,
        p1.communicate(),
        ordered=False
    ))