google-apputils
Advanced tools
| #!/usr/bin/env python | ||
| # -*- coding: utf-8 -*- | ||
| # Copyright 2008 Google Inc. All Rights Reserved. | ||
| """Lightweight routines for producing more friendly output. | ||
| Usage examples: | ||
| 'New messages: %s' % humanize.Commas(star_count) | ||
| -> 'New messages: 58,192' | ||
| 'Found %s.' % humanize.Plural(error_count, 'error') | ||
| -> 'Found 2 errors.' | ||
| 'Found %s.' % humanize.Plural(error_count, 'ox', 'oxen') | ||
| -> 'Found 2 oxen.' | ||
| 'Copied at %s.' % humanize.DecimalPrefix(rate, 'bps', precision=3) | ||
| -> 'Copied at 42.6 Mbps.' | ||
| 'Free RAM: %s' % humanize.BinaryPrefix(bytes_free, 'B') | ||
| -> 'Free RAM: 742 MiB' | ||
| 'Finished all tasks in %s.' % humanize.Duration(elapsed_time) | ||
| -> 'Finished all tasks in 34m 5s.' | ||
| These libraries are not a substitute for full localization. If you | ||
| need localization, then you will have to think about translating | ||
| strings, formatting numbers in different ways, and so on. Use | ||
| ICU if your application is user-facing. Use these libraries if | ||
| your application is an English-only internal tool, and you are | ||
| tired of seeing "1 results" or "3450134804 bytes used". | ||
| Compare humanize.*Prefix() to C++ utilites HumanReadableNumBytes and | ||
| HumanReadableInt in strings/human_readable.h. | ||
| """ | ||
| import datetime | ||
| import math | ||
| import re | ||
| SIBILANT_ENDINGS = frozenset(['sh', 'ss', 'tch', 'ax', 'ix', 'ex']) | ||
| DIGIT_SPLITTER = re.compile(r'\d+|\D+').findall | ||
| # These are included because they are common technical terms. | ||
| SPECIAL_PLURALS = { | ||
| 'index': 'indices', | ||
| 'matrix': 'matrices', | ||
| 'vertex': 'vertices', | ||
| } | ||
| VOWELS = frozenset('AEIOUaeiou') | ||
| # In Python 2.6, int(float('nan')) intentionally raises a TypeError. This code | ||
| # attempts to futureproof us against that eventual upgrade. | ||
| try: | ||
| _IsNan = math.isnan | ||
| except AttributeError: | ||
| def _IsNan(x): | ||
| return type(x) is float and x != x | ||
| def Commas(value): | ||
| """Formats an integer with thousands-separating commas. | ||
| Args: | ||
| value: An integer. | ||
| Returns: | ||
| A string. | ||
| """ | ||
| if value < 0: | ||
| sign = '-' | ||
| value = -value | ||
| else: | ||
| sign = '' | ||
| result = [] | ||
| while value >= 1000: | ||
| result.append('%03d' % (value % 1000)) | ||
| value /= 1000 | ||
| result.append('%d' % value) | ||
| return sign + ','.join(reversed(result)) | ||
| def Plural(quantity, singular, plural=None): | ||
| """Formats an integer and a string into a single pluralized string. | ||
| Args: | ||
| quantity: An integer. | ||
| singular: A string, the singular form of a noun. | ||
| plural: A string, the plural form. If not specified, then simple | ||
| English rules of regular pluralization will be used. | ||
| Returns: | ||
| A string. | ||
| """ | ||
| return '%d %s' % (quantity, PluralWord(quantity, singular, plural)) | ||
| def PluralWord(quantity, singular, plural=None): | ||
| """Builds the plural of an English word. | ||
| Args: | ||
| quantity: An integer. | ||
| singular: A string, the singular form of a noun. | ||
| plural: A string, the plural form. If not specified, then simple | ||
| English rules of regular pluralization will be used. | ||
| Returns: | ||
| the plural form of the word. | ||
| """ | ||
| if quantity == 1: | ||
| return singular | ||
| if plural: | ||
| return plural | ||
| if singular in SPECIAL_PLURALS: | ||
| return SPECIAL_PLURALS[singular] | ||
| # We need to guess what the English plural might be. Keep this | ||
| # function simple! It doesn't need to know about every possiblity; | ||
| # only regular rules and the most common special cases. | ||
| # | ||
| # Reference: http://en.wikipedia.org/wiki/English_plural | ||
| for ending in SIBILANT_ENDINGS: | ||
| if singular.endswith(ending): | ||
| return '%ses' % singular | ||
| if singular.endswith('o') and singular[-2:-1] not in VOWELS: | ||
| return '%ses' % singular | ||
| if singular.endswith('y') and singular[-2:-1] not in VOWELS: | ||
| return '%sies' % singular[:-1] | ||
| return '%ss' % singular | ||
| def WordSeries(words, conjunction='and'): | ||
| """Convert a list of words to an English-language word series. | ||
| Args: | ||
| words: A list of word strings. | ||
| conjunction: A coordinating conjunction. | ||
| Returns: | ||
| A single string containing all the words in the list separated by commas, | ||
| the coordinating conjunction, and a serial comma, as appropriate. | ||
| """ | ||
| num_words = len(words) | ||
| if num_words == 0: | ||
| return '' | ||
| elif num_words == 1: | ||
| return words[0] | ||
| elif num_words == 2: | ||
| return (' %s ' % conjunction).join(words) | ||
| else: | ||
| return '%s, %s %s' % (', '.join(words[:-1]), conjunction, words[-1]) | ||
| def AddIndefiniteArticle(noun): | ||
| """Formats a noun with an appropriate indefinite article. | ||
| Args: | ||
| noun: A string representing a noun. | ||
| Returns: | ||
| A string containing noun prefixed with an indefinite article, e.g., | ||
| "a thing" or "an object". | ||
| """ | ||
| if not noun: | ||
| raise ValueError('argument must be a word: {!r}'.format(noun)) | ||
| if noun[0] in VOWELS: | ||
| return 'an ' + noun | ||
| else: | ||
| return 'a ' + noun | ||
| def DecimalPrefix(quantity, unit, precision=1, min_scale=0, max_scale=None): | ||
| """Formats an integer and a unit into a string, using decimal prefixes. | ||
| The unit will be prefixed with an appropriate multiplier such that | ||
| the formatted integer is less than 1,000 (as long as the raw integer | ||
| is less than 10**27). For example: | ||
| DecimalPrefix(576012, 'bps') -> '576 kbps' | ||
| DecimalPrefix(1574215, 'bps', 2) -> '1.6 Mbps' | ||
| Only the SI prefixes which are powers of 10**3 will be used, so | ||
| DecimalPrefix(100, 'thread') is '100 thread', not '1 hthread'. | ||
| See also: | ||
| BinaryPrefix() | ||
| Args: | ||
| quantity: A number. | ||
| unit: A string. | ||
| precision: An integer, the minimum number of digits to display. | ||
| min_scale: minimum power of 1000 to scale to, (None = unbounded). | ||
| max_scale: maximum power of 1000 to scale to, (None = unbounded). | ||
| Returns: | ||
| A string. | ||
| """ | ||
| return _Prefix(quantity, unit, precision, DecimalScale, min_scale=min_scale, | ||
| max_scale=max_scale) | ||
| def BinaryPrefix(quantity, unit, precision=1): | ||
| """Formats an integer and a unit into a string, using binary prefixes. | ||
| The unit will be prefixed with an appropriate multiplier such that | ||
| the formatted integer is less than 1,024 (as long as the raw integer | ||
| is less than 2**90). For example: | ||
| BinaryPrefix(576012, 'B') -> '562 KiB' | ||
| See also: | ||
| DecimalPrefix() | ||
| Args: | ||
| quantity: A number. | ||
| unit: A string. | ||
| precision: An integer, the minimum number of digits to display. | ||
| Returns: | ||
| A string. | ||
| """ | ||
| return _Prefix(quantity, unit, precision, BinaryScale) | ||
| def _Prefix(quantity, unit, precision, scale_callable, **args): | ||
| """Formats an integer and a unit into a string. | ||
| Args: | ||
| quantity: A number. | ||
| unit: A string. | ||
| precision: An integer, the minimum number of digits to display. | ||
| scale_callable: A callable, scales the number and units. | ||
| **args: named arguments passed to scale_callable. | ||
| Returns: | ||
| A string. | ||
| """ | ||
| if not quantity: | ||
| return '0 %s' % unit | ||
| if quantity in [float('inf'), float('-inf')] or _IsNan(quantity): | ||
| return '%f %s' % (quantity, unit) | ||
| scaled_quantity, scaled_unit = scale_callable(quantity, unit, **args) | ||
| print_pattern = '%%.%df %%s' % max(0, (precision - int( | ||
| math.log(abs(scaled_quantity), 10)) - 1)) | ||
| return print_pattern % (scaled_quantity, scaled_unit) | ||
| def DecimalScale(quantity, unit, min_scale=0, max_scale=None): | ||
| """Get the scaled value and decimal prefixed unit in a tupple. | ||
| DecimalScale(576012, 'bps') -> (576.012, 'kbps') | ||
| DecimalScale(1574215, 'bps') -> (1.574215, 'Mbps') | ||
| Args: | ||
| quantity: A number. | ||
| unit: A string. | ||
| min_scale: minimum power of 1000 to normalize to (None = unbounded) | ||
| max_scale: maximum power of 1000 to normalize to (None = unbounded) | ||
| Returns: | ||
| A tuple of a scaled quantity (float) and BinaryPrefix for the | ||
| units (string). | ||
| """ | ||
| if min_scale is None or min_scale < 0: | ||
| negative_powers = ('m', u'µ', 'n', 'p', 'f', 'a', 'z', 'y') | ||
| if min_scale is not None: | ||
| negative_powers = tuple(negative_powers[0:-min_scale]) | ||
| else: | ||
| negative_powers = None | ||
| if max_scale is None or max_scale > 0: | ||
| powers = ('k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y') | ||
| if max_scale is not None: | ||
| powers = tuple(powers[0:max_scale]) | ||
| return _Scale(quantity, unit, 1000, powers, negative_powers) | ||
| def BinaryScale(quantity, unit): | ||
| """Get the scaled value and binary prefixed unit in a tupple. | ||
| BinaryPrefix(576012, 'B') -> (562.51171875, 'KiB') | ||
| Args: | ||
| quantity: A number. | ||
| unit: A string. | ||
| Returns: | ||
| A tuple of a scaled quantity (float) and BinaryPrefix for the | ||
| units (string). | ||
| """ | ||
| return _Scale(quantity, unit, 1024, | ||
| ('Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi')) | ||
| def _Scale(quantity, unit, multiplier, prefixes, inverse_prefixes=None): | ||
| """Returns the formatted quantity and unit into a tuple. | ||
| Args: | ||
| quantity: A number. | ||
| unit: A string | ||
| multiplier: An integer, the ratio between prefixes. | ||
| prefixes: A sequence of strings. | ||
| inverse_prefixes: Prefixes to use for negative powers, | ||
| If empty or None, no scaling is done for fractions. | ||
| Returns: | ||
| A tuple containing the raw scaled quantity and the prefixed unit. | ||
| """ | ||
| if not quantity: | ||
| return 0, unit | ||
| if quantity in [float('inf'), float('-inf')] or _IsNan(quantity): | ||
| return quantity, unit | ||
| power = int(math.log(abs(quantity), multiplier)) | ||
| if abs(quantity) > 1 or not inverse_prefixes: | ||
| if power < 1: | ||
| return quantity, unit | ||
| power = min(power, len(prefixes)) | ||
| prefix = prefixes[power - 1] | ||
| value = float(quantity) / multiplier ** power | ||
| else: | ||
| power = min(-power + 1, len(inverse_prefixes)) | ||
| prefix = inverse_prefixes[power - 1] | ||
| value = quantity * multiplier ** power | ||
| return value, prefix + unit | ||
| # Contains the fractions where the full range [1/n ... (n - 1) / n] | ||
| # is defined in Unicode. | ||
| FRACTIONS = { | ||
| 3: (None, u'⅓', u'⅔', None), | ||
| 5: (None, u'⅕', u'⅖', u'⅗', u'⅘', None), | ||
| 8: (None, u'⅛', u'¼', u'⅜', u'½', u'⅝', u'¾', u'⅞', None), | ||
| } | ||
| FRACTION_ROUND_DOWN = 1.0 / (max(FRACTIONS.keys()) * 2.0) | ||
| FRACTION_ROUND_UP = 1.0 - FRACTION_ROUND_DOWN | ||
| def PrettyFraction(number, spacer=''): | ||
| """Convert a number into a string that might include a unicode fraction. | ||
| This method returns the integer representation followed by the closest | ||
| fraction of a denominator 2, 3, 4, 5 or 8. | ||
| For instance, 0.33 will be converted to 1/3. | ||
| The resulting representation should be less than 1/16 off. | ||
| Args: | ||
| number: a python number | ||
| spacer: an optional string to insert between the integer and the fraction | ||
| default is an empty string. | ||
| Returns: | ||
| a unicode string representing the number. | ||
| """ | ||
| # We do not want small negative numbers to display as -0. | ||
| if number < -FRACTION_ROUND_DOWN: | ||
| return u'-%s' % PrettyFraction(-number) | ||
| number = abs(number) | ||
| rounded = int(number) | ||
| fract = number - rounded | ||
| if fract >= FRACTION_ROUND_UP: | ||
| return str(rounded + 1) | ||
| errors_fractions = [] | ||
| for denominator, fraction_elements in FRACTIONS.items(): | ||
| numerator = int(round(denominator * fract)) | ||
| error = abs(fract - (float(numerator) / float(denominator))) | ||
| errors_fractions.append((error, fraction_elements[numerator])) | ||
| unused_error, fraction_text = min(errors_fractions) | ||
| if rounded and fraction_text: | ||
| return u'%d%s%s' % (rounded, spacer, fraction_text) | ||
| if rounded: | ||
| return str(rounded) | ||
| if fraction_text: | ||
| return fraction_text | ||
| return u'0' | ||
| def Duration(duration, separator=' '): | ||
| """Formats a nonnegative number of seconds into a human-readable string. | ||
| Args: | ||
| duration: A float duration in seconds. | ||
| separator: A string separator between days, hours, minutes and seconds. | ||
| Returns: | ||
| Formatted string like '5d 12h 30m 45s'. | ||
| """ | ||
| try: | ||
| delta = datetime.timedelta(seconds=duration) | ||
| except OverflowError: | ||
| return '>=' + TimeDelta(datetime.timedelta.max) | ||
| return TimeDelta(delta, separator=separator) | ||
| def TimeDelta(delta, separator=' '): | ||
| """Format a datetime.timedelta into a human-readable string. | ||
| Args: | ||
| delta: The datetime.timedelta to format. | ||
| separator: A string separator between days, hours, minutes and seconds. | ||
| Returns: | ||
| Formatted string like '5d 12h 30m 45s'. | ||
| """ | ||
| parts = [] | ||
| seconds = delta.seconds | ||
| if delta.days: | ||
| parts.append('%dd' % delta.days) | ||
| if seconds >= 3600: | ||
| parts.append('%dh' % (seconds // 3600)) | ||
| seconds %= 3600 | ||
| if seconds >= 60: | ||
| parts.append('%dm' % (seconds // 60)) | ||
| seconds %= 60 | ||
| seconds += delta.microseconds / 1e6 | ||
| if seconds or not parts: | ||
| parts.append('%gs' % seconds) | ||
| return separator.join(parts) | ||
| def NaturalSortKey(data): | ||
| """Key function for "natural sort" ordering. | ||
| This key function results in a lexigraph sort. For example: | ||
| - ['1, '3', '20'] (not ['1', '20', '3']). | ||
| - ['Model 9', 'Model 70 SE', 'Model 70 SE2'] | ||
| (not ['Model 70 SE', 'Model 70 SE2', 'Model 9']). | ||
| Usage: | ||
| new_list = sorted(old_list, key=humanize.NaturalSortKey) | ||
| or | ||
| list_sort_in_place.sort(key=humanize.NaturalSortKey) | ||
| Based on code by Steven Bazyl <sbazyl@google.com>. | ||
| Args: | ||
| data: str, The key being compared in a sort. | ||
| Returns: | ||
| A list which is comparable to other lists for the purpose of sorting. | ||
| """ | ||
| segments = DIGIT_SPLITTER(data) | ||
| for i, value in enumerate(segments): | ||
| if value.isdigit(): | ||
| segments[i] = int(value) | ||
| return segments |
| #!/usr/bin/env python |
| #!/usr/bin/env python | ||
| # -*- coding: utf-8 -*- | ||
| """Test for google.apputils.humanize.""" | ||
| import datetime | ||
| from google.apputils import basetest | ||
| from google.apputils import humanize | ||
| class HumanizeTest(basetest.TestCase): | ||
| def testCommas(self): | ||
| self.assertEqual('0', humanize.Commas(0)) | ||
| self.assertEqual('100', humanize.Commas(100)) | ||
| self.assertEqual('1,000', humanize.Commas(1000)) | ||
| self.assertEqual('10,000', humanize.Commas(10000)) | ||
| self.assertEqual('1,000,000', humanize.Commas(1e6)) | ||
| self.assertEqual('-1,000,000', humanize.Commas(-1e6)) | ||
| def testPlural(self): | ||
| self.assertEqual('0 objects', humanize.Plural(0, 'object')) | ||
| self.assertEqual('1 object', humanize.Plural(1, 'object')) | ||
| self.assertEqual('-1 objects', humanize.Plural(-1, 'object')) | ||
| self.assertEqual('42 objects', humanize.Plural(42, 'object')) | ||
| self.assertEqual('42 cats', humanize.Plural(42, 'cat')) | ||
| self.assertEqual('42 glasses', humanize.Plural(42, 'glass')) | ||
| self.assertEqual('42 potatoes', humanize.Plural(42, 'potato')) | ||
| self.assertEqual('42 cherries', humanize.Plural(42, 'cherry')) | ||
| self.assertEqual('42 monkeys', humanize.Plural(42, 'monkey')) | ||
| self.assertEqual('42 oxen', humanize.Plural(42, 'ox', 'oxen')) | ||
| self.assertEqual('42 indices', humanize.Plural(42, 'index')) | ||
| self.assertEqual( | ||
| '42 attorneys general', | ||
| humanize.Plural(42, 'attorney general', 'attorneys general')) | ||
| def testPluralWord(self): | ||
| self.assertEqual('vaxen', humanize.PluralWord(2, 'vax', plural='vaxen')) | ||
| self.assertEqual('cores', humanize.PluralWord(2, 'core')) | ||
| self.assertEqual('group', humanize.PluralWord(1, 'group')) | ||
| self.assertEqual('cells', humanize.PluralWord(0, 'cell')) | ||
| self.assertEqual('degree', humanize.PluralWord(1.0, 'degree')) | ||
| self.assertEqual('helloes', humanize.PluralWord(3.14, 'hello')) | ||
| def testWordSeries(self): | ||
| self.assertEqual('', humanize.WordSeries([])) | ||
| self.assertEqual('foo', humanize.WordSeries(['foo'])) | ||
| self.assertEqual('foo and bar', humanize.WordSeries(['foo', 'bar'])) | ||
| self.assertEqual( | ||
| 'foo, bar, and baz', humanize.WordSeries(['foo', 'bar', 'baz'])) | ||
| self.assertEqual( | ||
| 'foo, bar, or baz', humanize.WordSeries(['foo', 'bar', 'baz'], | ||
| conjunction='or')) | ||
| def testAddIndefiniteArticle(self): | ||
| self.assertEqual('a thing', humanize.AddIndefiniteArticle('thing')) | ||
| self.assertEqual('an object', humanize.AddIndefiniteArticle('object')) | ||
| self.assertEqual('a Porsche', humanize.AddIndefiniteArticle('Porsche')) | ||
| self.assertEqual('an Audi', humanize.AddIndefiniteArticle('Audi')) | ||
| def testDecimalPrefix(self): | ||
| self.assertEqual('0 m', humanize.DecimalPrefix(0, 'm')) | ||
| self.assertEqual('1 km', humanize.DecimalPrefix(1000, 'm')) | ||
| self.assertEqual('-1 km', humanize.DecimalPrefix(-1000, 'm')) | ||
| self.assertEqual('10 Gbps', humanize.DecimalPrefix(10e9, 'bps')) | ||
| self.assertEqual('6000 Yg', humanize.DecimalPrefix(6e27, 'g')) | ||
| self.assertEqual('12.1 km', humanize.DecimalPrefix(12100, 'm', precision=3)) | ||
| self.assertEqual('12 km', humanize.DecimalPrefix(12100, 'm', precision=2)) | ||
| self.assertEqual('1.15 km', humanize.DecimalPrefix(1150, 'm', precision=3)) | ||
| self.assertEqual('-1.15 km', humanize.DecimalPrefix(-1150, 'm', | ||
| precision=3)) | ||
| self.assertEqual('1.1 s', humanize.DecimalPrefix(1.12, 's', precision=2)) | ||
| self.assertEqual('-1.1 s', humanize.DecimalPrefix(-1.12, 's', precision=2)) | ||
| self.assertEqual('nan bps', humanize.DecimalPrefix(float('nan'), 'bps')) | ||
| self.assertEqual('inf bps', humanize.DecimalPrefix(float('inf'), 'bps')) | ||
| self.assertEqual('-inf bps', humanize.DecimalPrefix(float('-inf'), 'bps')) | ||
| self.assertEqual('-4 mm', | ||
| humanize.DecimalPrefix(-0.004, 'm', min_scale=None)) | ||
| self.assertEqual('0 m', humanize.DecimalPrefix(0, 'm', min_scale=None)) | ||
| self.assertEqual( | ||
| u'1 µs', | ||
| humanize.DecimalPrefix(0.0000013, 's', min_scale=None)) | ||
| self.assertEqual('3 km', humanize.DecimalPrefix(3000, 'm', min_scale=None)) | ||
| self.assertEqual( | ||
| '5000 TB', | ||
| humanize.DecimalPrefix(5e15, 'B', max_scale=4)) | ||
| self.assertEqual( | ||
| '5 mSWE', | ||
| humanize.DecimalPrefix(0.005, 'SWE', min_scale=None)) | ||
| self.assertEqual( | ||
| '0.0005 ms', | ||
| humanize.DecimalPrefix(5e-7, 's', min_scale=-1, precision=2)) | ||
| def testBinaryPrefix(self): | ||
| self.assertEqual('0 B', humanize.BinaryPrefix(0, 'B')) | ||
| self.assertEqual('1000 B', humanize.BinaryPrefix(1000, 'B')) | ||
| self.assertEqual('1 KiB', humanize.BinaryPrefix(1024, 'B')) | ||
| self.assertEqual('64 GiB', humanize.BinaryPrefix(2**36, 'B')) | ||
| self.assertEqual('65536 Yibit', humanize.BinaryPrefix(2**96, 'bit')) | ||
| self.assertEqual('1.25 KiB', humanize.BinaryPrefix(1280, 'B', precision=3)) | ||
| self.assertEqual('1.2 KiB', humanize.BinaryPrefix(1280, 'B', precision=2)) | ||
| def testScale(self): | ||
| self.assertEqual((12.1, 'km'), humanize.DecimalScale(12100, 'm')) | ||
| self.assertEqual((1.15, 'Mm'), humanize.DecimalScale(1150000, 'm')) | ||
| self.assertEqual((450, 'mSWE'), | ||
| humanize.DecimalScale(0.45, 'SWE', min_scale=None)) | ||
| self.assertEqual( | ||
| (250, u'µm'), | ||
| humanize.DecimalScale(1.0 / (4 * 1000), 'm', min_scale=None)) | ||
| value, unit = humanize.BinaryScale(200000000000, 'B') | ||
| self.assertAlmostEqual(value, 186.26, 2) | ||
| self.assertEqual(unit, 'GiB') | ||
| value, unit = humanize.BinaryScale(3000000000000, 'B') | ||
| self.assertAlmostEqual(value, 2.728, 3) | ||
| self.assertEqual(unit, 'TiB') | ||
| def testPrettyFraction(self): | ||
| # No rounded integer part | ||
| self.assertEqual(u'½', humanize.PrettyFraction(0.5)) | ||
| # Roundeded integer + fraction | ||
| self.assertEqual(u'6⅔', humanize.PrettyFraction(20.0 / 3.0)) | ||
| # Rounded integer, no fraction | ||
| self.assertEqual(u'2', humanize.PrettyFraction(2.00001)) | ||
| # No rounded integer, no fraction | ||
| self.assertEqual(u'0', humanize.PrettyFraction(0.001)) | ||
| # Round up | ||
| self.assertEqual(u'1', humanize.PrettyFraction(0.99)) | ||
| # No round up, edge case | ||
| self.assertEqual(u'⅞', humanize.PrettyFraction(0.9)) | ||
| # Negative fraction | ||
| self.assertEqual(u'-⅕', humanize.PrettyFraction(-0.2)) | ||
| # Negative close to zero (should not be -0) | ||
| self.assertEqual(u'0', humanize.PrettyFraction(-0.001)) | ||
| # Smallest fraction that should round down. | ||
| self.assertEqual(u'0', humanize.PrettyFraction(1.0 / 16.0)) | ||
| # Largest fraction should round up. | ||
| self.assertEqual(u'1', humanize.PrettyFraction(15.0 / 16.0)) | ||
| # Integer zero. | ||
| self.assertEqual(u'0', humanize.PrettyFraction(0)) | ||
| # Check that division yields fraction | ||
| self.assertEqual(u'⅘', humanize.PrettyFraction(4.0 / 5.0)) | ||
| # Custom spacer. | ||
| self.assertEqual(u'2 ½', humanize.PrettyFraction(2.5, spacer=' ')) | ||
| def testDuration(self): | ||
| self.assertEqual('2h', humanize.Duration(7200)) | ||
| self.assertEqual('5d 13h 47m 12s', humanize.Duration(481632)) | ||
| self.assertEqual('0s', humanize.Duration(0)) | ||
| self.assertEqual('59s', humanize.Duration(59)) | ||
| self.assertEqual('1m', humanize.Duration(60)) | ||
| self.assertEqual('1m 1s', humanize.Duration(61)) | ||
| self.assertEqual('1h 1s', humanize.Duration(3601)) | ||
| self.assertEqual('2h-2s', humanize.Duration(7202, separator='-')) | ||
| def testLargeDuration(self): | ||
| # The maximum seconds and days that can be stored in a datetime.timedelta | ||
| # object, as seconds. max_days is equal to MAX_DELTA_DAYS in Python's | ||
| # Modules/datetimemodule.c, converted to seconds. | ||
| max_seconds = 3600 * 24 - 1 | ||
| max_days = 999999999 * 24 * 60 * 60 | ||
| self.assertEqual('999999999d', humanize.Duration(max_days)) | ||
| self.assertEqual('999999999d 23h 59m 59s', | ||
| humanize.Duration(max_days + max_seconds)) | ||
| self.assertEqual('>=999999999d 23h 59m 60s', | ||
| humanize.Duration(max_days + max_seconds + 1)) | ||
| def testTimeDelta(self): | ||
| self.assertEqual('0s', humanize.TimeDelta(datetime.timedelta())) | ||
| self.assertEqual('2h', humanize.TimeDelta(datetime.timedelta(hours=2))) | ||
| self.assertEqual('1m', humanize.TimeDelta(datetime.timedelta(minutes=1))) | ||
| self.assertEqual('5d', humanize.TimeDelta(datetime.timedelta(days=5))) | ||
| self.assertEqual('1.25s', humanize.TimeDelta( | ||
| datetime.timedelta(seconds=1, microseconds=250000))) | ||
| self.assertEqual('1.5s', | ||
| humanize.TimeDelta(datetime.timedelta(seconds=1.5))) | ||
| self.assertEqual('4d 10h 5m 12.25s', humanize.TimeDelta( | ||
| datetime.timedelta(days=4, hours=10, minutes=5, seconds=12, | ||
| microseconds=250000))) | ||
| class NaturalSortKeyChunkingTest(basetest.TestCase): | ||
| def testChunkifySingleChars(self): | ||
| self.assertListEqual( | ||
| humanize.NaturalSortKey('a1b2c3'), | ||
| ['a', 1, 'b', 2, 'c', 3]) | ||
| def testChunkifyMultiChars(self): | ||
| self.assertListEqual( | ||
| humanize.NaturalSortKey('aa11bb22cc33'), | ||
| ['aa', 11, 'bb', 22, 'cc', 33]) | ||
| def testChunkifyComplex(self): | ||
| self.assertListEqual( | ||
| humanize.NaturalSortKey('one 11 -- two 44'), | ||
| ['one ', 11, ' -- two ', 44]) | ||
| class NaturalSortKeysortTest(basetest.TestCase): | ||
| def testNaturalSortKeySimpleWords(self): | ||
| self.test = ['pair', 'banana', 'apple'] | ||
| self.good = ['apple', 'banana', 'pair'] | ||
| self.test.sort(key=humanize.NaturalSortKey) | ||
| self.assertListEqual(self.test, self.good) | ||
| def testNaturalSortKeySimpleNums(self): | ||
| self.test = ['3333', '2222', '9999', '0000'] | ||
| self.good = ['0000', '2222', '3333', '9999'] | ||
| self.test.sort(key=humanize.NaturalSortKey) | ||
| self.assertListEqual(self.test, self.good) | ||
| def testNaturalSortKeySimpleDigits(self): | ||
| self.test = ['8', '3', '2'] | ||
| self.good = ['2', '3', '8'] | ||
| self.test.sort(key=humanize.NaturalSortKey) | ||
| self.assertListEqual(self.test, self.good) | ||
| def testVersionStrings(self): | ||
| self.test = ['1.2', '0.9', '1.1a2', '1.1a', '1', '1.2.1', '0.9.1'] | ||
| self.good = ['0.9', '0.9.1', '1', '1.1a', '1.1a2', '1.2', '1.2.1'] | ||
| self.test.sort(key=humanize.NaturalSortKey) | ||
| self.assertListEqual(self.test, self.good) | ||
| def testNaturalSortKeySimpleNumLong(self): | ||
| self.test = ['11', '9', '1', '200', '19', '20', '900'] | ||
| self.good = ['1', '9', '11', '19', '20', '200', '900'] | ||
| self.test.sort(key=humanize.NaturalSortKey) | ||
| self.assertListEqual(self.test, self.good) | ||
| def testNaturalSortKeyAlNum(self): | ||
| self.test = ['x10', 'x9', 'x1', 'x11'] | ||
| self.good = ['x1', 'x9', 'x10', 'x11'] | ||
| self.test.sort(key=humanize.NaturalSortKey) | ||
| self.assertListEqual(self.test, self.good) | ||
| def testNaturalSortKeyNumAlNum(self): | ||
| self.test = ['4x10', '4x9', '4x11', '5yy4', '3x1', '2x11'] | ||
| self.good = ['2x11', '3x1', '4x9', '4x10', '4x11', '5yy4'] | ||
| self.test.sort(key=humanize.NaturalSortKey) | ||
| self.assertListEqual(self.test, self.good) | ||
| def testNaturalSortKeyAlNumAl(self): | ||
| self.test = ['a9c', 'a4b', 'a10c', 'a1c', 'c10c', 'c10a', 'c9a'] | ||
| self.good = ['a1c', 'a4b', 'a9c', 'a10c', 'c9a', 'c10a', 'c10c'] | ||
| self.test.sort(key=humanize.NaturalSortKey) | ||
| self.assertListEqual(self.test, self.good) | ||
| class NaturalSortKeyBigTest(basetest.TestCase): | ||
| def testBig(self): | ||
| self.test = [ | ||
| '1000X Radonius Maximus', '10X Radonius', '200X Radonius', | ||
| '20X Radonius', '20X Radonius Prime', '30X Radonius', | ||
| '40X Radonius', 'Allegia 50 Clasteron', 'Allegia 500 Clasteron', | ||
| 'Allegia 51 Clasteron', 'Allegia 51B Clasteron', | ||
| 'Allegia 52 Clasteron', 'Allegia 60 Clasteron', 'Alpha 100', | ||
| 'Alpha 2', 'Alpha 200', 'Alpha 2A', 'Alpha 2A-8000', 'Alpha 2A-900', | ||
| 'Callisto Morphamax', 'Callisto Morphamax 500', | ||
| 'Callisto Morphamax 5000', 'Callisto Morphamax 600', | ||
| 'Callisto Morphamax 700', 'Callisto Morphamax 7000', | ||
| 'Callisto Morphamax 7000 SE', 'Callisto Morphamax 7000 SE2', | ||
| 'QRS-60 Intrinsia Machine', 'QRS-60F Intrinsia Machine', | ||
| 'QRS-62 Intrinsia Machine', 'QRS-62F Intrinsia Machine', | ||
| 'Xiph Xlater 10000', 'Xiph Xlater 2000', 'Xiph Xlater 300', | ||
| 'Xiph Xlater 40', 'Xiph Xlater 5', 'Xiph Xlater 50', | ||
| 'Xiph Xlater 500', 'Xiph Xlater 5000', 'Xiph Xlater 58'] | ||
| self.good = [ | ||
| '10X Radonius', | ||
| '20X Radonius', | ||
| '20X Radonius Prime', | ||
| '30X Radonius', | ||
| '40X Radonius', | ||
| '200X Radonius', | ||
| '1000X Radonius Maximus', | ||
| 'Allegia 50 Clasteron', | ||
| 'Allegia 51 Clasteron', | ||
| 'Allegia 51B Clasteron', | ||
| 'Allegia 52 Clasteron', | ||
| 'Allegia 60 Clasteron', | ||
| 'Allegia 500 Clasteron', | ||
| 'Alpha 2', | ||
| 'Alpha 2A', | ||
| 'Alpha 2A-900', | ||
| 'Alpha 2A-8000', | ||
| 'Alpha 100', | ||
| 'Alpha 200', | ||
| 'Callisto Morphamax', | ||
| 'Callisto Morphamax 500', | ||
| 'Callisto Morphamax 600', | ||
| 'Callisto Morphamax 700', | ||
| 'Callisto Morphamax 5000', | ||
| 'Callisto Morphamax 7000', | ||
| 'Callisto Morphamax 7000 SE', | ||
| 'Callisto Morphamax 7000 SE2', | ||
| 'QRS-60 Intrinsia Machine', | ||
| 'QRS-60F Intrinsia Machine', | ||
| 'QRS-62 Intrinsia Machine', | ||
| 'QRS-62F Intrinsia Machine', | ||
| 'Xiph Xlater 5', | ||
| 'Xiph Xlater 40', | ||
| 'Xiph Xlater 50', | ||
| 'Xiph Xlater 58', | ||
| 'Xiph Xlater 300', | ||
| 'Xiph Xlater 500', | ||
| 'Xiph Xlater 2000', | ||
| 'Xiph Xlater 5000', | ||
| 'Xiph Xlater 10000', | ||
| ] | ||
| self.test.sort(key=humanize.NaturalSortKey) | ||
| self.assertListEqual(self.test, self.good) | ||
| if __name__ == '__main__': | ||
| basetest.main() |
| google.apputils |
| Metadata-Version: 1.0 | ||
| Name: google-apputils | ||
| Version: 0.3.0 | ||
| Version: 0.4.0 | ||
| Summary: UNKNOWN | ||
@@ -5,0 +5,0 @@ Home-page: http://code.google.com/p/google-apputils-python |
@@ -13,2 +13,3 @@ LICENSE | ||
| google/apputils/file_util.py | ||
| google/apputils/humanize.py | ||
| google/apputils/resources.py | ||
@@ -26,2 +27,3 @@ google/apputils/run_script_module.py | ||
| google_apputils.egg-info/top_level.txt | ||
| tests/__init__.py | ||
| tests/app_test.py | ||
@@ -36,2 +38,3 @@ tests/app_test_helper.py | ||
| tests/file_util_test.py | ||
| tests/humanize_test.py | ||
| tests/resources_test.py | ||
@@ -38,0 +41,0 @@ tests/sh_test.py |
| #!/usr/bin/env python | ||
| from pkgutil import extend_path | ||
| __path__ = extend_path(__path__, __name__) | ||
| try: | ||
| import pkg_resources | ||
| pkg_resources.declare_namespace(__name__) | ||
| except ImportError: | ||
| from pkgutil import extend_path | ||
| __path__ = extend_path(__path__, __name__) |
| #!/usr/bin/env python | ||
| from pkgutil import extend_path | ||
| __path__ = extend_path(__path__, __name__) | ||
| try: | ||
| import pkg_resources | ||
| pkg_resources.declare_namespace(__name__) | ||
| except ImportError: | ||
| from pkgutil import extend_path | ||
| __path__ = extend_path(__path__, __name__) |
+42
-19
@@ -41,2 +41,5 @@ #!/usr/bin/env python | ||
| flags.DEFINE_boolean('run_with_pdb', 0, 'Set to true for PDB debug mode') | ||
| flags.DEFINE_boolean('pdb_post_mortem', 0, | ||
| 'Set to true to handle uncaught exceptions with PDB ' | ||
| 'post mortem.') | ||
| flags.DEFINE_boolean('run_with_profiling', 0, | ||
@@ -46,2 +49,5 @@ 'Set to true for profiling the script. ' | ||
| 'change over time.') | ||
| flags.DEFINE_string('profile_file', None, | ||
| 'Dump profile information to a file (for python -m ' | ||
| 'pstats). Implies --run_with_profiling.') | ||
| flags.DEFINE_boolean('use_cprofile_for_profiling', True, | ||
@@ -80,5 +86,6 @@ 'Use cProfile instead of the profile module for ' | ||
| """Special boolean flag that displays usage and raises SystemExit.""" | ||
| NAME = 'help' | ||
| def __init__(self): | ||
| flags.BooleanFlag.__init__(self, 'help', 0, 'show this help', | ||
| flags.BooleanFlag.__init__(self, self.NAME, 0, 'show this help', | ||
| short_name='?', allow_override=1) | ||
@@ -88,12 +95,19 @@ | ||
| if arg: | ||
| usage(writeto_stdout=1) | ||
| usage(shorthelp=1, writeto_stdout=1) | ||
| # Advertise --helpfull on stdout, since usage() was on stdout. | ||
| print 'Try --helpfull to get a list of all flags.' | ||
| sys.exit(1) | ||
| class HelpXMLFlag(flags.BooleanFlag): | ||
| """Similar to HelpFlag, but generates output in XML format.""" | ||
| class HelpshortFlag(HelpFlag): | ||
| """--helpshort is an alias for --help.""" | ||
| NAME = 'helpshort' | ||
| class HelpfullFlag(flags.BooleanFlag): | ||
| """Display help for flags in this module and all dependent modules.""" | ||
| def __init__(self): | ||
| flags.BooleanFlag.__init__(self, 'helpxml', False, | ||
| 'like --help, but generates XML output', | ||
| flags.BooleanFlag.__init__(self, 'helpfull', 0, 'show full help', | ||
| allow_override=1) | ||
@@ -103,12 +117,12 @@ | ||
| if arg: | ||
| flags.FLAGS.WriteHelpInXMLFormat(sys.stdout) | ||
| usage(writeto_stdout=1) | ||
| sys.exit(1) | ||
| class HelpshortFlag(flags.BooleanFlag): | ||
| """Special bool flag that calls usage(shorthelp=1) and raises SystemExit.""" | ||
| class HelpXMLFlag(flags.BooleanFlag): | ||
| """Similar to HelpfullFlag, but generates output in XML format.""" | ||
| def __init__(self): | ||
| flags.BooleanFlag.__init__(self, 'helpshort', 0, | ||
| 'show usage only for this module', | ||
| flags.BooleanFlag.__init__(self, 'helpxml', False, | ||
| 'like --helpfull, but generates XML output', | ||
| allow_override=1) | ||
@@ -118,3 +132,3 @@ | ||
| if arg: | ||
| usage(shorthelp=1, writeto_stdout=1) | ||
| flags.FLAGS.WriteHelpInXMLFormat(sys.stdout) | ||
| sys.exit(1) | ||
@@ -124,3 +138,2 @@ | ||
| class BuildDataFlag(flags.BooleanFlag): | ||
| """Boolean flag that writes build data to stdout and exits.""" | ||
@@ -145,3 +158,3 @@ | ||
| sys.stderr.write('FATAL Flags parsing error: %s\n' % error) | ||
| sys.stderr.write('Pass --help or --helpshort to see help on flags.\n') | ||
| sys.stderr.write('Pass --helpshort or --helpfull to see help on flags.\n') | ||
| sys.exit(1) | ||
@@ -156,3 +169,3 @@ | ||
| # Use a global to ensure idempotence. | ||
| # pylint: disable-msg=W0603 | ||
| # pylint: disable=global-statement | ||
| global _define_help_flags_called | ||
@@ -162,4 +175,5 @@ | ||
| flags.DEFINE_flag(HelpFlag()) | ||
| flags.DEFINE_flag(HelpshortFlag()) # alias for --help | ||
| flags.DEFINE_flag(HelpfullFlag()) | ||
| flags.DEFINE_flag(HelpXMLFlag()) | ||
| flags.DEFINE_flag(HelpshortFlag()) | ||
| flags.DEFINE_flag(BuildDataFlag()) | ||
@@ -200,3 +214,3 @@ _define_help_flags_called = True | ||
| else: | ||
| if FLAGS.run_with_profiling: | ||
| if FLAGS.run_with_profiling or FLAGS.profile_file: | ||
| # Avoid import overhead since most apps (including performance-sensitive | ||
@@ -210,3 +224,6 @@ # ones) won't be run with profiling. | ||
| profiler = profile.Profile() | ||
| atexit.register(profiler.print_stats) | ||
| if FLAGS.profile_file: | ||
| atexit.register(profiler.dump_stats, FLAGS.profile_file) | ||
| else: | ||
| atexit.register(profiler.print_stats) | ||
| retval = profiler.runcall(main, argv) | ||
@@ -218,2 +235,7 @@ sys.exit(retval) | ||
| usage(shorthelp=1, detailed_error=error, exitcode=error.exitcode) | ||
| except: | ||
| if FLAGS.pdb_post_mortem: | ||
| traceback.print_exc() | ||
| pdb.post_mortem() | ||
| raise | ||
@@ -254,3 +276,4 @@ | ||
| del tb | ||
| sys.exc_clear() | ||
| if hasattr(sys, 'exc_clear'): | ||
| sys.exc_clear() # This functionality is gone in Python 3. | ||
@@ -257,0 +280,0 @@ try: |
| #!/usr/bin/env python | ||
| # | ||
| # Copyright 2007 Google Inc. All Rights Reserved. | ||
@@ -34,3 +35,5 @@ # | ||
| This module itself registers the command 'help' that allows users | ||
| to retrieve help for all or specific commands. | ||
| to retrieve help for all or specific commands. 'help' is the default | ||
| command executed if no command is expressed, unless a different default | ||
| command is set with SetDefaultCommand. | ||
@@ -140,2 +143,3 @@ Example: | ||
| _cmd_alias_list = {} # list of command_names index by command_alias | ||
| _cmd_default = 'help' # command to execute if none explicitly given | ||
@@ -163,3 +167,3 @@ | ||
| """Return list of registered commands.""" | ||
| # pylint: disable-msg=W0602 | ||
| # pylint: disable=global-variable-not-assigned | ||
| global _cmd_list | ||
@@ -171,3 +175,3 @@ return _cmd_list | ||
| """Return list of registered command aliases.""" | ||
| # pylint: disable-msg=W0602 | ||
| # pylint: disable=global-variable-not-assigned | ||
| global _cmd_alias_list | ||
@@ -284,6 +288,3 @@ return _cmd_alias_list | ||
| # Run command | ||
| if FLAGS.run_with_pdb: | ||
| ret = pdb.runcall(self.Run, argv) | ||
| else: | ||
| ret = self.Run(argv) | ||
| ret = self.Run(argv) | ||
| if ret is None: | ||
@@ -296,2 +297,7 @@ ret = 0 | ||
| app.usage(shorthelp=1, detailed_error=error, exitcode=error.exitcode) | ||
| except: | ||
| if FLAGS.pdb_post_mortem: | ||
| traceback.print_exc() | ||
| pdb.post_mortem() | ||
| raise | ||
| finally: | ||
@@ -420,3 +426,3 @@ # Restore app.usage and remove this command's flags from the global flags. | ||
| # Update global command list. | ||
| # pylint: disable-msg=W0602 | ||
| # pylint: disable=global-variable-not-assigned | ||
| global _cmd_list | ||
@@ -650,3 +656,6 @@ global _cmd_alias_list | ||
| command = GetCommandByName(name) | ||
| cmd_help = command.CommandGetHelp(GetCommandArgv(), cmd_names=cmd_names) | ||
| try: | ||
| cmd_help = command.CommandGetHelp(GetCommandArgv(), cmd_names=cmd_names) | ||
| except Exception as error: | ||
| cmd_help = "Internal error for command '%s': %s." % (name, str(error)) | ||
| cmd_help = cmd_help.strip() | ||
@@ -672,3 +681,3 @@ all_names = ', '.join([name] + (command.CommandGetAliases() or [])) | ||
| # We do not register them globally so that they do not reappear. | ||
| # pylint: disable-msg=W0212 | ||
| # pylint: disable=protected-access | ||
| cmd_flags = command._command_flags | ||
@@ -706,3 +715,3 @@ if cmd_flags.RegisteredFlags(): | ||
| # Update the global commands. | ||
| # pylint: disable-msg=W0603 | ||
| # pylint: disable=global-statement | ||
| global _cmd_argv | ||
@@ -728,3 +737,3 @@ try: | ||
| # Update the global commands. | ||
| # pylint: disable-msg=W0603 | ||
| # pylint: disable=global-statement | ||
| global _cmd_argv | ||
@@ -743,11 +752,23 @@ _cmd_argv = ParseFlagsWithUsage(_cmd_argv) | ||
| def _CommandsStart(): | ||
| def SetDefaultCommand(default_command): | ||
| """Change the default command to execute if none is explicitly given. | ||
| Args: | ||
| default_command: str, the name of the command to execute by default. | ||
| """ | ||
| # pylint: disable=global-statement,g-bad-name | ||
| global _cmd_default | ||
| _cmd_default = default_command | ||
| def _CommandsStart(unused_argv): | ||
| """Main initialization. | ||
| This initializes flag values, and calls __main__.main(). Only non-flag | ||
| arguments are passed to main(). The return value of main() is used as the | ||
| exit status. | ||
| Calls __main__.main(), and then the command indicated by the first | ||
| non-flag argument, or 'help' if no argument was given. (The command | ||
| to execute if no flag is given can be changed via SetDefaultCommand). | ||
| Only non-flag arguments are passed to main(). If main does not call | ||
| sys.exit, the return value of the command is used as the exit status. | ||
| """ | ||
| app.RegisterAndParseFlagsWithUsage() | ||
| # The following is supposed to return after registering additional commands | ||
@@ -766,3 +787,5 @@ try: | ||
| else: | ||
| command = GetCommandByName('help') | ||
| command = GetCommandByName(_cmd_default) | ||
| if command is None: | ||
| ShortHelpAndExit("FATAL Command '%s' unknown" % _cmd_default) | ||
| sys.exit(command.CommandRun(GetCommandArgv())) | ||
@@ -780,3 +803,7 @@ | ||
| app.parse_flags_with_usage = ParseFlagsWithUsage | ||
| app.really_start = _CommandsStart | ||
| original_really_start = app.really_start | ||
| def InterceptReallyStart(): | ||
| original_really_start(main=_CommandsStart) | ||
| app.really_start = InterceptReallyStart | ||
| app.usage = _ReplacementAppUsage | ||
@@ -783,0 +810,0 @@ return app.run() |
+349
-49
@@ -24,6 +24,7 @@ #!/usr/bin/env python | ||
| import commands | ||
| import collections | ||
| import difflib | ||
| import getpass | ||
| import itertools | ||
| import json | ||
| import os | ||
@@ -35,3 +36,12 @@ import re | ||
| import types | ||
| import urlparse | ||
| try: | ||
| import faulthandler | ||
| except ImportError: | ||
| # //testing/pybase:pybase can't have deps on any extension modules as it | ||
| # is used by code that is executed in such a way it cannot import them. :( | ||
| # We use faulthandler if it is available (either via a user declared dep | ||
| # or from the Python 3.3+ standard library). | ||
| faulthandler = None | ||
@@ -268,17 +278,19 @@ # unittest2 is a backport of Python 2.7's unittest for Python 2.6, so | ||
| # The closure here makes sure that each new test() function remembers its | ||
| # own values of cls_test and test_name. Without this, they'd all point to | ||
| # the values from the last iteration of the loop, causing some arbitrary | ||
| # test method to run multiple times and the others never. :( | ||
| def test_closure(cls_test, test_name): | ||
| def test(self, *args, **kargs): | ||
| leaf = self.__class__ | ||
| leaf.__tests_to_run.discard(test_name) | ||
| return cls_test(self, *args, **kargs) | ||
| return test | ||
| for test_name in test_names: | ||
| cls_test = getattr(cls, test_name) | ||
| # The default parameters here make sure that each new test() function | ||
| # remembers its own values of cls_test and test_name. Without these | ||
| # default parameters, they'd all point to the values from the last | ||
| # iteration of the loop, causing some arbitrary test method to run | ||
| # multiple times and the others never. :( | ||
| def test(self, cls_test=cls_test, test_name=test_name): | ||
| leaf = self.__class__ | ||
| leaf.__tests_to_run.discard(test_name) | ||
| return cls_test(self) | ||
| BeforeAfterTestCaseMeta.SetMethod( | ||
| cls, test_name, test_closure(cls_test, test_name)) | ||
| BeforeAfterTestCaseMeta.SetMethod(cls, test_name, test) | ||
| @staticmethod | ||
@@ -380,2 +392,6 @@ def SetBeforeAfterTestCaseAttr(): | ||
| # TODO(user): Provide an assertItemsEqual method when our super class | ||
| # does not provide one. That method went away in Python 3.2 (renamed | ||
| # to assertCountEqual, or is that different? investigate). | ||
| def assertSameElements(self, expected_seq, actual_seq, msg=None): | ||
@@ -437,5 +453,5 @@ """Assert that two sequences have the same elements (in any order). | ||
| if msg: | ||
| raise self.failureException(msg) | ||
| failure_message = ['\n'] | ||
| failure_message = [msg, ':\n'] | ||
| else: | ||
| failure_message = ['\n'] | ||
| for line in difflib.ndiff(first.splitlines(True), second.splitlines(True)): | ||
@@ -487,13 +503,30 @@ failure_message.append(line) | ||
| if isinstance(regexes, basestring): | ||
| self.fail('regexes is a string; it needs to be a list of strings.') | ||
| self.fail('regexes is a string; use assertRegexpMatches instead.') | ||
| if not regexes: | ||
| self.fail('No regexes specified.') | ||
| regex = '(?:%s)' % ')|(?:'.join(regexes) | ||
| regex_type = type(regexes[0]) | ||
| for regex in regexes[1:]: | ||
| if type(regex) is not regex_type: | ||
| self.fail('regexes list must all be the same type.') | ||
| if regex_type is bytes and isinstance(actual_str, unicode): | ||
| regexes = [regex.decode('utf-8') for regex in regexes] | ||
| regex_type = unicode | ||
| elif regex_type is unicode and isinstance(actual_str, bytes): | ||
| regexes = [regex.encode('utf-8') for regex in regexes] | ||
| regex_type = bytes | ||
| if regex_type is unicode: | ||
| regex = u'(?:%s)' % u')|(?:'.join(regexes) | ||
| elif regex_type is bytes: | ||
| regex = b'(?:' + (b')|(?:'.join(regexes)) + b')' | ||
| else: | ||
| self.fail('Only know how to deal with unicode str or bytes regexes.') | ||
| if not re.search(regex, actual_str, re.MULTILINE): | ||
| self.fail(message or ('String "%s" does not contain any of these ' | ||
| self.fail(message or ('"%s" does not contain any of these ' | ||
| 'regexes: %s.' % (actual_str, regexes))) | ||
| def assertCommandSucceeds(self, command, regexes=[''], env=None, | ||
| def assertCommandSucceeds(self, command, regexes=(b'',), env=None, | ||
| close_fds=True): | ||
@@ -504,3 +537,3 @@ """Asserts that a shell command succeeds (i.e. exits with code 0). | ||
| command: List or string representing the command to run. | ||
| regexes: List of regular expression strings. | ||
| regexes: List of regular expression byte strings that match success. | ||
| env: Dictionary of environment variable settings. | ||
@@ -512,5 +545,10 @@ close_fds: Whether or not to close all open fd's in the child after | ||
| # Accommodate code which listed their output regexes w/o the b'' prefix by | ||
| # converting them to bytes for the user. | ||
| if isinstance(regexes[0], unicode): | ||
| regexes = [regex.encode('utf-8') for regex in regexes] | ||
| command_string = GetCommandString(command) | ||
| self.assert_( | ||
| ret_code == 0, | ||
| self.assertEqual( | ||
| ret_code, 0, | ||
| 'Running command\n' | ||
@@ -546,5 +584,10 @@ '%s failed with error code %s and message\n' | ||
| # Accommodate code which listed their output regexes w/o the b'' prefix by | ||
| # converting them to bytes for the user. | ||
| if isinstance(regexes[0], unicode): | ||
| regexes = [regex.encode('utf-8') for regex in regexes] | ||
| command_string = GetCommandString(command) | ||
| self.assert_( | ||
| ret_code != 0, | ||
| self.assertNotEqual( | ||
| ret_code, 0, | ||
| 'The following command succeeded while expected to fail:\n%s' % | ||
@@ -579,3 +622,3 @@ _QuoteLongString(command_string)) | ||
| callable_obj(*args, **kwargs) | ||
| except expected_exception, err: | ||
| except expected_exception as err: | ||
| self.assert_(predicate(err), | ||
@@ -605,3 +648,3 @@ '%r does not match predicate %r' % (err, predicate)) | ||
| callable_obj(*args, **kwargs) | ||
| except expected_exception, err: | ||
| except expected_exception as err: | ||
| actual_exception_message = str(err) | ||
@@ -757,2 +800,147 @@ self.assert_(expected_exception_message == actual_exception_message, | ||
| def assertDictEqual(self, a, b, msg=None): | ||
| """Raises AssertionError if a and b are not equal dictionaries. | ||
| Args: | ||
| a: A dict, the expected value. | ||
| b: A dict, the actual value. | ||
| msg: An optional str, the associated message. | ||
| Raises: | ||
| AssertionError: if the dictionaries are not equal. | ||
| """ | ||
| self.assertIsInstance(a, dict, 'First argument is not a dictionary') | ||
| self.assertIsInstance(b, dict, 'Second argument is not a dictionary') | ||
| def Sorted(iterable): | ||
| try: | ||
| return sorted(iterable) # In 3.3, unordered objects are possible. | ||
| except TypeError: | ||
| return list(iterable) | ||
| if a == b: | ||
| return | ||
| a_items = Sorted(a.iteritems()) | ||
| b_items = Sorted(b.iteritems()) | ||
| unexpected = [] | ||
| missing = [] | ||
| different = [] | ||
| safe_repr = unittest.util.safe_repr | ||
| def Repr(dikt): | ||
| """Deterministic repr for dict.""" | ||
| # Sort the entries based on their repr, not based on their sort order, | ||
| # which will be non-deterministic across executions, for many types. | ||
| entries = sorted((safe_repr(k), safe_repr(v)) | ||
| for k, v in dikt.iteritems()) | ||
| return '{%s}' % (', '.join('%s: %s' % pair for pair in entries)) | ||
| message = ['%s != %s%s' % (Repr(a), Repr(b), ' (%s)' % msg if msg else '')] | ||
| # The standard library default output confounds lexical difference with | ||
| # value difference; treat them separately. | ||
| for a_key, a_value in a_items: | ||
| if a_key not in b: | ||
| unexpected.append((a_key, a_value)) | ||
| elif a_value != b[a_key]: | ||
| different.append((a_key, a_value, b[a_key])) | ||
| if unexpected: | ||
| message.append( | ||
| 'Unexpected, but present entries:\n%s' % ''.join( | ||
| '%s: %s\n' % (safe_repr(k), safe_repr(v)) for k, v in unexpected)) | ||
| if different: | ||
| message.append( | ||
| 'repr() of differing entries:\n%s' % ''.join( | ||
| '%s: %s != %s\n' % (safe_repr(k), safe_repr(a_value), | ||
| safe_repr(b_value)) | ||
| for k, a_value, b_value in different)) | ||
| for b_key, b_value in b_items: | ||
| if b_key not in a: | ||
| missing.append((b_key, b_value)) | ||
| if missing: | ||
| message.append( | ||
| 'Missing entries:\n%s' % ''.join( | ||
| ('%s: %s\n' % (safe_repr(k), safe_repr(v)) for k, v in missing))) | ||
| raise self.failureException('\n'.join(message)) | ||
| def assertUrlEqual(self, a, b): | ||
| """Asserts that urls are equal, ignoring ordering of query params.""" | ||
| parsed_a = urlparse.urlparse(a) | ||
| parsed_b = urlparse.urlparse(b) | ||
| self.assertEqual(parsed_a.scheme, parsed_b.scheme) | ||
| self.assertEqual(parsed_a.netloc, parsed_b.netloc) | ||
| self.assertEqual(parsed_a.path, parsed_b.path) | ||
| self.assertEqual(parsed_a.fragment, parsed_b.fragment) | ||
| self.assertEqual(sorted(parsed_a.params.split(';')), | ||
| sorted(parsed_b.params.split(';'))) | ||
| self.assertDictEqual(urlparse.parse_qs(parsed_a.query), | ||
| urlparse.parse_qs(parsed_b.query)) | ||
| def assertSameStructure(self, a, b, aname='a', bname='b', msg=None): | ||
| """Asserts that two values contain the same structural content. | ||
| The two arguments should be data trees consisting of trees of dicts and | ||
| lists. They will be deeply compared by walking into the contents of dicts | ||
| and lists; other items will be compared using the == operator. | ||
| If the two structures differ in content, the failure message will indicate | ||
| the location within the structures where the first difference is found. | ||
| This may be helpful when comparing large structures. | ||
| Args: | ||
| a: The first structure to compare. | ||
| b: The second structure to compare. | ||
| aname: Variable name to use for the first structure in assertion messages. | ||
| bname: Variable name to use for the second structure. | ||
| msg: Additional text to include in the failure message. | ||
| """ | ||
| # Accumulate all the problems found so we can report all of them at once | ||
| # rather than just stopping at the first | ||
| problems = [] | ||
| _WalkStructureForProblems(a, b, aname, bname, problems) | ||
| # Avoid spamming the user toooo much | ||
| max_problems_to_show = self.maxDiff // 80 | ||
| if len(problems) > max_problems_to_show: | ||
| problems = problems[0:max_problems_to_show-1] + ['...'] | ||
| if problems: | ||
| failure_message = '; '.join(problems) | ||
| if msg: | ||
| failure_message += (': ' + msg) | ||
| self.fail(failure_message) | ||
| def assertJsonEqual(self, first, second, msg=None): | ||
| """Asserts that the JSON objects defined in two strings are equal. | ||
| A summary of the differences will be included in the failure message | ||
| using assertSameStructure. | ||
| Args: | ||
| first: A string contining JSON to decode and compare to second. | ||
| second: A string contining JSON to decode and compare to first. | ||
| msg: Additional text to include in the failure message. | ||
| """ | ||
| try: | ||
| first_structured = json.loads(first) | ||
| except ValueError as e: | ||
| raise ValueError('could not decode first JSON value %s: %s' % | ||
| (first, e)) | ||
| try: | ||
| second_structured = json.loads(second) | ||
| except ValueError as e: | ||
| raise ValueError('could not decode second JSON value %s: %s' % | ||
| (second, e)) | ||
| self.assertSameStructure(first_structured, second_structured, | ||
| aname='first', bname='second', msg=msg) | ||
| def getRecordedProperties(self): | ||
@@ -942,14 +1130,38 @@ """Return any properties that the user has recorded.""" | ||
| # We want to emit exactly one notice to stderr telling the user where to look | ||
| # for their stdout or stderr that may have been consumed to aid debugging. | ||
| _notified_test_output_path = '' | ||
| def _MaybeNotifyAboutTestOutput(outdir): | ||
| global _notified_test_output_path | ||
| if _notified_test_output_path != outdir: | ||
| _notified_test_output_path = outdir | ||
| sys.stderr.write('\nNOTE: Some tests capturing output into: %s\n' % outdir) | ||
| # TODO(user): Make CaptureTest* be usable as context managers to easily stop | ||
| # capturing at the appropriate time to make debugging failures much easier. | ||
| # Public interface | ||
| def CaptureTestStdout(outfile=None): | ||
| def CaptureTestStdout(outfile=''): | ||
| """Capture the stdout stream to a file until StopCapturing() is called.""" | ||
| if not outfile: | ||
| outfile = os.path.join(FLAGS.test_tmpdir, 'captured.out') | ||
| outdir = FLAGS.test_tmpdir | ||
| else: | ||
| outdir = os.path.dirname(outfile) | ||
| _MaybeNotifyAboutTestOutput(outdir) | ||
| _CaptureTestOutput(sys.stdout, outfile) | ||
| def CaptureTestStderr(outfile=None): | ||
| def CaptureTestStderr(outfile=''): | ||
| """Capture the stderr stream to a file until StopCapturing() is called.""" | ||
| if not outfile: | ||
| outfile = os.path.join(FLAGS.test_tmpdir, 'captured.err') | ||
| outdir = FLAGS.test_tmpdir | ||
| else: | ||
| outdir = os.path.dirname(outfile) | ||
| _MaybeNotifyAboutTestOutput(outdir) | ||
| _CaptureTestOutput(sys.stderr, outfile) | ||
@@ -967,2 +1179,3 @@ | ||
| def StopCapturing(): | ||
| """Stop capturing redirected output. Debugging sucks if you forget!""" | ||
| while _captured_streams: | ||
@@ -976,3 +1189,5 @@ _, cap_stream = _captured_streams.popitem() | ||
| """Write data into file named filename.""" | ||
| fd = os.open(filename, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, 0600) | ||
| fd = os.open(filename, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, 0o600) | ||
| if not isinstance(data, (bytes, bytearray)): | ||
| data = data.encode('utf-8') | ||
| os.write(fd, data) | ||
@@ -982,2 +1197,45 @@ os.close(fd) | ||
| _INT_TYPES = (int, long) # Sadly there is no types.IntTypes defined for us. | ||
| def _WalkStructureForProblems(a, b, aname, bname, problem_list): | ||
| """The recursive comparison behind assertSameStructure.""" | ||
| if type(a) != type(b) and not ( | ||
| isinstance(a, _INT_TYPES) and isinstance(b, _INT_TYPES)): | ||
| # We do not distinguish between int and long types as 99.99% of Python 2 | ||
| # code should never care. They collapse into a single type in Python 3. | ||
| problem_list.append('%s is a %r but %s is a %r' % | ||
| (aname, type(a), bname, type(b))) | ||
| # If they have different types there's no point continuing | ||
| return | ||
| if isinstance(a, collections.Mapping): | ||
| for k in a: | ||
| if k in b: | ||
| _WalkStructureForProblems(a[k], b[k], | ||
| '%s[%r]' % (aname, k), '%s[%r]' % (bname, k), | ||
| problem_list) | ||
| else: | ||
| problem_list.append('%s has [%r] but %s does not' % (aname, k, bname)) | ||
| for k in b: | ||
| if k not in a: | ||
| problem_list.append('%s lacks [%r] but %s has it' % (aname, k, bname)) | ||
| # Strings are Sequences but we'll just do those with regular != | ||
| elif isinstance(a, collections.Sequence) and not isinstance(a, basestring): | ||
| minlen = min(len(a), len(b)) | ||
| for i in xrange(minlen): | ||
| _WalkStructureForProblems(a[i], b[i], | ||
| '%s[%d]' % (aname, i), '%s[%d]' % (bname, i), | ||
| problem_list) | ||
| for i in xrange(minlen, len(a)): | ||
| problem_list.append('%s has [%i] but %s does not' % (aname, i, bname)) | ||
| for i in xrange(minlen, len(b)): | ||
| problem_list.append('%s lacks [%i] but %s has it' % (aname, i, bname)) | ||
| else: | ||
| if a != b: | ||
| problem_list.append('%s is %r but %s is %r' % (aname, a, bname, b)) | ||
| class OutputDifferedError(AssertionError): | ||
@@ -991,16 +1249,46 @@ pass | ||
| def _DiffViaExternalProgram(lhs, rhs, external_diff): | ||
| """Compare two files using an external program; raise if it reports error.""" | ||
| # The behavior of this function matches the old _Diff() method behavior | ||
| # when a TEST_DIFF environment variable was set. A few old things at | ||
| # Google depended on that functionality. | ||
| command = [external_diff, lhs, rhs] | ||
| try: | ||
| subprocess.check_output(command, close_fds=True, stderr=subprocess.STDOUT) | ||
| return True # No diffs. | ||
| except subprocess.CalledProcessError as error: | ||
| failure_output = error.output | ||
| if error.returncode == 1: | ||
| raise OutputDifferedError('\nRunning %s\n%s\nTest output differed from' | ||
| ' golden file\n' % (command, failure_output)) | ||
| except EnvironmentError as error: | ||
| failure_output = str(error) | ||
| # Running the program failed in some way that wasn't a diff. | ||
| raise DiffFailureError('\nRunning %s\n%s\nFailure diffing test output' | ||
| ' with golden file\n' % (command, failure_output)) | ||
| def _Diff(lhs, rhs): | ||
| """Run standard unix 'diff' against two files.""" | ||
| """Given two pathnames, compare two files. Raise if they differ.""" | ||
| # Some people rely on being able to specify TEST_DIFF in the environment to | ||
| # have tests use their own diff wrapper for use when updating golden data. | ||
| external_diff = os.environ.get('TEST_DIFF') | ||
| if external_diff: | ||
| return _DiffViaExternalProgram(lhs, rhs, external_diff) | ||
| try: | ||
| with open(lhs, 'rt') as lhs_f: | ||
| with open(rhs, 'rt') as rhs_f: | ||
| diff_text = ''.join( | ||
| difflib.unified_diff(lhs_f.readlines(), rhs_f.readlines())) | ||
| if not diff_text: | ||
| return True | ||
| raise OutputDifferedError('\nComparing %s and %s\nTest output differed ' | ||
| 'from golden file:\n%s' % (lhs, rhs, diff_text)) | ||
| except EnvironmentError as error: | ||
| # Unable to read the files. | ||
| raise DiffFailureError('\nComparing %s and %s\nFailure diffing test output ' | ||
| 'with golden file: %s\n' % (lhs, rhs, error)) | ||
| cmd = '${TEST_DIFF:-diff} %s %s' % (commands.mkarg(lhs), commands.mkarg(rhs)) | ||
| (status, output) = commands.getstatusoutput(cmd) | ||
| if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 1: | ||
| # diff outputs must be the same as c++ and shell | ||
| raise OutputDifferedError('\nRunning %s\n%s\nTest output differed ' | ||
| 'from golden file\n' % (cmd, output)) | ||
| elif not os.WIFEXITED(status) or os.WEXITSTATUS(status) != 0: | ||
| raise DiffFailureError('\nRunning %s\n%s\nFailure diffing test output ' | ||
| 'with golden file\n' % (cmd, output)) | ||
| def DiffTestStringFile(data, golden): | ||
@@ -1015,7 +1303,7 @@ """Diff data agains a golden file.""" | ||
| """Diff two strings.""" | ||
| data1_file = os.path.join(FLAGS.test_tmpdir, 'provided_1.dat') | ||
| _WriteTestData(data1, data1_file) | ||
| data2_file = os.path.join(FLAGS.test_tmpdir, 'provided_2.dat') | ||
| _WriteTestData(data2, data2_file) | ||
| _Diff(data1_file, data2_file) | ||
| diff_text = ''.join( | ||
| difflib.unified_diff(data1.splitlines(True), data2.splitlines(True))) | ||
| if not diff_text: | ||
| return | ||
| raise OutputDifferedError('\nTest strings differed:\n%s' % diff_text) | ||
@@ -1081,2 +1369,7 @@ | ||
| """ | ||
| if isinstance(s, (bytes, bytearray)): | ||
| try: | ||
| s = s.decode('utf-8') | ||
| except UnicodeDecodeError: | ||
| s = str(s) | ||
| return ('8<-----------\n' + | ||
@@ -1135,2 +1428,4 @@ s + '\n' + | ||
| def _RunInApp(function, args, kwargs): | ||
@@ -1171,7 +1466,12 @@ """Executes a set of Python unit tests, ensuring app.really_start. | ||
| function: basetest.RunTests or a similar function. It will be called as | ||
| function(argv, args, kwargs) where argv is a list containing the | ||
| elements of sys.argv without the command-line flags. | ||
| function(argv, args, kwargs) where argv is a list containing the | ||
| elements of sys.argv without the command-line flags. | ||
| args: Positional arguments passed through to unittest.TestProgram.__init__. | ||
| kwargs: Keyword arguments passed through to unittest.TestProgram.__init__. | ||
| """ | ||
| if faulthandler: | ||
| try: | ||
| faulthandler.enable() | ||
| except Exception as e: | ||
| sys.stderr.write('faulthandler.enable() failed %r; ignoring.\n' % e) | ||
| if _IsInAppMain(): | ||
@@ -1178,0 +1478,0 @@ # Save command-line flags so the side effects of FLAGS(sys.argv) can be |
+114
-11
@@ -31,2 +31,3 @@ #!/usr/bin/env python | ||
| import types | ||
| import warnings | ||
@@ -52,2 +53,14 @@ from dateutil import parser | ||
| def MicrosecondsToSeconds(microseconds): | ||
| """Convert microseconds to seconds. | ||
| Args: | ||
| microseconds: A number representing some duration of time measured in | ||
| microseconds. | ||
| Returns: | ||
| A number representing the same duration of time measured in seconds. | ||
| """ | ||
| return microseconds / _MICROSECONDS_PER_SECOND_F | ||
| def _GetCurrentTimeMicros(): | ||
@@ -86,2 +99,67 @@ """Get the current time in microseconds, in UTC. | ||
| def DatetimeToUTCMicros(date): | ||
| """Converts a datetime object to microseconds since the epoch in UTC. | ||
| Args: | ||
| date: A datetime to convert. | ||
| Returns: | ||
| The number of microseconds since the epoch, in UTC, represented by the input | ||
| datetime. | ||
| """ | ||
| # Using this guide: http://wiki.python.org/moin/WorkingWithTime | ||
| # And this conversion guide: http://docs.python.org/library/time.html | ||
| # Turn the date parameter into a tuple (struct_time) that can then be | ||
| # manipulated into a long value of seconds. During the conversion from | ||
| # struct_time to long, the source date in UTC, and so it follows that the | ||
| # correct transformation is calendar.timegm() | ||
| micros = calendar.timegm(date.utctimetuple()) * _MICROSECONDS_PER_SECOND | ||
| return micros + date.microsecond | ||
| def DatetimeToUTCMillis(date): | ||
| """Converts a datetime object to milliseconds since the epoch in UTC. | ||
| Args: | ||
| date: A datetime to convert. | ||
| Returns: | ||
| The number of milliseconds since the epoch, in UTC, represented by the input | ||
| datetime. | ||
| """ | ||
| return DatetimeToUTCMicros(date) / 1000 | ||
| def UTCMicrosToDatetime(micros, tz=None): | ||
| """Converts a microsecond epoch time to a datetime object. | ||
| Args: | ||
| micros: A UTC time, expressed in microseconds since the epoch. | ||
| tz: The desired tzinfo for the datetime object. If None, the | ||
| datetime will be naive. | ||
| Returns: | ||
| The datetime represented by the input value. | ||
| """ | ||
| # The conversion from micros to seconds for input into the | ||
| # utcfromtimestamp function needs to be done as a float to make sure | ||
| # we dont lose the sub-second resolution of the input time. | ||
| dt = datetime.datetime.utcfromtimestamp( | ||
| micros / _MICROSECONDS_PER_SECOND_F) | ||
| if tz is not None: | ||
| dt = tz.fromutc(dt) | ||
| return dt | ||
| def UTCMillisToDatetime(millis, tz=None): | ||
| """Converts a millisecond epoch time to a datetime object. | ||
| Args: | ||
| millis: A UTC time, expressed in milliseconds since the epoch. | ||
| tz: The desired tzinfo for the datetime object. If None, the | ||
| datetime will be naive. | ||
| Returns: | ||
| The datetime represented by the input value. | ||
| """ | ||
| return UTCMicrosToDatetime(millis * 1000, tz) | ||
| UTC = pytz.UTC | ||
@@ -384,6 +462,10 @@ US_PACIFIC = pytz.timezone('US/Pacific') | ||
| tz: optional timezone, if timezone is omitted from timestring. | ||
| Returns: | ||
| New Timestamp. | ||
| New Timestamp or None if unable to parse the timestring. | ||
| """ | ||
| r = parser.parse(timestring) | ||
| try: | ||
| r = parser.parse(timestring) | ||
| except ValueError: | ||
| return None | ||
| if not r.tzinfo: | ||
@@ -398,16 +480,37 @@ r = (tz or cls.LocalTimezone).localize(r) | ||
| def _IntStringToInterval(cls, timestring): | ||
| """Parse interval date specification and create a timedelta object.""" | ||
| return datetime.timedelta(seconds=ConvertIntervalToSeconds(timestring)) | ||
| """Parse interval date specification and create a timedelta object. | ||
| Args: | ||
| timestring: string interval. | ||
| Returns: | ||
| A datetime.timedelta representing the specified interval or None if | ||
| unable to parse the timestring. | ||
| """ | ||
| seconds = ConvertIntervalToSeconds(timestring) | ||
| return datetime.timedelta(seconds=seconds) if seconds else None | ||
| @classmethod | ||
| def FromString(cls, value, tz=None): | ||
| """Try to create a Timestamp from a string.""" | ||
| val = cls._StringToTime(value, tz) | ||
| if val: | ||
| return val | ||
| """Create a Timestamp from a string. | ||
| val = cls._IntStringToInterval(value) | ||
| if val: | ||
| return cls.utcnow() - val | ||
| Args: | ||
| value: String interval or datetime. | ||
| e.g. "2013-01-05 13:00:00" or "1d" | ||
| tz: optional timezone, if timezone is omitted from timestring. | ||
| Returns: | ||
| A new Timestamp. | ||
| Raises: | ||
| TimeParseError if unable to parse value. | ||
| """ | ||
| result = cls._StringToTime(value, tz=tz) | ||
| if result: | ||
| return result | ||
| result = cls._IntStringToInterval(value) | ||
| if result: | ||
| return cls.utcnow() - result | ||
| raise TimeParseError(value) | ||
@@ -414,0 +517,0 @@ |
@@ -15,2 +15,4 @@ #!/usr/bin/env python | ||
| # limitations under the License. | ||
| # | ||
| # This code must be source compatible with Python 2.6 through 3.3. | ||
@@ -52,3 +54,3 @@ """Import this module to add a hook to call pdb on uncaught exceptions. | ||
| traceback.print_exception(exc_class, value, tb) | ||
| sys.stdout.write('\n') | ||
| # ...then start the debugger in post-mortem mode. | ||
@@ -60,4 +62,5 @@ pdb.pm() | ||
| # Must back up old excepthook. | ||
| global old_excepthook # pylint: disable-msg=W0603 | ||
| old_excepthook = sys.excepthook | ||
| sys.excepthook = _DebugHandler | ||
| global old_excepthook # pylint: disable=global-statement | ||
| if old_excepthook is None: | ||
| old_excepthook = sys.excepthook | ||
| sys.excepthook = _DebugHandler |
@@ -21,2 +21,3 @@ #!/usr/bin/env python | ||
| import contextlib | ||
| import errno | ||
@@ -41,10 +42,7 @@ import os | ||
| """Read entire contents of file with name 'filename'.""" | ||
| fp = open(filename) | ||
| try: | ||
| with open(filename) as fp: | ||
| return fp.read() | ||
| finally: | ||
| fp.close() | ||
| def Write(filename, contents, overwrite_existing=True, mode=0666): | ||
| def Write(filename, contents, overwrite_existing=True, mode=0666, gid=None): | ||
| """Create a file 'filename' with 'contents', with the mode given in 'mode'. | ||
@@ -55,2 +53,4 @@ | ||
| An optional gid can be specified. | ||
| Args: | ||
@@ -62,2 +62,3 @@ filename: str; the name of the file | ||
| mode: int; permissions with which to create the file (default is 0666 octal) | ||
| gid: int; group id with which to create the file | ||
| """ | ||
@@ -72,5 +73,7 @@ flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT | ||
| os.close(fd) | ||
| if gid is not None: | ||
| os.chown(filename, -1, gid) | ||
| def AtomicWrite(filename, contents, mode=0666): | ||
| def AtomicWrite(filename, contents, mode=0666, gid=None): | ||
| """Create a file 'filename' with 'contents' atomically. | ||
@@ -84,2 +87,4 @@ | ||
| An optional gid can be specified. | ||
| Args: | ||
@@ -89,4 +94,5 @@ filename: str; the name of the file | ||
| mode: int; permissions with which to create the file (default is 0666 octal) | ||
| gid: int; group id with which to create the file | ||
| """ | ||
| (fd, tmp_filename) = tempfile.mkstemp(dir=os.path.dirname(filename)) | ||
| fd, tmp_filename = tempfile.mkstemp(dir=os.path.dirname(filename)) | ||
| try: | ||
@@ -98,2 +104,4 @@ os.write(fd, contents) | ||
| os.chmod(tmp_filename, mode) | ||
| if gid is not None: | ||
| os.chown(tmp_filename, -1, gid) | ||
| os.rename(tmp_filename, filename) | ||
@@ -108,2 +116,30 @@ except OSError, exc: | ||
| @contextlib.contextmanager | ||
| def TemporaryFileWithContents(contents, **kw): | ||
| """A contextmanager that writes out a string to a file on disk. | ||
| This is useful whenever you need to call a function or command that expects a | ||
| file on disk with some contents that you have in memory. The context manager | ||
| abstracts the writing, flushing, and deletion of the temporary file. This is a | ||
| common idiom that boils down to a single with statement. | ||
| Note: if you need a temporary file-like object for calling an internal | ||
| function, you should use a StringIO as a file-like object and not this. | ||
| Temporary files should be avoided unless you need a file name or contents in a | ||
| file on disk to be read by some other function or program. | ||
| Args: | ||
| contents: a string with the contents to write to the file. | ||
| **kw: Optional arguments passed on to tempfile.NamedTemporaryFile. | ||
| Yields: | ||
| The temporary file object, opened in 'w' mode. | ||
| """ | ||
| temporary_file = tempfile.NamedTemporaryFile(**kw) | ||
| temporary_file.write(contents) | ||
| temporary_file.flush() | ||
| yield temporary_file | ||
| temporary_file.close() | ||
| def MkDirs(directory, force_mode=None): | ||
@@ -110,0 +146,0 @@ """Makes a directory including its parent directories. |
@@ -62,3 +62,3 @@ #!/usr/bin/env python | ||
| """Get a filename for a resource; see _Call.""" | ||
| global _extracted_files # pylint: disable-msg=W0603 | ||
| global _extracted_files # pylint: disable=global-statement | ||
| if not _extracted_files: | ||
@@ -65,0 +65,0 @@ atexit.register(pkg_resources.cleanup_resources) |
@@ -214,5 +214,5 @@ #!/usr/bin/env python | ||
| os.execv(program, args) | ||
| except EnvironmentError, e: | ||
| except EnvironmentError as e: | ||
| if not getattr(e, 'filename', None): | ||
| e.filename = program # Add info to error message | ||
| raise |
@@ -137,3 +137,3 @@ #!/usr/bin/env python | ||
| return False | ||
| except SystemExit, e: | ||
| except SystemExit as e: | ||
| returncode, = e.args | ||
@@ -140,0 +140,0 @@ return not returncode |
| #!/usr/bin/env python | ||
| # This code must be source compatible with Python 2.4 through 3.3. | ||
| # | ||
@@ -3,0 +4,0 @@ # Copyright 2003 Google Inc. All Rights Reserved. |
+1
-1
| Metadata-Version: 1.0 | ||
| Name: google-apputils | ||
| Version: 0.3.0 | ||
| Version: 0.4.0 | ||
| Summary: UNKNOWN | ||
@@ -5,0 +5,0 @@ Home-page: http://code.google.com/p/google-apputils-python |
+2
-3
@@ -53,6 +53,5 @@ #!/usr/bin/env python | ||
| name="google-apputils", | ||
| version="0.3.0", | ||
| version="0.4.0", | ||
| packages=find_packages(exclude=["tests"]), | ||
| namespace_packages=find_packages(exclude=["tests"]), | ||
| scripts=["ez_setup.py"], | ||
| namespace_packages=["google"], | ||
| entry_points={ | ||
@@ -59,0 +58,0 @@ "distutils.commands": [ |
| #!/usr/bin/env python | ||
| # | ||
| # Copyright 2005 Google Inc. All Rights Reserved. | ||
@@ -3,0 +4,0 @@ # |
@@ -260,8 +260,11 @@ #! /bin/bash | ||
| fgrep '>%@<' >/dev/null || | ||
| die "Test 32 failed" | ||
| die "Test 30 failed" | ||
| readonly HELP_PROG=" | ||
| from ${APP_PACKAGE} import app | ||
| def main(argv): print 'HI' | ||
| app.run() | ||
| " | ||
| echo "PASS" | ||
| echo "PASS" |
| #!/usr/bin/env python | ||
| # | ||
| # Copyright 2007 Google Inc. All Rights Reserved. | ||
@@ -3,0 +4,0 @@ # |
@@ -276,2 +276,26 @@ #! /bin/bash | ||
| # Success, default command set and correctly run. | ||
| RES=`$PYTHON -c "${IMPORTS} | ||
| class test(appcommands.Cmd): | ||
| def Run(self, argv): | ||
| print 'test running correctly' | ||
| return 0 | ||
| def main(argv): | ||
| appcommands.AddCmd('test', test) | ||
| appcommands.SetDefaultCommand('test') | ||
| appcommands.Run()"` || die "Test 63 failed" | ||
| echo "${RES}" | grep -q "test running correctly" || die "Test 64 failed" | ||
| # Failure, default command set but missing. | ||
| $PYTHON -c "${IMPORTS} | ||
| class test(appcommands.Cmd): | ||
| def Run(self, argv): | ||
| print 'test running correctly' | ||
| return 0 | ||
| def main(argv): | ||
| appcommands.AddCmd('test', test) | ||
| appcommands.SetDefaultCommand('missing') | ||
| appcommands.Run()" >/dev/null 2>&1 && die "Test 65 failed" | ||
| echo "PASS" |
@@ -20,2 +20,3 @@ #!/usr/bin/env python | ||
| import datetime | ||
| import random | ||
@@ -146,4 +147,90 @@ import time | ||
| def testFromStringInterval(self): | ||
| expected_date = datetime.datetime.utcnow() - datetime.timedelta(days=1) | ||
| expected_s = time.mktime(expected_date.utctimetuple()) | ||
| actual_date = datelib.Timestamp.FromString('1d') | ||
| actual_s = time.mktime(actual_date.timetuple()) | ||
| diff_seconds = actual_s - expected_s | ||
| self.assertBetween(diff_seconds, 0, 1) | ||
| self.assertRaises( | ||
| datelib.TimeParseError, datelib.Timestamp.FromString, 'wat') | ||
| def _EpochToDatetime(t, tz=None): | ||
| if tz is not None: | ||
| return datelib.datetime.datetime.fromtimestamp(t, tz) | ||
| else: | ||
| return datelib.datetime.datetime.utcfromtimestamp(t) | ||
| class DatetimeConversionUnitTest(basetest.TestCase): | ||
| def setUp(self): | ||
| self.pst = pytz.timezone('US/Pacific') | ||
| self.utc = pytz.utc | ||
| self.now = time.time() | ||
| def testDatetimeToUTCMicros(self): | ||
| self.assertEqual( | ||
| 0, datelib.DatetimeToUTCMicros(_EpochToDatetime(0))) | ||
| self.assertEqual( | ||
| 1001 * long(datelib._MICROSECONDS_PER_SECOND), | ||
| datelib.DatetimeToUTCMicros(_EpochToDatetime(1001))) | ||
| self.assertEqual(long(self.now * datelib._MICROSECONDS_PER_SECOND), | ||
| datelib.DatetimeToUTCMicros(_EpochToDatetime(self.now))) | ||
| # tzinfo shouldn't change the result | ||
| self.assertEqual( | ||
| 0, datelib.DatetimeToUTCMicros(_EpochToDatetime(0, tz=self.pst))) | ||
| def testDatetimeToUTCMillis(self): | ||
| self.assertEqual( | ||
| 0, datelib.DatetimeToUTCMillis(_EpochToDatetime(0))) | ||
| self.assertEqual( | ||
| 1001 * 1000L, datelib.DatetimeToUTCMillis(_EpochToDatetime(1001))) | ||
| self.assertEqual(long(self.now * 1000), | ||
| datelib.DatetimeToUTCMillis(_EpochToDatetime(self.now))) | ||
| # tzinfo shouldn't change the result | ||
| self.assertEqual( | ||
| 0, datelib.DatetimeToUTCMillis(_EpochToDatetime(0, tz=self.pst))) | ||
| def testUTCMicrosToDatetime(self): | ||
| self.assertEqual(_EpochToDatetime(0), datelib.UTCMicrosToDatetime(0)) | ||
| self.assertEqual(_EpochToDatetime(1.000001), | ||
| datelib.UTCMicrosToDatetime(1000001)) | ||
| self.assertEqual(_EpochToDatetime(self.now), datelib.UTCMicrosToDatetime( | ||
| long(self.now * datelib._MICROSECONDS_PER_SECOND))) | ||
| # Check timezone-aware comparisons | ||
| self.assertEqual(_EpochToDatetime(0, self.pst), | ||
| datelib.UTCMicrosToDatetime(0, tz=self.pst)) | ||
| self.assertEqual(_EpochToDatetime(0, self.pst), | ||
| datelib.UTCMicrosToDatetime(0, tz=self.utc)) | ||
| def testUTCMillisToDatetime(self): | ||
| self.assertEqual(_EpochToDatetime(0), datelib.UTCMillisToDatetime(0)) | ||
| self.assertEqual(_EpochToDatetime(1.001), datelib.UTCMillisToDatetime(1001)) | ||
| t = time.time() | ||
| dt = _EpochToDatetime(t) | ||
| # truncate sub-milli time | ||
| dt -= datelib.datetime.timedelta(microseconds=dt.microsecond % 1000) | ||
| self.assertEqual(dt, datelib.UTCMillisToDatetime(long(t * 1000))) | ||
| # Check timezone-aware comparisons | ||
| self.assertEqual(_EpochToDatetime(0, self.pst), | ||
| datelib.UTCMillisToDatetime(0, tz=self.pst)) | ||
| self.assertEqual(_EpochToDatetime(0, self.pst), | ||
| datelib.UTCMillisToDatetime(0, tz=self.utc)) | ||
| class MicrosecondsToSecondsUnitTest(basetest.TestCase): | ||
| def testConversionFromMicrosecondsToSeconds(self): | ||
| self.assertEqual(0.0, datelib.MicrosecondsToSeconds(0)) | ||
| self.assertEqual(7.0, datelib.MicrosecondsToSeconds(7000000)) | ||
| self.assertEqual(1.234567, datelib.MicrosecondsToSeconds(1234567)) | ||
| self.assertEqual(12345654321.123456, | ||
| datelib.MicrosecondsToSeconds(12345654321123456)) | ||
| if __name__ == '__main__': | ||
| basetest.main() |
+69
-15
@@ -39,3 +39,3 @@ #!/usr/bin/env python | ||
| # pylint is dumb about mox: | ||
| # pylint:disable-msg=E1101 | ||
| # pylint: disable=no-member | ||
@@ -67,7 +67,4 @@ | ||
| file_util.Write(self.file_path, self.sample_contents) | ||
| fp = open(self.file_path) | ||
| try: | ||
| with open(self.file_path) as fp: | ||
| self.assertEquals(fp.read(), self.sample_contents) | ||
| finally: | ||
| fp.close() | ||
@@ -87,7 +84,4 @@ def testWriteExclusive(self): | ||
| file_util.AtomicWrite(self.file_path, self.sample_contents) | ||
| fp = open(self.file_path) | ||
| try: | ||
| with open(self.file_path) as fp: | ||
| self.assertEquals(fp.read(), self.sample_contents) | ||
| finally: | ||
| fp.close() | ||
@@ -131,6 +125,31 @@ def testAtomicWriteMode(self): | ||
| open(self.file_path).AndReturn(file_handle) | ||
| file_handle.__enter__().AndReturn(file_handle) | ||
| file_handle.read().AndReturn(self.sample_contents) | ||
| file_handle.close() | ||
| file_handle.__exit__(None, None, None) | ||
| self.mox.ReplayAll() | ||
| self.assertEquals(file_util.Read(self.file_path), self.sample_contents) | ||
| try: | ||
| self.assertEquals(file_util.Read(self.file_path), self.sample_contents) | ||
| self.mox.VerifyAll() | ||
| finally: | ||
| # Because we mock out the built-in open() function, which the unittest | ||
| # library depends on, we need to make sure we revert it before leaving the | ||
| # test, otherwise any test failures will cause further internal failures | ||
| # and yield no meaningful error output. | ||
| self.mox.ResetAll() | ||
| self.mox.UnsetStubs() | ||
| def testWriteGroup(self): | ||
| self.mox.StubOutWithMock(os, 'open') | ||
| self.mox.StubOutWithMock(os, 'write') | ||
| self.mox.StubOutWithMock(os, 'close') | ||
| self.mox.StubOutWithMock(os, 'chown') | ||
| gid = 'new gid' | ||
| os.open(self.file_path, os.O_WRONLY | os.O_TRUNC | os.O_CREAT, | ||
| 0666).AndReturn(self.fd) | ||
| os.write(self.fd, self.sample_contents) | ||
| os.close(self.fd) | ||
| os.chown(self.file_path, -1, gid) | ||
| self.mox.ReplayAll() | ||
| file_util.Write(self.file_path, self.sample_contents, gid=gid) | ||
| self.mox.VerifyAll() | ||
@@ -150,2 +169,3 @@ | ||
| self.mode = 'new permissions' | ||
| self.gid = 'new gid' | ||
| self.temp_filename = '/some/temp/file' | ||
@@ -163,2 +183,21 @@ self.os_error = OSError('A problem renaming!') | ||
| def testAtomicWriteGroup(self): | ||
| self.mox.StubOutWithMock(os, 'chown') | ||
| os.chown(self.temp_filename, -1, self.gid) | ||
| os.rename(self.temp_filename, self.file_path) | ||
| self.mox.ReplayAll() | ||
| file_util.AtomicWrite(self.file_path, self.sample_contents, | ||
| mode=self.mode, gid=self.gid) | ||
| self.mox.VerifyAll() | ||
| def testAtomicWriteGroupError(self): | ||
| self.mox.StubOutWithMock(os, 'chown') | ||
| os.chown(self.temp_filename, -1, self.gid).AndRaise(self.os_error) | ||
| os.remove(self.temp_filename) | ||
| self.mox.ReplayAll() | ||
| self.assertRaises(OSError, file_util.AtomicWrite, self.file_path, | ||
| self.sample_contents, mode=self.mode, gid=self.gid) | ||
| self.mox.VerifyAll() | ||
| def testRenamingError(self): | ||
@@ -180,3 +219,3 @@ os.rename(self.temp_filename, self.file_path).AndRaise(self.os_error) | ||
| mode=self.mode) | ||
| except OSError, e: | ||
| except OSError as e: | ||
| self.assertEquals(str(e), | ||
@@ -190,6 +229,19 @@ 'A problem renaming!. Additional errors cleaning up: ' | ||
| class TemporaryFilesMoxTest(FileUtilMoxTestBase): | ||
| def testTemporaryFileWithContents(self): | ||
| contents = 'Inspiration!' | ||
| with file_util.TemporaryFileWithContents(contents) as temporary_file: | ||
| filename = temporary_file.name | ||
| contents_read = open(temporary_file.name).read() | ||
| self.assertEqual(contents_read, contents) | ||
| # Ensure that the file does not exist. | ||
| self.assertFalse(os.path.exists(filename)) | ||
| class MkDirsMoxTest(FileUtilMoxTestBase): | ||
| # pylint is dumb about mox: | ||
| # pylint:disable-msg=E1103 | ||
| # pylint: disable=maybe-no-member | ||
@@ -331,5 +383,7 @@ def setUp(self): | ||
| os.makedirs(test_sandbox) | ||
| open(os.path.join(test_sandbox, 'file'), 'w').close() | ||
| with open(os.path.join(test_sandbox, 'file'), 'w'): | ||
| pass | ||
| os.makedirs(test_dir) | ||
| open(os.path.join(test_dir, 'file'), 'w') | ||
| with open(os.path.join(test_dir, 'file'), 'w'): | ||
| pass | ||
@@ -336,0 +390,0 @@ file_util.RmDirs(test_dir) |
+0
-1
| #!/usr/bin/env python | ||
| # | ||
| # Copyright 2010 Google Inc. All Rights Reserved. | ||
@@ -4,0 +3,0 @@ |
| #!/usr/bin/env python | ||
| # This code must be source compatible with Python 2.4 through 3.3. | ||
| # | ||
@@ -8,7 +9,7 @@ # Copyright 2003 Google Inc. All Rights Reserved. | ||
| __pychecker__ = '--unusednames=e,_' | ||
| import os | ||
| # Use unittest instead of basetest to avoid bootstrap issues / circular deps. | ||
| import unittest | ||
| from google.apputils import basetest | ||
| from google.apputils import shellutil | ||
@@ -20,3 +21,3 @@ | ||
| class ShellUtilUnitTest(basetest.TestCase): | ||
| class ShellUtilUnitTest(unittest.TestCase): | ||
| def testShellEscapeList(self): | ||
@@ -27,11 +28,11 @@ # TODO(user): Actually run some shell commands and test the | ||
| words = [] | ||
| self.assertEquals(shellutil.ShellEscapeList(words), '') | ||
| self.assertEqual(shellutil.ShellEscapeList(words), '') | ||
| # Empty string | ||
| words = [''] | ||
| self.assertEquals(shellutil.ShellEscapeList(words), "''") | ||
| self.assertEqual(shellutil.ShellEscapeList(words), "''") | ||
| # Single word | ||
| words = ['foo'] | ||
| self.assertEquals(shellutil.ShellEscapeList(words), "'foo'") | ||
| self.assertEqual(shellutil.ShellEscapeList(words), "'foo'") | ||
@@ -41,11 +42,11 @@ # Single word with single quote | ||
| expected = """ 'foo'"'"'bar' """.strip() | ||
| self.assertEquals(shellutil.ShellEscapeList(words), expected) | ||
| self.assertEqual(shellutil.ShellEscapeList(words), expected) | ||
| # .. double quote | ||
| words = ['foo"bar'] | ||
| expected = """ 'foo"bar' """.strip() | ||
| self.assertEquals(shellutil.ShellEscapeList(words), expected) | ||
| self.assertEqual(shellutil.ShellEscapeList(words), expected) | ||
| # Multiple words | ||
| words = ['foo', 'bar'] | ||
| self.assertEquals(shellutil.ShellEscapeList(words), "'foo' 'bar'") | ||
| self.assertEqual(shellutil.ShellEscapeList(words), "'foo' 'bar'") | ||
@@ -55,3 +56,3 @@ # Words with spaces | ||
| expected = """ 'foo' 'bar' 'foo'"'"''"'"' '"'"''"'"'bar' """.strip() | ||
| self.assertEquals(shellutil.ShellEscapeList(words), expected) | ||
| self.assertEqual(shellutil.ShellEscapeList(words), expected) | ||
@@ -61,3 +62,3 @@ # Now I'm just being mean | ||
| expected = """ 'foo' 'bar' '""'"'"'"'"'"'"' """.strip() | ||
| self.assertEquals(shellutil.ShellEscapeList(words), expected) | ||
| self.assertEqual(shellutil.ShellEscapeList(words), expected) | ||
@@ -72,2 +73,2 @@ def testShellifyStatus(self): | ||
| if __name__ == '__main__': | ||
| basetest.main() | ||
| unittest.main() |
Sorry, the diff of this file is too big to display
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
318764
27.9%44
7.32%6726
29.72%