mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2025-12-06 09:12:45 +01:00
Merge pull request #7958 from Security-Onion-Solutions/feature/mhr_analyzer
Add Team Cymru Malware Hash Registry Analyzer
This commit is contained in:
0
salt/sensoroni/files/analyzers/mhr/__init__.py
Normal file
0
salt/sensoroni/files/analyzers/mhr/__init__.py
Normal file
7
salt/sensoroni/files/analyzers/mhr/mhr.json
Normal file
7
salt/sensoroni/files/analyzers/mhr/mhr.json
Normal file
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"name": "Team Cymru Malware Hash Registry",
|
||||
"version": "0.1",
|
||||
"author": "Security Onion Solutions",
|
||||
"description": "This analyzer queries Team Cymru's Malware Hash registry for hashes to determine if the associated files are considered malicious.",
|
||||
"supportedTypes" : ["hash"]
|
||||
}
|
||||
65
salt/sensoroni/files/analyzers/mhr/mhr.py
Executable file
65
salt/sensoroni/files/analyzers/mhr/mhr.py
Executable file
@@ -0,0 +1,65 @@
|
||||
import json
|
||||
import helpers
|
||||
import argparse
|
||||
import datetime
|
||||
from whois import NICClient
|
||||
|
||||
|
||||
def sendReq(hash):
|
||||
server = "hash.cymru.com"
|
||||
flags = 0
|
||||
options = {"whoishost": server}
|
||||
nic_client = NICClient()
|
||||
response = nic_client.whois_lookup(options, hash, flags).rstrip()
|
||||
hash = response.split(' ')[0]
|
||||
lastSeen = response.split(' ')[1]
|
||||
if lastSeen == "NO_DATA":
|
||||
avPct = 0
|
||||
else:
|
||||
avPct = response.split(' ')[2]
|
||||
lastSeen = datetime.datetime.fromtimestamp(int(lastSeen)).strftime("%Y-%d-%m %H:%M:%S")
|
||||
raw = {"hash": hash, "last_seen": lastSeen, "av_detection_percentage": int(avPct)}
|
||||
return raw
|
||||
|
||||
|
||||
def prepareResults(raw):
|
||||
if raw and "last_seen" in raw:
|
||||
if raw["last_seen"] == "NO_DATA":
|
||||
status = "ok"
|
||||
summary = "no_results"
|
||||
elif raw["av_detection_percentage"] < 1:
|
||||
status = "ok"
|
||||
summary = "harmless"
|
||||
elif raw["av_detection_percentage"] in range(1, 50):
|
||||
status = "caution"
|
||||
summary = "suspicious"
|
||||
elif raw["av_detection_percentage"] in range(51, 100):
|
||||
status = "threat"
|
||||
summary = "malicious"
|
||||
else:
|
||||
status = "caution"
|
||||
summary = "internal_failure"
|
||||
results = {'response': raw, 'summary': summary, 'status': status}
|
||||
return results
|
||||
|
||||
|
||||
def analyze(input):
|
||||
meta = helpers.loadMetadata(__file__)
|
||||
data = helpers.parseArtifact(input)
|
||||
helpers.checkSupportedType(meta, data["artifactType"])
|
||||
response = sendReq(data["value"])
|
||||
return prepareResults(response)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Search Team Cymru Malware Hash Registry for a given artifact')
|
||||
parser.add_argument('artifact', help='the artifact represented in JSON format')
|
||||
|
||||
args = parser.parse_args()
|
||||
if args.artifact:
|
||||
results = analyze(args.artifact)
|
||||
print(json.dumps(results))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
93
salt/sensoroni/files/analyzers/mhr/mhr_test.py
Normal file
93
salt/sensoroni/files/analyzers/mhr/mhr_test.py
Normal file
@@ -0,0 +1,93 @@
|
||||
from io import StringIO
|
||||
import sys
|
||||
from unittest.mock import patch, MagicMock
|
||||
from mhr import mhr
|
||||
import unittest
|
||||
|
||||
|
||||
class TestMHRMethods(unittest.TestCase):
|
||||
|
||||
def test_main_missing_input(self):
|
||||
with patch('sys.exit', new=MagicMock()) as sysmock:
|
||||
with patch('sys.stderr', new=StringIO()) as mock_stderr:
|
||||
sys.argv = ["cmd"]
|
||||
mhr.main()
|
||||
self.assertEqual(mock_stderr.getvalue(), "usage: cmd [-h] artifact\ncmd: error: the following arguments are required: artifact\n")
|
||||
sysmock.assert_called_once_with(2)
|
||||
|
||||
def test_main_success(self):
|
||||
output = {"foo": "bar"}
|
||||
with patch('sys.stdout', new=StringIO()) as mock_stdout:
|
||||
with patch('mhr.mhr.analyze', new=MagicMock(return_value=output)) as mock:
|
||||
sys.argv = ["cmd", "input"]
|
||||
mhr.main()
|
||||
expected = '{"foo": "bar"}\n'
|
||||
self.assertEqual(mock_stdout.getvalue(), expected)
|
||||
mock.assert_called_once()
|
||||
|
||||
def test_sendReq(self):
|
||||
output = "84af04b8e69682782607a0c5796ca56999eda6b3 1563161433 35"
|
||||
hash = "abcd1234"
|
||||
server = "hash.cymru.com"
|
||||
flags = 0
|
||||
options = {"whoishost": server}
|
||||
with patch('whois.NICClient.whois_lookup', new=MagicMock(return_value=output)) as mock:
|
||||
response = mhr.sendReq(hash)
|
||||
mock.assert_called_once_with(options, hash, flags)
|
||||
self.assertIsNotNone(response)
|
||||
self.assertEqual(response, {"hash": "84af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "2019-15-07 03:30:33", "av_detection_percentage": 35})
|
||||
|
||||
def test_sendReqNoData(self):
|
||||
output = "84af04b8e69682782607a0c5796ca5696b3 NO_DATA"
|
||||
hash = "abcd1234"
|
||||
server = "hash.cymru.com"
|
||||
flags = 0
|
||||
options = {"whoishost": server}
|
||||
with patch('whois.NICClient.whois_lookup', new=MagicMock(return_value=output)) as mock:
|
||||
response = mhr.sendReq(hash)
|
||||
mock.assert_called_once_with(options, hash, flags)
|
||||
self.assertIsNotNone(response)
|
||||
self.assertEqual(response, {"hash": "84af04b8e69682782607a0c5796ca5696b3", "last_seen": "NO_DATA", "av_detection_percentage": 0})
|
||||
|
||||
def test_prepareResults_none(self):
|
||||
raw = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "NO_DATA", "av_detection_percentage": 0}
|
||||
results = mhr.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "no_results")
|
||||
self.assertEqual(results["status"], "ok")
|
||||
|
||||
def test_prepareResults_harmless(self):
|
||||
raw = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "123456", "av_detection_percentage": 0}
|
||||
results = mhr.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "harmless")
|
||||
self.assertEqual(results["status"], "ok")
|
||||
|
||||
def test_prepareResults_sus(self):
|
||||
raw = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "123456", "av_detection_percentage": 1}
|
||||
results = mhr.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "suspicious")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_prepareResults_mal(self):
|
||||
raw = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "123456", "av_detection_percentage": 51}
|
||||
results = mhr.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "malicious")
|
||||
self.assertEqual(results["status"], "threat")
|
||||
|
||||
def test_prepareResults_error(self):
|
||||
raw = {}
|
||||
results = mhr.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "internal_failure")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_analyze(self):
|
||||
output = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "NO_DATA", "av_detection_percentage": 0}
|
||||
artifactInput = '{"value": "14af04b8e69682782607a0c5796ca56999eda6b3", "artifactType": "hash"}'
|
||||
with patch('mhr.mhr.sendReq', new=MagicMock(return_value=output)) as mock:
|
||||
results = mhr.analyze(artifactInput)
|
||||
self.assertEqual(results["summary"], "no_results")
|
||||
mock.assert_called_once()
|
||||
2
salt/sensoroni/files/analyzers/mhr/requirements.txt
Normal file
2
salt/sensoroni/files/analyzers/mhr/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
requests>=2.27.1
|
||||
python-whois>=0.7.3
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user