Merge pull request #10503 from Security-Onion-Solutions/jertel/pcap

add ability to output PCAP import results in JSON format
Jason Ertel
2023-06-05 14:32:32 -04:00
committed by GitHub
2 changed files with 236 additions and 92 deletions
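A quick usage sketch of the new option. The command name and values below are illustrative (the PCAP import script is presumed to be so-import-pcap); the JSON keys match the jq filter added in this diff, all values are emitted as strings, and hash is split into an array with one entry per imported file:

  # run an import with progress output suppressed and a machine-readable summary on stdout
  so-import-pcap --json /path/to/example.pcap
  # expected shape of the summary (values are made up):
  # {
  #   "success_count": "1",
  #   "fail_count": "0",
  #   "skipped_count": "0",
  #   "begin_date": "2023-06-01",
  #   "end_date": "2023-06-02",
  #   "url": "https://<manager>/#/dashboards?q=import.id:<md5>...",
  #   "hash": ["<md5-of-pcap>"]
  # }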

View File

@@ -14,19 +14,56 @@
 {%- set ES_PASS = salt['pillar.get']('elasticsearch:auth:users:so_elastic_user:pass', '') %}
 INDEX_DATE=$(date +'%Y.%m.%d')
-RUNID=$(cat /dev/urandom | tr -dc 'a-z0-9' | fold -w 8 | head -n 1)
 LOG_FILE=/nsm/import/evtx-import.log
 . /usr/sbin/so-common
 function usage {
   cat << EOF
-Usage: $0 <evtx-file-1> [evtx-file-2] [evtx-file-*]
+Usage: $0 [options] <evtx-file-1> [evtx-file-2] [evtx-file-*]
 Imports one or more evtx files into Security Onion. The evtx files will be analyzed and made available for review in the Security Onion toolset.
+Options:
+  --json    Outputs summary in JSON format. Implies --quiet.
+  --quiet   Silences progress information to stdout.
 EOF
 }
+quiet=0
+json=0
+INPUT_FILES=
+while [[ $# -gt 0 ]]; do
+  param=$1
+  shift
+  case "$param" in
+    --json)
+      json=1
+      quiet=1
+      ;;
+    --quiet)
+      quiet=1
+      ;;
+    -*)
+      echo "Encountered unexpected parameter: $param"
+      usage
+      exit 1
+      ;;
+    *)
+      if [[ "$INPUT_FILES" != "" ]]; then
+        INPUT_FILES="$INPUT_FILES $param"
+      else
+        INPUT_FILES="$param"
+      fi
+      ;;
+  esac
+done
+function status {
+  msg=$1
+  [[ $quiet -eq 1 ]] && return
+  echo "$msg"
+}
 function evtx2es() {
   EVTX=$1
@@ -42,31 +79,30 @@ function evtx2es() {
 }
 # if no parameters supplied, display usage
-if [ $# -eq 0 ]; then
+if [ "$INPUT_FILES" == "" ]; then
   usage
   exit 1
 fi
 # ensure this is a Manager node
-require_manager
+require_manager &> /dev/null
 # verify that all parameters are files
-for i in "$@"; do
+for i in $INPUT_FILES; do
   if ! [ -f "$i" ]; then
-    usage
     echo "\"$i\" is not a valid file!"
     exit 2
   fi
 done
-# track if we have any valid or invalid evtx
-INVALID_EVTXS="no"
-VALID_EVTXS="no"
 # track oldest start and newest end so that we can generate the Kibana search hyperlink at the end
 START_OLDEST="2050-12-31"
 END_NEWEST="1971-01-01"
+INVALID_EVTXS_COUNT=0
+VALID_EVTXS_COUNT=0
+SKIPPED_EVTXS_COUNT=0
 touch /nsm/import/evtx-start_oldest
 touch /nsm/import/evtx-end_newest
@@ -74,27 +110,39 @@ echo $START_OLDEST > /nsm/import/evtx-start_oldest
 echo $END_NEWEST > /nsm/import/evtx-end_newest
 # paths must be quoted in case they include spaces
-for EVTX in "$@"; do
+for EVTX in $INPUT_FILES; do
   EVTX=$(/usr/bin/realpath "$EVTX")
-  echo "Processing Import: ${EVTX}"
+  status "Processing Import: ${EVTX}"
   # generate a unique hash to assist with dedupe checks
   HASH=$(md5sum "${EVTX}" | awk '{ print $1 }')
   HASH_DIR=/nsm/import/${HASH}
-  echo "- assigning unique identifier to import: $HASH"
+  status "- assigning unique identifier to import: $HASH"
+  if [[ "$HASH_FILTERS" == "" ]]; then
+    HASH_FILTERS="import.id:${HASH}"
+    HASHES="${HASH}"
+  else
+    HASH_FILTERS="$HASH_FILTERS%20OR%20import.id:${HASH}"
+    HASHES="${HASHES} ${HASH}"
+  fi
   if [ -d $HASH_DIR ]; then
-    echo "- this EVTX has already been imported; skipping"
-    INVALID_EVTXS="yes"
+    status "- this EVTX has already been imported; skipping"
+    SKIPPED_EVTXS_COUNT=$((SKIPPED_EVTXS_COUNT + 1))
   else
-    VALID_EVTXS="yes"
     EVTX_DIR=$HASH_DIR/evtx
     mkdir -p $EVTX_DIR
     # import evtx and write them to import ingest pipeline
-    echo "- importing logs to Elasticsearch..."
+    status "- importing logs to Elasticsearch..."
     evtx2es "${EVTX}" $HASH
+    if [[ $? -ne 0 ]]; then
+      INVALID_EVTXS_COUNT=$((INVALID_EVTXS_COUNT + 1))
+      status "- WARNING: This evtx file may not have fully imported successfully"
+    else
+      VALID_EVTXS_COUNT=$((VALID_EVTXS_COUNT + 1))
+    fi
     # compare $START to $START_OLDEST
     START=$(cat /nsm/import/evtx-start_oldest)
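The HASH_FILTERS accumulation above is what later feeds the dashboards hyperlink: each imported file contributes an import.id clause, joined with a URL-encoded OR. A standalone sketch of that behavior, with made-up hashes:

  # illustration only: how HASH_FILTERS grows across two hypothetical imports
  HASH_FILTERS=
  for HASH in aaa111 bbb222; do
    if [[ "$HASH_FILTERS" == "" ]]; then
      HASH_FILTERS="import.id:${HASH}"
    else
      HASH_FILTERS="$HASH_FILTERS%20OR%20import.id:${HASH}"
    fi
  done
  echo "$HASH_FILTERS"
  # prints: import.id:aaa111%20OR%20import.id:bbb222   (%20 is a URL-encoded space)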
@@ -118,38 +166,60 @@ for EVTX in "$@"; do
   fi # end of valid evtx
-  echo
+  status
 done # end of for-loop processing evtx files
-# remove temp files
-echo "Cleaning up:"
-for TEMP_EVTX in ${TEMP_EVTXS[@]}; do
-  echo "- removing temporary evtx $TEMP_EVTX"
-  rm -f $TEMP_EVTX
-done
 # output final messages
-if [ "$INVALID_EVTXS" = "yes" ]; then
-  echo
-  echo "Please note! One or more evtx was invalid! You can scroll up to see which ones were invalid."
+if [[ $INVALID_EVTXS_COUNT -gt 0 ]]; then
+  status
+  status "Please note! One or more evtx was invalid! You can scroll up to see which ones were invalid."
 fi
 START_OLDEST_FORMATTED=`date +%Y-%m-%d --date="$START_OLDEST"`
 START_OLDEST_SLASH=$(echo $START_OLDEST_FORMATTED | sed -e 's/-/%2F/g')
 END_NEWEST_SLASH=$(echo $END_NEWEST | sed -e 's/-/%2F/g')
-if [ "$VALID_EVTXS" = "yes" ]; then
-  cat << EOF
-Import complete!
-You can use the following hyperlink to view data in the time range of your import. You can triple-click to quickly highlight the entire hyperlink and you can then copy it into your browser:
-https://{{ URLBASE }}/#/dashboards?q=import.id:${RUNID}%20%7C%20groupby%20-sankey%20event.dataset%20event.category%2a%20%7C%20groupby%20-pie%20event.category%20%7C%20groupby%20-bar%20event.module%20%7C%20groupby%20event.dataset%20%7C%20groupby%20event.module%20%7C%20groupby%20event.category%20%7C%20groupby%20observer.name%20%7C%20groupby%20source.ip%20%7C%20groupby%20destination.ip%20%7C%20groupby%20destination.port&t=${START_OLDEST_SLASH}%2000%3A00%3A00%20AM%20-%20${END_NEWEST_SLASH}%2000%3A00%3A00%20AM&z=UTC
-or you can manually set your Time Range to be (in UTC):
-From: $START_OLDEST_FORMATTED To: $END_NEWEST
-Please note that it may take 30 seconds or more for events to appear in Security Onion Console.
-EOF
+if [[ $VALID_EVTXS_COUNT -gt 0 ]] || [[ $SKIPPED_EVTXS_COUNT -gt 0 ]]; then
+  URL="https://{{ URLBASE }}/#/dashboards?q=$HASH_FILTERS%20%7C%20groupby%20-sankey%20event.dataset%20event.category%2a%20%7C%20groupby%20-pie%20event.category%20%7C%20groupby%20-bar%20event.module%20%7C%20groupby%20event.dataset%20%7C%20groupby%20event.module%20%7C%20groupby%20event.category%20%7C%20groupby%20observer.name%20%7C%20groupby%20source.ip%20%7C%20groupby%20destination.ip%20%7C%20groupby%20destination.port&t=${START_OLDEST_SLASH}%2000%3A00%3A00%20AM%20-%20${END_NEWEST_SLASH}%2000%3A00%3A00%20AM&z=UTC"
+  status "Import complete!"
+  status
+  status "Use the following hyperlink to view the imported data. Triple-click to quickly highlight the entire hyperlink and then copy it into a browser:"
+  status
+  status "$URL"
+  status
+  status "or, manually set the Time Range to be (in UTC):"
+  status
+  status "From: $START_OLDEST_FORMATTED To: $END_NEWEST"
+  status
+  status "Note: It can take 30 seconds or more for events to appear in Security Onion Console."
+  RESULT=0
+else
+  START_OLDEST=
+  END_NEWEST=
+  URL=
+  RESULT=1
 fi
+if [[ $json -eq 1 ]]; then
+  jq -n \
+    --arg success_count "$VALID_EVTXS_COUNT" \
+    --arg fail_count "$INVALID_EVTXS_COUNT" \
+    --arg skipped_count "$SKIPPED_EVTXS_COUNT" \
+    --arg begin_date "$START_OLDEST" \
+    --arg end_date "$END_NEWEST" \
+    --arg url "$URL" \
+    --arg hashes "$HASHES" \
+    '''{
+      success_count: $success_count,
+      fail_count: $fail_count,
+      skipped_count: $skipped_count,
+      begin_date: $begin_date,
+      end_date: $end_date,
+      url: $url,
+      hash: ($hashes / " ")
+    }'''
+fi
+exit $RESULT
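One detail of the jq filter above worth calling out: $hashes arrives as a single space-separated string, and the ($hashes / " ") expression splits it into a JSON array, so the summary carries one hash per imported file. A standalone sketch with made-up hashes:

  jq -n --arg hashes "aaa111 bbb222" '{ hash: ($hashes / " ") }'
  # => { "hash": [ "aaa111", "bbb222" ] }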

View File

@@ -15,12 +15,51 @@
 function usage {
   cat << EOF
-Usage: $0 <pcap-file-1> [pcap-file-2] [pcap-file-N]
+Usage: $0 [options] <pcap-file-1> [pcap-file-2] [pcap-file-N]
 Imports one or more PCAP files onto a sensor node. The PCAP traffic will be analyzed and made available for review in the Security Onion toolset.
+Options:
+  --json    Outputs summary in JSON format. Implies --quiet.
+  --quiet   Silences progress information to stdout.
 EOF
 }
+quiet=0
+json=0
+INPUT_FILES=
+while [[ $# -gt 0 ]]; do
+  param=$1
+  shift
+  case "$param" in
+    --json)
+      json=1
+      quiet=1
+      ;;
+    --quiet)
+      quiet=1
+      ;;
+    -*)
+      echo "Encountered unexpected parameter: $param"
+      usage
+      exit 1
+      ;;
+    *)
+      if [[ "$INPUT_FILES" != "" ]]; then
+        INPUT_FILES="$INPUT_FILES $param"
+      else
+        INPUT_FILES="$param"
+      fi
+      ;;
+  esac
+done
+function status {
+  msg=$1
+  [[ $quiet -eq 1 ]] && return
+  echo "$msg"
+}
 function pcapinfo() {
   PCAP=$1
   ARGS=$2
@@ -84,7 +123,7 @@ function zeek() {
 }
 # if no parameters supplied, display usage
-if [ $# -eq 0 ]; then
+if [ "$INPUT_FILES" == "" ]; then
   usage
   exit 1
 fi
@@ -96,31 +135,30 @@ if [ ! -d /opt/so/conf/suricata ]; then
 fi
 # verify that all parameters are files
-for i in "$@"; do
+for i in $INPUT_FILES; do
   if ! [ -f "$i" ]; then
-    usage
     echo "\"$i\" is not a valid file!"
     exit 2
   fi
 done
-# track if we have any valid or invalid pcaps
-INVALID_PCAPS="no"
-VALID_PCAPS="no"
 # track oldest start and newest end so that we can generate the Kibana search hyperlink at the end
 START_OLDEST="2050-12-31"
 END_NEWEST="1971-01-01"
+INVALID_PCAPS_COUNT=0
+VALID_PCAPS_COUNT=0
+SKIPPED_PCAPS_COUNT=0
 # paths must be quoted in case they include spaces
-for PCAP in "$@"; do
+for PCAP in $INPUT_FILES; do
   PCAP=$(/usr/bin/realpath "$PCAP")
-  echo "Processing Import: ${PCAP}"
-  echo "- verifying file"
+  status "Processing Import: ${PCAP}"
+  status "- verifying file"
   if ! pcapinfo "${PCAP}" > /dev/null 2>&1; then
     # try to fix pcap and then process the fixed pcap directly
     PCAP_FIXED=`mktemp /tmp/so-import-pcap-XXXXXXXXXX.pcap`
-    echo "- attempting to recover corrupted PCAP file"
+    status "- attempting to recover corrupted PCAP file"
     pcapfix "${PCAP}" "${PCAP_FIXED}"
     # Make fixed file world readable since the Suricata docker container will runas a non-root user
     chmod a+r "${PCAP_FIXED}"
@@ -131,33 +169,44 @@ for PCAP in "$@"; do
   # generate a unique hash to assist with dedupe checks
   HASH=$(md5sum "${PCAP}" | awk '{ print $1 }')
   HASH_DIR=/nsm/import/${HASH}
-  echo "- assigning unique identifier to import: $HASH"
-  if [ -d $HASH_DIR ]; then
-    echo "- this PCAP has already been imported; skipping"
-    INVALID_PCAPS="yes"
-  elif pcapinfo "${PCAP}" |egrep -q "Last packet time: 1970-01-01|Last packet time: n/a"; then
-    echo "- this PCAP file is invalid; skipping"
-    INVALID_PCAPS="yes"
+  status "- assigning unique identifier to import: $HASH"
+  pcap_data=$(pcapinfo "${PCAP}")
+  if ! echo "$pcap_data" | grep -q "First packet time:" || echo "$pcap_data" |egrep -q "Last packet time: 1970-01-01|Last packet time: n/a"; then
+    status "- this PCAP file is invalid; skipping"
+    INVALID_PCAPS_COUNT=$((INVALID_PCAPS_COUNT + 1))
   else
-    VALID_PCAPS="yes"
+    if [ -d $HASH_DIR ]; then
+      status "- this PCAP has already been imported; skipping"
+      SKIPPED_PCAPS_COUNT=$((SKIPPED_PCAPS_COUNT + 1))
+    else
+      VALID_PCAPS_COUNT=$((VALID_PCAPS_COUNT + 1))
      PCAP_DIR=$HASH_DIR/pcap
      mkdir -p $PCAP_DIR
      # generate IDS alerts and write them to standard pipeline
-     echo "- analyzing traffic with Suricata"
+     status "- analyzing traffic with Suricata"
      suricata "${PCAP}" $HASH
      {% if salt['pillar.get']('global:mdengine') == 'ZEEK' %}
      # generate Zeek logs and write them to a unique subdirectory in /nsm/import/zeek/
      # since each run writes to a unique subdirectory, there is no need for a lock file
-     echo "- analyzing traffic with Zeek"
+     status "- analyzing traffic with Zeek"
      zeek "${PCAP}" $HASH
      {% endif %}
+     status "- saving PCAP data spanning dates $START through $END"
+    fi
+    if [[ "$HASH_FILTERS" == "" ]]; then
+      HASH_FILTERS="import.id:${HASH}"
+      HASHES="${HASH}"
+    else
+      HASH_FILTERS="$HASH_FILTERS%20OR%20import.id:${HASH}"
+      HASHES="${HASHES} ${HASH}"
+    fi
   START=$(pcapinfo "${PCAP}" -a |grep "First packet time:" | awk '{print $4}')
   END=$(pcapinfo "${PCAP}" -e |grep "Last packet time:" | awk '{print $4}')
-  echo "- saving PCAP data spanning dates $START through $END"
   # compare $START to $START_OLDEST
   START_COMPARE=$(date -d $START +%s)
@@ -179,37 +228,62 @@ for PCAP in "$@"; do
   fi # end of valid pcap
-  echo
+  status
 done # end of for-loop processing pcap files
 # remove temp files
-echo "Cleaning up:"
 for TEMP_PCAP in ${TEMP_PCAPS[@]}; do
-  echo "- removing temporary pcap $TEMP_PCAP"
+  status "- removing temporary pcap $TEMP_PCAP"
   rm -f $TEMP_PCAP
 done
 # output final messages
-if [ "$INVALID_PCAPS" = "yes" ]; then
-  echo
-  echo "Please note! One or more pcaps was invalid! You can scroll up to see which ones were invalid."
+if [[ $INVALID_PCAPS_COUNT -gt 0 ]]; then
+  status
+  status "WARNING: One or more pcaps was invalid. Scroll up to see which ones were invalid."
 fi
 START_OLDEST_SLASH=$(echo $START_OLDEST | sed -e 's/-/%2F/g')
 END_NEWEST_SLASH=$(echo $END_NEWEST | sed -e 's/-/%2F/g')
-if [ "$VALID_PCAPS" = "yes" ]; then
-  cat << EOF
-Import complete!
-You can use the following hyperlink to view data in the time range of your import. You can triple-click to quickly highlight the entire hyperlink and you can then copy it into your browser:
-https://{{ URLBASE }}/#/dashboards?q=import.id:${HASH}%20%7C%20groupby%20-sankey%20event.dataset%20event.category%2a%20%7C%20groupby%20-pie%20event.category%20%7C%20groupby%20-bar%20event.module%20%7C%20groupby%20event.dataset%20%7C%20groupby%20event.module%20%7C%20groupby%20event.category%20%7C%20groupby%20observer.name%20%7C%20groupby%20source.ip%20%7C%20groupby%20destination.ip%20%7C%20groupby%20destination.port&t=${START_OLDEST_SLASH}%2000%3A00%3A00%20AM%20-%20${END_NEWEST_SLASH}%2000%3A00%3A00%20AM&z=UTC
-or you can manually set your Time Range to be (in UTC):
-From: $START_OLDEST To: $END_NEWEST
-Please note that it may take 30 seconds or more for events to appear in Security Onion Console.
-EOF
+if [[ $VALID_PCAPS_COUNT -gt 0 ]] || [[ $SKIPPED_PCAPS_COUNT -gt 0 ]]; then
+  URL="https://{{ URLBASE }}/#/dashboards?q=$HASH_FILTERS%20%7C%20groupby%20-sankey%20event.dataset%20event.category%2a%20%7C%20groupby%20-pie%20event.category%20%7C%20groupby%20-bar%20event.module%20%7C%20groupby%20event.dataset%20%7C%20groupby%20event.module%20%7C%20groupby%20event.category%20%7C%20groupby%20observer.name%20%7C%20groupby%20source.ip%20%7C%20groupby%20destination.ip%20%7C%20groupby%20destination.port&t=${START_OLDEST_SLASH}%2000%3A00%3A00%20AM%20-%20${END_NEWEST_SLASH}%2000%3A00%3A00%20AM&z=UTC"
+  status "Import complete!"
+  status
+  status "Use the following hyperlink to view the imported data. Triple-click to quickly highlight the entire hyperlink and then copy it into a browser:"
+  status "$URL"
+  status
+  status "or, manually set the Time Range to be (in UTC):"
+  status "From: $START_OLDEST To: $END_NEWEST"
+  status
+  status "Note: It can take 30 seconds or more for events to appear in Security Onion Console."
+  RESULT=0
+else
+  START_OLDEST=
+  END_NEWEST=
+  URL=
+  RESULT=1
 fi
+if [[ $json -eq 1 ]]; then
+  jq -n \
+    --arg success_count "$VALID_PCAPS_COUNT" \
+    --arg fail_count "$INVALID_PCAPS_COUNT" \
+    --arg skipped_count "$SKIPPED_PCAPS_COUNT" \
+    --arg begin_date "$START_OLDEST" \
+    --arg end_date "$END_NEWEST" \
+    --arg url "$URL" \
+    --arg hashes "$HASHES" \
+    '''{
+      success_count: $success_count,
+      fail_count: $fail_count,
+      skipped_count: $skipped_count,
+      begin_date: $begin_date,
+      end_date: $end_date,
+      url: $url,
+      hash: ($hashes / " ")
+    }'''
+fi
+exit $RESULT
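Since both scripts now exit with $RESULT and print the summary on stdout when --json is given, a caller can drive them non-interactively. A hypothetical wrapper sketch (the so-import-pcap name and path are assumptions; the jq paths match the keys emitted above):

  # import quietly, then pull the dashboard URL and failure count out of the JSON summary
  summary=$(so-import-pcap --json /path/to/example.pcap)
  url=$(echo "$summary" | jq -r '.url')
  failed=$(echo "$summary" | jq -r '.fail_count')
  echo "Dashboard link: $url"
  if [[ "$failed" != "0" ]]; then
    echo "warning: $failed file(s) did not import cleanly" >&2
  fi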