mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2026-04-25 14:07:49 +02:00
Add localfile analyzer and tests
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"name": "Local Intel",
|
||||
"version": "0.1",
|
||||
"author": "Security Onion Solutions",
|
||||
"description": "This analyzer queries local indicators in CSV format for a match.",
|
||||
"supportedTypes" : ["domain", "hash", "ip", "url"]
|
||||
}
|
||||
+79
@@ -0,0 +1,79 @@
|
||||
import json
|
||||
import helpers
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import csv
|
||||
|
||||
|
||||
def checkConfigRequirements(conf):
|
||||
if "file_path" not in conf or len(conf['file_path']) == 0:
|
||||
sys.exit(126)
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def searchFile(artifact, csvfiles):
|
||||
dir = os.path.dirname(os.path.realpath(__file__))
|
||||
found = []
|
||||
for f in csvfiles:
|
||||
filename = dir + "/" + f
|
||||
with open(filename, "r") as csvfile:
|
||||
csvdata = csv.DictReader(csvfile)
|
||||
for row in csvdata:
|
||||
first_key = list(row.keys())[0]
|
||||
if artifact in row[first_key]:
|
||||
row.update({"filename": filename})
|
||||
found.append(row)
|
||||
if len(found) != 0:
|
||||
if len(found) == 1:
|
||||
results = found[0]
|
||||
else:
|
||||
results = found
|
||||
else:
|
||||
results = "No results"
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def prepareResults(raw):
|
||||
if len(raw) > 0:
|
||||
if "No results" in raw:
|
||||
status = "ok"
|
||||
summary = "no_results"
|
||||
else:
|
||||
status = "info"
|
||||
summary = "One or more matches found."
|
||||
else:
|
||||
raw = {}
|
||||
status = "caution"
|
||||
summary = "internal_failure"
|
||||
response = raw
|
||||
results = {'response': response, 'status': status, 'summary': summary}
|
||||
return results
|
||||
|
||||
|
||||
def analyze(conf, input):
|
||||
checkConfigRequirements(conf)
|
||||
meta = helpers.loadMetadata(__file__)
|
||||
data = helpers.parseArtifact(input)
|
||||
helpers.checkSupportedType(meta, data["artifactType"])
|
||||
search = searchFile(data["value"], conf['file_path'])
|
||||
results = prepareResults(search)
|
||||
return results
|
||||
|
||||
|
||||
def main():
|
||||
dir = os.path.dirname(os.path.realpath(__file__))
|
||||
parser = argparse.ArgumentParser(description='Search CSV file for a given artifact')
|
||||
parser.add_argument('artifact', help='the artifact represented in JSON format')
|
||||
parser.add_argument('-c', '--config', metavar="CONFIG_FILE", default=dir + "/localfile.yaml", help='optional config file to use instead of the default config file')
|
||||
|
||||
args = parser.parse_args()
|
||||
if args.artifact:
|
||||
results = analyze(helpers.loadConfig(args.config), args.artifact)
|
||||
print(json.dumps(results))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1 @@
|
||||
file_path: []
|
||||
@@ -0,0 +1,4 @@
|
||||
indicator,description,reference
|
||||
abcd1234,This is a test!,Testing
|
||||
abcd1234,This is another test!,Testing
|
||||
192.168.1.1,Yet another test!,Testing
|
||||
|
@@ -0,0 +1,119 @@
|
||||
from io import StringIO
|
||||
import sys
|
||||
from unittest.mock import patch, MagicMock
|
||||
from localfile import localfile
|
||||
import unittest
|
||||
|
||||
|
||||
class TestLocalfileMethods(unittest.TestCase):
|
||||
|
||||
def test_main_missing_input(self):
|
||||
with patch('sys.exit', new=MagicMock()) as sysmock:
|
||||
with patch('sys.stderr', new=StringIO()) as mock_stderr:
|
||||
sys.argv = ["cmd"]
|
||||
localfile.main()
|
||||
self.assertEqual(mock_stderr.getvalue(), "usage: cmd [-h] [-c CONFIG_FILE] artifact\ncmd: error: the following arguments are required: artifact\n")
|
||||
sysmock.assert_called_once_with(2)
|
||||
|
||||
def test_main_success(self):
|
||||
output = {"foo": "bar"}
|
||||
with patch('sys.stdout', new=StringIO()) as mock_stdout:
|
||||
with patch('localfile.localfile.analyze', new=MagicMock(return_value=output)) as mock:
|
||||
sys.argv = ["cmd", "input"]
|
||||
localfile.main()
|
||||
expected = '{"foo": "bar"}\n'
|
||||
self.assertEqual(mock_stdout.getvalue(), expected)
|
||||
mock.assert_called_once()
|
||||
|
||||
def test_checkConfigRequirements_present(self):
|
||||
conf = {"file_path": "['intel.csv']"}
|
||||
self.assertTrue(localfile.checkConfigRequirements(conf))
|
||||
|
||||
def test_checkConfigRequirements_not_present(self):
|
||||
conf = {"not_a_file_path": "blahblah"}
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
localfile.checkConfigRequirements(conf)
|
||||
self.assertEqual(cm.exception.code, 126)
|
||||
|
||||
def test_checkConfigRequirements_empty(self):
|
||||
conf = {"file_path": ""}
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
localfile.checkConfigRequirements(conf)
|
||||
self.assertEqual(cm.exception.code, 126)
|
||||
|
||||
def test_searchFile_multiple_found(self):
|
||||
artifact = "abcd1234"
|
||||
results = localfile.searchFile(artifact, ["localfile_test.csv"])
|
||||
self.assertEqual(results[0]["indicator"], "abcd1234")
|
||||
self.assertEqual(results[0]["description"], "This is a test!")
|
||||
self.assertEqual(results[0]["reference"], "Testing")
|
||||
self.assertEqual(results[1]["indicator"], "abcd1234")
|
||||
self.assertEqual(results[1]["description"], "This is another test!")
|
||||
|
||||
def test_searchFile_single_found(self):
|
||||
artifact = "192.168.1.1"
|
||||
results = localfile.searchFile(artifact, ["localfile_test.csv"])
|
||||
self.assertEqual(results["indicator"], "192.168.1.1")
|
||||
self.assertEqual(results["description"], "Yet another test!")
|
||||
self.assertEqual(results["reference"], "Testing")
|
||||
|
||||
def test_searchFile_not_found(self):
|
||||
artifact = "youcan'tfindme"
|
||||
results = localfile.searchFile(artifact, ["localfile_test.csv"])
|
||||
self.assertEqual(results, "No results")
|
||||
|
||||
def test_prepareResults_none(self):
|
||||
raw = "No results"
|
||||
results = localfile.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "no_results")
|
||||
self.assertEqual(results["status"], "ok")
|
||||
|
||||
def test_prepareResults_ok(self):
|
||||
raw = [
|
||||
{
|
||||
"description": "This is one BAD piece of malware!",
|
||||
"filename": "/opt/sensoroni/analyzers/localfile/intel.csv",
|
||||
"indicator": "abc1234",
|
||||
"reference": "https://myintelservice"
|
||||
},
|
||||
{
|
||||
"filename": "/opt/sensoroni/analyzers/localfile/random.csv",
|
||||
"randomcol1": "myothervalue",
|
||||
"randomcol2": "myotherothervalue",
|
||||
"value": "abc1234"
|
||||
}
|
||||
]
|
||||
results = localfile.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "One or more matches found.")
|
||||
self.assertEqual(results["status"], "info")
|
||||
|
||||
def test_prepareResults_error(self):
|
||||
raw = {}
|
||||
results = localfile.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "internal_failure")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_analyze(self):
|
||||
output = [
|
||||
{
|
||||
"description": "This is one BAD piece of malware!",
|
||||
"filename": "/opt/sensoroni/analyzers/localfile/intel.csv",
|
||||
"indicator": "abc1234",
|
||||
"reference": "https://myintelservice"
|
||||
},
|
||||
{
|
||||
"filename": "/opt/sensoroni/analyzers/localfile/random.csv",
|
||||
"randomcol1": "myothervalue",
|
||||
"randomcol2": "myotherothervalue",
|
||||
"value": "abc1234"
|
||||
}
|
||||
]
|
||||
artifactInput = '{"value":"foo","artifactType":"url"}'
|
||||
conf = {"file_path": "/home/intel.csv"}
|
||||
with patch('localfile.localfile.searchFile', new=MagicMock(return_value=output)) as mock:
|
||||
results = localfile.analyze(conf, artifactInput)
|
||||
self.assertEqual(results["summary"], "One or more matches found.")
|
||||
mock.assert_called_once()
|
||||
@@ -0,0 +1,2 @@
|
||||
requests>=2.27.1
|
||||
pyyaml>=6.0
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Reference in New Issue
Block a user