New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

deffcode

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

deffcode - pypi Package Compare versions

Comparing version
0.2.5
to
0.2.6
+570
tests/test_ffdecoder.py
"""
===============================================
DeFFcode library source-code is deployed under the Apache 2.0 License:
Copyright (c) 2021 Abhishek Thakur(@abhiTronix) <abhi.una12@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
===============================================
"""
# import the necessary packages
import os
import cv2
import json
import pytest
import tempfile
import platform
import numpy as np
import logging
from .essentials import (
return_static_ffmpeg,
return_testvideo_path,
return_generated_frames_path,
actual_frame_count_n_frame_size,
remove_file_safe,
)
from PIL import Image
from deffcode import FFdecoder
from deffcode.utils import logger_handler
# define test logger
logger = logging.getLogger("Test_FFdecoder")
logger.propagate = False
logger.addHandler(logger_handler())
logger.setLevel(logging.DEBUG)
@pytest.mark.parametrize(
"source, custom_ffmpeg, output",
[
(return_testvideo_path(fmt="av"), return_static_ffmpeg(), True),
(
"https://gitlab.com/abhiTronix/Imbakup/-/raw/master/Images/starship.mkv",
"",
True,
),
("unknown://invalid.com/", "", False),
(return_testvideo_path(fmt="ao"), return_static_ffmpeg(), False),
(
return_generated_frames_path(return_static_ffmpeg()),
return_static_ffmpeg(),
True,
),
],
)
def test_source_playback(source, custom_ffmpeg, output):
"""
Paths Source Playback - Test playback of various source paths/urls supported by FFdecoder API
"""
decoder = None
frame_num = 0
try:
# formulate the decoder with suitable source(for e.g. foo.mp4)
if source == return_testvideo_path(fmt="av"):
# get instance
instance = FFdecoder(
source,
frame_format="bgr24",
custom_ffmpeg=custom_ffmpeg,
verbose=True,
)
# force unknown number of frames(like camera) {special case}
instance.metadata = {"approx_video_nframes": 0, "source_has_audio": True}
# formulate decoder
decoder = instance.formulate()
else:
# formulate decoder
decoder = FFdecoder(
source,
frame_format="bgr24",
custom_ffmpeg=custom_ffmpeg,
verbose=True,
).formulate()
# gather data
actual_frame_num, actual_frame_shape = actual_frame_count_n_frame_size(source)
# grab RGB24(default) 3D frames from decoder
for frame in decoder.generateFrame():
# check shape
if frame.shape != actual_frame_shape:
raise RuntimeError("Test failed")
# increment number of frames
frame_num += 1
assert frame_num >= actual_frame_num, "Test failed"
except Exception as e:
if not output:
logger.exception(str(e))
pytest.xfail("Test Passed!")
else:
pytest.fail(str(e))
finally:
# terminate the decoder
not (decoder is None) and decoder.terminate()
@pytest.mark.parametrize(
"pixfmts", ["bgr24", "gray", "rgba", "invalid", "invalid2", "yuv420p", "bgr48be"]
)
def test_frame_format(pixfmts):
"""
Testing `frame_format` with different pixel formats.
"""
decoder = None
frame_num = 0
source = return_testvideo_path(fmt="vo")
actual_frame_num, actual_frame_shape = actual_frame_count_n_frame_size(source)
ffparams = {"-pix_fmt": "bgr24"}
try:
# formulate the decoder with suitable source(for e.g. foo.mp4)
if pixfmts == "yuv420p":
ffparams = {"-enforce_cv_patch": True}
decoder = FFdecoder(
source,
frame_format=pixfmts,
custom_ffmpeg=return_static_ffmpeg(),
verbose=True,
**ffparams,
).formulate()
elif pixfmts != "invalid2":
decoder = FFdecoder(
source,
frame_format=pixfmts,
custom_ffmpeg=return_static_ffmpeg(),
**ffparams,
).formulate()
else:
decoder = FFdecoder(
source,
custom_ffmpeg=return_static_ffmpeg(),
**ffparams,
)
# assign manually pix-format via `metadata` property object {special case}
decoder.metadata = dict(output_frames_pixfmt="yuvj422p")
# formulate decoder
decoder.formulate()
# grab RGB24(default) 3D frames from decoder
for frame in decoder.generateFrame():
if pixfmts == "yuv420p":
# try converting to BGR frame
frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
# lets print its shape
logger.debug(frame.shape)
break
except Exception as e:
pytest.fail(str(e))
finally:
# terminate the decoder
not (decoder is None) and decoder.terminate()
@pytest.mark.parametrize(
"custom_params, checks",
[
(
{
"source": "Custom_Value", # source cannot be altered
"mytuple": ( # Python's `json` module converts Python tuples to JSON lists
1,
"John",
("inner_tuple"),
),
"output_frames_pixfmt": 1234, # invalid pixformat
"source_video_resolution": [640], # invalid resolution
"-disable_ffmpeg_window": "Invalid",
},
False,
),
(
{"output_frames_pixfmt": "invalid", "-disable_ffmpeg_window": False},
False,
),
(["invalid"], False),
(
dict(
mystring="abcd", # string data
myint=1234, # integers data
mylist=[1, "Rohan", ["inner_list"]], # list data
mydict={"anotherstring": "hello"}, # dictionary data
myjson=json.loads(
'{"name": "John", "age": 30, "city": "New York"}'
), # json data
source_video_resolution=[640, 480],
),
True,
),
],
)
def test_metadata(custom_params, checks):
"""
Testing `metadata` print and updation
"""
decoder = None
source = return_testvideo_path(fmt="vo")
try:
# custom vars
ffparams = (
{"-framerate": None}
if not checks
else {"-framerate": 25.0, "-disable_ffmpeg_window": True}
)
# formulate the decoder with suitable source(for e.g. foo.mp4)
decoder = FFdecoder(
source, custom_ffmpeg=return_static_ffmpeg(), verbose=True, **ffparams
).formulate()
# re-test
decoder.formulate()
# print metadata as `json.dump`
logger.debug(decoder.metadata)
# change metadata
decoder.metadata = custom_params
# print metadata as `json.dump`
logger.debug(decoder.metadata)
if checks:
assert all(
json.loads(decoder.metadata)[x] == custom_params[x]
for x in custom_params
), "Test failed"
except Exception as e:
if not checks:
pytest.xfail(str(e))
else:
pytest.fail(str(e))
finally:
# terminate the decoder
not (decoder is None) and decoder.terminate()
@pytest.mark.parametrize(
"ffparams, pixfmts",
[
(
{
"-ss": "00:00:01.45",
"-frames:v": 1,
"-custom_resolution": [640, 480],
"-framerate": 30.0,
"-passthrough_audio": "invalid", # just for test
"-vcodec": "unknown",
},
"rgba",
),
(
{
"-ss": "00:02.45",
"-vframes": 1,
"-custom_resolution": "invalid",
"-framerate": "invalid",
"-ffprefixes": "invalid",
"-clones": "invalid",
"-framerate": "invalid",
"-vcodec": None,
},
"gray",
),
],
)
def test_seek_n_save(ffparams, pixfmts):
"""
Testing `frame_format` with different colorspaces.
"""
decoder = None
filename = ""
try:
# formulate the decoder with suitable source(for e.g. foo.mp4)
decoder = FFdecoder(
return_testvideo_path(fmt="vo"),
frame_format=pixfmts,
custom_ffmpeg=return_static_ffmpeg(),
verbose=True,
**ffparams,
).formulate()
# grab the RGB24(default) frame from the decoder
frame = next(decoder.generateFrame(), None)
# check if frame is None
if not (frame is None) and pixfmts == "rgba":
# Convert and save our output
filename = os.path.abspath(
os.path.join(
*[tempfile.gettempdir(), "temp_write", "filename_rgba.jpeg"]
)
)
im = Image.fromarray(frame)
im = im.convert("RGB")
im.save(filename)
elif not (frame is None) and pixfmts == "gray":
# Convert and save our output
filename = os.path.abspath(
os.path.join(
*[tempfile.gettempdir(), "temp_write", "filename_gray.png"]
)
)
cv2.imwrite(filename, frame)
else:
raise AssertionError("Test Failed!")
if filename:
assert os.path.isfile(filename), "Test Failed!"
except Exception as e:
pytest.fail(str(e))
finally:
# terminate the decoder
not (decoder is None) and decoder.terminate()
filename and remove_file_safe(filename)
test_data = [
(return_testvideo_path(), {"-c:v": "hevc"}, False),
(
return_generated_frames_path(return_static_ffmpeg()),
{"-s": "data", "-vcodec": None},
True,
),
(
return_testvideo_path(),
{"-vcodec": "h264", "-vf": "rotate=angle=-20*PI/180:fillcolor=brown"},
True,
),
(
"testsrc=size=1280x720:rate=30", # virtual "testsrc" source
{
"-ffprefixes": ["-t", "5"], # playback time of 5 seconds
"-clones": [
"-i",
"https://abhitronix.github.io/deffcode/latest/assets/images/ffmpeg.png",
],
"-filter_complex": "[1]format=rgba,colorchannelmixer=aa=0.5[logo];[0][logo]overlay=W-w-5:H-h-5:format=auto,format=bgr24",
},
True,
),
]
@pytest.mark.parametrize("source, ffparams, result", test_data)
def test_FFdecoder_params(source, ffparams, result):
"""
Testing FFdecoder API with different parameters and save output
"""
decoder = None
writer = None
f_name = os.path.join(*[tempfile.gettempdir(), "temp_write", "output_foo.avi"])
try:
# initialize and formulate the decode with suitable source
with FFdecoder(
source,
frame_format="bgr24",
source_demuxer=(
"lavfi"
if (isinstance(source, str) and source.startswith("testsrc"))
else None
),
**ffparams,
) as decoder:
# retrieve JSON Metadata and convert it to dict
metadata_dict = json.loads(decoder.metadata)
# prepare OpenCV parameters
FOURCC = cv2.VideoWriter_fourcc("M", "J", "P", "G")
FRAMERATE = metadata_dict["source_video_framerate"]
FRAMESIZE = tuple(metadata_dict["source_video_resolution"])
# Define writer with parameters and suitable output filename for e.g. `output_foo.avi`
writer = cv2.VideoWriter(f_name, FOURCC, FRAMERATE, FRAMESIZE)
# grab the BGR24 frame from the decoder
for frame in decoder.generateFrame():
# check if frame is None
if frame is None:
break
# writing BGR24 frame to writer
writer.write(frame)
except Exception as e:
if result:
pytest.fail(str(e))
else:
pytest.xfail(str(e))
finally:
# terminate the decoder
if not (writer is None):
writer.release()
remove_file_safe(f_name)
test_data = [
(
"/dev/video0",
"v4l2",
True if platform.system() == "Linux" else False,
), # manual source and demuxer
(
0,
None,
True if platform.system() == "Linux" else False,
), # +ve index and no demuxer
(
"-1",
"auto",
True if platform.system() == "Linux" else False,
), # -ve index and "auto" demuxer
("5", "auto", False), # out-of-range index and "auto" demuxer
("invalid", "auto", False), # invalid source and "auto" demuxer
("/dev/video0", "invalid", False), # manual source and invalid demuxer
]
@pytest.mark.parametrize("source, source_demuxer, result", test_data)
def test_camera_capture(source, source_demuxer, result):
"""
Tests FFdecoder's realtime Webcam and Virtual playback capabilities
as well as Index based Camera Device Capturing
"""
decoder = None
try:
# initialize and formulate the decode with suitable source
decoder = FFdecoder(
source,
source_demuxer=source_demuxer,
frame_format="bgr24",
verbose=True,
).formulate()
# capture 5 camera frames
for i in range(5):
# grab the bgr24 frame from the decoder
frame_recv = next(decoder.generateFrame(), None)
# check if frame is None
if frame_recv is None:
raise AssertionError("Test Failed!")
except Exception as e:
if result:
# catch errors
pytest.fail(str(e))
else:
pytest.xfail(str(e))
finally:
# terminate
not (decoder is None) and decoder.terminate()
test_data = [
(
"null", # discard frame_format
{
"-custom_resolution": "null", # discard `-custom_resolution`
"-framerate": "null", # discard `-framerate`
"-enforce_cv_patch": "invalid", # invalid value for testing
"-vf": "format=bgr24,scale=320:240,fps=60", # format=bgr24, scale=320x240, framerate=60fps
},
True,
),
(
"bgr24", # this pixel-format must override filter `format=rgb24`
{
"-vf": "format=rgb24,scale=320:240,fps=60", # format=rgb24, scale=320x240, framerate=60fps
},
True,
),
(
"invalid", # invalid frame_format
{
"-custom_resolution": "invalid", # invalid `-custom_resolution`
"-framerate": "invalid", # invalid `-framerate`
"-vf": "format=bgr24,scale=320:240,fps=60", # format=bgr24, scale=320x240, framerate=60fps
},
True,
),
(
"invalid2", # invalid frame_format
{
"-custom_resolution": "invalid", # invalid `-custom_resolution`
"-framerate": "null", # discard `-framerate`
},
False,
),
(
"invalid3", # invalid frame_format
{
"-custom_resolution": "null", # discard `-custom_resolution`
"-framerate": "invalid", # invalid `-framerate`
},
False,
),
(
"null", # discard frame_format
{
"-custom_resolution": "null", # discard `-custom_resolution`
"-framerate": "null", # discard `-framerate`
},
True,
),
]
@pytest.mark.parametrize("frame_format, ffparams, result", test_data)
def test_discard_n_filter_params(frame_format, ffparams, result):
"""
Tests FFdecoder's discarding FFmpeg parameters and using FFmpeg Filter
capabilities
"""
decoder = None
try:
# initialize and formulate the decode with suitable source
if not frame_format in ["invalid2", "invalid3"]:
decoder = FFdecoder(
return_testvideo_path(),
frame_format=frame_format,
verbose=True,
**ffparams,
).formulate()
else:
decoder = FFdecoder(
return_testvideo_path(),
frame_format=frame_format,
verbose=True,
**ffparams,
)
# assign manually pix-format via `metadata` property object {special case}
decoder.metadata = (
{"source_video_resolution": [0], "output_frames_resolution": [0]}
if frame_format == "invalid2"
else {"source_video_framerate": 0.0, "output_framerate": 0.0}
)
# formulate decoder
decoder.formulate()
# capture 2 camera frames
for i in range(2):
# grab the bgr24 frame from the decoder
frame_recv = next(decoder.generateFrame(), None)
# check if frame is None
if frame_recv is None:
raise AssertionError("Test Failed!")
except Exception as e:
if result:
# catch errors
pytest.fail(str(e))
else:
pytest.xfail(str(e))
finally:
# terminate
not (decoder is None) and decoder.terminate()
"""
===============================================
DeFFcode library source-code is deployed under the Apache 2.0 License:
Copyright (c) 2021 Abhishek Thakur(@abhiTronix) <abhi.una12@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
===============================================
"""
# import the necessary packages
import os
import pytest
import shutil
import logging
import requests
import tempfile
from .essentials import (
is_windows,
return_static_ffmpeg,
return_testvideo_path,
return_generated_frames_path,
)
from deffcode.utils import logger_handler
from deffcode.ffhelper import (
get_valid_ffmpeg_path,
download_ffmpeg_binaries,
validate_ffmpeg,
validate_imgseqdir,
is_valid_image_seq,
is_valid_url,
check_sp_output,
extract_device_n_demuxer,
)
# define test logger
logger = logging.getLogger("Test_ffhelper")
logger.propagate = False
logger.addHandler(logger_handler())
logger.setLevel(logging.DEBUG)
test_data = [
(
os.path.join(tempfile.gettempdir(), "temp_ffmpeg"),
"win32" if is_windows else "",
),
(
os.path.join(tempfile.gettempdir(), "temp_ffmpeg"),
"win64" if is_windows else "",
),
("wrong_test_path", "wrong_bit"),
]
@pytest.mark.parametrize("paths, os_bit", test_data)
def test_ffmpeg_binaries_download(paths, os_bit):
"""
Testing Static FFmpeg auto-download on Windows OS
"""
file_path = ""
try:
file_path = download_ffmpeg_binaries(
path=paths, os_windows=is_windows, os_bit=os_bit
)
if file_path:
logger.debug("FFmpeg Binary path: {}".format(file_path))
assert os.path.isfile(file_path), "FFmpeg download failed!"
shutil.rmtree(os.path.abspath(os.path.join(file_path, "../..")))
except Exception as e:
if paths == "wrong_test_path" or os_bit == "wrong_bit":
pass
else:
pytest.fail(str(e))
@pytest.mark.parametrize("paths", ["wrong_test_path", return_static_ffmpeg()])
def test_validate_ffmpeg(paths):
"""
Testing downloaded FFmpeg Static binaries validation on Windows OS
"""
try:
output = validate_ffmpeg(paths, verbose=True)
if paths != "wrong_test_path":
assert bool(output), "Validation Test failed at path: {}".format(paths)
except Exception as e:
if paths == "wrong_test_path":
pass
else:
pytest.fail(str(e))
test_data = [
("", "", True),
("wrong_test_path", "", False),
("", "wrong_test_path", False),
("", os.path.join(tempfile.gettempdir(), "temp_ffmpeg"), True),
(return_static_ffmpeg(), "", True),
(os.path.dirname(return_static_ffmpeg()), "", True),
]
@pytest.mark.parametrize("paths, ffmpeg_download_paths, results", test_data)
def test_get_valid_ffmpeg_path(paths, ffmpeg_download_paths, results):
"""
Testing FFmpeg excutables validation and correction:
"""
try:
output = get_valid_ffmpeg_path(
custom_ffmpeg=paths,
is_windows=is_windows,
ffmpeg_download_path=ffmpeg_download_paths,
verbose=True,
)
if not (
paths == "wrong_test_path" or ffmpeg_download_paths == "wrong_test_path"
):
assert (
bool(output) == results
), "FFmpeg excutables validation and correction Test failed at path: {} and FFmpeg ffmpeg_download_paths: {}".format(
paths, ffmpeg_download_paths
)
except Exception as e:
if paths == "wrong_test_path" or ffmpeg_download_paths == "wrong_test_path":
pass
elif isinstance(e, requests.exceptions.Timeout):
logger.exceptions(str(e))
else:
pytest.fail(str(e))
@pytest.mark.xfail(raises=Exception)
def test_check_sp_output():
"""
Testing check_sp_output method
"""
check_sp_output(["ffmpeg", "-Vv"])
@pytest.mark.parametrize(
"URL, result",
[
("rtmp://live.twitch.tv/", True),
(None, False),
("unknown://invalid.com/", False),
],
)
def test_is_valid_url(URL, result):
"""
Testing is_valid_url method
"""
try:
result_url = is_valid_url(return_static_ffmpeg(), url=URL, verbose=True)
assert result_url == result, "URL validity test Failed!"
except Exception as e:
pytest.fail(str(e))
@pytest.mark.parametrize(
"source, result",
[
(return_generated_frames_path(return_static_ffmpeg()), True),
(None, False),
(return_testvideo_path(), False),
(
"{}/Downloads/Test_videos/{}".format(tempfile.gettempdir(), "invalid.png"),
False,
),
],
)
def test_is_valid_image_seq(source, result):
"""
Testing test_is_valid_image_seq method
"""
try:
result_url = is_valid_image_seq(
return_static_ffmpeg(), source=source, verbose=True
)
assert result_url == result, "Image sequence validity test Failed!"
except Exception as e:
result and pytest.fail(str(e))
@pytest.mark.parametrize(
"path, result",
[
(return_generated_frames_path(return_static_ffmpeg()), True),
("unknown://invalid.com/", False),
],
)
def test_validate_imgseqdir(path, result):
"""
Testing validate_imgseqdir method
"""
try:
output = validate_imgseqdir(path, extension="png", verbose=True)
assert output == result, "Image sequence directory validity test Failed!"
except Exception as e:
result and pytest.fail(str(e))
@pytest.mark.xfail(raises=ValueError)
def test_extract_device_n_demuxer():
"""
Testing extract_device_n_demuxer method
"""
extract_device_n_demuxer(return_static_ffmpeg(), machine_OS="invalid", verbose=True)
"""
===============================================
DeFFcode library source-code is deployed under the Apache 2.0 License:
Copyright (c) 2021 Abhishek Thakur(@abhiTronix) <abhi.una12@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
===============================================
"""
# import the necessary packages
import pytest
import logging
from .essentials import (
return_static_ffmpeg,
return_testvideo_path,
return_generated_frames_path,
actual_frame_count_n_frame_size,
)
from deffcode.utils import logger_handler
from deffcode import Sourcer
# define test logger
logger = logging.getLogger("Test_Sourcer")
logger.propagate = False
logger.addHandler(logger_handler())
logger.setLevel(logging.DEBUG)
@pytest.mark.parametrize(
"source, sourcer_params, custom_ffmpeg",
[
(
return_generated_frames_path(return_static_ffmpeg()),
{
"-ffprefixes": "invalid", # invalid ffprefixes
"-filter_complex": "loop=loop=3:size=15:start=25",
},
return_static_ffmpeg(),
),
(
"rtmp://live.twitch.tv/",
{
"-ffmpeg_download_path": ["invalid"], # invalid FFmpeg download path
"-ffprefixes": ["-stream_loop", "3"],
},
return_static_ffmpeg(),
),
(
"unknown://invalid.com/", # invalid source-1
{"-force_validate_source": True}, # force_validate_source
return_static_ffmpeg(),
),
(
return_testvideo_path(fmt="ao"),
{"-force_validate_source": ["invalid"]}, # invalid force_validate_source
return_static_ffmpeg(),
),
(
return_testvideo_path(),
{},
"invalid_ffmpeg", # invalid FFmpeg
),
],
)
def test_source(source, sourcer_params, custom_ffmpeg):
"""
Paths Source - Test various source paths/urls supported by Sourcer.
"""
try:
sourcer = Sourcer(
source, custom_ffmpeg=custom_ffmpeg, verbose=True, **sourcer_params
).probe_stream()
logger.debug("Found Metadata: `{}`".format(sourcer.retrieve_metadata()))
except Exception as e:
if isinstance(e, ValueError) or custom_ffmpeg == "invalid_ffmpeg":
pytest.xfail("Test Passed!")
else:
pytest.fail(str(e))
@pytest.mark.parametrize(
"source, default_stream_indexes, params",
[
(return_testvideo_path(), [0, 0], ["source_has_video", "source_has_audio"]),
("mandelbrot=size=1280x720:rate=30", [0, 0], ["source_has_video"]),
(return_testvideo_path(fmt="ao"), [3, 2], ["source_has_audio"]),
(
"http://devimages.apple.com/iphone/samples/bipbop/bipbopall.m3u8",
(1, 0),
["source_has_video", "source_has_audio"],
),
("unknown://invalid.com/", (1, "invalid", 0), []),
("invalid", (), []),
(
return_generated_frames_path(return_static_ffmpeg()),
(0, 0),
["source_has_image_sequence"],
),
],
)
def test_probe_stream_n_retrieve_metadata(source, default_stream_indexes, params):
"""
Test `probe_stream` and `retrieve_metadata` function.
"""
try:
source_demuxer = (
"lavfi" if source == "mandelbrot=size=1280x720:rate=30" else None
)
if source == "invalid":
sourcer = Sourcer(
source, custom_ffmpeg=return_static_ffmpeg(), verbose=True
)
else:
sourcer = Sourcer(
source,
custom_ffmpeg=return_static_ffmpeg(),
source_demuxer=source_demuxer,
verbose=True,
).probe_stream(default_stream_indexes=default_stream_indexes)
metadata = sourcer.retrieve_metadata()
logger.debug("Found Metadata: `{}`".format(metadata))
assert all(metadata[x] == True for x in params), "Test Failed!"
if (
source.startswith("http")
or source.endswith("png")
or source == "mandelbrot=size=1280x720:rate=30"
):
logger.debug("Skipped check!")
else:
assert (
metadata["approx_video_nframes"]
>= actual_frame_count_n_frame_size(source)[0]
), "Test Failed for frames count!"
except Exception as e:
if isinstance(e, ValueError) or (
source in ["invalid", "unknown://invalid.com/"]
and isinstance(e, AssertionError)
):
pytest.xfail("Test Still Passed!")
else:
pytest.fail(str(e))
"""
===============================================
DeFFcode library source-code is deployed under the Apache 2.0 License:
Copyright (c) 2021 Abhishek Thakur(@abhiTronix) <abhi.una12@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
===============================================
"""
# import the necessary packages
import pytest
import logging
import os
import tempfile
from os.path import expanduser
from deffcode.utils import dict2Args, logger_handler, delete_file_safe
# define test logger
logger = logging.getLogger("Test_Utilities")
logger.propagate = False
logger.addHandler(logger_handler())
logger.setLevel(logging.DEBUG)
test_data = [
(os.path.join(expanduser("~"), "deffcode.log"), logging.FileHandler("test.log")),
(False, logging.StreamHandler()),
]
@pytest.mark.parametrize("log_filepath, handler_type", test_data)
def test_loggerhandler(log_filepath, handler_type):
"""
Testing dict2Args utils function.
"""
if log_filepath:
os.environ["DEFFCODE_LOGFILE"] = log_filepath
try:
assert type(logger_handler()) == type(handler_type), "Test failed"
except AssertionError:
pytest.fail("Logger handler test failed!")
finally:
log_filepath and os.environ.pop("DEFFCODE_LOGFILE", None)
test_data = [
{"-thread_queue_size": "512", "-f": "alsa", "-clones": 24},
{
"-thread_queue_size": "512",
"-f": "alsa",
"-clones": ["-map", "0:v:0", "-map", "1:a?"],
"-ac": "1",
"-ar": "48000",
"-i": "plughw:CARD=CAMERA,DEV=0",
},
{
"-thread_queue_size": "512",
"-f": "alsa",
"-ac": "1",
"-ar": "48000",
"-i": "plughw:CARD=CAMERA,DEV=0",
},
]
@pytest.mark.parametrize("dictionary", test_data)
def test_dict2Args(dictionary):
"""
Testing dict2Args utils function.
"""
result = dict2Args(dictionary)
if result and isinstance(result, list):
logger.debug("dict2Args converted Arguments are: {}".format(result))
else:
pytest.fail("Failed to complete this test!")
test_data = [
(os.path.join(expanduser("~"), "invalid"), True),
("{}/Downloads/Test_videos/{}".format(tempfile.gettempdir(), "undelete.txt"), False),
]
@pytest.mark.parametrize("file_path, result", test_data)
def test_delete_file_safe(file_path, result):
"""
Testing delete_file_safe method
"""
try:
delete_file_safe(file_path)
except Exception as e:
if result:
pytest.fail(str(e))
else:
pytest.xfail(str(e))
+17
-12
Metadata-Version: 2.1
Name: deffcode
Version: 0.2.5
Version: 0.2.6
Summary: A cross-platform High-performance & Flexible Real-time Video Frames Decoder in Python.

@@ -26,9 +26,14 @@ Home-page: https://abhitronix.github.io/deffcode

Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Requires-Python: >=3.7
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: cython
Requires-Dist: numpy
Requires-Dist: requests
Requires-Dist: colorlog
Requires-Dist: tqdm

@@ -320,3 +325,3 @@ <!--

We welcome your contributions to help us improve and extend this project. If you want to get involved with VidGear development, checkout the **[Contribution Guidelines ▶️][contribute]**
We welcome your contributions to help us improve and extend this project. If you want to get involved with DeFFcode development, checkout the **[Contribution Guidelines ▶️][contribute]**

@@ -348,14 +353,14 @@ We're offering support for DeFFcode on [**Gitter Community Channel**][gitter]. Come and join the conversation over there!

[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.7155399.svg)](https://doi.org/10.5281/zenodo.7155399)
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.7523792.svg)](https://doi.org/10.5281/zenodo.7523792)
```BibTeX
@software{deffcode,
author = {Abhishek Singh Thakur},
title = {abhiTronix/deffcode: v0.2.4},
month = oct,
year = 2022,
author = {Abhishek Thakur},
title = {abhiTronix/deffcode: v0.2.5},
month = jan,
year = 2023,
publisher = {Zenodo},
version = {v0.2.4},
doi = {10.5281/zenodo.7155399},
url = {https://doi.org/10.5281/zenodo.7155399}
version = {v0.2.5},
doi = {10.5281/zenodo.7523792},
url = {https://doi.org/10.5281/zenodo.7523792}
}

@@ -362,0 +367,0 @@ ```

@@ -14,2 +14,6 @@ LICENSE

deffcode.egg-info/requires.txt
deffcode.egg-info/top_level.txt
deffcode.egg-info/top_level.txt
tests/test_ffdecoder.py
tests/test_ffhelper.py
tests/test_sourcer.py
tests/test_utils.py

@@ -22,2 +22,3 @@ """

# import the necessary packages
import platform
import logging

@@ -110,6 +111,12 @@ import numpy as np

# handle process to be frames written
# handles process to be frames written
self.__process = None
# handle exclusive metadata
# handles disabling window for ffmpeg subprocess on Windows
self.__ffmpeg_window_disabler_patch = False
# checks machine OS
self.__machine_OS = platform.system()
# handles exclusive metadata
self.__ff_pixfmt_metadata = None # metadata

@@ -132,3 +139,3 @@ self.__raw_frame_num = None # raw-frame number

# handle termination
# handles termination
self.__terminate_stream = False

@@ -138,9 +145,12 @@

self.__extra_params = {
str(k).strip(): str(v).strip()
if not (v is None) and not isinstance(v, (dict, list, int, float, tuple))
else v
str(k).strip(): (
str(v).strip()
if not (v is None)
and not isinstance(v, (dict, list, int, float, tuple))
else v
)
for k, v in ffparams.items()
}
# handle custom Sourcer API params
# handles custom Sourcer API params
sourcer_params = self.__extra_params.pop("-custom_sourcer_params", {})

@@ -150,3 +160,3 @@ # reset improper values

# handle user ffmpeg pre-headers(parameters such as `-re`) parameters (must be a list)
# handles user ffmpeg pre-headers(parameters such as `-re`) parameters (must be a list)
self.__ffmpeg_prefixes = self.__extra_params.pop("-ffprefixes", [])

@@ -213,5 +223,25 @@ # check if not valid type

self.__verbose_logs and logger.critical(
"Enforcing OpenCV compatibility patch for YUV/NV frames."
"Enforcing OpenCV compatibility patch for YUV/NV video frames."
)
# handle disabling window for ffmpeg subprocess on Windows OS
# this patch prevents ffmpeg creation window from opening when
# building exe files
ffmpeg_window_disabler_patch = self.__extra_params.pop(
"-disable_ffmpeg_window", False
)
if ffmpeg_window_disabler_patch and isinstance(
ffmpeg_window_disabler_patch, bool
):
# check if value is valid
if self.__machine_OS != "Windows" or self.__verbose_logs:
logger.warning(
"Optional `-disable_ffmpeg_window` flag is only available on Windows OS with `verbose=False`. Discarding!"
)
else:
self.__ffmpeg_window_disabler_patch = ffmpeg_window_disabler_patch
else:
# handle improper values
self.__ffmpeg_window_disabler_patch = False
# handle pass-through audio mode works in conjunction with WriteGear [TODO]

@@ -297,3 +327,2 @@ self.__passthrough_mode = self.__extra_params.pop("-passthrough_audio", False)

def formulate(self):
"""

@@ -430,9 +459,11 @@ This method formulates all necessary FFmpeg pipeline arguments and executes it inside the FFmpeg `subprocess` pipe.

"{} Switching to default `{}` pixel-format!".format(
"Provided FFmpeg does not supports `{}` pixel-format.".format(
self.__sourcer_metadata["output_frames_pixfmt"]
if "output_frames_pixfmt" in self.__sourcer_metadata
else self.__frame_format
)
if self.__frame_format != "null"
else "No usable pixel-format defined.",
(
"Provided FFmpeg does not supports `{}` pixel-format.".format(
self.__sourcer_metadata["output_frames_pixfmt"]
if "output_frames_pixfmt" in self.__sourcer_metadata
else self.__frame_format
)
if self.__frame_format != "null"
else "No usable pixel-format defined."
),
default_pixfmt,

@@ -478,5 +509,5 @@ )

if "output_frames_pixfmt" in self.__sourcer_metadata:
self.__sourcer_metadata[
"output_frames_pixfmt"
] = self.__raw_frame_pixfmt
self.__sourcer_metadata["output_frames_pixfmt"] = (
self.__raw_frame_pixfmt
)

@@ -816,5 +847,7 @@ # handle raw-frame resolution

key,
" and its counterpart"
if key in counterpart_prop.values()
else "",
(
" and its counterpart"
if key in counterpart_prop.values()
else ""
),
value[key],

@@ -851,3 +884,2 @@ )

def __launch_FFdecoderline(self, input_params, output_params):
"""

@@ -891,3 +923,9 @@ This Internal method executes FFmpeg pipeline arguments inside a `subprocess` pipe in a new process.

self.__process = sp.Popen(
cmd, stdin=sp.DEVNULL, stdout=sp.PIPE, stderr=sp.DEVNULL
cmd,
stdin=sp.DEVNULL,
stdout=sp.PIPE,
stderr=sp.DEVNULL,
creationflags=( # this prevents ffmpeg creation window from opening when building exe files on Windows
sp.DETACHED_PROCESS if self.__ffmpeg_window_disabler_patch else 0
),
)

@@ -913,5 +951,3 @@

# terminate/kill process if still processing
if self.__process.poll() is None:
# demuxers prefer kill
self.__process.kill()
self.__process.poll() is None and self.__process.terminate()
# wait if not exiting

@@ -918,0 +954,0 @@ self.__process.wait()

@@ -345,10 +345,11 @@ """

"""
# extract and clean FFmpeg output
demuxers = check_sp_output([path, "-hide_banner", "-demuxers"])
splitted = [x.decode("utf-8").strip() for x in demuxers.split(b"\n")]
supported_demuxers = splitted[splitted.index("--") + 1 : len(splitted) - 1]
# compile regex
finder = re.compile(r"\s\s[a-z0-9_,-]+\s+")
# find all outputs
outputs = finder.findall("\n".join(supported_demuxers))
# return output findings
split_index = [idx for idx, s in enumerate(splitted) if "--" in s][0]
supported_demuxers = splitted[split_index + 1 : len(splitted) - 1]
# search all demuxers
outputs = [re.search(r"\s[a-z0-9_,-]{2,}\s", d) for d in supported_demuxers]
outputs = [o.group(0) for o in outputs if o]
# return demuxers output
return [o.strip() if not ("," in o) else o.split(",")[-1].strip() for o in outputs]

@@ -522,6 +523,8 @@

idx,
dev
if machine_OS != "Linux"
else "{} at path `{}`".format(
next(iter(dev.values()))[0], next(iter(dev.keys()))
(
dev
if machine_OS != "Linux"
else "{} at path `{}`".format(
next(iter(dev.values()))[0], next(iter(dev.keys()))
)
),

@@ -637,4 +640,7 @@ )

supported_protocols = splitted[splitted.index("Output:") + 1 : len(splitted) - 1]
# rtsp is a demuxer somehow
supported_protocols += ["rtsp"] if "rtsp" in get_supported_demuxers(path) else []
# RTSP is a demuxer somehow
# support both RTSP and RTSPS(over SSL)
supported_protocols += (
["rtsp", "rtsps"] if "rtsp" in get_supported_demuxers(path) else []
)
# Test and return result whether scheme is supported

@@ -683,3 +689,3 @@ if extracted_scheme_url and extracted_scheme_url in supported_protocols:

if retcode and not (retrieve_stderr):
logger.error("[Pipline-Error] :: {}".format(output.decode("utf-8")))
logger.error("[Pipeline-Error] :: {}".format(output.decode("utf-8")))
cmd = kwargs.get("args")

@@ -693,3 +699,3 @@ if cmd is None:

bool(output) or bool(stderr) or logger.error(
"[Pipline-Error] :: Pipline failed to exact any data from command: {}!".format(
"[Pipeline-Error] :: Pipeline failed to exact any data from command: {}!".format(
args[0] if args else []

@@ -696,0 +702,0 @@ )

@@ -90,3 +90,3 @@ """

"""
# checks if machine in-use is running windows os or not
# checks machine OS
self.__machine_OS = platform.system()

@@ -104,5 +104,7 @@

self.__sourcer_params = {
str(k).strip(): str(v).strip()
if not isinstance(v, (dict, list, int, float, tuple))
else v
str(k).strip(): (
str(v).strip()
if not isinstance(v, (dict, list, int, float, tuple))
else v
)
for k, v in sourcer_params.items()

@@ -204,2 +206,3 @@ }

self.__default_video_resolution = "" # handles stream resolution
self.__default_video_orientation = "" # handles stream's video orientation
self.__default_video_framerate = "" # handles stream framerate

@@ -224,2 +227,3 @@ self.__default_video_bitrate = "" # handles stream's video bitrate

self.__output_frames_pixfmt = "" # handles output frame pixel format
self.__output_orientation = "" # handles output frame orientation

@@ -258,2 +262,3 @@ # check whether metadata probed or not?

self.__default_video_framerate = video_rfparams["framerate"]
self.__default_video_orientation = video_rfparams["orientation"]

@@ -269,2 +274,3 @@ # parse output parameters through filters (if available)

self.__output_framerate = out_video_rfparams["framerate"]
self.__output_orientation = out_video_rfparams["orientation"]
# parse output pixel-format

@@ -366,2 +372,3 @@ self.__output_frames_pixfmt = self.__extract_video_pixfmt(

"source_video_framerate": self.__default_video_framerate,
"source_video_orientation": self.__default_video_orientation,
"source_video_decoder": self.__default_video_decoder,

@@ -395,2 +402,3 @@ "source_duration_sec": self.__default_source_duration,

"output_framerate": self.__output_framerate,
"output_orientation": self.__output_orientation,
}

@@ -408,2 +416,3 @@ )

"output_framerate": self.__default_video_framerate,
"output_orientation": self.__default_video_orientation,
}

@@ -501,10 +510,14 @@ )

"Successfully configured device `{}` at index `{}` with demuxer `{}`.".format(
self.__extracted_devices_list[index]
if self.__machine_OS != "Linux"
else next(
iter(self.__extracted_devices_list[index].values())
)[0],
index
if index >= 0
else len(self.__extracted_devices_list) + index,
(
self.__extracted_devices_list[index]
if self.__machine_OS != "Linux"
else next(
iter(self.__extracted_devices_list[index].values())
)[0]
),
(
index
if index >= 0
else len(self.__extracted_devices_list) + index
),
self.__source_demuxer,

@@ -609,5 +622,7 @@ )

selected_stream = video_bitrate_text[
default_stream
if default_stream > 0 and default_stream < len(video_bitrate_text)
else 0
(
default_stream
if default_stream > 0 and default_stream < len(video_bitrate_text)
else 0
)
]

@@ -644,5 +659,7 @@ filtered_bitrate = re.findall(

selected_stream = meta_text[
default_stream
if default_stream > 0 and default_stream < len(meta_text)
else 0
(
default_stream
if default_stream > 0 and default_stream < len(meta_text)
else 0
)
]

@@ -681,5 +698,7 @@ filtered_pixfmt = re.findall(

selected_stream = meta_text[
default_stream
if default_stream > 0 and default_stream < len(meta_text)
else 0
(
default_stream
if default_stream > 0 and default_stream < len(meta_text)
else 0
)
]

@@ -711,5 +730,7 @@ filtered_pixfmt = re.findall(

selected_stream = meta_text[
default_stream
if default_stream > 0 and default_stream < len(meta_text)
else 0
(
default_stream
if default_stream > 0 and default_stream < len(meta_text)
else 0
)
]

@@ -742,3 +763,3 @@ # filter data

"""
This Internal method parses default video-stream resolution and framerate from metadata.
This Internal method parses default video-stream resolution, orientation, and framerate from metadata.

@@ -766,8 +787,26 @@ Parameters:

)
# extract video orientation metadata if available
identifiers_orientation = ["displaymatrix:", "rotation"]
meta_text_orientation = (
[
line.strip()
for line in self.__ffsp_output.split("\n")
if all(x in line for x in identifiers_orientation)
]
if not extract_output
else [
line.strip()
for line in self.__metadata_output.split("\n")
if all(x in line for x in identifiers_orientation)
]
)
# use metadata if available
result = {}
if meta_text:
selected_stream = meta_text[
default_stream
if default_stream > 0 and default_stream < len(meta_text)
else 0
(
default_stream
if default_stream > 0 and default_stream < len(meta_text)
else 0
)
]

@@ -800,4 +839,20 @@

return result if result and (len(result) == 2) else {}
# extract video orientation metadata
if meta_text_orientation:
selected_stream = meta_text_orientation[
(
default_stream
if default_stream > 0 and default_stream < len(meta_text)
else 0
)
]
filtered_orientation = re.findall(
r"[-]?\d+\.\d+", selected_stream.strip()
)
result["orientation"] = float(filtered_orientation[0])
else:
result["orientation"] = 0.0
return result if result and (len(result) == 3) else {}
def __extract_duration(self, inseconds=True):

@@ -804,0 +859,0 @@ """

@@ -1,1 +0,1 @@

__version__ = "0.2.5"
__version__ = "0.2.6"
+17
-12
Metadata-Version: 2.1
Name: deffcode
Version: 0.2.5
Version: 0.2.6
Summary: A cross-platform High-performance & Flexible Real-time Video Frames Decoder in Python.

@@ -26,9 +26,14 @@ Home-page: https://abhitronix.github.io/deffcode

Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Requires-Python: >=3.7
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: cython
Requires-Dist: numpy
Requires-Dist: requests
Requires-Dist: colorlog
Requires-Dist: tqdm

@@ -320,3 +325,3 @@ <!--

We welcome your contributions to help us improve and extend this project. If you want to get involved with VidGear development, checkout the **[Contribution Guidelines ▶️][contribute]**
We welcome your contributions to help us improve and extend this project. If you want to get involved with DeFFcode development, checkout the **[Contribution Guidelines ▶️][contribute]**

@@ -348,14 +353,14 @@ We're offering support for DeFFcode on [**Gitter Community Channel**][gitter]. Come and join the conversation over there!

[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.7155399.svg)](https://doi.org/10.5281/zenodo.7155399)
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.7523792.svg)](https://doi.org/10.5281/zenodo.7523792)
```BibTeX
@software{deffcode,
author = {Abhishek Singh Thakur},
title = {abhiTronix/deffcode: v0.2.4},
month = oct,
year = 2022,
author = {Abhishek Thakur},
title = {abhiTronix/deffcode: v0.2.5},
month = jan,
year = 2023,
publisher = {Zenodo},
version = {v0.2.4},
doi = {10.5281/zenodo.7155399},
url = {https://doi.org/10.5281/zenodo.7155399}
version = {v0.2.5},
doi = {10.5281/zenodo.7523792},
url = {https://doi.org/10.5281/zenodo.7523792}
}

@@ -362,0 +367,0 @@ ```

@@ -286,3 +286,3 @@ <!--

We welcome your contributions to help us improve and extend this project. If you want to get involved with VidGear development, checkout the **[Contribution Guidelines ▶️][contribute]**
We welcome your contributions to help us improve and extend this project. If you want to get involved with DeFFcode development, checkout the **[Contribution Guidelines ▶️][contribute]**

@@ -314,14 +314,14 @@ We're offering support for DeFFcode on [**Gitter Community Channel**][gitter]. Come and join the conversation over there!

[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.7155399.svg)](https://doi.org/10.5281/zenodo.7155399)
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.7523792.svg)](https://doi.org/10.5281/zenodo.7523792)
```BibTeX
@software{deffcode,
author = {Abhishek Singh Thakur},
title = {abhiTronix/deffcode: v0.2.4},
month = oct,
year = 2022,
author = {Abhishek Thakur},
title = {abhiTronix/deffcode: v0.2.5},
month = jan,
year = 2023,
publisher = {Zenodo},
version = {v0.2.4},
doi = {10.5281/zenodo.7155399},
url = {https://doi.org/10.5281/zenodo.7155399}
version = {v0.2.5},
doi = {10.5281/zenodo.7523792},
url = {https://doi.org/10.5281/zenodo.7523792}
}

@@ -328,0 +328,0 @@ ```

@@ -93,8 +93,8 @@ """

"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
python_requires=">=3.7",
python_requires=">=3.8",
scripts=[],

@@ -101,0 +101,0 @@ project_urls={