🚨 Latest Research:Tanstack npm Packages Compromised in Ongoing Mini Shai-Hulud Supply-Chain Attack.Learn More
Socket
Book a DemoSign in
Socket

python-irodsclient

Package Overview
Dependencies
Maintainers
1
Versions
35
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

python-irodsclient - pypi Package Compare versions

Comparing version
2.0.1
to
2.1.0
+55
irods/genquery2.py
import json
from irods.api_number import api_number
from irods.exception import OperationNotSupported
from irods.message import GenQuery2Request, STR_PI, iRODSMessage
class GenQuery2(object):
"""Interface to the GenQuery2 API
This class provides an interface to the GenQuery2 API, an experimental
iRODS API for querying iRODS. GenQuery2 is an improved version of the
traditional GenQuery interface. The GenQuery2 interface may be subject
to change.
"""
def __init__(self, session):
self.session = session
if not self._is_supported():
raise OperationNotSupported(
"GenQuery2 is not supported by default on this iRODS version.")
def execute(self, query, zone=None):
"""Execute this GenQuery2 query, and return the results."""
effective_zone = self.session.zone if zone is None else zone
return json.loads(self._exec_genquery2(query, effective_zone))
def get_sql(self, query, zone=None):
"""Return the SQL query that this GenQuery2 query will be translated to."""
effective_zone = self.session.zone if zone is None else zone
return self._exec_genquery2(query, effective_zone, sql_flag=True)
def get_column_mappings(self, zone=None):
effective_zone = self.session.zone if zone is None else zone
return json.loads(self._exec_genquery2(
"", effective_zone, column_mappings_flag=True))
def _exec_genquery2(self, query, zone, sql_flag=False,
column_mappings_flag=False):
msg = GenQuery2Request()
msg.query_string = query
msg.zone = zone
msg.sql_only = 1 if sql_flag else 0
msg.column_mappings = 1 if column_mappings_flag else 0
message = iRODSMessage('RODS_API_REQ',
msg=msg,
int_info=api_number['GENQUERY2_AN'])
with self.session.pool.get_connection() as conn:
conn.send(message)
response = conn.recv()
return response.get_main_message(STR_PI).myStr
def _is_supported(self):
"""Checks whether this iRODS server supports GenQuery2."""
return self.session.server_version >= (4, 3, 2)
import contextlib
import re
from ..test.helpers import (home_collection,
make_session as make_test_session)
from irods.message import (ET, XML_Parser_Type)
__all__ = ['make_session', 'home_collection', 'xml_mode']
def make_session(test_server_version = False, **kwargs):
return make_test_session(test_server_version = test_server_version, **kwargs)
make_session.__doc__ = re.sub(r'(test_server_version\s*)=\s*\w+',r'\1 = False',make_test_session.__doc__)
@contextlib.contextmanager
def xml_mode(s):
"""In a with-block, this context manager can temporarily change the client's choice of XML parser.
Example usages:
with("QUASI_XML"):
# ...
with(XML_Parser_Type.QUASI_XML):
# ..."""
try:
if isinstance(s,str):
ET(getattr(XML_Parser_Type,s)) # e.g. xml_mode("QUASI_XML")
elif isinstance(s,XML_Parser_Type):
ET(s) # e.g. xml_mode(XML_Parser_Type.QUASI_XML)
else:
msg = "xml_mode argument must be a string (e.g. 'QUASI_XML') or an XML_Parser_Type enum."
raise ValueError(msg)
yield
finally:
ET(None)
from irods.api_number import api_number
from irods.message import iRODSMessage, JSON_Message
def _touch_impl(session, path, **options):
with session.pool.get_connection() as conn:
message_body = JSON_Message(
{'logical_path': path, 'options': options},
conn.server_version)
message = iRODSMessage('RODS_API_REQ', msg=message_body,
int_info=api_number['TOUCH_APN'])
conn.send(message)
response = conn.recv()
from irods.exception import CollectionDoesNotExist
def _is_collection(session, path):
"""Return True if the logical path points to a collection, else False.
Parameters
----------
session: iRODSSession
The session object.
path: string
The absolute logical path to a collection.
"""
try:
session.collections.get(path)
return True
except CollectionDoesNotExist:
pass
return False
#!/usr/bin/env python
from __future__ import absolute_import
import os
import sys
import unittest
import irods.test.helpers as helpers
class TestClientHints(unittest.TestCase):
def setUp(self):
self.sess = helpers.make_session()
def tearDown(self):
"""Close connections."""
self.sess.cleanup()
def test_client_hints(self):
client_hints = self.sess.client_hints
self.assertIn("specific_queries", client_hints)
self.assertIn("rules", client_hints)
self.assertIn("plugins", client_hints)
self.assertIn("hash_scheme", client_hints)
self.assertIn("match_hash_policy", client_hints)
if __name__ == '__main__':
# let the tests find the parent irods lib
sys.path.insert(0, os.path.abspath('../..'))
unittest.main()
import os
import sys
import unittest
import irods.test.helpers as helpers
class TestGenQuery2(unittest.TestCase):
@classmethod
def setUpClass(cls):
# cls.sess will be available to instance (test_*) methods as self.sess
cls.sess = helpers.make_session()
if cls.sess.server_version < (4, 3, 2):
raise unittest.SkipTest(
'GenQuery2 is not available by default in iRODS before v4.3.2.')
cls.coll_path_a = '/{}/home/{}/test_query2_coll_a'.format(
cls.sess.zone, cls.sess.username)
cls.coll_path_b = '/{}/home/{}/test_query2_coll_b'.format(
cls.sess.zone, cls.sess.username)
cls.sess.collections.create(cls.coll_path_a)
cls.sess.collections.create(cls.coll_path_b)
@classmethod
def tearDownClass(cls):
'''Remove test data and close connections
'''
cls.sess.collections.remove(cls.coll_path_a, force=True)
cls.sess.collections.remove(cls.coll_path_b, force=True)
cls.sess.cleanup()
def test_select(self):
query = "SELECT COLL_NAME WHERE COLL_NAME = '{}'".format(self.coll_path_a)
q = self.sess.genquery2_object()
query_result = q.execute(query)
self.assertIn([self.coll_path_a], query_result)
self.assertEqual(len(query_result), 1)
# Use upper() here in case GenQuery2 returns lowercase table names in a future implementation.
self.assertIn('R_COLL_MAIN', q.get_sql(query).upper())
def test_select_with_explicit_zone(self):
query = "SELECT COLL_NAME WHERE COLL_NAME = '{}'".format(self.coll_path_a)
q = self.sess.genquery2_object()
query_result = q.execute(query, zone=self.sess.zone)
self.assertIn([self.coll_path_a], query_result)
self.assertEqual(len(query_result), 1)
# Use upper() here in case GenQuery2 returns lowercase table names in a future implementation.
self.assertIn('R_COLL_MAIN', q.get_sql(query).upper())
def test_select_with_shorthand(self):
query = "SELECT COLL_NAME WHERE COLL_NAME = '{}'".format(self.coll_path_a)
query_result = self.sess.genquery2(query)
self.assertIn([self.coll_path_a], query_result)
self.assertEqual(len(query_result), 1)
def test_select_with_shorthand_and_explicit_zone(self):
query = "SELECT COLL_NAME WHERE COLL_NAME = '{}'".format(self.coll_path_a)
query_result = self.sess.genquery2(query, zone=self.sess.zone)
self.assertIn([self.coll_path_a], query_result)
self.assertEqual(len(query_result), 1)
def test_select_or(self):
query = "SELECT COLL_NAME WHERE COLL_NAME = '{}' OR COLL_NAME = '{}'".format(self.coll_path_a, self.coll_path_b)
q = self.sess.genquery2_object()
query_result = q.execute(query)
self.assertIn([self.coll_path_a], query_result)
self.assertIn([self.coll_path_b], query_result)
self.assertEqual(len(query_result), 2)
# Use upper() here in case GenQuery2 returns lowercase table names in a future implementation.
self.assertIn('R_COLL_MAIN', q.get_sql(query).upper())
def test_select_and(self):
query = "SELECT COLL_NAME WHERE COLL_NAME LIKE '{}' AND COLL_NAME LIKE '{}'".format(
"%test_query2_coll%", "%query2_coll_a%")
q = self.sess.genquery2_object()
query_result = q.execute(query)
self.assertIn([self.coll_path_a], query_result)
self.assertEqual(len(query_result), 1)
# Use upper() here in case GenQuery2 returns lowercase table names in a future implementation.
self.assertIn('R_COLL_MAIN', q.get_sql(query).upper())
def test_column_mappings(self):
q = self.sess.genquery2_object()
result = q.get_column_mappings()
self.assertIn("COLL_ID", result.keys())
self.assertIn("DATA_ID", result.keys())
self.assertIn("RESC_ID", result.keys())
self.assertIn("USER_ID", result.keys())
if __name__ == '__main__':
# let the tests find the parent irods lib
sys.path.insert(0, os.path.abspath('../..'))
unittest.main()
#! /usr/bin/env python
from __future__ import absolute_import
import os
import sys
import unittest
import irods.test.helpers as helpers
class TestLibraryFeatures(unittest.TestCase):
def setUp(self):
self.sess = helpers.make_session()
def tearDown(self):
"""Close connections."""
self.sess.cleanup()
def test_library_features__issue_556(self):
if self.sess.server_version < (4, 3, 1):
self.skipTest('Do not test library features before iRODS 4.3.1')
features = self.sess.library_features()
# Test that returned features are in the form of a Python dict object.
self.assertIsInstance(features, dict)
# Test that features is populated by at least one item.
self.assertTrue(features)
if __name__ == '__main__':
# let the tests find the parent irods lib
sys.path.insert(0, os.path.abspath('../..'))
unittest.main()
#! /usr/bin/env python
from __future__ import print_function
from __future__ import absolute_import
import os
import sys
import tempfile
import unittest
import textwrap
import json
import shutil
import ssl
import irods.test.helpers as helpers
from irods.connection import Connection
from irods.session import iRODSSession, NonAnonymousLoginWithoutPassword
from irods.rule import Rule
from irods.models import User
from socket import gethostname
from irods.password_obfuscation import (encode as pw_encode)
from irods.connection import PlainTextPAMPasswordError
from irods.access import iRODSAccess
import irods.exception as ex
import contextlib
import socket
from re import compile as regex
import gc
import six
from irods.test.setupssl import create_ssl_dir
#
# Allow override to specify the PAM password in effect for the test rodsuser.
#
TEST_PAM_PW_OVERRIDE = os.environ.get('PYTHON_IRODSCLIENT_TEST_PAM_PW_OVERRIDE','')
TEST_PAM_PW = TEST_PAM_PW_OVERRIDE or 'test123'
TEST_IRODS_PW = 'apass'
TEST_RODS_USER = 'alissa'
try:
from re import _pattern_type as regex_type
except ImportError:
from re import Pattern as regex_type # Python 3.7+
def json_file_update(fname,keys_to_delete=(),**kw):
with open(fname,'r') as f:
j = json.load(f)
j.update(**kw)
for k in keys_to_delete:
if k in j: del j [k]
elif isinstance(k,regex_type):
jk = [i for i in j.keys() if k.search(i)]
for ky in jk: del j[ky]
with open(fname,'w') as out:
json.dump(j, out, indent=4)
def env_dir_fullpath(authtype): return os.path.join( os.environ['HOME'] , '.irods.' + authtype)
def json_env_fullpath(authtype): return os.path.join( env_dir_fullpath(authtype), 'irods_environment.json')
def secrets_fullpath(authtype): return os.path.join( env_dir_fullpath(authtype), '.irodsA')
RODSADMIN_ENV_PATH = os.path.expanduser('~/.irods/irods_environment.json')
SERVER_ENV_SSL_SETTINGS = {
"irods_ssl_certificate_chain_file": "/etc/irods/ssl/irods.crt",
"irods_ssl_certificate_key_file": "/etc/irods/ssl/irods.key",
"irods_ssl_dh_params_file": "/etc/irods/ssl/dhparams.pem",
"irods_ssl_ca_certificate_file": "/etc/irods/ssl/irods.crt",
"irods_ssl_verify_server": "cert"
}
CLIENT_OPTIONS_FOR_SSL = {
"irods_client_server_policy": "CS_NEG_REQUIRE",
"irods_client_server_negotiation": "request_server_negotiation",
"irods_ssl_ca_certificate_file": "/etc/irods/ssl/irods.crt",
"irods_ssl_verify_server": "cert",
"irods_encryption_key_size": 16,
"irods_encryption_salt_size": 8,
"irods_encryption_num_hash_rounds": 16,
"irods_encryption_algorithm": "AES-256-CBC"
}
def client_env_keys_from_admin_env(user_name, auth_scheme=""):
cli_env = {}
with open(RODSADMIN_ENV_PATH) as f:
srv_env = json.load(f)
for k in [ "irods_host", "irods_zone_name", "irods_port" ]:
cli_env [k] = srv_env[k]
cli_env["irods_user_name"] = user_name
if auth_scheme:
cli_env["irods_authentication_scheme"] = auth_scheme
return cli_env
@contextlib.contextmanager
def pam_password_in_plaintext(allow=True):
saved = bool(Connection.DISALLOWING_PAM_PLAINTEXT)
try:
Connection.DISALLOWING_PAM_PLAINTEXT = not(allow)
yield
finally:
Connection.DISALLOWING_PAM_PLAINTEXT = saved
class TestLogins(unittest.TestCase):
'''
Ideally, these tests should move into CI, but that would require the server
(currently a different node than the client) to have SSL certs created and
enabled.
Until then, we require these tests to be run manually on a server node,
with:
python -m unittest "irods.test.login_auth_test[.XX[.YY]]'
Additionally:
1. The PAM/SSL tests under the TestLogins class should be run on a
single-node iRODS system, by the service account user. This ensures
the /etc/irods directory is local and writable.
2. ./setupssl.py (sets up SSL keys etc. in /etc/irods/ssl) should be run
first to create (or overwrite, if appropriate) the /etc/irods/ssl directory
and its contents.
3. Must add & override configuration entries in /var/lib/irods/irods_environment
Per https://slides.com/irods/ugm2018-ssl-and-pam-configuration#/3/7
'''
user_auth_envs = {
'.irods.pam': {
'USER': TEST_RODS_USER,
'PASSWORD': TEST_PAM_PW,
'AUTH': 'pam'
},
'.irods.native': {
'USER': TEST_RODS_USER,
'PASSWORD': TEST_IRODS_PW,
'AUTH': 'native'
}
}
env_save = {}
@contextlib.contextmanager
def setenv(self,var,newvalue):
try:
self.env_save[var] = os.environ.get(var,None)
os.environ[var] = newvalue
yield newvalue
finally:
oldvalue = self.env_save[var]
if oldvalue is None:
del os.environ[var]
else:
os.environ[var]=oldvalue
def create_env_dirs(self):
dirs = {}
retval = []
# -- create environment configurations and secrets
with pam_password_in_plaintext():
for dirname,lookup in self.user_auth_envs.items():
if lookup['AUTH'] in ('pam','pam_password'):
ses = iRODSSession( host=gethostname(),
user=lookup['USER'],
zone='tempZone',
authentication_scheme=lookup['AUTH'],
password=lookup['PASSWORD'],
port= 1247 )
try:
pam_hashes = ses.pam_pw_negotiated
except AttributeError:
pam_hashes = []
if not pam_hashes: print('Warning ** PAM pw couldnt be generated' ); break
scrambled_pw = pw_encode( pam_hashes[0] )
#elif lookup['AUTH'] == 'XXXXXX': # TODO: insert other authentication schemes here
elif lookup['AUTH'] in ('native', '',None):
scrambled_pw = pw_encode( lookup['PASSWORD'] )
cl_env = client_env_keys_from_admin_env(TEST_RODS_USER)
if lookup.get('AUTH',None) is not None: # - specify auth scheme only if given
cl_env['irods_authentication_scheme'] = lookup['AUTH']
dirbase = os.path.join(os.environ['HOME'],dirname)
dirs[dirbase] = { 'secrets':scrambled_pw , 'client_environment':cl_env }
# -- create the environment directories and write into them the configurations just created
for absdir in dirs.keys():
shutil.rmtree(absdir,ignore_errors=True)
os.mkdir(absdir)
with open(os.path.join(absdir,'irods_environment.json'),'w') as envfile:
envfile.write('{}')
json_file_update(envfile.name, **dirs[absdir]['client_environment'])
with open(os.path.join(absdir,'.irodsA'),'w') as secrets_file:
secrets_file.write(dirs[absdir]['secrets'])
os.chmod(secrets_file.name,0o600)
retval = dirs.keys()
return retval
PAM_SCHEME_STRING = 'pam'
@classmethod
def setUpClass(cls):
cls.admin = helpers.make_session()
if cls.admin.server_version >= (4,3):
cls.PAM_SCHEME_STRING = cls.user_auth_envs['.irods.pam']['AUTH'] = 'pam_password'
@classmethod
def tearDownClass(cls):
cls.admin.cleanup()
def setUp(self):
super(TestLogins,self).setUp()
def tearDown(self):
for envdir in getattr(self, 'envdirs', []):
shutil.rmtree(envdir, ignore_errors=True)
super(TestLogins,self).tearDown()
def validate_session(self, session, verbose=False, **options):
# - try to get the home collection
home_coll = '/{0.zone}/home/{0.username}'.format(session)
self.assertTrue(session.collections.get(home_coll).path == home_coll)
if verbose: print(home_coll)
# - check user is as expected
self.assertEqual( session.username, TEST_RODS_USER )
# - check socket type (normal vs SSL) against whether ssl requested
use_ssl = options.pop('ssl',None)
if use_ssl is not None:
my_connect = [s for s in (session.pool.active|session.pool.idle)] [0]
self.assertEqual( bool( use_ssl ), my_connect.socket.__class__ is ssl.SSLSocket )
@contextlib.contextmanager
def _setup_rodsuser_and_optional_pw(self, name, make_irods_pw = False):
try:
self.admin.users.create(name, 'rodsuser')
if make_irods_pw:
self.admin.users.modify(name,'password',TEST_IRODS_PW)
yield
finally:
self.admin.users.remove( name )
def tst0(self, ssl_opt, auth_opt, env_opt, name = TEST_RODS_USER, make_irods_pw = False):
_auth_opt = auth_opt
if auth_opt in ('pam', 'pam_password'):
auth_opt = self.PAM_SCHEME_STRING
with self._setup_rodsuser_and_optional_pw(name = name, make_irods_pw = make_irods_pw):
self.envdirs = self.create_env_dirs()
if not self.envdirs:
raise RuntimeError('Could not create one or more client environments')
auth_opt_explicit = 'native' if _auth_opt=='' else _auth_opt
verbosity=False
#verbosity='' # -- debug - sanity check by printing out options applied
out = {'':''}
if env_opt:
with self.setenv('IRODS_ENVIRONMENT_FILE', json_env_fullpath(auth_opt_explicit)) as env_file,\
self.setenv('IRODS_AUTHENTICATION_FILE', secrets_fullpath(auth_opt_explicit)):
cli_env_extras = {} if not(ssl_opt) else dict( CLIENT_OPTIONS_FOR_SSL )
if auth_opt:
cli_env_extras.update( irods_authentication_scheme = auth_opt )
remove=[]
else:
remove=[regex('authentication_')]
with helpers.file_backed_up(env_file):
json_file_update( env_file, keys_to_delete=remove, **cli_env_extras )
session = iRODSSession(irods_env_file=env_file)
with open(env_file) as f:
out = json.load(f)
self.validate_session( session, verbose = verbosity, ssl = ssl_opt )
session.cleanup()
out['ARGS']='no'
else:
session_options = {}
if auth_opt:
session_options.update (authentication_scheme = auth_opt)
if ssl_opt:
SSL_cert = CLIENT_OPTIONS_FOR_SSL["irods_ssl_ca_certificate_file"]
session_options.update(
ssl_context = ssl.create_default_context ( purpose = ssl.Purpose.SERVER_AUTH,
capath = None,
cadata = None,
cafile = SSL_cert),
**CLIENT_OPTIONS_FOR_SSL )
lookup = self.user_auth_envs ['.irods.'+('native' if not(_auth_opt) else _auth_opt)]
session = iRODSSession ( host=gethostname(),
user=lookup['USER'],
zone='tempZone',
password=lookup['PASSWORD'],
port= 1247,
**session_options )
out = session_options
self.validate_session( session, verbose = verbosity, ssl = ssl_opt )
session.cleanup()
out['ARGS']='yes'
if verbosity == '':
print ('--- ssl:',ssl_opt,'/ auth:',repr(auth_opt),'/ env:',env_opt)
print ('--- > ',json.dumps({k:v for k,v in out.items() if k != 'ssl_context'},indent=4))
print ('---')
# == test defaulting to 'native'
def test_01(self):
self.tst0 ( ssl_opt = True , auth_opt = '' , env_opt = False , make_irods_pw = True)
def test_02(self):
self.tst0 ( ssl_opt = False, auth_opt = '' , env_opt = False , make_irods_pw = True)
def test_03(self):
self.tst0 ( ssl_opt = True , auth_opt = '' , env_opt = True , make_irods_pw = True )
def test_04(self):
self.tst0 ( ssl_opt = False, auth_opt = '' , env_opt = True , make_irods_pw = True )
# == test explicit scheme 'native'
def test_1(self):
self.tst0 ( ssl_opt = True , auth_opt = 'native' , env_opt = False, make_irods_pw = True)
def test_2(self):
self.tst0 ( ssl_opt = False, auth_opt = 'native' , env_opt = False, make_irods_pw = True)
def test_3(self):
self.tst0 ( ssl_opt = True , auth_opt = 'native' , env_opt = True, make_irods_pw = True)
def test_4(self):
self.tst0 ( ssl_opt = False, auth_opt = 'native' , env_opt = True, make_irods_pw = True)
# == test explicit scheme 'pam'
def test_5(self):
self.tst0 ( ssl_opt = True, auth_opt = 'pam' , env_opt = False )
def test_6(self):
try:
self.tst0 ( ssl_opt = False, auth_opt = 'pam' , env_opt = False )
except PlainTextPAMPasswordError:
pass
else:
# -- no exception raised
self.fail("PlainTextPAMPasswordError should have been raised")
def test_7(self):
self.tst0 ( ssl_opt = True , auth_opt = 'pam' , env_opt = True )
def test_8(self):
self.tst0 ( ssl_opt = False, auth_opt = 'pam' , env_opt = True )
@unittest.skipUnless(TEST_PAM_PW_OVERRIDE, "Skipping unless pam password is overridden (e.g. to test special characters)")
def test_escaped_pam_password_chars__362(self):
with self._setup_rodsuser_and_optional_pw(name = TEST_RODS_USER):
context = ssl._create_unverified_context(
purpose=ssl.Purpose.SERVER_AUTH, capath=None, cadata=None, cafile=None,
)
ssl_settings = {
'client_server_negotiation': 'request_server_negotiation',
'client_server_policy': 'CS_NEG_REQUIRE',
'encryption_algorithm': 'AES-256-CBC',
'encryption_key_size': 32,
'encryption_num_hash_rounds': 16,
'encryption_salt_size': 8,
'ssl_ca_certificate_file': '/etc/irods/ssl/irods.crt',
'ssl_context': context
}
irods_session = iRODSSession(
host = self.admin.host,
port = self.admin.port,
zone = self.admin.zone,
user = TEST_RODS_USER,
password = TEST_PAM_PW_OVERRIDE,
authentication_scheme = 'pam',
**ssl_settings
)
home_coll = '/{0.zone}/home/{0.username}'.format(irods_session)
self.assertEqual(irods_session.collections.get(home_coll).path, home_coll)
class TestAnonymousUser(unittest.TestCase):
def setUp(self):
admin = self.admin = helpers.make_session()
user = self.user = admin.users.create('anonymous', 'rodsuser', admin.zone)
self.home = '/{admin.zone}/home/{user.name}'.format(**locals())
admin.collections.create(self.home)
acl = iRODSAccess('own', self.home, user.name)
admin.acls.set(acl, admin = True)
self.env_file = os.path.expanduser('~/.irods.anon/irods_environment.json')
self.env_dir = ( os.path.dirname(self.env_file))
self.auth_file = os.path.expanduser('~/.irods.anon/.irodsA')
os.mkdir( os.path.dirname(self.env_file))
json.dump( { "irods_host": admin.host,
"irods_port": admin.port,
"irods_user_name": user.name,
"irods_zone_name": admin.zone }, open(self.env_file,'w'), indent=4 )
def tearDown(self):
self.admin.collections.remove(self.home, recurse = True, force = True)
self.admin.users.remove(self.user.name)
shutil.rmtree (self.env_dir, ignore_errors = True)
def test_login_from_environment(self):
orig_env = os.environ.copy()
try:
os.environ["IRODS_ENVIRONMENT_FILE"] = self.env_file
os.environ["IRODS_AUTHENTICATION_FILE"] = self.auth_file
ses = helpers.make_session()
ses.collections.get(self.home)
finally:
os.environ.clear()
os.environ.update( orig_env )
class TestMiscellaneous(unittest.TestCase):
def test_nonanonymous_login_without_auth_file_fails__290(self):
ses = self.admin
if ses.users.get( ses.username ).type != 'rodsadmin':
self.skipTest( 'Only a rodsadmin may run this test.')
try:
ENV_DIR = tempfile.mkdtemp()
ses.users.create('bob', 'rodsuser')
ses.users.modify('bob', 'password', 'bpass')
d = dict(password = 'bpass', user = 'bob', host = ses.host, port = ses.port, zone = ses.zone)
(bob_env, bob_auth) = helpers.make_environment_and_auth_files(ENV_DIR, **d)
login_options = { 'irods_env_file': bob_env, 'irods_authentication_file': bob_auth }
with helpers.make_session(**login_options) as s:
s.users.get('bob')
os.unlink(bob_auth)
# -- Check that we raise an appropriate exception pointing to the missing auth file path --
with self.assertRaisesRegexp(NonAnonymousLoginWithoutPassword, bob_auth):
with helpers.make_session(**login_options) as s:
s.users.get('bob')
finally:
try:
shutil.rmtree(ENV_DIR,ignore_errors=True)
ses.users.get('bob').remove()
except ex.UserDoesNotExist:
pass
def setUp(self):
admin = self.admin = helpers.make_session()
if admin.users.get(admin.username).type != 'rodsadmin':
self.skipTest('need admin privilege')
admin.users.create('alice','rodsuser')
def tearDown(self):
self.admin.users.remove('alice')
self.admin.cleanup()
@unittest.skipUnless(six.PY3, "Skipping in Python2 because it doesn't reliably do cyclic GC.")
def test_destruct_session_with_no_pool_315(self):
destruct_flag = [False]
class mySess( iRODSSession ):
def __del__(self):
self.pool = None
super(mySess,self).__del__() # call parent destructor(s) - will raise
# an error before the #315 fix
destruct_flag[:] = [True]
admin = self.admin
admin.users.modify('alice','password','apass')
my_sess = mySess( user = 'alice',
password = 'apass',
host = admin.host,
port = admin.port,
zone = admin.zone)
my_sess.cleanup()
del my_sess
gc.collect()
self.assertEqual( destruct_flag, [True] )
def test_non_anon_native_login_omitting_password_fails_1__290(self):
# rodsuser with password unset
with self.assertRaises(ex.CAT_INVALID_USER):
self._non_anon_native_login_omitting_password_fails_N__290()
def test_non_anon_native_login_omitting_password_fails_2__290(self):
# rodsuser with a password set
self.admin.users.modify('alice','password','apass')
with self.assertRaises(ex.CAT_INVALID_AUTHENTICATION):
self._non_anon_native_login_omitting_password_fails_N__290()
def _non_anon_native_login_omitting_password_fails_N__290(self):
admin = self.admin
with iRODSSession(zone = admin.zone, port = admin.port, host = admin.host, user = 'alice') as alice:
alice.collections.get(helpers.home_collection(alice))
class TestWithSSL(unittest.TestCase):
'''
The tests within this class should be run by an account other than the
service account. Otherwise there is risk of corrupting the server setup.
'''
def setUp(self):
if os.path.expanduser('~') == '/var/lib/irods':
self.skipTest('TestWithSSL may not be run by user irods')
if not os.path.exists('/etc/irods/ssl'):
self.skipTest('Running setupssl.py as irods user is prerequisite for this test.')
with helpers.make_session() as session:
if not session.host in ('localhost', socket.gethostname()):
self.skipTest('Test must be run co-resident with server')
def test_ssl_with_server_verify_set_to_none_281(self):
env_file = os.path.expanduser('~/.irods/irods_environment.json')
my_ssl_directory = ''
try:
with helpers.file_backed_up(env_file):
with open(env_file) as env_file_handle:
env = json.load( env_file_handle )
my_ssl_directory = tempfile.mkdtemp(dir = os.path.expanduser("~"))
# Elect for efficiency in DH param generation, eg. when setting up for testing.
create_ssl_dir(ssl_dir = my_ssl_directory, use_strong_primes_for_dh_generation = False)
settings_to_update = {key:value.replace("/etc/irods/ssl",my_ssl_directory)
for key,value in env.items() if type(value) is str and value.startswith("/etc/irods/ssl")}
settings_to_update["irods_ssl_verify_server"] = "none"
env.update( settings_to_update )
with open(env_file,'w') as f:
json.dump(env,f)
with helpers.make_session() as session:
session.collections.get('/{session.zone}/home/{session.username}'.format(**locals()))
finally:
if my_ssl_directory:
shutil.rmtree(my_ssl_directory)
if __name__ == '__main__':
# let the tests find the parent irods lib
sys.path.insert(0, os.path.abspath('../..'))
unittest.main()
import irods
# Used in test of:
# irods.test.data_obj_test.TestDataObjOps.test_setting_xml_parser_choice_by_environment_only__issue_584
print(irods.client_configuration.connections.xml_parser_default)
+33
-0
Changelog
=========
v2.1.0 (2024-08-13)
-------------------
- [#3] v2.1.0 and update changelog [Terrell Russell]
- [#3] allow genquery2_test to work under all Python2/3 versions [Daniel Moore]
- [#3] call assertRaisesRegex[p] with/without final 'p' (depends on Python 2 or 3) [Daniel Moore]
- [#534] implement replica truncate [Daniel Moore]
- [#584] load settings from environment even without use of config file [Daniel Moore]
- [#600] Comment out test_files_query_case_sensitive BETWEEN tests [Alan King]
- [#597] genquery2_test: Replace Postgres-specific assertions [Alan King]
- [#566] Rename login_auth_test.py to prevent running with full suite [Alan King]
- [#533][#556] implement library features [Daniel Moore]
- [#537] add --help option to the script setupssl.py [Daniel Moore]
- [#574] rename progress_bar to updatables, allow for genericity [Daniel Moore]
- [#574] Allow for tqdm progress bars to be used [Raoul Schram]
- [#586] implement xml_mode() in a new irods.helpers module [Daniel Moore]
- [#558] iRODSAccess: Handle non-str types in constructor [Alan King]
- [#558] Add tests for iRODSAccess constructor type checking [Alan King]
- [#567] return logging to normal after a run of the pool_test [Daniel Moore]
- [#3][#562] skip issue 562 (leaking connections) test for Python2 [Daniel Moore]
- [#565] Descend Bad_AVU_Value from ValueError [Daniel Moore]
- [#587] unique_name now hashes the call tuple for a random seed. [Daniel Moore]
- [#3][#525] allow touch API tests to run on Python 2 [Daniel Moore]
- [#532,#564,#569] fix stored connections to match desired connection timeout. [Daniel Moore]
- [#562] release old connection when redirecting [Daniel Moore]
- [#576] test admin mode in metadata.apply_atomic_operations [Daniel Moore]
- [#576] Add missing admin_mode in JSON message for metadata.apply_atomic_operations [Paul Borgermans]
- [#571] exclude collection "/" from subcollections [Daniel Moore]
- [#557] de-duplicate acl lists in case of multiple replicas. [Daniel Moore]
- [#525] Add support for touch API operation. [Kory Draughn]
- [#535] Implement basic support for GenQuery2 [Sietse Snel]
- [#547] unify AVU field exceptions interface for metadata add and set [Daniel Moore]
- [#550] Add support for client hints [Sietse Snel]
v2.0.1 (2024-04-30)

@@ -5,0 +38,0 @@ -------------------

+24
-1
import collections
import copy
import six
from irods.collection import iRODSCollection
from irods.data_object import iRODSDataObject
from irods.path import iRODSPath

@@ -61,3 +64,11 @@ class _Access_LookupMeta(type):

self.access_name = access_name
self.path = path
if isinstance(path, (iRODSCollection, iRODSDataObject)):
self.path = path.path
elif isinstance(path, str):
# This should cover irods.path.iRODSPath as well as it is a descendant type of str.
self.path = path
else:
raise TypeError(
"'path' parameter must be of type 'str', 'irods.collection.iRODSCollection', "
"'irods.data_object.iRODSDataObject', or 'irods.path.iRODSPath'.")
self.user_name = user_name

@@ -67,2 +78,14 @@ self.user_zone = user_zone

def __eq__(self,other):
return self.access_name == other.access_name and \
iRODSPath(self.path) == iRODSPath(other.path) and \
self.user_name == other.user_name and \
self.user_zone == other.user_zone
def __hash__(self):
return hash((self.access_name,
iRODSPath(self.path),
self.user_name,
self.user_zone))
def copy(self, decanonicalize = False):

@@ -69,0 +92,0 @@ other = copy.deepcopy(self)

@@ -160,2 +160,5 @@ api_number = {

"GET_LIBRARY_FEATURES_AN": 801,
"REPLICA_TRUNCATE_AN": 802,
# 1000 - 1059 - NETCDF API calls

@@ -180,8 +183,11 @@ "NC_OPEN_AN": 1000,

"SSL_END_AN": 1101,
"CLIENT_HINTS_AN": 10215,
"GET_RESOURCE_INFO_FOR_OPERATION_AN": 10220,
"GENQUERY2_AN": 10221,
"ATOMIC_APPLY_METADATA_OPERATIONS_APN": 20002,
"GET_FILE_DESCRIPTOR_INFO_APN": 20000,
"REPLICA_CLOSE_APN": 20004,
"TOUCH_APN": 20007,
"AUTH_PLUG_REQ_AN": 1201
}
+14
-5

@@ -330,6 +330,4 @@ from __future__ import print_function

if use_environment_variables:
for key, variable in _calculate_overriding_environment_variables().items():
value = os.environ.get(variable)
if value is not None:
_load_config_line(root, key, value)
_load_settings_from_environment(root)
finally:

@@ -339,4 +337,13 @@ if _file:

default_config_dict = {}
def _load_settings_from_environment(root = None):
if root is None:
root = sys.modules[__name__]
for key, variable in _calculate_overriding_environment_variables().items():
value = os.environ.get(variable)
if value is not None:
_load_config_line(root, key, value)
def preserve_defaults():

@@ -346,3 +353,5 @@ default_config_dict.update((k,copy.deepcopy(v)) for k,v in globals().items() if isinstance(v,iRODSConfiguration))

def autoload(_file_to_load):
if _file_to_load is not None:
if _file_to_load is None:
_load_settings_from_environment()
else:
load(file = _file_to_load, use_environment_variables = True)

@@ -349,0 +358,0 @@

@@ -53,3 +53,3 @@ from __future__ import absolute_import

.filter(Collection.parent_name == self.path)
return [iRODSCollection(self.manager, row) for row in query]
return [iRODSCollection(self.manager, row) for row in query if row[Collection.name] != '/']

@@ -56,0 +56,0 @@ @property

@@ -73,3 +73,5 @@ from __future__ import absolute_import

size=r[DataObject.size],
comments=r[DataObject.comments]
comments=r[DataObject.comments],
create_time=r[DataObject.create_time],
modify_time=r[DataObject.modify_time]
) for r in replicas]

@@ -117,2 +119,5 @@ self._meta = None

def replica_truncate(self, size, **options):
return self.manager.replica_truncate(self.path, size, **options)
def replicate(self, resource = None, **options):

@@ -119,0 +124,0 @@ self.manager.replicate(self.path, resource = resource, **options)

@@ -26,2 +26,6 @@ # if you're copying these from the docs, you might find the following regex helpful:

class InvalidInputArgument(PycommandsException):
pass
class DataObjectDoesNotExist(DoesNotExist):

@@ -69,2 +73,13 @@ pass

class NotImplementedInIRODSServer(PycommandsException):
def __init__(self, feature_description, required_iRODS_version = ()):
super(NotImplementedInIRODSServer,self).__init__(feature_description + ': Not supported by the connected iRODS server.')
self.required_iRODS_version = required_iRODS_version
def __str__(self):
nv = self.required_iRODS_version
return '{}{}'.format(self.args, ' [requires iRODS version: {nv}]'.format(**locals()) if nv else '')
def __repr__(self):
return self.__class__.__name__ + str(self)
class iRODSExceptionMeta(type):

@@ -631,2 +646,6 @@ codes = {}

class SYS_REPLICA_INACCESSIBLE(iRODSException):
code = -168000
class SYS_NOT_ALLOWED(iRODSException):

@@ -633,0 +652,0 @@ code = -169000

@@ -119,3 +119,6 @@ from __future__ import absolute_import

acls = [ iRODSAccess ( r[access_column.name],
# Instantiate as set before converting to a list, in order to remove duplicate iRODSAccess
# objects. [#557]
acls = list({ iRODSAccess ( r[access_column.name],
target.path,

@@ -125,3 +128,3 @@ user_lookup[r[access_column.user_id]].name,

user_lookup[r[access_column.user_id]].type,
) for r in rows ]
) for r in rows })
return acls

@@ -128,0 +131,0 @@

from __future__ import absolute_import
from irods.models import Collection, DataObject
from irods.manager import Manager
from irods.manager._internal import _api_impl
from irods.message import iRODSMessage, CollectionRequest, FileOpenRequest, ObjCopyRequest, StringStringMap

@@ -153,1 +154,38 @@ from irods.exception import CollectionDoesNotExist, NoResultFound

response = conn.recv()
def touch(self, path, **options):
"""Change the mtime of an existing collection.
Parameters
----------
path: string
The absolute logical path of a collection.
seconds_since_epoch: integer, optional
The number of seconds since epoch representing the new mtime. Cannot
be used with "reference" parameter.
reference: string, optional
Use the mtime of the logical path to the data object or collection
identified by this option. Cannot be used with "seconds_since_epoch"
parameter.
Raises
------
CollectionDoesNotExist
If the target collection does not exist or does not point to a
collection.
"""
# Attempt to lookup the collection. If it does not exist, the call
# will raise an exception.
#
# Enforces the requirement that collections must exist before this
# operation is invoked.
self.get(path)
# The following options to the touch API are not allowed for collections.
options.pop('no_create', None)
options.pop('replica_number', None)
options.pop('leaf_resource_name', None)
_api_impl._touch_impl(self.sess, path, no_create=True, **options)
from __future__ import absolute_import
import ast
import collections
import io
import json
import logging
import os
import io
import six
import weakref
from irods.models import DataObject, Collection
from irods.manager import Manager
from irods.manager._internal import _api_impl, _logical_path
from irods.message import (

@@ -19,9 +26,70 @@ iRODSMessage, FileOpenRequest, ObjCopyRequest, StringStringMap, DataObjInfo, ModDataObjMeta,

from irods.parallel import deferred_call
import six
import ast
import json
import logging
logger = logging.getLogger(__name__)
_update_types = []
_update_functions = weakref.WeakKeyDictionary()
def register_update_instance(object_, updater): # updater
_update_functions[object_] = updater
def register_update_type(type_, factory_):
"""
Create an entry corresponding to a type_ of instance to be allowed among updatables, with processing
based on the factory_ callable.
Parameters:
type_ : a type of instance to be allowed in the updatables parameter.
factory_ : a function accepting the instance passed in, and yielding an update callable.
If None, then remove the type from the list.
"""
# Delete if already present in list
z = tuple(zip(*_update_types))
if z and type_ in z[0]:
_update_types.pop(z[0].index(type_))
# Rewrite the list
# - with the new item introduced at the start of the list but otherwise in the same order, and
# - preserving only pairs that do not contain 'None' as the second member.
_update_types[:] = list((k,v) for k,v in collections.OrderedDict([(type_,factory_)] + _update_types).items() if v is not None)
def unregister_update_type(type_):
"""
Remove type_ from the listof recognized updatable types maintained by the PRC.
"""
register_update_type(type_, None)
def do_progress_updates(updatables, n, logging_function = logger.warning):
"""
Used internally by Python iRODS Client's data transfer routines (put, get) to iterate through updatables to be processed.
This, in turn, should cause the underlying corresponding progress bars or indicators to be updated.
"""
if not isinstance(updatables, (list,tuple)):
updatables = [updatables]
for object_ in updatables:
# If an updatable is directly callable, we set that up to be called without further ado.
if callable(object_):
update_func = object_
else:
# If not, we search for a registered type that matches object_ and register (or look up if previously registered) a factory-produced updater for that instance.
# Examine the unit tests for issue #574 in data_obj_test.py for factory examples.
update_func = _update_functions.get(object_)
if not update_func:
# search based on type
for class_,factory_ in _update_types:
if isinstance(object_,class_):
update_func = factory_(object_)
register_update_instance(object_, update_func)
break
else:
logging_function("Could not derive an update function for: %r",object_)
continue
# Do the update.
if update_func: update_func(n)
def call___del__if_exists(super_):

@@ -127,3 +195,3 @@ """

def _download(self, obj, local_path, num_threads, **options):
def _download(self, obj, local_path, num_threads, updatables = (), **options):
"""Transfer the contents of a data object to a local file.

@@ -149,3 +217,4 @@

target_resource_name = options.get(kw.RESC_NAME_KW,''),
data_open_returned_values = data_open_returned_values_):
data_open_returned_values = data_open_returned_values_,
updatables = updatables):
raise RuntimeError("parallel get failed")

@@ -155,5 +224,6 @@ else:

f.write(chunk)
do_progress_updates(updatables, len(chunk))
def get(self, path, local_path = None, num_threads = DEFAULT_NUMBER_OF_THREADS, **options):
def get(self, path, local_path = None, num_threads = DEFAULT_NUMBER_OF_THREADS, updatables = (), **options):
"""

@@ -169,3 +239,3 @@ Get a reference to the data object at the specified `path'.

if local_path:
self._download(path, local_path, num_threads = num_threads, **options)
self._download(path, local_path, num_threads = num_threads, updatables = updatables, **options)

@@ -187,3 +257,3 @@ query = self.sess.query(DataObject)\

def put(self, local_path, irods_path, return_data_object = False, num_threads = DEFAULT_NUMBER_OF_THREADS, **options):
def put(self, local_path, irods_path, return_data_object = False, num_threads = DEFAULT_NUMBER_OF_THREADS, updatables = (), **options):

@@ -203,3 +273,3 @@ if self.sess.collections.exists(irods_path):

options.get(kw.DEST_RESC_NAME_KW,''),
open_options = options ):
open_options = options, updatables = updatables):
raise RuntimeError("parallel put failed")

@@ -213,2 +283,3 @@ else:

o.write(chunk)
do_progress_updates(updatables, len(chunk))
if kw.ALL_KW in options:

@@ -269,3 +340,4 @@ repl_options = options.copy()

data_open_returned_values = None,
progressQueue = False):
progressQueue = False,
updatables = ()):
"""Call into the irods.parallel library for multi-1247 GET.

@@ -281,3 +353,4 @@

data_open_returned_values = data_open_returned_values,
queueLength = (DEFAULT_QUEUE_DEPTH if progressQueue else 0))
queueLength = (DEFAULT_QUEUE_DEPTH if progressQueue else 0),
updatables = updatables)

@@ -292,2 +365,3 @@ def parallel_put(self,

open_options = {},
updatables = (),
progressQueue = False):

@@ -303,3 +377,4 @@ """Call into the irods.parallel library for multi-1247 PUT.

open_options = open_options,
queueLength = (DEFAULT_QUEUE_DEPTH if progressQueue else 0)
queueLength = (DEFAULT_QUEUE_DEPTH if progressQueue else 0),
updatables = updatables,
)

@@ -442,4 +517,5 @@

returned_values['session'] = directed_sess
conn.release()
conn = directed_sess.pool.get_connection()
logger.debug('redirect_to_host = %s', redirected_host)
logger.debug(u'redirect_to_host = %s', redirected_host)

@@ -474,2 +550,33 @@ # Restore RESC HIER for DATA_OBJ_OPEN call

def replica_truncate(self, path, desired_size, **options):
if self.sess.server_version == (4,3,2):
message = 'replica_truncate responses can fail to parse with iRODS 4.3.2 due to routine omission of the JSON response string, so this method is not supported for iRODS 4.3.2.'
raise ex.OperationNotSupported(message)
else:
required_server_version = (4,3,3)
if self.sess.server_version < required_server_version:
raise ex.NotImplementedInIRODSServer('replica_truncate', required_server_version)
message_body = FileOpenRequest(
objPath=path,
createMode=0,
openFlags=0,
offset=0,
dataSize=desired_size,
numThreads=self.sess.numThreads,
oprType=0,
KeyValPair_PI=StringStringMap(options),
)
message = iRODSMessage('RODS_API_REQ',
msg=message_body,
int_info=api_number["REPLICA_TRUNCATE_AN"])
with self.sess.pool.get_connection() as conn:
conn.send(message)
response = conn.recv()
msg = response.get_main_message( STR_PI )
return json.loads(msg.myStr)
def trim(self, path, **options):

@@ -743,1 +850,46 @@

response = conn.recv()
def touch(self, path, **options):
"""Change the mtime of a data object.
A path argument that does not exist will be created as an empty data
object, unless "no_create=True" is supplied.
Parameters
----------
path: string
The absolute logical path of a data object.
no_create: boolean, optional
Instructs the system not to create a data object when it does not
exist.
replica_number: integer, optional
The replica number of the replica to update. Replica numbers cannot
be used to create data objects or additional replicas. Cannot be used
with "leaf_resource_name".
leaf_resource_name: string, optional
The name of the leaf resource containing the replica to update. If
the object identified by the "path" parameter does not exist and this
parameter holds a valid resource, the data object will be created at
the specified resource. Cannot be used with "replica_number" parameter.
seconds_since_epoch: integer, optional
The number of seconds since epoch representing the new mtime. Cannot
be used with "reference" parameter.
reference: string, optional
Use the mtime of the logical path to the data object or collection
identified by this option. Cannot be used with "seconds_since_epoch"
parameter.
Raises
------
InvalidInputArgument
If the path points to a collection.
"""
if _logical_path._is_collection(self.sess, path):
raise ex.InvalidInputArgument()
_api_impl._touch_impl(self.sess, path, **options)

@@ -98,5 +98,2 @@ from __future__ import print_function

def add(self, model_cls, path, meta, **opts):
# Avoid sending request with empty argument(s)
if not(len(path) and len(meta.name) and len(meta.value)):
raise ValueError('Empty value in ' + repr(meta))

@@ -189,2 +186,3 @@ resource_type = self._model_class_to_resource_type(model_cls)

request = {
"admin_mode": True if kw.ADMIN_KW in self.__kw.keys() else False,
"entity_name": path,

@@ -191,0 +189,0 @@ "entity_type": self._model_class_to_resource_description(model_cls),

@@ -23,3 +23,3 @@ """Define objects related to communication with iRODS server API endpoints."""

class Bad_AVU_Field(Exception):
class Bad_AVU_Field(ValueError):
pass

@@ -71,3 +71,5 @@

PARSER_TYPE_STRINGS = {v:k for k,v in XML_Parser_Type.__members__.items() if v.value != 0}
# This creates a mapping from the "valid" (nonzero) XML_Parser_Type enums -- those which represent the actual parser
# choices -- to their corresponding names as strings (e.g. XML_Parser_Type.STANDARD_XML is mapped to 'STANDARD_XML'):
PARSER_TYPE_STRINGS = {v:k for k,v in XML_Parser_Type.__members__.items() if v.value != 0}

@@ -116,2 +118,5 @@ # We maintain values on a per-thread basis of:

def string_for_XML_parser(parser_enum):
return PARSER_TYPE_STRINGS[parser_enum]
_XML_parsers = {

@@ -634,2 +639,8 @@ XML_Parser_Type.STANDARD_XML : ET_xml,

class GenQuery2Request(Message):
_name = 'Genquery2Input_PI'
query_string = StringProperty()
zone = StringProperty()
sql_only = IntegerProperty()
column_mappings = IntegerProperty()

@@ -636,0 +647,0 @@ class FileOpenRequest(Message):

@@ -226,3 +226,3 @@ #!/usr/bin/env python

def _copy_part( src, dst, length, queueObject, debug_info, mgr):
def _copy_part( src, dst, length, queueObject, debug_info, mgr, updatables = ()):
"""

@@ -234,2 +234,3 @@ The work-horse for performing the copy between file and data object.

"""
from irods.manager.data_object_manager import do_progress_updates
bytecount = 0

@@ -245,2 +246,3 @@ accum = 0

if queueObject and accum and _io_send_bytes_progress(queueObject,accum): accum = 0
do_progress_updates(updatables, buf_len)
if verboseConnection:

@@ -307,3 +309,3 @@ print ("("+debug_info+")",end='',file=sys.stderr)

def _io_part (objHandle, range_, file_, opr_, mgr_, thread_debug_id = '', queueObject = None ):
def _io_part (objHandle, range_, file_, opr_, mgr_, thread_debug_id = '', queueObject = None, updatables = None):
"""

@@ -322,4 +324,4 @@ Runs in a separate thread to manage the transfer of a range of bytes within the data object.

thread_debug_id = str(threading.currentThread().ident)
return ( _copy_part (file_, objHandle, length, queueObject, thread_debug_id, mgr_) if Operation.isPut()
else _copy_part (objHandle, file_, length, queueObject, thread_debug_id, mgr_) )
return ( _copy_part (file_, objHandle, length, queueObject, thread_debug_id, mgr_, updatables) if Operation.isPut()
else _copy_part (objHandle, file_, length, queueObject, thread_debug_id, mgr_, updatables) )

@@ -348,7 +350,7 @@

logger.info("num_threads = %s ; bytes_per_thread = %s", num_threads, bytes_per_thread)
logger.info(u"num_threads = %s ; bytes_per_thread = %s", num_threads, bytes_per_thread)
_queueLength = extra_options.get('_queueLength',0)
if _queueLength > 0:
queueObject = Queue(_queueLength)
queueLength = extra_options.get('queueLength',0)
if queueLength > 0:
queueObject = Queue(queueLength)
else:

@@ -364,2 +366,7 @@ queueObject = None

File = gen_file_handle()
thread_opts = { 'updatables' : extra_options.get('updatables',()),
'queueObject' : queueObject
}
for byte_range in ranges:

@@ -374,5 +381,7 @@ if Io is None:

mgr.add_io( Io )
logger.debug('target_host = %s', Io.raw.session.pool.account.host)
logger.debug(u'target_host = %s', Io.raw.session.pool.account.host)
if File is None: File = gen_file_handle()
futures.append(executor.submit( _io_part, Io, byte_range, File, Operation, mgr, str(counter), queueObject))
futures.append(executor.submit(_io_part, Io, byte_range, File, Operation, mgr,
thread_debug_id = str(counter),
**thread_opts))
counter += 1

@@ -382,3 +391,3 @@ Io = File = None

if Operation.isNonBlocking():
if _queueLength:
if queueLength:
return futures, queueObject, mgr

@@ -407,3 +416,2 @@ else:

Io = None
if isinstance(Data,tuple):

@@ -479,5 +487,7 @@ (Data, Io) = Data[:2]

queueLength = kwopt.get('queueLength',0)
pass_thru_options = ('updatables','queueLength')
retval = _io_multipart_threaded (Operation, (Data, Io), replica_token, resc_hier, session, fname, total_bytes,
num_threads = num_threads,
_queueLength = queueLength)
**{k:v for k,v in kwopt.items() if k in pass_thru_options})

@@ -484,0 +494,0 @@ # SessionObject.data_objects.parallel_{put,get} will return:

@@ -25,2 +25,9 @@ from __future__ import absolute_import

def _adjust_timeout_to_pool_default(conn):
set_timeout = conn.socket.gettimeout()
desired_value = conn.pool.connection_timeout
if desired_value == set_timeout:
return
conn.socket.settimeout(desired_value)
class Pool(object):

@@ -61,2 +68,3 @@

def get_connection(self):
new_conn = False
with self._lock:

@@ -78,5 +86,7 @@ try:

conn = Connection(self, self.account)
new_conn = True
logger.debug("Created new connection with id: {}".format(id(conn)))
except KeyError:
conn = Connection(self, self.account)
new_conn = True
logger.debug("No connection found in idle set. Created a new connection with id: {}".format(id(conn)))

@@ -93,2 +103,7 @@

# If the connection we're about to make active was cached, it already has a socket object internal to it,
# so we potentially have to modify it to have the desired timeout.
if not new_conn:
_adjust_timeout_to_pool_default(conn)
logger.debug('num active: {}'.format(len(self.active)))

@@ -95,0 +110,0 @@ logger.debug('num idle: {}'.format(len(self.idle)))

@@ -8,2 +8,3 @@ from __future__ import absolute_import

import logging
from numbers import Number
import os

@@ -13,4 +14,6 @@ import threading

from irods.query import Query
from irods.genquery2 import GenQuery2
from irods.pool import Pool
from irods.account import iRODSAccount
from irods.api_number import api_number
from irods.manager.collection_manager import CollectionManager

@@ -23,3 +26,4 @@ from irods.manager.data_object_manager import DataObjectManager

from irods.manager.zone_manager import ZoneManager
from irods.exception import NetworkException
from irods.message import (iRODSMessage, STR_PI)
from irods.exception import (NetworkException, NotImplementedInIRODSServer)
from irods.password_obfuscation import decode

@@ -63,2 +67,13 @@ from irods import NATIVE_AUTH_SCHEME, PAM_AUTH_SCHEMES

def library_features(self):
irods_version_needed = (4,3,1)
if self.server_version < irods_version_needed:
raise NotImplementedInIRODSServer('library_features', irods_version_needed)
message = iRODSMessage('RODS_API_REQ', int_info = api_number['GET_LIBRARY_FEATURES_AN'])
with self.pool.get_connection() as conn:
conn.send(message)
response = conn.recv()
msg = response.get_main_message( STR_PI )
return json.loads(msg.myStr)
@property

@@ -274,2 +289,16 @@ def env_file (self):

def genquery2_object(self, **kwargs):
""" Returns GenQuery2 object
Returns GenQuery2 object that can be used to execute GenQuery2 queries,
to retrieve the SQL query for a particular GenQuery2 query, and to
get GenQuery2 column mappings.
"""
return GenQuery2(self, **kwargs)
def genquery2(self, query, **kwargs):
"""Shorthand for executing a single GenQuery2 query."""
q = GenQuery2(self)
return q.execute(query, **kwargs)
@property

@@ -310,2 +339,10 @@ def username(self):

@property
def client_hints(self):
message = iRODSMessage('RODS_API_REQ', int_info=api_number['CLIENT_HINTS_AN'])
with self.pool.get_connection() as conn:
conn.send(message)
response = conn.recv()
return response.get_json_encoded_struct()
@property
def pam_pw_negotiated(self):

@@ -333,5 +370,16 @@ self.pool.account.store_pw = []

def connection_timeout(self, seconds):
if seconds == 0:
exc = ValueError("Setting an iRODS connection_timeout to 0 seconds would make it non-blocking.")
raise exc
elif isinstance(seconds, Number):
if seconds < 0 or str(seconds) == 'nan' or str(abs(seconds)) == 'inf':
exc = ValueError("The iRODS connection_timeout may not be assigned a negative or otherwise rogue value (eg: NaN, Inf).")
raise exc
elif seconds is None:
pass
else:
exc = ValueError("The iRODS connection_timeout must be assigned a positive int, positive float, or None.")
raise exc
self._cached_connection_timeout = seconds
if seconds is not None:
self.pool.connection_timeout = seconds
self.pool.connection_timeout = seconds

@@ -338,0 +386,0 @@ @staticmethod

@@ -13,2 +13,3 @@ #! /usr/bin/env python

from irods.models import User,Collection,DataObject
from irods.path import iRODSPath
from irods.user import iRODSUser

@@ -375,2 +376,85 @@ from irods.session import iRODSSession

def test_iRODSAccess_can_be_constructed_using_iRODSCollection__issue_558(self):
user_name = "testuser"
collection_path = "/".join([helpers.home_collection(self.sess), "give_read_access_to_this"])
try:
user = self.sess.users.create(user_name, 'rodsuser')
collection = self.sess.collections.create(collection_path)
# Give user access to data object. This should succeed. Before the fix in #558, this would cause an error
# from pickle during a call to deepcopy of the iRODSAccess object. The library does not know how to pickle
# an iRODSCollection object.
access = iRODSAccess('read', collection, user.name)
self.sess.acls.set(access)
# We can get permissions from collection, and the test user's entry is there.
perms = self.sess.acls.get(collection)
self.assertTrue(any(p for p in perms if p.user_name == user_name))
finally:
self.sess.users.get(user_name).remove()
self.sess.collections.remove(collection_path, force=True)
def test_iRODSAccess_can_be_constructed_using_iRODSDataObject__issue_558(self):
user_name = "testuser"
data_object_path = "/".join([helpers.home_collection(self.sess), "give_read_access_to_this"])
try:
user = self.sess.users.create(user_name, 'rodsuser')
data_object = self.sess.data_objects.create(data_object_path)
# Give user access to data object. This should succeed. Before the fix in #558, this would cause an error
# from pickle during a call to deepcopy of the iRODSAccess object. The library does not know how to pickle
# an iRODSDataObject object.
access = iRODSAccess('read', data_object, user.name)
self.sess.acls.set(access)
# We can get permissions from the data object, and the test user's entry is there.
perms = self.sess.acls.get(data_object)
self.assertTrue(any(p for p in perms if p.user_name == user_name))
finally:
self.sess.users.get(user_name).remove()
self.sess.data_objects.unlink(data_object_path, force=True)
def test_iRODSAccess_can_be_constructed_using_iRODSPath__issue_558(self):
user_name = "testuser"
data_object_path = "/".join([helpers.home_collection(self.sess), "give_read_access_to_this"])
try:
irods_path = iRODSPath(data_object_path)
user = self.sess.users.create(user_name, 'rodsuser')
data_object = self.sess.data_objects.create(data_object_path)
# Give user access to data object. This should succeed. Before the fix in #558, this would cause an error
# from pickle during a call to deepcopy of the iRODSAccess object. The library does not know how to pickle
# an iRODSDataObject object.
access = iRODSAccess('read', irods_path, user.name)
self.sess.acls.set(access)
# We can get permissions from the data object, and the test user's entry is there.
perms = self.sess.acls.get(data_object)
self.assertTrue(any(p for p in perms if p.user_name == user_name))
finally:
self.sess.users.get(user_name).remove()
self.sess.data_objects.unlink(data_object_path, force=True)
def test_iRODSAccess_cannot_be_constructed_using_unsupported_type__issue_558(self):
# Before the fix in #558, this would have been allowed and only later would the type discrepancy be revealed,
# leading to opaque error messages. Now, the types are checked on the way in to ensure clarity and correctness.
# TODO(#480): We cannot use the unittest.assertRaises context manager as this was introduced in python 3.1.
assertCall = getattr(self,'assertRaisesRegex',None)
if assertCall is None:
assertCall = self.assertRaisesRegexp
assertCall(
TypeError,
"'path' parameter must be of type 'str', 'irods.collection.iRODSCollection', "
"'irods.data_object.iRODSDataObject', or 'irods.path.iRODSPath'.",
iRODSAccess, 'read', self.sess)
if __name__ == '__main__':

@@ -377,0 +461,0 @@ # let the tests find the parent irods lib

#! /usr/bin/env python
from __future__ import absolute_import
from datetime import datetime
import os

@@ -18,9 +19,27 @@ import sys

RODSUSER = 'nonadmin'
class TestCollection(unittest.TestCase):
class WrongUserType(RuntimeError): pass
@classmethod
def setUpClass(cls):
adm = helpers.make_session()
if adm.users.get(adm.username).type != 'rodsadmin':
raise cls.WrongUserType('Must be an iRODS admin to run tests in class {0.__name__}'.format(cls))
cls.logins = helpers.iRODSUserLogins(adm)
cls.logins.create_user(RODSUSER, 'abc123')
@classmethod
def tearDownClass(cls):
# TODO(#553): Skipping this will result in an interpreter seg fault for Py3.6 but not 3.11; why?
del cls.logins
def setUp(self):
self.sess = helpers.make_session()
self.test_coll_path = '/{}/home/{}/test_dir'.format(self.sess.zone, self.sess.username)
self.test_coll = self.sess.collections.create(self.test_coll_path)

@@ -384,3 +403,106 @@

def test_update_mtime_of_collection_using_touch_operation_as_non_admin__525(self):
user_session = self.logins.session_for_user(RODSUSER)
# Capture mtime of the home collection.
home_collection_path = helpers.home_collection(user_session)
collection = user_session.collections.get(home_collection_path)
old_mtime = collection.modify_time
# Set the mtime to an earlier time.
new_mtime = 1400000000
user_session.collections.touch(home_collection_path, seconds_since_epoch=new_mtime)
# Compare mtimes for correctness.
collection = user_session.collections.get(home_collection_path)
self.assertEqual(datetime.utcfromtimestamp(new_mtime), collection.modify_time)
self.assertGreater(old_mtime, collection.modify_time)
def test_touch_operation_does_not_create_new_collections__525(self):
user_session = self.logins.session_for_user(RODSUSER)
# The collection should not exist.
home_collection = helpers.home_collection(user_session)
collection_path = '{home_collection}/test_touch_operation_does_not_create_new_collections__525'.format(**locals())
with self.assertRaises(CollectionDoesNotExist):
user_session.collections.get(collection_path)
# Show the touch operation throws an exception if the target collection
# does not exist.
with self.assertRaises(CollectionDoesNotExist):
user_session.collections.touch(collection_path)
# Show the touch operation did not create a new collection.
with self.assertRaises(CollectionDoesNotExist):
user_session.collections.get(collection_path)
def test_touch_operation_does_not_work_when_given_a_data_object__525(self):
try:
user_session = self.logins.session_for_user(RODSUSER)
home_collection = helpers.home_collection(user_session)
# Create a data object.
data_object_path = '{home_collection}/test_touch_operation_does_not_work_when_given_a_data_object__525.txt'.format(**locals())
self.assertFalse(user_session.data_objects.exists(data_object_path))
user_session.data_objects.touch(data_object_path)
self.assertTrue(user_session.data_objects.exists(data_object_path))
# Show the touch operation for collections throws an exception when
# given a path pointing to a data object.
with self.assertRaises(CollectionDoesNotExist):
user_session.collections.touch(data_object_path)
finally:
user_session.data_objects.unlink(data_object_path, force=True)
def test_touch_operation_ignores_unsupported_options__525(self):
user_session = self.logins.session_for_user(RODSUSER)
home_collection = helpers.home_collection(user_session)
path = '{home_collection}/test_touch_operation_ignores_unsupported_options__525'.format(**locals())
try:
# Capture mtime of the home collection.
collection = user_session.collections.create(path)
old_mtime = collection.modify_time
# Capture the current time.
time.sleep(2) # Guarantees the mtime is different.
new_mtime = int(time.time())
# The touch API for the iRODS server will attempt to create a new data object
# if the "no_create" option is set to false. The PRC's collection interface will
# ignore that option if passed.
#
# The following arguments don't make sense for collections and will also be ignored.
#
# - replica_number
# - leaf_resource_name
#
# They are included to prove the PRC handles them appropriately (i.e. unsupported
# parameters are removed from the request).
user_session.collections.touch(path,
no_create=False,
replica_number=525,
seconds_since_epoch=new_mtime,
leaf_resource_name='ufs525')
# Compare mtimes for correctness.
collection = user_session.collections.get(path)
self.assertEqual(datetime.utcfromtimestamp(int(new_mtime)), collection.modify_time)
finally:
if collection:
user_session.collections.remove(path, recurse=True, force=True)
def test_subcollections_member_excludes_root_collection__571(self):
root_coll = self.sess.collections.get("/")
# Assert that none of the root collection's immediate children (as listed in the object's
# 'subcollections' property) point to the root subcollection.
self.assertEqual(root_coll.path, "/")
self.assertEqual([], [_ for _ in root_coll.subcollections if _.path == "/"])
if __name__ == "__main__":

@@ -387,0 +509,0 @@ # let the tests find the parent irods lib

@@ -9,4 +9,4 @@ #! /usr/bin/env python

import irods.test.helpers as helpers
from irods.test.helpers import (server_side_sleep, temporarily_assign_attribute as temp_setter)
class TestConnections(unittest.TestCase):

@@ -109,19 +109,76 @@

def assert_timeout_value_propagated_to_socket(self, session, timeout_value):
def _assert_timeout_value_is_propagated_to_all_sockets__issue_569(self, session, expected_timeout_value = 'POOL_TIMEOUT_SETTING'):
pool = session.pool
new_conn = None
if expected_timeout_value == 'POOL_TIMEOUT_SETTING':
expected_timeout_value = pool.connection_timeout
connections = set()
# make sure idle pool is not empty
session.collections.get(helpers.home_collection(session))
conn = next(iter(session.pool.idle))
self.assertEqual(conn.socket.gettimeout(), timeout_value)
# On any connections thus far created, check that their internal socket objects are set to the expected timeout value.
try:
# Peel connections off the idle pool and check each for the expected timeout value, but don't release them to that pool yet.
while (pool.idle):
# Peel a connection (guaranteed newly-allocated for purposes of this test) and check for the proper timeout.
conn = pool.get_connection()
connections |= {conn}
self.assertEqual(conn.socket.gettimeout(), expected_timeout_value)
# Get an additional connection while idle pool is empty; this way, we know it to be newly-allocated.
new_conn = pool.get_connection()
# Check the expected timeout applies to the newly-allocated connection
self.assertEqual(new_conn.socket.gettimeout(), expected_timeout_value)
finally:
# Release and destroy the connection that was newly-allocated for this test.
if new_conn:
new_conn.release(destroy = True)
# Release connections that had been cached, by the same normal mechanism the API endpoints indirectly employ.
for conn in connections:
pool.release_connection(conn)
def test_connection_timeout_parameter_in_session_init__issue_377(self):
timeout = 1.0
sess = helpers.make_session(connection_timeout = timeout)
self.assert_timeout_value_propagated_to_socket(sess, timeout)
self._assert_timeout_value_is_propagated_to_all_sockets__issue_569(sess, timeout)
def test_assigning_session_connection_timeout__issue_377(self):
sess = self.sess
sess = helpers.make_session()
for timeout in (999999, None):
sess.connection_timeout = timeout
sess.cleanup()
self.assert_timeout_value_propagated_to_socket(sess, timeout)
self._assert_timeout_value_is_propagated_to_all_sockets__issue_569(sess, timeout)
def test_assigning_session_connection_timeout_to_invalid_values__issue_569(self):
sess = helpers.make_session()
DESIRED_TIMEOUT = 64.25
sess.connection_timeout = DESIRED_TIMEOUT
# Test our desired connection pool default timeout has taken hold.
self.assertEqual(sess.connection_timeout, DESIRED_TIMEOUT)
# Test that bad timeout values are met with an exception.
for value in (float('NaN'), float('Inf'), -float('Inf'), -1, 0, 0.0, "banana"):
with self.assertRaises(ValueError):
sess.connection_timeout = value
# Test previously set value is unaffected
self.assertEqual(sess.connection_timeout, DESIRED_TIMEOUT)
def test_assigning_session_connection_timeout__issue_569(self):
sess = helpers.make_session()
old_timeout = sess.connection_timeout
with temp_setter(sess, 'connection_timeout',1.0):
# verify we can reproduce a NetworkException from a server timeout
with self.assertRaises(NetworkException):
server_side_sleep(sess,2.5)
# temporarily suspend timeouts on a session
with temp_setter(sess, 'connection_timeout',None):
server_side_sleep(sess,2.5)
# temporarily increase (from 1.0 to 4) the timeout on a session
with temp_setter(sess, 'connection_timeout',4):
server_side_sleep(sess,2.5)
self.assertEqual(old_timeout, sess.connection_timeout)
self._assert_timeout_value_is_propagated_to_all_sockets__issue_569(sess, old_timeout)
if __name__ == '__main__':

@@ -128,0 +185,0 @@ # let the tests find the parent irods lib

@@ -20,2 +20,3 @@ from __future__ import absolute_import

import irods.client_configuration as config
import irods.rule
from irods.session import iRODSSession

@@ -103,3 +104,3 @@ from irods.message import (iRODSMessage, IRODS_VERSION)

if not getattr(_thrlocal,"rand_gen",None) : _thrlocal.rand_gen = random.Random()
_thrlocal.rand_gen.seed(seed_tuple)
_thrlocal.rand_gen.seed(hash(seed_tuple))
return '%016X' % _thrlocal.rand_gen.randint(0,(1<<64)-1)

@@ -156,2 +157,16 @@

def make_session(test_server_version = True, **kwargs):
"""Connect to an iRODS server as determined by any client environment
file present at a standard location, and by any keyword arguments given.
Arguments:
test_server_version: Of type bool; in the `irods.test.helpers` version of this
function, defaults to True. A True value causes
*iRODS_Server_Too_Recent* to be raised if the server
connected to is more recent than the current Python iRODS
client's advertised level of compatibility.
**kwargs: Keyword arguments. Fed directly to the iRODSSession
constructor. """
try:

@@ -165,3 +180,2 @@ env_file = kwargs.pop('irods_env_file')

session = iRODSSession( irods_env_file = env_file, **kwargs )
if test_server_version:

@@ -179,2 +193,3 @@ connected_version = session.server_version[:3]

def home_collection(session):
"""Return a string value for the given session's home collection."""
return "/{0.zone}/home/{0.username}".format(session)

@@ -385,1 +400,24 @@

class _unlikely_value: pass
@contextlib.contextmanager
def temporarily_assign_attribute(target, attr, value, not_set_indicator = _unlikely_value()):
save = not_set_indicator
try:
save = getattr(target, attr, not_set_indicator)
setattr(target, attr, value)
yield
finally:
if save != not_set_indicator:
setattr(target, attr, save)
else:
delattr(target, attr)
# Implement a server-side wait that ensures no TCP communication from server end for a given interval.
# Useful to test the effect of socket inactivity on a client. See python-irodsclient issue #569
def server_side_sleep(session, seconds):
# Round floating-point seconds to nearest integer + microseconds figure, required by msiSleep.
int_, frac_ = [int(_) for _ in divmod(seconds * 1.0e6 + 0.5, 1.0e6)]
rule_code = "msiSleep('{}','{}')".format(int_,frac_)
# Call the msiSleep microservice.
irods.rule.Rule(session, body = rule_code, instance_name = 'irods_rule_engine_plugin-irods_rule_language-instance').execute()

@@ -341,3 +341,32 @@ #! /usr/bin/env python

def test_atomic_metadata_operations_with_admin_kw__issue_576(self):
ses = data = user = None
adm = self.sess
if adm.server_version <= (4,2,11):
self.skipTest('ADMIN_KW not valid for Metadata API in iRODS 4.2.11 and previous')
try:
# Create a rodsuser
user = adm.users.create('bobby', 'rodsuser')
user.modify('password', 'bpass')
# Log in as rodsuser and create a data object owned by that user.
ses = iRODSSession(port = adm.port, zone = adm.zone, host = adm.host, user = user.name, password = 'bpass')
home = helpers.home_collection(ses)
data = ses.data_objects.create('{home}/issue_576'.format(**locals()))
# Do and test the results of the atomic set using the admin session, with the ADMIN_KW turned on.
data_via_admin = adm.data_objects.get(data.path)
avu_item = iRODSMeta('issue_576', 'dummy_value')
data_via_admin.metadata(admin=True).apply_atomic_operations(AVUOperation(operation = "add", avu = avu_item))
self.assertIn(avu_item, data_via_admin.metadata.items())
finally:
# Clean up objects after use.
if ses:
if ses.data_objects.exists(data.path):
ses.data_objects.unlink(data.path, force = True)
ses.cleanup()
if user: user.remove()
def test_add_coll_meta(self):

@@ -524,10 +553,38 @@ # add metadata to test collection

def test_AVUs_populated_improperly_with_empties_or_nonstrings_fail_identically__issue_547(self):
try:
to_delete = []
hc = helpers.home_collection(self.sess)
mtemplate = iRODSMeta('some_name', 'some_value', 'some_units')
for index in ('name','value','units'):
for edge_case_arg in ('', 3):
# Empty units are permitted, ie. iRODSMeta(attr,value,'') and iRODSMeta(attr,value) are equivalent.
if edge_case_arg in ('',b'') and index == 'units':
continue
for method in ('set','add'):
data = self.sess.data_objects.create("{hc}/{index}_{edge_case_arg}_{method}_AZ__issue_547".format(**locals()))
to_delete.append(data)
m = iRODSMeta(*mtemplate)
setattr(m, index, edge_case_arg)
with self.assertRaises(Bad_AVU_Field):
getattr(data.metadata, method)(*m)
finally:
for d in to_delete:
d.unlink(force=True)
def test_nonstring_as_AVU_value_raises_an_error__issue_434(self):
args = ("an_attribute",0)
with self.assertRaisesRegexp(Bad_AVU_Field,'incorrect type'):
self.coll.metadata.set("an_attribute",0)
self.coll.metadata.set(*args)
with self.assertRaisesRegexp(Bad_AVU_Field,'incorrect type'):
self.coll.metadata.add(*args)
def test_empty_string_as_AVU_value_raises_an_error__issue_434(self):
args = ("an_attribute","")
with self.assertRaisesRegexp(Bad_AVU_Field,'zero-length'):
self.coll.metadata.set("an_attribute","")
self.coll.metadata.set(*args)
with self.assertRaisesRegexp(Bad_AVU_Field,'zero-length'):
self.coll.metadata.add(*args)
if __name__ == '__main__':

@@ -534,0 +591,0 @@ # let the tests find the parent irods lib

#! /usr/bin/env python
from __future__ import absolute_import
import contextlib
import datetime

@@ -245,2 +246,19 @@ import gc

@staticmethod
@contextlib.contextmanager
def configure_logger(logger, propagate = None, level = None, handler = None):
try:
saved_level = logger.level
logger.setLevel(level)
saved_propagate = logger.propagate
logger.propagate = propagate
if handler:
logger.addHandler(handler)
yield logger
finally:
if handler:
logger.removeHandler(handler)
logger.setLevel(saved_level)
logger.propagate = saved_propagate
# Test to confirm the connection destructor log message is actually

@@ -264,13 +282,7 @@ # logged to file, to confirm the destructor is called

last_used_time_2 = None
try:
# Create a temporary log file
my_log_file = tempfile.NamedTemporaryFile()
logging.getLogger('irods.connection').setLevel(logging.DEBUG)
file_handler = logging.FileHandler(my_log_file.name, mode='a')
file_handler.setLevel(logging.DEBUG)
logging.getLogger('irods.connection').addHandler(file_handler)
my_log_file = tempfile.NamedTemporaryFile()
file_handler = logging.FileHandler(my_log_file.name, mode='a')
file_handler.setLevel(logging.DEBUG)
with self.configure_logger(logging.getLogger('irods.connection'),
propagate = False, level = logging.DEBUG, handler = file_handler):
with self.sess.pool.get_connection() as conn:

@@ -318,5 +330,3 @@ conn_obj_id_1 = id(conn)

self.assertTrue(DESTRUCTOR_MSG in lines)
finally:
# Remove irods.connection's file_handler that was added just for this test
logging.getLogger('irods.connection').removeHandler(file_handler)
file_handler.close()

@@ -323,0 +333,0 @@ def test_get_connection_refresh_time_no_env_file_input_param(self):

@@ -183,21 +183,24 @@ #! /usr/bin/env python

result13 = self.sess.query(DataObject.name).filter(
Collection.name == self.coll_path).filter(
Between(DataObject.name, [self.case_sensitive_obj_name1,
self.case_sensitive_obj_name1 + "_"])).all()
self.assertTrue(result13.has_value(self.case_sensitive_obj_name1))
self.assertEqual(len(result13), 1)
# TODO(#600): Uncomment these lines and/or make a new test when database flavor can be detected.
# The resultset for BETWEEN queries can differ from database to database.
result14 = self.sess.query(DataObject.name).filter(
Collection.name == self.coll_path).filter(
Between(DataObject.name, [str.lower(self.case_sensitive_obj_name1),
str.lower(self.case_sensitive_obj_name1) + "_"])).all()
self.assertEqual(len(result14), 0)
#result13 = self.sess.query(DataObject.name).filter(
# Collection.name == self.coll_path).filter(
# Between(DataObject.name, [self.case_sensitive_obj_name1,
# self.case_sensitive_obj_name1 + "_"])).all()
#self.assertTrue(result13.has_value(self.case_sensitive_obj_name1))
#self.assertEqual(len(result13), 1)
result15 = self.sess.query(DataObject.name).filter(
Collection.name == self.coll_path).filter(
Between(DataObject.name, [str.upper(self.case_sensitive_obj_name1),
str.upper(self.case_sensitive_obj_name1) + "_"])).all()
self.assertEqual(len(result15), 0)
#result14 = self.sess.query(DataObject.name).filter(
# Collection.name == self.coll_path).filter(
# Between(DataObject.name, [str.lower(self.case_sensitive_obj_name1),
# str.lower(self.case_sensitive_obj_name1) + "_"])).all()
#self.assertEqual(len(result14), 0)
#result15 = self.sess.query(DataObject.name).filter(
# Collection.name == self.coll_path).filter(
# Between(DataObject.name, [str.upper(self.case_sensitive_obj_name1),
# str.upper(self.case_sensitive_obj_name1) + "_"])).all()
#self.assertEqual(len(result15), 0)
def test_files_query_case_insensitive(self):

@@ -204,0 +207,0 @@ # This tests that GenQueries are case-insensitive when the case_sensitive

#!/usr/bin/env python
from __future__ import print_function
import numbers
import os
import sys
import posix
import socket
import posix
import shutil
from subprocess import (Popen, PIPE)
import sys

@@ -74,20 +75,29 @@ IRODS_SSL_DIR = '/etc/irods/ssl'

def usage(exit_code = None):
print("""Usage: {sys.argv[0]} [-f] [-h <hostname>] [-k] [-q] [-x <extension>]
-f Force replacement of the existing SSL directory (/etc/irods/ssl) with a new one, containing newly generated files.
-h In the generated certificate, use the given hostname rather than the value returned from socket.gethostname()
-k (Keep old secrets files.) Do not generate new key file or dhparams.pem file.
-q For testing; do a quick generation of a dhparams.pem file rather than waiting on system entropy to make it more secure.
-x Optional extra extension for appending to end of the filename for the generated certificate.
--help Print this help.
Any invalid option prints this help.
""".format(**globals()), file = sys.stderr)
if isinstance(exit_code, numbers.Integral):
exit(exit_code)
if __name__ == '__main__':
import getopt
try:
opt, arg_list = getopt.getopt(sys.argv[1:],'x:fh:kq')
opt, arg_list = getopt.getopt(sys.argv[1:],'x:fh:kq',['help'])
except getopt.GetoptError:
print("""Usage: {sys.argv[0]} [-f] [-h <hostname>] [-k] [-q] [-x <extension>]
-f Force replacement of the existing SSL directory (/etc/irods/ssl) with a new one, containing newly generated files.
-h In the generated certificate, use the given hostname rather than the value returned from socket.gethostname()
-k (Keep old secrets files.) Do not generate new key file or dhparams.pem file.
-q For testing; do a quick generation of a dhparams.pem file rather than waiting on system entropy to make it more secure.
-x Optional extra extension for appending to end of the filename for the generated certificate.
usage(exit_code = 1)
Any invalid option prints this help.
""".format(**locals()), file = sys.stderr)
exit(1)
opt_lookup = dict(opt)
if '--help' in opt_lookup:
usage(exit_code = 0)
ext = opt_lookup.get('-x','')

@@ -94,0 +104,0 @@ if ext:

import os
__version__ = '2.0.1'
__version__ = '2.1.0'

@@ -5,0 +5,0 @@ def version_as_string():

@@ -19,2 +19,3 @@ AUTHORS

irods/exception.py
irods/genquery2.py
irods/keywords.py

@@ -40,2 +41,3 @@ irods/meta.py

irods/client_configuration/__init__.py
irods/helpers/__init__.py
irods/manager/__init__.py

@@ -49,2 +51,5 @@ irods/manager/access_manager.py

irods/manager/zone_manager.py
irods/manager/_internal/__init__.py
irods/manager/_internal/_api_impl.py
irods/manager/_internal/_logical_path.py
irods/message/__init__.py

@@ -60,2 +65,3 @@ irods/message/message.py

irods/test/admin_test.py
irods/test/client_hints_test.py
irods/test/collection_test.py

@@ -68,4 +74,6 @@ irods/test/connection_test.py

irods/test/force_create.py
irods/test/genquery2_test.py
irods/test/helpers.py
irods/test/login_auth_test.py
irods/test/library_features_test.py
irods/test/login_auth_test_must_run_manually.py
irods/test/message_test.py

@@ -90,2 +98,3 @@ irods/test/meta_test.py

irods/test/modules/test_saving_and_loading_of_settings__issue_471.py
irods/test/modules/test_xml_parser.py
irods/test/test-data/irods_environment.json

@@ -92,0 +101,0 @@ irods/test/test-data/irods_environment_negative_refresh_field.json

#! /usr/bin/env python
from __future__ import print_function
from __future__ import absolute_import
import os
import sys
import tempfile
import unittest
import textwrap
import json
import shutil
import ssl
import irods.test.helpers as helpers
from irods.connection import Connection
from irods.session import iRODSSession, NonAnonymousLoginWithoutPassword
from irods.rule import Rule
from irods.models import User
from socket import gethostname
from irods.password_obfuscation import (encode as pw_encode)
from irods.connection import PlainTextPAMPasswordError
from irods.access import iRODSAccess
import irods.exception as ex
import contextlib
import socket
from re import compile as regex
import gc
import six
from irods.test.setupssl import create_ssl_dir
#
# Allow override to specify the PAM password in effect for the test rodsuser.
#
TEST_PAM_PW_OVERRIDE = os.environ.get('PYTHON_IRODSCLIENT_TEST_PAM_PW_OVERRIDE','')
TEST_PAM_PW = TEST_PAM_PW_OVERRIDE or 'test123'
TEST_IRODS_PW = 'apass'
TEST_RODS_USER = 'alissa'
try:
from re import _pattern_type as regex_type
except ImportError:
from re import Pattern as regex_type # Python 3.7+
def json_file_update(fname,keys_to_delete=(),**kw):
with open(fname,'r') as f:
j = json.load(f)
j.update(**kw)
for k in keys_to_delete:
if k in j: del j [k]
elif isinstance(k,regex_type):
jk = [i for i in j.keys() if k.search(i)]
for ky in jk: del j[ky]
with open(fname,'w') as out:
json.dump(j, out, indent=4)
def env_dir_fullpath(authtype): return os.path.join( os.environ['HOME'] , '.irods.' + authtype)
def json_env_fullpath(authtype): return os.path.join( env_dir_fullpath(authtype), 'irods_environment.json')
def secrets_fullpath(authtype): return os.path.join( env_dir_fullpath(authtype), '.irodsA')
RODSADMIN_ENV_PATH = os.path.expanduser('~/.irods/irods_environment.json')
SERVER_ENV_SSL_SETTINGS = {
"irods_ssl_certificate_chain_file": "/etc/irods/ssl/irods.crt",
"irods_ssl_certificate_key_file": "/etc/irods/ssl/irods.key",
"irods_ssl_dh_params_file": "/etc/irods/ssl/dhparams.pem",
"irods_ssl_ca_certificate_file": "/etc/irods/ssl/irods.crt",
"irods_ssl_verify_server": "cert"
}
CLIENT_OPTIONS_FOR_SSL = {
"irods_client_server_policy": "CS_NEG_REQUIRE",
"irods_client_server_negotiation": "request_server_negotiation",
"irods_ssl_ca_certificate_file": "/etc/irods/ssl/irods.crt",
"irods_ssl_verify_server": "cert",
"irods_encryption_key_size": 16,
"irods_encryption_salt_size": 8,
"irods_encryption_num_hash_rounds": 16,
"irods_encryption_algorithm": "AES-256-CBC"
}
def client_env_keys_from_admin_env(user_name, auth_scheme=""):
cli_env = {}
with open(RODSADMIN_ENV_PATH) as f:
srv_env = json.load(f)
for k in [ "irods_host", "irods_zone_name", "irods_port" ]:
cli_env [k] = srv_env[k]
cli_env["irods_user_name"] = user_name
if auth_scheme:
cli_env["irods_authentication_scheme"] = auth_scheme
return cli_env
@contextlib.contextmanager
def pam_password_in_plaintext(allow=True):
saved = bool(Connection.DISALLOWING_PAM_PLAINTEXT)
try:
Connection.DISALLOWING_PAM_PLAINTEXT = not(allow)
yield
finally:
Connection.DISALLOWING_PAM_PLAINTEXT = saved
class TestLogins(unittest.TestCase):
'''
Ideally, these tests should move into CI, but that would require the server
(currently a different node than the client) to have SSL certs created and
enabled.
Until then, we require these tests to be run manually on a server node,
with:
python -m unittest "irods.test.login_auth_test[.XX[.YY]]'
Additionally:
1. The PAM/SSL tests under the TestLogins class should be run on a
single-node iRODS system, by the service account user. This ensures
the /etc/irods directory is local and writable.
2. ./setupssl.py (sets up SSL keys etc. in /etc/irods/ssl) should be run
first to create (or overwrite, if appropriate) the /etc/irods/ssl directory
and its contents.
3. Must add & override configuration entries in /var/lib/irods/irods_environment
Per https://slides.com/irods/ugm2018-ssl-and-pam-configuration#/3/7
'''
user_auth_envs = {
'.irods.pam': {
'USER': TEST_RODS_USER,
'PASSWORD': TEST_PAM_PW,
'AUTH': 'pam'
},
'.irods.native': {
'USER': TEST_RODS_USER,
'PASSWORD': TEST_IRODS_PW,
'AUTH': 'native'
}
}
env_save = {}
@contextlib.contextmanager
def setenv(self,var,newvalue):
try:
self.env_save[var] = os.environ.get(var,None)
os.environ[var] = newvalue
yield newvalue
finally:
oldvalue = self.env_save[var]
if oldvalue is None:
del os.environ[var]
else:
os.environ[var]=oldvalue
def create_env_dirs(self):
dirs = {}
retval = []
# -- create environment configurations and secrets
with pam_password_in_plaintext():
for dirname,lookup in self.user_auth_envs.items():
if lookup['AUTH'] in ('pam','pam_password'):
ses = iRODSSession( host=gethostname(),
user=lookup['USER'],
zone='tempZone',
authentication_scheme=lookup['AUTH'],
password=lookup['PASSWORD'],
port= 1247 )
try:
pam_hashes = ses.pam_pw_negotiated
except AttributeError:
pam_hashes = []
if not pam_hashes: print('Warning ** PAM pw couldnt be generated' ); break
scrambled_pw = pw_encode( pam_hashes[0] )
#elif lookup['AUTH'] == 'XXXXXX': # TODO: insert other authentication schemes here
elif lookup['AUTH'] in ('native', '',None):
scrambled_pw = pw_encode( lookup['PASSWORD'] )
cl_env = client_env_keys_from_admin_env(TEST_RODS_USER)
if lookup.get('AUTH',None) is not None: # - specify auth scheme only if given
cl_env['irods_authentication_scheme'] = lookup['AUTH']
dirbase = os.path.join(os.environ['HOME'],dirname)
dirs[dirbase] = { 'secrets':scrambled_pw , 'client_environment':cl_env }
# -- create the environment directories and write into them the configurations just created
for absdir in dirs.keys():
shutil.rmtree(absdir,ignore_errors=True)
os.mkdir(absdir)
with open(os.path.join(absdir,'irods_environment.json'),'w') as envfile:
envfile.write('{}')
json_file_update(envfile.name, **dirs[absdir]['client_environment'])
with open(os.path.join(absdir,'.irodsA'),'w') as secrets_file:
secrets_file.write(dirs[absdir]['secrets'])
os.chmod(secrets_file.name,0o600)
retval = dirs.keys()
return retval
PAM_SCHEME_STRING = 'pam'
@classmethod
def setUpClass(cls):
cls.admin = helpers.make_session()
if cls.admin.server_version >= (4,3):
cls.PAM_SCHEME_STRING = cls.user_auth_envs['.irods.pam']['AUTH'] = 'pam_password'
@classmethod
def tearDownClass(cls):
cls.admin.cleanup()
def setUp(self):
super(TestLogins,self).setUp()
def tearDown(self):
for envdir in getattr(self, 'envdirs', []):
shutil.rmtree(envdir, ignore_errors=True)
super(TestLogins,self).tearDown()
def validate_session(self, session, verbose=False, **options):
# - try to get the home collection
home_coll = '/{0.zone}/home/{0.username}'.format(session)
self.assertTrue(session.collections.get(home_coll).path == home_coll)
if verbose: print(home_coll)
# - check user is as expected
self.assertEqual( session.username, TEST_RODS_USER )
# - check socket type (normal vs SSL) against whether ssl requested
use_ssl = options.pop('ssl',None)
if use_ssl is not None:
my_connect = [s for s in (session.pool.active|session.pool.idle)] [0]
self.assertEqual( bool( use_ssl ), my_connect.socket.__class__ is ssl.SSLSocket )
@contextlib.contextmanager
def _setup_rodsuser_and_optional_pw(self, name, make_irods_pw = False):
try:
self.admin.users.create(name, 'rodsuser')
if make_irods_pw:
self.admin.users.modify(name,'password',TEST_IRODS_PW)
yield
finally:
self.admin.users.remove( name )
def tst0(self, ssl_opt, auth_opt, env_opt, name = TEST_RODS_USER, make_irods_pw = False):
_auth_opt = auth_opt
if auth_opt in ('pam', 'pam_password'):
auth_opt = self.PAM_SCHEME_STRING
with self._setup_rodsuser_and_optional_pw(name = name, make_irods_pw = make_irods_pw):
self.envdirs = self.create_env_dirs()
if not self.envdirs:
raise RuntimeError('Could not create one or more client environments')
auth_opt_explicit = 'native' if _auth_opt=='' else _auth_opt
verbosity=False
#verbosity='' # -- debug - sanity check by printing out options applied
out = {'':''}
if env_opt:
with self.setenv('IRODS_ENVIRONMENT_FILE', json_env_fullpath(auth_opt_explicit)) as env_file,\
self.setenv('IRODS_AUTHENTICATION_FILE', secrets_fullpath(auth_opt_explicit)):
cli_env_extras = {} if not(ssl_opt) else dict( CLIENT_OPTIONS_FOR_SSL )
if auth_opt:
cli_env_extras.update( irods_authentication_scheme = auth_opt )
remove=[]
else:
remove=[regex('authentication_')]
with helpers.file_backed_up(env_file):
json_file_update( env_file, keys_to_delete=remove, **cli_env_extras )
session = iRODSSession(irods_env_file=env_file)
with open(env_file) as f:
out = json.load(f)
self.validate_session( session, verbose = verbosity, ssl = ssl_opt )
session.cleanup()
out['ARGS']='no'
else:
session_options = {}
if auth_opt:
session_options.update (authentication_scheme = auth_opt)
if ssl_opt:
SSL_cert = CLIENT_OPTIONS_FOR_SSL["irods_ssl_ca_certificate_file"]
session_options.update(
ssl_context = ssl.create_default_context ( purpose = ssl.Purpose.SERVER_AUTH,
capath = None,
cadata = None,
cafile = SSL_cert),
**CLIENT_OPTIONS_FOR_SSL )
lookup = self.user_auth_envs ['.irods.'+('native' if not(_auth_opt) else _auth_opt)]
session = iRODSSession ( host=gethostname(),
user=lookup['USER'],
zone='tempZone',
password=lookup['PASSWORD'],
port= 1247,
**session_options )
out = session_options
self.validate_session( session, verbose = verbosity, ssl = ssl_opt )
session.cleanup()
out['ARGS']='yes'
if verbosity == '':
print ('--- ssl:',ssl_opt,'/ auth:',repr(auth_opt),'/ env:',env_opt)
print ('--- > ',json.dumps({k:v for k,v in out.items() if k != 'ssl_context'},indent=4))
print ('---')
# == test defaulting to 'native'
def test_01(self):
self.tst0 ( ssl_opt = True , auth_opt = '' , env_opt = False , make_irods_pw = True)
def test_02(self):
self.tst0 ( ssl_opt = False, auth_opt = '' , env_opt = False , make_irods_pw = True)
def test_03(self):
self.tst0 ( ssl_opt = True , auth_opt = '' , env_opt = True , make_irods_pw = True )
def test_04(self):
self.tst0 ( ssl_opt = False, auth_opt = '' , env_opt = True , make_irods_pw = True )
# == test explicit scheme 'native'
def test_1(self):
self.tst0 ( ssl_opt = True , auth_opt = 'native' , env_opt = False, make_irods_pw = True)
def test_2(self):
self.tst0 ( ssl_opt = False, auth_opt = 'native' , env_opt = False, make_irods_pw = True)
def test_3(self):
self.tst0 ( ssl_opt = True , auth_opt = 'native' , env_opt = True, make_irods_pw = True)
def test_4(self):
self.tst0 ( ssl_opt = False, auth_opt = 'native' , env_opt = True, make_irods_pw = True)
# == test explicit scheme 'pam'
def test_5(self):
self.tst0 ( ssl_opt = True, auth_opt = 'pam' , env_opt = False )
def test_6(self):
try:
self.tst0 ( ssl_opt = False, auth_opt = 'pam' , env_opt = False )
except PlainTextPAMPasswordError:
pass
else:
# -- no exception raised
self.fail("PlainTextPAMPasswordError should have been raised")
def test_7(self):
self.tst0 ( ssl_opt = True , auth_opt = 'pam' , env_opt = True )
def test_8(self):
self.tst0 ( ssl_opt = False, auth_opt = 'pam' , env_opt = True )
@unittest.skipUnless(TEST_PAM_PW_OVERRIDE, "Skipping unless pam password is overridden (e.g. to test special characters)")
def test_escaped_pam_password_chars__362(self):
with self._setup_rodsuser_and_optional_pw(name = TEST_RODS_USER):
context = ssl._create_unverified_context(
purpose=ssl.Purpose.SERVER_AUTH, capath=None, cadata=None, cafile=None,
)
ssl_settings = {
'client_server_negotiation': 'request_server_negotiation',
'client_server_policy': 'CS_NEG_REQUIRE',
'encryption_algorithm': 'AES-256-CBC',
'encryption_key_size': 32,
'encryption_num_hash_rounds': 16,
'encryption_salt_size': 8,
'ssl_ca_certificate_file': '/etc/irods/ssl/irods.crt',
'ssl_context': context
}
irods_session = iRODSSession(
host = self.admin.host,
port = self.admin.port,
zone = self.admin.zone,
user = TEST_RODS_USER,
password = TEST_PAM_PW_OVERRIDE,
authentication_scheme = 'pam',
**ssl_settings
)
home_coll = '/{0.zone}/home/{0.username}'.format(irods_session)
self.assertEqual(irods_session.collections.get(home_coll).path, home_coll)
class TestAnonymousUser(unittest.TestCase):
def setUp(self):
admin = self.admin = helpers.make_session()
user = self.user = admin.users.create('anonymous', 'rodsuser', admin.zone)
self.home = '/{admin.zone}/home/{user.name}'.format(**locals())
admin.collections.create(self.home)
acl = iRODSAccess('own', self.home, user.name)
admin.acls.set(acl, admin = True)
self.env_file = os.path.expanduser('~/.irods.anon/irods_environment.json')
self.env_dir = ( os.path.dirname(self.env_file))
self.auth_file = os.path.expanduser('~/.irods.anon/.irodsA')
os.mkdir( os.path.dirname(self.env_file))
json.dump( { "irods_host": admin.host,
"irods_port": admin.port,
"irods_user_name": user.name,
"irods_zone_name": admin.zone }, open(self.env_file,'w'), indent=4 )
def tearDown(self):
self.admin.collections.remove(self.home, recurse = True, force = True)
self.admin.users.remove(self.user.name)
shutil.rmtree (self.env_dir, ignore_errors = True)
def test_login_from_environment(self):
orig_env = os.environ.copy()
try:
os.environ["IRODS_ENVIRONMENT_FILE"] = self.env_file
os.environ["IRODS_AUTHENTICATION_FILE"] = self.auth_file
ses = helpers.make_session()
ses.collections.get(self.home)
finally:
os.environ.clear()
os.environ.update( orig_env )
class TestMiscellaneous(unittest.TestCase):
def test_nonanonymous_login_without_auth_file_fails__290(self):
ses = self.admin
if ses.users.get( ses.username ).type != 'rodsadmin':
self.skipTest( 'Only a rodsadmin may run this test.')
try:
ENV_DIR = tempfile.mkdtemp()
ses.users.create('bob', 'rodsuser')
ses.users.modify('bob', 'password', 'bpass')
d = dict(password = 'bpass', user = 'bob', host = ses.host, port = ses.port, zone = ses.zone)
(bob_env, bob_auth) = helpers.make_environment_and_auth_files(ENV_DIR, **d)
login_options = { 'irods_env_file': bob_env, 'irods_authentication_file': bob_auth }
with helpers.make_session(**login_options) as s:
s.users.get('bob')
os.unlink(bob_auth)
# -- Check that we raise an appropriate exception pointing to the missing auth file path --
with self.assertRaisesRegexp(NonAnonymousLoginWithoutPassword, bob_auth):
with helpers.make_session(**login_options) as s:
s.users.get('bob')
finally:
try:
shutil.rmtree(ENV_DIR,ignore_errors=True)
ses.users.get('bob').remove()
except ex.UserDoesNotExist:
pass
def setUp(self):
admin = self.admin = helpers.make_session()
if admin.users.get(admin.username).type != 'rodsadmin':
self.skipTest('need admin privilege')
admin.users.create('alice','rodsuser')
def tearDown(self):
self.admin.users.remove('alice')
self.admin.cleanup()
@unittest.skipUnless(six.PY3, "Skipping in Python2 because it doesn't reliably do cyclic GC.")
def test_destruct_session_with_no_pool_315(self):
destruct_flag = [False]
class mySess( iRODSSession ):
def __del__(self):
self.pool = None
super(mySess,self).__del__() # call parent destructor(s) - will raise
# an error before the #315 fix
destruct_flag[:] = [True]
admin = self.admin
admin.users.modify('alice','password','apass')
my_sess = mySess( user = 'alice',
password = 'apass',
host = admin.host,
port = admin.port,
zone = admin.zone)
my_sess.cleanup()
del my_sess
gc.collect()
self.assertEqual( destruct_flag, [True] )
def test_non_anon_native_login_omitting_password_fails_1__290(self):
# rodsuser with password unset
with self.assertRaises(ex.CAT_INVALID_USER):
self._non_anon_native_login_omitting_password_fails_N__290()
def test_non_anon_native_login_omitting_password_fails_2__290(self):
# rodsuser with a password set
self.admin.users.modify('alice','password','apass')
with self.assertRaises(ex.CAT_INVALID_AUTHENTICATION):
self._non_anon_native_login_omitting_password_fails_N__290()
def _non_anon_native_login_omitting_password_fails_N__290(self):
admin = self.admin
with iRODSSession(zone = admin.zone, port = admin.port, host = admin.host, user = 'alice') as alice:
alice.collections.get(helpers.home_collection(alice))
class TestWithSSL(unittest.TestCase):
'''
The tests within this class should be run by an account other than the
service account. Otherwise there is risk of corrupting the server setup.
'''
def setUp(self):
if os.path.expanduser('~') == '/var/lib/irods':
self.skipTest('TestWithSSL may not be run by user irods')
if not os.path.exists('/etc/irods/ssl'):
self.skipTest('Running setupssl.py as irods user is prerequisite for this test.')
with helpers.make_session() as session:
if not session.host in ('localhost', socket.gethostname()):
self.skipTest('Test must be run co-resident with server')
def test_ssl_with_server_verify_set_to_none_281(self):
env_file = os.path.expanduser('~/.irods/irods_environment.json')
my_ssl_directory = ''
try:
with helpers.file_backed_up(env_file):
with open(env_file) as env_file_handle:
env = json.load( env_file_handle )
my_ssl_directory = tempfile.mkdtemp(dir = os.path.expanduser("~"))
# Elect for efficiency in DH param generation, eg. when setting up for testing.
create_ssl_dir(ssl_dir = my_ssl_directory, use_strong_primes_for_dh_generation = False)
settings_to_update = {key:value.replace("/etc/irods/ssl",my_ssl_directory)
for key,value in env.items() if type(value) is str and value.startswith("/etc/irods/ssl")}
settings_to_update["irods_ssl_verify_server"] = "none"
env.update( settings_to_update )
with open(env_file,'w') as f:
json.dump(env,f)
with helpers.make_session() as session:
session.collections.get('/{session.zone}/home/{session.username}'.format(**locals()))
finally:
if my_ssl_directory:
shutil.rmtree(my_ssl_directory)
if __name__ == '__main__':
# let the tests find the parent irods lib
sys.path.insert(0, os.path.abspath('../..'))
unittest.main()

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display