Mothra can easily process loose IPFIX or SiLK data files, but it can also query an organized repository of IPFIX information. You can use the included mothra-packer tool and its companion tools to load IPFIX data into a Mothra repository, and then use Mothra with Spark to query it.
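For example, once data has been packed into a repository, it might be queried from spark-shell roughly like this. This is a minimal sketch: the repository path is hypothetical, and the ipfix reader method is provided by the Mothra datasources import as described in the Mothra analysis documentation, so check the exact API for your version.
// In spark-shell, where `spark` is predefined. The repository path below
// is hypothetical; the ipfix reader comes from the Mothra datasources
// package.
import org.apache.spark.sql.functions.col
import org.cert.netsa.mothra.datasources._

val flows = spark.read.ipfix("hdfs:///mothra/repository")
// One simple query: count TCP records with source port 22. The repository's
// partition layout (see "Mothra packing logic configuration" below) is
// interpreted by the datasource when reading.
flows.filter(col("protocolIdentifier") === 6 &&
             col("sourceTransportPort") === 22).count()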
mothra-packer and mothra-invariantpacker
The Mothra packing tools are used to watch an incoming directory and pack files from that directory into a repository directory. For normal operations, this would be reading from a local directory and writing to a directory in HDFS.
mothra-packer
The packer process watches the incoming directory for new files whose names do not begin with a dot (.) character. This matches the behavior of SiLK's rwreceiver tool, which prefixes the names of files with a dot while writing, and then renames them to remove the dot once all data has been written. The packer uses the same technique while writing files into the repository.
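For illustration, a delivery process might implement the same convention as follows. This is just a sketch using java.nio.file, not a Mothra API, and all paths and file names in it are hypothetical.
// Sketch: deliver a file into the packer's incoming directory by writing
// it under a dot-prefixed name (which the packer ignores) and renaming it
// into place once it is complete.
import java.nio.file.{Files, Paths, StandardCopyOption}

object DeliverToIncoming {
  def main(args: Array[String]): Unit = {
    val incoming = Paths.get("/var/mothra/incoming")    // hypothetical path
    val hidden   = incoming.resolve(".flows.ipfix")     // ignored while dotted
    Files.copy(Paths.get("/tmp/flows.ipfix"), hidden)   // write the whole file
    // Drop the dot prefix; the packer will now pick the file up.
    Files.move(hidden, incoming.resolve("flows.ipfix"),
      StandardCopyOption.ATOMIC_MOVE)
  }
}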
By default, mothra-packer
runs continuously as a
foreground process. It logs messages via Apache Log4j 1.2. See
Configuring Log4j logging for the Mothra
tools for more details. If you do not otherwise configure
Log4j, the default configuration will log warnings and errors to
the console.
mothra-packer
and some other tools use configurable
packing logic, which is described below under Mothra packing logic configuration.
See the
mothra-packer(1)
manual page for more details.
mothra-invariantpacker
The invariant packer watches an incoming directory for new files
produced by super_mediator
in "invariant mode". In
this mode, the incoming files are already pre-partitioned, and
mothra-invariantpacker
can more efficiently process
them in bulk into their target locations.
The same packing configuration is used for mothra-invariantpacker as for mothra-packer and mothra-repacker, but the packing logic must be a PartitionerPackLogic or a PartitionerConfigurator, not any other kind of packing logic. See Mothra packing logic configuration for more details.
See the
mothra-invariantpacker(1)
manual page for more
details.
In addition to mothra-packer, which loads data, there are several other tools for altering the contents of a Mothra IPFIX repository.
mothra-filejoiner
The packing process may result in several files being produced for each partition in the Mothra repository. This tool reduces the overall number of files in the repository by combining the files within a given partition, which can improve query performance when a repository has accumulated many small files.
See the
mothra-filejoiner(1)
manual page for more
details.
mothra-filesanitizer
Sometimes, Information Elements are collected that take up too much space or that are not meant to be retained. This tool processes files and rewrites them with one or more Information Elements purged completely.
See the
mothra-filesanitizer(1)
manual page for more
details.
mothra-repacker
This tool is used to repack files which have been previously packed into a Mothra IPFIX repository. This allows changing partitioning and compression in the repository.
Just like mothra-packer, the repacker needs a packing logic configuration to determine where files should be placed. See Mothra packing logic configuration for more details.
Repacking requires bringing what is potentially a very large amount of data back from the cluster to a work directory on the local filesystem, so be careful when using it on large data repositories.
See the
mothra-repacker(1)
manual page for more
details.
mothra-rollupday
This tool combines smaller files into daily files, further reducing the overall number of files and partitions. After processing, other partitioning remains the same, but within those partitions all records for each day are stored together.
See the
mothra-rollupday(1)
manual page for more
details.
Configuring Log4j logging for the Mothra tools
The Mothra tools log via Apache Log4j 1.2 (the version used by Apache Hadoop), which may be configured by creating a configuration file and providing that file using environment variables or command-line arguments.
On the command line of the tool scripts, you can pass the option -J-Dlog4j.configuration=file:/path/to/log4j.properties, which provides a URL for the configuration file. (The same -J mechanism may be used to pass other options to the JVM.) You may also specify this property through the JAVA_OPTS environment variable by including -Dlog4j.configuration=file:/path/to/log4j.properties in the variable's value. Many tools, including the Mothra tool scripts, will pass $JAVA_OPTS to the JVM on startup.
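For example, either of the following invocations configures logging for a single run (the packer's own arguments are elided, and the configuration file path is hypothetical):
mothra-packer -J-Dlog4j.configuration=file:/etc/mothra/log4j.properties ...
JAVA_OPTS=-Dlog4j.configuration=file:/etc/mothra/log4j.properties mothra-packer ...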
The following example file (which is the same as the default file used by the Mothra tool scripts) can serve as a base for your own debugging changes:
# Default to WARN to keep chatter down, and log to the CONSOLE appender.
# See below for enabling other log levels for specific packages.
log4j.rootLogger=WARN,CONSOLE
# CONSOLE is set to be a ConsoleAppender.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.conversionPattern=%d [%t] %-5p %c - %m%n
# For debugging purposes, set logging higher in some areas, like:
#log4j.logger.org.cert.netsa.mothra.packer = TRACE
#log4j.logger.org.cert.netsa = INFO
#log4j.logger.org.cert.netsa.io.ipfix = TRACE
# The default log levels are:
# 0 OFF - Don't show any messages
# 200 ERROR - Errors that allow the application to continue
# 300 WARN - Potentially harmful situations
# 400 INFO - Informational high-level progress messages
# 500 DEBUG - Coarse debugging output
# 600 TRACE - Fine grained call tracing
# MAXINT ALL - Show all messages
Some other sample appenders which could be used:
# SYSLOG is set to log via sending messages to a local syslogd
log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
log4j.appender.SYSLOG.syslogHost=127.0.0.1
log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
log4j.appender.SYSLOG.layout.conversionPattern=%d{ISO8601} %-5p [%t] %c{2} %x - %m%n
log4j.appender.SYSLOG.Facility=LOCAL1
# FILE is set up to log via a rolling log file
log4j.appender.FILE=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.File=.../example.log
log4j.appender.FILE.MaxFileSize=100KB
# Keep a single backup file after rollover
log4j.appender.FILE.MaxBackupIndex=1
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.conversionPattern=%d [%t] %-5p %c - %m%n
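To route messages to one of these appenders, include its name in the rootLogger setting (or attach it to a more specific logger), for example:
# Log to both the console and the rolling file
log4j.rootLogger=WARN,CONSOLE,FILE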
You can find more details on the log4j.properties
configuration file in the Log4j 1.2
manual.
Mothra packing logic configuration
mothra-packer, mothra-invariantpacker, and mothra-repacker make use of a configuration file that defines how records are to be packed. This configuration is defined as Scala code.
The configuration file's source code is interpreted to produce a value of type PartitionerConfigurator. This value provides a sequence of Partitioner objects which define the partitioning schemes to use at each successive level of the directory hierarchy.
Effectively, the data is partitioned based on all of these schemes
together and the partitioning is encoded in a standard way which
is interpreted by the IPFIX datasource when querying the
repository.
// A simple partition configuration to split records into partitions based
// on the value of the vlanId Information Element. This configuration is
// set up to keep notional "departments" together.
import org.cert.netsa.mothra.packer.{Partitioner, PartitionerConfigurator}

new PartitionerConfigurator {
  override def partitioners: Seq[Partitioner] = {
    // VLAN IDs of various offices
    val it = Seq(3, 4, 5, 6)
    val hr = Seq(1, 8, 11)
    val eng = Seq(9, 101)
    val vlanPartitioner = Partitioner.UInt16("vlanId")
    // Add one partition for each office
    vlanPartitioner.addPartitionEqualsAny(it)
    vlanPartitioner.addPartitionEqualsAny(hr)
    vlanPartitioner.addPartitionEqualsAny(eng)
    // Add partitions to cover other non-null vlanId values
    vlanPartitioner.coverRanges()
    // Besides time, we're only partitioning on vlanId
    Seq(vlanPartitioner)
  }
}
Another example:
// A simple partition configuration to split records into partitions based
// first on the protocol and then by the source port of the record.
import org.cert.netsa.mothra.packer.{Partitioner, PartitionerConfigurator}

new PartitionerConfigurator {
  override def partitioners: Seq[Partitioner] = {
    // Partitions for protocolIdentifier values
    val protoPartitioner = Partitioner.UInt8("protocolIdentifier")
    // We want to partition these specific values each alone
    protoPartitioner.addPartitionEquals(1)
    protoPartitioner.addPartitionEquals(6)
    protoPartitioner.addPartitionEquals(17)
    protoPartitioner.addPartitionEquals(50)
    protoPartitioner.addPartitionEquals(58)
    // And then fill in the gaps for everything else
    protoPartitioner.coverRanges()
    // Partitions for TCP/UDP/etc. source port values
    val sPortPartitioner = Partitioner.UInt16("sourceTransportPort")
    // We want to partition each of these specific values
    sPortPartitioner.addPartitionEquals(22)
    sPortPartitioner.addPartitionEquals(25)
    sPortPartitioner.addPartitionEquals(53)
    sPortPartitioner.addPartitionEquals(80)
    sPortPartitioner.addPartitionEquals(443)
    // And the entire range from 443 < x <= 1023 is one partition
    sPortPartitioner.addPartitionRange(443, false, 1023, true)
    // And 8000 is also one specific value
    sPortPartitioner.addPartitionEquals(8000)
    // And then fill in the gaps for all other non-null values
    sPortPartitioner.coverRanges()
    // And as a result, partition first by protocol and then within
    // protocol by source port.
    Seq(protoPartitioner, sPortPartitioner)
  }
}