processing (PyPI package): comparing version 0.51 to 0.52

CHANGES.txt (+22 -0)

@@ -7,2 +7,24 @@ .. default-role:: literal

Changes in 0.52
---------------
* On versions 0.50 and 0.51, on Mac OSX `Lock.release()` would fail with
`OSError(errno.ENOSYS, "[Errno 78] Function not implemented")`.
This appears to be because on Mac OSX `sem_getvalue()` has not been
implemented.
Now `sem_getvalue()` is no longer needed. Unfortunately, however,
on Mac OSX `BoundedSemaphore()` will not raise `ValueError` if it
exceeds its initial value.
* Some changes to the code for the reduction/rebuilding of connection
and socket objects so that things work the same on Windows and Unix.
This should fix a couple of bugs.
* The code has been changed to consistently use "camelCase" for
methods and (non-factory) functions. In the few cases where this
has meant a change to the documented API, the old name has been
retained as an alias.
Changes in 0.51

@@ -9,0 +31,0 @@ ---------------
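The `ValueError` behaviour the changelog says is lost on Mac OSX can be seen with `threading.BoundedSemaphore`, the clone that these process-shared semaphores mirror. A minimal sketch (standard library `threading`, not the `processing` API itself):

```python
import threading

def over_release_raises():
    # BoundedSemaphore(1) starts at its maximum value, so acquire() then
    # release() restores it; a second release() would exceed the initial
    # value and must raise ValueError.
    sem = threading.BoundedSemaphore(1)
    sem.acquire()
    sem.release()       # back to the initial value: allowed
    try:
        sem.release()   # exceeds the initial value
    except ValueError:
        return True
    return False
```

This is exactly the check that cannot be performed on Mac OSX without `sem_getvalue()`, which is why the 0.52 `BoundedSemaphore()` silently allows over-release there.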

@@ -14,2 +14,24 @@ <?xml version="1.0" encoding="utf-8" ?>

<div class="section">
<h1><a id="changes-in-0-52" name="changes-in-0-52">Changes in 0.52</a></h1>
<ul>
<li><p class="first">On versions 0.50 and 0.51 Mac OSX <tt class="docutils literal"><span class="pre">Lock.release()</span></tt> would fail with
<tt class="docutils literal"><span class="pre">OSError(errno.ENOSYS,</span> <span class="pre">&quot;[Errno</span> <span class="pre">78]</span> <span class="pre">Function</span> <span class="pre">not</span> <span class="pre">implemented&quot;)</span></tt>.
This appears to be because on Mac OSX <tt class="docutils literal"><span class="pre">sem_getvalue()</span></tt> has not been
implemented.</p>
<p>Now <tt class="docutils literal"><span class="pre">sem_getvalue()</span></tt> is no longer needed. Unfortunately, however,
on Mac OSX <tt class="docutils literal"><span class="pre">BoundedSemaphore()</span></tt> will not raise <tt class="docutils literal"><span class="pre">ValueError</span></tt> if it
exceeds its initial value.</p>
</li>
<li><p class="first">Some changes to the code for the reduction/rebuilding of connection
and socket objects so that things work the same on Windows and Unix.
This should fix a couple of bugs.</p>
</li>
<li><p class="first">The code has been changed to consistently use &quot;camelCase&quot; for
methods and (non-factory) functions. In the few cases where this
has meant a change to the documented API, the old name has been
retained as an alias.</p>
</li>
</ul>
</div>
<div class="section">
<h1><a id="changes-in-0-51" name="changes-in-0-51">Changes in 0.51</a></h1>

@@ -16,0 +38,0 @@ <ul>

(+8 -8)

@@ -48,3 +48,3 @@ <?xml version="1.0" encoding="utf-8" ?>

</dd>
<dt><tt class="docutils literal"><span class="pre">sendbytes(buffer)</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">sendBytes(buffer)</span></tt></dt>
<dd><p class="first">Send byte data from an object supporting the buffer interface

@@ -54,7 +54,7 @@ as a complete message.</p>

</dd>
<dt><tt class="docutils literal"><span class="pre">recvbytes()</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">recvBytes()</span></tt></dt>
<dd>Return a complete message of byte data sent from the other end
of the connection as a string. Raises <tt class="docutils literal"><span class="pre">EOFError</span></tt> if there is
nothing left to receive and the other end was closed.</dd>
<dt><tt class="docutils literal"><span class="pre">recvbytes_into(buffer,</span> <span class="pre">offset=0)</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">recvBytesInto(buffer,</span> <span class="pre">offset=0)</span></tt></dt>
<dd><p class="first">Read into <tt class="docutils literal"><span class="pre">buffer</span></tt> at position <tt class="docutils literal"><span class="pre">offset</span></tt> a complete message of

@@ -81,4 +81,4 @@ byte data sent from the other end of the connection and return

[1, 'hello', None]
&gt;&gt;&gt; b.sendbytes('thank you')
&gt;&gt;&gt; a.recvbytes()
&gt;&gt;&gt; b.sendBytes('thank you')
&gt;&gt;&gt; a.recvBytes()
'thank you'

@@ -88,5 +88,5 @@ &gt;&gt;&gt; import array

&gt;&gt;&gt; arr2 = array.array('i', [0] * 10)
&gt;&gt;&gt; a.sendbytes(arr1)
&gt;&gt;&gt; b.recvbytes_into(arr2)
20
&gt;&gt;&gt; a.sendBytes(arr1)
&gt;&gt;&gt; count = b.recvBytesInto(arr2)
&gt;&gt;&gt; assert count == len(arr1) * arr1.itemsize
&gt;&gt;&gt; arr2

@@ -93,0 +93,0 @@ array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

@@ -45,3 +45,3 @@ .. include:: header.txt

`sendbytes(buffer)`
`sendBytes(buffer)`
Send byte data from an object supporting the buffer interface

@@ -52,3 +52,3 @@ as a complete message.

`recvbytes()`
`recvBytes()`
Return a complete message of byte data sent from the other end

@@ -58,3 +58,3 @@ of the connection as a string. Raises `EOFError` if there is

`recvbytes_into(buffer, offset=0)`
`recvBytesInto(buffer, offset=0)`
Read into `buffer` at position `offset` a complete message of

@@ -81,4 +81,4 @@ byte data sent from the other end of the connection and return

[1, 'hello', None]
>>> b.sendbytes('thank you')
>>> a.recvbytes()
>>> b.sendBytes('thank you')
>>> a.recvBytes()
'thank you'

@@ -88,5 +88,5 @@ >>> import array

>>> arr2 = array.array('i', [0] * 10)
>>> a.sendbytes(arr1)
>>> b.recvbytes_into(arr2)
20
>>> a.sendBytes(arr1)
>>> count = b.recvBytesInto(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2

@@ -93,0 +93,0 @@ array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
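The doctest above maps directly onto the modern `multiprocessing` module, which kept the snake_case spellings (`send_bytes()`, `recv_bytes()`, `recv_bytes_into()`). A hedged sketch of the same round trip:

```python
import array
from multiprocessing import Pipe

def pipe_roundtrip():
    a, b = Pipe()
    b.send_bytes(b'thank you')
    msg = a.recv_bytes()                 # complete message as bytes
    arr1 = array.array('i', range(5))
    arr2 = array.array('i', [0] * 10)
    a.send_bytes(arr1)                   # any buffer-supporting object
    count = b.recv_bytes_into(arr2)      # number of bytes written into arr2
    a.close()
    b.close()
    return msg, count, list(arr2[:5])
```

As in the doctest, `count` equals `len(arr1) * arr1.itemsize` rather than a literal `20`, since the itemsize of `'i'` is platform-dependent.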

@@ -45,3 +45,3 @@ <?xml version="1.0" encoding="utf-8" ?>

</blockquote>
<!-- `deliver_challenge(connection, authkey)`
<!-- `deliverChallenge(connection, authkey)`
Sends a randomly generated message to the other end of the

@@ -54,3 +54,3 @@ connection and waits for a reply.

`answer_challenge(connection, authkey)`
`answerChallenge(connection, authkey)`
Receives a message, calculates the digest of the message using

@@ -67,3 +67,3 @@ `authkey` as the key, and then sends the digest back.

<dt><strong>exception</strong> <tt class="docutils literal"><span class="pre">BufferTooShort</span></tt></dt>
<dd><p class="first">Exception raise by the <tt class="docutils literal"><span class="pre">recvbytes_into()</span></tt> method of a
<dd><p class="first">Exception raise by the <tt class="docutils literal"><span class="pre">recvBytesInto()</span></tt> method of a
connection object when the supplied buffer object is too small

@@ -206,5 +206,5 @@ for the message read.</p>

conn.sendbytes('hello')
conn.sendBytes('hello')
conn.sendbytes(array('i', [42, 1729]))
conn.sendBytes(array('i', [42, 1729]))

@@ -225,6 +225,6 @@ conn.close()

print conn.recvbytes() # =&gt; 'hello'
print conn.recvBytes() # =&gt; 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print conn.recvbytes_into(arr) # =&gt; 8
print conn.recvBytesInto(arr) # =&gt; 8
print arr # =&gt; array('i', [42, 1729, 0, 0, 0])

@@ -231,0 +231,0 @@

@@ -41,3 +41,3 @@ .. include:: header.txt

..
`deliver_challenge(connection, authkey)`
`deliverChallenge(connection, authkey)`
Sends a randomly generated message to the other end of the

@@ -50,3 +50,3 @@ connection and waits for a reply.

`answer_challenge(connection, authkey)`
`answerChallenge(connection, authkey)`
Receives a message, calculates the digest of the message using

@@ -65,3 +65,3 @@ `authkey` as the key, and then sends the digest back.

**exception** `BufferTooShort`
Exception raised by the `recvbytes_into()` method of a
Exception raised by the `recvBytesInto()` method of a
connection object when the supplied buffer object is too small

@@ -217,5 +217,5 @@ for the message read.

conn.sendbytes('hello')
conn.sendBytes('hello')
conn.sendbytes(array('i', [42, 1729]))
conn.sendBytes(array('i', [42, 1729]))

@@ -236,6 +236,6 @@ conn.close()

print conn.recvbytes() # => 'hello'
print conn.recvBytes() # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print conn.recvbytes_into(arr) # => 8
print conn.recvBytesInto(arr) # => 8
print arr # => array('i', [42, 1729, 0, 0, 0])

@@ -242,0 +242,0 @@

@@ -7,3 +7,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<meta name="generator" content="Docutils 0.4: http://docutils.sourceforge.net/" />
<title>Documentation for processing-0.51</title>
<title>Documentation for processing-0.52</title>
<meta name="author" content="R Oudkerk" />

@@ -18,3 +18,3 @@ <link rel="stylesheet" href="html4css1.css" type="text/css" />

<div class="document" id="documentation-for-processing-version">
<h1 class="title">Documentation for processing-0.51</h1>
<h1 class="title">Documentation for processing-0.52</h1>
<table class="docinfo" frame="void" rules="none">

@@ -21,0 +21,0 @@ <col class="docinfo-name" />

@@ -21,3 +21,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<blockquote>
<a class="reference" href="http://cheeseshop.python.org/pypi/processing">http://cheeseshop.python.org/pypi/processing</a></blockquote>
<a class="reference" href="http://pypi.python.org/pypi/processing">http://pypi.python.org/pypi/processing</a></blockquote>
<p>Otherwise, if you have the correct C compiler setup then the source

@@ -24,0 +24,0 @@ distribution can be installed the usual way:</p>

@@ -268,3 +268,3 @@ <?xml version="1.0" encoding="utf-8" ?>

pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, [10]) # evaluate &quot;f(10)&quot; asynchronously
result = pool.applyAsync(f, [10]) # evaluate &quot;f(10)&quot; asynchronously
print result.get(timeout=1) # prints &quot;100&quot; unless your computer is *very* slow

@@ -271,0 +271,0 @@ print pool.map(f, range(10)) # prints &quot;[0, 1, 4,..., 81]&quot;

@@ -284,3 +284,3 @@ .. include:: header.txt

pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
result = pool.applyAsync(f, [10]) # evaluate "f(10)" asynchronously
print result.get(timeout=1) # prints "100" unless your computer is *very* slow

@@ -287,0 +287,0 @@ print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"

@@ -33,3 +33,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<dd><p class="first">Creates a manager object.</p>
<p>Once created one should call <tt class="docutils literal"><span class="pre">start()</span></tt> or <tt class="docutils literal"><span class="pre">serve_forever()</span></tt> to
<p>Once created one should call <tt class="docutils literal"><span class="pre">start()</span></tt> or <tt class="docutils literal"><span class="pre">serveForever()</span></tt> to
ensure that the manager object refers to a started manager

@@ -56,6 +56,6 @@ process.</p>

<dd>Spawn or fork a subprocess to start the manager.</dd>
<dt><tt class="docutils literal"><span class="pre">serve_forever()</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">serveForever()</span></tt></dt>
<dd>Start the manager in the current process. See <a class="reference" href="#using-a-remote-manager">Using a remote
manager</a>.</dd>
<dt><tt class="docutils literal"><span class="pre">from_address(address,</span> <span class="pre">authkey)</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">fromAddress(address,</span> <span class="pre">authkey)</span></tt></dt>
<dd>A class method which returns a manager object referring to a

@@ -179,3 +179,3 @@ pre-existing server process which is using the given address and

<tt class="docutils literal"><span class="pre">exposed</span></tt> argument is the list of those method names which
should be exposed via <a class="reference" href="proxy-objects.html#methods-of-baseproxy"><tt class="docutils literal"><span class="pre">BaseProxy._callmethod()</span></tt></a>. <a class="footnote-reference" href="#id3" id="id1" name="id1">[1]</a> <a class="footnote-reference" href="#id4" id="id2" name="id2">[2]</a></p>
should be exposed via <a class="reference" href="proxy-objects.html#methods-of-baseproxy"><tt class="docutils literal"><span class="pre">BaseProxy._callMethod()</span></tt></a>. <a class="footnote-reference" href="#id3" id="id1" name="id1">[1]</a> <a class="footnote-reference" href="#id4" id="id2" name="id2">[2]</a></p>
<p>If <tt class="docutils literal"><span class="pre">exposed</span></tt> is <tt class="docutils literal"><span class="pre">None</span></tt> and <tt class="docutils literal"><span class="pre">callable.__exposed__</span></tt> exists then

@@ -186,3 +186,3 @@ <tt class="docutils literal"><span class="pre">callable.__exposed__</span></tt> is used instead.</p>

start with <tt class="docutils literal"><span class="pre">'_'</span></tt> will be exposed.</p>
<p class="last">An attempt to use <tt class="docutils literal"><span class="pre">BaseProxy._callmethod()</span></tt> with a method name which is
<p class="last">An attempt to use <tt class="docutils literal"><span class="pre">BaseProxy._callMethod()</span></tt> with a method name which is
not exposed will raise an exception.</p>

@@ -261,3 +261,3 @@ </dd>

&gt;&gt;&gt; m = QueueManager(address=('foo.bar.org', 50000), authkey='none')
&gt;&gt;&gt; m.serve_forever()
&gt;&gt;&gt; m.serveForever()
</pre>

@@ -270,3 +270,3 @@ <p>One client can access the server as follows:</p>

...
&gt;&gt;&gt; m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
&gt;&gt;&gt; m = QueueManager.fromAddress(address=('foo.bar.org', 50000), authkey='none')
&gt;&gt;&gt; queue = m.get_proxy()

@@ -281,3 +281,3 @@ &gt;&gt;&gt; queue.put('hello')

...
&gt;&gt;&gt; m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
&gt;&gt;&gt; m = QueueManager.fromAddress(address=('foo.bar.org', 50000), authkey='none')
&gt;&gt;&gt; queue = m.get_proxy()

@@ -284,0 +284,0 @@ &gt;&gt;&gt; queue.get()

@@ -28,3 +28,3 @@ .. include:: header.txt

Once created one should call `start()` or `serve_forever()` to
Once created one should call `start()` or `serveForever()` to
ensure that the manager object refers to a started manager

@@ -55,7 +55,7 @@ process.

`serve_forever()`
`serveForever()`
Start the manager in the current process. See `Using a remote
manager`_.
`from_address(address, authkey)`
`fromAddress(address, authkey)`
A class method which returns a manager object referring to a

@@ -216,3 +216,3 @@ pre-existing server process which is using the given address and

.. |callmethod| replace:: ``BaseProxy._callmethod()``
.. |callmethod| replace:: ``BaseProxy._callMethod()``

@@ -262,2 +262,3 @@ .. _callmethod: proxy-objects.html#methods-of-baseproxy

Using a remote manager

@@ -280,3 +281,3 @@ ======================

>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none')
>>> m.serve_forever()
>>> m.serveForever()

@@ -289,3 +290,3 @@ One client can access the server as follows::

...
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
>>> m = QueueManager.fromAddress(address=('foo.bar.org', 50000), authkey='none')
>>> queue = m.get_proxy()

@@ -300,3 +301,3 @@ >>> queue.put('hello')

...
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
>>> m = QueueManager.fromAddress(address=('foo.bar.org', 50000), authkey='none')
>>> queue = m.get_proxy()

@@ -303,0 +304,0 @@ >>> queue.get()
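The same remote-manager pattern survives in today's `multiprocessing.managers` under the snake_case names: `serve_forever()` lives on the object returned by `get_server()`, and `connect()` plays the role of `from_address()`/`fromAddress()`. A minimal sketch, using `start()` so the whole thing runs in one script (an assumption for the sake of a runnable example, not the two-host setup shown above):

```python
import queue
from multiprocessing.managers import BaseManager

task_queue = queue.Queue()

def get_queue():
    return task_queue

class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue', callable=get_queue)

def manager_roundtrip():
    # start() launches the server in a child process; the proxy returned
    # by get_queue() then talks to it over the manager's address/authkey.
    m = QueueManager(address=('127.0.0.1', 0), authkey=b'none')
    m.start()
    try:
        q = m.get_queue()
        q.put('hello')
        return q.get()
    finally:
        m.shutdown()
```

For the remote case, a client would construct the same `QueueManager` subclass with the server's address and call `connect()` instead of `start()`.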

@@ -45,3 +45,3 @@ <?xml version="1.0" encoding="utf-8" ?>

the result is ready.</dd>
<dt><tt class="docutils literal"><span class="pre">apply_async(func,</span> <span class="pre">args=(),</span> <span class="pre">kwds={},</span> <span class="pre">callback=None)</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">applyAsync(func,</span> <span class="pre">args=(),</span> <span class="pre">kwds={},</span> <span class="pre">callback=None)</span></tt></dt>
<dd><p class="first">A variant of the <tt class="docutils literal"><span class="pre">apply()</span></tt> method which returns a

@@ -63,3 +63,3 @@ result object --- see <a class="reference" href="#asynchronous-result-objects">Asynchronous result objects</a>.</p>

</dd>
<dt><tt class="docutils literal"><span class="pre">map_async(func,</span> <span class="pre">iterable,</span> <span class="pre">chunksize=None,</span> <span class="pre">callback=None)</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">mapAsync(func,</span> <span class="pre">iterable,</span> <span class="pre">chunksize=None,</span> <span class="pre">callback=None)</span></tt></dt>
<dd><p class="first">A variant of the <tt class="docutils literal"><span class="pre">map()</span></tt> method which returns a result object

@@ -85,3 +85,3 @@ --- see <a class="reference" href="#asynchronous-result-objects">Asynchronous result objects</a>.</p>

</dd>
<dt><tt class="docutils literal"><span class="pre">imap_unordered(func,</span> <span class="pre">iterable,</span> <span class="pre">chunksize=1)</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">imapUnordered(func,</span> <span class="pre">iterable,</span> <span class="pre">chunksize=1)</span></tt></dt>
<dd>The same as <tt class="docutils literal"><span class="pre">imap()</span></tt> except that the ordering of the results

@@ -107,3 +107,3 @@ from the returned iterator should be considered arbitrary.

<h1><a id="asynchronous-result-objects" name="asynchronous-result-objects">Asynchronous result objects</a></h1>
<p>The result objects returned by <tt class="docutils literal"><span class="pre">apply_async()</span></tt> and <tt class="docutils literal"><span class="pre">map_async()</span></tt> have
<p>The result objects returned by <tt class="docutils literal"><span class="pre">applyAsync()</span></tt> and <tt class="docutils literal"><span class="pre">mapAsync()</span></tt> have
the following public methods:</p>

@@ -141,3 +141,3 @@ <blockquote>

result = pool.apply_async(f, (10,)) # evaluate &quot;f(10)&quot; asynchronously
result = pool.applyAsync(f, (10,)) # evaluate &quot;f(10)&quot; asynchronously
print result.get(timeout=1) # prints &quot;100&quot; unless your computer is *very* slow

@@ -153,3 +153,3 @@

import time
result = pool.apply_async(time.sleep, (10,))
result = pool.applyAsync(time.sleep, (10,))
print result.get(timeout=1) # raises `TimeoutError`

@@ -156,0 +156,0 @@ </pre>

@@ -38,3 +38,3 @@ .. include:: header.txt

`apply_async(func, args=(), kwds={}, callback=None)`
`applyAsync(func, args=(), kwds={}, callback=None)`
A variant of the `apply()` method which returns a

@@ -58,3 +58,3 @@ result object --- see `Asynchronous result objects`_.

`map_async(func, iterable, chunksize=None, callback=None)`
`mapAsync(func, iterable, chunksize=None, callback=None)`
A variant of the `map()` method which returns a result object

@@ -83,3 +83,3 @@ --- see `Asynchronous result objects`_.

`imap_unordered(func, iterable, chunksize=1)`
`imapUnordered(func, iterable, chunksize=1)`
The same as `imap()` except that the ordering of the results

@@ -108,3 +108,3 @@ from the returned iterator should be considered arbitrary.

The result objects returned by `apply_async()` and `map_async()` have
The result objects returned by `applyAsync()` and `mapAsync()` have
the following public methods:

@@ -144,3 +144,3 @@

result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously
result = pool.applyAsync(f, (10,)) # evaluate "f(10)" asynchronously
print result.get(timeout=1) # prints "100" unless your computer is *very* slow

@@ -156,3 +156,3 @@

import time
result = pool.apply_async(time.sleep, (10,))
result = pool.applyAsync(time.sleep, (10,))
print result.get(timeout=1) # raises `TimeoutError`

@@ -159,0 +159,0 @@
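This asynchronous-result API also survives in the modern `multiprocessing.Pool`, again under the snake_case names (`apply_async()`, `map_async()`, `imap_unordered()`). A small runnable sketch:

```python
from multiprocessing import Pool

def square(x):
    return x * x

def pool_demo():
    # apply_async() returns immediately with a result object whose get()
    # blocks (with an optional timeout) until a worker has finished;
    # imap_unordered() yields results in whatever order they complete.
    with Pool(processes=4) as pool:
        result = pool.apply_async(square, (10,))
        unordered = sorted(pool.imap_unordered(square, range(5)))
        return result.get(timeout=30), unordered
```

`result.get(timeout=1)` raising `TimeoutError` when the worker is still busy works exactly as in the `time.sleep` example above.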

@@ -128,3 +128,4 @@ <?xml version="1.0" encoding="utf-8" ?>

Note that exit handlers and finally clauses etc will not be
executed.</p>
executed. Also note that descendants of the process will
<em>not</em> be terminated.</p>
<div class="last warning">

@@ -131,0 +132,0 @@ <p class="first admonition-title">Warning</p>

@@ -139,3 +139,4 @@ .. include:: header.txt

Note that exit handlers and finally clauses etc will not be
executed.
executed. Also note that descendants of the process will
*not* be terminated.

@@ -142,0 +143,0 @@ .. warning::
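The added caveat can be observed directly with the modern `multiprocessing.Process.terminate()`, which has the same semantics: on Unix the child receives `SIGTERM`, so no exit handlers or `finally` clauses run and the exit code is negative.

```python
import time
from multiprocessing import Process

def terminate_demo():
    p = Process(target=time.sleep, args=(60,))
    p.start()
    p.terminate()        # SIGTERM on Unix: no cleanup code runs in the child
    p.join(timeout=30)
    return p.exitcode    # the negative of the signal number on Unix
```

Any grandchildren the process had started would keep running, which is the point of the sentence added in 0.52.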

@@ -28,3 +28,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<dt><strong>exception</strong> <tt class="docutils literal"><span class="pre">BufferTooShort</span></tt></dt>
<dd><p class="first">Exception raised by the <tt class="docutils literal"><span class="pre">recvbytes_into()</span></tt> method of a
<dd><p class="first">Exception raised by the <tt class="docutils literal"><span class="pre">recvBytesInto()</span></tt> method of a
<a class="reference" href="connection-objects.html">connection object</a>

@@ -79,4 +79,7 @@ when the supplied buffer object is too small for the message

<dt><tt class="docutils literal"><span class="pre">BoundedSemaphore(value=1)</span></tt></dt>
<dd>Returns a bounded semaphore object: a clone of
<tt class="docutils literal"><span class="pre">threading.BoundedSemaphore</span></tt>.</dd>
<dd><p class="first">Returns a bounded semaphore object: a clone of
<tt class="docutils literal"><span class="pre">threading.BoundedSemaphore</span></tt>.</p>
<p class="last">(On Mac OSX this is indistiguishable from <tt class="docutils literal"><span class="pre">Semaphore()</span></tt>
because <tt class="docutils literal"><span class="pre">sem_getvalue()</span></tt> is not implemented on that platform).</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">Condition(lock=None)</span></tt></dt>

@@ -220,7 +223,6 @@ <dd><p class="first">Returns a condition variable: a clone of <tt class="docutils literal"><span class="pre">threading.Condition</span></tt>.</p>

<dl class="docutils">
<dt><tt class="docutils literal"><span class="pre">enableLogging(level=10,</span> <span class="pre">HandlerType=None,</span> <span class="pre">handlerArgs=(),</span> <span class="pre">format=None)</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">enableLogging(level,</span> <span class="pre">HandlerType=None,</span> <span class="pre">handlerArgs=(),</span> <span class="pre">format=None)</span></tt></dt>
<dd><p class="first">Enables logging and sets the debug level used by the package's
logger to <tt class="docutils literal"><span class="pre">level</span></tt> -- see documentation for the <tt class="docutils literal"><span class="pre">logging</span></tt>
package in the standard library. The default logging level is
<tt class="docutils literal"><span class="pre">10</span></tt> which is equivalent to <tt class="docutils literal"><span class="pre">logging.DEBUG</span></tt>.</p>
logger to <tt class="docutils literal"><span class="pre">level</span></tt>. See documentation for the <tt class="docutils literal"><span class="pre">logging</span></tt> module
in the standard library.</p>
<p>If <tt class="docutils literal"><span class="pre">HandlerType</span></tt> is specified then a handler is created using

@@ -227,0 +229,0 @@ <tt class="docutils literal"><span class="pre">HandlerType(*handlerArgs)</span></tt> and this will be used by the

@@ -20,3 +20,3 @@ .. include:: header.txt

**exception** `BufferTooShort`
Exception raised by the `recvbytes_into()` method of a
Exception raised by the `recvBytesInto()` method of a
`connection object <connection-objects.html>`_

@@ -76,4 +76,7 @@ when the supplied buffer object is too small for the message

Returns a bounded semaphore object: a clone of
`threading.BoundedSemaphore`.
`threading.BoundedSemaphore`.
(On Mac OSX this is indistinguishable from `Semaphore()`
because `sem_getvalue()` is not implemented on that platform).
`Condition(lock=None)`

@@ -231,7 +234,6 @@ Returns a condition variable: a clone of `threading.Condition`.

`enableLogging(level=10, HandlerType=None, handlerArgs=(), format=None)`
`enableLogging(level, HandlerType=None, handlerArgs=(), format=None)`
Enables logging and sets the debug level used by the package's
logger to `level` -- see documentation for the `logging`
package in the standard library. The default logging level is
`10` which is equivalent to `logging.DEBUG`.
logger to `level`. See documentation for the `logging` module
in the standard library.

@@ -238,0 +240,0 @@ If `HandlerType` is specified then a handler is created using
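`enableLogging()` did not survive into the standard library; modern `multiprocessing` offers `log_to_stderr()` and `get_logger()` instead (the mapping to `enableLogging()` is my reading of the two APIs, not something this changelog states):

```python
import logging
import multiprocessing

def logging_demo():
    # log_to_stderr() attaches a stderr handler to the package's logger
    # and, when a level is given, sets that level on the logger.
    logger = multiprocessing.log_to_stderr(logging.DEBUG)
    logger.debug('pool and queue bookkeeping will now be visible')
    return logger.level
```

The 0.52 change above is consistent with this: the level becomes a required, explicit argument rather than defaulting to `10`.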

@@ -65,3 +65,3 @@ <?xml version="1.0" encoding="utf-8" ?>

the &quot;feeder&quot; thread to the underlying pipe. (The child process
can call the <tt class="docutils literal"><span class="pre">canceljoin()</span></tt> method of the queue to avoid this
can call the <tt class="docutils literal"><span class="pre">cancelJoin()</span></tt> method of the queue to avoid this
behaviour.)</p>

@@ -68,0 +68,0 @@ <p>This means that whenever you use a queue you need to make sure

@@ -63,3 +63,3 @@ .. include:: header.txt

the "feeder" thread to the underlying pipe. (The child process
can call the `canceljoin()` method of the queue to avoid this
can call the `cancelJoin()` method of the queue to avoid this
behaviour.)

@@ -66,0 +66,0 @@

@@ -89,3 +89,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<dl class="docutils">
<dt><tt class="docutils literal"><span class="pre">_callmethod(methodname,</span> <span class="pre">args=(),</span> <span class="pre">kwds={})</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">_callMethod(methodname,</span> <span class="pre">args=(),</span> <span class="pre">kwds={})</span></tt></dt>
<dd><p class="first">Call and return the result of a method of the proxy's referent.</p>

@@ -95,3 +95,3 @@ <p>If <tt class="docutils literal"><span class="pre">proxy</span></tt> is a proxy whose referent is <tt class="docutils literal"><span class="pre">obj</span></tt> then the

<blockquote>
<tt class="docutils literal"><span class="pre">proxy._callmethod(methodname,</span> <span class="pre">args,</span> <span class="pre">kwds)</span></tt></blockquote>
<tt class="docutils literal"><span class="pre">proxy._callMethod(methodname,</span> <span class="pre">args,</span> <span class="pre">kwds)</span></tt></blockquote>
<p>will evaluate the expression</p>

@@ -105,5 +105,5 @@ <blockquote>

<p>If an exception is raised by <a class="reference" href="#id1">(*)</a> then this is re-raised by
<tt class="docutils literal"><span class="pre">_callmethod()</span></tt>. If some other exception is raised in the
<tt class="docutils literal"><span class="pre">_callMethod()</span></tt>. If some other exception is raised in the
manager's process then this is converted into a <tt class="docutils literal"><span class="pre">RemoteError</span></tt>
exception and is raised by <tt class="docutils literal"><span class="pre">_callmethod()</span></tt>.</p>
exception and is raised by <tt class="docutils literal"><span class="pre">_callMethod()</span></tt>.</p>
<p class="last">Note in particular that an exception will be raised if

@@ -113,3 +113,3 @@ <tt class="docutils literal"><span class="pre">methodname</span></tt> has not been <em>exposed</em> --- see the <tt class="docutils literal"><span class="pre">exposed</span></tt>

</dd>
<dt><tt class="docutils literal"><span class="pre">_getvalue()</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">_getValue()</span></tt></dt>
<dd><p class="first">Return a copy of the referent.</p>

@@ -135,10 +135,10 @@ <p class="last">If the referent is unpicklable then this will raise an exception.</p>

<h1><a id="examples" name="examples">Examples</a></h1>
<p>An example of the usage of <tt class="docutils literal"><span class="pre">_callmethod()</span></tt>:</p>
<p>An example of the usage of <tt class="docutils literal"><span class="pre">_callMethod()</span></tt>:</p>
<pre class="literal-block">
&gt;&gt;&gt; l = manager.list(range(10))
&gt;&gt;&gt; l._callmethod('__getslice__', (2, 7)) # equiv to `l[2:7]`
&gt;&gt;&gt; l._callMethod('__getslice__', (2, 7)) # equiv to `l[2:7]`
[2, 3, 4, 5, 6]
&gt;&gt;&gt; l._callmethod('__iter__') # equiv to `iter(l)`
&gt;&gt;&gt; l._callMethod('__iter__') # equiv to `iter(l)`
&lt;Proxy[iter] object at 0x00DFAFF0&gt;
&gt;&gt;&gt; l._callmethod('__getitem__', (20,)) # equiv to `l[20]`
&gt;&gt;&gt; l._callMethod('__getitem__', (20,)) # equiv to `l[20]`
Traceback (most recent call last):

@@ -155,3 +155,3 @@ ...

def next(self):
return self._callmethod('next')
return self._callMethod('next')
</pre>

@@ -158,0 +158,0 @@ </div>

@@ -81,3 +81,3 @@ .. include:: header.txt

`_callmethod(methodname, args=(), kwds={})`
`_callMethod(methodname, args=(), kwds={})`
Call and return the result of a method of the proxy's referent.

@@ -88,3 +88,3 @@

`proxy._callmethod(methodname, args, kwds)`
`proxy._callMethod(methodname, args, kwds)`

@@ -102,5 +102,5 @@ will evaluate the expression

If an exception is raised by `(*)`_ then this is re-raised by
`_callmethod()`. If some other exception is raised in the
`_callMethod()`. If some other exception is raised in the
manager's process then this is converted into a `RemoteError`
exception and is raised by `_callmethod()`.
exception and is raised by `_callMethod()`.

@@ -112,3 +112,3 @@ Note in particular that an exception will be raised if

`_getvalue()`
`_getValue()`
Return a copy of the referent.

@@ -139,10 +139,10 @@

An example of the usage of `_callmethod()`::
An example of the usage of `_callMethod()`::
>>> l = manager.list(range(10))
>>> l._callmethod('__getslice__', (2, 7)) # equiv to `l[2:7]`
>>> l._callMethod('__getslice__', (2, 7)) # equiv to `l[2:7]`
[2, 3, 4, 5, 6]
>>> l._callmethod('__iter__') # equiv to `iter(l)`
>>> l._callMethod('__iter__') # equiv to `iter(l)`
<Proxy[iter] object at 0x00DFAFF0>
>>> l._callmethod('__getitem__', (20,)) # equiv to `l[20]`
>>> l._callMethod('__getitem__', (20,)) # equiv to `l[20]`
Traceback (most recent call last):

@@ -159,3 +159,3 @@ ...

def next(self):
return self._callmethod('next')
return self._callMethod('next')

@@ -162,0 +162,0 @@
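The `_callmethod()` example above still works with the modern `multiprocessing` managers, which kept the snake_case name (`_callMethod()` is the 0.52 spelling only). Note that Python 3 has no `__getslice__`, so a slice now goes through `__getitem__`:

```python
from multiprocessing import Manager

def callmethod_demo():
    with Manager() as manager:
        l = manager.list(range(10))
        # Equivalent to l[2:7]; __getslice__ is gone in Python 3.
        items = l._callmethod('__getitem__', (slice(2, 7),))
        length = l._callmethod('__len__')
        return items, length
```

As the text says, calling `_callmethod()` with a method name that is not exposed on the proxy raises an exception instead of reaching the referent.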

@@ -47,3 +47,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<tt class="docutils literal"><span class="pre">Full</span></tt> exception (<tt class="docutils literal"><span class="pre">timeout</span></tt> is ignored in that case).</dd>
<dt><tt class="docutils literal"><span class="pre">put_nowait(item)</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">put_nowait(item)</span></tt>, <tt class="docutils literal"><span class="pre">putNoWait(item)</span></tt></dt>
<dd>Equivalent to <tt class="docutils literal"><span class="pre">put(item,</span> <span class="pre">False)</span></tt>.</dd>

@@ -60,3 +60,3 @@ <dt><tt class="docutils literal"><span class="pre">get(block=True,</span> <span class="pre">timeout=None)</span></tt></dt>

that case).</dd>
<dt><tt class="docutils literal"><span class="pre">get_nowait()</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">get_nowait()</span></tt>, <tt class="docutils literal"><span class="pre">getNoWait()</span></tt></dt>
<dd>Equivalent to <tt class="docutils literal"><span class="pre">get(False)</span></tt>.</dd>

@@ -69,6 +69,6 @@ </dl>

<dl class="docutils">
<dt><tt class="docutils literal"><span class="pre">putmany(iterable)</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">putMany(iterable)</span></tt></dt>
<dd>If the queue has infinite size then this adds all
items in the iterable to the queue's buffer. So
<tt class="docutils literal"><span class="pre">q.putmany(X)</span></tt> is a faster alternative to <tt class="docutils literal"><span class="pre">for</span> <span class="pre">x</span> <span class="pre">in</span> <span class="pre">X:</span>
<tt class="docutils literal"><span class="pre">q.putMany(X)</span></tt> is a faster alternative to <tt class="docutils literal"><span class="pre">for</span> <span class="pre">x</span> <span class="pre">in</span> <span class="pre">X:</span>
<span class="pre">q.put(x)</span></tt>. Raises an error if the queue has finite

@@ -81,3 +81,3 @@ size.</dd>

called automatically when the queue is garbage collected.</dd>
<dt><tt class="docutils literal"><span class="pre">jointhread()</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">joinThread()</span></tt></dt>
<dd><p class="first">This joins the background thread and can only be used

@@ -90,5 +90,5 @@ after <tt class="docutils literal"><span class="pre">close()</span></tt> has been called. This blocks until

background thread. The process can call
<tt class="docutils literal"><span class="pre">canceljoin()</span></tt> to prevent this behaviour.</p>
<tt class="docutils literal"><span class="pre">cancelJoin()</span></tt> to prevent this behaviour.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">canceljoin()</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">cancelJoin()</span></tt></dt>
<dd>Prevents the background thread from being joined

@@ -118,3 +118,3 @@ automatically when the process exits. Unnecessary if

<p>As mentioned above, if a child process has put items on a queue
(and it has not used <tt class="docutils literal"><span class="pre">canceljoin()</span></tt>) then that process will not
(and it has not used <tt class="docutils literal"><span class="pre">cancelJoin()</span></tt>) then that process will not
terminate until all buffered items have been flushed to the pipe.</p>

@@ -121,0 +121,0 @@ <p>This means that if you try joining that process you may get a

@@ -40,3 +40,3 @@ .. include:: header.txt

`put_nowait(item)`
`put_nowait(item)`, `putNoWait(item)`
Equivalent to `put(item, False)`.

@@ -55,3 +55,3 @@

`get_nowait()`
`get_nowait()`, `getNoWait()`
Equivalent to `get(False)`.

@@ -62,6 +62,6 @@

`putmany(iterable)`
`putMany(iterable)`
If the queue has infinite size then this adds all
items in the iterable to the queue's buffer. So
`q.putmany(X)` is a faster alternative to `for x in X:
`q.putMany(X)` is a faster alternative to `for x in X:
q.put(x)`. Raises an error if the queue has finite

@@ -76,3 +76,3 @@ size.

`jointhread()`
`joinThread()`
This joins the background thread and can only be used

@@ -86,5 +86,5 @@ after `close()` has been called. This blocks until

background thread. The process can call
`canceljoin()` to prevent this behaviour.
`cancelJoin()` to prevent this behaviour.
`canceljoin()`
`cancelJoin()`
Prevents the background thread from being joined

@@ -113,3 +113,3 @@ automatically when the process exits. Unnecessary if

As mentioned above, if a child process has put items on a queue
(and it has not used `canceljoin()`) then that process will not
(and it has not used `cancelJoin()`) then that process will not
terminate until all buffered items have been flushed to the pipe.
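The camelCase renaming described above keeps every old documented name as an alias of the new one. A minimal sketch of that aliasing pattern (the `Buffer` class here is illustrative, not the real `processing.Queue`):

```python
class Buffer:
    """Toy stand-in showing how 0.52 retains old method names."""
    def __init__(self):
        self.items = []

    def putMany(self, iterable):
        # preferred camelCase spelling in 0.52
        self.items.extend(iterable)

    # old documented name retained as a deprecated alias
    putmany = putMany

b = Buffer()
b.putmany([1, 2])   # old spelling still works
b.putMany([3])
```

Because the alias is assigned in the class body, both names refer to the very same function object, so behaviour cannot drift between them.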

@@ -116,0 +116,0 @@

@@ -1,1 +0,1 @@

.. |version| replace:: 0.51
.. |version| replace:: 0.52

@@ -25,3 +25,3 @@ #

q.put(a)
# q.putmany((a for i in xrange(iterations))
# q.putMany((a for i in xrange(iterations))

@@ -28,0 +28,0 @@ q.put('STOP')

@@ -29,3 +29,3 @@ #

def next(self):
return self._callmethod('next')
return self._callMethod('next')

@@ -32,0 +32,0 @@ ##

@@ -61,7 +61,7 @@ #

results = [pool.apply_async(calculate, t) for t in TASKS]
results = [pool.applyAsync(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
imap_unordered_it = pool.imapUnordered(calculatestar, TASKS)
print 'Ordered results using pool.apply_async():'
print 'Ordered results using pool.applyAsync():'
for r in results:

@@ -76,3 +76,3 @@ print '\t', r.get()

print 'Unordered results using pool.imap_unordered():'
print 'Unordered results using pool.imapUnordered():'
for x in imap_unordered_it:

@@ -185,3 +185,3 @@ print '\t', x

print 'Testing ApplyResult.get() with timeout:',
res = pool.apply_async(calculate, TASKS[0])
res = pool.applyAsync(calculate, TASKS[0])
while 1:

@@ -219,6 +219,6 @@ sys.stdout.flush()

r = pool.apply_async(mul, (7, 8), callback=A.append)
r = pool.applyAsync(mul, (7, 8), callback=A.append)
r.wait()
r = pool.map_async(pow3, range(10), callback=A.extend)
r = pool.mapAsync(pow3, range(10), callback=A.extend)
r.wait()

@@ -246,3 +246,3 @@

result = pool.apply_async(time.sleep, [0.5])
result = pool.applyAsync(time.sleep, [0.5])
pool.close()

@@ -266,3 +266,3 @@ pool.join()

ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [10]) for i in range(10)]
results = [pool.applyAsync(time.sleep, [10]) for i in range(10)]
pool.terminate()

@@ -286,3 +286,3 @@ pool.join()

ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [10]) for i in range(10)]
results = [pool.applyAsync(time.sleep, [10]) for i in range(10)]

@@ -289,0 +289,0 @@ del results, pool

@@ -243,4 +243,4 @@ #

ignore = processing.activeChildren() # cleanup any old processes
if hasattr(processing, '_debug_info'):
info = processing._debug_info()
if hasattr(processing, '_debugInfo'):
info = processing._debugInfo()
if info:

@@ -247,0 +247,0 @@ print info

@@ -22,4 +22,3 @@ #

def worker(input, output):
for item in iter(input.get, 'STOP'):
func, args = item
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)

@@ -63,3 +62,3 @@ output.put(result)

# Submit tasks
task_queue.putmany(TASKS1)
task_queue.putMany(TASKS1)

@@ -75,3 +74,3 @@ # Start worker processes

# Add more tasks using `put()` instead of `putmany()`
# Add more tasks using `put()` instead of `putMany()`
for task in TASKS2:

@@ -78,0 +77,0 @@ task_queue.put(task)

@@ -25,3 +25,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<tr><th class="docinfo-name">Version:</th>
<td>0.51</td></tr>
<td>0.52</td></tr>
<tr class="field"><th class="docinfo-name">Licence:</th><td class="field-body">BSD Licence</td>

@@ -61,3 +61,3 @@ </tr>

<li><a class="reference" href="http://developer.berlios.de/project/filelist.php?group_id=9001">http://developer.berlios.de/project/filelist.php?group_id=9001</a> or</li>
<li><a class="reference" href="http://cheeseshop.python.org/pypi/processing">http://cheeseshop.python.org/pypi/processing</a></li>
<li><a class="reference" href="http://pypi.python.org/pypi/processing">http://pypi.python.org/pypi/processing</a></li>
</ul>

@@ -113,3 +113,3 @@ </div>

&gt;&gt;&gt; p = Pool(4)
&gt;&gt;&gt; result = p.map_async(f, range(10))
&gt;&gt;&gt; result = p.mapAsync(f, range(10))
&gt;&gt;&gt; print result.get(timeout=1)
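The doctest above maps directly onto the stdlib `multiprocessing` that later absorbed `processing` (where the snake_case `map_async` spelling survived). A thread-backed equivalent using `multiprocessing.dummy`, which exposes the same `Pool` API:

```python
from multiprocessing.dummy import Pool  # thread pool, same Pool API

def f(x):
    return x * x

with Pool(4) as p:
    result = p.map_async(f, range(10))
    squares = result.get(timeout=1)
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```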

@@ -116,0 +116,0 @@ [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

@@ -17,3 +17,3 @@ .. default-role:: literal

http://cheeseshop.python.org/pypi/processing
http://pypi.python.org/pypi/processing

@@ -20,0 +20,0 @@ Otherwise, if you have the correct C compiler setup then the source

@@ -43,3 +43,3 @@ #

__version__ = '0.51'
__version__ = '0.52'

@@ -53,3 +53,3 @@ __all__ = [

#
# Imports
# Absolute imports
#

@@ -59,6 +59,10 @@

import sys
import _processing # from . import _processing
#
# Relative imports - Python 2.4 does not have relative import syntax
#
import _processing
from process import Process, currentProcess, activeChildren
from logger import SUBDEBUG, DEBUG, INFO, SUBWARNING, WARNING
from logger import NOTSET, SUBDEBUG, DEBUG, INFO, SUBWARNING

@@ -130,3 +134,3 @@ #

def enableLogging(level=10, HandlerType=None, handlerArgs=(), format=None):
def enableLogging(level, HandlerType=None, handlerArgs=(), format=None):
'''

@@ -144,6 +148,19 @@ Enable logging using `level` as the debug level

if sys.platform == 'win32' and getattr(sys, 'frozen', False):
from processing import forking
forking.freezeSupport()
from processing.forking import freezeSupport
freezeSupport()
def waitForAnyChild(block=True):
'''
Wait for a child process returning (pid, exitcode, process_object).
Similar to os.wait() but plays nicely with `processing`:
waitForAnyChild()[:2] is equivalent to os.wait().
'''
from processing.forking import waitForAnyChild
return waitForAnyChild(block)
#
# Definitions depending on native semaphores
#
if HAVE_NATIVE_SEMAPHORE:

@@ -150,0 +167,0 @@

@@ -20,3 +20,3 @@ #

from processing.finalize import Finalize
from processing.logger import subdebug
from processing.logger import subDebug
from processing.reduction import connections_are_picklable

@@ -35,3 +35,3 @@

_nextid = itertools.count().next
_nextId = itertools.count().next

@@ -53,3 +53,3 @@ default_family = 'AF_INET'

def arbitrary_address(family):
def arbitraryAddress(family):
'''

@@ -61,6 +61,6 @@ Return an arbitrary free address for the given family

elif family == 'AF_UNIX':
return tempfile.mktemp(prefix='pyc-%d-%d-' % (os.getpid(), _nextid()))
return tempfile.mktemp(prefix='pyc-%d-%d-' % (os.getpid(), _nextId()))
elif family == 'AF_PIPE':
return tempfile.mktemp(
prefix=r'\\.\pipe\pyc-%d-%d-' % (os.getpid(), _nextid())
prefix=r'\\.\pipe\pyc-%d-%d-' % (os.getpid(), _nextId())
)

@@ -71,3 +71,3 @@ else:

def address_type(address):
def addressType(address):
'''

@@ -138,5 +138,5 @@ Return the types of the address

'''
family = family or (address and address_type(address)) \
family = family or (address and addressType(address)) \
or default_family
address = address or arbitrary_address(family)
address = address or arbitraryAddress(family)

@@ -163,4 +163,4 @@ if family == 'AF_PIPE':

if self._authkey:
deliver_challenge(c, self._authkey)
answer_challenge(c, self._authkey)
deliverChallenge(c, self._authkey)
answerChallenge(c, self._authkey)
return c

@@ -182,3 +182,3 @@

'''
family = family or address_type(address)
family = family or addressType(address)
if family == 'AF_PIPE':

@@ -194,5 +194,5 @@ c = PipeClient(address)

if authkey:
answer_challenge(c, authkey)
deliver_challenge(c, authkey)
if authkey is not None:
answerChallenge(c, authkey)
deliverChallenge(c, authkey)

@@ -227,3 +227,3 @@ return c

'''
address = arbitrary_address('AF_PIPE')
address = arbitraryAddress('AF_PIPE')
if duplex:

@@ -283,3 +283,3 @@ openmode = win32.PIPE_ACCESS_DUPLEX

subdebug('listener bound to address %r', self._address)
subDebug('listener bound to address %r', self._address)

@@ -309,3 +309,3 @@ if family == 'AF_UNIX':

'''
family = address_type(address)
family = addressType(address)
s = socket.socket( getattr(socket, family) )

@@ -353,3 +353,3 @@

subdebug('listener created with address=%r', self._address)
subDebug('listener created with address=%r', self._address)

@@ -369,4 +369,4 @@ self.close = Finalize(

)
self._handle_queue.append(newhandle)
handle = self._handle_queue.pop(0)
self._handle_queue.append(newhandle)
try:

@@ -381,3 +381,3 @@ win32.ConnectNamedPipe(handle, win32.NULL)

def _finalize_pipelistener(queue, address):
subdebug('closing listener with address=%r', address)
subDebug('closing listener with address=%r', address)
for handle in queue:

@@ -420,3 +420,3 @@ win32.CloseHandle(handle)

def deliver_challenge(connection, authkey):
def deliverChallenge(connection, authkey):
import hmac, sha

@@ -429,21 +429,25 @@ assert type(authkey) is str, '%r is not a string' % authkey

message = ''.join(str(random.randrange(256)) for i in range(20))
connection.sendbytes('#CHALLENGE:' + message)
connection.sendBytes('#CHALLENGE:' + message)
digest = hmac.new(authkey, message, sha).digest()
response = connection.recvbytes()
response = connection.recvBytes()
if response == digest:
connection.sendbytes('#WELCOME')
connection.sendBytes('#WELCOME')
else:
connection.sendbytes('#AUTHENTICATION_FAILED')
connection.sendBytes('#AUTHENTICATION_FAILED')
raise AuthenticationError, 'digest received was wrong'
def answer_challenge(connection, authkey):
def answerChallenge(connection, authkey):
import hmac, sha
assert type(authkey) is str, '%r is not a string' % authkey
message = connection.recvbytes()
message = connection.recvBytes()
assert message[:11] == '#CHALLENGE:', 'message = %r' % message
message = message[11:]
digest = hmac.new(authkey, message, sha).digest()
connection.sendbytes(digest)
response = connection.recvbytes()
connection.sendBytes(digest)
response = connection.recvBytes()
if response != '#WELCOME':
raise AuthenticationError, 'digest sent was rejected'
# deprecated
deliver_challenge = deliverChallenge
answer_challenge = answerChallenge
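The handshake above can be exercised outside `processing`. A sketch using the stdlib `hmac` and `hashlib` (Python 3 spelling — the old `sha` module is gone — with queues standing in for the two directions of a connection):

```python
import hashlib
import hmac
import os
import queue
import threading

def deliver_challenge(send, recv, authkey):
    # server side: issue a random challenge and verify the HMAC reply
    message = os.urandom(20)
    send(b'#CHALLENGE:' + message)
    digest = hmac.new(authkey, message, hashlib.sha1).digest()
    if hmac.compare_digest(recv(), digest):
        send(b'#WELCOME')
        return True
    send(b'#AUTHENTICATION_FAILED')
    return False

def answer_challenge(send, recv, authkey):
    # client side: HMAC the challenge with the shared key, await verdict
    message = recv()
    assert message[:11] == b'#CHALLENGE:'
    send(hmac.new(authkey, message[11:], hashlib.sha1).digest())
    return recv() == b'#WELCOME'

# two queues stand in for the two directions of a connection
to_client, to_server = queue.Queue(), queue.Queue()
key = b'secret-authkey'
result = {}
client = threading.Thread(
    target=lambda: result.update(
        ok=answer_challenge(to_server.put, to_client.get, key)))
client.start()
server_ok = deliver_challenge(to_client.put, to_server.get, key)
client.join()
print(server_ok, result['ok'])  # both sides agree the handshake succeeded
```

`hmac.compare_digest` is a modern addition; the 0.52 source compares digests with plain `==`.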

@@ -48,4 +48,4 @@ #

self._in = _in
self.send = self.sendbytes = _out.put
self.recv = self.recvbytes = _in.get
self.send = self.sendbytes = self.sendBytes = _out.put
self.recv = self.recvbytes = self.recvBytes = _in.get

@@ -52,0 +52,0 @@ def poll(self, timeout=0.0):

@@ -12,5 +12,5 @@ #

from processing.logger import subdebug
from processing.logger import subDebug
__all__ = ['Finalize', '_run_finalizers']
__all__ = ['Finalize', '_runFinalizers']

@@ -51,5 +51,5 @@

except KeyError:
subdebug('finalizer no longer registered')
subDebug('finalizer no longer registered')
else:
subdebug('finalizer calling %s with args %s and kwargs %s',
subDebug('finalizer calling %s with args %s and kwargs %s',
self._callback, self._args, self._kwargs)

@@ -73,3 +73,3 @@ self._callback(*self._args, **self._kwargs)

def still_active(self):
def stillActive(self):
'''

@@ -100,3 +100,3 @@ Return whether this finalizer is still waiting to invoke callback

def _run_finalizers(minpriority=None):
def _runFinalizers(minpriority=None):
'''

@@ -113,7 +113,6 @@ Run all finalizers whose exit priority is not None and at least minpriority

items = filter(f, _registry.items())
items.sort(reverse=True)
items = sorted(filter(f, _registry.items()), reverse=True)
for key, finalizer in items:
subdebug('calling %s', finalizer)
subDebug('calling %s', finalizer)
try:

@@ -120,0 +119,0 @@ finalizer()

@@ -12,13 +12,12 @@ #

import signal
import processing
__all__ = ['Popen', 'PicklableOnlyForInheritance', 'assert_spawning', 'exit']
__all__ = ['Popen', 'assertSpawning', 'exit']
#
# While spawning a process on Windows _spawning is set to True
# Check that the current thread is spawning a child process
#
_spawning = False
def assert_spawning(self):
if not _spawning:
def assertSpawning(self):
if not thisThreadIsSpawning():
raise RuntimeError, \

@@ -29,15 +28,2 @@ ('%s objects should only be shared between '

#
# Base for classes which should not be pickled except to enable inheritance
#
class PicklableOnlyForInheritance(object):
def __getstate__(self):
assert_spawning(self)
return self._state
def __setstate__(self, state):
self._setstate(state)
#
# Unix

@@ -48,5 +34,15 @@ #

import time
import errno
exit = os._exit
def thisThreadIsSpawning():
return False
#
# We define a Popen class similar to the one from subprocess, but
# whose constructor takes a process object as its argument, and which
# has terminate() and waitTimeout() methods.
#
class Popen(object):

@@ -85,3 +81,3 @@

def wait_timeout(self, timeout):
def waitTimeout(self, timeout):
deadline = time.time() + timeout
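On Unix, `waitTimeout()` polls the child with a growing delay until a deadline. The general pattern, as a standalone sketch (function and parameter names here are illustrative, and `fake_poll` simulates a child that exits after three checks):

```python
import time

def wait_timeout(poll, timeout, delay=0.0005, max_delay=0.05):
    # call poll() until it returns non-None or the deadline passes,
    # sleeping with exponential backoff between attempts
    deadline = time.time() + timeout
    while True:
        res = poll()
        if res is not None:
            return res
        remaining = deadline - time.time()
        if remaining <= 0:
            return None
        time.sleep(min(delay, remaining))
        delay = min(delay * 2, max_delay)

calls = []
def fake_poll():
    # pretends the child exits with code 7 on the third check
    calls.append(None)
    return 7 if len(calls) >= 3 else None

print(wait_timeout(fake_poll, 1.0))  # 7
```

The backoff keeps early checks responsive without busy-waiting on a long-running child.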

@@ -104,6 +100,6 @@ delay = 0.0005

os.kill(self.pid, signal.SIGTERM)
except OSError:
if self.returncode is not None:
except OSError, e:
if self.waitTimeout(0.1) is None:
raise
#

@@ -114,7 +110,7 @@ # Windows

else:
import imp, weakref, thread, msvcrt, _subprocess, processing
import imp, thread, msvcrt, _subprocess
from os.path import dirname, splitext, basename, abspath
from cPickle import dump, load, HIGHEST_PROTOCOL
from processing._processing import win32
from processing.logger import subwarning
from processing._processing import _hInterruptEvent, _main_thread_ident
from processing.finalize import Finalize

@@ -126,8 +122,11 @@

exit = win32.ExitProcess
_state_lock = thread.allocate_lock()
tls = thread._local()
def thisThreadIsSpawning():
return getattr(tls, 'is_spawning', False)
#
# We define a Popen class similar to the one from subprocess, but
# whose constructor takes a process object as its argument, and which
# has terminate() and wait_timeout() methods.
# has terminate() and waitTimeout() methods.
#

@@ -140,4 +139,2 @@

def __init__(self, process_obj):
global _spawning
# create pipe for communication with child

@@ -153,3 +150,3 @@ r, w = os.pipe()

# start process
cmd = get_commandline() + [rhandle]
cmd = getCommandLine() + [rhandle]
cmd = ' '.join('"%s"' % x for x in cmd)

@@ -168,33 +165,42 @@ hp, ht, pid, tid = _subprocess.CreateProcess(

# send information to child
prep_data = get_preparation_data(process_obj._name)
prep_data = getPreparationData(process_obj._name)
to_child = os.fdopen(w, 'wb')
_state_lock.acquire()
tls.is_spawning = True
try:
_spawning = True
dump(prep_data, to_child, HIGHEST_PROTOCOL)
dump(process_obj, to_child, HIGHEST_PROTOCOL)
finally:
_spawning = False
_state_lock.release()
tls.is_spawning = False
to_child.close()
def wait_timeout(self, timeout):
def waitTimeout(self, timeout):
if self.returncode is None:
millisecs = int(timeout * 1000)
res = _subprocess.WaitForSingleObject(self._handle, millisecs)
if res == _subprocess.WAIT_OBJECT_0:
code = _subprocess.GetExitCodeProcess(self._handle)
if timeout is None:
msecs = win32.INFINITE
else:
msecs = int(timeout * 1000 + 0.5)
if _main_thread_ident == thread.get_ident():
win32.ResetEvent(_hInterruptEvent)
handles = (int(self._handle), _hInterruptEvent)
else:
handles = (int(self._handle),)
res = win32.WaitForMultipleObjects(
len(handles), handles, False, msecs
)
if res == win32.WAIT_OBJECT_0:
code = win32.GetExitCodeProcess(int(self._handle))
if code == TERMINATE:
code = -signal.SIGTERM
self.returncode = code
return self.returncode
def wait(self):
res = None
while res is None:
res = self.wait_timeout(1)
return res
return self.waitTimeout(None)
def poll(self):
return self.wait_timeout(0)
return self.waitTimeout(0)

@@ -206,5 +212,5 @@ def terminate(self):

except WindowsError:
if self.returncode is not None:
if self.waitTimeout(0.1) is None:
raise
#

@@ -214,3 +220,3 @@ #

def is_forking(argv):
def isForking(argv):
'''

@@ -230,3 +236,3 @@ Return whether commandline indicates we are forking

'''
if is_forking(sys.argv):
if isForking(sys.argv):
main()

@@ -236,7 +242,7 @@ sys.exit()

def get_commandline():
def getCommandLine():
'''
Returns prefix of commandline used for spawning a child process
Returns prefix of command line used for spawning a child process
'''
if processing.currentProcess()._identity==() and is_forking(sys.argv):
if processing.currentProcess()._identity==() and isForking(sys.argv):
raise RuntimeError, '''

@@ -266,6 +272,13 @@ Attempt to start a new process before the current process

def get_preparation_data(name):
def getPreparationData(name):
'''
Return info about parent needed by child to unpickle process object
'''
from processing.logger import _logger
if _logger is not None:
log_args = (_logger.getEffectiveLevel(),) + _logger._extra_args
else:
log_args = None
if sys.argv[0] not in ('', '-c') and not WINEXE:

@@ -281,7 +294,7 @@ mainpath = getattr(sys.modules['__main__'], '__file__', None)

processing.currentProcess().getAuthKey(),
None, processing.ORIGINAL_DIR]
None, processing.ORIGINAL_DIR, log_args]
def prepare(name, mainpath, sys_path, sys_argv, authkey,
cur_dir, orig_dir):
cur_dir, orig_dir, log_args):
'''

@@ -296,2 +309,6 @@ Try to get this process ready to unpickle process object

if log_args is not None:
from processing.logger import enableLogging
enableLogging(*log_args)
if orig_dir is not None:

@@ -353,3 +370,2 @@ processing.ORIGINAL_DIR = orig_dir

def main():

@@ -359,3 +375,3 @@ '''

'''
assert is_forking(sys.argv)
assert isForking(sys.argv)

@@ -362,0 +378,0 @@ handle = int(sys.argv[-1])

@@ -20,3 +20,3 @@ #

from processing.logger import info
from processing.forking import PicklableOnlyForInheritance
from processing.forking import assertSpawning

@@ -33,3 +33,3 @@ __all__ = ['BufferWrapper']

class Arena(PicklableOnlyForInheritance):
class Arena(object):

@@ -45,3 +45,7 @@ _nextid = itertools.count().next

def _setstate(self, state):
def __getstate__(self):
assertSpawning(self)
return self._state
def __setstate__(self, state):
self.size, self.name = self._state = state

@@ -53,3 +57,3 @@ self.buffer = mmap.mmap(0, self.size, tagname=self.name)

class Arena(PicklableOnlyForInheritance):
class Arena(object):

@@ -82,2 +86,5 @@ def __init__(self, size):

def __getstate__(self):
assertSpawning(self) # will fail
#

@@ -102,3 +109,3 @@ # Class allowing allocation of chunks of memory from arenas

def _roundup(self, n):
def _roundUp(self, n):
mask = self._alignment - 1

@@ -190,3 +197,3 @@ return (max(1, n) + mask) & ~mask

try:
size = self._roundup(size)
size = self._roundUp(size)
(arena, start, stop) = self._malloc(size)
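`_roundUp` aligns each allocation request using the power-of-two mask trick visible in the hunk header (`(max(1, n) + mask) & ~mask`). Extracted as a standalone function:

```python
def round_up(n, alignment=8):
    # round n up to the next multiple of alignment (a power of two);
    # zero-byte requests are bumped to one aligned unit via max(1, n)
    mask = alignment - 1
    return (max(1, n) + mask) & ~mask

print(round_up(1), round_up(8), round_up(9), round_up(0))  # 8 8 16 8
```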

@@ -216,13 +223,13 @@ new_stop = start + size

def getaddress(self):
def getAddress(self):
(arena, start, stop), size = self._state
address, length = _processing.address_of_buffer(arena.buffer)
address, length = _processing.addressOfBuffer(arena.buffer)
assert size <= length
return address + start
def getview(self):
def getView(self):
(arena, start, stop), size = self._state
return _processing.rwbuffer(arena.buffer, start, size)
return _processing.readWriteBuffer(arena.buffer, start, size)
def getsize(self):
def getSize(self):
return self._state[1]

@@ -11,6 +11,6 @@ #

__all__ = ['enableLogging', 'getLogger', 'subDebug',
'debug', 'info', 'subWarning']
__all__ = ['enableLogging', 'getLogger', 'subdebug',
'debug', 'info', 'subwarning', 'warning']
NOTSET = 0
SUBDEBUG = 5

@@ -20,31 +20,23 @@ DEBUG = 10

SUBWARNING = 25
WARNING = 30
DEFAULT_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
_logger = None
def subdebug(msg, *args):
def subDebug(msg, *args):
if _logger:
_logger.subdebug(msg, *args)
_logger.log(SUBDEBUG, msg, *args)
def debug(msg, *args):
if _logger:
_logger.debug(msg, *args)
_logger.log(DEBUG, msg, *args)
def info(msg, *args):
if _logger:
_logger.info(msg, *args)
_logger.log(INFO, msg, *args)
def subwarning(msg, *args):
def subWarning(msg, *args):
if _logger:
_logger.subwarning(msg, *args)
_logger.log(SUBWARNING, msg, *args)
def warning(msg, *args):
if _logger:
_logger.warning(msg, *args)
else:
from processing import currentProcess
print >>sys.stderr, ('[WARNING/%s] ' + msg) % \
((currentProcess().getName(),) + args)
def getLogger():
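The SUBDEBUG/SUBWARNING levels slot into the stdlib `logging` hierarchy: 5 sits below DEBUG (10), 25 between INFO (20) and WARNING (30). A sketch of registering and using them, with a capturing handler standing in for the real stream handler:

```python
import logging

SUBDEBUG, SUBWARNING = 5, 25
logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
logging.addLevelName(SUBWARNING, 'SUBWARNING')

logger = logging.getLogger('processing-demo')
logger.setLevel(SUBDEBUG)
logger.propagate = False  # mirrors `_logger.propagate = 0` in the source

records = []
handler = logging.Handler()
handler.emit = records.append  # capture records instead of printing
logger.addHandler(handler)

logger.log(SUBWARNING, 'cleaning up %s', 'manager')
print(records[0].levelname)  # SUBWARNING
```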

@@ -68,6 +60,7 @@ '''

if _logger is None:
_logger = logging.getLogger('processing')
temp = logging.getLogger('processing')
temp._extra_args = (HandlerType, handlerArgs, format)
_logger = temp
_logger.propagate = 0
# we want `_logger` to support the "%(processName)s" format
def makeRecord(self, *args):

@@ -80,4 +73,2 @@ record = self.__class__.makeRecord(self, *args)

_logger.makeRecord = MethodType(makeRecord, _logger)
_logger.subdebug = MethodType(_logger.log, SUBDEBUG)
_logger.subwarning = MethodType(_logger.log, SUBWARNING)
logging.addLevelName(SUBDEBUG, 'SUBDEBUG')

@@ -87,19 +78,17 @@ logging.addLevelName(SUBWARNING, 'SUBWARNING')

# cleanup func of `processing` should run before that of `logging`
atexit._exithandlers.remove((process._exit_func, (), {}))
atexit._exithandlers.append((process._exit_func, (), {}))
atexit._exithandlers.remove((process._exitFunction, (), {}))
atexit._exithandlers.append((process._exitFunction, (), {}))
if not _logger.handlers or HandlerType:
HandlerType = HandlerType or logging.StreamHandler
format = format or '[%(levelname)s/%(processName)s] %(message)s'
if HandlerType:
format = format or DEFAULT_FORMAT
handler = HandlerType(*handlerArgs)
handler.setFormatter(logging.Formatter(format))
_logger.handlers = [handler]
_logger.handlers = [handler] # overwrites any old handler
_logger.setLevel(level)
process.currentProcess()._logargs = [
level, HandlerType, handlerArgs, format
]
_logger._extra_args = (HandlerType, handlerArgs, format)
else:
_logger.setLevel(level)
process.currentProcess()._logargs[0] = level
finally:
logging._releaseLock()

@@ -28,7 +28,7 @@ #

from processing.connection import Listener, Client, Pipe, AuthenticationError
from processing.connection import deliver_challenge, answer_challenge
from processing.connection import deliverChallenge, answerChallenge
from processing.process import Process, currentProcess
from processing.process import activeChildren, _register_afterfork
from processing.logger import subdebug, debug, info, subwarning
from processing.finalize import Finalize, _run_finalizers
from processing.process import activeChildren, _registerAfterFork
from processing.logger import debug, info, subWarning
from processing.finalize import Finalize, _runFinalizers
from processing.forking import exit

@@ -40,6 +40,6 @@

def reduce_array(a):
def reduceArray(a):
return array.array, (a.typecode, a.tostring())
copy_reg.pickle(array.array, reduce_array)
copy_reg.pickle(array.array, reduceArray)
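`reduceArray` registers a reducer that tells pickle how to rebuild an `array.array` from a factory plus its arguments. In Python 3 terms (`copyreg` replaces `copy_reg`, `tobytes()` replaces `tostring()`):

```python
import array
import copyreg
import pickle

def reduce_array(a):
    # rebuild as array.array(typecode, raw_bytes)
    return array.array, (a.typecode, a.tobytes())

copyreg.pickle(array.array, reduce_array)

a = array.array('i', [1, 2, 3])
b = pickle.loads(pickle.dumps(a))
print(b == a, b.typecode)
```

In Python 2 this registration was necessary because `array.array` was not picklable out of the box; modern Python pickles arrays natively, so the sketch only illustrates the mechanism.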

@@ -112,3 +112,3 @@ #

def all_methods(obj):
def allMethods(obj):
'''

@@ -124,7 +124,7 @@ Return a list of names of methods of `obj`

def public_methods(obj):
def publicMethods(obj):
'''
Return a list of names of methods of `obj` which do not start with '_'
'''
return filter(lambda name: name[0] != '_', all_methods(obj))
return filter(lambda name: name[0] != '_', allMethods(obj))

@@ -139,4 +139,4 @@ #

'''
public = ['shutdown', 'create', 'accept_connection',
'getmethods', 'debug_info', 'dummy', 'incref', 'decref']
public = ['shutdown', 'create', 'acceptConnection',
'getMethods', 'debugInfo', 'dummy', 'incref', 'decref']

@@ -160,3 +160,3 @@ def __init__(self, registry, address, authkey):

def serve_forever(self):
def serveForever(self):
'''

@@ -169,3 +169,3 @@ Run the server forever

c = self.listener.accept()
t = threading.Thread(target=self.handle_request, args=(c,))
t = threading.Thread(target=self.handleRequest, args=(c,))
t.setDaemon(True)

@@ -179,3 +179,3 @@ t.start()

def handle_request(self, c):
def handleRequest(self, c):
'''

@@ -186,4 +186,4 @@ Handle a new connection

try:
deliver_challenge(c, self.authkey)
answer_challenge(c, self.authkey)
deliverChallenge(c, self.authkey)
answerChallenge(c, self.authkey)
request = c.recv()

@@ -212,11 +212,11 @@ ignore, funcname, args, kwds = request

if msg[0] == '#ERROR':
subwarning('Failure to send exception: %r', msg[1])
subWarning('Failure to send exception: %r', msg[1])
else:
subwarning('Failure to send result: %r', msg[1])
subwarning(' ... request was %r', request)
subwarning(' ... exception was %r', e)
subWarning('Failure to send result: %r', msg[1])
subWarning(' ... request was %r', request)
subWarning(' ... exception was %r', e)
c.close()
def serve_client(self, connection):
def serveClient(self, connection):
'''

@@ -307,24 +307,24 @@ Handle requests from the proxies in a particular process/thread

except Exception, e:
subwarning('exception in thread serving %r',
subWarning('exception in thread serving %r',
threading.currentThread().getName())
subwarning(' ... message was %r', msg)
subwarning(' ... exception was %r', e)
subWarning(' ... message was %r', msg)
subWarning(' ... exception was %r', e)
connection.close()
sys.exit(1)
def fallback_getvalue(self, connection, ident, obj):
def fallbackGetValue(self, connection, ident, obj):
return obj
def fallback_str(self, connection, ident, obj):
def fallbackStr(self, connection, ident, obj):
return str(obj)
def fallback_repr(self, connection, ident, obj):
def fallbackRepr(self, connection, ident, obj):
return repr(obj)
def fallback_cmp(self, connection, ident, obj, *args):
def fallbackCmp(self, connection, ident, obj, *args):
return cmp(obj, *args)
fallback_mapping = {
'__str__':fallback_str, '__repr__':fallback_repr,
'__cmp__':fallback_cmp, '#GETVALUE':fallback_getvalue
'__str__':fallbackStr, '__repr__':fallbackRepr,
'__cmp__':fallbackCmp, '#GETVALUE':fallbackGetValue
}

@@ -335,3 +335,3 @@

def debug_info(self, c):
def debugInfo(self, c):
'''

@@ -362,3 +362,3 @@ Return some info --- useful to spot problems with refcounting

# do some cleaning up
_run_finalizers(0)
_runFinalizers(0)
for p in activeChildren():

@@ -370,3 +370,3 @@ debug('terminating a child process of manager')

p.join()
_run_finalizers()
_runFinalizers()
info('manager exiting with exitcode 0')

@@ -387,3 +387,3 @@

if exposed is None:
exposed = public_methods(obj)
exposed = publicMethods(obj)

@@ -401,3 +401,3 @@ ident = id(obj)

def getmethods(self, c, token):
def getMethods(self, c, token):
'''

@@ -408,3 +408,3 @@ Return the methods of the shared object indicated by token

def accept_connection(self, c, name):
def acceptConnection(self, c, name):
'''

@@ -415,3 +415,3 @@ Spawn a new thread to serve this connection

c.send(('#RETURN', None))
self.serve_client(c)
self.serveClient(c)

@@ -475,3 +475,3 @@ def incref(self, c, ident):

self._registry, _ = BaseManager._get_registry_creators(self)
self._registry, _ = BaseManager._getRegistryCreators(self)

@@ -483,3 +483,3 @@ # pipe over which we will retrieve address of server

self._process = Process(
target=self._run_server,
target=self._runServer,
args=(self._registry, self._address, self._authkey, writer),

@@ -499,3 +499,3 @@ )

self.shutdown = Finalize(
self, BaseManager._finalize_manager,
self, BaseManager._finalizeManager,
args=(self._process, self._address, self._authkey),

@@ -506,3 +506,3 @@ exitpriority=0

@classmethod
def _run_server(cls, registry, address, authkey, writer):
def _runServer(cls, registry, address, authkey, writer):
'''

@@ -521,5 +521,5 @@ Create a server, report its address and run it

info('manager serving at %r', server.address)
server.serve_forever()
server.serveForever()
def serve_forever(self, verbose=True):
def serveForever(self, verbose=True):
'''

@@ -531,3 +531,3 @@ Start server in the current process

registry, _ = BaseManager._get_registry_creators(self)
registry, _ = BaseManager._getRegistryCreators(self)
server = Server(registry, self._address, self._authkey)

@@ -538,6 +538,6 @@ currentProcess()._server = server

(type(self).__name__, server.address)
server.serve_forever()
server.serveForever()
@classmethod
def from_address(cls, address, authkey):
def fromAddress(cls, address, authkey):
'''

@@ -567,9 +567,9 @@ Create a new manager object for a pre-existing server process

def _debug_info(self):
def _debugInfo(self):
'''
Return some info about the server's shared objects and connections
'''
return transact(self._address, self._authkey, 'debug_info')
return transact(self._address, self._authkey, 'debugInfo')
def _proxy_from_token(self, token):
def _proxyFromToken(self, token):
'''

@@ -579,3 +579,3 @@ Create a proxy for a token

assert token.address == self.address
_, creators = BaseManager._get_registry_creators(self)
_, creators = BaseManager._getRegistryCreators(self)
proxytype = creators[token.typeid]._proxytype

@@ -585,3 +585,3 @@ return proxytype(token, authkey=self._authkey)

@staticmethod
def _get_registry_creators(self_or_cls):
def _getRegistryCreators(self_or_cls):
registry = {}

@@ -606,3 +606,3 @@ creators = {}

@staticmethod
def _finalize_manager(process, address, authkey):
def _finalizeManager(process, address, authkey):
'''

@@ -637,2 +637,6 @@ Shutdown the manager process; will be registered as a finalizer

# deprecated
from_address = fromAddress
serve_forever = serveForever
#

@@ -653,3 +657,3 @@ # Function for adding methods to managers

typeid = typeid or _unique_label(callable.__name__)
typeid = typeid or _uniqueLabel(callable.__name__)

@@ -672,3 +676,3 @@ def temp(self, *args, **kwds):

def _unique_label(prefix, _count={}):
def _uniqueLabel(prefix, _count={}):
'''

@@ -690,3 +694,3 @@ Return a string beginning with 'prefix' which has not already been used.

def __init__(self):
_register_afterfork(self, set.clear)
_registerAfterFork(self, set.clear)
def __reduce__(self):

@@ -697,7 +701,7 @@ return type(self), ()

def __init__(self):
_register_afterfork(self, clear_namespace)
_registerAfterFork(self, _clearNamespace)
def __reduce__(self):
return type(self), ()
def clear_namespace(obj):
def _clearNamespace(obj):
obj.__dict__.clear()

@@ -750,3 +754,3 @@

_register_afterfork(self, BaseProxy._afterfork)
_registerAfterFork(self, BaseProxy._afterFork)

@@ -759,6 +763,6 @@ def _connect(self):

connection = Client(self._token.address, authkey=self._authkey)
dispatch(connection, None, 'accept_connection', (name,))
dispatch(connection, None, 'acceptConnection', (name,))
self._tls.connection = connection
def _callmethod(self, methodname, args=(), kwds={}):
def _callMethod(self, methodname, args=(), kwds={}):
'''

@@ -784,7 +788,7 @@ Try to call a method of the referent and return a copy of the result

def _getvalue(self):
def _getValue(self):
'''
Get a copy of the value of the referent
'''
return self._callmethod('#GETVALUE')
return self._callMethod('#GETVALUE')

@@ -813,3 +817,3 @@ def _incref(self):

# check whether manager is still alive
manager_still_alive = shutdown is None or shutdown.still_active()
manager_still_alive = shutdown is None or shutdown.stillActive()
if manager_still_alive:

@@ -838,3 +842,3 @@ # tell manager this process no longer cares about referent

def _afterfork(self):
def _afterFork(self):
self._manager = None

@@ -851,3 +855,3 @@ self._incref()

def __deepcopy__(self, memo):
return self._getvalue()
return self._getValue()

@@ -866,3 +870,3 @@ def __hash__(self):

try:
return self._callmethod('__repr__')
return self._callMethod('__repr__')
except (SystemExit, KeyboardInterrupt):

@@ -873,2 +877,6 @@ raise

# deprecated
_callmethod = _callMethod
_getvalue = _getValue
#

@@ -878,5 +886,5 @@ # Since BaseProxy._mutex might be locked at time of fork we reset it

def reset_mutex(obj):
def _resetMutex(obj):
obj._mutex = threading.Lock()
_register_afterfork(BaseProxy, reset_mutex)
_registerAfterFork(BaseProxy, _resetMutex)

@@ -926,3 +934,3 @@ #

exec '''def %s(self, *args, **kwds):
return self._callmethod(%r, args, kwds)''' % (name, name) in dic
return self._callMethod(%r, args, kwds)''' % (name, name) in dic

@@ -941,3 +949,3 @@ ProxyType = type('AutoProxy[%s]' % typeid, (BaseProxy,), dic)

if exposed is None:
exposed = transact(token.address, authkey, 'getmethods', (token,))
exposed = transact(token.address, authkey, 'getMethods', (token,))
ProxyType = MakeAutoProxyType(exposed, token.typeid)

@@ -1003,3 +1011,3 @@ proxy = ProxyType(token, manager=manager, authkey=authkey, incref=incref)

def next(self):
return self._callmethod('next')
return self._callMethod('next')

@@ -1017,10 +1025,10 @@ BaseManager._Iter = CreatorMethod(iter, IteratorProxy, ('next', '__iter__'))

def acquire(self, blocking=1):
return self._callmethod('acquire', (blocking,))
return self._callMethod('acquire', (blocking,))
def release(self):
return self._callmethod('release')
return self._callMethod('release')
def __enter__(self):
self._callmethod('acquire')
self._callMethod('acquire')
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return self._callmethod('release')
return self._callMethod('release')

@@ -1030,7 +1038,7 @@

def wait(self, timeout=None):
return self._callmethod('wait', (timeout,))
return self._callMethod('wait', (timeout,))
def notify(self):
return self._callmethod('notify')
return self._callMethod('notify')
def notifyAll(self):
return self._callmethod('notifyAll')
return self._callMethod('notifyAll')

@@ -1048,3 +1056,3 @@

return object.__getattribute__(self, key)
callmethod = object.__getattribute__(self, '_callmethod')
callmethod = object.__getattribute__(self, '_callMethod')
return callmethod('__getattribute__', (key,))

@@ -1055,3 +1063,3 @@

return object.__setattr__(self, key, value)
callmethod = object.__getattribute__(self, '_callmethod')
callmethod = object.__getattribute__(self, '_callMethod')
return callmethod('__setattr__', (key, value))

@@ -1062,3 +1070,3 @@

return object.__delattr__(self, key)
callmethod = object.__getattribute__(self, '_callmethod')
callmethod = object.__getattribute__(self, '_callMethod')
return callmethod('__delattr__', (key,))

@@ -1080,7 +1088,7 @@

def __iadd__(self, value):
self._callmethod('extend', (value,))
self._callMethod('extend', (value,))
return self
def __imul__(self, value):
# Inefficient since a copy of the target is transferred and discarded
self._callmethod('__imul__', (value,))
self._callMethod('__imul__', (value,))
return self

@@ -1099,5 +1107,5 @@

def get(self):
return self._callmethod('get')
return self._callMethod('get')
def set(self, value):
return self._callmethod('set', (value,))
return self._callMethod('set', (value,))
value = property(get, set)

@@ -1104,0 +1112,0 @@

@@ -39,3 +39,3 @@ #

newjobid = itertools.count().next
newJobId = itertools.count().next

@@ -96,3 +96,3 @@ def mapstar(args):

self._task_handler = threading.Thread(
target=Pool._handle_tasks,
target=Pool._handleTasks,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool)

@@ -105,3 +105,3 @@ )

self._result_handler = threading.Thread(
target=Pool._handle_results,
target=Pool._handleResults,
args=(self._outqueue, self._cache)

@@ -114,3 +114,3 @@ )

self._terminate = Finalize(
self, Pool._terminate_pool,
self, Pool._terminatePool,
args=(self._taskqueue, self._inqueue, self._outqueue,

@@ -127,3 +127,3 @@ self._cache, self._pool, self._task_handler,

assert self._state == RUN
return self.apply_async(func, args, kwds).get()
return self.applyAsync(func, args, kwds).get()

@@ -135,3 +135,3 @@ def map(self, func, iterable, chunksize=None):

assert self._state == RUN
return self.map_async(func, iterable, chunksize).get()
return self.mapAsync(func, iterable, chunksize).get()

@@ -146,13 +146,13 @@ def imap(self, func, iterable, chunksize=1):

self._taskqueue.put((((result._job, i, func, (x,), {})
for i, x in enumerate(iterable)), result._setlength))
for i, x in enumerate(iterable)), result._setLength))
return result
else:
assert chunksize > 1
task_batches = Pool._gettasks(func, iterable, chunksize)
task_batches = Pool._getTasks(func, iterable, chunksize)
result = IMapIterator(self._cache)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), result._setlength))
for i, x in enumerate(task_batches)), result._setLength))
return (item for chunk in result for item in chunk)
def imap_unordered(self, func, iterable, chunksize=1):
def imapUnordered(self, func, iterable, chunksize=1):
'''

@@ -165,13 +165,13 @@ Like `imap()` method but ordering of results is arbitrary

self._taskqueue.put((((result._job, i, func, (x,), {})
for i, x in enumerate(iterable)), result._setlength))
for i, x in enumerate(iterable)), result._setLength))
return result
else:
assert chunksize > 1
task_batches = Pool._gettasks(func, iterable, chunksize)
task_batches = Pool._getTasks(func, iterable, chunksize)
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), result._setlength))
for i, x in enumerate(task_batches)), result._setLength))
return (item for chunk in result for item in chunk)
def apply_async(self, func, args=(), kwds={}, callback=None):
def applyAsync(self, func, args=(), kwds={}, callback=None):
'''

@@ -184,4 +184,4 @@ Asynchronous equivalent of `apply()` builtin

return result
def map_async(self, func, iterable, chunksize=None, callback=None):
def mapAsync(self, func, iterable, chunksize=None, callback=None):
'''

@@ -199,3 +199,3 @@ Asynchronous equivalent of `map()` builtin

task_batches = Pool._gettasks(func, iterable, chunksize)
task_batches = Pool._getTasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback)

@@ -205,9 +205,9 @@ self._taskqueue.put((((result._job, i, mapstar, (x,), {})

return result
@staticmethod
def _handle_tasks(taskqueue, inqueue, outqueue, pool):
def _handleTasks(taskqueue, inqueue, outqueue, pool):
thread = threading.currentThread()
put = inqueue._writer.send
for taskseq, setlength in iter(taskqueue.get, None):
for taskseq, setLength in iter(taskqueue.get, None):
i = -1

@@ -220,5 +220,5 @@ for i, task in enumerate(taskseq):

else:
if setlength:
debug('doing setlength()')
setlength(i+1)
if setLength:
debug('doing setLength()')
setLength(i+1)
continue

@@ -240,3 +240,3 @@ break

@staticmethod
def _handle_results(outqueue, cache):
def _handleResults(outqueue, cache):
thread = threading.currentThread()

@@ -272,3 +272,3 @@ get = outqueue._reader.recv

@staticmethod
def _gettasks(func, it, size):
def _getTasks(func, it, size):
it = iter(it)
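`_getTasks` (renamed from `_gettasks`) batches the input iterable so that each task sent to a worker carries `chunksize` items rather than one. A hedged sketch of that chunking with `itertools.islice`; the exact shape of the real method's output may differ:

```python
import itertools

# Repeatedly slice `size` items off the iterator and pair each batch
# with `func`, stopping when the iterator is exhausted.
def get_tasks(func, it, size):
    it = iter(it)
    while True:
        batch = tuple(itertools.islice(it, size))
        if not batch:
            return
        yield (func, batch)
```

For example, `list(get_tasks(str, range(5), 2))` yields `[(str, (0, 1)), (str, (2, 3)), (str, (4,))]`: a final short batch is emitted rather than dropped.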

@@ -304,3 +304,3 @@ while 1:

@staticmethod
def _terminate_pool(taskqueue, inqueue, outqueue, cache, pool,
def _terminatePool(taskqueue, inqueue, outqueue, cache, pool,
task_handler, result_handler):

@@ -347,4 +347,9 @@ debug('finalizing pool')

# deprecated
apply_async = applyAsync
map_async = mapAsync
imap_unordered = imapUnordered
#
# Class whose instances are returned by `Pool.apply_async()`
# Class whose instances are returned by `Pool.applyAsync()`
#
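The `# deprecated` block above shows the convention 0.52 uses throughout: after a method is renamed to camelCase, the old snake_case name is bound to the same function object at class-definition time, so existing callers keep working. A minimal sketch (the method body is a stand-in, not the real `Pool` implementation):

```python
# Aliasing convention used across the 0.52 rename: the deprecated
# spelling is just another class attribute pointing at the same
# function object.
class Pool(object):
    def applyAsync(self, func, args=(), kwds={}):
        return (func, args, kwds)   # illustrative body only

    # deprecated
    apply_async = applyAsync
```

Because the alias is the same object, `Pool.apply_async is Pool.applyAsync` holds, and both spellings stay in sync automatically.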

@@ -356,3 +361,3 @@

self._cond = threading.Condition(threading.Lock())
self._job = newjobid()
self._job = newJobId()
self._cache = cache

@@ -400,3 +405,3 @@ self._ready = False

#
# Class whose instances are returned by `Pool.map_async()`
# Class whose instances are returned by `Pool.mapAsync()`
#

@@ -451,3 +456,3 @@

self._cond = threading.Condition(threading.Lock())
self._job = newjobid()
self._job = newJobId()
self._cache = cache

@@ -505,3 +510,3 @@ self._items = collections.deque()

def _setlength(self, length):
def _setLength(self, length):
self._cond.acquire()

@@ -517,3 +522,3 @@ try:

#
# Class whose instances are returned by `Pool.imap_unordered()`
# Class whose instances are returned by `Pool.imapUnordered()`
#

@@ -520,0 +525,0 @@

@@ -42,2 +42,6 @@ #

return list(_current_process._children)
#
#
#

@@ -68,5 +72,5 @@ def _cleanup():

self._daemonic = _current_process._daemonic
self._logargs = _current_process._logargs
self._parent_pid = os.getpid()
self._popen = None
self._exiting = False

@@ -114,5 +118,5 @@ self._target = target

else:
res = self._popen.wait_timeout(timeout)
res = self._popen.waitTimeout(timeout)
if res is not None:
_cleanup()
_current_process._children.discard(self)

@@ -222,7 +226,4 @@ def isAlive(self):

_registry.clear()
if sys.platform == 'win32' and self._logargs is not None:
from processing.logger import enableLogging
enableLogging(*self._logargs)
_current_process = self
_afterfork()
_runAfterForkers()
info('child process calling self.run()')

@@ -233,3 +234,3 @@ try:

finally:
_exit_func()
_exitFunction()
except SystemExit, e:

@@ -252,11 +253,2 @@ if not e.args:

def __getstate__(self):
# disallow pickling except for the purpose of inheritance on Windows
from processing.forking import assert_spawning
assert_spawning(self)
return dict(self.__dict__)
def __setstate__(self, state):
self.__dict__.update(state)
#

@@ -274,3 +266,2 @@ # Create object representing the main process

self._popen = None
self._logargs = None
self._counter = itertools.count(1)

@@ -298,3 +289,3 @@ self._children = set()

def _reduce_method(m):
def _reduceMethod(m):
if m.im_self is None:

@@ -305,3 +296,3 @@ return getattr, (m.im_class, m.im_func.func_name)

copy_reg.pickle(type(_current_process.start), _reduce_method)
copy_reg.pickle(type(_current_process.start), _reduceMethod)
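`_reduceMethod` teaches `copy_reg` to pickle method objects by reducing them to a `getattr` call. A hedged Python 3 analogue for the bound-method case (`copy_reg` became `copyreg`, and `im_self`/`im_func` became `__self__`/`__func__`; Python 3 has no unbound methods, so that branch of the original is omitted):

```python
import copyreg
import types

# Reduce a bound method to (getattr, (instance, method_name)) so that
# pickle can rebuild it by attribute lookup on the unpickled instance.
def _reduce_method(m):
    return getattr, (m.__self__, m.__func__.__name__)

copyreg.pickle(types.MethodType, _reduce_method)

# Hypothetical class used only to demonstrate the reducer.
class Worker(object):
    def run(self):
        return 'ran'

rebuild, args = _reduce_method(Worker().run)
```

Applying the reduction by hand, `rebuild(*args)` is `getattr(worker, 'run')`, i.e. the bound method back again.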

@@ -313,5 +304,5 @@ #

_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_count = itertools.count()
_afterForkerId = itertools.count().next
def _afterfork():
def _runAfterForkers():
# execute in order of registration

@@ -321,4 +312,4 @@ for (index, ident, func), obj in sorted(_afterfork_registry.items()):

def _register_afterfork(obj, func):
_afterfork_registry[(_afterfork_count.next(), id(obj), func)] = obj
def _registerAfterFork(obj, func):
_afterfork_registry[(_afterForkerId(), id(obj), func)] = obj

@@ -329,4 +320,4 @@ #

def _exit_func():
from processing.finalize import _run_finalizers
def _exitFunction():
from processing.finalize import _runFinalizers
from processing.logger import info

@@ -337,3 +328,3 @@

info('running all "atexit" finalizers with priority >= 0')
_run_finalizers(0)
_runFinalizers(0)

@@ -350,4 +341,4 @@ for p in activeChildren():

info('running the remaining "atexit" finalizers')
_run_finalizers()
_runFinalizers()
atexit.register(_exit_func)
atexit.register(_exitFunction)

@@ -23,6 +23,6 @@ #

from processing.synchronize import Lock, BoundedSemaphore
from processing.logger import debug, subwarning
from processing.logger import debug, subWarning
from processing.finalize import Finalize
from processing.process import _exit_func, _register_afterfork
from processing.forking import PicklableOnlyForInheritance
from processing.process import _exitFunction, _registerAfterFork
from processing.forking import assertSpawning

@@ -33,4 +33,4 @@ #

atexit._exithandlers.remove((_exit_func, (), {}))
atexit._exithandlers.append((_exit_func, (), {}))
atexit._exithandlers.remove((_exitFunction, (), {}))
atexit._exithandlers.append((_exitFunction, (), {}))

@@ -41,3 +41,3 @@ #

class Queue(PicklableOnlyForInheritance):
class Queue(object):

@@ -59,8 +59,12 @@ def __init__(self, maxsize=0):

state = maxsize, reader, writer, rlock, wlock, sem, os.getpid()
self._setstate(state)
self.__setstate__(state)
if sys.platform != 'win32':
_register_afterfork(self, Queue._afterfork)
_registerAfterFork(self, Queue._afterFork)
def _setstate(self, state):
def __getstate__(self):
assertSpawning(self)
return self._state
def __setstate__(self, state):
(self._maxsize, self._reader, self._writer,

@@ -71,6 +75,6 @@ self._rlock, self._wlock, self._sem, self._opid) = self._state = state

self._poll = self._reader.poll
self._afterfork()
self._afterFork()
def _afterfork(self):
debug('Queue._afterfork()')
def _afterFork(self):
debug('Queue._afterFork()')
self._notempty = threading.Condition(threading.Lock())

@@ -93,3 +97,3 @@ self._buffer = collections.deque()

if self._thread is None:
self._startthread()
self._startThread()
self._buffer.append(obj)

@@ -100,3 +104,3 @@ self._notempty.notify()

def putmany(self, iterable):
def putMany(self, iterable):
assert not self._closed

@@ -108,3 +112,3 @@ assert self._maxsize == 0

if self._thread is None:
self._startthread()
self._startThread()
self._buffer.extend(iterable)

@@ -114,3 +118,3 @@ self._notempty.notify()

self._notempty.release()
def get(self, block=True, timeout=None):

@@ -149,8 +153,14 @@ if block and timeout is None:

def full(self):
return bool(self._sem) and self._sem._semlock._getvalue() == 0
if self._sem:
if self._sem.acquire(False):
self._sem.release()
return False
return True
else:
return False
def get_nowait(self):
def getNoWait(self):
return self.get(False)
def put_nowait(self, obj):
def putNoWait(self, obj):
return self.put(obj, False)
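The rewritten `full()` above is the heart of the 0.52 Mac OSX fix: instead of reading the semaphore's value with the broken `sem_getvalue()`, it probes with a non-blocking acquire; failure to acquire means the slot semaphore is exhausted, i.e. the queue is at `maxsize`. A standalone sketch using a `threading.Semaphore` as a stand-in for the queue's slot semaphore:

```python
import threading

# sem_getvalue()-free fullness check: a failed non-blocking acquire
# means no slots remain; on success the slot is released immediately.
def is_full(sem):
    if sem.acquire(False):
        sem.release()
        return False
    return True

sem = threading.Semaphore(2)   # pretend maxsize == 2
sem.acquire()
sem.acquire()                  # both slots now taken -> "full"
```

Like the original, the probe is only advisory: it briefly consumes a slot, and another thread may change the semaphore between the check and any subsequent `put()`.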

@@ -164,10 +174,10 @@

def jointhread(self):
debug('Queue.jointhread()')
def joinThread(self):
debug('Queue.joinThread()')
assert self._closed
if self._jointhread:
self._jointhread()
def canceljoin(self):
debug('Queue.canceljoin()')
def cancelJoin(self):
debug('Queue.cancelJoin()')
self._joincancelled = True

@@ -179,4 +189,4 @@ try:

def _startthread(self):
debug('Queue._startthread()')
def _startThread(self):
debug('Queue._startThread()')

@@ -206,3 +216,3 @@ # Start thread which transfers data from buffer to pipe

self._jointhread = Finalize(
self._thread, Queue._finalize_join,
self._thread, Queue._finalizeJoin,
[weakref.ref(self._thread)],

@@ -214,3 +224,3 @@ exitpriority=-5

self._close = Finalize(
self, Queue._finalize_close,
self, Queue._finalizeClose,
[self._buffer, self._notempty],

@@ -221,3 +231,3 @@ exitpriority=10

@staticmethod
def _finalize_join(twr):
def _finalizeJoin(twr):
debug('joining queue thread')

@@ -232,3 +242,3 @@ thread = twr()

@staticmethod
def _finalize_close(buffer, notempty):
def _finalizeClose(buffer, notempty):
debug('telling queue thread to quit')

@@ -288,7 +298,15 @@ notempty.acquire()

# started to cleanup.
if getattr(currentProcess(), '_exiting', False):
subwarning('error in queue thread: %s', e)
if currentProcess()._exiting:
subWarning('error in queue thread: %s', e)
else:
raise
get_nowait = getNoWait
put_nowait = putNoWait
# deprecated
putmany = putMany
jointhread = joinThread
canceljoin = cancelJoin
_sentinel = object()

@@ -300,3 +318,3 @@

class SimpleQueue(PicklableOnlyForInheritance):
class SimpleQueue(object):

@@ -309,3 +327,3 @@ def __init__(self):

state = reader, writer, Lock(), Lock()
self._setstate(state)
self.__setstate__(state)

@@ -315,3 +333,7 @@ def empty(self):

def _setstate(self, state):
def __getstate__(self):
assertSpawning(self)
return self._state
def __setstate__(self, state):
(self._reader, self._writer, self._rlock, self._wlock) \

@@ -318,0 +340,0 @@ = self._state = state

@@ -15,8 +15,11 @@ #

import sys
import copy_reg
import socket
import threading
import copy_reg
import processing
from processing import _processing, currentProcess
from processing.logger import debug, subdebug
from processing import _processing
from processing.logger import debug, subDebug, subWarning
from processing.forking import thisThreadIsSpawning
from processing.process import _registerAfterFork

@@ -28,3 +31,3 @@ #

connections_are_picklable = (
sys.platform == 'win32' or hasattr(_processing, 'recvfd')
sys.platform == 'win32' or hasattr(_processing, 'recvFd')
)

@@ -37,121 +40,132 @@

s = socket._socket.socket()
_processing.changefd(s, fd, family, type, proto)
_processing.changeFd(s, fd, family, type, proto)
return s
#
# Platform specific definitions
#
#
if sys.platform == 'win32':
import msvcrt, _subprocess
import _subprocess
from processing._processing import win32
closefd = _processing.win32.CloseHandle
#
# Handles can be transferred between processes using `DuplicateHandle()`
#
closeHandle = win32.CloseHandle
def duplicateHandle(handle):
return _subprocess.DuplicateHandle(
_subprocess.GetCurrentProcess(), handle,
_subprocess.GetCurrentProcess(),
0, False, _subprocess.DUPLICATE_SAME_ACCESS
).Detach()
def reduce_handle(handle):
subdebug('reducing handle %d', handle)
flags = win32.GetHandleInformation(handle)
return (os.getpid(), handle, flags & win32.HANDLE_FLAG_INHERIT)
def rebuild_handle(reduced_handle):
pid, old_handle, inheritable = reduced_handle
subdebug('rebuilding handle %d from PID=%d', old_handle, pid)
if inheritable and getattr(currentProcess(), '_inheriting', False):
return old_handle
def sendHandle(conn, handle, destination_pid):
process_handle = win32.OpenProcess(
win32.PROCESS_ALL_ACCESS, False, pid
win32.PROCESS_ALL_ACCESS, False, destination_pid
)
try:
new_handle = _subprocess.DuplicateHandle(
process_handle, old_handle, _subprocess.GetCurrentProcess(),
0, False, _subprocess.DUPLICATE_SAME_ACCESS
_subprocess.GetCurrentProcess(), handle,
process_handle, 0, False, _subprocess.DUPLICATE_SAME_ACCESS
)
conn.send(new_handle.Detach())
finally:
win32.CloseHandle(process_handle)
def recvHandle(conn):
return conn.recv()
return new_handle.Detach()
def isInheritableHandle(handle):
return (win32.GetHandleInformation(handle) & win32.HANDLE_FLAG_INHERIT)
else:
closeHandle = os.close
duplicateHandle = os.dup
#
# Register `_processing.PipeConnection` with `copy_reg`
#
def sendHandle(conn, handle, destination_pid):
_processing.sendFd(conn.fileno(), handle)
def recvHandle(conn):
return _processing.recvFd(conn.fileno())
def reduce_pipe_connection(conn):
return rebuild_pipe_connection, (reduce_handle(conn.fileno()),)
def rebuild_pipe_connection(reduced_handle):
handle = rebuild_handle(reduced_handle)
return _processing.PipeConnection(handle, duplicate=False)
copy_reg.pickle(_processing.PipeConnection, reduce_pipe_connection)
def isInheritableHandle(handle):
return True
else:
#
# Support for a per-process server thread which caches pickled handles
#
#
# On Unix file descriptors can be transferred between processes
# over Unix domain sockets.
#
# When we first use `reduce_handle()` we start a server thread
# which listens to a Unix domain socket for connections from
# client processes. When a client process asks for an fd the
# thread sends it using `_processing.sendfd()`.
#
closefd = os.close
_cache = set()
_fd_cache = set()
_fd_lock = threading.Lock()
_fd_listener = None
def _reset(obj):
global _lock, _listener, _cache
for h in _cache:
closeHandle(h)
_cache.clear()
_lock = threading.Lock()
_listener = None
def _share_fds():
while 1:
try:
conn = _fd_listener.accept()
fd_wanted = conn.recv()
_fd_cache.remove(fd_wanted)
_processing.sendfd(conn.fileno(), fd_wanted)
closefd(fd_wanted)
conn.close()
except (SystemExit, KeyboardInterrupt):
raise
except:
import traceback
traceback.print_exc()
_reset(None)
_registerAfterFork(_reset, _reset)
def reduce_handle(fd):
global _fd_listener
def _getListener():
global _listener
if not connections_are_picklable:
raise RuntimeError, 'pickling of file descriptors not supported'
if _fd_listener is None:
_fd_lock.acquire()
try:
if _fd_listener is None:
from processing.connection import Listener
debug('starting listener and thread for sending fds')
_fd_listener = Listener(family='AF_UNIX',authenticate=True)
t = threading.Thread(target=_share_fds)
t.setDaemon(True)
t.start()
finally:
_fd_lock.release()
if _listener is None:
_lock.acquire()
try:
if _listener is None:
from processing.connection import Listener
debug('starting listener and thread for sending handles')
_listener = Listener(authenticate=True)
t = threading.Thread(target=_serve)
t.setDaemon(True)
t.start()
finally:
_lock.release()
dup_fd = os.dup(fd)
_fd_cache.add(dup_fd)
subdebug('reducing fd %d', fd)
return (_fd_listener.address, dup_fd)
return _listener
def rebuild_handle(reduced_handle):
from processing.connection import Client
address, fd = reduced_handle
subdebug('rebuilding fd %d', fd)
conn = Client(address, authenticate=True)
conn.send(fd)
return _processing.recvfd(conn.fileno())
def _serve():
while 1:
try:
conn = _listener.accept()
handle_wanted, destination_pid = conn.recv()
_cache.remove(handle_wanted)
sendHandle(conn, handle_wanted, destination_pid)
closeHandle(handle_wanted)
conn.close()
except (SystemExit, KeyboardInterrupt):
raise
except:
if not processing.currentProcess()._exiting:
import traceback
subWarning(
'thread for sharing handles raised exception:\n' +
'-'*79 + '\n' + traceback.format_exc() + '-'*79
)
#
# Functions to be used for pickling/unpickling objects with handles
#
def reduceHandle(handle):
if thisThreadIsSpawning() and isInheritableHandle(handle):
return (None, handle, True)
dup_handle = duplicateHandle(handle)
_cache.add(dup_handle)
subDebug('reducing handle %d', handle)
return (_getListener().address, dup_handle, False)
def rebuildHandle(pickled_data):
from processing.connection import Client
address, handle, inherited = pickled_data
if inherited:
return handle
subDebug('rebuilding handle %d', handle)
conn = Client(address, authenticate=True)
conn.send((handle, os.getpid()))
new_handle = recvHandle(conn)
conn.close()
return new_handle
#

@@ -161,10 +175,10 @@ # Register `_processing.Connection` with `copy_reg`

def reduce_connection(conn):
return rebuild_connection, (reduce_handle(conn.fileno()),)
def reduceConnection(conn):
return rebuildConnection, (reduceHandle(conn.fileno()),)
def rebuild_connection(reduced_handle):
fd = rebuild_handle(reduced_handle)
def rebuildConnection(reduced_handle):
fd = rebuildHandle(reduced_handle)
return _processing.Connection(fd, duplicate=False)
copy_reg.pickle(_processing.Connection, reduce_connection)
copy_reg.pickle(_processing.Connection, reduceConnection)

@@ -175,3 +189,3 @@ #

def reduce_socket(s):
def reduceSocket(s):
try:

@@ -185,11 +199,26 @@ Family, Type, Proto = s.family, s.type, s.proto

Proto = 0
reduced_handle = reduce_handle(s.fileno())
return rebuild_socket, (reduced_handle, Family, Type, Proto)
reduced_handle = reduceHandle(s.fileno())
return rebuildSocket, (reduced_handle, Family, Type, Proto)
def rebuild_socket(reduced_handle, family, type, proto):
fd = rebuild_handle(reduced_handle)
def rebuildSocket(reduced_handle, family, type, proto):
fd = rebuildHandle(reduced_handle)
_sock = fromfd(fd, family, type, proto)
closefd(fd)
closeHandle(fd)
return socket.socket(_sock=_sock)
copy_reg.pickle(socket.socket, reduce_socket)
copy_reg.pickle(socket.socket, reduceSocket)
#
# Register `_processing.PipeConnection` with `copy_reg`
#
if sys.platform == 'win32':
def reducePipeConnection(conn):
return rebuildPipeConnection, (reduceHandle(conn.fileno()),)
def rebuildPipeConnection(reduced_handle):
handle = rebuildHandle(reduced_handle)
return _processing.PipeConnection(handle, duplicate=False)
copy_reg.pickle(_processing.PipeConnection, reducePipeConnection)

@@ -15,3 +15,3 @@ #

from processing import heap, RLock
from processing.forking import PicklableOnlyForInheritance, assert_spawning
from processing.forking import assertSpawning

@@ -37,6 +37,6 @@ __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']

def _new_value(type_):
def _newValue(type_):
size = ctypes.sizeof(type_)
wrapper = heap.BufferWrapper(size)
return rebuild_ctype(type_, wrapper, None)
return rebuildCtype(type_, wrapper, None)

@@ -48,3 +48,3 @@ def RawValue(typecode_or_type, *args):

type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
obj = _new_value(type_)
obj = _newValue(type_)
ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))

@@ -61,6 +61,6 @@ obj.__init__(*args)

type_ = type_ * size_or_initializer
return _new_value(type_)
return _newValue(type_)
else:
type_ = type_ * len(size_or_initializer)
result = _new_value(type_)
result = _newValue(type_)
result.__init__(*size_or_initializer)

@@ -98,3 +98,3 @@ return result

def copy(obj):
new_obj = _new_value(type(obj))
new_obj = _newValue(type(obj))
ctypes.pointer(new_obj)[0] = obj

@@ -118,3 +118,3 @@ return new_obj

names = [field[0] for field in cls._fields_]
d = dict((name, makeproperty(name)) for name in names)
d = dict((name, makeProperty(name)) for name in names)
classname = 'Synchronized' + cls.__name__

@@ -128,16 +128,16 @@ scls = classcache[cls] = type(classname, (SynchronizedBase,), d)

def reduce_ctype(obj):
def reduceCtype(obj):
assert sys.platform == 'win32', \
'synchronized objects should only be shared through inheritance'
if isinstance(obj, ctypes.Array):
return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
return rebuildCtype, (obj._type_, obj._wrapper, obj._length_)
else:
return rebuild_ctype, (type(obj), obj._wrapper, None)
return rebuildCtype, (type(obj), obj._wrapper, None)
def rebuild_ctype(type_, wrapper, length):
def rebuildCtype(type_, wrapper, length):
if length is not None:
type_ = type_ * length
if sys.platform == 'win32' and type_ not in copy_reg.dispatch_table:
copy_reg.pickle(type_, reduce_ctype)
obj = type_.from_address(wrapper.getaddress())
copy_reg.pickle(type_, reduceCtype)
obj = type_.from_address(wrapper.getAddress())
obj._wrapper = wrapper

@@ -150,3 +150,3 @@ return obj

def makeproperty(name):
def makeProperty(name):
try:

@@ -183,3 +183,3 @@ return propcache[name]

class SynchronizedBase(PicklableOnlyForInheritance):
class SynchronizedBase(object):

@@ -193,3 +193,3 @@ def __init__(self, obj, lock=None):

def __reduce__(self):
assert_spawning(self)
assertSpawning(self)
return synchronized, (self._obj, self._lock)

@@ -208,3 +208,3 @@

class Synchronized(SynchronizedBase):
value = makeproperty('value')
value = makeProperty('value')

@@ -247,3 +247,3 @@

class SynchronizedString(SynchronizedArray):
value = makeproperty('value')
raw = makeproperty('raw')
value = makeProperty('value')
raw = makeProperty('raw')

@@ -20,6 +20,6 @@ #

from processing import _processing
from processing.process import currentProcess, _register_afterfork
from processing.logger import debug, subdebug
from processing.process import currentProcess, _registerAfterFork
from processing.logger import debug
from processing.finalize import Finalize
from processing.forking import PicklableOnlyForInheritance
from processing.forking import assertSpawning

@@ -36,3 +36,3 @@ #

class SemLock(PicklableOnlyForInheritance):
class SemLock(object):

@@ -42,10 +42,14 @@ def __init__(self, kind, value):

debug('created semlock with handle %s' % sl.handle)
self._setstate((sl.handle, sl.kind, sl.maxvalue))
self.__setstate__((sl.handle, sl.kind, sl.maxvalue))
if sys.platform != 'win32':
def _afterfork(obj):
obj._semlock._afterfork()
_register_afterfork(self, _afterfork)
def _afterFork(obj):
obj._semlock._afterFork()
_registerAfterFork(self, _afterFork)
def _setstate(self, state):
def __getstate__(self):
assertSpawning(self)
return self._state
def __setstate__(self, state):
self._state = state

@@ -71,11 +75,12 @@ if not hasattr(self, '_semlock'):

def getValue(self):
return self._semlock._getvalue()
return self._semlock._getValue()
def __repr__(self):
try:
return '<Semaphore(value=%r)>' % self._semlock._getvalue()
value = self._semlock._getValue()
except (KeyboardInterrupt, SystemExit):
raise
except:
return object.__repr__(self)
value = 'unknown'
return '<Semaphore(value=%s)>' % value

@@ -93,8 +98,9 @@ #

try:
return '<BoundedSemaphore(value=%r, maxvalue=%r)>' % \
(self._semlock._getvalue(), self._semlock.maxvalue)
value = self._semlock._getValue()
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
return object.__repr__(self)
except:
value = 'unknown'
return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
(value, self._semlock.maxvalue)

@@ -112,7 +118,7 @@ #

try:
if self._semlock._ismine():
if self._semlock._isMine():
name = currentProcess().getName()
if threading.currentThread().getName() != 'MainThread':
name += '|' + threading.currentThread().getName()
elif self._semlock._getvalue() == 1:
elif self._semlock._getValue() == 1:
name = 'None'

@@ -126,5 +132,4 @@ elif self._semlock._count() > 0:

except Exception:
return object.__repr__(self)
else:
return '<Lock(owner=%s)>' % name
name = 'unknown'
return '<Lock(owner=%s)>' % name

@@ -142,17 +147,18 @@ #

try:
if self._semlock._ismine():
if self._semlock._isMine():
name = currentProcess().getName()
if threading.currentThread().getName() != 'MainThread':
name += '|' + threading.currentThread().getName()
return '<RLock(%s, %s)>' % (name, self._semlock._count())
elif self._semlock._getvalue() == 1:
return '<RLock(None, 0)>'
count = self._semlock._count()
elif self._semlock._getValue() == 1:
name, count = 'None', 0
elif self._semlock._count() > 0:
return '<RLock(SomeOtherThread, nonzero)>'
name, count = 'SomeOtherThread', 'nonzero'
else:
return '<RLock(SomeOtherProcess, nonzero)>'
name, count = 'SomeOtherProcess', 'nonzero'
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
return object.__repr__(self)
name, count = 'unknown', 'unknown'
return '<RLock(%s, %s)>' % (name, count)

@@ -163,9 +169,13 @@ #

class Condition(PicklableOnlyForInheritance):
class Condition(object):
def __init__(self, lock=None):
state = (lock or RLock(), Semaphore(0), Semaphore(0), Semaphore(0))
self._setstate(state)
self.__setstate__(state)
def _setstate(self, state):
def __getstate__(self):
assertSpawning(self)
return self._state
def __setstate__(self, state):
(self._lock, self._sleeping_count,

@@ -175,7 +185,9 @@ self._woken_count, self._wait_semaphore) = self._state = state

self.release = self._lock.release
self.__enter__ = self._lock.__enter__
self.__exit__ = self._lock.__exit__
def __repr__(self):
try:
num_waiters = (self._sleeping_count._semlock._getvalue() -
self._woken_count._semlock._getvalue())
num_waiters = (self._sleeping_count._semlock._getValue() -
self._woken_count._semlock._getValue())
except (KeyboardInterrupt, SystemExit):

@@ -185,6 +197,6 @@ raise

num_waiters = 'unknown'
return '<Condition(%r, %s)>' % (self._lock, num_waiters)
return '<Condition(%s, %s)>' % (self._lock, num_waiters)
def wait(self, timeout=None):
assert self._lock._semlock._ismine(), \
assert self._lock._semlock._isMine(), \
'must acquire() condition before using wait()'

@@ -212,3 +224,3 @@

def notify(self):
assert self._lock._semlock._ismine(), 'lock is not owned'
assert self._lock._semlock._isMine(), 'lock is not owned'
assert not self._wait_semaphore.acquire(False)

@@ -230,6 +242,6 @@

def notifyAll(self):
assert self._lock._semlock._ismine(), 'lock is not owned'
assert self._lock._semlock._isMine(), 'lock is not owned'
assert not self._wait_semaphore.acquire(False)
# to take account of timeouts since last notify() we subtract
# to take account of timeouts since last notify*() we subtract
# woken_count from sleeping_count and rezero woken_count

@@ -240,8 +252,9 @@ while self._woken_count.acquire(False):

sleepers = self._sleeping_count.getValue()
sleepers = 0
while self._sleeping_count.acquire(False):
self._wait_semaphore.release() # wake up one sleeper
sleepers += 1
if sleepers:
for i in xrange(sleepers):
self._sleeping_count.acquire() # grab a sleeper
self._wait_semaphore.release() # wake up a sleeper
for i in xrange(sleepers):
self._woken_count.acquire() # wait for a sleeper to wake
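The rewritten `notifyAll()` above avoids `sem_getvalue()` entirely: instead of reading the sleeping count, it drains the sleeping-count semaphore with non-blocking acquires, waking one sleeper per token and then waiting for each wake-up confirmation. A minimal sketch of that handshake using `threading` primitives (`MiniCondition` is illustrative, not processing's `SemLock`-based class, and the timeout accounting — the initial drain of `_woken_count` — is omitted):

```python
import threading
import time

class MiniCondition:
    """Illustrative stand-in for the Condition bookkeeping in the diff;
    the timeout accounting (draining _woken_count first) is omitted."""

    def __init__(self):
        self._lock = threading.Lock()
        self._sleeping_count = threading.Semaphore(0)  # waiters gone to sleep
        self._woken_count = threading.Semaphore(0)     # waiters woken up
        self._wait_semaphore = threading.Semaphore(0)  # what sleepers block on

    def wait(self):
        # caller must hold self._lock
        self._sleeping_count.release()      # announce: about to sleep
        self._lock.release()
        try:
            self._wait_semaphore.acquire()  # sleep until notified
            self._woken_count.release()     # confirm the wake-up
        finally:
            self._lock.acquire()

    def notify_all(self):
        # caller must hold self._lock; count sleepers by draining the
        # announcement semaphore instead of calling sem_getvalue()
        sleepers = 0
        while self._sleeping_count.acquire(False):  # non-blocking
            self._wait_semaphore.release()          # wake one sleeper
            sleepers += 1
        for _ in range(sleepers):
            self._woken_count.acquire()             # wait for each wake-up

cond = MiniCondition()
results = []

def worker(i):
    with cond._lock:
        cond.wait()
    results.append(i)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

# Wait until all three workers have announced themselves; peeking at
# Semaphore._value is a CPython detail used only to keep the demo
# deterministic.
deadline = time.time() + 5.0
while time.time() < deadline and getattr(cond._sleeping_count, '_value', 3) < 3:
    time.sleep(0.01)

with cond._lock:
    cond.notify_all()
for t in threads:
    t.join(5)
```

Draining tokens rather than reading the value is what lets the same code run on Mac OSX, where `sem_getvalue()` is not implemented.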

@@ -253,9 +266,2 @@

def __enter__(self):
self.acquire()
return self
def __exit__(self, t, v, tb):
self.release()
#

@@ -272,9 +278,16 @@ # Event

def isSet(self):
return bool(self._flag._semlock._getvalue())
self._cond.acquire()
try:
if self._flag.acquire(False):
self._flag.release()
return True
return False
finally:
self._cond.release()
def set(self):
self._cond.acquire()
try:
if not self.isSet():
self._flag.release()
self._flag.acquire(False)
self._flag.release()
self._cond.notifyAll()

@@ -294,5 +307,7 @@ finally:

try:
if not self.isSet():
if self._flag.acquire(False):
self._flag.release()
else:
self._cond.wait(timeout)
finally:
self._cond.release()
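The `Event` changes above make `set()` idempotent by draining the flag semaphore before releasing it, so the flag's value is pinned at exactly 1 no matter how many times `set()` is called. A small sketch of the same pattern with `threading` primitives (`MiniEvent` and its method names are illustrative, not processing's API):

```python
import threading

class MiniEvent:
    """Event built from a semaphore "flag" plus a condition, as in the diff."""

    def __init__(self):
        self._cond = threading.Condition()   # uses a re-entrant lock internally
        self._flag = threading.Semaphore(0)  # value 1 == set, 0 == clear

    def is_set(self):
        with self._cond:
            if self._flag.acquire(False):    # non-blocking probe
                self._flag.release()         # put the token back
                return True
            return False

    def set(self):
        with self._cond:
            self._flag.acquire(False)        # drain (no-op when clear)...
            self._flag.release()             # ...then set: value is exactly 1
            self._cond.notify_all()

    def wait(self, timeout=None):
        with self._cond:
            if not self.is_set():
                self._cond.wait(timeout)

e = MiniEvent()
assert not e.is_set()
e.set()
e.set()            # idempotent: repeated set() cannot push the flag above 1
assert e.is_set()
assert e.is_set()  # probing does not consume the flag
```

The drain-then-release in `set()` is the key move: without it, two `set()` calls would leave the semaphore at 2 and the event could be "unset" twice.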
Metadata-Version: 1.0
Name: processing
Version: 0.51
Version: 0.52
Summary: Package for using processes which mimics the threading module

@@ -44,3 +44,3 @@ Home-page: http://developer.berlios.de/projects/pyprocessing

* http://developer.berlios.de/project/filelist.php?group_id=9001 or
* http://cheeseshop.python.org/pypi/processing
* http://pypi.python.org/pypi/processing

@@ -97,3 +97,3 @@

>>> p = Pool(4)
>>> result = p.map_async(f, range(10))
>>> result = p.mapAsync(f, range(10))
>>> print result.get(timeout=1)

@@ -100,0 +100,0 @@ [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

@@ -49,3 +49,3 @@ .. default-role:: literal

* http://developer.berlios.de/project/filelist.php?group_id=9001 or
* http://cheeseshop.python.org/pypi/processing
* http://pypi.python.org/pypi/processing

@@ -102,3 +102,3 @@

>>> p = Pool(4)
>>> result = p.map_async(f, range(10))
>>> result = p.mapAsync(f, range(10))
>>> print result.get(timeout=1)

@@ -105,0 +105,0 @@ [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

@@ -48,2 +48,7 @@ #

#
# HAVE_BROKEN_SEM_GETVALUE
# Set to 1 if `sem_getvalue()` does not work or is unavailable.
# On Mac OSX it seems to return -1 with message "[Errno 78]
# Function not implemented".
#
# HAVE_BROKEN_SEM_UNLINK

@@ -55,15 +60,16 @@ # Set to 1 if `sem_unlink()` is unnecessary. For some reason this

if sys.platform == 'win32':
if sys.platform == 'win32': # Windows
macros = dict()
libraries = ['ws2_32']
elif sys.platform == 'darwin':
elif sys.platform == 'darwin': # Mac OSX
macros = dict(
HAVE_SEM_OPEN=1,
HAVE_SEM_TIMEDWAIT=0,
HAVE_FD_TRANSFER=1
HAVE_FD_TRANSFER=1,
HAVE_BROKEN_SEM_GETVALUE=1
)
libraries = []
elif sys.platform == 'cygwin':
elif sys.platform == 'cygwin': # Cygwin
macros = dict(

@@ -77,3 +83,3 @@ HAVE_SEM_OPEN=1,

else:
else: # Linux and other unices
macros = dict(

@@ -87,3 +93,3 @@ HAVE_SEM_OPEN=1,

#macros['Py_DEBUG'] = 1
#

@@ -90,0 +96,0 @@ # Print configuration info

@@ -330,7 +330,7 @@ /*

{"sendbytes", (PyCFunction)Connection_sendbytes, METH_VARARGS,
{"sendBytes", (PyCFunction)Connection_sendbytes, METH_VARARGS,
"send the byte data from a readable buffer-like object"},
{"recvbytes", (PyCFunction)Connection_recvbytes, METH_NOARGS,
{"recvBytes", (PyCFunction)Connection_recvbytes, METH_NOARGS,
"receive byte data as a string"},
{"recvbytes_into", (PyCFunction)Connection_recvbytes_into, METH_VARARGS,
{"recvBytesInto", (PyCFunction)Connection_recvbytes_into, METH_VARARGS,
"receive byte data into a writeable buffer-like object\n"

@@ -351,2 +351,11 @@ "returns the number of bytes read"},

/* deprecated names */
{"sendbytes", (PyCFunction)Connection_sendbytes, METH_VARARGS,
"send the byte data from a readable buffer-like object"},
{"recvbytes", (PyCFunction)Connection_recvbytes, METH_NOARGS,
"receive byte data as a string"},
{"recvbytes_into", (PyCFunction)Connection_recvbytes_into, METH_VARARGS,
"receive byte data into a writeable buffer-like object\n"
"returns the number of bytes read"},
{NULL} /* Sentinel */

@@ -353,0 +362,0 @@ };

@@ -39,3 +39,3 @@ /*

Type = PyExc_OSError;
PyErr_SetFromErrno(PyExc_IOError);
PyErr_SetFromErrno(Type);
break;

@@ -266,15 +266,15 @@ #endif /* !MS_WINDOWS */

static PyMethodDef module_methods[] = {
{"rwbuffer", processing_rwbuffer, METH_VARARGS,
"rwbuffer(obj [, offset[, size]]) -> buffer\n"
{"readWriteBuffer", processing_rwbuffer, METH_VARARGS,
"readWriteBuffer(obj [, offset[, size]]) -> buffer\n"
"Create a writable view of obj assuming obj supports buffer interface"},
{"address_of_buffer", processing_address_of_buffer, METH_O,
"address_of_buffer(obj) -> integer\n"
{"addressOfBuffer", processing_address_of_buffer, METH_O,
"addressOfBuffer(obj) -> integer\n"
"Return address of obj assuming obj supports buffer interface"},
#if HAVE_FD_TRANSFER
{"sendfd", processing_sendfd, METH_VARARGS,
"sendfd(sockfd, fd) -> None\n"
{"sendFd", processing_sendfd, METH_VARARGS,
"sendFd(sockfd, fd) -> None\n"
"Send file descriptor given by fd over the unix domain socket\n"
"whose file descriptor is sockfd"},
{"recvfd", processing_recvfd, METH_VARARGS,
"recvfd(sockfd) -> fd\n"
{"recvFd", processing_recvfd, METH_VARARGS,
"recvFd(sockfd) -> fd\n"
"Receive a file descriptor over a unix domain socket\n"

@@ -284,4 +284,4 @@ "whose file descriptor is sockfd"},

#if defined(MS_WINDOWS) && PY_VERSION_HEX < 0x02060000
{"changefd", (PyCFunction)processing_changefd, METH_VARARGS,
"changefd(fd, family, type [, proto]) -> None\n"
{"changeFd", (PyCFunction)processing_changefd, METH_VARARGS,
"changeFd(fd, family, type [, proto]) -> None\n"
"Replace the file descriptor etc of an existing socket object\n"

@@ -370,3 +370,30 @@ "the old fd is closed, and replaced with a duplicate of fd"},

}
PyModule_AddObject(module, "_hInterruptEvent",
Py_BuildValue(F_HANDLE, hInterruptEvent));
PyModule_AddObject(module, "_main_thread_ident",
Py_BuildValue(F_HANDLE, main_thread));
#endif
/* Add configuration macros */
#ifdef HAVE_SEM_OPEN
PyModule_AddObject(module, "HAVE_SEM_OPEN",
Py_BuildValue("i", HAVE_SEM_OPEN));
#endif
#ifdef HAVE_SEM_TIMEDWAIT
PyModule_AddObject(module, "HAVE_SEM_TIMEDWAIT",
Py_BuildValue("i", HAVE_SEM_TIMEDWAIT));
#endif
#ifdef HAVE_FD_TRANSFER
PyModule_AddObject(module, "HAVE_FD_TRANSFER",
Py_BuildValue("i", HAVE_FD_TRANSFER));
#endif
#ifdef HAVE_BROKEN_SEM_GETVALUE
PyModule_AddObject(module, "HAVE_BROKEN_SEM_GETVALUE",
Py_BuildValue("i", HAVE_BROKEN_SEM_GETVALUE));
#endif
#ifdef HAVE_BROKEN_SEM_UNLINK
PyModule_AddObject(module, "HAVE_BROKEN_SEM_UNLINK",
Py_BuildValue("i", HAVE_BROKEN_SEM_UNLINK));
#endif
}

@@ -25,3 +25,2 @@ /*

#ifdef MS_WINDOWS

@@ -33,3 +32,3 @@

static SECURITY_ATTRIBUTES sa = {sizeof(SECURITY_ATTRIBUTES), NULL, TRUE};
static SECURITY_ATTRIBUTES sa = { sizeof(SECURITY_ATTRIBUTES), NULL, TRUE };

@@ -41,3 +40,2 @@ #define SEM_FAILED NULL

#define SEM_CREATE(name, val, max) CreateSemaphore(&sa, val, max, NULL)
#define SEM_POST(sem) (ReleaseSemaphore(sem, 1, NULL) ? 0 : -1)
#define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1)

@@ -183,3 +181,2 @@ #define SEM_GETVALUE(sem, pval) _SemLock_GetSemaphoreValue(sem, pval)

#define SEM_CREATE(name, val, max) sem_open(name, O_CREAT | O_EXCL, 0600, val)
#define SEM_POST(sem) sem_post(sem)
#define SEM_CLOSE(sem) sem_close(sem)

@@ -327,4 +324,2 @@ #define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval)

{
int sval;
switch (self->kind) {

@@ -343,12 +338,36 @@ case RECURSIVE_MUTEX:

break;
#if HAVE_BROKEN_SEM_GETVALUE
case BOUNDED_SEMAPHORE:
/* This check is unreliable since value may change before post.
However, it will only fail if something has gone wrong. */
if (sem_getvalue(self->handle, &sval) < 0)
return PyErr_SetFromErrno(PyExc_OSError);
if (sval >= self->maxvalue) {
PyErr_SetString(PyExc_ValueError,
"semaphore or lock released too many times");
return NULL;
}
/* We will only check properly the Lock case (where maxvalue == 1) */
if (self->maxvalue == 1) {
/* make sure that already locked */
if (sem_trywait(self->handle) < 0) {
if (errno != EAGAIN)
return PyErr_SetFromErrno(PyExc_OSError);
/* it is already locked as expected */
} else {
/* it was not locked -- so undo wait and raise error */
if (sem_post(self->handle) < 0)
return PyErr_SetFromErrno(PyExc_OSError);
PyErr_SetString(PyExc_ValueError,
"semaphore or lock released too many times");
return NULL;
}
}
#else
case BOUNDED_SEMAPHORE:
{
int sval;
/* This check is not an absolute guarantee that the semaphore
does not rise above maxvalue. */
if (sem_getvalue(self->handle, &sval) < 0) {
return PyErr_SetFromErrno(PyExc_OSError);
} else if (sval >= self->maxvalue) {
PyErr_SetString(PyExc_ValueError,
"semaphore or lock released too many times");
return NULL;
}
}
#endif
}
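The C change above means that on Mac OSX only the Lock case (`maxvalue == 1`) can be checked reliably, via `sem_trywait()`; elsewhere the full `sem_getvalue()` comparison guards against releasing a bounded semaphore past its initial value. CPython's `threading.BoundedSemaphore` enforces the same invariant in pure Python, which makes the intended behaviour easy to see (this is an analogy, not processing's implementation):

```python
import threading

sem = threading.BoundedSemaphore(2)
sem.acquire()            # value 2 -> 1
sem.release()            # value 1 -> 2 (back at the initial value)
try:
    sem.release()        # would push the value to 3: not allowed
    over_released = False
except ValueError:
    over_released = True
assert over_released     # "semaphore released too many times"
```

As the comment in the diff notes, the value-based check is inherently racy (the value may change between the read and the post), so it only catches misuse, never guarantees correctness.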

@@ -403,3 +422,3 @@

PyOS_snprintf(buffer, sizeof(buffer), "/pys-%d-%d", getpid(), counter++);
PyOS_snprintf(buffer, sizeof(buffer), "/pr%d-%d", getpid(), counter++);

@@ -471,7 +490,11 @@ if (kind == BOUNDED_SEMAPHORE)

{
#if HAVE_BROKEN_SEM_GETVALUE
PyErr_SetNone(PyExc_NotImplementedError);
return NULL;
#else
int sval;
if (SEM_GETVALUE(self->handle, &sval) < 0)
SetException(NULL, STANDARD_ERROR);
return PyInt_FromLong((long)sval);
#endif
}

@@ -501,9 +524,9 @@

"number of `acquire()`s minus number of `release()`s for this process"},
{"_ismine", (PyCFunction)SemLock_ismine, METH_NOARGS,
{"_isMine", (PyCFunction)SemLock_ismine, METH_NOARGS,
"whether the lock is owned by this thread"},
{"_getvalue", (PyCFunction)SemLock_getvalue, METH_NOARGS,
{"_getValue", (PyCFunction)SemLock_getvalue, METH_NOARGS,
"get the value of the semaphore"},
{"_rebuild", (PyCFunction)SemLock_rebuild, METH_VARARGS | METH_CLASS,
""},
{"_afterfork", (PyCFunction)SemLock_afterfork, METH_NOARGS,
{"_afterFork", (PyCFunction)SemLock_afterfork, METH_NOARGS,
"rezero the net acquisition count after fork()"},

@@ -510,0 +533,0 @@ {NULL}

@@ -152,2 +152,17 @@ /*

static PyObject *
win32_GetExitCodeProcess(PyObject *self, PyObject *args)
{
HANDLE hProcess;
DWORD dwExitCode;
if (!PyArg_ParseTuple(args, F_HANDLE, &hProcess))
return NULL;
if (!GetExitCodeProcess(hProcess, &dwExitCode))
return PyErr_SetFromWindowsErr(0);
return Py_BuildValue(F_DWORD, dwExitCode);
}
static PyObject *
win32_GetHandleInformation(PyObject *self, PyObject *args)

@@ -193,2 +208,21 @@ {

static PyObject *
win32_ResetEvent(PyObject *self, PyObject *args)
{
HANDLE hEvent;
BOOL success;
if (!PyArg_ParseTuple(args, F_HANDLE, &hEvent))
return NULL;
Py_BEGIN_ALLOW_THREADS
success = ResetEvent(hEvent);
Py_END_ALLOW_THREADS
if (!success)
return PyErr_SetFromWindowsErr(0);
Py_RETURN_NONE;
}
static PyObject *
win32_SetConsoleCtrlHandler(PyObject *self, PyObject *args)

@@ -270,2 +304,56 @@ {

static PyObject *
win32_WaitForMultipleObjects(PyObject *self, PyObject *args)
{
DWORD nCount;
PyObject *oHandles;
BOOL bWaitAll;
DWORD dwMilliseconds;
DWORD res, i;
HANDLE *handle_array = NULL;
PyObject *obj;
if (!PyArg_ParseTuple(args, F_DWORD "Oi" F_DWORD,
&nCount, &oHandles, &bWaitAll, &dwMilliseconds))
return NULL;
if (!PySequence_Check(oHandles)) {
PyErr_SetString(PyExc_TypeError, "expected a sequence");
return NULL;
}
if (nCount != (DWORD)PySequence_Length(oHandles)) {
PyErr_SetString(PyExc_ValueError, "sequence has unexpected length");
return NULL;
}
handle_array = PyMem_Malloc(nCount * sizeof(HANDLE));
if (!handle_array)
return PyErr_NoMemory();
PyErr_Clear();
for (i = 0 ; i < nCount ; i++) {
obj = PySequence_GetItem(oHandles, (Py_ssize_t)i);
handle_array[i] = (HANDLE)PyLong_AsVoidPtr(obj);
if (PyErr_Occurred()) {
PyMem_Free(handle_array);
return NULL;
}
}
Py_BEGIN_ALLOW_THREADS
res = WaitForMultipleObjects(nCount, handle_array,
bWaitAll, dwMilliseconds);
Py_END_ALLOW_THREADS
PyMem_Free(handle_array);
if (res == WAIT_FAILED)
return PyErr_SetFromWindowsErr(0);
return Py_BuildValue(F_DWORD, res);
}
static PyObject *
win32_WaitNamedPipe(PyObject *self, PyObject *args)

@@ -297,5 +385,7 @@ {

WIN32_FUNCTION(GenerateConsoleCtrlEvent),
WIN32_FUNCTION(GetExitCodeProcess),
WIN32_FUNCTION(GetHandleInformation),
WIN32_FUNCTION(GetLastError),
WIN32_FUNCTION(OpenProcess),
WIN32_FUNCTION(ResetEvent),
WIN32_FUNCTION(SetConsoleCtrlHandler),

@@ -305,2 +395,3 @@ WIN32_FUNCTION(SetHandleInformation),

WIN32_FUNCTION(TerminateProcess),
WIN32_FUNCTION(WaitForMultipleObjects),
WIN32_FUNCTION(WaitNamedPipe),

@@ -333,2 +424,3 @@ {NULL}

WIN32_CONSTANT(F_DWORD, HANDLE_FLAG_INHERIT);
WIN32_CONSTANT(F_DWORD, INFINITE);
WIN32_CONSTANT(F_DWORD, NMPWAIT_WAIT_FOREVER);

@@ -343,2 +435,5 @@ WIN32_CONSTANT(F_DWORD, OPEN_EXISTING);

WIN32_CONSTANT(F_DWORD, PROCESS_ALL_ACCESS);
WIN32_CONSTANT(F_DWORD, WAIT_OBJECT_0);
WIN32_CONSTANT(F_DWORD, WAIT_TIMEOUT);
WIN32_CONSTANT(F_POINTER, NULL);

@@ -345,0 +440,0 @@

@@ -16,2 +16,3 @@ #

import random
import logging
import ctypes

@@ -40,2 +41,5 @@

HAVE_GETVALUE = not getattr(processing._processing,
'HAVE_BROKEN_SEM_GETVALUE', False)
#

@@ -70,22 +74,26 @@ # Creates a wrapper for a function which records the time it takes to finish

def assertReturnsIfImplemented(self, value, func, *args):
try:
res = func(*args)
except NotImplementedError:
pass
else:
return self.assertEqual(value, res)
#
# Monkey patch threading._Semaphore and proxy type to have getValue()
# Return the value of a semaphore
#
def getValue(self):
return self._Semaphore__value
try:
return self.getValue()
except AttributeError:
try:
return self._Semaphore__value
except AttributeError:
try:
return self._value
except AttributeError:
raise NotImplementedError
threading._Semaphore.getValue = getValue
def getValue(self):
return self._callmethod('getValue')
processing.managers.AcquirerProxy.getValue = getValue
class SyncManager(processing.managers.SyncManager):
# We create a trivial manager subclass. This ensures that this
# module will be imported in the manager process, so monkey
# patching of threading.Semaphore will affect the manager process.
pass
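The monkey-patched `getValue()` above falls back through several attribute names so the tests can ask any semaphore-like object for its value. A standalone sketch of the same fallback chain (`semaphore_value` is an illustrative name, not part of the package):

```python
import threading

def semaphore_value(sem):
    """Best-effort semaphore value, mirroring the fallback chain in the test
    suite: try a public getValue() accessor first, then the private attribute
    names used by Python 2's and Python 3's threading.Semaphore, and raise
    NotImplementedError when nothing works (as on HAVE_BROKEN_SEM_GETVALUE
    platforms such as Mac OSX)."""
    accessor = getattr(sem, 'getValue', None)
    if accessor is not None:
        return accessor()
    for attr in ('_Semaphore__value', '_value'):
        value = getattr(sem, attr, None)
        if value is not None:
            return value
    raise NotImplementedError('semaphore value is not readable here')

sem = threading.Semaphore(3)
assert semaphore_value(sem) == 3   # CPython 3 keeps the count in _value
sem.acquire()
assert semaphore_value(sem) == 2
```

Raising `NotImplementedError` at the end is what lets `assertReturnsIfImplemented()` skip the value checks on platforms where `sem_getvalue()` is broken instead of failing.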
#

@@ -107,4 +115,4 @@ # Testcases

self.assertTrue(len(authkey) > 0)
self.assertTrue(current.getExitCode() is None)
self.assertEqual(current.getPid(), os.getpid())
self.assertEqual(current.getExitCode(), None)

@@ -134,3 +142,2 @@ def _test(self, q, *args, **kwds):

self.assertEquals(p.getAuthKey(), current.getAuthKey())
self.assertEquals(p.getExitCode(), None)
self.assertEquals(p.isAlive(), False)

@@ -140,2 +147,3 @@ self.assertEquals(p.isDaemon(), True)

self.assertTrue(type(self.activeChildren()) is list)
self.assertEqual(p.getExitCode(), None)

@@ -470,14 +478,14 @@ p.start()

def _test_semaphore(self, sem):
self.assertEqual(sem.getValue(), 2)
self.assertReturnsIfImplemented(2, getValue, sem)
self.assertEqual(sem.acquire(), True)
self.assertEqual(sem.getValue(), 1)
self.assertReturnsIfImplemented(1, getValue, sem)
self.assertEqual(sem.acquire(), True)
self.assertEqual(sem.getValue(), 0)
self.assertReturnsIfImplemented(0, getValue, sem)
self.assertEqual(sem.acquire(False), False)
self.assertEqual(sem.getValue(), 0)
self.assertReturnsIfImplemented(0, getValue, sem)
self.assertEqual(sem.release(), None)
self.assertEqual(sem.getValue(), 1)
self.assertReturnsIfImplemented(1, getValue, sem)
self.assertEqual(sem.release(), None)
self.assertEqual(sem.getValue(), 2)
self.assertReturnsIfImplemented(2, getValue, sem)
def test_semaphore(self):

@@ -487,5 +495,5 @@ sem = self.Semaphore(2)

self.assertEqual(sem.release(), None)
self.assertEqual(sem.getValue(), 3)
self.assertReturnsIfImplemented(3, getValue, sem)
self.assertEqual(sem.release(), None)
self.assertEqual(sem.getValue(), 4)
self.assertReturnsIfImplemented(4, getValue, sem)

@@ -495,4 +503,5 @@ def test_bounded_semaphore(self):

self._test_semaphore(sem)
self.assertRaises(ValueError, sem.release)
self.assertEqual(sem.getValue(), 2)
if HAVE_GETVALUE:
self.assertRaises(ValueError, sem.release)
self.assertReturnsIfImplemented(2, getValue, sem)

@@ -521,7 +530,3 @@ def test_timeout(self):

def test_getvalue_bound(self):
sem = self.BoundedSemaphore(4)
self.assertEqual(sem.getValue(), 4)
class _TestCondition(BaseTestCase):

@@ -539,6 +544,9 @@

if self.TYPE == 'processes':
sleepers = (cond._sleeping_count.getValue() -
cond._woken_count.getValue())
self.assertEqual(sleepers, 0)
self.assertEqual(cond._wait_semaphore.getValue(), 0)
try:
sleepers = (cond._sleeping_count.getValue() -
cond._woken_count.getValue())
self.assertEqual(sleepers, 0)
self.assertEqual(cond._wait_semaphore.getValue(), 0)
except NotImplementedError:
pass

@@ -564,3 +572,3 @@ def test_notify(self):

time.sleep(DELTA)
self.assertEqual(woken.getValue(), 0)
self.assertReturnsIfImplemented(0, getValue, woken)

@@ -574,3 +582,3 @@ # wake up one process/thread

time.sleep(DELTA)
self.assertEqual(woken.getValue(), 1)
self.assertReturnsIfImplemented(1, getValue, woken)

@@ -584,3 +592,3 @@ # wake up another

time.sleep(DELTA)
self.assertEqual(woken.getValue(), 2)
self.assertReturnsIfImplemented(2, getValue, woken)

@@ -615,3 +623,3 @@ # check state is not mucked up

woken.acquire()
self.assertEqual(woken.getValue(), 0)
self.assertReturnsIfImplemented(0, getValue, woken)

@@ -637,3 +645,3 @@ # check state is not mucked up

time.sleep(DELTA)
self.assertEqual(woken.getValue(), 0)
self.assertReturnsIfImplemented(0, getValue, woken)

@@ -647,3 +655,3 @@ # wake them all up

time.sleep(DELTA)
self.assertEqual(woken.getValue(), 6)
self.assertReturnsIfImplemented(6, getValue, woken)

@@ -859,3 +867,3 @@ # check state is not mucked up

def test_async(self):
res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
res = self.pool.applyAsync(sqr, (7, TIMEOUT1,))
get = TimingWrapper(res.get)

@@ -866,3 +874,3 @@ self.assertEqual(get(), 49)

def test_async_timeout(self):
res = self.pool.apply_async(sqr, (6, 100))
res = self.pool.applyAsync(sqr, (6, 100))
get = TimingWrapper(res.get)

@@ -890,6 +898,6 @@ self.assertRaises(processing.TimeoutError, get, timeout=TIMEOUT2)

def test_imap_unordered(self):
it = self.pool.imap_unordered(sqr, range(1000))
it = self.pool.imapUnordered(sqr, range(1000))
self.assertEqual(sorted(it), map(sqr, range(1000)))
it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
it = self.pool.imapUnordered(sqr, range(1000), chunksize=53)
self.assertEqual(sorted(it), map(sqr, range(1000)))

@@ -913,3 +921,3 @@

# the managers shared objects
debug = self._debug_info()
debug = self._debugInfo()
if debug:

@@ -943,3 +951,3 @@ print debug

def next(self):
return self._callmethod('next')
return self._callMethod('next')

@@ -972,9 +980,9 @@ class MyManager(BaseManager):

self.assertRaises(ValueError, foo.g)
self.assertEqual(foo._callmethod('f'), 'f()')
self.assertRaises(RemoteError, foo._callmethod, '_h')
self.assertEqual(foo._callMethod('f'), 'f()')
self.assertRaises(RemoteError, foo._callMethod, '_h')
self.assertEqual(bar.f(), 'f()')
self.assertEqual(bar._h(), '_h()')
self.assertEqual(bar._callmethod('f'), 'f()')
self.assertEqual(bar._callmethod('_h'), '_h()')
self.assertEqual(bar._callMethod('f'), 'f()')
self.assertEqual(bar._callMethod('_h'), '_h()')

@@ -1008,3 +1016,3 @@ self.assertEqual(list(baz), [i*i for i in range(10)])

def _putter(self, address):
m2 = QueueManager2.from_address(address=address, authkey='none')
m2 = QueueManager2.fromAddress(address=address, authkey='none')
queue = m2.get_proxy()

@@ -1020,3 +1028,3 @@ queue.put('hello world')

m2 = QueueManager2.from_address(address=m.address, authkey='none')
m2 = QueueManager2.fromAddress(address=m.address, authkey='none')
queue = m2.get_proxy()

@@ -1044,4 +1052,4 @@ self.assertEqual(queue.get(), 'hello world')

def _echo(self, conn):
for msg in iter(conn.recvbytes, ''):
conn.sendbytes(msg)
for msg in iter(conn.recvBytes, ''):
conn.sendBytes(msg)
conn.close()

@@ -1067,4 +1075,4 @@

self.assertEqual(conn.sendbytes(msg), None)
self.assertEqual(conn.recvbytes(), msg)
self.assertEqual(conn.sendBytes(msg), None)
self.assertEqual(conn.recvBytes(), msg)

@@ -1074,4 +1082,4 @@ if self.TYPE == 'processes':

expected = list(arr) + [0] * (10 - len(arr))
self.assertEqual(conn.sendbytes(arr), None)
self.assertEqual(conn.recvbytes_into(buffer),
self.assertEqual(conn.sendBytes(arr), None)
self.assertEqual(conn.recvBytesInto(buffer),
len(arr) * buffer.itemsize)

@@ -1082,4 +1090,4 @@ self.assertEqual(list(buffer), expected)

expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
self.assertEqual(conn.sendbytes(arr), None)
self.assertEqual(conn.recvbytes_into(buffer, 3 * buffer.itemsize),
self.assertEqual(conn.sendBytes(arr), None)
self.assertEqual(conn.recvBytesInto(buffer, 3 * buffer.itemsize),
len(arr) * buffer.itemsize)

@@ -1089,5 +1097,5 @@ self.assertEqual(list(buffer), expected)

buffer = array.array('c', ' ' * 40)
self.assertEqual(conn.sendbytes(longmsg), None)
self.assertEqual(conn.sendBytes(longmsg), None)
try:
res = conn.recvbytes_into(buffer)
res = conn.recvBytesInto(buffer)
except processing.BufferTooShort, e:

@@ -1114,6 +1122,6 @@ self.assertEqual(e.args, (longmsg,))

really_big_msg = 'X' * (1024 * 1024 * 16) # 16 megabytes
conn.sendbytes(really_big_msg)
self.assertEqual(conn.recvbytes(), really_big_msg)
conn.sendBytes(really_big_msg)
self.assertEqual(conn.recvBytes(), really_big_msg)
conn.sendbytes('') # tell child to quit
conn.sendBytes('') # tell child to quit
child_conn.close()

@@ -1123,3 +1131,3 @@

self.assertRaises(EOFError, conn.recv)
self.assertRaises(EOFError, conn.recvbytes)
self.assertRaises(EOFError, conn.recvBytes)

@@ -1148,6 +1156,6 @@ p.join()

conn.sendbytes('hello')
self.assertEqual(conn.recvbytes(), 'hello')
conn.sendBytes('hello')
self.assertEqual(conn.recvBytes(), 'hello')
conn.sendbytes('')
conn.sendBytes('')
conn.close()

@@ -1366,3 +1374,3 @@ p.join()

from processing.finalize import Finalize
from processing.process import _exit_func
from processing.process import _exitFunction

@@ -1399,3 +1407,3 @@ class Foo(object):

# garbage collecting locals
_exit_func()
_exitFunction()
conn.close()

@@ -1443,6 +1451,6 @@ os._exit(0)

#
# Test that enableLogging() works -- does not test logging output
# Quick test that logging works -- does not test logging output
#
class _TestEnableLogging(BaseTestCase):
class _TestLogging(BaseTestCase):

@@ -1452,3 +1460,3 @@ ALLOWED_TYPES = ('processes',)

def test_enable_logging(self):
processing.enableLogging(level=processing.WARNING)
processing.enableLogging(level=processing.SUBWARNING)
logger = processing.getLogger()

@@ -1459,2 +1467,29 @@ self.assertTrue(logger is not None)

def _test_level(self, conn):
logger = processing.getLogger()
conn.send(logger.getEffectiveLevel())
def test_level(self):
LEVEL1 = 32
LEVEL2 = 37
LEVEL3 = 41
reader, writer = processing.Pipe(duplex=False)
root_logger = logging.getLogger('')
processing.enableLogging(level=LEVEL1)
self.Process(target=self._test_level, args=(writer,)).start()
self.assertEqual(LEVEL1, reader.recv())
processing.getLogger().setLevel(LEVEL2)
self.Process(target=self._test_level, args=(writer,)).start()
self.assertEqual(LEVEL2, reader.recv())
self.assertEqual(processing.NOTSET, logging.NOTSET)
processing.enableLogging(level=logging.NOTSET)
root_logger.setLevel(LEVEL3)
self.Process(target=self._test_level, args=(writer,)).start()
self.assertEqual(LEVEL3, reader.recv())
processing.enableLogging(level=processing.SUBWARNING)
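The `test_level` case above checks that a child process reports the effective level set via `enableLogging()`/`setLevel()`, including the `NOTSET` fallback to the root logger. The same resolution rules can be seen with the stdlib `logging` module alone (`'processing.demo'` is an arbitrary illustrative logger name, and 32/41 are the same arbitrary level numbers the test uses):

```python
import logging

root = logging.getLogger('')
child = logging.getLogger('processing.demo')

child.setLevel(32)                       # explicit level wins
assert child.getEffectiveLevel() == 32

child.setLevel(logging.NOTSET)           # defer to ancestors...
root.setLevel(41)
assert child.getEffectiveLevel() == 41   # ...so the root's level applies
```

This is why the test resets the level with `enableLogging(level=logging.NOTSET)` before adjusting the root logger: a non-`NOTSET` level on the package logger would shadow the root's.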
#

@@ -1510,7 +1545,7 @@ # Functions used to create test cases from the base ones in this module

Process = processing.Process
manager = object.__new__(SyncManager) # initialized by test_main()
manager = object.__new__(processing.managers.SyncManager)
locals().update(get_attributes(manager, (
'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
'Namespace', '_debug_info'
'Namespace', '_debugInfo'
)))

@@ -1517,0 +1552,0 @@