Parsing CEF messages without Azure Monitor Agent

Introduction

As a SOC engineer, I do a lot of third-party data source ingestion projects, bringing clients' data into their Microsoft Sentinel instances. Most of these data sources are network security solutions such as firewalls and proxies.

When you want to ingest this data into Microsoft Sentinel, there are a couple of scenarios and architectures you can build to accomplish this. The architecture you will find most often on the internet uses the Azure Monitor Agent (AMA). Even though the features of AMA are getting better, I still prefer to use Logstash in my ingestion projects. In my opinion, Logstash is more flexible and provides more capabilities than AMA currently does.

The downside of using Logstash is that there is no built-in parser that maps Syslog CEF messages to the CommonSecurityLog table of Microsoft Sentinel (AMA handles this for you). That is why, in this blog post, I wanted to write down how I do the CEF to CommonSecurityLog parsing, and hopefully save you some time creating your own parser.

What is CEF

CEF, or Common Event Format, is a vendor-neutral format for logging data from network and security devices and appliances, such as firewalls, routers, detection and response solutions, and intrusion detection systems, as well as from other kinds of systems such as web servers. An extension of Syslog, it was developed especially for security information and event management (SIEM) solutions. CEF messages have a standard header that contains information such as the device vendor, the device product, the device version, the device event class ID, the event name, and the event severity. CEF messages also have a variable number of extensions that provide more details about the event, such as the source and destination IP addresses, the username, the file name, or the action taken.

Basically, CEF messages are constructed as below:

CEF:0|DeviceVendor|DeviceProduct|DeviceVersion|DeviceEventClassID|Activity|LogSeverity|AdditionalExtensions

Notice that a CEF message consists of 8 properties separated by 7 | characters. This should be the case for all messages, although the order might differ between products (which we will talk about later). The last property, AdditionalExtensions, holds more information about the event. This data is expressed as key-value pairs, with an = sign between key and value and white space as the separator between pairs.

key1=value1 key2=value2 key3=value3 ...
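To make the structure concrete, here is a minimal Python sketch (not part of the actual DCR setup) that splits a CEF line into its header properties and the raw extension string. The helper name parse_cef_header is hypothetical, the sample line is the Palo Alto log shown later in this post, and the sketch assumes no escaped | characters occur inside the header fields.

```python
# Illustrative only: split a CEF line into header properties and extensions.
# Field names follow the CommonSecurityLog columns used later in this post.
HEADER_FIELDS = [
    "CEF", "DeviceVendor", "DeviceProduct", "DeviceVersion",
    "DeviceEventClassID", "Activity", "LogSeverity", "AdditionalExtensions",
]


def parse_cef_header(message: str) -> dict:
    # maxsplit=7 splits on the first 7 "|" only, so any "|" that appears
    # inside the extensions stays part of the last property.
    # Assumption: header fields do not contain escaped "\|" sequences.
    parts = message.split("|", 7)
    if len(parts) != 8 or not parts[0].startswith("CEF:"):
        raise ValueError("not a CEF message with 7 '|' separators")
    return dict(zip(HEADER_FIELDS, parts))


sample = (
    "CEF:0|Palo Alto Networks|PAN-OS|10.2.9-h1|wildfire|THREAT|1|"
    " deviceExternalId=0231019336 src=10.169.162.135"
)
record = parse_cef_header(sample)
print(record["DeviceVendor"])
print(record["AdditionalExtensions"].strip())
```

The KQL parse operator used further down does the same job inside the DCR; the sketch only shows why 7 separators yield 8 properties.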

From CEF to CommonSecurityLog

Where do you normalize the data

When using Logstash, you can normalize and filter your data in the Logstash filter plugins. This is powerful and flexible, but in my opinion a bit less maintainable. When you have multiple customers like I do, with normalization logic living in Logstash, every change to that logic means changing the configuration files on all the Logstash instances. For exactly this reason, I like to do the normalization in the transformation logic of a Data Collection Rule (DCR) in Azure.

Additionally, when doing your normalizations in a DCR rather than in Logstash, you have the following benefits:

  • You can use KQL, which is probably better known among your colleagues.
  • You can push changes using CI/CD with ARM or Bicep, rather than having to push Logstash config files using tools like Ansible.
  • In most cases, acquiring the roles needed to change an Azure resource is easier than acquiring the roles needed to change config files on a Linux machine.

Transformation in DCR

The input data

Before you can do the transformation, you need to create a DCR, which is based on an input JSON file. This file describes how the data will arrive in your DCR. With Logstash, you can generate this file with the following output plugin configuration:

input {
    pipeline { address => sentinel }
}
output {
    microsoft-sentinel-log-analytics-logstash-output-plugin {
        create_sample_file => true
        sample_file_path => "/tmp"
    }
}

Typically, you will get a JSON object that looks like this:

{
  "severity": 0,
  "message": "CEF:0|Check Point|URL Filtering|Check Point|Reject|custom_hacking_20160912|Unknown|act=Reject app=HTTPS deviceDirection=0",
  "facility": 0,
  "type": "syslog",
  "host": "10.248.2.130",
  "facility_label": "kernel",
  "priority": 0,
  "severity_label": "Emergency",
  "ls_timestamp": "2025-01-08T14:50:53.342159510Z",
  "ls_version": "1"
}
💡
When you do not change the data in the Logstash filter plugins and use syslog as the input plugin, this is in most cases the JSON you will end up with. I do recommend double-checking this in your setup.

In this object you will see that the actual event generated by the data source is present in the "message" field. The other fields are the headers of the syslog message, and tell you more about the syslog message itself. For this blog post, we will mainly use the message, host, and ls_timestamp fields.

The normalization logic

To normalize this data into the CommonSecurityLog table, I used the CEF and CommonSecurityLog field mapping and the CommonSecurityLog table reference provided on Microsoft Learn. These guides describe exactly which CEF fields correspond to which columns in the CommonSecurityLog table, and which data type each column should have.

The hardest part now is making sure we can extract the keys and values from the message field using KQL and create new columns for them. To do this, I used the extract function in KQL with regular expressions. The first regex I created took a specific key and captured everything after the = sign as the value, up to the next white space.

duser=([^\\s]+)

This worked great until I noticed that some vendors include spaces in their values as well.
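The effect can be reproduced with a small Python sketch. The extension string is hypothetical (the value "John Doe" simply stands in for any value containing a space), and Python's re module is used here only for illustration; the pattern itself uses nothing that RE2 lacks.

```python
import re

# Hypothetical extension string; "John Doe" stands in for any value
# that contains a space.
extensions = "act=Reject duser=John Doe app=HTTPS"

# First attempt: capture everything after "duser=" up to the next whitespace.
match = re.search(r"duser=([^\s]+)", extensions)
first_try = match.group(1)

print(first_try)  # the captured value stops at the first space
```

Here the capture ends after "John", so the second half of the value is silently lost.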

To solve this issue, I wanted to use a positive lookahead, matching every character until the next key followed by the = separator:

duser=(.*?)(?= \\w+=)

Unfortunately, KQL uses the RE2 library, which does not support lookarounds. Because of this, I came up with a regex that matches any character zero or more times, as few times as possible (.*?), followed by a whitespace character, one or more word characters, and the = sign (which together mark the start of the next key-value pair):

duser=(.*?)\\s\\w+=

By using this regex in KQL with the extract function, we can extract any key-value pair from the AdditionalExtensions field and save them in a new column.
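The following Python sketch exercises this pattern on a hypothetical extension string and highlights two edge cases worth checking in your own data: the pattern needs a following key, so the last pair in a message is not matched, and a short key such as in= can also match the tail of a longer key such as destinationDnsDomain=. The ANCHORED variant is my own hedged suggestion, not something taken from Microsoft documentation; it avoids lookarounds and uses only constructs that RE2 also supports. The helper name extract_value is hypothetical.

```python
import re


def extract_value(key: str, extensions: str, pattern: str) -> str:
    """Return capture group 1, or "" when the pattern does not match."""
    match = re.search(pattern.format(key=re.escape(key)), extensions)
    return match.group(1) if match else ""


# Pattern from this post: value runs until the next " key=".
LAZY = r"{key}=(.*?)\s\w+="
# Hedged variant: the key must start the string or follow whitespace, and
# the value may also end at the end of the string. No lookarounds needed.
ANCHORED = r"(?:^|\s){key}=(.*?)(?:\s\w+=|$)"

# Hypothetical extension string with a space in a value and "in" as last key.
extensions = "act=Reject duser=John Doe destinationDnsDomain=example.com in=42"

lazy_duser = extract_value("duser", extensions, LAZY)      # value with a space
lazy_in = extract_value("in", extensions, LAZY)            # hits "...Domain="
anchored_in = extract_value("in", extensions, ANCHORED)    # the real "in" key

print(repr(lazy_duser), repr(lazy_in), repr(anchored_in))
```

With this input the original pattern returns the full "John Doe" for duser, but for in it returns "example.com" because in= is found inside destinationDnsDomain=, while the anchored variant returns "42". In KQL, the anchored variant should translate to a pattern such as (?:^|\\s)in=(.*?)(?:\\s\\w+=|$), which is worth testing against your own logs before relying on it. Neither pattern can cope with a value that itself contains a space followed by word= text.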

Eventually, when you do this for all possible CEF values that are supported and convert them to their correct CommonSecurityLog column in the correct data type, you get the following generic transformation KQL:

source
// Normalize to CommonSecurityLog schema
| parse message with CEF: string 
    "|"     DeviceVendor: string 
    "|"     DeviceProduct: string 
    "|"     DeviceVersion: string
    "|"     DeviceEventClassID: string
    "|"     Activity: string 
    "|"     LogSeverity: string
    "|"     AdditionalExtensions: string
// Extract fields
| extend
    DeviceAction = extract("act=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    ApplicationProtocol = extract("app=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceEventCategory = extract("cat=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    EventCount = toint(extract("cnt=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    DestinationDnsDomain = extract("destinationDnsDomain=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DestinationServiceName = extract("destinationServiceName=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DestinationTranslatedAddress = extract("destinationTranslatedAddress=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DestinationTranslatedPort = toint(extract("destinationTranslatedPort=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    CommunicationDirection = extract("deviceDirection=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceDnsDomain = extract("deviceDnsDomain=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceExternalID = extract("deviceExternalId=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceFacility = extract("deviceFacility=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceInboundInterface = extract("deviceInboundInterface=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceNtDomain = extract("deviceNtDomain=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceOutboundInterface = extract("deviceOutboundInterface=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DevicePayloadId = extract("devicePayloadId=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    ProcessName = extract("deviceProcessName=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceTranslatedAddress = extract("deviceTranslatedAddress=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DestinationHostName = extract("dhost=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DestinationMacAddress = extract("dmac=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DestinationNTDomain = extract("dntdom=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DestinationProcessId = toint(extract("dpid=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    DestinationUserPrivileges = extract("dpriv=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DestinationProcessName = extract("dproc=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DestinationPort = toint(extract("dpt=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    DestinationIP = extract("dst=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceTimeZone = extract("dtz=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DestinationUserId = extract("duid=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DestinationUserName = extract("duser=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceAddress = extract("dvc=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceName = extract("dvchost=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceMacAddress = extract("dvcmac=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    ProcessID = toint(extract("dvcpid=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    ExternalID = toint(extract("externalId=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    FileCreateTime = extract("fileCreateTime=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FileHash = extract("fileHash=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FileID = extract("fileId=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FileModificationTime = extract("fileModificationTime=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FilePath = extract("filePath=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FilePermission = extract("filePermission=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FileType = extract("fileType=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FileName = extract("fname=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FileSize = toint(extract("fsize=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    Computer = extract("Host=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    ReceivedBytes = tolong(extract("in=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    Message = extract("msg=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    OldFileCreateTime = extract("oldFileCreateTime=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    OldFileHash = extract("oldFileHash=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    OldFileId = extract("oldFileId=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    OldFileModificationTime = extract("oldFileModificationTime=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    OldFileName = extract("oldFileName=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    OldFilePath = extract("oldFilePath=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    OldFilePermission = extract("oldFilePermission=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    OldFileSize = toint(extract("oldFileSize=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    OldFileType = extract("oldFileType=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SentBytes = tolong(extract("out=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    EventOutcome = extract("outcome=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    Protocol = extract("proto=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    Reason = extract("reason=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    RequestURL = extract("Request=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    RequestClientApplication = extract("requestClientApplication=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    RequestContext = extract("requestContext=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    RequestCookies = extract("requestCookies=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    RequestMethod = extract("requestMethod=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    ReceiptTime = extract("rt=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SourceHostName = extract("shost=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SourceMacAddress = extract("smac=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SourceNTDomain = extract("sntdom=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SourceDnsDomain = extract("sourceDnsDomain=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SourceServiceName = extract("sourceServiceName=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SourceTranslatedAddress = extract("sourceTranslatedAddress=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SourceTranslatedPort = toint(extract("sourceTranslatedPort=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    SourceProcessId = toint(extract("spid=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    SourceUserPrivileges = extract("spriv=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SourceProcessName = extract("sproc=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SourcePort = toint(extract("spt=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    SourceIP = extract("src=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SourceUserID = extract("suid=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    SourceUserName = extract("suser=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    EventType = toint(extract("type=(.*?)\\s\\w+=", 1, AdditionalExtensions))
// Extract custom fields
| extend
    DeviceCustomString1 = extract("cs1=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomString1Label = extract("cs1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomString2 = extract("cs2=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomString2Label = extract("cs2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomString3 = extract("cs3=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomString3Label = extract("cs3Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomString4 = extract("cs4=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomString4Label = extract("cs4Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomString5 = extract("cs5=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomString5Label = extract("cs5Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomString6 = extract("cs6=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomString6Label = extract("cs6Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomNumber1 = toint(extract("cn1=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    DeviceCustomNumber1Label = extract("cn1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomNumber2 = toint(extract("cn2=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    DeviceCustomNumber2Label = extract("cn2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomNumber3 = toint(extract("cn3=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    DeviceCustomNumber3Label = extract("cn3Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FlexString1 = extract("flexString1=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FlexString1Label = extract("flexString1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FlexString2 = extract("flexString2=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FlexString2Label = extract("flexString2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomIPv6Address1 = extract("c6a1=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomIPv6Address1Label = extract("c6a1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomIPv6Address2 = extract("c6a2=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomIPv6Address2Label = extract("c6a2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomIPv6Address3 = extract("c6a3=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomIPv6Address3Label = extract("c6a3Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomIPv6Address4 = extract("c6a4=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomIPv6Address4Label = extract("c6a4Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomFloatingPoint1 = toreal(extract("cfp1=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    deviceCustomFloatingPoint1Label = extract("cfp1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomFloatingPoint2 = toreal(extract("cfp2=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    deviceCustomFloatingPoint2Label = extract("cfp2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomFloatingPoint3 = toreal(extract("cfp3=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    deviceCustomFloatingPoint3Label = extract("cfp3Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomFloatingPoint4 = toreal(extract("cfp4=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    deviceCustomFloatingPoint4Label = extract("cfp4Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomDate1 = extract("deviceCustomDate1=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomDate1Label = extract("deviceCustomDate1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomDate2 = extract("deviceCustomDate2=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    DeviceCustomDate2Label = extract("deviceCustomDate2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FlexDate1 = extract("flexDate1=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FlexDate1Label = extract("flexDate1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FlexNumber1 = toint(extract("flexNumber1=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    FlexNumber1Label = extract("flexNumber1Label=(.*?)\\s\\w+=", 1, AdditionalExtensions),
    FlexNumber2 = toint(extract("flexNumber2=(.*?)\\s\\w+=", 1, AdditionalExtensions)),
    FlexNumber2Label = extract("flexNumber2Label=(.*?)\\s\\w+=", 1, AdditionalExtensions)
| extend TimeGenerated = todatetime(ls_timestamp), Computer = tostring(host)
| project-away
    message,
    facility,
    facility_label,
    ls_version,
    priority,
    severity,
    severity_label,
    ['type'],
    CEF,
    ls_timestamp,
    host,
    tags

This is the transformation KQL which you can use in your Data Collection Rules.

Ingestion delay

Some people might question whether such a big query results in a high ingestion delay. When testing this in multiple environments, I measured an average ingestion delay of around 9 seconds, using the following query:

CommonSecurityLog
| extend IngestionTime = ingestion_time()
| project TimeGenerated, IngestionTime
| extend TimeDifference = IngestionTime - TimeGenerated
| summarize avg(TimeDifference)

Specific field nuance

It is important to keep in mind that some security appliance vendors include specific key-value pairs that are not present in the CEF standard. For these key-value pairs you will have to write extra extraction logic that maps them to the appropriate CommonSecurityLog columns.

CheckPoint example

aggregated_log_count=4 browse_time=0 hll_key=13140258

PaloAlto example

PanOSTunnelType=N/A PanOSThreatCategory=N/A PanOSContentVer=WildFire-0 PanOSAssocID=0
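One way to find out which vendor-specific keys a data source sends is to split the extension string into all of its key-value pairs and compare the keys with the ones you already map. The sketch below does that in Python; split_extensions and KNOWN_KEYS are hypothetical names, KNOWN_KEYS is only a tiny illustrative subset of the mapped keys, and the sample string is assembled from the Check Point examples in this post.

```python
import re

# Tiny illustrative subset of the keys the transformation already maps.
KNOWN_KEYS = {"act", "app", "deviceDirection", "src", "dst"}


def split_extensions(extensions: str) -> dict:
    """Split 'k1=v1 k2=v2 ...' into a dict without using lookarounds."""
    # Find every "key=" that starts the string or follows whitespace.
    keys = list(re.finditer(r"(?:^|\s)(\w+)=", extensions))
    pairs = {}
    for i, m in enumerate(keys):
        # A value runs from the end of "key=" to the start of the next key,
        # or to the end of the string for the last pair.
        end = keys[i + 1].start() if i + 1 < len(keys) else len(extensions)
        pairs[m.group(1)] = extensions[m.end():end].strip()
    return pairs


sample = "act=Reject app=HTTPS aggregated_log_count=4 browse_time=0 hll_key=13140258"
pairs = split_extensions(sample)
unmapped = {k: v for k, v in pairs.items() if k not in KNOWN_KEYS}
print(unmapped)
```

Running this over a sample of raw messages gives a list of keys that still need their own extraction line, or a decision to leave them in AdditionalExtensions.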

Order of CEF headers

As discussed earlier, a CEF message consists of 8 fields. Unfortunately, some vendors change the position of these fields, meaning you will have to double-check whether the sequence in the above KQL is in fact correct for you.

CheckPoint example log

CEF:0|Check Point|URL Filtering|Check Point|Reject|custom_hacking_20160912|Unknown|act=Reject app=HTTPS deviceDirection=0 

For the above log, you would need the following sequence:

// Normalize to CommonSecurityLog schema
| parse message with CEF: string 
    "|"     DeviceVendor: string
    "|"     DeviceEventClassID: string
    "|"     DeviceProduct: string 
    "|"     DeviceVersion: string
    "|"     Activity: string 
    "|"     LogSeverity: string
    "|"     AdditionalExtensions: string

PaloAlto example log

CEF:0|Palo Alto Networks|PAN-OS|10.2.9-h1|wildfire|THREAT|1| deviceExternalId=0231019336 src=10.169.162.135 

For the above log, you would need the following sequence:

// Normalize to CommonSecurityLog schema
| parse message with CEF: string 
    "|"     DeviceVendor: string 
    "|"     DeviceProduct: string 
    "|"     DeviceVersion: string
    "|"     DeviceEventClassID: string
    "|"     Activity: string 
    "|"     LogSeverity: string
    "|"     AdditionalExtensions: string
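The idea of a vendor-dependent header order can be sketched in Python as a simple lookup table. This is only an illustration of the two sequences above, not how a DCR works internally; parse_header_for_vendor, DEFAULT_ORDER, and VENDOR_ORDER are hypothetical names, and the sketch assumes the vendor is always the first field after the CEF version.

```python
# Default header order, as used in the generic transformation in this post.
DEFAULT_ORDER = [
    "DeviceVendor", "DeviceProduct", "DeviceVersion",
    "DeviceEventClassID", "Activity", "LogSeverity",
]

# Per-vendor overrides; the Check Point order mirrors the sequence above.
VENDOR_ORDER = {
    "Check Point": [
        "DeviceVendor", "DeviceEventClassID", "DeviceProduct",
        "DeviceVersion", "Activity", "LogSeverity",
    ],
}


def parse_header_for_vendor(message: str) -> dict:
    parts = message.split("|", 7)
    if len(parts) != 8:
        raise ValueError("expected 8 '|'-separated fields")
    # Assumption: the vendor is always the first field after "CEF:0".
    order = VENDOR_ORDER.get(parts[1], DEFAULT_ORDER)
    record = dict(zip(order, parts[1:7]))
    record["AdditionalExtensions"] = parts[7].strip()
    return record


check_point = parse_header_for_vendor(
    "CEF:0|Check Point|URL Filtering|Check Point|Reject|"
    "custom_hacking_20160912|Unknown|act=Reject app=HTTPS deviceDirection=0"
)
palo_alto = parse_header_for_vendor(
    "CEF:0|Palo Alto Networks|PAN-OS|10.2.9-h1|wildfire|THREAT|1|"
    " deviceExternalId=0231019336 src=10.169.162.135"
)
print(check_point["DeviceEventClassID"], "|", palo_alto["DeviceEventClassID"])
```

In practice you would probably keep a separate transformation per data source rather than one lookup, but the table makes it easy to see which vendors deviate from the default order.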

Creating the DCR via the portal

To be totally transparent, I am not sure if this is the best way of creating a DCR, but it is the way I do it when I need to create a DCR via the Azure Portal:

  1. Go to the Log Analytics workspace and create a new custom log that is DCR-based.
  2. Define a table name, data collection rule name, and data collection endpoint name.
💡
This will eventually create a new custom table in your Log Analytics workspace. Normally you do not need this table, which is why we will delete it afterwards.
  3. Upload the sample file and add the transformation of your data source.
  4. Go to the DCR you created and navigate to export template > deploy.
  5. Now click on ‘Edit template’ and change the outputStream to ‘Microsoft-CommonSecurityLog’. This will change the destination table to the CommonSecurityLog table instead of the custom table we created earlier. Optionally, you can also change the workspace to something like ‘myworkspace’.
  6. Save the template, and redeploy the template under the same or another name.
  7. Optionally, you can now delete the custom table that was created after saving the DCR.
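If you prefer to script the template edit from step 5, the change boils down to rewriting the outputStream of each data flow. The Python sketch below shows this on a minimal stand-in for an exported template; the stream name "Custom-MyCefTable_CL", the destination name "myworkspace", the shortened transformKql, and the helper retarget_output_stream are all hypothetical, and a real exported template contains many more properties.

```python
import json

# Minimal stand-in for an exported DCR template. Names such as
# "Custom-MyCefTable_CL" and "myworkspace" are hypothetical.
template = {
    "resources": [{
        "type": "Microsoft.Insights/dataCollectionRules",
        "properties": {
            "dataFlows": [{
                "streams": ["Custom-MyCefTable_CL"],
                "destinations": ["myworkspace"],
                "transformKql": "source | extend TimeGenerated = todatetime(ls_timestamp)",
                "outputStream": "Custom-MyCefTable_CL",
            }],
        },
    }],
}


def retarget_output_stream(tpl: dict, stream: str = "Microsoft-CommonSecurityLog") -> dict:
    """Point every data flow of every DCR in the template at `stream`."""
    for resource in tpl.get("resources", []):
        if resource.get("type") != "Microsoft.Insights/dataCollectionRules":
            continue
        for flow in resource["properties"].get("dataFlows", []):
            # Only the destination table changes; the input stream stays as is.
            flow["outputStream"] = stream
    return tpl


updated = retarget_output_stream(template)
print(json.dumps(updated["resources"][0]["properties"]["dataFlows"][0], indent=2))
```

The input stream keeps its custom name because it still describes the shape of the incoming Logstash JSON; only the table the transformed rows land in is changed.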

Errors

💡
When your output schema does not match the CommonSecurityLog table, you can get errors like the one below.