Logstash input plugin for Azure Storage Blobs
Summary
This plugin reads and parses data from Azure Storage Blobs.
Installation
You can install this plugin with the Logstash plugin command (plugin in older versions of Logstash, logstash-plugin in newer ones):
logstash-plugin install logstash-input-azureblob
For more information, see the Logstash reference on working with plugins.
Configuration
Required Parameters
storage_account_name
The storage account name.
storage_access_key
The access key to the storage account.
container
The blob container name.
Optional Parameters
path_filters
The path(s) to the file(s) to use as an input. By default the plugin watches every file in the storage container. You can use filename patterns here, such as logs/*.log. A pattern like logs/**/*.log searches logs recursively for all *.log files.
Do not include a leading /; Azure paths look like path/to/blob/file.txt.
You may also configure multiple paths, as in the sketch below. See also the example on the Logstash configuration page.
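For example, a minimal sketch (the account details and patterns below are placeholders):
input {
  azureblob {
    storage_account_name => "mystorageaccount"
    storage_access_key => "VGhpcyBpcyBhIGZha2Uga2V5Lg=="
    container => "mycontainer"
    # hypothetical patterns: all .log files under logs/, recursively,
    # plus top-level .txt files under notes/
    path_filters => ["logs/**/*.log", "notes/*.txt"]
  }
}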
endpoint
Specifies the endpoint of Azure Service Management. The default value is core.windows.net.
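For example, to read from a storage account in Azure China, a sketch (core.chinacloudapi.cn is the storage endpoint suffix for Azure China; other sovereign clouds use different suffixes):
input {
  azureblob {
    storage_account_name => "mystorageaccount"
    storage_access_key => "VGhpcyBpcyBhIGZha2Uga2V5Lg=="
    container => "mycontainer"
    # endpoint suffix for Azure China; the default is core.windows.net
    endpoint => "core.chinacloudapi.cn"
  }
}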
registry_path
Specifies the file path for the registry file, which records offsets and coordinates multiple clients. The default value is data/registry.
Override this value if a file already exists at the path data/registry in the Azure blob container.
interval
Set how many seconds to idle before checking for new logs. The default is 30, meaning the plugin idles for 30 seconds between checks.
registry_create_policy
Specifies how to initially set offsets for existing blob files.
This option applies only when the registry is created; whenever the registry file already exists, offsets are picked up from it instead.
Valid values include:
resume (the default): when the registry is initially created, assume all existing blob content has been consumed, and start picking up only new content appended to the blobs.
start_over: assume none of the blob content has been consumed, and read all blob files from the beginning.
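For example, to re-ingest all existing blob content on first run, a minimal sketch (the account details are placeholders):
input {
  azureblob {
    storage_account_name => "mystorageaccount"
    storage_access_key => "VGhpcyBpcyBhIGZha2Uga2V5Lg=="
    container => "mycontainer"
    # read every existing blob from the beginning when the registry is first created
    registry_create_policy => "start_over"
  }
}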
file_head_bytes
Specifies the number of bytes at the head of the file that do not repeat over records. Usually these are JSON opening tags. The default value is 0.
file_tail_bytes
Specifies the number of bytes at the tail of the file that do not repeat over records. Usually these are JSON closing tags. The default value is 0.
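These two settings are typically used together with the json codec when each blob is a single JSON document wrapping an array of records: the opening and closing of the wrapper do not repeat per record, so they are skipped as head and tail. The third example below uses file_head_bytes => 21 and file_tail_bytes => 9 for network security group flow logs; the exact byte counts depend on the layout of the actual files.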
Advanced tweaking parameters
Keep these parameters at their defaults under normal circumstances. Tweak them when dealing with large-scale Azure blobs and logs.
blob_list_page_size
Specifies the page size for returned blob items. Too large a value can exhaust the heap; too small a value leads to too many requests. The default of 100 works well with a heap size of 1 GB.
file_chunk_size_bytes
Specifies the buffer size used to download the blob content. This is also the maximum buffer size passed to a codec, except for JSON: the JSON codec only receives valid JSON, which may span multiple chunks, and any malformed JSON content is skipped.
The default value is 4194304 (4 MB).
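As a sketch, a tuned configuration for a memory-constrained setup might look like this (the values are illustrative, not recommendations):
input {
  azureblob {
    storage_account_name => "mystorageaccount"
    storage_access_key => "VGhpcyBpcyBhIGZha2Uga2V5Lg=="
    container => "mycontainer"
    # list fewer blobs per request to reduce heap pressure
    blob_list_page_size => 50
    # download blob content in 1 MB chunks
    file_chunk_size_bytes => 1048576
  }
}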
Examples
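A minimal configuration that reads every blob in a container: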
input {
  azureblob {
    storage_account_name => "mystorageaccount"
    storage_access_key => "VGhpcyBpcyBhIGZha2Uga2V5Lg=="
    container => "mycontainer"
  }
}
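The following configuration reads IIS logs written by Windows Azure Diagnostics, drops the W3C comment lines beginning with #, parses each log line with grok, computes kilobyte counts, reverse-resolves the client IP, and parses the user agent string: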
input {
  azureblob {
    storage_account_name => 'mystorageaccount'
    storage_access_key => 'VGhpcyBpcyBhIGZha2Uga2V5Lg=='
    container => 'wad-iis-logfiles'
    codec => line
  }
}
filter {
  if [message] =~ "^#" {
    drop {}
  }

  grok {
    match => ["message", "%{TIMESTAMP_ISO8601:log_timestamp} %{WORD:sitename} %{WORD:computername} %{IP:server_ip} %{WORD:method} %{URIPATH:uriStem} %{NOTSPACE:uriQuery} %{NUMBER:port} %{NOTSPACE:username} %{IPORHOST:clientIP} %{NOTSPACE:protocolVersion} %{NOTSPACE:userAgent} %{NOTSPACE:cookie} %{NOTSPACE:referer} %{NOTSPACE:requestHost} %{NUMBER:response} %{NUMBER:subresponse} %{NUMBER:win32response} %{NUMBER:bytesSent} %{NUMBER:bytesReceived} %{NUMBER:timetaken}"]
  }

  date {
    match => [ "log_timestamp", "YYYY-MM-dd HH:mm:ss" ]
    timezone => "Etc/UTC"
  }

  if [bytesSent] {
    ruby {
      code => "event.set('kilobytesSent', event.get('bytesSent').to_i / 1024.0)"
    }
  }

  if [bytesReceived] {
    ruby {
      code => "event.set('kilobytesReceived', event.get('bytesReceived').to_i / 1024.0)"
    }
  }

  mutate {
    convert => ["bytesSent", "integer"]
    convert => ["bytesReceived", "integer"]
    convert => ["timetaken", "integer"]
    add_field => { "clientHostname" => "%{clientIP}" }
    remove_field => [ "log_timestamp" ]
  }

  dns {
    action => "replace"
    reverse => ["clientHostname"]
  }

  useragent {
    # the grok pattern above captures the agent string as "userAgent"
    source => "userAgent"
    prefix => "browser"
  }
}
output {
  file {
    path => '/var/tmp/logstash-file-output'
    codec => rubydebug
  }

  stdout {
    codec => rubydebug
  }
}
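The following configuration reads network security group (NSG) flow logs, uses file_head_bytes and file_tail_bytes to skip the JSON wrapper around the records array, and splits each flow tuple into its own event with named fields: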
input {
  azureblob {
    storage_account_name => "mystorageaccount"
    storage_access_key => "VGhpcyBpcyBhIGZha2Uga2V5Lg=="
    container => "insights-logs-networksecuritygroupflowevent"
    codec => "json"
    file_head_bytes => 21
    file_tail_bytes => 9
  }
}
filter {
  split { field => "[records]" }
  split { field => "[records][properties][flows]" }
  split { field => "[records][properties][flows][flows]" }
  split { field => "[records][properties][flows][flows][flowTuples]" }

  mutate {
    split => { "[records][resourceId]" => "/" }
    add_field => {
      "Subscription" => "%{[records][resourceId][2]}"
      "ResourceGroup" => "%{[records][resourceId][4]}"
      "NetworkSecurityGroup" => "%{[records][resourceId][8]}"
    }
    convert => { "Subscription" => "string" }
    convert => { "ResourceGroup" => "string" }
    convert => { "NetworkSecurityGroup" => "string" }
    split => { "[records][properties][flows][flows][flowTuples]" => "," }
    add_field => {
      "unixtimestamp" => "%{[records][properties][flows][flows][flowTuples][0]}"
      "srcIp" => "%{[records][properties][flows][flows][flowTuples][1]}"
      "destIp" => "%{[records][properties][flows][flows][flowTuples][2]}"
      "srcPort" => "%{[records][properties][flows][flows][flowTuples][3]}"
      "destPort" => "%{[records][properties][flows][flows][flowTuples][4]}"
      "protocol" => "%{[records][properties][flows][flows][flowTuples][5]}"
      "trafficflow" => "%{[records][properties][flows][flows][flowTuples][6]}"
      "traffic" => "%{[records][properties][flows][flows][flowTuples][7]}"
    }
    convert => { "unixtimestamp" => "integer" }
    convert => { "srcPort" => "integer" }
    convert => { "destPort" => "integer" }
  }

  date {
    match => ["unixtimestamp", "UNIX"]
  }
}
output {
stdout { codec => rubydebug }
}
More information
The source code of this plugin is hosted in the Microsoft Azure Diagnostics with ELK GitHub repo. Feedback and contributions to the project are welcome.