malwarebazaar dep upgrades + use auth

This commit is contained in:
reyesj2
2025-08-20 20:59:23 -05:00
parent 9ca0c7d53a
commit 87a28e8ce7
15 changed files with 71 additions and 32 deletions

View File

@@ -34,6 +34,8 @@ sensoroni:
api_version: community
localfile:
file_path: []
malwarebazaar:
api_key:
otx:
base_url: https://otx.alienvault.com/api/v1/
api_key:

View File

@@ -2,12 +2,21 @@ import requests
import helpers
import json
import sys
import os
import argparse
# supports querying for hash, gimphash, tlsh, and telfhash
# usage is as follows:
# python3 malwarebazaar.py '{"artifactType":"x", "value":"y"}'
def checkConfigRequirements(conf):
if not conf.get('api_key'):
sys.exit(126)
else:
return True
def buildReq(observ_type, observ_value):
# determine correct query type to send based off of observable type
unique_types = {'gimphash': 1, 'telfhash': 1, 'tlsh': 1}
@@ -18,10 +27,13 @@ def buildReq(observ_type, observ_value):
return {'query': qtype, observ_type: observ_value}
def sendReq(meta, query):
def sendReq(conf, meta, query):
# send a post request with our compiled query to the API
url = meta['baseUrl']
response = requests.post(url, query)
headers = {}
if conf.get('api_key'):
headers['Auth-Key'] = conf['api_key']
response = requests.post(url, query, headers=headers)
return response.json()
@@ -113,10 +125,11 @@ def prepareResults(raw):
return {'response': raw, 'summary': summary, 'status': status}
def analyze(input):
def analyze(conf, input):
# put all of our methods together, pass them input, and return
# properly formatted json/python dict output
data = json.loads(input)
checkConfigRequirements(conf)
data = helpers.parseArtifact(input)
meta = helpers.loadMetadata(__file__)
helpers.checkSupportedType(meta, data["artifactType"])
@@ -127,7 +140,7 @@ def analyze(input):
# twice for the sake of retrieving more specific data.
initialQuery = buildReq(data['artifactType'], data['value'])
initialRaw = sendReq(meta, initialQuery)
initialRaw = sendReq(conf, meta, initialQuery)
# To prevent double-querying when a tlsh/gimphash is invalid,
# this if statement is necessary.
@@ -140,16 +153,22 @@ def analyze(input):
return prepareResults(initialRaw)
query = buildReq(data['artifactType'], data['value'])
response = sendReq(meta, query)
response = sendReq(conf, meta, query)
return prepareResults(response)
def main():
if len(sys.argv) == 2:
results = analyze(sys.argv[1])
dir = os.path.dirname(os.path.realpath(__file__))
parser = argparse.ArgumentParser(
description='Search MalwareBazaar for a given artifact')
parser.add_argument(
'artifact', help='the artifact represented in JSON format')
parser.add_argument('-c', '--config', metavar='CONFIG_FILE', default=dir + '/malwarebazaar.yaml',
help='optional config file to use instead of the default config file')
args = parser.parse_args()
if args.artifact:
results = analyze(helpers.loadConfig(args.config), args.artifact)
print(json.dumps(results))
else:
print("ERROR: Input is not in proper JSON format")
if __name__ == '__main__':

View File

@@ -0,0 +1 @@
api: "{{ salt['pillar.get']('sensoroni:analyzers:malwarebazaar:api_key', '') }}"

View File

@@ -6,22 +6,18 @@ import unittest
class TestMalwarebazaarMethods(unittest.TestCase):
def test_main_missing_input(self):
with patch('sys.stdout', new=StringIO()) as mock_cmd:
sys.argv = ["cmd"]
malwarebazaar.main()
self.assertEqual(mock_cmd.getvalue(),
'ERROR: Input is not in proper JSON format\n')
def test_main_success(self):
with patch('sys.stdout', new=StringIO()) as mock_cmd:
with patch('malwarebazaar.malwarebazaar.analyze',
new=MagicMock(return_value={'test': 'val'})) as mock:
sys.argv = ["cmd", "input"]
malwarebazaar.main()
expected = '{"test": "val"}\n'
self.assertEqual(mock_cmd.getvalue(), expected)
mock.assert_called_once()
output = {"test": "val"}
config = {"api_key": "test_key"}
with patch('sys.stdout', new=StringIO()) as mock_stdout:
with patch('malwarebazaar.malwarebazaar.analyze', new=MagicMock(return_value=output)) as mock_analyze:
with patch('helpers.loadConfig', new=MagicMock(return_value=config)) as mock_config:
sys.argv = ["cmd", "input"]
malwarebazaar.main()
expected = '{"test": "val"}\n'
self.assertEqual(mock_stdout.getvalue(), expected)
mock_analyze.assert_called_once()
mock_config.assert_called_once()
def test_isInJson_tail_greater_than_max_depth(self):
max_depth = 1000
@@ -84,6 +80,7 @@ class TestMalwarebazaarMethods(unittest.TestCase):
and then we compared results['summary'] with 'no result' """
sendReqOutput = {'threat': 'no_result', "query_status": "ok",
'data': [{'sha256_hash': 'notavalidhash'}]}
config = {"api_key": "test_key"}
input = '{"artifactType": "hash", "value": "1234"}'
input2 = '{"artifactType": "tlsh", "value": "1234"}'
input3 = '{"artifactType": "gimphash", "value": "1234"}'
@@ -94,9 +91,9 @@ class TestMalwarebazaarMethods(unittest.TestCase):
new=MagicMock(return_value=sendReqOutput)) as mock:
with patch('malwarebazaar.malwarebazaar.prepareResults',
new=MagicMock(return_value=prep_res_sim)) as mock2:
results = malwarebazaar.analyze(input)
results2 = malwarebazaar.analyze(input2)
results3 = malwarebazaar.analyze(input3)
results = malwarebazaar.analyze(config, input)
results2 = malwarebazaar.analyze(config, input2)
results3 = malwarebazaar.analyze(config, input3)
self.assertEqual(results["summary"], prep_res_sim['summary'])
self.assertEqual(results2["summary"], prep_res_sim['summary'])
self.assertEqual(results3["summary"], prep_res_sim['summary'])
@@ -113,6 +110,7 @@ class TestMalwarebazaarMethods(unittest.TestCase):
and then we compared results['summary'] with 'no result' """
sendReqOutput = {'threat': 'threat', "query_status": "notok", 'data': [
{'sha256_hash': 'validhash'}]}
config = {"api_key": "test_key"}
input = '{"artifactType": "hash", "value": "1234"}'
input2 = '{"artifactType": "tlsh", "value": "1234"}'
input3 = '{"artifactType": "gimphash", "value": "1234"}'
@@ -123,9 +121,9 @@ class TestMalwarebazaarMethods(unittest.TestCase):
new=MagicMock(return_value=sendReqOutput)) as mock:
with patch('malwarebazaar.malwarebazaar.prepareResults',
new=MagicMock(return_value=prep_res_sim)) as mock2:
results = malwarebazaar.analyze(input)
results2 = malwarebazaar.analyze(input2)
results3 = malwarebazaar.analyze(input3)
results = malwarebazaar.analyze(config, input)
results2 = malwarebazaar.analyze(config, input2)
results3 = malwarebazaar.analyze(config, input3)
self.assertEqual(results["summary"], prep_res_sim['summary'])
self.assertEqual(results2["summary"], prep_res_sim['summary'])
self.assertEqual(results3["summary"], prep_res_sim['summary'])
@@ -239,7 +237,18 @@ class TestMalwarebazaarMethods(unittest.TestCase):
def test_sendReq(self):
with patch('requests.post',
new=MagicMock(return_value=MagicMock())) as mock:
conf = {"api_key": "test_key"}
response = malwarebazaar.sendReq(
{'baseUrl': 'https://www.randurl.xyz'}, 'example_data')
conf, {'baseUrl': 'https://www.randurl.xyz'}, 'example_data')
self.assertIsNotNone(response)
mock.assert_called_once()
def test_checkConfigRequirements_valid(self):
config = {"api_key": "test_key"}
self.assertTrue(malwarebazaar.checkConfigRequirements(config))
def test_checkConfigRequirements_missing_key(self):
config = {}
with self.assertRaises(SystemExit) as cm:
malwarebazaar.checkConfigRequirements(config)
self.assertEqual(cm.exception.code, 126)

View File

@@ -174,6 +174,14 @@ sensoroni:
sensitive: False
advanced: True
forcedType: "[]string"
malwarebazaar:
api_key:
description: API key for the malwarebazaar analyzer.
helpLink: sensoroni.html
global: False
sensitive: True
advanced: False
forcedType: string
otx:
api_key:
description: API key for the OTX analyzer.