# Mirror of https://github.com/Security-Onion-Solutions/securityonion.git
# Synced 2025-12-06 09:12:45 +01:00
# 127 lines, 5.3 KiB, Python
from io import StringIO
|
|
import dns
|
|
import sys
|
|
from unittest.mock import patch, MagicMock
|
|
from spamhaus import spamhaus
|
|
import unittest
|
|
|
|
|
|
class FakeAnswer:
    """Minimal stand-in for a DNS answer record.

    It exposes only an ``address`` attribute and a ``to_text()`` method,
    which is all the tests in this file use when feeding canned answers
    to the analyzer.
    """

    # Class-level default; each instance overwrites it in __init__.
    address = ''

    def __init__(self, ip='127.0.0.1'):
        """Remember *ip* as this answer's address."""
        self.address = ip

    def to_text(self):
        """Return the stored address converted to ``str``."""
        text = str(self.address)
        return text
|
|
|
|
|
|
class TestSpamhausMethods(unittest.TestCase):
    """Tests for the ``spamhaus`` analyzer's main/resolve/prepareResults/analyze.

    External effects (DNS lookups, ``sys.exit``, stdout/stderr, ``sys.argv``)
    are replaced with mocks via ``patch`` so every change is undone when the
    ``with`` block exits and no test leaks global state into another.
    """

    def _assert_prepared(self, addresses, summary, status):
        """Run ``prepareResults`` on fake answers and check all three fields.

        Args:
            addresses: list of address strings; one ``FakeAnswer`` is built
                for each and the list is also the expected ``response``.
            summary: expected value of ``results["summary"]``.
            status: expected value of ``results["status"]``.
        """
        raw = [FakeAnswer(address) for address in addresses]
        results = spamhaus.prepareResults(raw)
        self.assertEqual(results["response"], list(addresses))
        self.assertEqual(results["summary"], summary)
        self.assertEqual(results["status"], status)

    def test_main_missing_input(self):
        """main() without an artifact prints usage to stderr and exits with 2."""
        # sys.argv is patched (not assigned) so it is restored afterwards.
        with patch('sys.exit', new=MagicMock()) as sysmock, \
                patch('sys.stderr', new=StringIO()) as mock_stderr, \
                patch.object(sys, 'argv', ["cmd"]):
            spamhaus.main()
            self.assertEqual(mock_stderr.getvalue(), "usage: cmd [-h] [-c CONFIG_FILE] artifact\ncmd: error: the following arguments are required: artifact\n")
            sysmock.assert_called_once_with(2)

    def test_main_success(self):
        """main() prints the (mocked) analyze() result to stdout as JSON."""
        output = {"foo": "bar"}
        # sys.argv is patched (not assigned) so it is restored afterwards.
        with patch('sys.stdout', new=StringIO()) as mock_stdout, \
                patch('spamhaus.spamhaus.analyze', new=MagicMock(return_value=output)) as mock, \
                patch.object(sys, 'argv', ["cmd", "input"]):
            spamhaus.main()
            expected = '{"foo": "bar"}\n'
            self.assertEqual(mock_stdout.getvalue(), expected)
            mock.assert_called_once()

    def test_resolve(self):
        """resolve() queries the reversed IP octets under the lookup host."""
        with patch('dns.resolver.Resolver.resolve', new=MagicMock(return_value=MagicMock())) as mock:
            meta = {}
            conf = {"nameservers": ["1.2.3.4"], "lookup_host": "some.host"}
            response = spamhaus.resolve(config=conf, meta=meta, ip="127.0.0.1")
            mock.assert_called_once_with("1.0.0.127.some.host.")
            self.assertIsNotNone(response)

    def test_resolve_not_found(self):
        """resolve() still returns a non-None value when the lookup raises NXDOMAIN."""
        mock = MagicMock(side_effect=dns.resolver.NXDOMAIN)
        with patch('dns.resolver.Resolver.resolve', new=mock):
            meta = {}
            conf = {"nameservers": ["1.2.3.4"], "lookup_host": "some.host"}
            response = spamhaus.resolve(config=conf, meta=meta, ip="127.0.0.1")
            mock.assert_called_once_with("1.0.0.127.some.host.")
            self.assertIsNotNone(response)

    def test_prepareResults_ok_multiple(self):
        """127.0.0.0 and 127.0.0.1 together are reported as harmless/ok."""
        self._assert_prepared(["127.0.0.0", "127.0.0.1"], "harmless", "ok")

    def test_prepareResults_failure2(self):
        """127.255.255.252 is reported as internal_failure/caution."""
        self._assert_prepared(["127.255.255.252"], "internal_failure", "caution")

    def test_prepareResults_failure4(self):
        """127.255.255.254 is reported as internal_failure/caution."""
        self._assert_prepared(["127.255.255.254"], "internal_failure", "caution")

    def test_prepareResults_excessive(self):
        """127.255.255.255 is reported as excessive_usage/caution."""
        self._assert_prepared(["127.255.255.255"], "excessive_usage", "caution")

    def test_prepareResults_sus_multiple(self):
        """127.0.0.10 and 127.0.0.11 together are reported as suspicious/caution."""
        self._assert_prepared(["127.0.0.10", "127.0.0.11"], "suspicious", "caution")

    def test_prepareResults_spam_multiple(self):
        """127.0.0.2 is reported as spam/caution.

        Note: despite the test name, only a single answer is supplied.
        """
        self._assert_prepared(["127.0.0.2"], "spam", "caution")

    def test_prepareResults_threat_multiple(self):
        """127.0.0.1 combined with 127.0.0.4 is reported as malicious/threat."""
        self._assert_prepared(["127.0.0.1", "127.0.0.4"], "malicious", "threat")

    def test_prepareResults_threat(self):
        """127.0.0.4 alone is reported as malicious/threat."""
        self._assert_prepared(["127.0.0.4"], "malicious", "threat")

    def test_prepareResults_error(self):
        """An empty answer list yields an empty response and harmless/ok."""
        self._assert_prepared([], "harmless", "ok")

    def test_analyze(self):
        """analyze() summarises a default FakeAnswer (127.0.0.1) as harmless."""
        output = [FakeAnswer()]
        artifactInput = '{"value":"1.2.3.4","artifactType":"ip"}'
        with patch('spamhaus.spamhaus.resolve', new=MagicMock(return_value=output)) as mock:
            results = spamhaus.analyze({}, artifactInput)
            self.assertEqual(results["summary"], "harmless")
            mock.assert_called_once()
|