Mothra can easily process loose IPFIX or SiLK data files, but it can also query an organized repository of IPFIX information. You can use the included mothra-packer tool and the other tools included with it to load IPFIX data into a Mothra repository, and then use Mothra with Spark to query it.
mothra-packer and mothra-invariantpacker
The Mothra packing tools watch an incoming directory and pack files from that directory into a repository directory. For normal operations, this means reading from a local directory and writing to a directory in HDFS.
mothra-packer
The packer process watches the incoming directory for new files whose names do not begin with a dot (.) character. This meshes with SiLK's rwreceiver tool, which prefixes the names of files with a dot while writing, and then renames them to remove the dot once all data has been written. The packer also uses the same technique while writing files into the repository.
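The write-then-rename convention described above can be sketched with plain shell commands (the directory, file name, and contents here are purely illustrative):

```shell
# Sketch of the dot-prefix convention: write under a dot-prefixed name,
# then rename once all data has been written.
dir=$(mktemp -d)
printf 'ipfix data...' > "$dir/.example.ipfix"  # in progress: ignored by the packer
mv "$dir/.example.ipfix" "$dir/example.ipfix"   # complete: now visible for packing
ls "$dir"
```

Because the rename is atomic on the same filesystem, a reader never observes a partially written file under its final name.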
By default, mothra-packer runs continuously as a foreground process. It logs messages via Apache Log4j 2. See Configuring Log4j logging for the Mothra tools for more details. If you do not otherwise configure Log4j 2, the default configuration logs warnings and errors to the console.
mothra-packer and some other tools use configurable packing logic, which is described below under Mothra packing logic configuration.
See the mothra-packer(1) manual page for more details.
mothra-invariantpacker
The invariant packer watches an incoming directory for new files produced by super_mediator in "invariant mode". In this mode, the incoming files are already pre-partitioned, and mothra-invariantpacker can more efficiently process them in bulk into their target locations.
The same packing configuration is used for mothra-invariantpacker as for mothra-packer and mothra-repacker, but the packing logic must be a PartitionerPackLogic or a PartitionerConfigurator, not any other kind of packing logic. See Mothra packing logic configuration for more details.
See the mothra-invariantpacker(1) manual page for more details.
In addition to mothra-packer for loading data, there are several additional tools for altering the contents of a Mothra IPFIX repository.
mothra-filejoiner
The packing process may produce several files for each partition in the Mothra repository. This tool reduces the overall number of files in the repository by combining the files within a given partition, which can improve query performance when a repository contains many small files.
See the mothra-filejoiner(1) manual page for more details.
mothra-filesanitizer
Sometimes, Information Elements are collected which take up too much space or which are not meant to be kept. This tool processes files and rewrites them with one or more Information Elements purged completely from the files.
See the mothra-filesanitizer(1) manual page for more details.
mothra-repacker
This tool is used to repack files which have been previously packed into a Mothra IPFIX repository. This allows changing partitioning and compression in the repository.
Just like mothra-packer, the repacker needs a packing logic configuration to determine where files should be placed. See Mothra packing logic configuration for more details.
Repacking requires bringing what is potentially a very large amount of data back from the cluster to a work directory on the local filesystem, so be careful when using it on large data repositories.
See the mothra-repacker(1) manual page for more details.
mothra-rollupday
This tool combines smaller files into daily files, further reducing the overall number of files and partitions. After processing, other partitioning remains the same, but within those partitions all records for each day are kept together.
See the mothra-rollupday(1) manual page for more details.
Configuring Log4j logging for the Mothra tools
Apache Log4j 2 may be configured by creating a configuration file and providing that configuration file using environment variables or command-line arguments.
On the command line of the tool scripts, you can provide the option -J-Dlog4j2.configurationFile=/path/to/log4j2.xml giving a path to the configuration file. (This same mechanism may be used to pass other options to the JVM.) You may also use the JAVA_OPTS environment variable to specify this property by including -Dlog4j2.configurationFile=/path/to/log4j2.xml in the variable. Many tools, including the Mothra tool scripts, will pass $JAVA_OPTS to the JVM on startup.
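For example, the environment-variable mechanism looks like this in a shell session (the configuration file path is a placeholder, and the tool invocations are shown only as comments):

```shell
# Point the Mothra tool scripts at a Log4j 2 configuration file.
# The path below is only a placeholder.
export JAVA_OPTS="-Dlog4j2.configurationFile=/path/to/log4j2.xml"
# The tool scripts pass $JAVA_OPTS to the JVM on startup, e.g.:
#   mothra-packer ...
# Equivalently, the property can be given on the tool command line:
#   mothra-packer -J-Dlog4j2.configurationFile=/path/to/log4j2.xml ...
echo "$JAVA_OPTS"
```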
You can find more details on the log4j2.xml configuration file in the Log4j 2.x manual.
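As a starting point, a minimal log4j2.xml along the lines of the default behavior described above (warnings and errors to the console) might look like the following sketch; adjust the pattern, level, and appenders to suit your site:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn">
  <Appenders>
    <!-- Log to standard error on the console -->
    <Console name="Console" target="SYSTEM_ERR">
      <PatternLayout pattern="%d{ISO8601} %-5level %logger{36} - %msg%n"/>
    </Console>
  </Appenders>
  <Loggers>
    <!-- Only warnings and errors by default -->
    <Root level="warn">
      <AppenderRef ref="Console"/>
    </Root>
  </Loggers>
</Configuration>
```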
Mothra packing logic configuration
mothra-packer, mothra-invariantpacker, and mothra-repacker make use of a configuration file that defines how records are to be packed. This configuration is defined as Scala code. The configuration file's source code is interpreted to produce a value of type PartitionerConfigurator.
This value provides a sequence of Partitioner objects which define partitioning schemes to use at each successive level of the directory hierarchy. Effectively, the data is partitioned based on all of these schemes together, and the partitioning is encoded in a standard way which is interpreted by the IPFIX datasource when querying the repository.
// A simple partition configuration to split records into partitions based
// on the value of the vlanId Information Element. This configuration is
// set up to keep notional "departments" together.
import org.cert.netsa.mothra.packer.{Partitioner, PartitionerConfigurator}

new PartitionerConfigurator {
  override def partitioners: Seq[Partitioner] = {
    // VLAN IDs of various offices
    val it = Seq(3, 4, 5, 6)
    val hr = Seq(1, 8, 11)
    val eng = Seq(9, 101)
    val vlanPartitioner = Partitioner.UInt16("vlanId")
    // Add one partition for each office
    vlanPartitioner.addPartitionEqualsAny(it)
    vlanPartitioner.addPartitionEqualsAny(hr)
    vlanPartitioner.addPartitionEqualsAny(eng)
    // Add partitions to cover other non-null vlanId values
    vlanPartitioner.coverRanges()
    // Besides time, we're only partitioning on vlanId
    Seq(vlanPartitioner)
  }
}
Another example:
// A simple partition configuration to split records into partitions based
// first on the protocol and then by the source port of the record.
import org.cert.netsa.mothra.packer.{Partitioner, PartitionerConfigurator}

new PartitionerConfigurator {
  override def partitioners: Seq[Partitioner] = {
    // Partitions for protocolIdentifier values
    val protoPartitioner = Partitioner.UInt8("protocolIdentifier")
    // We want to partition these specific values each alone
    protoPartitioner.addPartitionEquals(1)
    protoPartitioner.addPartitionEquals(6)
    protoPartitioner.addPartitionEquals(17)
    protoPartitioner.addPartitionEquals(50)
    protoPartitioner.addPartitionEquals(58)
    // And then fill in the gaps for everything else
    protoPartitioner.coverRanges()
    // Partitions for TCP/UDP/etc. source port values
    val sPortPartitioner = Partitioner.UInt16("sourceTransportPort")
    // We want to partition each of these specific values
    sPortPartitioner.addPartitionEquals(22)
    sPortPartitioner.addPartitionEquals(25)
    sPortPartitioner.addPartitionEquals(53)
    sPortPartitioner.addPartitionEquals(80)
    sPortPartitioner.addPartitionEquals(443)
    // And the entire range from 443 < x <= 1023 is one partition
    sPortPartitioner.addPartitionRange(443, false, 1023, true)
    // And 8000 is also one specific value
    sPortPartitioner.addPartitionEquals(8000)
    // And then fill in the gaps for all other non-null values
    sPortPartitioner.coverRanges()
    // And as a result, partition first by protocol and then within
    // protocol by source port.
    Seq(protoPartitioner, sPortPartitioner)
  }
}