mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2026-04-26 06:27:50 +02:00
Add EmailRep analyzer and tests
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"name": "EmailRep",
|
||||
"version": "0.1",
|
||||
"author": "Security Onion Solutions",
|
||||
"description": "This analyzer queries the EmailRep API for email address reputation information",
|
||||
"supportedTypes" : ["email", "mail"]
|
||||
}
|
||||
+67
@@ -0,0 +1,67 @@
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import requests
|
||||
import helpers
|
||||
import argparse
|
||||
|
||||
|
||||
def checkConfigRequirements(conf):
|
||||
if "api_key" not in conf:
|
||||
sys.exit(126)
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def sendReq(conf, meta, email):
|
||||
url = conf['base_url'] + email
|
||||
headers = {"Key": conf['api_key']}
|
||||
response = requests.request('GET', url=url, headers=headers)
|
||||
return response.json()
|
||||
|
||||
|
||||
def prepareResults(raw):
|
||||
if "suspicious" in raw:
|
||||
if raw['suspicious'] is True:
|
||||
status = "caution"
|
||||
summary = "suspicious"
|
||||
elif raw['suspicious'] is False:
|
||||
status = "ok"
|
||||
summary = "harmless"
|
||||
elif "status" in raw:
|
||||
if raw["reason"] == "invalid email":
|
||||
status = "caution"
|
||||
summary = "Invalid email address."
|
||||
if "exceeded daily limit" in raw["reason"]:
|
||||
status = "caution"
|
||||
summary = "Exceeded daily request limit."
|
||||
else:
|
||||
status = "caution"
|
||||
summary = "internal_failure"
|
||||
results = {'response': raw, 'summary': summary, 'status': status}
|
||||
return results
|
||||
|
||||
|
||||
def analyze(conf, input):
|
||||
checkConfigRequirements(conf)
|
||||
meta = helpers.loadMetadata(__file__)
|
||||
data = helpers.parseArtifact(input)
|
||||
helpers.checkSupportedType(meta, data["artifactType"])
|
||||
response = sendReq(conf, meta, data["value"])
|
||||
return prepareResults(response)
|
||||
|
||||
|
||||
def main():
|
||||
dir = os.path.dirname(os.path.realpath(__file__))
|
||||
parser = argparse.ArgumentParser(description='Search Greynoise for a given artifact')
|
||||
parser.add_argument('artifact', help='the artifact represented in JSON format')
|
||||
parser.add_argument('-c', '--config', metavar="CONFIG_FILE", default=dir + "/emailrep.yaml", help='optional config file to use instead of the default config file')
|
||||
|
||||
args = parser.parse_args()
|
||||
if args.artifact:
|
||||
results = analyze(helpers.loadConfig(args.config), args.artifact)
|
||||
print(json.dumps(results))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,2 @@
|
||||
base_url: https://emailrep.io/
|
||||
api_key: "{{ salt['pillar.get']('sensoroni:analyzers:emailrep:api_key', '') }}"
|
||||
@@ -0,0 +1,85 @@
|
||||
from io import StringIO
|
||||
import sys
|
||||
from unittest.mock import patch, MagicMock
|
||||
from emailrep import emailrep
|
||||
import unittest
|
||||
|
||||
|
||||
class TestGreynoiseMethods(unittest.TestCase):
|
||||
|
||||
def test_main_missing_input(self):
|
||||
with patch('sys.exit', new=MagicMock()) as sysmock:
|
||||
with patch('sys.stderr', new=StringIO()) as mock_stderr:
|
||||
sys.argv = ["cmd"]
|
||||
emailrep.main()
|
||||
self.assertEqual(mock_stderr.getvalue(), "usage: cmd [-h] [-c CONFIG_FILE] artifact\ncmd: error: the following arguments are required: artifact\n")
|
||||
sysmock.assert_called_once_with(2)
|
||||
|
||||
def test_main_success(self):
|
||||
output = {"foo": "bar"}
|
||||
with patch('sys.stdout', new=StringIO()) as mock_stdout:
|
||||
with patch('emailrep.emailrep.analyze', new=MagicMock(return_value=output)) as mock:
|
||||
sys.argv = ["cmd", "input"]
|
||||
emailrep.main()
|
||||
expected = '{"foo": "bar"}\n'
|
||||
self.assertEqual(mock_stdout.getvalue(), expected)
|
||||
mock.assert_called_once()
|
||||
|
||||
def test_checkConfigRequirements_not_present(self):
|
||||
conf = {"not_a_file_path": "blahblah"}
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
emailrep.checkConfigRequirements(conf)
|
||||
self.assertEqual(cm.exception.code, 126)
|
||||
|
||||
def test_sendReq(self):
|
||||
with patch('requests.request', new=MagicMock(return_value=MagicMock())) as mock:
|
||||
meta = {}
|
||||
conf = {"base_url": "https://myurl/", "api_key": "abcd1234"}
|
||||
email = "test@abc.com"
|
||||
response = emailrep.sendReq(conf=conf, meta=meta, email=email)
|
||||
mock.assert_called_once_with("GET", headers={"Key": "abcd1234"}, url="https://myurl/test@abc.com")
|
||||
self.assertIsNotNone(response)
|
||||
|
||||
def test_prepareResults_invalidEmail(self):
|
||||
raw = {"status": "fail", "reason": "invalid email"}
|
||||
results = emailrep.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "Invalid email address.")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_prepareResults_not_suspicious(self):
|
||||
raw = {"email": "notsus@domain.com", "reputation": "high", "suspicious": False, "references": 21, "details": {"blacklisted": False, "malicious_activity": False, "profiles": ["twitter"]}}
|
||||
results = emailrep.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "harmless")
|
||||
self.assertEqual(results["status"], "ok")
|
||||
|
||||
def test_prepareResults_suspicious(self):
|
||||
raw = {"email": "sus@domain.com", "reputation": "none", "suspicious": True, "references": 0, "details": {"blacklisted": False, "malicious_activity": False, "profiles": []}}
|
||||
results = emailrep.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "suspicious")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_prepareResults_exceeded_limit(self):
|
||||
raw = {"status": "fail", "reason": "exceeded daily limit. please wait 24 hrs or visit emailrep.io/key for an api key."}
|
||||
results = emailrep.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "Exceeded daily request limit.")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_prepareResults_error(self):
|
||||
raw = {}
|
||||
results = emailrep.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "internal_failure")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_analyze(self):
|
||||
output = {"email": "sus@domain.com", "reputation": "none", "suspicious": True, "references": 0, "details": {"blacklisted": False, "malicious_activity": False, "profiles": []}}
|
||||
artifactInput = '{"value":"sus@domain.com","artifactType":"email"}'
|
||||
conf = {"base_url": "myurl/", "api_key": "abcd1234"}
|
||||
with patch('emailrep.emailrep.sendReq', new=MagicMock(return_value=output)) as mock:
|
||||
results = emailrep.analyze(conf, artifactInput)
|
||||
self.assertEqual(results["summary"], "suspicious")
|
||||
mock.assert_called_once()
|
||||
@@ -0,0 +1,2 @@
|
||||
requests>=2.27.1
|
||||
pyyaml>=6.0
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Reference in New Issue
Block a user