From 614589153b907655538628e5e163aec8e16ea43a Mon Sep 17 00:00:00 2001 From: Wes Date: Tue, 19 Dec 2023 02:57:35 +0000 Subject: [PATCH] Update Malwarebazaar test and comply with flake8 --- .../analyzers/malwarebazaar/malwarebazaar.py | 312 +++++++++--------- .../malwarebazaar/malwarebazaar_test.py | 27 +- 2 files changed, 181 insertions(+), 158 deletions(-) diff --git a/salt/sensoroni/files/analyzers/malwarebazaar/malwarebazaar.py b/salt/sensoroni/files/analyzers/malwarebazaar/malwarebazaar.py index cfc7d9f3a..649f6881d 100755 --- a/salt/sensoroni/files/analyzers/malwarebazaar/malwarebazaar.py +++ b/salt/sensoroni/files/analyzers/malwarebazaar/malwarebazaar.py @@ -1,156 +1,156 @@ -import requests -import helpers -import json -import sys - -# supports querying for hash, gimphash, tlsh, and telfhash -# usage is as follows: -# python3 malwarebazaar.py '{"artifactType":"x", "value":"y"}' - - -def buildReq(observ_type, observ_value): - # determine correct query type to send based off of observable type - unique_types = {'gimphash': 1, 'telfhash': 1, 'tlsh': 1} - if observ_type in unique_types: - qtype = 'get_' + observ_type - else: - qtype = 'get_info' - return {'query': qtype, observ_type: observ_value} - - -def sendReq(meta, query): - # send a post request with our compiled query to the API - url = meta['baseUrl'] - response = requests.post(url, query) - return response.json() - - -def isInJson(data, target_string, maxdepth=1000, tail=0): - # searches a JSON object for an occurance of a string - # recursively. - # depth limiter (arbitrary default value of 1000) - if tail > maxdepth: - return False - - if isinstance(data, dict): - for key, value in data.items(): - if isinstance(value, (dict, list)): - # recursive call - if isInJson(value, target_string, maxdepth, tail + 1): - return True - elif isinstance(value, str) and target_string in value.lower(): - # found target string - return True - - elif isinstance(data, list): - for item in data: - if isinstance(item, (dict, list)): - # recursive call - if isInJson(item, target_string, maxdepth, tail + 1): - return True - elif isinstance(item, str) and target_string in item.lower(): - # found target string - return True - - return False - - -def prepareResults(raw): - # parse raw API response, gauge threat level - # and return status and a short summary - if raw == {}: - status = 'caution' - summary = 'internal_failure' - elif raw['query_status'] == 'ok': - parsed = raw['data'][0] - vendor_data = parsed['vendor_intel'] - - # get summary - if 'signature' in parsed: - summary = parsed['signature'] - elif 'tags' in parsed: - summary = str(parsed['tags'][0]) - elif 'YOROI_YOMI' in vendor_data: - summary = vendor_data['YOROI_YOMI']['detection'] - - # gauge vendors to determine an approximation of status, - # normalized to a value out of 100 - # only updates score if it finds a higher indicator value - score = 0 - vendor_info_list = [ - ('vxCube', 'maliciousness', int), - ('Triage', 'score', lambda x: int(x) * 10), - ('DocGuard', 'alertlevel', lambda x: int(x) * 10), - ('YOROI_YOMI', 'score', lambda x: int(float(x)) * 100), - ('Inquest', 'verdict', lambda x: 100 if x == 'MALICIOUS' else 0), - ('ReversingLabs', 'status', - lambda x: 100 if x == 'MALICIOUS' else 0), - ('Spamhaus_HBL', 'detection', - lambda x: 100 if x == 'MALICIOUS' else 0), - ] - for vendor, key, transform in vendor_info_list: - if vendor in vendor_data and key in vendor_data[vendor]: - value = vendor_data[vendor][key] - score = max(score, transform(value)) - # Ensure score is at least 0 (or some default value) - score = max(score, 0) - - # compute status - if score >= 75 or isInJson(raw, 'MALICIOUS'.lower(), 1001): - # if score >= 75: - status = 'threat' - elif score >= 50: - status = 'caution' - elif score >= 25: - status = 'info' - - else: - status = 'ok' - elif raw['query_status'] != 'ok': - status = 'info' - summary = 'no result' - - return {'response': raw, 'summary': summary, 'status': status} - - -def analyze(input): - # put all of our methods together, pass them input, and return - # properly formatted json/python dict output - data = json.loads(input) - meta = helpers.loadMetadata(__file__) - helpers.checkSupportedType(meta, data["artifactType"]) - - if (data['artifactType'] == 'tlsh' or data['artifactType'] == 'gimphash' - or data['artifactType'] == 'telfhash'): - # To get accurate reporting for TLSH, telfhash and gimphash, - # we deem it necessary to query - # twice for the sake of retrieving more specific data. - - initialQuery = buildReq(data['artifactType'], data['value']) - initialRaw = sendReq(meta, initialQuery) - - # To prevent double-querying when a tlsh/gimphash is invalid, - # this if statement is necessary. - if initialRaw['query_status'] == 'ok': - # Setting artifactType and value to our new re-query arguments - # to get a more detailed report. - data['artifactType'] = 'hash' - data['value'] = initialRaw['data'][0]['sha256_hash'] - else: - return prepareResults(initialRaw) - - query = buildReq(data['artifactType'], data['value']) - response = sendReq(meta, query) - return prepareResults(response) - - -def main(): - if len(sys.argv) == 2: - results = analyze(sys.argv[1]) - print(json.dumps(results)) - else: - print("ERROR: Input is not in proper JSON format") - - -if __name__ == '__main__': - main() +import requests +import helpers +import json +import sys + +# supports querying for hash, gimphash, tlsh, and telfhash +# usage is as follows: +# python3 malwarebazaar.py '{"artifactType":"x", "value":"y"}' + + +def buildReq(observ_type, observ_value): + # determine correct query type to send based off of observable type + unique_types = {'gimphash': 1, 'telfhash': 1, 'tlsh': 1} + if observ_type in unique_types: + qtype = 'get_' + observ_type + else: + qtype = 'get_info' + return {'query': qtype, observ_type: observ_value} + + +def sendReq(meta, query): + # send a post request with our compiled query to the API + url = meta['baseUrl'] + response = requests.post(url, query) + return response.json() + + +def isInJson(data, target_string, maxdepth=1000, tail=0): + # searches a JSON object for an occurance of a string + # recursively. + # depth limiter (arbitrary default value of 1000) + if tail > maxdepth: + return False + + if isinstance(data, dict): + for key, value in data.items(): + if isinstance(value, (dict, list)): + # recursive call + if isInJson(value, target_string, maxdepth, tail + 1): + return True + elif isinstance(value, str) and target_string in value.lower(): + # found target string + return True + + elif isinstance(data, list): + for item in data: + if isinstance(item, (dict, list)): + # recursive call + if isInJson(item, target_string, maxdepth, tail + 1): + return True + elif isinstance(item, str) and target_string in item.lower(): + # found target string + return True + + return False + + +def prepareResults(raw): + # parse raw API response, gauge threat level + # and return status and a short summary + if raw == {}: + status = 'caution' + summary = 'internal_failure' + elif raw['query_status'] == 'ok': + parsed = raw['data'][0] + vendor_data = parsed['vendor_intel'] + + # get summary + if 'signature' in parsed: + summary = parsed['signature'] + elif 'tags' in parsed: + summary = str(parsed['tags'][0]) + elif 'YOROI_YOMI' in vendor_data: + summary = vendor_data['YOROI_YOMI']['detection'] + + # gauge vendors to determine an approximation of status, + # normalized to a value out of 100 + # only updates score if it finds a higher indicator value + score = 0 + vendor_info_list = [ + ('vxCube', 'maliciousness', int), + ('Triage', 'score', lambda x: int(x) * 10), + ('DocGuard', 'alertlevel', lambda x: int(x) * 10), + ('YOROI_YOMI', 'score', lambda x: int(float(x)) * 100), + ('Inquest', 'verdict', lambda x: 100 if x == 'MALICIOUS' else 0), + ('ReversingLabs', 'status', + lambda x: 100 if x == 'MALICIOUS' else 0), + ('Spamhaus_HBL', 'detection', + lambda x: 100 if x == 'MALICIOUS' else 0), + ] + for vendor, key, transform in vendor_info_list: + if vendor in vendor_data and key in vendor_data[vendor]: + value = vendor_data[vendor][key] + score = max(score, transform(value)) + # Ensure score is at least 0 (or some default value) + score = max(score, 0) + + # compute status + if score >= 75 or isInJson(raw, 'MALICIOUS'.lower(), 1001): + # if score >= 75: + status = 'threat' + elif score >= 50: + status = 'caution' + elif score >= 25: + status = 'info' + + else: + status = 'ok' + elif raw['query_status'] != 'ok': + status = 'info' + summary = 'no result' + + return {'response': raw, 'summary': summary, 'status': status} + + +def analyze(input): + # put all of our methods together, pass them input, and return + # properly formatted json/python dict output + data = json.loads(input) + meta = helpers.loadMetadata(__file__) + helpers.checkSupportedType(meta, data["artifactType"]) + + if (data['artifactType'] == 'tlsh' or data['artifactType'] == 'gimphash' + or data['artifactType'] == 'telfhash'): + # To get accurate reporting for TLSH, telfhash and gimphash, + # we deem it necessary to query + # twice for the sake of retrieving more specific data. + + initialQuery = buildReq(data['artifactType'], data['value']) + initialRaw = sendReq(meta, initialQuery) + + # To prevent double-querying when a tlsh/gimphash is invalid, + # this if statement is necessary. + if initialRaw['query_status'] == 'ok': + # Setting artifactType and value to our new re-query arguments + # to get a more detailed report. + data['artifactType'] = 'hash' + data['value'] = initialRaw['data'][0]['sha256_hash'] + else: + return prepareResults(initialRaw) + + query = buildReq(data['artifactType'], data['value']) + response = sendReq(meta, query) + return prepareResults(response) + + +def main(): + if len(sys.argv) == 2: + results = analyze(sys.argv[1]) + print(json.dumps(results)) + else: + print("ERROR: Input is not in proper JSON format") + + +if __name__ == '__main__': + main() diff --git a/salt/sensoroni/files/analyzers/malwarebazaar/malwarebazaar_test.py b/salt/sensoroni/files/analyzers/malwarebazaar/malwarebazaar_test.py index 225eb5a73..212882048 100644 --- a/salt/sensoroni/files/analyzers/malwarebazaar/malwarebazaar_test.py +++ b/salt/sensoroni/files/analyzers/malwarebazaar/malwarebazaar_test.py @@ -23,6 +23,18 @@ class TestMalwarebazaarMethods(unittest.TestCase): self.assertEqual(mock_cmd.getvalue(), expected) mock.assert_called_once() + def test_isInJson_tail_greater_than_max_depth(self): + max_depth = 1000 + tail = 2000 + test_string = "helo" + input_json = { + "value": "test", + "test": "value", + "arr": ["Foo", "Bar", "Hello"], + "dict1": {"key1": "val", "key2": "helo"} + } + self.assertEqual(malwarebazaar.isInJson(input_json, test_string, max_depth, tail), False) + def test_isInJson_string_found_in_dict(self): test_string = "helo" input_json = { @@ -33,6 +45,18 @@ class TestMalwarebazaarMethods(unittest.TestCase): } self.assertEqual(malwarebazaar.isInJson(input_json, test_string), True) + def test_isInJson_dict_in_list(self): + max_depth = 1000 + tail = 1 + test_string = "helo" + input_json = { + "key1": "test", + "key2": "value", + "key3": ["Foo", "Bar", "Hello"], + "nested_list": [{"key1": "val", "key2": "helo"}] + } + self.assertEqual(malwarebazaar.isInJson(input_json, test_string, max_depth, tail), True) + def test_isInJson_string_found_in_arr(self): test_string = "helo" input_json = { @@ -51,8 +75,7 @@ class TestMalwarebazaarMethods(unittest.TestCase): "arr": ["Foo", "Bar", "helo"], "dict1": {"Hello": "val", "key": "val"} } - self.assertEqual(malwarebazaar.isInJson( - input_json, test_string), False) + self.assertEqual(malwarebazaar.isInJson(input_json, test_string), False) def test_analyze(self): """simulated sendReq and prepareResults with 2 mock objects