This package provides tools for working with NetSA network flow data files in the Apache NiFi system. These currently consist of:
SilkReader: A Record Reader Controller Service for SiLK flow files, to be used with Apache NiFi record processors such as ConvertRecord, QueryRecord, ScriptedFilterRecord, or others.
FilterBasicSilk: A very simple SiLK record-filtering processor, which provides much greater performance when filtering flow records than using SilkReader with ScriptedFilterRecord.
As of January 2024, NetSA NiFi supports Apache NiFi versions 1.18 through 1.24. The directions below assume Apache NiFi 1.24.0 and may vary for other versions. NetSA NiFi has not yet been tested with Apache NiFi 2.
A typical workflow would be to read SiLK files from some source, feed those through one or more FilterBasicSilk processors to limit the output to records of interest, and then send the output from there into a ConvertRecord processor to read SiLK records and write JSON records.
Schematically:
[A] GetFile (read from some directory)
|
| success
v
[B] FilterBasicSilk (web traffic)
| application = 80
|
| filter_pass
v
[C] ConvertRecord (convert to JSON)
| record reader = SilkRecordReader
| record writer = JSONRecordSetWriter
|
| success
v
[D] PutFile (write to some directory)
The FilterBasicSilk processor does not provide anything in the way of logical expressions; it only provides simple ways to accept flows when a column has one of a set of specified values. In order to build more complicated logical expressions, it may be necessary to use multiple FilterBasicSilk processors, and even to use both the pass and fail output of processors. For example:
:
v
[F] FilterBasicSilk (web traffic)
| application = 80
|
| filter_pass
v
[G] FilterBasicSilk (no port is 80)
| aport = 80
|
| filter_fail
v
(X)
Note that the “filter_fail” relationship is used for G, which means that the records which fail its filter are the ones passed along. This effectively negates the filter.
By filtering on “application = 80” and sending its pass output to a filter saying “aport = 80” and using the fail output of that second filter, you filter the data to produce “records which were detected to be web traffic, but which did not involve port 80 on either end of the conversation”.
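This pass/fail routing can be sketched as ordinary boolean logic. The following is an illustrative model only (not NiFi code); the flow dictionary and its field names are invented for the example:

```python
# Illustrative model of F's filter_pass feeding G, with G's filter_fail
# output taken. Field names are invented for the example.
def f_passes(flow):
    # F: application = 80
    return flow["application"] == 80

def g_passes(flow):
    # G: aport = 80
    return 80 in (flow["sPort"], flow["dPort"])

def reaches_x(flow):
    # Taking G's fail side negates its filter: pass F AND NOT pass G.
    return f_passes(flow) and not g_passes(flow)

print(reaches_x({"application": 80, "sPort": 51234, "dPort": 8080}))  # True
print(reaches_x({"application": 80, "sPort": 51234, "dPort": 80}))    # False
```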
Further, it’s possible to use both the pass and fail output of a single filter. For example, if we extend the above processing pipeline with another processor:
:
v
[F] FilterBasicSilk (web traffic) ------+
| application = 80 |
| |
| filter_pass | filter_fail
v v
[G] FilterBasicSilk (no port is 80) [H] FilterBasicSilk (any port is 80)
| aport = 80 | aport = 80
| |
| filter_fail | filter_pass
v v
(X) (Y)
When the fail output of [F] is passed to [H], the output of [H] is “records which were not detected to be web traffic, but which did involve port 80 on one side or the other of the conversation.”
So now X is receiving flows which are HTTP but on an unusual port, and Y is receiving flows which are not HTTP but which are on a web port.
Note that in this case, the processed files in X and Y may have the same original filename. If you later combine these two streams of data and send them to a processor like PutFile, which requires filenames to be unique, you’ll have problems. You may need to use an UpdateAttribute processor to rename the files (by adding a suffix to them, for example). If you send them to a processor that does not care about filenames, this is not a problem.
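For instance, an UpdateAttribute processor on one branch could be given a dynamic property that rewrites each file’s filename attribute using NiFi Expression Language (the “.pass” suffix here is just an illustration):

```
Property name:  filename
Value:          ${filename:append('.pass')}
```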
Rather than using FilterBasicSilk, you may choose to convert SiLK records to other formats and use more powerful but slower scripting capabilities by using the SilkReader Record Reader. To do this, simply use a SilkReader service as the Record Reader for your record-based processors. See below for configuration details.
SilkReader is a Controller Service which provides the RecordReader interface. It reads input files which contain SiLK flow data and produces Records with all of the fields of the SiLK flow records, as described below.
Since SilkReader is a Controller Service, you may either create it directly by configuring a Process Group or create it at need when configuring a record-based Processor.
To create it within a Process Group, select the Controller Services tab, press the + button, and select SilkReader from the list.
To create it from a Processor, choose the Record Reader property, and then select “Create new service…” from the dropdown. Then choose SilkReader from the dropdown list.
Once a SilkReader is created, it may be configured by setting its SiLK Config File property to the path of a SiLK configuration file. This enables the SilkReader to assign symbolic sensor, class, and type names to flows. Without a configuration file, this information is only available numerically.
A single SilkReader service may be used by multiple Processors which require a RecordReader. Simply choose the existing SilkReader instance from the dropdown list for their Record Reader properties.
SiLK records have a fixed schema, which is used for all records and cannot be configured. The following fields are defined, which generally match the fields in the SiLK rwcut tool (see below for differences): sIP, dIP, sPort, dPort, protocol, packets, bytes, flags, flagsBits, sTime, duration, eTime, sensor, sensorName, className, type, typeName, iType, iCode, in, out, nhIP, initialFlags, initialFlagsBits, sessionFlags, sessionFlagsBits, attributes, attributesBits, application, and memo. You can read more about these fields in the rwcut manual page.
The flags, initialFlags, sessionFlags, and attributes fields are all represented textually (the default in rwcut), while flagsBits, initialFlagsBits, sessionFlagsBits, and attributesBits are the numeric values for these fields (as if the --integer-tcp-flags option were used).
Unlike in rwcut, the sensor, class, and type names for records are presented in the sensorName, className, and typeName fields. These values are only available if the SilkReader is configured with a SiLK configuration file; otherwise they all result in ?. The sensor and type fields are numeric. The sensor field contains the sensor ID, which rwcut presents when no SiLK configuration file is available or when the --integer-sensors option is used. The type field contains the flowtype ID, which rwcut presents in the type field when no SiLK configuration file is available.
Finally, the sTime, duration, and eTime fields all have millisecond resolution. sTime and eTime are provided as timestamp values (which may be transformed to a specific text format when converting to JSON or another output format), while duration is provided as the number of seconds with double precision.
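As a sketch of what consuming one record from a ConvertRecord-to-JSON pipeline might look like: the record below is invented sample data, only the field names come from the schema described above, and the sTime text format is an assumption about how the JSONRecordSetWriter was configured.

```python
# Sketch of consuming one converted record. The record is invented sample
# data; the "yyyy-MM-dd HH:mm:ss.SSS" timestamp format is an assumption.
import json
from datetime import datetime

record = json.loads("""
{"sIP": "192.0.2.10", "dIP": "198.51.100.20", "sPort": 52000, "dPort": 80,
 "protocol": 6, "packets": 12, "bytes": 3400,
 "sTime": "2024-01-05 12:00:00.250", "duration": 1.5, "application": 80}
""")

# Parse the timestamp text back into a datetime; duration is already a
# double-precision number of seconds.
stime = datetime.strptime(record["sTime"], "%Y-%m-%d %H:%M:%S.%f")
print(stime.microsecond, record["duration"])  # 250000 1.5
```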
The FilterBasicSilk processor provides filtering similar in capability to the SiLK rwfilter tool. It reads SiLK network flow data files, filters them based on the properties set on the processor, and then outputs new files which contain only the records which pass (filter_pass) or fail (filter_fail) the filtering properties. If there are any processing errors, input files will be passed to the failure relationship. All input files are also passed to the original relationship.
Providing a Silk Config File to the processor will allow it to understand SiLK sensor, class, and type names. Without a config file, you may still filter based on sensor ID and flowtype ID values.
Other properties are more-or-less equivalent to similarly named options to the rwfilter command. (You can learn more about these options in the rwfilter manual page.)
The following properties are understood by FilterBasicSilk. Details which differ from rwfilter are described below: flowtypes, sensors, protocol, sport, dport, aport, icmp-type, icmp-code, flags-all, flags-initial, flags-session, scidr, sipset, dcidr, dipset, any-cidr, anyset, nhcidr, nhipset, bytes, packets, bytes-per-packet, active-time, stime, etime, duration, application, attributes, ip-version, input-index, output-index, and any-index.
Each property’s documentation in the NiFi configuration interface gives details on the format of the field, and these formats generally match those of the rwfilter options arguments.
Notable differences from rwfilter include:
The --class and --type options are unavailable, and flowtypes should be used instead. In rwfilter these two methods cannot be used at the same time, and the flowtypes mechanism is more flexible.
The --start-date and --end-date options are not included. These specify which stored SiLK data files should be referred to; for filtering, it is necessary to use the stime, etime, or active-time properties.
The --saddress, --daddress, --any-address, and --next-hop-id options are not provided. Instead of using the IP address wildcard mechanism provided by these options, use scidr, dcidr, any-cidr, and nhcidr. These provide matching on a comma-separated list of addresses and CIDR-style (i.e. 1.2.0.0/16 or ::ffff:0.0.0.0/96) address blocks.
None of the individual flag options like --fin-flag are provided. Instead, use flags-all and match based on comma-separated high/mask combinations.
The country code, address type, tuple file, and prefix map features are not supported.
Finally, the primary difference is that unlike rwfilter, FilterBasicSilk does not provide any --not-X options. For example, the FilterBasicSilk sport property is equivalent to the rwfilter --sport option, but FilterBasicSilk has no equivalent to the --not-sport option. See below for details about how to work around this limitation.
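High/mask flag matching can be sketched as follows. This is an illustrative model of SiLK’s HIGH/MASK notation, not the processor’s code, under the assumption that a record matches when every flag in HIGH is set, every flag in MASK but not in HIGH is clear, and flags outside MASK are ignored (i.e. flags & mask == high):

```python
# Illustrative model of HIGH/MASK TCP-flag matching (an assumption about
# the semantics, not the processor's actual code).
FLAG_BITS = {"F": 1, "S": 2, "R": 4, "P": 8, "A": 16, "U": 32, "E": 64, "C": 128}

def parse_flags(text):
    """Turn a string like 'SA' into its numeric flag bits."""
    bits = 0
    for ch in text:
        bits |= FLAG_BITS[ch.upper()]
    return bits

def matches(flags, spec):
    """Check numeric flags against a 'HIGH/MASK' specification."""
    high_text, mask_text = spec.split("/")
    return (flags & parse_flags(mask_text)) == parse_flags(high_text)

# "S/SA": SYN must be set, ACK must be clear, all other flags ignored.
print(matches(parse_flags("S"), "S/SA"))    # True
print(matches(parse_flags("SA"), "S/SA"))   # False
print(matches(parse_flags("SP"), "S/SA"))   # True (PSH is outside the mask)
```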
If you only filter based on negative results from some set of options, it’s easy enough to use the filter_fail output instead of the filter_pass output. However, should you need to combine some number of positive and negative filters, you may need to use two FilterBasicSilk processors for what would be a single rwfilter call. For example:
rwfilter --not-scidr=10.0.0.0/8 --protocol=6 --aport=80,443
In this case, the filter is finding flows which did not originate from the network 10.0.0.0/8, and which were TCP flows which had either source or destination port 80 or 443. In order to represent this as two FilterBasicSilk processors, you could use:
:
v
[J] FilterBasicSilk (sip not in 10.0.0.0/8)
| scidr = 10.0.0.0/8
|
| filter_fail
v
[K] FilterBasicSilk (TCP port is 80 or 443)
| protocol = 6
| aport = 80,443
|
| filter_pass
v
(Z)
Here, Z would receive SiLK flow records which failed the first filter (and so were not in 10.0.0.0/8) and passed the second (and so were TCP flows with either port being 80 or 443).
Of course, J and K could be reversed, and more complicated combinations can represent a wider array of logical filtering expressions. (See the Advanced Filtering section above.)
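The J → K chain above can be sketched as boolean predicates. This is an illustrative model only (not NiFi code); the flow dictionary and its field names are invented for the example:

```python
# Illustrative model of the J -> K chain: J's filter_fail output feeds K,
# and K's filter_pass output reaches Z. Field names are invented.
import ipaddress

NET_10 = ipaddress.ip_network("10.0.0.0/8")

def j_passes(flow):
    # J: scidr = 10.0.0.0/8
    return ipaddress.ip_address(flow["sIP"]) in NET_10

def k_passes(flow):
    # K: protocol = 6, aport = 80,443
    return flow["protocol"] == 6 and (
        flow["sPort"] in (80, 443) or flow["dPort"] in (80, 443))

def reaches_z(flow):
    # filter_fail of J, then filter_pass of K: the same records that
    # rwfilter --not-scidr=10.0.0.0/8 --protocol=6 --aport=80,443 selects.
    return (not j_passes(flow)) and k_passes(flow)

print(reaches_z({"sIP": "192.0.2.1", "sPort": 51000, "dPort": 443, "protocol": 6}))  # True
print(reaches_z({"sIP": "10.1.2.3", "sPort": 51000, "dPort": 443, "protocol": 6}))   # False
```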
SilkReader and FilterBasicSilk may be configured to re-read certain configuration files when they change on disk. Some care should be taken with this feature, since it can result in unvalidated changes taking effect as soon as the new files are made available.
If a SilkReader’s “Reload SiLK Config File” option is set to true, and the named SiLK configuration file changes while the processor is active, the processor will attempt to re-load it when the next file is processed. If reading the new configuration file fails, the processor continues to use the old configuration and will not attempt to re-load again until the configuration file changes again.
The FilterBasicSilk “Reload SiLK Config File” option works the same way, but FilterBasicSilk also has a “Reload IP Set Files” option. When the “Reload IP Set Files” option is activated, the files named for “sipset”, “dipset”, “nhipset”, or “anyset” will be re-loaded when they change. As with the SiLK config file, if reloading one of these files fails, the old set will continue to be used and that file will not be re-read until it changes again.
When re-reading any of these files fails, that failure will be logged (and should produce an error flag in the Apache NiFi UI). If the failure is not corrected before the processors or services are for some reason deactivated, it will prevent them from being activated again, since checking the configuration will fail.
You can use the provided script to build a NiFi Archive (NAR) containing NetSA NiFi, suitable for deployment to Apache NiFi:
./scripts/build_nar.sh
If you’d rather build the NAR file yourself, the script does the following:
First, mill is used to build the Scala source code into a jar file and install it into your ~/.m2/repository directory:
./mill netsa-nifi.publishM2Local
Second, Maven is used to collect the jar file and its dependencies into a nar file with the appropriate metadata:
cd netsa-nifi-nar
mvn clean package -Dnetsa.nifi.version=X.Y.Z
You can use the following call to mill to determine the version number of NetSA NiFi to provide to Maven:
./mill show netsa-nifi.publishVersion
The file .../target/netsa-nifi-nar-X.Y.Z.nar will be the NiFi Archive, which can be placed in Apache NiFi’s lib directory (or you might place it into Apache NiFi’s library in another way).
Once Apache NiFi is restarted, the new processor and controller service should be available.
This Software includes and/or makes use of Third-Party Software each subject to its own license, including but not limited to:
Apache NiFi (https://github.com/apache/nifi/blob/rel/nifi-1.18.0/LICENSE) Copyright 2014-2022 The Apache Software Foundation
Scala (https://github.com/scala/scala/blob/v2.13.12/LICENSE) Copyright (c) 2002-2023 EPFL Copyright (c) 2011-2023 Lightbend, Inc.
mill (https://github.com/com-lihaoyi/mill/blob/0.11.5/LICENSE) Copyright (c) 2017 Li Haoyi (haoyi.sg@gmail.com)
millw (https://github.com/lefou/millw/blob/0.4.10/LICENSE) Copyright 2023 Tobias Roeser
NetSA NiFi 1.1
Copyright 2023 Carnegie Mellon University.
NO WARRANTY. THIS CARNEGIE MELLON UNIVERSITY AND SOFTWARE ENGINEERING INSTITUTE MATERIAL IS FURNISHED ON AN “AS-IS” BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY KIND, EITHER EXPRESSED OR IMPLIED, AS TO ANY MATTER INCLUDING, BUT NOT LIMITED TO, WARRANTY OF FITNESS FOR PURPOSE OR MERCHANTABILITY, EXCLUSIVITY, OR RESULTS OBTAINED FROM USE OF THE MATERIAL. CARNEGIE MELLON UNIVERSITY DOES NOT MAKE ANY WARRANTY OF ANY KIND WITH RESPECT TO FREEDOM FROM PATENT, TRADEMARK, OR COPYRIGHT INFRINGEMENT.
Licensed under a MIT (SEI)-style license, please see license.txt or contact permission@sei.cmu.edu for full terms.
[DISTRIBUTION STATEMENT A] This material has been approved for public release and unlimited distribution. Please see Copyright notice for non-US Government use and distribution.
This Software includes and/or makes use of Third-Party Software each subject to its own license.
DM23-2311