mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2026-04-26 22:47:49 +02:00
pushing everything at once
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
# Malwarebazaar
|
||||
|
||||
## Description
|
||||
Submit a gimphash, hash, tlsh, telfhash to Malwarebazaar for analysis.
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
{
|
||||
"name": "Malwarebazaar",
|
||||
"version": "0.1",
|
||||
"author": "Security Onion Solutions",
|
||||
"description": "This analyzer queries Malwarebazaar to see if a hash, gimphash, tlsh, or telfhash is considered malicious.",
|
||||
"supportedTypes" : ["gimphash","hash","tlsh", "telfhash"],
|
||||
"baseUrl": "https://mb-api.abuse.ch/api/v1/"
|
||||
}
|
||||
@@ -0,0 +1,157 @@
|
||||
import requests
|
||||
import helpers
|
||||
import json
|
||||
import sys
|
||||
|
||||
# supports querying for hash, gimphash, tlsh, and telfhash
|
||||
# usage is as follows:
|
||||
# python3 malwarebazaar.py '{"artifactType":"x", "value":"y"}'
|
||||
|
||||
def buildReq(observ_type, observ_value):
|
||||
# determine correct query type to send based off of observable type
|
||||
unique_types = {'gimphash': 1, 'telfhash': 1, 'tlsh': 1}
|
||||
if observ_type in unique_types:
|
||||
qtype = 'get_' + observ_type
|
||||
else:
|
||||
qtype = 'get_info'
|
||||
return {'query': qtype, observ_type: observ_value}
|
||||
|
||||
|
||||
def sendReq(meta, query):
|
||||
# send a post request with our compiled query to the API
|
||||
url = meta['baseUrl']
|
||||
response = requests.post(url, query)
|
||||
return response.json()
|
||||
|
||||
|
||||
def isInJson(data, target_string, maxdepth):
|
||||
# searches a JSON object for an occurance of a string
|
||||
# recursively.
|
||||
|
||||
# depth limiter (arbitrary value of 1000)
|
||||
if maxdepth > 1000:
|
||||
return False
|
||||
|
||||
if isinstance(data, dict):
|
||||
for key, value in data.items():
|
||||
if isinstance(value, (dict, list)):
|
||||
# recursive call
|
||||
if isInJson(value, target_string, maxdepth + 1):
|
||||
return True
|
||||
elif isinstance(value, str) and target_string in value.lower():
|
||||
# found target string
|
||||
return True
|
||||
|
||||
elif isinstance(data, list):
|
||||
for item in data:
|
||||
if isinstance(item, (dict, list)):
|
||||
# recursive call
|
||||
if isInJson(item, target_string, maxdepth + 1):
|
||||
return True
|
||||
elif isinstance(item, str) and target_string in item.lower():
|
||||
# found target string
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def prepareResults(raw):
|
||||
# parse raw API response, gauge threat level and return status and a short summary
|
||||
if raw == {}:
|
||||
status = 'caution'
|
||||
summary = 'internal_failure'
|
||||
elif raw['query_status'] == 'ok':
|
||||
parsed = raw['data'][0]
|
||||
vendor_data = parsed['vendor_intel']
|
||||
|
||||
# get summary
|
||||
if parsed['signature']:
|
||||
summary = parsed['signature']
|
||||
elif parsed['tags']:
|
||||
summary = str(parsed['tags'][0])
|
||||
elif vendor_data['YOROI_YOMI']:
|
||||
summary = vendor_data['YOROI_YOMI']['detection']
|
||||
|
||||
# gauge vendors to determine an approximation of status, normalized to a value out of 100
|
||||
# only updates score if it finds a higher indicator value
|
||||
score = 0
|
||||
if 'vxCube' in vendor_data:
|
||||
score = int(vendor_data['vxCube']['maliciousness'])
|
||||
|
||||
if 'Triage' in vendor_data:
|
||||
score = int(vendor_data['Triage']['score'])*10 if int(
|
||||
vendor_data['Triage']['score'])*10 > score else score
|
||||
|
||||
if 'DocGuard' in vendor_data:
|
||||
score = int(vendor_data['DocGuard']['alertlevel'])*10 if int(
|
||||
vendor_data['DocGuard']['alertlevel'])*10 > score else score
|
||||
|
||||
if 'YOROI_YOMI' in vendor_data:
|
||||
score = int(float(vendor_data['YOROI_YOMI']['score']))*100 if int(
|
||||
float(vendor_data['YOROI_YOMI']['score']))*100 > score else score
|
||||
|
||||
if 'Inquest' in vendor_data and vendor_data['Inquest']['verdict'] == 'MALICIOUS':
|
||||
score = 100 if 100 > score else score
|
||||
|
||||
if 'ReversingLabs' in vendor_data and vendor_data['ReversingLabs']['status'] == 'MALICIOUS':
|
||||
score = 100 if 100 > score else score
|
||||
|
||||
if 'Spamhaus_HBL' in vendor_data and vendor_data['Spamhaus_HBL'][0]['detection'] == 'MALICIOUS':
|
||||
score = 100 if 100 > score else score
|
||||
|
||||
# compute status
|
||||
if score >= 75 or isInJson(raw, 'MALICIOUS'.lower()):
|
||||
# if score >= 75:
|
||||
status = 'threat'
|
||||
elif score >= 50:
|
||||
status = 'caution'
|
||||
elif score >= 25:
|
||||
status = 'info'
|
||||
|
||||
else:
|
||||
status = 'ok'
|
||||
elif raw['query_status'] != 'ok':
|
||||
status = 'info'
|
||||
summary = 'no result'
|
||||
|
||||
return {'response': raw, 'summary': summary, 'status': status}
|
||||
|
||||
|
||||
def analyze(input):
|
||||
# put all of our methods together, pass them input, and return
|
||||
# properly formatted json/python dict output
|
||||
data = json.loads(input)
|
||||
meta = helpers.loadMetadata(__file__)
|
||||
helpers.checkSupportedType(meta, data["artifactType"])
|
||||
|
||||
if (data['artifactType'] == 'tlsh' or data['artifactType'] == 'gimphash' or data['artifactType'] == 'telfhash'):
|
||||
# To get accurate reporting for TLSH, telfhash and gimphash, we deem it necessary to query
|
||||
# twice for the sake of retrieving more specific data.
|
||||
|
||||
initialQuery = buildReq(data['artifactType'], data['value'])
|
||||
initialRaw = sendReq(meta, initialQuery)
|
||||
|
||||
# To prevent double-querying when a tlsh/gimphash is invalid, this if statement is necessary.
|
||||
if initialRaw['query_status'] == 'ok':
|
||||
# Setting artifactType and value to our new re-query arguments
|
||||
# to get a more detailed report.
|
||||
data['artifactType'] = 'hash'
|
||||
data['value'] = initialRaw['data'][0]['sha256_hash']
|
||||
else:
|
||||
return prepareResults(initialRaw)
|
||||
|
||||
query = buildReq(data['artifactType'], data['value'])
|
||||
response = sendReq(meta, query)
|
||||
return prepareResults(response)
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) == 2:
|
||||
results = analyze(sys.argv[1])
|
||||
print(json.dumps(results))
|
||||
else:
|
||||
print("ERROR: Input is not in proper JSON format")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -0,0 +1,66 @@
|
||||
from io import StringIO
|
||||
import sys
|
||||
from unittest.mock import patch, MagicMock
|
||||
import malwarebazaar
|
||||
import unittest
|
||||
|
||||
class TestMalwarebazaarMethods(unittest.TestCase):
|
||||
def test_main_missing_input(self):
|
||||
with patch('sys.stdout', new=StringIO()) as mock_cmd:
|
||||
sys.argv = ["cmd"]
|
||||
malwarebazaar.main()
|
||||
self.assertEqual(mock_cmd.getvalue(),
|
||||
'ERROR: Input is not in proper JSON format\n')
|
||||
|
||||
def test_main_success(self):
|
||||
with patch('sys.stdout', new=StringIO()) as mock_cmd:
|
||||
with patch('malwarebazaar.analyze', new=MagicMock(return_value={'test': 'val'})) as mock:
|
||||
sys.argv = ["cmd", "input"]
|
||||
malwarebazaar.main()
|
||||
expected = '{"test": "val"}\n'
|
||||
self.assertEqual(mock_cmd.getvalue(), expected)
|
||||
mock.assert_called_once()
|
||||
def test_analyze(self):
|
||||
"""simulated sendReq and prepareResults with 2 mock objects and variables sendReqOutput and prepareResultOutput,
|
||||
input created for analyze method call and then we compared results['summary'] with 'no result' """
|
||||
sendReqOutput = {'threat': 'no_result',"query_status":"ok",'data':[{'sha256_hash':'notavalidhash'}]}
|
||||
input = '{"artifactType":"hash", "value":"1234"}'
|
||||
input2 ='{"artifactType":"tlsh", "value":"1234"}'
|
||||
input3='{"artifactType":"gimphash", "value":"1234"}'
|
||||
prepareResultOutput = {'response': '',
|
||||
'summary': 'no result', 'status': 'info'}
|
||||
|
||||
with patch('malwarebazaar.sendReq', new=MagicMock(return_value=sendReqOutput)) as mock:
|
||||
with patch('malwarebazaar.prepareResults', new=MagicMock(return_value=prepareResultOutput)) as mock2:
|
||||
results = malwarebazaar.analyze(input)
|
||||
results2 = malwarebazaar.analyze(input2)
|
||||
results3 =malwarebazaar.analyze(input3)
|
||||
self.assertEqual(results["summary"],prepareResultOutput['summary'])
|
||||
self.assertEqual(results2["summary"], prepareResultOutput['summary'])
|
||||
self.assertEqual(results3["summary"], prepareResultOutput['summary'])
|
||||
self.assertEqual(results["status"], "info")
|
||||
self.assertEqual(results2["status"], "info")
|
||||
self.assertEqual(results3["status"], "info")
|
||||
|
||||
mock.assert_called()
|
||||
|
||||
def test_prepareResults_illegal_search_term(self):
|
||||
# illegal search term
|
||||
raw = {'query_status': 'illegal_search_term'}
|
||||
expected = {'response': raw, 'status': 'info', 'summary': 'no result'}
|
||||
results = malwarebazaar.prepareResults(raw)
|
||||
self.assertEqual(results, expected)
|
||||
|
||||
def test_buildReqGimqhash(self):
|
||||
result = malwarebazaar.buildReq('gimphash', '')
|
||||
self.assertEqual(
|
||||
result, {'query': 'get_gimphash', 'gimphash': ''})
|
||||
|
||||
def test_buildReqHash(self):
|
||||
result = malwarebazaar.buildReq('hash', '')
|
||||
self.assertEqual(
|
||||
result, {'query': 'get_info', 'hash': ''})
|
||||
def test_buildReqtlshhash(self):
|
||||
result = malwarebazaar.buildReq('tlsh', '')
|
||||
self.assertEqual(
|
||||
result, {'query': 'get_tlsh', 'tlsh': ''})
|
||||
@@ -0,0 +1,2 @@
|
||||
requests>=2.31.0
|
||||
pyyaml>=6.0
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Reference in New Issue
Block a user