KQL Query to Detect Log Ingestion Downtime by data connector/tables in Microsoft Sentinel

Introduction

In a cybersecurity environment, timely detection of anomalies or issues in log ingestion is critical for maintaining the integrity and effectiveness of your security operations. Microsoft Sentinel offers a powerful platform for managing and analyzing security logs, but it’s equally important to have mechanisms in place to detect and address any downtime or delays in log ingestion. In this blog post, we’ll explore a Key Query Language (KQL) query designed to help you detect log ingestion downtime by data connectors and their respective tables in Microsoft Sentinel.

Value Proposition

The query is a must-have for all Microsoft Sentinel implementations. It addresses the critical need for timely detection of log ingestion issues, ensuring logs from essential sources are ingested promptly. With its comprehensive monitoring and proactive alerting capabilities, it’s indispensable for efficiently managing security risks and responding swiftly to threats.

  • Timely Detection: Quickly identifies if logs from critical sources like Windows Security Events or Syslog are delayed.
  • Comprehensive Monitoring: Monitors multiple data connectors for a complete view of log ingestion status.
  • Customizable and Scalable: Can be easily adjusted to monitor new data connectors and adapt to changing needs.

Note: The below KQL query serves as a foundational template for monitoring log ingestion in Microsoft Sentinel. However, it must be customized according to the specific data connectors available in your environment. Adjustments may be necessary to include relevant data sources and tailor alerting thresholds to align with your organization's security requirements and infrastructure setup.

Kql query to detect log ingestion downtime by data connector

let WindowsSecurityEventsviaAMA = SecurityEvent
| where isnotempty(TimeGenerated)
| extend tablename= "securityevent"
| summarize LastSeen= max(TimeGenerated)by tablename;
let syslog = Syslog
| where isnotempty(TimeGenerated)
| extend tablename= "syslog"
| summarize LastSeen= max(TimeGenerated)by tablename;
let SonicWall = CommonSecurityLog
| where DeviceVendor == "SonicWall"
| extend tablename= "SonicWall"
| summarize LastSeen= max(TimeGenerated)by tablename;
let SemperisDirectoryServicesProtector = dsp_parser
| where isnotempty(TimeGenerated)
| extend tablename= "dsp_parser"
| summarize LastSeen= max(TimeGenerated)by tablename;
let PaloAltoNetworks = CommonSecurityLog
| where DeviceVendor == "Palo Alto Networks" and DeviceProduct has "PAN-OS"
| extend tablename= "PaloAltoNetworks"
| summarize LastSeen= max(TimeGenerated)by tablename;
let EntraIDProtection = SecurityAlert
| where ProductName == "Azure Active Directory Identity Protection" and ProviderName == "IPC"
| extend tablename= "SecurityAlert (IPC)"
| summarize LastSeen= max(TimeGenerated)by tablename;
let securityalert = (datatype: string, name: string) {
SecurityAlert
| where ProviderName == name or ProductName == name
| extend tablename= datatype
| summarize LastSeen= max(TimeGenerated)by tablename
};
let crowdstrikefalcon = (datatype: string) {
table(datatype)
| where isnotempty(TimeGenerated)
| extend tablename= datatype
| summarize LastSeen= max(TimeGenerated)by tablename
};
let CrowdStrikeEndpointProtection= CommonSecurityLog
| where DeviceProduct == "FalconHost"
| extend tablename= "CrowdStrikeEndpointProtection"
| summarize LastSeen= max(TimeGenerated)by tablename;
let InfobloxNIOS= Infoblox
| extend tablename= "Infoblox"
| summarize LastSeen= max(TimeGenerated)by tablename;
let CitrixADC= CitrixADCEvent
| extend tablename= "CitrixADCEvent"
| summarize LastSeen= max(TimeGenerated)by tablename;
let ciscomeraki= CiscoMeraki
| extend tablename= "meraki_CL"
| summarize LastSeen= max(TimeGenerated)by tablename;
let table= union withsource=tablename *
| summarize LastSeen = max(TimeGenerated) by tablename, ProcessName, SourceSystem
| extend EntraID = iff(tablename in ("SigninLogs", "AuditLogs", "AADNonInteractiveUserSignInLogs", "AADRiskyServicePrincipals", "NetworkAccessTraffic", "AADServicePrincipalRiskEvents", "ADFSSignInLogs", "AADProvisioningLogs", "AADProvisioningLogs", "AADManagedIdentitySignInLogs", "AADUserRiskEvents", "AADUserRiskEvents"), "Microsoft_EntraID", "")
| extend CiscoISE = iff(ProcessName has_any ("CSCO", "CISE"), "Cisco Identity Services Engine", "")
| extend AmazonWebServicesS3 = iff(tablename in ("AWSCloudWatch", "AWSCloudTrail", "AWSGuardDuty", "AWSVPCFlow"), "Amazon Web Services S3 (Preview)", "")
| extend ArmisAlerts = iff(tablename in ("Armis_Alerts_CL"), "Armis Alerts (using Azure Function)", "")
| extend ArmisDevices = iff(tablename in ("Armis_Devices_CL"), "Armis Devices (using Azure Function)", "")
| extend AzureActivity = iff(tablename == "AzureActivity", "Azure Activity", "")
| extend OfficeActivity = iff(tablename == "OfficeActivity", "Microsoft 365 (formerly, Office 365)", "")
| extend ThreatIntelligenceIndicator = iff(tablename == "ThreatIntelligenceIndicator" and SourceSystem in ("Microsoft Defender Threat Intelligence", "Microsoft Emerging Threat Feed"), "Microsoft Defender Threat Intelligence (Preview)", "")
| extend DefenderXDR = iff(tablename in ("SecurityIncident", "SecurityAlert", "DeviceInfo", "DeviceNetworkInfo", "IdentityLogonEvents", "IdentityQueryEvents", "IdentityDirectoryEvents", "CloudAppEvents", "AlertInfo", "AlertEvidence", "DeviceEvents", "DeviceFileEvents", "DeviceImageLoadEvents", "DeviceLogonEvents", "DeviceNetworkEvents", "DeviceProcessEvents", "DeviceRegistryEvents", "DeviceFileCertificateInfo", "EmailEvents", "EmailUrlInfo", "EmailAttachmentInfo", "EmailPostDeliveryEvents", "UrlClickEvents"), "Microsoft Defender XDR", "")
| extend Okta = iff(tablename == "Okta_CL", "Okta Single Sign-On (using Azure Functions)", "")
| extend PaloAltoPrismaCloudCSPM = iff(tablename in ("PaloAltoPrismaCloudAlert_CL", "PaloAltoPrismaCloudAudit_CL"), "Palo Alto Prisma Cloud CSPM (using Azure Functions)", "")
| extend ProofpointOnDemandEmailSecurity = iff(tablename in ("ProofpointPOD_message_CL", "ProofpointPOD_maillog_CL"), "Proofpoint On Demand Email Security (using Azure Functions)", "")
| extend ProofpointTAP = iff(tablename in ("ProofPointTAPClicksPermitted_CL", "ProofPointTAPClicksBlocked_CL", "ProofPointTAPMessagesDelivered_CL", "ProofPointTAPMessagesBlocked_CL"), "Proofpoint TAP (using Azure Functions)", "")
| extend SailPointIdentityNow = iff(tablename in ("SailPointIDN_Events_CL", "SailPointIDN_Triggers_CL"), "SailPoint IdentityNow (using Azure Function)", "")
| extend ThreatIntelligenceUploadIndicatorsAPI = iff(tablename == "ThreatIntelligenceIndicator" and SourceSystem !in ('SecurityGraph', 'Azure Sentinel', 'Microsoft Sentinel'), "Threat Intelligence Upload Indicators API (Preview)", "")
| extend WindowsEvent = iff(tablename == "WindowsEvent", "Windows Forwarded Events", "");
let CrowdStrike_Additional_Events_CL = crowdstrikefalcon("CrowdStrike_Additional_Events_CL");
let CrowdStrike_Secondary_Data_CL = crowdstrikefalcon("CrowdStrike_Secondary_Data_CL");
let CrowdstrikeReplicatorLogs_CL = crowdstrikefalcon("CrowdstrikeReplicatorLogs_CL");
let ASimNetworkSessionLogs = crowdstrikefalcon("ASimNetworkSessionLogs");
let ASimDnsActivityLogs = crowdstrikefalcon("ASimDnsActivityLogs");
let ASimAuditEventLogs = crowdstrikefalcon("ASimAuditEventLogs");
let ASimRegistryEventLogs = crowdstrikefalcon("ASimRegistryEventLogs");
let ASimUserManagementActivityLogs = crowdstrikefalcon("ASimUserManagementActivityLogs");
let ASimProcessEventLogs = crowdstrikefalcon("ASimProcessEventLogs");
let ASimAuthenticationEventLogs = crowdstrikefalcon("ASimAuthenticationEventLogs");
let ASimFileEventLogs = crowdstrikefalcon("ASimFileEventLogs");
let DefenderforCloudApps = securityalert("DefenderforCloudApps", "Microsoft Cloud App Security");
let DefenderforIdentity = securityalert("DefenderforIdentity", "Azure Advanced Threat Protection");
let DefenderforEndpoint = securityalert("DefenderforEndpoint", "MDATP");
let DefenderforOffice365 = securityalert("DefenderforOffice365", "OATP");
union
ciscomeraki,
table,
CitrixADC,
InfobloxNIOS,
CrowdStrikeEndpointProtection,
CrowdStrike_Additional_Events_CL,
CrowdStrike_Secondary_Data_CL,
CrowdstrikeReplicatorLogs_CL,
ASimNetworkSessionLogs,
ASimDnsActivityLogs,
ASimAuditEventLogs,
ASimRegistryEventLogs,
ASimUserManagementActivityLogs,
ASimProcessEventLogs,
DefenderforCloudApps,
DefenderforIdentity,
ThreatIntelligenceIndicator,
EntraIDProtection,
PaloAltoNetworks,
SemperisDirectoryServicesProtector,
SonicWall,
syslog,
WindowsSecurityEventsviaAMA
| extend meraki_CL = iff (tablename == "meraki_CL", "Cisco Meraki", "")
| extend CitrixADCEvent =iff(tablename == "CitrixADCEvent", "Citrix ADC (former NetScaler)", "")
| extend Infoblox = iff (tablename == "Infoblox", "Infoblox NIOS", "")
| extend CrowdStrikeEndpointProtection = iff (tablename == "CrowdStrikeEndpointProtection", "CrowdStrike Falcon Endpoint Protection", "")
| extend CrowdstrikeReplicator = iff(tablename in("CrowdStrike_Secondary_Data_CL", "CrowdstrikeReplicatorLogs_CL", "ASimNetworkSessionLogs", "ASimDnsActivityLogs", "ASimAuditEventLogs", "ASimRegistryEventLogs", "ASimUserManagementActivityLogs", "ASimProcessEventLogs", "ASimAuthenticationEventLogs", "ASimFileEventLogs", "CrowdStrike_Additional_Events_CL"), "Crowdstrike Falcon Data Replicator V2 (using Azure Functions) (Preview)", "")
| extend MCAS = iff (tablename == "DefenderforCloudApps", "Microsoft Defender for Cloud Apps", "")
| extend MDI = iff (tablename == "DefenderforIdentity", "Microsoft Defender for Identity", "")
| extend MDE = iff (tablename == "DefenderforEndpoint", "Microsoft Defender for Endpoint", "")
| extend MDO = iff (tablename == "DefenderforOffice365", "Microsoft Defender for Office 365 (Preview)", "")
| extend ThreatIntelligence = iff (tablename == "ThreatIntelligenceIndicator", "Microsoft Defender Threat Intelligence", "")
| extend entraprotect = iff (tablename == "SecurityAlert (IPC)", "Microsoft Entra ID Protection", "")
| extend PaloAltoNetwork = iff (tablename == "PaloAltoNetworks", "Palo Alto Networks (Firewall)", "")
| extend SemperisDirectoryServicesProtector = iff (tablename == "dsp_parser", "Semperis Directory Services Protector", "")
| extend SonicWallFirewall = iff (tablename == "SonicWall", "SonicWall Firewall", "")
| extend syslog = iff (tablename == "syslog", "Syslog", "")
| extend securityevent = iff (tablename == "securityevent", "Windows Security Events via AMA", "")
| extend ["Data Connector"] = coalesce(EntraID, CiscoISE, AmazonWebServicesS3, ArmisAlerts, ArmisDevices, meraki_CL, CitrixADCEvent, CrowdstrikeReplicator, CrowdStrikeEndpointProtection, Infoblox, OfficeActivity, MCAS, MDI, MDE, MDO, ThreatIntelligenceIndicator, DefenderXDR, entraprotect, Okta, PaloAltoNetwork, PaloAltoPrismaCloudCSPM, ProofpointOnDemandEmailSecurity, ProofpointTAP, SailPointIdentityNow, SemperisDirectoryServicesProtector, SonicWallFirewall, syslog, ThreatIntelligenceUploadIndicatorsAPI, WindowsEvent, securityevent)
| where isnotempty(['Data Connector'])
| project-away
EntraID,
ProcessName,
SourceSystem,
CiscoISE,
AmazonWebServicesS3,
ArmisAlerts,
ArmisDevices,
AzureActivity,
meraki_CL,
CitrixADCEvent,
CrowdstrikeReplicator,
CrowdStrikeEndpointProtection,
Infoblox,
OfficeActivity,
MCAS,
MDI,
MDE,
MDO,
ThreatIntelligenceIndicator,
DefenderXDR,
Okta,
PaloAltoNetwork,
PaloAltoPrismaCloudCSPM,
ProofpointOnDemandEmailSecurity,
ProofpointTAP,
SailPointIdentityNow,
SemperisDirectoryServicesProtector,
SonicWallFirewall,
syslog,
ThreatIntelligenceUploadIndicatorsAPI,
WindowsEvent,
securityevent
| extend TimeDifference = datetime_diff("minute", now(), LastSeen)
| summarize LastSeen =max(LastSeen)by ['Data Connector'], ['Table Name']= tablename, TimeDifference
| project-reorder ['Data Connector'], ['Table Name'], LastSeen, TimeDifference
| sort by TimeDifference desc


The above will give you the result like below mentioned screenshot:

Visualization

The above KQL query can be leveraged to visualize the log downtime through workbooks and dashboards.

Visualizing the log ingestion status through a workbook provides a clear and intuitive way to monitor the health of data connectors in Microsoft Sentinel. It offers a quick overview of the last seen times for each data connector, highlighting any potential delays or issues in log ingestion.

Conclusion


In summary, the KQL query offers a solid foundation for monitoring log ingestion in Microsoft Sentinel, providing vital insights into data connector status. When combined with visual workbooks, these insights are enhanced, offering clear and intuitive representations of log ingestion health. Together, they empower organizations to proactively manage security operations, ensuring timely detection and resolution of any issues, thus bolstering overall cybersecurity defenses.

One response to “KQL Query to Detect Log Ingestion Downtime by data connector/tables in Microsoft Sentinel”

Leave a comment

I’m Sujit

Welcome to SecureBytes, my cozy corner of the internet where we explore the wonders of the online world. Join me on a journey of discovery, knowledge, and shared interests. Let’s navigate this digital realm together with curiosity and excitement. Ready to embark on this adventure? Let’s go!

Let’s connect