mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2026-04-24 21:47:48 +02:00
Add initial OTX analyzer and tests
This commit is contained in:
@@ -0,0 +1,7 @@
|
|||||||
|
{
|
||||||
|
"name": "Alienvault OTX",
|
||||||
|
"version": "0.1",
|
||||||
|
"author": "Security Onion Solutions",
|
||||||
|
"description": "This analyzer queries Alienvault OTX for a domain, hash, IP, or URL, then returns a report for it.",
|
||||||
|
"supportedTypes" : ["domain", "hash", "ip", "url"]
|
||||||
|
}
|
||||||
Executable
+88
@@ -0,0 +1,88 @@
|
|||||||
|
import json
|
||||||
|
import requests
|
||||||
|
import helpers
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
|
||||||
|
def buildReq(conf, artifact_type, artifact_value):
|
||||||
|
headers = {"X-OTX-API-KEY": conf["api_key"]}
|
||||||
|
base_url = conf['base_url']
|
||||||
|
if artifact_type == "ip":
|
||||||
|
uri = "indicators/IPv4/"
|
||||||
|
elif artifact_type == "url":
|
||||||
|
uri = "indicators/url/"
|
||||||
|
elif artifact_type == "domain":
|
||||||
|
uri = "indicators/domain/"
|
||||||
|
elif artifact_type == "hash":
|
||||||
|
uri = "indicators/file/"
|
||||||
|
section = "/general"
|
||||||
|
url = base_url + uri + artifact_value + section
|
||||||
|
return url, headers
|
||||||
|
|
||||||
|
|
||||||
|
def checkConfigRequirements(conf):
|
||||||
|
if "api_key" not in conf or len(conf['api_key']) == 0:
|
||||||
|
sys.exit(126)
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def sendReq(url, headers):
|
||||||
|
response = requests.request('GET', url, headers=headers)
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
|
||||||
|
def prepareResults(response):
|
||||||
|
if len(response) != 0:
|
||||||
|
raw = response
|
||||||
|
if 'reputation' in raw:
|
||||||
|
reputation = raw["reputation"]
|
||||||
|
if reputation == 0:
|
||||||
|
status = "ok"
|
||||||
|
summaryinfo = "harmless"
|
||||||
|
elif reputation > 0 and reputation < 50:
|
||||||
|
status = "ok"
|
||||||
|
summaryinfo = "Likely Harmless"
|
||||||
|
elif reputation >= 50 and reputation < 75:
|
||||||
|
status = "caution"
|
||||||
|
summaryinfo = "suspicious"
|
||||||
|
elif reputation >= 75 and reputation <= 100:
|
||||||
|
status = "threat"
|
||||||
|
summaryinfo = "malicious"
|
||||||
|
else:
|
||||||
|
status = "info"
|
||||||
|
summaryinfo = "Analysis complete."
|
||||||
|
else:
|
||||||
|
raw = {}
|
||||||
|
status = "caution"
|
||||||
|
summaryinfo = "internal_failure"
|
||||||
|
results = {'response': raw, 'status': status, 'summary': summaryinfo}
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
def analyze(conf, input):
|
||||||
|
checkConfigRequirements(conf)
|
||||||
|
meta = helpers.loadMetadata(__file__)
|
||||||
|
data = helpers.parseArtifact(input)
|
||||||
|
helpers.checkSupportedType(meta, data["artifactType"])
|
||||||
|
request = buildReq(conf, data["artifactType"], data["value"])
|
||||||
|
response = sendReq(request[0], request[1])
|
||||||
|
return prepareResults(response)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
dir = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
parser = argparse.ArgumentParser(description='Search Alienvault OTX for a given artifact')
|
||||||
|
parser.add_argument('artifact', help='the artifact represented in JSON format')
|
||||||
|
parser.add_argument('-c', '--config', metavar="CONFIG_FILE", default=dir + "/otx.yaml", help='optional config file to use instead of the default config file')
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
if args.artifact:
|
||||||
|
results = analyze(helpers.loadConfig(args.config), args.artifact)
|
||||||
|
print(json.dumps(results))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
base_url: https://otx.alienvault.com/api/v1/
|
||||||
|
api_key: "{{ salt['pillar.get']('sensoroni:analyzers:otx:api_key', '') }}"
|
||||||
@@ -0,0 +1,250 @@
|
|||||||
|
from io import StringIO
|
||||||
|
import sys
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
from otx import otx
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
class TestOtxMethods(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_main_missing_input(self):
|
||||||
|
with patch('sys.exit', new=MagicMock()) as sysmock:
|
||||||
|
with patch('sys.stderr', new=StringIO()) as mock_stderr:
|
||||||
|
sys.argv = ["cmd"]
|
||||||
|
otx.main()
|
||||||
|
self.assertEqual(mock_stderr.getvalue(), "usage: cmd [-h] [-c CONFIG_FILE] artifact\ncmd: error: the following arguments are required: artifact\n")
|
||||||
|
sysmock.assert_called_once_with(2)
|
||||||
|
|
||||||
|
def test_main_success(self):
|
||||||
|
output = {"foo": "bar"}
|
||||||
|
with patch('sys.stdout', new=StringIO()) as mock_stdout:
|
||||||
|
with patch('otx.otx.analyze', new=MagicMock(return_value=output)) as mock:
|
||||||
|
sys.argv = ["cmd", "input"]
|
||||||
|
otx.main()
|
||||||
|
expected = '{"foo": "bar"}\n'
|
||||||
|
self.assertEqual(mock_stdout.getvalue(), expected)
|
||||||
|
mock.assert_called_once()
|
||||||
|
|
||||||
|
def checkConfigRequirements(self):
|
||||||
|
conf = {"not_a_key": "abcd12345"}
|
||||||
|
with self.assertRaises(SystemExit) as cm:
|
||||||
|
otx.checkConfigRequirements(conf)
|
||||||
|
self.assertEqual(cm.exception.code, 126)
|
||||||
|
|
||||||
|
def test_buildReq_domain(self):
|
||||||
|
conf = {'base_url': 'https://myurl/', 'api_key': 'abcd12345'}
|
||||||
|
artifact_type = "domain"
|
||||||
|
artifact_value = "abc.com"
|
||||||
|
result = otx.buildReq(conf, artifact_type, artifact_value)
|
||||||
|
self.assertEqual("https://myurl/indicators/domain/abc.com/general", result[0])
|
||||||
|
self.assertEqual({'X-OTX-API-KEY': 'abcd12345'}, result[1])
|
||||||
|
|
||||||
|
def test_buildReq_hash(self):
|
||||||
|
conf = {'base_url': 'https://myurl/', 'api_key': 'abcd12345'}
|
||||||
|
artifact_type = "hash"
|
||||||
|
artifact_value = "abcd1234"
|
||||||
|
result = otx.buildReq(conf, artifact_type, artifact_value)
|
||||||
|
self.assertEqual("https://myurl/indicators/file/abcd1234/general", result[0])
|
||||||
|
self.assertEqual({'X-OTX-API-KEY': 'abcd12345'}, result[1])
|
||||||
|
|
||||||
|
def test_buildReq_ip(self):
|
||||||
|
conf = {'base_url': 'https://myurl/', 'api_key': 'abcd12345'}
|
||||||
|
artifact_type = "ip"
|
||||||
|
artifact_value = "192.168.1.1"
|
||||||
|
result = otx.buildReq(conf, artifact_type, artifact_value)
|
||||||
|
self.assertEqual("https://myurl/indicators/IPv4/192.168.1.1/general", result[0])
|
||||||
|
self.assertEqual({'X-OTX-API-KEY': 'abcd12345'}, result[1])
|
||||||
|
|
||||||
|
def test_buildReq_url(self):
|
||||||
|
conf = {'base_url': 'https://myurl/', 'api_key': 'abcd12345'}
|
||||||
|
artifact_type = "url"
|
||||||
|
artifact_value = "https://abc.com"
|
||||||
|
result = otx.buildReq(conf, artifact_type, artifact_value)
|
||||||
|
self.assertEqual("https://myurl/indicators/url/https://abc.com/general", result[0])
|
||||||
|
self.assertEqual({'X-OTX-API-KEY': 'abcd12345'}, result[1])
|
||||||
|
|
||||||
|
def test_sendReq(self):
|
||||||
|
with patch('requests.request', new=MagicMock(return_value=MagicMock())) as mock:
|
||||||
|
url = "https://myurl="
|
||||||
|
response = otx.sendReq(url, headers={"x-apikey": "xyz"})
|
||||||
|
mock.assert_called_once_with("GET", "https://myurl=", headers={"x-apikey": "xyz"})
|
||||||
|
self.assertIsNotNone(response)
|
||||||
|
|
||||||
|
def test_prepareResults_harmless(self):
|
||||||
|
raw = {
|
||||||
|
"whois": "http://whois.domaintools.com/192.168.1.1",
|
||||||
|
"reputation": 0,
|
||||||
|
"indicator": "192.168.1.1",
|
||||||
|
"type": "IPv4",
|
||||||
|
"pulse_info": {
|
||||||
|
"count": 0,
|
||||||
|
"pulses": [],
|
||||||
|
"related": {
|
||||||
|
"alienvault": {
|
||||||
|
"adversary": [],
|
||||||
|
"malware_families": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"false_positive": [],
|
||||||
|
"sections": [
|
||||||
|
"general"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
results = otx.prepareResults(raw)
|
||||||
|
self.assertEqual(results["response"], raw)
|
||||||
|
self.assertEqual(results["summary"], "harmless")
|
||||||
|
self.assertEqual(results["status"], "ok")
|
||||||
|
|
||||||
|
def test_prepareResults_likely_harmless(self):
|
||||||
|
raw = {
|
||||||
|
"whois": "http://whois.domaintools.com/192.168.1.1",
|
||||||
|
"reputation": 49,
|
||||||
|
"indicator": "192.168.1.1",
|
||||||
|
"type": "IPv4",
|
||||||
|
"pulse_info": {
|
||||||
|
"count": 0,
|
||||||
|
"pulses": [],
|
||||||
|
"related": {
|
||||||
|
"alienvault": {
|
||||||
|
"adversary": [],
|
||||||
|
"malware_families": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"false_positive": [],
|
||||||
|
"sections": [
|
||||||
|
"general"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
results = otx.prepareResults(raw)
|
||||||
|
self.assertEqual(results["response"], raw)
|
||||||
|
self.assertEqual(results["summary"], "Likely Harmless")
|
||||||
|
self.assertEqual(results["status"], "ok")
|
||||||
|
|
||||||
|
def test_prepareResults_suspicious(self):
|
||||||
|
raw = {
|
||||||
|
"whois": "http://whois.domaintools.com/192.168.1.1",
|
||||||
|
"reputation": 50,
|
||||||
|
"indicator": "192.168.1.1",
|
||||||
|
"type": "IPv4",
|
||||||
|
"pulse_info": {
|
||||||
|
"count": 0,
|
||||||
|
"pulses": [],
|
||||||
|
"related": {
|
||||||
|
"alienvault": {
|
||||||
|
"adversary": [],
|
||||||
|
"malware_families": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"false_positive": [],
|
||||||
|
"sections": [
|
||||||
|
"general"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
results = otx.prepareResults(raw)
|
||||||
|
self.assertEqual(results["response"], raw)
|
||||||
|
self.assertEqual(results["summary"], "suspicious")
|
||||||
|
self.assertEqual(results["status"], "caution")
|
||||||
|
|
||||||
|
def test_prepareResults_threat(self):
|
||||||
|
raw = {
|
||||||
|
"whois": "http://whois.domaintools.com/192.168.1.1",
|
||||||
|
"reputation": 75,
|
||||||
|
"indicator": "192.168.1.1",
|
||||||
|
"type": "IPv4",
|
||||||
|
"pulse_info": {
|
||||||
|
"count": 0,
|
||||||
|
"pulses": [],
|
||||||
|
"related": {
|
||||||
|
"alienvault": {
|
||||||
|
"adversary": [],
|
||||||
|
"malware_families": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"false_positive": [],
|
||||||
|
"sections": [
|
||||||
|
"general"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
results = otx.prepareResults(raw)
|
||||||
|
self.assertEqual(results["response"], raw)
|
||||||
|
self.assertEqual(results["summary"], "malicious")
|
||||||
|
self.assertEqual(results["status"], "threat")
|
||||||
|
|
||||||
|
def test_prepareResults_undetermined(self):
|
||||||
|
raw = {
|
||||||
|
"alexa": "",
|
||||||
|
"base_indicator": {},
|
||||||
|
"domain": "Unavailable",
|
||||||
|
"false_positive": [],
|
||||||
|
"hostname": "Unavailable",
|
||||||
|
"indicator": "http://192.168.1.1",
|
||||||
|
"pulse_info": {
|
||||||
|
"count": 0,
|
||||||
|
"pulses": [],
|
||||||
|
"references": [],
|
||||||
|
"related": {
|
||||||
|
"alienvault": {
|
||||||
|
"adversary": [],
|
||||||
|
"industries": [],
|
||||||
|
"malware_families": [],
|
||||||
|
"unique_indicators": 0
|
||||||
|
},
|
||||||
|
"other": {
|
||||||
|
"adversary": [],
|
||||||
|
"industries": [],
|
||||||
|
"malware_families": [],
|
||||||
|
"unique_indicators": 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"sections": [
|
||||||
|
"general"
|
||||||
|
],
|
||||||
|
"type": "url",
|
||||||
|
"type_title": "URL",
|
||||||
|
"validation": []
|
||||||
|
}
|
||||||
|
results = otx.prepareResults(raw)
|
||||||
|
self.assertEqual(results["response"], raw)
|
||||||
|
self.assertEqual(results["summary"], "Analysis complete.")
|
||||||
|
self.assertEqual(results["status"], "info")
|
||||||
|
|
||||||
|
def test_prepareResults_error(self):
|
||||||
|
raw = {}
|
||||||
|
results = otx.prepareResults(raw)
|
||||||
|
self.assertEqual(results["response"], raw)
|
||||||
|
self.assertEqual(results["summary"], "internal_failure")
|
||||||
|
self.assertEqual(results["status"], "caution")
|
||||||
|
|
||||||
|
def test_analyze(self):
|
||||||
|
output = {
|
||||||
|
"whois": "http://whois.domaintools.com/192.168.1.1",
|
||||||
|
"reputation": 0,
|
||||||
|
"indicator": "192.168.1.1",
|
||||||
|
"type": "IPv4",
|
||||||
|
"pulse_info": {
|
||||||
|
"count": 0,
|
||||||
|
"pulses": [],
|
||||||
|
"related": {
|
||||||
|
"alienvault": {
|
||||||
|
"adversary": [],
|
||||||
|
"malware_families": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"false_positive": [],
|
||||||
|
"sections": [
|
||||||
|
"general"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
artifactInput = '{"value":"192.168.1.1","artifactType":"ip"}'
|
||||||
|
conf = {"base_url": "https://myurl/", "api_key": "xyz"}
|
||||||
|
with patch('otx.otx.sendReq', new=MagicMock(return_value=output)) as mock:
|
||||||
|
results = otx.analyze(conf, artifactInput)
|
||||||
|
self.assertEqual(results["summary"], "harmless")
|
||||||
|
mock.assert_called_once()
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
requests>=2.27.1
|
||||||
|
pyyaml>=6.0
|
||||||
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Reference in New Issue
Block a user