NetSA NiFi Documentation

This package provides tools for working with NetSA network flow data files in the Apache NiFi system. These currently consist of:

  • SilkReader: A Record Reader Controller Service for SiLK flow files, to be used with Apache NiFi record processors such as ConvertRecord, QueryRecord, ScriptedFilterRecord, or others.

  • FilterBasicSilk: A very simple SiLK record-filtering processor, which filters flow records much faster than SilkReader combined with ScriptedFilterRecord.

As of January 2024, NetSA NiFi supports Apache NiFi versions 1.18 through 1.24. The directions below assume Apache NiFi 1.24.0 and may vary for other versions. NetSA NiFi has not yet been tested with Apache NiFi 2.

Usage

A typical workflow would be to read SiLK files from some source, feed those through one or more FilterBasicSilk processors to limit the output to records of interest, and then send the output from there into a ConvertRecord processor to read SiLK records and write JSON records.

Schematically:

[A] GetFile (read from some directory)
 |
 | success
 v
[B] FilterBasicSilk (web traffic)
 |    application = 80
 |
 | filter_pass
 v
[C] ConvertRecord (convert to JSON)
 |    record reader = SilkReader
 |    record writer = JsonRecordSetWriter
 |
 | success
 v
[D] PutFile (write to some directory)

Advanced Filtering

The FilterBasicSilk processor does not provide logical expressions; it only provides simple ways to accept flows when a column has one of a set of specified values. To build more complicated logical expressions, it may be necessary to use multiple FilterBasicSilk processors, and even to use both the pass and fail outputs of processors. For example:

 :
 v
[F] FilterBasicSilk (web traffic)
 |    application = 80
 |
 | filter_pass
 v
[G] FilterBasicSilk (no port is 80)
 |    aport = 80
 |
 | filter_fail
 v
(X)

Note that the “filter_fail” relationship is used for G, so the records that fail its filter are the ones passed on. This effectively negates the filter.

By filtering on “application = 80”, sending the pass output to a filter on “aport = 80”, and using the fail output of that second filter, you produce “records which were detected to be web traffic, but which did not involve port 80 on either end of the conversation”.

Further, it’s possible to use both the pass and fail output of a single filter. For example, if we extend the above processing pipeline with another processor:

 :
 v
[F] FilterBasicSilk (web traffic) ------+
 |    application = 80                  |
 |                                      |
 | filter_pass                          | filter_fail
 v                                      v
[G] FilterBasicSilk (no port is 80)    [H] FilterBasicSilk (any port is 80)
 |    aport = 80                        |    aport = 80
 |                                      |
 | filter_fail                          | filter_pass
 v                                      v
(X)                                    (Y)

When the fail output of [F] is passed to [H], the output of [H] is “records which were not detected to be web traffic, but which did involve port 80 on one side or the other of the conversation.”

So now X is receiving flows which are HTTP but on an unusual port, and Y is receiving flows which are not HTTP but which are on a web port.
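The routing above can be modeled in a few lines of Python by treating each processor as a function that splits its input into filter_pass and filter_fail lists. This is only a sketch for reasoning about pipeline logic: the `run_filter`, `application_in`, and `aport_in` helpers and the dictionary records are hypothetical, not part of NetSA NiFi, and `aport_in` assumes that aport passes a flow when either its source or its destination port matches.

```python
from typing import Callable, Dict, List, Tuple

Record = Dict[str, int]
Predicate = Callable[[Record], bool]

def run_filter(pred: Predicate, records: List[Record]) -> Tuple[List[Record], List[Record]]:
    """Model one processor: every record goes to exactly one of (filter_pass, filter_fail)."""
    passed = [r for r in records if pred(r)]
    failed = [r for r in records if not pred(r)]
    return passed, failed

def application_in(values: set) -> Predicate:
    return lambda r: r["application"] in values

def aport_in(values: set) -> Predicate:
    # Assumed semantics: match if either end of the conversation uses a listed port.
    return lambda r: r["sPort"] in values or r["dPort"] in values

records = [
    {"id": 1, "application": 80, "sPort": 51000, "dPort": 80},    # web traffic on port 80
    {"id": 2, "application": 80, "sPort": 51001, "dPort": 8081},  # web traffic, unusual port
    {"id": 3, "application": 0, "sPort": 51002, "dPort": 80},     # not web, but port 80
    {"id": 4, "application": 0, "sPort": 51003, "dPort": 22},     # neither
]

f_pass, f_fail = run_filter(application_in({80}), records)  # [F]
_, x = run_filter(aport_in({80}), f_pass)                   # [G], filter_fail -> (X)
y, _ = run_filter(aport_in({80}), f_fail)                   # [H], filter_pass -> (Y)

print([r["id"] for r in x])  # [2]
print([r["id"] for r in y])  # [3]
```

Because each record lands in exactly one of a processor's two outputs, taking filter_fail acts as a logical NOT of that processor's filter, and chaining processors acts as a logical AND.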

Note that in this case, the processed files in X and Y may have the same original filename. If you later combine these two streams of data and send them to a processor like PutFile, which requires filenames to be unique, you will get filename conflicts. You may need to use an UpdateAttribute processor to rename the files (for example, by adding a suffix). If you send them to a processor that does not care about filenames, this will not be a problem.

Conversion and Scripted Filtering

Rather than using FilterBasicSilk, you may use the SilkReader Record Reader to convert SiLK records to other formats and to filter them with more powerful, but slower, scripting capabilities. To do this, simply use a SilkReader service as the Record Reader for your record-based processors. See below for configuration details.

SilkReader Configuration

SilkReader is a Controller Service which provides the RecordReader interface. It reads input files which contain SiLK flow data and produces Records with all of the fields of the SiLK flow records, as described below.

Since SilkReader is a Controller Service, you may either create it directly by configuring a Process Group or create it as needed when configuring a record-based Processor.

To create it within a Process Group, select the Controller Services tab, press the + button, and select SilkReader from the list.

To create it from a Processor, choose the Record Reader property, and then select “Create new service…” from the dropdown. Then choose SilkReader from the dropdown list.

Once a SilkReader is created, it may be configured by setting its SiLK Config File property to the path of a SiLK configuration file. This enables the SilkReader to assign symbolic sensor, class, and type names to flows. Without a configuration file, this information is only available numerically.

A single SilkReader service may be used by multiple Processors which require a RecordReader. Simply choose the existing SilkReader instance from the dropdown list for their Record Reader properties.

SiLK records have a fixed schema, which is used for all records and cannot be configured. The following fields are defined, which generally match the fields in the SiLK rwcut tool (see below for differences): sIP, dIP, sPort, dPort, protocol, packets, bytes, flags, flagsBits, sTime, duration, eTime, sensor, sensorName, className, type, typeName, iType, iCode, in, out, nhIP, initialFlags, initialFlagsBits, sessionFlags, sessionFlagsBits, attributes, attributesBits, application, and memo. You can read more about these fields in the rwcut manual page.

The flags, initialFlags, sessionFlags, and attributes fields are all represented textually (the default in rwcut), while flagsBits, initialFlagsBits, sessionFlagsBits, and attributesBits are the numeric values of these fields, as if rwcut's --integer-tcp-flags option were used.
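To illustrate how the textual and numeric forms of the TCP flag fields relate, here is a Python sketch that renders a flagsBits-style integer using the standard TCP flag bit values and the F, S, R, P, A, U, E, C letters used by SiLK tools. It is an assumption that SilkReader's text matches this exactly (rwcut, for instance, may pad unset flags with spaces), and it does not cover the attributes field, which uses a different encoding.

```python
# Standard TCP flag bit values, listed in the F-S-R-P-A-U-E-C order used by SiLK tools.
TCP_FLAGS = [("F", 0x01), ("S", 0x02), ("R", 0x04), ("P", 0x08),
             ("A", 0x10), ("U", 0x20), ("E", 0x40), ("C", 0x80)]
FLAG_BITS = dict(TCP_FLAGS)

def flags_to_text(bits: int) -> str:
    """Render a flagsBits-style integer as flag letters, omitting unset flags."""
    return "".join(letter for letter, mask in TCP_FLAGS if bits & mask)

def text_to_flags(text: str) -> int:
    """Inverse conversion; spaces are ignored and unknown letters raise KeyError."""
    letters = set(text.upper().replace(" ", ""))
    return sum(FLAG_BITS[letter] for letter in letters)

print(flags_to_text(0x1B))    # FSPA
print(text_to_flags("FSPA"))  # 27
```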

Unlike in rwcut, the sensor, class, and type names for records are presented in the sensorName, className, and typeName fields. These values are only available if the SilkReader is configured with a SiLK configuration file; otherwise each of them is “?”. The sensor and type fields are numeric. The sensor field contains the sensor ID, which rwcut presents when no SiLK configuration file is available or when the --integer-sensors option is used. The type field contains the flowtype ID, which rwcut presents in its type field when no SiLK configuration file is available.

Finally, the sTime, duration, and eTime fields all have millisecond resolution. sTime and eTime are provided as timestamp values (which may be rendered in a specific text format when converting to JSON or another output format), and duration is provided as a double-precision number of seconds.
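Since all three time fields have millisecond resolution, duration can be cross-checked against sTime and eTime (in SiLK a flow's end time is its start time plus its duration). The Python sketch below assumes the two timestamps have already been parsed into timezone-aware datetime values; how they are serialized depends on the record writer you choose, and the helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

def duration_seconds(stime: datetime, etime: datetime) -> float:
    """Duration as a double-precision number of seconds, like the duration field."""
    return (etime - stime).total_seconds()

def end_time(stime: datetime, duration: float) -> datetime:
    """Reconstruct eTime from sTime and duration, rounded to whole milliseconds."""
    return stime + timedelta(milliseconds=round(duration * 1000))

s = datetime(2024, 1, 15, 12, 0, 0, 250000, tzinfo=timezone.utc)
e = datetime(2024, 1, 15, 12, 0, 3, 750000, tzinfo=timezone.utc)
print(duration_seconds(s, e))  # 3.5
```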

FilterBasicSilk Configuration and Filtering

The FilterBasicSilk processor provides filtering similar in capability to the SiLK rwfilter tool. It reads SiLK network flow data files, filters them based on the properties set on the processor, and then outputs new files containing only the records which pass (filter_pass) or fail (filter_fail) the filter. If a processing error occurs, the input file is passed to the failure relationship. All input files are also passed to the original relationship.

Providing a SiLK Config File to the processor will allow it to understand SiLK sensor, class, and type names. Without a config file, you may still filter based on sensor ID and flowtype ID values.

Other properties are more or less equivalent to the similarly named options of the rwfilter command. (You can learn more about these options in the rwfilter manual page.)

The following properties are understood by FilterBasicSilk: flowtypes, sensors, protocol, sport, dport, aport, icmp-type, icmp-code, flags-all, flags-initial, flags-session, scidr, sipset, dcidr, dipset, any-cidr, anyset, nhcidr, nhipset, bytes, packets, bytes-per-packet, active-time, stime, etime, duration, application, attributes, ip-version, input-index, output-index, and any-index. Details which differ from rwfilter are described below.

Each property’s documentation in the NiFi configuration interface describes the format of its value, and these formats generally match those of the corresponding rwfilter option arguments.
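Several of these properties, such as the port and protocol properties, take lists of integers. Assuming they follow rwfilter's convention of comma-separated values and inclusive hyphenated ranges such as `80,443,8000-8080`, a matching check could be sketched in Python as below. The helper names are hypothetical, and the property help text in NiFi remains the authority on the accepted syntax.

```python
from typing import List, Tuple

def parse_int_list(spec: str) -> List[Tuple[int, int]]:
    """Parse a spec like '80,443,8000-8080' into inclusive (low, high) ranges."""
    ranges = []
    for part in spec.split(","):
        part = part.strip()
        if "-" in part:
            low, high = (int(x) for x in part.split("-", 1))
        else:
            low = high = int(part)
        if low > high:
            raise ValueError(f"empty range: {part!r}")
        ranges.append((low, high))
    return ranges

def int_matches(value: int, ranges: List[Tuple[int, int]]) -> bool:
    """True if value falls inside any of the parsed ranges."""
    return any(low <= value <= high for low, high in ranges)

ports = parse_int_list("80,443,8000-8080")
print(ports)  # [(80, 80), (443, 443), (8000, 8080)]
```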

Notable differences from rwfilter include:

The --class and --type options are unavailable, and flowtypes should be used instead. In rwfilter these two methods cannot be used at the same time, and the flowtypes mechanism is more flexible.

The --start-date and --end-date options are not included. In rwfilter these select which stored SiLK data files are read; to filter records by time, use the stime, etime, or active-time properties instead.

The --saddress, --daddress, --any-address, and --next-hop-id options are not provided. Instead of the IP address wildcard syntax accepted by these options, use scidr, dcidr, any-cidr, and nhcidr. These match against a comma-separated list of addresses and CIDR-style address blocks (e.g. 1.2.0.0/16 or ::ffff:0.0.0.0/96).
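The sketch below shows what matching an address against such a comma-separated list involves, using Python's standard ipaddress module. It is an illustration rather than FilterBasicSilk's logic: the helper names are hypothetical, and Python treats IPv4 and IPv6 as disjoint, whereas SiLK's handling of IPv4 addresses against IPv4-mapped blocks like ::ffff:0.0.0.0/96 may differ.

```python
import ipaddress
from typing import List, Union

Network = Union[ipaddress.IPv4Network, ipaddress.IPv6Network]

def parse_cidr_list(spec: str) -> List[Network]:
    """Parse comma-separated addresses and CIDR blocks; a bare address becomes a /32 or /128."""
    return [ipaddress.ip_network(part.strip()) for part in spec.split(",")]

def address_matches(addr: str, networks: List[Network]) -> bool:
    """True if addr lies in any listed block.

    ipaddress never matches an IPv4 address against an IPv6 block (or vice versa),
    so 1.2.3.4 does not match ::ffff:0.0.0.0/96 here.
    """
    ip = ipaddress.ip_address(addr)
    return any(ip in net for net in networks)

blocks = parse_cidr_list("1.2.0.0/16, 10.0.0.1, ::ffff:0.0.0.0/96")
print(address_matches("1.2.3.4", blocks))   # True
print(address_matches("10.0.0.2", blocks))  # False
```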

None of the individual flag options like --fin-flag are provided. Instead use flags-all and match based on comma-separated high/mask combinations.
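The high/mask notation follows rwfilter: for a pair HIGH/MASK, only the flags listed in MASK are examined, and a flow matches when, among those, exactly the flags in HIGH are set. So `S/SA` selects flows with SYN set and ACK clear. The Python sketch below assumes that a comma-separated list matches when any one pair matches; the function names are hypothetical.

```python
from typing import List, Tuple

FLAG_BITS = {"F": 0x01, "S": 0x02, "R": 0x04, "P": 0x08,
             "A": 0x10, "U": 0x20, "E": 0x40, "C": 0x80}

def letters_to_bits(letters: str) -> int:
    return sum(FLAG_BITS[c] for c in set(letters.upper()))

def parse_high_mask_list(spec: str) -> List[Tuple[int, int]]:
    """Parse 'S/SA,R/R' into (high, mask) integer pairs."""
    pairs = []
    for item in spec.split(","):
        high_text, mask_text = item.strip().split("/")
        high, mask = letters_to_bits(high_text), letters_to_bits(mask_text)
        # A HIGH flag outside MASK could never match, so reject the pair outright.
        if high & ~mask:
            raise ValueError(f"HIGH must be a subset of MASK: {item!r}")
        pairs.append((high, mask))
    return pairs

def flags_match(flags_bits: int, pairs: List[Tuple[int, int]]) -> bool:
    """Match if, for any pair, the masked flags equal exactly the HIGH flags."""
    return any((flags_bits & mask) == high for high, mask in pairs)

syn_without_ack = parse_high_mask_list("S/SA")
print(flags_match(0x02, syn_without_ack))  # True  (SYN only)
print(flags_match(0x12, syn_without_ack))  # False (SYN and ACK)
```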

The country code, address type, tuple file, and prefix map features are not supported.

Finally, the most significant difference is that, unlike rwfilter, FilterBasicSilk does not provide any --not-X options. For example, the FilterBasicSilk sport property is equivalent to the rwfilter --sport option, but FilterBasicSilk has no equivalent to the --not-sport option. See below for how to work around this limitation.

Using Two FilterBasicSilk Processors For “Not” Filters

If you only filter based on negative results from some set of options, it’s easy enough to use the filter_fail output instead of the filter_pass output. However, should you need to combine some number of positive and negative filters, you may need to use two FilterBasicSilk processors for what would be a single rwfilter call. For example:

rwfilter --not-scidr=10.0.0.0/8 --protocol=6 --aport=80,443

In this case, the filter selects TCP flows which did not originate from the network 10.0.0.0/8 and which had a source or destination port of 80 or 443. To represent this as two FilterBasicSilk processors, you could use:

 :
 v
[J] FilterBasicSilk (sip not in 10.0.0.0/8)
 |    scidr = 10.0.0.0/8
 |
 | filter_fail
 v
[K] FilterBasicSilk (TCP port is 80 or 443)
 |    protocol = 6
 |    aport = 80,443
 |
 | filter_pass
 v
(Z)

Here, Z would receive SiLK flow records which failed the first filter (and so did not have a source address in 10.0.0.0/8) and passed the second (and so were TCP flows with either port being 80 or 443).
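One way to convince yourself that the two-stage pipeline matches the original rwfilter call is to compare both on some sample records. In this Python sketch the record dictionaries and helper functions are hypothetical; only the logic (filter_fail of [J] followed by filter_pass of [K]) mirrors the diagram.

```python
import ipaddress

TEN_SLASH_8 = ipaddress.ip_network("10.0.0.0/8")

def j_filter(r: dict) -> bool:
    """[J]: scidr = 10.0.0.0/8."""
    return ipaddress.ip_address(r["sIP"]) in TEN_SLASH_8

def k_filter(r: dict) -> bool:
    """[K]: protocol = 6 and aport = 80,443 (either port may match)."""
    return r["protocol"] == 6 and (r["sPort"] in (80, 443) or r["dPort"] in (80, 443))

def rwfilter_equivalent(r: dict) -> bool:
    """rwfilter --not-scidr=10.0.0.0/8 --protocol=6 --aport=80,443"""
    return not j_filter(r) and k_filter(r)

def two_stage(records: list) -> list:
    j_fail = [r for r in records if not j_filter(r)]  # [J] filter_fail
    return [r for r in j_fail if k_filter(r)]         # [K] filter_pass -> (Z)

records = [
    {"sIP": "10.1.2.3", "protocol": 6, "sPort": 50000, "dPort": 443},
    {"sIP": "192.0.2.7", "protocol": 6, "sPort": 50001, "dPort": 80},
    {"sIP": "192.0.2.8", "protocol": 17, "sPort": 50002, "dPort": 443},
    {"sIP": "198.51.100.9", "protocol": 6, "sPort": 443, "dPort": 50003},
]

z = two_stage(records)
print([r["sIP"] for r in z])  # ['192.0.2.7', '198.51.100.9']
```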

Of course, J and K could be reversed, and more complicated combinations can represent a wider array of logical filtering expressions. (See the Advanced Filtering section above.)

Reloading Configuration Files

SilkReader and FilterBasicSilk may be configured to re-read certain configuration files when they change on disk. Take some care with this feature, since it can put unvalidated changes into effect as soon as the new files are available.

If a SilkReader’s “Reload SiLK Config File” option is set to true and the named SiLK configuration file changes while the service is active, the SilkReader will attempt to reload it when the next file is processed. If reading the new configuration file fails, it continues to use the old configuration and will not attempt to reload until the configuration file changes again.

The FilterBasicSilk “Reload SiLK Config File” option works the same way, but FilterBasicSilk also has a “Reload IP Set Files” option. When “Reload IP Set Files” is enabled, the files named by the “sipset”, “dipset”, “nhipset”, or “anyset” properties are reloaded when they change. As with the SiLK config file, if reloading one of these files fails, the old set continues to be used, and that file will not be re-read until it changes again.
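The reload rules above (check lazily when the next file is processed, keep the previous value on failure, and do not retry a bad file until it changes again) can be summarized as a small state machine. The Python class below is a hypothetical model for reasoning about that behavior, not NetSA NiFi's implementation; in particular, detecting changes by modification time is an assumption.

```python
import os
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")

class ReloadingFile(Generic[T]):
    """Hypothetical model of the reload rules: the file is checked on each get(),
    a failed reload keeps the previous value, and a version that failed is not
    retried until the file changes again. Change detection by mtime is assumed."""

    def __init__(self, path: str, parse: Callable[[str], T]) -> None:
        self.path = path
        self.parse = parse
        self.value: T = parse(path)  # the initial load must succeed
        self.seen_mtime = os.stat(path).st_mtime_ns
        self.last_error: Optional[Exception] = None

    def get(self) -> T:
        """Called once per processed flow file."""
        mtime = os.stat(self.path).st_mtime_ns
        if mtime != self.seen_mtime:
            self.seen_mtime = mtime  # remember this version, even if it is bad
            try:
                self.value = self.parse(self.path)
                self.last_error = None
            except Exception as exc:  # keep the old value and record the failure
                self.last_error = exc
        return self.value
```

Because the bad version's modification time is remembered, repeated calls after a failure return the old value without re-parsing, which matches the “not re-read until it changes again” rule.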

When any of these files fails to be re-read, the failure is logged (and should produce an error flag in the Apache NiFi UI). If the failure is not corrected before the processor or service is deactivated for any reason, it will prevent the processor or service from being activated again, because configuration validation will fail.

Building the Software

You can use the provided script to build a NiFi Archive (NAR) containing NetSA NiFi, suitable for deployment to Apache NiFi:

./scripts/build_nar.sh

If you’d rather build the NAR file yourself, the script does the following:

First, mill is used to build the Scala source code into a jar file and install it into your ~/.m2/repository directory:

./mill netsa-nifi.publishM2Local

Second, Maven is used to collect the jar file and its dependencies into a NAR file with the appropriate metadata:

cd netsa-nifi-nar
mvn clean package -Dnetsa.nifi.version=X.Y.Z

You can use the following call to mill to determine the version number of NetSA NiFi to provide to Maven:

./mill show netsa-nifi.publishVersion

Installation

The file .../target/netsa-nifi-nar-X.Y.Z.nar is the NiFi Archive. Place it in Apache NiFi’s lib directory (or add it to Apache NiFi’s library in another way).

Once Apache NiFi is restarted, the new processor and controller service should be available.

Third-party Licenses

This Software includes and/or makes use of Third-Party Software each subject to its own license, including but not limited to:

  1. Apache NiFi (https://github.com/apache/nifi/blob/rel/nifi-1.18.0/LICENSE) Copyright 2014-2022 The Apache Software Foundation

  2. Scala (https://github.com/scala/scala/blob/v2.13.12/LICENSE) Copyright (c) 2002-2023 EPFL Copyright (c) 2011-2023 Lightbend, Inc.

  3. mill (https://github.com/com-lihaoyi/mill/blob/0.11.5/LICENSE) Copyright (c) 2017 Li Haoyi (haoyi.sg@gmail.com)

  4. millw (https://github.com/lefou/millw/blob/0.4.10/LICENSE) Copyright 2023 Tobias Roeser

License

NetSA NiFi 1.1

Copyright 2023 Carnegie Mellon University.

NO WARRANTY. THIS CARNEGIE MELLON UNIVERSITY AND SOFTWARE ENGINEERING INSTITUTE MATERIAL IS FURNISHED ON AN “AS-IS” BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY KIND, EITHER EXPRESSED OR IMPLIED, AS TO ANY MATTER INCLUDING, BUT NOT LIMITED TO, WARRANTY OF FITNESS FOR PURPOSE OR MERCHANTABILITY, EXCLUSIVITY, OR RESULTS OBTAINED FROM USE OF THE MATERIAL. CARNEGIE MELLON UNIVERSITY DOES NOT MAKE ANY WARRANTY OF ANY KIND WITH RESPECT TO FREEDOM FROM PATENT, TRADEMARK, OR COPYRIGHT INFRINGEMENT.

Licensed under a MIT (SEI)-style license, please see license.txt or contact permission@sei.cmu.edu for full terms.

[DISTRIBUTION STATEMENT A] This material has been approved for public release and unlimited distribution. Please see Copyright notice for non-US Government use and distribution.

This Software includes and/or makes use of Third-Party Software each subject to its own license.

DM23-2311