New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details
Socket
Book a DemoSign in
Socket

rssbot

Package Overview
Dependencies
Maintainers
1
Versions
62
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rssbot - pypi Package Compare versions

Comparing version
645
to
646
+22
rssbot/modules/__init__.py
# This file is placed in the Public Domain.
"modules"
from . import dbg, irc, req, rss, slg, thr, ver
__all__= (
'dbg',
'irc',
'req',
'rss',
'slg',
'thr',
'ver'
)
def __dir__():
return __all__
# This file is placed in the Public Domain.
"debug"
import time
from ..clients import Fleet
def dbg(event):
event.reply("raising exception")
raise Exception("yo!")
def brk(event):
event.reply("borking")
for clt in Fleet.all():
if "sock" in dir(clt):
event.reply(f"shutdown on {clt.cfg.server}")
time.sleep(2.0)
try:
clt.sock.shutdown(2)
except OSError:
pass
+1
-1
Metadata-Version: 2.4
Name: rssbot
Version: 645
Version: 646
Summary: 24/7 Feed Fetcher.

@@ -5,0 +5,0 @@ Author-email: Bart Thate <rssbotd@gmail.com>

@@ -12,3 +12,3 @@ [build-system]

description = "24/7 Feed Fetcher."
version = "645"
version = "646"
authors = [

@@ -15,0 +15,0 @@ {name = "Bart Thate",email = "rssbotd@gmail.com"},

Metadata-Version: 2.4
Name: rssbot
Version: 645
Version: 646
Summary: 24/7 Feed Fetcher.

@@ -5,0 +5,0 @@ Author-email: Bart Thate <rssbotd@gmail.com>

README.rst
pyproject.toml
rssbot/__init__.py
rssbot/__main__.py
rssbot/caching.py
rssbot/clients.py
rssbot/command.py
rssbot/handler.py
rssbot/imports.py
rssbot/objects.py
rssbot/persist.py
rssbot/runtime.py
rssbot/utility.py
rssbot.egg-info/PKG-INFO

@@ -19,2 +15,4 @@ rssbot.egg-info/SOURCES.txt

rssbot.egg-info/top_level.txt
rssbot/modules/__init__.py
rssbot/modules/dbg.py
rssbot/modules/irc.py

@@ -21,0 +19,0 @@ rssbot/modules/rss.py

@@ -13,13 +13,13 @@ # This file is placed in the Public Domain.

from .clients import Client
from .command import Main, Commands, command, inits, parse, scan
from .command import Main, Commands
from .command import command, inits, parse, scan
from .handler import Event
from .imports import modules
from .persist import Workdir, pidname, skel, types
from .runtime import level
from .modules import rss
Main.modpath = os.path.dirname(rss.__file__)
from . import modules as MODS
Main.name = Main.__module__.split(".")[0]

@@ -33,2 +33,5 @@

"clients"
class CLI(Client):

@@ -67,3 +70,3 @@

out(f"{Main.name.upper()} {Main.version} since {tme} ({Main.level.upper()})")
out(f"loaded {".".join(modules(Main.modpath))}")
out(f"loaded {".".join(dir(MODS))}")

@@ -107,2 +110,3 @@

except (KeyboardInterrupt, EOFError):
print("")
sys.exit(1)

@@ -162,4 +166,4 @@

Commands.add(cmd)
scan(Main.modpath)
inits(Main.init or "irc,rss")
scan(MODS)
inits(MODS, Main.init or "irc,rss")
forever()

@@ -178,6 +182,6 @@

Commands.add(ls)
scan(Main.modpath)
scan(MODS)
if "v" in Main.opts:
banner()
for _mod, thr in inits(Main.init):
for _mod, thr in inits(MODS, Main.init):
if "w" in Main.opts:

@@ -199,3 +203,3 @@ thr.join(30.0)

Commands.add(srv)
scan(Main.modpath)
scan(MODS)
csl = CLI()

@@ -217,4 +221,4 @@ evt = Event()

Commands.add(cmd)
scan(Main.modpath)
inits(Main.init or "irc,rss")
scan(MODS)
inits(MODS, Main.init or "irc,rss")
forever()

@@ -240,3 +244,2 @@

"runtime"

@@ -267,3 +270,3 @@

if check("a"):
Main.init = ",".join(modules(Main.modpath))
Main.init = ",".join(dir(MODS))
if check("v"):

@@ -270,0 +273,0 @@ setattr(Main.opts, "v", True)

@@ -7,3 +7,2 @@ # This file is placed in the Public Domain.

import queue
import threading

@@ -14,5 +13,7 @@ import time

from .handler import Handler
from .runtime import launch
"client"
class Client(Handler):

@@ -43,43 +44,2 @@

"output"
class Output(Client):
def __init__(self):
Client.__init__(self)
self.oqueue = queue.Queue()
self.oready = threading.Event()
self.ostop = threading.Event()
def oput(self, event):
self.oqueue.put(event)
def output(self):
while not self.ostop.is_set():
event = self.oqueue.get()
if event is None:
self.oqueue.task_done()
break
self.display(event)
self.oqueue.task_done()
self.oready.set()
def start(self):
super().start()
self.oready.clear()
self.ostop.clear()
launch(self.output)
def stop(self):
Client.stop(self)
self.ostop.set()
self.oqueue.put(None)
self.oready.wait()
def wait(self):
self.oqueue.join()
super().wait()
"fleet"

@@ -151,4 +111,3 @@

'Client',
'Fleet',
'Output'
'Fleet'
)

@@ -8,4 +8,2 @@ # This file is placed in the Public Domain.

import inspect
import os
import sys
import time

@@ -15,5 +13,4 @@

from .clients import Fleet
from .imports import load, pathtoname
from .objects import Default
from .runtime import launch, spl
from .runtime import launch

@@ -34,3 +31,2 @@

level = "warn"
modpath = ""
name = Default.__module__.split(".")[-2]

@@ -41,3 +37,3 @@ opts = Default()

verbose = False
version = 325
version = 85

@@ -87,8 +83,6 @@

def inits(names):
def inits(pkg, names):
modz = []
for name in sorted(spl(names)):
path = os.path.join(Main.modpath, name + ".py")
mname = pathtoname(path)
mod = load(path, mname)
mod = getattr(pkg, name, None)
if not mod:

@@ -161,10 +155,58 @@ continue

def scan(path):
for fnm in os.listdir(path):
pth = os.path.join(path, fnm)
mod = load(pth)
if mod:
Commands.scan(mod)
def scan(pkg):
for modname in dir(pkg):
mod = getattr(pkg, modname)
Commands.scan(mod)
"utilities"
def elapsed(seconds, short=True):
txt = ""
nsec = float(seconds)
if nsec < 1:
return f"{nsec:.2f}s"
yea = 365*24*60*60
week = 7*24*60*60
nday = 24*60*60
hour = 60*60
minute = 60
yeas = int(nsec/yea)
nsec -= yeas*yea
weeks = int(nsec/week)
nsec -= weeks*week
nrdays = int(nsec/nday)
nsec -= nrdays*nday
hours = int(nsec/hour)
nsec -= hours*hour
minutes = int(nsec/minute)
nsec -= int(minute*minutes)
sec = int(nsec)
if yeas:
txt += f"{yeas}y"
if weeks:
nrdays += weeks * 7
if nrdays:
txt += f"{nrdays}d"
if short and txt:
return txt.strip()
if hours:
txt += f"{hours}h"
if minutes:
txt += f"{minutes}m"
if sec:
txt += f"{sec}s"
txt = txt.strip()
return txt
def spl(txt):
try:
result = txt.split(',')
except (TypeError, ValueError):
result = [txt, ]
return [x for x in result if x]
"interface"

@@ -178,5 +220,9 @@

'command',
'elapsed',
'inits',
'load',
'modules',
'parse',
'scan'
'scan',
'spl'
)

@@ -44,3 +44,2 @@ # This file is placed in the Public Domain.

self.callback(event)
self.ready.set()

@@ -63,3 +62,2 @@ def poll(self):

self.queue.put(None)
self.ready.wait()

@@ -107,4 +105,4 @@ def wait(self):

return (
'Handler',
'Event'
'Event',
'Handler'
)

@@ -8,2 +8,3 @@ # This file is placed in the Public Domain.

import base64
import queue
import os

@@ -17,3 +18,3 @@ import socket

from ..clients import Fleet, Output
from ..clients import Client, Fleet
from ..command import Main, command

@@ -26,16 +27,27 @@ from ..handler import Event as IEvent

IGNORE = ["PING", "PONG", "PRIVMSG"]
IGNORE = ["PING", "PONG", "PRIVMSG"]
initlock = threading.RLock()
saylock = threading.RLock()
"init"
def init():
irc = IRC()
irc.start()
irc.events.joined.wait(30.0)
rlog("debug", fmt(irc.cfg, skip=["password", "realname", "username"]))
return irc
with initlock:
irc = IRC()
irc.start()
irc.events.joined.wait(30.0)
if irc.events.joined.is_set():
rlog("debug", fmt(irc.cfg, skip=["password", "realname", "username"]))
else:
irc.stop()
return irc
"config"
class Config(Default):

@@ -68,2 +80,5 @@

"event"
class Event(IEvent):

@@ -84,2 +99,5 @@

"wrapper"
class TextWrap(textwrap.TextWrapper):

@@ -100,5 +118,43 @@

class IRC(Output):
"output"
class Output:
def __init__(self):
self.oqueue = queue.Queue()
self.ostop = threading.Event()
def oput(self, event):
self.oqueue.put(event)
def output(self):
while not self.ostop.is_set():
event = self.oqueue.get()
if event is None:
self.oqueue.task_done()
break
self.display(event)
self.oqueue.task_done()
def start(self):
self.ostop.clear()
launch(self.output)
def stop(self):
self.ostop.set()
self.oqueue.put(None)
def wait(self):
self.oqueue.join()
super().wait()
"IRc"
class IRC(Output, Client):
def __init__(self):
Client.__init__(self)
Output.__init__(self)

@@ -113,2 +169,3 @@ self.buffer = []

self.events.joined = threading.Event()
self.events.logon = threading.Event()
self.events.ready = threading.Event()

@@ -146,5 +203,5 @@ self.idents = []

def connect(self, server, port=6667):
rlog("debug", f"connecting to {server}:{port}")
self.state.nrconnect += 1
self.events.connected.clear()
self.events.joined.clear()
if self.cfg.password:

@@ -171,3 +228,3 @@ rlog("debug", "using SASL")

self.events.connected.set()
rlog("warn", f"connected to {self.cfg.server}:{self.cfg.port} channel {self.cfg.channel}")
rlog("debug", f"connected {self.cfg.server}:{self.cfg.port} {self.cfg.channel}")
return True

@@ -232,4 +289,11 @@ return False

if self.connect(server, port):
self.logon(self.cfg.server, self.cfg.nick)
self.events.joined.wait(15.0)
if not self.events.joined.is_set():
self.disconnect()
self.events.joined.set()
continue
break
except (
socket.timeout,
ssl.SSLError,

@@ -239,7 +303,6 @@ OSError,

) as ex:
self.events.joined.set()
self.state.error = str(ex)
rlog("error", str(type(ex)) + " " + str(ex))
rlog("error", f"sleeping {self.cfg.sleep} seconds")
rlog("debug", str(type(ex)) + " " + str(ex))
time.sleep(self.cfg.sleep)
self.logon(server, nck)

@@ -310,7 +373,3 @@ def dosay(self, channel, txt):

if self.state.pongcheck:
self.state.pongcheck = False
self.state.keeprunning = False
self.events.connected.clear()
launch(init)
break
self.restart()

@@ -407,3 +466,5 @@ def logon(self, server, nck):

self.state.error = str(type(ex)) + " " + str(ex)
rlog("error", self.state.error)
rlog("debug", self.state.error)
self.state.pongcheck = True
self.stop()
return None

@@ -430,4 +491,7 @@ try:

ConnectionResetError,
BrokenPipeError
BrokenPipeError,
socket.timeout
) as ex:
rlog("debug", str(type(ex)) + " " + str(ex))
self.events.joined.set()
self.state.nrerror += 1

@@ -442,3 +506,3 @@ self.state.error = str(ex)

def reconnect(self):
rlog("error", f"reconnecting to {self.cfg.server}:{self.cfg.port}")
rlog("debug", f"reconnecting {self.cfg.server:self.cfg.port}")
self.disconnect()

@@ -449,2 +513,10 @@ self.events.connected.clear()

def restart(self):
self.events.joined.set()
self.state.pongcheck = False
self.state.keeprunning = False
self.state.stopkeep = True
self.stop()
launch(init)
def size(self, chan):

@@ -483,15 +555,16 @@ if chan in dir(self.cache):

Output.start(self)
launch(
self.doconnect,
self.cfg.server or "localhost",
self.cfg.nick,
int(self.cfg.port or '6667')
)
Client.start(self)
if not self.state.keeprunning:
launch(self.keep)
launch(self.doconnect,
self.cfg.server or "localhost",
self.cfg.nick,
int(self.cfg.port) or 6667
)
def stop(self):
self.state.stopkeep = True
Client.stop(self)
Output.stop(self)
self.disconnect()
#self.disconnect()

@@ -502,2 +575,5 @@ def wait(self):

"callbacks"
def cb_auth(evt):

@@ -520,4 +596,3 @@ bot = Fleet.get(evt.orig)

bot.state.error = evt.txt
#rlog('error', fmt(evt))
rlog("error", evt.txt)
rlog('debug', fmt(evt))

@@ -550,3 +625,3 @@

bot = Fleet.get(evt.orig)
bot.logon()
bot.events.logon,set()

@@ -580,3 +655,3 @@

bot = Fleet.get(evt.orig)
rlog("error", f"quit from {bot.cfg.server}")
rlog("debug", f"quit from {bot.cfg.server}")
bot.state.nrerror += 1

@@ -583,0 +658,0 @@ bot.state.error = evt.txt

@@ -25,5 +25,6 @@ # This file is placed in the Public Domain.

from ..clients import Fleet
from ..command import elapsed, spl
from ..objects import Default, Object, fmt, update
from ..persist import find, fntime, getpath, last, write
from ..runtime import Repeater, elapsed, launch, rlog, spl
from ..runtime import Repeater, launch, rlog

@@ -40,2 +41,5 @@

"init"
def init():

@@ -47,2 +51,5 @@ fetcher = Fetcher()

"classes"
class Feed(Default):

@@ -55,2 +62,20 @@

class Rss(Default):
def __init__(self):
Default.__init__(self)
self.display_list = 'title,link,author'
self.insertid = None
self.name = ""
self.rss = ""
class Urls(Default):
pass
"fetcher"
class Fetcher(Object):

@@ -65,4 +90,4 @@

def display(obj):
displaylist = ""
result = ''
displaylist = []
try:

@@ -100,3 +125,3 @@ displaylist = obj.display_list or 'title,link'

else:
uurl = fed.link
uurl = url
urls.append(uurl)

@@ -137,2 +162,59 @@ if uurl in seen:

"parser"
class Parser:
@staticmethod
def getitem(line, item):
lne = ''
index1 = line.find(f'<{item}>')
if index1 == -1:
return lne
index1 += len(item) + 2
index2 = line.find(f'</{item}>', index1)
if index2 == -1:
return lne
lne = line[index1:index2]
lne = cdata(lne)
return lne.strip()
@staticmethod
def getitems(text, token):
index = 0
result = []
stop = False
while not stop:
index1 = text.find(f'<{token}', index)
if index1 == -1:
break
index1 += len(token) + 2
index2 = text.find(f'</{token}>', index1)
if index2 == -1:
break
lne = text[index1:index2]
result.append(lne)
index = index2
return result
@staticmethod
def parse(txt, toke="item", items='title,link'):
result = []
for line in Parser.getitems(txt, toke):
line = line.strip()
obj = Object()
for itm in spl(items):
val = Parser.getitem(line, itm)
if val:
val = unescape(val.strip())
val = val.replace("\n", "")
val = striphtml(val)
setattr(obj, itm, val)
result.append(obj)
return result
"opml"
class OPML:

@@ -202,68 +284,2 @@

class Parser:
@staticmethod
def getitem(line, item):
lne = ''
index1 = line.find(f'<{item}>')
if index1 == -1:
return lne
index1 += len(item) + 2
index2 = line.find(f'</{item}>', index1)
if index2 == -1:
return lne
lne = line[index1:index2]
lne = cdata(lne)
return lne.strip()
@staticmethod
def getitems(text, token):
index = 0
result = []
stop = False
while not stop:
index1 = text.find(f'<{token}', index)
if index1 == -1:
break
index1 += len(token) + 2
index2 = text.find(f'</{token}>', index1)
if index2 == -1:
break
lne = text[index1:index2]
result.append(lne)
index = index2
return result
@staticmethod
def parse(txt, toke="item", items='title,link'):
result = []
for line in Parser.getitems(txt, toke):
line = line.strip()
obj = Object()
for itm in spl(items):
val = Parser.getitem(line, itm)
if val:
val = unescape(val.strip())
val = val.replace("\n", "")
val = striphtml(val)
setattr(obj, itm, val)
result.append(obj)
return result
class Rss(Default):
def __init__(self):
Default.__init__(self)
self.display_list = 'title,link,author'
self.insertid = None
self.name = ""
self.rss = ""
class Urls(Default):
pass
"utilities"

@@ -296,2 +312,4 @@

if rest:
if 'link' not in items:
items += ",link"
if url.endswith('atom'):

@@ -500,2 +518,5 @@ result = Parser.parse(str(rest.data, 'utf-8'), 'entry', items) or []

"data"
TEMPLATE = """<opml version="1.0">

@@ -502,0 +523,0 @@ <head>

@@ -11,6 +11,8 @@ # This file is placed in the Public Domain.

from ..command import STARTTIME
from ..runtime import elapsed
from ..command import STARTTIME, elapsed
"commands"
def thr(event):

@@ -17,0 +19,0 @@ result = []

@@ -10,3 +10,6 @@ # This file is placed in the Public Domain.

"commands"
def ver(event):
event.reply(f"{Main.name.upper()} {Main.version}")
# This file is placed in the Public Domain.
"locate"
"persistence"

@@ -46,28 +46,2 @@

"disk"
def cdir(path):
pth = pathlib.Path(path)
pth.parent.mkdir(parents=True, exist_ok=True)
def read(obj, path):
with lock:
with open(path, "r", encoding="utf-8") as fpt:
try:
update(obj, load(fpt))
except json.decoder.JSONDecodeError as ex:
ex.add_note(path)
raise ex
def write(obj, path):
with lock:
cdir(path)
with open(path, "w", encoding="utf-8") as fpt:
dump(obj, fpt, indent=4)
return path
"workdir"

@@ -202,2 +176,28 @@

"disk"
def cdir(path):
pth = pathlib.Path(path)
pth.parent.mkdir(parents=True, exist_ok=True)
def read(obj, path):
with lock:
with open(path, "r", encoding="utf-8") as fpt:
try:
update(obj, load(fpt))
except json.decoder.JSONDecodeError as ex:
ex.add_note(path)
raise ex
def write(obj, path):
with lock:
cdir(path)
with open(path, "w", encoding="utf-8") as fpt:
dump(obj, fpt, indent=4)
return path
"interface"

@@ -204,0 +204,0 @@

@@ -12,9 +12,5 @@ # This file is placed in the Public Domain.

import threading
import traceback
import _thread
from .objects import Default
errorlock = threading.RLock()

@@ -25,2 +21,32 @@ launchlock = threading.RLock()

"logging"
LEVELS = {'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'warn': logging.WARNING,
'error': logging.ERROR,
'critical': logging.CRITICAL
}
def level(loglevel="debug"):
if loglevel != "none":
format_short = "%(message)-80s"
datefmt = '%H:%M:%S'
logging.basicConfig(stream=sys.stderr, datefmt=datefmt, format=format_short)
logging.getLogger().setLevel(LEVELS.get(loglevel))
def rlog(loglevel, txt, ignore=None):
if ignore is None:
ignore = []
for ign in ignore:
if ign in str(txt):
return
logging.log(LEVELS.get(loglevel), txt)
"threads"

@@ -51,2 +77,4 @@

self.result = func(*args)
except (KeyboardInterrupt, EOFError) as ex:
raise ex
except Exception as ex:

@@ -134,81 +162,2 @@ logging.exception(ex)

"logging"
LEVELS = {'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'warn': logging.WARNING,
'error': logging.ERROR,
'critical': logging.CRITICAL
}
def level(loglevel="debug"):
if loglevel != "none":
format_short = "%(message)-80s"
datefmt = '%H:%M:%S'
logging.basicConfig(stream=sys.stderr, datefmt=datefmt, format=format_short)
logging.getLogger().setLevel(LEVELS.get(loglevel))
def rlog(loglevel, txt, ignore=None):
if ignore is None:
ignore = []
for ign in ignore:
if ign in str(txt):
return
logging.log(LEVELS.get(loglevel), txt)
"utilities"
def elapsed(seconds, short=True):
txt = ""
nsec = float(seconds)
if nsec < 1:
return f"{nsec:.2f}s"
yea = 365*24*60*60
week = 7*24*60*60
nday = 24*60*60
hour = 60*60
minute = 60
yeas = int(nsec/yea)
nsec -= yeas*yea
weeks = int(nsec/week)
nsec -= weeks*week
nrdays = int(nsec/nday)
nsec -= nrdays*nday
hours = int(nsec/hour)
nsec -= hours*hour
minutes = int(nsec/minute)
nsec -= int(minute*minutes)
sec = int(nsec)
if yeas:
txt += f"{yeas}y"
if weeks:
nrdays += weeks * 7
if nrdays:
txt += f"{nrdays}d"
if short and txt:
return txt.strip()
if hours:
txt += f"{hours}h"
if minutes:
txt += f"{minutes}m"
if sec:
txt += f"{sec}s"
txt = txt.strip()
return txt
def spl(txt):
try:
result = txt.split(',')
except (TypeError, ValueError):
result = [txt, ]
return [x for x in result if x]
"interface"

@@ -215,0 +164,0 @@

# This file is placed in the Public Domain/
__doc__ = __name__.upper()
# This file is placed in the Public Domain.
"cache"
import datetime
import os
import time
from .objects import fqn, items, update
class Cache:
objs = {}
@staticmethod
def add(path, obj):
Cache.objs[path] = obj
@staticmethod
def get(path):
return Cache.objs.get(path, None)
@staticmethod
def update(path, obj):
if not obj:
return
try:
update(Cache.objs[path], obj)
except KeyError:
Cache.add(path, obj)
def find(clz, selector=None, deleted=False, matching=False):
clz = long(clz)
if selector is None:
selector = {}
for pth in typed(clz):
obj = Cache.get(pth)
if not deleted and isdeleted(obj):
continue
if selector and not search(obj, selector, matching):
continue
yield pth, obj
def fntime(daystr):
datestr = ' '.join(daystr.split(os.sep)[-2:])
datestr = datestr.replace("_", " ")
if '.' in datestr:
datestr, rest = datestr.rsplit('.', 1)
else:
rest = ''
timed = time.mktime(time.strptime(datestr, '%Y-%m-%d %H:%M:%S'))
if rest:
timed += float('.' + rest)
return float(timed)
def getpath(obj):
return ident(obj)
def ident(obj):
return os.path.join(fqn(obj),*str(datetime.datetime.now()).split())
def isdeleted(obj):
return '__deleted__' in dir(obj) and obj.__deleted__
def long(name):
split = name.split(".")[-1].lower()
res = name
for names in types():
if split == names.split(".")[-1].lower():
res = names
break
return res
def typed(matcher):
for key in Cache.objs:
if matcher not in key:
continue
yield key
def types():
return set(Cache.objs.keys())
def search(obj, selector, matching=False):
res = False
if not selector:
return res
for key, value in items(selector):
val = getattr(obj, key, None)
if not val:
continue
if matching and value == val:
res = True
elif str(value).lower() in str(val).lower() or value == "match":
res = True
else:
res = False
break
return res
def __dir__():
return (
'Cache',
'find',
'fns',
'fntime',
'getpath',
'ident'
'last',
'search'
)
# This file is placed in the Public Domain.
"imports"
import importlib
import importlib.util
import os
import sys
"modules"
def load(path, mname=None):
if not os.path.exists(path):
return None
if mname is None:
mname = pathtoname(path)
if mname is None:
mname = path.split(os.sep)[-1][:-3]
spec = importlib.util.spec_from_file_location(mname, path)
if not spec or not spec.loader:
return None
module = importlib.util.module_from_spec(spec)
if not module:
return None
sys.modules[module.__name__] = module
spec.loader.exec_module(module)
return module
def modules(path):
return sorted([
x[:-3] for x in os.listdir(path)
if x.endswith(".py") and not x.startswith("__")
])
"utilities"
def pathtoname(path):
brk = __name__.split(".")[0]
splitted = path.split(os.sep)
res = []
for splt in splitted[::-1]:
if splt.endswith(".py"):
splt = splt[:-3]
res.append(splt)
if splt == brk:
break
return ".".join(res[::-1])
"interface"
def __dir__():
return (
'load',
'modules'
)
# This file is placed in the Public Domain.
"utilities'