Spark and Mothra using Scala

A more thorough walkthrough and documentation are in progress. For now, the best approach is to read the API documentation, starting with org.cert.netsa.mothra.datasources.ipfix.

In the meantime, the following sample Scala code demonstrates the bare basics.


// This import provides additional functionality for working with
// IPFIX and SiLK data using the Spark DataFrameReader interface.

import org.cert.netsa.mothra.datasources._

// This should be the root directory of the packed IPFIX data.
// (Loading arbitrary unpacked IPFIX data files via a glob is
// demonstrated further below.)

val path = "/hdfs/path/to/ipfix/data/root"

// This will load the default IPFIX fields, as described in the
// scaladoc documentation for org.cert.netsa.mothra.datasources.ipfix.
// Filters may reduce the set of files that must be examined so that
// not all file data is read by the Spark job.
val df = spark.read.ipfix(repository = path)
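
// For example (a sketch: startTime is one of the default fields,
// but the date range here is arbitrary), a time-based filter such
// as the following may allow files that fall entirely outside the
// requested range to be skipped:
val recentFlows = df.filter(
  df("startTime") >= "2023-01-01" && df("startTime") < "2023-02-01"
)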

// You can now work with this Spark DataFrame using the standard
// Spark tools.
df.show()  // Displays the first few records
df.count() // Returns the number of records
// etc.
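
// As a further example (a sketch; it assumes the default field set
// includes protocolIdentifier, so adjust the column name if yours
// differs), standard Spark aggregations work as usual:
df.groupBy("protocolIdentifier").count().show()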

// If you wish to work with arbitrary IPFIX files, the following
// approach also works and will pull in any files matched by the Hadoop
// glob expression.

val ipfixFiles = "/hdfs/path/to/ipfix/files/*.yaf"

// Queries against df2 will process all the specified files.
val df2 = spark.read.ipfix(ipfixFiles)
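
// As with df, standard Spark operations apply; for instance (a
// sketch, with a hypothetical output path), the loaded records can
// be written back out as Parquet for faster repeated queries:
df2.write.parquet("/hdfs/path/to/output/flows")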

// If you wish to work with IPFIX data that is not in the standard
// set of fields, you'll have to make use of the fields mechanism as
// described in the scaladoc for org.cert.netsa.mothra.datasources.ipfix
// and org.cert.netsa.mothra.datasources.
val df3 = spark.read.fields(
    "startTime",
    "endTime",
    "TOS" -> "ipClassOfService"
).ipfix(path)
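
// Assuming the pair above maps the column name "TOS" to the IPFIX
// information element ipClassOfService, the new column can be
// queried like any other. For instance (a sketch):
df3.select("TOS").distinct().show()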