mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2025-12-06 17:22:49 +01:00
Add Whoislookup RDAP-based analyzer
This commit is contained in:
0
salt/sensoroni/files/analyzers/whoislookup/__init__.py
Executable file
0
salt/sensoroni/files/analyzers/whoislookup/__init__.py
Executable file
1
salt/sensoroni/files/analyzers/whoislookup/requirements.txt
Executable file
1
salt/sensoroni/files/analyzers/whoislookup/requirements.txt
Executable file
@@ -0,0 +1 @@
|
||||
whoisit>=2.5.3
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
7
salt/sensoroni/files/analyzers/whoislookup/whoislookup.json
Executable file
7
salt/sensoroni/files/analyzers/whoislookup/whoislookup.json
Executable file
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"name": "WHOIS",
|
||||
"version": "0.1",
|
||||
"author": "Security Onion Solutions",
|
||||
"description": "This analyzer performs a query to an RDAP server for WHOIS-like domain information.",
|
||||
"supportedTypes" : ["domain"]
|
||||
}
|
||||
55
salt/sensoroni/files/analyzers/whoislookup/whoislookup.py
Executable file
55
salt/sensoroni/files/analyzers/whoislookup/whoislookup.py
Executable file
@@ -0,0 +1,55 @@
|
||||
import json
|
||||
import helpers
|
||||
import argparse
|
||||
import whoisit
|
||||
|
||||
|
||||
def sendReq(domain):
|
||||
whoisit.bootstrap()
|
||||
try:
|
||||
results = whoisit.domain(domain, raw=True)
|
||||
except whoisit.errors.ResourceDoesNotExist:
|
||||
results = "Not found."
|
||||
except whoisit.errors.QueryError as error:
|
||||
results = "QueryError: " + str(error)
|
||||
return results
|
||||
|
||||
|
||||
def prepareResults(raw):
|
||||
if raw:
|
||||
if "Not found." in raw:
|
||||
status = "info"
|
||||
summary = "no_results"
|
||||
elif "QueryError" in raw:
|
||||
status = "caution"
|
||||
summary = "invalid_input"
|
||||
else:
|
||||
status = "info"
|
||||
summary = "analysis_complete"
|
||||
else:
|
||||
status = "caution"
|
||||
summary = "internal_failure"
|
||||
results = {'response': raw, 'summary': summary, 'status': status}
|
||||
return results
|
||||
|
||||
|
||||
def analyze(input):
|
||||
meta = helpers.loadMetadata(__file__)
|
||||
data = helpers.parseArtifact(input)
|
||||
helpers.checkSupportedType(meta, data["artifactType"])
|
||||
response = sendReq(data["value"])
|
||||
return prepareResults(response)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Query RDAP server for WHOIS-like information for a given artifact')
|
||||
parser.add_argument('artifact', help='the artifact represented in JSON format')
|
||||
|
||||
args = parser.parse_args()
|
||||
if args.artifact:
|
||||
results = analyze(args.artifact)
|
||||
print(json.dumps(results))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
89
salt/sensoroni/files/analyzers/whoislookup/whoislookup_test.py
Executable file
89
salt/sensoroni/files/analyzers/whoislookup/whoislookup_test.py
Executable file
@@ -0,0 +1,89 @@
|
||||
from io import StringIO
|
||||
import sys
|
||||
from unittest.mock import patch, MagicMock
|
||||
from whoislookup import whoislookup
|
||||
import unittest
|
||||
import whoisit
|
||||
|
||||
|
||||
class TestWhoisLookupMethods(unittest.TestCase):
|
||||
|
||||
def test_main_missing_input(self):
|
||||
with patch('sys.exit', new=MagicMock()) as sysmock:
|
||||
with patch('sys.stderr', new=StringIO()) as mock_stderr:
|
||||
sys.argv = ["cmd"]
|
||||
whoislookup.main()
|
||||
self.assertEqual(mock_stderr.getvalue(), "usage: cmd [-h] artifact\ncmd: error: the following arguments are required: artifact\n")
|
||||
sysmock.assert_called_once_with(2)
|
||||
|
||||
def test_main_success(self):
|
||||
output = {"foo": "bar"}
|
||||
with patch('sys.stdout', new=StringIO()) as mock_stdout:
|
||||
with patch('whoislookup.whoislookup.analyze', new=MagicMock(return_value=output)) as mock:
|
||||
sys.argv = ["cmd", "input"]
|
||||
whoislookup.main()
|
||||
expected = '{"foo": "bar"}\n'
|
||||
self.assertEqual(mock_stdout.getvalue(), expected)
|
||||
mock.assert_called_once()
|
||||
|
||||
def test_sendReq(self):
|
||||
output = {"foo": "bar"}
|
||||
with patch('whoisit.domain', new=MagicMock(return_value=output)) as mock:
|
||||
response = whoislookup.sendReq("abcd1234.com")
|
||||
mock.assert_called_once_with("abcd1234.com", raw=True)
|
||||
self.assertIsNotNone(response)
|
||||
self.assertEqual(response, output)
|
||||
|
||||
def test_sendReqNotFound(self):
|
||||
mock = MagicMock()
|
||||
mock.side_effect = whoisit.errors.ResourceDoesNotExist()
|
||||
with patch('whoisit.domain', new=mock):
|
||||
response = whoislookup.sendReq("abcd1234.com")
|
||||
mock.assert_called_once_with("abcd1234.com", raw=True)
|
||||
self.assertIsNotNone(response)
|
||||
self.assertEqual(response, "Not found.")
|
||||
|
||||
def test_sendReqQueryError(self):
|
||||
mock = MagicMock()
|
||||
mock.side_effect = whoisit.errors.QueryError("error")
|
||||
with patch('whoisit.domain', new=mock):
|
||||
response = whoislookup.sendReq("abcd1234.com")
|
||||
mock.assert_called_once_with("abcd1234.com", raw=True)
|
||||
self.assertIsNotNone(response)
|
||||
self.assertEqual(response, "QueryError: error")
|
||||
|
||||
def test_prepareResults_none(self):
|
||||
raw = "Not found."
|
||||
results = whoislookup.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "no_results")
|
||||
self.assertEqual(results["status"], "info")
|
||||
|
||||
def test_prepareResults_info(self):
|
||||
raw = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "123456", "av_detection_percentage": 0}
|
||||
results = whoislookup.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "analysis_complete")
|
||||
self.assertEqual(results["status"], "info")
|
||||
|
||||
def test_prepareResults_query_error(self):
|
||||
raw = "QueryError: blahblahblah"
|
||||
results = whoislookup.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "invalid_input")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_prepareResults_error(self):
|
||||
raw = {}
|
||||
results = whoislookup.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "internal_failure")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_analyze(self):
|
||||
output = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "NO_DATA", "av_detection_percentage": 0}
|
||||
artifactInput = '{"value": "14af04b8e69682782607a0c5796ca56999eda6b3", "artifactType": "domain"}'
|
||||
with patch('whoislookup.whoislookup.sendReq', new=MagicMock(return_value=output)) as mock:
|
||||
results = whoislookup.analyze(artifactInput)
|
||||
self.assertEqual(results["summary"], "analysis_complete")
|
||||
mock.assert_called_once()
|
||||
Reference in New Issue
Block a user