mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2026-04-25 05:57:49 +02:00
Add Pulsedive analyzer and tests
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"name": "Pulsedive",
|
||||
"version": "0.1",
|
||||
"author": "Security Onion Solutions",
|
||||
"description": "This analyzer queries Pulsedive for context around an observable",
|
||||
"supportedTypes": ["domain", "ip", "hash", "uri_path", "url", "user-agent"]
|
||||
}
|
||||
@@ -0,0 +1,107 @@
|
||||
import json
|
||||
import requests
|
||||
import argparse
|
||||
import helpers
|
||||
import os
|
||||
import sys
|
||||
|
||||
|
||||
def checkConfigRequirements(conf):
|
||||
if "api_key" not in conf or len(conf['api_key']) == 0:
|
||||
sys.exit(126)
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def buildReq(conf, artifactType, artifactValue):
|
||||
indicatorTypes = ["domain", "hash", "ip" "url"]
|
||||
if artifactType in indicatorTypes:
|
||||
url = conf['base_url'] + '/info.php'
|
||||
params = {"key": conf["api_key"], "indicator": artifactValue}
|
||||
else:
|
||||
if artifactType == "uri_path":
|
||||
query = "http.location=" + artifactValue
|
||||
url = conf['base_url'] + '/explore.php'
|
||||
elif artifactType == "user-agent":
|
||||
query = "http.useragent_normaliser=" + artifactValue
|
||||
url = conf['base_url'] + '/explore.php'
|
||||
params = {"key": conf["api_key"], "q": query, "limit": 100}
|
||||
|
||||
return url, params
|
||||
|
||||
|
||||
def sendReq(url, params):
|
||||
response = requests.request('GET', url, params=params)
|
||||
return response.json()
|
||||
|
||||
|
||||
def prepareResults(raw):
|
||||
classified = []
|
||||
classification = {
|
||||
"high": "malicious",
|
||||
"medium": "suspicious",
|
||||
"low": "harmless",
|
||||
"none": "none",
|
||||
"unknown": "unknown"
|
||||
}
|
||||
|
||||
if raw:
|
||||
if 'results' in raw:
|
||||
if raw['results'] == []:
|
||||
classified.append("no_results")
|
||||
else:
|
||||
for r in raw['results']:
|
||||
risk = r['risk']
|
||||
classified.append(classification.get(risk))
|
||||
else:
|
||||
classified.append(classification.get(raw['risk']))
|
||||
|
||||
if classified.count('malicious') > 0:
|
||||
summary = "malicious"
|
||||
status = "threat"
|
||||
elif classified.count('suspicious') > 0:
|
||||
summary = "suspicious"
|
||||
status = "caution"
|
||||
elif classified.count('harmless') > 0:
|
||||
summary = "harmless"
|
||||
status = "ok"
|
||||
elif classified.count('none') > 0:
|
||||
summary = "harmless"
|
||||
status = "ok"
|
||||
elif classified.count('unknown') > 0:
|
||||
summary = ""
|
||||
status = "unknown"
|
||||
elif classified.count('no_results') > 0:
|
||||
summary = "no_results"
|
||||
status = "ok"
|
||||
else:
|
||||
summary = "internal_failure"
|
||||
status = "caution"
|
||||
results = {'response': raw, 'summary': summary, 'status': status}
|
||||
return results
|
||||
|
||||
|
||||
def analyze(conf, input):
|
||||
checkConfigRequirements(conf)
|
||||
meta = helpers.loadMetadata(__file__)
|
||||
data = helpers.parseArtifact(input)
|
||||
helpers.checkSupportedType(meta, data["artifactType"])
|
||||
request = buildReq(conf, data["artifactType"], data["value"])
|
||||
response = sendReq(request[0], request[1])
|
||||
return prepareResults(response)
|
||||
|
||||
|
||||
def main():
|
||||
dir = os.path.dirname(os.path.realpath(__file__))
|
||||
parser = argparse.ArgumentParser(description='Search VirusTotal for a given artifact')
|
||||
parser.add_argument('artifact', help='the artifact represented in JSON format')
|
||||
parser.add_argument('-c', '--config', metavar="CONFIG_FILE", default=dir + "/pulsedive.yaml", help='optional config file to use instead of the default config file')
|
||||
|
||||
args = parser.parse_args()
|
||||
if args.artifact:
|
||||
results = analyze(helpers.loadConfig(args.config), args.artifact)
|
||||
print(json.dumps(results))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,2 @@
|
||||
base_url: https://pulsedive.com/api/
|
||||
api_key: "{{ salt['pillar.get']('sensoroni:analyzers:pulsedive:api_key', '') }}"
|
||||
@@ -0,0 +1,121 @@
|
||||
from io import StringIO
|
||||
import sys
|
||||
from unittest.mock import patch, MagicMock
|
||||
from pulsedive import pulsedive
|
||||
import unittest
|
||||
|
||||
|
||||
class TestVirusTotalMethods(unittest.TestCase):
|
||||
|
||||
def test_main_missing_input(self):
|
||||
with patch('sys.exit', new=MagicMock()) as sysmock:
|
||||
with patch('sys.stderr', new=StringIO()) as mock_stderr:
|
||||
sys.argv = ["cmd"]
|
||||
pulsedive.main()
|
||||
self.assertEqual(mock_stderr.getvalue(), "usage: cmd [-h] [-c CONFIG_FILE] artifact\ncmd: error: the following arguments are required: artifact\n")
|
||||
sysmock.assert_called_once_with(2)
|
||||
|
||||
def test_main_success(self):
|
||||
output = {"foo": "bar"}
|
||||
with patch('sys.stdout', new=StringIO()) as mock_stdout:
|
||||
with patch('pulsedive.pulsedive.analyze', new=MagicMock(return_value=output)) as mock:
|
||||
sys.argv = ["cmd", "input"]
|
||||
pulsedive.main()
|
||||
expected = '{"foo": "bar"}\n'
|
||||
self.assertEqual(mock_stdout.getvalue(), expected)
|
||||
mock.assert_called_once()
|
||||
|
||||
def test_checkConfigRequirements(self):
|
||||
conf = {"not_a_key": "abcd12345"}
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
pulsedive.checkConfigRequirements(conf)
|
||||
self.assertEqual(cm.exception.code, 126)
|
||||
|
||||
def test_buildReq_domain(self):
|
||||
conf = {"api_key": "xyz", "base_url": "https://myurl"}
|
||||
artifactType = "domain"
|
||||
artifactValue = "pulsedive.com"
|
||||
result = pulsedive.buildReq(conf, artifactType, artifactValue)
|
||||
self.assertEqual("https://myurl/info.php", result[0])
|
||||
self.assertEqual({"key": "xyz", "indicator": "pulsedive.com"}, result[1])
|
||||
|
||||
def test_buildReq_uri_path(self):
|
||||
conf = {"api_key": "xyz", "base_url": "https://myurl"}
|
||||
artifactType = "uri_path"
|
||||
artifactValue = "/main.php"
|
||||
result = pulsedive.buildReq(conf, artifactType, artifactValue)
|
||||
self.assertEqual("https://myurl/explore.php", result[0])
|
||||
self.assertEqual({"key": "xyz", "q": "http.location=/main.php", "limit": 100}, result[1])
|
||||
|
||||
def test_sendReq(self):
|
||||
with patch('requests.request', new=MagicMock(return_value=MagicMock())) as mock:
|
||||
url = 'https://myurl/api/'
|
||||
params = {"key": "abcd1234", "q": "http.location=/main.php", "limit": 100}
|
||||
response = pulsedive.sendReq(url=url, params=params)
|
||||
mock.assert_called_once_with("GET", "https://myurl/api/", params={"key": "abcd1234", "q": "http.location=/main.php", "limit": 100})
|
||||
self.assertIsNotNone(response)
|
||||
|
||||
def test_prepareResults_risk_high(self):
|
||||
raw = {"results": [{"risk": "high"}]}
|
||||
results = pulsedive.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "malicious")
|
||||
self.assertEqual(results["status"], "threat")
|
||||
|
||||
def test_prepareResults_risk_med(self):
|
||||
raw = {"results": [{"risk": "medium"}]}
|
||||
results = pulsedive.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "suspicious")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_prepareResults_risk_low(self):
|
||||
raw = {"results": [{"risk": "low"}]}
|
||||
results = pulsedive.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "harmless")
|
||||
self.assertEqual(results["status"], "ok")
|
||||
|
||||
def test_prepareResults_risk_none(self):
|
||||
raw = {"results": [{"risk": "none"}]}
|
||||
results = pulsedive.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "harmless")
|
||||
self.assertEqual(results["status"], "ok")
|
||||
|
||||
def test_prepareResults_risk_unknown(self):
|
||||
raw = {"results": [{"risk": "unknown"}]}
|
||||
results = pulsedive.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "")
|
||||
self.assertEqual(results["status"], "unknown")
|
||||
|
||||
def test_prepareResults_no_results(self):
|
||||
raw = {"results": []}
|
||||
results = pulsedive.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "no_results")
|
||||
self.assertEqual(results["status"], "ok")
|
||||
|
||||
def test_prepareResults_risk_none_indicator(self):
|
||||
raw = {"iid": "1234", "risk": "none"}
|
||||
results = pulsedive.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "harmless")
|
||||
self.assertEqual(results["status"], "ok")
|
||||
|
||||
def test_prepareResults_error(self):
|
||||
raw = {}
|
||||
results = pulsedive.prepareResults(raw)
|
||||
self.assertEqual(results["response"], raw)
|
||||
self.assertEqual(results["summary"], "internal_failure")
|
||||
self.assertEqual(results["status"], "caution")
|
||||
|
||||
def test_analyze(self):
|
||||
output = {"results": [{"risk": "low"}]}
|
||||
artifactInput = '{"value":"chrome","artifactType":"user-agent"}'
|
||||
conf = {"api_key": "xyz", "base_url": "https://myurl"}
|
||||
with patch('pulsedive.pulsedive.sendReq', new=MagicMock(return_value=output)) as mock:
|
||||
results = pulsedive.analyze(conf, artifactInput)
|
||||
self.assertEqual(results["summary"], "harmless")
|
||||
mock.assert_called_once()
|
||||
@@ -0,0 +1,2 @@
|
||||
requests>=2.27.1
|
||||
pyyaml>=6.0
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Reference in New Issue
Block a user