Feature/filter record by eventid#94 (#95)

* add function to get event id from rootnode.

* refactoring #76

* maybe fix bug.

* before test

* fix source files.

* cargo fmt --all

* add threadnum parameter
This commit is contained in:
James
2021-05-06 20:58:43 +09:00
committed by GitHub
parent a68a59417d
commit 2f24dc775f
4 changed files with 280 additions and 156 deletions

View File

@@ -1,18 +1,17 @@
extern crate csv;
use crate::detections::print::AlertMessage;
use crate::detections::print::MESSAGES;
use crate::detections::rule;
use crate::detections::rule::RuleNode;
use crate::detections::{print::AlertMessage, utils};
use crate::yaml::ParseYaml;
use evtx::err;
use evtx::{EvtxParser, ParserSettings, SerializedEvtxRecord};
use serde_json::{Error, Value};
use tokio::runtime;
use serde_json::Value;
use tokio::{spawn, task::JoinHandle};
use std::path::PathBuf;
use std::{collections::HashSet, path::PathBuf};
use std::{fs::File, sync::Arc};
const DIRPATH_RULES: &str = "rules";
@@ -47,15 +46,16 @@ impl Detection {
return;
}
let records = self.evtx_to_jsons(evtx_files);
runtime::Runtime::new()
.unwrap()
.block_on(self.execute_rule(rules, records));
let records = self.evtx_to_jsons(&evtx_files, &rules);
let tokio_rt = utils::create_tokio_runtime();
tokio_rt.block_on(self.execute_rule(rules, records));
tokio_rt.shutdown_background();
}
// ルールファイルをパースします。
fn parse_rule_files(&self) -> Vec<RuleNode> {
// load rule files
// ルールファイルのパースを実行
let mut rulefile_loader = ParseYaml::new();
let resutl_readdir = rulefile_loader.read_dir(DIRPATH_RULES);
if resutl_readdir.is_err() {
@@ -65,49 +65,52 @@ impl Detection {
return vec![];
}
let return_if_success = |mut rule: RuleNode| {
let err_msgs_result = rule.init();
if err_msgs_result.is_ok() {
return Option::Some(rule);
}
// ruleファイルのパースに失敗した場合はエラー出力
err_msgs_result.err().iter().for_each(|err_msgs| {
let stdout = std::io::stdout();
let mut stdout = stdout.lock();
let errmsg_body = format!(
"Failed to parse Rule file. (Error Rule Title : {})",
rule.yaml["title"].as_str().unwrap_or("")
);
AlertMessage::alert(&mut stdout, errmsg_body).ok();
err_msgs.iter().for_each(|err_msg| {
AlertMessage::alert(&mut stdout, err_msg.to_string()).ok();
});
println!("");
});
return Option::None;
};
// parse rule files
return rulefile_loader
.files
.into_iter()
.map(|rule_file| rule::parse_rule(rule_file))
.filter_map(|mut rule| {
let err_msgs_result = rule.init();
if err_msgs_result.is_ok() {
return Option::Some(rule);
}
// ruleファイルの初期化失敗時のエラーを表示する部分
err_msgs_result.err().iter().for_each(|err_msgs| {
// TODO 本当はファイルパスを出力したい
// ParseYamlの変更が必要なので、一旦yamlのタイトルを表示。
let stdout = std::io::stdout();
let mut stdout = stdout.lock();
AlertMessage::alert(
&mut stdout,
format!(
"Failed to parse Rule file. (Error Rule Title : {})",
rule.yaml["title"].as_str().unwrap_or("")
),
)
.ok();
err_msgs.iter().for_each(|err_msg| {
AlertMessage::alert(&mut stdout, err_msg.to_string()).ok();
});
println!("");
});
return Option::None;
})
.filter_map(return_if_success)
.collect();
}
// evtxファイルをjsonに変換します。
fn evtx_to_jsons(&mut self, evtx_files: Vec<PathBuf>) -> Vec<EvtxRecordInfo> {
fn evtx_to_jsons(
&mut self,
evtx_files: &Vec<PathBuf>,
rules: &Vec<RuleNode>,
) -> Vec<EvtxRecordInfo> {
// EvtxParserを生成する。
let evtx_parsers: Vec<EvtxParser<File>> = evtx_files
.iter()
.clone()
.into_iter()
.filter_map(|evtx_file| {
// convert to evtx parser
// println!("PathBuf:{}", evtx_file.display());
match EvtxParser::from_path(evtx_file) {
Ok(parser) => Option::Some(parser),
Err(e) => {
@@ -118,43 +121,32 @@ impl Detection {
})
.collect();
let xml_records = runtime::Runtime::new()
.unwrap()
.block_on(self.evtx_to_xml(evtx_parsers, &evtx_files));
let json_records = runtime::Runtime::new()
.unwrap()
.block_on(self.xml_to_json(xml_records, &evtx_files));
let tokio_rt = utils::create_tokio_runtime();
let xml_records = tokio_rt.block_on(self.evtx_to_xml(evtx_parsers, &evtx_files));
let json_records = tokio_rt.block_on(self.xml_to_json(xml_records, &evtx_files, &rules));
tokio_rt.shutdown_background();
let mut evtx_file_index = 0;
return json_records
.into_iter()
.map(|json_records_per_evtxfile| {
let evtx_filepath = evtx_files[evtx_file_index].display().to_string();
let ret: Vec<EvtxRecordInfo> = json_records_per_evtxfile
.into_iter()
.map(|json_record| {
return EvtxRecordInfo {
evtx_filepath: String::from(&evtx_filepath),
record: json_record,
};
})
.collect();
evtx_file_index = evtx_file_index + 1;
return ret;
.map(|(parser_idx, json_record)| {
let evtx_filepath = evtx_files[parser_idx].display().to_string();
return EvtxRecordInfo {
evtx_filepath: String::from(&evtx_filepath),
record: json_record,
};
})
.flatten()
.collect();
}
// evtxファイルからxmlを生成する。
// ちょっと分かりにくいですが、戻り値の型はVec<SerializedEvtxRecord<String>>ではなくて、Vec<Vec<SerializedEvtxRecord<String>>>になっています。
// 2次元配列にしている理由は、この後Value型(EvtxのXMLをJSONに変換したやつ)とイベントファイルのパスをEvtxRecordInfo構造体で保持するためです。
// EvtxParser毎にSerializedEvtxRecord<String>をグルーピングするために2次元配列にしています。
// 戻り値は「どのイベントファイルから生成されたXMLかを示すindex」と「変換されたXML」のタプルです。
// タプルのindexは、引数で指定されるevtx_filesのindexに対応しています。
async fn evtx_to_xml(
&mut self,
evtx_parsers: Vec<EvtxParser<File>>,
evtx_files: &Vec<PathBuf>,
) -> Vec<Vec<SerializedEvtxRecord<String>>> {
) -> Vec<(usize, SerializedEvtxRecord<String>)> {
// evtx_parser.records_json()でevtxをxmlに変換するJobを作成
let handles: Vec<JoinHandle<Vec<err::Result<SerializedEvtxRecord<String>>>>> = evtx_parsers
.into_iter()
@@ -162,6 +154,8 @@ impl Detection {
return spawn(async move {
let mut parse_config = ParserSettings::default();
parse_config = parse_config.separate_json_attributes(true);
parse_config = parse_config.num_threads(utils::get_thread_num());
evtx_parser = evtx_parser.with_configuration(parse_config);
let values = evtx_parser.records_json().collect();
return values;
@@ -171,11 +165,10 @@ impl Detection {
// 作成したjobを実行し(handle.awaitの部分)、スレッドの実行時にエラーが発生した場合、標準エラー出力に出しておく
let mut ret = vec![];
let mut evtx_file_index = 0;
for handle in handles {
for (parser_idx, handle) in handles.into_iter().enumerate() {
let future_result = handle.await;
if future_result.is_err() {
let evtx_filepath = &evtx_files[evtx_file_index].display();
let evtx_filepath = &evtx_files[parser_idx].display();
let errmsg = format!(
"Failed to parse event file. EventFile:{} Error:{}",
evtx_filepath,
@@ -185,111 +178,113 @@ impl Detection {
continue;
}
evtx_file_index = evtx_file_index + 1;
ret.push(future_result.unwrap());
future_result.unwrap().into_iter().for_each(|parse_result| {
ret.push((parser_idx, parse_result));
});
}
// パースに失敗しているレコードを除外して、返す。
// SerializedEvtxRecord<String>がどのEvtxParserからパースされたのか分かるようにするため、2次元配列のまま返す。
let mut evtx_file_index = 0;
return ret
.into_iter()
.map(|parse_results| {
let ret = parse_results
.into_iter()
.filter_map(|parse_result| {
if parse_result.is_err() {
let evtx_filepath = &evtx_files[evtx_file_index].display();
let errmsg = format!(
"Failed to parse event file. EventFile:{} Error:{}",
evtx_filepath,
parse_result.unwrap_err()
);
AlertMessage::alert(&mut std::io::stdout().lock(), errmsg).ok();
return Option::None;
}
return Option::Some(parse_result.unwrap());
})
.collect();
evtx_file_index = evtx_file_index + 1;
return ret;
.filter_map(|(parser_idx, parse_result)| {
if parse_result.is_err() {
let evtx_filepath = &evtx_files[parser_idx].display();
let errmsg = format!(
"Failed to parse event file. EventFile:{} Error:{}",
evtx_filepath,
parse_result.unwrap_err()
);
AlertMessage::alert(&mut std::io::stdout().lock(), errmsg).ok();
return Option::None;
}
return Option::Some((parser_idx, parse_result.unwrap()));
})
.collect();
}
// xmlからjsonに変換します。
// 戻り値は「どのイベントファイルから生成されたXMLかを示すindex」と「変換されたJSON」のタプルです。
// タプルのindexは、引数で指定されるevtx_filesのindexに対応しています。
async fn xml_to_json(
&mut self,
xml_records: Vec<Vec<SerializedEvtxRecord<String>>>,
xml_records: Vec<(usize, SerializedEvtxRecord<String>)>,
evtx_files: &Vec<PathBuf>,
) -> Vec<Vec<Value>> {
// xmlからjsonに変換するJobを作成
let handles: Vec<Vec<JoinHandle<Result<Value, Error>>>> = xml_records
.into_iter()
.map(|xml_records| {
return xml_records
.into_iter()
.map(|xml_record| {
return spawn(async move {
return serde_json::from_str(&xml_record.data);
});
})
.collect();
})
.collect();
rules: &Vec<RuleNode>,
) -> Vec<(usize, Value)> {
// TODO スレッド作り過ぎなので、数を減らす
// 作成したjobを実行し(handle.awaitの部分)、スレッドの実行時にエラーが発生した場合、標準エラー出力に出しておく
let mut ret = vec![];
let mut evtx_file_index = 0;
for handles_per_evtxfile in handles {
let mut sub_ret = vec![];
for handle in handles_per_evtxfile {
let future_result = handle.await;
if future_result.is_err() {
let evtx_filepath = &evtx_files[evtx_file_index].display();
// 非同期で実行される無名関数を定義
let async_job = |pair: (usize, SerializedEvtxRecord<String>),
event_id_set: Arc<HashSet<i64>>,
evtx_files: Arc<Vec<PathBuf>>| {
let parser_idx = pair.0;
let handle = spawn(async move {
let parse_result = serde_json::from_str(&pair.1.data);
// パースに失敗した場合はエラー出力しておく。
if parse_result.is_err() {
let evtx_filepath = &evtx_files[parser_idx].display();
let errmsg = format!(
"Failed to serialize from event xml to json. EventFile:{} Error:{}",
evtx_filepath,
future_result.unwrap_err()
parse_result.unwrap_err()
);
AlertMessage::alert(&mut std::io::stdout().lock(), errmsg).ok();
continue;
return Option::None;
}
sub_ret.push(future_result.unwrap());
}
ret.push(sub_ret);
evtx_file_index = evtx_file_index + 1;
}
// JSONの変換に失敗したものを除外して、返す。
// ValueがどのEvtxParserからパースされたのか分かるようにするため、2次元配列のまま返す。
let mut evtx_file_index = 0;
return ret
.into_iter()
.map(|parse_results| {
let successed = parse_results
.into_iter()
.filter_map(|parse_result| {
if parse_result.is_err() {
let evtx_filepath = &evtx_files[evtx_file_index].display();
let errmsg = format!(
"Failed to serialize from event xml to json. EventFile:{} Error:{}",
evtx_filepath,
parse_result.unwrap_err()
);
AlertMessage::alert(&mut std::io::stdout().lock(), errmsg).ok();
// ルールファイルで検知しようとしているEventIDでないレコードはここで捨てる。
let parsed_json: Value = parse_result.unwrap();
let event_id_opt = utils::get_event_value(&utils::get_event_id_key(), &parsed_json);
return event_id_opt
.and_then(|event_id| event_id.as_i64())
.and_then(|event_id| {
if event_id_set.contains(&event_id) {
return Option::Some(parsed_json);
} else {
return Option::None;
}
});
});
return Option::Some(parse_result.unwrap());
})
.collect();
evtx_file_index = evtx_file_index + 1;
return successed;
return (parser_idx, handle);
};
// 非同期で実行するスレッドを生成し、実行する。
let event_id_set_arc = Arc::new(Detection::get_event_ids(rules));
let evtx_files_arc = Arc::new(evtx_files.clone());
let handles: Vec<(usize, JoinHandle<Option<Value>>)> = xml_records
.into_iter()
.map(|xml_record_pair| {
let event_id_set_clone = Arc::clone(&event_id_set_arc);
let evtx_files_clone = Arc::clone(&evtx_files_arc);
return async_job(xml_record_pair, event_id_set_clone, evtx_files_clone);
})
.collect();
// スレッドの終了待ちをしている。
let mut ret = vec![];
for (parser_idx, handle) in handles {
let future = handle.await;
// スレッドが正常に完了しなかった場合はエラーメッセージを出力する。
if future.is_err() {
let evtx_filepath = &evtx_files[parser_idx].display();
let errmsg = format!(
"Failed to serialize from event xml to json. EventFile:{} Error:{}",
evtx_filepath,
future.unwrap_err()
);
AlertMessage::alert(&mut std::io::stdout().lock(), errmsg).ok();
continue;
}
// パース失敗やルールファイルで検知しようとしていないEventIDの場合等はis_none()==trueになる。
let parse_result = future.unwrap();
if parse_result.is_none() {
continue;
}
ret.push((parser_idx, parse_result.unwrap()));
}
return ret;
}
// 検知ロジックを実行します。
@@ -312,14 +307,9 @@ impl Detection {
let handle: JoinHandle<Vec<bool>> = spawn(async move {
let mut ret = vec![];
for record_info in records_arc_clone.iter() {
for rule in rules_clones.iter() {
if rule.select(&record_info.record) {
// TODO ここはtrue/falseじゃなくて、ruleとrecordのタプルをretにpushする実装に変更したい。
ret.push(true);
} else {
ret.push(false);
}
for rule in rules_clones.iter() {
for record_info in records_arc_clone.iter() {
ret.push(rule.select(&record_info.record)); // 検知したか否かを配列に保存しておく
}
}
return ret;
@@ -350,6 +340,14 @@ impl Detection {
}
}
fn get_event_ids(rules: &Vec<RuleNode>) -> HashSet<i64> {
return rules
.iter()
.map(|rule| rule.get_event_ids())
.flatten()
.collect();
}
// 配列を指定したサイズで分割する。Vector.chunksと同じ動作をするが、Vectorの関数だとinto的なことができないので自作
fn chunks<T>(ary: Vec<T>, size: usize) -> Vec<Vec<T>> {
let arylen = ary.len();