Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

google-apputils

Package Overview
Dependencies
Maintainers
3
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

google-apputils - npm Package Compare versions

Comparing version
0.4.0
to
0.4.1
+1
-1
google_apputils.egg-info/PKG-INFO
Metadata-Version: 1.0
Name: google-apputils
Version: 0.4.0
Version: 0.4.1
Summary: UNKNOWN

@@ -5,0 +5,0 @@ Home-page: http://code.google.com/p/google-apputils-python

@@ -1,3 +0,3 @@

python-dateutil>=1.4,<2
python-dateutil>=1.4
python-gflags>=1.4
pytz>=2010

@@ -1,4 +0,2 @@

LICENSE
README
ez_setup.py
setup.py

@@ -25,19 +23,2 @@ google/__init__.py

google_apputils.egg-info/requires.txt
google_apputils.egg-info/top_level.txt
tests/__init__.py
tests/app_test.py
tests/app_test_helper.py
tests/app_unittest.sh
tests/appcommands_example.py
tests/appcommands_unittest.sh
tests/basetest_sh_test.sh
tests/basetest_test.py
tests/datelib_unittest.py
tests/file_util_test.py
tests/humanize_test.py
tests/resources_test.py
tests/sh_test.py
tests/shellutil_unittest.py
tests/stopwatch_unittest.py
tests/data/a
tests/data/b
google_apputils.egg-info/top_level.txt
#!/usr/bin/env python
#
# Copyright 2007 Google Inc. All Rights Reserved.

@@ -216,3 +215,4 @@ #

def __init__(self, name, flag_values, command_aliases=None):
def __init__(self, name, flag_values, command_aliases=None,
all_commands_help=None, help_full=None):
"""Initialize and check whether self is actually a Cmd instance.

@@ -225,6 +225,13 @@

Args:
name: Name of the command
flag_values: FlagValues() instance that needs to be passed as
flag_values parameter to any flags registering call.
command_aliases: A list of command aliases that the command can be run as.
name: Name of the command
flag_values: FlagValues() instance that needs to be passed as
flag_values parameter to any flags registering call.
command_aliases: A list of aliases that the command can be run as.
all_commands_help: A short description of the command that is shown when
the user requests help for all commands at once.
help_full: A long description of the command and usage that is
shown when the user requests help for just this
command. If unspecified, the command's docstring is
used instead.
Raises:

@@ -236,13 +243,14 @@ AppCommandsError: if self is Cmd (Cmd is abstract)

self._command_flags = flag_values
self._all_commands_help = None
self._all_commands_help = all_commands_help
self._help_full = help_full
if type(self) is Cmd:
raise AppCommandsError('Cmd is abstract and cannot be instantiated')
def Run(self, argv):
def Run(self, unused_argv):
"""Execute the command. Must be provided by the implementing class.
Args:
argv: Remaining command line arguments after parsing flags and command
(that is a copy of sys.argv at the time of the function call with
all parsed flags removed).
unused_argv: Remaining command line arguments after parsing flags and
command (in other words, a copy of sys.argv at the time of
the function call with all parsed flags removed).

@@ -254,3 +262,3 @@ Returns:

Raises:
AppCommandsError: Always as in must be overwritten
AppCommandsError: Always, as in must be overwritten
"""

@@ -280,4 +288,5 @@ raise AppCommandsError('%s.%s.Run() is not implemented' % (

exitcode=None):
AppcommandsUsage(shorthelp, writeto_stdout, detailed_error, exitcode=1,
show_cmd=self._command_name, show_global_flags=True)
AppcommandsUsage(shorthelp, writeto_stdout, detailed_error,
exitcode=exitcode, show_cmd=self._command_name,
show_global_flags=True)
app.usage = ReplacementAppUsage

@@ -332,2 +341,4 @@ # Parse flags and restore app.usage afterwards

return flags.DocToHelp(self._all_commands_help)
elif self._help_full is not None:
return flags.DocToHelp(self._help_full)
elif self.__doc__:

@@ -346,3 +357,11 @@ return flags.DocToHelp(self.__doc__)

def CommandGetName(self):
"""Get name of command.
Returns:
Command name.
"""
return self._command_name
class _FunctionalCmd(Cmd):

@@ -355,4 +374,3 @@ """Class to wrap functions as CMD instances.

def __init__(self, name, flag_values, cmd_func, all_commands_help=None,
**kargs):
def __init__(self, name, flag_values, cmd_func, **kargs):
"""Create a functional command.

@@ -365,34 +383,10 @@

cmd_func: Function to call when command is to be executed.
**kargs: Additional keyword arguments to be passed to the
superclass constructor.
"""
Cmd.__init__(self, name, flag_values, **kargs)
self._all_commands_help = all_commands_help
if 'help_full' not in kargs:
kargs['help_full'] = cmd_func.__doc__
super(_FunctionalCmd, self).__init__(name, flag_values, **kargs)
self._cmd_func = cmd_func
def CommandGetHelp(self, unused_argv, cmd_names=None):
"""Get help for command.
Args:
unused_argv: Remaining command line flags and arguments after parsing
command (that is a copy of sys.argv at the time of the
function call with all parsed flags removed); unused in this
implementation.
cmd_names: By default, if help is being shown for more than one command,
and this command defines _all_commands_help, then
_all_commands_help will be displayed instead of the class
doc. cmd_names is used to determine the number of commands
being displayed and if only a single command is display then
the class doc is returned.
Returns:
__doc__ property for command function or a message stating there is no
help.
"""
if (type(cmd_names) is list and len(cmd_names) > 1 and
self._all_commands_help is not None):
return flags.DocToHelp(self._all_commands_help)
if self._cmd_func.__doc__ is not None:
return flags.DocToHelp(self._cmd_func.__doc__)
else:
return 'No help available'
def Run(self, argv):

@@ -412,3 +406,3 @@ """Execute the command with given arguments.

def _AddCmdInstance(command_name, cmd, command_aliases=None):
def _AddCmdInstance(command_name, cmd, command_aliases=None, **_):
"""Add a command from a Cmd instance.

@@ -422,5 +416,4 @@

Raises:
AppCommandsError: is command is already registered OR cmd is not a subclass
of Cmd
AppCommandsError: if name is already registered OR name is not a string OR
AppCommandsError: If cmd is not a subclass of Cmd.
AppCommandsError: If name is already registered OR name is not a string OR
name is too short OR name does not start with a letter OR

@@ -451,5 +444,3 @@ name contains any non alphanumeric characters besides

Raises:
AppCommandsError: is command is already registered OR cmd is not a subclass
of Cmd
AppCommandsError: if name is already registered OR name is not a string OR
AppCommandsError: If name is already registered OR name is not a string OR
name is too short OR name does not start with a letter OR

@@ -473,3 +464,3 @@ name contains any non alphanumeric characters besides

def AddCmd(command_name, cmd_factory, **kargs):
def AddCmd(command_name, cmd_factory, **kwargs):
"""Add a command from a Cmd subclass or factory.

@@ -482,3 +473,5 @@

of Cmd.
command_aliases: A list of command aliases that the command can be run as.
**kwargs: Additional keyword arguments to be passed to the
cmd_factory at initialization. Also passed to
_AddCmdInstance to catch command_aliases.

@@ -488,3 +481,3 @@ Raises:

"""
cmd = cmd_factory(command_name, flags.FlagValues(), **kargs)
cmd = cmd_factory(command_name, flags.FlagValues(), **kwargs)

@@ -494,3 +487,3 @@ if not isinstance(cmd, Cmd):

_AddCmdInstance(command_name, cmd, **kargs)
_AddCmdInstance(command_name, cmd, **kwargs)

@@ -517,3 +510,3 @@

all_commands_help=all_commands_help),
command_aliases)
command_aliases=command_aliases)

@@ -527,2 +520,14 @@

def __init__(self, *args, **kwargs):
if 'help_full' not in kwargs:
kwargs['help_full'] = (
'Help for all or selected command:\n'
'\t%(prog)s help [<command>]\n\n'
'To retrieve help with global flags:\n'
'\t%(prog)s --help\n\n'
'To retrieve help with flags only from the main module:\n'
'\t%(prog)s --helpshort [<command>]\n\n'
% {'prog': GetAppBasename()})
super(_CmdHelp, self).__init__(*args, **kwargs)
def Run(self, argv):

@@ -560,14 +565,3 @@ """Execute help command.

def CommandGetHelp(self, unused_argv, cmd_names=None):
"""Returns: Help for command."""
cmd_help = ('Help for all or selected command:\n'
'\t%(prog)s help [<command>]\n\n'
'To retrieve help with global flags:\n'
'\t%(prog)s --help\n\n'
'To retrieve help with flags only from the main module:\n'
'\t%(prog)s --helpshort [<command>]\n\n'
% {'prog': GetAppBasename()})
return flags.DocToHelp(cmd_help)
def GetSynopsis():

@@ -670,6 +664,7 @@ """Get synopsis for program.

cmd_help = command.CommandGetHelp(GetCommandArgv(), cmd_names=cmd_names)
except Exception as error:
except Exception as error: # pylint: disable=broad-except
cmd_help = "Internal error for command '%s': %s." % (name, str(error))
cmd_help = cmd_help.strip()
all_names = ', '.join([name] + (command.CommandGetAliases() or []))
all_names = ', '.join(
[command.CommandGetName()] + (command.CommandGetAliases() or []))
if len(all_names) + 1 >= len(prefix) or not cmd_help:

@@ -676,0 +671,0 @@ # If command/alias list would reach over help block-indent

@@ -31,2 +31,3 @@ #!/usr/bin/env python

import re
import signal
import subprocess

@@ -36,6 +37,7 @@ import sys

import types
import unittest
import urlparse
try:
import faulthandler
import faulthandler # pylint: disable=g-import-not-at-top
except ImportError:

@@ -48,11 +50,3 @@ # //testing/pybase:pybase can't have deps on any extension modules as it

# unittest2 is a backport of Python 2.7's unittest for Python 2.6, so
# we don't need it if we are running 2.7 or newer.
if sys.version_info < (2, 7):
import unittest2 as unittest
else:
import unittest
from google.apputils import app
from google.apputils import app # pylint: disable=g-import-not-at-top
import gflags as flags

@@ -77,2 +71,3 @@ from google.apputils import shellutil

def _GetDefaultTestTmpdir():
"""Get default test temp dir."""
tmpdir = os.environ.get('TEST_TMPDIR', '')

@@ -99,209 +94,32 @@ if not tmpdir:

class BeforeAfterTestCaseMeta(type):
# We might need to monkey-patch TestResult so that it stops considering an
# unexpected pass as a as a "successful result". For details, see
# http://bugs.python.org/issue20165
def _MonkeyPatchTestResultForUnexpectedPasses():
"""Workaround for <http://bugs.python.org/issue20165>."""
"""Adds setUpTestCase() and tearDownTestCase() methods.
# pylint: disable=g-doc-return-or-yield,g-doc-args,g-wrong-blank-lines
These may be needed for setup and teardown of shared fixtures usually because
such fixtures are expensive to setup and teardown (eg Perforce clients). When
using such fixtures, care should be taken to keep each test as independent as
possible (eg via the use of sandboxes).
def wasSuccessful(self):
"""Tells whether or not this result was a success.
Example:
Any unexpected pass is to be counted as a non-success.
"""
return (len(self.failures) == len(self.errors) ==
len(self.unexpectedSuccesses) == 0)
class MyTestCase(basetest.TestCase):
# pylint: enable=g-doc-return-or-yield,g-doc-args,g-wrong-blank-lines
__metaclass__ = basetest.BeforeAfterTestCaseMeta
test_result = unittest.result.TestResult()
test_result.addUnexpectedSuccess('test')
if test_result.wasSuccessful(): # The bug is present.
unittest.result.TestResult.wasSuccessful = wasSuccessful
if test_result.wasSuccessful(): # Warn the user if our hot-fix failed.
sys.stderr.write('unittest.result.TestResult monkey patch to report'
' unexpected passes as failures did not work.\n')
@classmethod
def setUpTestCase(cls):
cls._resource = foo.ReallyExpensiveResource()
@classmethod
def tearDownTestCase(cls):
cls._resource.Destroy()
_MonkeyPatchTestResultForUnexpectedPasses()
def testSomething(self):
self._resource.Something()
...
"""
_test_loader = unittest.defaultTestLoader
def __init__(cls, name, bases, dict):
super(BeforeAfterTestCaseMeta, cls).__init__(name, bases, dict)
# Notes from mtklein
# This code can be tricky to think about. Here are a few things to remember
# as you read through it.
# When inheritance is involved, this __init__ is called once on each class
# in the inheritance chain when that class is defined. In a typical
# scenario where a BaseClass inheriting from TestCase declares the
# __metaclass__ and SubClass inherits from BaseClass, __init__ will be first
# called with cls=BaseClass when BaseClass is defined, and then called later
# with cls=SubClass when SubClass is defined.
# To know when to call setUpTestCase and tearDownTestCase, this class wraps
# the setUp, tearDown, and test* methods in a TestClass. We'd like to only
# wrap those methods in the leaves of the inheritance tree, but we can't
# know when we're a leaf at wrapping time. So instead we wrap all the
# setUp, tearDown, and test* methods, but code them so that we only do the
# counting we want at the leaves, which we *can* detect when we've got an
# actual instance to look at --- i.e. self, when a method is running.
# Because we're wrapping at every level of inheritance, some methods get
# wrapped multiple times down the inheritance chain; if SubClass were to
# inherit, say, setUp or testFoo from BaseClass, that method would be
# wrapped twice, first by BaseClass then by SubClass. That's OK, because we
# ensure that the extra code we inject with these wrappers is idempotent.
# test_names are the test methods this class can see.
test_names = set(cls._test_loader.getTestCaseNames(cls))
# Each class keeps a set of the tests it still has to run. When it's empty,
# we know we should call tearDownTestCase. For now, it holds the sentinel
# value of None, acting as a indication that we need to call setUpTestCase,
# which fills in the actual tests to run.
cls.__tests_to_run = None
# These calls go through and monkeypatch various methods, in no particular
# order.
BeforeAfterTestCaseMeta.SetSetUpAttr(cls, test_names)
BeforeAfterTestCaseMeta.SetTearDownAttr(cls)
BeforeAfterTestCaseMeta.SetTestMethodAttrs(cls, test_names)
BeforeAfterTestCaseMeta.SetBeforeAfterTestCaseAttr()
# Just a little utility function to help with monkey-patching.
@staticmethod
def SetMethod(cls, method_name, replacement):
"""Like setattr, but also preserves name, doc, and module metadata."""
original = getattr(cls, method_name)
replacement.__name__ = original.__name__
replacement.__doc__ = original.__doc__
replacement.__module__ = original.__module__
setattr(cls, method_name, replacement)
@staticmethod
def SetSetUpAttr(cls, test_names):
"""Wraps setUp() with per-class setUp() functionality."""
# Remember that SetSetUpAttr is eventually called on each class in the
# inheritance chain. This line can be subtle because of inheritance. Say
# we've got BaseClass that defines setUp, and SubClass inheriting from it
# that doesn't define setUp. This method will run twice, and both times
# cls_setUp will be BaseClass.setUp. This is one of the tricky cases where
# setUp will be wrapped multiple times.
cls_setUp = cls.setUp
# We create a new setUp method that first checks to see if we need to run
# setUpTestCase (looking for the __tests_to_run==None flag), and then runs
# the original setUp method.
def setUp(self):
"""Function that will encapsulate and replace cls.setUp()."""
# This line is unassuming but crucial to making this whole system work.
# It sets leaf to the class of the instance we're currently testing. That
# is, leaf is going to be a leaf class. It's not necessarily the same
# class as the parameter cls that's being passed in. For example, in the
# case above where setUp is in BaseClass, when we instantiate a SubClass
# and call setUp, we need leaf to be pointing at the class SubClass.
leaf = self.__class__
# The reason we want to do this is that it makes sure setUpTestCase is
# only run once, not once for each class down the inheritance chain. When
# multiply-wrapped, this extra code is called multiple times. In the
# running example:
#
# 1) cls=BaseClass: replace BaseClass' setUp with a wrapped setUp
# 2) cls=SubClass: set SubClass.setUp to what it thinks was its original
# setUp --- the wrapped setUp from 1)
#
# So it's double-wrapped, but that's OK. When we actually call setUp from
# an instance, we're calling the double-wrapped method. It sees
# __tests_to_run is None and fills that in. Then it calls what it thinks
# was its original setUp, the singly-wrapped setUp from BaseClass. The
# singly-wrapped setUp *skips* the if-statement, as it sees
# leaf.__tests_to_run is not None now. It just runs the real, original
# setUp().
# test_names is passed in from __init__, and holds all the test cases that
# cls can see. In the BaseClass call, that's probably the empty set, and
# for SubClass it'd have your test methods.
if leaf.__tests_to_run is None:
leaf.__tests_to_run = set(test_names)
self.setUpTestCase()
cls_setUp(self)
# Monkeypatch our new setUp method into the place of the original.
BeforeAfterTestCaseMeta.SetMethod(cls, 'setUp', setUp)
@staticmethod
def SetTearDownAttr(cls):
"""Wraps tearDown() with per-class tearDown() functionality."""
# This is analagous to SetSetUpAttr, except of course it's patching tearDown
# to run tearDownTestCase when there are no more tests to run. All the same
# hairy logic applies.
cls_tearDown = cls.tearDown
def tearDown(self):
"""Function that will encapsulate and replace cls.tearDown()."""
cls_tearDown(self)
leaf = self.__class__
# We need to make sure that tearDownTestCase is only run when
# we're executing this in the leaf class, so we need the
# explicit leaf == cls check below.
if (leaf.__tests_to_run is not None
and not leaf.__tests_to_run
and leaf == cls):
leaf.__tests_to_run = None
self.tearDownTestCase()
BeforeAfterTestCaseMeta.SetMethod(cls, 'tearDown', tearDown)
@staticmethod
def SetTestMethodAttrs(cls, test_names):
"""Makes each test method first remove itself from the remaining set."""
# This makes each test case remove itself from the set of remaining tests.
# You might think that this belongs more logically in tearDown, and I'd
# agree except that tearDown doesn't know what test case it's tearing down!
# Instead we have the test method itself remove itself before attempting the
# test.
# Note that having the test remove itself after running doesn't work, as we
# never get to 'after running' for tests that fail.
# Like setUp and tearDown, the test case could conceivably be wrapped
# twice... but as noted it's an implausible situation to have an actual test
# defined in a base class. Just in case, we take the same precaution by
# looking in only the leaf class' set of __tests_to_run, and using discard()
# instead of remove() to make the operation idempotent.
# The closure here makes sure that each new test() function remembers its
# own values of cls_test and test_name. Without this, they'd all point to
# the values from the last iteration of the loop, causing some arbitrary
# test method to run multiple times and the others never. :(
def test_closure(cls_test, test_name):
def test(self, *args, **kargs):
leaf = self.__class__
leaf.__tests_to_run.discard(test_name)
return cls_test(self, *args, **kargs)
return test
for test_name in test_names:
cls_test = getattr(cls, test_name)
BeforeAfterTestCaseMeta.SetMethod(
cls, test_name, test_closure(cls_test, test_name))
@staticmethod
def SetBeforeAfterTestCaseAttr():
# This just makes sure every TestCase has a setUpTestCase or
# tearDownTestCase, so that you can safely define only one or neither of
# them if you want.
TestCase.setUpTestCase = lambda self: None
TestCase.tearDownTestCase = lambda self: None
class TestCase(unittest.TestCase):

@@ -340,2 +158,42 @@ """Extension of unittest.TestCase providing more powerful assertions."""

def assertStartsWith(self, actual, expected_start):
"""Assert that actual.startswith(expected_start) is True.
Args:
actual: str
expected_start: str
"""
if not actual.startswith(expected_start):
self.fail('%r does not start with %r' % (actual, expected_start))
def assertNotStartsWith(self, actual, unexpected_start):
"""Assert that actual.startswith(unexpected_start) is False.
Args:
actual: str
unexpected_start: str
"""
if actual.startswith(unexpected_start):
self.fail('%r does start with %r' % (actual, unexpected_start))
def assertEndsWith(self, actual, expected_end):
"""Assert that actual.endswith(expected_end) is True.
Args:
actual: str
expected_end: str
"""
if not actual.endswith(expected_end):
self.fail('%r does not end with %r' % (actual, expected_end))
def assertNotEndsWith(self, actual, unexpected_end):
"""Assert that actual.endswith(unexpected_end) is False.
Args:
actual: str
unexpected_end: str
"""
if actual.endswith(unexpected_end):
self.fail('%r does end with %r' % (actual, unexpected_end))
def assertSequenceStartsWith(self, prefix, whole, msg=None):

@@ -395,2 +253,16 @@ """An equality assertion for the beginning of ordered sequences.

def assertNoCommonElements(self, expected_seq, actual_seq, msg=None):
"""Checks whether actual iterable and expected iterable are disjoint."""
common = set(expected_seq) & set(actual_seq)
if not common:
return
common_msg = 'Common elements %s\nExpected: %s\nActual: %s' % (
common, expected_seq, actual_seq)
if msg:
msg += ': %s' % common_msg
else:
msg = common_msg
self.fail(msg)
# TODO(user): Provide an assertItemsEqual method when our super class

@@ -400,2 +272,65 @@ # does not provide one. That method went away in Python 3.2 (renamed

def assertItemsEqual(self, *args, **kwargs):
# pylint: disable=g-doc-args
"""An unordered sequence specific comparison.
It asserts that actual_seq and expected_seq have the same element counts.
Equivalent to::
self.assertEqual(Counter(iter(actual_seq)),
Counter(iter(expected_seq)))
Asserts that each element has the same count in both sequences.
Example:
- [0, 1, 1] and [1, 0, 1] compare equal.
- [0, 0, 1] and [0, 1] compare unequal.
Args:
expected_seq: A sequence containing elements we are expecting.
actual_seq: The sequence that we are testing.
msg: The message to be printed if the test fails.
"""
# pylint: enable=g-doc-args
# In Python 3k this method is called assertCountEqual()
if sys.version_info.major > 2:
self.assertItemsEqual = super(TestCase, self).assertCountEqual
self.assertItemsEqual(*args, **kwargs)
return
# For Python 2.x we must check for the issue below
super_assert_items_equal = super(TestCase, self).assertItemsEqual
try:
super_assert_items_equal([23], []) # Force a fail to check behavior.
except self.failureException as error_to_introspect:
if 'First has 0, Second has 1: 23' in str(error_to_introspect):
# It exhibits http://bugs.python.org/issue14832
# Always use our repaired method that swaps the arguments.
self.assertItemsEqual = self._FixedAssertItemsEqual
else:
# It exhibits correct behavior. Always use the super's method.
self.assertItemsEqual = super_assert_items_equal
# Delegate this call to the correct method. All future calls will skip
# this error patching code.
self.assertItemsEqual(*args, **kwargs)
assert 'Impossible: TestCase assertItemsEqual is broken.'
def _FixedAssertItemsEqual(self, expected_seq, actual_seq, msg=None):
"""A version of assertItemsEqual that works around issue14832."""
super(TestCase, self).assertItemsEqual(actual_seq, expected_seq, msg=msg)
def assertCountEqual(self, *args, **kwargs):
# pylint: disable=g-doc-args
"""An unordered sequence specific comparison.
Equivalent to assertItemsEqual(). This method is a compatibility layer
for Python 3k, since 2to3 does not convert assertItemsEqual() calls into
assertCountEqual() calls.
Args:
expected_seq: A sequence containing elements we are expecting.
actual_seq: The sequence that we are testing.
msg: The message to be printed if the test fails.
"""
# pylint: enable=g-doc-args
self.assertItemsEqual(*args, **kwargs)
def assertSameElements(self, expected_seq, actual_seq, msg=None):

@@ -422,2 +357,10 @@ """Assert that two sequences have the same elements (in any order).

# alone.
# Fail on strings: empirically, passing strings to this test method
# is almost always a bug. If comparing the character sets of two strings
# is desired, cast the inputs to sets or lists explicitly.
if (isinstance(expected_seq, basestring) or
isinstance(actual_seq, basestring)):
self.fail('Passing a string to assertSameElements is usually a bug. '
'Did you mean to use assertEqual?\n'
'Expected: %s\nActual: %s' % (expected_seq, actual_seq))
try:

@@ -475,2 +418,3 @@ expected = dict([(element, None) for element in expected_seq])

def assertRegexMatch(self, actual_str, regexes, message=None):
# pylint: disable=g-doc-bad-indent
"""Asserts that at least one regex in regexes matches str.

@@ -507,2 +451,3 @@

"""
# pylint: enable=g-doc-bad-indent
if isinstance(regexes, basestring):

@@ -609,5 +554,23 @@ self.fail('regexes is a string; use assertRegexpMatches instead.')

class _AssertRaisesContext(object):
def __init__(self, expected_exception, test_case, test_func):
self.expected_exception = expected_exception
self.test_case = test_case
self.test_func = test_func
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
if exc_type is None:
self.test_case.fail(self.expected_exception.__name__ + ' not raised')
if not issubclass(exc_type, self.expected_exception):
return False
self.test_func(exc_value)
return True
def assertRaisesWithPredicateMatch(self, expected_exception, predicate,
callable_obj, *args,
**kwargs):
callable_obj=None, *args, **kwargs):
# pylint: disable=g-doc-args
"""Asserts that exception is thrown and predicate(exception) is true.

@@ -622,19 +585,32 @@

kwargs: Extra keyword args.
Returns:
A context manager if callable_obj is None. Otherwise, None.
Raises:
self.failureException if callable_obj does not raise a macthing exception.
"""
try:
callable_obj(*args, **kwargs)
except expected_exception as err:
# pylint: enable=g-doc-args
def Check(err):
self.assert_(predicate(err),
'%r does not match predicate %r' % (err, predicate))
else:
self.fail(expected_exception.__name__ + ' not raised')
context = self._AssertRaisesContext(expected_exception, self, Check)
if callable_obj is None:
return context
with context:
callable_obj(*args, **kwargs)
def assertRaisesWithLiteralMatch(self, expected_exception,
expected_exception_message, callable_obj,
*args, **kwargs):
expected_exception_message,
callable_obj=None, *args, **kwargs):
# pylint: disable=g-doc-args
"""Asserts that the message in a raised exception equals the given string.
Unlike assertRaisesWithRegexpMatch this method takes a literal string, not
Unlike assertRaisesRegexp, this method takes a literal string, not
a regular expression.
with self.assertRaisesWithLiteralMatch(ExType, 'message'):
DoSomething()
Args:

@@ -645,9 +621,14 @@ expected_exception: Exception class expected to be raised.

equal str(e).
callable_obj: Function to be called.
callable_obj: Function to be called, or None to return a context.
args: Extra args.
kwargs: Extra kwargs.
Returns:
A context manager if callable_obj is None. Otherwise, None.
Raises:
self.failureException if callable_obj does not raise a macthing exception.
"""
try:
callable_obj(*args, **kwargs)
except expected_exception as err:
# pylint: enable=g-doc-args
def Check(err):
actual_exception_message = str(err)

@@ -659,7 +640,12 @@ self.assert_(expected_exception_message == actual_exception_message,

actual_exception_message))
else:
self.fail(expected_exception.__name__ + ' not raised')
context = self._AssertRaisesContext(expected_exception, self, Check)
if callable_obj is None:
return context
with context:
callable_obj(*args, **kwargs)
def assertRaisesWithRegexpMatch(self, expected_exception, expected_regexp,
callable_obj, *args, **kwargs):
callable_obj=None, *args, **kwargs):
# pylint: disable=g-doc-args
"""Asserts that the message in a raised exception matches the given regexp.

@@ -674,14 +660,16 @@

found in error message.
callable_obj: Function to be called.
callable_obj: Function to be called, or None to return a context.
args: Extra args.
kwargs: Extra keyword args.
Returns:
A context manager if callable_obj is None. Otherwise, None.
Raises:
self.failureException if callable_obj does not raise a macthing exception.
"""
# TODO(user): this is a good candidate for a global
# search-and-replace.
self.assertRaisesRegexp(
expected_exception,
expected_regexp,
callable_obj,
*args,
**kwargs)
# pylint: enable=g-doc-args
# TODO(user): this is a good candidate for a global search-and-replace.
return self.assertRaisesRegexp(expected_exception, expected_regexp,
callable_obj, *args, **kwargs)

@@ -714,3 +702,30 @@ def assertContainsInOrder(self, strings, target):

def assertContainsSubsequence(self, container, subsequence):
"""Assert that "container" contains "subsequence" as a subsequence.
Asserts that big_list contains all the elements of small_list, in order, but
possibly with other elements interspersed. For example, [1, 2, 3] is a
subsequence of [0, 0, 1, 2, 0, 3, 0] but not of [0, 0, 1, 3, 0, 2, 0].
Args:
container: the list we're testing for subsequence inclusion.
subsequence: the list we hope will be a subsequence of container.
"""
first_nonmatching = None
reversed_container = list(reversed(container))
subsequence = list(subsequence)
for e in subsequence:
if e not in reversed_container:
first_nonmatching = e
break
while e != reversed_container.pop():
pass
if first_nonmatching is not None:
self.fail('%s not a subsequence of %s. First non-matching element: %s' %
(subsequence, container, first_nonmatching))
def assertTotallyOrdered(self, *groups):
# pylint: disable=g-doc-args
"""Asserts that total ordering has been implemented correctly.

@@ -741,7 +756,7 @@

[1], # Integers sort earlier.
['foo'], # As do strings.
[A(1, 'a')],
[A(2, 'b')], # 2 is after 1.
[A(2, 'c'), A(2, 'd')], # The second argument is irrelevant.
[A(3, 'z')])
[A(3, 'c'), A(3, 'd')], # The second argument is irrelevant.
[A(4, 'z')],
['foo']) # Strings sort last.

@@ -754,2 +769,3 @@ Args:

"""
# pylint: enable=g-doc-args

@@ -822,12 +838,12 @@ def CheckOrder(small, big):

def Sorted(iterable):
def Sorted(list_of_items):
try:
return sorted(iterable) # In 3.3, unordered objects are possible.
return sorted(list_of_items) # In 3.3, unordered are possible.
except TypeError:
return list(iterable)
return list_of_items
if a == b:
return
a_items = Sorted(a.iteritems())
b_items = Sorted(b.iteritems())
a_items = Sorted(list(a.iteritems()))
b_items = Sorted(list(b.iteritems()))

@@ -854,6 +870,10 @@ unexpected = []

if a_key not in b:
unexpected.append((a_key, a_value))
missing.append((a_key, a_value))
elif a_value != b[a_key]:
different.append((a_key, a_value, b[a_key]))
for b_key, b_value in b_items:
if b_key not in a:
unexpected.append((b_key, b_value))
if unexpected:

@@ -871,5 +891,2 @@ message.append(

for b_key, b_value in b_items:
if b_key not in a:
missing.append((b_key, b_value))
if missing:

@@ -892,4 +909,5 @@ message.append(

sorted(parsed_b.params.split(';')))
self.assertDictEqual(urlparse.parse_qs(parsed_a.query),
urlparse.parse_qs(parsed_b.query))
self.assertDictEqual(
urlparse.parse_qs(parsed_a.query, keep_blank_values=True),
urlparse.parse_qs(parsed_b.query, keep_blank_values=True))

@@ -1118,2 +1136,13 @@ def assertSameStructure(self, a, b, aname='a', bname='b', msg=None):

def _StopCapturingStream(stream):
"""Stops capturing the given output stream.
Args:
stream: Should be sys.stdout or sys.stderr.
"""
assert _captured_streams.has_key(stream)
for cap_stream in _captured_streams.itervalues():
cap_stream.StopCapture()
def _DiffTestOutput(stream, golden_filename):

@@ -1126,8 +1155,5 @@ """Compare ouput of redirected stream to contents of golden file.

"""
assert _captured_streams.has_key(stream)
_StopCapturingStream(stream)
cap = _captured_streams[stream]
for cap_stream in _captured_streams.itervalues():
cap_stream.StopCapture()
try:

@@ -1155,9 +1181,37 @@ _Diff(cap.filename(), golden_filename)

# TODO(user): Make CaptureTest* be usable as context managers to easily stop
# capturing at the appropriate time to make debugging failures much easier.
class _DiffingTestOutputContext(object):
def __init__(self, diff_fn):
self._diff_fn = diff_fn
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
self._diff_fn()
return True
# Public interface
def CaptureTestStdout(outfile=''):
"""Capture the stdout stream to a file until StopCapturing() is called."""
def CaptureTestStdout(outfile='', expected_output_filepath=None):
"""Capture the stdout stream to a file.
If expected_output_filepath, then this function returns a context manager
that stops capturing and performs a diff when the context is exited.
with basetest.CaptureTestStdout(expected_output_filepath=some_filepath):
sys.stdout.print(....)
Otherwise, StopCapturing() must be called to stop capturing stdout, and then
DiffTestStdout() must be called to do the comparison.
Args:
outfile: The path to the local filesystem file to which to capture output;
if omitted, a standard filepath in --test_tmpdir will be used.
expected_output_filepath: The path to the local filesystem file containing
the expected output to be diffed against when the context is exited.
Returns:
A context manager if expected_output_filepath is specified, otherwise
None.
"""
if not outfile:

@@ -1170,6 +1224,28 @@ outfile = os.path.join(FLAGS.test_tmpdir, 'captured.out')

_CaptureTestOutput(sys.stdout, outfile)
if expected_output_filepath is not None:
return _DiffingTestOutputContext(
lambda: DiffTestStdout(expected_output_filepath))
def CaptureTestStderr(outfile=''):
"""Capture the stderr stream to a file until StopCapturing() is called."""
def CaptureTestStderr(outfile='', expected_output_filepath=None):
"""Capture the stderr stream to a file.
If expected_output_filepath, then this function returns a context manager
that stops capturing and performs a diff when the context is exited.
with basetest.CaptureTestStderr(expected_output_filepath=some_filepath):
sys.stderr.print(....)
Otherwise, StopCapturing() must be called to stop capturing stderr, and then
DiffTestStderr() must be called to do the comparison.
Args:
outfile: The path to the local filesystem file to which to capture output;
if omitted, a standard filepath in --test_tmpdir will be used.
expected_output_filepath: The path to the local filesystem file containing
the expected output, to be diffed against when the context is exited.
Returns:
A context manager if expected_output_filepath is specified, otherwise
None.
"""
if not outfile:

@@ -1182,2 +1258,5 @@ outfile = os.path.join(FLAGS.test_tmpdir, 'captured.err')

_CaptureTestOutput(sys.stderr, outfile)
if expected_output_filepath is not None:
return _DiffingTestOutputContext(
lambda: DiffTestStderr(expected_output_filepath))

@@ -1291,4 +1370,4 @@

try:
with open(lhs, 'rt') as lhs_f:
with open(rhs, 'rt') as rhs_f:
with open(lhs, 'r') as lhs_f:
with open(rhs, 'r') as rhs_f:
diff_text = ''.join(

@@ -1400,2 +1479,3 @@ difflib.unified_diff(lhs_f.readlines(), rhs_f.readlines()))

def main(*args, **kwargs):
# pylint: disable=g-doc-args
"""Executes a set of Python unit tests.

@@ -1408,2 +1488,3 @@

Args:

@@ -1413,2 +1494,3 @@ args: Positional arguments passed through to unittest.TestProgram.__init__.

"""
# pylint: enable=g-doc-args
_RunInApp(RunTests, args, kwargs)

@@ -1419,3 +1501,3 @@

"""Returns True iff app.main or app.really_start is active."""
f = sys._getframe().f_back
f = sys._getframe().f_back # pylint: disable=protected-access
app_dict = app.__dict__

@@ -1487,4 +1569,6 @@ while f:

faulthandler.enable()
except Exception as e:
except Exception as e: # pylint: disable=broad-except
sys.stderr.write('faulthandler.enable() failed %r; ignoring.\n' % e)
else:
faulthandler.register(signal.SIGTERM)
if _IsInAppMain():

@@ -1491,0 +1575,0 @@ # Save command-line flags so the side effects of FLAGS(sys.argv) can be

@@ -33,3 +33,3 @@ #!/usr/bin/env python

from dateutil import parser
import dateutil.parser
import pytz

@@ -346,5 +346,4 @@

"""
if tz is None:
return cls.Localize(cls(*(time.strptime(date_string, format)[:6])))
return tz.localize(cls(*(time.strptime(date_string, format)[:6])))
date_time = super(BaseTimestamp, cls).strptime(date_string, format)
return (tz.localize if tz else cls.Localize)(date_time)

@@ -466,4 +465,6 @@ def astimezone(self, *args, **kwargs):

try:
r = parser.parse(timestring)
except ValueError:
r = dateutil.parser.parse(timestring)
# dateutil will raise ValueError if it's an unknown format -- or
# TypeError in some cases, due to bugs.
except (TypeError, ValueError):
return None

@@ -470,0 +471,0 @@ if not r.tzinfo:

@@ -136,2 +136,37 @@ #!/usr/bin/env python

# TODO(user): remove after migration to Python 3.2.
# This context manager can be replaced with tempfile.TemporaryDirectory in
# Python 3.2 (http://bugs.python.org/issue5178,
# http://docs.python.org/dev/library/tempfile.html#tempfile.TemporaryDirectory).
@contextlib.contextmanager
def TemporaryDirectory(suffix='', prefix='tmp', base_path=None):
"""A context manager to create a temporary directory and clean up on exit.
The parameters are the same ones expected by tempfile.mkdtemp.
The directory will be securely and atomically created.
Everything under it will be removed when exiting the context.
Args:
suffix: optional suffix.
prefix: options prefix.
base_path: the base path under which to create the temporary directory.
Yields:
The absolute path of the new temporary directory.
"""
temp_dir_path = tempfile.mkdtemp(suffix, prefix, base_path)
try:
yield temp_dir_path
finally:
try:
shutil.rmtree(temp_dir_path)
except OSError, e:
if e.message == 'Cannot call rmtree on a symbolic link':
# Interesting synthetic exception made up by shutil.rmtree.
# Means we received a symlink from mkdtemp.
# Also means must clean up the symlink instead.
os.unlink(temp_dir_path)
else:
raise
def MkDirs(directory, force_mode=None):

@@ -168,3 +203,3 @@ """Makes a directory including its parent directories.

def RmDirs(dir_name):
"""Removes dir_name and every non-empty directory in dir_name.
"""Removes dir_name and every subsequently empty directory above it.

@@ -194,3 +229,3 @@ Unlike os.removedirs and shutil.rmtree, this function doesn't raise an error

except OSError, err:
if err.errno not in (errno.EACCES, errno.ENOTEMPTY):
if err.errno not in (errno.EACCES, errno.ENOTEMPTY, errno.EPERM):
raise

@@ -197,0 +232,0 @@

@@ -56,11 +56,3 @@ #!/usr/bin/env python

# In Python 2.6, int(float('nan')) intentionally raises a TypeError. This code
# attempts to futureproof us against that eventual upgrade.
try:
_IsNan = math.isnan
except AttributeError:
def _IsNan(x):
return type(x) is float and x != x
def Commas(value):

@@ -95,3 +87,3 @@ """Formats an integer with thousands-separating commas.

plural: A string, the plural form. If not specified, then simple
English rules of regular pluralization will be used.
English rules of regular pluralization will be used.

@@ -111,3 +103,3 @@ Returns:

plural: A string, the plural form. If not specified, then simple
English rules of regular pluralization will be used.
English rules of regular pluralization will be used.

@@ -167,2 +159,3 @@ Returns:

noun: A string representing a noun.
Returns:

@@ -188,2 +181,4 @@ A string containing noun prefixed with an indefinite article, e.g.,

DecimalPrefix(576012, 'bps') -> '576 kbps'
DecimalPrefix(576012, '') -> '576 k'
DecimalPrefix(576, '') -> '576'
DecimalPrefix(1574215, 'bps', 2) -> '1.6 Mbps'

@@ -199,3 +194,4 @@

quantity: A number.
unit: A string.
unit: A string, the dimension for quantity, with no multipliers (e.g.
"bps"). If quantity is dimensionless, the empty string.
precision: An integer, the minimum number of digits to display.

@@ -206,3 +202,5 @@ min_scale: minimum power of 1000 to scale to, (None = unbounded).

Returns:
A string.
A string, composed by the decimal (scaled) representation of quantity at the
required precision, possibly followed by a space, the appropriate multiplier
and the unit.
"""

@@ -221,2 +219,3 @@ return _Prefix(quantity, unit, precision, DecimalScale, min_scale=min_scale,

BinaryPrefix(576012, 'B') -> '562 KiB'
BinaryPrefix(576012, '') -> '562 Ki'

@@ -228,7 +227,10 @@ See also:

quantity: A number.
unit: A string.
unit: A string, the dimension for quantity, with no multipliers (e.g.
"B"). If quantity is dimensionless, the empty string.
precision: An integer, the minimum number of digits to display.
Returns:
A string.
A string, composed by the decimal (scaled) representation of quantity at the
required precision, possibly followed by a space, the appropriate multiplier
and the unit.
"""

@@ -243,3 +245,4 @@ return _Prefix(quantity, unit, precision, BinaryScale)

quantity: A number.
unit: A string.
unit: A string, the dimension for quantity, with no multipliers (e.g.
"bps"). If quantity is dimensionless, the empty string.
precision: An integer, the minimum number of digits to display.

@@ -252,16 +255,28 @@ scale_callable: A callable, scales the number and units.

"""
separator = ' ' if unit else ''
if not quantity:
return '0 %s' % unit
return '0%s%s' % (separator, unit)
if quantity in [float('inf'), float('-inf')] or _IsNan(quantity):
return '%f %s' % (quantity, unit)
if quantity in [float('inf'), float('-inf')] or math.isnan(quantity):
return '%f%s%s' % (quantity, separator, unit)
scaled_quantity, scaled_unit = scale_callable(quantity, unit, **args)
print_pattern = '%%.%df %%s' % max(0, (precision - int(
if scaled_unit:
separator = ' '
print_pattern = '%%.%df%%s%%s' % max(0, (precision - int(
math.log(abs(scaled_quantity), 10)) - 1))
return print_pattern % (scaled_quantity, scaled_unit)
return print_pattern % (scaled_quantity, separator, scaled_unit)
# Prefixes and corresponding min_scale and max_scale for decimal formating.
DECIMAL_PREFIXES = ('y', 'z', 'a', 'f', 'p', 'n', u'µ', 'm',
'', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
DECIMAL_MIN_SCALE = -8
DECIMAL_MAX_SCALE = 8
def DecimalScale(quantity, unit, min_scale=0, max_scale=None):

@@ -283,13 +298,9 @@ """Get the scaled value and decimal prefixed unit in a tupple.

"""
if min_scale is None or min_scale < 0:
negative_powers = ('m', u'µ', 'n', 'p', 'f', 'a', 'z', 'y')
if min_scale is not None:
negative_powers = tuple(negative_powers[0:-min_scale])
else:
negative_powers = None
if max_scale is None or max_scale > 0:
powers = ('k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
if max_scale is not None:
powers = tuple(powers[0:max_scale])
return _Scale(quantity, unit, 1000, powers, negative_powers)
if min_scale is None or min_scale < DECIMAL_MIN_SCALE:
min_scale = DECIMAL_MIN_SCALE
if max_scale is None or max_scale > DECIMAL_MAX_SCALE:
max_scale = DECIMAL_MAX_SCALE
powers = DECIMAL_PREFIXES[
min_scale - DECIMAL_MIN_SCALE:max_scale - DECIMAL_MIN_SCALE + 1]
return _Scale(quantity, unit, 1000, powers, min_scale)

@@ -314,3 +325,3 @@

def _Scale(quantity, unit, multiplier, prefixes, inverse_prefixes=None):
def _Scale(quantity, unit, multiplier, prefixes=None, min_scale=None):
"""Returns the formatted quantity and unit into a tuple.

@@ -323,26 +334,22 @@

prefixes: A sequence of strings.
inverse_prefixes: Prefixes to use for negative powers,
If empty or None, no scaling is done for fractions.
If empty or None, no scaling is done.
min_scale: minimum power of multiplier corresponding to the first prefix.
If None assumes prefixes are for positive powers only.
Returns:
A tuple containing the raw scaled quantity and the prefixed unit.
A tuple containing the raw scaled quantity (float) and the prefixed unit.
"""
if (not prefixes or not quantity or math.isnan(quantity) or
quantity in [float('inf'), float('-inf')]):
return float(quantity), unit
if not quantity:
return 0, unit
if quantity in [float('inf'), float('-inf')] or _IsNan(quantity):
return quantity, unit
power = int(math.log(abs(quantity), multiplier))
if abs(quantity) > 1 or not inverse_prefixes:
if power < 1:
return quantity, unit
power = min(power, len(prefixes))
prefix = prefixes[power - 1]
value = float(quantity) / multiplier ** power
else:
power = min(-power + 1, len(inverse_prefixes))
prefix = inverse_prefixes[power - 1]
value = quantity * multiplier ** power
if min_scale is None:
min_scale = 0
prefixes = ('',) + tuple(prefixes)
value, prefix = quantity, ''
for power, prefix in enumerate(prefixes, min_scale):
# This is more numerically accurate than '/ multiplier ** power'.
value = float(quantity) * multiplier ** -power
if abs(value) < multiplier:
break
return value, prefix + unit

@@ -470,1 +477,46 @@

return segments
def UnixTimestamp(unix_ts, tz):
"""Format a UNIX timestamp into a human-readable string.
Args:
unix_ts: UNIX timestamp (number of seconds since epoch). May be a floating
point number.
tz: datetime.tzinfo object, timezone to use when formatting. Typical uses
might want to rely on datelib or pytz to provide the tzinfo object, e.g.
use datelib.UTC, datelib.US_PACIFIC, or pytz.timezone('Europe/Dublin').
Returns:
Formatted string like '2013-11-17 11:08:27.720000 PST'.
"""
date_time = datetime.datetime.fromtimestamp(unix_ts, tz)
return date_time.strftime('%Y-%m-%d %H:%M:%S.%f %Z')
def AddOrdinalSuffix(value):
"""Adds an ordinal suffix to a non-negative integer (e.g. 1 -> '1st').
Args:
value: A non-negative integer.
Returns:
A string containing the integer with a two-letter ordinal suffix.
"""
if value < 0 or value != int(value):
raise ValueError('argument must be a non-negative integer: %s' % value)
if value % 100 in (11, 12, 13):
suffix = 'th'
else:
rem = value % 10
if rem == 1:
suffix = 'st'
elif rem == 2:
suffix = 'nd'
elif rem == 3:
suffix = 'rd'
else:
suffix = 'th'
return str(value) + suffix
Metadata-Version: 1.0
Name: google-apputils
Version: 0.4.0
Version: 0.4.1
Summary: UNKNOWN

@@ -5,0 +5,0 @@ Home-page: http://code.google.com/p/google-apputils-python

@@ -25,3 +25,3 @@ #!/usr/bin/env python

REQUIRE = [
"python-dateutil>=1.4,<2",
"python-dateutil>=1.4",
"python-gflags>=1.4",

@@ -54,3 +54,3 @@ "pytz>=2010",

name="google-apputils",
version="0.4.0",
version="0.4.1",
packages=find_packages(exclude=["tests"]),

@@ -57,0 +57,0 @@ namespace_packages=["google"],

#!/usr/bin/env python
"""Bootstrap setuptools installation
If you want to use setuptools in your package's setup.py, just include this
file in the same directory with it, and add this to the top of your setup.py::
from ez_setup import use_setuptools
use_setuptools()
If you want to require a specific version of setuptools, set a download
mirror, or use an alternate download directory, you can do so by supplying
the appropriate options to ``use_setuptools()``.
This file can also be run as a script to install or upgrade setuptools.
"""
import sys
DEFAULT_VERSION = "0.6c11"
DEFAULT_URL = "http://pypi.python.org/packages/%s/s/setuptools/" % sys.version[:3]
md5_data = {
'setuptools-0.6c10-py2.3.egg': 'ce1e2ab5d3a0256456d9fc13800a7090',
'setuptools-0.6c10-py2.4.egg': '57d6d9d6e9b80772c59a53a8433a5dd4',
'setuptools-0.6c10-py2.5.egg': 'de46ac8b1c97c895572e5e8596aeb8c7',
'setuptools-0.6c10-py2.6.egg': '58ea40aef06da02ce641495523a0b7f5',
'setuptools-0.6c11-py2.3.egg': '2baeac6e13d414a9d28e7ba5b5a596de',
'setuptools-0.6c11-py2.4.egg': 'bd639f9b0eac4c42497034dec2ec0c2b',
'setuptools-0.6c11-py2.5.egg': '64c94f3bf7a72a13ec83e0b24f2749b2',
'setuptools-0.6c11-py2.6.egg': 'bfa92100bd772d5a213eedd356d64086',
'setuptools-0.6c8-py2.3.egg': '50759d29b349db8cfd807ba8303f1902',
'setuptools-0.6c8-py2.4.egg': 'cba38d74f7d483c06e9daa6070cce6de',
'setuptools-0.6c8-py2.5.egg': '1721747ee329dc150590a58b3e1ac95b',
'setuptools-0.6c9-py2.3.egg': 'a83c4020414807b496e4cfbe08507c03',
'setuptools-0.6c9-py2.4.egg': '260a2be2e5388d66bdaee06abec6342a',
'setuptools-0.6c9-py2.5.egg': 'fe67c3e5a17b12c0e7c541b7ea43a8e6',
'setuptools-0.6c9-py2.6.egg': 'ca37b1ff16fa2ede6e19383e7b59245a',
}
import sys, os
try: from hashlib import md5
except ImportError: from md5 import md5
def _validate_md5(egg_name, data):
if egg_name in md5_data:
digest = md5(data).hexdigest()
if digest != md5_data[egg_name]:
print >>sys.stderr, (
"md5 validation of %s failed! (Possible download problem?)"
% egg_name
)
sys.exit(2)
return data
def use_setuptools(
version=DEFAULT_VERSION, download_base=DEFAULT_URL, to_dir=os.curdir,
download_delay=15
):
"""Automatically find/download setuptools and make it available on sys.path
`version` should be a valid setuptools version number that is available
as an egg for download under the `download_base` URL (which should end with
a '/'). `to_dir` is the directory where setuptools will be downloaded, if
it is not already available. If `download_delay` is specified, it should
be the number of seconds that will be paused before initiating a download,
should one be required. If an older version of setuptools is installed,
this routine will print a message to ``sys.stderr`` and raise SystemExit in
an attempt to abort the calling script.
"""
was_imported = 'pkg_resources' in sys.modules or 'setuptools' in sys.modules
def do_download():
egg = download_setuptools(version, download_base, to_dir, download_delay)
sys.path.insert(0, egg)
import setuptools; setuptools.bootstrap_install_from = egg
try:
import pkg_resources
except ImportError:
return do_download()
try:
pkg_resources.require("setuptools>="+version); return
except pkg_resources.VersionConflict, e:
if was_imported:
print >>sys.stderr, (
"The required version of setuptools (>=%s) is not available, and\n"
"can't be installed while this script is running. Please install\n"
" a more recent version first, using 'easy_install -U setuptools'."
"\n\n(Currently using %r)"
) % (version, e.args[0])
sys.exit(2)
except pkg_resources.DistributionNotFound:
pass
del pkg_resources, sys.modules['pkg_resources'] # reload ok
return do_download()
def download_setuptools(
version=DEFAULT_VERSION, download_base=DEFAULT_URL, to_dir=os.curdir,
delay = 15
):
"""Download setuptools from a specified location and return its filename
`version` should be a valid setuptools version number that is available
as an egg for download under the `download_base` URL (which should end
with a '/'). `to_dir` is the directory where the egg will be downloaded.
`delay` is the number of seconds to pause before an actual download attempt.
"""
import urllib2, shutil
egg_name = "setuptools-%s-py%s.egg" % (version,sys.version[:3])
url = download_base + egg_name
saveto = os.path.join(to_dir, egg_name)
src = dst = None
if not os.path.exists(saveto): # Avoid repeated downloads
try:
from distutils import log
if delay:
log.warn("""
---------------------------------------------------------------------------
This script requires setuptools version %s to run (even to display
help). I will attempt to download it for you (from
%s), but
you may need to enable firewall access for this script first.
I will start the download in %d seconds.
(Note: if this machine does not have network access, please obtain the file
%s
and place it in this directory before rerunning this script.)
---------------------------------------------------------------------------""",
version, download_base, delay, url
); from time import sleep; sleep(delay)
log.warn("Downloading %s", url)
src = urllib2.urlopen(url)
# Read/write all in one block, so we don't create a corrupt file
# if the download is interrupted.
data = _validate_md5(egg_name, src.read())
dst = open(saveto,"wb"); dst.write(data)
finally:
if src: src.close()
if dst: dst.close()
return os.path.realpath(saveto)
def main(argv, version=DEFAULT_VERSION):
"""Install or upgrade setuptools and EasyInstall"""
try:
import setuptools
except ImportError:
egg = None
try:
egg = download_setuptools(version, delay=0)
sys.path.insert(0,egg)
from setuptools.command.easy_install import main
return main(list(argv)+[egg]) # we're done here
finally:
if egg and os.path.exists(egg):
os.unlink(egg)
else:
if setuptools.__version__ == '0.0.1':
print >>sys.stderr, (
"You have an obsolete version of setuptools installed. Please\n"
"remove it from your system entirely before rerunning this script."
)
sys.exit(2)
req = "setuptools>="+version
import pkg_resources
try:
pkg_resources.require(req)
except pkg_resources.VersionConflict:
try:
from setuptools.command.easy_install import main
except ImportError:
from easy_install import main
main(list(argv)+[download_setuptools(delay=0)])
sys.exit(0) # try to force an exit
else:
if argv:
from setuptools.command.easy_install import main
main(argv)
else:
print "Setuptools version",version,"or greater has been installed."
print '(Run "ez_setup.py -U setuptools" to reinstall or upgrade.)'
def update_md5(filenames):
"""Update our built-in md5 registry"""
import re
for name in filenames:
base = os.path.basename(name)
f = open(name,'rb')
md5_data[base] = md5(f.read()).hexdigest()
f.close()
data = [" %r: %r,\n" % it for it in md5_data.items()]
data.sort()
repl = "".join(data)
import inspect
srcfile = inspect.getsourcefile(sys.modules[__name__])
f = open(srcfile, 'rb'); src = f.read(); f.close()
match = re.search("\nmd5_data = {\n([^}]+)}", src)
if not match:
print >>sys.stderr, "Internal error!"
sys.exit(2)
src = src[:match.start(1)] + repl + src[match.end(1):]
f = open(srcfile,'w')
f.write(src)
f.close()
if __name__=='__main__':
if len(sys.argv)>2 and sys.argv[1]=='--md5update':
update_md5(sys.argv[2:])
else:
main(sys.argv[1:])
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
#!/usr/bin/env python
#!/usr/bin/env python
#
# Copyright 2005 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper script used by app_unittest.sh"""
import sys
import gflags as flags
from google.apputils import app
FLAGS = flags.FLAGS
flags.DEFINE_boolean("raise_exception", False, "throw MyException from main")
class MyException(Exception):
pass
def main(args):
if FLAGS.raise_exception:
raise MyException
if __name__ == '__main__':
app.run()
#!/usr/bin/env python
# Copyright 2008 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for app.py ."""
import os
import shutil
import socket
import sys
import mox
from google.apputils import basetest
from google.apputils import app
import gflags as flags
FLAGS = flags.FLAGS
class TestFunctions(basetest.TestCase):
def testInstallExceptionHandler(self):
self.assertRaises(TypeError, app.InstallExceptionHandler, 1)
if __name__ == '__main__':
basetest.main()
#! /bin/bash
# Copyright 2003 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author: Douglas Greiman
PYTHON=$(which python)
function die {
echo "$1"
exit 1
}
APP_PACKAGE="google.apputils"
# This should exit with error code because no main defined
$PYTHON -c "from ${APP_PACKAGE} import app; app.run()" 2>/dev/null && \
die "Test 1 failed"
# Standard use. This should exit successfully
$PYTHON -c "from ${APP_PACKAGE} import app
a = 0
def main(argv):
global a
a = 1
app.run()
assert a == 1" || \
die "Test 2 failed"
# app.run called in exec block, script read from -c string. Should succeed.
$PYTHON -c "from ${APP_PACKAGE} import app
a = 0
s='''
def main(argv):
global a
a = 1
app.run()
'''
exec s
assert a == 1" || \
die "Test 4 failed"
# app.run called in exec block, script read from file. Should succeed.
PYFILE=$TEST_TMPDIR/tmp.py
cat >$PYFILE <<EOF
from ${APP_PACKAGE} import app
a = 0
s='''
def main(argv):
global a
a = 1
app.run()
'''
exec s
assert a == 1
EOF
$PYTHON $PYFILE || \
die "Test 5 failed"
rm -f $PYFILE
# Test for usage from --help
$PYTHON -c "from ${APP_PACKAGE} import app
def main(argv):
pass
app.run()
" --help | grep 'USAGE:' >/dev/null || \
die "Test 11 failed"
# Test that the usage() function works
$PYTHON -c "from ${APP_PACKAGE} import app
def main(argv):
app.usage()
app.run()
" 2>&1 | egrep '^ --' >/dev/null || \
die "Test 12 failed"
# Test that shorthelp doesn't give flags in this case.
$PYTHON -c "from ${APP_PACKAGE} import app
def main(argv):
app.usage(shorthelp=1)
app.run()
" 2>&1 | grep '^ --' >/dev/null && \
die "Test 13 failed"
# Test writeto_stdout.
$PYTHON -c "from ${APP_PACKAGE} import app
def main(argv):
app.usage(shorthelp=1, writeto_stdout=1)
app.run()
" | grep 'USAGE' >/dev/null || \
die "Test 14 failed"
# Test detailed_error
$PYTHON -c "from ${APP_PACKAGE} import app
def main(argv):
app.usage(shorthelp=1, writeto_stdout=1, detailed_error='BAZBAZ')
app.run()
" 2>&1 | grep 'BAZBAZ' >/dev/null || \
die "Test 15 failed"
# Test exitcode
$PYTHON -c "from ${APP_PACKAGE} import app
def main(argv):
app.usage(writeto_stdout=1, exitcode=1)
app.run()
" >/dev/null
if [ "$?" -ne "1" ]; then
die "Test 16 failed"
fi
# Test --help (this could use wrapping which is tested elsewhere)
$PYTHON -c "from ${APP_PACKAGE} import app
def main(argv):
print 'FAIL'
app.run()
" 2>&1 --help | grep 'USAGE: -c \[flags\]' >/dev/null || \
die "Test 17 failed"
# Test --help does not wrap for __main__.__doc__
$PYTHON -c "from ${APP_PACKAGE} import app
import sys
def main(argv):
print 'FAIL'
doc = []
for i in xrange(10):
doc.append(str(i))
doc.append('12345678 ')
sys.modules['__main__'].__doc__ = ''.join(doc)
app.run()
" 2>&1 --help | grep '712345678 812345678' >/dev/null || \
die "Test 18 failed"
# Test --help with forced wrap for __main__.__doc__
$PYTHON -c "from ${APP_PACKAGE} import app
import sys
def main(argv):
print 'FAIL'
doc = []
for i in xrange(10):
doc.append(str(i))
doc.append('12345678 ')
sys.modules['__main__'].__doc__ = ''.join(doc)
app.SetEnableHelpWrapping()
app.run()
" 2>&1 --help | grep '712345678 812345678' >/dev/null && \
die "Test 19 failed"
# Test UsageError
$PYTHON -c "from ${APP_PACKAGE} import app
def main(argv):
raise app.UsageError('You made a usage error')
app.run()
" 2>&1 | grep "You made a usage error" >/dev/null || \
die "Test 20 failed"
# Test UsageError exit code
$PYTHON -c "from ${APP_PACKAGE} import app
def main(argv):
raise app.UsageError('You made a usage error', exitcode=64)
app.run()
" > /dev/null 2>&1
if [ "$?" -ne "64" ]; then
die "Test 21 failed"
fi
# Test catching top-level exceptions. We should get the exception name on
# stderr.
./app_test_helper.py \
--raise_exception 2>&1 | grep -q 'MyException' || die "Test 23 failed"
# Test exception handlers are called
have_handler_output=$TEST_TMPDIR/handler.txt
$PYTHON -c "from ${APP_PACKAGE} import app
def main(argv):
raise ValueError('look for me')
class TestExceptionHandler(app.ExceptionHandler):
def __init__(self, msg):
self.msg = msg
def Handle(self, exc):
print '%s %s' % (self.msg, exc)
app.InstallExceptionHandler(TestExceptionHandler('first'))
app.InstallExceptionHandler(TestExceptionHandler('second'))
app.run()
" > $have_handler_output 2>&1
grep -q "first look for me" $have_handler_output || die "Test 24 failed"
grep -q "second look for me" $have_handler_output || die "Test 25 failed"
no_handler_output=$TEST_TMPDIR/no_handler.txt
# Test exception handlers are not called for "normal" exits
for exc in "SystemExit(1)" "app.UsageError('foo')"; do
$PYTHON -c "from ${APP_PACKAGE} import app
def main(argv):
raise $exc
class TestExceptionHandler(app.ExceptionHandler):
def Handle(self, exc):
print 'handler was called'
app.InstallExceptionHandler(TestExceptionHandler())
app.run()
" > $no_handler_output 2>&1
grep -q "handler was called" $no_handler_output && die "Test 26 ($exc) failed"
done
# Test --help expands docstring.
$PYTHON -c "
'''USAGE: %s [flags]'''
from ${APP_PACKAGE} import app
def main(argv): print 'FAIL'
app.run()
" --help 2>&1 |
fgrep 'USAGE: -c [flags]' >/dev/null ||
die "Test 27 failed"
# Test --help expands docstring.
$PYTHON -c "
'''USAGE: %s --fmt=\"%%s\" --fraction=50%%'''
from ${APP_PACKAGE} import app
def main(argv): print 'FAIL'
app.run()
" --help 2>&1 |
fgrep 'USAGE: -c --fmt="%s" --fraction=50%' >/dev/null ||
die "Test 28 failed"
# Test --help expands docstring.
$PYTHON -c "
'''>%s|%%s|%%%s|%%%%s|%%%%%s<'''
from ${APP_PACKAGE} import app
def main(argv): print 'FAIL'
app.run()
" --help 2>&1 |
fgrep '>-c|%s|%-c|%%s|%%-c<' >/dev/null ||
die "Test 29 failed"
# Test bad docstring.
$PYTHON -c "
'''>%@<'''
from ${APP_PACKAGE} import app
def main(argv): print 'FAIL'
app.run()
" --help 2>&1 |
fgrep '>%@<' >/dev/null ||
die "Test 30 failed"
readonly HELP_PROG="
from ${APP_PACKAGE} import app
def main(argv): print 'HI'
app.run()
"
echo "PASS"
#!/usr/bin/env python
#
# Copyright 2007 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test tool to demonstrate appcommands.py usage.
This tool shows how to use appcommands.py.
"""
from google.apputils import appcommands
import gflags as flags
FLAGS = flags.FLAGS
flags.DEFINE_string('hint', '', 'Global hint to show in commands')
# Name taken from app.py
class Test1(appcommands.Cmd):
"""Help for test1."""
def __init__(self, name, flag_values, **kargs):
"""Init and register flags specific to command."""
appcommands.Cmd.__init__(self, name, flag_values=flag_values, **kargs)
# self._all_commands_help allows you to define a different message to be
# displayed when all commands are displayed vs. the single command.
self._all_commands_help = ''
# Flag --fail1 is specific to this command
flags.DEFINE_boolean('fail1', False, 'Make test1 fail',
flag_values=flag_values)
flags.DEFINE_string('foo', '', 'Param foo', flag_values=flag_values)
flags.DEFINE_string('bar', '', 'Param bar', flag_values=flag_values)
flags.DEFINE_integer('intfoo', 0, 'Integer foo', flag_values=flag_values)
def Run(self, argv):
"""Output 'Command1' and flag info.
Args:
argv: Remaining command line arguments after parsing flags and command
Returns:
Value of flag fail1
"""
print 'Command1'
if FLAGS.hint:
print "Hint1:'%s'" % FLAGS.hint
print "Foo1:'%s'" % FLAGS.foo
print "Bar1:'%s'" % FLAGS.bar
return FLAGS.fail1 * 1
class Test2(appcommands.Cmd):
"""Help for test2."""
def __init__(self, name, flag_values):
"""Init and register flags specific to command."""
appcommands.Cmd.__init__(self, name, flag_values)
flags.DEFINE_boolean('fail2', False, 'Make test2 fail',
flag_values=flag_values)
flags.DEFINE_string('foo', '', 'Param foo', flag_values=flag_values)
flags.DEFINE_string('bar', '', 'Param bar', flag_values=flag_values)
def Run(self, argv):
"""Output 'Command2' and flag info.
Args:
argv: Remaining command line arguments after parsing flags and command
Returns:
Value of flag fail2
"""
print 'Command2'
if FLAGS.hint:
print "Hint2:'%s'" % FLAGS.hint
print "Foo2:'%s'" % FLAGS.foo
print "Bar2:'%s'" % FLAGS.bar
return FLAGS.fail2 * 1
def Test3(unused_argv):
"""Help for test3."""
print 'Command3'
def Test4(unused_argv):
"""Help for test4."""
print 'Command4'
def main(unused_argv):
"""Register the commands."""
appcommands.AddCmd('test1', Test1, command_aliases=['testalias1',
'testalias2'])
appcommands.AddCmd('test2', Test2)
appcommands.AddCmdFunc('test3', Test3)
appcommands.AddCmdFunc('test4', Test4, command_aliases=['testalias3'],
all_commands_help='')
if __name__ == '__main__':
appcommands.Run()
#! /bin/bash
# Copyright 2007 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author: mboerger@google.com
PYTHON=$(which python)
function die {
echo "$1"
exit 1
}
IMPORTS="from google.apputils import app
from google.apputils import appcommands
import gflags as flags"
# This should exit with error code because no main defined
$PYTHON -c "${IMPORTS}
appcommands.Run()" >/dev/null 2>&1 && \
die "Test 1 failed"
# Standard use. This should exit successfully
$PYTHON -c "${IMPORTS}
import sys
def test(argv):
return 0
def main(argv):
appcommands.AddCmdFunc('test', test)
appcommands.Run()
sys.exit(1)" test || \
die "Test 2 failed"
# Even with no return from Cmds Run() does not return
$PYTHON -c "${IMPORTS}
import sys
def test(argv):
return
def main(argv):
appcommands.AddCmdFunc('test', test)
appcommands.Run()
sys.exit(1)" test || \
die "Test 3 failed"
# Standard use with returning an error code.
$PYTHON -c "${IMPORTS}
import sys
def test(argv):
return 1
def main(argv):
appcommands.AddCmdFunc('test', test)
appcommands.Run()
sys.exit(0)" test && \
die "Test 4 failed"
# Executing two commands in single mode does not work (execute only first)
$PYTHON -c "${IMPORTS}
def test1(argv):
return 0
def test2(argv):
return 1
def main(argv):
appcommands.AddCmdFunc('test1', test1)
appcommands.AddCmdFunc('test2', test2)
appcommands.Run()" test1 test2 || \
die "Test 5 failed"
# Registering a command twice does not work.
$PYTHON -c "${IMPORTS}
def test1(argv):
return 0
def main(argv):
appcommands.AddCmdFunc('test', test1)
appcommands.AddCmdFunc('test', test1)
appcommands.Run()" test >/dev/null 2>&1 && \
die "Test 6 failed"
# Executing help, returns non zero return code (1), then check result
RES=`$PYTHON -c "${IMPORTS}
def test1(argv):
'''Help1'''
return 0
def test2(argv):
'''Help2'''
return 0
def main(argv):
appcommands.AddCmdFunc('test1', test1)
appcommands.AddCmdFunc('test2', test2)
appcommands.Run()" help` && die "Test 7 failed"
echo "${RES}" | grep -q "USAGE: " || die "Test 8 failed"
echo "${RES}" | sed -ne '/following commands:/,/.*/p' | \
grep -q "help, test1, test2" || die "Test 9 failed"
echo "${RES}" | grep -q -E "(^| )test1[ \t]+Help1($| )" || die "Test 10 failed"
echo "${RES}" | grep -q -E "(^| )test2[ \t]+Help2($| )" || die "Test 11 failed"
# Executing help for command, returns non zero return code (1), then check result
RES=`$PYTHON -c "${IMPORTS}
def test1(argv):
'''Help1'''
return 0
def test2(argv):
'''Help2'''
return 0
def main(argv):
appcommands.AddCmdFunc('test1', test1)
appcommands.AddCmdFunc('test2', test2)
appcommands.Run()" help test2` && die "Test 12 failed"
echo "${RES}" | grep -q "USAGE: " || die "Test 13 failed"
echo "${RES}" | grep -q -E "(^| )Any of the following commands:" && die "Test 14 failed"
echo "${RES}" | grep -q -E "(^| )test1[ \t]+" && die "Test 15 failed"
echo "${RES}" | grep -q -E "(^| )test2[ \t]+Help2($| )" || die "Test 16 failed"
# Returning False succeeds
$PYTHON -c "${IMPORTS}
def test(argv): return False
def main(argv):
appcommands.AddCmdFunc('test', test)
appcommands.Run()" test || die "Test 17 failed"
# Returning True fails
$PYTHON -c "${IMPORTS}
def test(argv): return True
def main(argv):
appcommands.AddCmdFunc('test', test)
appcommands.Run()" test && die "Test 18 failed"
# Registering using AddCmd instead of AddCmdFunc, should be the normal case
$PYTHON -c "${IMPORTS}
class test(appcommands.Cmd):
def Run(self, argv): return 0
def main(argv):
appcommands.AddCmd('test', test)
appcommands.Run()" test || die "Test 19 failed"
# Registering using AddCmd instead of AddCmdFunc, now fail
$PYTHON -c "${IMPORTS}
class test(appcommands.Cmd):
def Run(self, argv): return 1
def main(argv):
appcommands.AddCmd('test', test)
appcommands.Run()" test && die "Test 20 failed"
TEST=./appcommands_example.py
if test -s "${TEST}.py"; then
TEST="${TEST}.py"
elif test ! -s "${TEST}"; then
die "Could not locate ${TEST}"
fi
# Success
$PYTHON $TEST test1 >/dev/null 2>&1 || die "Test 21 failed"
$PYTHON $TEST test1|grep -q 'Command1' 2>&1 || die "Test 22 failed"
$PYTHON $TEST test2|grep -q 'Command2' 2>&1 || die "Test 23 failed"
$PYTHON $TEST test3|grep -q 'Command3' 2>&1 || die "Test 24 failed"
# Success, --nofail1 belongs to test1
$PYTHON $TEST test1 --nofail1 >/dev/null 2>&1 || die "Test 25 failed"
# Failure, --fail1
$PYTHON $TEST test1 --fail1 >/dev/null 2>&1 && die "Test 26 failed"
# Failure, --nofail1 does not belong to test2
$PYTHON $TEST test2 --nofail1 >/dev/null 2>&1 && die "Test 27 failed"
# Failure, --nofail1 must appear after its command
$PYTHON $TEST --nofail1 test1 >/dev/null 2>&1 && die "Test 28 failed"
# Failure, explicit from --fail2
$PYTHON $TEST test2 --fail2 >/dev/null 2>&1 && die "Test 29 failed"
# Success, --hint before command, foo shown with test1
$PYTHON $TEST --hint 'XYZ' test1|grep -q "Hint1:'XYZ'" || die "Test 30 failed"
# Success, --hint before command, foo shown with test1
$PYTHON $TEST test1 --hint 'XYZ'|grep -q "Hint1:'XYZ'" || die "Test 31 failed"
# Test for standard --help
$PYTHON $TEST --help|grep -q "following commands:" && die "Test 32 failed"
$PYTHON $TEST help|grep -q "following commands:" || die "Test 33 failed"
# No help after command
$PYTHON $TEST test1 --help|grep -q "following commands:" && die "Test 34 failed"
$PYTHON $TEST test1 --help 'XYZ'|grep -q "Hint1:'XYZ'" && die "Test 35 failed"
# Help specific to command:
$PYTHON $TEST --help test1|grep -q "following commands:" && die "Test 36 failed"
$PYTHON $TEST --help test1|grep -q "test1 *Help for test1" && die "Test 37 failed"
$PYTHON $TEST help test1|grep -q "following commands:" && die "Test 38 failed"
$PYTHON $TEST help test1|grep -q "test1, testalias1, testalias2" || die\
"Test 39 failed"
$PYTHON $TEST help testalias1|grep -q "[-]-foo" || die\
"Test 40 failed"
$PYTHON $TEST help testalias2|grep -q "[-]-foo" || die\
"Test 41 failed"
$PYTHON $TEST help test4|grep -q "^ *Help for test4" || die "Test 42 failed"
$PYTHON $TEST help testalias3|grep -q "^ *Help for test4" || die\
"Test 43 failed"
# Help for cmds with all_command_help.
$PYTHON $TEST help|grep -q "test1 *Help for test1" && die "Test 44 failed"
$PYTHON $TEST help test1|grep -q "Help for test1" || die "Test 45 failed"
$PYTHON $TEST help|grep -q "test4 *Help for test4" && die "Test 44 failed"
$PYTHON $TEST help test4|grep -q "Help for test4" || die "Test 45 failed"
# Success, --hint before command, foo shown with test1
$PYTHON $TEST --hint 'XYZ' --help|grep -q "following commands:" && die "Test 46 failed"
$PYTHON $TEST --hint 'XYZ' --help|grep -q "XYZ" && die "Test 47 failed"
$PYTHON $TEST --hint 'XYZ' --help|grep -q "This tool shows how" || die "Test 48 failed"
$PYTHON $TEST --hint 'XYZ' help|grep -q "following commands:" || die "Test 49 failed"
$PYTHON $TEST --hint 'XYZ' help|grep -q "XYZ" && die "Test 50 failed"
$PYTHON $TEST --hint 'XYZ' help|grep -q "This tool shows how" || die "Test 51 failed"
# A command name with an letters, numbers, or an underscore is fine
$PYTHON -c "${IMPORTS}
def test(argv):
return 0
def main(argv):
appcommands.AddCmdFunc('test', test)
appcommands.AddCmdFunc('test_foo', test)
appcommands.AddCmdFunc('a123', test)
appcommands.Run()" test || die "Test 52 failed"
# A command name that starts with a non-alphanumeric characters is not ok
$PYTHON -c "${IMPORTS}
def test(argv):
return 0
def main(argv):
appcommands.AddCmdFunc('123', test)
appcommands.Run()" 123 >/dev/null 2>&1 && die "Test 53 failed"
# A command name that contains other characters is not ok
$PYTHON -c "${IMPORTS}
def test(argv):
return 0
def main(argv):
appcommands.AddCmdFunc('test+1', test)
appcommands.Run()" "test+1" >/dev/null 2>&1 && die "Test 54 failed"
# If a command raises app.UsageError, usage is printed.
RES=`$PYTHON -c "${IMPORTS}
def test(argv):
'''Help1'''
raise app.UsageError('Ha-ha')
def main(argv):
appcommands.AddCmdFunc('test', test)
appcommands.Run()" test` && die "Test 55 failed"
echo "${RES}" | grep -q "USAGE: " || die "Test 56 failed"
echo "${RES}" | grep -q -E "(^| )test[ \t]+Help1($| )" || die "Test 57 failed"
echo "${RES}" | grep -q -E "(^| )Ha-ha($| )" || die "Test 58 failed"
$PYTHON -c "${IMPORTS}
class Test(appcommands.Cmd):
def Run(self, argv): return 0
def test(*args, **kwargs):
return Test(*args, **kwargs)
def main(argv):
appcommands.AddCmd('test', test)
appcommands.Run()" test || die "Test 62 failed"
# Success, default command set and correctly run.
RES=`$PYTHON -c "${IMPORTS}
class test(appcommands.Cmd):
def Run(self, argv):
print 'test running correctly'
return 0
def main(argv):
appcommands.AddCmd('test', test)
appcommands.SetDefaultCommand('test')
appcommands.Run()"` || die "Test 63 failed"
echo "${RES}" | grep -q "test running correctly" || die "Test 64 failed"
# Failure, default command set but missing.
$PYTHON -c "${IMPORTS}
class test(appcommands.Cmd):
def Run(self, argv):
print 'test running correctly'
return 0
def main(argv):
appcommands.AddCmd('test', test)
appcommands.SetDefaultCommand('missing')
appcommands.Run()" >/dev/null 2>&1 && die "Test 65 failed"
echo "PASS"
#! /bin/bash
# Copyright 2003 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author: Douglas Greiman
# Owner: unittest-team@google.com
EXE=./basetest_test.py
function die {
echo "$1"
exit $2
}
# Create directories for use
function MaybeMkdir {
for dir in $@; do
if [ ! -d "$dir" ] ; then
mkdir "$dir" || die "Unable to create $dir"
fi
done
}
# TODO(dborowitz): Clean these up if we die.
MaybeMkdir abc cba def fed ghi jkl
# Test assertListEqual, assertDictEqual, and assertSameElements
$EXE --testid=5 || die "Test 5 failed" $?
# Test assertAlmostEqual and assertNotAlmostEqual
$EXE --testid=6 || die "Test 6 failed" $?
# Invoke with no env vars and no flags
(
unset TEST_RANDOM_SEED
unset TEST_SRCDIR
unset TEST_TMPDIR
$EXE --testid=1
) || die "Test 1 failed" $?
# Invoke with env vars but no flags
(
export TEST_RANDOM_SEED=321
export TEST_SRCDIR=cba
export TEST_TMPDIR=fed
$EXE --testid=2
) || die "Test 2 failed" $?
# Invoke with no env vars and all flags
(
unset TEST_RANDOM_SEED
unset TEST_SRCDIR
unset TEST_TMPDIR
$EXE --testid=3 --test_random_seed=123 --test_srcdir=abc --test_tmpdir=def
) || die "Test 3 failed" $?
# Invoke with env vars and all flags
(
export TEST_RANDOM_SEED=321
export TEST_SRCDIR=cba
export TEST_TMPDIR=fed
$EXE --testid=4 --test_random_seed=123 --test_srcdir=abc --test_tmpdir=def
) || die "Test 4 failed" $?
# Cleanup
rm -r abc cba def fed ghi jkl
echo "Pass"

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

#!/usr/bin/env python
# Copyright 2002 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittest for datelib.py module."""
import datetime
import random
import time
import pytz
from google.apputils import basetest
from google.apputils import datelib
class TimestampUnitTest(basetest.TestCase):
seed = 1979
def testTzAwareSuccession(self):
a = datelib.Timestamp.now()
b = datelib.Timestamp.utcnow()
self.assertLessEqual(a, b)
def testTzRandomConversion(self):
random.seed(self.seed)
for unused_i in xrange(100):
stz = pytz.timezone(random.choice(pytz.all_timezones))
a = datelib.Timestamp.FromString('2008-04-12T10:00:00', stz)
b = a
for unused_j in xrange(100):
b = b.astimezone(pytz.timezone(random.choice(pytz.all_timezones)))
self.assertEqual(a, b)
random.seed()
def testMicroTimestampConversion(self):
"""Test that f1(f2(a)) == a."""
def IsEq(x):
self.assertEqual(
x, datelib.Timestamp.FromMicroTimestamp(x).AsMicroTimestamp())
IsEq(0)
IsEq(datelib.MAXIMUM_MICROSECOND_TIMESTAMP)
random.seed(self.seed)
for _ in xrange(100):
IsEq(random.randint(0, datelib.MAXIMUM_MICROSECOND_TIMESTAMP))
def testMicroTimestampKnown(self):
self.assertEqual(0, datelib.Timestamp.FromString(
'1970-01-01T00:00:00', pytz.UTC).AsMicroTimestamp())
self.assertEqual(
datelib.MAXIMUM_MICROSECOND_TIMESTAMP,
datelib.MAXIMUM_MICROSECOND_TIMESTAMP_AS_TS.AsMicroTimestamp())
def testMicroTimestampOrdering(self):
"""Test that cmp(a, b) == cmp(f1(a), f1(b))."""
def IsEq(a, b):
self.assertEqual(
cmp(a, b),
cmp(datelib.Timestamp.FromMicroTimestamp(a),
datelib.Timestamp.FromMicroTimestamp(b)))
random.seed(self.seed)
for unused_i in xrange(100):
IsEq(
random.randint(0, datelib.MAXIMUM_MICROSECOND_TIMESTAMP),
random.randint(0, datelib.MAXIMUM_MICROSECOND_TIMESTAMP))
def testCombine(self):
for tz in (datelib.UTC, datelib.US_PACIFIC):
self.assertEqual(
datelib.Timestamp(1970, 1, 1, 0, 0, 0, 0, tz),
datelib.Timestamp.combine(
datelib.datetime.date(1970, 1, 1),
datelib.datetime.time(0, 0, 0),
tz))
self.assertEqual(
datelib.Timestamp(9998, 12, 31, 23, 59, 59, 999999, tz),
datelib.Timestamp.combine(
datelib.datetime.date(9998, 12, 31),
datelib.datetime.time(23, 59, 59, 999999),
tz))
def testFromString1(self):
for string_zero in (
'1970-01-01 00:00:00',
'19700101T000000',
'1970-01-01T00:00:00'
):
for testtz in (datelib.UTC, datelib.US_PACIFIC):
self.assertEqual(
datelib.Timestamp.FromString(string_zero, testtz),
datelib.Timestamp(1970, 1, 1, 0, 0, 0, 0, testtz))
self.assertEqual(
datelib.Timestamp.FromString(
'1970-01-01T00:00:00+0000', datelib.US_PACIFIC),
datelib.Timestamp(1970, 1, 1, 0, 0, 0, 0, datelib.UTC))
startdate = datelib.Timestamp(2009, 1, 1, 3, 0, 0, 0, datelib.US_PACIFIC)
for day in xrange(1, 366):
self.assertEqual(
datelib.Timestamp.FromString(startdate.isoformat()),
startdate,
'FromString works for day %d since 2009-01-01' % day)
startdate += datelib.datetime.timedelta(days=1)
def testFromString2(self):
"""Test correctness of parsing the local time in a given timezone.
The result shall always be the same as tz.localize(naive_time).
"""
baseday = datelib.datetime.date(2009, 1, 1).toordinal()
for day_offset in xrange(0, 365):
day = datelib.datetime.date.fromordinal(baseday + day_offset)
naive_day = datelib.datetime.datetime.combine(
day, datelib.datetime.time(0, 45, 9))
naive_day_str = naive_day.strftime('%Y-%m-%dT%H:%M:%S')
self.assertEqual(
datelib.US_PACIFIC.localize(naive_day),
datelib.Timestamp.FromString(naive_day_str, tz=datelib.US_PACIFIC),
'FromString localizes time incorrectly')
def testFromStringInterval(self):
expected_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
expected_s = time.mktime(expected_date.utctimetuple())
actual_date = datelib.Timestamp.FromString('1d')
actual_s = time.mktime(actual_date.timetuple())
diff_seconds = actual_s - expected_s
self.assertBetween(diff_seconds, 0, 1)
self.assertRaises(
datelib.TimeParseError, datelib.Timestamp.FromString, 'wat')
def _EpochToDatetime(t, tz=None):
if tz is not None:
return datelib.datetime.datetime.fromtimestamp(t, tz)
else:
return datelib.datetime.datetime.utcfromtimestamp(t)
class DatetimeConversionUnitTest(basetest.TestCase):
def setUp(self):
self.pst = pytz.timezone('US/Pacific')
self.utc = pytz.utc
self.now = time.time()
def testDatetimeToUTCMicros(self):
self.assertEqual(
0, datelib.DatetimeToUTCMicros(_EpochToDatetime(0)))
self.assertEqual(
1001 * long(datelib._MICROSECONDS_PER_SECOND),
datelib.DatetimeToUTCMicros(_EpochToDatetime(1001)))
self.assertEqual(long(self.now * datelib._MICROSECONDS_PER_SECOND),
datelib.DatetimeToUTCMicros(_EpochToDatetime(self.now)))
# tzinfo shouldn't change the result
self.assertEqual(
0, datelib.DatetimeToUTCMicros(_EpochToDatetime(0, tz=self.pst)))
def testDatetimeToUTCMillis(self):
self.assertEqual(
0, datelib.DatetimeToUTCMillis(_EpochToDatetime(0)))
self.assertEqual(
1001 * 1000L, datelib.DatetimeToUTCMillis(_EpochToDatetime(1001)))
self.assertEqual(long(self.now * 1000),
datelib.DatetimeToUTCMillis(_EpochToDatetime(self.now)))
# tzinfo shouldn't change the result
self.assertEqual(
0, datelib.DatetimeToUTCMillis(_EpochToDatetime(0, tz=self.pst)))
def testUTCMicrosToDatetime(self):
self.assertEqual(_EpochToDatetime(0), datelib.UTCMicrosToDatetime(0))
self.assertEqual(_EpochToDatetime(1.000001),
datelib.UTCMicrosToDatetime(1000001))
self.assertEqual(_EpochToDatetime(self.now), datelib.UTCMicrosToDatetime(
long(self.now * datelib._MICROSECONDS_PER_SECOND)))
# Check timezone-aware comparisons
self.assertEqual(_EpochToDatetime(0, self.pst),
datelib.UTCMicrosToDatetime(0, tz=self.pst))
self.assertEqual(_EpochToDatetime(0, self.pst),
datelib.UTCMicrosToDatetime(0, tz=self.utc))
def testUTCMillisToDatetime(self):
self.assertEqual(_EpochToDatetime(0), datelib.UTCMillisToDatetime(0))
self.assertEqual(_EpochToDatetime(1.001), datelib.UTCMillisToDatetime(1001))
t = time.time()
dt = _EpochToDatetime(t)
# truncate sub-milli time
dt -= datelib.datetime.timedelta(microseconds=dt.microsecond % 1000)
self.assertEqual(dt, datelib.UTCMillisToDatetime(long(t * 1000)))
# Check timezone-aware comparisons
self.assertEqual(_EpochToDatetime(0, self.pst),
datelib.UTCMillisToDatetime(0, tz=self.pst))
self.assertEqual(_EpochToDatetime(0, self.pst),
datelib.UTCMillisToDatetime(0, tz=self.utc))
class MicrosecondsToSecondsUnitTest(basetest.TestCase):
def testConversionFromMicrosecondsToSeconds(self):
self.assertEqual(0.0, datelib.MicrosecondsToSeconds(0))
self.assertEqual(7.0, datelib.MicrosecondsToSeconds(7000000))
self.assertEqual(1.234567, datelib.MicrosecondsToSeconds(1234567))
self.assertEqual(12345654321.123456,
datelib.MicrosecondsToSeconds(12345654321123456))
if __name__ == '__main__':
basetest.main()
#!/usr/bin/env python
# Copyright 2007 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittest for common file utilities."""
import __builtin__
import errno
import os
import posix
import pwd
import shutil
import stat
import tempfile
import mox
from google.apputils import basetest
from google.apputils import file_util
import gflags as flags
FLAGS = flags.FLAGS
# pylint is dumb about mox:
# pylint: disable=no-member
class FileUtilTest(basetest.TestCase):
def testHomeDir(self):
self.assertEqual(file_util.HomeDir(), pwd.getpwuid(os.geteuid()).pw_dir)
self.assertEqual(file_util.HomeDir(0), pwd.getpwuid(0).pw_dir)
self.assertEqual(file_util.HomeDir('root'), pwd.getpwnam('root').pw_dir)
class FileUtilTempdirTest(basetest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.file_path = self.temp_dir + 'sample.txt'
self.sample_contents = 'Random text: aldmkfhjwoem103u74.'
# To avoid confusion in the mode tests.
self.prev_umask = posix.umask(0)
def tearDown(self):
shutil.rmtree(self.temp_dir)
posix.umask(self.prev_umask)
def testWriteOverwrite(self):
file_util.Write(self.file_path, 'original contents')
file_util.Write(self.file_path, self.sample_contents)
with open(self.file_path) as fp:
self.assertEquals(fp.read(), self.sample_contents)
def testWriteExclusive(self):
file_util.Write(self.file_path, 'original contents')
self.assertRaises(OSError, file_util.Write, self.file_path,
self.sample_contents, overwrite_existing=False)
def testWriteMode(self):
mode = 0744
file_util.Write(self.file_path, self.sample_contents, mode=mode)
s = os.stat(self.file_path)
self.assertEqual(stat.S_IMODE(s.st_mode), mode)
def testAtomicWriteSuccessful(self):
file_util.AtomicWrite(self.file_path, self.sample_contents)
with open(self.file_path) as fp:
self.assertEquals(fp.read(), self.sample_contents)
def testAtomicWriteMode(self):
mode = 0745
file_util.AtomicWrite(self.file_path, self.sample_contents, mode=mode)
s = os.stat(self.file_path)
self.assertEqual(stat.S_IMODE(s.st_mode), mode)
class FileUtilMoxTestBase(basetest.TestCase):
def setUp(self):
self.mox = mox.Mox()
self.sample_contents = 'Contents of the file'
self.file_path = '/path/to/some/file'
self.fd = 'a file descriptor'
def tearDown(self):
# In case a test fails before it gets to the unset line.
self.mox.UnsetStubs()
class FileUtilMoxTest(FileUtilMoxTestBase):
def testListDirPath(self):
self.mox.StubOutWithMock(os, 'listdir')
dir_contents = ['file1', 'file2', 'file3', 'directory1', 'file4',
'directory2']
os.listdir('/path/to/some/directory').AndReturn(dir_contents)
self.mox.ReplayAll()
self.assertListEqual(file_util.ListDirPath('/path/to/some/directory'),
['%s/%s' % ('/path/to/some/directory', entry)
for entry in dir_contents])
self.mox.VerifyAll()
def testSuccessfulRead(self):
file_handle = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(__builtin__, 'open', use_mock_anything=True)
open(self.file_path).AndReturn(file_handle)
file_handle.__enter__().AndReturn(file_handle)
file_handle.read().AndReturn(self.sample_contents)
file_handle.__exit__(None, None, None)
self.mox.ReplayAll()
try:
self.assertEquals(file_util.Read(self.file_path), self.sample_contents)
self.mox.VerifyAll()
finally:
# Because we mock out the built-in open() function, which the unittest
# library depends on, we need to make sure we revert it before leaving the
# test, otherwise any test failures will cause further internal failures
# and yield no meaningful error output.
self.mox.ResetAll()
self.mox.UnsetStubs()
def testWriteGroup(self):
self.mox.StubOutWithMock(os, 'open')
self.mox.StubOutWithMock(os, 'write')
self.mox.StubOutWithMock(os, 'close')
self.mox.StubOutWithMock(os, 'chown')
gid = 'new gid'
os.open(self.file_path, os.O_WRONLY | os.O_TRUNC | os.O_CREAT,
0666).AndReturn(self.fd)
os.write(self.fd, self.sample_contents)
os.close(self.fd)
os.chown(self.file_path, -1, gid)
self.mox.ReplayAll()
file_util.Write(self.file_path, self.sample_contents, gid=gid)
self.mox.VerifyAll()
class AtomicWriteMoxTest(FileUtilMoxTestBase):
def setUp(self):
super(AtomicWriteMoxTest, self).setUp()
self.mox.StubOutWithMock(tempfile, 'mkstemp')
self.mox.StubOutWithMock(os, 'write')
self.mox.StubOutWithMock(os, 'close')
self.mox.StubOutWithMock(os, 'chmod')
self.mox.StubOutWithMock(os, 'rename')
self.mox.StubOutWithMock(os, 'remove')
self.mode = 'new permissions'
self.gid = 'new gid'
self.temp_filename = '/some/temp/file'
self.os_error = OSError('A problem renaming!')
tempfile.mkstemp(dir='/path/to/some').AndReturn(
(self.fd, self.temp_filename))
os.write(self.fd, self.sample_contents)
os.close(self.fd)
os.chmod(self.temp_filename, self.mode)
def tearDown(self):
self.mox.UnsetStubs()
def testAtomicWriteGroup(self):
self.mox.StubOutWithMock(os, 'chown')
os.chown(self.temp_filename, -1, self.gid)
os.rename(self.temp_filename, self.file_path)
self.mox.ReplayAll()
file_util.AtomicWrite(self.file_path, self.sample_contents,
mode=self.mode, gid=self.gid)
self.mox.VerifyAll()
def testAtomicWriteGroupError(self):
self.mox.StubOutWithMock(os, 'chown')
os.chown(self.temp_filename, -1, self.gid).AndRaise(self.os_error)
os.remove(self.temp_filename)
self.mox.ReplayAll()
self.assertRaises(OSError, file_util.AtomicWrite, self.file_path,
self.sample_contents, mode=self.mode, gid=self.gid)
self.mox.VerifyAll()
def testRenamingError(self):
os.rename(self.temp_filename, self.file_path).AndRaise(self.os_error)
os.remove(self.temp_filename)
self.mox.ReplayAll()
self.assertRaises(OSError, file_util.AtomicWrite, self.file_path,
self.sample_contents, mode=self.mode)
self.mox.VerifyAll()
def testRenamingErrorWithRemoveError(self):
extra_error = OSError('A problem removing!')
os.rename(self.temp_filename, self.file_path).AndRaise(self.os_error)
os.remove(self.temp_filename).AndRaise(extra_error)
self.mox.ReplayAll()
try:
file_util.AtomicWrite(self.file_path, self.sample_contents,
mode=self.mode)
except OSError as e:
self.assertEquals(str(e),
'A problem renaming!. Additional errors cleaning up: '
'A problem removing!')
else:
raise self.failureException('OSError not raised by AtomicWrite')
self.mox.VerifyAll()
class TemporaryFilesMoxTest(FileUtilMoxTestBase):
def testTemporaryFileWithContents(self):
contents = 'Inspiration!'
with file_util.TemporaryFileWithContents(contents) as temporary_file:
filename = temporary_file.name
contents_read = open(temporary_file.name).read()
self.assertEqual(contents_read, contents)
# Ensure that the file does not exist.
self.assertFalse(os.path.exists(filename))
class MkDirsMoxTest(FileUtilMoxTestBase):
# pylint is dumb about mox:
# pylint: disable=maybe-no-member
def setUp(self):
super(MkDirsMoxTest, self).setUp()
self.mox.StubOutWithMock(os, 'mkdir')
self.mox.StubOutWithMock(os, 'chmod')
self.mox.StubOutWithMock(os.path, 'isdir')
self.dir_tree = ['/path', 'to', 'some', 'directory']
def tearDown(self):
self.mox.UnsetStubs()
def testNoErrorsAbsoluteOneDir(self):
# record, replay
os.mkdir('/foo')
self.mox.ReplayAll()
# test, verify
file_util.MkDirs('/foo')
self.mox.VerifyAll()
def testNoErrorsAbsoluteOneDirWithForceMode(self):
# record, replay
os.mkdir('/foo')
os.chmod('/foo', 0707)
self.mox.ReplayAll()
# test, verify
file_util.MkDirs('/foo', force_mode=0707)
self.mox.VerifyAll()
def testNoErrorsExistingDirWithForceMode(self):
exist_error = OSError(errno.EEXIST, 'This string not used')
# record, replay
os.mkdir('/foo').AndRaise(exist_error)
# no chmod is called since the dir exists
os.path.isdir('/foo').AndReturn(True)
self.mox.ReplayAll()
# test, verify
file_util.MkDirs('/foo', force_mode=0707)
self.mox.VerifyAll()
def testNoErrorsAbsoluteSlashDot(self):
# record, replay
os.mkdir('/foo')
self.mox.ReplayAll()
# test, verify
file_util.MkDirs('/foo/.')
self.mox.VerifyAll()
def testNoErrorsAbsoluteExcessiveSlashDot(self):
"""See that normpath removes irrelevant .'s in the path."""
# record, replay
os.mkdir('/foo')
os.mkdir('/foo/bar')
self.mox.ReplayAll()
# test, verify
file_util.MkDirs('/./foo/./././bar/.')
self.mox.VerifyAll()
def testNoErrorsAbsoluteTwoDirs(self):
# record, replay
os.mkdir('/foo')
os.mkdir('/foo/bar')
self.mox.ReplayAll()
# test, verify
file_util.MkDirs('/foo/bar')
self.mox.VerifyAll()
def testNoErrorsPartialTwoDirsWithForceMode(self):
exist_error = OSError(errno.EEXIST, 'This string not used')
# record, replay
os.mkdir('/foo').AndRaise(exist_error) # /foo exists
os.path.isdir('/foo').AndReturn(True)
os.mkdir('/foo/bar') # bar does not
os.chmod('/foo/bar', 0707)
self.mox.ReplayAll()
# test, verify
file_util.MkDirs('/foo/bar', force_mode=0707)
self.mox.VerifyAll()
def testNoErrorsRelativeOneDir(self):
# record, replay
os.mkdir('foo')
self.mox.ReplayAll()
# test, verify
file_util.MkDirs('foo')
self.mox.VerifyAll()
def testNoErrorsRelativeTwoDirs(self):
# record, replay
os.mkdir('foo')
os.mkdir('foo/bar')
self.mox.ReplayAll()
# test, verify
file_util.MkDirs('foo/bar')
self.mox.VerifyAll()
def testDirectoriesExist(self):
exist_error = OSError(errno.EEXIST, 'This string not used')
# record, replay
for i in range(len(self.dir_tree)):
path = os.path.join(*self.dir_tree[:i+1])
os.mkdir(path).AndRaise(exist_error)
os.path.isdir(path).AndReturn(True)
self.mox.ReplayAll()
# test, verify
file_util.MkDirs(os.path.join(*self.dir_tree))
self.mox.VerifyAll()
def testFileInsteadOfDirectory(self):
exist_error = OSError(errno.EEXIST, 'This string not used')
path = self.dir_tree[0]
# record, replay
os.mkdir(path).AndRaise(exist_error)
os.path.isdir(path).AndReturn(False)
self.mox.ReplayAll()
# test, verify
self.assertRaises(OSError, file_util.MkDirs, os.path.join(*self.dir_tree))
self.mox.VerifyAll()
def testNonExistsError(self):
non_exist_error = OSError(errno.ETIMEDOUT, 'This string not used')
path = self.dir_tree[0]
# record, replay
os.mkdir(path).AndRaise(non_exist_error)
self.mox.ReplayAll()
# test, verify
self.assertRaises(OSError, file_util.MkDirs, os.path.join(*self.dir_tree))
self.mox.VerifyAll()
class RmDirsTestCase(mox.MoxTestBase):
def testRmDirs(self):
test_sandbox = os.path.join(FLAGS.test_tmpdir, 'test-rm-dirs')
test_dir = os.path.join(test_sandbox, 'test', 'dir')
os.makedirs(test_sandbox)
with open(os.path.join(test_sandbox, 'file'), 'w'):
pass
os.makedirs(test_dir)
with open(os.path.join(test_dir, 'file'), 'w'):
pass
file_util.RmDirs(test_dir)
self.assertFalse(os.path.exists(os.path.join(test_sandbox, 'test')))
self.assertTrue(os.path.exists(os.path.join(test_sandbox, 'file')))
shutil.rmtree(test_sandbox)
def testRmDirsForNonExistingDirectory(self):
self.mox.StubOutWithMock(os, 'rmdir')
os.rmdir('path/to')
os.rmdir('path')
self.mox.StubOutWithMock(shutil, 'rmtree')
shutil.rmtree('path/to/directory').AndRaise(
OSError(errno.ENOENT, "No such file or directory 'path/to/directory'"))
self.mox.ReplayAll()
file_util.RmDirs('path/to/directory')
self.mox.VerifyAll()
def testRmDirsForNonExistingParentDirectory(self):
self.mox.StubOutWithMock(os, 'rmdir')
os.rmdir('path/to').AndRaise(
OSError(errno.ENOENT, "No such file or directory 'path/to'"))
os.rmdir('path')
self.mox.StubOutWithMock(shutil, 'rmtree')
shutil.rmtree('path/to/directory').AndRaise(
OSError(errno.ENOENT, "No such file or directory 'path/to/directory'"))
self.mox.ReplayAll()
file_util.RmDirs('path/to/directory')
self.mox.VerifyAll()
def testRmDirsForNotEmptyDirectory(self):
self.mox.StubOutWithMock(os, 'rmdir')
os.rmdir('path/to').AndRaise(
OSError(errno.ENOTEMPTY, 'Directory not empty', 'path/to'))
self.mox.StubOutWithMock(shutil, 'rmtree')
shutil.rmtree('path/to/directory')
self.mox.ReplayAll()
file_util.RmDirs('path/to/directory')
self.mox.VerifyAll()
def testRmDirsForPermissionDeniedOnParentDirectory(self):
self.mox.StubOutWithMock(os, 'rmdir')
os.rmdir('path/to').AndRaise(
OSError(errno.EACCES, 'Permission denied', 'path/to'))
self.mox.StubOutWithMock(shutil, 'rmtree')
shutil.rmtree('path/to/directory')
self.mox.ReplayAll()
file_util.RmDirs('path/to/directory')
self.mox.VerifyAll()
def testRmDirsWithSimplePath(self):
self.mox.StubOutWithMock(shutil, 'rmtree')
shutil.rmtree('directory')
self.mox.ReplayAll()
file_util.RmDirs('directory')
self.mox.VerifyAll()
if __name__ == '__main__':
basetest.main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Test for google.apputils.humanize."""
import datetime
from google.apputils import basetest
from google.apputils import humanize
class HumanizeTest(basetest.TestCase):
def testCommas(self):
self.assertEqual('0', humanize.Commas(0))
self.assertEqual('100', humanize.Commas(100))
self.assertEqual('1,000', humanize.Commas(1000))
self.assertEqual('10,000', humanize.Commas(10000))
self.assertEqual('1,000,000', humanize.Commas(1e6))
self.assertEqual('-1,000,000', humanize.Commas(-1e6))
def testPlural(self):
self.assertEqual('0 objects', humanize.Plural(0, 'object'))
self.assertEqual('1 object', humanize.Plural(1, 'object'))
self.assertEqual('-1 objects', humanize.Plural(-1, 'object'))
self.assertEqual('42 objects', humanize.Plural(42, 'object'))
self.assertEqual('42 cats', humanize.Plural(42, 'cat'))
self.assertEqual('42 glasses', humanize.Plural(42, 'glass'))
self.assertEqual('42 potatoes', humanize.Plural(42, 'potato'))
self.assertEqual('42 cherries', humanize.Plural(42, 'cherry'))
self.assertEqual('42 monkeys', humanize.Plural(42, 'monkey'))
self.assertEqual('42 oxen', humanize.Plural(42, 'ox', 'oxen'))
self.assertEqual('42 indices', humanize.Plural(42, 'index'))
self.assertEqual(
'42 attorneys general',
humanize.Plural(42, 'attorney general', 'attorneys general'))
def testPluralWord(self):
self.assertEqual('vaxen', humanize.PluralWord(2, 'vax', plural='vaxen'))
self.assertEqual('cores', humanize.PluralWord(2, 'core'))
self.assertEqual('group', humanize.PluralWord(1, 'group'))
self.assertEqual('cells', humanize.PluralWord(0, 'cell'))
self.assertEqual('degree', humanize.PluralWord(1.0, 'degree'))
self.assertEqual('helloes', humanize.PluralWord(3.14, 'hello'))
def testWordSeries(self):
self.assertEqual('', humanize.WordSeries([]))
self.assertEqual('foo', humanize.WordSeries(['foo']))
self.assertEqual('foo and bar', humanize.WordSeries(['foo', 'bar']))
self.assertEqual(
'foo, bar, and baz', humanize.WordSeries(['foo', 'bar', 'baz']))
self.assertEqual(
'foo, bar, or baz', humanize.WordSeries(['foo', 'bar', 'baz'],
conjunction='or'))
def testAddIndefiniteArticle(self):
self.assertEqual('a thing', humanize.AddIndefiniteArticle('thing'))
self.assertEqual('an object', humanize.AddIndefiniteArticle('object'))
self.assertEqual('a Porsche', humanize.AddIndefiniteArticle('Porsche'))
self.assertEqual('an Audi', humanize.AddIndefiniteArticle('Audi'))
def testDecimalPrefix(self):
self.assertEqual('0 m', humanize.DecimalPrefix(0, 'm'))
self.assertEqual('1 km', humanize.DecimalPrefix(1000, 'm'))
self.assertEqual('-1 km', humanize.DecimalPrefix(-1000, 'm'))
self.assertEqual('10 Gbps', humanize.DecimalPrefix(10e9, 'bps'))
self.assertEqual('6000 Yg', humanize.DecimalPrefix(6e27, 'g'))
self.assertEqual('12.1 km', humanize.DecimalPrefix(12100, 'm', precision=3))
self.assertEqual('12 km', humanize.DecimalPrefix(12100, 'm', precision=2))
self.assertEqual('1.15 km', humanize.DecimalPrefix(1150, 'm', precision=3))
self.assertEqual('-1.15 km', humanize.DecimalPrefix(-1150, 'm',
precision=3))
self.assertEqual('1.1 s', humanize.DecimalPrefix(1.12, 's', precision=2))
self.assertEqual('-1.1 s', humanize.DecimalPrefix(-1.12, 's', precision=2))
self.assertEqual('nan bps', humanize.DecimalPrefix(float('nan'), 'bps'))
self.assertEqual('inf bps', humanize.DecimalPrefix(float('inf'), 'bps'))
self.assertEqual('-inf bps', humanize.DecimalPrefix(float('-inf'), 'bps'))
self.assertEqual('-4 mm',
humanize.DecimalPrefix(-0.004, 'm', min_scale=None))
self.assertEqual('0 m', humanize.DecimalPrefix(0, 'm', min_scale=None))
self.assertEqual(
u'1 µs',
humanize.DecimalPrefix(0.0000013, 's', min_scale=None))
self.assertEqual('3 km', humanize.DecimalPrefix(3000, 'm', min_scale=None))
self.assertEqual(
'5000 TB',
humanize.DecimalPrefix(5e15, 'B', max_scale=4))
self.assertEqual(
'5 mSWE',
humanize.DecimalPrefix(0.005, 'SWE', min_scale=None))
self.assertEqual(
'0.0005 ms',
humanize.DecimalPrefix(5e-7, 's', min_scale=-1, precision=2))
def testBinaryPrefix(self):
self.assertEqual('0 B', humanize.BinaryPrefix(0, 'B'))
self.assertEqual('1000 B', humanize.BinaryPrefix(1000, 'B'))
self.assertEqual('1 KiB', humanize.BinaryPrefix(1024, 'B'))
self.assertEqual('64 GiB', humanize.BinaryPrefix(2**36, 'B'))
self.assertEqual('65536 Yibit', humanize.BinaryPrefix(2**96, 'bit'))
self.assertEqual('1.25 KiB', humanize.BinaryPrefix(1280, 'B', precision=3))
self.assertEqual('1.2 KiB', humanize.BinaryPrefix(1280, 'B', precision=2))
def testScale(self):
self.assertEqual((12.1, 'km'), humanize.DecimalScale(12100, 'm'))
self.assertEqual((1.15, 'Mm'), humanize.DecimalScale(1150000, 'm'))
self.assertEqual((450, 'mSWE'),
humanize.DecimalScale(0.45, 'SWE', min_scale=None))
self.assertEqual(
(250, u'µm'),
humanize.DecimalScale(1.0 / (4 * 1000), 'm', min_scale=None))
value, unit = humanize.BinaryScale(200000000000, 'B')
self.assertAlmostEqual(value, 186.26, 2)
self.assertEqual(unit, 'GiB')
value, unit = humanize.BinaryScale(3000000000000, 'B')
self.assertAlmostEqual(value, 2.728, 3)
self.assertEqual(unit, 'TiB')
def testPrettyFraction(self):
# No rounded integer part
self.assertEqual(u'½', humanize.PrettyFraction(0.5))
# Roundeded integer + fraction
self.assertEqual(u'6⅔', humanize.PrettyFraction(20.0 / 3.0))
# Rounded integer, no fraction
self.assertEqual(u'2', humanize.PrettyFraction(2.00001))
# No rounded integer, no fraction
self.assertEqual(u'0', humanize.PrettyFraction(0.001))
# Round up
self.assertEqual(u'1', humanize.PrettyFraction(0.99))
# No round up, edge case
self.assertEqual(u'⅞', humanize.PrettyFraction(0.9))
# Negative fraction
self.assertEqual(u'-⅕', humanize.PrettyFraction(-0.2))
# Negative close to zero (should not be -0)
self.assertEqual(u'0', humanize.PrettyFraction(-0.001))
# Smallest fraction that should round down.
self.assertEqual(u'0', humanize.PrettyFraction(1.0 / 16.0))
# Largest fraction should round up.
self.assertEqual(u'1', humanize.PrettyFraction(15.0 / 16.0))
# Integer zero.
self.assertEqual(u'0', humanize.PrettyFraction(0))
# Check that division yields fraction
self.assertEqual(u'⅘', humanize.PrettyFraction(4.0 / 5.0))
# Custom spacer.
self.assertEqual(u'2 ½', humanize.PrettyFraction(2.5, spacer=' '))
def testDuration(self):
self.assertEqual('2h', humanize.Duration(7200))
self.assertEqual('5d 13h 47m 12s', humanize.Duration(481632))
self.assertEqual('0s', humanize.Duration(0))
self.assertEqual('59s', humanize.Duration(59))
self.assertEqual('1m', humanize.Duration(60))
self.assertEqual('1m 1s', humanize.Duration(61))
self.assertEqual('1h 1s', humanize.Duration(3601))
self.assertEqual('2h-2s', humanize.Duration(7202, separator='-'))
def testLargeDuration(self):
# The maximum seconds and days that can be stored in a datetime.timedelta
# object, as seconds. max_days is equal to MAX_DELTA_DAYS in Python's
# Modules/datetimemodule.c, converted to seconds.
max_seconds = 3600 * 24 - 1
max_days = 999999999 * 24 * 60 * 60
self.assertEqual('999999999d', humanize.Duration(max_days))
self.assertEqual('999999999d 23h 59m 59s',
humanize.Duration(max_days + max_seconds))
self.assertEqual('>=999999999d 23h 59m 60s',
humanize.Duration(max_days + max_seconds + 1))
def testTimeDelta(self):
self.assertEqual('0s', humanize.TimeDelta(datetime.timedelta()))
self.assertEqual('2h', humanize.TimeDelta(datetime.timedelta(hours=2)))
self.assertEqual('1m', humanize.TimeDelta(datetime.timedelta(minutes=1)))
self.assertEqual('5d', humanize.TimeDelta(datetime.timedelta(days=5)))
self.assertEqual('1.25s', humanize.TimeDelta(
datetime.timedelta(seconds=1, microseconds=250000)))
self.assertEqual('1.5s',
humanize.TimeDelta(datetime.timedelta(seconds=1.5)))
self.assertEqual('4d 10h 5m 12.25s', humanize.TimeDelta(
datetime.timedelta(days=4, hours=10, minutes=5, seconds=12,
microseconds=250000)))
class NaturalSortKeyChunkingTest(basetest.TestCase):
def testChunkifySingleChars(self):
self.assertListEqual(
humanize.NaturalSortKey('a1b2c3'),
['a', 1, 'b', 2, 'c', 3])
def testChunkifyMultiChars(self):
self.assertListEqual(
humanize.NaturalSortKey('aa11bb22cc33'),
['aa', 11, 'bb', 22, 'cc', 33])
def testChunkifyComplex(self):
self.assertListEqual(
humanize.NaturalSortKey('one 11 -- two 44'),
['one ', 11, ' -- two ', 44])
class NaturalSortKeysortTest(basetest.TestCase):
def testNaturalSortKeySimpleWords(self):
self.test = ['pair', 'banana', 'apple']
self.good = ['apple', 'banana', 'pair']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeySimpleNums(self):
self.test = ['3333', '2222', '9999', '0000']
self.good = ['0000', '2222', '3333', '9999']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeySimpleDigits(self):
self.test = ['8', '3', '2']
self.good = ['2', '3', '8']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testVersionStrings(self):
self.test = ['1.2', '0.9', '1.1a2', '1.1a', '1', '1.2.1', '0.9.1']
self.good = ['0.9', '0.9.1', '1', '1.1a', '1.1a2', '1.2', '1.2.1']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeySimpleNumLong(self):
self.test = ['11', '9', '1', '200', '19', '20', '900']
self.good = ['1', '9', '11', '19', '20', '200', '900']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeyAlNum(self):
self.test = ['x10', 'x9', 'x1', 'x11']
self.good = ['x1', 'x9', 'x10', 'x11']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeyNumAlNum(self):
self.test = ['4x10', '4x9', '4x11', '5yy4', '3x1', '2x11']
self.good = ['2x11', '3x1', '4x9', '4x10', '4x11', '5yy4']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeyAlNumAl(self):
self.test = ['a9c', 'a4b', 'a10c', 'a1c', 'c10c', 'c10a', 'c9a']
self.good = ['a1c', 'a4b', 'a9c', 'a10c', 'c9a', 'c10a', 'c10c']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
class NaturalSortKeyBigTest(basetest.TestCase):
def testBig(self):
self.test = [
'1000X Radonius Maximus', '10X Radonius', '200X Radonius',
'20X Radonius', '20X Radonius Prime', '30X Radonius',
'40X Radonius', 'Allegia 50 Clasteron', 'Allegia 500 Clasteron',
'Allegia 51 Clasteron', 'Allegia 51B Clasteron',
'Allegia 52 Clasteron', 'Allegia 60 Clasteron', 'Alpha 100',
'Alpha 2', 'Alpha 200', 'Alpha 2A', 'Alpha 2A-8000', 'Alpha 2A-900',
'Callisto Morphamax', 'Callisto Morphamax 500',
'Callisto Morphamax 5000', 'Callisto Morphamax 600',
'Callisto Morphamax 700', 'Callisto Morphamax 7000',
'Callisto Morphamax 7000 SE', 'Callisto Morphamax 7000 SE2',
'QRS-60 Intrinsia Machine', 'QRS-60F Intrinsia Machine',
'QRS-62 Intrinsia Machine', 'QRS-62F Intrinsia Machine',
'Xiph Xlater 10000', 'Xiph Xlater 2000', 'Xiph Xlater 300',
'Xiph Xlater 40', 'Xiph Xlater 5', 'Xiph Xlater 50',
'Xiph Xlater 500', 'Xiph Xlater 5000', 'Xiph Xlater 58']
self.good = [
'10X Radonius',
'20X Radonius',
'20X Radonius Prime',
'30X Radonius',
'40X Radonius',
'200X Radonius',
'1000X Radonius Maximus',
'Allegia 50 Clasteron',
'Allegia 51 Clasteron',
'Allegia 51B Clasteron',
'Allegia 52 Clasteron',
'Allegia 60 Clasteron',
'Allegia 500 Clasteron',
'Alpha 2',
'Alpha 2A',
'Alpha 2A-900',
'Alpha 2A-8000',
'Alpha 100',
'Alpha 200',
'Callisto Morphamax',
'Callisto Morphamax 500',
'Callisto Morphamax 600',
'Callisto Morphamax 700',
'Callisto Morphamax 5000',
'Callisto Morphamax 7000',
'Callisto Morphamax 7000 SE',
'Callisto Morphamax 7000 SE2',
'QRS-60 Intrinsia Machine',
'QRS-60F Intrinsia Machine',
'QRS-62 Intrinsia Machine',
'QRS-62F Intrinsia Machine',
'Xiph Xlater 5',
'Xiph Xlater 40',
'Xiph Xlater 50',
'Xiph Xlater 58',
'Xiph Xlater 300',
'Xiph Xlater 500',
'Xiph Xlater 2000',
'Xiph Xlater 5000',
'Xiph Xlater 10000',
]
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
if __name__ == '__main__':
basetest.main()
#!/usr/bin/env python
# Copyright 2010 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the resources module."""
__author__ = 'dborowitz@google.com (Dave Borowitz)'
from google.apputils import basetest
from google.apputils import file_util
from google.apputils import resources
PREFIX = __name__ + ':data/'
class ResourcesTest(basetest.TestCase):
def _CheckTestData(self, func):
self.assertEqual('test file a contents\n', func(PREFIX + 'a'))
self.assertEqual('test file b contents\n', func(PREFIX + 'b'))
def testGetResource(self):
self._CheckTestData(resources.GetResource)
def testGetResourceAsFile(self):
self._CheckTestData(lambda n: resources.GetResourceAsFile(n).read())
def testGetResourceFilename(self):
self._CheckTestData(
lambda n: file_util.Read(resources.GetResourceFilename(n)))
if __name__ == '__main__':
basetest.main()
#!/usr/bin/env python
# Copyright 2010 Google Inc. All Rights Reserved.
"""Tests for google.apputils.
In addition to the test modules under this package, we have a special TestCase
that runs the tests that are shell scripts.
"""
# TODO(dborowitz): It may be useful to generalize this and provide it to users
# who want to run their own sh_tests.
import os
import subprocess
import sys
from google.apputils import basetest
import gflags
FLAGS = gflags.FLAGS
class ShellScriptTests(basetest.TestCase):
"""TestCase that runs the various *test.sh scripts."""
def RunTestScript(self, script_name):
tests_path = os.path.dirname(__file__)
sh_test_path = os.path.realpath(os.path.join(tests_path, script_name))
env = {
# Setuptools puts dependency eggs in our path, so propagate that.
'PYTHONPATH': os.pathsep.join(sys.path),
'TEST_TMPDIR': FLAGS.test_tmpdir,
}
p = subprocess.Popen(sh_test_path, cwd=tests_path, env=env)
self.assertEqual(0, p.wait())
def testBaseTest(self):
self.RunTestScript('basetest_sh_test.sh')
def testApp(self):
self.RunTestScript('app_unittest.sh')
def testAppCommands(self):
self.RunTestScript('appcommands_unittest.sh')
if __name__ == '__main__':
basetest.main()
#!/usr/bin/env python
# This code must be source compatible with Python 2.4 through 3.3.
#
# Copyright 2003 Google Inc. All Rights Reserved.
"""Unittest for shellutil module."""
import os
# Use unittest instead of basetest to avoid bootstrap issues / circular deps.
import unittest
from google.apputils import shellutil
# Running windows?
win32 = (os.name == 'nt')
class ShellUtilUnitTest(unittest.TestCase):
def testShellEscapeList(self):
# TODO(user): Actually run some shell commands and test the
# shell escaping works properly.
# Empty list
words = []
self.assertEqual(shellutil.ShellEscapeList(words), '')
# Empty string
words = ['']
self.assertEqual(shellutil.ShellEscapeList(words), "''")
# Single word
words = ['foo']
self.assertEqual(shellutil.ShellEscapeList(words), "'foo'")
# Single word with single quote
words = ["foo'bar"]
expected = """ 'foo'"'"'bar' """.strip()
self.assertEqual(shellutil.ShellEscapeList(words), expected)
# .. double quote
words = ['foo"bar']
expected = """ 'foo"bar' """.strip()
self.assertEqual(shellutil.ShellEscapeList(words), expected)
# Multiple words
words = ['foo', 'bar']
self.assertEqual(shellutil.ShellEscapeList(words), "'foo' 'bar'")
# Words with spaces
words = ['foo', 'bar', "foo'' ''bar"]
expected = """ 'foo' 'bar' 'foo'"'"''"'"' '"'"''"'"'bar' """.strip()
self.assertEqual(shellutil.ShellEscapeList(words), expected)
# Now I'm just being mean
words = ['foo', 'bar', """ ""'"'" """.strip()]
expected = """ 'foo' 'bar' '""'"'"'"'"'"'"' """.strip()
self.assertEqual(shellutil.ShellEscapeList(words), expected)
def testShellifyStatus(self):
if not win32:
self.assertEqual(shellutil.ShellifyStatus(0), 0)
self.assertEqual(shellutil.ShellifyStatus(1), 129)
self.assertEqual(shellutil.ShellifyStatus(1 * 256), 1)
if __name__ == '__main__':
unittest.main()
#!/usr/bin/env python
# Copyright 2006 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the stopwatch module."""
__author__ = 'dbentley@google.com (Dan Bentley)'
from google.apputils import basetest
import gflags as flags
from google.apputils import stopwatch
FLAGS = flags.FLAGS
class StubTime(object):
"""Simple stub replacement for the time module.
Only useful for relative calculations, since it always starts at 0.
"""
# These method names match the standard library time module.
def __init__(self):
self._counter = 0
def time(self):
"""Get the time for this time object.
A call is always guaranteed to be greater than the previous one.
Returns:
A monotonically increasing time.
"""
self._counter += 0.0001
return self._counter
def sleep(self, time):
"""Simulate sleeping for the specified number of seconds."""
self._counter += time
class StopwatchUnitTest(basetest.TestCase):
"""Stopwatch tests.
These tests are tricky because timing is difficult.
Therefore, we test the structure of the results but not the results
themselves for fear it would lead to intermittent but persistent
failures.
"""
def setUp(self):
self.time = StubTime()
stopwatch.time = self.time
def testResults(self):
sw = stopwatch.StopWatch()
sw.start()
sw.stop()
results = sw.results()
self.assertListEqual([r[0] for r in results], ['total'])
results = sw.results(verbose=1)
self.assertListEqual([r[0] for r in results], ['overhead', 'total'])
# test tally part of results.
sw.start('ron')
sw.stop('ron')
sw.start('ron')
sw.stop('ron')
results = sw.results()
results = sw.results(verbose=1)
for r in results:
if r[0] == 'ron':
assert r[2] == 2
def testSeveralTimes(self):
sw = stopwatch.StopWatch()
sw.start()
sw.start('a')
sw.start('b')
self.time.sleep(1)
sw.stop('b')
sw.stop('a')
sw.stop()
results = sw.results(verbose=1)
self.assertListEqual([r[0] for r in results],
['a', 'b', 'overhead', 'total'])
# Make sure overhead is positive
self.assertEqual(results[2][1] > 0, 1)
def testNoStopOthers(self):
sw = stopwatch.StopWatch()
sw.start()
sw.start('a')
sw.start('b', stop_others=0)
self.time.sleep(1)
sw.stop('b')
sw.stop('a')
sw.stop()
#overhead should be negative, because we ran two timers simultaneously
#It is possible that this could fail in outlandish circumstances.
#If this is a problem in practice, increase the value of the call to
#time.sleep until it passes consistently.
#Or, consider finding a platform where the two calls sw.start() and
#sw.start('a') happen within 1 second.
results = sw.results(verbose=1)
self.assertEqual(results[2][1] < 0, 1)
def testStopNonExistentTimer(self):
sw = stopwatch.StopWatch()
self.assertRaises(RuntimeError, sw.stop)
self.assertRaises(RuntimeError, sw.stop, 'foo')
def testResultsDoesntCrashWhenUnstarted(self):
sw = stopwatch.StopWatch()
sw.results()
def testResultsDoesntCrashWhenUnstopped(self):
sw = stopwatch.StopWatch()
sw.start()
sw.results()
def testTimerValue(self):
sw = stopwatch.StopWatch()
self.assertAlmostEqual(0, sw.timervalue('a'), 2)
sw.start('a')
self.assertAlmostEqual(0, sw.timervalue('a'), 2)
self.time.sleep(1)
self.assertAlmostEqual(1, sw.timervalue('a'), 2)
sw.stop('a')
self.assertAlmostEqual(1, sw.timervalue('a'), 2)
sw.start('a')
self.time.sleep(1)
self.assertAlmostEqual(2, sw.timervalue('a'), 2)
sw.stop('a')
self.assertAlmostEqual(2, sw.timervalue('a'), 2)
def testResultsDoesntReset(self):
sw = stopwatch.StopWatch()
sw.start()
self.time.sleep(1)
sw.start('a')
self.time.sleep(1)
sw.stop('a')
sw.stop()
res1 = sw.results(verbose=True)
res2 = sw.results(verbose=True)
self.assertListEqual(res1, res2)
if __name__ == '__main__':
basetest.main()