Edit protocols.tf and add the name of the new protocol/dataset to the "for_each" set.
for_each = toset(["smb"]) --> for_each = toset(["smb", "ftp"])
Currently, the protocol/dataset name must match one of the keys in the standard_pps dictionary defined in scripts/lambda_functions/get_org_info.py.
Run 'terraform apply'.
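If you'd like to preview the changes first, you can run a plan before applying:
terraform plan
terraform apply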
This will automatically provision the additional protocol-specific infrastructure, including a new S3 directory, DynamoDB tables, an AutoScaling launch template and rules for baseline regeneration, and updates to the relevant IAM policies.
The goal here is to create a single SiLK binary file containing the last 90 days (or at least a recent 90 days - it doesn't have to be exactly the previous 90 days) of flows representing the protocol of interest. The best way to do this is to perform one or more queries against your SiLK repository with the appropriate filters, then deduplicate, combine, and sort the resulting flows into a single SiLK binary file.
It is recommended to start one or more screen sessions on a SiLK analysis server. Make sure to use the --threads option to speed up each query (as shown in the example below).
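For example, to start a named screen session for the queries:
screen -S smb-baseline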
To capture flows representing the protocol of interest, see the "Thinking About SiLK Filters" section below.
An example considering the SMB protocol would be as follows (two queries):
Get flows with the relevant application label.
rwfilter --start-date=<90_DAYS_AGO_FROM_YESTERDAY> --end-date=<YESTERDAYS_DATE> \
--type=out,outweb \
--protocol=6 --flags-initial=S/SAPFR --packets=4- --bytes-per=65- \
--not-dcidr=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,100.64.0.0/10 \
--application=139 --threads=4 --pass=smb_90days_app.bin
Get flows with the relevant destination port(s), but remove flows matching the application label that we already captured in the previous query. Also note we no longer include --type=outweb, because outweb refers to flows whose dport is one of (80, 443, 8080) and we are specifically filtering for dports not in that set.
rwfilter --start-date=<90_DAYS_AGO_FROM_YESTERDAY> --end-date=<YESTERDAYS_DATE> \
--type=out \
--protocol=6 --flags-initial=S/SAPFR --packets=4- --bytes-per=65- \
--not-dcidr=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,100.64.0.0/10 \
--dport=139,445 --threads=4 --pass=stdout |
rwfilter stdin --application=139 --fail=smb_90days_dport.bin
Next, deduplicate, combine, and sort the query result files into a single SiLK binary file.
rwdedupe smb_90days_app.bin smb_90days_dport.bin |
rwsort --fields=stime --output-path=smb_deduped_combined.bin
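As a quick sanity check, you can confirm the combined file's record count before moving on:
rwfileinfo --fields=count-records smb_deduped_combined.bin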
In this step, each flow record is lightly transformed and enriched with source org and destination ASN information.
Use the provided Python script, convert_initial_protocol_traffic.py, to perform these transformations/enrichments. This script requires the following to run:
- the combined SiLK binary file from the previous step (-b)
- a JSON export of the org info items from DynamoDB (-a)
- the protocol/dataset name (-p)
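If you still need to produce the org info JSON, one possible approach - assuming the file is simply a scan export of the org info DynamoDB table, whose name here is a placeholder - would be:
aws dynamodb scan --table-name <org_info_table> --output json > ddb-org-info-items.json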
Run the script.
python3 convert_initial_protocol_traffic.py \
-b smb_deduped_combined.bin -a ddb-org-info-items.json -p smb
Once the script completes, you should find that multiple files have been generated in the current working directory, each with the filename format (e.g., <protocol>__<iso-date>__<uuid4>) and file content format expected for later processing. Upload each of those files to the protocol's baseline_traffic directory in S3.
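For example - the bucket name and prefix layout here are placeholders, so check your Terraform outputs for the actual paths:
aws s3 cp . s3://<bucket_name>/smb/baseline_traffic/ \
--recursive --exclude "*" --include "smb__*"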
Lastly, you should find an output file named <protocol>-baseline-traffic-agg.psv - this will be used in the next step.
Use the provided Python script, generate_initial_baseline_metrics.py, to generate the baseline metrics and load them into the newly provisioned DynamoDB tables. This script requires the following to run:
- the <protocol>-baseline-traffic-agg.psv file from the previous step (-r)
- the protocol/dataset name (-p)
- the DynamoDB table prefix (-t) and AWS region (--region), both available from the Terraform outputs
Following our example considering the SMB protocol, run the following:
python3 generate_initial_baseline_metrics.py \
-r smb-baseline-traffic-agg.psv -p smb -t <table_prefix> --region <aws_region>
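To spot-check that items were loaded, you can count them - the table name here is a placeholder, so use the actual names from your Terraform outputs:
aws dynamodb scan --table-name <metrics_table_name> --select COUNT --region <aws_region>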
If there were no errors, you may remove the <protocol>-baseline-traffic-agg.psv file as it is no longer needed (unless you want to keep it in order to generate pseudo-protocol traffic).
Finally, confirm that none of the processors are in an invalid state (if any are, fix the issue), then Start the entire dataflow.
The analytic will not alert on any new protocol traffic if that protocol does not have an entry in the Protocol-Thresholds DynamoDB table.
It is generally recommended to let everything run for a day or two, confirm that things are working as expected, and then complete this step.
When ready, update the config.json file with the new protocol and its baseline thresholds/parameters - then run the load_thresholds.py script to update the Protocol-Thresholds DynamoDB table. See Terraform outputs for the table_prefix and aws_region values.
python3 load_thresholds.py -r config.json -t <table_prefix> --region <aws_region>
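To confirm the new entry landed, you can scan the thresholds table - the table name here is a placeholder and may differ in your deployment:
aws dynamodb scan --table-name <protocol_thresholds_table> --region <aws_region>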
You're done!
For this analytic, we are only interested in outbound protocol use, so any filter will include --type=out,outweb or --type=out (this filter on its own denotes that traffic may be request or response, but it is going from your IP space/asset to some external IP space/asset).
Furthermore, most of the protocol traffic we are interested in is TCP-based (filtering for outbound, asset-initiated UDP-based flows is problematic to do with SiLK and should generally be avoided here), and we are only interested in connections initiated by your assets, with at least some data transfer. So the vast majority of filters will also include:
--protocol=6 --flags-initial=S/SAPFR --packets=4- --bytes-per=65-
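To unpack those options: --flags-initial=S/SAPFR keeps flows whose first packet set SYN and none of ACK/PSH/FIN/RST (i.e., connections initiated by the asset), while --packets=4- and --bytes-per=65- require at least four packets averaging at least 65 bytes each, filtering out scans and empty handshakes.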
Also, we're only interested in outbound flows going to external/publicly routable IPs. While this list of CIDR blocks to exclude could be more complete, it covers most things you are likely to see (and we're largely unconcerned with enriching IPv6 at this time). So likewise, the vast majority of filters will also include:
--not-dcidr=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,100.64.0.0/10
However, it is possible to filter for outbound, asset-initiated ICMP or ICMPv6-based flows as long as you know the ICMP request types and/or codes you are interested in. For example, to identify all asset-initiated pings (echo request), the filter would include: --icmp-type=8
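A minimal sketch of such a query, following the same pattern as the SMB examples above (dates and output filename are placeholders):
rwfilter --start-date=<START_DATE> --end-date=<END_DATE> \
--type=out \
--protocol=1 --icmp-type=8 \
--not-dcidr=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,100.64.0.0/10 \
--threads=4 --pass=icmp_echo.bin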
Filtering for the actual protocol of interest, which is usually an application layer protocol, generally includes filtering for both:
- the protocol's application label (e.g., --application=139 for SMB)
- the protocol's standard destination port(s) (e.g., --dport=139,445 for SMB)
In other words, you should filter for flow characteristics that should represent the protocol in an ideal world. This enables the analytic to identify not only new, rare, and/or inconsistent standard use of the protocol but also non-standard use of the protocol or its standard ports (i.e., a different/unexpected application using the protocol's standard port(s), or the protocol using a different/unexpected port).
We generally assume that monitored traffic, in the form of SiLK binary files generated by multiple sensors, potentially across various geographical locations, comes to the SiLK NiFi processor in a staggered manner across time. And so, at various times over the course of, say, an hour, these files get parsed, filtered for protocol traffic of interest, undergo a few transformations, and then flow records get submitted to the incoming queue in batches.
To manage and balance queue message submission rate, concurrent executions of Lambda functions, and timely processing of protocol traffic, we can (over-)estimate the maximum number of protocol flows that are likely to show up in an hour's time. This way, the capability can handle the regular rate of protocol traffic as well as unexpectedly large dumps of protocol traffic should they occur. Moreover, having a ControlRate processor for each protocol not only enables you to finely tune the queue message submission rate for that protocol, but also enables you to tweak, manage, and balance queue message submission rates across all of the protocols you choose to onboard.
The goal is to minimize the queue message submissions per unit time, thus minimizing concurrent executions, while targeting a total NiFi processing time per file of at most 30 minutes or 1800 seconds (it MUST be less than an hour; otherwise, we're potentially receiving more protocol traffic than we can process in the same amount of time). I like to target 10 minutes (600 seconds) as long as it doesn't push up the concurrent executions too much.
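As an illustration (the figures here are made up): if you estimate at most 180,000 flows of the protocol arriving in an hour and target 600 seconds of processing time, the queue must sustain 180,000 / 600 = 300 message submissions per second.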
Determine the average flows per day over the last X days of protocol traffic, where X is 60-90 days. You can get this value from an existing SiLK binary file.
rwcount existing.bin \
--bin-size=86400 --no-titles --delimited=' ' |
awk '{ total += $2; count++ } END { print total/count }'
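Here rwcount groups the records into one-day (86400-second) bins, and the awk command averages the second column - the record count per bin - across all bins, yielding average flows per day.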
Finally, multiply the figure from #4 by the figure from #5 - this is the value you should set for the "Maximum Rate" property.
See Error Handling and Monitoring for information about monitoring concurrent executions of Lambda functions using CloudWatch Metrics.