Parsing CEF messages without Azure Monitor Agent
Introduction
As a SOC engineer, I do a lot of third-party data source ingestion projects for clients into their Microsoft Sentinel instances. Most of these data sources are network security solutions such as firewalls and proxies.
When you want to ingest this data into Microsoft Sentinel, there are a couple of scenarios and architectures you can build to accomplish this. The architecture you will find most often on the internet uses the Azure Monitor Agent (AMA). Even though the features of AMA keep getting better, I still prefer to use Logstash in my ingestion projects. In my opinion, Logstash is more flexible and provides more capabilities than AMA currently does.
The downside of using Logstash is that there is no built-in parser that maps Syslog CEF messages to the CommonSecurityLog table of Microsoft Sentinel (AMA handles this for you). That is why, in this blog post, I want to write down how I do the CEF to CommonSecurityLog parsing, and hopefully save you some time creating your own parser.
What is CEF
CEF, or Common Event Format, is a vendor-neutral format for logging data from network and security devices and appliances, such as firewalls, routers, detection and response solutions, and intrusion detection systems, as well as from other kinds of systems such as web servers. An extension of Syslog, it was developed especially for security information and event management (SIEM) solutions. CEF messages have a standard header that contains information such as the device vendor, the device product, the device version, the event class ID, the event name, and the event severity. CEF messages also have a variable number of extensions that provide more details about the event, such as the source and destination IP addresses, the username, the file name, or the action taken.
Basically, CEF messages are constructed as below:
CEF:0|DeviceVendor|DeviceProduct|DeviceVersion|DeviceEventClassID|Activity|LogSeverity|AdditionalExtensions
Notice that a CEF message starts with 8 properties divided by 7 | characters. This should be the case for all messages, although the order might differ between products (which we will talk about later). The last property, AdditionalExtensions, contains more information related to the event. This data is described in key-value pairs with the = sign between key and value, and uses white space as the separator.
key1=value1 key2=value2 key3=value3 ...
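To make this structure concrete, here is a minimal Python sketch (it is not part of the Logstash or DCR setup) that splits a CEF message into its 8 properties. The helper name `split_cef` is hypothetical. The sketch assumes the header contains exactly 7 pipe separators and treats `\|` as an escaped pipe, which is how the CEF format lets a header value contain a pipe character.

```python
import re

# Property names in the order shown above; the "CEF:0" prefix comes first.
HEADER_FIELDS = [
    "CEF", "DeviceVendor", "DeviceProduct", "DeviceVersion",
    "DeviceEventClassID", "Activity", "LogSeverity", "AdditionalExtensions",
]

def split_cef(message: str) -> dict:
    """Split a CEF message into its 8 properties (hypothetical helper)."""
    # Split on '|' characters that are not preceded by a backslash, at most
    # 7 times, so any pipe inside the extensions is left untouched.
    # Simplification: an escaped backslash right before a pipe is not handled.
    parts = re.split(r"(?<!\\)\|", message, maxsplit=7)
    if len(parts) != 8 or not parts[0].startswith("CEF:"):
        raise ValueError("not a CEF message with 8 properties")
    return dict(zip(HEADER_FIELDS, parts))

sample = ("CEF:0|Check Point|URL Filtering|Check Point|Reject|"
          "custom_hacking_20160912|Unknown|act=Reject app=HTTPS deviceDirection=0")
fields = split_cef(sample)
print(fields["DeviceVendor"])
print(fields["AdditionalExtensions"])
```

Limiting the split to 7 separators matters because extension values are free text and may themselves contain a | character.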
From CEF to CommonSecurityLog
Where do you normalize the data
When using Logstash, you have the power to normalize and filter your data in the Logstash filter plugins. This is very powerful and flexible, but in my opinion a bit less maintainable. When you have multiple customers like I do, with the normalization logic living in Logstash, you need to change the configuration files on every Logstash instance whenever the normalization logic changes. For exactly this reason, I like to do the normalization in the transformation logic of a Data Collection Rule (DCR) in Azure.
Additionally, when doing your normalizations in a DCR rather than in Logstash, you have the following benefits:
- You can use KQL, which is probably better known among your colleagues.
- You can push changes using CI/CD with ARM or Bicep, rather than having to push Logstash config files using tools like Ansible.
- In most cases, acquiring the roles needed to change an Azure resource is easier than acquiring the access needed to change config files on a Linux machine.
Transformation in DCR
The input data
Before you can do the transformation, you need to create a DCR which is based on an input JSON file. This file describes how data will come into your DCR. With Logstash, you can generate this file by using the following output plugin syntax:
input {
pipeline { address => sentinel }
}
output {
microsoft-sentinel-log-analytics-logstash-output-plugin {
create_sample_file => true
sample_file_path => "/tmp"
}
}
Typically, you will get a JSON object that looks like the following:
{
"severity": 0,
"message": "CEF:0|Check Point|URL Filtering|Check Point|Reject|custom_hacking_20160912|Unknown|act=Reject app=HTTPS deviceDirection=0",
"facility": 0,
"type": "syslog",
"host": "10.248.2.130",
"facility_label": "kernel",
"priority": 0,
"severity_label": "Emergency",
"ls_timestamp": "2025-01-08T14:50:53.342159510Z",
"ls_version": "1"
}
In this object, you can see that the actual event generated by the data source is present in the message field. The other fields are the headers of the syslog message and tell you more about the syslog message itself. For this blog post, we will mainly use the message, host, and ls_timestamp fields.
The normalization logic
To normalize this data into the CommonSecurityLog table, I used the CEF and CommonSecurityLog field mapping and the CommonSecurityLog table reference, both provided on Microsoft Learn. These guides describe exactly which CEF fields correspond to which columns in the CommonSecurityLog table, and tell you which data type each column should have.
The hardest part now is making sure we can extract the keys and values from the message field using KQL, and create new columns for them. To do this, I used the extract function in KQL together with regular expressions. The first regex I created took a specific key and captured everything after the = sign as the value, until the next white space:
duser=([^\\s]+)
This worked great until I noticed that some vendors include spaces in their values as well.
To solve this issue, I wanted to work with a positive lookahead regex, to match every character until a new key with the = separator sign is found:
duser=(.*?)(?= \\w+=)
But unfortunately, KQL uses the RE2 library, which does not support lookarounds in regular expressions. Because of this, I came up with a regex that matches any character zero or more times, as few times as possible (.*?), followed by a whitespace character, one or more word characters, and the = sign (to match the start of the next key-value pair):
duser=(.*?)\\s\\w+=
By using this regex in KQL with the extract function, we can extract a key-value pair from the AdditionalExtensions field and save it in a new column.
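The behaviour of these patterns can be checked outside of KQL. Below is a small Python sketch. Python's `re` module is not RE2, but the two patterns without a lookahead only use syntax that both engines share. The sample string is made up, and the `extract` helper is a hypothetical stand-in that mimics KQL's extract function by returning an empty string when nothing matches. In Python raw strings the backslashes are single; in a KQL string literal they are doubled, as shown above. The sketch also makes one limitation visible that is worth testing against your own logs: the final pattern needs a following key, so the last key-value pair of a message is not matched. The `(?:\s\w+=|$)` variant is one possible workaround and is an assumption, not something taken from the transformation below.

```python
import re

def extract(pattern: str, group: int, text: str) -> str:
    """Mimic KQL extract(): return the capture group, or "" on no match."""
    match = re.search(pattern, text)
    return match.group(group) if match else ""

# Made-up extension string with a space inside the duser value.
extensions = "src=10.0.0.1 duser=John Doe act=Reject"

# 1. Stop at the first white space: the value is cut short.
print(extract(r"duser=([^\s]+)", 1, extensions))         # John
# 2. Lazy match up to the next " key=": the space is kept.
print(extract(r"duser=(.*?)\s\w+=", 1, extensions))      # John Doe
# 3. The same pattern finds nothing for the last pair in the string.
print(extract(r"act=(.*?)\s\w+=", 1, extensions))        # (empty string)
# 4. Possible workaround: also accept the end of the string.
print(extract(r"act=(.*?)(?:\s\w+=|$)", 1, extensions))  # Reject
```

Because the key itself is not anchored, a short key such as `in=` can also match the tail of a longer key. Prefixing the pattern with `(?:^|\s)` is one way to guard against that.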
Eventually, when you do this for all supported CEF keys and convert each value to the correct CommonSecurityLog column with the correct data type, you get the following generic transformation KQL:
source
// Normalize to CommonSecurityLog schema
| parse message with CEF: string
"|" DeviceVendor: string
"|" DeviceProduct: string
"|" DeviceVersion: string
"|" DeviceEventClassID: string
"|" Activity: string
"|" LogSeverity: string
"|" AdditionalExtensions: string
// Extract fields
| extend
DeviceAction = extract("act=(.*?)\\s\\w+=", 1, AdditionalExtensions),
ApplicationProtocol = extract("app=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceEventCategory = extract("cat=(.*?)\\s\\w+=", 1, AdditionalExtensions),
EventCount = toint(extract("cnt=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
DestinationDnsDomain = extract("destinationDnsDomain=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DestinationServiceName = extract("destinationServiceName=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DestinationTranslatedAddress = extract("destinationTranslatedAddress=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DestinationTranslatedPort = toint(extract("destinationTranslatedPort=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
CommunicationDirection = extract("deviceDirection=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceDnsDomain = extract("deviceDnsDomain=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceExternalID = extract("deviceExternalId=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceFacility = extract("deviceFacility=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceInboundInterface = extract("deviceInboundInterface=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceNtDomain = extract("deviceNtDomain=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceOutboundInterface = extract("deviceOutboundInterface=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DevicePayloadId = extract("devicePayloadId=(.*?)\\s\\w+=", 1, AdditionalExtensions),
ProcessName = extract("deviceProcessName=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceTranslatedAddress = extract("deviceTranslatedAddress=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DestinationHostName = extract("dhost=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DestinationMacAddress = extract("dmac=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DestinationNTDomain = extract("dntdom=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DestinationProcessId = toint(extract("dpid=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
DestinationUserPrivileges = extract("dpriv=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DestinationProcessName = extract("dproc=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DestinationPort = toint(extract("dpt=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
DestinationIP = extract("dst=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceTimeZone = extract("dtz=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DestinationUserId = extract("duid=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DestinationUserName = extract("duser=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceAddress = extract("dvc=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceName = extract("dvchost=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceMacAddress = extract("dvcmac=(.*?)\\s\\w+=", 1, AdditionalExtensions),
ProcessID = toint(extract("dvcpid=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
ExternalID = toint(extract("externalId=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
FileCreateTime = extract("fileCreateTime=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FileHash = extract("fileHash=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FileID = extract("fileId=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FileModificationTime = extract("fileModificationTime=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FilePath = extract("filePath=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FilePermission = extract("filePermission=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FileType = extract("fileType=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FileName = extract("fname=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FileSize = toint(extract("fsize=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
Computer = extract("Host=(.*?)\\s\\w+=", 1, AdditionalExtensions),
ReceivedBytes = tolong(extract("in=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
Message = extract("msg=(.*?)\\s\\w+=", 1, AdditionalExtensions),
OldFileCreateTime = extract("oldFileCreateTime=(.*?)\\s\\w+=", 1, AdditionalExtensions),
OldFileHash = extract("oldFileHash=(.*?)\\s\\w+=", 1, AdditionalExtensions),
OldFileId = extract("oldFileId=(.*?)\\s\\w+=", 1, AdditionalExtensions),
OldFileModificationTime = extract("oldFileModificationTime=(.*?)\\s\\w+=", 1, AdditionalExtensions),
OldFileName = extract("oldFileName=(.*?)\\s\\w+=", 1, AdditionalExtensions),
OldFilePath = extract("oldFilePath=(.*?)\\s\\w+=", 1, AdditionalExtensions),
OldFilePermission = extract("oldFilePermission=(.*?)\\s\\w+=", 1, AdditionalExtensions),
OldFileSize = toint(extract("oldFileSize=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
OldFileType = extract("oldFileType=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SentBytes = tolong(extract("out=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
EventOutcome = extract("outcome=(.*?)\\s\\w+=", 1, AdditionalExtensions),
Protocol = extract("proto=(.*?)\\s\\w+=", 1, AdditionalExtensions),
Reason = extract("reason=(.*?)\\s\\w+=", 1, AdditionalExtensions),
RequestURL = extract("Request=(.*?)\\s\\w+=", 1, AdditionalExtensions),
RequestClientApplication = extract("requestClientApplication=(.*?)\\s\\w+=", 1, AdditionalExtensions),
RequestContext = extract("requestContext=(.*?)\\s\\w+=", 1, AdditionalExtensions),
RequestCookies = extract("requestCookies=(.*?)\\s\\w+=", 1, AdditionalExtensions),
RequestMethod = extract("requestMethod=(.*?)\\s\\w+=", 1, AdditionalExtensions),
ReceiptTime = extract("rt=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SourceHostName = extract("shost=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SourceMacAddress = extract("smac=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SourceNTDomain = extract("sntdom=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SourceDnsDomain = extract("sourceDnsDomain=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SourceServiceName = extract("sourceServiceName=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SourceTranslatedAddress = extract("sourceTranslatedAddress=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SourceTranslatedPort = toint(extract("sourceTranslatedPort=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
SourceProcessId = toint(extract("spid=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
SourceUserPrivileges = extract("spriv=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SourceProcessName = extract("sproc=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SourcePort = toint(extract("spt=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
SourceIP = extract("src=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SourceUserID = extract("suid=(.*?)\\s\\w+=", 1, AdditionalExtensions),
SourceUserName = extract("suser=(.*?)\\s\\w+=", 1, AdditionalExtensions),
EventType = toint(extract("type=(.*?)\\s\\w+=", 1, AdditionalExtensions))
// Extract custom fields
| extend
DeviceCustomString1 = extract("cs1=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomString1Label = extract("cs1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomString2 = extract("cs2=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomString2Label = extract("cs2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomString3 = extract("cs3=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomString3Label = extract("cs3Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomString4 = extract("cs4=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomString4Label = extract("cs4Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomString5 = extract("cs5=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomString5Label = extract("cs5Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomString6 = extract("cs6=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomString6Label = extract("cs6Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomNumber1 = toint(extract("cn1=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
DeviceCustomNumber1Label = extract("cn1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomNumber2 = toint(extract("cn2=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
DeviceCustomNumber2Label = extract("cn2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomNumber3 = toint(extract("cn3=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
DeviceCustomNumber3Label = extract("cn3Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FlexString1 = extract("flexString1=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FlexString1Label = extract("flexString1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FlexString2 = extract("flexString2=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FlexString2Label = extract("flexString2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomIPv6Address1 = extract("c6a1=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomIPv6Address1Label = extract("c6a1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomIPv6Address2 = extract("c6a2=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomIPv6Address2Label = extract("c6a2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomIPv6Address3 = extract("c6a3=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomIPv6Address3Label = extract("c6a3Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomIPv6Address4 = extract("c6a4=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomIPv6Address4Label = extract("c6a4Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomFloatingPoint1 = toreal(extract("cfp1=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
deviceCustomFloatingPoint1Label = extract("cfp1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomFloatingPoint2 = toreal(extract("cfp2=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
deviceCustomFloatingPoint2Label = extract("cfp2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomFloatingPoint3 = toreal(extract("cfp3=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
deviceCustomFloatingPoint3Label = extract("cfp3Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomFloatingPoint4 = toreal(extract("cfp4=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
deviceCustomFloatingPoint4Label = extract("cfp4Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomDate1 = extract("deviceCustomDate1=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomDate1Label = extract("deviceCustomDate1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomDate2 = extract("deviceCustomDate2=(.*?)\\s\\w+=", 1, AdditionalExtensions),
DeviceCustomDate2Label = extract("deviceCustomDate2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FlexDate1 = extract("flexDate1=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FlexDate1Label = extract("flexDate1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FlexNumber1 = toint(extract("flexNumber1=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
FlexNumber1Label = extract("flexNumber1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
FlexNumber2 = toint(extract("flexNumber2=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
FlexNumber2Label = extract("flexNumber2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions)
| extend TimeGenerated = todatetime(ls_timestamp), Computer = tostring(host)
| project-away
message,
facility,
facility_label,
ls_version,
priority,
severity,
severity_label,
['type'],
CEF,
ls_timestamp,
host,
tags
This is the transformation KQL which you can use in your Data Collection Rules.
Ingestion delay
Some people might wonder whether such a big query results in a high ingestion delay. When testing this on multiple environments, I measured an average ingestion delay of around 9 seconds, using the following query:
CommonSecurityLog
| extend IngestionTime = ingestion_time()
| project TimeGenerated, IngestionTime
| extend TimeDifference = IngestionTime - TimeGenerated
| summarize avg(TimeDifference)
Specific field nuance
It is important to keep in mind that some security appliance vendors include specific key-value pairs that are not present in the CEF standard. For these key-value pairs, you will have to write extra extraction logic that maps them to the correct CommonSecurityLog columns.
CheckPoint example
aggregated_log_count=4 browse_time=0 hll_key=13140258
PaloAlto example
PanOSTunnelType=N/A PanOSThreatCategory=N/A PanOSContentVer=WildFire-0 PanOSAssocID=0
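One way to spot such vendor-specific keys is to list every key in a sample message and compare it with the keys you already extract. The Python sketch below does this for the Palo Alto keys above, with `src` added as an example of a standard key. `KNOWN_KEYS` is a deliberately short, illustrative subset of the keys handled in the transformation, and `unknown_keys` is a hypothetical helper name.

```python
import re

# Illustrative subset of the CEF keys the transformation already extracts.
KNOWN_KEYS = {"act", "app", "deviceDirection", "deviceExternalId", "src", "dst"}

def unknown_keys(extensions: str) -> list[str]:
    """Return the keys in the extension string that are not in KNOWN_KEYS."""
    # A key is a run of word characters at the start of the string or after
    # white space, directly followed by '='.
    keys = re.findall(r"(?:^|\s)(\w+)=", extensions)
    return [key for key in keys if key not in KNOWN_KEYS]

sample = ("src=10.169.162.135 PanOSTunnelType=N/A PanOSThreatCategory=N/A "
          "PanOSContentVer=WildFire-0 PanOSAssocID=0")
print(unknown_keys(sample))
```

This is only a rough check: a value that itself contains text shaped like " word=" is reported as a key, so treat the result as a list of candidates to review.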
Order of CEF headers
As discussed earlier, CEF messages consist of 8 fields. Unfortunately, some vendors change the position of these fields, meaning you will have to double-check whether the sequence in the above KQL is in fact correct for your data source.
CheckPoint example log
CEF:0|Check Point|URL Filtering|Check Point|Reject|custom_hacking_20160912|Unknown|act=Reject app=HTTPS deviceDirection=0
For the above log, you would need the following sequence:
// Normalize to CommonSecurityLog schema
| parse message with CEF: string
"|" DeviceVendor: string
"|" DeviceEventClassID: string
"|" DeviceProduct: string
"|" DeviceVersion: string
"|" Activity: string
"|" LogSeverity: string
"|" AdditionalExtensions: string
PaloAlto example log
CEF:0|Palo Alto Networks|PAN-OS|10.2.9-h1|wildfire|THREAT|1| deviceExternalId=0231019336 src=10.169.162.135
For the above log, you would need the following sequence:
// Normalize to CommonSecurityLog schema
| parse message with CEF: string
"|" DeviceVendor: string
"|" DeviceProduct: string
"|" DeviceVersion: string
"|" DeviceEventClassID: string
"|" Activity: string
"|" LogSeverity: string
"|" AdditionalExtensions: string
Creating the DCR via the portal
To be totally transparent, I am not sure if this is the best way of creating a DCR, but it is the way I do it when I need to create a DCR via the Azure Portal:
- Go to the Log Analytics workspace and create a new custom log that is DCR-based
- Define a table name, data collection rule name, and data collection endpoint name
- Upload the sample file and add the transformation of your data source.
- Go to the DCR you created and navigate to Export template > Deploy.
- Now click on ‘Edit template’ and change the outputStream to ‘Microsoft-CommonSecurityLog’. This changes the destination table to the CommonSecurityLog table instead of the custom table we created earlier. Optionally, you can also change the workspace to something like ‘myworkspace’.
- Save the template, and redeploy the template under the same or another name.
- Optionally, you can now delete the custom table that was created after saving the DCR.
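If you would rather script the template edit from the steps above than do it by hand, the change comes down to one property per data flow. The Python sketch below is a minimal illustration, assuming the exported template keeps the data flows under `properties.dataFlows` of the DCR resource. The function name `retarget_dcr`, the stream name `Custom-MyCef_CL`, and the trimmed template are hypothetical; a real export contains many more properties.

```python
import json

DCR_TYPE = "microsoft.insights/datacollectionrules"

def retarget_dcr(template: dict,
                 output_stream: str = "Microsoft-CommonSecurityLog") -> dict:
    """Set outputStream on every data flow of every DCR resource (in place)."""
    for resource in template.get("resources", []):
        if resource.get("type", "").lower() != DCR_TYPE:
            continue  # leave other resource types untouched
        for flow in resource.get("properties", {}).get("dataFlows", []):
            flow["outputStream"] = output_stream
    return template

# Trimmed, hypothetical stand-in for an exported template.
template = {
    "resources": [
        {
            "type": "Microsoft.Insights/dataCollectionRules",
            "properties": {
                "dataFlows": [
                    {
                        "streams": ["Custom-MyCef_CL"],
                        "destinations": ["myworkspace"],
                        "transformKql": "source",
                        "outputStream": "Custom-MyCef_CL",
                    }
                ]
            },
        }
    ]
}

patched = retarget_dcr(template)
print(json.dumps(patched["resources"][0]["properties"]["dataFlows"], indent=2))
```

Only the output stream changes. The input stream stays a custom stream, because that is what the Logstash output plugin sends data to.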
Errors