
This project was slated to be shut down, but after a response from project users, version 1.0.2 will remain on PyPI and the project will be left public for a while to allow for additional feedback and development volunteers. Want to volunteer to make this project better? Reach out at support@fromthefuture.net.
This library has stopped testing backwards compatibility with Python 2. All future updates and development will be on Python 3 only.
To install the library:

```
pip install splunk-hec-ftf
```
To use:

```python
from splunk_hec import splunk_hec
```
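For a minimal end-to-end sketch (the token and server below are placeholders; substitute your own):

```python
from splunk_hec import splunk_hec

# Placeholder values - substitute your own HEC token and server.
splhec = splunk_hec(token='00000000-0000-0000-0000-000000000000',
                    hec_server='splunk-hec.example.com')

# Events are batched internally and flushed automatically when batch limits are hit.
splhec.send_event('Hello from splunk-hec-ftf.')

# Always finish with this: it performs a final flush (and shuts down threads if enabled).
splhec.stop_threads_and_processing()
```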
There are other HEC libraries out there, but I noted a few pain points among them, so when I wrote this library I attempted to address those. To that end, I believe I have succeeded: this library has successfully supported several of my production applications.
All told, this library is a critical part of my infrastructure, handling multiple TB of daily throughput without issue.
A handful of packages are used in some capacity; `requests` is the only external package currently leveraged.
Argument | Description | Required | Default |
---|---|---|---|
token | The HEC token for communication with your HEC layer. | True | None |
hec_server | FQDN of the HEC server or load balancer. | True | None |
hec_port | Port to use with HEC. | False | 8088 |
input_type | Defines which endpoint to hit: 'raw' or 'json'. | False | 'raw' |
use_hec_tls | Turns on use of TLS to send data. | False | True |
use_ack | Turns on indexer acknowledgement. | False | False |
hec_tls_verify | Turns on TLS certificate verification. | False | False |
ack_interval | The number of seconds to wait between ACK attempts. | False | 5 |
max_ack_attempts | The number of attempts to make to send data, and also to ACK data. | False | 5 |
max_content_length | The maximum estimated content length in bytes. Newer versions of Splunk can accept up to 800 MB. | False | 50000000 |
max_events_per_batch | The maximum number of events to send in a single request. | False | 100000 |
backup_queue | The backup queue to store failures on. | False | None |
context_id | Used for logging, particularly in AWS Lambda invocations, to differentiate where logs came from. | False | 'default' |
rotate_session_after | The maximum number of POST requests to make to the HEC before starting a new session. This ensures we don't stick to a single indexer behind a load balancer. | False | 10 |
debug_enabled | Allows the calling application to turn on DEBUG logging. | False | False |
logger | Allows the calling application to pass in a logger object. Log messages are simply printed if not provided. | False | None |
max_threads | The maximum number of threads to use; 0 for no threading. | False | 0 |
disable_tls_validation_warnings | Turns off TLS validation warnings. | False | True |
max_ack_attempts :: This setting controls the number of attempts to make to send the data, in addition to how many times the library will try to ACK the data. In other words, if ACK is enabled and this is set to 5, the library will make 5 attempts to send the data, and for each send, 5 attempts will be made to confirm delivery.
backup_queue :: This is not required but highly recommended. If you do not provide one, failures will not be tracked or given back to the calling application in any way.
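As an aside, the `logger` and `debug_enabled` arguments from the table combine naturally. A minimal sketch, assuming placeholder `hec_token` and `hec_server` values:

```python
import logging

from splunk_hec import splunk_hec

# Without a logger, the library simply prints its log messages.
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('my_app.hec')

splhec = splunk_hec(token=hec_token, hec_server=hec_server,
                    logger=log, debug_enabled=True)
```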
Most of the libraries out there support threading for delivery to HEC, but they suffer from a simple problem: only the data delivery is actually threaded. That is, data comes into the script/library and is parsed and added to a batch, and only when that batch is full is it put on a queue for subthreads to process.
This threading is helpful in some situations. In my serverless applications, however, it would not be of use, as almost all data is sent in a single request to the backend (e.g., if I get 10,000 events in a serverless invocation, all 10,000 are processed and sent in a single batch to the backend).
The point is that threading is something to consider carefully. Relative to this library, the main guideline I can offer is this:
I found that, under certain conditions, leveraging threading in serverless compute was worthwhile; specifically, when cross-region transfer of data is involved. When the Lambda sits in one region (say, ap-northeast-1) and the receiver endpoint sits in another (say, us-east-1), you can gain execution speed by threading because of the additional data upload times involved.
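If you do enable it, threading is controlled by the `max_threads` argument. A sketch with placeholder `hec_token`, `hec_server`, and `events` values:

```python
from splunk_hec import splunk_hec
import queue

# max_threads > 0 enables threaded delivery; 0 (the default) keeps everything sequential.
splhec = splunk_hec(token=hec_token, hec_server=hec_server, max_threads=4)
splhec.backup_queue = queue.Queue(0)

for event in events:
    splhec.send_event(event)

# Required when threading: performs a final flush and shuts down the worker threads.
splhec.stop_threads_and_processing()
```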
Anyway, use your best judgment.
Remember earlier when I said ACK was supported? It is, but in a way that adds quite a lot of overhead. This is the data flow:
Data -> Receiving App -> HEC Library -> POST to HEC -> ACK Test -> ACK Test -> ACK Test -> Confirmed
If you consider how the Splunk Universal Forwarder works, data gets sent and then shoved into a memory queue while additional data is sent AND ACK checks are happening. THIS IS NOT HOW IT WORKS HERE. Everything happens in sequential order here - I send the data and then immediately attempt to ACK that single block of data. No new data will be sent during this time.
To add some clarity around the process: the library sends a batch and then polls for acknowledgement of that batch before sending anything else.
In practice, what does this mean? The use of all of these retry mechanisms adds to the overall duration... a lot. Assume ACK is enabled with an ack_interval of 5 and max_ack_attempts of 5.
Odds are that if data was sent successfully and the backend indexers are not overburdened, ACK will succeed within 1-2 attempts. But for serverless environments, where duration translates directly to cost, it's worth weighing the above before enabling ACK.
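To make that concrete, here is the worst-case arithmetic. This assumes my reading of the parameter descriptions (the library waits `ack_interval` seconds between ACK polls), not a guarantee of its internals:

```python
ack_interval = 5      # seconds between ACK attempts
max_ack_attempts = 5  # send attempts, and ACK attempts per send

# Worst case for one batch: every send attempt exhausts all of its ACK polls.
worst_case_wait = max_ack_attempts * max_ack_attempts * ack_interval
print(worst_case_wait)  # 125 seconds of waiting, before counting transfer time itself
```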
The library accepts a "backup queue" to be passed in. It looks like this in code:
```python
from splunk_hec import splunk_hec
import queue

splhec = splunk_hec(token=hec_token, hec_server=hec_server, hec_port=hec_port, input_type='json',
                    use_hec_tls=True, use_ack=hec_ack, hec_tls_verify=False,
                    max_ack_attempts=hec_ack_attempts, max_content_length=hec_max_content_length,
                    max_events_per_batch=hec_batch_size)

# Note: this can be set directly or passed in during initialization, depending on your use case.
splhec.backup_queue = queue.Queue(0)

# Simplified event processing
for event in events:
    splhec.send_event(event)

# Check the backup queue
if splhec.backup_queue.empty():
    print('Queue is empty, everything worked!')
else:
    raise RuntimeError('Failures detected. Implement backup routine here.')
```
Basically, this gives the calling application control over how to handle detected delivery failures. One recovery pattern is sketched below, followed by a fuller end-to-end example.
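For example, you could drain the queue and persist the failures for later replay. This is only a sketch; the exact shape of the queued items is up to the library, so I am treating them as opaque payloads here:

```python
import queue

failed = []
while True:
    try:
        failed.append(splhec.backup_queue.get_nowait())
    except queue.Empty:
        break

if failed:
    # Persist failures somewhere durable (a file, S3, a dead-letter queue) for later replay.
    with open('hec_failures.txt', 'w') as fh:
        for item in failed:
            fh.write('%s\n' % item)
    raise RuntimeError('%d failed payloads written to hec_failures.txt' % len(failed))
```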
```python
import json
from datetime import datetime

from splunk_hec import splunk_hec
import queue

# Create an object reference to the library, initialized with our settings.
splhec = splunk_hec(token=hec_token, hec_server=hec_server, hec_port=hec_port, input_type=input_type,
                    use_hec_tls=use_hec_tls, use_ack=use_ack, hec_tls_verify=hec_tls_verify,
                    ack_interval=ack_interval, max_ack_attempts=max_ack_attempts,
                    max_content_length=max_content_length, max_events_per_batch=max_events_per_batch,
                    context_id=context_id, rotate_session_after=rotate_session_after,
                    debug_enabled=debug_enabled, max_threads=max_threads,
                    disable_tls_validation_warnings=disable_tls_validation_warnings)

# Set up a backup queue
splhec.backup_queue = queue.Queue(0)

# Set basic parameters that we'll need later
index = 'main'
sourcetype = 'hec:%s' % str(context_id)
source = 'hec:test:events'

if 'raw' in input_type:
    # When using raw input, this adds the parameters to the POST URL so Splunk knows where to send the data.
    splhec.set_request_params({'index': index, 'sourcetype': sourcetype, 'source': source})

# This comes from a test function, so we're looping through to generate data.
for i in range(num_test_events):
    if 'json' in input_type:
        # For the 'json' input type, format the payload for the /event endpoint per the Splunk docs.
        # This preformats the data and allows for index-time field extraction.
        payload = {}
        payload['event'] = '{"message": "JSON test Message %s.", "foo": "bar"}' % str(i)
        payload['time'] = int(datetime.utcnow().strftime('%s'))
        payload['source'] = source
        payload['index'] = index
        payload['sourcetype'] = sourcetype
        payload = json.dumps(payload)
    else:
        # For a raw payload, not much is necessary here. Make sure you have
        # index-time props in place to parse the data.
        payload = '%s RAW Test Message %s.' % (str(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S -0000')), str(i))
    # This adds the event to our batch; when the max batch sizes are met,
    # the library will flush out to the HEC automatically.
    splhec.send_event(payload)

# Finally, we call this to signal that we are done. A final flush is performed.
# If you are using threads, they will be shut down. YOU MUST END WITH THIS.
splhec.stop_threads_and_processing()

# Note: you may also call splhec.force_flush_events() directly to flush to HEC
# at any time in your calling application.
```
There is a directory of samples based on older iterations of some production code I am running. All of these samples are provided as-is with no guarantee that they will be updated/maintained. And while I have made every effort to ensure backwards compatibility through all the library updates over time, I make no guarantee that a future update would break this sample code. If you come to rely on this in a production manner, reach out to me and let's share notes.