Files
securityonion/salt/sensoroni/files/analyzers/malwarebazaar/malwarebazaar.py
2023-12-15 03:00:43 -05:00

158 lines
5.5 KiB
Python

import requests
import helpers
import json
import sys
# supports querying for hash, gimphash, tlsh, and telfhash
# usage is as follows:
# python3 malwarebazaar.py '{"artifactType":"x", "value":"y"}'
def buildReq(observ_type, observ_value):
# determine correct query type to send based off of observable type
unique_types = {'gimphash': 1, 'telfhash': 1, 'tlsh': 1}
if observ_type in unique_types:
qtype = 'get_' + observ_type
else:
qtype = 'get_info'
return {'query': qtype, observ_type: observ_value}
def sendReq(meta, query):
# send a post request with our compiled query to the API
url = meta['baseUrl']
response = requests.post(url, query)
return response.json()
def isInJson(data, target_string, maxdepth):
# searches a JSON object for an occurance of a string
# recursively.
# depth limiter (arbitrary value of 1000)
if maxdepth > 1000:
return False
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, (dict, list)):
# recursive call
if isInJson(value, target_string, maxdepth + 1):
return True
elif isinstance(value, str) and target_string in value.lower():
# found target string
return True
elif isinstance(data, list):
for item in data:
if isinstance(item, (dict, list)):
# recursive call
if isInJson(item, target_string, maxdepth + 1):
return True
elif isinstance(item, str) and target_string in item.lower():
# found target string
return True
return False
def prepareResults(raw):
# parse raw API response, gauge threat level and return status and a short summary
if raw == {}:
status = 'caution'
summary = 'internal_failure'
elif raw['query_status'] == 'ok':
parsed = raw['data'][0]
vendor_data = parsed['vendor_intel']
# get summary
if parsed['signature']:
summary = parsed['signature']
elif parsed['tags']:
summary = str(parsed['tags'][0])
elif vendor_data['YOROI_YOMI']:
summary = vendor_data['YOROI_YOMI']['detection']
# gauge vendors to determine an approximation of status, normalized to a value out of 100
# only updates score if it finds a higher indicator value
score = 0
if 'vxCube' in vendor_data:
score = int(vendor_data['vxCube']['maliciousness'])
if 'Triage' in vendor_data:
score = int(vendor_data['Triage']['score'])*10 if int(
vendor_data['Triage']['score'])*10 > score else score
if 'DocGuard' in vendor_data:
score = int(vendor_data['DocGuard']['alertlevel'])*10 if int(
vendor_data['DocGuard']['alertlevel'])*10 > score else score
if 'YOROI_YOMI' in vendor_data:
score = int(float(vendor_data['YOROI_YOMI']['score']))*100 if int(
float(vendor_data['YOROI_YOMI']['score']))*100 > score else score
if 'Inquest' in vendor_data and vendor_data['Inquest']['verdict'] == 'MALICIOUS':
score = 100 if 100 > score else score
if 'ReversingLabs' in vendor_data and vendor_data['ReversingLabs']['status'] == 'MALICIOUS':
score = 100 if 100 > score else score
if 'Spamhaus_HBL' in vendor_data and vendor_data['Spamhaus_HBL'][0]['detection'] == 'MALICIOUS':
score = 100 if 100 > score else score
# compute status
if score >= 75 or isInJson(raw, 'MALICIOUS'.lower()):
# if score >= 75:
status = 'threat'
elif score >= 50:
status = 'caution'
elif score >= 25:
status = 'info'
else:
status = 'ok'
elif raw['query_status'] != 'ok':
status = 'info'
summary = 'no result'
return {'response': raw, 'summary': summary, 'status': status}
def analyze(input):
# put all of our methods together, pass them input, and return
# properly formatted json/python dict output
data = json.loads(input)
meta = helpers.loadMetadata(__file__)
helpers.checkSupportedType(meta, data["artifactType"])
if (data['artifactType'] == 'tlsh' or data['artifactType'] == 'gimphash' or data['artifactType'] == 'telfhash'):
# To get accurate reporting for TLSH, telfhash and gimphash, we deem it necessary to query
# twice for the sake of retrieving more specific data.
initialQuery = buildReq(data['artifactType'], data['value'])
initialRaw = sendReq(meta, initialQuery)
# To prevent double-querying when a tlsh/gimphash is invalid, this if statement is necessary.
if initialRaw['query_status'] == 'ok':
# Setting artifactType and value to our new re-query arguments
# to get a more detailed report.
data['artifactType'] = 'hash'
data['value'] = initialRaw['data'][0]['sha256_hash']
else:
return prepareResults(initialRaw)
query = buildReq(data['artifactType'], data['value'])
response = sendReq(meta, query)
return prepareResults(response)
def main():
if len(sys.argv) == 2:
results = analyze(sys.argv[1])
print(json.dumps(results))
else:
print("ERROR: Input is not in proper JSON format")
if __name__ == '__main__':
main()