
Rosetta
Rosetta is a Python package that can be used to fake security logs and alerts for testing different detection and response use cases. It provides the following functions:
- Generate bad and random observables/indicators, including IP addresses, URLs, file hashes, CVEs and more.
- Fake log messages in different formats like CEF, LEEF and JSON.
- Convert one log format into another, for example from CEF to LEEF.
- Send the fake log messages to different log management and analytics tools.
Installation
- You can install rosetta via pip:
pip install rosetta-ce
- Or you can install it from the source code:
git clone https://github.com/ayman-m/rosetta.git
cd rosetta
python setup.py install
- Once installed, you can import the library in your Python code like this:
from rosetta import Observables, Events
Usage
Here are some examples of how to use Rosetta:
# End-to-end usage example: convert a log, generate observables, build fake
# events in several formats, and send fake events to a syslog destination.
from rosetta import Converter, ConverterToEnum, ConverterFromEnum, Events, ObservableType, ObservableKnown, \
Observables, Sender

# --- 1. Convert a log message from CEF to LEEF ---
converted_log = Converter.convert(from_type=ConverterFromEnum.CEF, to_type=ConverterToEnum.LEEF,
data="cef_log=CEF:0|Security|Intrusion Detection System|1.0|Alert|10|src=192.168.0.1 dst=192.168.0.2 act=blocked")
print(
converted_log)

# --- 2. Generate observables (two of each kind) ---
# IP addresses, requested as known-bad and then known-good.
bad_ip = Observables.generator(count=2, observable_type=ObservableType.IP, known=ObservableKnown.BAD)
print(bad_ip)
good_ip = Observables.generator(count=2, observable_type=ObservableType.IP, known=ObservableKnown.GOOD)
print(good_ip)
# URLs, requested as known-bad and then known-good.
bad_url = Observables.generator(count=2, observable_type=ObservableType.URL, known=ObservableKnown.BAD)
print(bad_url)
good_url = Observables.generator(count=2, observable_type=ObservableType.URL, known=ObservableKnown.GOOD)
print(good_url)
# SHA256 file hashes, requested as known-bad and then known-good.
bad_hash = Observables.generator(count=2, observable_type=ObservableType.SHA256, known=ObservableKnown.BAD)
print(bad_hash)
good_hash = Observables.generator(count=2, observable_type=ObservableType.SHA256, known=ObservableKnown.GOOD)
print(good_hash)
# CVEs and terms are requested without a `known` argument.
cve = Observables.generator(count=2, observable_type=ObservableType.CVE)
print(cve)
terms = Observables.generator(count=2, observable_type=ObservableType.TERMS)
print(terms)

# --- 3. Define your own observable values ---
# Each variable is a list of candidate values for one field; the lists may
# have different lengths.
# NOTE(review): presumably the event generators pick values from these lists —
# confirm against the library documentation.
src_ip, dst_ip, src_host, dst_host = ["192.168.10.10", "192.168.10.20"], ["1.1.1.1", "1.1.1.2"], ["abc"], ["xyz", "wlv"]
url, port = ["https://example.org", "https://wikipedia.com"], ["555", "666"]
protocol, app = ["ftp", "dns", "ssl"], ["explorer.exe", "chrome.exe"]
user = ["ayman", "mahmoud"]
file_name, file_hash = ["test.zip", "image.ps"], ["719283fd5600eb631c23b290530e4dac9029bae72f15299711edbc800e8e02b2"]
cmd, process = ["sudo restart", "systemctl stop firewalld"], ["bind", "crond"]
severity = ["high", "critical"]
sensor = ["fw", "edr"]
action = ["block", "allow"]
# Bundle all of the lists above into a single Observables object.
observables_list = Observables(src_ip=src_ip, dst_ip=dst_ip, src_host=src_host, dst_host=dst_host, url=url, port=port,
protocol=protocol, app=app, user=user, file_name=file_name, file_hash=file_hash, cmd=cmd,
process=process, severity=severity, sensor=sensor, action=action)

# --- 4. Generate fake events in different formats ---
# Syslog without an `observables` argument (random observables, per the
# variable name).
generic_syslog_with_random_observables = Events.syslog(count=1)
print(generic_syslog_with_random_observables)
# The remaining calls pass the custom Observables object built above.
generic_syslog_with_my_observables = Events.syslog(count=1, observables=observables_list)
print(generic_syslog_with_my_observables)
generic_cef_with_my_observables = Events.cef(count=1, observables=observables_list)
print(generic_cef_with_my_observables)
leef_with_my_observables = Events.leef(count=1, observables=observables_list)
print(leef_with_my_observables)
winevent_with_my_observables = Events.winevent(count=1, observables=observables_list)
print(winevent_with_my_observables)
json_with_my_observables = Events.json(count=1, observables=observables_list)
print(json_with_my_observables)
# Incidents take a comma-separated list of field names to include.
incident_with_my_observables = Events.incidents(count=1, fields="id,type,duration,analyst,description,events", observables=observables_list)
print(incident_with_my_observables)

# --- 5. Send fake events to a log destination ---
# Destination is given as "<protocol>:<host>:<port>"; here UDP to local port 514.
# NOTE(review): `interval` is presumably in seconds between messages — confirm.
worker = Sender(data_type="SYSLOG", destination="udp:127.0.0.1:514", observables=observables_list, count=5, interval=2)
worker.start()