From 7df68335689e4b4b2487abbaec10f4b0d1d329fc Mon Sep 17 00:00:00 2001 From: Jason Ertel Date: Mon, 4 Apr 2022 15:58:53 -0400 Subject: [PATCH] Add unit tests for Urlhaus; remove placeholder whois analyzer --- .gitignore | 1 + salt/sensoroni/files/analyzers/build.sh | 2 +- salt/sensoroni/files/analyzers/helpers.py | 18 +++--- .../sensoroni/files/analyzers/helpers_test.py | 28 +++++++++ salt/sensoroni/files/analyzers/pytest.ini | 5 +- .../files/analyzers/urlhaus/urlhaus.py | 53 +++++++++-------- .../files/analyzers/urlhaus/urlhaus_test.py | 57 +++++++++++++++++++ .../files/analyzers/whois/__init__.py | 0 .../files/analyzers/whois/requirements.txt | 1 - salt/sensoroni/files/analyzers/whois/whois.py | 6 -- .../files/analyzers/whois/whois_test.py | 13 ----- 11 files changed, 127 insertions(+), 57 deletions(-) create mode 100644 salt/sensoroni/files/analyzers/helpers_test.py create mode 100644 salt/sensoroni/files/analyzers/urlhaus/urlhaus_test.py delete mode 100644 salt/sensoroni/files/analyzers/whois/__init__.py delete mode 100644 salt/sensoroni/files/analyzers/whois/requirements.txt delete mode 100644 salt/sensoroni/files/analyzers/whois/whois.py delete mode 100644 salt/sensoroni/files/analyzers/whois/whois_test.py diff --git a/.gitignore b/.gitignore index 62c791cc2..83e6035cd 100644 --- a/.gitignore +++ b/.gitignore @@ -63,3 +63,4 @@ __pycache__ .pytest_cache .coverage *.pyc +.venv \ No newline at end of file diff --git a/salt/sensoroni/files/analyzers/build.sh b/salt/sensoroni/files/analyzers/build.sh index 99840918b..c9c408f01 100755 --- a/salt/sensoroni/files/analyzers/build.sh +++ b/salt/sensoroni/files/analyzers/build.sh @@ -11,5 +11,5 @@ if ! which pytest &> /dev/null || ! which flake8 &> /dev/null ; then exit 1 fi -flake8 "$TARGET_DIR" --show-source --max-complexity=10 --max-line-length=200 --statistics --doctests +flake8 "$TARGET_DIR" --show-source --max-complexity=10 --max-line-length=200 --statistics --doctests --exclude .venv pytest "$TARGET_DIR" "--cov=$TARGET_DIR" --doctest-modules --cov-report=term --cov-fail-under=90 --cov-config=${HOME_DIR}/pytest.ini diff --git a/salt/sensoroni/files/analyzers/helpers.py b/salt/sensoroni/files/analyzers/helpers.py index e71033dc8..4a82def9b 100644 --- a/salt/sensoroni/files/analyzers/helpers.py +++ b/salt/sensoroni/files/analyzers/helpers.py @@ -1,24 +1,22 @@ -import os import json -import inspect +import os +import sys + def checkSupportedType(meta, artifact_type): if artifact_type not in meta['supportedTypes']: - sys.exit("No supported type detected!") + sys.exit(126) else: return True -def loadData(artifact): - request_data = json.loads(artifact) - artifact_value = request_data['value'] - artifact_type = request_data['artifactType'] - return artifact_type, artifact_value +def parseArtifact(artifact): + data = json.loads(artifact) + return data -def loadMeta(file): +def loadMetadata(file): dir = os.path.dirname(os.path.realpath(file)) filename = os.path.realpath(file).rsplit('/', 1)[1].split('.')[0] with open(str(dir + "/" + filename + ".json"), "r") as metafile: return json.load(metafile) - diff --git a/salt/sensoroni/files/analyzers/helpers_test.py b/salt/sensoroni/files/analyzers/helpers_test.py new file mode 100644 index 000000000..3a5f99646 --- /dev/null +++ b/salt/sensoroni/files/analyzers/helpers_test.py @@ -0,0 +1,28 @@ +from unittest.mock import patch, MagicMock +import helpers +import unittest + + +class TestHelpersMethods(unittest.TestCase): + + def test_checkSupportedType(self): + with patch('sys.exit', new=MagicMock()) as mock: + meta = {"supportedTypes": ["ip", "foo"]} + result = helpers.checkSupportedType(meta, "ip") + self.assertTrue(result) + mock.assert_not_called() + + result = helpers.checkSupportedType(meta, "bar") + self.assertFalse(result) + mock.assert_called_once_with("No supported type detected!") + + def test_loadMetadata(self): + input = 'urlhaus/urlhaus.py' + data = helpers.loadMetadata(input) + self.assertEqual(data["name"], "Urlhaus") + + def test_parseArtifact(self): + input = '{"value":"foo","artifactType":"bar"}' + data = helpers.parseArtifact(input) + self.assertEqual(data["artifactType"], "bar") + self.assertEqual(data["value"], "foo") diff --git a/salt/sensoroni/files/analyzers/pytest.ini b/salt/sensoroni/files/analyzers/pytest.ini index cc6ad652e..e9205f43f 100644 --- a/salt/sensoroni/files/analyzers/pytest.ini +++ b/salt/sensoroni/files/analyzers/pytest.ini @@ -5,4 +5,7 @@ python_functions = test_* [report] exclude_lines = - if __name__ == .__main__.: \ No newline at end of file + if __name__ == .__main__.: + +show_missing = True +omit = *_test.py \ No newline at end of file diff --git a/salt/sensoroni/files/analyzers/urlhaus/urlhaus.py b/salt/sensoroni/files/analyzers/urlhaus/urlhaus.py index 8919b0bc3..3e7493b56 100644 --- a/salt/sensoroni/files/analyzers/urlhaus/urlhaus.py +++ b/salt/sensoroni/files/analyzers/urlhaus/urlhaus.py @@ -5,39 +5,42 @@ import sys import helpers -def buildReq(meta, artifact_value): - base_url = meta['baseUrl'] - url = base_url - payload = {"url": artifact_value} - return payload, url +def buildReq(artifact_value): + return {"url": artifact_value} -def sendReq(meta, payload, url): +def sendReq(meta, payload): + url = meta['baseUrl'] response = requests.request('POST', url, data=payload) - raw = response.json() - if raw['query_status'] == "no_results": - summaryinfo = "No results available." - elif raw['query_status'] == "invalid_url": - summaryinfo = "Invalid URL." + return response.json() + + +def prepareResults(raw): if 'threat' in raw: - threat = raw['threat'] - if threat == 'malware_download': - summaryinfo = "Threat: Malware" + summary = raw['threat'] + status = "danger" + elif 'query_status' in raw: + summary = raw['query_status'] + if summary == 'no_results': + status = "ok" else: - summaryinfo = threat - summary = summaryinfo - results = {'response': raw, 'summary': summary} - print(json.dumps(results)) + status = "error" + results = {'response': raw, 'summary': summary, 'status': status} + return results + + +def analyze(input): + meta = helpers.loadMetadata(__file__) + data = helpers.parseArtifact(input) + helpers.checkSupportedType(meta, data["artifactType"]) + payload = buildReq(data["value"]) + response = sendReq(meta, payload) + return prepareResults(response) def main(): - meta = helpers.loadMeta(__file__) - data = helpers.loadData(sys.argv[1]) - helpers.checkSupportedType(meta, data[0]) - request = buildReq(meta, data[1]) - payload = request[0] - url = request[1] - sendReq(meta, payload, url) + results = analyze(sys.argv[1]) + print(json.dumps(results)) if __name__ == "__main__": diff --git a/salt/sensoroni/files/analyzers/urlhaus/urlhaus_test.py b/salt/sensoroni/files/analyzers/urlhaus/urlhaus_test.py new file mode 100644 index 000000000..0bf175578 --- /dev/null +++ b/salt/sensoroni/files/analyzers/urlhaus/urlhaus_test.py @@ -0,0 +1,57 @@ +from io import StringIO +from unittest.mock import patch, MagicMock +from urlhaus import urlhaus +import unittest + + +class TestUrlhausMethods(unittest.TestCase): + + def test_main(self): + output = {"foo": "bar"} + with patch('sys.stdout', new=StringIO()) as mock_stdout: + with patch('urlhaus.urlhaus.analyze', new=MagicMock(return_value=output)) as mock: + urlhaus.main() + expected = '{"foo": "bar"}\n' + self.assertEqual(mock_stdout.getvalue(), expected) + mock.assert_called_once_with('.') + + def test_buildReq(self): + result = urlhaus.buildReq("test") + self.assertEqual("test", result["url"]) + + def test_sendReq(self): + with patch('requests.request', new=MagicMock(return_value=MagicMock())) as mock: + meta = {"baseUrl": "myurl"} + response = urlhaus.sendReq(meta, "mypayload") + mock.assert_called_once_with("POST", "myurl", data="mypayload") + self.assertIsNotNone(response) + + def test_prepareResults_none(self): + raw = {"query_status": "no_results"} + results = urlhaus.prepareResults(raw) + self.assertEqual(results["response"], raw) + self.assertEqual(results["summary"], "no_results") + self.assertEqual(results["status"], "info") + + def test_prepareResults_invalidUrl(self): + raw = {"query_status": "invalid_url"} + results = urlhaus.prepareResults(raw) + self.assertEqual(results["response"], raw) + self.assertEqual(results["summary"], "invalid_url") + self.assertEqual(results["status"], "error") + + def test_prepareResults_threat(self): + raw = {"query_status": "invalid_url"} # This is overrided in this scenario + raw["threat"] = "bad_actor" + results = urlhaus.prepareResults(raw) + self.assertEqual(results["response"], raw) + self.assertEqual(results["summary"], "bad_actor") + self.assertEqual(results["status"], "danger") + + def test_analyze(self): + output = {"threat": "malware_download"} + artifactInput = '{"value":"foo","artifactType":"url"}' + with patch('urlhaus.urlhaus.sendReq', new=MagicMock(return_value=output)) as mock: + results = urlhaus.analyze(artifactInput) + self.assertEqual(results["summary"], "malware_download") + mock.assert_called_once() diff --git a/salt/sensoroni/files/analyzers/whois/__init__.py b/salt/sensoroni/files/analyzers/whois/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/salt/sensoroni/files/analyzers/whois/requirements.txt b/salt/sensoroni/files/analyzers/whois/requirements.txt deleted file mode 100644 index 6c36329a3..000000000 --- a/salt/sensoroni/files/analyzers/whois/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -requests>=2.27.1 diff --git a/salt/sensoroni/files/analyzers/whois/whois.py b/salt/sensoroni/files/analyzers/whois/whois.py deleted file mode 100644 index dc739c8ba..000000000 --- a/salt/sensoroni/files/analyzers/whois/whois.py +++ /dev/null @@ -1,6 +0,0 @@ -def main(): - print('{"result":{ "requestId": "something-generated-by-whois", "someother_field": "more data" }, "summary": "botsrv.btc-goblin.ru"}') - - -if __name__ == "__main__": - main() diff --git a/salt/sensoroni/files/analyzers/whois/whois_test.py b/salt/sensoroni/files/analyzers/whois/whois_test.py deleted file mode 100644 index d76962392..000000000 --- a/salt/sensoroni/files/analyzers/whois/whois_test.py +++ /dev/null @@ -1,13 +0,0 @@ -from io import StringIO -from unittest.mock import patch -from whois import whois -import unittest - - -class TestWhoisMethods(unittest.TestCase): - - def test_main(self): - with patch('sys.stdout', new=StringIO()) as mock_stdout: - whois.main() - expected = '{"result":{ "requestId": "something-generated-by-whois", "someother_field": "more data" }, "summary": "botsrv.btc-goblin.ru"}\n' - self.assertEqual(mock_stdout.getvalue(), expected)