urlhaus dep upgrades + update to use authenticated abusech api

This commit is contained in:
reyesj2
2025-08-20 17:20:10 -05:00
parent 2e94e452ed
commit 9ca0c7d53a
17 changed files with 65 additions and 26 deletions

View File

@@ -55,6 +55,8 @@ sensoroni:
enabled: False enabled: False
visibility: public visibility: public
timeout: 180 timeout: 180
urlhaus:
api_key:
virustotal: virustotal:
base_url: https://www.virustotal.com/api/v3/search?query= base_url: https://www.virustotal.com/api/v3/search?query=
api_key: api_key:

View File

@@ -43,7 +43,7 @@ Many analyzers require authentication, via an API key or similar. The table belo
[Spamhaus](https://www.spamhaus.org/dbl/) |✗| [Spamhaus](https://www.spamhaus.org/dbl/) |✗|
[Sublime Platform](https://sublime.security) |✓| [Sublime Platform](https://sublime.security) |✓|
[ThreatFox](https://threatfox.abuse.ch/) |✗| [ThreatFox](https://threatfox.abuse.ch/) |✗|
[Urlhaus](https://urlhaus.abuse.ch/) |✗| [Urlhaus](https://urlhaus.abuse.ch/) |✓|
[Urlscan](https://urlscan.io/docs/api/) |✓| [Urlscan](https://urlscan.io/docs/api/) |✓|
[VirusTotal](https://developers.virustotal.com/reference/overview) |✓| [VirusTotal](https://developers.virustotal.com/reference/overview) |✓|
[WhoisLookup](https://github.com/meeb/whoisit) |✗| [WhoisLookup](https://github.com/meeb/whoisit) |✗|

View File

@@ -1,6 +1,6 @@
{ {
"name": "Urlhaus", "name": "Urlhaus",
"version": "0.1", "version": "0.2",
"author": "Security Onion Solutions", "author": "Security Onion Solutions",
"description": "This analyzer queries URLHaus to see if a URL is considered malicious.", "description": "This analyzer queries URLHaus to see if a URL is considered malicious.",
"supportedTypes" : ["url"], "supportedTypes" : ["url"],

View File

@@ -1,16 +1,28 @@
import json import json
import os
import requests import requests
import sys import sys
import helpers import helpers
import argparse
def checkConfigRequirements(conf):
if not conf.get('api_key'):
sys.exit(126)
else:
return True
def buildReq(artifact_value): def buildReq(artifact_value):
return {"url": artifact_value} return {"url": artifact_value}
def sendReq(meta, payload): def sendReq(conf, meta, payload):
url = meta['baseUrl'] url = meta['baseUrl']
response = requests.request('POST', url, data=payload) headers = {}
if conf.get('api_key'):
headers['Auth-Key'] = conf['api_key']
response = requests.request('POST', url, data=payload, headers=headers)
return response.json() return response.json()
@@ -31,21 +43,28 @@ def prepareResults(raw):
return results return results
def analyze(input): def analyze(conf, input):
checkConfigRequirements(conf)
meta = helpers.loadMetadata(__file__) meta = helpers.loadMetadata(__file__)
data = helpers.parseArtifact(input) data = helpers.parseArtifact(input)
helpers.checkSupportedType(meta, data["artifactType"]) helpers.checkSupportedType(meta, data["artifactType"])
payload = buildReq(data["value"]) payload = buildReq(data["value"])
response = sendReq(meta, payload) response = sendReq(conf, meta, payload)
return prepareResults(response) return prepareResults(response)
def main(): def main():
if len(sys.argv) == 2: dir = os.path.dirname(os.path.realpath(__file__))
results = analyze(sys.argv[1]) parser = argparse.ArgumentParser(
description='Search URLhaus for a given artifact')
parser.add_argument(
'artifact', help='the artifact represented in JSON format')
parser.add_argument('-c', '--config', metavar='CONFIG_FILE', default=dir + '/urlhaus.yaml',
help='optional config file to use instead of the default config file')
args = parser.parse_args()
if args.artifact:
results = analyze(helpers.loadConfig(args.config), args.artifact)
print(json.dumps(results)) print(json.dumps(results))
else:
print("ERROR: Missing input JSON")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1 @@
api_key: "{{ salt['pillar.get']('sensoroni:analyzers:urlhaus:api_key', '') }}"

View File

@@ -1,27 +1,24 @@
from io import StringIO from io import StringIO
import sys import sys
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
from urlhaus import urlhaus
import unittest import unittest
from urlhaus import urlhaus
class TestUrlhausMethods(unittest.TestCase): class TestUrlhausMethods(unittest.TestCase):
def test_main_missing_input(self):
with patch('sys.stdout', new=StringIO()) as mock_stdout:
sys.argv = ["cmd"]
urlhaus.main()
self.assertEqual(mock_stdout.getvalue(), "ERROR: Missing input JSON\n")
def test_main_success(self): def test_main_success(self):
output = {"foo": "bar"} output = {"foo": "bar"}
config = {"api_key": "test_key"}
with patch('sys.stdout', new=StringIO()) as mock_stdout: with patch('sys.stdout', new=StringIO()) as mock_stdout:
with patch('urlhaus.urlhaus.analyze', new=MagicMock(return_value=output)) as mock: with patch('urlhaus.urlhaus.analyze', new=MagicMock(return_value=output)) as mock_analyze:
sys.argv = ["cmd", "input"] with patch('helpers.loadConfig', new=MagicMock(return_value=config)) as mock_config:
urlhaus.main() sys.argv = ["cmd", "input"]
expected = '{"foo": "bar"}\n' urlhaus.main()
self.assertEqual(mock_stdout.getvalue(), expected) expected = '{"foo": "bar"}\n'
mock.assert_called_once() self.assertEqual(mock_stdout.getvalue(), expected)
mock_analyze.assert_called_once()
mock_config.assert_called_once()
def test_buildReq(self): def test_buildReq(self):
result = urlhaus.buildReq("test") result = urlhaus.buildReq("test")
@@ -29,9 +26,10 @@ class TestUrlhausMethods(unittest.TestCase):
def test_sendReq(self): def test_sendReq(self):
with patch('requests.request', new=MagicMock(return_value=MagicMock())) as mock: with patch('requests.request', new=MagicMock(return_value=MagicMock())) as mock:
conf = {"api_key": "test_key"}
meta = {"baseUrl": "myurl"} meta = {"baseUrl": "myurl"}
response = urlhaus.sendReq(meta, "mypayload") response = urlhaus.sendReq(conf, meta, "mypayload")
mock.assert_called_once_with("POST", "myurl", data="mypayload") mock.assert_called_once_with("POST", "myurl", data="mypayload", headers={"Auth-Key": "test_key"})
self.assertIsNotNone(response) self.assertIsNotNone(response)
def test_prepareResults_none(self): def test_prepareResults_none(self):
@@ -65,8 +63,19 @@ class TestUrlhausMethods(unittest.TestCase):
def test_analyze(self): def test_analyze(self):
output = {"threat": "malware_download"} output = {"threat": "malware_download"}
config = {"api_key": "test_key"}
artifactInput = '{"value":"foo","artifactType":"url"}' artifactInput = '{"value":"foo","artifactType":"url"}'
with patch('urlhaus.urlhaus.sendReq', new=MagicMock(return_value=output)) as mock: with patch('urlhaus.urlhaus.sendReq', new=MagicMock(return_value=output)) as mock:
results = urlhaus.analyze(artifactInput) results = urlhaus.analyze(config, artifactInput)
self.assertEqual(results["summary"], "malware_download") self.assertEqual(results["summary"], "malware_download")
mock.assert_called_once() mock.assert_called_once()
def test_checkConfigRequirements_valid(self):
config = {"api_key": "test_key"}
self.assertTrue(urlhaus.checkConfigRequirements(config))
def test_checkConfigRequirements_missing_key(self):
config = {}
with self.assertRaises(SystemExit) as cm:
urlhaus.checkConfigRequirements(config)
self.assertEqual(cm.exception.code, 126)

View File

@@ -291,6 +291,14 @@ sensoroni:
sensitive: False sensitive: False
advanced: True advanced: True
forcedType: string forcedType: string
urlhaus:
api_key:
description: API key for the urlhaus analyzer.
helpLink: sensoroni.html
global: False
sensitive: True
advanced: False
forcedType: string
virustotal: virustotal:
api_key: api_key:
description: API key for the VirusTotal analyzer. description: API key for the VirusTotal analyzer.