WIP: Feature/count sigma rule #93 (#113)

* Feature/call error message struct#66 (#69)

* change  way to use write trait #66

* change call error message struct #66

* erase finished TODO #66

* erase comment in error message format test #66

* resolve conflict #66

* Feature/call error message struct#66 (#71)

* change ERROR writeln struct #66

* under constructing

* add statistics template

* fix

* add comment

* add condition impl #93

* fix erased get_descendants and remove unnecessaly struct #93

* erased finished TODO comment

* erased finished TODO comment

* Revert "fix erased get_descendants and remove unnecessaly struct #93"

This reverts commit 82e905e045.

Revert "add condition impl #93"

This reverts commit 19ecc87377.

* add doc comment to rule function

* fix and add test doc commet

* add doc to AggregaationParseInfo

* add struct count in aggregation condition. #93

* add evaluate aggregation condition func provisional architecture. #93

* add countup function #93

* fix key to count hashmap #93

* add judge aggregation condition function #93

* fix  error #93

* fix test #93

* share compile error ver

* fix detection.rs compile error

* fix timeframe parse

* add countup process in select

* fix select argument

* add test countup

* add test count judge #93

* add SIGMA windows count field and by keyword #93

* fix reference record in countup/judgecount #93

* add timedata in countup schema #93

* Refact: split code for matcher from rule.rs

* Reafact: combine multiple declared functions

* Refact: split code for SelectionNode from rule.rs

* Refact: mv test code for SelectionNode from rule.rs

* Refact: mv condition's code from rule.rs

* add count to detection #93

* fix compile error

* fix source to test ng. #93

* erase unused variable #93

* fix count architecture #93

* fix comment and compile error

* erase dust (response  to review)

* erase dust (response to review)

* reduce calling Rulenode function (response to review)

* add aggregation output func

* erase dust(response to review) and add agg condition String func

* change error output

* reduce call RuleNode function(response to review)

* To reduce call RuleNode function

* fix test name

* fix coflicted resolve miss

* add code comment in timeframe count.

* add sort record timedata in timeframe(response to review)

* fix unnecesasry result in ArgResult

* add no field and by value count test

* create count test no field and by with timeframe

* erase duplicated timeframe data in RuleNode

* fix test error no field and no by count with timeframe

* fix test name

* add test case of exist field and by count.

* fix by count test and add test count othervalue in timeframe

* add test

* fix judge_timeframe logic when indexout

* fix test name and add count test field and by with timeframe

* adjust #120

* move associated count function from rulenode

* fix error when resolve conflict

* fix no output bug if exist output

Co-authored-by: HajimeTakai <takai.wa.hajime@gmail.com>
Co-authored-by: itiB <is0312vx@ed.ritsumei.ac.jp>
This commit is contained in:
DustInDark
2021-07-16 07:20:44 +09:00
committed by GitHub
parent 65b714b81b
commit 330cbb58ca
9 changed files with 1127 additions and 188 deletions

View File

@@ -1,7 +1,8 @@
extern crate csv;
use crate::detections::rule::AggResult;
use serde_json::Value;
use tokio::{spawn, task::JoinHandle};
use tokio::spawn;
use crate::detections::print::MESSAGES;
use crate::detections::rule;
@@ -45,7 +46,7 @@ impl Detection {
}
let tokio_rt = utils::create_tokio_runtime();
tokio_rt.block_on(self.execute_rule(rules, records));
tokio_rt.block_on(Detection::execute_rules(rules, records));
tokio_rt.shutdown_background();
}
@@ -94,82 +95,85 @@ impl Detection {
.collect();
}
// 検知ロジックを実行します。
async fn execute_rule(&mut self, rules: Vec<RuleNode>, records: Vec<EvtxRecordInfo>) {
// 複数スレッドで所有権を共有するため、recordsをArcでwwap
let mut records_arcs = vec![];
for record_chunk in Detection::chunks(records, num_cpus::get() * 4) {
let record_chunk_arc = Arc::new(record_chunk);
records_arcs.push(record_chunk_arc);
}
// 複数スレッドで所有権を共有するため、rulesをArcでwwap
let rules_arc = Arc::new(rules);
// ルール実行するスレッドを作成。
let mut handles = vec![];
for record_chunk_arc in &records_arcs {
let records_arc_clone = Arc::clone(&record_chunk_arc);
let rules_clones = Arc::clone(&rules_arc);
let handle: JoinHandle<Vec<bool>> = spawn(async move {
let mut ret = vec![];
for rule in rules_clones.iter() {
for record_info in records_arc_clone.iter() {
ret.push(rule.select(&record_info.record)); // 検知したか否かを配列に保存しておく
}
}
return ret;
async fn execute_rules(rules: Vec<RuleNode>, records: Vec<EvtxRecordInfo>) {
let records_arc = Arc::new(records);
let traiter = rules.into_iter();
// 各rule毎にスレッドを作成して、スレッドを起動する。
let handles = traiter.map(|rule| {
let records_cloned = Arc::clone(&records_arc);
return spawn(async move {
Detection::execute_rule(rule, records_cloned);
});
handles.push(handle);
});
// 全スレッドの実行完了を待機
for handle in handles {
handle.await.unwrap();
}
}
// 検知ロジックを実行します。
fn execute_rule(mut rule: RuleNode, records: Arc<Vec<EvtxRecordInfo>>) {
let records = &*records;
let agg_condition = rule.has_agg_condition();
for record_info in records {
let result = rule.select(&record_info.evtx_filepath, &record_info.record);
if !result {
continue;
}
// aggregation conditionが存在しない場合はそのまま出力対応を行う
if !agg_condition {
Detection::insert_message(&rule, &record_info);
return;
}
}
// メッセージを追加する。これを上記のspawnの中でやると、ロックの取得で逆に時間がかかるので、外に出す
let mut message = MESSAGES.lock().unwrap();
let mut handles_ite = handles.into_iter();
for record_chunk_arc in &records_arcs {
let mut handles_ret_ite = handles_ite.next().unwrap().await.unwrap().into_iter();
for rule in rules_arc.iter() {
for record_info_arc in record_chunk_arc.iter() {
if handles_ret_ite.next().unwrap() == false {
continue;
}
// TODO メッセージが多いと、rule.select()よりもこの処理の方が時間かかる。
message.insert(
record_info_arc.evtx_filepath.to_string(),
&record_info_arc.record,
rule.yaml["title"].as_str().unwrap_or("").to_string(),
rule.yaml["output"].as_str().unwrap_or("").to_string(),
);
}
let agg_results = rule.judge_satisfy_aggcondition();
for value in agg_results {
if agg_condition {
Detection::insert_agg_message(&rule, value);
}
}
}
// fn get_event_ids(rules: &Vec<RuleNode>) -> HashSet<i64> {
// return rules
// .iter()
// .map(|rule| rule.get_event_ids())
// .flatten()
// .collect();
// }
/// 条件に合致したレコードを表示するための関数
fn insert_message(rule: &RuleNode, record_info: &EvtxRecordInfo) {
MESSAGES.lock().unwrap().insert(
record_info.evtx_filepath.to_string(),
&record_info.record,
rule.yaml["title"].as_str().unwrap_or("").to_string(),
rule.yaml["output"].as_str().unwrap_or("").to_string(),
);
}
// 配列を指定したサイズで分割する。Vector.chunksと同じ動作をするが、Vectorの関数だとinto的なことができないので自作
fn chunks<T>(ary: Vec<T>, size: usize) -> Vec<Vec<T>> {
let arylen = ary.len();
let mut ite = ary.into_iter();
/// insert aggregation condition detection message to output stack
fn insert_agg_message(rule: &RuleNode, agg_result: AggResult) {
let output = Detection::create_count_output(rule, &agg_result);
MESSAGES.lock().unwrap().insert_message(
agg_result.filepath,
agg_result.start_timedate,
rule.yaml["title"].as_str().unwrap_or("").to_string(),
output.to_string(),
)
}
let mut ret = vec![];
for i in 0..arylen {
if i % size == 0 {
ret.push(vec![]);
ret.iter_mut().last().unwrap().push(ite.next().unwrap());
} else {
ret.iter_mut().last().unwrap().push(ite.next().unwrap());
}
///aggregation conditionのcount部分の検知出力文の文字列を返す関数
fn create_count_output(rule: &RuleNode, agg_result: &AggResult) -> String {
let mut ret: String = "count(".to_owned();
let key: Vec<&str> = agg_result.key.split("_").collect();
if key.len() >= 1 {
ret.push_str(key[0]);
}
ret.push_str(&") ");
if key.len() >= 2 {
ret.push_str("by ");
ret.push_str(key[1]);
}
ret.push_str(&format!(
"{} in {}.",
agg_result.condition_op_num,
rule.yaml["timeframe"].as_str().unwrap_or(""),
));
return ret;
}
}