mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2025-12-08 18:22:47 +01:00
Add unit tests for Urlhaus; remove placeholder whois analyzer
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -63,3 +63,4 @@ __pycache__
|
|||||||
.pytest_cache
|
.pytest_cache
|
||||||
.coverage
|
.coverage
|
||||||
*.pyc
|
*.pyc
|
||||||
|
.venv
|
||||||
@@ -11,5 +11,5 @@ if ! which pytest &> /dev/null || ! which flake8 &> /dev/null ; then
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
flake8 "$TARGET_DIR" --show-source --max-complexity=10 --max-line-length=200 --statistics --doctests
|
flake8 "$TARGET_DIR" --show-source --max-complexity=10 --max-line-length=200 --statistics --doctests --exclude .venv
|
||||||
pytest "$TARGET_DIR" "--cov=$TARGET_DIR" --doctest-modules --cov-report=term --cov-fail-under=90 --cov-config=${HOME_DIR}/pytest.ini
|
pytest "$TARGET_DIR" "--cov=$TARGET_DIR" --doctest-modules --cov-report=term --cov-fail-under=90 --cov-config=${HOME_DIR}/pytest.ini
|
||||||
|
|||||||
@@ -1,24 +1,22 @@
|
|||||||
import os
|
|
||||||
import json
|
import json
|
||||||
import inspect
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
def checkSupportedType(meta, artifact_type):
|
def checkSupportedType(meta, artifact_type):
|
||||||
if artifact_type not in meta['supportedTypes']:
|
if artifact_type not in meta['supportedTypes']:
|
||||||
sys.exit("No supported type detected!")
|
sys.exit(126)
|
||||||
else:
|
else:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def loadData(artifact):
|
def parseArtifact(artifact):
|
||||||
request_data = json.loads(artifact)
|
data = json.loads(artifact)
|
||||||
artifact_value = request_data['value']
|
return data
|
||||||
artifact_type = request_data['artifactType']
|
|
||||||
return artifact_type, artifact_value
|
|
||||||
|
|
||||||
|
|
||||||
def loadMeta(file):
|
def loadMetadata(file):
|
||||||
dir = os.path.dirname(os.path.realpath(file))
|
dir = os.path.dirname(os.path.realpath(file))
|
||||||
filename = os.path.realpath(file).rsplit('/', 1)[1].split('.')[0]
|
filename = os.path.realpath(file).rsplit('/', 1)[1].split('.')[0]
|
||||||
with open(str(dir + "/" + filename + ".json"), "r") as metafile:
|
with open(str(dir + "/" + filename + ".json"), "r") as metafile:
|
||||||
return json.load(metafile)
|
return json.load(metafile)
|
||||||
|
|
||||||
|
|||||||
28
salt/sensoroni/files/analyzers/helpers_test.py
Normal file
28
salt/sensoroni/files/analyzers/helpers_test.py
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
import helpers
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
class TestHelpersMethods(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_checkSupportedType(self):
|
||||||
|
with patch('sys.exit', new=MagicMock()) as mock:
|
||||||
|
meta = {"supportedTypes": ["ip", "foo"]}
|
||||||
|
result = helpers.checkSupportedType(meta, "ip")
|
||||||
|
self.assertTrue(result)
|
||||||
|
mock.assert_not_called()
|
||||||
|
|
||||||
|
result = helpers.checkSupportedType(meta, "bar")
|
||||||
|
self.assertFalse(result)
|
||||||
|
mock.assert_called_once_with("No supported type detected!")
|
||||||
|
|
||||||
|
def test_loadMetadata(self):
|
||||||
|
input = 'urlhaus/urlhaus.py'
|
||||||
|
data = helpers.loadMetadata(input)
|
||||||
|
self.assertEqual(data["name"], "Urlhaus")
|
||||||
|
|
||||||
|
def test_parseArtifact(self):
|
||||||
|
input = '{"value":"foo","artifactType":"bar"}'
|
||||||
|
data = helpers.parseArtifact(input)
|
||||||
|
self.assertEqual(data["artifactType"], "bar")
|
||||||
|
self.assertEqual(data["value"], "foo")
|
||||||
@@ -6,3 +6,6 @@ python_functions = test_*
|
|||||||
[report]
|
[report]
|
||||||
exclude_lines =
|
exclude_lines =
|
||||||
if __name__ == .__main__.:
|
if __name__ == .__main__.:
|
||||||
|
|
||||||
|
show_missing = True
|
||||||
|
omit = *_test.py
|
||||||
@@ -5,39 +5,42 @@ import sys
|
|||||||
import helpers
|
import helpers
|
||||||
|
|
||||||
|
|
||||||
def buildReq(meta, artifact_value):
|
def buildReq(artifact_value):
|
||||||
base_url = meta['baseUrl']
|
return {"url": artifact_value}
|
||||||
url = base_url
|
|
||||||
payload = {"url": artifact_value}
|
|
||||||
return payload, url
|
|
||||||
|
|
||||||
|
|
||||||
def sendReq(meta, payload, url):
|
def sendReq(meta, payload):
|
||||||
|
url = meta['baseUrl']
|
||||||
response = requests.request('POST', url, data=payload)
|
response = requests.request('POST', url, data=payload)
|
||||||
raw = response.json()
|
return response.json()
|
||||||
if raw['query_status'] == "no_results":
|
|
||||||
summaryinfo = "No results available."
|
|
||||||
elif raw['query_status'] == "invalid_url":
|
def prepareResults(raw):
|
||||||
summaryinfo = "Invalid URL."
|
|
||||||
if 'threat' in raw:
|
if 'threat' in raw:
|
||||||
threat = raw['threat']
|
summary = raw['threat']
|
||||||
if threat == 'malware_download':
|
status = "danger"
|
||||||
summaryinfo = "Threat: Malware"
|
elif 'query_status' in raw:
|
||||||
|
summary = raw['query_status']
|
||||||
|
if summary == 'no_results':
|
||||||
|
status = "ok"
|
||||||
else:
|
else:
|
||||||
summaryinfo = threat
|
status = "error"
|
||||||
summary = summaryinfo
|
results = {'response': raw, 'summary': summary, 'status': status}
|
||||||
results = {'response': raw, 'summary': summary}
|
return results
|
||||||
print(json.dumps(results))
|
|
||||||
|
|
||||||
|
def analyze(input):
|
||||||
|
meta = helpers.loadMetadata(__file__)
|
||||||
|
data = helpers.parseArtifact(input)
|
||||||
|
helpers.checkSupportedType(meta, data["artifactType"])
|
||||||
|
payload = buildReq(data["value"])
|
||||||
|
response = sendReq(meta, payload)
|
||||||
|
return prepareResults(response)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
meta = helpers.loadMeta(__file__)
|
results = analyze(sys.argv[1])
|
||||||
data = helpers.loadData(sys.argv[1])
|
print(json.dumps(results))
|
||||||
helpers.checkSupportedType(meta, data[0])
|
|
||||||
request = buildReq(meta, data[1])
|
|
||||||
payload = request[0]
|
|
||||||
url = request[1]
|
|
||||||
sendReq(meta, payload, url)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
57
salt/sensoroni/files/analyzers/urlhaus/urlhaus_test.py
Normal file
57
salt/sensoroni/files/analyzers/urlhaus/urlhaus_test.py
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
from io import StringIO
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
from urlhaus import urlhaus
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
class TestUrlhausMethods(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_main(self):
|
||||||
|
output = {"foo": "bar"}
|
||||||
|
with patch('sys.stdout', new=StringIO()) as mock_stdout:
|
||||||
|
with patch('urlhaus.urlhaus.analyze', new=MagicMock(return_value=output)) as mock:
|
||||||
|
urlhaus.main()
|
||||||
|
expected = '{"foo": "bar"}\n'
|
||||||
|
self.assertEqual(mock_stdout.getvalue(), expected)
|
||||||
|
mock.assert_called_once_with('.')
|
||||||
|
|
||||||
|
def test_buildReq(self):
|
||||||
|
result = urlhaus.buildReq("test")
|
||||||
|
self.assertEqual("test", result["url"])
|
||||||
|
|
||||||
|
def test_sendReq(self):
|
||||||
|
with patch('requests.request', new=MagicMock(return_value=MagicMock())) as mock:
|
||||||
|
meta = {"baseUrl": "myurl"}
|
||||||
|
response = urlhaus.sendReq(meta, "mypayload")
|
||||||
|
mock.assert_called_once_with("POST", "myurl", data="mypayload")
|
||||||
|
self.assertIsNotNone(response)
|
||||||
|
|
||||||
|
def test_prepareResults_none(self):
|
||||||
|
raw = {"query_status": "no_results"}
|
||||||
|
results = urlhaus.prepareResults(raw)
|
||||||
|
self.assertEqual(results["response"], raw)
|
||||||
|
self.assertEqual(results["summary"], "no_results")
|
||||||
|
self.assertEqual(results["status"], "info")
|
||||||
|
|
||||||
|
def test_prepareResults_invalidUrl(self):
|
||||||
|
raw = {"query_status": "invalid_url"}
|
||||||
|
results = urlhaus.prepareResults(raw)
|
||||||
|
self.assertEqual(results["response"], raw)
|
||||||
|
self.assertEqual(results["summary"], "invalid_url")
|
||||||
|
self.assertEqual(results["status"], "error")
|
||||||
|
|
||||||
|
def test_prepareResults_threat(self):
|
||||||
|
raw = {"query_status": "invalid_url"} # This is overrided in this scenario
|
||||||
|
raw["threat"] = "bad_actor"
|
||||||
|
results = urlhaus.prepareResults(raw)
|
||||||
|
self.assertEqual(results["response"], raw)
|
||||||
|
self.assertEqual(results["summary"], "bad_actor")
|
||||||
|
self.assertEqual(results["status"], "danger")
|
||||||
|
|
||||||
|
def test_analyze(self):
|
||||||
|
output = {"threat": "malware_download"}
|
||||||
|
artifactInput = '{"value":"foo","artifactType":"url"}'
|
||||||
|
with patch('urlhaus.urlhaus.sendReq', new=MagicMock(return_value=output)) as mock:
|
||||||
|
results = urlhaus.analyze(artifactInput)
|
||||||
|
self.assertEqual(results["summary"], "malware_download")
|
||||||
|
mock.assert_called_once()
|
||||||
@@ -1 +0,0 @@
|
|||||||
requests>=2.27.1
|
|
||||||
@@ -1,6 +0,0 @@
|
|||||||
def main():
|
|
||||||
print('{"result":{ "requestId": "something-generated-by-whois", "someother_field": "more data" }, "summary": "botsrv.btc-goblin.ru"}')
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
from io import StringIO
|
|
||||||
from unittest.mock import patch
|
|
||||||
from whois import whois
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
|
|
||||||
class TestWhoisMethods(unittest.TestCase):
|
|
||||||
|
|
||||||
def test_main(self):
|
|
||||||
with patch('sys.stdout', new=StringIO()) as mock_stdout:
|
|
||||||
whois.main()
|
|
||||||
expected = '{"result":{ "requestId": "something-generated-by-whois", "someother_field": "more data" }, "summary": "botsrv.btc-goblin.ru"}\n'
|
|
||||||
self.assertEqual(mock_stdout.getvalue(), expected)
|
|
||||||
Reference in New Issue
Block a user