マルチスレッド対応
This commit is contained in:
@@ -4,10 +4,15 @@ use crate::detections::print::MESSAGES;
|
||||
use crate::detections::rule;
|
||||
use crate::detections::rule::RuleNode;
|
||||
use crate::yaml::ParseYaml;
|
||||
|
||||
use evtx::err;
|
||||
use evtx::{EvtxParser, ParserSettings, SerializedEvtxRecord};
|
||||
use serde_json::{Error, Value};
|
||||
use std::path::PathBuf;
|
||||
use tokio::runtime;
|
||||
use tokio::{spawn, task::JoinHandle};
|
||||
|
||||
use std::{fs::File, sync::Arc};
|
||||
use std::{path::PathBuf, time::Instant};
|
||||
|
||||
const DIRPATH_RULES: &str = "rules";
|
||||
|
||||
@@ -26,74 +31,17 @@ impl Detection {
|
||||
}
|
||||
|
||||
// parse rule files
|
||||
let mut selection_rules = self.parse_rule_files();
|
||||
if selection_rules.is_empty() {
|
||||
let rules = self.parse_rule_files();
|
||||
if rules.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// serialize from evtx files to json
|
||||
let evtx_records = self.serialize_evtx_to_jsons(evtx_files);
|
||||
// transform from evtx files into json
|
||||
let records = self.evtx_to_jsons(evtx_files);
|
||||
|
||||
// select rule files and collect message
|
||||
let mut message = MESSAGES.lock().unwrap();
|
||||
selection_rules.iter_mut().for_each(|rule| {
|
||||
evtx_records.iter().for_each(|event_record| {
|
||||
if !rule.select(event_record) {
|
||||
return;
|
||||
}
|
||||
|
||||
message.insert(
|
||||
event_record,
|
||||
rule.yaml["title"].as_str().unwrap_or("").to_string(),
|
||||
rule.yaml["output"].as_str().unwrap_or("").to_string(),
|
||||
)
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// serialize evtx files to json
|
||||
fn serialize_evtx_to_jsons(&self, evtx_files: Vec<PathBuf>) -> Vec<Value> {
|
||||
return evtx_files
|
||||
.iter()
|
||||
.filter_map(|evtx_file| {
|
||||
// convert to evtx parser
|
||||
match EvtxParser::from_path(evtx_file) {
|
||||
Ok(parser) => Option::Some(parser),
|
||||
Err(e) => {
|
||||
eprintln!("{}", e);
|
||||
return Option::None;
|
||||
}
|
||||
}
|
||||
})
|
||||
.map(|mut cur| {
|
||||
let mut parse_config = ParserSettings::default();
|
||||
parse_config = parse_config.separate_json_attributes(true);
|
||||
cur = cur.with_configuration(parse_config);
|
||||
let ret: Vec<err::Result<SerializedEvtxRecord<String>>> =
|
||||
cur.records_json().collect();
|
||||
return ret;
|
||||
})
|
||||
.flatten()
|
||||
.filter_map(|json_record| {
|
||||
// convert from evtx parser to evtx json string records
|
||||
if json_record.is_ok() {
|
||||
return Option::Some(json_record.unwrap());
|
||||
} else {
|
||||
eprintln!("{}", json_record.unwrap_err());
|
||||
return Option::None;
|
||||
}
|
||||
})
|
||||
.filter_map(|json_record| {
|
||||
// serialize json from json string
|
||||
let result_json: Result<Value, Error> = serde_json::from_str(&json_record.data); //// https://rust-lang-nursery.github.io/rust-cookbook/encoding/complex.html
|
||||
if result_json.is_err() {
|
||||
eprintln!("{}", result_json.unwrap_err());
|
||||
return Option::None;
|
||||
} else {
|
||||
return result_json.ok();
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
runtime::Runtime::new()
|
||||
.unwrap()
|
||||
.block_on(self.execute_rule(rules, records));
|
||||
}
|
||||
|
||||
fn parse_rule_files(&self) -> Vec<RuleNode> {
|
||||
@@ -135,4 +83,187 @@ impl Detection {
|
||||
})
|
||||
.collect();
|
||||
}
|
||||
|
||||
// evtxファイルをjsonに変換する。
|
||||
fn evtx_to_jsons(&mut self, evtx_files: Vec<PathBuf>) -> Vec<Value> {
|
||||
// EvtxParserを生成する。
|
||||
let evtx_parsers: Vec<EvtxParser<File>> = evtx_files
|
||||
.iter()
|
||||
.filter_map(|evtx_file| {
|
||||
// convert to evtx parser
|
||||
match EvtxParser::from_path(evtx_file) {
|
||||
Ok(parser) => Option::Some(parser),
|
||||
Err(e) => {
|
||||
eprintln!("{}", e);
|
||||
return Option::None;
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let xml_records = runtime::Runtime::new()
|
||||
.unwrap()
|
||||
.block_on(self.evtx_to_xml(evtx_parsers));
|
||||
|
||||
return runtime::Runtime::new()
|
||||
.unwrap()
|
||||
.block_on(self.xml_to_json(xml_records));
|
||||
}
|
||||
|
||||
// evtxファイルからxmlを生成する。
|
||||
async fn evtx_to_xml(
|
||||
&mut self,
|
||||
evtx_parsers: Vec<EvtxParser<File>>,
|
||||
) -> Vec<SerializedEvtxRecord<String>> {
|
||||
// evtx_parser.records_json()でevtxをxmlに変換するJobを作成
|
||||
let handles: Vec<JoinHandle<Vec<err::Result<SerializedEvtxRecord<String>>>>> = evtx_parsers
|
||||
.into_iter()
|
||||
.map(|mut evtx_parser| {
|
||||
return spawn(async move {
|
||||
let mut parse_config = ParserSettings::default();
|
||||
parse_config = parse_config.separate_json_attributes(true);
|
||||
evtx_parser = evtx_parser.with_configuration(parse_config);
|
||||
let values = evtx_parser.records_json().collect();
|
||||
return values;
|
||||
});
|
||||
})
|
||||
.collect();
|
||||
|
||||
// 作成したjobを実行し(handle.awaitの部分)、スレッドの実行時にエラーが発生した場合、標準エラー出力に出しておく
|
||||
let mut ret = vec![];
|
||||
for handle in handles {
|
||||
let future_result = handle.await;
|
||||
if future_result.is_err() {
|
||||
eprintln!("{}", future_result.unwrap_err());
|
||||
continue;
|
||||
}
|
||||
|
||||
ret.push(future_result.unwrap());
|
||||
}
|
||||
|
||||
// xmlの変換でエラーが出た場合、標準エラー出力に出しておく
|
||||
return ret
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.filter_map(|parse_result| {
|
||||
if parse_result.is_err() {
|
||||
eprintln!("{}", parse_result.unwrap_err());
|
||||
return Option::None;
|
||||
}
|
||||
|
||||
return Option::Some(parse_result.unwrap());
|
||||
})
|
||||
.collect();
|
||||
}
|
||||
|
||||
async fn xml_to_json(&mut self, xml_records: Vec<SerializedEvtxRecord<String>>) -> Vec<Value> {
|
||||
// xmlからjsonに変換するJobを作成
|
||||
let handles: Vec<JoinHandle<Result<Value, Error>>> = xml_records
|
||||
.into_iter()
|
||||
.map(|xml_record| {
|
||||
return spawn(async move {
|
||||
return serde_json::from_str(&xml_record.data);
|
||||
});
|
||||
})
|
||||
.collect();
|
||||
|
||||
// 作成したjobを実行し(handle.awaitの部分)、スレッドの実行時にエラーが発生した場合、標準エラー出力に出しておく
|
||||
let mut ret = vec![];
|
||||
for handle in handles {
|
||||
let future_result = handle.await;
|
||||
if future_result.is_err() {
|
||||
eprintln!("{}", future_result.unwrap_err());
|
||||
continue;
|
||||
}
|
||||
|
||||
ret.push(future_result.unwrap());
|
||||
}
|
||||
|
||||
// xmlの変換でエラーが出た場合、標準エラー出力に出しておく
|
||||
return ret
|
||||
.into_iter()
|
||||
.filter_map(|parse_result| {
|
||||
if parse_result.is_err() {
|
||||
eprintln!("{}", parse_result.unwrap_err());
|
||||
return Option::None;
|
||||
}
|
||||
|
||||
return Option::Some(parse_result.unwrap());
|
||||
})
|
||||
.collect();
|
||||
}
|
||||
|
||||
async fn execute_rule(&mut self, rules: Vec<RuleNode>, records: Vec<Value>) {
|
||||
// 排他制御と所有権共有のため、recordをRwLockとArcで囲む
|
||||
// recordは不変参照(mutが不要)なので、不変参照なら複数スレッドが同時にロックを取得できるようにRwLockを用いている。
|
||||
// RwLockの代わりにMutexを使うこともできるが、これは不変参照であっても同時に1スレッドしかロックを取得できず、パフォーマンスが良くないと思う。
|
||||
let mut records_arcs = vec![];
|
||||
for record_chunk in Detection::chunks(records, num_cpus::get() * 4) {
|
||||
let record_chunk_arc = Arc::new(record_chunk);
|
||||
records_arcs.push(record_chunk_arc);
|
||||
}
|
||||
|
||||
// 所有権共有のため、ruleをArcで囲む
|
||||
let rules_arc = Arc::new(rules);
|
||||
|
||||
// ルール実行するスレッドを作成。
|
||||
let mut handles = vec![];
|
||||
for record_chunk_arc in &records_arcs {
|
||||
let records_arc_clone = Arc::clone(&record_chunk_arc);
|
||||
let rules_clones = Arc::clone(&rules_arc);
|
||||
|
||||
let handle: JoinHandle<Vec<bool>> = spawn(async move {
|
||||
let mut ret = vec![];
|
||||
for record in records_arc_clone.iter() {
|
||||
for rule in rules_clones.iter() {
|
||||
if rule.select(record) {
|
||||
// TODO ここはtrue/falseじゃなくて、ruleとrecordのタプルをretにpushする実装に変更したい。
|
||||
ret.push(true);
|
||||
} else {
|
||||
ret.push(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
});
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
// メッセージを追加する。これを上記のspawnの中でやると、ロックの取得で逆に時間がかかるので、外に出す
|
||||
let mut message = MESSAGES.lock().unwrap();
|
||||
let mut handles_ite = handles.into_iter();
|
||||
for record_chunk_arc in &records_arcs {
|
||||
let mut handles_ret_ite = handles_ite.next().unwrap().await.unwrap().into_iter();
|
||||
for rule in rules_arc.iter() {
|
||||
for record_arc in record_chunk_arc.iter() {
|
||||
if handles_ret_ite.next().unwrap() == true {
|
||||
// TODO メッセージが多いと、rule.select()よりもこの処理の方が時間かかる。
|
||||
message.insert(
|
||||
record_arc,
|
||||
rule.yaml["title"].as_str().unwrap_or("").to_string(),
|
||||
rule.yaml["output"].as_str().unwrap_or("").to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 配列を指定したサイズで分割する。Vector.chunksと同じ動作をするが、Vectorの関数だとinto的なことができないので自作
|
||||
fn chunks(ary: Vec<Value>, size: usize) -> Vec<Vec<Value>> {
|
||||
let arylen = ary.len();
|
||||
let mut ite = ary.into_iter();
|
||||
|
||||
let mut ret = vec![];
|
||||
for i in 0..arylen {
|
||||
if i % size == 0 {
|
||||
ret.push(vec![]);
|
||||
ret.iter_mut().last().unwrap().push(ite.next().unwrap());
|
||||
} else {
|
||||
ret.iter_mut().last().unwrap().push(ite.next().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user