splunk-hec-ftf

A library for sending data to the Splunk HTTP Event Collector.


Notice

This project was slated to be shut down, but after feedback from project users, version 1.0.2 will remain on PyPI and the project will stay public for a while to allow for additional feedback and development volunteers. Want to volunteer to make this project better? Hit me up at support@fromthefuture.net.

Note on python versions

This library is no longer tested for compatibility with Python 2. All future updates and development will target Python 3 only.

Installation and Usage

To install the library:

pip install splunk-hec-ftf

To use:

from splunk_hec import splunk_hec

Changelog

1.0.3

  • In the Retry() logic of the library, I was using a deprecated argument that was recently removed from the urllib3 package. To stay backwards compatible while supporting the newer releases, the library now does a try/except when creating the Retry object, capturing both versions of the argument. (A sketch of the pattern follows.)
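
For illustration, here is a minimal sketch of that pattern. It assumes the argument in question is urllib3's method_whitelist, which newer releases renamed to allowed_methods; the retry values shown are placeholders rather than the library's exact settings.

from urllib3.util.retry import Retry

# Newer urllib3 releases only accept allowed_methods; older releases only
# accept method_whitelist. Try the new name first and fall back on TypeError.
try:
    retries = Retry(total=3, backoff_factor=0.3, status_forcelist=[500, 503],
                    allowed_methods=['POST'])
except TypeError:
    retries = Retry(total=3, backoff_factor=0.3, status_forcelist=[500, 503],
                    method_whitelist=['POST'])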

1.0.2

  • Added target_host to logging when errors occur
  • Added processing for HTTP 400 responses due to invalid events to remove those events before attempting a resend. (non-ack flows only)

Library Overview

This HEC library was created to allow the following:

  • Provide a consistent and stable method to send logs to a remote HTTP Event Collector

There are other libraries out there, but I noted a few pain points among them, so when I wrote this library I tried to make sure the following would be true:

  • Provide support for Indexer Acknowledgement (ACK)
  • Provide more control to the calling/importing application
  • Perform minimal data formatting
  • Support real-world applications

And, to that end, I believe I have succeeded here. This library has successfully supported the following applications:

  • Serverless functions (GCP, Azure, and AWS)
  • Syslog-ng program()
  • Manual data import programs

All told, this library is a critical part of my infrastructure, handling multiple TBs of daily throughput without issue.

Library Requirements

The following packages are used in some capacity; requests is the only external dependency, and the rest come from the Python standard library.

  • requests
  • json
  • uuid
  • time
  • queue
  • threading

Library init Arguments

| Argument | Description | Required | Default |
| --- | --- | --- | --- |
| token | The HEC token for communication with your HEC layer | True | None |
| hec_server | FQDN for the HEC server or load balancer | True | None |
| hec_port | Port to use with HEC | False | 8088 |
| input_type | Defines which endpoint to hit: 'raw' or 'json' | False | 'raw' |
| use_hec_tls | Turns on use of TLS to send data | False | True |
| use_ack | Turns on indexer acknowledgement | False | False |
| hec_tls_verify | Turns on TLS verification | False | False |
| ack_interval | The number of seconds to wait between ACK attempts | False | 5 |
| max_ack_attempts | The number of attempts to make to SEND data and also to ACK data | False | 5 |
| max_content_length | The maximum estimated content length in bytes; newer versions of Splunk can accept up to 800MB | False | 50000000 |
| max_events_per_batch | The maximum number of events to send in a single request | False | 100000 |
| backup_queue | The backup queue to store failures on | False | None |
| context_id | Used for logging, particularly in AWS Lambda invocations, to differentiate where logs came from | False | 'default' |
| rotate_session_after | The maximum number of POST requests to make to the HEC before starting a new session; this ensures we don't stick to a single indexer behind an LB | False | 10 |
| debug_enabled | Allows the calling application to turn on DEBUG logging | False | False |
| logger | Allows the calling application to pass in a logger object; log messages are simply printed if not provided | False | None |
| max_threads | The maximum number of threads to use; 0 for no threading | False | 0 |
| disable_tls_validation_warnings | Turn off TLS validation warnings | False | True |

max_ack_attempts :: This setting controls the number of attempts to send the data as well as the number of ACK attempts per send. In other words, if ACK is enabled and this is set to 5, the library will make up to 5 attempts to send the data, and for each send, up to 5 attempts will be made to confirm delivery.

backup_queue :: This is not required but highly recommended. If you do not provide it, failures will not be tracked or given back to the calling application in any way.
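
For example, a minimal initialization using only the two required arguments looks like this (the token and server values are placeholders; everything else falls back to the defaults in the table above):

from splunk_hec import splunk_hec
import queue

# Placeholder token and hostname; all other settings use the documented defaults.
splhec = splunk_hec(token='00000000-0000-0000-0000-000000000000',
                    hec_server='hec.example.com')

# Highly recommended: attach a backup queue so delivery failures are surfaced.
splhec.backup_queue = queue.Queue(0)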

Threading

Most of the libraries out there support threading for delivery to HEC but they suffer from a simple problem: only the data delivery is actually threaded. What I mean is that data comes into the script/library and is parsed, added to a batch, and only when that batch is full is it put on a queue for subthreads to process.

This threading is helpful if you are:

  • Using small batch sizes requiring numerous connections
  • Using ACK

In either of these situations, threading will help. In my serverless applications, this threading would not be of use as almost all data is sent in a single request to the backend. (i.e. I get 10,000 events in a serverless invocation, all 10,000 are processed and sent in a single batch to the backend.)

The point is that threading is something to consider carefully. Here are the guidelines I would offer for this library (a sketch of both modes follows the list):

  • If you need to speed up data processing, write threading into your data processor and call this library in the non-threading mode.
  • If you need to speed up data delivery/ACK, use the threading mode available in this library.
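
As a rough sketch of those two guidelines (the max_threads values are illustrative; the argument itself comes from the init table above):

# Delivery/ACK-bound: let the library thread the POSTs and ACK checks.
splhec = splunk_hec(token=hec_token, hec_server=hec_server, use_ack=True, max_threads=4)

# Processing-bound: keep the library in non-threading mode (max_threads=0, the
# default) and parallelize your own parsing before calling send_event().
splhec = splunk_hec(token=hec_token, hec_server=hec_server, max_threads=0)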

Serverless Considerations

I found that under certain conditions, leveraging threading in serverless compute was worthwhile, specifically when cross-region transfer of data is involved. When the Lambda sits in one region (say, ap-northeast-1) and the receiver endpoint sits in another (say, us-east-1), you can gain execution speed by threading because of the additional data upload times involved.

Anyway, use your best judgment.

Indexer ACK

Remember earlier when I said ACK was supported? It is, but in a way that adds quite a lot of overhead. This is the data flow:

Data -> Receiving App -> HEC Library -> POST to HEC -> ACK Test -> ACK Test -> ACK Test -> Confirmed

If you consider how the Splunk Universal Forwarder works, data gets sent and then shoved into a memory queue while additional data is sent AND ACK checks are happening. THIS IS NOT HOW IT WORKS HERE. Everything happens in sequential order here: I send the data and then immediately attempt to ACK that single block of data. No new data will be sent during this time.

  • In a multi-threaded approach, data will continue to queue for sending to the maximum limits of your available memory.
  • In the non-threaded approach, processing will block until ACK is confirmed or fails. The number of attempts is configurable.

Library Delivery Logic

To add some clarity around the process, it works approximately like this:

  • Data is added to a batch list.
  • When the list meets max_events_per_batch or max_content_length, it is sent to the HEC
  • The send process has two retry mechanisms
    • One mechanism is the urllib3 Retry mechanism, which will retry the POST request up to 3 times for connection failures, read failures, redirections, and 500/503 errors (see the sketch after this list).
    • The second mechanism counts all of that as one attempt and will try again, up to max_ack_attempts times, if anything other than an HTTP 200 is received.
  • The HTTP return is checked for code 200.
    • If it's a 200, the data is considered sent
    • Anything else triggers a retry
  • If ACK is enabled:
    • ACK is attempted for the data block up to max_ack_attempts using the same session object that sent it (ensuring you hit the same backend through a Load Balancer).
    • The urllib3 Retry mechanism described previously is utilized here as well.
    • Additional sleep timing is added based on the defined ack_interval
  • If ACK is successful, we are done.
  • If ACK is unsuccessful, we go back to the send and retry steps above and try it all again.
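
As a sketch of how those two retry mechanisms typically fit together (an assumption based on the description above, not the library's exact internals), the urllib3 Retry object is mounted onto the requests Session like this:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Inner mechanism: retry connect/read failures, redirects, and 500/503
# responses up to 3 times with a 0.3 backoff factor.
retries = Retry(total=3, backoff_factor=0.3, status_forcelist=[500, 503])
session = requests.Session()
session.mount('https://', HTTPAdapter(max_retries=retries))

# Outer mechanism: each session.post() counts as one attempt, and anything
# other than an HTTP 200 triggers another attempt, up to max_ack_attempts.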

In practice, what does this mean? Well, the use of all of these retry mechanisms adds to the overall duration... a lot. Assume ACK is enabled with an ack_interval of 5 and max_ack_attempts of 5 (a worked worst case follows the list):

  • Before any ACK is attempted, we sleep for 2.5 seconds.
  • After this, we will attempt an ACK and sleep for 5 seconds between each request adding up to 25 seconds to the duration.
  • The urllib3 Retry logic uses a backoff factor of 0.3 and a max of 3 connection attempts, which can add up to 2.1 seconds per request.
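
Putting those numbers together, a back-of-the-envelope worst case (assuming every attempt and every backoff is fully exhausted):

# Worst-case added latency for one fully-failing block, using the example
# values above (ack_interval=5, max_ack_attempts=5, backoff factor 0.3).
initial_sleep = 2.5                        # sleep before the first ACK attempt
ack_sleeps = 5 * 5                         # ack_interval * max_ack_attempts
backoff_per_request = 0.3 * (1 + 2 + 4)    # urllib3 backoff: 0.3 + 0.6 + 1.2 = 2.1s
requests_made = 1 + 5                      # one POST plus five ACK checks
total = initial_sleep + ack_sleeps + backoff_per_request * requests_made
print(total)                               # ~40 seconds for a single send attempt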

Odds are that if data was sent successfully and the backend indexers are not overburdened, ACK will also be successful within 1-2 attempts but for serverless environments where duration translates directly to cost, it's worth considering the above information before enabling ACK.

Using the Backup Queue

The library accepts a "backup queue" to be passed in. It looks like this in code:

from splunk_hec import splunk_hec
import queue

splhec = splunk_hec(token=hec_token, hec_server=hec_server, hec_port=hec_port, input_type='json', use_hec_tls=True,
    use_ack=hec_ack, hec_tls_verify=False, max_ack_attempts=hec_ack_attempts, max_content_length=hec_max_content_length,
    max_events_per_batch=hec_batch_size )

# Note, this can be set directly or passed in during initialization... Depends on your use case.
splhec.backup_queue = queue.Queue(0)

# Simplified event processing
for event in events:
    splhec.send_event(event)

# Check the backup queue
if splhec.backup_queue.empty():
    print('Queue is empty, everything worked!')
else:
    raise RuntimeError('Failures detected. Implement backup routine here.')

Basically, this gives the calling application control over how to handle detected delivery failures. Here are some ways to use this:

  • In serverless environments, I often raise an exception to force the data message to be processed again.
  • In syslog-ng environments, the failed data is written to disk for monitoring by a Splunk UF (sketched below).
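
As a sketch of that syslog-ng pattern (the function name, the spool path, and the assumption that queued items are the failed event payloads are all illustrative, not part of the library):

import json

def drain_backup_queue(splhec, spool_path='/var/spool/splunk/hec_failures.log'):
    # Write everything left on the backup queue to a spool file that a
    # Splunk Universal Forwarder monitors.
    with open(spool_path, 'a') as spool:
        while not splhec.backup_queue.empty():
            failed = splhec.backup_queue.get()
            if not isinstance(failed, str):
                failed = json.dumps(failed)
            spool.write(failed + '\n')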

Sample Usage

from splunk_hec import splunk_hec
from datetime import datetime
import json
import queue
import time

# Create an object reference to the library, initialized with our settings.
splhec = splunk_hec( token=hec_token, hec_server=hec_server, hec_port=hec_port, input_type=input_type, use_hec_tls=use_hec_tls, use_ack=use_ack, hec_tls_verify=hec_tls_verify, ack_interval=ack_interval,
    max_ack_attempts=max_ack_attempts, max_content_length=max_content_length, max_events_per_batch=max_events_per_batch, context_id=context_id, rotate_session_after=rotate_session_after, debug_enabled=debug_enabled,
    max_threads=max_threads, disable_tls_validation_warnings=disable_tls_validation_warnings )

# Set up a backup queue
splhec.backup_queue = queue.Queue(0)

# Set basic parameters that we'll need later
index = 'main'
sourcetype = 'hec:%s' % str(context_id)
source = 'hec:test:events'

if 'raw' in input_type:
    # When using raw input, we do this to add the parameters to the POST URL so Splunk knows where to send the data
    splhec.set_request_params({'index':index, 'sourcetype':sourcetype, 'source':source})

# This comes from a test function so we're looping through to generate data.
for i in range(num_test_events):

    # If you are using the 'json' input type, you need to format this for the /event endpoint. Follow
    # splunk docs on this. This will preformat the data and allows for index-time field extraction.
    if 'json' in input_type:
        payload = {}
        payload['event'] = '{"message": "JSON test Message %s.", "foo": "bar"}' % str(i)
        payload['time'] = int(time.time())  # epoch seconds; strftime('%s') is non-portable
        payload['source'] = source
        payload['index'] = index
        payload['sourcetype'] = sourcetype
        payload = json.dumps(payload)    

    # If it's a raw payload, not much is necessary here. Make sure you have index-time props in place to parse the data.
    else:
        payload = '%s RAW Test Message %s.' % (str(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S -0000')),str(i))

    # This should add the event to our batch; when the max batch sizes are met, the library will flush out to the HEC automatically
    splhec.send_event(payload)

# Finally, we call this to signal that we are done. A final flush is performed. If you are using threads, they will be shut down.
# YOU MUST END WITH THIS.
splhec.stop_threads_and_processing()

#### Note: You may also call splhec.force_flush_events() directly to flush to HEC at any time in your calling application.

Samples

There is a directory of samples based on older iterations of some production code I am running. All of these samples are provided as-is with no guarantee that they will be updated or maintained. And while I have made every effort to ensure backwards compatibility through all the library updates over time, I make no guarantee that a future update won't break this sample code. If you come to rely on this in a production manner, reach out to me and let's share notes.
