Provision Additional Protocol-specific Infrastructure with Terraform

Edit protocols.tf and update the "for_each" key with the name of the new protocol/dataset.

for_each = toset(["smb"])  -->  for_each = toset(["smb", "ftp"])

Currently, the protocol/dataset name must match one of the keys in the standard_pps dictionary defined in scripts/lambda_functions/get_org_info.py

Run 'terraform apply'.

This will automatically take care of additional protocol-specific infrastructure, including new S3 directory, DynamoDB tables, AutoScaling launch template and rules for baseline regeneration, and updates to relevant IAM policies.

Collect Protocol Traffic for Initial Baseline

The goal here is to create a single SiLK binary file containing the last 90 days (or at least a recent 90 days - it doesn't have to be exactly the previous 90 days) of flows representing the protocol of interest. The best way to do this is to perform one or more queries against your SiLK repository with the appropriate filters, then deduplicate, combine, and sort the resulting flows into a single SiLK binary file.

It is recommended to start one or more screen sessions on a SiLK analysis server. Make sure to use the --threads option to speed up each query (as shown in the below example).

To capture flows representing the protocol of interest, see the "Thinking About SiLK Filters" section below.

An example considering the SMB protocol would be as follows (two queries):

Get flows with relevant application label.

rwfilter --start-date=<90_DAYS_AGO_FROM_YESTERDAY> --end-date=<YESTERDAYS_DATE> \
  --type=out,outweb \
  --protocol=6 --flags-initial=S/SAPFR --packets=4- --bytes-per=65- \
  --not-dcidr=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,100.64.0.0/10 \
  --application=139 --threads=4 --pass=smb_90days_app.bin

Get flows with relevant destination port(s), but remove flows matching the application which we captured in the previous query. Also note we no longer include --type=outweb because outweb refers to flows whose dport is one of (80, 443, 8080) and we are specifically filtering for dports not in that set.

rwfilter --start-date=<90_DAYS_AGO_FROM_YESTERDAY> --end-date=<YESTERDAYS_DATE> \
  --type=out \
  --protocol=6 --flags-initial=S/SAPFR --packets=4- --bytes-per=65- \
  --not-dcidr=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,100.64.0.0/10 \
  --dport=139,445 --threads=4 --pass=stdout |
  rwfilter stdin --application=139 --fail=smb_90days_dport.bin

Next, deduplicate, combine, and sort the query result files into a single SiLK binary file.

rwdedupe smb_90days_app.bin smb_90days_dport.bin |
  rwsort --fields=stime --output-path=smb_deduped_combined.bin

Convert and Load the Initial Protocol Traffic to S3

In this step, each flow record is lightly transformed and enriched with source org and destination ASN information.

Use the provided Python script, convert_initial_protocol_traffic.py, to perform these transformations/enrichments. This script requires the following to run:

  1. The SiLK rwtools installed and available in the shell's path
  2. Python >= 3.6
  3. Python libraries: dnspython, tqdm
  4. The ability to make DNS TXT queries to Team Cymru's IP-to-ASN Mapping service
  5. The SiLK binary file generated in the previous step
  6. The ddb-org-info-items.json file, which should have been generated during the "Initial Loads" steps

Run the script.

python3 convert_initial_protocol_traffic.py \
  -b smb_90days_combined.bin -a ddb-org-info-items.json -p smb

Once the script completes, you should find multiple files have been generated in the current working directory with the filename (e.g., filenames like <protocol>__<iso-date>__<uuid4>) and file content format expected for processing later. Upload each of those files to the protocol's baseline_traffic directory in S3.

Lastly, you should find an output file, named <protocol>-baseline-traffic-agg.psv - this will be used in the next step.

Generate and Load the Initial Baseline Metrics to DynamoDB

Use the provided Python script, generate_initial_baseline_metrics.py, to generate the baseline metrics and load those to the newly provisioned DynamoDB tables. This script requires the following to run:

  1. Python >= 3.6
  2. Python libraries: boto3, numpy, tqdm
  3. PutItem and BatchWriteItem permissions and network access to the protocol-specific DynamoDB tables
  4. See Terraform outputs for table_prefix and aws_region values

Following our example considering the SMB protocol, run the following:

python3 generate_initial_baseline_metrics.py \
  -r smb-baseline-traffic-agg.psv -p smb -t <table_prefix> --region <aws_region>

If there were no errors, you may remove the <protocol>-baseline-traffic-agg.psv file as it is no longer needed (unless you want to keep it in order to generate pseudo-protocol traffic).

Update the NiFi UNX-OBP Dataflow

In the UNX-OBP INPUT Processor Group...

  1. Add a new Processor Group for the new protocol
    1. Since most protocol filters will follow very similar logic, you can just copy/paste the existing SMB Processor Group and rename it for the new protocol. Go into the new protocol Processor Group (i.e., double-click it).
    2. Update the names of the Input Port, Processors, and Output Port, as well as modify processor properties for the new protocol, essentially mirroring the relevant filters used in onboarding Step 1. You should not need to adjust any of the connections/Relationships.
    3. Adjust the <protocol> value for the given "unx_obp_proto" property in each of the UpdateRecord and UpdateAttribute processors.
  2. Back in the UNX-OBP INPUT view...
    1. Make a new connection from the Non-rfc1918 processor to the new protocol Processor Group, selecting the "filter-fail" Relationship. Confirm the new protocol Input Port name is selected in the connection settings.
    2. Make a new connection from the new protocol Processor Group to the Funnel above the SplitRecord processor. Confirm the new protocol Output Port name is selected in the connection settings.
  3. Add a new property in the RouteOnAttribute configuration for the new protocol - this will create a new <protocol> Relationship. Copy the existing value, but update it for the new protocol.
  4. Determine the appropriate ControlRate properties (see the "A Rough Formula for Setting NiFi ControlRate Properties" section below). Copy/paste the existing ControlRate processor, rename and update for the new protocol with those properties set.
    1. Make a new connection from RouteOnAttribute to the ControlRate processor via the new <protocol> Relationship.
    2. Make a new connection from the ControlRate processor to the Funnel above the "Send to Analytic Incoming Queue" processor via the "success" Relationship.

Finally, confirm that none of the processors are in an invalid state - if so, fix the issue; otherwise, Start the entire dataflow.

Update and Load New Protocol Thresholds

The analytic will not alert on any new protocol traffic if that protocol does not have an entry in the Protocol-Thresholds DynamoDB table.

It is generally recommended to let everything run for a day or two, confirm that things are working as expected, and then complete this step.

When ready, update the config.json file with the new protocol and it's baseline thresholds/parameters - then run the load_thresholds.py script to update the Protocol-Thresholds DynamoDB table. See Terraform outputs for table_prefix and aws_region values.

python3 load_thresholds.py -r config.json -t <table_prefix> --region <aws_region>

You're done!

Extra Tips

Thinking About SiLK Filters

For this analytic, we are only interested in outbound protocol use, so any filter will include: --type=out,outweb or --type=out (this filter on it's own denotes that traffic may be request or response, but it is going from your IP space/asset to some external IP space/asset)

Furthermore, most of the protocol traffic we are interested in is TCP-based (filtering for outbound, asset-initiated UDP-based flows is problematic to do with SiLK and should generally be avoided here), and we are only interested in connections initiated by your assets, with at least some data transfer. So the vast majority of filters will also include:

--protocol=6 --flags-initial=S/SAPFR --packets=4- --bytes-per=65-

Also, we're only interested in outbound flows going to external/publicly routable IPs. While this list of CIDR blocks to exclude could be more complete, it covers most things you are likely to see (and we're largely unconcerned with enriching IPv6 at this time). So likewise, the vast majority of filters will also include:

--not-dcidr=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,100.64.0.0/10

However, it is possible to filter for outbound, asset-initiated ICMP or ICMPv6-based flows as long as you know the ICMP request types and/or codes you are interested in. For example, to identify all asset-initiated pings (echo request), the filter would include: --icmp-type=8

Filtering for the actual protocol of interest, which is usually an application layer protocol, generally includes filtering for both:

  1. Flows identified with the relevant application label for the protocol, regardless of destination port
    1. This is the strongest indication that a flow represents the protocol
    2. For this query, you will always include --type=out,outweb
  2. Flows whose destination port is one of the well-known, standard ports for the protocol (including related secure/implicit TLS ports), regardless of application label
    1. This is a weaker but still reasonable indication that a flow represents the protocol
    2. For this query, you only need to include 'outweb' in the --type parameter if any one of the destination ports is 80, 443, or 8080

In other words, you should filter for flow characteristics that should represent the protocol in an ideal world. This enables the analytic to identify not only new, rare, and/or inconsistent standard use of the protocol but also non-standard use of the protocol or it's standard ports (i.e., a different/unexpected application using the protocol's standard port(s), or the protocol using a different/unexpected port).

A Rough Formula for Setting NiFi ControlRate Properties

We generally assume that monitored traffic, in the form of SiLK binary files generated by multiple sensors, potentially across various geographical locations, comes to the SiLK NiFi processor in a staggered manner across time. And so at various times over the course of, say, an hour these files get parsed, filtered for protocol traffic of interest, undergo a few transformations, and then flow records get submitted to the incoming queue in batches.

To manage and balance queue message submission rate, concurrent executions of Lambda functions, and timely processing of protocol traffic, we can (over-)estimate a maximum number of protocol flows that are likely to show up in an hour's time. This way, the capability can handle the regular rate of protocol traffic and unexpectedly large dumps of protocol traffic should it occur. Moreover, having a ControlRate processor for each protocol not only enables you to finely tune queue message submission rate for that protocol, but also enables you to tweak, manage and balance queue message submission rate across all of the protocols you choose to onboard.

The goal is to minimize the queue message submissions per unit time, thus minimizing concurrent executions, while targeting a total NiFi processing time per file of at most 30 minutes or 1800 seconds (it MUST be less than an hour; otherwise, we're potentially receiving more protocol traffic than we can process in the same amount of time). I like to target 10 minutes (600 seconds) as long as it doesn't push up the concurrent executions too much.

  1. Determine the average flows per day over the last X days of protocol traffic, where X is 60-90 days. You can get this value from an existing SiLK binary file.

    rwcount existing.bin \
      --bin-size=86400 --no-titles --delimited=' ' |
      awk '{ total += $2; count++ } END { print total/count }'
  2. Divide the previous figure by 24 to get rough average flows per hour.
  3. Traffic typically will not be so evenly distributed - it may burst at times and/or follow work hours, etc. So multiply average flows per hour by 2.5 and round up to nearest thousand.
  4. Divide the previous figure by the total NiFi processing time target (e.g., 1800, 900, or 600 seconds for processing time targets of 30, 15, or 10 minutes, respectively) and round up to the nearest integer. This is the minimum number of flows that needs to be submitted per second.
  5. Since we want to submit messages often but NiFi prefers somewhat larger intervals for counter accuracy, use a minimum of 5 seconds for the "Time Duration" property.
  6. Finally, multiply the figure from #4 by the figure from #5 - this is the value you should set for the "Maximum Rate" property.

See Error Handling and Monitoring for information about monitoring concurrent executions of Lambda functions using CloudWatch Metrics.