Add new spamhaus analyzer

This commit is contained in:
Jason Ertel
2022-04-19 12:12:52 -04:00
parent 0cb73d8f6a
commit 4129cef9fb
10 changed files with 222 additions and 1 deletions

View File

@@ -0,0 +1,2 @@
dnspython>=2.2.1
pyyaml>=6.0

View File

@@ -0,0 +1,7 @@
{
"name": "Spamhaus",
"version": "0.1",
"author": "Jason Ertel",
"description": "This analyzer queries Spamhaus to see if an IP is considered malicious.",
"supportedTypes" : ["ip"]
}

View File

@@ -0,0 +1,83 @@
import argparse
import dns.resolver
import dns.reversename
import json
import os
import helpers
def resolve(config, meta, ip):
value = str(dns.reversename.from_address(ip)).replace("in-addr.arpa.", config["lookup_host"] + ".")
resolver = dns.resolver.Resolver()
if len(config["nameservers"]) > 0:
resolver.nameservers = config["nameservers"]
try:
responses = resolver.resolve(value)
except dns.resolver.NXDOMAIN:
responses = []
return responses
def prepareResults(responses):
resultMap = {
"127.0.0.2": {'severity': 200, 'summary': 'spam', 'status': 'caution'},
"127.0.0.3": {'severity': 200, 'summary': 'spam', 'status': 'caution'},
"127.0.0.4": {'severity': 300, 'summary': 'malicious', 'status': 'threat'},
"127.0.0.5": {'severity': 300, 'summary': 'malicious', 'status': 'threat'},
"127.0.0.6": {'severity': 300, 'summary': 'malicious', 'status': 'threat'},
"127.0.0.7": {'severity': 300, 'summary': 'malicious', 'status': 'threat'},
"127.0.0.10": {'severity': 100, 'summary': 'suspicious', 'status': 'caution'},
"127.0.0.11": {'severity': 100, 'summary': 'suspicious', 'status': 'caution'},
"127.0.1.2": {'severity': 200, 'summary': 'spam', 'status': 'caution'},
"127.0.1.4": {'severity': 250, 'summary': 'phishing', 'status': 'threat'},
"127.0.1.5": {'severity': 300, 'summary': 'malicious', 'status': 'threat'},
"127.0.1.6": {'severity': 300, 'summary': 'malicious', 'status': 'threat'},
"127.0.1.102": {'severity': 200, 'summary': 'spam', 'status': 'caution'},
"127.0.1.103": {'severity': 200, 'summary': 'spam', 'status': 'caution'},
"127.0.1.104": {'severity': 300, 'summary': 'malicious', 'status': 'threat'},
"127.0.1.105": {'severity': 300, 'summary': 'malicious', 'status': 'threat'},
"127.0.1.106": {'severity': 300, 'summary': 'malicious', 'status': 'threat'},
"127.0.1.107": {'severity': 100, 'summary': 'suspicious', 'status': 'caution'},
"127.255.255.252": {'severity': 1, 'summary': 'internal_failure', 'status': 'caution'},
"127.255.255.254": {'severity': 2, 'summary': 'internal_failure', 'status': 'caution'},
"127.255.255.255": {'severity': 3, 'summary': 'excessive_usage', 'status': 'caution'},
}
raw = []
currentResult = {'severity': 0, 'summary': 'harmless', 'status': 'ok'}
for response in responses:
raw.append(response.to_text())
if response.address in resultMap:
result = resultMap[response.address]
if currentResult is None or currentResult['severity'] < result['severity']:
currentResult = result
currentResult['response'] = raw
return currentResult
def analyze(config, input):
meta = helpers.loadMetadata(__file__)
data = helpers.parseArtifact(input)
helpers.checkSupportedType(meta, data["artifactType"])
response = resolve(config, meta, data["value"])
return prepareResults(response)
def main():
dir = os.path.dirname(os.path.realpath(__file__))
parser = argparse.ArgumentParser(description='Search Spamhaus for an IP')
parser.add_argument('artifact', help='the artifact represented in JSON format')
parser.add_argument('-c', '--config', metavar="CONFIG_FILE", default=dir + "/spamhaus.yaml", help='optional config file to use instead of the default config file')
args = parser.parse_args()
if args.artifact:
results = analyze(helpers.loadConfig(args.config), args.artifact)
print(json.dumps(results))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,2 @@
lookup_host: zen.spamhaus.org
nameservers: ["{{ salt['pillar.get']('sensoroni:analyzers:spamhaus:nameserver', '') }}"]

View File

@@ -0,0 +1,126 @@
from io import StringIO
import dns
import sys
from unittest.mock import patch, MagicMock
from spamhaus import spamhaus
import unittest
class FakeAnswer:
address = ''
def __init__(self, ip='127.0.0.1'):
self.address = ip
def to_text(self):
return str(self.address)
class TestSpamhausMethods(unittest.TestCase):
def test_main_missing_input(self):
with patch('sys.exit', new=MagicMock()) as sysmock:
with patch('sys.stderr', new=StringIO()) as mock_stderr:
sys.argv = ["cmd"]
spamhaus.main()
self.assertEqual(mock_stderr.getvalue(), "usage: cmd [-h] [-c CONFIG_FILE] artifact\ncmd: error: the following arguments are required: artifact\n")
sysmock.assert_called_once_with(2)
def test_main_success(self):
output = {"foo": "bar"}
with patch('sys.stdout', new=StringIO()) as mock_stdout:
with patch('spamhaus.spamhaus.analyze', new=MagicMock(return_value=output)) as mock:
sys.argv = ["cmd", "input"]
spamhaus.main()
expected = '{"foo": "bar"}\n'
self.assertEqual(mock_stdout.getvalue(), expected)
mock.assert_called_once()
def test_resolve(self):
with patch('dns.resolver.Resolver.resolve', new=MagicMock(return_value=MagicMock())) as mock:
meta = {}
conf = {"nameservers": ["1.2.3.4"], "lookup_host": "some.host"}
response = spamhaus.resolve(config=conf, meta=meta, ip="127.0.0.1")
mock.assert_called_once_with("1.0.0.127.some.host.")
self.assertIsNotNone(response)
def test_resolve_not_found(self):
mock = MagicMock()
mock.side_effect = dns.resolver.NXDOMAIN
with patch('dns.resolver.Resolver.resolve', new=mock):
meta = {}
conf = {"nameservers": ["1.2.3.4"], "lookup_host": "some.host"}
response = spamhaus.resolve(config=conf, meta=meta, ip="127.0.0.1")
mock.assert_called_once_with("1.0.0.127.some.host.")
self.assertIsNotNone(response)
def test_prepareResults_ok_multiple(self):
raw = [FakeAnswer("127.0.0.0"), FakeAnswer("127.0.0.1")]
results = spamhaus.prepareResults(raw)
self.assertEqual(results["response"], ['127.0.0.0', '127.0.0.1'])
self.assertEqual(results["summary"], "harmless")
self.assertEqual(results["status"], "ok")
def test_prepareResults_failure2(self):
raw = [FakeAnswer("127.255.255.252")]
results = spamhaus.prepareResults(raw)
self.assertEqual(results["response"], ['127.255.255.252'])
self.assertEqual(results["summary"], "internal_failure")
self.assertEqual(results["status"], "caution")
def test_prepareResults_failure4(self):
raw = [FakeAnswer("127.255.255.254")]
results = spamhaus.prepareResults(raw)
self.assertEqual(results["response"], ['127.255.255.254'])
self.assertEqual(results["summary"], "internal_failure")
self.assertEqual(results["status"], "caution")
def test_prepareResults_excessive(self):
raw = [FakeAnswer("127.255.255.255")]
results = spamhaus.prepareResults(raw)
self.assertEqual(results["response"], ['127.255.255.255'])
self.assertEqual(results["summary"], "excessive_usage")
self.assertEqual(results["status"], "caution")
def test_prepareResults_sus_multiple(self):
raw = [FakeAnswer("127.0.0.10"), FakeAnswer("127.0.0.11")]
results = spamhaus.prepareResults(raw)
self.assertEqual(results["response"], ['127.0.0.10', '127.0.0.11'])
self.assertEqual(results["summary"], "suspicious")
self.assertEqual(results["status"], "caution")
def test_prepareResults_spam_multiple(self):
raw = [FakeAnswer("127.0.0.2")]
results = spamhaus.prepareResults(raw)
self.assertEqual(results["response"], ['127.0.0.2'])
self.assertEqual(results["summary"], "spam")
self.assertEqual(results["status"], "caution")
def test_prepareResults_threat_multiple(self):
raw = [FakeAnswer("127.0.0.1"), FakeAnswer("127.0.0.4")]
results = spamhaus.prepareResults(raw)
self.assertEqual(results["response"], ['127.0.0.1', '127.0.0.4'])
self.assertEqual(results["summary"], "malicious")
self.assertEqual(results["status"], "threat")
def test_prepareResults_threat(self):
raw = [FakeAnswer("127.0.0.4")]
results = spamhaus.prepareResults(raw)
self.assertEqual(results["response"], ['127.0.0.4'])
self.assertEqual(results["summary"], "malicious")
self.assertEqual(results["status"], "threat")
def test_prepareResults_error(self):
raw = []
results = spamhaus.prepareResults(raw)
self.assertEqual(results["response"], [])
self.assertEqual(results["summary"], "harmless")
self.assertEqual(results["status"], "ok")
def test_analyze(self):
output = [FakeAnswer()]
artifactInput = '{"value":"1.2.3.4","artifactType":"ip"}'
with patch('spamhaus.spamhaus.resolve', new=MagicMock(return_value=output)) as mock:
results = spamhaus.analyze({}, artifactInput)
self.assertEqual(results["summary"], "harmless")
mock.assert_called_once()

View File

@@ -2,7 +2,7 @@
"name": "Urlhaus",
"version": "0.1",
"author": "Wes",
"description": "This analyzer queries URLHaus to see if a URL is consdered malicious.",
"description": "This analyzer queries URLHaus to see if a URL is considered malicious.",
"supportedTypes" : ["url"],
"baseUrl": "https://urlhaus-api.abuse.ch/v1/url/"
}

View File

@@ -9,6 +9,7 @@ class TestUrlhausMethods(unittest.TestCase):
def test_main_missing_input(self):
with patch('sys.stdout', new=StringIO()) as mock_stdout:
sys.argv = ["cmd"]
urlhaus.main()
self.assertEqual(mock_stdout.getvalue(), "ERROR: Missing input JSON\n")