netsa.util.shell — Robust Shell Pipelines

Overview

The netsa.util.shell module provides a facility for securely and efficiently running UNIX command pipelines from Python. To avoid text substitution attacks, it does not actually use the UNIX shell to process commands. In addition, it runs commands directly in a way that allows easier clean-up in the case of errors.

Standard Python library facilities such as os.system and the subprocess module provide similar capabilities, but without either sufficient text substitution protections or sufficient error-checking and recovery mechanisms.

Here are some examples, in increasing order of complexity, of the use of the run_parallel and run_collect functions:

Run a single process and wait for it to complete:

# Shell: rm -rf /tmp/test
run_parallel("rm -rf /tmp/test")

Start two processes and wait for both to complete:

# Shell: rm -rf /tmp/test1 & rm -rf /tmp/test2 & wait
run_parallel("rm -rf /tmp/test_dir_1",
             "rm -rf /tmp/test_dir_2")

Store the output of a command into a file:

# Shell: echo test > /tmp/testout
run_parallel(["echo test", ">/tmp/testout"])

Read the input of a command from a file (and put the output into another file):

# Shell: cat < /tmp/test > /tmp/testout
run_parallel(["</tmp/test", "cat", ">/tmp/testout"])

Append the output of a command to a file:

# Shell: echo test >> /tmp/testout
run_parallel(["echo test", ">>/tmp/testout"])

Pipe the output of one command into another command (and put the output into a file):

# Shell: echo test | sed 's/e/f/' > /tmp/testout
run_parallel(["echo test", "sed 's/e/f/'", ">/tmp/testout"])

Run two pipelines in parallel and wait for both to complete:

# Shell:
#    echo test | sed 's/e/f/' > /tmp/testout &
#    cat /etc/passwd | cut -f1 -d'|' > /tmp/testout2 &
#    wait
run_parallel(["echo test", "sed 's/e/f/'", ">/tmp/testout"],
             ["cat /etc/passwd", "cut -f1 -d'|'", ">/tmp/testout2"])

Run a single pipeline and collect the output and error output in the variables foo and foo_err:

# Shell: foo=`cat /etc/passwd | cut -f1 -d'|'`
(foo, foo_err) = run_collect("cat /etc/passwd", "cut -f1 -d'|'")

The following examples are more complicated, and require the use of the long forms of command and pipeline specifications. (All of the examples above have used the short-hand forms.) You should read the documentation for command and pipeline to see how the long forms and short-hand forms are related.

Run a pipeline, collect standard output of the pipeline to one file, and append standard error from all of the commands to another file:

# Shell: ( gen-data | cut -f1 -d'|' > /tmp/testout ) 2>> /tmp/testlog
run_parallel(pipeline("gen-data", "cut -f1 -d'|'", ">/tmp/testout",
                      stderr="/tmp/testlog", stderr_append=True))

Run a pipeline, collect standard output of the pipeline to one file, and collect standard error from one command to another file:

# Shell: ( gen-data 2> /tmp/testlog ) | cut -f1 -d'|' > /tmp/testout
run_parallel([command("gen-data", stderr="/tmp/testlog"),
              "cut -f1 -d'|'", ">/tmp/testout"])

Run a pipeline, collect standard output of the pipeline to a file, and ignore the potentially non-zero exit status of the gen-data command:

# Shell: (gen-data || true) | cut -f1 -d'|' > /tmp/testout
run_parallel([command("gen-data", ignore_exit_status=True),
              "cut -f1 -d'|'", ">/tmp/testout"])

Use long pipelines to process data using multiple named pipes:

# Shell:
#   mkfifo /tmp/fifo1
#   mkfifo /tmp/fifo2
#   tee /tmp/fifo1 < /etc/passwd | cut -f1 -d'|' | sort > /tmp/out1 &
#   tee /tmp/fifo2 < /tmp/fifo1 | cut -f2 -d'|' | sort > /tmp/out2 &
#   cut -f3 -d'|' < /tmp/fifo2 | sort | uniq -c > /tmp/out3 &
#   wait
run_parallel("mkfifo /tmp/fifo1",
             "mkfifo /tmp/fifo2")
run_parallel(
    ["</etc/passwd", "tee /tmp/fifo1", "cut -f1 -d'|'", ">/tmp/out1"],
    ["</tmp/fifo1", "tee /tmp/fifo2", "cut -f2 -d'|'", ">/tmp/out2"],
    ["</tmp/fifo2", "cut -f3 -d'|'", "sort", "uniq -c", ">/tmp/out3"])

Exceptions

exception netsa.util.shell.PipelineException(message, exit_statuses)

This exception represents a failure to process a pipeline in either run_parallel or run_collect. It can be triggered by the failure of any of the commands being run by the function (either because the executable could not be found or because the command's exit status was unacceptable.) The message contains a summary of the status of all of the sub-commands at the time the problem was discovered, including stderr output for each sub-command if available.
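For example, a caller might catch the exception to report that summary before recovering or re-raising (a sketch; gen-data is a placeholder command):

import sys

from netsa.util.shell import run_parallel, PipelineException

try:
    run_parallel(["gen-data", "sort", ">/tmp/testout"])
except PipelineException as ex:
    # str(ex) includes the per-command status summary described above
    sys.stderr.write("pipeline failed:\n%s\n" % str(ex))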

Building Commands and Pipelines

netsa.util.shell.command(<command spec>[, stderr : str or file, stderr_append=False, ignore_exit_status=False, ignore_exit_statuses : int seq]) → command

Interprets the arguments as a “command specification”, and returns that specification as a value.

If there is only a single argument and it is a command, then a new command is returned with the options provided by this call. For example:

new_command = command(old_command, ignore_exit_status=True)

If there is only a single argument and it is a str, the string is parsed as if it were a simple shell command. (i.e. respecting single and double quotation marks, backslashes, etc.) For example:

# Shell: ls /etc
new_command = command("ls /etc")

If there is only a single argument and it is a list or a tuple, interpret it as being the argument vector for the command (with the first argument being the command to be executed.) For example:

# Shell: ls /etc
new_command = command(["ls", "/etc"])

If there are multiple arguments, each argument is taken as being one element of the argument vector, with the first being the command to be executed. For example:

# Shell: ls /etc
new_command = command("ls", "/etc")

The following keyword arguments may be given as options to a command specification:

stderr
Filename (str) or open file object of destination for stderr.
stderr_append
True if stderr should be opened for append. Does nothing if stderr is already an open file.
ignore_exit_status
If True, then the exit status for this command is completely ignored.
ignore_exit_statuses
A list of numeric exit statuses that should not be considered errors when they are encountered.

In addition, these options may be “handed down” from the pipeline call, or from run_parallel or run_collect. If so, then options given locally to the command take precedence.
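For instance, a stderr setting given directly to a command overrides a stderr default handed down from the enclosing pipeline (a sketch reusing the gen-data placeholder):

# gen-data's stderr goes to /dev/null; every other command's stderr is
# appended to /tmp/testlog, the default handed down from the pipeline.
run_parallel(pipeline(command("gen-data", stderr="/dev/null"),
                      "cut -f1 -d'|'", ">/tmp/testout",
                      stderr="/tmp/testlog", stderr_append=True))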

Example: Define a command spec using a single string:

# Shell: ls -lR /tmp/foo
c = command("ls -lR /tmp/foo")

Example: Define a command as the same as an old command with different options:

d = command(c, ignore_exit_status=True)

Example: Define a command using a list of strings:

# Shell: ls -lR /tmp/foo
e = command(["ls", "-lR", "/tmp/foo"])

Example: Define a command using individual string arguments:

# Shell: ls -lR /tmp/foo
f = command("ls", "-lR", "/tmp/foo")

Short-hand Form:

In the pipeline, run_parallel, and run_collect functions, commands may be given in a short-hand form where convenient. The short-hand form of a command is a single string. Here are some examples:

"ls -lR"              =>  command(["ls", "-lR"])
"echo test test a b"  =>  command(["echo", "test", "test", "a", "b"])
"echo 'test test' a"  =>  command(["echo", "test test", "a"])
"'weird program'"     =>  command(["weird program"])

There is no way to associate options with a short-hand command. If you wish to redirect error output or ignore exit statuses, you will need to use the long form.

Variable Expansion:

When commands are executed, variable expansion is performed. The expansions are provided by the argument vars to run_parallel or run_collect. Note that commands are split into arguments before this expansion occurs, which is a security measure. This means that no matter what whitespace or punctuation is in an expansion, it can’t change the sense of the command. The down side of that is that on occasions when you would like to add multiple arguments to a command, you must construct the command using the list syntax.

Expansion variable references are placed using Python string formatting operations (for example, %(name)s).

Here is an example substitution, showing how %(target)s becomes a single argument before the substitution occurs.

("ls -lR %(target)s", vars={'target': 'bl ah"'}) =>
("ls", "-lR", "%(target)s", vars={'target': 'bl ah"'}) =>
("ls", "-lR", 'bl ah "')

If the value to be substituted implements the method get_argument_list, which takes no arguments and returns a list of strings, then those strings are included as multiple separate arguments. This is an expert technique for extending commands at call-time for use internal to APIs.

("ls -lR %(targets)s", vars={'targets': special_container}) =>
("ls", "-lR", "target1", "target2", ...)

Functions as Commands:

In addition to executable programs, Python functions may also be used as commands. This is useful if you wish to do processing of data in a sub-process as part of a pipeline without needing to have auxiliary Python script files. However, this is an advanced technique and you should fully understand the subtleties before making use of it.

When a Python function is used as a command, the process will fork as normal in preparation for executing a new command. However, instead of exec-ing a new executable, the Python function is called. When the Python function completes (either successfully or unsuccessfully), the child process exits immediately.

If you intend to use this feature, be sure that you know how the lifecycles of various objects will behave when the Python interpreter is forked and two copies are running at once.

When used this way, the Python function is called with vars (as given to run_parallel or run_collect) as its first argument, and with the remainder of the argv from the command specification as its remaining arguments.
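Based on this calling convention, a function might be used roughly as follows (a sketch; the assumption here is that the function is supplied to command in place of the executable name):

import sys

def number_lines(vars, prefix):
    # Runs in the forked child: read stdin, write stdout, then exit.
    for (num, line) in enumerate(sys.stdin):
        sys.stdout.write("%s %d: %s" % (prefix, num, line))

# Hypothetical usage: number_lines takes the place of an executable and
# would be called as number_lines(vars, "entry") in the child process.
(out, err) = run_collect("cat /etc/passwd", command(number_lines, "entry"))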

netsa.util.shell.pipeline(<pipeline spec>[, stdin : str or file, stdout : str or file, stdout_append=False, ...]) → pipeline

Interprets the arguments as a “pipeline specification”, and returns that specification as a value.

If there is only a single argument and it is a pipeline, then a new pipeline is returned with the options provided by this call. For example:

new_pipeline = pipeline(old_pipeline, stdout="/tmp/newfile")

If there is only a single argument and it is a list or a tuple, interpret it as being a list of commands and I/O redirection short-hands to run in the pipeline. For example:

# Shell: ls /etc | sort -r
new_pipeline = pipeline(["ls /etc", "sort -r"])

If there are multiple arguments, these arguments are treated as a list of commands and I/O redirection short-hands (as if they were passed as a single list.) For example:

# Shell: ls /etc | sort -r
new_pipeline = pipeline("ls /etc", "sort -r")

The following keyword arguments may be given as options to a pipeline specification:

stdin
Filename (str) or open file object of source for stdin.
stdout
Filename (str) or open file object of destination for stdout.
stdout_append
True if stdout should be opened for append. Does nothing if stdout is already an open file.

Because these options are so common, they may also be given in short-hand form. If the first command in the pipeline is a string starting with <, the remainder of the string is interpreted as a filename for stdin. If the last command in the pipeline is a string starting with > or >>, the remainder of the string is interpreted as a filename for stdout (and if >> was used, it is opened for append.)

In addition, any unrecognized keyword arguments will be provided as defaults for any command specifications used in this pipeline. (So, for example, if you give the ignore_exit_status option to pipeline, all of the commands in that pipeline will use the same value of ignore_exit_status unless they have their own overriding setting.)
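For example, an option such as ignore_exit_status can be given once at the pipeline level and inherited by every command in it (a sketch reusing the gen-data placeholder):

# Neither gen-data nor cut can fail the pipeline via its exit status.
run_parallel(pipeline("gen-data", "cut -f1 -d'|'", ">/tmp/testout",
                      ignore_exit_status=True))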

Example: Define a pipeline using a list of commands:

# Shell: ls -lR /tmp/foo | sort > /tmp/testout
a = pipeline(command("ls -lR /tmp/foo"),
             command("sort"),
             stdout="/tmp/testout")

Example: Define the same pipeline using the short-hand form of commands, and the shorthand method of setting stdout:

# Shell: ls -lR /tmp/foo | sort > /tmp/testout
b = pipeline("ls -lR /tmp/foo",
             "sort",
             ">/tmp/testout")

Example: Define the same pipeline using a list instead of multiple arguments:

# Shell: ls -lR /tmp/foo | sort > /tmp/testout
c = pipeline(["ls -lR /tmp/foo",
              "sort",
              ">/tmp/testout"])

Example: Define a new pipeline which is the same as an old pipeline but with different options:

d = pipeline(c, stdout="/tmp/newout")

Short-hand Form:

In the run_parallel function, pipelines may be given in a short-hand form where convenient. The short-hand form of a pipeline is a list of commands and I/O redirection short-hands. Here are some examples:

# Shell: ls /tmp/die | xargs rm
["ls /tmp/die", "xargs rm"]  =>  pipeline(["ls /tmp/die", "xargs rm"])
# Shell: sort < /tmp/testin > /tmp/testsort
["</tmp/testin", "sort", ">/tmp/testsort"]  =>
         pipeline(["sort"], stdin="/tmp/testin", stdout="/tmp/testsort")

Note that although you can set stdin, stdout, and stdout_append using the short-hand form (by using the I/O redirection strings at the start and end of the list), you cannot set these options to open file objects, only to filenames. You also cannot set other options to be passed down to the individual commands.
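When an already-open file object is needed, the long form must be used instead (a sketch; the files are opened here only for illustration):

# Shell: sort < /tmp/testin >> /tmp/testsort
in_file = open("/tmp/testin", "r")
out_file = open("/tmp/testsort", "a")
run_parallel(pipeline("sort", stdin=in_file, stdout=out_file))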

Variable Expansion:

As in command, pipelines have variable expansion. Most variable expansion happens inside the actual commands in the pipeline. However, variable expansion also occurs in filenames provided for the stdin and stdout options. For example:

# Shell: ls -lR > $output_file
pipeline("ls -lR", ">%(output_file)s")
pipeline("ls -lR", stdout="%(output_file)s")

Running Pipelines

netsa.util.shell.run_parallel(<pipeline spec>, ...[, vars : dict, ...])

Runs a series of commands (as specified by the arguments provided) by forking and establishing pipes between commands. Raises PipelineException and kills off all remaining subprocesses if any one command fails.

Each argument is passed to the pipeline function to create a new pipeline, which allows the short-hand form of pipelines (as list short-hands) to be used.

The following keyword arguments may be given as options to run_parallel:

vars
A dictionary of variable substitutions to make in the command and pipeline specifications in this run_parallel call.

Additional keyword arguments will be passed down as default values to the pipeline and command specifications making up this run_parallel call.

The run_parallel function returns the list of exit codes of the processes in each pipeline as a list of lists. Each list corresponds to a pipeline, in the order in which they were passed into the function. Each element represents a process in the pipeline, in the order they were defined in the pipeline. If a process is not run (e.g., because a process preceding it in the pipeline fails), the exit status will be None.

Example: Run three mkdirs in parallel and fail if any of them fails:

# Shell: mkdir a & mkdir b & mkdir c & wait
run_parallel("mkdir a", "mkdir b", "mkdir c")

Example: Make a fifo, then afterwards, use it to do some work. (Try making a typo in here and watch it kill everything off instead of hanging forever.)

# Shell: mkfifo test.fifo
run_parallel("mkfifo test.fifo")
# Shell:
#   cat /etc/passwd | sort -r | cut -f1 -d: > $f &
#   cat $f | sed -e 's/a/b/g' > $f2 &
#   wait
run_parallel(["cat /etc/passwd", "sort -r", "cut -f1 -d:", ">%(f)s"],
             ["cat %(f)s", "sed -e 's/a/b/g'", ">%(f2)s"],
             vars={'f': 'test.fifo', 'f2': 'test.txt'})

Example: run two pipelines in parallel, then investigate their processes’ exit statuses:

# Shell:
#   ls -l | grep ^d &
#   cat /etc/passwd | sort -r | cut -f1 -d: &
#   wait
exits = run_parallel(["ls -l", "grep ^d"],
                     ["cat /etc/passwd", "sort -r", "cut -f1 -d:"])
# If all complete successfully, exits will be:
#  [[0, 0], [0, 0, 0]]

netsa.util.shell.run_collect(<command spec>, ...[, vars : dict, ...]) → str, str

Runs a series of commands specifying a single pipeline by forking and establishing pipes between commands. The standard output of the final command and the standard error output of all of the commands are collected and returned as the tuple (stdout, stderr).

Raises PipelineException and kills off all remaining subprocesses if any one command fails.

The arguments are passed as arguments to a single call of the pipeline function to create a pipeline specification. That is: each argument is a command specification. Note that this is not the same as run_parallel, which interprets its arguments as multiple pipeline specifications.

You can also redirect stderr independently for each command if needed, allowing you to send some stderr to /dev/null or another destination instead of collecting it.
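For example, one command's error output can be discarded while the rest is still collected (a sketch; noisy-tool is a placeholder name):

# noisy-tool's stderr is discarded; sort's stderr is still collected in err.
(out, err) = run_collect(command("noisy-tool", stderr="/dev/null"),
                         "sort -r")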

Example: Reverse sort the output of ls -1 and store the output and error in the variables a_stdout and a_stderr:

# Shell: ls -1 | sort -r
(a_stdout, a_stderr) = run_collect("ls -1", "sort -r")

Example: Do the same as the above, but run ls -1 on a named directory instead of the current working directory:

# Shell: ls -1 $dir | sort -r
(b_stdout, b_stderr) = run_collect("ls -1 %(dir)s", "sort -r",
                                   vars={'dir': 'some_directory'})

Example: The following does not collect output, but instead writes it to a file. If there were any error output, it would be returned in the variable c_stderr:

# Shell: ls -1 | sort -r > test.out
(empty_stdout, c_stderr) = run_collect("ls -1", "sort -r", ">test.out")

netsa.util.shell.run_collect_files(<command spec>, ...[, vars : dict, ...]) → file, file

Runs a series of commands like run_collect, but returns open file objects for stdout and stderr instead of strings.

Example: Iterate over the lines of ls -l | sort -r and print them out with line numbers:

# Shell: ls -l | sort -r
(f_stdout, f_stderr) = run_collect_files("ls -l", "sort -r")
for (line_no, line) in enumerate(f_stdout):
    print ("%3d %s" % (line_no, line[:-1]))