Notice
This project was slated to be shut down, but after feedback from project users, version 1.0.2 will remain on PyPI and the project
will be left public for a while to allow for additional feedback and development volunteers. Want to volunteer to make this project
better? Hit me up at support@fromthefuture.net.
Note on Python versions
This library no longer tests backwards compatibility with Python 2. All future updates and development
will target Python 3 only.
Installation and Usage
To install the library:
pip install splunk-hec-ftf
To use:
from splunk_hec import splunk_hec
Changelog
1.0.3
- The Retry() logic of the library used a deprecated argument that was recently removed from the urllib3 package. To provide backwards compatibility while supporting the newer releases, the Retry object is now created inside a try/except that handles both versions of the argument (a sketch of this shim follows below).
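As a hedged illustration of that shim (the retry counts and status codes here are taken from the delivery logic described later in this document; the library's actual code may differ): urllib3 renamed Retry's method_whitelist argument to allowed_methods, so the new name is tried first with a fallback on TypeError.

    from urllib3.util.retry import Retry

    # Try the newer argument name first; older urllib3 raises TypeError on it.
    try:
        retries = Retry(total=3, backoff_factor=0.3, status_forcelist=(500, 503),
                        allowed_methods=frozenset(['POST']))
    except TypeError:
        # Fall back to the deprecated name for older urllib3 releases.
        retries = Retry(total=3, backoff_factor=0.3, status_forcelist=(500, 503),
                        method_whitelist=frozenset(['POST']))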
1.0.2
- Added target_host to logging when errors occur
- Added processing for HTTP 400 responses caused by invalid events, removing those events before attempting a resend (non-ACK flows only; see the sketch below).
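A minimal sketch of what that 400 handling might look like, assuming Splunk's documented error body (e.g. {"text": "Invalid data format", "code": 6, "invalid-event-number": 2}); the function and variable names here are illustrative, not the library's real internals:

    def drop_invalid_event(batch, error_body):
        """Remove the event Splunk flagged as invalid so the rest can be resent.

        batch is a list of events; error_body is the parsed JSON from the
        HTTP 400 response.
        """
        bad_index = error_body.get('invalid-event-number')
        if bad_index is not None and 0 <= bad_index < len(batch):
            del batch[bad_index]
        return batch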
Library Overview
This HEC library was created to allow the following:
- Provide a consistent and stable method to send logs to a remote HTTP Event Collector
There are other libraries out there, but I noted a few pain points among them, so when I wrote this library
I attempted to make sure the following would be true:
- Provide support for Indexer Acknowledgement (ACK)
- Provide more control to the calling/importing application
- Perform minimal data formatting
- Support real-world applications
And, to that end, I believe I have succeeded here. This library has successfully supported the following
applications:
- Serverless functions (GCP, Azure, and AWS)
- Syslog-ng program()
- Manual data import programs
All told, this library is a critical part of my infrastructure, handling multiple TBs of daily throughput without issue.
Library Requirements
The following packages are used in some capacity; requests is the only external (non-standard-library) package currently leveraged.
- requests
- json
- uuid
- time
- queue
- threading
Library init Arguments
| Argument | Description | Required | Default |
| --- | --- | --- | --- |
| token | The HEC token for communication with your HEC layer | True | None |
| hec_server | FQDN for the HEC server or load balancer | True | None |
| hec_port | Port to use with HEC | False | 8088 |
| input_type | Defines which endpoint to hit: 'raw' or 'json' | False | 'raw' |
| use_hec_tls | Turns on use of TLS to send data | False | True |
| use_ack | Turns on indexer acknowledgement | False | False |
| hec_tls_verify | Turns on TLS verification | False | False |
| ack_interval | The number of seconds to wait between ACK attempts | False | 5 |
| max_ack_attempts | The number of attempts to make to SEND data and also to ACK data | False | 5 |
| max_content_length | The maximum estimated content length in bytes. Newer versions of Splunk can accept up to 800MB | False | 50000000 |
| max_events_per_batch | The maximum number of events to send in a single request | False | 100000 |
| backup_queue | The backup queue to store failures on | False | None |
| context_id | Used for logging, particularly in AWS Lambda invocations, to differentiate where logs came from | False | 'default' |
| rotate_session_after | The maximum number of POST requests to make to the HEC before starting a new session. This ensures we don't stick to a single indexer behind an LB | False | 10 |
| debug_enabled | Allows the calling application to turn on DEBUG logging | False | False |
| logger | Allows the calling application to pass in a logger object. Log messages are simply printed if not provided | False | None |
| max_threads | The maximum number of threads to use; 0 for no threading | False | 0 |
| disable_tls_validation_warnings | Turns off TLS validation warnings | False | True |
max_ack_attempts :: This setting controls the number of attempts to make to send the data, in addition to how many times the library
should try to ACK the data. In other words, if ACK is enabled and this is set to 5, the library will make up to 5 attempts to send the data, and
for each send, up to 5 attempts will be made to confirm delivery.
backup_queue :: This is not required but highly recommended. If you do not provide it, failures will not be tracked or given back to
the calling application in any way.
Threading
Most of the libraries out there support threading for delivery to HEC, but they suffer from a simple problem: only
the data delivery is actually threaded. That is, data comes into the script/library and is parsed and added to
a batch, and only when that batch is full is it put on a queue for subthreads to process.
This threading is helpful if you are:
- Using small batch sizes requiring numerous connections
- Using ACK
In either of these situations, threading will help. In my serverless applications, this threading would not be of use,
as almost all data is sent in a single request to the backend (e.g., I get 10,000 events in a serverless invocation, and all 10,000
are processed and sent in a single batch to the backend).
The point is, threading is something to consider carefully. Here are the guidelines I would offer relative to this library:
- If you need to speed up data processing, write threading into your data processor and call this library in non-threading mode.
- If you need to speed up data delivery/ACK, use the threading mode available in this library (a minimal sketch of both modes follows below).
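For example, a minimal sketch of the two modes using the documented init arguments (the token and server values are placeholders):

    from splunk_hec import splunk_hec

    # Non-threaded mode: parallelize your own data processor instead.
    splhec_serial = splunk_hec(token='my-hec-token', hec_server='hec.example.com',
                               max_threads=0)

    # Threaded mode: let the library spread delivery/ACK across worker threads.
    splhec_threaded = splunk_hec(token='my-hec-token', hec_server='hec.example.com',
                                 use_ack=True, max_threads=4)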
Serverless Considerations
I found that under certain conditions, leveraging threading in serverless compute was worthwhile. Specifically, when cross-region transfer of data is involved. When the lambda sits in one region (say, ap-northeast-1) and the receiver endpoint sits in another region (say, us-east-1) - you can gain execution speed by threading because of the additional data upload times involved.
Anyway, use your best judgment.
Indexer ACK
Remember earlier when I said ACK was supported? It is, but in a way that adds quite a lot of overhead. This is the
data flow:
Data -> Receiving App -> HEC Library -> POST to HEC -> ACK Test -> ACK Test -> ACK Test -> Confirmed
If you consider how the Splunk Universal Forwarder works, data gets sent and then shoved into a memory queue while additional data is sent AND
ACK checks are happening. THAT IS NOT HOW IT WORKS HERE. Everything happens sequentially here: I send the data and then immediately
attempt to ACK that single block of data. No new data will be sent during this time.
- In a multi-threaded approach, data will continue to queue for sending to the maximum limits of your available memory.
- In the non-threaded approach, processing will block until ACK is confirmed or fails. The number of attempts is configurable.
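For reference, the underlying Splunk ACK exchange looks roughly like this (this follows Splunk's documented HEC protocol, not necessarily this library's exact internals; host and token are placeholders):

    import uuid
    import requests

    channel = str(uuid.uuid4())  # ACK requires a client-supplied channel GUID
    headers = {'Authorization': 'Splunk <hec_token>',
               'X-Splunk-Request-Channel': channel}

    # 1. Send a batch; with ACK enabled, the response carries an ackId.
    resp = requests.post('https://hec.example.com:8088/services/collector/event',
                         headers=headers, json={'event': 'hello'}, verify=False)
    ack_id = resp.json()['ackId']

    # 2. Poll the ack endpoint on the same channel until the indexer confirms.
    #    (verify=False mirrors the hec_tls_verify=False default; enable
    #    verification in production.)
    resp = requests.post('https://hec.example.com:8088/services/collector/ack',
                         headers=headers, json={'acks': [ack_id]}, verify=False)
    confirmed = resp.json()['acks'][str(ack_id)]  # True once indexed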
Library Delivery Logic
To add some clarity around the process, it works approximately like this:
1. Data is added to a batch list.
2. When the list meets max_events_per_batch or max_content_length, the batch is sent to the HEC.
3. The send process has two retry mechanisms:
   - One is the urllib3 Retry mechanism, which retries the POST up to 3 times for connection failures, read failures, redirections, and 500/503 errors.
   - The second mechanism counts all of that as 1 attempt and will try again, up to max_ack_attempts times, if anything other than an HTTP 200 is received.
4. The HTTP response is checked for code 200:
   - If it's a 200, the data is considered sent.
   - Anything else triggers a retry.
5. If ACK is enabled:
   - ACK is attempted for the data block up to max_ack_attempts times using the same session object that sent it (ensuring you hit the same backend through a load balancer).
   - The urllib3 Retry mechanism described above is used here as well.
   - Additional sleep time is added based on the defined ack_interval.
6. If ACK is successful, we are done.
7. If ACK was unsuccessful, we go back to step 3 and try it all again (a sketch of the two retry layers follows below).
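A hedged sketch of the two retry layers, using a urllib3 Retry mounted on a requests session (the library's actual internals may differ; the URL, token, and batch are placeholders):

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    hec_url = 'https://hec.example.com:8088/services/collector/raw'  # placeholder
    headers = {'Authorization': 'Splunk <hec_token>'}                # placeholder
    batch = 'event one\nevent two'
    max_ack_attempts = 5

    # Layer 1: urllib3-level retries for connection/read failures, redirects,
    # and 500/503 responses. allowed_methods is the newer urllib3 argument name.
    retry = Retry(total=3, backoff_factor=0.3, status_forcelist=(500, 503),
                  allowed_methods=frozenset(['POST']))
    session = requests.Session()
    session.mount('https://', HTTPAdapter(max_retries=retry))

    # Layer 2: application-level retries; each POST (with its internal urllib3
    # retries) counts as one attempt.
    for attempt in range(max_ack_attempts):
        resp = session.post(hec_url, headers=headers, data=batch)
        if resp.status_code == 200:
            break  # data is considered sent; anything else triggers another attempt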
In practice, what does this mean? Well, the use of all of these retry mechanisms adds to the overall duration... a lot. Assume ACK is enabled with an ack_interval of 5 and max_ack_attempts of 5.
- Before any ACK is attempted, we sleep for 2.5 seconds.
- After this, we will attempt an ACK and sleep for 5 seconds between requests, adding up to 25 seconds to the duration.
- The urllib3 Retry logic uses a backoff factor of 0.3 and a maximum of 3 connection attempts, which can add up to 2.1 seconds per request.
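Adding that up, a rough worst case for a single send-plus-ACK cycle, ignoring actual network time:

    pre_ack_sleep = 2.5                      # initial wait before the first ACK check
    ack_sleeps = 5 * 5                       # 5 ACK attempts x 5 second ack_interval
    retry_backoff = 6 * (0.3 + 0.6 + 1.2)    # 1 send + 5 ACK requests, worst-case urllib3 backoff
    total = pre_ack_sleep + ack_sleeps + retry_backoff  # ~40 seconds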
Odds are that if the data was sent successfully and the backend indexers are not overburdened, ACK will succeed within 1-2 attempts. But for serverless environments, where duration
translates directly to cost, it's worth weighing the above information before enabling ACK.
Using the Backup Queue
The library accepts a "backup queue" to be passed in. It looks like this in code:
from splunk_hec import splunk_hec
import queue

splhec = splunk_hec(token=hec_token, hec_server=hec_server, hec_port=hec_port, input_type='json', use_hec_tls=True,
                    use_ack=hec_ack, hec_tls_verify=False, max_ack_attempts=hec_ack_attempts, max_content_length=hec_max_content_length,
                    max_events_per_batch=hec_batch_size)

# Note, this can be set directly or passed in during initialization... Depends on your use case.
splhec.backup_queue = queue.Queue(0)

# Simplified event processing
for event in events:
    splhec.send_event(event)

# Check the backup queue
if splhec.backup_queue.empty():
    print('Queue is empty, everything worked!')
else:
    raise RuntimeError('Failures detected. Implement backup routine here.')
Basically, this gives the calling application control over how to handle detected delivery failures. Here are some ways to use this:
- In serverless environments, I often raise an exception to force the data message to be processed again.
- In syslog-ng environments, the failed data is written to disk for monitoring by a Splunk UF (a sketch of this approach follows below).
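As a hedged example of the syslog-ng approach (the spool path and file format here are illustrative assumptions, not part of the library):

    import json
    import queue
    import time

    def drain_to_disk(backup_queue, spool_dir='/var/log/hec_failures'):
        """Write failed events to a file a Splunk UF could monitor."""
        failed = []
        while True:
            try:
                failed.append(backup_queue.get_nowait())
            except queue.Empty:
                break
        if failed:
            fname = '%s/failed_%d.json' % (spool_dir, int(time.time()))
            with open(fname, 'w') as f:
                for item in failed:
                    f.write(json.dumps(item) + '\n')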
Sample Usage
from splunk_hec import splunk_hec
import queue
import json
import time
from datetime import datetime

# Create an object reference to the library, initialized with our settings.
splhec = splunk_hec(token=hec_token, hec_server=hec_server, hec_port=hec_port, input_type=input_type, use_hec_tls=use_hec_tls, use_ack=use_ack, hec_tls_verify=hec_tls_verify, ack_interval=ack_interval,
                    max_ack_attempts=max_ack_attempts, max_content_length=max_content_length, max_events_per_batch=max_events_per_batch, context_id=context_id, rotate_session_after=rotate_session_after, debug_enabled=debug_enabled,
                    max_threads=max_threads, disable_tls_validation_warnings=disable_tls_validation_warnings)

# Set up a backup queue
splhec.backup_queue = queue.Queue(0)

# Set basic parameters that we'll need later
index = 'main'
sourcetype = 'hec:%s' % str(context_id)
source = 'hec:test:events'

if 'raw' in input_type:
    # When using raw input, we do this to essentially add the parameters to the POST URL so Splunk knows where to send the data
    splhec.set_request_params({'index': index, 'sourcetype': sourcetype, 'source': source})

# This comes from a test function so we're looping through to generate data.
for i in range(num_test_events):
    # If you are using the 'json' input type, you need to format this for the /event endpoint. Follow
    # the Splunk docs on this. This preformats the data and allows for index-time field extraction.
    if 'json' in input_type:
        payload = {}
        payload['event'] = '{"message": "JSON test Message %s.", "foo": "bar"}' % str(i)
        payload['time'] = int(time.time())
        payload['source'] = source
        payload['index'] = index
        payload['sourcetype'] = sourcetype
        payload = json.dumps(payload)
    # If it's a raw payload, not much is necessary here. Make sure you have index-time props in place to parse the data.
    else:
        payload = '%s RAW Test Message %s.' % (str(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S -0000')), str(i))
    # This should add the event to our batch; when the max batch sizes are met, the library will flush out to the HEC automatically
    splhec.send_event(payload)

# Finally, we call this to signal that we are done. A final flush is performed. If you are using threads, they will be shut down.
# YOU MUST END WITH THIS.
splhec.stop_threads_and_processing()

#### Note: You may also call splhec.force_flush_events() directly to flush to HEC at any time in your calling application.
Samples
There is a directory of samples based on older iterations of some production code I am running. All of these samples are provided as-is with
no guarantee that they will be updated/maintained. And while I have made every effort to ensure backwards compatibility through all the
library updates over time, I make no guarantee that a future update won't break this sample code. If you come to rely on this in a production
manner, reach out to me and let's share notes.