
Motivation

Suppose you have the following task: For all the SiLK flow records received on Feb 6, 2014, create eight files that approximate the following:

  1.   All HTTP traffic, http.dat
  2.   All HTTPS traffic, https.dat
  3.   All SSH traffic, ssh.dat
  4.   Any other TCP traffic, tcp.dat
  5.   All UDP-based DNS traffic, dns.dat
  6.   All DHCP traffic, dhcp.dat
  7.   Any other UDP traffic, udp.dat
  8.   Any traffic not captured above, other.dat

One way to approach the eight requests in this task is to run a separate rwfilter command for each output.  The commands to get the results for Requests 1-3 and 5-6 are straightforward.  The commands for Requests 4, 7, and 8 are also simple once you realize you just need to create lists of ports and protocols that omit those used in the other queries:

rwfilter ... --pass=http.dat  --proto=6  --aport=80
rwfilter ... --pass=https.dat --proto=6  --aport=443
rwfilter ... --pass=ssh.dat   --proto=6  --aport=22
rwfilter ... --pass=tcp.dat   --proto=6  --aport=0-21,23-79,81-442,444-
rwfilter ... --pass=dns.dat   --proto=17 --aport=53
rwfilter ... --pass=dhcp.dat  --proto=17 --aport=67,68
rwfilter ... --pass=udp.dat   --proto=17 --aport=0-52,54-66,69-
rwfilter ... --pass=other.dat --proto=0-5,7-16,18-

Here "..." represents the file selection criteria.  Since the task is for all traffic on Feb 6, 2014, replace the "..." with

--flowtype=all/all --start-date=2014/02/06

The file selection criteria are not pertinent to this discussion, so the sample code below will use "...".
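As an aside, the complementary port lists used for Requests 4 and 7 can be generated mechanically.  Here is a small, hypothetical Python helper (not part of SiLK) that computes them in the same style as the commands above, writing the final open-ended range as, for example, "444-":

```python
def complement_ports(exclude, lo=0, hi=65535):
    """Return an rwfilter-style port list covering lo-hi minus `exclude`."""
    # Build the list of (start, end) ranges between the excluded ports
    ranges = []
    start = lo
    for p in sorted(exclude):
        if p > start:
            ranges.append((start, p - 1))
        start = p + 1
    if start <= hi:
        ranges.append((start, hi))
    # Format each range; an open-ended final range is written as "N-"
    out = []
    for a, b in ranges:
        if a == b:
            out.append(str(a))
        elif b == hi:
            out.append("%d-" % a)
        else:
            out.append("%d-%d" % (a, b))
    return ",".join(out)

print(complement_ports({22, 80, 443}))  # 0-21,23-79,81-442,444-
print(complement_ports({53, 67, 68}))   # 0-52,54-66,69-
```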

For many sites, any incoming and outgoing TCP traffic on ports 80, 443, and 8080 will be written into the "inweb" and "outweb" types. The file selection criteria could be smarter and exclude the "in" and "out" types when looking for HTTP and HTTPS traffic.

The rwfilter commands assume that all traffic for a desired protocol occurs on that protocol's advertised port.  If your flow records were collected with yaf (http://tools.netsa.cert.org/yaf/) and the appLabel feature (http://tools.netsa.cert.org/yaf/applabel.html) was enabled, you could replace the "--proto" and "--aport" switches with the "--application" switch.
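For example, the first three commands might then look something like the following (a sketch; it assumes the default appLabel rules, which label HTTP, HTTPS, and SSH traffic with the values 80, 443, and 22):

```
rwfilter ... --application=80  --pass=http.dat
rwfilter ... --application=443 --pass=https.dat
rwfilter ... --application=22  --pass=ssh.dat
```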

You may realize that this is not very efficient, since each of those rwfilter commands is independently processing every record in the data repository.  If your data repository is small or if this is a one-time task, you and your system administrator may be willing to live with the inefficiency.

Manifold definition

The idea of an rwfilter "manifold" is to create many output files while only making one pass over the data in the file repository, which makes the task more efficient both in terms of resources and in the time it takes to get the results.

The rwfilter manifold uses a chain of rwfilter commands and employs both the --pass and --fail switches to create files along the chain of commands.

For example, here is a simple manifold that creates four output files---for TCP, UDP, ICMP, and OTHER protocols:

rwfilter ... --proto=6  --pass=tcp-all.dat  --fail=-                   \
  | rwfilter --proto=17 --pass=udp-all.dat  --fail=-             stdin \
  | rwfilter --proto=1  --pass=icmp-all.dat --fail=other-all.dat stdin

The first rwfilter command writes all TCP flow records into "tcp-all.dat".  Any non-TCP flow records are written to the standard output ("-").

The second rwfilter command reads the first rwfilter's standard output as its standard input---note the "stdin" at the end of the second line.  (When looking at existing uses of the manifold, instead of seeing a "stdin" argument you may see it expressed using the command line switch "--input-pipe=stdin".  The forms are equivalent, though note that the "--input-pipe" switch is deprecated.)  Any UDP flow records are written to the "udp-all.dat" file, and all non-UDP flows are written to the standard output.

The third rwfilter command reads the second's standard output.  The ICMP traffic is written to the file "icmp-all.dat", and all remaining traffic is written to "other-all.dat".

From within Python

To run a chain of rwfilter commands in Python, consider using the utilities available in the netsa.util.shell module (http://tools.netsa.cert.org/netsa-python/doc/netsa_util_shell.html) that is part of the netsa-python library (http://tools.netsa.cert.org/netsa-python/).

The rwfilter commands that comprise the manifold could be written using netsa-python as:

from netsa.util.shell import *
c1 = command("rwfilter ... --proto=6 --pass=tcp-all.dat --fail=-")
c2 = command("rwfilter --proto=17 --pass=udp-all.dat --fail=- stdin")
c3 = command("rwfilter --proto=1  --pass=icmp-all.dat"
             + " --fail=other-all.dat stdin")
run_parallel(pipeline(c1, c2, c3))

Writing the manifold

The rwfilter manifold is a powerful idea, and composing the rwfilter commands is fairly simple as long as you are pulling data out of the stream at every step.

To return to the task defined at the beginning of this document: Since the sets of records returned by each of the requests in the task do not overlap, we can get the results using a simple manifold.  Our manifold assumes that the data is sane---for example, we assume that no traffic goes from port 80 to port 22---and we use a "first-match wins" rule.

The easiest way to write the manifold is as a single chain of rwfilter commands, where each rwfilter command removes some of the records.  (This chain uses the command line argument of "-" to tell rwfilter to read from the standard input, and it is equivalent to the "stdin" command line argument used above.)

rwfilter ... --proto=6  --aport=80    --pass=http.dat  --fail=-           \
  | rwfilter --proto=6  --aport=443   --pass=https.dat --fail=-         - \
  | rwfilter --proto=6  --aport=22    --pass=ssh.dat   --fail=-         - \
  | rwfilter --proto=6                --pass=tcp.dat   --fail=-         - \
  | rwfilter --proto=17               --pass=-         --fail=other.dat - \
  | rwfilter            --aport=53    --pass=dns.dat   --fail=-         - \
  | rwfilter            --aport=67,68 --pass=dhcp.dat  --fail=udp.dat   -

The first four rwfilter commands create the files for Requests 1-4. The fourth rwfilter command does not need to specify a port list since the data for ports 22, 80, and 443 has already been removed.

Note that the fifth rwfilter command sends records that pass the filter to the standard output and writes records that fail the filter to a file.  This rwfilter command creates the file for Request 8.

The sixth rwfilter command handles Request 5.  The --proto switch is no longer required since we know all the flow records represent UDP traffic.

The seventh rwfilter command handles Requests 6 and 7.
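The pass/fail plumbing of this chain can be modeled in plain Python to see the "first-match wins" behavior.  This is a toy sketch: records are (protocol, sport, dport) tuples, not real SiLK records, and split() stands in for one rwfilter stage.

```python
def split(records, pred):
    """Model one rwfilter stage: return (pass_list, fail_list)."""
    return ([r for r in records if pred(r)],
            [r for r in records if not pred(r)])

# Toy records: (protocol, sport, dport)
records = [(6, 1234, 80), (6, 443, 9), (6, 22, 5), (6, 8080, 7),
           (17, 53, 9), (17, 68, 67), (17, 123, 123), (1, 0, 0)]

out = {}
stream = records
# Stages 1-4: peel off HTTP, HTTPS, SSH, then the remaining TCP
for name, pred in [("http",  lambda r: r[0] == 6 and 80 in r[1:]),
                   ("https", lambda r: r[0] == 6 and 443 in r[1:]),
                   ("ssh",   lambda r: r[0] == 6 and 22 in r[1:]),
                   ("tcp",   lambda r: r[0] == 6)]:
    out[name], stream = split(stream, pred)
# Stage 5: UDP passes downstream; everything else fails into "other"
stream, out["other"] = split(stream, lambda r: r[0] == 17)
# Stages 6-7: DNS, then DHCP versus the remaining UDP
out["dns"], stream = split(stream, lambda r: 53 in r[1:])
out["dhcp"], out["udp"] = split(stream, lambda r: 67 in r[1:] or 68 in r[1:])

print(sorted((k, len(v)) for k, v in out.items()))
```

Each toy record lands in exactly one bucket, mirroring how each flow record exits the manifold through exactly one file.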

The manifold in Python

To write that manifold using the netsa.util.shell module of the netsa-python library:

from netsa.util.shell import *
pl = ["rwfilter ... --proto=6 --aport=80  --pass=http.dat  --fail=-",
      "rwfilter     --proto=6 --aport=443 --pass=https.dat --fail=-         -",
      "rwfilter     --proto=6 --aport=22  --pass=ssh.dat   --fail=-         -",
      "rwfilter     --proto=6             --pass=tcp.dat   --fail=-         -",
      "rwfilter     --proto=17            --pass=-         --fail=other.dat -",
      "rwfilter     --aport=53            --pass=dns.dat   --fail=-         -",
      "rwfilter     --aport=67,68         --pass=dhcp.dat  --fail=udp.dat   -"]
run_parallel(pipeline(pl))

Instead of explicitly using the command() constructor as in the previous example, we hand a list of strings to the pipeline() constructor.

The manifold and named pipes

This single chain of rwfilter commands is straightforward, but there is still some inefficiency: The TCP check occurs in each of the first four rwfilter commands.  If the data set is small, you may not care about this inefficiency.

A more efficient approach is to split the TCP traffic into a separate chain of rwfilter commands.  This speeds the query in two ways:

  • The chain handling TCP traffic is no longer reading and writing the records for UDP and the other protocols.
  • The two chains can run in parallel.

To split the traffic (and run on it in parallel), you need to use a UNIX construct called a named pipe.  A named pipe (also known as a FIFO [first in, first out]) operates like a traditional UNIX pipe except that it is "named" by being represented in the file system.

To create a named pipe, use the "mkfifo" command and give the location in the file system where you want the FIFO created.

mkfifo /tmp/fifo1

Once you create a named pipe, you can almost treat it as a standard file by writing to it and reading from it.  However, a process that is writing to the named pipe will block (not complete) until there is a process that is reading the data.  Likewise, a process that is reading from the named pipe will block until another process writes its data to the named pipe.

Because of the potential for processes to block, one normally enters the command that reads from the named pipe first and creates it as a background process, and then one creates the process that writes to the named pipe.

For example, the shell command "ls | sort -r" prints the entries in the current directory in reverse order.  To do this using the named pipe "/tmp/fifo1", you use:

sort -r /tmp/fifo1 &
ls > /tmp/fifo1

Create the read process first (the process that would go after the "|" when using an unnamed-pipe), then create the write process (the process that would go before the "|").
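The same reader-first pattern can be demonstrated from Python.  The sketch below is self-contained (it uses a temporary FIFO rather than /tmp/fifo1): it starts a background reader thread, which blocks in open() until a writer attaches, then writes to the named pipe from the main thread.

```python
import os
import tempfile
import threading

tmpdir = tempfile.mkdtemp()
fifo = os.path.join(tmpdir, "fifo1")
os.mkfifo(fifo)

results = []

# Reader: started first and in the background, like "sort -r /tmp/fifo1 &".
# The open() call blocks until the writer opens the other end.
def reader():
    with open(fifo) as f:
        results.extend(sorted(f.read().split(), reverse=True))

t = threading.Thread(target=reader)
t.start()

# Writer: corresponds to "ls > /tmp/fifo1"; it blocks until the reader
# is consuming the data.
with open(fifo, "w") as f:
    f.write("alpha beta gamma")

t.join()
os.remove(fifo)
os.rmdir(tmpdir)
print(results)  # ['gamma', 'beta', 'alpha']
```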

Before we introduce the named pipe into the rwfilter manifold, let us determine the rwfilter commands we would use in the shell if we were using temporary files.

The rwfilter command to divide traffic into TCP and into non-TCP is

rwfilter ... --proto=6 --pass=all-tcp.dat --fail=non-tcp.dat

The output for Requests 1-4 can be created by using an rwfilter manifold where the first rwfilter command reads the "all-tcp.dat" file:

rwfilter     --aport=80  --pass=http.dat  --fail=-       all-tcp.dat \
  | rwfilter --aport=443 --pass=https.dat --fail=-       -           \
  | rwfilter --aport=22  --pass=ssh.dat   --fail=tcp.dat -

The rwfilter commands to create the files for Requests 5-8 are just like those that we used in our initial manifold solution, where the first rwfilter command reads the "non-tcp.dat" file:

rwfilter --proto=17        --pass=-        --fail=other.dat non-tcp.dat \
  | rwfilter --aport=53    --pass=dns.dat  --fail=-         -           \
  | rwfilter --aport=67,68 --pass=dhcp.dat --fail=udp.dat   -

You could invoke the three previous rwfilter commands using two named pipes---one for each of the two temporary files. Alternatively, you could use one named pipe and one standard (unnamed) pipe.
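For reference, the two-named-pipe form could be sketched as follows: both protocol-specific chains are backgrounded as readers, and the final rwfilter writes its pass output to one FIFO and its fail output to the other.

```
rm -f /tmp/fifo1 /tmp/fifo2
mkfifo /tmp/fifo1 /tmp/fifo2
(rwfilter    --aport=80    --pass=http.dat  --fail=-        /tmp/fifo1 \
  | rwfilter --aport=443   --pass=https.dat --fail=-        -          \
  | rwfilter --aport=22    --pass=ssh.dat   --fail=tcp.dat  - ) &
(rwfilter    --proto=17    --pass=-         --fail=other.dat /tmp/fifo2 \
  | rwfilter --aport=53    --pass=dns.dat   --fail=-         -          \
  | rwfilter --aport=67,68 --pass=dhcp.dat  --fail=udp.dat   - ) &
rwfilter ... --proto=6 --pass=/tmp/fifo1 --fail=/tmp/fifo2
```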

The example below uses a single named pipe to replace the "all-tcp.dat" file and an unnamed pipe in place of "non-tcp.dat".  It is the rwfilter manifold in the bash shell; note the use of the "( ... ) &" construct to background a series of commands.

rm -f /tmp/fifo1
mkfifo /tmp/fifo1
(rwfilter    --aport=80    --pass=http.dat   --fail=-       /tmp/fifo1 \
  | rwfilter --aport=443   --pass=https.dat  --fail=-       -          \
  | rwfilter --aport=22    --pass=ssh.dat    --fail=tcp.dat - ) &
rwfilter ... --proto=6     --pass=/tmp/fifo1 --fail=-           \
  | rwfilter --proto=17    --pass=-          --fail=other.dat - \
  | rwfilter --aport=53    --pass=dns.dat    --fail=-         - \
  | rwfilter --aport=67,68 --pass=dhcp.dat   --fail=udp.dat   -

Named pipes and Python

Once you begin to use named pipes in the rwfilter manifold, the advantage of the netsa.util.shell module in the netsa-python library over using the shell becomes apparent.

When you run your commands in the shell, you need to ensure that the commands that read from the named pipe(s) are created in the background before the commands that write to the named pipe(s). A second problem is error handling: When a process exits abnormally in the shell, the shell may kill the commands downstream of the failed process but other processes may hang indefinitely.

The run_parallel() command in netsa.util.shell handles these situations for you.  You do not need to be (as) concerned with the order of your commands, and it kills all your subprocesses when any command fails.

To create the manifold in netsa-python using a named pipe, you use:

import os
from netsa.util.shell import *
p1 = ["rwfilter --aport=80 --pass=http.dat --fail=- /tmp/fifo1",
      "rwfilter --aport=443 --pass=https.dat --fail=- -",
      "rwfilter --aport=22 --pass=ssh.dat --fail=tcp.dat -"]
p2 = ["rwfilter ... --proto=6 --pass=/tmp/fifo1 --fail=-",
      "rwfilter --proto=17 --pass=- --fail=other.dat -",
      "rwfilter --aport=53 --pass=dns.dat --fail=- -",
      "rwfilter --aport=67,68 --pass=dhcp.dat --fail=udp.dat -"]
# Remove any stale FIFO before recreating it
if os.path.exists("/tmp/fifo1"):
    os.unlink("/tmp/fifo1")
run_parallel("mkfifo /tmp/fifo1")
run_parallel(pipeline(p1), pipeline(p2))

An entirely different approach

Finally, as an alternative to the rwfilter manifold, you could use something like the Python script below, which uses the silk Python extension library (http://tools.netsa.cert.org/silk/pysilk.html).

This script reads SiLK flow records and splits them into files based on the protocols and ports.  The script accepts one or more files on the command line or it reads flow records on its standard input.

The Python code in this script will be slower than the manifold solutions presented above, and---depending on your site's configuration---it may even be slower than making multiple passes over the data.  The script has the advantage that it makes only a single pass over the data, and it is easy enough to modify.

Note the example in the file's comments of using a tuple file to whittle the data before sending it to the script.  Doing this feeds the Python script only the data you are actually going to process and store.

Another option to reduce the amount of data the script processes is to use a simple manifold to split the data into TCP, UDP, and OTHER data files, and then create modified copies of this script that operate on a single protocol.

#!/usr/bin/env python
#
#  Read SiLK Flow records and split into multiple files depending on
#  the protocol and ports that a record uses.
#
#  Invoke as
#
#    split-flows.py  YEAR MONTH DAY  FILE [FILE...]
#
#  or to read from stdin:
#
#    split-flows.py  YEAR MONTH DAY
#
#  Code assumes the incoming data is for a single day.
#
#  Records are split into multiple files, where the file name's
#  prefixes are specified in the 'file' dictionary.  For example,
#  output files are named 'tcp-80-YEAR-MONTH-DAY.dat',
#  'udp-53-YEAR-MONTH-DAY.dat' for TCP traffic on port 80 and UDP
#  traffic on port 53, respectively.
#
#  The splitting logic is hard-coded in the main processing loop.
#
#  Any TCP traffic that is not matched goes into a file named
#  tcp-other-YEAR-MONTH-DAY.dat.  Any UDP traffic that is not
#  matched goes into a file named udp-other-YEAR-MONTH-DAY.dat.  Any
#  other unmatched traffic goes into a file named
#  other-YEAR-MONTH-DAY.dat.
#
#  If you do not care about the leftover data (that is, you do not
#  want any of the "other" files), you can reduce the amount of
#  traffic this script gets by filtering the data using a tuple
#  file.  For example, store the following (remove the leading '#')
#  into the text file /tmp/tuples.txt
#
#  proto | sport
#      6 | 80,443,22
#     17 | 53,67,68
#
#  Invoke rwfilter and pipe the result to this script as:
#
#  rwfilter --start-date=2011/12/13               \
#           --types=in,out,inweb,outweb           \
#           --proto=6,17                          \
#           --tuple-file=/tmp/tuples.txt          \
#           --tuple-direction=both                \
#           --pass=stdout
#  | python split-flows.py 2011 12 13
#
#  (The reason for the --proto=6,17 switch (which duplicates some of
#  the effort) is to reduce the number of records that we have to
#  search for in the red-black tree that the tuple-file creates.)
#
#  Ideas for expansion:
#    * Use the "manifold" (chained rwfilter commands) to split the
#      data into the protocols first, then create two versions of this
#      script: one for TCP and one for UDP.
#        rwfilter ... --proto=6 --pass=tcp-all.dat --fail=-         \
#          | rwfilter --proto=17 --pass=udp-all.dat --fail=other.dat
#    * Change the code instead of hard-coding the file prefixes and
#      the logic that splits flows.  For example, use lambda
#      functions, nested dictionaries, ...
#    * Have this script invoke rwfilter for you
#    * Have the script determine the date by looking at the start time
#      of the first record it sees.
#
#

# Use print functions (Compatible with Python 3.0; Requires 2.6+)
from __future__ import print_function
# Import the PySiLK bindings
from silk import *
# Import sys for the command line arguments.
import sys

# Where to write output files.  CUSTOMIZE THIS.
output_dir = "/tmp"

# Files that will be created.  CUSTOMIZE THIS.  The key is the file
# name's prefix.  The value will be the SilkFile object once the file
# has been opened.  Currently logic to do the splitting is hard-coded.
file = {'http'      : None,
        'https'     : None,
        'ssh'       : None,
        'tcp-other' : None,
        'dns'       : None,
        'dhcp'      : None,
        'udp-other' : None,
        'other'     : None,
        }

# Main function
def main():
    # Get the date from the command line
    if len(sys.argv) < 4:
        print ("Usage: %s year month day [infile1 [infile2...]]" % sys.argv[0])
        sys.exit(1)
    year = sys.argv[1]
    month = sys.argv[2]
    day = sys.argv[3]
    infile = None

    # Open the first file for reading
    arg_index = 4
    if len(sys.argv) == arg_index:
        infile = silkfile_open('-', READ)
    else:
        infile = silkfile_open(sys.argv[arg_index], READ)
        arg_index += 1

    # Open the output files
    for k in file.keys():
        name = "%s/%s-%s-%s-%s.dat" % (output_dir, k, year, month, day)
        file[k] = silkfile_open(name, WRITE)

    # Loop over the input files
    while infile is not None:
        # Loop over the records in this input file
        for rec in infile:
            # Split the record into a single file.  CUSTOMIZE THIS.
            # First match wins.
            if rec.protocol == 6:
                if (rec.sport == 80 or rec.dport == 80):
                    file['http'].write(rec)
                elif (rec.sport == 443 or rec.dport == 443):
                    file['https'].write(rec)
                elif (rec.sport == 22 or rec.dport == 22):
                    file['ssh'].write(rec)
                else:
                    file['tcp-other'].write(rec)
            elif rec.protocol == 17:
                if (rec.sport == 53 or rec.dport == 53):
                    file['dns'].write(rec)
                elif (rec.sport in [67,68] or rec.dport in [67,68]):
                    file['dhcp'].write(rec)
                else:
                    file['udp-other'].write(rec)
            else:
                file['other'].write(rec)

        # Move to the next file on the command line
        if arg_index == len(sys.argv):
            infile.close()
            infile = None
        else:
            infile.close()
            try:
                infile = silkfile_open(sys.argv[arg_index], READ)
                arg_index += 1
            except IOError:
                print("Error: unable to open file %s" % sys.argv[arg_index])
                infile = None

    # Close output files
    for k in file.keys():
        try:
            file[k].close()
        except IOError:
            print("OOPS!  Error closing file for key %s" % k)

# Call the main() function when this program is started
if __name__ == '__main__':
    main()