New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details
Socket
Book a DemoSign in
Socket

fmdt-python

Package Overview
Dependencies
Maintainers
1
Versions
43
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

fmdt-python - pypi Package Compare versions

Comparing version
0.0.39
to
0.0.41
+21
fmdt_python-0.0.41.dist-info/LICENSE
MIT License
Copyright (c) 2023 Evan Voyles
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Metadata-Version: 2.1
Name: fmdt-python
Version: 0.0.41
Summary: utlity functions for fast meteor detection toolbox
Author-email: Evan Voyles <ejovo13@yahoo.com>
Project-URL: fmdt, https://github.com/alsoc/fmdt
Project-URL: Homepage, https://github.com/ejovo13/fmdt_scripts
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: numpy
Requires-Dist: ffmpeg-python
Requires-Dist: requests
Requires-Dist: termcolor
Requires-Dist: matplotlib
Requires-Dist: pandas
Requires-Dist: jsonpickle
Requires-Dist: appdirs
Requires-Dist: Deprecated
A series of Python scripts to facilitate the processing of [fmdt's](https://github.com/alsoc/fmdt) output
Scripts for video editing rely on [ffmpeg-python's](https://github.com/kkroening/ffmpeg-python) simple Python bindings to ffmpeg. Make sure you install `ffmpeg-python` before trying any of the video editing functionality.
### Installation
```
pip install fmdt-python
```
`fmdt/core.py` should contain the functions that are called directly in scripts.
`fmdt/utils.py` contains utility functions that `fmdt.core` makes use of.
Example to split a video using tracking information already provided by `fmdt-detect`:
```
import fmdt
fmdt.split_video_at_meteors("demo.mp4", "ex1_detect_tracks.txt")
```
#### TODO
- [x] Upload fmdt to pip so that we can download fmdt and call scripts from anywhere
- [x] Add API to call fmdt executables like `fmdt-detect` and `fmdt-visu`
fmdt/__init__.py,sha256=c2_mqwk3mNmc6gCjhc86j8PnaLGLRhHH7O9GoYWLF5A,1386
fmdt/__test__.py,sha256=FXbpPO1x5EatXgegxcJLGb1iUpWMCz9xuSvOWc0zr8A,1919
fmdt/api.py,sha256=rFHKXzmOnQnoXUJC13QjbXYXfwFZdmVZ1NnQzX52cH8,12273
fmdt/args.py,sha256=p8alJhYCrPrDI23bCq1Nge9qDx09vJZurYwVI6I4mDw,29126
fmdt/config.py,sha256=ZUXr-CUDih7olqIU0wRAYkEXZNyBiHfNBrpbg61l8o4,7995
fmdt/core.py,sha256=Bzm6pGjXV6Hko9BeTgz5Ixl_xH8XOukO9Y-o-_DSsfY,15530
fmdt/db.py,sha256=qv-rS4Of0nE8iWcTxttfW6wBS47pRWu9LVbAnOX64uQ,25847
fmdt/download.py,sha256=aVGLn6MvB9FB5ndmHp399II3DCaOrZDm-539xyTltsU,3791
fmdt/exceptions.py,sha256=shvijE3euAAFiFO5yehddsiL0DSQBPU4Y3V887CYQzY,225
fmdt/res.py,sha256=Y0RvFRdikLYGXPFcrhgjf6ibmsk4GkQun13XPxDZZiw,14856
fmdt/tests.py,sha256=tAYIICMqg5LVOWanlfWtHcKgy2uvSRfVOmJl4R3dLLc,3562
fmdt/truth.py,sha256=X75ADsXeowMl_d_-CZk29k7rGrGsnyL65qwB5sugQOs,19740
fmdt/utils.py,sha256=sqE-rA78rr_bLqHXTp_HdCMFRNXPGJGbamVYS4GdhtM,10095
fmdt_python-0.0.41.dist-info/LICENSE,sha256=7oPhtg5rNNa1IVBDV-A7WY-TVhhBIwuaT4dKmd9sqMY,1068
fmdt_python-0.0.41.dist-info/METADATA,sha256=ggGe1CXvXNCBjXqFZFt_tnooUMBiCP3MzKvQsuMpnik,1611
fmdt_python-0.0.41.dist-info/WHEEL,sha256=pkctZYzUS4AYVn6dJ-7367OJZivF2e8RA9b_ZBjif18,92
fmdt_python-0.0.41.dist-info/top_level.txt,sha256=mfR9HxoivulFZbTMXWUH0J2-8270GBpGjQe69ki4V8E,5
fmdt_python-0.0.41.dist-info/RECORD,,
Wheel-Version: 1.0
Generator: bdist_wheel (0.40.0)
Root-Is-Purelib: true
Tag: py3-none-any
import unittest
import shutil
# import fmdt.api
import os
import fmdt.args
import fmdt.api
import fmdt.res
import fmdt.core
import fmdt.download
import pandas as pd
class TestAPI(unittest.TestCase):
def test_fmdt_detect_existence(self):
fmdt_detect_exe = shutil.which("fmdt-detect")
self.assertTrue(not fmdt_detect_exe is None, "Executable 'fmdt-detect' not found on the path")
class TestARGS(unittest.TestCase):
#================= Creation of Args type ==============
def test_detect_arg_creation(self):
args = fmdt.detect_args("demo.mp4")
self.assertTrue(args.detect_args.vid_in_path == "demo.mp4")
self.assertTrue(args.visu_args.vid_out_path == "demo_visu.mp4")
def test_arg_creation(self):
args = fmdt.Args.new(vid_in_path="demo.mp4", vid_out_path="demo_out.mp4", vid_in_start=10)
self.assertTrue(args.visu_args.vid_out_path == "demo_out.mp4")
self.assertTrue(args.visu_args.vid_in_start == 10)
self.assertTrue(args.detect_args.vid_in_start == 10)
args2 = fmdt.Args.new(vid_in_path="demo.mp4", timeout=4, log=True)
self.assertTrue(args2.timeout == 4)
self.assertTrue(args2.log)
class TestDetections(unittest.TestCase):
def test_demo_plain(self):
if not os.path.exists("demo.mp4"):
print("Cannot run detection tests on demo.mp4 because it was not found")
exit(2)
res = fmdt.detect("demo.mp4")
"""Needs to be run with "demo.mp4" in the current working directory"""
self.assertTrue(isinstance(res, fmdt.res.DetectionResult))
self.assertTrue(isinstance(res.trk_list[0], fmdt.core.TrackedObject))
self.assertTrue(len(res.trk_list) == 38)
self.assertTrue(res.trk_list[0].type_str() == "Meteor")
self.assertTrue(res.nframes == 256)
# Since we haven't passed the log_path parameter, we expect no data to be stored
self.assertTrue(len(res.nrois) == 0)
def test_demo_log(self):
if not os.path.exists("demo.mp4"):
print("Cannot run detection tests on demo.mp4 because it was not found")
exit(2)
res = fmdt.detect("demo.mp4", log_path="log")
self.assertTrue(len(res.nrois) == 256)
df = res.to_dataframe()
self.assertTrue(len(df) == 256)
def test_demo_log_all(self):
if not os.path.exists("demo.mp4"):
print("Cannot run detection tests on demo.mp4 because it was not found")
exit(2)
res = fmdt.detect("demo.mp4", log_path="log", trk_all=True)
self.assertTrue(len(res.trk_list) == 82)
self.assertTrue(len(res.nrois) == 256)
df = res.to_dataframe()
self.assertTrue(len(df) == 256)
class TestGroundTruth(unittest.TestCase):
def test_downloads(self):
"""Download all the csvs associated with our data base"""
fmdt.download.download_csvs(overwrite=True)
# Now I should be able to load in the values using pandas I guess
# We want to load in some ground truth values as well
gt6 = fmdt.GroundTruth("human_detections_draco6.csv")
gt12 = fmdt.GroundTruth("human_detections_draco12.csv")
gtall = fmdt.GroundTruth("human_detections.csv")
self.assertTrue(len(gt6) == 38)
self.assertTrue(len(gt12) == 39)
self.assertTrue(len(gtall) == 77)
# class TestKnownResults(unittest.TestCase):
# def test_
def main() -> None:
unittest.main()
if __name__ == '__main__':
main()
+11
-5

@@ -20,3 +20,5 @@ from fmdt.core import (

VisuArgs,
detect_args
detect_args,
set_exec_path,
get_exec_path
)

@@ -40,3 +42,4 @@

download_demo_mp4,
download_dbs
download_dbs,
get_db_dir
)

@@ -60,3 +63,4 @@

listdir_cache,
size_cache
size_cache,
bytes_format
)

@@ -68,3 +72,3 @@

VideoClip,
load_in_videos,
retrieve_videos,
load_draco6,

@@ -74,2 +78,4 @@ load_draco12,

load_demo,
load_window_clips,
load_all,
info as local_info

@@ -86,4 +92,4 @@ )

MINOR_VERSION = 0
PATCH = 39
PATCH = 41
VERSION = str(MAJOR_VERSION) + "." + str(MINOR_VERSION) + "." + str(PATCH)

@@ -300,3 +300,6 @@ """

def check(
trk_path: str, gt_path: str, stdout: str = None, log = False
trk_path: str,
gt_path: str,
stdout: str = None,
log = False
) -> fmdt.res.CheckResult:

@@ -309,3 +312,10 @@ """Call fmdt-check

argv = ["fmdt-check", "--trk-path", trk_path, "--gt-path", gt_path]
if fmdt.args.get_exec_path() is None:
fmdt_check_exe = shutil.which("fmdt-check")
else:
fmdt_check_exe = shutil.which("fmdt-check", path=fmdt.args.get_exec_path())
assert not fmdt_check_exe is None, "fmdt-check executable not found!"
argv = [fmdt_check_exe, "--trk-path", trk_path, "--gt-path", gt_path]
res = subprocess.run(argv, stdout=subprocess.PIPE)

@@ -312,0 +322,0 @@

@@ -14,3 +14,11 @@ """Arguments Class to store shared parameters when calling a chain of .detect(args).visu().split()"""

__EXECUTABLE_PATH = None
def set_exec_path(path: str):
global __EXECUTABLE_PATH
__EXECUTABLE_PATH = path
def get_exec_path() -> str:
return __EXECUTABLE_PATH
class Result:

@@ -453,3 +461,3 @@ pass

def detect_cmd(self) -> str:
return handle_detect_args(**self.detect_args)
return handle_detect_args(**self.detect_args.to_dict())

@@ -534,3 +542,3 @@ def visu(self, **kwargs):

"""Return the command used to execute fmdt-detect with this configuration"""
return ' '.join(handle_detect_args(**self.detect_args))
return ' '.join(handle_detect_args(**self.detect_args.to_dict()))

@@ -743,7 +751,8 @@ def digest(self) -> str:

"""Convert the arguments needed for fmdt-detect into a fmdt-detect command-line call
"""
if get_exec_path() is None:
fmdt_detect_exe = shutil.which("fmdt-detect")
else:
fmdt_detect_exe = shutil.which("fmdt-detect", path=get_exec_path())
fmdt_detect_exe = shutil.which("fmdt-detect")
fmdt_detect_found = not fmdt_detect_exe is None

@@ -810,2 +819,6 @@ assert fmdt_detect_found, "fmdt-detect executable not found"

if get_exec_path() is None:
fmdt_visu_exe = shutil.which("fmdt-visu")
else:
fmdt_visu_exe = shutil.which("fmdt-visu", path=get_exec_path())

@@ -812,0 +825,0 @@ fmdt_visu_exe = shutil.which("fmdt-visu")

@@ -177,4 +177,4 @@ """

"type": <"meteor" | "noise" | "start">
"frame_start": <int>
"frame_end": <int>
"start_frame": <int>
"end_frame": <int>
}

@@ -352,3 +352,3 @@ where each item in the list corresponds to a single object detected by

`video_filename` (str): Filename of video to split
`start_end` (list[tuple[int, int]]): list of (frame_start, frame_end) pairs to split this video at
`start_end` (list[tuple[int, int]]): list of (start_frame, end_frame) pairs to split this video at
`nframes_before` (int): Number of frames to extract before the meteor sequence begins

@@ -355,0 +355,0 @@ (Default value of 3)

+228
-95

@@ -24,2 +24,3 @@ """Module dealing with the database stuff"""

VIDEOS_FILE = fmdt.config.dir() + "/videos.db"
DEFAULT_DATA_DIR = fmdt.download.get_db_dir()

@@ -30,3 +31,4 @@ class VideoType(Enum):

WINDOW = 2,
OTHER = 3
DEMO = 3,
OTHER = 4

@@ -40,2 +42,4 @@ def __str__(self) -> str:

return "WINDOW"
elif self == VideoType.DEMO:
return "DEMO"
else:

@@ -56,2 +60,6 @@ return "OTHER"

return con.win
elif self == VideoType.DEMO:
return con.win
elif self == VideoType.OTHER:
return con.win
else:

@@ -67,4 +75,8 @@ return "./"

return VideoType.DRACO12
elif str == "WINDOW":
return VideoType.WINDOW
elif str == "DEMO":
return VideoType.DEMO
else:
return VideoType.WINDOW
return VideoType.OTHER

@@ -265,6 +277,12 @@ # Or maybe recording, night sky watching, stargazing, video of night sky

# Lookup the id in our default database file.
def id(self) -> int:
def id(
self,
db_file = "videos.db",
db_dir = DEFAULT_DATA_DIR
) -> int:
con = sqlite3.connect(VIDEOS_FILE)
db_filename = fmdt.utils.join(db_dir, db_file)
con = sqlite3.connect(db_filename)
query = f"""

@@ -290,7 +308,17 @@ select id from video where name = '{self.name}'

def meteors(self) -> list[fmdt.HumanDetection]:
return retrieve_meteors(self.name)
def meteors(
self,
db_filename: str = "videos.db",
db_dir = DEFAULT_DATA_DIR
) -> list[fmdt.HumanDetection]:
return retrieve_meteors(self.name, db_filename, db_dir)
def has_meteors(self) -> bool:
return len(self.meteors()) > 0
def has_meteors(
self,
db_filename: str = "videos.db",
db_dir = DEFAULT_DATA_DIR
) -> bool:
return len(self.meteors(db_filename, db_dir)) > 0

@@ -379,10 +407,15 @@ def exists(self) -> bool:

def create_clip(self, frame_start: int, frame_end: int):
def create_clip(self, start_frame: int, end_frame: int):
valid_bounds = frame_start >= 0 and frame_end <= self.nb_frames()
assert valid_bounds, f"Cannot create video clip with bounds [{frame_start}, {frame_end}] for {self} ({self.nb_frames()} frames)"
valid_bounds = start_frame >= 0 and end_frame <= self.nb_frames()
assert valid_bounds, f"Cannot create video clip with bounds [{start_frame}, {end_frame}] for {self} ({self.nb_frames()} frames)"
return VideoClip(self.name, frame_start, frame_end, self.type)
return VideoClip(self.name, start_frame, end_frame, self.type)
def create_clips(self, frame_buffer: int = None, condense = True, save_to_disk = False):
def create_clips(
self,
frame_buffer: int = None,
condense = True,
save_to_disk = False,
db_dir = DEFAULT_DATA_DIR):
"""

@@ -401,3 +434,3 @@ If a video has meteors in our data base, create video clips that capture each meteor

if not self.has_meteors():
if not self.has_meteors(db_dir = db_dir):
raise GroundTruthError(f"No ground truths in our database for {self}")

@@ -409,3 +442,3 @@

# We need to get the intervals associated with the ground truths
meteors = self.meteors()
meteors = self.meteors(db_dir = db_dir)
intervals = [m.interval() for m in meteors]

@@ -431,2 +464,18 @@

def retrieve_clips(
self,
db_file = "videos.db",
db_dir = DEFAULT_DATA_DIR
) -> list:
"""Retrieve predefined clips in the table `video_clips` from our videos.db"""
db_filename = fmdt.utils.join(db_dir, db_file)
con = sqlite3.connect(db_filename)
self_clips = pd.read_sql_query(f"SELECT * FROM video_clips WHERE parent_id = {self.id(db_file, db_dir)}",
con = con)
return [VideoClip.from_pd_row(r) for _, r in self_clips.iterrows()]
@staticmethod

@@ -438,3 +487,84 @@ def from_pd_row(ser: pd.Series):

@staticmethod
def from_db_id(
id: int,
db_file = "videos.db",
db_dir = DEFAULT_DATA_DIR
):
con = sqlite3.connect(fmdt.utils.join(db_dir, db_file))
video_pd = pd.read_sql_query(f"SELECT * FROM video WHERE id = {id}", con = con)
v = Video.from_pd_row(video_pd.iloc[0])
con.close()
return v
class VideoClip(Video):
def __init__(self, name: str, start_frame: int, end_frame: int, type: VideoType = None):
super().__init__(name, type)
self.start_frame = start_frame
self.end_frame = end_frame
def __str__(self) -> str:
s = super().__str__()
return s + f" [{self.start_frame}, {self.end_frame}]"
def __eq__(self, rhs) -> bool:
return (
self.name == rhs.name and
self.start_frame == rhs.start_frame and
self.end_frame == rhs.end_frame
)
def meteors(
self,
db_file = "videos.db",
db_dir = DEFAULT_DATA_DIR
):
p_meteors = super().meteors(db_file, db_dir)
def pred(hum_det: fmdt.HumanDetection):
"""Check if the meteors start frame is in the clip"""
return hum_det.start_frame >= self.start_frame and hum_det.start_frame <= self.end_frame
def modify_meteor(m: fmdt.HumanDetection):
"""Modify the interval with respect to this video clip"""
m_c = deepcopy(m)
m_c.start_frame = m.start_frame - self.start_frame
m_c.end_frame = m.end_frame - self.start_frame
return m_c
return [modify_meteor(m) for m in p_meteors if pred(m)]
def parent_path(self) -> str:
"""Return the full path to the folder that these clips will appear in"""
return self.dir() + "/" + self.prefix() + "/"
def full_path(self) -> str:
return self.parent_path() + f"f{self.start_frame:04}-{self.end_frame:04}_." + self.suffix()
def save(self, overwrite = False):
"""
Write this clip to disk
"""
fmdt.utils.mkdir_p(self.parent_path())
fmdt.utils.extract_video_frames(super().full_path(), self.start_frame, self.end_frame, self.full_path(), overwrite=overwrite)
@staticmethod
def from_pd_row(
row: pd.Series,
db_file = "videos.db",
db_dir = DEFAULT_DATA_DIR
):
parent_video = Video.from_db_id(row["parent_id"], db_file, db_dir)
return VideoClip(parent_video.name, row["start_frame"], row["end_frame"], parent_video.type)
# Take a list of Videos and turn it into a data base.

@@ -458,19 +588,2 @@ def videos_to_csv(videos: list[Video], csv_filename: str) -> None:

def load_in_videos(db_filename: str = "videos.db", dir = fmdt.download.__DATA_DIR) -> list[Video]:
"""Read in the videos stored in 'videos.db' into a list of fmdt.Video"""
# Download if the database file requested doesnt exist
if not os.path.exists(dir + "/" + db_filename):
fmdt.download.download_videos_db(db_filename, log=False, overwrite=False, dir=dir)
if not dir is None:
db_filename = dir + "/" + db_filename
con = sqlite3.connect(db_filename)
df = pd.read_sql_query("select * from video", con)
con.close()
return [fmdt.Video.from_pd_row(df.iloc[i]) for i in range(len(df))]
def has_meteors(vids: list[Video]) -> list[Video]:

@@ -480,4 +593,4 @@ return [v for v in vids if v.has_meteors()]

def load_draco6(
filename: str = "videos.db",
db_dir = fmdt.download.__DATA_DIR,
db_filename: str = "videos.db",
db_dir = DEFAULT_DATA_DIR,
require_gt = False,

@@ -488,7 +601,7 @@ require_exist = False

vids = load_in_videos(filename, db_dir)
vids = retrieve_videos(db_filename, db_dir)
d6 = [v for v in vids if v.is_draco6()]
if require_gt:
d6 = [v for v in d6 if v.has_meteors()]
d6 = [v for v in d6 if v.has_meteors(db_filename, db_dir)]

@@ -502,3 +615,3 @@ if require_exist:

db_filename: str = "videos.db",
db_dir = fmdt.download.__DATA_DIR,
db_dir = DEFAULT_DATA_DIR,
require_gt = False,

@@ -508,7 +621,7 @@ require_exist = False

vids = load_in_videos(db_filename, db_dir)
vids = retrieve_videos(db_filename, db_dir)
d12 = [v for v in vids if v.is_draco12()]
if require_gt:
d12 = [v for v in d12 if v.has_meteors()]
d12 = [v for v in d12 if v.has_meteors(db_filename, db_dir)]

@@ -522,3 +635,3 @@ if require_exist:

db_filename: str = "videos.db",
db_dir = fmdt.download.__DATA_DIR,
db_dir = DEFAULT_DATA_DIR,
require_gt = False,

@@ -528,7 +641,7 @@ require_exist = False

vids = load_in_videos(db_filename, db_dir)
vids = retrieve_videos(db_filename, db_dir)
win = [v for v in vids if v.is_window()]
if require_gt:
win = [v for v in win if v.has_meteors()]
win = [v for v in win if v.has_meteors(db_filename, db_dir)]

@@ -539,6 +652,34 @@ if require_exist:

return win
def load_window_clips(
db_file = "videos.db",
db_dir = DEFAULT_DATA_DIR,
require_exist = False
) -> list[VideoClip]:
"""Load all of the clips associated with all of our windows objects"""
db_path = fmdt.utils.join(db_dir, db_file)
con = sqlite3.connect(db_path)
df: pd.DataFrame = pd.read_sql_query("select * from video_clips", con = con)
con.close()
if require_exist:
out = []
for _, row in df.iterrows():
v = VideoClip.from_pd_row(row, db_file, db_dir)
if v.exists():
out.append[v]
return out
else:
return [VideoClip.from_pd_row(row, db_file, db_dir) for _, row in df.iterrows()]
def load_demo(db_filename: str = "videos.db", db_dir = fmdt.download.__DATA_DIR) -> list[Video]:
def load_demo(db_filename: str = "videos.db", db_dir = DEFAULT_DATA_DIR) -> list[Video]:
"""Load video 2022_05_31_tauh_34_meteors.mp4 from our database"""
vids = load_in_videos(db_filename, db_dir)
vids = retrieve_videos(db_filename, db_dir)
win = [v for v in vids if v.name == "2022_05_31_tauh_34_meteors.mp4"]

@@ -549,17 +690,50 @@

def retrieve_meteors(video_name: str, db_filename: str = "videos.db", dir = fmdt.download.__DATA_DIR) -> list[fmdt.HumanDetection]:
def load_all(
db_filename = "videos.db",
db_dir = DEFAULT_DATA_DIR,
require_gt = True
) -> list:
"""Load all videos that have a ground truth in our database"""
draco6 = load_draco6 (db_filename, db_dir, require_gt)
draco12 = load_draco12(db_filename, db_dir, require_gt)
windows = load_window_clips(db_filename, db_dir)
return draco6 + draco12 + windows
def retrieve_videos(
db_filename: str = "videos.db",
db_dir = DEFAULT_DATA_DIR
) -> list[Video]:
"""Read in the videos stored in 'videos.db' into a list of fmdt.Video"""
db_path = fmdt.utils.join(db_dir, db_filename)
# Download if the database file requested doesnt exist
if not os.path.exists(db_path):
fmdt.download.download_videos_db(db_filename, log=False, overwrite=False, dir=db_dir)
con = sqlite3.connect(db_path)
df = pd.read_sql_query("select * from video", con)
con.close()
return [fmdt.Video.from_pd_row(df.iloc[i]) for i in range(len(df))]
def retrieve_meteors(
video_name: str,
db_filename: str = "videos.db",
db_dir = DEFAULT_DATA_DIR
) -> list[fmdt.HumanDetection]:
"""Query all of the ground truths in our database"""
db_path = fmdt.utils.join(db_dir, db_filename)
if not os.path.exists(dir + "/" + db_filename):
fmdt.download.download_videos_db(db_filename, log=False, overwrite=False, dir=dir)
if not os.path.exists(db_path):
fmdt.download.download_videos_db(db_filename, log=False, overwrite=False, dir=db_dir)
if not dir is None:
db_filename = dir + "/" + db_filename
query = f"""
select * from human_detection where video_name = '{video_name}'
select * from human_detections where video_name = '{video_name}'
"""
con = sqlite3.connect(db_filename)
con = sqlite3.connect(db_path)
df = pd.read_sql_query(query, con)

@@ -569,4 +743,4 @@ con.close()

return [fmdt.HumanDetection.from_pd_row(df.iloc[i]) for i in range(len(df))]
# return df
def get_video_diagnostics(vids: list[Video]) -> tuple[int, int]:

@@ -628,3 +802,3 @@ """Print information about the local environment"""

def get_video_by_id(id: int) -> Video | None:
videos = load_in_videos()
videos = retrieve_videos()
v = [v for v in videos if v.has_id(id)]

@@ -644,43 +818,2 @@ if len(v) != 0:

class VideoClip(Video):
def __init__(self, name: str, frame_start: int, frame_end: int, type: VideoType = None):
super().__init__(name, type)
self.frame_start = frame_start
self.frame_end = frame_end
def __str__(self) -> str:
s = super().__str__()
return s + f" [{self.frame_start}, {self.frame_end}]"
def meteors(self):
p_meteors = super().meteors()
def pred(hum_det: fmdt.HumanDetection):
"""Check if the meteors start frame is in the clip"""
return hum_det.start_frame >= self.frame_start and hum_det.start_frame <= self.frame_end
def modify_meteor(m: fmdt.HumanDetection):
"""Modify the interval with respect to this video clip"""
m_c = deepcopy(m)
m_c.start_frame = m.start_frame - self.frame_start
m_c.end_frame = m.end_frame - self.frame_start
return m_c
return [modify_meteor(m) for m in p_meteors if pred(m)]
def parent_path(self) -> str:
"""Return the full path to the folder that these clips will appear in"""
return self.dir() + "/" + self.prefix() + "/"
def full_path(self) -> str:
return self.parent_path() + f"f{self.frame_start:04}-{self.frame_end:04}_." + self.suffix()
def save(self, overwrite = False):
"""
Write this clip to disk
"""
fmdt.utils.mkdir_p(self.parent_path())
fmdt.utils.extract_video_frames(super().full_path(), self.frame_start, self.frame_end, self.full_path(), overwrite=overwrite)

@@ -6,2 +6,6 @@ """Module dealing with downloading files and stuff"""

from fmdt.utils import (
join
)
# Let's define some functions that will fetch our data base files

@@ -24,3 +28,6 @@

def get_db_dir() -> str:
return __DATA_DIR
def download_file(

@@ -35,3 +42,3 @@ filename: str,

if not dir is None:
filename = dir + "/" + filename
filename = join(dir, filename)

@@ -60,3 +67,3 @@ if os.path.exists(filename) and not overwrite:

if not dir is None:
filename = dir + "/" + filename
filename = join(dir, filename)

@@ -63,0 +70,0 @@ if os.path.exists(filename) and not overwrite:

class GroundTruthError(Exception):
"""Raised when a video doesn't have any ground truths in our data base"""
pass
class LogError(Exception):
"""Raised when we have issues with the `log_path` parameter"""
pass

@@ -8,5 +8,22 @@ """Store the results of a run to fmdt-detect"""

def retrieve_all_nroi(log_path: str) -> list[int]:
from fmdt.exceptions import (
LogError
)
from fmdt.utils import (
stderr,
join
)
def retrieve_all_nroi(log_path: str, max_frames: int = None) -> list[int]:
"""
Assumes the format: '# Frame n°00247 (t) -- Regions of interest (RoI) [64]: \n'
Read the log files stored in log_path and retrieve a list of the number of regions of interest
Assumes the format: '# Frame n°XXXXX (t) -- Regions of interest (RoI) [XX]:'
Parameters
----------
log_path (str): The directory that was passed to fmdt-detect's --log-path parameter
max_frames (int | None): The maximum number of frame files in `log_path` to read. Useful for when
multiple different executions are saving log information to the same directory.
"""

@@ -26,3 +43,3 @@

def retrieve_single_nroi(frame: str):
with open(log_path + "/" + frame, "r") as file:
with open(join(log_path, frame), "r") as file:
lines = file.readlines()

@@ -41,15 +58,14 @@ rois = [l for l in lines if contains_roi(l) and "(t)" in l]

frames = os.listdir(log_path)[1:]
frames = os.listdir(log_path)[1:] # frame 0 has no associations; skip it
rxp = r'\[\d*\]'
rxp = r'\[\d*\]' # regex to capture '42' in line '# Associations [42]:'
rxp_comp = re.compile(rxp)
def retrieve_single_nassociations(frame: str):
with open(log_path + "/" + frame, "r") as file:
with open(join(log_path, frame), "r") as file:
lines = file.readlines()
assocs = [l for l in lines if "Associations" in l]
tokens = assocs[0].split()
n_roi = [n for n in tokens if rxp_comp.search(n)]
return int(n_roi[0][1:-2])
tokens = assocs[0].split() # convert '# Associations [42]:' to ['#', 'Associations', '[42]:']
n_roi = [n for n in tokens if rxp_comp.search(n)] # n_roi <- ['[42]:']
return int(n_roi[0][1:-2]) # extract 42 from ['[42]:']

@@ -78,3 +94,3 @@ return [retrieve_single_nassociations(f) for f in frames]

def mean_err(filename) -> float:
m, _ = retrieve_mean_err_std_dev(log_path + "/" + filename)
m, _ = retrieve_mean_err_std_dev(join(log_path, filename))
return m

@@ -92,3 +108,3 @@

def mean_err(filename) -> float:
_, s = retrieve_mean_err_std_dev(log_path + "/" + filename)
_, s = retrieve_mean_err_std_dev(join(log_path, filename))
return s

@@ -104,3 +120,3 @@

nrois = retrieve_all_nroi(log_path)
nassocs = retrieve_all_nassociations(log_path)
nassocs = retrieve_all_nassociations(log_path) # C'est la ou l'erreur s'est produit
mean_errs = retrieve_all_mean_errs(log_path)

@@ -140,2 +156,6 @@ std_devs = retrieve_all_std_devs(log_path)

return self.args.visu()
def cmd(self) -> str:
"""Return the command used to call this detect"""
return self.args.command()

@@ -142,0 +162,0 @@ def n_meteors_detected(self) -> int:

@@ -7,2 +7,6 @@ import ffmpeg

from termcolor import (
colored
)
def retain_meteors(tracking_list: list[dict]) -> list[dict]:

@@ -285,1 +289,10 @@ """Take a list of dictionaries returned by one of the fmdt.extract_* functions

return(intervals)
def join(dir: str, file: str):
"""Convenience function for os.path.join"""
return os.path.join(dir, os.path.basename(file))
def stderr(message: str) -> None:
"""Print a message to stderr that will show up as red in most terminals"""
print(colored(message, "red"), file=sys.stderr)
"""Tutorial to use the GroundTruth class of fmdt.truth.GroundTruth"""
import fmdt
import fmdt.truth
# We need to define the directory where our GroundTruth videos are found.
vid_dir12 = "/home/ejovo/Videos/Watec12mm"
vid_dir6 = "/home/ejovo/Videos/Watec6mm"
csv12 = "./human_detections_draco12.csv"
csv6 = "./human_detections_draco6.csv"
# We can create a new `GroundTruth` object - a list of `HumanDetection` paired
# with useful functions - two ways
gt12 = fmdt.truth.GroundTruth(csv=csv12, vid_dir=vid_dir12)
gt6 = fmdt.truth.GroundTruth(csv=csv6, vid_dir=vid_dir6)
print(type(gt12))
print(gt12)
print(type(gt6))
print(gt6)
# We can create an Args and run that command across all the
# videos in our database with GroundTruth.try_command
d_args = {
"light_min": 251,
"light_max": 252,
"trk_out_path": "tracks.txt",
"timeout": 1, #! Special parameter of the pyhton SUBPROCESS managing fmdt-detect
}
args = fmdt.Args(detect_args=d_args)
success = gt6.try_command(args)
print(success)
print(sum(success))
# Start implementing the functions needed to check fmdt
import fmdt
import fmdt.truth
og = "2022_05_31_tauh_34_meteors.mp4"
args = fmdt.detect_args(trk_out_path="trk.txt")
meteors = fmdt.truth.load_meteors_file("meteors.txt", "2022_05_31_tauh_34_meteors.mp4")
print(meteors)
v = fmdt.Video(og, fmdt.VideoType.WINDOW)
v.evaluate_args(args, meteors)
print(v)
print(v.full_path())
print(v.exists())
v.detect()
print(fmdt.load_config())
MIT License
Copyright (c) 2023 Evan Voyles
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Metadata-Version: 2.1
Name: fmdt-python
Version: 0.0.39
Summary: utlity functions for fast meteor detection toolbox
Project-URL: fmdt, https://github.com/alsoc/fmdt
Project-URL: Homepage, https://github.com/ejovo13/fmdt_scripts
Author-email: Evan Voyles <ejovo13@yahoo.com>
License-File: LICENSE
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Requires-Python: >=3.10
Requires-Dist: appdirs
Requires-Dist: deprecated
Requires-Dist: ffmpeg-python
Requires-Dist: jsonpickle
Requires-Dist: matplotlib
Requires-Dist: numpy
Requires-Dist: pandas
Requires-Dist: requests
Requires-Dist: termcolor
Description-Content-Type: text/markdown
A series of Python scripts to facilitate the processing of [fmdt's](https://github.com/alsoc/fmdt) output
Scripts for video editing rely on [ffmpeg-python's](https://github.com/kkroening/ffmpeg-python) simple Python bindings to ffmpeg. Make sure you install `ffmpeg-python` before trying any of the video editing functionality.
### Installation
```
pip install fmdt-python
```
`fmdt/core.py` should contain the functions that are called directly in scripts.
`fmdt/utils.py` contains utility functions that `fmdt.core` makes use of.
Example to split a video using tracking information already provided by `fmdt-detect`:
```
import fmdt
fmdt.split_video_at_meteors("demo.mp4", "ex1_detect_tracks.txt")
```
#### TODO
- [x] Upload fmdt to pip so that we can download fmdt and call scripts from anywhere
- [x] Add API to call fmdt executables like `fmdt-detect` and `fmdt-visu`
docs_use_gt.py,sha256=rWW5XgI3h3qvm50q-9gTpqewnvTEOaMb0Mc78HJRNfA,1038
fmdt_check.py,sha256=CYaeqc2X7vB242_c_CZlbdJoTtlD8DTx2J-cLLQvJQI,440
get_vid_db.py,sha256=wZJZdDxF7943LHs28A_WPZbn5w4ivMeW3BQSVqaRGoM,1588
ground_truth.py,sha256=d3gxRPAISTna4Uqayjhn1BYSj-rSfoz9MFkJMt2YndA,3337
is_human_detected.py,sha256=ONul9ERekUqJA2vTYmkxW8e067L9lKmveNnTeL0hIA0,670
light_min_max.py,sha256=DjuB_tos8XK8kLUJsGcY42tq3NuW9b_NXES8UIYalr8,1040
load_meteors.py,sha256=YsE4io9F6Uf9sX1yz8nlk9hdYim5O02zK4X49Krp5Tw,169
movement_stats.py,sha256=HmHCSYE9BlZ1eB00ap0BDaXtoxYlrj2ZxeNlYbmlK30,7740
new_gt.py,sha256=EBAcUOh6JRkTdPxaxgy6H0mbp9RjK3tcEmDcIfvySKc,582
new_options.py,sha256=COmAInBo7hIFq5rx6hTDL6Mr1kS199SjE8eyUHyPTzw,398
new_testing.py,sha256=cH3NEIuUCFwm-PbF4s3K27t8sCdFtofyGFRZTwXLNAw,450
opti_draco6.py,sha256=C28ZniIJCzhuVyFIyUbcUMNhjHkZQ9gUWL3pNYqpdZA,345
playing_with_videos_db.py,sha256=3PRVg32bcet7x592qtWHScUQmQQcfPEC6r37Q2QejQg,1278
run_args_results.py,sha256=jGeASanSx8pWLvibvfT_FSASs_buIHr0WspLw9g7xKM,830
setup.py,sha256=VoRyY9amhUCM3npwDJv6JjTPoqQH8PeHkNqFIyVyVq8,117
split_window.py,sha256=xeAt_fVNdAX_3SZnd8Zohs1xL1RFGB2EsRgD4Eg_EV0,414
sql_stuff.py,sha256=abkL7boP2DHi1Elr3vG1WG4Hpfr8Drpu1k5qPM6na8w,1938
sqlite.py,sha256=1VZau3smmxs7xGq2CFNWbKSha2d5YxpqL7m8M_fuf9c,237
windows.py,sha256=zV0PBlwAwuXm3Wz1sYLItuj9XBxNxckWVu-NTrl7UB4,2024
windows_data.py,sha256=0V2lG93PsOwS3WN_eWYW2od2lfgOtEH2U5ChC9V2qyo,462
fmdt/__init__.py,sha256=4VQUgXywuk3pddX_pUils2VuPgt0N6D3oldxo4U7Ro4,1276
fmdt/__test__.py,sha256=FXbpPO1x5EatXgegxcJLGb1iUpWMCz9xuSvOWc0zr8A,1919
fmdt/api.py,sha256=WA0qrzf-OWRqZ5jn6IMBmOjlzXxegDV7A8hpIeRqQe4,11979
fmdt/args.py,sha256=YE_LLUwx7Rc5yMbxTDl_wNclifXJTUgKy8OlaIwNCAY,28658
fmdt/config.py,sha256=ZUXr-CUDih7olqIU0wRAYkEXZNyBiHfNBrpbg61l8o4,7995
fmdt/core.py,sha256=wWIF8iPbsy3i3ja2pzRFsFpMhTQv4TnB3Qftgerl5Kg,15530
fmdt/db.py,sha256=h0p5k2tAOfQB5ke48mBTglePjZu1ccfVIwfAr2VMLk4,22244
fmdt/download.py,sha256=-kUQRYUguAdDzJ_LqIxNyKgtmnzyAmwXQlIZZaUNuvA,3708
fmdt/exceptions.py,sha256=sGaB6iuSVT-uuaStplkY7g5VLLCFY33arrQSw_cADHg,121
fmdt/res.py,sha256=1WisapSMnE1czLQckhiCgzvPUWrcS47f6gx6arbv83c,13960
fmdt/truth.py,sha256=X75ADsXeowMl_d_-CZk29k7rGrGsnyL65qwB5sugQOs,19740
fmdt/utils.py,sha256=89vye2tPo1VdMFB7vBwfpIDOvrbRxHsfbLainCuNHFs,9757
fmdt_python-0.0.39.dist-info/METADATA,sha256=Z2cIcfRZjdpWfEAM7eKs6jitfXxrvroKOGpl8bnf0ck,1611
fmdt_python-0.0.39.dist-info/WHEEL,sha256=EI2JsGydwUL5GP9t6kzZv7G3HDPi7FuZDDf9In6amRM,87
fmdt_python-0.0.39.dist-info/licenses/LICENSE,sha256=7oPhtg5rNNa1IVBDV-A7WY-TVhhBIwuaT4dKmd9sqMY,1068
fmdt_python-0.0.39.dist-info/RECORD,,
Wheel-Version: 1.0
Generator: hatchling 1.14.0
Root-Is-Purelib: true
Tag: py3-none-any
import os
import fmdt
import fmdt.args
import sqlite3
w6 = "/run/media/ejovo/Seagate Portable Drive/Meteors/Watec6mm/Meteor"
w12 = "/run/media/ejovo/Seagate Portable Drive/Meteors/Watec12mm/Meteor"
win = "/run/media/ejovo/Seagate Portable Drive/Meteors"
def is_video(vid_name: str) -> bool:
return vid_name[-3:] == "avi" or vid_name[-3:] == "mp4"
def is_draco6(vid_name: str) -> bool:
return is_video(vid_name) and vid_name[0:13] == "Draconids-6mm"
def is_draco12(vid_name: str) -> bool:
return is_video(vid_name) and vid_name[0:14] == "Draconids-12mm"
def is_window(vid_name: str) -> bool:
return is_video(vid_name) and vid_name[0:6] == "window"
# def get_vids(dir: str, type: fmdt.args.VideoType) -> list[]
def get_draco6(dir) -> list[fmdt.args.Video]:
return [fmdt.Video(v, type=fmdt.args.VideoType.DRACO6) for v in os.listdir(dir) if is_draco6(v)]
def get_draco12(dir) -> list[fmdt.args.Video]:
return [fmdt.Video(v, type=fmdt.args.VideoType.DRACO12) for v in os.listdir(dir) if is_draco12(v)]
def get_window(dir) -> list[fmdt.args.Video]:
return [fmdt.Video(v, type=fmdt.args.VideoType.WINDOW) for v in os.listdir(dir) if is_window(v)]
d6 = get_draco6(w6)
d12 = get_draco12(w12)
dw = get_window(win)
d6.sort()
d12.sort()
dw.sort()
print(d6)
print()
print(d12)
print(len(d6))
print(len(d12))
print(len(dw))
all = [v for v in d6]
all.extend(d12)
all.extend(dw)
print(len(all))
print(type(all[0]))
fmdt.args.videos_to_csv(all, "test_vid.csv")
# videos = fmdt.args.csv_to_videos("test_vid.csv")
# print(len(videos))
# for i in range(10):
"""Load up a configuration of fmdt-detect commands to run, and expect to find a specific meteor"""
import fmdt.truth
import fmdt
import random
DATA_BASE_FILE = "human_detections.csv"
# DATA_BASE_FILE = "human_detection.csv"
humans = fmdt.HumanDetection.init_ground_truth(DATA_BASE_FILE)
# watec6 = None
# watec12 = None
watec6 = "/home/ejovo/Videos/Watec6mm"
watec12 = "/home/ejovo/Videos/Watec12mm"
# I actually think that I might want to
# Need to load in the configs
# Load them in as Args
gt12 = fmdt.init_ground_truth(vid_db_dir=watec12)
gt12.meteors = [m for m in gt12.meteors if m.is_draco_12()]
gt6 = fmdt.init_ground_truth(vid_db_dir=watec6)
gt6.meteors = [m for m in gt6.meteors if m.is_draco_6()]
print(gt6)
print(gt12)
print(gt6.meteors)
print(gt6.vids())
d_args = {
"light_min": 221,
"light_max": 246,
"trk_bb_path": "bb.txt",
"trk_out_path": "trk.txt",
"timeout": 1
}
a = fmdt.Args(detect_args=d_args)
# Apply a single command across the entire database
# detections = gt6.try_command(a)
# print(detections)
# EXPERIMENTAL RESULTS
detections = [False, False, True, False, False, False, True, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, True, False, False, False]
print(f"Sum detections: {sum(detections)}")
print(gt6)
def roll_parameters():
light_min = random.randint(0, 254)
diff = random.randint(1, 255 - light_min)
d_args["light_min"] = light_min
d_args["light_max"] = light_min + diff
det = gt6.try_command(fmdt.Args(detect_args=d_args))
print(f"[{light_min}, {light_min + diff}] detected {sum(det)} gts")
# roll_parameters()
N_TRIALS = 05
# N_TRIALS = 150
# I want a list of light min and light max pairs
lm_pairs = []
gts_det = []
lst_detections = [[]]
MAX_LIGHT_DIFF = 40
LIGHT_MIN_MIN = 150
LIGHT_MIN_MAX = 254
# ============================ ~ 20 minute run ========================
for k in range(N_TRIALS):
light_min = random.randint(LIGHT_MIN_MIN, LIGHT_MIN_MAX)
diff = random.randint(1, min(255 - light_min, MAX_LIGHT_DIFF))
d_args["light_min"] = light_min
d_args["light_max"] = light_min + diff
# det = gt6.try_command(fmdt.Args(detect_args=d_args))
det = gt12.try_command(fmdt.Args(detect_args=d_args))
print(f"[{light_min}, {light_min + diff}] detected {sum(det)} gts")
lm_pairs.append((light_min, light_min + diff))
gts_det.append(sum(det))
lst_detections.append(det)
# print(lm_pairs)
# print(gts_det)
# ======================================================================
# #! =============== Experimental Results ===========================
# lm_pairs = [(253, 254), (164, 176), (215, 237), (200, 214), (218, 230), (250, 253), (186, 188), (250, 255), (199, 238), (151, 190), (220, 230), (202, 214), (191, 211), (205, 227), (224, 252), (250, 253), (227, 237), (196, 235), (176, 189), (194, 219)]
# gts_det = [8, 0, 2, 0, 3, 6, 2, 6, 2, 0, 3, 1, 1, 0, 3, 6, 3, 1, 0, 0]
# # Let's try and plot this as a 3d surface?
# import matplotlib.pyplot as plt
# import matplotlib.patches as patches
# import numpy as np
lmin_lst = [l[0] for l in lm_pairs]
lmax_lst = [l[1] for l in lm_pairs]
print(lmin_lst)
print(lmax_lst)
print(gts_det)
import fmdt
import fmdt.truth
vid = "Draconids-6mm1.00-2750-163200.avi"
light_min = 186
light_max = 211
human = fmdt.truth.HumanDetection(vid, 45, 59, 240, 226, 125, 264)
track = "tracks.txt"
bb = "bb.txt"
# res = fmdt.detect(vid, light_min=light_min, light_max=light_max, trk_all=True, trk_out_path=track, trk_bb_path=bb)
# fmdt.visu(vid, track, bb, "visu.mp4", True)
# Get the tracking list
print(fmdt.extract_all_information(track))
tracking_list = fmdt.extract_all_information(track)
for t in tracking_list:
print(t)
is_det = fmdt.truth.is_meteor_detected(human, tracking_list)
print(f"Ground truth: {human} detected in {tracking_list}? : {is_det}")
import fmdt
import fmdt.utils
import numpy as np
import os
import sys
import fmdt.truth
# dir = "/run/media/ejovo/Seagate Portable Drive/Meteors/Watec12mm/Meteor/"
# name = "Draconids-12mm2.00.09-1450-202900.avi"
# vid = "Draconids-6mm1.00-2750-163200.avi"; human = fmdt.truth.HumanDetection(vid, 45, 59, 240, 226, 125, 264)
vid = "draco1.avi"; human = fmdt.truth.HumanDetection(vid, 45, 59, 240, 226, 125, 264)
# vid = "Draconids-6mm1.05-0750-164200.avi"; human = fmdt.truth.HumanDetection(vid, 16, 26, 12, 339, 4, 360)
# vid = "Draconids-6mm1.14-1400-170300.avi"; human = fmdt.truth.HumanDetection(vid, 11, 24, 11, 233, 14, 273)
# vid = "Draconids-6mm1.20-2350-171600.avi" ; human = fmdt.truth.HumanDetection(vid, 22, 38, 232, 368, 237, 396)
# vid = dir + name
if vid is None and len(sys.argv) < 2:
assert False, "No video selected. Pass the path of a video as an argument or change the variable `vid` in this script"
human.test_detection_vary_light(vid, offset=25, light_min_start=150, light_min_end=235, k_trials=15, log=True)
import fmdt
import fmdt.truth
lst = fmdt.truth.load_meteors_file("meteors.txt", "2022_05_31_tauh_34_meteors.mp4")
fmdt.truth.save_meteors_file("meteors_copy.txt", lst)
import fmdt
from termcolor import colored
# import fractions
d6 = fmdt.load_draco6(require_gt=True)
d12 = fmdt.load_draco12(require_gt=True, require_exist= True)
w = fmdt.load_window()
print(len(d6))
print(len(d12))
# Video With Detection (VWD)
def get_vwd_ids(vids: list[fmdt.Video], light_min, light_max) -> list[int]:
video_ids = []
for v in vids:
res = v.detect(light_min=light_min, light_max=light_max, save_df=True, timeout=1.0)
c_res = res.check()
trk_rate = c_res.meteor_stats()["trk_rate"]
if c_res.meteors_detected():
print(colored(f"{v} has tracking rate {trk_rate}", "green"))
video_ids.append(v.id())
else:
print(colored(f"{v} has tracking rate {trk_rate}", "red"))
return video_ids
# ids_245 = get_vwd_ids(d12, 245, 250)
ids_245 = [57, 58, 62, 65, 73, 75, 77, 83, 84, 85]
# ids_215 = get_vwd_ids(d12, 215, 220)
ids_215 = [54, 56, 57, 58, 60, 62, 63, 64, 65, 92]
# ids_225 = get_vwd_ids(d12, 225, 230)
ids_225 = [57, 58, 60, 61, 62, 65, 79, 83, 89, 92]
d12_ids = set(ids_245 + ids_215 + ids_225)
print(d12_ids)
print(len(d12_ids))
# Let's first select the 8 videos that are detected
# print(get_vwd_ids(d6, 253, 255))
# print(get_vwd_ids(d12, ))
import fmdt.db
v = fmdt.db.get_video_by_id(2)
print(v)
def discrete_video_measure(vid: fmdt.Video, interval: tuple[int, int], step_size = 5, log = False) -> tuple[int, int]:
"""Returns the number of successes (n_successes, total_intervals)"""
assert vid.exists(), f"Video {vid} does not exist on disk. Check your local config with fmdt.local_info() and fmdt.load_config()"
assert interval[0] >= 0, f"light_min ({interval[0]}) must be >= 0"
assert interval[1] <= 255, f"light_max ({interval[1]}) must be <= 0"
gap = interval[1] - interval[0]
assert step_size <= gap
total_intervals = 0
n_successes = 0
for lmin in range(interval[0], interval[1] - step_size + 1, step_size):
print(colored(f"[{lmin}, {lmin + step_size}]", "red"))
c_res = vid.detect(light_min=lmin, light_max=lmin + step_size, save_df=True, log=log).check()
total_intervals += 1
if c_res.meteors_detected():
n_successes += 1
return n_successes, total_intervals
# REAL WORLD RESULTS
d6_video_ids = [2, 5, 6, 20, 22, 25, 29, 35, 36]
d6_detected = fmdt.db.get_video_by_ids(d6_video_ids)
d12_detected = fmdt.db.get_video_by_ids(d12_ids)
print(d6_detected)
print(len(d6_detected))
interval = (240, 255)
step_size = 1
# ======================= id: 2 ============================
v = fmdt.db.get_video_by_id(2)
# ======================== id: 5 ============================
# v = fmdt.db.get_video_by_id(5)
# ================================================================
# def num_intervals(step_size):
# pass
# step_sizes = [1, 2, 3, 4, 5, 15]
# res = [discrete_video_measure(v, interval, s) for s in step_sizes]
# n_success = [r[0] for r in res]
# n_intervals = [r[1] for r in res]
# ratios = []
# for i in range(len(res)):
# ratios.append(n_success[i] / n_intervals[i])
# print(step_sizes)
# print(n_success)
# print(n_intervals)
# print(ratios)
# =========================================================================
# Now I guess we want to compare MOVEMENT statistics with TRACKING RATE
# v = fmdt.db.get_video_by_id(2)
# v = fmdt.db.get_video_by_id(5)
# print(v)
# args = {
# "light_min": 240,
# "light_max": 255,
# "save_df": True
# }
# res = v.detect(**args)
# print(res.df.head())
# print(res.roa())
# print(res.mean_roa())
# print(v.compute_trk_rate(**args))
import numpy as np
import pandas as pd
def generate_data(vid: fmdt.Video, light_min_min, light_max_max):
int_size = light_max_max - light_min_min
x = np.arange(1, int_size + 1)
f = lambda i: np.floor(int_size / i)
total_data_points = int(np.array(list(map(f, x))).sum())
trk_rate = np.zeros(total_data_points)
mean_roa = np.zeros(total_data_points)
mean_nroi = np.zeros(total_data_points)
mean_nassoc = np.zeros(total_data_points)
mean_mean_err = np.zeros(total_data_points)
mean_std_dev = np.zeros(total_data_points)
trk_rate = []
mean_roa = []
mean_nroi = []
mean_nassoc = []
mean_mean_err = []
mean_std_dev = []
# Now i have the number of intervals for each, but that doesnt really do anything.
# What am i actually iterating through?
# Two loops
for step_size in x:
for lmin in range(light_min_min, light_max_max - step_size + 1, step_size):
print(colored(f"[{lmin}, {lmin + step_size}]", "red"))
res = vid.detect(light_min=lmin, light_max=lmin + step_size,
save_df=True, log=False)
c_res = res.check()
if not res.df is None:
trk_rate.append(c_res.trk_rate())
mean_roa.append(res.mean_roa())
mean_nroi.append(res.mean_nroi())
mean_nassoc.append(res.mean_nassoc())
mean_mean_err.append(res.mean_mean_err())
mean_std_dev.append(res.mean_std_dev())
return pd.DataFrame({
"trk_rate": trk_rate,
"mean_roa": mean_roa,
"mean_nroi": mean_nroi,
"mean_nassoc": mean_nassoc,
"mean_mean_err": mean_mean_err,
"mean_std_dev": mean_std_dev
})
def generate_data_for_videos(vids: list[fmdt.Video], light_min_min, light_max_max):
int_size = light_max_max - light_min_min
x = [1]
i = 0
while x[i] <= int_size:
x.append(x[i] * 2)
i += 1
# x = np.logspace(0, np.log2(int_size), base=2)
# f = lambda i: np.floor(int_size / i)
# total_data_points = int(np.array(list(map(f, x))).sum()) * len(vids)
print(x)
# # raise AssertionError("You messed up")
# trk_rate = np.zeros(total_data_points)
# mean_roa = np.zeros(total_data_points)
# mean_nroi = np.zeros(total_data_points)
# mean_nassoc = np.zeros(total_data_points)
# mean_mean_err = np.zeros(total_data_points)
# mean_std_dev = np.zeros(total_data_points)
# Now i have the number of intervals for each, but that doesnt really do anything.
# What am i actually iterating through?
# Two loops
trk_rate = []
mean_roa = []
mean_nroi = []
mean_nassoc = []
mean_mean_err = []
mean_std_dev = []
for vid in vids:
for step_size in x:
for lmin in range(light_min_min, light_max_max - step_size + 1, step_size):
print(colored(f"[{lmin}, {lmin + step_size}]", "red"))
res = vid.detect(light_min=lmin, light_max=lmin + step_size,
save_df=True, log=False, timeout=1.0)
c_res = res.check()
if not res.df is None:
trk_rate.append(c_res.trk_rate())
mean_roa.append(res.mean_roa())
mean_nroi.append(res.mean_nroi())
mean_nassoc.append(res.mean_nassoc())
mean_mean_err.append(res.mean_mean_err())
mean_std_dev.append(res.mean_std_dev())
return pd.DataFrame({
"trk_rate": trk_rate,
"mean_roa": mean_roa,
"mean_nroi": mean_nroi,
"mean_nassoc": mean_nassoc,
"mean_mean_err": mean_mean_err,
"mean_std_dev": mean_std_dev
})
# df = generate_data(v, 230, 255)
# df.to_csv(f"data_{v.prefix()}.csv")
# Let's get the beast 9 videos
# vids = fmdt.db.get_video_by_ids(d6_video_ids)
# print(vids)
df = generate_data_for_videos(d12_detected, 155, 255)
df.to_csv(f"draco12_data.csv")
"""This script shows you how to load in our GroundTruth databases"""
import fmdt
# Specify where your videos are stored
fmdt.init(d6_dir = "/home/ejovo/Videos/Watec6mm",
d12_dir = "/home/ejovo/Videos/Watec12mm",
win_dir = "/home/ejovo/Videos/Window")
# Load in the database
gt6 = fmdt.load_gt6()
gt12 = fmdt.load_gt12()
# Far cleaner than before!
args = fmdt.detect_args(light_min=254, light_max=255, timeout=0.5)
# gt6.try_command(args)
# success_list = gt6.try_command(args)
# print(success_list)
# print(sum(success_list))
gt6.draw_heatmap(150, 200, 2)
import fmdt
window = fmdt.load_window()
clips = window[0].create_clips()
c = clips[-1]
trk_rates = []
for lmin in range(50, 150, 5):
# Call detect with varying (lmin, lmax)
res = c.detect(
light_min = lmin,
light_max = lmin + 5
)
# Retreive the trk_rate
trk_rate = res.check().trk_rate()
print(f"({lmin}, {lmin + 5}) -> {trk_rate}")
import fmdt
# Specifiy where your videos are stored
fmdt.init(d6_dir = "/home/ejovo/Videos/Watec6mm",
d12_dir = "/home/ejovo/Videos/Watec12mm",
win_dir = "/home/ejovo/Videos/Window")
# Load in the database
gt6 = fmdt.load_gt6()
gt12 = fmdt.load_gt12()
# Far cleaner than before!
args = fmdt.detect_args(light_min=254, light_max=255, timeout=1)
success_list = gt6.try_command(args)
print(success_list)
print(sum(success_list))
import fmdt
draco12 = fmdt.load_draco6(require_gt = True)
lmin = 232
lmax = 251
res_detect = [d.detect(light_min = lmin, light_max = lmax, timeout = 2) for d in draco12]
res_check = [r.check() for r in res_detect]
has_detection = [c.trk_rate() > 0.0 for c in res_check]
for r in res_check:
print(r.all_stats())
print(sum(has_detection))
"""Play with the contents of `videos.db` programmatically and store results in data frames."""
import pandas as pd
import sqlite3
import fmdt.args
import fmdt.truth
import fmdt.config
import fmdt.db
# Add a videos.db file to our config
fmdt.download_csvs()
# config = fmdt.config.load_config()
# files_in_config = fmdt.config.listdir()
# print(files_in_config)
print(f"config directory: {fmdt.config.dir()}")
fmdt.download_dbs()
vids = fmdt.load_draco6() # list[Video]
for i in range(20):
v = vids[i]
print(f"{v.id()}: {v} has {len(v.meteors())} meteor(s) in our database")
print(".")
print(".")
print(".")
print(f"{vids[-1].id()}: {vids[-1]} has {len(vids[-1].meteors())} meteor(s) in our database")
id = 17
v = vids[id]
print(f"vids[{id}].meteors() -> {v.meteors()[0]} and has full dir: {v.full_path()}")
on_disc = [v for v in vids if v.exists()]
has_meteors = [v for v in on_disc if v.has_meteors()]
print(f"{len(on_disc)} videos exist on disc out of {len(vids)}")
print(f"{len(has_meteors)} videos have meteors and exist on disk")
# Let's get some diagnostics then.
# Like: How many videos exist from our database exist on disk?
d6 = fmdt.load_draco6()
d12 = fmdt.load_draco12()
w = fmdt.load_window()
fmdt.db.info()
print(w)
print(w[0].full_path())
# Convert the results obtained from `run_args.sh` into a three arrays of the same size:
#
# light_min
# light_max
# n_detected
# file = "draco6_random_lminmax_results.txt"
# file = "data_all_k30j10.txt"
file = "data_all.txt"
# file = "draco12_k30_j10.txt"
# Convert a '[1, 2, 3]' to [1, 2, 3]
def strlist_to_listint(strlist: str) -> list[int]:
red = strlist[1:-1]
ints = red.split(',')
return [int(i.strip()) for i in ints]
print(strlist_to_listint('[1, 2, 3]'))
with open(file) as f:
lines = [strlist_to_listint(l.strip()) for l in f.readlines()]
lmins = lines[0::3]
lmaxs = lines[1::3]
dets = lines[2::3]
lmins = [j for i in lmins for j in i]
lmaxs = [j for i in lmaxs for j in i]
dets = [j for i in dets for j in i]
print(lmins)
print(lmaxs)
print(dets)
from setuptools import setup
setup(
name="fmdt-python",
packages=["fmdt"]
# package_dir={'fmdt':'src'}
)
import fmdt
ints = [(785, 790), (1222, 1250), (1426, 1439), (2288, 2323), (2836, 2850), (2810, 2888), (2928, 2933), (3426, 3434), (3857, 3862), (4155, 4179), (4262, 4268), (4447, 4460), (5323, 5330), (6790, 6811), (7199, 7207)]
print(ints)
win = fmdt.load_window()
v = win[0]
print(v)
print(v.exists())
fmdt.split_video_at_intervals(v.full_path(), ints, nframes_before=-15, nframes_after=50, overwrite=True)
"""Convert the contents of `human_detectsions.csv` and `test_vid.csv` and "meteors.txt" into a single data base file `videos.db`"""
import sqlite3
import pandas as pd
from fmdt.args import *
import fmdt.truth
f = open("videos.db", "w")
f.close()
con = sqlite3.connect("videos.db")
cur = con.cursor()
cur.execute("CREATE TABLE video(id, name, type)")
# Let's load in our csv
df = pd.read_csv("test_vid.csv")
print(df.head(10))
for i in range(len(df)):
r = df.iloc[i]
id = r["id"]
name = r["name"]
type = r["type"]
# print(id)
# print(name)
# print(type)
cur.execute(f"""
INSERT INTO video VALUES
({id}, '{name}', '{type}')
""")
#================ GroundTruths ===================
# fmdt.truth.GroundTruth("")
# fmdt.truth.init_ground_truth()
csv_file = "human_detections.csv"
df_gt = pd.read_csv(csv_file)
# Pandas DataFrame Ground Truths
cur.execute("CREATE TABLE human_detection(id, video_name, start_x, start_y, start_frame, end_x, end_y, end_frame)")
for i in range(len(df_gt)):
r = df_gt.iloc[i]
vid_name = r["video_name"]
start_x = r["start_x"]
start_y = r["start_y"]
start_frame = r["start_frame"]
end_x = r["end_x"]
end_y = r["end_y"]
end_frame = r["end_frame"]
cur.execute(f"""
INSERT INTO human_detection VALUES
({i}, '{vid_name}', {start_x}, {start_y}, {start_frame}, {end_x}, {end_y}, {end_frame})
""")
print(i)
import fmdt.db
import fmdt.truth
# ================= Ground Truths in meteors.txt ========
meteors = fmdt.truth.load_meteors_file("meteors.txt", "2022_05_31_tauh_34_meteors.mp4")
start = i + 1
for i in range(start, start + len(meteors)):
m = meteors[i - start]
query = f"""
INSERT INTO human_detection VALUES
({i}, '{m.video_name}', {m.start_x}, {m.start_y}, {m.start_frame}, {m.end_x}, {m.end_y}, {m.end_frame})
"""
cur.execute(query)
con.commit()
con.close()
"""Learn how to play around with sql lite"""
import os
import fmdt
import fmdt.config
con = fmdt.config.load_config()
print(con)
files = os.listdir(con.d6)
all_vids = fmdt.config.get_local_vids()
print(all_vids)
print(len(all_vids))
import fmdt
import pandas as pd
window = fmdt.load_window()
def get_csv_name(w: fmdt.Video, diff: int) -> str:
pre = w.prefix()
return pre + f"_50_200_{diff}_data.csv"
def get_df(w: fmdt.Video) -> pd.DataFrame:
dfs = []
for d in [1, 2, 4, 8]:
dfs.append(pd.read_csv(get_csv_name(w, d)))
df = pd.concat(dfs)
df = df.drop_duplicates()
return df
df = get_df(window[0])
print(df.head())
df.groupby(["lmin", "lmax"]).mean()
import fmdt
import pandas as pd
import numpy as np
# Let's evaluate how well light min and light max play with our windows videos
def score_window(clips: list[fmdt.VideoClip], light_min = 55, light_max = 80) -> pd.DataFrame:
"""Return a list containing the trk rates for each individual clip"""
rates = []
for c in clips:
d_res = c.detect(light_min=light_min, light_max=light_max)
if len(d_res.trk_list) > 0:
c_res = d_res.check()
rates.append(c_res.trk_rate())
else:
rates.append(0.0)
n = len(clips)
clips_id = range(0, n)
lmins = np.full(n, light_min)
lmaxs = np.full(n, light_max)
return pd.DataFrame({
"clip_id": clips_id,
"lmin": lmins,
"lmax": lmaxs,
"trk_rate": rates
})
def score_window_lmin_lmax(window: fmdt.Video, lmin_min, lmin_max, diff):
clips = window.create_clips()
# Make sure the clips exist on disk
for c in clips:
if not c.exists():
c.save()
df = score_window(clips)
for lmin in range(lmin_min, lmin_max + diff, diff):
df = pd.concat([df, score_window(clips, lmin, lmin + diff)])
df.to_csv(f"{window.prefix()}_{lmin_min}_{lmin_max}_{diff}_data.csv", index=False)
windows = fmdt.load_window()
windows = windows[2:-2]
# Now let's get some light_min and light_max variation
lmin_min = 50
lmin_max = 200
# score_window_lmin_lmax(windows[0], lmin_min, lmin_max, 1)
# score_window_lmin_lmax(windows[0], lmin_min, lmin_max, 2)
# score_window_lmin_lmax(windows[0], lmin_min, lmin_max, 4)
# score_window_lmin_lmax(windows[0], lmin_min, lmin_max, 8)
# score_window_lmin_lmax(windows[1], lmin_min, lmin_max, 1)
# score_window_lmin_lmax(windows[1], lmin_min, lmin_max, 2)
# score_window_lmin_lmax(windows[1], lmin_min, lmin_max, 4)
# score_window_lmin_lmax(windows[1], lmin_min, lmin_max, 8)
diffs = [1, 2, 4, 8]
for w in windows:
for d in diffs:
score_window_lmin_lmax(w, lmin_min, lmin_max, d)