aplustools is a simple, user-friendly Python library for performing amazing tasks. It simplifies complex processes, allowing you to achieve more with less code. Developed with a focus on ease of use and efficiency, aplustools is the go-to solution for Python developers looking to enhance their projects.
You can install aplustools via pip:
pip install aplustools --upgrade
Or clone the repository and install manually:
git clone https://github.com/Adalfarus/aplustools.git
cd aplustools
python -m build
Here are a few quick examples of how to use aplustools:
from aplustools.web.search import Search, GoogleSearchCore
# Create a searcher backed by the Google search core and run a query
searcher = Search(GoogleSearchCore(advanced=True))
results = searcher.search("Cute puppies", num_results=10)
# Print the result
print(results)
from aplustools.web.utils import WebPage
web_page = WebPage(results[0].get("url"))
response = None
if web_page.crawlable:  # Google search does this automatically at the moment
    response = web_page.page.content
print(response)
from aplustools.web.request import UnifiedRequestHandler, UnifiedRequestHandlerAdvanced
import os
# Default request handler
handler = UnifiedRequestHandler()
# Synchronous request
handler.fetch('http://example.com', async_mode=False)
# Asynchronous request
handler.fetch('http://example.com', async_mode=True)
# Advanced request handler (you can pass extra keyword arguments, and it automatically raises for status)
adv_handler = UnifiedRequestHandlerAdvanced() # It can also handle image content
# Synchronous GET request
adv_handler.request('GET', 'http://example.com', async_mode=False)
# Asynchronous GET request
adv_handler.request('GET', 'http://example.com', async_mode=True)
folder_path = "./test_data/images"
os.makedirs(folder_path, exist_ok=True)
# Synchronous binary request (e.g., image)
image_content = adv_handler.request('GET', 'http://example.com/image.png', async_mode=False, return_type='binary')
with open(os.path.join(folder_path, 'image.png'), 'wb') as file:
    file.write(image_content)
# Asynchronous binary request (e.g., image)
image_content_async = adv_handler.request('GET', 'http://example.com/image.png', async_mode=True, return_type='binary')
with open(os.path.join(folder_path, 'image_async.png'), 'wb') as file:
    file.write(image_content_async)
from aplustools.data.imagetools import ImageManager, OnlineImage
import os
os.makedirs("./test", exist_ok=True)
manager = ImageManager("./test", use_async=True)
image_index = manager.add_image(OnlineImage, "somewhere.com/image.jpg")
manager.execute_func(image_index, "download_image")
manager.execute_func(image_index, "convert_to_grayscale")
manager.execute_func(image_index, "apply_filter") # Default is blur
manager.execute_func(image_index, "rotate_image", 12)
manager.execute_func(image_index, "save_image_to_disk") # Overwrites downloaded file
from aplustools.data.imagetools import OnlineImage, OfflineImage, ImageFilter, SVGCompatibleImage
import os
# Setup
folder_path = "./images"
os.makedirs(folder_path, exist_ok=True)
test_url = "someImage.url"
base64_image_str = ("data:image/jpeg;base64,...")
# Test downloading an online image
online_image = OnlineImage(current_url=test_url, base_location=folder_path, one_time=True)
download_result = online_image.download_image(folder_path, online_image.current_url, "downloaded_image", "jpg")
if not download_result[0]:
    raise Exception("Failed to download image.")
else:
    online_image.convert_to_grayscale()
    online_image.save_image_to_disk(os.path.join(folder_path, "image.png"))
# Test loading and saving a base64 image with OfflineImage
offline_image = OfflineImage(data=base64_image_str, base_location=folder_path)
save_result = offline_image.base64(folder_path, "base64_image", "png")
if not save_result:
    raise Exception("Failed to save base64 image.")
# Test image transformations
offline_image.resize_image((100, 100))
offline_image.rotate_image(45)
offline_image.convert_to_grayscale()
offline_image.apply_filter(ImageFilter.BLUR)
offline_image.save_image_to_disk(os.path.join(folder_path, "transformed_image.png"))
# Example svg image usage
image_processor = SVGCompatibleImage("someSvg.svg", 300,
                                     (667, 800), magick_path=r".\ImageMagick",
                                     base_location='./')
image_processor.save_image_to_disk()
from aplustools.io.environment import absolute_path, remv, copy
a_img_path = absolute_path(os.path.join(folder_path, "downloaded_image.jpg"))
copy(a_img_path, os.path.join(folder_path, "downloaded_image - Copy.jpg"))  # Copy next to the original
remv(a_img_path) # Remove the original image
remv(folder_path) # Remove the base directory
from aplustools.data.faker import TestDataGenerator
test_data = TestDataGenerator()
print(test_data.generate_random_identity())
print("\n", end="")
print(test_data.generate_random_name())
print(test_data.generate_random_email())
print(test_data.generate_random_password())
print(test_data.generate_random_phone_number())
print(test_data.generate_random_address())
print(test_data.generate_random_birth_date())
from aplustools.utils.dummy import Dummy3 # Dummy3 is for Python 3
import math
dummy = Dummy3()
# Do a bunch of operations that would normally throw errors
dummy.attr.func("", int3="")
dummy["Hello"] = 1
del dummy[1]
reversed(dummy)
if "Dummy" not in dummy:
    dummy.keys = ["1"]
print(dummy.keys + dummy)
var1 = +dummy
var2 = -dummy
var3 = ~dummy
print(var1, var2, var3)
hash(dummy)
abs(dummy)
round(dummy)
complex(dummy)
oct(dummy)
repr(dummy)
bytes(dummy)
format(dummy, "DD")
math.trunc(dummy)
dummy << dummy
dummy >> dummy
dummy -= 1_000_000
num = 1
num *= dummy
if dummy:
    print(True)
else:
    print(False)
for x in dummy:
    print(x)
type(dummy)
print(dummy, "->", int(dummy), list(dummy), tuple(dummy), float(dummy))
from aplustools.utils.hasher import hashed_latest, hashed_wrapper_latest, reducer, big_reducer, num_hasher
inp = "Hello beautiful world, how are you today, lovely star?"
inp2 = "Hello World, kitty"
desired_length = 64
hashed_inp = hashed_wrapper_latest(inp, desired_length, hash_func=hashed_latest)
hashed_inp2 = hashed_wrapper_latest(inp2, desired_length, hash_func=hashed_latest)
print(f"{inp} ({len(inp)}) -> {hashed_inp} ({len(hashed_inp)})\n{inp2} ({len(inp2)}) -> {hashed_inp2} ({len(hashed_inp2)})")
num_hashed_inp = num_hasher(inp, desired_length)
num_hashed_inp2 = num_hasher(inp2, desired_length)
print(f"{inp} ({len(inp)}) -> {num_hashed_inp} ({len(num_hashed_inp)})\n{inp2} ({len(inp2)}) -> {num_hashed_inp2} ({len(num_hashed_inp2)})")
acceptable_chars = range(100, 200)
num_hashed_inp_uni = num_hasher(inp, desired_length, acceptable_chars)
num_hashed_inp_uni_2 = num_hasher(inp2, desired_length, acceptable_chars)
print(f"{inp} ({len(inp)}) -> {num_hashed_inp_uni} ({len(num_hashed_inp_uni)})\n{inp2} ({len(inp2)}) -> {num_hashed_inp_uni_2} ({len(num_hashed_inp_uni_2)})")
from aplustools.utils.genpass import SecurePasswordManager, GeneratePasswords
manager = SecurePasswordManager()
password = manager.generate_ratio_based_password_v2(length=26, letters_ratio=0.5, numbers_ratio=0.3,
                                                    punctuations_ratio=0.2, exclude_similar=True)
manager.add_password("example.com", "json-the-greatest", password)
manager.store_in_buffer("example.com", 0) # Stores unencrypted password in a buffer
print(manager.get_password("example.com"), "|", manager.use_from_buffer(0)) # for faster access.
print(GeneratePasswords.generate_custom_sentence_based_password_v1(
    "Exploring the unknown -- discovering new horizons...", random_case=True, extra_char="_",
    char_position="keep" or 0, num_length=5, special_chars_length=2))
# "keep" just sets it to 0
# These classes are mostly meant for secure interprocess bidirectional
# communication using networks.
from aplustools.utils.genpass import ControlCodeProtocol, SecureSocketServer, SecureSocketClient
# Initialize the protocol
prot = ControlCodeProtocol()
# Create a client and a server
client = SecureSocketClient(prot)
sock = client.get_socket()
server = SecureSocketServer(sock, prot)
# Start key exchange and communication between server and client
client.start_and_exchange_keys() # Client has to start first
server.start_and_exchange_keys()
# Client sends a message and a control code to the server
client.add_control_code("shutdown")
client.add_message("HELLO SERVER")
client.sendall()  # The message is still received, as it sends one chunk here,
# which then gets decoded in one piece. The decoder decodes everything
# into chunks, and control codes are always put behind messages.
# Server shuts down the client connection
server.shutdown_client()
print("DONE1")
# There are also classes for one directional communication that are
# more integrated.
from aplustools.utils.genpass import ControlCodeProtocol, ServerMessageHandler, ClientMessageHandler
import threading
prot = ControlCodeProtocol()
# Create message handlers for server and client
encoder = ClientMessageHandler(prot)
# Make a connection using the client's host and chosen port
connection = encoder.get_socket()
decoder = ServerMessageHandler(connection, prot)
# Client prepares and sends a message
encoder.add_message("Hello Server")
encoder.send_control_message("shutdown")
# Server receives and processes each chunk
encoder.start()
threading.Thread(decoder.listen_for_messages()).start() # Blocking
encoder.flush() # Blocking until connection is established
from aplustools.data.updaters import GithubUpdater, VersionNumber
from aplustools.io.environment import get_temp
from aplustools import set_dir_to_ex
import sys
import os
set_dir_to_ex()
__version__ = VersionNumber("0.0.1")
# Initialize an updater.
# If you want to use exe you need to compile them yourself, otherwise it won't work.
updater = GithubUpdater("Adalfarus", "unicode-writer", "py")
latest_version = updater.get_latest_tag_version()
# Check if an update is needed
if __version__ >= latest_version:
    sys.exit()
# Updater method
path, zip_path = os.path.join(os.getcwd(), "update"), os.path.join(get_temp(), f"apt-update_{latest_version}")
os.makedirs(path, exist_ok=True)
os.makedirs(zip_path, exist_ok=True)
# Start the update in a separate thread
update_thread = updater.update(path, zip_path, latest_version, implementation="none",
                               host="localhost", port=1264, non_blocking=True, wait_for_connection=True)
update_thread.start()
# Receive the update status generator and print them
progress_bar = 1
for i in updater.receive_update_status():
    print(f"{i}%", end=f" PB{progress_bar}\n")
    if i == 100:
        progress_bar += 1  # Switch to the second progress bar when the download is finished
update_thread.join()
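The update check above depends on comparing the running version with the latest tag. As a rough, standard-library-only illustration of that idea (this is not how aplustools' `VersionNumber` is implemented, and `parse_version` and `needs_update` are hypothetical helper names), dotted versions can be compared as tuples of integers:

```python
def parse_version(text: str) -> tuple[int, ...]:
    """Turn a dotted version such as "1.4.4" (or "v1.4.4") into a tuple of ints."""
    return tuple(int(part) for part in text.strip().lstrip("v").split("."))


def needs_update(current: str, latest: str) -> bool:
    """Return True when `latest` is strictly newer than `current`."""
    return parse_version(current) < parse_version(latest)


print(needs_update("0.0.1", "0.1.0"))   # True
print(needs_update("1.10.0", "1.9.3"))  # False, because 10 > 9 when compared as integers
```

Comparing integer tuples avoids the pitfall of plain string comparison, where "1.10.0" would sort before "1.9.3".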
from aplustools.package.argumint import ArgStruct, ArguMint
from typing import Literal
import sys
def sorry(*args, **kwargs):
    print("Not implemented yet, sorry!")
def help_text():
    print("Build -> dir/file or help.")
def build_file(path: Literal["./main.py", "./file.py"] = "./main.py", num: int = 0):
    """
    build_file
    :param path: The path to the file that should be built.
    :param num:
    :return None:
    """
    print(f"Building file {path} ..., {num}")
from aplustools.package import timid
timer = timid.TimidTimer()
arg_struct = {'apt': {'build': {'file': {}, 'dir': {'main': {}, 'all': {}}}, 'help': {}}}
# Example usage
builder = ArgStruct()
builder.add_command("apt")
builder.add_nested_command("apt", "build", "file")
builder.add_nested_command("apt.build", "dir", {'main': {}, 'all': {}})
# builder.add_subcommand("apt.build", "dir")
# builder.add_nested_command("apt.build.dir", "main")
# builder.add_nested_command("apt.build.dir", "all")
builder.add_command("apt.help")
# builder.add_nested_command("apt", "help")
print(builder.get_structure()) # Best to cache this for better times (by ~15 microseconds)
parser = ArguMint(default_endpoint=sorry, arg_struct=arg_struct)
parser.add_endpoint("apt.help", help_text)
parser.add_endpoint("apt.build.file", build_file)
sys.argv[0] = "apt"
# Testing
# sys.argv = ["apt", "help"]
# sys.argv = ["apt", "build", "file", "./file.py", "--num=19"]
parser.parse_cli(sys, "native_light")
print(timer.end())
from aplustools.data.compressor import FileContainerV3, BrotliChunkCompressor
import os
compressor = BrotliChunkCompressor()
container = FileContainerV3(compressor, block_size=2048*2048)
image_data = {}
for file in os.listdir("./images"):
    if file.lower().endswith(".png"):
        with open(os.path.join("./images", file), "rb") as f:
            image_file_data = f.read()
            image_data[file] = image_file_data
for file_name, image in image_data.items():
    container.add_file(file_name, image)
# Get the compressed data
compressed_data = container.get_compressed_container()
print("Compression done")
with open("./files.bin", "wb") as f:
    f.write(compressed_data)
print("Wrote bin")
# To extract a specific file from the compressed data
try:
    decompressed_images = []
    for i in range(len(image_data)):
        decompressed_image = container.extract_file(compressed_data, i)
        decompressed_images.append(decompressed_image)
except Exception as e:
    print("Indexing not possible, error", e, "\n")
    decompressed_images = []
    for file_name in image_data.keys():
        decompressed_image = container.extract_file(compressed_data, file_name)
        decompressed_images.append(decompressed_image)
compression_ratio = len(compressed_data) / sum(len(x) for x in image_data.values())
print(f"Original size: {sum(len(x) for x in image_data.values())} bytes")
print(f"Compressed size: {len(compressed_data)} bytes")
print(f"Compression ratio: {compression_ratio:.2f}")
os.makedirs("./decompressed_images", exist_ok=True)  # Make sure the output directory exists
for i, decompressed_image in enumerate(decompressed_images):
    with open(f"./decompressed_images/image{i}.png", "wb") as f:
        f.write(decompressed_image)
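For readers who want to try the compression-ratio calculation without installing any compressor backends, here is a minimal sketch that swaps in the standard library's `zlib` for `BrotliChunkCompressor`. It only illustrates the ratio arithmetic and the round trip; it says nothing about the `FileContainerV3` format, and the sample data is made up.

```python
import zlib

# Made-up, highly repetitive sample data so the effect is easy to see
original = b"aplustools " * 1000

compressed = zlib.compress(original, 9)   # zlib stands in for Brotli here
restored = zlib.decompress(compressed)

# Same definition as above: compressed size divided by original size
compression_ratio = len(compressed) / len(original)

print(f"Original size: {len(original)} bytes")
print(f"Compressed size: {len(compressed)} bytes")
print(f"Compression ratio: {compression_ratio:.2f}")
print("Round trip OK:", restored == original)
```

With this definition, a ratio below 1 means the compressed data is smaller than the input.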
from aplustools.io import environment as env
from aplustools.io import loggers
# Set the current working directory to the main script or executable
env.set_working_dir_to_main_script_location()
# Create an instance of Logger
p_logger = loggers.PrintLogger("my_logs_file.log", show_time=True, capture_print=False,
                               overwrite_print=True, print_passthrough=False, print_log_to_stdout=True)
# Shows the time in the logs (Like this [12:33:21]) and overwrites the normal sys.stdout
# Call the `monitor_stdout` method and pass the logger object, this will overwrite sys.stdout from Text I/O to the logger object
logger = loggers.monitor_stdout(logger=p_logger) # Return sys.stdout, so you can make sure it worked
# Log something
p_logger.log("Hello, world!")
# Print something, it won't get captured or displayed
print("Hello, beautiful world!")
# Close the Logger object (returns sys.stdout to its normal state)
p_logger.close()
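To make the idea of overwriting `sys.stdout` more concrete, here is a small standard-library sketch of a file-like object that timestamps whatever is printed. It is only an illustration of the general technique, not the `PrintLogger` implementation; `TimestampedStdout` is a hypothetical class name.

```python
import sys
from datetime import datetime


class TimestampedStdout:
    """File-like object that prefixes each printed line with the current time."""

    def __init__(self, target):
        self.target = target  # The real stream that output is forwarded to
        self.lines = []       # Everything that was logged, for later inspection

    def write(self, text):
        for line in text.splitlines():
            if line:  # Skip the bare newline that print() writes separately
                stamped = f"[{datetime.now():%H:%M:%S}] {line}"
                self.lines.append(stamped)
                self.target.write(stamped + "\n")
        return len(text)

    def flush(self):
        self.target.flush()


original_stdout = sys.stdout
capture = TimestampedStdout(original_stdout)
sys.stdout = capture  # From here on, print() goes through the wrapper
try:
    print("Hello, world!")
finally:
    sys.stdout = original_stdout  # Always restore the normal stream
```

Restoring the stream in a `finally` block plays the same role as calling `close()` on the logger above.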
(If you want to ensure that all dependencies are installed, please run upype import aplustools; aplustools.install_all_dependencies())
(The correct short form for aplustools is apt, so please use import aplustools as apt for consistency.)
This package adds multiple CLIs:
C:\Users\user_>pype
Enter Python expression: 1 + 2
3
C:\Users\user_>pype 1 // 3
0
C:\Users\user_>upype import aplustools; print(aplustools.__version__)
1.4.4
C:\Users\user_>upype
Enter Python code (type 'end' on a new line to finish):
... class Test:
...     def hello_word(self):
...         print("Hello, you!")
... test = Test()
... test.hello_word()
... end
Hello, you!
These can also be used in, for example, batch files like this:
@echo off
set /p id=Enter ID:
upype from aplustools.utils.genpass import GeneratePasswords; print(GeneratePasswords.generate_custom_sentence_based_password_v1("""%id%""", random_case=True, extra_char="""_""", char_position=0, num_length=5, special_chars_length=2))
pause
You can currently run tests with apt tests run and show a basic help using apt help.
For more detailed usage and examples, check out our documentation.
PEP 8 -- Style Guide for Python Code
For modules I use 'lowercase', classes are 'CapitalizedWords' and functions and methods are 'lower_case_with_underscores'.
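As a quick illustration of these naming conventions, here is a tiny sketch. All names in it (`texttools`, `WordCounter`, `count_words`, `normalize_whitespace`) are hypothetical and not part of aplustools.

```python
# Module: this code would live in a file named e.g. `texttools.py` (lowercase)


class WordCounter:  # Class: CapitalizedWords
    def count_words(self, text: str) -> int:  # Method: lower_case_with_underscores
        """Count whitespace-separated words."""
        return len(text.split())


def normalize_whitespace(text: str) -> str:  # Function: lower_case_with_underscores
    """Collapse runs of whitespace into single spaces."""
    return " ".join(text.split())


print(WordCounter().count_words("a b  c"))
print(normalize_whitespace("  a   b "))
```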
Dependencies (except for the standard libraries) are:
none
requests
requests, PySide6
Pillow, aiohttp, requests, wand
opencv-python, aiohttp, wand, pillow_heif
requests, BeautifulSoup4
cryptography
requests, aiohttp
brotli, zstandard, py7zr
PySide6
Sub-Modules that may be removed in future updates due to being hard to support or simply unneeded.
We welcome contributions! Please see our contributing guidelines for more details on how you can contribute to aplustools.
git checkout -b feature/AmazingFeature
git commit -m 'Add some AmazingFeature'
git push origin feature/AmazingFeature
aplustools is licensed under the GPL-3.0 License - see the LICENSE file for details.
FAQs
A collection of helpful tools
We found that aplustools demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.