🚨 Active Supply Chain Attack:node-ipc Package Compromised.Learn More
Socket
Book a DemoSign in
Socket

python-irodsclient

Package Overview
Dependencies
Maintainers
1
Versions
35
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

python-irodsclient - pypi Package Compare versions

Comparing version
3.1.1
to
3.2.0
+269
irods/auth/pam_interactive.py
from . import (
__NEXT_OPERATION__,
__FLOW_COMPLETE__,
authentication_base,
_auth_api_request,
FORCE_PASSWORD_PROMPT,
throw_if_request_message_is_missing_key,
AuthStorage,
STORE_PASSWORD_IN_MEMORY,
CLIENT_GET_REQUEST_RESULT
)
from .native import _authenticate_native
import getpass
import jsonpatch
import jsonpointer
import logging
import sys
# Constants defining the states and operations for the pam_interactive authentication flow
AUTH_CLIENT_AUTH_REQUEST = "pam_auth_client_request"
AUTH_CLIENT_AUTH_RESPONSE = "pam_auth_response"
PERFORM_RUNNING = "running"
PERFORM_READY = "ready"
PERFORM_NEXT = "next"
PERFORM_RESPONSE = "response"
PERFORM_WAITING = "waiting"
PERFORM_WAITING_PW = "waiting_pw"
PERFORM_ERROR = "error"
PERFORM_TIMEOUT = "timeout"
PERFORM_AUTHENTICATED = "authenticated"
PERFORM_NOT_AUTHENTICATED = "not_authenticated"
PAM_INTERACTIVE_SCHEME = "pam_interactive"
PERFORM_NATIVE_AUTH = "native_auth"
AUTH_AGENT_AUTH_REQUEST = "auth_agent_auth_request"
AUTH_AGENT_AUTH_RESPONSE = "auth_agent_auth_response"
_logger = logging.getLogger(__name__)
def login(conn, **extra_opt):
"""The entry point for the pam_interactive authentication scheme."""
# The AuthStorage object holds the token generated by the server for the native auth step
depot = AuthStorage.create_temp_pw_storage(conn)
auth_client_object = _pam_interactive_ClientAuthState(conn, depot, scheme=PAM_INTERACTIVE_SCHEME)
auth_client_object.authenticate_client(
initial_request=extra_opt
)
class _pam_interactive_ClientAuthState(authentication_base):
def __init__(self, conn, depot, *_, **_kw):
super().__init__(conn, *_, **_kw)
self.depot = depot
self._list_for_request_result_return = None
def auth_client_start(self, request):
self._list_for_request_result_return = request.pop(CLIENT_GET_REQUEST_RESULT, None)
resp = request.copy()
# "pstate" stores persistent values across multiple authentication steps, allowing the server
# to recall previous inputs through JSON pointers. "pdirty" flags if pstate has changed and needs syncing.
# The server side implementation can be found here: https://github.com/irods/irods_auth_plugin_pam_interactive
# The plugin is built on the authentication framework described here:
# https://github.com/irods-contrib/irods_working_group_authentication/tree/e83e84df8ea4a732e5de894fb28aae281c3b3d29/development
resp["pstate"] = resp.get("pstate", {})
resp["pdirty"] = resp.get("pdirty", False)
resp['user_name'] = self.conn.account.proxy_user
resp['zone_name'] = self.conn.account.proxy_zone
# If not forcing a prompt, check for existing credentials (.irodsA) to attempt native auth directly
if not resp.get(FORCE_PASSWORD_PROMPT, False):
if self.conn.account.password and self.conn.account.derived_auth_file:
resp[__NEXT_OPERATION__] = PERFORM_NATIVE_AUTH
return resp
# Otherwise, begin the full interactive flow
resp[__NEXT_OPERATION__] = AUTH_CLIENT_AUTH_REQUEST
return resp
def pam_auth_client_request(self, request):
server_req = request.copy()
server_req[__NEXT_OPERATION__] = AUTH_AGENT_AUTH_REQUEST
resp = _auth_api_request(self.conn, server_req)
resp[__NEXT_OPERATION__] = AUTH_CLIENT_AUTH_RESPONSE
return resp
def pam_auth_response(self, request):
throw_if_request_message_is_missing_key(request, ["user_name", "zone_name"])
server_req = request.copy()
# The "next_operation" key here instructs the server which of its own operations to run
server_req[__NEXT_OPERATION__] = AUTH_AGENT_AUTH_RESPONSE
resp = _auth_api_request(self.conn, server_req)
# The server's response contains a "next_operation" key which dictates the client's next state.
return resp
def _get_default_value(self, request):
"""Looks for a default value in pstate based on a path from the server."""
default_path = request.get("msg", {}).get("default_path", "")
if default_path:
jptr = jsonpointer.JsonPointer(default_path)
return str(jptr.get(request.get("pstate", {}), ""))
return ""
def _patch_state(self, req):
"""Applies server patch instructions to the client's pstate."""
patch_ops = req.get("msg", {}).get("patch")
if not patch_ops:
return
resp = req.get("resp", "")
# If the patch operation is an add or replace without a value, use the response value (following json patch RFC)
for op in patch_ops:
if op.get("op") in ["add", "replace"] and "value" not in op:
op["value"] = resp
req["pstate"] = jsonpatch.apply_patch(req.get("pstate", {}), patch_ops)
req["pdirty"] = True
del req["msg"]["patch"]
def _retrieve_entry(self, req):
"""Checks if the server is asking for a value already stored in pstate."""
if "retrieve" not in req.get("msg", {}):
return False
retr_path = req.get("msg", {}).get("retrieve", "")
if retr_path:
jptr = jsonpointer.JsonPointer(retr_path)
req["resp"] = str(jptr.get(req.get("pstate", {}), ""))
return True
# If no value found in pstate, set resp to empty string.
# The server will then decide the next step based on its configuration for the current prompt.
# It may terminate the flow if the input was required, fall back to a default value, or re-prompt the user.
req["resp"] = ""
return True
def _get_input(self, server_req, is_password=False, prompt_label="Input: "):
"""Handles input from the user, either as a password or regular input."""
# If the server asks for a value we already have, send it without prompting
if self._retrieve_entry(server_req):
self._patch_state(server_req)
server_req[__NEXT_OPERATION__] = AUTH_AGENT_AUTH_RESPONSE
return _auth_api_request(self.conn, server_req)
prompt = server_req.get("msg", {}).get("prompt", prompt_label)
default_value = self._get_default_value(server_req)
display_prompt = prompt
if default_value:
if is_password:
display_prompt += " [******] "
else:
display_prompt += f" [{default_value}] "
if is_password:
user_input = getpass.getpass(display_prompt)
else:
sys.stderr.write(display_prompt)
sys.stderr.flush()
user_input = sys.stdin.readline().strip()
server_req["resp"] = user_input or default_value
self._patch_state(server_req)
server_req[__NEXT_OPERATION__] = AUTH_AGENT_AUTH_RESPONSE
return _auth_api_request(self.conn, server_req)
def waiting(self, request):
"""Handles interactive input requests from the server."""
server_req = request.copy()
return self._get_input(server_req, is_password=False)
def waiting_pw(self, request):
"""Handles the case where a password is specifically requested."""
server_req = request.copy()
return self._get_input(server_req, is_password=True, prompt_label="Password: ")
def authenticated(self, request):
throw_if_request_message_is_missing_key(request, ["request_result"])
pw = request["request_result"] # The password token returned by the server
if not self.depot:
raise RuntimeError("auth storage object was either not set, or allowed to expire prematurely.")
if request.get(STORE_PASSWORD_IN_MEMORY):
self.depot.use_client_auth_file(None)
self.depot.store_pw(pw)
# Return the password token to the caller if requested
if isinstance(self._list_for_request_result_return, list):
self._list_for_request_result_return[:] = (pw,)
resp = request.copy()
resp[__NEXT_OPERATION__] = PERFORM_NATIVE_AUTH
return resp
def native_auth(self, request):
resp = request.copy()
# The native auth function will use the depot to retrieve the password token
_authenticate_native(self.conn, request)
resp[__NEXT_OPERATION__] = __FLOW_COMPLETE__
self.loggedIn = 1
return resp
# Pass-through states like next, running, ready, and response are used when the server
# performs internal updates without needing user input. The client applies state changes,
# optionally shows a prompt, and immediately sends the updated context back.
def next(self, request):
prompt = request.get("msg", {}).get("prompt", "")
if prompt:
_logger.info("Server prompt: %s", prompt)
server_req = request.copy()
self._patch_state(server_req)
server_req[__NEXT_OPERATION__] = AUTH_AGENT_AUTH_RESPONSE
resp = _auth_api_request(self.conn, server_req)
return resp
def running(self, request):
return self.next(request)
def ready(self, request):
return self.next(request)
def response(self, request):
return self.next(request)
# Failure states like error, timeout, and not_authenticated occur when authentication fails.
# The client informs the user, marks login as unsuccessful, and ends the flow.
def _auth_failure(self, request, message):
_logger.error(message)
resp = request.copy()
resp[__NEXT_OPERATION__] = __FLOW_COMPLETE__
self.loggedIn = 0
return resp
def error(self, request):
return self._auth_failure(request, "Authentication error.")
def timeout(self, request):
return self._auth_failure(request, "Authentication timed out.")
def not_authenticated(self, request):
return self._auth_failure(request, "Authentication failed possibly due to incorrect credentials.")
import unittest
import os
import json
from irods.client_init import write_pam_interactive_irodsA_file
from unittest.mock import patch
from irods.auth import ClientAuthError, FORCE_PASSWORD_PROMPT
from irods.auth.pam_interactive import (
_pam_interactive_ClientAuthState,
PERFORM_WAITING,
PERFORM_AUTHENTICATED,
PERFORM_NEXT,
)
from irods.test.helpers import make_session
from irods.auth.pam_interactive import __NEXT_OPERATION__, __FLOW_COMPLETE__
from irods.auth.pam_interactive import _auth_api_request
class PamInteractiveTest(unittest.TestCase):
def setUp(self):
# These tests assume the irods_environment.json file is set up correctly
# and that the iRODS user 'alice' exists in the 'tempZone' zone and has the password 'rods'.
# There should also be a linux user 'alice' with the same password.
self.sess = None
self.auth_client = _pam_interactive_ClientAuthState(None, None, scheme="pam_interactive")
self.env_file_path = os.path.expanduser("~/.irods/irods_environment.json")
self.auth_file_path = os.path.expanduser("~/.irods/.irodsA")
with open(self.env_file_path) as f:
env = json.load(f)
self.user = env.get("irods_user_name", "alice")
self.zone = env.get("irods_zone_name", "tempZone")
self.password = "rods"
def tearDown(self):
if self.sess:
self.sess.cleanup()
if os.path.exists(self.auth_file_path):
os.remove(self.auth_file_path)
def test_pam_interactive_login_basic(self):
with patch("getpass.getpass", return_value=self.password):
self.sess = make_session(test_server_version=False, env_file=self.env_file_path, authentication_scheme="pam_interactive")
# Creating a session does not trigger auth, so the home collection is accessed to trigger and confirm auth succeeded
home = self.sess.collections.get(f"/{self.sess.zone}/home/{self.sess.username}")
self.assertEqual(home.name, self.sess.username)
def test_pam_interactive_auth_file_creation(self):
with patch("getpass.getpass", return_value=self.password):
write_pam_interactive_irodsA_file(env_file=self.env_file_path)
self.assertTrue(os.path.exists(self.auth_file_path), ".irodsA file was not created")
with patch("getpass.getpass", return_value=self.password) as mock_getpass:
self.sess = make_session(test_server_version=False, env_file=self.env_file_path, authentication_scheme= "pam_interactive")
# Creating a session does not trigger auth, so the home collection is accessed to trigger and confirm auth succeeded
home = self.sess.collections.get(f"/{self.sess.zone}/home/{self.sess.username}")
self.assertEqual(home.name, self.sess.username)
mock_getpass.assert_not_called()
def test_forced_interactive_flow(self):
with patch("getpass.getpass", return_value=self.password):
write_pam_interactive_irodsA_file(env_file=self.env_file_path)
self.assertTrue(os.path.exists(self.auth_file_path), ".irodsA file was not created")
with patch("getpass.getpass", return_value=self.password) as mock_getpass:
self.sess = make_session(test_server_version=False, env_file=self.env_file_path, authentication_scheme="pam_interactive")
self.sess.set_auth_option_for_scheme("pam_interactive", FORCE_PASSWORD_PROMPT, True)
# Creating a session does not trigger auth, so the home collection is accessed to trigger and confirm auth succeeded
home = self.sess.collections.get(f"/{self.sess.zone}/home/{self.sess.username}")
self.assertEqual(home.name, self.sess.username)
mock_getpass.assert_called_once()
def test_failed_login_incorrect_password(self):
with patch("getpass.getpass", return_value="wrong_password"):
with self.assertRaises(ClientAuthError):
self.sess = make_session(test_server_version=False, env_file=self.env_file_path, authentication_scheme="pam_interactive")
self.sess.collections.get(f"/{self.sess.zone}/home/{self.sess.username}") # trigger auth flow
with patch("getpass.getpass", return_value="wrong_password"):
with self.assertRaises(ClientAuthError):
write_pam_interactive_irodsA_file(env_file=self.env_file_path)
def test_get_default_value(self):
test_cases = [
("simple_path", {"msg": {"default_path": "/username"}, "pstate": {"username": "alice"}}, "alice"),
("nested_path", {"msg": {"default_path": "/user/name"}, "pstate": {"user": {"name": "alice"}}}, "alice"),
("path_does_not_exist", {"msg": {"default_path": "/user/username"}, "pstate": {"username": "alice"}}, ""),
("non_string_value", {"msg": {"default_path": "/user/id"}, "pstate": {"user": {"id": 123}}}, "123"),
("no_default_path", {"msg": {}, "pstate": {"username": "alice"}}, ""),
]
for name, request, expected in test_cases:
with self.subTest(name=name):
self.assertEqual(self.auth_client._get_default_value(request), expected)
def test_patch_state(self):
test_cases = [
("add_op", {"msg": {"patch": [{"op": "add", "path": "/username", "value": "alice"}]}, "pstate": {}}, {"username": "alice"}, True),
("replace_op", {"msg": {"patch": [{"op": "replace", "path": "/username", "value": "rods"}]}, "pstate": {"username": "alice"}}, {"username": "rods"}, True),
("remove_op", {"msg": {"patch": [{"op": "remove", "path": "/username"}]}, "pstate": {"username": "rods"}}, {}, True),
("add_resp_fallback", {"msg": {"patch": [{"op": "add", "path": "/username"}]}, "pstate": {}, "resp": "alice"}, {"username": "alice"}, True),
("replace_resp_fallback", {"msg": {"patch": [{"op": "replace", "path": "/username"}]}, "pstate": {"username": "rods"}, "resp": "alice"}, {"username": "alice"}, True),
("nested_add_operation", {'msg': {'patch': [{'op': 'add', 'path': '/user/name', 'value': 'alice'}]}, 'pstate': {'user': {}}}, {'user': {'name': 'alice'}}, True),
("resp_fallback_empty", {'msg': {'patch': [{'op': 'add', 'path': '/username'}]}, 'pstate': {}}, {'username': ''}, True),
("no_patch_ops", {"msg": {}, "pstate": {"username": "alice"}, "pdirty": False}, {"username": "alice"}, False)
]
for name, request, expected_pstate, expected_pdirty in test_cases:
with self.subTest(name=name):
self.auth_client._patch_state(request)
self.assertEqual(request["pstate"], expected_pstate)
self.assertEqual(request["pdirty"], expected_pdirty)
def test_retrieve_entry(self):
test_cases = [
("surface_value", {"msg": {"retrieve": "/user"}, "pstate": {"user": "alice"}}, True, "alice"),
("nested_value", {"msg": {"retrieve": "/user/password"}, "pstate": {"user": {"password": "rods"}}}, True, "rods"),
("empty_value", {"msg": {"retrieve": "/user"}, "pstate": {"user": ""}}, True, ""),
("path_does_not_exist", {"msg": {"retrieve": "/missing"}, "pstate": {"user": "alice"}}, True, ""),
("non_string_value", {'msg': {'retrieve': '/user/id'}, 'pstate': {'user': {'id': 456}}}, True, '456'),
("no_retrieve_key", {"msg": {}, "pstate": {"user": "alice"}}, False, None)
]
for name, request, expected_result, expected_resp in test_cases:
with self.subTest(name=name):
self.assertEqual(self.auth_client._retrieve_entry(request), expected_result)
if expected_result:
self.assertEqual(request["resp"], expected_resp)
@patch('irods.auth.pam_interactive._auth_api_request', return_value={"result": "ok"})
@patch('sys.stderr.write')
def test_get_input(self, mock_stderr, mock_api_request):
test_cases = [
("non_password_input", False, 'sys.stdin.readline', "rods\n", {"msg": {"prompt": "Prompt:"}}, "rods"),
("non_password_default", False, 'sys.stdin.readline', "\n", {"msg": {"prompt": "Prompt:", "default_path": "/password"}, "pstate": {"password": "rods"}}, "rods"),
("password_input", True, 'getpass.getpass', "rods", {"msg": {"prompt": "Password:"}}, "rods"),
("password_default", True, 'getpass.getpass', "", {"msg": {"prompt": "Password:", "default_path": "/password"}, "pstate": {"password": "rods"}}, "rods")
]
for name, is_password, mock_target, user_input, request, expected_resp in test_cases:
with self.subTest(name=name), patch(mock_target, return_value=user_input), patch.object(self.auth_client, '_patch_state') as mock_patch:
resp = self.auth_client._get_input(request, is_password=is_password)
self.assertEqual(request["resp"], expected_resp)
self.assertEqual(resp, {"result": "ok"})
mock_patch.assert_called_once()
def test_pass_through_states(self):
with patch("irods.auth.pam_interactive._auth_api_request", return_value={"result": "ok"}):
request = {"msg": {"prompt": "Prompt:"}, "pstate": {}, "pdirty": False}
for state in [self.auth_client.next, self.auth_client.running, self.auth_client.ready, self.auth_client.response]:
with self.subTest(state=state.__name__):
resp = state(request)
self.assertEqual(resp, {"result": "ok"})
def test_failure_states(self):
request = {"foo": "bar"}
with patch("irods.auth.pam_interactive._logger"):
for state in [self.auth_client.error, self.auth_client.timeout, self.auth_client.not_authenticated]:
with self.subTest(state=state.__name__):
resp = state(request)
self.assertEqual(resp[__NEXT_OPERATION__], __FLOW_COMPLETE__)
self.assertEqual(self.auth_client.loggedIn, 0)
@patch("sys.stdin.readline", return_value="ABC123\n")
def test_pam_interactive_mfa_flow(self, mock_stdin):
state = {"stage": "before_mfa", "step": 0}
def mock_server(conn, req):
# Switch from the real server to the mock server when the password step is completed
if state["stage"] == "before_mfa":
resp = _auth_api_request(conn, req)
if req.get(__NEXT_OPERATION__) == PERFORM_NEXT: # Indicates the password step is complete
state["stage"] = "mfa_mock"
return resp
# MFA simulation steps
if state["step"] == 0:
return {
__NEXT_OPERATION__: PERFORM_WAITING,
"pstate": {"Password: ": self.password, "verification_code": ""},
"msg": {
"prompt": "Verification Code: ",
"default_path": "/verification_code",
"patch": [{"op": "add", "path": "/verification_code"}],
},
"pdirty": True,
}
elif state["step"] == 1:
return {
__NEXT_OPERATION__: PERFORM_NEXT,
"pstate": {"Password: ": self.password, "verification_code": ""},
"pdirty": True,
}
elif state["step"] == 2:
return {
__NEXT_OPERATION__: PERFORM_AUTHENTICATED,
"pstate": {"Password: ": self.password, "verification_code": "ABC123"},
"pdirty": True,
"request_result": "temp_token",
}
state["step"] += 1
with patch("irods.auth.pam_interactive._auth_api_request", side_effect=mock_server), \
patch("getpass.getpass", return_value=self.password), \
patch("irods.auth.pam_interactive._authenticate_native") as mock_native:
self.sess = make_session(test_server_version=False, env_file=self.env_file_path, authentication_scheme="pam_interactive")
self.sess.server_version # Trigger auth flow
mock_native.assert_called_once()
if __name__ == "__main__":
unittest.main()
+31
-0

@@ -14,2 +14,33 @@ # Changelog

## [v3.2.0] - 2025-08-27
This release makes the library compatible with iRODS 5, adds support for the PAM Interactive authentication scheme, improves support for groupadmins, and adds new features for GenQuery1.
With this release, users can override the global default number of rows to return for GenQuery1 queries. See [GenQuery1 Queries](https://github.com/irods/python-irodsclient/tree/v3.2.0#genquery1-queries) to learn more.
The Consortium recommends that users enable `PYTHON_IRODSCLIENT_CONFIGURATION_LOAD_ERRORS_FATAL` to aid in detecting invalid configuration values on program startup. This option will be enabled by default in a future release. See [Python iRODS Client Settings File](https://github.com/irods/python-irodsclient/tree/v3.2.0#python-irods-client-settings-file) for more information.
### Changed
- Use `group` keyword to create groups in iRODS 4.3.4 and later (#742).
- Bump iRODS compatibility to iRODS 5.0.1 (#743).
### Removed
- Remove deprecated user-group symbols (#440).
### Fixed
- Apply `user_zone` property appropriately when creating users as a groupadmin (#759).
- Qualify username when retrieving user via `session.users.get('<username>')` (#764).
### Added
- Implement support for PAM Interactive authentication scheme (#653).
- Add support for iRODS 5 access time (#700).
- Allow developers to override the global default number of rows to return for GenQuery1 queries (#712).
- Add mypy type checking (#744).
- Allow the exclusion of GenQuery1 columns through negation (#755).
- Add convenience function for creating remote users as a groupadmin (#759).
## [v3.1.1] - 2025-06-09

@@ -16,0 +47,0 @@

+1
-1

@@ -95,3 +95,3 @@ import sys

PAM_AUTH_SCHEME = PAM_AUTH_PLUGIN.lower()
PAM_AUTH_SCHEMES = (PAM_AUTH_SCHEME, "pam_password")
PAM_AUTH_SCHEMES = (PAM_AUTH_SCHEME, "pam_password", "pam_interactive")

@@ -98,0 +98,0 @@ DEFAULT_CONFIG_PATH = os.path.expanduser("~/.python_irodsclient")

@@ -10,3 +10,3 @@ import importlib

__all__ = ["pam_password", "native"]
__all__ = ["pam_interactive", "pam_password", "native"]

@@ -13,0 +13,0 @@

@@ -15,3 +15,3 @@ import ast

logger = logging.Logger(__name__)
logger = logging.getLogger(__name__)

@@ -61,5 +61,27 @@

connections = ConnectionsProperties()
class ConfigurationError(BaseException): pass
class ConfigurationValueError(ValueError,ConfigurationError): pass
class Genquery1_Properties(iRODSConfiguration, metaclass=iRODSConfigAliasMetaclass):
@property
def irods_query_limit(self):
import irods.query
return irods.query.IRODS_QUERY_LIMIT
@irods_query_limit.setter
def irods_query_limit(self, target_value):
import irods.query
requested = int(target_value)
if requested <= 0:
raise ConfigurationValueError(f'Error setting IRODS_QUERY_LIMIT to [{requested}]. Use positive values only.')
irods.query.IRODS_QUERY_LIMIT = requested
genquery1 = Genquery1_Properties()
# #############################################################################

@@ -66,0 +88,0 @@ #

@@ -118,1 +118,33 @@ #!/usr/bin/env python3

_write_encoded_auth_value(auth_file, to_encode[0], overwrite)
def write_pam_interactive_irodsA_file(overwrite=True, ttl="", **kw):
"""Write credentials to an .irodsA file for PAM interactive authentication."""
import irods.auth
ses = kw.pop("_session", None) or h.make_session(**kw)
auth_file = ses.pool.account.derived_auth_file
if not auth_file:
msg = "Auth file could not be written because no iRODS client environment was found."
raise RuntimeError(msg)
ses.set_auth_option_for_scheme(
"pam_interactive", irods.auth.FORCE_PASSWORD_PROMPT, True
)
if ttl:
ses.set_auth_option_for_scheme(
"pam_interactive", "time_to_live_in_hours", ttl
)
ses.set_auth_option_for_scheme(
"pam_interactive", irods.auth.STORE_PASSWORD_IN_MEMORY, True
)
L = []
ses.set_auth_option_for_scheme(
"pam_interactive", irods.auth.CLIENT_GET_REQUEST_RESULT, L
)
with ses.pool.get_connection() as _:
_write_encoded_auth_value(auth_file, L[0], overwrite)
from datetime import datetime, timezone
from calendar import timegm
class Column_remover:
def __init__(self, column):
self.column = column

@@ -91,2 +94,5 @@ class QueryKey:

def __neg__(self):
return Column_remover(self)
def __repr__(self):

@@ -100,5 +106,13 @@ return "<{}.{} {} {}>".format(

@property
def id_tuple(self):
return self.column_type, self.icat_key, self.icat_id
def __hash__(self):
return hash((self.column_type, self.icat_key, self.icat_id))
return hash(self.id_tuple)
def __eq__(self, other):
if isinstance(other, Column):
return self.id_tuple == other.id_tuple
return super().__eq__(other)

@@ -105,0 +119,0 @@ class Keyword(QueryKey):

@@ -62,9 +62,13 @@ import io

replicas = sorted(results, key=lambda r: r[DataObject.replica_number])
self.replicas = [
iRODSReplica(
r[DataObject.replica_number],
r[DataObject.replica_status],
r[DataObject.resource_name],
r[DataObject.path],
r[DataObject.resc_hier],
# The status quo before iRODS 5
replica_args = [(
(r[DataObject.replica_number],
r[DataObject.replica_status],
r[DataObject.resource_name],
r[DataObject.path],
r[DataObject.resc_hier],
)
,dict(
checksum=r[DataObject.checksum],

@@ -76,4 +80,11 @@ size=r[DataObject.size],

)
for r in replicas
]
) for r in replicas]
# Adjust for adding access_time in the iRODS 5 case.
if self.manager.sess.server_version >= (5,):
for n,r in enumerate(replicas):
replica_args[n][1]['access_time'] = r[DataObject.access_time]
self.replicas = [iRODSReplica(*a,**k) for a,k in replica_args]
self._meta = None

@@ -80,0 +91,0 @@

@@ -9,2 +9,3 @@ # if you're copying these from the docs, you might find the following regex helpful:

import sys
from typing import Dict

@@ -52,6 +53,2 @@

# NOTE: Everything of the form *UserGroup* is deprecated.
UserGroupDoesNotExist = GroupDoesNotExist
class ResourceDoesNotExist(DoesNotExist):

@@ -96,3 +93,3 @@ pass

class iRODSExceptionMeta(type):
codes = {}
codes: "Dict[int, iRODSException]" = {}
positive_code_error_message = (

@@ -99,0 +96,0 @@ "For {name}, a positive code of {attrs[code]} was declared."

import contextlib
import os
import sys

@@ -17,3 +18,2 @@ from irods import env_filename_from_keyword_args

class StopTestsException(Exception):

@@ -27,2 +27,3 @@

class iRODS_Server_Too_Recent_For_Testing(StopTestsException):

@@ -40,2 +41,3 @@ pass

def make_session(test_server_version=False, **kwargs):

@@ -147,1 +149,14 @@ """Connect to an iRODS server as determined by any client environment

return None
# Utility class and factory function for storing the original value of variables within the given namespace.
def create_value_cache(namespace:dict):
class CachedValues:
__namespace = namespace
@classmethod
def make_entry(cls, name):
cached_value = cls.__namespace[name]
setattr(cls,name,property(lambda self: cached_value))
return CachedValues()

@@ -39,2 +39,3 @@ """From rodsKeyWdDef.hpp"""

DATA_ACCESS_INX_KW = "dataAccessInx"
DATA_ACCESS_TIME_KW = "dataAccessTime"
NO_OPEN_FLAG_KW = "noOpenFlag"

@@ -41,0 +42,0 @@ PHYOPEN_BY_SIZE_KW = "phyOpenBySize"

@@ -76,3 +76,3 @@ from os.path import basename, dirname

.filter(*conditions)
.all()
._all()
)

@@ -79,0 +79,0 @@

@@ -8,2 +8,3 @@ import ast

import weakref
from typing import Any, List, Type
from irods.models import DataObject, Collection

@@ -17,4 +18,4 @@ from irods.manager import Manager

StringStringMap,
DataObjInfo,
ModDataObjMeta,
DataObjInfo_for_session,
ModDataObjMeta_for_session,
DataObjChksumRequest,

@@ -43,4 +44,4 @@ DataObjChksumResponse,

_update_types = []
_update_functions = weakref.WeakKeyDictionary()
_update_types: List[Type] = []
_update_functions: weakref.WeakKeyDictionary[Type, Any] = weakref.WeakKeyDictionary()

@@ -420,6 +421,6 @@

): # in iRODS 4.2.11 and later, myStr is in JSON format.
exc = Server_Checksum_Warning(checksum)
exception = Server_Checksum_Warning(checksum)
if not r_error_stack:
r_error_stack.fill(exc.response)
raise exc
r_error_stack.fill(exception.response)
raise exception
except iRODSMessage.ResponseNotParseable:

@@ -944,42 +945,50 @@ # response.msg is None when VERIFY_CHKSUM_KW is used

message_body = ModDataObjMeta(
dataObjInfo=DataObjInfo(
objPath=data_obj_info["objPath"],
rescName=data_obj_info.get("rescName", ""),
rescHier=data_obj_info.get("rescHier", ""),
dataType="",
dataSize=0,
chksum="",
version="",
filePath="",
dataOwnerName="",
dataOwnerZone="",
replNum=data_obj_info.get("replNum", 0),
replStatus=0,
statusString="",
dataId=0,
collId=0,
dataMapId=0,
flags=0,
dataComments="",
dataMode="",
dataExpiry="",
dataCreate="",
dataModify="",
dataAccess="",
dataAccessInx=0,
writeFlag=0,
destRescName="",
backupRescName="",
subPath="",
specColl=0,
regUid=0,
otherFlags=0,
KeyValPair_PI=StringStringMap(options),
in_pdmo="",
next=0,
rescId=0,
),
fields = dict(
objPath=data_obj_info["objPath"],
rescName=data_obj_info.get("rescName", ""),
rescHier=data_obj_info.get("rescHier", ""),
dataType="",
dataSize=0,
chksum="",
version="",
filePath="",
dataOwnerName="",
dataOwnerZone="",
replNum=data_obj_info.get("replNum", 0),
replStatus=0,
statusString="",
dataId=0,
collId=0,
dataMapId=0,
flags=0,
dataComments="",
dataMode="",
dataExpiry="",
dataCreate="",
dataModify="",
dataAccess="",
dataAccessInx=0,
writeFlag=0,
destRescName="",
backupRescName="",
subPath="",
specColl=0,
regUid=0,
otherFlags=0,
KeyValPair_PI=StringStringMap(options),
in_pdmo="",
next=0,
rescId=0,
)
DataObjInfo_class = DataObjInfo_for_session(self.sess)
if 'dataAccessTime' in DataObjInfo_class.__dict__:
fields["dataAccessTime"]=""
message_body = ModDataObjMeta_for_session(self.sess)(
dataObjInfo=DataObjInfo_class(**fields),
regParam=StringStringMap(meta_dict),
)
message = iRODSMessage(

@@ -986,0 +995,0 @@ "RODS_API_REQ",

import logging
import copy
from os.path import dirname, basename
from typing import Any, Dict

@@ -35,3 +36,3 @@ from irods.manager import Manager

__kw = {} # default (empty) keywords
__kw : Dict[str, Any] = {} # default (empty) keywords

@@ -87,3 +88,3 @@ def _updated_keywords(self, opts):

columns += (model.create_time, model.modify_time)
results = self.sess.query(*columns).filter(*conditions).all()
results = self.sess.query(*columns).filter(*conditions)._all()

@@ -90,0 +91,0 @@ def meta_opts(row):

@@ -49,6 +49,6 @@ import logging

def get(self, user_name, user_zone=""):
query = self.sess.query(User).filter(User.name == user_name)
if not user_zone:
user_zone = self.sess.zone
if len(user_zone) > 0:
query = query.filter(User.zone == user_zone)
query = self.sess.query(User).filter(User.name == user_name, User.zone == user_zone)

@@ -61,3 +61,11 @@ try:

def create_with_password(self, user_name, password, user_zone=""):
def create_remote(self, user_name:str, user_zone:str):
"""
Create an entry in the local catalog for a remote user. The user_type will be 'rodsuser'.
"""
if user_zone in (self.sess.zone,""):
raise ValueError(f"Parameter [{user_zone = }] must be a remote zone.")
return self.create_with_password(user_name, password='', user_zone=user_zone)
def create_with_password(self, user_name:str, password:str, user_zone:str=""):
"""This method can be used by a groupadmin to initialize the password field while creating the new user.

@@ -67,5 +75,11 @@ (This is necessary since group administrators may not change the password of an existing user.)

if '#' in user_name:
raise ValueError("A zone name must be specified in the user_zone parameter, not within the user_name.")
if password and (user_zone not in ("", self.sess.zone)):
raise ValueError("A password cannot be specified for remote zone users.")
message_body = UserAdminRequest(
"mkuser",
user_name,
user_name + ("" if not user_zone else f"#{user_zone}"),
(

@@ -265,2 +279,9 @@ ""

CREATE_GROUP__USER_TYPE__DEFAULT__API_CHANGE__VERSION_THRESHOLD = (4,3,4)
def get__group_create__user_type__default(session):
if session.server_version < CREATE_GROUP__USER_TYPE__DEFAULT__API_CHANGE__VERSION_THRESHOLD:
return "rodsgroup"
return ""
class GroupManager(UserManager):

@@ -297,12 +318,25 @@

name,
user_type="rodsgroup",
user_type=get__group_create__user_type__default,
user_zone="",
auth_str="",
group_admin=False,
group_admin=None,
**options,
):
"""Create and return a new iRODSGroup.
if not options.pop("suppress_deprecation_warning", False):
Input parameters:
-----------------
name: This is the name to be given to the new group.
user_type: (deprecated parameter) This parameter should remain unused and will effectively resolve as "rodsgroup".
user_zone: (deprecated parameter) In iRODS 4.3+, do not use this parameter as groups may not be made for a remote zone.
auth_type: (deprecated parameter) This parameter should be left to default to "" as groups do not need authentication.
group_admin: If left to its default value of None, seamlessly allows a groupadmin to create new groups.
"""
if callable(user_type):
user_type = user_type(self.sess)
if user_zone != "" or auth_str != "" or user_type not in ("", "rodsgroup"):
warnings.warn(
"Use of session.user_groups is deprecated in v1.1.7 - prefer session.groups",
"Use of non-default value for auth_str, user_type or user_zone in GroupManager.create is deprecated",
DeprecationWarning,

@@ -316,5 +350,11 @@ stacklevel=2,

if MessageClass is UserAdminRequest:
message_body = MessageClass("mkgroup", name, user_type, user_zone)
message_body = MessageClass("mkgroup", name, "rodsgroup")
else:
message_body = MessageClass("add", "user", name, user_type, "", "")
message_body = MessageClass(
"add",
("user" if self.sess.server_version < CREATE_GROUP__USER_TYPE__DEFAULT__API_CHANGE__VERSION_THRESHOLD else "group"),
name,
user_type,
"",
"")
request = iRODSMessage("RODS_API_REQ", msg=message_body, int_info=api_to_use)

@@ -334,12 +374,4 @@ with self.sess.pool.get_connection() as conn:

def addmember(
self, group_name, user_name, user_zone="", group_admin=False, **options
self, group_name, user_name, user_zone="", group_admin=None, **options
):
if not options.pop("suppress_deprecation_warning", False):
warnings.warn(
"Use of session.user_groups is deprecated in v1.1.7 - prefer session.groups",
DeprecationWarning,
stacklevel=2,
)
(MessageClass, api_key) = self._api_info(group_admin)

@@ -359,12 +391,4 @@

def removemember(
self, group_name, user_name, user_zone="", group_admin=False, **options
self, group_name, user_name, user_zone="", group_admin=None, **options
):
if not options.pop("suppress_deprecation_warning", False):
warnings.warn(
"Use of session.user_groups is deprecated in v1.1.7 - prefer session.groups",
DeprecationWarning,
stacklevel=2,
)
(MessageClass, api_key) = self._api_info(group_admin)

@@ -397,5 +421,1 @@

logger.debug(response.int_info)
# NOTE: Everything of the form *UserGroup* is deprecated.
UserGroupManager = GroupManager

@@ -8,2 +8,3 @@ """Define objects related to communication with iRODS server API endpoints."""

import irods.exception as ex
from typing import Optional
import xml.etree.ElementTree as ET_xml

@@ -89,11 +90,11 @@ import defusedxml.ElementTree as ET_secure_xml

_default_XML = os.environ.get(
_default_XML_env = os.environ.get(
"PYTHON_IRODSCLIENT_DEFAULT_XML", globals().get("_default_XML")
)
if not _default_XML:
if not _default_XML_env:
_default_XML = XML_Parser_Type.STANDARD_XML
else:
try:
_default_XML = _XML_strings[_default_XML]
_default_XML = _XML_strings[_default_XML_env]
except KeyError:

@@ -185,3 +186,3 @@ raise BadXMLSpec("XML parser type not recognized")

IRODS_VERSION = (4, 3, 4, "d")
IRODS_VERSION = (5, 0, 1, "d")

@@ -872,3 +873,3 @@ UNICODE = str

_name = None
_name : Optional[str] = None

@@ -1113,50 +1114,62 @@ def __init__(self, *args):

class DataObjInfo(Message):
_name = "DataObjInfo_PI"
objPath = StringProperty()
rescName = StringProperty()
rescHier = StringProperty()
dataType = StringProperty()
dataSize = LongProperty()
chksum = StringProperty()
version = StringProperty()
filePath = StringProperty()
dataOwnerName = StringProperty()
dataOwnerZone = StringProperty()
replNum = IntegerProperty()
replStatus = IntegerProperty()
statusString = StringProperty()
dataId = LongProperty()
collId = LongProperty()
dataMapId = IntegerProperty()
dataComments = StringProperty()
dataMode = StringProperty()
dataExpiry = StringProperty()
dataCreate = StringProperty()
dataModify = StringProperty()
dataAccess = StringProperty()
dataAccessInx = IntegerProperty()
writeFlag = IntegerProperty()
destRescName = StringProperty()
backupRescName = StringProperty()
subPath = StringProperty()
specColl = IntegerProperty()
regUid = IntegerProperty()
otherFlags = IntegerProperty()
KeyValPair_PI = SubmessageProperty(StringStringMap)
in_pdmo = StringProperty()
next = IntegerProperty()
rescId = LongProperty()
def DataObjInfo_for_session(session):
class DataObjInfo(Message):
_name = "DataObjInfo_PI"
objPath = StringProperty()
rescName = StringProperty()
rescHier = StringProperty()
dataType = StringProperty()
dataSize = LongProperty()
chksum = StringProperty()
version = StringProperty()
filePath = StringProperty()
dataOwnerName = StringProperty()
dataOwnerZone = StringProperty()
replNum = IntegerProperty()
replStatus = IntegerProperty()
statusString = StringProperty()
dataId = LongProperty()
collId = LongProperty()
dataMapId = IntegerProperty()
dataComments = StringProperty()
dataMode = StringProperty()
dataExpiry = StringProperty()
dataCreate = StringProperty()
dataModify = StringProperty()
dataAccess = StringProperty()
dataAccessInx = IntegerProperty()
writeFlag = IntegerProperty()
destRescName = StringProperty()
backupRescName = StringProperty()
subPath = StringProperty()
specColl = IntegerProperty()
regUid = IntegerProperty()
otherFlags = IntegerProperty()
KeyValPair_PI = SubmessageProperty(StringStringMap)
in_pdmo = StringProperty()
next = IntegerProperty()
rescId = LongProperty()
class ModDataObjMeta(Message):
_name = "ModDataObjMeta_PI"
dataObjInfo = SubmessageProperty(DataObjInfo)
regParam = SubmessageProperty(StringStringMap)
class _DataObjInfo_for_iRODS_5(DataObjInfo):
dataAccessTime = StringProperty()
return DataObjInfo if session.server_version < (5,) else _DataObjInfo_for_iRODS_5
def ModDataObjMeta_for_session(session):
doi_class = DataObjInfo_for_session(session)
class ModDataObjMeta(Message):
_name = "ModDataObjMeta_PI"
dataObjInfo = SubmessageProperty(doi_class)
regParam = SubmessageProperty(StringStringMap)
return ModDataObjMeta
# -- A tuple-descended class which facilitates filling in a
# quasi-RError stack from a JSON formatted list.
_Server_Status_Message = namedtuple("server_status_msg", ("msg", "status"))
_Server_Status_Message = namedtuple("_Server_Status_Message", ("msg", "status"))

@@ -1163,0 +1176,0 @@

@@ -5,8 +5,4 @@ # Ordered property classes stolen from Kris Kowal of Ask a Wizard

try:
next_counter = count().__next__
except AttributeError:
next_counter = count().next
next_counter = count().__next__
class OrderedProperty:

@@ -13,0 +9,0 @@

@@ -179,10 +179,4 @@ # A parser for the iRODS XML-like protocol.

try:
unicode # Python 2
except NameError:
unicode = str
def fromstring(s):
if type(s) is unicode:
if type(s) is str:
s = s.encode("utf-8")

@@ -189,0 +183,0 @@ if type(s) is not bytes:

@@ -0,1 +1,2 @@

from typing import Dict, List, Tuple
from irods.column import Column, Integer, String, DateTime, Keyword

@@ -5,4 +6,4 @@

class ModelBase(type):
column_items = []
column_dict = {}
column_items : List[Tuple[int, Column]] = []
column_dict : Dict[int, Column] = {}

@@ -80,6 +81,2 @@ @classmethod

# The UserGroup model-class is now renamed Group, but we'll keep the deprecated name around for now.
UserGroup = Group
class Resource(Model):

@@ -127,2 +124,3 @@ id = Column(Integer, "R_RESC_ID", 301)

resc_id = Column(String, "D_RESC_ID", 423, min_version=(4, 2, 0))
access_time = Column(DateTime, "D_ACCESS_TIME", 424, min_version=(5, 0, 0))

@@ -129,0 +127,0 @@

@@ -12,2 +12,3 @@ #!/usr/bin/env python

import multiprocessing
from typing import List, Union

@@ -607,3 +608,4 @@ from irods.data_object import iRODSDataObject

opt, arg = getopt.getopt(sys.argv[1:], "vL:l:aR:N:")
opt, arg_ = getopt.getopt(sys.argv[1:], "vL:l:aR:N:")
arg: List[Union[str, int]] = list(arg_)

@@ -627,4 +629,5 @@ opts = dict(opt)

arg[1] = Oper.PUT if arg[1].lower() in ("w", "put", "a") else Oper.GET
if async_xfer is not None:
# The purpose of the isinstance calls in the lines below is to allow mypy to infer the current type
arg[1] = Oper.PUT if isinstance(arg[1], str) and arg[1].lower() in ("w", "put", "a") else Oper.GET
if isinstance(arg[1], int) and async_xfer is not None:
arg[1] |= Oper.NONBLOCKING

@@ -631,0 +634,0 @@

@@ -6,2 +6,3 @@ #!/usr/bin/env python3

import sys
from typing import Callable, Dict

@@ -25,3 +26,3 @@ from irods.auth.pam_password import _get_pam_password_from_stdin as get_password

vector = {"pam_password": write_pam_irodsA_file, "native": write_native_irodsA_file}
vector : Dict[str, Callable] = {"pam_password": write_pam_irodsA_file, "native": write_native_irodsA_file}
opts, args = getopt.getopt(sys.argv[1:], "hi:", ["ttl=", "help"])

@@ -50,8 +51,10 @@ optD = dict(opts)

options["ttl"] = optD["--ttl"]
pw = get_password(
sys.stdin if inp_stream in ("-", None) else open(inp_stream, "r"),
prompt=f"Enter current password for scheme {scheme!r}: ",
)
if inp_stream is None or inp_stream == "-":
pw = get_password(sys.stdin,
prompt=f"Enter current password for scheme {scheme!r}: ",)
else:
pw = get_password(open(inp_stream, "r", encoding='utf-8'),
prompt=f"Enter current password for scheme {scheme!r}: ",)
vector[scheme](pw, **options)
else:
print("did not recognize authentication scheme argument", file=sys.stderr)

@@ -5,3 +5,3 @@ from collections import OrderedDict

from irods.models import Model
from irods.column import Column, Keyword
from irods.column import Column, Column_remover, Keyword
from irods.message import (

@@ -38,2 +38,3 @@ IntegerIntegerMap,

IRODS_QUERY_LIMIT = 500

@@ -59,2 +60,4 @@ class Query:

self.columns[arg] = 1
elif isinstance(arg, Column_remover):
self.columns.pop(arg.column, None)
else:

@@ -200,3 +203,3 @@ raise TypeError("Arguments must be models or columns")

def _message(self):
max_rows = 500 if self._limit == -1 else self._limit
max_rows = IRODS_QUERY_LIMIT if self._limit < 0 else self._limit
args = {

@@ -236,2 +239,9 @@ "maxRows": max_rows,

def _all(self):
"""Internally used version of all(). Unlike the public version, the returned iterator when
executed to completion is unaffected by IRODS_QUERY_LIMIT. This is in order to accurately
reflect the enumeration of sub-objects, e.g. AVU's associated with an object.
"""
return self.get_results()
def all(self):

@@ -386,1 +396,7 @@ result_set = self.execute()

yield result
# Record a copy of the original value of IRODS_QUERY_LIMIT for possible future access.
import irods.helpers as helpers
cached_values = helpers.create_value_cache(globals())
cached_values.make_entry('IRODS_QUERY_LIMIT')

@@ -109,50 +109,2 @@ import ast

@property
def groups(self):
class _GroupManager(self.user_groups.__class__):
def create(
self, name, group_admin=None
): # NB new default (see user_groups manager and i/f, with False as default)
user_type = "rodsgroup" # These are no longer parameters in the new interface, as they have no reason to vary.
user_zone = "" # Groups (1) are always of type 'rodsgroup', (2) always belong to the local zone, and
auth_str = "" # (3) do not authenticate.
return super(_GroupManager, self).create(
name,
user_type,
user_zone,
auth_str,
group_admin,
suppress_deprecation_warning=True,
)
def addmember(self, group_name, user_name, user_zone="", group_admin=None):
return super(_GroupManager, self).addmember(
group_name,
user_name,
user_zone,
group_admin,
suppress_deprecation_warning=True,
)
def removemember(
self, group_name, user_name, user_zone="", group_admin=None
):
return super(_GroupManager, self).removemember(
group_name,
user_name,
user_zone,
group_admin,
suppress_deprecation_warning=True,
)
_groups = getattr(self, "_groups", None)
if not _groups:
_groups = self._groups = _GroupManager(self.user_groups.sess)
return self._groups
def __init__(self, configure=True, auto_cleanup=True, **kwargs):

@@ -177,3 +129,3 @@ self.pool = None

self.users = UserManager(self)
self.user_groups = GroupManager(self)
self.groups = GroupManager(self)
self.resources = ResourceManager(self)

@@ -180,0 +132,0 @@ self.zones = ZoneManager(self)

#! /usr/bin/env python
import datetime
import os
import sys
import unittest
from irods.models import User
from irods.exception import UserDoesNotExist, ResourceDoesNotExist, SYS_NO_API_PRIV
from irods.models import User, Group
from irods.exception import (
UserDoesNotExist,
ResourceDoesNotExist,
SYS_NO_API_PRIV,
)
from irods.session import iRODSSession

@@ -71,2 +76,92 @@ from irods.resource import iRODSResource

def test_create_with_user_options__issue_759(self):
gpadmin_user = remote_zone = None
try:
gpadmin_user = self.sess.users.create(self.new_user_name, "groupadmin")
gpadmin_user.modify("password", gpadmin_password:='my-gpadmin-passw')
remote_zone = self.sess.zones.create('other_zone', 'remote')
with iRODSSession(
port=self.sess.port,
zone=self.sess.zone,
host=self.sess.host,
user=self.new_user_name,
password=gpadmin_password,
) as gpadmin:
for expected_exception,args,kwargs in (
(None, ("newUser","newPassword"), {}), # no zone supplied
(None, ("newUser","newPassword"), {"user_zone":gpadmin.zone}), # local zone supplied
(ValueError, ("newUser","newPassword"), {"user_zone":remote_zone.name}), # remote zone supplied
(ValueError, ("newUser","newPassword"), {"user_zone":"fictionZone"}), # nonexistent zone supplied
(ValueError, ("newUser#tempZone","newPassword"), {}),
(ValueError, (f"newUser#{remote_zone.name}","newPassword"), {}),
(ValueError, ("newUser#fictionZone","newPassword"), {}),
(ValueError, (f"newUser#{gpadmin.zone}","newPassword"), {"user_zone":gpadmin.zone}),
(ValueError, (f"newUser#{gpadmin.zone}","newPassword"), {"user_zone":remote_zone.name}),
(ValueError, (f"newUser#{gpadmin.zone}","newPassword"), {"user_zone":"fictionZone"}),
):
with self.subTest(args = args, kwargs = kwargs):
user = None
try:
test_function = lambda:gpadmin.users.create_with_password(*args, **kwargs)
if expected_exception:
with self.assertRaises(expected_exception):
user = test_function()
else:
user = test_function()
self.assertEqual(user is None, expected_exception is not None, "In case of error, and only then, user should not exist.")
finally:
if user:
self.sess.users.get(user.name,user.zone).remove()
finally:
if remote_zone:
remote_zone.remove()
if gpadmin_user:
gpadmin_user.remove()
def test_groupadmin_can_create_entry_for_remote_user_and_add_to_group__issue_759(self):
gpadmin_user = remote_zone = test_group = test_user = None
gpadmin_user = self.sess.users.create(self.new_user_name, "groupadmin")
gpadmin_user.modify("password", gpadmin_password:='my-gpadmin-passw')
try:
remote_zone = self.sess.zones.create('other_zone', 'remote')
with iRODSSession(
port=self.sess.port,
zone=self.sess.zone,
host=self.sess.host,
user=self.new_user_name,
password=gpadmin_password,
) as gpadmin:
# Create a group as group admin and add own name to it to get group modification privileges.
test_group = gpadmin.groups.create('test_group')
test_group.addmember(gpadmin_user.name)
# TODO(#763): remove randomization of user name
random = helpers.unique_name(helpers.my_function_name(), datetime.datetime.now())
# Create a test user, effectively a local entry for a user in a remote-zone.
test_user = gpadmin.users.create_remote(f'remote_user_{random}', user_zone=remote_zone.name)
# Add the remote test user to the group we created, then assert membership.
test_group.addmember(test_user.name, user_zone = remote_zone.name)
self.assertIn(
(test_user.name, test_user.zone),
[(_[User.name],_[User.zone]) for _ in self.sess.query(User,Group).filter(Group.name == test_group.name)]
)
finally:
if test_group:
# Iterate through members from group (with groupadmin as the last), removing each one.
for member in sorted(test_group.members, key=lambda _:_.name == gpadmin_user.name):
test_group.removemember(member.name, user_zone=member.zone)
# Use rodadmin-enabled session to remove the group
self.sess.groups.get(test_group.name).remove()
# Clean up other objects created for test.
if test_user:
self.sess.users.get(test_user.name, user_zone=test_user.zone).remove()
if gpadmin_user:
gpadmin_user.remove()
if remote_zone:
remote_zone.remove()
def test_groupadmin_creates_group_and_unable_to_delete_group__374(self):

@@ -73,0 +168,0 @@ # Have user with groupadmin

@@ -170,5 +170,6 @@ import base64

make_session.__doc__ = re.sub(
r"(test_server_version\s*)=\s*\w+", r"\1 = True", _irods_helpers_make_session.__doc__
)
if _irods_helpers_make_session.__doc__ is not None:
make_session.__doc__ = re.sub(
r"(test_server_version\s*)=\s*\w+", r"\1 = True", _irods_helpers_make_session.__doc__
)

@@ -175,0 +176,0 @@

@@ -24,2 +24,3 @@ #! /usr/bin/env python

from re import compile as regex
from typing import Dict, Optional
import gc

@@ -38,6 +39,3 @@ from irods.test.setupssl import create_ssl_dir

try:
from re import _pattern_type as regex_type
except ImportError:
from re import Pattern as regex_type # Python 3.7+
from re import Pattern as regex_type

@@ -185,3 +183,3 @@

env_save = {}
env_save: Dict[str,Optional[str]] = {}

@@ -188,0 +186,0 @@ @contextlib.contextmanager

@@ -24,2 +24,3 @@ #!/usr/bin/env python

)
from irods.message.ordered import OrderedProperty

@@ -210,4 +211,8 @@

def test_ordered_properties_have_unique_ids(self):
property1 = OrderedProperty()
property2 = OrderedProperty()
self.assertNotEqual(property1._creation_counter, property2._creation_counter) # pylint: disable=protected-access
if __name__ == "__main__":
unittest.main()

@@ -136,3 +136,4 @@ #! /usr/bin/env python

from irods.test.helpers import create_simple_resc, create_simple_resc_hierarchy
create_simple_resc_hierarchy = helpers.create_simple_resc_hierarchy
create_simple_resc = helpers.create_simple_resc

@@ -139,0 +140,0 @@ def test_replica_truncate_json_error__issue_606(self):

@@ -15,2 +15,3 @@ #! /usr/bin/env python

import socket
from typing import Any, Dict
import irods.test.helpers as helpers

@@ -32,3 +33,3 @@ from irods.connection import DESTRUCTOR_MSG

test_extension = ""
preferred_parameters = {}
preferred_parameters : Dict[str, Any] = {}

@@ -35,0 +36,0 @@ @classmethod

@@ -29,10 +29,11 @@ #! /usr/bin/env python

)
from irods.query import SpecificQuery
from irods import MAX_SQL_ROWS
import irods.client_configuration as config
from irods.column import Like, NotLike, Between, In
import irods.keywords as kw
from irods.meta import iRODSMeta
from irods.query import SpecificQuery
from irods.rule import Rule
from irods import MAX_SQL_ROWS
import irods.test.helpers as helpers
from irods.test.helpers import irods_shared_reg_resc_vault
import irods.test.helpers as helpers
import irods.keywords as kw

@@ -747,2 +748,4 @@ IRODS_STATEMENT_TABLE_SIZE = 50

def test_multiple_criteria_on_one_column_name(self):
# Remove the column skips when irods/irods #8574 is resolved.
skipped_columns = {DataObject.map_id, Collection.map_id, DataObject.status, DataObject.type, DataObject.collection_id}
collection = self.coll_path

@@ -759,3 +762,3 @@ filename = "test_multiple_AVU_joins"

self.assertTrue(nobj > 0 and len(objects) == nobj)
q = self.sess.query(Collection, DataObject)
q = self.sess.query(Collection, DataObject, *{-col for col in skipped_columns})
dummy_test = [

@@ -1025,3 +1028,54 @@ d

def test_negating_columns_in_genquery1_results__issue_755(self):
columns_to_negate = {Collection.map_id, DataObject.map_id, DataObject.status, DataObject.type, DataObject.collection_id}
columns_to_negate_D = columns_to_negate & set(DataObject._columns)
columns_to_negate_C = columns_to_negate & set(Collection._columns)
cases = { (Collection, DataObject): columns_to_negate,
(Collection,): columns_to_negate_C,
(DataObject,): columns_to_negate_D, }
for requested,intersect in cases.items():
q = self.sess.query(*requested, *{-col for col in columns_to_negate}).limit(1)
row = list(q.all())[0]
# assert that columns_to_negate members don't appear in result
self.assertFalse(columns_to_negate & row.keys())
# Remove the if/continue when irods/irods #8574 is resolved.
if self.sess.server_version > (5,0,0) and len(requested) > 1:
continue
# Re-assert the positive space: that the sets of negated columns are in fact both
# (1) nonzero length, and
# (2) present in the results when not explicitly negated.
self.assertTrue(intersect)
q = self.sess.query(*requested).limit(1)
row = list(q.all())[0]
self.assertEqual(columns_to_negate & row.keys(), intersect)
def test_set_query_limit__issue_712(self):
# Test requires that 0 < num_limited_results < num_total_objects.
num_limited_results = 4
num_total_objects = num_limited_results + 6
data_objs = []
try:
# Create a number of data objects.
for name in range(num_total_objects):
data_objs.append(self.sess.data_objects.create(f'{self.coll_path}/issue_712_obj{name}'))
# Test a query limit via configuration.
with config.loadlines(
entries=[dict(setting="genquery1.irods_query_limit", value=num_limited_results)]
):
limited_results = list(self.sess.query(DataObject.id).filter(Like(DataObject.name, '%issue_712_obj%')).all())
self.assertEqual(num_limited_results, len(limited_results))
# Test the query limit is no longer in effect.
non_limited_results = list(self.sess.query(DataObject.id).filter(Like(DataObject.name, '%issue_712_obj%')).all())
self.assertEqual(num_total_objects, len(non_limited_results))
finally:
for d in data_objs:
d.unlink(force=True)
class TestSpecificQuery(unittest.TestCase):

@@ -1028,0 +1082,0 @@

#! /usr/bin/env python
from datetime import datetime as _datetime
import os

@@ -58,2 +59,22 @@ import sys

def test_create_common_username_remote_then_local__issue_764(self):
zone = None
users= []
test_zone = "remote_zone"
# TODO(#763): remove user name randomization.
test_user = "user_issue_764_" + helpers.unique_name(helpers.my_function_name(), _datetime.now())
try:
zone = self.sess.zones.create(test_zone, "remote")
users.append(
self.sess.users.create(test_user, "rodsuser", user_zone=test_zone)
)
users.append(
self.sess.users.create(test_user, "rodsuser", user_zone="")
)
self.assertEqual(2, len(list(self.sess.query(User).filter(User.name == test_user))))
finally:
for user in users:
user.remove()
if zone:
zone.remove()

@@ -60,0 +81,0 @@ if __name__ == "__main__":

import os
__version__ = "3.1.1"
__version__ = "3.2.0"

@@ -5,0 +5,0 @@

PrettyTable>=0.7.2
defusedxml
jsonpointer
jsonpatch
[tests]
unittest-xml-reporting
types-defusedxml
progressbar
types-tqdm

@@ -40,2 +40,3 @@ CHANGELOG.md

irods/auth/pam.py
irods/auth/pam_interactive.py
irods/auth/pam_password.py

@@ -80,2 +81,3 @@ irods/client_configuration/__init__.py

irods/test/meta_test.py
irods/test/pam_interactive_test.py
irods/test/pool_test.py

@@ -82,0 +84,0 @@ irods/test/query_test.py

@@ -44,5 +44,11 @@ from setuptools import setup, find_packages

"defusedxml",
"jsonpointer",
"jsonpatch",
],
extras_require={"tests": ["unittest-xml-reporting"]}, # for xmlrunner
extras_require={"tests": ["unittest-xml-reporting", # for xmlrunner
"types-defusedxml", # for type checking
"progressbar", # for type checking
"types-tqdm"] # for type checking
},
scripts=["irods/prc_write_irodsA.py"],
)

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display