CERT

If you have flow data from another analysis platform and want to import it into SiLK format, you have two options: replay the flow data, or convert it with rwtuc.

Replaying flow data

Many flow collection tools have a flow "replay" capability (for example, the nfreplay command in the nfdump toolset). This is the best way to import data, because it essentially rebuilds the flow data and packs it into the SiLK repository.

The general process for replaying flow data is as follows:

  1. Install the SiLK packing infrastructure (if you haven't already). See the "Administration" tooltips, particularly the SiLK on a Box - Standalone Flow Collection & Analysis page.
  2. Configure a sensor to collect the replayed data. This is also described on the SiLK on a Box - Standalone Flow Collection & Analysis page in the section concerning sensors.conf.
  3. Start rwflowpack.
  4. Replay the flow data. Be sure to direct it to the IP address and port that you specified in sensors.conf.
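
The steps above might look like the following shell sketch. Everything specific in it is an assumption for illustration: the probe and sensor name S0, UDP port 9901, the 192.0.2.x addresses, the /data and /var/log/silk paths, and the nfdump capture file name. The rwflowpack and nfreplay commands are only printed, not executed, so you can review and adapt them to your installation first.

```shell
#!/bin/sh
# Sketch only: names, addresses, ports, and paths below are placeholders.
SENSOR_CONF="$PWD/sensors.conf.example"
LISTEN_PORT=9901          # assumed port; must match the port you replay to
REPLAY_FROM=192.0.2.10    # assumed address of the host running the replay
COLLECTOR=192.0.2.20      # assumed address of the host running rwflowpack

# Step 2: a minimal NetFlow v5 probe and sensor definition.
cat > "$SENSOR_CONF" <<EOF
probe S0 netflow-v5
    listen-on-port $LISTEN_PORT
    protocol udp
    accept-from-host $REPLAY_FROM
end probe

sensor S0
    netflow-v5-probes S0
    internal-ipblocks 192.168.0.0/16
    external-ipblocks remainder
end sensor
EOF

# Steps 3 and 4: print the commands rather than running them.
echo "rwflowpack --sensor-configuration=$SENSOR_CONF --site-config-file=/data/silk.conf --root-directory=/data --log-directory=/var/log/silk"
echo "nfreplay -H $COLLECTOR -p $LISTEN_PORT -r nfcapd.200810140000"
```

The port in the probe block and the port given to the replay tool come from the same variable here, which is the one detail step 4 asks you to keep consistent.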

Once you're done replaying the flow data, you should be able to query directly against the imported data in the repository using rwfilter selection criteria.
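
A quick way to confirm that the replayed data landed in the repository is to pull a few records back out. This is a minimal sketch, assuming the SiLK tools are on your PATH and your site configuration is already in place; the date is a placeholder for the period you replayed, and query_imported is a hypothetical helper name.

```shell
#!/bin/sh
# Sketch only: the date range is a placeholder for the period you replayed.
START=2008/10/14
END=2008/10/14

query_imported() {
    # Select every record of every type in the date range and show the first ten.
    rwfilter --start-date="$START" --end-date="$END" --type=all \
             --proto=0-255 --pass=stdout \
        | rwcut --num-recs=10 --fields=sip,dip,sport,dport,protocol,packets,bytes,stime
}

# Only run the query where the SiLK tools are installed.
if command -v rwfilter >/dev/null 2>&1; then
    query_imported
else
    echo "rwfilter not found; skipping query"
fi
```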

rwtuc Conversion

Although some flow analysis toolkits (including SiLK) have no method for replaying flows, they all support some type of text-based output. That text output can be fed to rwtuc, which creates the binary packed SiLK flow files.
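
To make the text input concrete, here is a minimal sketch of a delimited file that rwtuc can read: a header line naming the fields, followed by one record per line. The single record is made-up sample data and make_sample is a hypothetical helper name. When rwtuc is not installed, the script just prints the text.

```shell
#!/bin/sh
# Sketch only: the record below is made-up sample data.
make_sample() {
    # First line names the fields; each later line is one flow record.
    printf '%s\n' \
        'sip|dip|sport|dport|protocol|packets|bytes|stime|dur' \
        '192.0.2.1|198.51.100.7|49152|80|6|10|1500|2008/10/14T00:56:27|12'
}

if command -v rwtuc >/dev/null 2>&1; then
    # Convert to binary SiLK records, then print them back as text to check.
    make_sample | rwtuc \
        | rwcut --fields=sip,dip,sport,dport,protocol,packets,bytes,stime,dur
else
    make_sample
fi
```

Round-tripping through rwcut is a cheap check that every column was parsed the way you intended before you convert a large export.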

Each platform has its own quirks that must be handled; other tooltips on rwtuc, such as Crafting Records for Test Scripts using rwtuc, are worth reading. Here's an example of how to import Lancope flow data into SiLK format. (Lancope stores biflow data, and the tool also does some aggregation and summarization before storing it.)

wrapper.sh
#  This shell script outputs lancope data in $FILE into a text file,
#  and then calls a script file to convert the text data to rwtuc format.
#  The converted text data is saved in $FILE.rw.
#
./swflow -t flow.txt -f "$FILE" -s "all" > /dev/null 2>&1
./lancope-convert.pl < flow.txt | rwtuc --bad=bad.txt > "$FILE.rw"
rwfileinfo "$FILE.rw" | grep "records"
#
#  I know the conversion routine's not perfect, so let's watch for problems
#  and fix them as they come up.
if [ -s bad.txt ]
then
    echo "Bad records found; see bad.txt."
    exit 1
fi
lancope-convert.pl
#! /usr/bin/perl
#
#  Converts lancope output to silk-readable text
#
#  Generate data like:
#  ./swflow  -t tmp -f 10/14/data/flow08101405.log -s "all"
#
use strict;

my $line;
#  These are all the fields exported in the lancope data set:
my ($starttime,
    $flowduration,
    $clienthost,
    $clientzone,
    $clientcountry,
    $serverhost,
    $serverzone,
    $servercountry,
    $initiator,
    $active,
    $flowstatus,
    $profiledservice,
    $protocol,
    $flowservice,
    $clientpkts,
    $clientbytes,
    $clienttotalbytes,
    $serverpkts,
    $serverbytes,
    $servertotalbytes,
    $activeflows,
    $clientport,
    $serverport,
    $portmin,
    $portmax,
    $vlanid,
    $mplslabel,
    $totalpkts,
    $totalbytes,
    $totaltraffic,
    $avgkb,
    $clientsyn,
    $clientsynack,
    $clientrst,
    $clientfin,
    $clientbad,
    $clientfrag,
    $serversyn,
    $serversynack,
    $serverrst,
    $serverfin,
    $serverbad,
    $serverfrag,
    $clientcistart,
    $clientciend,
    $servercistart,
    $serverciend,
    $clientttl,
    $serverttl,
    $endlasttime,
    $clientmac,
    $servermac,
    $zsid,
    $zdid
    );
my (%flow);

#
#  Print out a header line; this makes it easy for rwtuc to
#  know what fields are imported into SiLK format.
#
$flow{"stime"}    = "stime";
$flow{"dur"}      = "dur";
$flow{"sip"}      = "sip";
$flow{"dip"}      = "dip";
$flow{"protocol"} = "protocol";
$flow{"pkts"}     = "pkts";
$flow{"bytes"}    = "bytes";
$flow{"sport"}    = "sport";
$flow{"dport"}    = "dport";
$flow{"flags"}    = "flags";
$flow{"class"}    = "class";
$flow{"type"}     = "type";
writeflow (%flow);
undef %flow;

#
#  Loop through all the data on STDIN
#
while (<>) {
    chomp;
    $line = $_;
    $line =~ s/\"//g;		# get rid of quote delimiters
    next if ($line =~ /^Start/); # first line
    #  Again, these are all the columns in the lancope data set
    ($starttime,
     $flowduration,
     $clienthost,
     $clientzone,
     $clientcountry,
     $serverhost,
     $serverzone,
     $servercountry,
     $initiator,
     $active,
     $flowstatus,
     $profiledservice,
     $protocol,
     $flowservice,
     $clientpkts,
     $clientbytes,
     $clienttotalbytes,
     $serverpkts,
     $serverbytes,
     $servertotalbytes,
     $activeflows,
     $clientport,
     $serverport,
     $portmin,
     $portmax,
     $vlanid,
     $mplslabel,
     $totalpkts,
     $totalbytes,
     $totaltraffic,
     $avgkb,
     $clientsyn,
     $clientsynack,
     $clientrst,
     $clientfin,
     $clientbad,
     $clientfrag,
     $serversyn,
     $serversynack,
     $serverrst,
     $serverfin,
     $serverbad,
     $serverfrag,
     $clientcistart,
     $clientciend,
     $servercistart,
     $serverciend,
     $clientttl,
     $serverttl,
     $endlasttime,
     $clientmac,
     $servermac,
     $zsid,
     $zdid) = split (/,/, $line);

    #  These are the conversion routines:  take the lancope data and
    #  turn it into SiLK format.  Some fields need converter helper functions.
    $flow{"stime"}    = reformattime($starttime)	;
    $flow{"dur"}      = $flowduration	;
    $flow{"sip"}      = $clienthost	;
    $flow{"dip"}      = $serverhost	;
    $flow{"protocol"} = decodeproto($protocol);
    $flow{"pkts"}     = $clientpkts	;
    $flow{"bytes"}    = $clientbytes	;
    $flow{"sport"}    = $clientport	;
    $flow{"dport"}    = $serverport	;
    $flow{"flags"}    = flaglist ($clientsyn, $clientsynack, $clientrst, $clientfin)	;
    $flow{"class"}    = "all";
    $flow{"type"}     = gettype ($flow{"sip"}, $flow{"dip"});
    #
    writeflow (%flow);		# client-side flow
    #
    #  Lancope records are bi-flows.  That means we normally write TWO uniflow records for
    # each biflow
    #
    $flow{"dip"}      = $clienthost	;
    $flow{"sip"}      = $serverhost	;
    $flow{"pkts"}     = $serverpkts	;
    $flow{"bytes"}    = $serverbytes	;
    $flow{"dport"}    = $clientport	;
    $flow{"sport"}    = $serverport	;
    $flow{"flags"}    = flaglist ($serversyn, $serversynack, $serverrst, $serverfin)	;
    $flow{"type"}     = gettype ($flow{"sip"}, $flow{"dip"});
    writeflow (%flow);
    #
    #  Make sure data isn't accidentally carried through to the next record
    undef %flow;
}

sub gettype {
    #
    #  This helper function returns the type for the two given IP addresses.
    #  You need to customize this to the types that are defined for your data set.
    #
    my ($sip, $dip) = @_;

    if (isinternal ($sip)) {
	if (isinternal ($dip)) {
	    return "int2int";
	} else {
	    return "out";
	}
    } else {
	if (isinternal ($dip)) {
	    return "in";
	} else {
	    return "ext2ext";
	}
    }
}


sub isinternal {
    #  Returns true if the passed-in address is on the internal network.
    #  You'll want to customize this for your network.
    #
    #    internal network (as tested below):
    #    138.254.0.0/16
    #
    my ($ip) = @_;
    my ($o1, $o2, $o3, $o4) = split(/\./, $ip);
    return (($o1 eq "138") && ($o2 eq "254"));
}

sub writeflow {
    my (%flow) = @_;
    #  prints out a properly formatted rwtuc-ready flow record.
    #
    return if (($flow{"pkts"} eq '0') || ($flow{"bytes"} eq '0'));
    
    $flow{"dur"} = "1048575" if ($flow{"dur"} > 1048575);
    $flow{"pkts"} = "4294967295" if ($flow{"pkts"} > 4294967295);
    $flow{"bytes"} = "4294967295" if ($flow{"bytes"} > 4294967295);

    print $flow{"stime"}    . "|";
    print $flow{"dur"}      . "|";
    print $flow{"sip"}      . "|";
    print $flow{"dip"}      . "|";
    print $flow{"protocol"} . "|";
    print $flow{"pkts"}     . "|";
    print $flow{"bytes"}    . "|";
    print $flow{"sport"}    . "|";
    print $flow{"dport"}    . "|";
    print $flow{"flags"}    . "|";
    print $flow{"class"}    . "|";
    print $flow{"type"}     . "|";
    print "\n";
}

sub reformattime {
    my ($intime) = @_;
    #  Converts times to SiLK-type format; e.g.,
    #  2008-10-14 00:56:27 to 2008/10/14T00:56:27
    #
    #$intime =~ tr/- /\/T/g;
    $intime =~ s/-/\//g;
    $intime =~ s/ /T/g;
    return $intime;

    #  Stricter alternative (disabled) that rejects unrecognized input:
    #if ($intime =~ /(\d\d\d\d)-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)/) {
    #    return "$1/$2/$3T$4:$5:$6";
    #} else {
    #    die "Unrecognized time format:  $intime";
    #}
}


sub flaglist {
    #  Convert
    #  syn, synack, rst, fin to SAFRPEUC
    my ($syn, $synack, $rst, $fin) = @_;
    my $ret = "";

    $ret .= "S" if (($syn > 0) || ($synack > 0));
    $ret .= "A" if ($synack > 0);
    $ret .= "R" if ($rst > 0);
    $ret .= "F" if ($fin > 0);

    return $ret;
}


sub decodeproto {
    #  Convert named protocols into numeric; e.g.,
    #  TCP / UDP to 6, 17
    my ($protoname) = @_;
    $protoname = lc ($protoname);

    return "6"   if ($protoname eq "tcp");
    return "17"  if ($protoname eq "udp");
    return "50"  if ($protoname eq "ip-sec");

    return "51"  if ($protoname eq "ah" 	);
    return "37"  if ($protoname eq "ddp" 	);
    return "8"   if ($protoname eq "egp" 	);
    return "88"  if ($protoname eq "eigrp" 	);
    return "50"  if ($protoname eq "esp" 	);
    return "133" if ($protoname eq "fc" 	);
    return "47"  if ($protoname eq "gre" 	);
    return "0"   if ($protoname eq "hopopt" 	);
    return "45"  if ($protoname eq "idrp" 	);
    return "2"   if ($protoname eq "igmp" 	);
    return "9"   if ($protoname eq "igp" 	);
    return "4"   if ($protoname eq "ipinip" 	);
    return "41"  if ($protoname eq "ipv6" 	);
    return "44"  if ($protoname eq "ipv6-frag" 	);
    return "58"  if ($protoname eq "ipv6-icmp" 	);
    return "59"  if ($protoname eq "ipv6-nonxt"	);
    return "60"  if ($protoname eq "ipv6-opts" 	);
    return "43"  if ($protoname eq "ipv6route" 	);
    return "124" if ($protoname eq "isis" 	);
    return "115" if ($protoname eq "l2tp" 	);
    return "137" if ($protoname eq "mplsinip" 	);
    return "89"  if ($protoname eq "ospfigp" 	);
    return "27"  if ($protoname eq "rdp" 	);
    return "46"  if ($protoname eq "rsvp" 	);
    return "42"  if ($protoname eq "sdrp" 	);
    return "83"  if ($protoname eq "vines" 	);
    return "112" if ($protoname eq "vrrp" 	);
    return "1"   if ($protoname eq "icmp" 	);
    return "255" if ($protoname eq "unprofiled");

    return $protoname if ($protoname =~ /^\d+$/);
    die "Unable to decipher protocol $protoname\n";

}
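
The core idea of the script above, splitting each biflow into two uniflow records with the endpoints swapped, can be shown in a few lines. This is a minimal sketch using awk from the shell, not a replacement for the Perl converter. It assumes a simplified, hypothetical six-column input (client, server, client port, server port, client packets, server packets) rather than the real Lancope export, and it omits fields such as bytes, start time, and flags that a usable record would also carry. The function name split_biflow is likewise made up for illustration.

```shell
#!/bin/sh
# Sketch only: the input layout is a simplified, hypothetical biflow format:
#   client,server,clientport,serverport,clientpkts,serverpkts
split_biflow() {
    awk -F, 'BEGIN { OFS = "|"; print "sip", "dip", "sport", "dport", "pkts" }
        {
            # Client-to-server direction; skip it if no packets were seen.
            if ($5 > 0) print $1, $2, $3, $4, $5
            # Server-to-client direction: swap addresses and ports.
            if ($6 > 0) print $2, $1, $4, $3, $6
        }'
}

printf '%s\n' '192.0.2.1,198.51.100.7,49152,80,10,8' \
              '192.0.2.2,198.51.100.7,49153,53,1,0' | split_biflow
```

The second sample line produces only one record because its server side saw no packets, mirroring the way writeflow drops empty directions.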