mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2025-12-06 09:12:45 +01:00
Make sure everything is added back after renaming mhr to malwarehashregistry
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"name": "Team Cymru Malware Hash Registry",
|
||||
"version": "0.1",
|
||||
"author": "Security Onion Solutions",
|
||||
"description": "This analyzer queries Team Cymru's Malware Hash registry for hashes to determine if the associated files are considered malicious.",
|
||||
"supportedTypes" : ["hash"]
|
||||
}
|
||||
65
salt/sensoroni/files/analyzers/malwarehashregistry/malwarehashregistry.py
Executable file
65
salt/sensoroni/files/analyzers/malwarehashregistry/malwarehashregistry.py
Executable file
@@ -0,0 +1,65 @@
|
||||
import json
|
||||
import helpers
|
||||
import argparse
|
||||
import datetime
|
||||
from whois import NICClient
|
||||
|
||||
|
||||
def sendReq(hash):
|
||||
server = "hash.cymru.com"
|
||||
flags = 0
|
||||
options = {"whoishost": server}
|
||||
nic_client = NICClient()
|
||||
response = nic_client.whois_lookup(options, hash, flags).rstrip()
|
||||
hash = response.split(' ')[0]
|
||||
lastSeen = response.split(' ')[1]
|
||||
if lastSeen == "NO_DATA":
|
||||
avPct = 0
|
||||
else:
|
||||
avPct = response.split(' ')[2]
|
||||
lastSeen = datetime.datetime.fromtimestamp(int(lastSeen)).strftime("%Y-%d-%m %H:%M:%S")
|
||||
raw = {"hash": hash, "last_seen": lastSeen, "av_detection_percentage": int(avPct)}
|
||||
return raw
|
||||
|
||||
|
||||
def prepareResults(raw):
|
||||
if raw and "last_seen" in raw:
|
||||
if raw["last_seen"] == "NO_DATA":
|
||||
status = "ok"
|
||||
summary = "no_results"
|
||||
elif raw["av_detection_percentage"] < 1:
|
||||
status = "ok"
|
||||
summary = "harmless"
|
||||
elif raw["av_detection_percentage"] in range(1, 50):
|
||||
status = "caution"
|
||||
summary = "suspicious"
|
||||
elif raw["av_detection_percentage"] in range(51, 100):
|
||||
status = "threat"
|
||||
summary = "malicious"
|
||||
else:
|
||||
status = "caution"
|
||||
summary = "internal_failure"
|
||||
results = {'response': raw, 'summary': summary, 'status': status}
|
||||
return results
|
||||
|
||||
|
||||
def analyze(input):
|
||||
meta = helpers.loadMetadata(__file__)
|
||||
data = helpers.parseArtifact(input)
|
||||
helpers.checkSupportedType(meta, data["artifactType"])
|
||||
response = sendReq(data["value"])
|
||||
return prepareResults(response)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Search Team Cymru Malware Hash Registry for a given artifact')
|
||||
parser.add_argument('artifact', help='the artifact represented in JSON format')
|
||||
|
||||
args = parser.parse_args()
|
||||
if args.artifact:
|
||||
results = analyze(args.artifact)
|
||||
print(json.dumps(results))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,93 @@
|
||||
from io import StringIO
|
||||
import sys
|
||||
from unittest.mock import patch, MagicMock
|
||||
from malwarehashregistry import malwarehashregistry
|
||||
import unittest
|
||||
|
||||
|
||||
class TestMalwareHashRegistryMethods(unittest.TestCase):
|
||||
|
||||
def test_main_missing_input(self):
|
||||
with patch('sys.exit', new=MagicMock()) as sysmock:
|
||||
with patch('sys.stderr', new=StringIO()) as mock_stderr:
|
||||
sys.argv = ["cmd"]
|
||||
malwarehashregistry.main()
|
||||
self.assertEqual(mock_stderr.getvalue(), "usage: cmd [-h] artifact\ncmd: error: the following arguments are required: artifact\n")
|
||||
sysmock.assert_called_once_with(2)
|
||||
|
||||
def test_main_success(self):
|
||||
output = {"foo": "bar"}
|
||||
with patch('sys.stdout', new=StringIO()) as mock_stdout:
|
||||
with patch('malwarehashregistry.malwarehashregistry.analyze', new=MagicMock(return_value=output)) as mock:
|
||||
sys.argv = ["cmd", "input"]
|
||||
malwarehashregistry.main()
|
||||
expected = '{"foo": "bar"}\n'
|
||||
self.assertEqual(mock_stdout.getvalue(), expected)
|
||||
mock.assert_called_once()
|
||||
|
||||
def test_sendReq(self):
|
||||
output = "84af04b8e69682782607a0c5796ca56999eda6b3 1563161433 35"
|
||||
hash = "abcd1234"
|
||||
server = "hash.cymru.com"
|
||||
flags = 0
|
||||
options = {"whoishost": server}
|
||||
with patch('whois.NICClient.whois_lookup', new=MagicMock(return_value=output)) as mock:
|
||||
response = malwarehashregistry.sendReq(hash)
|
||||
mock.assert_called_once_with(options, hash, flags)
|
||||
self.assertIsNotNone(response)
|
||||
self.assertEqual(response, {"hash": "84af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "2019-15-07 03:30:33", "av_detection_percentage": 35})
|
||||
|
||||
def test_sendReqNoData(self):
|
||||
output = "84af04b8e69682782607a0c5796ca5696b3 NO_DATA"
|
||||
hash = "abcd1234"
|
||||
server = "hash.cymru.com"
|
||||
flags = 0
|
||||
options = {"whoishost": server}
|
||||
with patch('whois.NICClient.whois_lookup', new=MagicMock(return_value=output)) as mock:
|
||||
response = malwarehashregistry.sendReq(hash)
|
||||
mock.assert_called_once_with(options, hash, flags)
|
||||
self.assertIsNotNone(response)
|
||||
self.assertEqual(response, {"hash": "84af04b8e69682782607a0c5796ca5696b3", "last_seen": "NO_DATA", "av_detection_percentage": 0})
|
||||
|
||||
def test_prepareResults_none(self):
|
||||
raw = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "NO_DATA", "av_detection_percentage": 0}
|
||||
results = malwarehashregistry.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "no_results")
|
||||
self.assertEqual(results["status"], "ok")
|
||||
|
||||
def test_prepareResults_harmless(self):
|
||||
raw = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "123456", "av_detection_percentage": 0}
|
||||
results = malwarehashregistry.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "harmless")
|
||||
self.assertEqual(results["status"], "ok")
|
||||
|
||||
def test_prepareResults_sus(self):
|
||||
raw = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "123456", "av_detection_percentage": 1}
|
||||
results = malwarehashregistry.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "suspicious")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_prepareResults_mal(self):
|
||||
raw = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "123456", "av_detection_percentage": 51}
|
||||
results = malwarehashregistry.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "malicious")
|
||||
self.assertEqual(results["status"], "threat")
|
||||
|
||||
def test_prepareResults_error(self):
|
||||
raw = {}
|
||||
results = malwarehashregistry.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "internal_failure")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_analyze(self):
|
||||
output = {"hash": "14af04b8e69682782607a0c5796ca56999eda6b3", "last_seen": "NO_DATA", "av_detection_percentage": 0}
|
||||
artifactInput = '{"value": "14af04b8e69682782607a0c5796ca56999eda6b3", "artifactType": "hash"}'
|
||||
with patch('malwarehashregistry.malwarehashregistry.sendReq', new=MagicMock(return_value=output)) as mock:
|
||||
results = malwarehashregistry.analyze(artifactInput)
|
||||
self.assertEqual(results["summary"], "no_results")
|
||||
mock.assert_called_once()
|
||||
@@ -0,0 +1,2 @@
|
||||
requests>=2.27.1
|
||||
python-whois>=0.7.3
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user