Managing an IPFIX repository

Mothra can process loose IPFIX or SiLK data files, but it can also query an organized repository of IPFIX data. You can use the included mothra-packer tool and its companion tools to load IPFIX data into a Mothra repository, and then query that repository using Mothra with Spark.
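
For example, once data has been packed into a repository, you can read it from a Spark session through the IPFIX datasource. The following is a minimal sketch; the import and the spark.read.ipfix call are assumptions about the datasource API, and the repository path is illustrative. Consult the Mothra datasource documentation for the exact usage.


import org.apache.spark.sql.SparkSession

// Assumption: the Mothra IPFIX datasource package adds an ipfix(...)
// method to spark.read.
import org.cert.netsa.mothra.datasources._

val spark = SparkSession.builder().appName("mothra-query").getOrCreate()

// Read packed IPFIX records from the repository (path is illustrative).
val flows = spark.read.ipfix("hdfs:///path/to/repository")
flows.show()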

Loading data using mothra-packer and mothra-invariantpacker

The Mothra packing tools watch an incoming directory and pack files from that directory into a repository directory. In normal operation, this means reading from a local directory and writing to a directory in HDFS.

mothra-packer

The packer process watches the incoming directory for new files whose names do not begin with a dot (.) character. This matches the convention of SiLK's rwreceiver tool, which prefixes a file's name with a dot while writing it, and then renames the file to remove the dot once all data has been written. The packer uses the same technique when writing files into the repository.
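
The effect of this convention is that a partially written file is never visible under its final name. The following minimal Scala sketch illustrates the idea using java.nio.file; it is illustrative only, not Mothra's actual implementation:


import java.nio.file.{Files, Path, StandardCopyOption}

// Write data under a dot-prefixed temporary name, then rename it into
// place so that directory watchers only ever see complete files.
def writeAtomically(dir: Path, name: String, data: Array[Byte]): Path = {
  val temp = dir.resolve("." + name)   // ignored by the watcher
  Files.write(temp, data)
  Files.move(temp, dir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
}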

By default, mothra-packer runs continuously as a foreground process. It logs messages via Apache Log4j 1.2. See Configuring Log4j logging for the Mothra tools for more details. If you do not otherwise configure Log4j, the default configuration will log warnings and errors to the console.

mothra-packer and some other tools use configurable packing logic, which is described below under Configuring Mothra packing logic.

See the mothra-packer(1) manual page for more details.

mothra-invariantpacker

The invariant packer watches an incoming directory for new files produced by super_mediator in "invariant mode". In this mode, the incoming files are already pre-partitioned, and mothra-invariantpacker can process them in bulk into their target locations more efficiently.

The same packing configuration is used for mothra-invariantpacker as for mothra-packer and mothra-repacker, but the packing logic must be a PartitionerPackLogic or a PartitionerConfigurator, not any other kind of packing logic. See Configuring Mothra packing logic for more details.

See the mothra-invariantpacker(1) manual page for more details.

Reorganizing data using Mothra utilities

In addition to mothra-packer, which loads data, several other tools alter the contents of a Mothra IPFIX repository.

mothra-filejoiner

The packing process may produce several files for each partition in the Mothra repository. This tool reduces the overall number of files in the repository by combining the files within a given partition. If the repository contains too many small files, reducing their number can improve performance.

See the mothra-filejoiner(1) manual page for more details.

mothra-filesanitizer

Sometimes Information Elements are collected that take up too much space or are not meant to be retained. This tool processes files and rewrites them with one or more Information Elements purged completely.

See the mothra-filesanitizer(1) manual page for more details.

mothra-repacker

This tool repacks files that were previously packed into a Mothra IPFIX repository, which allows changing the partitioning and compression used in the repository.

Just like mothra-packer, the repacker needs a packing logic configuration to determine where files should be placed. See Configuring Mothra packing logic for more details.

Repacking requires bringing what is potentially a very large amount of data back from the cluster to a work directory on the local filesystem, so be careful when using it on large data repositories.

See the mothra-repacker(1) manual page for more details.

mothra-rollupday

To further reduce the number of files to be processed, this tool combines smaller files into daily files, reducing the overall number of files and partitions. After processing, other partitioning remains the same, but within those partitions all records for each day are stored together.

See the mothra-rollupday(1) manual page for more details.

Configuring Log4j logging for the Mothra tools

The Mothra tools log via Apache Log4j 1.2 (the version used by Apache Hadoop), which may be configured by creating a configuration file and providing that file using environment variables or command-line arguments.

On the command line of the tool scripts, you can provide the option -J-Dlog4j.configuration=file:/path/to/log4j.properties, giving a URL for the configuration file. (The same mechanism may be used to pass other options to the JVM.) You may also specify this property in the JAVA_OPTS environment variable by including -Dlog4j.configuration=file:/path/to/log4j.properties in the variable's value. Many tools, including the Mothra tool scripts, pass $JAVA_OPTS to the JVM on startup.
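
For example, either of the following invocations (with the tool's other arguments elided) would point mothra-packer at a custom Log4j configuration:


mothra-packer -J-Dlog4j.configuration=file:/path/to/log4j.properties ...

export JAVA_OPTS="-Dlog4j.configuration=file:/path/to/log4j.properties"
mothra-packer ...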

The following example file (which is the same as the default file used by the Mothra tool scripts) can act as a base for your own changes when debugging:


# Default to WARN to keep chatter down, and log to the CONSOLE appender.
# See below for enabling other log levels for specific packages.

log4j.rootLogger=WARN,CONSOLE

# CONSOLE is set to be a ConsoleAppender.

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.conversionPattern=%d [%t] %-5p %c - %m%n

# For debugging purposes, set logging higher in some areas, like:

#log4j.logger.org.cert.netsa.mothra.packer = TRACE
#log4j.logger.org.cert.netsa = INFO
#log4j.logger.org.cert.netsa.io.ipfix = TRACE

# The default log levels are:
#      0 OFF   - Don't show any messages
#    200 ERROR - Errors that allow the application to continue
#    300 WARN  - Potentially harmful situations
#    400 INFO  - Informational high-level progress messages
#    500 DEBUG - Coarse debugging output
#    600 TRACE - Fine grained call tracing
# MAXINT ALL   - Show all messages

Some other sample appenders which could be used:


# SYSLOG is set to log via sending messages to a local syslogd

log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
log4j.appender.SYSLOG.syslogHost=127.0.0.1
log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
log4j.appender.SYSLOG.layout.conversionPattern=%d{ISO8601} %-5p [%t] %c{2} %x - %m%n
log4j.appender.SYSLOG.Facility=LOCAL1

# FILE is set up to log via a rolling log file

log4j.appender.FILE=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.File=.../example.log
log4j.appender.FILE.MaxFileSize=100KB
# Keep a single backup file after rollover
log4j.appender.FILE.MaxBackupIndex=1
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.conversionPattern=%d [%t] %-5p %c - %m%n
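
If time-based rotation is preferred over size-based rotation, Log4j 1.2's standard DailyRollingFileAppender could be used instead. This is a sketch; the file path is illustrative:


# DAILYFILE is set up to log to a file which rolls over daily

log4j.appender.DAILYFILE=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DAILYFILE.File=/path/to/example.log
log4j.appender.DAILYFILE.DatePattern='.'yyyy-MM-dd
log4j.appender.DAILYFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.DAILYFILE.layout.conversionPattern=%d [%t] %-5p %c - %m%n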

You can find more details on the log4j.properties configuration file in the Log4j 1.2 manual.

Configuring Mothra packing logic

mothra-packer, mothra-invariantpacker, and mothra-repacker make use of a configuration file that defines how records are to be packed. This configuration is defined as Scala code.

The configuration file's source code is interpreted to produce a value of type PartitionerConfigurator. This value provides a sequence of Partitioner objects which define the partitioning schemes to use at each successive level of the directory hierarchy. Effectively, the data is partitioned based on all of these schemes together, and the partitioning is encoded in a standard way which is interpreted by the IPFIX datasource when querying the repository.


// A simple partition configuration to split records into partitions based
// on the value of the vlanId Information Element. This configuration is
// set up to keep notional "departments" together.

import org.cert.netsa.mothra.packer.{Partitioner, PartitionerConfigurator}

new PartitionerConfigurator {
  override def partitioners: Seq[Partitioner] = {
    // VLAN IDs of various offices
    val it = Seq(3, 4, 5, 6)
    val hr = Seq(1, 8, 11)
    val eng = Seq(9, 101)

    val vlanPartitioner = Partitioner.UInt16("vlanId")

    // Add one partition for each office
    vlanPartitioner.addPartitionEqualsAny(it)
    vlanPartitioner.addPartitionEqualsAny(hr)
    vlanPartitioner.addPartitionEqualsAny(eng)

    // Add partitions to cover other non-null vlanId values
    vlanPartitioner.coverRanges()

    // Besides time, we're only partitioning on vlanId
    Seq(vlanPartitioner)
  }
}
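
Note the call to coverRanges() near the end: after the office-specific partitions are defined, it adds partitions covering all remaining non-null vlanId values, so that every record has a partition to land in.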

Another example:


// A simple partition configuration to split records into partitions based
// first on the protocol and then by the source port of the record.

import org.cert.netsa.mothra.packer.{Partitioner, PartitionerConfigurator}

new PartitionerConfigurator {
  override def partitioners: Seq[Partitioner] = {

    // Partitions for protocolIdentifier values
    val protoPartitioner = Partitioner.UInt8("protocolIdentifier")

    // We want to partition these specific values each alone
    protoPartitioner.addPartitionEquals(1)
    protoPartitioner.addPartitionEquals(6)
    protoPartitioner.addPartitionEquals(17)
    protoPartitioner.addPartitionEquals(50)
    protoPartitioner.addPartitionEquals(58)

    // And then fill in the gaps for everything else
    protoPartitioner.coverRanges()

    // Partitions for TCP/UDP/etc. source port values
    val sPortPartitioner = Partitioner.UInt16("sourceTransportPort")

    // We want to partition each of these specific values
    sPortPartitioner.addPartitionEquals(22)
    sPortPartitioner.addPartitionEquals(25)
    sPortPartitioner.addPartitionEquals(53)
    sPortPartitioner.addPartitionEquals(80)
    sPortPartitioner.addPartitionEquals(443)
    // And the entire range from 443 < x <= 1023 is one partition
    sPortPartitioner.addPartitionRange(443, false, 1023, true)
    // And 8000 is also one specific value
    sPortPartitioner.addPartitionEquals(8000)
    // And then fill in the gaps for all other non-null values
    sPortPartitioner.coverRanges()

    // And as a result, partition first by protocol and then within
    // protocol by source port.
    Seq(protoPartitioner, sPortPartitioner)
  }
}
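
Because protoPartitioner comes first in the returned sequence, the protocol partitions form the upper level of the directory hierarchy, and records within each protocol partition are further partitioned by source port.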