Besides designing detections to run at a much bigger scale (hundreds of customers), I’m finally playing with Microsoft’s Sentinel SIEM and the KQL language.
One of the first challenges we face is standardizing coding practices, which makes the queries easier and faster to maintain and improve in the long run.
And here’s a quick example I’d like to share; feedback is always welcome.
How to make sure all “late arrivals” are checked by detection rules?
Late arrivals are those events that arrive at the SIEM minutes after they were generated. Some extreme cases go beyond one hour (e.g., network/zone restrictions, zip & encrypt before transmission, batch uploads, etc.).
Think about an activity performed on an endpoint: registered by the OS logging system, forwarded to a local log relay/concentrator, and later shipped to the Azure infrastructure. Each of these hops might introduce transport delays and other log processing issues.
While some of those delays are temporary (network issues, cached logs suddenly released, etc.), many cases are simply part of the norm.
The following KQL, inspired by this article, provides the average total latency per table, measured between distinct log timestamps. Simply add tables as needed:
// running over last 7d should suffice
let records = 100000; // limits to 100k records per table
let my_perf_table = Perf
| extend Table_Name = 'Perf'
| take records;
let my_seclogs_table = CommonSecurityLog
| extend Table_Name = 'CommonSecurityLog'
| take records;
let my_office_table = OfficeActivity
| extend Table_Name = 'OfficeActivity'
| take records;
union my_perf_table, my_seclogs_table, my_office_table
| extend Time_Ingested = ingestion_time()
| extend Time_Generated = TimeGenerated
| extend Time_Received = _TimeReceived
| project Table_Name, Time_Generated, Time_Received, Time_Ingested
| extend TotalLatency = toreal(datetime_diff('Second', Time_Ingested, Time_Generated))
| summarize EventCount=count(), Max_TotalLatency=max(TotalLatency), Avg_TotalLatency=avg(TotalLatency) by Table_Name
| extend Avg_TotalLatency_minutes = Avg_TotalLatency / 60
| sort by Avg_TotalLatency_minutes desc
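Since an average can hide long-tail delays, a percentile-based variant of the same measurement may give a better picture of the worst case. A sketch under the same assumptions as the query above (one table shown; table and column names as before):

```kusto
// Latency distribution for one table: the median and the tail
// percentiles reveal worst-case delays that an average smooths out.
let records = 100000;
let my_office_table = OfficeActivity
| extend Table_Name = 'OfficeActivity'
| take records;
my_office_table
| extend TotalLatency = toreal(datetime_diff('Second', ingestion_time(), TimeGenerated))
| summarize p50_seconds = percentile(TotalLatency, 50),
            p95_seconds = percentile(TotalLatency, 95),
            p99_seconds = percentile(TotalLatency, 99) by Table_Name
```

A high p99 with a modest average is a strong hint that any fixed delay tolerance should be sized on the tail, not on the mean.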
As expected, the closer the data source is to the Azure infrastructure, the lower the latency.
So what’s the deal with that latency/delay?
Depending on the use case, that’s a big deal! I’ve described the very same challenge with Splunk here. In general, the main risks are:
- Missing events because the time window was already scanned.
- Overlapping time windows in detections, leading to the same record/event being scanned multiple times, generating duplicate results, etc. Not to mention the over-consumption of resources!
Consider the following sequence:
- A rule checks for events generated within the last hour, every hour.
- The rule eventually executes at 10:00; then it scans for events with TimeGenerated between 09:01 and 10:00 (now). Results are displayed.
- A few minutes later, new logs are ingested and they happen to be generated within that time window already checked.
- In the next hour (11:00), another execution round comes, but this time the scanned window is 10:01 to 11:00. The events from the previous step fly under the radar… hardly a happy ending for a detection engineer.
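The scenario above maps to a naive hourly rule that filters only on TimeGenerated. A sketch, where <TABLE> and AlertSeverity are placeholders:

```kusto
// Naive hourly rule: the scanned window moves with the wall clock.
// An event generated at 09:50 but ingested at 10:10 matches neither
// the 10:00 run nor the 11:00 run.
<TABLE>
| where TimeGenerated > ago(1h)
| where AlertSeverity == 'High'
```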
A quick solution
Just like our old buddy _indextime in SPL, Azure has an ingestion time, accessible via the ingestion_time() function.
This should work well for detections where you need to make sure all events are scanned, regardless of their TimeGenerated value. That is, as soon as an event is ingested by the system, it gets scanned.
An easy example is any detection based on simple, atomic indicators: for instance, all “High” severity signatures from a security appliance.
The query prototype below provides a mechanism to keep scanning events ingested in the last hour, even if they actually happened 24h ago:
// Catch 'late arrivals' while avoiding double event scanning
let ingested_lookback = 1h;   // earliest [ingest time] boundary
let generated_lookback = 24h; // earliest [log time] boundary
<TABLE>
| where TimeGenerated > ago(generated_lookback) and ingestion_time() > ago(ingested_lookback)
| where AlertSeverity == 'High'
| project TimeGenerated, TimeIngested=ingestion_time(), <fields>
| summarize (...)
That example looks for any event with TimeGenerated within the last 24h, as long as it was ingested within the last hour (the rule runs hourly). That roughly provides up to 23h of delay tolerance for an event to be scanned.
I’m not sure how the Log Analytics engine optimizes that query, but overall queries are quite fast in Sentinel. I’m pretty impressed!
Just like in Splunk, there are two virtual time boundaries at play here: one is the time the log was generated and the other is the time it was ingested.
A record is only scanned when it falls within both windows, between each earliest and latest boundary, determined by ago() and now() (the default latest), respectively.
Not all roses…
There are some situations where TimeGenerated plays a key role in the detection logic and data analytics, so that simple approach might not address all cases, for instance when there’s not enough data to perform:
- Any rate based detection: brute-force, activity spikes, etc;
- Analytics around user/host profiles over time (summaries);
- Beaconing detection and many others!
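For example, a brute-force style rate detection must bin on TimeGenerated, so a late arrival lands in a bin that an earlier execution has already evaluated and counted. A sketch, where <TABLE>, Account and EventResult are placeholder names:

```kusto
// Rate-based sketch: counts failed logons per account in 10m bins.
// A late-arriving failure changes the count of an old bin after
// that bin has already been checked, so the spike may be missed.
<TABLE>
| where TimeGenerated > ago(1h)
| where EventResult == 'Failure'
| summarize Failures = count() by Account, bin(TimeGenerated, 10m)
| where Failures > 20
```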
In some situations, where we know the latency is significantly high (e.g., the OfficeActivity table), there will be some compromise, such as a ‘late detection’, to allow for data completeness, so that a more robust detection can be built.
In the end, a combination of all that plus event/alert aggregation and suppression (which is another challenge in Sentinel!) should be considered in all cases. That’s something MS will definitely be improving soon, just like it has been doing with the whole Azure platform.
Happy to discuss ideas and other approaches!
Update 1: a colleague just sent me this article which explores it a bit more.
Update 2: a PoC on how to add an offset to account for latency in time-sequence sensitive rules. Assume it runs hourly, scoping on the third-to-last hour:
let offset = 3h; // how far back to set the needle
let window = 1h; // time window to be checked
let time_latest = ago(offset);
let time_earliest = time_latest-window;
let time_executed = now();
// generate empty time-series
range TimeGenerated from ago(48h) to now() step 1s
| where TimeGenerated between (time_earliest .. time_latest)
| summarize count(), earliest_boundary=min(TimeGenerated), latest_boundary=max(TimeGenerated) by time_earliest=time_earliest, time_latest=time_latest, time_executed=time_executed
// what's the window size scanned?
| extend diff_boundary=latest_boundary-earliest_boundary
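Applying that same offset pattern to a real detection could look like the sketch below, assuming an hourly schedule; <TABLE> is a placeholder and the 3h/1h values mirror the PoC above:

```kusto
// Hourly rule deliberately scanning the 3rd last hour:
// trades detection speed for (hopefully) complete data.
let offset = 3h; // how far back to set the needle
let window = 1h; // time window to be checked
<TABLE>
| where TimeGenerated between ((ago(offset) - window) .. ago(offset))
```

The compromise is explicit: an event is only detected up to three hours after it happened, but every event arriving within that delay budget is scanned exactly once.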