mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2025-12-06 09:12:45 +01:00
Update Malwarebazaar test and comply with flake8
This commit is contained in:
@@ -1,156 +1,156 @@
|
|||||||
import requests
|
import requests
|
||||||
import helpers
|
import helpers
|
||||||
import json
|
import json
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
# supports querying for hash, gimphash, tlsh, and telfhash
|
# supports querying for hash, gimphash, tlsh, and telfhash
|
||||||
# usage is as follows:
|
# usage is as follows:
|
||||||
# python3 malwarebazaar.py '{"artifactType":"x", "value":"y"}'
|
# python3 malwarebazaar.py '{"artifactType":"x", "value":"y"}'
|
||||||
|
|
||||||
|
|
||||||
def buildReq(observ_type, observ_value):
|
def buildReq(observ_type, observ_value):
|
||||||
# determine correct query type to send based off of observable type
|
# determine correct query type to send based off of observable type
|
||||||
unique_types = {'gimphash': 1, 'telfhash': 1, 'tlsh': 1}
|
unique_types = {'gimphash': 1, 'telfhash': 1, 'tlsh': 1}
|
||||||
if observ_type in unique_types:
|
if observ_type in unique_types:
|
||||||
qtype = 'get_' + observ_type
|
qtype = 'get_' + observ_type
|
||||||
else:
|
else:
|
||||||
qtype = 'get_info'
|
qtype = 'get_info'
|
||||||
return {'query': qtype, observ_type: observ_value}
|
return {'query': qtype, observ_type: observ_value}
|
||||||
|
|
||||||
|
|
||||||
def sendReq(meta, query):
|
def sendReq(meta, query):
|
||||||
# send a post request with our compiled query to the API
|
# send a post request with our compiled query to the API
|
||||||
url = meta['baseUrl']
|
url = meta['baseUrl']
|
||||||
response = requests.post(url, query)
|
response = requests.post(url, query)
|
||||||
return response.json()
|
return response.json()
|
||||||
|
|
||||||
|
|
||||||
def isInJson(data, target_string, maxdepth=1000, tail=0):
|
def isInJson(data, target_string, maxdepth=1000, tail=0):
|
||||||
# searches a JSON object for an occurance of a string
|
# searches a JSON object for an occurance of a string
|
||||||
# recursively.
|
# recursively.
|
||||||
# depth limiter (arbitrary default value of 1000)
|
# depth limiter (arbitrary default value of 1000)
|
||||||
if tail > maxdepth:
|
if tail > maxdepth:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if isinstance(data, dict):
|
if isinstance(data, dict):
|
||||||
for key, value in data.items():
|
for key, value in data.items():
|
||||||
if isinstance(value, (dict, list)):
|
if isinstance(value, (dict, list)):
|
||||||
# recursive call
|
# recursive call
|
||||||
if isInJson(value, target_string, maxdepth, tail + 1):
|
if isInJson(value, target_string, maxdepth, tail + 1):
|
||||||
return True
|
return True
|
||||||
elif isinstance(value, str) and target_string in value.lower():
|
elif isinstance(value, str) and target_string in value.lower():
|
||||||
# found target string
|
# found target string
|
||||||
return True
|
return True
|
||||||
|
|
||||||
elif isinstance(data, list):
|
elif isinstance(data, list):
|
||||||
for item in data:
|
for item in data:
|
||||||
if isinstance(item, (dict, list)):
|
if isinstance(item, (dict, list)):
|
||||||
# recursive call
|
# recursive call
|
||||||
if isInJson(item, target_string, maxdepth, tail + 1):
|
if isInJson(item, target_string, maxdepth, tail + 1):
|
||||||
return True
|
return True
|
||||||
elif isinstance(item, str) and target_string in item.lower():
|
elif isinstance(item, str) and target_string in item.lower():
|
||||||
# found target string
|
# found target string
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def prepareResults(raw):
|
def prepareResults(raw):
|
||||||
# parse raw API response, gauge threat level
|
# parse raw API response, gauge threat level
|
||||||
# and return status and a short summary
|
# and return status and a short summary
|
||||||
if raw == {}:
|
if raw == {}:
|
||||||
status = 'caution'
|
status = 'caution'
|
||||||
summary = 'internal_failure'
|
summary = 'internal_failure'
|
||||||
elif raw['query_status'] == 'ok':
|
elif raw['query_status'] == 'ok':
|
||||||
parsed = raw['data'][0]
|
parsed = raw['data'][0]
|
||||||
vendor_data = parsed['vendor_intel']
|
vendor_data = parsed['vendor_intel']
|
||||||
|
|
||||||
# get summary
|
# get summary
|
||||||
if 'signature' in parsed:
|
if 'signature' in parsed:
|
||||||
summary = parsed['signature']
|
summary = parsed['signature']
|
||||||
elif 'tags' in parsed:
|
elif 'tags' in parsed:
|
||||||
summary = str(parsed['tags'][0])
|
summary = str(parsed['tags'][0])
|
||||||
elif 'YOROI_YOMI' in vendor_data:
|
elif 'YOROI_YOMI' in vendor_data:
|
||||||
summary = vendor_data['YOROI_YOMI']['detection']
|
summary = vendor_data['YOROI_YOMI']['detection']
|
||||||
|
|
||||||
# gauge vendors to determine an approximation of status,
|
# gauge vendors to determine an approximation of status,
|
||||||
# normalized to a value out of 100
|
# normalized to a value out of 100
|
||||||
# only updates score if it finds a higher indicator value
|
# only updates score if it finds a higher indicator value
|
||||||
score = 0
|
score = 0
|
||||||
vendor_info_list = [
|
vendor_info_list = [
|
||||||
('vxCube', 'maliciousness', int),
|
('vxCube', 'maliciousness', int),
|
||||||
('Triage', 'score', lambda x: int(x) * 10),
|
('Triage', 'score', lambda x: int(x) * 10),
|
||||||
('DocGuard', 'alertlevel', lambda x: int(x) * 10),
|
('DocGuard', 'alertlevel', lambda x: int(x) * 10),
|
||||||
('YOROI_YOMI', 'score', lambda x: int(float(x)) * 100),
|
('YOROI_YOMI', 'score', lambda x: int(float(x)) * 100),
|
||||||
('Inquest', 'verdict', lambda x: 100 if x == 'MALICIOUS' else 0),
|
('Inquest', 'verdict', lambda x: 100 if x == 'MALICIOUS' else 0),
|
||||||
('ReversingLabs', 'status',
|
('ReversingLabs', 'status',
|
||||||
lambda x: 100 if x == 'MALICIOUS' else 0),
|
lambda x: 100 if x == 'MALICIOUS' else 0),
|
||||||
('Spamhaus_HBL', 'detection',
|
('Spamhaus_HBL', 'detection',
|
||||||
lambda x: 100 if x == 'MALICIOUS' else 0),
|
lambda x: 100 if x == 'MALICIOUS' else 0),
|
||||||
]
|
]
|
||||||
for vendor, key, transform in vendor_info_list:
|
for vendor, key, transform in vendor_info_list:
|
||||||
if vendor in vendor_data and key in vendor_data[vendor]:
|
if vendor in vendor_data and key in vendor_data[vendor]:
|
||||||
value = vendor_data[vendor][key]
|
value = vendor_data[vendor][key]
|
||||||
score = max(score, transform(value))
|
score = max(score, transform(value))
|
||||||
# Ensure score is at least 0 (or some default value)
|
# Ensure score is at least 0 (or some default value)
|
||||||
score = max(score, 0)
|
score = max(score, 0)
|
||||||
|
|
||||||
# compute status
|
# compute status
|
||||||
if score >= 75 or isInJson(raw, 'MALICIOUS'.lower(), 1001):
|
if score >= 75 or isInJson(raw, 'MALICIOUS'.lower(), 1001):
|
||||||
# if score >= 75:
|
# if score >= 75:
|
||||||
status = 'threat'
|
status = 'threat'
|
||||||
elif score >= 50:
|
elif score >= 50:
|
||||||
status = 'caution'
|
status = 'caution'
|
||||||
elif score >= 25:
|
elif score >= 25:
|
||||||
status = 'info'
|
status = 'info'
|
||||||
|
|
||||||
else:
|
else:
|
||||||
status = 'ok'
|
status = 'ok'
|
||||||
elif raw['query_status'] != 'ok':
|
elif raw['query_status'] != 'ok':
|
||||||
status = 'info'
|
status = 'info'
|
||||||
summary = 'no result'
|
summary = 'no result'
|
||||||
|
|
||||||
return {'response': raw, 'summary': summary, 'status': status}
|
return {'response': raw, 'summary': summary, 'status': status}
|
||||||
|
|
||||||
|
|
||||||
def analyze(input):
|
def analyze(input):
|
||||||
# put all of our methods together, pass them input, and return
|
# put all of our methods together, pass them input, and return
|
||||||
# properly formatted json/python dict output
|
# properly formatted json/python dict output
|
||||||
data = json.loads(input)
|
data = json.loads(input)
|
||||||
meta = helpers.loadMetadata(__file__)
|
meta = helpers.loadMetadata(__file__)
|
||||||
helpers.checkSupportedType(meta, data["artifactType"])
|
helpers.checkSupportedType(meta, data["artifactType"])
|
||||||
|
|
||||||
if (data['artifactType'] == 'tlsh' or data['artifactType'] == 'gimphash'
|
if (data['artifactType'] == 'tlsh' or data['artifactType'] == 'gimphash'
|
||||||
or data['artifactType'] == 'telfhash'):
|
or data['artifactType'] == 'telfhash'):
|
||||||
# To get accurate reporting for TLSH, telfhash and gimphash,
|
# To get accurate reporting for TLSH, telfhash and gimphash,
|
||||||
# we deem it necessary to query
|
# we deem it necessary to query
|
||||||
# twice for the sake of retrieving more specific data.
|
# twice for the sake of retrieving more specific data.
|
||||||
|
|
||||||
initialQuery = buildReq(data['artifactType'], data['value'])
|
initialQuery = buildReq(data['artifactType'], data['value'])
|
||||||
initialRaw = sendReq(meta, initialQuery)
|
initialRaw = sendReq(meta, initialQuery)
|
||||||
|
|
||||||
# To prevent double-querying when a tlsh/gimphash is invalid,
|
# To prevent double-querying when a tlsh/gimphash is invalid,
|
||||||
# this if statement is necessary.
|
# this if statement is necessary.
|
||||||
if initialRaw['query_status'] == 'ok':
|
if initialRaw['query_status'] == 'ok':
|
||||||
# Setting artifactType and value to our new re-query arguments
|
# Setting artifactType and value to our new re-query arguments
|
||||||
# to get a more detailed report.
|
# to get a more detailed report.
|
||||||
data['artifactType'] = 'hash'
|
data['artifactType'] = 'hash'
|
||||||
data['value'] = initialRaw['data'][0]['sha256_hash']
|
data['value'] = initialRaw['data'][0]['sha256_hash']
|
||||||
else:
|
else:
|
||||||
return prepareResults(initialRaw)
|
return prepareResults(initialRaw)
|
||||||
|
|
||||||
query = buildReq(data['artifactType'], data['value'])
|
query = buildReq(data['artifactType'], data['value'])
|
||||||
response = sendReq(meta, query)
|
response = sendReq(meta, query)
|
||||||
return prepareResults(response)
|
return prepareResults(response)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
if len(sys.argv) == 2:
|
if len(sys.argv) == 2:
|
||||||
results = analyze(sys.argv[1])
|
results = analyze(sys.argv[1])
|
||||||
print(json.dumps(results))
|
print(json.dumps(results))
|
||||||
else:
|
else:
|
||||||
print("ERROR: Input is not in proper JSON format")
|
print("ERROR: Input is not in proper JSON format")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
|||||||
@@ -23,6 +23,18 @@ class TestMalwarebazaarMethods(unittest.TestCase):
|
|||||||
self.assertEqual(mock_cmd.getvalue(), expected)
|
self.assertEqual(mock_cmd.getvalue(), expected)
|
||||||
mock.assert_called_once()
|
mock.assert_called_once()
|
||||||
|
|
||||||
|
def test_isInJson_tail_greater_than_max_depth(self):
|
||||||
|
max_depth = 1000
|
||||||
|
tail = 2000
|
||||||
|
test_string = "helo"
|
||||||
|
input_json = {
|
||||||
|
"value": "test",
|
||||||
|
"test": "value",
|
||||||
|
"arr": ["Foo", "Bar", "Hello"],
|
||||||
|
"dict1": {"key1": "val", "key2": "helo"}
|
||||||
|
}
|
||||||
|
self.assertEqual(malwarebazaar.isInJson(input_json, test_string, max_depth, tail), False)
|
||||||
|
|
||||||
def test_isInJson_string_found_in_dict(self):
|
def test_isInJson_string_found_in_dict(self):
|
||||||
test_string = "helo"
|
test_string = "helo"
|
||||||
input_json = {
|
input_json = {
|
||||||
@@ -33,6 +45,18 @@ class TestMalwarebazaarMethods(unittest.TestCase):
|
|||||||
}
|
}
|
||||||
self.assertEqual(malwarebazaar.isInJson(input_json, test_string), True)
|
self.assertEqual(malwarebazaar.isInJson(input_json, test_string), True)
|
||||||
|
|
||||||
|
def test_isInJson_dict_in_list(self):
|
||||||
|
max_depth = 1000
|
||||||
|
tail = 1
|
||||||
|
test_string = "helo"
|
||||||
|
input_json = {
|
||||||
|
"key1": "test",
|
||||||
|
"key2": "value",
|
||||||
|
"key3": ["Foo", "Bar", "Hello"],
|
||||||
|
"nested_list": [{"key1": "val", "key2": "helo"}]
|
||||||
|
}
|
||||||
|
self.assertEqual(malwarebazaar.isInJson(input_json, test_string, max_depth, tail), True)
|
||||||
|
|
||||||
def test_isInJson_string_found_in_arr(self):
|
def test_isInJson_string_found_in_arr(self):
|
||||||
test_string = "helo"
|
test_string = "helo"
|
||||||
input_json = {
|
input_json = {
|
||||||
@@ -51,8 +75,7 @@ class TestMalwarebazaarMethods(unittest.TestCase):
|
|||||||
"arr": ["Foo", "Bar", "helo"],
|
"arr": ["Foo", "Bar", "helo"],
|
||||||
"dict1": {"Hello": "val", "key": "val"}
|
"dict1": {"Hello": "val", "key": "val"}
|
||||||
}
|
}
|
||||||
self.assertEqual(malwarebazaar.isInJson(
|
self.assertEqual(malwarebazaar.isInJson(input_json, test_string), False)
|
||||||
input_json, test_string), False)
|
|
||||||
|
|
||||||
def test_analyze(self):
|
def test_analyze(self):
|
||||||
"""simulated sendReq and prepareResults with 2 mock objects
|
"""simulated sendReq and prepareResults with 2 mock objects
|
||||||
|
|||||||
Reference in New Issue
Block a user