Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

google-apputils

Package Overview
Dependencies
Maintainers
3
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

google-apputils - npm Package Compare versions

Comparing version
0.3.0
to
0.4.0
+455
google/apputils/humanize.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2008 Google Inc. All Rights Reserved.
"""Lightweight routines for producing more friendly output.
Usage examples:
'New messages: %s' % humanize.Commas(star_count)
-> 'New messages: 58,192'
'Found %s.' % humanize.Plural(error_count, 'error')
-> 'Found 2 errors.'
'Found %s.' % humanize.Plural(error_count, 'ox', 'oxen')
-> 'Found 2 oxen.'
'Copied at %s.' % humanize.DecimalPrefix(rate, 'bps', precision=3)
-> 'Copied at 42.6 Mbps.'
'Free RAM: %s' % humanize.BinaryPrefix(bytes_free, 'B')
-> 'Free RAM: 742 MiB'
'Finished all tasks in %s.' % humanize.Duration(elapsed_time)
-> 'Finished all tasks in 34m 5s.'
These libraries are not a substitute for full localization. If you
need localization, then you will have to think about translating
strings, formatting numbers in different ways, and so on. Use
ICU if your application is user-facing. Use these libraries if
your application is an English-only internal tool, and you are
tired of seeing "1 results" or "3450134804 bytes used".
Compare humanize.*Prefix() to C++ utilites HumanReadableNumBytes and
HumanReadableInt in strings/human_readable.h.
"""
import datetime
import math
import re
SIBILANT_ENDINGS = frozenset(['sh', 'ss', 'tch', 'ax', 'ix', 'ex'])
DIGIT_SPLITTER = re.compile(r'\d+|\D+').findall
# These are included because they are common technical terms.
SPECIAL_PLURALS = {
'index': 'indices',
'matrix': 'matrices',
'vertex': 'vertices',
}
VOWELS = frozenset('AEIOUaeiou')
# In Python 2.6, int(float('nan')) intentionally raises a TypeError. This code
# attempts to futureproof us against that eventual upgrade.
try:
_IsNan = math.isnan
except AttributeError:
def _IsNan(x):
return type(x) is float and x != x
def Commas(value):
"""Formats an integer with thousands-separating commas.
Args:
value: An integer.
Returns:
A string.
"""
if value < 0:
sign = '-'
value = -value
else:
sign = ''
result = []
while value >= 1000:
result.append('%03d' % (value % 1000))
value /= 1000
result.append('%d' % value)
return sign + ','.join(reversed(result))
def Plural(quantity, singular, plural=None):
"""Formats an integer and a string into a single pluralized string.
Args:
quantity: An integer.
singular: A string, the singular form of a noun.
plural: A string, the plural form. If not specified, then simple
English rules of regular pluralization will be used.
Returns:
A string.
"""
return '%d %s' % (quantity, PluralWord(quantity, singular, plural))
def PluralWord(quantity, singular, plural=None):
"""Builds the plural of an English word.
Args:
quantity: An integer.
singular: A string, the singular form of a noun.
plural: A string, the plural form. If not specified, then simple
English rules of regular pluralization will be used.
Returns:
the plural form of the word.
"""
if quantity == 1:
return singular
if plural:
return plural
if singular in SPECIAL_PLURALS:
return SPECIAL_PLURALS[singular]
# We need to guess what the English plural might be. Keep this
# function simple! It doesn't need to know about every possiblity;
# only regular rules and the most common special cases.
#
# Reference: http://en.wikipedia.org/wiki/English_plural
for ending in SIBILANT_ENDINGS:
if singular.endswith(ending):
return '%ses' % singular
if singular.endswith('o') and singular[-2:-1] not in VOWELS:
return '%ses' % singular
if singular.endswith('y') and singular[-2:-1] not in VOWELS:
return '%sies' % singular[:-1]
return '%ss' % singular
def WordSeries(words, conjunction='and'):
"""Convert a list of words to an English-language word series.
Args:
words: A list of word strings.
conjunction: A coordinating conjunction.
Returns:
A single string containing all the words in the list separated by commas,
the coordinating conjunction, and a serial comma, as appropriate.
"""
num_words = len(words)
if num_words == 0:
return ''
elif num_words == 1:
return words[0]
elif num_words == 2:
return (' %s ' % conjunction).join(words)
else:
return '%s, %s %s' % (', '.join(words[:-1]), conjunction, words[-1])
def AddIndefiniteArticle(noun):
"""Formats a noun with an appropriate indefinite article.
Args:
noun: A string representing a noun.
Returns:
A string containing noun prefixed with an indefinite article, e.g.,
"a thing" or "an object".
"""
if not noun:
raise ValueError('argument must be a word: {!r}'.format(noun))
if noun[0] in VOWELS:
return 'an ' + noun
else:
return 'a ' + noun
def DecimalPrefix(quantity, unit, precision=1, min_scale=0, max_scale=None):
"""Formats an integer and a unit into a string, using decimal prefixes.
The unit will be prefixed with an appropriate multiplier such that
the formatted integer is less than 1,000 (as long as the raw integer
is less than 10**27). For example:
DecimalPrefix(576012, 'bps') -> '576 kbps'
DecimalPrefix(1574215, 'bps', 2) -> '1.6 Mbps'
Only the SI prefixes which are powers of 10**3 will be used, so
DecimalPrefix(100, 'thread') is '100 thread', not '1 hthread'.
See also:
BinaryPrefix()
Args:
quantity: A number.
unit: A string.
precision: An integer, the minimum number of digits to display.
min_scale: minimum power of 1000 to scale to, (None = unbounded).
max_scale: maximum power of 1000 to scale to, (None = unbounded).
Returns:
A string.
"""
return _Prefix(quantity, unit, precision, DecimalScale, min_scale=min_scale,
max_scale=max_scale)
def BinaryPrefix(quantity, unit, precision=1):
"""Formats an integer and a unit into a string, using binary prefixes.
The unit will be prefixed with an appropriate multiplier such that
the formatted integer is less than 1,024 (as long as the raw integer
is less than 2**90). For example:
BinaryPrefix(576012, 'B') -> '562 KiB'
See also:
DecimalPrefix()
Args:
quantity: A number.
unit: A string.
precision: An integer, the minimum number of digits to display.
Returns:
A string.
"""
return _Prefix(quantity, unit, precision, BinaryScale)
def _Prefix(quantity, unit, precision, scale_callable, **args):
"""Formats an integer and a unit into a string.
Args:
quantity: A number.
unit: A string.
precision: An integer, the minimum number of digits to display.
scale_callable: A callable, scales the number and units.
**args: named arguments passed to scale_callable.
Returns:
A string.
"""
if not quantity:
return '0 %s' % unit
if quantity in [float('inf'), float('-inf')] or _IsNan(quantity):
return '%f %s' % (quantity, unit)
scaled_quantity, scaled_unit = scale_callable(quantity, unit, **args)
print_pattern = '%%.%df %%s' % max(0, (precision - int(
math.log(abs(scaled_quantity), 10)) - 1))
return print_pattern % (scaled_quantity, scaled_unit)
def DecimalScale(quantity, unit, min_scale=0, max_scale=None):
"""Get the scaled value and decimal prefixed unit in a tupple.
DecimalScale(576012, 'bps') -> (576.012, 'kbps')
DecimalScale(1574215, 'bps') -> (1.574215, 'Mbps')
Args:
quantity: A number.
unit: A string.
min_scale: minimum power of 1000 to normalize to (None = unbounded)
max_scale: maximum power of 1000 to normalize to (None = unbounded)
Returns:
A tuple of a scaled quantity (float) and BinaryPrefix for the
units (string).
"""
if min_scale is None or min_scale < 0:
negative_powers = ('m', u'µ', 'n', 'p', 'f', 'a', 'z', 'y')
if min_scale is not None:
negative_powers = tuple(negative_powers[0:-min_scale])
else:
negative_powers = None
if max_scale is None or max_scale > 0:
powers = ('k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
if max_scale is not None:
powers = tuple(powers[0:max_scale])
return _Scale(quantity, unit, 1000, powers, negative_powers)
def BinaryScale(quantity, unit):
"""Get the scaled value and binary prefixed unit in a tupple.
BinaryPrefix(576012, 'B') -> (562.51171875, 'KiB')
Args:
quantity: A number.
unit: A string.
Returns:
A tuple of a scaled quantity (float) and BinaryPrefix for the
units (string).
"""
return _Scale(quantity, unit, 1024,
('Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi'))
def _Scale(quantity, unit, multiplier, prefixes, inverse_prefixes=None):
"""Returns the formatted quantity and unit into a tuple.
Args:
quantity: A number.
unit: A string
multiplier: An integer, the ratio between prefixes.
prefixes: A sequence of strings.
inverse_prefixes: Prefixes to use for negative powers,
If empty or None, no scaling is done for fractions.
Returns:
A tuple containing the raw scaled quantity and the prefixed unit.
"""
if not quantity:
return 0, unit
if quantity in [float('inf'), float('-inf')] or _IsNan(quantity):
return quantity, unit
power = int(math.log(abs(quantity), multiplier))
if abs(quantity) > 1 or not inverse_prefixes:
if power < 1:
return quantity, unit
power = min(power, len(prefixes))
prefix = prefixes[power - 1]
value = float(quantity) / multiplier ** power
else:
power = min(-power + 1, len(inverse_prefixes))
prefix = inverse_prefixes[power - 1]
value = quantity * multiplier ** power
return value, prefix + unit
# Contains the fractions where the full range [1/n ... (n - 1) / n]
# is defined in Unicode.
FRACTIONS = {
3: (None, u'⅓', u'⅔', None),
5: (None, u'⅕', u'⅖', u'⅗', u'⅘', None),
8: (None, u'⅛', u'¼', u'⅜', u'½', u'⅝', u'¾', u'⅞', None),
}
FRACTION_ROUND_DOWN = 1.0 / (max(FRACTIONS.keys()) * 2.0)
FRACTION_ROUND_UP = 1.0 - FRACTION_ROUND_DOWN
def PrettyFraction(number, spacer=''):
"""Convert a number into a string that might include a unicode fraction.
This method returns the integer representation followed by the closest
fraction of a denominator 2, 3, 4, 5 or 8.
For instance, 0.33 will be converted to 1/3.
The resulting representation should be less than 1/16 off.
Args:
number: a python number
spacer: an optional string to insert between the integer and the fraction
default is an empty string.
Returns:
a unicode string representing the number.
"""
# We do not want small negative numbers to display as -0.
if number < -FRACTION_ROUND_DOWN:
return u'-%s' % PrettyFraction(-number)
number = abs(number)
rounded = int(number)
fract = number - rounded
if fract >= FRACTION_ROUND_UP:
return str(rounded + 1)
errors_fractions = []
for denominator, fraction_elements in FRACTIONS.items():
numerator = int(round(denominator * fract))
error = abs(fract - (float(numerator) / float(denominator)))
errors_fractions.append((error, fraction_elements[numerator]))
unused_error, fraction_text = min(errors_fractions)
if rounded and fraction_text:
return u'%d%s%s' % (rounded, spacer, fraction_text)
if rounded:
return str(rounded)
if fraction_text:
return fraction_text
return u'0'
def Duration(duration, separator=' '):
"""Formats a nonnegative number of seconds into a human-readable string.
Args:
duration: A float duration in seconds.
separator: A string separator between days, hours, minutes and seconds.
Returns:
Formatted string like '5d 12h 30m 45s'.
"""
try:
delta = datetime.timedelta(seconds=duration)
except OverflowError:
return '>=' + TimeDelta(datetime.timedelta.max)
return TimeDelta(delta, separator=separator)
def TimeDelta(delta, separator=' '):
"""Format a datetime.timedelta into a human-readable string.
Args:
delta: The datetime.timedelta to format.
separator: A string separator between days, hours, minutes and seconds.
Returns:
Formatted string like '5d 12h 30m 45s'.
"""
parts = []
seconds = delta.seconds
if delta.days:
parts.append('%dd' % delta.days)
if seconds >= 3600:
parts.append('%dh' % (seconds // 3600))
seconds %= 3600
if seconds >= 60:
parts.append('%dm' % (seconds // 60))
seconds %= 60
seconds += delta.microseconds / 1e6
if seconds or not parts:
parts.append('%gs' % seconds)
return separator.join(parts)
def NaturalSortKey(data):
"""Key function for "natural sort" ordering.
This key function results in a lexigraph sort. For example:
- ['1, '3', '20'] (not ['1', '20', '3']).
- ['Model 9', 'Model 70 SE', 'Model 70 SE2']
(not ['Model 70 SE', 'Model 70 SE2', 'Model 9']).
Usage:
new_list = sorted(old_list, key=humanize.NaturalSortKey)
or
list_sort_in_place.sort(key=humanize.NaturalSortKey)
Based on code by Steven Bazyl <sbazyl@google.com>.
Args:
data: str, The key being compared in a sort.
Returns:
A list which is comparable to other lists for the purpose of sorting.
"""
segments = DIGIT_SPLITTER(data)
for i, value in enumerate(segments):
if value.isdigit():
segments[i] = int(value)
return segments
#!/usr/bin/env python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Test for google.apputils.humanize."""
import datetime
from google.apputils import basetest
from google.apputils import humanize
class HumanizeTest(basetest.TestCase):
def testCommas(self):
self.assertEqual('0', humanize.Commas(0))
self.assertEqual('100', humanize.Commas(100))
self.assertEqual('1,000', humanize.Commas(1000))
self.assertEqual('10,000', humanize.Commas(10000))
self.assertEqual('1,000,000', humanize.Commas(1e6))
self.assertEqual('-1,000,000', humanize.Commas(-1e6))
def testPlural(self):
self.assertEqual('0 objects', humanize.Plural(0, 'object'))
self.assertEqual('1 object', humanize.Plural(1, 'object'))
self.assertEqual('-1 objects', humanize.Plural(-1, 'object'))
self.assertEqual('42 objects', humanize.Plural(42, 'object'))
self.assertEqual('42 cats', humanize.Plural(42, 'cat'))
self.assertEqual('42 glasses', humanize.Plural(42, 'glass'))
self.assertEqual('42 potatoes', humanize.Plural(42, 'potato'))
self.assertEqual('42 cherries', humanize.Plural(42, 'cherry'))
self.assertEqual('42 monkeys', humanize.Plural(42, 'monkey'))
self.assertEqual('42 oxen', humanize.Plural(42, 'ox', 'oxen'))
self.assertEqual('42 indices', humanize.Plural(42, 'index'))
self.assertEqual(
'42 attorneys general',
humanize.Plural(42, 'attorney general', 'attorneys general'))
def testPluralWord(self):
self.assertEqual('vaxen', humanize.PluralWord(2, 'vax', plural='vaxen'))
self.assertEqual('cores', humanize.PluralWord(2, 'core'))
self.assertEqual('group', humanize.PluralWord(1, 'group'))
self.assertEqual('cells', humanize.PluralWord(0, 'cell'))
self.assertEqual('degree', humanize.PluralWord(1.0, 'degree'))
self.assertEqual('helloes', humanize.PluralWord(3.14, 'hello'))
def testWordSeries(self):
self.assertEqual('', humanize.WordSeries([]))
self.assertEqual('foo', humanize.WordSeries(['foo']))
self.assertEqual('foo and bar', humanize.WordSeries(['foo', 'bar']))
self.assertEqual(
'foo, bar, and baz', humanize.WordSeries(['foo', 'bar', 'baz']))
self.assertEqual(
'foo, bar, or baz', humanize.WordSeries(['foo', 'bar', 'baz'],
conjunction='or'))
def testAddIndefiniteArticle(self):
self.assertEqual('a thing', humanize.AddIndefiniteArticle('thing'))
self.assertEqual('an object', humanize.AddIndefiniteArticle('object'))
self.assertEqual('a Porsche', humanize.AddIndefiniteArticle('Porsche'))
self.assertEqual('an Audi', humanize.AddIndefiniteArticle('Audi'))
def testDecimalPrefix(self):
self.assertEqual('0 m', humanize.DecimalPrefix(0, 'm'))
self.assertEqual('1 km', humanize.DecimalPrefix(1000, 'm'))
self.assertEqual('-1 km', humanize.DecimalPrefix(-1000, 'm'))
self.assertEqual('10 Gbps', humanize.DecimalPrefix(10e9, 'bps'))
self.assertEqual('6000 Yg', humanize.DecimalPrefix(6e27, 'g'))
self.assertEqual('12.1 km', humanize.DecimalPrefix(12100, 'm', precision=3))
self.assertEqual('12 km', humanize.DecimalPrefix(12100, 'm', precision=2))
self.assertEqual('1.15 km', humanize.DecimalPrefix(1150, 'm', precision=3))
self.assertEqual('-1.15 km', humanize.DecimalPrefix(-1150, 'm',
precision=3))
self.assertEqual('1.1 s', humanize.DecimalPrefix(1.12, 's', precision=2))
self.assertEqual('-1.1 s', humanize.DecimalPrefix(-1.12, 's', precision=2))
self.assertEqual('nan bps', humanize.DecimalPrefix(float('nan'), 'bps'))
self.assertEqual('inf bps', humanize.DecimalPrefix(float('inf'), 'bps'))
self.assertEqual('-inf bps', humanize.DecimalPrefix(float('-inf'), 'bps'))
self.assertEqual('-4 mm',
humanize.DecimalPrefix(-0.004, 'm', min_scale=None))
self.assertEqual('0 m', humanize.DecimalPrefix(0, 'm', min_scale=None))
self.assertEqual(
u'1 µs',
humanize.DecimalPrefix(0.0000013, 's', min_scale=None))
self.assertEqual('3 km', humanize.DecimalPrefix(3000, 'm', min_scale=None))
self.assertEqual(
'5000 TB',
humanize.DecimalPrefix(5e15, 'B', max_scale=4))
self.assertEqual(
'5 mSWE',
humanize.DecimalPrefix(0.005, 'SWE', min_scale=None))
self.assertEqual(
'0.0005 ms',
humanize.DecimalPrefix(5e-7, 's', min_scale=-1, precision=2))
def testBinaryPrefix(self):
self.assertEqual('0 B', humanize.BinaryPrefix(0, 'B'))
self.assertEqual('1000 B', humanize.BinaryPrefix(1000, 'B'))
self.assertEqual('1 KiB', humanize.BinaryPrefix(1024, 'B'))
self.assertEqual('64 GiB', humanize.BinaryPrefix(2**36, 'B'))
self.assertEqual('65536 Yibit', humanize.BinaryPrefix(2**96, 'bit'))
self.assertEqual('1.25 KiB', humanize.BinaryPrefix(1280, 'B', precision=3))
self.assertEqual('1.2 KiB', humanize.BinaryPrefix(1280, 'B', precision=2))
def testScale(self):
self.assertEqual((12.1, 'km'), humanize.DecimalScale(12100, 'm'))
self.assertEqual((1.15, 'Mm'), humanize.DecimalScale(1150000, 'm'))
self.assertEqual((450, 'mSWE'),
humanize.DecimalScale(0.45, 'SWE', min_scale=None))
self.assertEqual(
(250, u'µm'),
humanize.DecimalScale(1.0 / (4 * 1000), 'm', min_scale=None))
value, unit = humanize.BinaryScale(200000000000, 'B')
self.assertAlmostEqual(value, 186.26, 2)
self.assertEqual(unit, 'GiB')
value, unit = humanize.BinaryScale(3000000000000, 'B')
self.assertAlmostEqual(value, 2.728, 3)
self.assertEqual(unit, 'TiB')
def testPrettyFraction(self):
# No rounded integer part
self.assertEqual(u'½', humanize.PrettyFraction(0.5))
# Roundeded integer + fraction
self.assertEqual(u'6⅔', humanize.PrettyFraction(20.0 / 3.0))
# Rounded integer, no fraction
self.assertEqual(u'2', humanize.PrettyFraction(2.00001))
# No rounded integer, no fraction
self.assertEqual(u'0', humanize.PrettyFraction(0.001))
# Round up
self.assertEqual(u'1', humanize.PrettyFraction(0.99))
# No round up, edge case
self.assertEqual(u'⅞', humanize.PrettyFraction(0.9))
# Negative fraction
self.assertEqual(u'-⅕', humanize.PrettyFraction(-0.2))
# Negative close to zero (should not be -0)
self.assertEqual(u'0', humanize.PrettyFraction(-0.001))
# Smallest fraction that should round down.
self.assertEqual(u'0', humanize.PrettyFraction(1.0 / 16.0))
# Largest fraction should round up.
self.assertEqual(u'1', humanize.PrettyFraction(15.0 / 16.0))
# Integer zero.
self.assertEqual(u'0', humanize.PrettyFraction(0))
# Check that division yields fraction
self.assertEqual(u'⅘', humanize.PrettyFraction(4.0 / 5.0))
# Custom spacer.
self.assertEqual(u'2 ½', humanize.PrettyFraction(2.5, spacer=' '))
def testDuration(self):
self.assertEqual('2h', humanize.Duration(7200))
self.assertEqual('5d 13h 47m 12s', humanize.Duration(481632))
self.assertEqual('0s', humanize.Duration(0))
self.assertEqual('59s', humanize.Duration(59))
self.assertEqual('1m', humanize.Duration(60))
self.assertEqual('1m 1s', humanize.Duration(61))
self.assertEqual('1h 1s', humanize.Duration(3601))
self.assertEqual('2h-2s', humanize.Duration(7202, separator='-'))
def testLargeDuration(self):
# The maximum seconds and days that can be stored in a datetime.timedelta
# object, as seconds. max_days is equal to MAX_DELTA_DAYS in Python's
# Modules/datetimemodule.c, converted to seconds.
max_seconds = 3600 * 24 - 1
max_days = 999999999 * 24 * 60 * 60
self.assertEqual('999999999d', humanize.Duration(max_days))
self.assertEqual('999999999d 23h 59m 59s',
humanize.Duration(max_days + max_seconds))
self.assertEqual('>=999999999d 23h 59m 60s',
humanize.Duration(max_days + max_seconds + 1))
def testTimeDelta(self):
self.assertEqual('0s', humanize.TimeDelta(datetime.timedelta()))
self.assertEqual('2h', humanize.TimeDelta(datetime.timedelta(hours=2)))
self.assertEqual('1m', humanize.TimeDelta(datetime.timedelta(minutes=1)))
self.assertEqual('5d', humanize.TimeDelta(datetime.timedelta(days=5)))
self.assertEqual('1.25s', humanize.TimeDelta(
datetime.timedelta(seconds=1, microseconds=250000)))
self.assertEqual('1.5s',
humanize.TimeDelta(datetime.timedelta(seconds=1.5)))
self.assertEqual('4d 10h 5m 12.25s', humanize.TimeDelta(
datetime.timedelta(days=4, hours=10, minutes=5, seconds=12,
microseconds=250000)))
class NaturalSortKeyChunkingTest(basetest.TestCase):
def testChunkifySingleChars(self):
self.assertListEqual(
humanize.NaturalSortKey('a1b2c3'),
['a', 1, 'b', 2, 'c', 3])
def testChunkifyMultiChars(self):
self.assertListEqual(
humanize.NaturalSortKey('aa11bb22cc33'),
['aa', 11, 'bb', 22, 'cc', 33])
def testChunkifyComplex(self):
self.assertListEqual(
humanize.NaturalSortKey('one 11 -- two 44'),
['one ', 11, ' -- two ', 44])
class NaturalSortKeysortTest(basetest.TestCase):
def testNaturalSortKeySimpleWords(self):
self.test = ['pair', 'banana', 'apple']
self.good = ['apple', 'banana', 'pair']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeySimpleNums(self):
self.test = ['3333', '2222', '9999', '0000']
self.good = ['0000', '2222', '3333', '9999']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeySimpleDigits(self):
self.test = ['8', '3', '2']
self.good = ['2', '3', '8']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testVersionStrings(self):
self.test = ['1.2', '0.9', '1.1a2', '1.1a', '1', '1.2.1', '0.9.1']
self.good = ['0.9', '0.9.1', '1', '1.1a', '1.1a2', '1.2', '1.2.1']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeySimpleNumLong(self):
self.test = ['11', '9', '1', '200', '19', '20', '900']
self.good = ['1', '9', '11', '19', '20', '200', '900']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeyAlNum(self):
self.test = ['x10', 'x9', 'x1', 'x11']
self.good = ['x1', 'x9', 'x10', 'x11']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeyNumAlNum(self):
self.test = ['4x10', '4x9', '4x11', '5yy4', '3x1', '2x11']
self.good = ['2x11', '3x1', '4x9', '4x10', '4x11', '5yy4']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
def testNaturalSortKeyAlNumAl(self):
self.test = ['a9c', 'a4b', 'a10c', 'a1c', 'c10c', 'c10a', 'c9a']
self.good = ['a1c', 'a4b', 'a9c', 'a10c', 'c9a', 'c10a', 'c10c']
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
class NaturalSortKeyBigTest(basetest.TestCase):
def testBig(self):
self.test = [
'1000X Radonius Maximus', '10X Radonius', '200X Radonius',
'20X Radonius', '20X Radonius Prime', '30X Radonius',
'40X Radonius', 'Allegia 50 Clasteron', 'Allegia 500 Clasteron',
'Allegia 51 Clasteron', 'Allegia 51B Clasteron',
'Allegia 52 Clasteron', 'Allegia 60 Clasteron', 'Alpha 100',
'Alpha 2', 'Alpha 200', 'Alpha 2A', 'Alpha 2A-8000', 'Alpha 2A-900',
'Callisto Morphamax', 'Callisto Morphamax 500',
'Callisto Morphamax 5000', 'Callisto Morphamax 600',
'Callisto Morphamax 700', 'Callisto Morphamax 7000',
'Callisto Morphamax 7000 SE', 'Callisto Morphamax 7000 SE2',
'QRS-60 Intrinsia Machine', 'QRS-60F Intrinsia Machine',
'QRS-62 Intrinsia Machine', 'QRS-62F Intrinsia Machine',
'Xiph Xlater 10000', 'Xiph Xlater 2000', 'Xiph Xlater 300',
'Xiph Xlater 40', 'Xiph Xlater 5', 'Xiph Xlater 50',
'Xiph Xlater 500', 'Xiph Xlater 5000', 'Xiph Xlater 58']
self.good = [
'10X Radonius',
'20X Radonius',
'20X Radonius Prime',
'30X Radonius',
'40X Radonius',
'200X Radonius',
'1000X Radonius Maximus',
'Allegia 50 Clasteron',
'Allegia 51 Clasteron',
'Allegia 51B Clasteron',
'Allegia 52 Clasteron',
'Allegia 60 Clasteron',
'Allegia 500 Clasteron',
'Alpha 2',
'Alpha 2A',
'Alpha 2A-900',
'Alpha 2A-8000',
'Alpha 100',
'Alpha 200',
'Callisto Morphamax',
'Callisto Morphamax 500',
'Callisto Morphamax 600',
'Callisto Morphamax 700',
'Callisto Morphamax 5000',
'Callisto Morphamax 7000',
'Callisto Morphamax 7000 SE',
'Callisto Morphamax 7000 SE2',
'QRS-60 Intrinsia Machine',
'QRS-60F Intrinsia Machine',
'QRS-62 Intrinsia Machine',
'QRS-62F Intrinsia Machine',
'Xiph Xlater 5',
'Xiph Xlater 40',
'Xiph Xlater 50',
'Xiph Xlater 58',
'Xiph Xlater 300',
'Xiph Xlater 500',
'Xiph Xlater 2000',
'Xiph Xlater 5000',
'Xiph Xlater 10000',
]
self.test.sort(key=humanize.NaturalSortKey)
self.assertListEqual(self.test, self.good)
if __name__ == '__main__':
basetest.main()
+0
-1
google
google.apputils
Metadata-Version: 1.0
Name: google-apputils
Version: 0.3.0
Version: 0.4.0
Summary: UNKNOWN

@@ -5,0 +5,0 @@ Home-page: http://code.google.com/p/google-apputils-python

@@ -13,2 +13,3 @@ LICENSE

google/apputils/file_util.py
google/apputils/humanize.py
google/apputils/resources.py

@@ -26,2 +27,3 @@ google/apputils/run_script_module.py

google_apputils.egg-info/top_level.txt
tests/__init__.py
tests/app_test.py

@@ -36,2 +38,3 @@ tests/app_test_helper.py

tests/file_util_test.py
tests/humanize_test.py
tests/resources_test.py

@@ -38,0 +41,0 @@ tests/sh_test.py

#!/usr/bin/env python
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
#!/usr/bin/env python
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)

@@ -41,2 +41,5 @@ #!/usr/bin/env python

flags.DEFINE_boolean('run_with_pdb', 0, 'Set to true for PDB debug mode')
flags.DEFINE_boolean('pdb_post_mortem', 0,
'Set to true to handle uncaught exceptions with PDB '
'post mortem.')
flags.DEFINE_boolean('run_with_profiling', 0,

@@ -46,2 +49,5 @@ 'Set to true for profiling the script. '

'change over time.')
flags.DEFINE_string('profile_file', None,
'Dump profile information to a file (for python -m '
'pstats). Implies --run_with_profiling.')
flags.DEFINE_boolean('use_cprofile_for_profiling', True,

@@ -80,5 +86,6 @@ 'Use cProfile instead of the profile module for '

"""Special boolean flag that displays usage and raises SystemExit."""
NAME = 'help'
def __init__(self):
flags.BooleanFlag.__init__(self, 'help', 0, 'show this help',
flags.BooleanFlag.__init__(self, self.NAME, 0, 'show this help',
short_name='?', allow_override=1)

@@ -88,12 +95,19 @@

if arg:
usage(writeto_stdout=1)
usage(shorthelp=1, writeto_stdout=1)
# Advertise --helpfull on stdout, since usage() was on stdout.
print
print 'Try --helpfull to get a list of all flags.'
sys.exit(1)
class HelpXMLFlag(flags.BooleanFlag):
"""Similar to HelpFlag, but generates output in XML format."""
class HelpshortFlag(HelpFlag):
"""--helpshort is an alias for --help."""
NAME = 'helpshort'
class HelpfullFlag(flags.BooleanFlag):
"""Display help for flags in this module and all dependent modules."""
def __init__(self):
flags.BooleanFlag.__init__(self, 'helpxml', False,
'like --help, but generates XML output',
flags.BooleanFlag.__init__(self, 'helpfull', 0, 'show full help',
allow_override=1)

@@ -103,12 +117,12 @@

if arg:
flags.FLAGS.WriteHelpInXMLFormat(sys.stdout)
usage(writeto_stdout=1)
sys.exit(1)
class HelpshortFlag(flags.BooleanFlag):
"""Special bool flag that calls usage(shorthelp=1) and raises SystemExit."""
class HelpXMLFlag(flags.BooleanFlag):
"""Similar to HelpfullFlag, but generates output in XML format."""
def __init__(self):
flags.BooleanFlag.__init__(self, 'helpshort', 0,
'show usage only for this module',
flags.BooleanFlag.__init__(self, 'helpxml', False,
'like --helpfull, but generates XML output',
allow_override=1)

@@ -118,3 +132,3 @@

if arg:
usage(shorthelp=1, writeto_stdout=1)
flags.FLAGS.WriteHelpInXMLFormat(sys.stdout)
sys.exit(1)

@@ -124,3 +138,2 @@

class BuildDataFlag(flags.BooleanFlag):
"""Boolean flag that writes build data to stdout and exits."""

@@ -145,3 +158,3 @@

sys.stderr.write('FATAL Flags parsing error: %s\n' % error)
sys.stderr.write('Pass --help or --helpshort to see help on flags.\n')
sys.stderr.write('Pass --helpshort or --helpfull to see help on flags.\n')
sys.exit(1)

@@ -156,3 +169,3 @@

# Use a global to ensure idempotence.
# pylint: disable-msg=W0603
# pylint: disable=global-statement
global _define_help_flags_called

@@ -162,4 +175,5 @@

flags.DEFINE_flag(HelpFlag())
flags.DEFINE_flag(HelpshortFlag()) # alias for --help
flags.DEFINE_flag(HelpfullFlag())
flags.DEFINE_flag(HelpXMLFlag())
flags.DEFINE_flag(HelpshortFlag())
flags.DEFINE_flag(BuildDataFlag())

@@ -200,3 +214,3 @@ _define_help_flags_called = True

else:
if FLAGS.run_with_profiling:
if FLAGS.run_with_profiling or FLAGS.profile_file:
# Avoid import overhead since most apps (including performance-sensitive

@@ -210,3 +224,6 @@ # ones) won't be run with profiling.

profiler = profile.Profile()
atexit.register(profiler.print_stats)
if FLAGS.profile_file:
atexit.register(profiler.dump_stats, FLAGS.profile_file)
else:
atexit.register(profiler.print_stats)
retval = profiler.runcall(main, argv)

@@ -218,2 +235,7 @@ sys.exit(retval)

usage(shorthelp=1, detailed_error=error, exitcode=error.exitcode)
except:
if FLAGS.pdb_post_mortem:
traceback.print_exc()
pdb.post_mortem()
raise

@@ -254,3 +276,4 @@

del tb
sys.exc_clear()
if hasattr(sys, 'exc_clear'):
sys.exc_clear() # This functionality is gone in Python 3.

@@ -257,0 +280,0 @@ try:

#!/usr/bin/env python
#
# Copyright 2007 Google Inc. All Rights Reserved.

@@ -34,3 +35,5 @@ #

This module itself registers the command 'help' that allows users
to retrieve help for all or specific commands.
to retrieve help for all or specific commands. 'help' is the default
command executed if no command is expressed, unless a different default
command is set with SetDefaultCommand.

@@ -140,2 +143,3 @@ Example:

_cmd_alias_list = {} # list of command_names index by command_alias
_cmd_default = 'help' # command to execute if none explicitly given

@@ -163,3 +167,3 @@

"""Return list of registered commands."""
# pylint: disable-msg=W0602
# pylint: disable=global-variable-not-assigned
global _cmd_list

@@ -171,3 +175,3 @@ return _cmd_list

"""Return list of registered command aliases."""
# pylint: disable-msg=W0602
# pylint: disable=global-variable-not-assigned
global _cmd_alias_list

@@ -284,6 +288,3 @@ return _cmd_alias_list

# Run command
if FLAGS.run_with_pdb:
ret = pdb.runcall(self.Run, argv)
else:
ret = self.Run(argv)
ret = self.Run(argv)
if ret is None:

@@ -296,2 +297,7 @@ ret = 0

app.usage(shorthelp=1, detailed_error=error, exitcode=error.exitcode)
except:
if FLAGS.pdb_post_mortem:
traceback.print_exc()
pdb.post_mortem()
raise
finally:

@@ -420,3 +426,3 @@ # Restore app.usage and remove this command's flags from the global flags.

# Update global command list.
# pylint: disable-msg=W0602
# pylint: disable=global-variable-not-assigned
global _cmd_list

@@ -650,3 +656,6 @@ global _cmd_alias_list

command = GetCommandByName(name)
cmd_help = command.CommandGetHelp(GetCommandArgv(), cmd_names=cmd_names)
try:
cmd_help = command.CommandGetHelp(GetCommandArgv(), cmd_names=cmd_names)
except Exception as error:
cmd_help = "Internal error for command '%s': %s." % (name, str(error))
cmd_help = cmd_help.strip()

@@ -672,3 +681,3 @@ all_names = ', '.join([name] + (command.CommandGetAliases() or []))

# We do not register them globally so that they do not reappear.
# pylint: disable-msg=W0212
# pylint: disable=protected-access
cmd_flags = command._command_flags

@@ -706,3 +715,3 @@ if cmd_flags.RegisteredFlags():

# Update the global commands.
# pylint: disable-msg=W0603
# pylint: disable=global-statement
global _cmd_argv

@@ -728,3 +737,3 @@ try:

# Update the global commands.
# pylint: disable-msg=W0603
# pylint: disable=global-statement
global _cmd_argv

@@ -743,11 +752,23 @@ _cmd_argv = ParseFlagsWithUsage(_cmd_argv)

def _CommandsStart():
def SetDefaultCommand(default_command):
"""Change the default command to execute if none is explicitly given.
Args:
default_command: str, the name of the command to execute by default.
"""
# pylint: disable=global-statement,g-bad-name
global _cmd_default
_cmd_default = default_command
def _CommandsStart(unused_argv):
"""Main initialization.
This initializes flag values, and calls __main__.main(). Only non-flag
arguments are passed to main(). The return value of main() is used as the
exit status.
Calls __main__.main(), and then the command indicated by the first
non-flag argument, or 'help' if no argument was given. (The command
to execute if no flag is given can be changed via SetDefaultCommand).
Only non-flag arguments are passed to main(). If main does not call
sys.exit, the return value of the command is used as the exit status.
"""
app.RegisterAndParseFlagsWithUsage()
# The following is supposed to return after registering additional commands

@@ -766,3 +787,5 @@ try:

else:
command = GetCommandByName('help')
command = GetCommandByName(_cmd_default)
if command is None:
ShortHelpAndExit("FATAL Command '%s' unknown" % _cmd_default)
sys.exit(command.CommandRun(GetCommandArgv()))

@@ -780,3 +803,7 @@

app.parse_flags_with_usage = ParseFlagsWithUsage
app.really_start = _CommandsStart
original_really_start = app.really_start
def InterceptReallyStart():
original_really_start(main=_CommandsStart)
app.really_start = InterceptReallyStart
app.usage = _ReplacementAppUsage

@@ -783,0 +810,0 @@ return app.run()

@@ -24,6 +24,7 @@ #!/usr/bin/env python

import commands
import collections
import difflib
import getpass
import itertools
import json
import os

@@ -35,3 +36,12 @@ import re

import types
import urlparse
try:
import faulthandler
except ImportError:
# //testing/pybase:pybase can't have deps on any extension modules as it
# is used by code that is executed in such a way it cannot import them. :(
# We use faulthandler if it is available (either via a user declared dep
# or from the Python 3.3+ standard library).
faulthandler = None

@@ -268,17 +278,19 @@ # unittest2 is a backport of Python 2.7's unittest for Python 2.6, so

# The closure here makes sure that each new test() function remembers its
# own values of cls_test and test_name. Without this, they'd all point to
# the values from the last iteration of the loop, causing some arbitrary
# test method to run multiple times and the others never. :(
def test_closure(cls_test, test_name):
def test(self, *args, **kargs):
leaf = self.__class__
leaf.__tests_to_run.discard(test_name)
return cls_test(self, *args, **kargs)
return test
for test_name in test_names:
cls_test = getattr(cls, test_name)
# The default parameters here make sure that each new test() function
# remembers its own values of cls_test and test_name. Without these
# default parameters, they'd all point to the values from the last
# iteration of the loop, causing some arbitrary test method to run
# multiple times and the others never. :(
def test(self, cls_test=cls_test, test_name=test_name):
leaf = self.__class__
leaf.__tests_to_run.discard(test_name)
return cls_test(self)
BeforeAfterTestCaseMeta.SetMethod(
cls, test_name, test_closure(cls_test, test_name))
BeforeAfterTestCaseMeta.SetMethod(cls, test_name, test)
@staticmethod

@@ -380,2 +392,6 @@ def SetBeforeAfterTestCaseAttr():

# TODO(user): Provide an assertItemsEqual method when our super class
# does not provide one. That method went away in Python 3.2 (renamed
# to assertCountEqual, or is that different? investigate).
def assertSameElements(self, expected_seq, actual_seq, msg=None):

@@ -437,5 +453,5 @@ """Assert that two sequences have the same elements (in any order).

if msg:
raise self.failureException(msg)
failure_message = ['\n']
failure_message = [msg, ':\n']
else:
failure_message = ['\n']
for line in difflib.ndiff(first.splitlines(True), second.splitlines(True)):

@@ -487,13 +503,30 @@ failure_message.append(line)

if isinstance(regexes, basestring):
self.fail('regexes is a string; it needs to be a list of strings.')
self.fail('regexes is a string; use assertRegexpMatches instead.')
if not regexes:
self.fail('No regexes specified.')
regex = '(?:%s)' % ')|(?:'.join(regexes)
regex_type = type(regexes[0])
for regex in regexes[1:]:
if type(regex) is not regex_type:
self.fail('regexes list must all be the same type.')
if regex_type is bytes and isinstance(actual_str, unicode):
regexes = [regex.decode('utf-8') for regex in regexes]
regex_type = unicode
elif regex_type is unicode and isinstance(actual_str, bytes):
regexes = [regex.encode('utf-8') for regex in regexes]
regex_type = bytes
if regex_type is unicode:
regex = u'(?:%s)' % u')|(?:'.join(regexes)
elif regex_type is bytes:
regex = b'(?:' + (b')|(?:'.join(regexes)) + b')'
else:
self.fail('Only know how to deal with unicode str or bytes regexes.')
if not re.search(regex, actual_str, re.MULTILINE):
self.fail(message or ('String "%s" does not contain any of these '
self.fail(message or ('"%s" does not contain any of these '
'regexes: %s.' % (actual_str, regexes)))
def assertCommandSucceeds(self, command, regexes=[''], env=None,
def assertCommandSucceeds(self, command, regexes=(b'',), env=None,
close_fds=True):

@@ -504,3 +537,3 @@ """Asserts that a shell command succeeds (i.e. exits with code 0).

command: List or string representing the command to run.
regexes: List of regular expression strings.
regexes: List of regular expression byte strings that match success.
env: Dictionary of environment variable settings.

@@ -512,5 +545,10 @@ close_fds: Whether or not to close all open fd's in the child after

# Accommodate code which listed their output regexes w/o the b'' prefix by
# converting them to bytes for the user.
if isinstance(regexes[0], unicode):
regexes = [regex.encode('utf-8') for regex in regexes]
command_string = GetCommandString(command)
self.assert_(
ret_code == 0,
self.assertEqual(
ret_code, 0,
'Running command\n'

@@ -546,5 +584,10 @@ '%s failed with error code %s and message\n'

# Accommodate code which listed their output regexes w/o the b'' prefix by
# converting them to bytes for the user.
if isinstance(regexes[0], unicode):
regexes = [regex.encode('utf-8') for regex in regexes]
command_string = GetCommandString(command)
self.assert_(
ret_code != 0,
self.assertNotEqual(
ret_code, 0,
'The following command succeeded while expected to fail:\n%s' %

@@ -579,3 +622,3 @@ _QuoteLongString(command_string))

callable_obj(*args, **kwargs)
except expected_exception, err:
except expected_exception as err:
self.assert_(predicate(err),

@@ -605,3 +648,3 @@ '%r does not match predicate %r' % (err, predicate))

callable_obj(*args, **kwargs)
except expected_exception, err:
except expected_exception as err:
actual_exception_message = str(err)

@@ -757,2 +800,147 @@ self.assert_(expected_exception_message == actual_exception_message,

def assertDictEqual(self, a, b, msg=None):
"""Raises AssertionError if a and b are not equal dictionaries.
Args:
a: A dict, the expected value.
b: A dict, the actual value.
msg: An optional str, the associated message.
Raises:
AssertionError: if the dictionaries are not equal.
"""
self.assertIsInstance(a, dict, 'First argument is not a dictionary')
self.assertIsInstance(b, dict, 'Second argument is not a dictionary')
def Sorted(iterable):
try:
return sorted(iterable) # In 3.3, unordered objects are possible.
except TypeError:
return list(iterable)
if a == b:
return
a_items = Sorted(a.iteritems())
b_items = Sorted(b.iteritems())
unexpected = []
missing = []
different = []
safe_repr = unittest.util.safe_repr
def Repr(dikt):
"""Deterministic repr for dict."""
# Sort the entries based on their repr, not based on their sort order,
# which will be non-deterministic across executions, for many types.
entries = sorted((safe_repr(k), safe_repr(v))
for k, v in dikt.iteritems())
return '{%s}' % (', '.join('%s: %s' % pair for pair in entries))
message = ['%s != %s%s' % (Repr(a), Repr(b), ' (%s)' % msg if msg else '')]
# The standard library default output confounds lexical difference with
# value difference; treat them separately.
for a_key, a_value in a_items:
if a_key not in b:
unexpected.append((a_key, a_value))
elif a_value != b[a_key]:
different.append((a_key, a_value, b[a_key]))
if unexpected:
message.append(
'Unexpected, but present entries:\n%s' % ''.join(
'%s: %s\n' % (safe_repr(k), safe_repr(v)) for k, v in unexpected))
if different:
message.append(
'repr() of differing entries:\n%s' % ''.join(
'%s: %s != %s\n' % (safe_repr(k), safe_repr(a_value),
safe_repr(b_value))
for k, a_value, b_value in different))
for b_key, b_value in b_items:
if b_key not in a:
missing.append((b_key, b_value))
if missing:
message.append(
'Missing entries:\n%s' % ''.join(
('%s: %s\n' % (safe_repr(k), safe_repr(v)) for k, v in missing)))
raise self.failureException('\n'.join(message))
def assertUrlEqual(self, a, b):
"""Asserts that urls are equal, ignoring ordering of query params."""
parsed_a = urlparse.urlparse(a)
parsed_b = urlparse.urlparse(b)
self.assertEqual(parsed_a.scheme, parsed_b.scheme)
self.assertEqual(parsed_a.netloc, parsed_b.netloc)
self.assertEqual(parsed_a.path, parsed_b.path)
self.assertEqual(parsed_a.fragment, parsed_b.fragment)
self.assertEqual(sorted(parsed_a.params.split(';')),
sorted(parsed_b.params.split(';')))
self.assertDictEqual(urlparse.parse_qs(parsed_a.query),
urlparse.parse_qs(parsed_b.query))
def assertSameStructure(self, a, b, aname='a', bname='b', msg=None):
"""Asserts that two values contain the same structural content.
The two arguments should be data trees consisting of trees of dicts and
lists. They will be deeply compared by walking into the contents of dicts
and lists; other items will be compared using the == operator.
If the two structures differ in content, the failure message will indicate
the location within the structures where the first difference is found.
This may be helpful when comparing large structures.
Args:
a: The first structure to compare.
b: The second structure to compare.
aname: Variable name to use for the first structure in assertion messages.
bname: Variable name to use for the second structure.
msg: Additional text to include in the failure message.
"""
# Accumulate all the problems found so we can report all of them at once
# rather than just stopping at the first
problems = []
_WalkStructureForProblems(a, b, aname, bname, problems)
# Avoid spamming the user toooo much
max_problems_to_show = self.maxDiff // 80
if len(problems) > max_problems_to_show:
problems = problems[0:max_problems_to_show-1] + ['...']
if problems:
failure_message = '; '.join(problems)
if msg:
failure_message += (': ' + msg)
self.fail(failure_message)
def assertJsonEqual(self, first, second, msg=None):
"""Asserts that the JSON objects defined in two strings are equal.
A summary of the differences will be included in the failure message
using assertSameStructure.
Args:
first: A string contining JSON to decode and compare to second.
second: A string contining JSON to decode and compare to first.
msg: Additional text to include in the failure message.
"""
try:
first_structured = json.loads(first)
except ValueError as e:
raise ValueError('could not decode first JSON value %s: %s' %
(first, e))
try:
second_structured = json.loads(second)
except ValueError as e:
raise ValueError('could not decode second JSON value %s: %s' %
(second, e))
self.assertSameStructure(first_structured, second_structured,
aname='first', bname='second', msg=msg)
def getRecordedProperties(self):

@@ -942,14 +1130,38 @@ """Return any properties that the user has recorded."""

# We want to emit exactly one notice to stderr telling the user where to look
# for their stdout or stderr that may have been consumed to aid debugging.
_notified_test_output_path = ''
def _MaybeNotifyAboutTestOutput(outdir):
global _notified_test_output_path
if _notified_test_output_path != outdir:
_notified_test_output_path = outdir
sys.stderr.write('\nNOTE: Some tests capturing output into: %s\n' % outdir)
# TODO(user): Make CaptureTest* be usable as context managers to easily stop
# capturing at the appropriate time to make debugging failures much easier.
# Public interface
def CaptureTestStdout(outfile=None):
def CaptureTestStdout(outfile=''):
"""Capture the stdout stream to a file until StopCapturing() is called."""
if not outfile:
outfile = os.path.join(FLAGS.test_tmpdir, 'captured.out')
outdir = FLAGS.test_tmpdir
else:
outdir = os.path.dirname(outfile)
_MaybeNotifyAboutTestOutput(outdir)
_CaptureTestOutput(sys.stdout, outfile)
def CaptureTestStderr(outfile=None):
def CaptureTestStderr(outfile=''):
"""Capture the stderr stream to a file until StopCapturing() is called."""
if not outfile:
outfile = os.path.join(FLAGS.test_tmpdir, 'captured.err')
outdir = FLAGS.test_tmpdir
else:
outdir = os.path.dirname(outfile)
_MaybeNotifyAboutTestOutput(outdir)
_CaptureTestOutput(sys.stderr, outfile)

@@ -967,2 +1179,3 @@

def StopCapturing():
"""Stop capturing redirected output. Debugging sucks if you forget!"""
while _captured_streams:

@@ -976,3 +1189,5 @@ _, cap_stream = _captured_streams.popitem()

"""Write data into file named filename."""
fd = os.open(filename, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, 0600)
fd = os.open(filename, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, 0o600)
if not isinstance(data, (bytes, bytearray)):
data = data.encode('utf-8')
os.write(fd, data)

@@ -982,2 +1197,45 @@ os.close(fd)

_INT_TYPES = (int, long) # Sadly there is no types.IntTypes defined for us.
def _WalkStructureForProblems(a, b, aname, bname, problem_list):
"""The recursive comparison behind assertSameStructure."""
if type(a) != type(b) and not (
isinstance(a, _INT_TYPES) and isinstance(b, _INT_TYPES)):
# We do not distinguish between int and long types as 99.99% of Python 2
# code should never care. They collapse into a single type in Python 3.
problem_list.append('%s is a %r but %s is a %r' %
(aname, type(a), bname, type(b)))
# If they have different types there's no point continuing
return
if isinstance(a, collections.Mapping):
for k in a:
if k in b:
_WalkStructureForProblems(a[k], b[k],
'%s[%r]' % (aname, k), '%s[%r]' % (bname, k),
problem_list)
else:
problem_list.append('%s has [%r] but %s does not' % (aname, k, bname))
for k in b:
if k not in a:
problem_list.append('%s lacks [%r] but %s has it' % (aname, k, bname))
# Strings are Sequences but we'll just do those with regular !=
elif isinstance(a, collections.Sequence) and not isinstance(a, basestring):
minlen = min(len(a), len(b))
for i in xrange(minlen):
_WalkStructureForProblems(a[i], b[i],
'%s[%d]' % (aname, i), '%s[%d]' % (bname, i),
problem_list)
for i in xrange(minlen, len(a)):
problem_list.append('%s has [%i] but %s does not' % (aname, i, bname))
for i in xrange(minlen, len(b)):
problem_list.append('%s lacks [%i] but %s has it' % (aname, i, bname))
else:
if a != b:
problem_list.append('%s is %r but %s is %r' % (aname, a, bname, b))
class OutputDifferedError(AssertionError):

@@ -991,16 +1249,46 @@ pass

def _DiffViaExternalProgram(lhs, rhs, external_diff):
"""Compare two files using an external program; raise if it reports error."""
# The behavior of this function matches the old _Diff() method behavior
# when a TEST_DIFF environment variable was set. A few old things at
# Google depended on that functionality.
command = [external_diff, lhs, rhs]
try:
subprocess.check_output(command, close_fds=True, stderr=subprocess.STDOUT)
return True # No diffs.
except subprocess.CalledProcessError as error:
failure_output = error.output
if error.returncode == 1:
raise OutputDifferedError('\nRunning %s\n%s\nTest output differed from'
' golden file\n' % (command, failure_output))
except EnvironmentError as error:
failure_output = str(error)
# Running the program failed in some way that wasn't a diff.
raise DiffFailureError('\nRunning %s\n%s\nFailure diffing test output'
' with golden file\n' % (command, failure_output))
def _Diff(lhs, rhs):
"""Run standard unix 'diff' against two files."""
"""Given two pathnames, compare two files. Raise if they differ."""
# Some people rely on being able to specify TEST_DIFF in the environment to
# have tests use their own diff wrapper for use when updating golden data.
external_diff = os.environ.get('TEST_DIFF')
if external_diff:
return _DiffViaExternalProgram(lhs, rhs, external_diff)
try:
with open(lhs, 'rt') as lhs_f:
with open(rhs, 'rt') as rhs_f:
diff_text = ''.join(
difflib.unified_diff(lhs_f.readlines(), rhs_f.readlines()))
if not diff_text:
return True
raise OutputDifferedError('\nComparing %s and %s\nTest output differed '
'from golden file:\n%s' % (lhs, rhs, diff_text))
except EnvironmentError as error:
# Unable to read the files.
raise DiffFailureError('\nComparing %s and %s\nFailure diffing test output '
'with golden file: %s\n' % (lhs, rhs, error))
cmd = '${TEST_DIFF:-diff} %s %s' % (commands.mkarg(lhs), commands.mkarg(rhs))
(status, output) = commands.getstatusoutput(cmd)
if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 1:
# diff outputs must be the same as c++ and shell
raise OutputDifferedError('\nRunning %s\n%s\nTest output differed '
'from golden file\n' % (cmd, output))
elif not os.WIFEXITED(status) or os.WEXITSTATUS(status) != 0:
raise DiffFailureError('\nRunning %s\n%s\nFailure diffing test output '
'with golden file\n' % (cmd, output))
def DiffTestStringFile(data, golden):

@@ -1015,7 +1303,7 @@ """Diff data agains a golden file."""

"""Diff two strings."""
data1_file = os.path.join(FLAGS.test_tmpdir, 'provided_1.dat')
_WriteTestData(data1, data1_file)
data2_file = os.path.join(FLAGS.test_tmpdir, 'provided_2.dat')
_WriteTestData(data2, data2_file)
_Diff(data1_file, data2_file)
diff_text = ''.join(
difflib.unified_diff(data1.splitlines(True), data2.splitlines(True)))
if not diff_text:
return
raise OutputDifferedError('\nTest strings differed:\n%s' % diff_text)

@@ -1081,2 +1369,7 @@

"""
if isinstance(s, (bytes, bytearray)):
try:
s = s.decode('utf-8')
except UnicodeDecodeError:
s = str(s)
return ('8<-----------\n' +

@@ -1135,2 +1428,4 @@ s + '\n' +

def _RunInApp(function, args, kwargs):

@@ -1171,7 +1466,12 @@ """Executes a set of Python unit tests, ensuring app.really_start.

function: basetest.RunTests or a similar function. It will be called as
function(argv, args, kwargs) where argv is a list containing the
elements of sys.argv without the command-line flags.
function(argv, args, kwargs) where argv is a list containing the
elements of sys.argv without the command-line flags.
args: Positional arguments passed through to unittest.TestProgram.__init__.
kwargs: Keyword arguments passed through to unittest.TestProgram.__init__.
"""
if faulthandler:
try:
faulthandler.enable()
except Exception as e:
sys.stderr.write('faulthandler.enable() failed %r; ignoring.\n' % e)
if _IsInAppMain():

@@ -1178,0 +1478,0 @@ # Save command-line flags so the side effects of FLAGS(sys.argv) can be

@@ -31,2 +31,3 @@ #!/usr/bin/env python

import types
import warnings

@@ -52,2 +53,14 @@ from dateutil import parser

def MicrosecondsToSeconds(microseconds):
"""Convert microseconds to seconds.
Args:
microseconds: A number representing some duration of time measured in
microseconds.
Returns:
A number representing the same duration of time measured in seconds.
"""
return microseconds / _MICROSECONDS_PER_SECOND_F
def _GetCurrentTimeMicros():

@@ -86,2 +99,67 @@ """Get the current time in microseconds, in UTC.

def DatetimeToUTCMicros(date):
"""Converts a datetime object to microseconds since the epoch in UTC.
Args:
date: A datetime to convert.
Returns:
The number of microseconds since the epoch, in UTC, represented by the input
datetime.
"""
# Using this guide: http://wiki.python.org/moin/WorkingWithTime
# And this conversion guide: http://docs.python.org/library/time.html
# Turn the date parameter into a tuple (struct_time) that can then be
# manipulated into a long value of seconds. During the conversion from
# struct_time to long, the source date in UTC, and so it follows that the
# correct transformation is calendar.timegm()
micros = calendar.timegm(date.utctimetuple()) * _MICROSECONDS_PER_SECOND
return micros + date.microsecond
def DatetimeToUTCMillis(date):
"""Converts a datetime object to milliseconds since the epoch in UTC.
Args:
date: A datetime to convert.
Returns:
The number of milliseconds since the epoch, in UTC, represented by the input
datetime.
"""
return DatetimeToUTCMicros(date) / 1000
def UTCMicrosToDatetime(micros, tz=None):
"""Converts a microsecond epoch time to a datetime object.
Args:
micros: A UTC time, expressed in microseconds since the epoch.
tz: The desired tzinfo for the datetime object. If None, the
datetime will be naive.
Returns:
The datetime represented by the input value.
"""
# The conversion from micros to seconds for input into the
# utcfromtimestamp function needs to be done as a float to make sure
# we dont lose the sub-second resolution of the input time.
dt = datetime.datetime.utcfromtimestamp(
micros / _MICROSECONDS_PER_SECOND_F)
if tz is not None:
dt = tz.fromutc(dt)
return dt
def UTCMillisToDatetime(millis, tz=None):
"""Converts a millisecond epoch time to a datetime object.
Args:
millis: A UTC time, expressed in milliseconds since the epoch.
tz: The desired tzinfo for the datetime object. If None, the
datetime will be naive.
Returns:
The datetime represented by the input value.
"""
return UTCMicrosToDatetime(millis * 1000, tz)
UTC = pytz.UTC

@@ -384,6 +462,10 @@ US_PACIFIC = pytz.timezone('US/Pacific')

tz: optional timezone, if timezone is omitted from timestring.
Returns:
New Timestamp.
New Timestamp or None if unable to parse the timestring.
"""
r = parser.parse(timestring)
try:
r = parser.parse(timestring)
except ValueError:
return None
if not r.tzinfo:

@@ -398,16 +480,37 @@ r = (tz or cls.LocalTimezone).localize(r)

def _IntStringToInterval(cls, timestring):
"""Parse interval date specification and create a timedelta object."""
return datetime.timedelta(seconds=ConvertIntervalToSeconds(timestring))
"""Parse interval date specification and create a timedelta object.
Args:
timestring: string interval.
Returns:
A datetime.timedelta representing the specified interval or None if
unable to parse the timestring.
"""
seconds = ConvertIntervalToSeconds(timestring)
return datetime.timedelta(seconds=seconds) if seconds else None
@classmethod
def FromString(cls, value, tz=None):
"""Try to create a Timestamp from a string."""
val = cls._StringToTime(value, tz)
if val:
return val
"""Create a Timestamp from a string.
val = cls._IntStringToInterval(value)
if val:
return cls.utcnow() - val
Args:
value: String interval or datetime.
e.g. "2013-01-05 13:00:00" or "1d"
tz: optional timezone, if timezone is omitted from timestring.
Returns:
A new Timestamp.
Raises:
TimeParseError if unable to parse value.
"""
result = cls._StringToTime(value, tz=tz)
if result:
return result
result = cls._IntStringToInterval(value)
if result:
return cls.utcnow() - result
raise TimeParseError(value)

@@ -414,0 +517,0 @@

@@ -15,2 +15,4 @@ #!/usr/bin/env python

# limitations under the License.
#
# This code must be source compatible with Python 2.6 through 3.3.

@@ -52,3 +54,3 @@ """Import this module to add a hook to call pdb on uncaught exceptions.

traceback.print_exception(exc_class, value, tb)
print
sys.stdout.write('\n')
# ...then start the debugger in post-mortem mode.

@@ -60,4 +62,5 @@ pdb.pm()

# Must back up old excepthook.
global old_excepthook # pylint: disable-msg=W0603
old_excepthook = sys.excepthook
sys.excepthook = _DebugHandler
global old_excepthook # pylint: disable=global-statement
if old_excepthook is None:
old_excepthook = sys.excepthook
sys.excepthook = _DebugHandler

@@ -21,2 +21,3 @@ #!/usr/bin/env python

import contextlib
import errno

@@ -41,10 +42,7 @@ import os

"""Read entire contents of file with name 'filename'."""
fp = open(filename)
try:
with open(filename) as fp:
return fp.read()
finally:
fp.close()
def Write(filename, contents, overwrite_existing=True, mode=0666):
def Write(filename, contents, overwrite_existing=True, mode=0666, gid=None):
"""Create a file 'filename' with 'contents', with the mode given in 'mode'.

@@ -55,2 +53,4 @@

An optional gid can be specified.
Args:

@@ -62,2 +62,3 @@ filename: str; the name of the file

mode: int; permissions with which to create the file (default is 0666 octal)
gid: int; group id with which to create the file
"""

@@ -72,5 +73,7 @@ flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT

os.close(fd)
if gid is not None:
os.chown(filename, -1, gid)
def AtomicWrite(filename, contents, mode=0666):
def AtomicWrite(filename, contents, mode=0666, gid=None):
"""Create a file 'filename' with 'contents' atomically.

@@ -84,2 +87,4 @@

An optional gid can be specified.
Args:

@@ -89,4 +94,5 @@ filename: str; the name of the file

mode: int; permissions with which to create the file (default is 0666 octal)
gid: int; group id with which to create the file
"""
(fd, tmp_filename) = tempfile.mkstemp(dir=os.path.dirname(filename))
fd, tmp_filename = tempfile.mkstemp(dir=os.path.dirname(filename))
try:

@@ -98,2 +104,4 @@ os.write(fd, contents)

os.chmod(tmp_filename, mode)
if gid is not None:
os.chown(tmp_filename, -1, gid)
os.rename(tmp_filename, filename)

@@ -108,2 +116,30 @@ except OSError, exc:

@contextlib.contextmanager
def TemporaryFileWithContents(contents, **kw):
"""A contextmanager that writes out a string to a file on disk.
This is useful whenever you need to call a function or command that expects a
file on disk with some contents that you have in memory. The context manager
abstracts the writing, flushing, and deletion of the temporary file. This is a
common idiom that boils down to a single with statement.
Note: if you need a temporary file-like object for calling an internal
function, you should use a StringIO as a file-like object and not this.
Temporary files should be avoided unless you need a file name or contents in a
file on disk to be read by some other function or program.
Args:
contents: a string with the contents to write to the file.
**kw: Optional arguments passed on to tempfile.NamedTemporaryFile.
Yields:
The temporary file object, opened in 'w' mode.
"""
temporary_file = tempfile.NamedTemporaryFile(**kw)
temporary_file.write(contents)
temporary_file.flush()
yield temporary_file
temporary_file.close()
def MkDirs(directory, force_mode=None):

@@ -110,0 +146,0 @@ """Makes a directory including its parent directories.

@@ -62,3 +62,3 @@ #!/usr/bin/env python

"""Get a filename for a resource; see _Call."""
global _extracted_files # pylint: disable-msg=W0603
global _extracted_files # pylint: disable=global-statement
if not _extracted_files:

@@ -65,0 +65,0 @@ atexit.register(pkg_resources.cleanup_resources)

@@ -214,5 +214,5 @@ #!/usr/bin/env python

os.execv(program, args)
except EnvironmentError, e:
except EnvironmentError as e:
if not getattr(e, 'filename', None):
e.filename = program # Add info to error message
raise

@@ -137,3 +137,3 @@ #!/usr/bin/env python

return False
except SystemExit, e:
except SystemExit as e:
returncode, = e.args

@@ -140,0 +140,0 @@ return not returncode

#!/usr/bin/env python
# This code must be source compatible with Python 2.4 through 3.3.
#

@@ -3,0 +4,0 @@ # Copyright 2003 Google Inc. All Rights Reserved.

Metadata-Version: 1.0
Name: google-apputils
Version: 0.3.0
Version: 0.4.0
Summary: UNKNOWN

@@ -5,0 +5,0 @@ Home-page: http://code.google.com/p/google-apputils-python

@@ -53,6 +53,5 @@ #!/usr/bin/env python

name="google-apputils",
version="0.3.0",
version="0.4.0",
packages=find_packages(exclude=["tests"]),
namespace_packages=find_packages(exclude=["tests"]),
scripts=["ez_setup.py"],
namespace_packages=["google"],
entry_points={

@@ -59,0 +58,0 @@ "distutils.commands": [

#!/usr/bin/env python
#
# Copyright 2005 Google Inc. All Rights Reserved.

@@ -3,0 +4,0 @@ #

@@ -260,8 +260,11 @@ #! /bin/bash

fgrep '>%@<' >/dev/null ||
die "Test 32 failed"
die "Test 30 failed"
readonly HELP_PROG="
from ${APP_PACKAGE} import app
def main(argv): print 'HI'
app.run()
"
echo "PASS"
echo "PASS"
#!/usr/bin/env python
#
# Copyright 2007 Google Inc. All Rights Reserved.

@@ -3,0 +4,0 @@ #

@@ -276,2 +276,26 @@ #! /bin/bash

# Success, default command set and correctly run.
RES=`$PYTHON -c "${IMPORTS}
class test(appcommands.Cmd):
def Run(self, argv):
print 'test running correctly'
return 0
def main(argv):
appcommands.AddCmd('test', test)
appcommands.SetDefaultCommand('test')
appcommands.Run()"` || die "Test 63 failed"
echo "${RES}" | grep -q "test running correctly" || die "Test 64 failed"
# Failure, default command set but missing.
$PYTHON -c "${IMPORTS}
class test(appcommands.Cmd):
def Run(self, argv):
print 'test running correctly'
return 0
def main(argv):
appcommands.AddCmd('test', test)
appcommands.SetDefaultCommand('missing')
appcommands.Run()" >/dev/null 2>&1 && die "Test 65 failed"
echo "PASS"

@@ -20,2 +20,3 @@ #!/usr/bin/env python

import datetime
import random

@@ -146,4 +147,90 @@ import time

def testFromStringInterval(self):
expected_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
expected_s = time.mktime(expected_date.utctimetuple())
actual_date = datelib.Timestamp.FromString('1d')
actual_s = time.mktime(actual_date.timetuple())
diff_seconds = actual_s - expected_s
self.assertBetween(diff_seconds, 0, 1)
self.assertRaises(
datelib.TimeParseError, datelib.Timestamp.FromString, 'wat')
def _EpochToDatetime(t, tz=None):
if tz is not None:
return datelib.datetime.datetime.fromtimestamp(t, tz)
else:
return datelib.datetime.datetime.utcfromtimestamp(t)
class DatetimeConversionUnitTest(basetest.TestCase):
def setUp(self):
self.pst = pytz.timezone('US/Pacific')
self.utc = pytz.utc
self.now = time.time()
def testDatetimeToUTCMicros(self):
self.assertEqual(
0, datelib.DatetimeToUTCMicros(_EpochToDatetime(0)))
self.assertEqual(
1001 * long(datelib._MICROSECONDS_PER_SECOND),
datelib.DatetimeToUTCMicros(_EpochToDatetime(1001)))
self.assertEqual(long(self.now * datelib._MICROSECONDS_PER_SECOND),
datelib.DatetimeToUTCMicros(_EpochToDatetime(self.now)))
# tzinfo shouldn't change the result
self.assertEqual(
0, datelib.DatetimeToUTCMicros(_EpochToDatetime(0, tz=self.pst)))
def testDatetimeToUTCMillis(self):
self.assertEqual(
0, datelib.DatetimeToUTCMillis(_EpochToDatetime(0)))
self.assertEqual(
1001 * 1000L, datelib.DatetimeToUTCMillis(_EpochToDatetime(1001)))
self.assertEqual(long(self.now * 1000),
datelib.DatetimeToUTCMillis(_EpochToDatetime(self.now)))
# tzinfo shouldn't change the result
self.assertEqual(
0, datelib.DatetimeToUTCMillis(_EpochToDatetime(0, tz=self.pst)))
def testUTCMicrosToDatetime(self):
self.assertEqual(_EpochToDatetime(0), datelib.UTCMicrosToDatetime(0))
self.assertEqual(_EpochToDatetime(1.000001),
datelib.UTCMicrosToDatetime(1000001))
self.assertEqual(_EpochToDatetime(self.now), datelib.UTCMicrosToDatetime(
long(self.now * datelib._MICROSECONDS_PER_SECOND)))
# Check timezone-aware comparisons
self.assertEqual(_EpochToDatetime(0, self.pst),
datelib.UTCMicrosToDatetime(0, tz=self.pst))
self.assertEqual(_EpochToDatetime(0, self.pst),
datelib.UTCMicrosToDatetime(0, tz=self.utc))
def testUTCMillisToDatetime(self):
self.assertEqual(_EpochToDatetime(0), datelib.UTCMillisToDatetime(0))
self.assertEqual(_EpochToDatetime(1.001), datelib.UTCMillisToDatetime(1001))
t = time.time()
dt = _EpochToDatetime(t)
# truncate sub-milli time
dt -= datelib.datetime.timedelta(microseconds=dt.microsecond % 1000)
self.assertEqual(dt, datelib.UTCMillisToDatetime(long(t * 1000)))
# Check timezone-aware comparisons
self.assertEqual(_EpochToDatetime(0, self.pst),
datelib.UTCMillisToDatetime(0, tz=self.pst))
self.assertEqual(_EpochToDatetime(0, self.pst),
datelib.UTCMillisToDatetime(0, tz=self.utc))
class MicrosecondsToSecondsUnitTest(basetest.TestCase):
def testConversionFromMicrosecondsToSeconds(self):
self.assertEqual(0.0, datelib.MicrosecondsToSeconds(0))
self.assertEqual(7.0, datelib.MicrosecondsToSeconds(7000000))
self.assertEqual(1.234567, datelib.MicrosecondsToSeconds(1234567))
self.assertEqual(12345654321.123456,
datelib.MicrosecondsToSeconds(12345654321123456))
if __name__ == '__main__':
basetest.main()

@@ -39,3 +39,3 @@ #!/usr/bin/env python

# pylint is dumb about mox:
# pylint:disable-msg=E1101
# pylint: disable=no-member

@@ -67,7 +67,4 @@

file_util.Write(self.file_path, self.sample_contents)
fp = open(self.file_path)
try:
with open(self.file_path) as fp:
self.assertEquals(fp.read(), self.sample_contents)
finally:
fp.close()

@@ -87,7 +84,4 @@ def testWriteExclusive(self):

file_util.AtomicWrite(self.file_path, self.sample_contents)
fp = open(self.file_path)
try:
with open(self.file_path) as fp:
self.assertEquals(fp.read(), self.sample_contents)
finally:
fp.close()

@@ -131,6 +125,31 @@ def testAtomicWriteMode(self):

open(self.file_path).AndReturn(file_handle)
file_handle.__enter__().AndReturn(file_handle)
file_handle.read().AndReturn(self.sample_contents)
file_handle.close()
file_handle.__exit__(None, None, None)
self.mox.ReplayAll()
self.assertEquals(file_util.Read(self.file_path), self.sample_contents)
try:
self.assertEquals(file_util.Read(self.file_path), self.sample_contents)
self.mox.VerifyAll()
finally:
# Because we mock out the built-in open() function, which the unittest
# library depends on, we need to make sure we revert it before leaving the
# test, otherwise any test failures will cause further internal failures
# and yield no meaningful error output.
self.mox.ResetAll()
self.mox.UnsetStubs()
def testWriteGroup(self):
self.mox.StubOutWithMock(os, 'open')
self.mox.StubOutWithMock(os, 'write')
self.mox.StubOutWithMock(os, 'close')
self.mox.StubOutWithMock(os, 'chown')
gid = 'new gid'
os.open(self.file_path, os.O_WRONLY | os.O_TRUNC | os.O_CREAT,
0666).AndReturn(self.fd)
os.write(self.fd, self.sample_contents)
os.close(self.fd)
os.chown(self.file_path, -1, gid)
self.mox.ReplayAll()
file_util.Write(self.file_path, self.sample_contents, gid=gid)
self.mox.VerifyAll()

@@ -150,2 +169,3 @@

self.mode = 'new permissions'
self.gid = 'new gid'
self.temp_filename = '/some/temp/file'

@@ -163,2 +183,21 @@ self.os_error = OSError('A problem renaming!')

def testAtomicWriteGroup(self):
self.mox.StubOutWithMock(os, 'chown')
os.chown(self.temp_filename, -1, self.gid)
os.rename(self.temp_filename, self.file_path)
self.mox.ReplayAll()
file_util.AtomicWrite(self.file_path, self.sample_contents,
mode=self.mode, gid=self.gid)
self.mox.VerifyAll()
def testAtomicWriteGroupError(self):
self.mox.StubOutWithMock(os, 'chown')
os.chown(self.temp_filename, -1, self.gid).AndRaise(self.os_error)
os.remove(self.temp_filename)
self.mox.ReplayAll()
self.assertRaises(OSError, file_util.AtomicWrite, self.file_path,
self.sample_contents, mode=self.mode, gid=self.gid)
self.mox.VerifyAll()
def testRenamingError(self):

@@ -180,3 +219,3 @@ os.rename(self.temp_filename, self.file_path).AndRaise(self.os_error)

mode=self.mode)
except OSError, e:
except OSError as e:
self.assertEquals(str(e),

@@ -190,6 +229,19 @@ 'A problem renaming!. Additional errors cleaning up: '

class TemporaryFilesMoxTest(FileUtilMoxTestBase):
def testTemporaryFileWithContents(self):
contents = 'Inspiration!'
with file_util.TemporaryFileWithContents(contents) as temporary_file:
filename = temporary_file.name
contents_read = open(temporary_file.name).read()
self.assertEqual(contents_read, contents)
# Ensure that the file does not exist.
self.assertFalse(os.path.exists(filename))
class MkDirsMoxTest(FileUtilMoxTestBase):
# pylint is dumb about mox:
# pylint:disable-msg=E1103
# pylint: disable=maybe-no-member

@@ -331,5 +383,7 @@ def setUp(self):

os.makedirs(test_sandbox)
open(os.path.join(test_sandbox, 'file'), 'w').close()
with open(os.path.join(test_sandbox, 'file'), 'w'):
pass
os.makedirs(test_dir)
open(os.path.join(test_dir, 'file'), 'w')
with open(os.path.join(test_dir, 'file'), 'w'):
pass

@@ -336,0 +390,0 @@ file_util.RmDirs(test_dir)

#!/usr/bin/env python
#
# Copyright 2010 Google Inc. All Rights Reserved.

@@ -4,0 +3,0 @@

#!/usr/bin/env python
# This code must be source compatible with Python 2.4 through 3.3.
#

@@ -8,7 +9,7 @@ # Copyright 2003 Google Inc. All Rights Reserved.

__pychecker__ = '--unusednames=e,_'
import os
# Use unittest instead of basetest to avoid bootstrap issues / circular deps.
import unittest
from google.apputils import basetest
from google.apputils import shellutil

@@ -20,3 +21,3 @@

class ShellUtilUnitTest(basetest.TestCase):
class ShellUtilUnitTest(unittest.TestCase):
def testShellEscapeList(self):

@@ -27,11 +28,11 @@ # TODO(user): Actually run some shell commands and test the

words = []
self.assertEquals(shellutil.ShellEscapeList(words), '')
self.assertEqual(shellutil.ShellEscapeList(words), '')
# Empty string
words = ['']
self.assertEquals(shellutil.ShellEscapeList(words), "''")
self.assertEqual(shellutil.ShellEscapeList(words), "''")
# Single word
words = ['foo']
self.assertEquals(shellutil.ShellEscapeList(words), "'foo'")
self.assertEqual(shellutil.ShellEscapeList(words), "'foo'")

@@ -41,11 +42,11 @@ # Single word with single quote

expected = """ 'foo'"'"'bar' """.strip()
self.assertEquals(shellutil.ShellEscapeList(words), expected)
self.assertEqual(shellutil.ShellEscapeList(words), expected)
# .. double quote
words = ['foo"bar']
expected = """ 'foo"bar' """.strip()
self.assertEquals(shellutil.ShellEscapeList(words), expected)
self.assertEqual(shellutil.ShellEscapeList(words), expected)
# Multiple words
words = ['foo', 'bar']
self.assertEquals(shellutil.ShellEscapeList(words), "'foo' 'bar'")
self.assertEqual(shellutil.ShellEscapeList(words), "'foo' 'bar'")

@@ -55,3 +56,3 @@ # Words with spaces

expected = """ 'foo' 'bar' 'foo'"'"''"'"' '"'"''"'"'bar' """.strip()
self.assertEquals(shellutil.ShellEscapeList(words), expected)
self.assertEqual(shellutil.ShellEscapeList(words), expected)

@@ -61,3 +62,3 @@ # Now I'm just being mean

expected = """ 'foo' 'bar' '""'"'"'"'"'"'"' """.strip()
self.assertEquals(shellutil.ShellEscapeList(words), expected)
self.assertEqual(shellutil.ShellEscapeList(words), expected)

@@ -72,2 +73,2 @@ def testShellifyStatus(self):

if __name__ == '__main__':
basetest.main()
unittest.main()

Sorry, the diff of this file is too big to display