Package overview: 2 maintainers, 18 versions

processing - PyPI package: comparing version 0.38 to 0.39

doc/connection-objects.html (+108 lines)
<?xml version="1.0" encoding="utf-8" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<meta name="generator" content="Docutils 0.4: http://docutils.sourceforge.net/" />
<title>Connection objects</title>
<link rel="stylesheet" href="html4css1.css" type="text/css" />
</head>
<body>
<div class="header">
<a class="reference" href="queue-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="manager-objects.html">Next</a>
<hr class="header"/>
</div>
<div class="document" id="connection-objects">
<h1 class="title">Connection objects</h1>
<p>Connection objects allow the sending and receiving of picklable
objects or strings. They can be thought of as message-oriented
connected sockets.</p>
<p>Connection objects are usually created using <tt class="docutils literal"><span class="pre">processing.Pipe()</span></tt> -- see
also <a class="reference" href="connection-ref.html">Listener and Clients</a>.</p>
<p>Connection objects have the following methods:</p>
<blockquote>
<dl class="docutils">
<dt><tt class="docutils literal"><span class="pre">send(obj)</span></tt></dt>
<dd><p class="first">Send an object to the other end of the connection which should
be read using <tt class="docutils literal"><span class="pre">recv()</span></tt>.</p>
<p class="last">The object must be picklable.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">recv()</span></tt></dt>
<dd>Return an object sent from the other end of the connection
using <tt class="docutils literal"><span class="pre">send()</span></tt>.</dd>
<dt><tt class="docutils literal"><span class="pre">fileno()</span></tt></dt>
<dd>Returns the file descriptor or handle used by the connection.</dd>
<dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt>
<dd><p class="first">Close the connection.</p>
<p class="last">This is called automatically when the connection is garbage
collected.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">poll(timeout=0)</span></tt></dt>
<dd>Return whether there is any data available to be read within
<tt class="docutils literal"><span class="pre">timeout</span></tt> seconds.</dd>
<dt><tt class="docutils literal"><span class="pre">sendbytes(buffer)</span></tt></dt>
<dd><p class="first">Send byte data from an object supporting the buffer interface
as a complete message.</p>
<p class="last">Can be used to send strings or a view returned by <tt class="docutils literal"><span class="pre">buffer()</span></tt>.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">recvbytes()</span></tt></dt>
<dd>Return a complete message of byte data sent from the other end
of the connection as a string.</dd>
<dt><tt class="docutils literal"><span class="pre">recvbytes_into(buffer,</span> <span class="pre">offset=0)</span></tt></dt>
<dd><p class="first">Read into <tt class="docutils literal"><span class="pre">buffer</span></tt> at position <tt class="docutils literal"><span class="pre">offset</span></tt> a complete message of
byte data sent from the other end of the connection and return
the number of bytes in the message.</p>
<p><tt class="docutils literal"><span class="pre">buffer</span></tt> must be an object satisfying the writable buffer
interface and <tt class="docutils literal"><span class="pre">offset</span></tt> must be non-negative and less than
the length of <tt class="docutils literal"><span class="pre">buffer</span></tt> (in bytes).</p>
<p class="last">If the buffer is too short then a <tt class="docutils literal"><span class="pre">BufferTooShort</span></tt> exception
is raised and the complete message of bytes data is available
as <tt class="docutils literal"><span class="pre">e.args[0]</span></tt> where <tt class="docutils literal"><span class="pre">e</span></tt> is the exception instance.</p>
</dd>
</dl>
</blockquote>
<p>For example:</p>
<blockquote>
<pre class="doctest-block">
&gt;&gt;&gt; from processing import Pipe
&gt;&gt;&gt; a, b = Pipe()
&gt;&gt;&gt; a.send([1, 'hello', None])
&gt;&gt;&gt; b.recv()
[1, 'hello', None]
&gt;&gt;&gt; b.sendbytes('thank you')
&gt;&gt;&gt; a.recvbytes()
'thank you'
&gt;&gt;&gt; import array
&gt;&gt;&gt; arr1 = array.array('i', range(5))
&gt;&gt;&gt; arr2 = array.array('i', [0] * 10)
&gt;&gt;&gt; a.sendbytes(arr1)
&gt;&gt;&gt; b.recvbytes_into(arr2)
20
&gt;&gt;&gt; arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
</pre>
</blockquote>
<div class="warning">
<p class="first admonition-title">Warning</p>
<p>The <tt class="docutils literal"><span class="pre">recv()</span></tt> method automatically unpickles the data it receives
which can be a security risk unless you can trust the process
which sent the message.</p>
<p class="last">Therefore, unless the connection object was produced using
<tt class="docutils literal"><span class="pre">Pipe()</span></tt> you should only use the <tt class="docutils literal"><span class="pre">recv()</span></tt> and <tt class="docutils literal"><span class="pre">send()</span></tt> methods
after performing some sort of authentication. See <a class="reference" href="connection-ref.html#authentication-keys">Authentication
keys</a>.</p>
</div>
<div class="warning">
<p class="first admonition-title">Warning</p>
<p class="last">If a process is killed while it is trying to read or write to a
pipe then the data in the pipe is likely to become corrupted
because it may become impossible to be sure where the message
boundaries lie.</p>
</div>
</div>
<div class="footer">
<hr class="footer" />
<a class="reference" href="queue-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="manager-objects.html">Next</a>
</div>
</body>
</html>
.. include:: header.txt
====================
Connection objects
====================
Connection objects allow the sending and receiving of picklable
objects or strings. They can be thought of as message-oriented
connected sockets.
Connection objects are usually created using `processing.Pipe()` -- see
also `Listener and Clients <connection-ref.html>`_.
Connection objects have the following methods:
`send(obj)`
Send an object to the other end of the connection which should
be read using `recv()`.
The object must be picklable.
`recv()`
Return an object sent from the other end of the connection
using `send()`.
`fileno()`
Returns the file descriptor or handle used by the connection.
`close()`
Close the connection.
This is called automatically when the connection is garbage
collected.
`poll(timeout=0)`
Return whether there is any data available to be read within
`timeout` seconds.
`sendbytes(buffer)`
Send byte data from an object supporting the buffer interface
as a complete message.
Can be used to send strings or a view returned by `buffer()`.
`recvbytes()`
Return a complete message of byte data sent from the other end
of the connection as a string.
`recvbytes_into(buffer, offset=0)`
Read into `buffer` at position `offset` a complete message of
byte data sent from the other end of the connection and return
the number of bytes in the message.
`buffer` must be an object satisfying the writable buffer
interface and `offset` must be non-negative and less than
the length of `buffer` (in bytes).
If the buffer is too short then a `BufferTooShort` exception
is raised and the complete message of byte data is available
as `e.args[0]` where `e` is the exception instance.
For example:
>>> from processing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.sendbytes('thank you')
>>> a.recvbytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.sendbytes(arr1)
>>> b.recvbytes_into(arr2)
20
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
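The `poll()` method can be exercised the same way. A minimal sketch using Python 3's `multiprocessing.Pipe`, the standard-library successor to `processing.Pipe` (the `send`, `recv` and `poll` names are unchanged):

```python
from multiprocessing import Pipe

a, b = Pipe()
no_data = a.poll()      # nothing sent yet, so this returns False immediately
b.send('ping')
has_data = a.poll(1.0)  # waits up to 1 second; returns True once data arrives
reply = a.recv()        # 'ping'
```

Because `poll(0)` (the default) never blocks, it is suitable for non-blocking polling loops.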
.. warning::
The `recv()` method automatically unpickles the data it receives
which can be a security risk unless you can trust the process
which sent the message.
Therefore, unless the connection object was produced using
`Pipe()` you should only use the `recv()` and `send()` methods
after performing some sort of authentication. See `Authentication
keys <connection-ref.html#authentication-keys>`_.
.. warning::
If a process is killed while it is trying to read or write to a
pipe then the data in the pipe is likely to become corrupted
because it may become impossible to be sure where the message
boundaries lie.
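The `BufferTooShort` behaviour documented for `recvbytes_into()` carries over to Python 3's `multiprocessing`, where the methods are spelled `send_bytes()` and `recv_bytes_into()`. A sketch under that assumption:

```python
import array
from multiprocessing import Pipe, BufferTooShort

a, b = Pipe()
a.send_bytes(b'0123456789')
buf = array.array('b', [0] * 4)    # deliberately too small for the message
try:
    b.recv_bytes_into(buf)
except BufferTooShort as e:
    whole_message = e.args[0]      # the full message is still recoverable
```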
.. _Prev: queue-objects.html
.. _Up: processing-ref.html
.. _Next: manager-objects.html
<?xml version="1.0" encoding="utf-8" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<meta name="generator" content="Docutils 0.4: http://docutils.sourceforge.net/" />
<title>Queue objects</title>
<link rel="stylesheet" href="html4css1.css" type="text/css" />
</head>
<body>
<div class="header">
<a class="reference" href="process-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="connection-objects.html">Next</a>
<hr class="header"/>
</div>
<div class="document" id="queue-objects">
<h1 class="title">Queue objects</h1>
<p>The queue types provided by <tt class="docutils literal"><span class="pre">processing</span></tt> are multi-producer,
multi-consumer FIFO queues modelled on the <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> class in the
standard library.</p>
<p>In general it is best to stick to <tt class="docutils literal"><span class="pre">processing.Queue</span></tt> which has the
important advantage that (unless a maximum size is specified) putting
an object on the queue will always succeed without blocking. Without
this guarantee one must be much more careful to avoid the possibility
of deadlocks.</p>
<dl class="docutils">
<dt><tt class="docutils literal"><span class="pre">Queue(maxsize=0)</span></tt></dt>
<dd><p class="first">Returns a process shared queue implemented using a pipe and a
few locks/semaphores. A background thread transfers objects
from a buffer into the pipe.</p>
<p>It is a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> except that the <tt class="docutils literal"><span class="pre">qsize()</span></tt>
method is not implemented and that the <tt class="docutils literal"><span class="pre">task_done()</span></tt> and
<tt class="docutils literal"><span class="pre">join()</span></tt> methods introduced in Python 2.5 are also missing.</p>
<p><tt class="docutils literal"><span class="pre">Queue</span></tt> has a few additional methods which are usually
unnecessary:</p>
<blockquote>
<dl class="docutils">
<dt><tt class="docutils literal"><span class="pre">putmany(iterable)</span></tt></dt>
<dd>If the queue has infinite size then this adds all
items in the iterable to the queue's buffer. So
<tt class="docutils literal"><span class="pre">q.putmany(X)</span></tt> is a faster alternative to <tt class="docutils literal"><span class="pre">for</span> <span class="pre">x</span> <span class="pre">in</span> <span class="pre">X:</span>
<span class="pre">q.put(x)</span></tt>. Raises an error if the queue has finite
size.</dd>
<dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt>
<dd>Indicates that no more data will be put on this queue by
the current process. The background thread will quit once
it has flushed all buffered data to the pipe. This is
called automatically when the queue is garbage collected.</dd>
<dt><tt class="docutils literal"><span class="pre">jointhread()</span></tt></dt>
<dd><p class="first">This joins the background thread and can only be used
after <tt class="docutils literal"><span class="pre">close()</span></tt> has been called. This blocks until
the background thread exits, ensuring that all data in
the buffer has been flushed to the pipe.</p>
<p class="last">By default if a process is not the creator of the
queue then on exit it will attempt to join the queue's
background thread. The process can call
<tt class="docutils literal"><span class="pre">canceljoin()</span></tt> to prevent this behaviour.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">canceljoin()</span></tt></dt>
<dd>Prevents the background thread from being joined
automatically when the process exits. Unnecessary if
the current process created the queue.</dd>
</dl>
</blockquote>
<p class="last">Note that if a process is killed while it is trying to receive
or send to a queue then the data in the queue is likely to
become corrupted (because it may become impossible to be sure
where the message boundaries lie).</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">SimpleQueue()</span></tt></dt>
<dd><p class="first">A simplified and faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt>. It is really
just a non-duplex pipe protected by a couple of locks.</p>
<p>It has <tt class="docutils literal"><span class="pre">get()</span></tt> and <tt class="docutils literal"><span class="pre">put()</span></tt> methods but these do not have
<tt class="docutils literal"><span class="pre">block</span></tt> or <tt class="docutils literal"><span class="pre">timeout</span></tt> arguments. It also has an <tt class="docutils literal"><span class="pre">empty()</span></tt>
method.</p>
<p class="last"><strong>Warning</strong>: Unlike with <tt class="docutils literal"><span class="pre">Queue</span></tt> (when <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero)
using <tt class="docutils literal"><span class="pre">put()</span></tt> may block if the pipe's buffer does not have
sufficient space, so one must take care that no deadlocks are
possible.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">PosixQueue(maxsize=0,</span> <span class="pre">msgsize=0)</span></tt></dt>
<dd><p class="first">A faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt> which is available on Unix
systems which support Posix message queues.</p>
<p>However, posix queues have a maximum number of messages that
can occupy the queue at a given time, and each message (when
expressed as a pickled string) has a maximum length --- see
<tt class="docutils literal"><span class="pre">man</span> <span class="pre">7</span> <span class="pre">mq_overview</span></tt>.</p>
<p><tt class="docutils literal"><span class="pre">maxsize</span></tt> if specified determines the maximum number of items
that be in the queue. Note that unlike Pythons's normal queue
type if this is greater than a system defined maximum then an
error is raised. If <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero then this maximum value
is used.</p>
<p><tt class="docutils literal"><span class="pre">msgsize</span></tt> if specified determines the maximum length that each
message can be (when expressed as a pickled string). If this
is greater than a system defined maximum then an error is
raised. If <tt class="docutils literal"><span class="pre">msgsize</span></tt> is zero then this maximum value is used.</p>
<p class="last">If one tries to send a message which is too long then
<tt class="docutils literal"><span class="pre">ValueError</span></tt> will be raised.</p>
</dd>
</dl>
<div class="admonition-empty-and-full admonition">
<p class="first admonition-title"><tt class="docutils literal"><span class="pre">Empty</span></tt> and <tt class="docutils literal"><span class="pre">Full</span></tt></p>
<p class="last"><tt class="docutils literal"><span class="pre">processing</span></tt> uses the usual <tt class="docutils literal"><span class="pre">Queue.Empty</span></tt> and <tt class="docutils literal"><span class="pre">Queue.Full</span></tt>
exceptions to signal a timeout. They are not available in the
<tt class="docutils literal"><span class="pre">processing</span></tt> namespace so you need to import them from <tt class="docutils literal"><span class="pre">Queue</span></tt>.</p>
</div>
<div class="warning">
<p class="first admonition-title">Warning</p>
<p class="last">If a process is killed while it is trying to use a <tt class="docutils literal"><span class="pre">Queue</span></tt> or
<tt class="docutils literal"><span class="pre">SimpleQueue</span></tt> then the data in the queue is likely to become
corrupted because it may become impossible to be sure where the
message boundaries lie. However, <tt class="docutils literal"><span class="pre">PosixQueue</span></tt> does not have this
problem.</p>
</div>
</div>
<div class="footer">
<hr class="footer" />
<a class="reference" href="process-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="connection-objects.html">Next</a>
</div>
</body>
</html>
.. include:: header.txt
===============
Queue objects
===============
The queue types provided by `processing` are multi-producer,
multi-consumer FIFO queues modelled on the `Queue.Queue` class in the
standard library.
In general it is best to stick to `processing.Queue` which has the
important advantage that (unless a maximum size is specified) putting
an object on the queue will always succeed without blocking. Without
this guarantee one must be much more careful to avoid the possibility
of deadlocks.
`Queue(maxsize=0)`
Returns a process shared queue implemented using a pipe and a
few locks/semaphores. A background thread transfers objects
from a buffer into the pipe.
It is a near clone of `Queue.Queue` except that the `qsize()`
method is not implemented and that the `task_done()` and
`join()` methods introduced in Python 2.5 are also missing.
`Queue` has a few additional methods which are usually
unnecessary:
`putmany(iterable)`
If the queue has infinite size then this adds all
items in the iterable to the queue's buffer. So
`q.putmany(X)` is a faster alternative to `for x in X:
q.put(x)`. Raises an error if the queue has finite
size.
`close()`
Indicates that no more data will be put on this queue by
the current process. The background thread will quit once
it has flushed all buffered data to the pipe. This is
called automatically when the queue is garbage collected.
`jointhread()`
This joins the background thread and can only be used
after `close()` has been called. This blocks until
the background thread exits, ensuring that all data in
the buffer has been flushed to the pipe.
By default if a process is not the creator of the
queue then on exit it will attempt to join the queue's
background thread. The process can call
`canceljoin()` to prevent this behaviour.
`canceljoin()`
Prevents the background thread from being joined
automatically when the process exits. Unnecessary if
the current process created the queue.
Note that if a process is killed while it is trying to receive
from or send to a queue then the data in the queue is likely to
become corrupted (because it may become impossible to be sure
where the message boundaries lie).
`SimpleQueue()`
A simplified and faster alternative to `Queue()`. It is really
just a non-duplex pipe protected by a couple of locks.
It has `get()` and `put()` methods but these do not have
`block` or `timeout` arguments. It also has an `empty()`
method.
**Warning**: Unlike with `Queue` (when `maxsize` is zero)
using `put()` may block if the pipe's buffer does not have
sufficient space, so one must take care that no deadlocks are
possible.
`PosixQueue(maxsize=0, msgsize=0)`
A faster alternative to `Queue()` which is available on Unix
systems which support Posix message queues.
However, posix queues have a maximum number of messages that
can occupy the queue at a given time, and each message (when
expressed as a pickled string) has a maximum length --- see
`man 7 mq_overview`.
`maxsize` if specified determines the maximum number of items
that can be in the queue. Note that unlike Python's normal queue
type, if this is greater than a system defined maximum then an
error is raised. If `maxsize` is zero then this maximum value
is used.
`msgsize` if specified determines the maximum length that each
message can be (when expressed as a pickled string). If this
is greater than a system defined maximum then an error is
raised. If `msgsize` is zero then this maximum value is used.
If one tries to send a message which is too long then
`ValueError` will be raised.
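`SimpleQueue` survives almost unchanged as `multiprocessing.SimpleQueue` in Python 3; a minimal sketch (note that `get()` and `put()` still take no `block` or `timeout` arguments):

```python
from multiprocessing import SimpleQueue

q = SimpleQueue()
was_empty = q.empty()  # True: nothing queued yet
q.put('hello')
item = q.get()         # blocks until an item is available
```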
.. admonition:: `Empty` and `Full`
`processing` uses the usual `Queue.Empty` and `Queue.Full`
exceptions to signal a timeout. They are not available in the
`processing` namespace so you need to import them from `Queue`.
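The same import arrangement applies with Python 3's `multiprocessing`, where these exceptions now live in the `queue` module. A sketch:

```python
import queue  # Queue.Empty / Queue.Full live here in Python 3
from multiprocessing import Queue

q = Queue()
try:
    q.get(timeout=0.1)  # nothing queued: the wait times out
except queue.Empty:     # imported from queue, not from multiprocessing
    timed_out = True
```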
.. warning::
If a process is killed while it is trying to use a `Queue` or
`SimpleQueue` then the data in the queue is likely to become
corrupted because it may become impossible to be sure where the
message boundaries lie. However, `PosixQueue` does not have this
problem.
.. _Prev: process-objects.html
.. _Up: processing-ref.html
.. _Next: connection-objects.html
<?xml version="1.0" encoding="utf-8" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<meta name="generator" content="Docutils 0.4: http://docutils.sourceforge.net/" />
<title>Shared ctypes objects</title>
<link rel="stylesheet" href="html4css1.css" type="text/css" />
</head>
<body>
<div class="header">
<a class="reference" href="pool-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="connection-ref.html">Next</a>
<hr class="header"/>
</div>
<div class="document" id="shared-ctypes-objects">
<h1 class="title">Shared ctypes objects</h1>
<p>The <tt class="docutils literal"><span class="pre">processing.sharedctypes</span></tt> module provides functions for allocating
ctypes objects from shared memory which can be inherited by child
processes. (See the standard library's documentation for details of
the <tt class="docutils literal"><span class="pre">ctypes</span></tt> package.)</p>
<p>Note that access to a ctypes object is not protected by any lock.
However, accessing a ctypes object can be much faster (20+ times
faster) than accessing a synchronized shared object allocated using
<tt class="docutils literal"><span class="pre">LocalManager</span></tt>.</p>
<p>The functions in the module are:</p>
<blockquote>
<dl class="docutils">
<dt><tt class="docutils literal"><span class="pre">new_value(fmt_or_type,</span> <span class="pre">*args)</span></tt></dt>
<dd><p class="first">Returns a ctypes object allocated from shared memory.</p>
<p class="last"><tt class="docutils literal"><span class="pre">fmt_or_type</span></tt> determines the type of the returned object: it
is either a ctypes type or a one character string format of
the kind used by the <tt class="docutils literal"><span class="pre">array</span></tt> module. The remaining arguments
are passed on to the constructor for the type.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">new_array(fmt_or_type,</span> <span class="pre">size_or_initializer)</span></tt></dt>
<dd><p class="first">Returns a ctypes array allocated from shared memory.</p>
<p class="last"><tt class="docutils literal"><span class="pre">fmt_or_type</span></tt> determines the type of the elements of the
returned array: it is either a ctypes type or a one character
string format of the kind used by the <tt class="docutils literal"><span class="pre">array</span></tt> module. If
<tt class="docutils literal"><span class="pre">size_or_initializer</span></tt> is an integer then it determines the
length of the array, and the array will be initially zeroed.
Otherwise <tt class="docutils literal"><span class="pre">size_or_initializer</span></tt> is a sequence which is used to
initialize the array and whose length determines the length of
the array.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">copy(obj)</span></tt></dt>
<dd>Returns a ctypes object allocated from shared memory which is
a copy of the ctypes object <tt class="docutils literal"><span class="pre">obj</span></tt>.</dd>
</dl>
</blockquote>
<div class="section">
<h1><a id="equivalences" name="equivalences">Equivalences</a></h1>
<p>The table below compares the syntax for creating a shared ctypes
object from shared memory with the normal ctypes syntax. (In the
table <tt class="docutils literal"><span class="pre">MyStruct</span></tt> is some subclass of <tt class="docutils literal"><span class="pre">ctypes.Structure</span></tt>.)</p>
<table border="1" class="docutils">
<colgroup>
<col width="37%" />
<col width="36%" />
<col width="27%" />
</colgroup>
<thead valign="bottom">
<tr><th class="head">sharedctypes using type</th>
<th class="head">sharedctypes using format</th>
<th class="head">ctypes</th>
</tr>
</thead>
<tbody valign="top">
<tr><td>new_value(c_double, 2.4)</td>
<td>new_value('d', 2.4)</td>
<td>c_double(2.4)</td>
</tr>
<tr><td>new_value(MyStruct, 4, 6)</td>
<td>&nbsp;</td>
<td>MyStruct(4, 6)</td>
</tr>
<tr><td>new_array(c_short, 7)</td>
<td>new_array('h', 7)</td>
<td>(c_short * 7)()</td>
</tr>
<tr><td>new_array(c_int, (9, 2, 8))</td>
<td>new_array('i', (9, 2, 8))</td>
<td>(c_int * 3)(9, 2, 8)</td>
</tr>
</tbody>
</table>
</div>
<div class="section">
<h1><a id="example" name="example">Example</a></h1>
<p>Below is an example where a number of ctypes objects are modified by a
child process</p>
<pre class="literal-block">
from processing import Process
from processing.sharedctypes import new_value, new_array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for p in A:
p.x **= 2
p.y **= 2
if __name__ == '__main__':
n = new_value('i', 7)
x = new_value('d', 1.0/3.0)
s = new_array('c', 'hello world')
A = new_array(Point, [(1.875, -6.25), (-5.75, 2.0), (2.375, 9.5)])
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print n
print x
print s.value
print [(p.x, p.y) for p in A]
</pre>
<p>The results printed are</p>
<pre class="literal-block">
c_long(49)
c_double(0.1111111111111111)
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
</pre>
</div>
</div>
<div class="footer">
<hr class="footer" />
<a class="reference" href="pool-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="connection-ref.html">Next</a>
</div>
</body>
</html>
.. include:: header.txt
========================
Shared ctypes objects
========================
The `processing.sharedctypes` module provides functions for allocating
ctypes objects from shared memory which can be inherited by child
processes. (See the standard library's documentation for details of
the `ctypes` package.)
Note that access to a ctypes object is not protected by any lock.
However, accessing a ctypes object can be much faster (20+ times
faster) than accessing a synchronized shared object allocated using
`LocalManager`.
The functions in the module are:
`new_value(fmt_or_type, *args)`
Returns a ctypes object allocated from shared memory.
`fmt_or_type` determines the type of the returned object: it
is either a ctypes type or a one character string format of
the kind used by the `array` module. The remaining arguments
are passed on to the constructor for the type.
`new_array(fmt_or_type, size_or_initializer)`
Returns a ctypes array allocated from shared memory.
`fmt_or_type` determines the type of the elements of the
returned array: it is either a ctypes type or a one character
string format of the kind used by the `array` module. If
`size_or_initializer` is an integer then it determines the
length of the array, and the array will be initially zeroed.
Otherwise `size_or_initializer` is a sequence which is used to
initialize the array and whose length determines the length of
the array.
`copy(obj)`
Returns a ctypes object allocated from shared memory which is
a copy of the ctypes object `obj`.
Equivalences
============
The table below compares the syntax for creating a shared ctypes
object from shared memory with the normal ctypes syntax. (In the
table `MyStruct` is some subclass of `ctypes.Structure`.)
============================ =========================== ====================
sharedctypes using type sharedctypes using format ctypes
============================ =========================== ====================
new_value(c_double, 2.4) new_value('d', 2.4) c_double(2.4)
new_value(MyStruct, 4, 6) MyStruct(4, 6)
new_array(c_short, 7) new_array('h', 7) (c_short * 7)()
new_array(c_int, (9, 2, 8)) new_array('i', (9, 2, 8)) (c_int * 3)(9, 2, 8)
============================ =========================== ====================
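These equivalences map almost one-for-one onto Python 3's `multiprocessing.sharedctypes`, where `new_value`/`new_array` became `Value`/`Array`; passing `lock=False` (an assumption of this sketch) yields the raw, unsynchronized objects the table describes:

```python
from ctypes import c_double, c_short
from multiprocessing.sharedctypes import Value, Array

v = Value(c_double, 2.4, lock=False)    # was: new_value(c_double, 2.4)
w = Value('d', 2.4, lock=False)         # was: new_value('d', 2.4)
a = Array(c_short, 7, lock=False)       # was: new_array(c_short, 7) -- zero-initialized
b = Array('i', (9, 2, 8), lock=False)   # was: new_array('i', (9, 2, 8))
```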
Example
=======
Below is an example where a number of ctypes objects are modified by a
child process ::
from processing import Process
from processing.sharedctypes import new_value, new_array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for p in A:
p.x **= 2
p.y **= 2
if __name__ == '__main__':
n = new_value('i', 7)
x = new_value('d', 1.0/3.0)
s = new_array('c', 'hello world')
A = new_array(Point, [(1.875, -6.25), (-5.75, 2.0), (2.375, 9.5)])
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print n
print x
print s.value
print [(p.x, p.y) for p in A]
The results printed are ::
c_long(49)
c_double(0.1111111111111111)
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
.. _Prev: pool-objects.html
.. _Up: processing-ref.html
.. _Next: connection-ref.html
.. |version| replace:: 0.39
#
# Module which supports allocation of memory from an mmap
#
# processing/heap.py
#
# Copyright (c) 2007, R Oudkerk --- see COPYING.txt
#

import bisect
import mmap
import tempfile
import os
import sys
import thread

from processing import Finalize, _processing, currentProcess

__all__ = ['BufferWrapper']

#
# Class representing an mmap - can be inherited by child processes
#

class MMapWrapper(object):

    __slots__ = ('__data', '__weakref__')

    mmap = property(lambda self: self.__data[0])
    size = property(lambda self: self.__data[1])
    name = property(lambda self: self.__data[2])

    def __init__(self, size):
        fd, name = tempfile.mkstemp(prefix='pym-')
        remaining = size
        while remaining > 0:
            remaining -= os.write(fd, '\0' * remaining)
        mmap_ = mmap.mmap(fd, size)
        os.close(fd)
        self.__data = (mmap_, size, name)

        if sys.platform in ('win32', 'cygwin'):
            Finalize(
                self, MMapWrapper._finalize_heap, args=[mmap_, name],
                atexit=True, priority=-10
                )
        else:
            os.unlink(name)

    def __getstate__(self):
        assert sys.platform == 'win32'
        return (self.name, self.size)

    def __setstate__(self, state):
        assert getattr(currentProcess(), '_unpickling', False), \
               'mmaps should only be shared using process inheritance'
        name, size = state
        fd = os.open(name, os.O_RDWR | os.O_BINARY, int('0600', 8))
        mmap_ = mmap.mmap(fd, size)
        os.close(fd)
        self.__data = (mmap_, size, name)

    @staticmethod
    def _finalize_heap(mmap, name):
        import os
        mmap.close()
        os.unlink(name)

#
# Class allowing allocation of (unmovable) chunks of memory from mmaps
#

class Heap(object):

    _alignment = 4

    def __init__(self, size=1024):
        self._lastpid = os.getpid()
        self._lock = thread.allocate_lock()
        self._size = size
        self._lengths = []
        self._len_to_seq = {}
        self._start_to_location = {}
        self._stop_to_location = {}
        self._allocated_locations = set()
        self._mmaps = []

    def _roundup(self, n):
        n = max(1, n)
        q, r = divmod(n, self._alignment)
        if r:
            q += 1
        return q * self._alignment

    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            length = max(self._size, size)
            self._size *= 2
            mmap = MMapWrapper(length)
            self._mmaps.append(mmap)
            return (mmap, 0, length)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            location = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]
            (mmap, start, stop) = location
            del self._start_to_location[(mmap, start)]
            del self._stop_to_location[(mmap, stop)]
            return location

    def _free(self, location):
        # free location and try to merge with neighbours
        (mmap, start, stop) = location

        try:
            prev_location = self._stop_to_location[(mmap, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_location)

        try:
            next_location = self._start_to_location[(mmap, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_location)

        location = (mmap, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(location)
        except KeyError:
            self._len_to_seq[length] = [location]
            bisect.insort(self._lengths, length)

        self._start_to_location[(mmap, start)] = location
        self._stop_to_location[(mmap, stop)] = location

    def _absorb(self, location):
        # deregister this block so it can be merged with a neighbour
        (mmap, start, stop) = location
        del self._start_to_location[(mmap, start)]
        del self._stop_to_location[(mmap, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(location)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def free(self, location):
        # free a block returned by malloc()
        self._lock.acquire()
        try:
            self._allocated_locations.remove(location)
            self._free(location)
        finally:
            self._lock.release()

    def malloc(self, size):
        # return a block of right size (possibly rounded up)
        self._lock.acquire()
        try:
            if os.getpid() != self._lastpid:
                self.__init__()             # reinitialize after fork
            size = self._roundup(size)
            (mmap, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                self._free((mmap, new_stop, stop))
            location = (mmap, start, new_stop)
            self._allocated_locations.add(location)
            return location
        finally:
            self._lock.release()

    def _dump(self):
        self._verify(dump=True)

    def _verify(self, dump=False):
        all = []
        occupied = 0

        for L in self._len_to_seq.values():
            for mmap, start, stop in L:
                all.append((self._mmaps.index(mmap), start, stop,
                            stop-start, 'free'))

        for mmap, start, stop in self._allocated_locations:
            all.append((self._mmaps.index(mmap), start, stop,
                        stop-start, 'occupied'))
            occupied += (stop-start)

        all.sort()

        if dump:
            for line in all:
                print '%8s%8s%8s%8s %s' % line
            lengths = [len(w.mmap) for w in self._mmaps]
            print 'mmap sizes =', lengths
            print 'total size =', sum(lengths)

        for i in range(len(all)-1):
            (mmap, start, stop) = all[i][:3]
            (nmmap, nstart, nstop) = all[i+1][:3]
            assert ((mmap != nmmap and nstart == 0) or (stop == nstart))

#
# Class representing a chunk of an mmap -- can be inherited
#

class BufferWrapper(object):

    __slots__ = ('__data', '__weakref__')

    _heap = Heap()

    location = property(lambda self: self.__data[0])
    size = property(lambda self: self.__data[1])

    def __init__(self, size):
        assert 0 <= size <= sys.maxint
        location = BufferWrapper._heap.malloc(size)
        self.__setstate__((location, size))
        Finalize(self, BufferWrapper._heap.free, args=[self.location])

    def getaddress(self):
        w, start, stop = self.location
        address, length = _processing.address_of_buffer(w.mmap)
        assert self.size <= length
        return address + start

    def getview(self):
        w, start, stop = self.location
        return _processing.rwbuffer(w.mmap, start, self.size)

    def __getstate__(self):
        assert sys.platform == 'win32'
        return self.__data

    def __setstate__(self, state):
        self.__data = state

#
# Test
#

def test():
    import random

    iterations = 10000
    maxblocks = 50
    blocks = []

    maxsize = 0
    occ = 0
    maxocc = 0

    for i in xrange(iterations):
        size = int(random.lognormvariate(0, 1) * 1000)
        b = BufferWrapper(size)
        occ += size
        maxocc = max(maxocc, occ)
        maxsize = max(maxsize, size)
        blocks.append(b)
        if len(blocks) > maxblocks:
            i = random.randrange(maxblocks)
            del blocks[i]
            occ -= size

    BufferWrapper._heap._dump()
    print 'max size of a block =', maxsize
    print 'max size occupied =', maxocc
    print 'currently occupied =', occ

if __name__ == '__main__':
    test()
#
# Module which supports allocation of ctypes objects from shared memory
#
# processing/sharedctypes.py
#
# Copyright (c) 2007, R Oudkerk --- see COPYING.txt
#

import weakref
import ctypes
import sys

from processing import heap, Lock

__all__ = ['new_value', 'new_array', 'copy']

#
#
#

def _new_value(type_):
    size = ctypes.sizeof(type_)
    wrapper = heap.BufferWrapper(size)
    return rebuild_ctype(type_, wrapper, None)

def new_value(fmt_or_type, *args):
    type_ = gettype(fmt_or_type)
    obj = _new_value(type_)
    ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
    obj.__init__(*args)
    return obj

def new_array(fmt_or_type, size_or_initializer):
    type_ = gettype(fmt_or_type)
    if isinstance(type_, str):
        type_ = _fmt_to_type[type_]
    if isinstance(size_or_initializer, int):
        type_ = type_ * size_or_initializer
        return new_value(type_)
    else:
        type_ = type_ * len(size_or_initializer)
        result = _new_value(type_)
        result.__init__(*size_or_initializer)
        return result

def copy(obj):
    new_obj = _new_value(type(obj))
    ctypes.pointer(new_obj)[0] = obj
    return new_obj

#
# Functions for pickling/unpickling
#

def reduce_ctype(obj):
    assert sys.platform == 'win32'
    if isinstance(obj, ctypes.Array):
        return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
    else:
        return rebuild_ctype, (type(obj), obj._wrapper, None)

def rebuild_ctype(type_, wrapper, length):
    if length is not None:
        type_ = type_ * length
    fixup(type_)
    obj = type_.from_address(wrapper.getaddress())
    obj._wrapper = wrapper
    return obj

def fixup(type_):
    if (sys.platform == 'win32' and type_.__reduce__ is not reduce_ctype):
        type_.__reduce__ = reduce_ctype

#
# Function which converts format strings to ctype types
#

def gettype(fmt_or_type, mapping={}):
    if not mapping:
        for name in dir(ctypes):
            if name[:2] == 'c_':
                T = getattr(ctypes, name)
                if hasattr(T, '_type_'):
                    mapping[T._type_] = T
        mapping.update(i=ctypes.c_int, I=ctypes.c_uint,
                       l=ctypes.c_long, L=ctypes.c_ulong)
    return mapping.get(fmt_or_type, fmt_or_type)

#
# Tests
#

class _Foo(ctypes.Structure):
    _fields_ = [
        ('x', ctypes.c_int),
        ('y', ctypes.c_double)
        ]

def _test(x, y, foo, arr, string):
    x.value **= 2
    y.value **= 2
    foo.x **= 2
    foo.y **= 2
    for i in range(len(arr)):
        arr[i] **= 2
    string.value = string.value.upper()

def test():
    from processing import Process

    x = new_value('i', 7)
    y = new_value(ctypes.c_double, 1.0/3.0)
    foo = new_value(_Foo, 3, 2)
    arr = new_array('d', range(10))
    string = new_array('c', 'hello world')

    bar = copy(foo)
    assert (foo.x, foo.y) == (bar.x, bar.y)

    p = Process(target=_test, args=(x, y, foo, arr, string))
    p.start()
    p.join()

    print x.value
    print y.value
    print (foo.x, foo.y)
    print arr[:]
    print string.value

if __name__ == '__main__':
    test()
#ifndef PROCESSING_DEFS_H
#define PROCESSING_DEFS_H
#define PY_SSIZE_T_CLEAN
#include "Python.h"
#if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN)
typedef int Py_ssize_t;
# define PY_SSIZE_T_MAX INT_MAX
# define PY_SSIZE_T_MIN INT_MIN
# define N_FMT "i"
#else
# define N_FMT "n"
#endif
#endif /* PROCESSING_DEFS_H */
#ifndef PROCESSING_H
#define PROCESSING_H
static PyObject*
processing_rwbuffer(PyObject *self, PyObject *args)
{
    PyObject *obj;
    Py_ssize_t offset = 0, size = Py_END_OF_BUFFER;

    if (!PyArg_ParseTuple(args, "O|" N_FMT N_FMT, &obj, &offset, &size))
        return NULL;

    return PyBuffer_FromReadWriteObject(obj, offset, size);
}

static PyObject*
processing_address_of_buffer(PyObject *self, PyObject *obj)
{
    void *buffer;
    Py_ssize_t buffer_len;

    if (PyObject_AsWriteBuffer(obj, &buffer, &buffer_len) < 0)
        return NULL;

    return Py_BuildValue(N_FMT N_FMT, buffer, buffer_len);
}

#endif /* PROCESSING_H */

@@ -39,3 +39,3 @@ #
-__version__ = '0.38'
+__version__ = '0.39'

@@ -103,8 +103,8 @@ __all__ = [
-def Pipe():
+def Pipe(duplex=True):
     '''
-    Returns two connection objects connected by a duplex pipe
+    Returns two connection objects connected by a pipe
     '''
     from processing.connection import Pipe
-    return Pipe()
+    return Pipe(duplex)


@@ -5,2 +5,30 @@ ========================================
+Changes in 0.39
+---------------
+
+* One can now create one-way pipes by doing
+  `reader, writer = Pipe(duplex=False)`.
+
+* Rewrote code for managing shared memory maps.
+
+* Added a `sharedctypes` module for creating `ctypes` objects allocated
+  from shared memory. On Python 2.4 this requires the installation of
+  `ctypes`.
+
+  `ctypes` objects are not protected by any locks so you will need to
+  synchronize access to them (such as by using a lock). However they
+  can be much faster to access than equivalent objects allocated using
+  a `LocalManager`.
+
+* Rearranged documentation.
+
+* Previously the C extension caused a segfault on 64 bit machines with
+  Python 2.5 because it used `int` instead of `Py_ssize_t` in certain
+  places. This is now fixed. Thanks to Alexy Khrabrov for the report.
+
+* A fix for `Pool.terminate()`.
+
+* A fix for cleanup behaviour of `Queue`.
+
 Changes in 0.38
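The one-way pipe added in 0.39 takes only a couple of lines to exercise. The sketch below uses the stdlib descendant of this package, `multiprocessing`, where `Pipe(duplex=False)` kept the same meaning (the first connection is read-only, the second write-only):

```python
from multiprocessing import Pipe

reader, writer = Pipe(duplex=False)
writer.send(['any', 'picklable', 'object'])   # writer end can only send
msg = reader.recv()                           # reader end can only receive
writer.close()
reader.close()
print(msg)
```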


@@ -256,23 +256,54 @@ #
-def Pipe():
+def Pipe(duplex=True):
     '''
-    Returns pair of connection objects at either end of a duplex connection
+    Returns pair of connection objects at either end of a pipe
     '''
     if sys.platform == 'win32':
-        assert '_processing' in globals()
-        l = Listener()
-        a = Client(l.address)
-        b = l.accept()
-        l.close()
-        return a, b
+        address = arbitrary_address('AF_PIPE')
+
+        PIPE_ACCESS_INBOUND = 1
+        PIPE_ACCESS_OUTBOUND = 2
+        GENERIC_READ = 0x80000000
+        GENERIC_WRITE = 0x40000000
+
+        if duplex:
+            openmode = PIPE_ACCESS_INBOUND | PIPE_ACCESS_OUTBOUND
+            access = GENERIC_READ | GENERIC_WRITE
+        else:
+            openmode = PIPE_ACCESS_INBOUND
+            access = GENERIC_WRITE
+
+        a = _processing.createpipe(address, openmode=openmode)
+        _processing.waitpipe(address, 1000)
+        b = _processing.createfile(address, access)
+        _processing.connectpipe(a)
+
+        c = _processing.PipeConnection(a)
+        d = _processing.PipeConnection(b)
+        _processing.CloseHandle(a)
+        _processing.CloseHandle(b)
+        return c, d
     else:
-        a, b = socket.socketpair()
+        if duplex:
+            a, b = socket.socketpair()
+        else:
+            a, b = os.pipe()
         try:
             Connection = _processing.SocketConnection
-            c, d = Connection(a.fileno()), Connection(b.fileno())
-            a.close()
-            b.close()
         except NameError:
             Connection = _SocketConnection
             c, d = Connection(a), Connection(b)
+        else:
+            if duplex:
+                c, d = Connection(a.fileno()), Connection(b.fileno())
+                a.close()
+                b.close()
+            else:
+                c, d = Connection(a), Connection(b)
+                os.close(a)
+                os.close(b)
         return c, d

@@ -330,3 +361,3 @@

 family = address_type(address)
-s = socket.socket( getattr(socket, family) )
+s = socket.socket(getattr(socket, family))

@@ -412,3 +443,5 @@ endtime = time.time() + 10

-return _processing.PipeConnection(h)
+conn = _processing.PipeConnection(h)
+_processing.CloseHandle(h)
+return conn

@@ -490,2 +523,4 @@ #

 w = _processing.SocketConnection(w)
+os.close(r)
+os.close(w)


@@ -7,3 +7,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<meta name="generator" content="Docutils 0.4: http://docutils.sourceforge.net/" />
-<title>Connection module</title>
+<title>Listeners and Clients</title>
<link rel="stylesheet" href="html4css1.css" type="text/css" />

@@ -13,13 +13,13 @@ </head>

<div class="header">
-<a class="reference" href="pool-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="programming-guidelines.html">Next</a>
+<a class="reference" href="sharedctypes.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="programming-guidelines.html">Next</a>
<hr class="header"/>
</div>
-<div class="document" id="connection-module">
-<h1 class="title">Connection module</h1>
-<p>The <tt class="docutils literal"><span class="pre">connection</span></tt> module allows the sending of picklable objects
-between processes using sockets or (on Windows) named pipes. It
-also has support for <em>digest authentication</em> (using the <tt class="docutils literal"><span class="pre">hmac</span></tt> module
-from the standard library).</p>
-<p>If the C extension <tt class="docutils literal"><span class="pre">_processing</span></tt> is not available then connections
-will be slower and on Windows one will not be able to use named pipes.</p>
+<div class="document" id="listeners-and-clients">
+<h1 class="title">Listeners and Clients</h1>
+<p>Usually message passing between processes is done using queues or by
+using connection objects returned by <tt class="docutils literal"><span class="pre">Pipe()</span></tt>.</p>
+<p>However, the <tt class="docutils literal"><span class="pre">processing.connection</span></tt> module allows some extra
+flexibility. It basically gives a high level API for dealing with
+sockets or Windows named pipes, and also has support for <em>digest
+authentication</em> using the <tt class="docutils literal"><span class="pre">hmac</span></tt> module from the standard library.</p>
<div class="section">

@@ -35,7 +35,7 @@ <h1><a id="classes-and-functions" name="classes-and-functions">Classes and functions</a></h1>

<dd><p class="first">Attempts to set up a connection to the listener which is using
-address <tt class="docutils literal"><span class="pre">address</span></tt>.</p>
+address <tt class="docutils literal"><span class="pre">address</span></tt>, returning a <a class="reference" href="connection-object.html">connection object</a>.</p>
 <p>The type of the connection is determined by <tt class="docutils literal"><span class="pre">family</span></tt>
 argument, but this can generally be omitted since it can
 usually be inferred from the format of <tt class="docutils literal"><span class="pre">address</span></tt>.</p>
-<p>If <tt class="docutils literal"><span class="pre">authentication</span></tt> or <tt class="docutils literal"><span class="pre">authkey</span></tt> is a string then then digest
+<p class="last">If <tt class="docutils literal"><span class="pre">authentication</span></tt> or <tt class="docutils literal"><span class="pre">authkey</span></tt> is a string then digest
authentication is used. The key used for authentication will

@@ -45,26 +45,3 @@ be either <tt class="docutils literal"><span class="pre">authkey</span></tt> or <tt class="docutils literal"><span class="pre">currentProcess.getAuthKey()</span></tt> if

<tt class="docutils literal"><span class="pre">AuthenticationError</span></tt> is raised. See <a class="reference" href="#authentication-keys">Authentication keys</a>.</p>
<p class="last">A <tt class="docutils literal"><span class="pre">Connection</span></tt> object is returned. (See <a class="reference" href="#connection-objects">Connection
objects</a>.)</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">Pipe()</span></tt></dt>
<dd><p class="first">Returns a pair of connection objects (see <a class="reference" href="#connection-objects">Connection
objects</a>) representing the ends of a duplex connection.
It is also available directly from <tt class="docutils literal"><span class="pre">processing</span></tt>.</p>
<p>For example:</p>
<pre class="literal-block">
from processing import Process, Pipe

def foo(conn):
    conn.send(42)

if __name__ == '__main__':
    c, d = Pipe()
    p = Process(target=foo, args=[d])
    p.start()
    print c.recv()       # prints 42
    p.join()
</pre>
<p class="last">Note that at most one thread/process should be sending or
receiving from a connection object at a given time.</p>
</dd>
</dl>

@@ -152,3 +129,3 @@ </blockquote>

then <tt class="docutils literal"><span class="pre">AuthenticationError</span></tt> is raised.</p>
-<p class="last">Returns a <tt class="docutils literal"><span class="pre">Connection</span></tt> object. See <a class="reference" href="#connection-objects">Connection objects</a>.</p>
+<p class="last">Returns a <a class="reference" href="connection-object.html">connection object</a>.</p>
</dd>

@@ -175,61 +152,2 @@ <dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt>

<div class="section">
<h1><a id="connection-objects" name="connection-objects">Connection objects</a></h1>
<p>A connection object represents one end of a message oriented socket or
pipe connection. Connection objects have the following methods:</p>
<blockquote>
<dl class="docutils">
<dt><tt class="docutils literal"><span class="pre">send(obj)</span></tt></dt>
<dd><p class="first">Send an object to the other end of the connection which should
be read using <tt class="docutils literal"><span class="pre">recv()</span></tt>.</p>
<p class="last">The object must be picklable.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">recv()</span></tt></dt>
<dd>Return an object sent from the other end of the connection
using <tt class="docutils literal"><span class="pre">send()</span></tt>.</dd>
<dt><tt class="docutils literal"><span class="pre">fileno()</span></tt></dt>
<dd>Returns the file descriptor or handle used by the connection.</dd>
<dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt>
<dd><p class="first">Close the connection.</p>
<p class="last">This is called automatically when the connection is garbage
collected.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">poll(timeout)</span></tt></dt>
<dd>Return whether there is any data available to be read.</dd>
<dt><tt class="docutils literal"><span class="pre">sendbytes(buffer)</span></tt></dt>
<dd><p class="first">Send byte data from an object supporting the buffer interface
as a complete message.</p>
<p class="last">Can be used to send strings.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">recvbytes()</span></tt></dt>
<dd>Return a complete message of byte data sent from the other end
of the connection as a string.</dd>
<dt><tt class="docutils literal"><span class="pre">recvbytes_into(buffer)</span></tt></dt>
<dd><p class="first">Read into buffer a complete message of byte data sent from the
other end of the connection and return the number of bytes in
the message.</p>
<p class="last">If the buffer is too short then a <tt class="docutils literal"><span class="pre">BufferTooShort</span></tt> exception
is raised and the complete message of bytes data is available
as <tt class="docutils literal"><span class="pre">e.args[0]</span></tt> where <tt class="docutils literal"><span class="pre">e</span></tt> is the exception instance.</p>
</dd>
</dl>
</blockquote>
<div class="warning">
<p class="first admonition-title">Warning</p>
<p class="last">The <tt class="docutils literal"><span class="pre">recv()</span></tt> method automatically unpickles the data it
receives which can be a security risk. Therefore if you are using
the <tt class="docutils literal"><span class="pre">recv()</span></tt> and <tt class="docutils literal"><span class="pre">send()</span></tt> methods you should be using some
form of authentication. See <a class="reference" href="#authentication-keys">Authentication keys</a>.</p>
</div>
</div>
<div class="section">
<h1><a id="transferring-connection-objects-between-processes" name="transferring-connection-objects-between-processes">Transferring connection objects between processes</a></h1>
<p>If the C extension <tt class="docutils literal"><span class="pre">processing._process</span></tt> is available and contains
support then socket objects, connection objects and file objects can
be successfully pickled in one process and unpickled in another.</p>
<p>Note however that on Windows there is no <tt class="docutils literal"><span class="pre">socket.fromfd()</span></tt> function.
As a result on Windows an unpickled socket object is not a true socket
object: only the <tt class="docutils literal"><span class="pre">recv()</span></tt>, <tt class="docutils literal"><span class="pre">send()</span></tt>, <tt class="docutils literal"><span class="pre">sendall()</span></tt>, <tt class="docutils literal"><span class="pre">close()</span></tt> and
<tt class="docutils literal"><span class="pre">fileno()</span></tt> methods will work.</p>
</div>
<div class="section">
<h1><a id="address-formats" name="address-formats">Address formats</a></h1>

@@ -322,5 +240,5 @@ <ul>

<hr class="footer" />
-<a class="reference" href="pool-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="programming-guidelines.html">Next</a>
+<a class="reference" href="sharedctypes.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="programming-guidelines.html">Next</a>
</div>
</body>
</html>
.. include:: header.txt
-===================
- Connection module
-===================
+=======================
+ Listeners and Clients
+=======================
-The `connection` module allows the sending of picklable objects
-between processes using sockets or (on Windows) named pipes. It
-also has support for *digest authentication* (using the `hmac` module
-from the standard library).
+Usually message passing between processes is done using queues or by
+using connection objects returned by `Pipe()`.
-If the C extension `_processing` is not available then connections
-will be slower and on Windows one will not be able to use named pipes.
+However, the `processing.connection` module allows some extra
+flexibility. It basically gives a high level API for dealing with
+sockets or Windows named pipes, and also has support for *digest
+authentication* using the `hmac` module from the standard library.
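A minimal sketch of that high level API, shown with the stdlib descendant `multiprocessing.connection` (the `processing` package later became `multiprocessing`); passing an `authkey` enables the hmac-based digest authentication mentioned above:

```python
import threading
from multiprocessing.connection import Listener, Client

# bind an ephemeral port; listener.address reports the actual one
listener = Listener(('localhost', 0), authkey=b'secret key')

def serve():
    conn = listener.accept()        # completes the digest challenge
    conn.send('hello from the listener')
    conn.close()

t = threading.Thread(target=serve)
t.start()

client = Client(listener.address, authkey=b'secret key')
reply = client.recv()
client.close()
t.join()
listener.close()
print(reply)
```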

@@ -27,3 +27,4 @@

Attempts to set up a connection to the listener which is using
-address `address`.
+address `address`, returning a `connection object
+<connection-object.html>`_.

@@ -34,3 +35,3 @@ The type of the connection is determined by `family`

-If `authentication` or `authkey` is a string then then digest
+If `authentication` or `authkey` is a string then digest
authentication is used. The key used for authentication will

@@ -41,27 +42,2 @@ be either `authkey` or `currentProcess.getAuthKey()` if

A `Connection` object is returned. (See `Connection
objects`_.)

`Pipe()`
    Returns a pair of connection objects (see `Connection
    objects`_) representing the ends of a duplex connection.
    It is also available directly from `processing`.

    For example::

        from processing import Process, Pipe

        def foo(conn):
            conn.send(42)

        if __name__ == '__main__':
            c, d = Pipe()
            p = Process(target=foo, args=[d])
            p.start()
            print c.recv()       # prints 42
            p.join()

    Note that at most one thread/process should be sending or
    receiving from a connection object at a given time.
..

@@ -158,3 +134,3 @@ `deliver_challenge(connection, authkey)`

-Returns a `Connection` object. See `Connection objects`_.
+Returns a `connection object <connection-object.html>`_.

@@ -179,71 +155,2 @@ `close()`

Connection objects
==================

A connection object represents one end of a message oriented socket or
pipe connection. Connection objects have the following methods:

`send(obj)`
    Send an object to the other end of the connection which should
    be read using `recv()`.

    The object must be picklable.

`recv()`
    Return an object sent from the other end of the connection
    using `send()`.

`fileno()`
    Returns the file descriptor or handle used by the connection.

`close()`
    Close the connection.

    This is called automatically when the connection is garbage
    collected.

`poll(timeout)`
    Return whether there is any data available to be read.

`sendbytes(buffer)`
    Send byte data from an object supporting the buffer interface
    as a complete message.

    Can be used to send strings.

`recvbytes()`
    Return a complete message of byte data sent from the other end
    of the connection as a string.

`recvbytes_into(buffer)`
    Read into buffer a complete message of byte data sent from the
    other end of the connection and return the number of bytes in
    the message.

    If the buffer is too short then a `BufferTooShort` exception
    is raised and the complete message of bytes data is available
    as `e.args[0]` where `e` is the exception instance.

.. warning::

    The `recv()` method automatically unpickles the data it
    receives which can be a security risk. Therefore if you are using
    the `recv()` and `send()` methods you should be using some
    form of authentication. See `Authentication keys`_.

Transferring connection objects between processes
=================================================

If the C extension `processing._process` is available and contains
support then socket objects, connection objects and file objects can
be successfully pickled in one process and unpickled in another.

Note however that on Windows there is no `socket.fromfd()` function.
As a result on Windows an unpickled socket object is not a true socket
object: only the `recv()`, `send()`, `sendall()`, `close()` and
`fileno()` methods will work.
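The methods listed above survive almost unchanged in the stdlib descendant `multiprocessing`, where `sendbytes`/`recvbytes` were renamed `send_bytes`/`recv_bytes`; a quick sketch:

```python
from multiprocessing import Pipe

a, b = Pipe()                       # duplex by default
a.send(['a', 'picklable', 'object'])
assert b.poll(1.0)                  # data is waiting to be read
obj = b.recv()

a.send_bytes(b'raw message')        # a complete message of byte data
raw = b.recv_bytes()

a.close()
b.close()
```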
Address formats

@@ -340,5 +247,5 @@ ===============

-.. _Prev: pool-objects.html
+.. _Prev: sharedctypes.html
.. _Up: processing-ref.html
.. _Next: programming-guidelines.html

@@ -7,3 +7,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<meta name="generator" content="Docutils 0.4: http://docutils.sourceforge.net/" />
-<title>Documentation for processing</title>
+<title>Documentation for processing-0.39</title>
<meta name="author" content="R Oudkerk" />

@@ -17,4 +17,4 @@ <link rel="stylesheet" href="html4css1.css" type="text/css" />

</div>
-<div class="document" id="documentation-for-processing">
-<h1 class="title">Documentation for processing</h1>
+<div class="document" id="documentation-for-processing-version">
+<h1 class="title">Documentation for processing-0.39</h1>
<table class="docinfo" frame="void" rules="none">

@@ -43,6 +43,9 @@ <col class="docinfo-name" />

 <li><a class="reference" href="process-objects.html">Process objects</a></li>
 <li><a class="reference" href="queue-objects.html">Queue objects</a></li>
+<li><a class="reference" href="connection-objects.html">Connection objects</a></li>
 <li><a class="reference" href="manager-objects.html">Manager objects</a></li>
 <li><a class="reference" href="proxy-objects.html">Proxy objects</a></li>
 <li><a class="reference" href="pool-objects.html">Pool objects</a></li>
-<li><a class="reference" href="connection-ref.html">connection module</a></li>
+<li><a class="reference" href="sharedctypes.html">Shared ctypes object</a></li>
+<li><a class="reference" href="connection-ref.html">Listeners and Clients</a></li>
 </ul>


.. include:: header.txt
+.. include:: version.txt
-==============================
- Documentation for processing
-==============================
+========================================
+ Documentation for processing-|version|
+========================================

@@ -19,6 +20,9 @@ :Author: R Oudkerk

 + `Process objects <process-objects.html>`_
 + `Queue objects <queue-objects.html>`_
++ `Connection objects <connection-objects.html>`_
 + `Manager objects <manager-objects.html>`_
 + `Proxy objects <proxy-objects.html>`_
 + `Pool objects <pool-objects.html>`_
-+ `connection module <connection-ref.html>`_
++ `Shared ctypes object <sharedctypes.html>`_
++ `Listeners and Clients <connection-ref.html>`_


@@ -114,12 +114,8 @@ <?xml version="1.0" encoding="utf-8" ?>

</pre>
-<p>Queues are thread and process safe.</p>
-<p class="last">Note that if you need to guarantee that the <tt class="docutils literal"><span class="pre">put()</span></tt> method
-will succeed without blocking -- which is sometimes very
-useful when ensuring that a deadlock cannot occur -- then you
-can use <tt class="docutils literal"><span class="pre">BufferedQueue()</span></tt> instead of <tt class="docutils literal"><span class="pre">Queue()</span></tt>.
-See <a class="reference" href="processing-ref.html#pipes-and-queues">Queues</a>.</p>
+<p class="last">Queues are thread and process safe. See <a class="reference" href="processing-ref.html#pipes-and-queues">Queues</a>.</p>
 </dd>
 <dt><strong>Pipes</strong>:</dt>
 <dd><p class="first">The <tt class="docutils literal"><span class="pre">Pipe()</span></tt> function returns a pair of connection objects
-connected by a duplex (two-way) pipe, for example</p>
+connected by a pipe which by default is duplex (two-way). For
+example</p>
<pre class="literal-block">

@@ -213,3 +209,3 @@ from processing import Process, Pipe

</pre>
-<p class="last">Note that <tt class="docutils literal"><span class="pre">SharedValue</span></tt>, <tt class="docutils literal"><span class="pre">SharedStruct</span></tt> and <tt class="docutils literal"><span class="pre">SharedArray</span></tt> objects
+<p>Note that <tt class="docutils literal"><span class="pre">SharedValue</span></tt>, <tt class="docutils literal"><span class="pre">SharedStruct</span></tt> and <tt class="docutils literal"><span class="pre">SharedArray</span></tt> objects
are thread and process safe. The <tt class="docutils literal"><span class="pre">'i'</span></tt> and <tt class="docutils literal"><span class="pre">'d256p'</span></tt> arguments

@@ -219,3 +215,8 @@ used when creating <tt class="docutils literal"><span class="pre">num</span></tt>, <tt class="docutils literal"><span class="pre">struct</span></tt> and <tt class="docutils literal"><span class="pre">array</span></tt> are format strings

indicates a signed integer, <tt class="docutils literal"><span class="pre">'d'</span></tt> indicates a double precision
-float and <tt class="docutils literal"><span class="pre">'256p'</span></tt> indicates a string of length less than 256.</p>
+float and <tt class="docutils literal"><span class="pre">'256p'</span></tt> indicates a string of length less than 256. See
+<a class="reference" href="manager-objects.html#shared-memory-managers">Shared memory managers</a>.</p>
+<p class="last">A faster alternative to <tt class="docutils literal"><span class="pre">LocalManager</span></tt> is the <tt class="docutils literal"><span class="pre">processing.sharedctypes</span></tt>
+module which supports the creation of <a class="reference" href="sharedctypes.html">ctypes objects allocated from
+shared memory</a>. However, ctypes objects are not
+&quot;process safe&quot;, so one must synchronize access to them.</p>
</dd>

@@ -265,3 +266,4 @@ <dt><strong>Server process</strong>:</dt>

can be shared by different computers over a network. They are,
-however, slower than using shared memory.</p>
+however, slower than using shared memory. See <a class="reference" href="manager-objects.html#server-process-managers">Server process
+managers</a>.</p>
</dd>

@@ -429,9 +431,13 @@ </dl>

<tr><td>list</td>
-<td>6,000,000</td>
-<td>4,800,000</td>
+<td>6,400,000</td>
+<td>5,100,000</td>
 </tr>
-<tr><td>shared memory array</td>
-<td>120,000</td>
-<td>110,000</td>
+<tr><td>ctypes shared array</td>
+<td>3,600,000</td>
+<td>3,100,000</td>
+</tr>
+<tr><td>LocalManager shared array</td>
+<td>125,000</td>
+<td>135,000</td>
 </tr>
<tr><td>list managed by server</td>


@@ -119,13 +119,9 @@ .. include:: header.txt

-    Queues are thread and process safe.
+    Queues are thread and process safe. See `Queues
+    <processing-ref.html#pipes-and-queues>`_.
-    Note that if you need to guarantee that the `put()` method
-    will succeed without blocking -- which is sometimes very
-    useful when ensuring that a deadlock cannot occur -- then you
-    can use `BufferedQueue()` instead of `Queue()`.
-    See `Queues <processing-ref.html#pipes-and-queues>`_.
 **Pipes**:
     The `Pipe()` function returns a pair of connection objects
-    connected by a duplex (two-way) pipe, for example ::
+    connected by a pipe which by default is duplex (two-way). For
+    example ::

@@ -228,4 +224,10 @@ from processing import Process, Pipe

indicates a signed integer, `'d'` indicates a double precision
float and `'256p'` indicates a string of length less than 256.
float and `'256p'` indicates a string of length less than 256. See
`Shared memory managers <manager-objects.html#shared-memory-managers>`_.
A faster alternative to `LocalManager` is the `processing.sharedctypes`
module which supports the creation of `ctypes objects allocated from
shared memory <sharedctypes.html>`_. However, ctypes objects are not
"process safe", so one must synchronize access to them.
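As a sketch of the synchronization requirement described above — assuming the modern equivalent API, since the `processing` package later became the standard library's `multiprocessing` module and `processing.sharedctypes` became `multiprocessing.sharedctypes` — an unguarded `+=` on a ctypes object in shared memory is not atomic, so a lock must guard each update:

```python
# Sketch using multiprocessing.sharedctypes, the stdlib descendant of
# processing.sharedctypes; ctypes objects in shared memory are not
# "process safe", so a Lock guards every update.
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value

def add_many(counter, lock, n):
    for _ in range(n):
        with lock:              # += on a shared value is read-then-write, not atomic
            counter.value += 1

def demo():
    lock = Lock()
    counter = Value('i', 0)     # 'i': signed integer, as in the format strings above
    workers = [Process(target=add_many, args=(counter, lock, 1000))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return counter.value

if __name__ == '__main__':
    print(demo())  # 4000
```

With the lock held around each increment the result is deterministic; without it, concurrent read-modify-write cycles can lose updates.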
**Server process**:

@@ -277,3 +279,4 @@ A manager object returned by `processing.Manager()`

can be shared by different computers over a network. They are,
however, slower than using shared memory.
however, slower than using shared memory. See `Server process
managers <manager-objects.html#server-process-managers>`_.

@@ -364,4 +367,5 @@

============================== ========== ==========
list 6,000,000 4,800,000
shared memory array 120,000 110,000
list 6,400,000 5,100,000
ctypes shared array 3,600,000 3,100,000
LocalManager shared array 125,000 135,000
list managed by server 20,000 17,000

@@ -368,0 +372,0 @@ ============================== ========== ==========

@@ -12,3 +12,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<div class="header">
<a class="reference" href="process-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="proxy-objects.html">Next</a>
<a class="reference" href="connection-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="proxy-objects.html">Next</a>
<hr class="header"/>

@@ -384,5 +384,5 @@ </div>

<hr class="footer" />
<a class="reference" href="process-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="proxy-objects.html">Next</a>
<a class="reference" href="connection-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="proxy-objects.html">Next</a>
</div>
</body>
</html>

@@ -408,5 +408,5 @@ .. include:: header.txt

.. _Prev: process-objects.html
.. _Prev: connection-objects.html
.. _Up: processing-ref.html
.. _Next: proxy-objects.html

@@ -12,3 +12,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<div class="header">
<a class="reference" href="proxy-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="connection-ref.html">Next</a>
<a class="reference" href="proxy-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="sharedctypes.html">Next</a>
<hr class="header"/>

@@ -73,4 +73,4 @@ </div>

<p class="last">Also if <tt class="docutils literal"><span class="pre">chunksize</span></tt> is <tt class="docutils literal"><span class="pre">1</span></tt> then the <tt class="docutils literal"><span class="pre">next()</span></tt> method of the
iterator returned by the <tt class="docutils literal"><span class="pre">imap()</span></tt> method has a <tt class="docutils literal"><span class="pre">timeout</span></tt>
parameter: <tt class="docutils literal"><span class="pre">next(timeout)</span></tt> will raise
iterator returned by the <tt class="docutils literal"><span class="pre">imap()</span></tt> method has an optional
<tt class="docutils literal"><span class="pre">timeout</span></tt> parameter: <tt class="docutils literal"><span class="pre">next(timeout)</span></tt> will raise
<tt class="docutils literal"><span class="pre">processing.TimeoutError</span></tt> if the result cannot be returned

@@ -152,5 +152,5 @@ within <tt class="docutils literal"><span class="pre">timeout</span></tt> seconds.</p>

<hr class="footer" />
<a class="reference" href="proxy-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="connection-ref.html">Next</a>
<a class="reference" href="proxy-objects.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="sharedctypes.html">Next</a>
</div>
</body>
</html>

@@ -152,2 +152,2 @@ .. include:: header.txt

.. _Up: processing-ref.html
.. _Next: connection-ref.html
.. _Next: sharedctypes.html

@@ -12,3 +12,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<div class="header">
<a class="reference" href="processing-ref.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="manager-objects.html">Next</a>
<a class="reference" href="processing-ref.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="queue-objects.html">Next</a>
<hr class="header"/>

@@ -193,5 +193,5 @@ </div>

<hr class="footer" />
<a class="reference" href="processing-ref.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="manager-objects.html">Next</a>
<a class="reference" href="processing-ref.html">Prev</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="processing-ref.html">Up</a> &nbsp; &nbsp; &nbsp; &nbsp; <a class="reference" href="queue-objects.html">Next</a>
</div>
</body>
</html>

@@ -207,2 +207,2 @@ .. include:: header.txt

.. _Up: processing-ref.html
.. _Next: manager-objects.html
.. _Next: queue-objects.html

@@ -32,3 +32,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<dd><p class="first">Exception raised by the <tt class="docutils literal"><span class="pre">recvbytes_into()</span></tt> method of a
<a class="reference" href="connection-ref.html#connection-objects">connection object</a>
<a class="reference" href="connection-objects.html">connection object</a>
when the supplied buffer object is too small for the message

@@ -55,112 +55,26 @@ read.</p>

<dl class="docutils">
<dt><tt class="docutils literal"><span class="pre">Pipe()</span></tt></dt>
<dd><p class="first">Returns a pair of connection objects representing the ends of
a duplex pipe.</p>
<p>These connection objects can be inherited by child processes
and have methods <tt class="docutils literal"><span class="pre">send()</span></tt> and <tt class="docutils literal"><span class="pre">recv()</span></tt> (among others) for
sending and receiving picklable objects. (See <a class="reference" href="connection-ref.html#connection-objects">Connection
objects</a>.) For
example:</p>
<pre class="literal-block">
&gt;&gt;&gt; from processing import Pipe
&gt;&gt;&gt; a, b = Pipe()
&gt;&gt;&gt; a.send([1, 'hello', None])
&gt;&gt;&gt; b.recv()
[1, 'hello', None]
&gt;&gt;&gt; b.sendbytes('thank you')
&gt;&gt;&gt; a.recvbytes()
'thank you'
</pre>
<p>Note that it is not safe to have more than one process (or
thread) reading or writing to the same end of a pipe at the
same time.</p>
<p>Also note that if a process is killed while it is trying to
read or write to a pipe then the data in the pipe is likely to
become corrupted (because it may become impossible to be sure
where the message boundaries lie).</p>
<p class="last">On Windows this requires the <tt class="docutils literal"><span class="pre">_processing</span></tt> extension.</p>
<dt><tt class="docutils literal"><span class="pre">Pipe(duplex=True)</span></tt></dt>
<dd><p class="first">Returns a pair <tt class="docutils literal"><span class="pre">(conn1,</span> <span class="pre">conn2)</span></tt> of connection objects
representing the ends of a pipe.</p>
<p>If <tt class="docutils literal"><span class="pre">duplex</span></tt> is true then the pipe is two way; otherwise
<tt class="docutils literal"><span class="pre">conn1</span></tt> can only be used for receiving messages and <tt class="docutils literal"><span class="pre">conn2</span></tt>
can only be used for sending messages.</p>
<p class="last">See <a class="reference" href="connection-objects.html">Connection objects</a>.</p>
</dd>
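The `duplex` behaviour described above can be sketched with `multiprocessing`, the standard-library successor of the `processing` package, whose `Pipe()` keeps the same signature:

```python
# Sketch using multiprocessing, the stdlib successor of processing;
# Pipe(duplex=...) behaves as described above.
from multiprocessing import Pipe

# Default duplex pipe: both ends can send and receive.
a, b = Pipe()
a.send([1, 'hello', None])
msg = b.recv()
print(msg)  # [1, 'hello', None]

# duplex=False: the first connection is receive-only, the second send-only.
recv_end, send_end = Pipe(duplex=False)
send_end.send('one-way')
one_way = recv_end.recv()
print(one_way)  # one-way
```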
<dt><tt class="docutils literal"><span class="pre">Queue(maxsize=0)</span></tt></dt>
<dd><p class="first">Returns a process shared queue implemented using a pipe and a
few locks/semaphores. A background thread transfers objects
from a buffer into the pipe.</p>
<p>It is a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> except that the <tt class="docutils literal"><span class="pre">qsize()</span></tt>
method is not implemented and that the <tt class="docutils literal"><span class="pre">task_done()</span></tt> and
<tt class="docutils literal"><span class="pre">join()</span></tt> methods introduced in Python 2.5 are also missing.</p>
<p><tt class="docutils literal"><span class="pre">Queue</span></tt> has a few additional methods:</p>
<blockquote>
<dl class="docutils">
<dt><tt class="docutils literal"><span class="pre">putmany(iterable)</span></tt></dt>
<dd>If the queue has infinite size then this adds all
items in the iterable to the queue's buffer. So
<tt class="docutils literal"><span class="pre">q.putmany(X)</span></tt> is a faster alternative to <tt class="docutils literal"><span class="pre">for</span> <span class="pre">x</span> <span class="pre">in</span> <span class="pre">X:</span>
<span class="pre">q.put(x)</span></tt>. Raises an error if the queue has finite
size.</dd>
<dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt>
<dd>Indicates that no more data will be put on this queue
by the current process. The background thread will
quit once it has flushed all its data. This is called
automatically when the queue is garbage collected.</dd>
<dt><tt class="docutils literal"><span class="pre">jointhread()</span></tt></dt>
<dd><p class="first">This joins the background thread and can only be used
after <tt class="docutils literal"><span class="pre">close()</span></tt> has been called. This blocks until
the background thread exits, ensuring that all data in
the buffer has been flushed to the pipe.</p>
<p class="last">By default if a process is not the creator of the
queue then on exit it will attempt to join the queue's
background thread. The process can call
<tt class="docutils literal"><span class="pre">canceljoin()</span></tt> to prevent this behaviour.</p>
<dt><tt class="docutils literal"><span class="pre">Queue(maxsize=0)</span></tt>, <tt class="docutils literal"><span class="pre">SimpleQueue()</span></tt>, <tt class="docutils literal"><span class="pre">PosixQueue(maxsize=0)</span></tt></dt>
<dd><p class="first">These functions return a process shared queue implemented in
different ways. The usual <tt class="docutils literal"><span class="pre">Empty</span></tt> and <tt class="docutils literal"><span class="pre">Full</span></tt> exceptions from
the <tt class="docutils literal"><span class="pre">Queue</span></tt> module are raised to signal timeouts.</p>
<p class="last">See <a class="reference" href="queue-objects.html">Queue objects</a>.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">canceljoin()</span></tt></dt>
<dd>Prevents the background thread from being joined
automatically when the process exits. Unnecessary if
the current process created the queue.</dd>
</dl>
</blockquote>
<p class="last">Note that if a process is killed while it is trying to receive
from or send to a queue then the data in the queue is likely to
become corrupted (because it may become impossible to be sure
where the message boundaries lie).</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">SimpleQueue()</span></tt></dt>
<dd><p class="first">A simplified and faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt>. It is
really just a pipe protected by a couple of locks.</p>
<p>It has <tt class="docutils literal"><span class="pre">get()</span></tt> and <tt class="docutils literal"><span class="pre">put()</span></tt> methods but these do not have
<tt class="docutils literal"><span class="pre">block</span></tt> or <tt class="docutils literal"><span class="pre">timeout</span></tt> arguments. It also has an <tt class="docutils literal"><span class="pre">empty()</span></tt>
method.</p>
<p class="last"><strong>Warning</strong>: Unlike with <tt class="docutils literal"><span class="pre">Queue</span></tt> (when <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero)
using <tt class="docutils literal"><span class="pre">put()</span></tt> may block if the pipe's buffer does not have
sufficient space, so one must take care that no deadlocks are
possible.</p>
</dd>
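The distinction between `Queue` and `SimpleQueue` can be sketched with the `multiprocessing` equivalents (the stdlib successor of `processing`; `PosixQueue` has no stdlib equivalent, so it is omitted). A bounded `Queue` raises the usual `Full` exception on timeout, while `SimpleQueue` is just a pipe plus locks, with no `maxsize` and no `block`/`timeout` arguments:

```python
# Sketch of the Queue/SimpleQueue distinction using multiprocessing,
# the stdlib successor of the processing package.
from multiprocessing import Queue, SimpleQueue
import queue  # source of the Empty/Full exceptions mentioned above

q = Queue(maxsize=1)
q.put('first')
overflowed = False
try:
    q.put('second', timeout=0.1)   # queue is full: raises queue.Full
except queue.Full:
    overflowed = True

sq = SimpleQueue()                 # just a pipe protected by locks
sq.put('hello')                    # put()/get() take no timeout arguments
item = sq.get()
print(item)  # hello
```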
<dt><tt class="docutils literal"><span class="pre">PosixQueue(maxsize=0,</span> <span class="pre">msgsize=0)</span></tt></dt>
<dd><p class="first">A faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt> which is available on Unix
systems which support Posix message queues.</p>
<p>However, posix queues have a maximum number of messages that
can occupy the queue at a given time, and each message (when
expressed as a pickled string) has a maximum length --- see
<tt class="docutils literal"><span class="pre">man</span> <span class="pre">7</span> <span class="pre">mq_overview</span></tt>.</p>
<p><tt class="docutils literal"><span class="pre">maxsize</span></tt> if specified determines the maximum number of items
that can be in the queue. Note that unlike Python's normal queue
type if this is greater than a system defined maximum then an
error is raised. If <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero then this maximum value
is used.</p>
<p><tt class="docutils literal"><span class="pre">msgsize</span></tt> if specified determines the maximum length that each
message can be (when expressed as a pickled string). If this
is greater than a system defined maximum then an error is
raised. If <tt class="docutils literal"><span class="pre">msgsize</span></tt> is zero then this maximum value is used.</p>
<p class="last">If one tries to send a message which is too long then
<tt class="docutils literal"><span class="pre">ValueError</span></tt> will be raised.</p>
</dd>
</dl>
</blockquote>
</div>
<div class="section">
<h1><a id="synchronization-primitives" name="synchronization-primitives">Synchronization primitives</a></h1>
<p>Generally synchronization primitives are not a necessary in a
multiprocess program as they are in a mulithreaded program.</p>
<p>Generally synchronization primitives are not as necessary in a
multiprocess program as they are in a multithreaded program. See the
documentation for the standard library's <tt class="docutils literal"><span class="pre">threading</span></tt> module.</p>
<p>Note that one can also create synchronization primitives by using a
manager object -- see <a class="reference" href="#managers">Managers</a>.</p>
<p>The following all require support for native semaphores from the
<tt class="docutils literal"><span class="pre">_processing</span></tt> extension.</p>
<blockquote>

@@ -204,10 +118,4 @@ <dl class="docutils">

<tt class="docutils literal"><span class="pre">SharedValue</span></tt>, <tt class="docutils literal"><span class="pre">SharedStruct</span></tt>, <tt class="docutils literal"><span class="pre">SharedArray</span></tt></blockquote>
<p>for creating objects stored in shared memory map. Also has
static methods</p>
<blockquote>
<tt class="docutils literal"><span class="pre">Lock</span></tt>, <tt class="docutils literal"><span class="pre">RLock</span></tt>, <tt class="docutils literal"><span class="pre">Semaphore</span></tt>, <tt class="docutils literal"><span class="pre">BoundedSemaphore</span></tt>,
<tt class="docutils literal"><span class="pre">Condition</span></tt>, <tt class="docutils literal"><span class="pre">Event</span></tt>, <tt class="docutils literal"><span class="pre">Queue</span></tt></blockquote>
<p>which are just aliases for other functions in the <tt class="docutils literal"><span class="pre">processing</span></tt>
namespace. See <a class="reference" href="manager-objects.html#shared-memory-managers">LocalManager</a>.</p>
<p class="last">Requires support for native semaphores from <tt class="docutils literal"><span class="pre">_processing</span></tt>.</p>
<p>for creating objects stored in a shared memory map.</p>
<p class="last">See <a class="reference" href="manager-objects.html#shared-memory-managers">LocalManager</a>.</p>
</dd>

@@ -222,19 +130,7 @@ <dt><tt class="docutils literal"><span class="pre">Manager()</span></tt></dt>

<blockquote>
<tt class="docutils literal"><span class="pre">list()</span></tt>, <tt class="docutils literal"><span class="pre">dict()</span></tt>, <tt class="docutils literal"><span class="pre">Namespace()</span></tt>, <tt class="docutils literal"><span class="pre">SharedValue()</span></tt>,
<tt class="docutils literal"><span class="pre">SharedStruct()</span></tt>, <tt class="docutils literal"><span class="pre">SharedArray()</span></tt>, <tt class="docutils literal"><span class="pre">Lock()</span></tt>, <tt class="docutils literal"><span class="pre">RLock()</span></tt>,
<tt class="docutils literal"><span class="pre">Semaphore()</span></tt>, <tt class="docutils literal"><span class="pre">BoundedSemaphore()</span></tt>, <tt class="docutils literal"><span class="pre">Condition()</span></tt>,
<tt class="docutils literal"><span class="pre">Event()</span></tt>, <tt class="docutils literal"><span class="pre">Queue()</span></tt>.</blockquote>
<p>For example:</p>
<pre class="literal-block">
&gt;&gt;&gt; from processing import Manager
&gt;&gt;&gt; manager = Manager()
&gt;&gt;&gt; l = manager.list(range(10))
&gt;&gt;&gt; l.reverse()
&gt;&gt;&gt; print l
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
&gt;&gt;&gt; print repr(l)
&lt;Proxy[list] object at 0x00E1B3B0&gt;
</pre>
<p class="last">See <a class="reference" href="manager-objects.html#syncmanager">SyncManager</a> and
<a class="reference" href="proxy-objects.html">Proxy objects</a>.</p>
<tt class="docutils literal"><span class="pre">list()</span></tt>, <tt class="docutils literal"><span class="pre">dict()</span></tt>, <tt class="docutils literal"><span class="pre">Namespace()</span></tt>,
<tt class="docutils literal"><span class="pre">SharedValue()</span></tt>, <tt class="docutils literal"><span class="pre">SharedStruct()</span></tt>, <tt class="docutils literal"><span class="pre">SharedArray()</span></tt>,
<tt class="docutils literal"><span class="pre">Lock()</span></tt>, <tt class="docutils literal"><span class="pre">RLock()</span></tt>, <tt class="docutils literal"><span class="pre">Semaphore()</span></tt>, <tt class="docutils literal"><span class="pre">BoundedSemaphore()</span></tt>,
<tt class="docutils literal"><span class="pre">Condition()</span></tt>, <tt class="docutils literal"><span class="pre">Event()</span></tt>, <tt class="docutils literal"><span class="pre">Queue()</span></tt>.</blockquote>
<p class="last">See <a class="reference" href="manager-objects.html#sync-manager">SyncManager</a>.</p>
</dd>
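The inline `Manager()` example removed by this change can be re-sketched with `multiprocessing.Manager`, the standard-library successor of `processing.Manager`; method calls on the returned proxy are forwarded to the manager's server process:

```python
# Sketch of the removed Manager() example, using multiprocessing.Manager,
# the stdlib successor of processing.Manager.
from multiprocessing import Manager

def demo():
    with Manager() as manager:       # starts a server process
        l = manager.list(range(10))  # proxy: method calls go to the server
        l.reverse()
        return list(l)

if __name__ == '__main__':
    print(demo())  # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
```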

@@ -256,18 +152,4 @@ </dl>

<p>If <tt class="docutils literal"><span class="pre">processes</span></tt> is <tt class="docutils literal"><span class="pre">None</span></tt> then the number returned by
<tt class="docutils literal"><span class="pre">cpuCount()</span></tt> is used. See <a class="reference" href="pool-objects.html">Pool objects</a>.</p>
<p>Example:</p>
<pre class="literal-block">
from processing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=2)
result1 = pool.apply_async(f, (10,))
result2 = pool.map_async(f, range(5))
print result1.get() # =&gt; &quot;100&quot;
print result2.get(timeout=1) # =&gt; &quot;[0, 1, 4, 9, 16]&quot;
</pre>
<p class="last">Requires support for native semaphores from <tt class="docutils literal"><span class="pre">_processing</span></tt>.</p>
<tt class="docutils literal"><span class="pre">cpuCount()</span></tt> is used.</p>
<p class="last">See <a class="reference" href="pool-objects.html">Pool objects</a>.</p>
</dd>
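The `Pool` example removed by this change can be re-sketched with `multiprocessing.Pool`, the standard-library successor; `apply_async()` and `map_async()` keep the same shape:

```python
# The removed Pool example, re-sketched with multiprocessing.Pool,
# the stdlib successor of processing.Pool.
from multiprocessing import Pool

def f(x):
    return x * x

def demo():
    with Pool(processes=2) as pool:
        result1 = pool.apply_async(f, (10,))
        result2 = pool.map_async(f, range(5))
        return result1.get(), result2.get(timeout=10)

if __name__ == '__main__':
    print(demo())  # (100, [0, 1, 4, 9, 16])
```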

@@ -383,6 +265,9 @@ </dl>

<li><a class="reference" href="process-objects.html">Process objects</a></li>
<li><a class="reference" href="queue-objects.html">Queue objects</a></li>
<li><a class="reference" href="connection-objects.html">Connection objects</a></li>
<li><a class="reference" href="manager-objects.html">Manager objects</a></li>
<li><a class="reference" href="proxy-objects.html">Proxy objects</a></li>
<li><a class="reference" href="pool-objects.html">Pool objects</a></li>
<li><a class="reference" href="connection-ref.html">connection module</a></li>
<li><a class="reference" href="sharedctypes.html">Shared ctypes object</a></li>
<li><a class="reference" href="connection-ref.html">Listeners and Clients</a></li>
</ul>

@@ -389,0 +274,0 @@ </div>

@@ -25,3 +25,3 @@ .. include:: header.txt

Exception raised by the `recvbytes_into()` method of a
`connection object <connection-ref.html#connection-objects>`_
`connection object <connection-objects.html>`_
when the supplied buffer object is too small for the message

@@ -50,119 +50,26 @@ read.

`Pipe()`
Returns a pair of connection objects representing the ends of
a duplex pipe.
`Pipe(duplex=True)`
Returns a pair `(conn1, conn2)` of connection objects
representing the ends of a pipe.
These connection objects can be inherited by child processes
and have methods `send()` and `recv()` (among others) for
sending and receiving picklable objects. (See `Connection
objects <connection-ref.html#connection-objects>`_.) For
example::
If `duplex` is true then the pipe is two way; otherwise
`conn1` can only be used for receiving messages and `conn2`
can only be used for sending messages.
See `Connection objects <connection-objects.html>`_.
>>> from processing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.sendbytes('thank you')
>>> a.recvbytes()
'thank you'
`Queue(maxsize=0)`, `SimpleQueue()`, `PosixQueue(maxsize=0)`
These functions return a process shared queue implemented in
different ways. The usual `Empty` and `Full` exceptions from
the `Queue` module are raised to signal timeouts.
Note that it is not safe to have more than one process (or
thread) reading or writing to the same end of a pipe at the
same time.
See `Queue objects <queue-objects.html>`_.
Also note that if a process is killed while it is trying to
read or write to a pipe then the data in the pipe is likely to
become corrupted (because it may become impossible to be sure
where the message boundaries lie).
On Windows this requires the `_processing` extension.
`Queue(maxsize=0)`
Returns a process shared queue implemented using a pipe and a
few locks/semaphores. A background thread transfers objects
from a buffer into the pipe.
It is a near clone of `Queue.Queue` except that the `qsize()`
method is not implemented and that the `task_done()` and
`join()` methods introduced in Python 2.5 are also missing.
`Queue` has a few additional methods:
`putmany(iterable)`
If the queue has infinite size then this adds all
items in the iterable to the queue's buffer. So
`q.putmany(X)` is a faster alternative to `for x in X:
q.put(x)`. Raises an error if the queue has finite
size.
`close()`
Indicates that no more data will be put on this queue
by the current process. The background thread will
quit once it has flushed all its data. This is called
automatically when the queue is garbage collected.
`jointhread()`
This joins the background thread and can only be used
after `close()` has been called. This blocks until
the background thread exits, ensuring that all data in
the buffer has been flushed to the pipe.
By default if a process is not the creator of the
queue then on exit it will attempt to join the queue's
background thread. The process can call
`canceljoin()` to prevent this behaviour.
`canceljoin()`
Prevents the background thread from being joined
automatically when the process exits. Unnecessary if
the current process created the queue.
Note that if a process is killed while it is trying to receive
from or send to a queue then the data in the queue is likely to
become corrupted (because it may become impossible to be sure
where the message boundaries lie).
`SimpleQueue()`
A simplified and faster alternative to `Queue()`. It is
really just a pipe protected by a couple of locks.
It has `get()` and `put()` methods but these do not have
`block` or `timeout` arguments. It also has an `empty()`
method.
**Warning**: Unlike with `Queue` (when `maxsize` is zero)
using `put()` may block if the pipe's buffer does not have
sufficient space, so one must take care that no deadlocks are
possible.
`PosixQueue(maxsize=0, msgsize=0)`
A faster alternative to `Queue()` which is available on Unix
systems which support Posix message queues.
However, posix queues have a maximum number of messages that
can occupy the queue at a given time, and each message (when
expressed as a pickled string) has a maximum length --- see
`man 7 mq_overview`.
`maxsize` if specified determines the maximum number of items
that can be in the queue. Note that unlike Python's normal queue
type if this is greater than a system defined maximum then an
error is raised. If `maxsize` is zero then this maximum value
is used.
`msgsize` if specified determines the maximum length that each
message can be (when expressed as a pickled string). If this
is greater than a system defined maximum then an error is
raised. If `msgsize` is zero then this maximum value is used.
If one tries to send a message which is too long then
`ValueError` will be raised.
Synchronization primitives
--------------------------
Generally synchronization primitives are not a necessary in a
multiprocess program as they are in a mulithreaded program.
Generally synchronization primitives are not as necessary in a
multiprocess program as they are in a multithreaded program. See the
documentation for the standard library's `threading` module.
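As a sketch of how these primitives mirror the `threading` module's API while working across processes — using `multiprocessing`, the stdlib successor of `processing` — an `Event` can gate a child process until the parent releases it:

```python
# Sketch: the primitives mirror threading's API but synchronize across
# processes (multiprocessing shown, the stdlib successor of processing).
from multiprocessing import Process, Event, Queue

def worker(go, results):
    go.wait()                 # block until the parent sets the event
    results.put('started')

def demo():
    go = Event()
    results = Queue()
    p = Process(target=worker, args=(go, results))
    p.start()
    go.set()                  # release the waiting child
    msg = results.get(timeout=10)
    p.join()
    return msg

if __name__ == '__main__':
    print(demo())  # started
```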

@@ -172,5 +79,2 @@ Note that one can also create synchronization primitives by using a

The following all require support for native semaphores from the
`_processing` extension.
`BoundedSemaphore(value=1)`

@@ -217,13 +121,6 @@ Returns a bounded semaphore object: a clone of

for creating objects stored in shared memory map. Also has
static methods
`Lock`, `RLock`, `Semaphore`, `BoundedSemaphore`,
`Condition`, `Event`, `Queue`
for creating objects stored in a shared memory map.
which are just aliases for other functions in the `processing`
namespace. See `LocalManager
<manager-objects.html#shared-memory-managers>`_.
See `LocalManager <manager-objects.html#shared-memory-managers>`_.
Requires support for native semaphores from `_processing`.

@@ -239,22 +136,10 @@ `Manager()`

`list()`, `dict()`, `Namespace()`, `SharedValue()`,
`SharedStruct()`, `SharedArray()`, `Lock()`, `RLock()`,
`Semaphore()`, `BoundedSemaphore()`, `Condition()`,
`Event()`, `Queue()`.
`list()`, `dict()`, `Namespace()`,
`SharedValue()`, `SharedStruct()`, `SharedArray()`,
`Lock()`, `RLock()`, `Semaphore()`, `BoundedSemaphore()`,
`Condition()`, `Event()`, `Queue()`.
For example::
See `SyncManager <manager-objects.html#sync-manager>`_.
>>> from processing import Manager
>>> manager = Manager()
>>> l = manager.list(range(10))
>>> l.reverse()
>>> print l
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
>>> print repr(l)
<Proxy[list] object at 0x00E1B3B0>
See `SyncManager <manager-objects.html#syncmanager>`_ and
`Proxy objects`_.
Process Pools

@@ -274,20 +159,5 @@ -------------

If `processes` is `None` then the number returned by
`cpuCount()` is used. See `Pool objects
<pool-objects.html>`_.
`cpuCount()` is used.
Example::
from processing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=2)
result1 = pool.apply_async(f, (10,))
result2 = pool.map_async(f, range(5))
print result1.get() # => "100"
print result2.get(timeout=1) # => "[0, 1, 4, 9, 16]"
Requires support for native semaphores from `_processing`.
See `Pool objects <pool-objects.html>`_.

@@ -403,11 +273,13 @@

* `Process objects <process-objects.html>`_
* `Manager objects <manager-objects.html>`_
* `Proxy objects <proxy-objects.html>`_
* `Pool objects <pool-objects.html>`_
* `connection module <connection-ref.html>`_
+ `Process objects <process-objects.html>`_
+ `Queue objects <queue-objects.html>`_
+ `Connection objects <connection-objects.html>`_
+ `Manager objects <manager-objects.html>`_
+ `Proxy objects <proxy-objects.html>`_
+ `Pool objects <pool-objects.html>`_
+ `Shared ctypes object <sharedctypes.html>`_
+ `Listeners and Clients <connection-ref.html>`_
.. _Prev: intro.html
.. _Up: index.html
.. _Next: process-objects.html

@@ -45,7 +45,2 @@ <?xml version="1.0" encoding="utf-8" ?>

<tt class="docutils literal"><span class="pre">ProcessExit</span></tt> is a subclass of <tt class="docutils literal"><span class="pre">SystemExit</span></tt>.</dd>
<dt><em>Not letting the manager die early</em></dt>
<dd>When the main process terminates any manager process it has
created will also terminate. Make sure that no child process
still needs the manager when this happens --- the easiest way is
just to join all child processes you create.</dd>
<dt><em>Joining zombie processes</em></dt>

@@ -56,15 +51,46 @@ <dd>On Unix when a process finishes but has not been joined it becomes

processes which have not yet been joined will be joined. Also
calling a finished process's <tt class="docutils literal"><span class="pre">isAlive()</span></tt> will join the process
will already have been joined. Even so it is probably good
practice to explicitly join all the processes that you start.</dd>
calling a finished process's <tt class="docutils literal"><span class="pre">isAlive()</span></tt> will join the process.
Even so it is probably good practice to explicitly join all the
processes that you start.</dd>
<dt><em>Better to inherit than pickle/unpickle</em></dt>
<dd><p class="first">On Windows many of the types from the <tt class="docutils literal"><span class="pre">processing</span></tt> package need to be
picklable so that child processes can use them. However, on Unix
the following types are not picklable:</p>
<blockquote>
<tt class="docutils literal"><span class="pre">Lock</span></tt>, <tt class="docutils literal"><span class="pre">RLock</span></tt>, <tt class="docutils literal"><span class="pre">Semaphore</span></tt>, <tt class="docutils literal"><span class="pre">BoundedSemaphore</span></tt>,
<tt class="docutils literal"><span class="pre">Condition</span></tt>, <tt class="docutils literal"><span class="pre">Event</span></tt>, <tt class="docutils literal"><span class="pre">Queue</span></tt>, <tt class="docutils literal"><span class="pre">SharedValue</span></tt>,
<tt class="docutils literal"><span class="pre">SharedStruct</span></tt>, <tt class="docutils literal"><span class="pre">SharedArray</span></tt>.</blockquote>
<p class="last">For the sake of compatibility it is better not to rely on these
types being picklable.</p>
<dd>On Windows many of the types from the <tt class="docutils literal"><span class="pre">processing</span></tt> package need to be
picklable so that child processes can use them. However, one
should generally avoid sending shared objects to other processes
using pipes or queues. Instead you should arrange the program so
that a process which needs access to a shared resource created
elsewhere can inherit it from an ancestor process.</dd>
<dt><em>Explicitly pass resources to child processes</em></dt>
<dd><p class="first">On Unix a child process can make use of a shared resource created
in a parent process using a global resource. However, it is
better to pass the object as an argument to the constructor for
the child process.</p>
<p>Apart from making the code (potentially) compatible with Windows
this also ensures that as long as the child process is still alive
the object will not be garbage collected in the parent process.
This might be important if some resource is freed when the object
is garbage collected in the parent process.</p>
<p>So for instance</p>
<pre class="literal-block">
from processing import Process, Lock
def f():
... do something using &quot;lock&quot; ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f).start()
</pre>
<p>should be rewritten as</p>
<pre class="last literal-block">
from processing import Process, Lock
def f(l):
... do something using &quot;l&quot; ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f, args=[lock]).start()
</pre>
</dd>
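The `processing` package was later folded into the standard library as `multiprocessing`, which keeps the same advice. A minimal sketch of the pass-it-explicitly pattern with the modern module (assuming Python 3; the `add` helper is invented for illustration):

```python
from multiprocessing import Process, Lock, Value

def add(lock, counter, n):
    # the lock and counter arrive as explicit arguments, not globals
    for _ in range(n):
        with lock:
            counter.value += 1

if __name__ == '__main__':
    lock = Lock()
    counter = Value('i', 0)
    procs = [Process(target=add, args=(lock, counter, 100)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 400
```

Because the objects are constructor arguments, the parent keeps a reference to them for as long as the children run, which is exactly the garbage-collection guarantee described above.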

@@ -71,0 +97,0 @@ </dl>

@@ -42,8 +42,2 @@ .. include:: header.txt

*Not letting the manager die early*
When the main process terminates any manager process it has
created will also terminate. Make sure that no child process
still needs the manager when this happens --- the easiest way is
just to join all child processes you create.
*Joining zombie processes*

@@ -54,19 +48,51 @@ On Unix when a process finishes but has not been joined it becomes

processes which have not yet been joined will be joined. Also
calling a finished process's `isAlive()` will join the process
will already have been joined. Even so it is probably good
practice to explicitly join all the processes that you start.
calling a finished process's `isAlive()` will join the process.
Even so it is probably good practice to explicitly join all the
processes that you start.
*Better to inherit than pickle/unpickle*
On Windows many types from the `processing` package need to be
picklable so that child processes can use them. However, on Unix
the following types are not picklable:
picklable so that child processes can use them. However, one
should generally avoid sending shared objects to other processes
using pipes or queues. Instead you should arrange the program so
that a process which needs access to a shared resource created
elsewhere can inherit it from an ancestor process.
`Lock`, `RLock`, `Semaphore`, `BoundedSemaphore`,
`Condition`, `Event`, `Queue`, `SharedValue`,
`SharedStruct`, `SharedArray`.
*Explicitly pass resources to child processes*
On Unix a child process can make use of a shared resource created
in a parent process using a global resource. However, it is
better to pass the object as an argument to the constructor for
the child process.
For the sake of compatibility it is better not to rely on these
types being picklable.
Apart from making the code (potentially) compatible with Windows
this also ensures that as long as the child process is still alive
the object will not be garbage collected in the parent process.
This might be important if some resource is freed when the object
is garbage collected in the parent process.
So for instance ::
from processing import Process, Lock
def f():
... do something using "lock" ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f).start()
should be rewritten as ::
from processing import Process, Lock
def f(l):
... do something using "l" ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f, args=[lock]).start()
Windows

@@ -73,0 +99,0 @@ -------

@@ -62,5 +62,2 @@ <?xml version="1.0" encoding="utf-8" ?>

<dd>Some simple benchmarks comparing <tt class="docutils literal"><span class="pre">processing</span></tt> with <tt class="docutils literal"><span class="pre">threading</span></tt>.</dd>
<dt><a class="reference" href="../test/test_with.py">test_with.py</a></dt>
<dd>The same as <tt class="docutils literal"><span class="pre">test_processing</span></tt> but uses Python 2.5's <tt class="docutils literal"><span class="pre">with</span></tt>
statement.</dd>
</dl>

@@ -67,0 +64,0 @@ </blockquote>

@@ -62,6 +62,2 @@ .. include:: header.txt

`test_with.py <../test/test_with.py>`_
The same as `test_processing` but uses Python 2.5's `with`
statement.
All the modules in the `test` folder use `freezeSupport()` so frozen

@@ -68,0 +64,0 @@ executables can be produced from them by using `py2exe`, `PyInstaller`

@@ -24,2 +24,4 @@ <?xml version="1.0" encoding="utf-8" ?>

</tr>
<tr><th class="docinfo-name">Version:</th>
<td>0.39</td></tr>
<tr class="field"><th class="docinfo-name">Licence:</th><td class="field-body">BSD Licence</td>

@@ -32,6 +34,13 @@ </tr>

<tt class="docutils literal"><span class="pre">threading</span></tt> module. It runs on both Unix and Windows.</p>
<p>Objects can be transferred between processes using pipes or queues,
and objects can be shared between processes using a server process or
(for simple data) shared memory. Equivalents of the synchronization
primitives in <tt class="docutils literal"><span class="pre">threading</span></tt> are also provided.</p>
<p><em>Features</em>:</p>
<ul class="simple">
<li>Objects can be transferred between processes using pipes or
multi-producer/multi-consumer queues.</li>
<li>Objects can be shared between processes using a server process or
(for simple data) shared memory.</li>
<li>Equivalents of all the synchronization primitives in <tt class="docutils literal"><span class="pre">threading</span></tt>
are available.</li>
<li>A <tt class="docutils literal"><span class="pre">Pool</span></tt> class makes it easy to submit tasks to a pool of worker
processes.</li>
</ul>
<div class="section">

@@ -97,2 +106,13 @@ <h1><a id="links" name="links">Links</a></h1>

</pre>
<p>Tasks can be offloaded to a pool of worker processes in various ways,
for example</p>
<pre class="literal-block">
&gt;&gt;&gt; from processing import Pool
&gt;&gt;&gt; def f(x): return x*x
...
&gt;&gt;&gt; p = Pool(4)
&gt;&gt;&gt; result = p.map_async(f, range(10))
&gt;&gt;&gt; print result.get(timeout=1)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
</pre>
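The snippet above is Python 2. Under the modern `multiprocessing` descendant the same pool pattern looks like this (a sketch, assuming Python 3; `square` is an illustrative name):

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    # the context manager closes and joins the pool on exit
    with Pool(4) as pool:
        result = pool.map_async(square, range(10))
        print(result.get(timeout=10))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

`map_async` returns immediately with an `AsyncResult`; `get(timeout=...)` blocks until the mapped results are ready, just as in the original example.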
<a href="http://developer.berlios.de" title="BerliOS Developer">

@@ -99,0 +119,0 @@ <img src="http://developer.berlios.de/bslogo.php?group_id=9001"

@@ -0,1 +1,10 @@

#
# Module providing the `LocalManager` class for dealing
# with shared objects in shared memory
#
# processing/localmanager.py
#
# Copyright (c) 2006, 2007, R Oudkerk --- see COPYING.txt
#
import mmap

@@ -6,6 +15,6 @@ import re

import tempfile
import array
from processing import synchronize, process, queue, _processing
from processing import synchronize, process, queue, heap, _processing
from struct import pack as _pack, unpack as _unpack, calcsize as _calcsize
from array import array

@@ -15,218 +24,12 @@ __all__ = [ 'LocalManager' ]

#
# Heap implementation
#
#
class Heap(object):
'''
Class for allocating and freeing memory from an mmap
'''
def __init__(self, size, lock=None):
fd, self._name = tempfile.mkstemp(prefix='pym-')
self._size = remaining = size
while remaining > 0:
remaining -= os.write(fd, '\0' * remaining)
self._lock = lock or RLock()
self._mmap = mmap.mmap(fd, self._size)
os.close(fd)
self._set_pos(8)
try:
from struct import Struct
from functools import partial
_struct_cache = {}
except ImportError:
Struct = None
if sys.platform in ('win32', 'cygwin'):
process.Finalize(self, Heap._finalize_heap,
args=[self._mmap, self._name], atexit=True)
else:
os.unlink(self._name)
def __getstate__(self):
if sys.platform != 'win32':
raise NotImplemented
return self._size, self._name, self._lock
def __setstate__(self, state):
self._size, self._name, self._lock = state
fd = os.open(self._name, os.O_RDWR, 0600)
self._mmap = mmap.mmap(fd, self._size)
os.close(fd)
@staticmethod
def _finalize_heap(mmap, name):
import os
mmap.close()
try:
os.unlink(name)
except WindowsError:
process.info('failed to unlink %r -- try again later', name)
process._trylater.append((os.unlink, [name]))
def malloc(self, bytes):
'''
Allocate chunk of memory of given size
'''
bytes += 4 # extra room for header info
bytes = ((bytes + 3) // 4) * 4 # round up to multiple of 4
self._lock.acquire()
try:
offset = self._malloc(bytes)
return offset + 4
finally:
self._lock.release()
def free(self, offset):
'''
Free a chunk of memory previously allocated by malloc
'''
self._lock.acquire()
try:
self._free(offset - 4)
finally:
self._lock.release()
def _malloc(self, bytes):
'''
Allocate chunk of memory -- `bytes` must be multiple of 4.
First 4 bytes are occupied by header info so bytes must be at least 4
'''
# resize mmap if the file has been enlarged by other process
if self._size < self._mmap.size():
self._mmap.resize(self._mmap.size())
self._size = len(self._mmap)
start = stop = None
# iterate over empty chunks
for start, stop in self._get_empty_chunks():
# if chunk is large enough
if stop - start >= bytes:
# write header for the chunk we are creating
self._set_chunk_info(start, start + bytes, True)
# write header for following empty chunk if necessary
if start + bytes < stop:
self._set_chunk_info(start + bytes, stop, False)
# update position of last empty chunk
if start == self._get_pos():
try:
pos, _ = self._get_empty_chunks().next()
except StopIteration:
pos = self._size
self._set_pos(pos)
return start
# get position of memory just after last occupied chunk
if stop == self._size:
pos = start
else:
pos = self._size
# work out how much space we need and resize mmap
while pos + bytes > self._size:
self._size *= 2
if self._size > len(self._mmap):
self._mmap.resize(self._size)
# mark chunk as occupied and update position of last empty chunk
self._set_chunk_info(pos, pos + bytes, True)
if pos == self._get_pos():
self._set_pos(pos + bytes)
return pos
def _free(self, start):
'''
Free chunk of memory -- must have been allocated by _malloc
'''
stop, occ = self._get_chunk_info(start)
assert occ
# mark chunk as not occupied
self._mmap[start:stop] = '\0' * (stop - start)
self._set_chunk_info(start, stop, False)
# if this chunk is before previous first empty chunk
if self._get_pos() > start:
# record start as position of first empty chunk
self._set_pos(start)
def _get_chunks(self, start=8):
'''
Iterator over chunks of memory.
Returns (start, stop, occupied) tuples.
'''
while start < self._size:
stop, occ = self._get_chunk_info(start)
yield start, stop, occ
start = stop
def _get_empty_chunks(self):
'''
Iterator over empty chunks of memory.
When possible adjacent empty chunks are merged.
Returns (start, stop) pairs.
'''
pos = self._get_pos()
it = self._get_chunks(pos)
for start, stop, occ in it:
if not occ:
merge = False
for nstart, nstop, occ in it:
if occ:
break
stop = nstart = nstop
merge = True
if merge:
self._mmap[start:stop] = '\0' * (stop - start)
self._set_chunk_info(start, stop, False)
yield start, stop
def _get_pos(self):
'''
Return offset of first empty chunk of memory (or self._size if none)
'''
return _unpack('i', self._mmap[0:4])[0]
def _set_pos(self, pos):
'''
Set offset of first empty chunk of memory
'''
self._mmap[0:4] = _pack('i', pos)
def _get_chunk_info(self, start):
'''
Get info at beginning of a chunk of memory starting at start
Returns (stop, isoccupied) pair
'''
info = _unpack('i', self._mmap[start:start+4])[0]
stop = abs(info)
isoccupied = info > 0
if stop == 0:
assert not isoccupied
return self._size, False
return stop, isoccupied
def _set_chunk_info(self, start, stop, occ):
'''
Set info at beginning of a chunk of memory
'''
assert stop > 0
if not occ:
stop = -stop
self._mmap[start:start+4] = _pack('i', stop)
def _dump(self):
'''
Print hex dump of mmap
'''
assert len(self._mmap) % 4 == 0
for i in range(len(self._mmap) // 4):
if i % 8 == 0:
print
print '%08x' % _unpack('I', self._mmap[4*i:4*i+4]),
print
#
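The removed `Heap` keeps each chunk's bookkeeping in its first four bytes: `_set_chunk_info`/`_get_chunk_info` store the chunk's stop offset, with a negative value marking the chunk free and zero meaning "free to the end of the heap". That encoding can be exercised standalone (helper names are illustrative):

```python
from struct import pack_into, unpack_from

def set_chunk_info(buf, start, stop, occupied):
    # the sign of the stored stop offset encodes occupancy
    assert stop > 0
    pack_into('i', buf, start, stop if occupied else -stop)

def get_chunk_info(buf, start, heap_size):
    info = unpack_from('i', buf, start)[0]
    if info == 0:
        # untouched memory: a single free chunk running to the heap's end
        return heap_size, False
    return abs(info), info > 0

buf = bytearray(64)
set_chunk_info(buf, 8, 24, True)     # occupied chunk [8, 24)
set_chunk_info(buf, 24, 40, False)   # free chunk [24, 40)
assert get_chunk_info(buf, 8, 64) == (24, True)
assert get_chunk_info(buf, 24, 64) == (40, False)
assert get_chunk_info(buf, 40, 64) == (64, False)  # zeroed header
```

This is why `_free` zero-fills a released chunk before rewriting its header: zeroed memory is self-describing as free space.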

@@ -237,30 +40,35 @@ # Class for a struct which lives in shared memory

class SharedStruct(object):
def __init__(self, format, value, lock):
wrapper = heap.BufferWrapper(_calcsize(format))
self.__setstate__((wrapper, format, lock))
self.set(value)
def __init__(self, heap, format, value, _lock=None, _start=None):
self._heap = heap
self._format = format
size = _calcsize(format)
if _start is None:
self._start = heap.malloc(size)
assert format is not None
else:
self._start = _start
self._stop = self._start + size
self._lock = _lock or self._heap._lock
def __getstate__(self):
assert sys.platform == 'win32'
return self._wrapper, self._format, self._lock
def __setstate__(self, state):
self._wrapper, self._format, self._lock = state
self._buffer = self._wrapper.getview()
self._acquire = self._lock.acquire
self._release = self._lock.release
self._mmap = self._heap._mmap
if Struct:
try:
s = _struct_cache[self._format]
except KeyError:
s = Struct(self._format)
self._get = partial(s.unpack_from, self._buffer)
self._set = partial(s.pack_into, self._buffer, 0)
if _start is None:
self.set(value)
process.Finalize(
self, self._heap.free, args=[self._start], atexit=True
)
def _get(self):
return _unpack(self._format, self._buffer[:])
def _set(self, *args):
self._buffer[:] = _pack(self._format, *args)
def get(self):
self._acquire()
try:
return _unpack(self._format, self._mmap[self._start:self._stop])
return self._get()
finally:

@@ -272,3 +80,3 @@ self._release()

try:
self._mmap[self._start:self._stop] = _pack(self._format, *value)
self._set(*value)
finally:

@@ -282,8 +90,2 @@ self._release()

if sys.platform == 'win32':
def __reduce__(self):
return type(self), (self._heap, self._format, None,
self._lock, self._start)
#

@@ -295,10 +97,6 @@ # Class for a length 1 struct which lives in shared memory

def __init__(self, heap, format, value, _lock=None, _start=None):
assert len(_unpack(format, '\0' * _calcsize(format))) == 1
SharedStruct.__init__(self, heap, format, value, _lock, _start)
def get(self):
self._acquire()
try:
return _unpack(self._format, self._mmap[self._start:self._stop])[0]
return self._get()[0]
finally:

@@ -310,3 +108,3 @@ self._release()

try:
self._mmap[self._start:self._stop] = _pack(self._format, value)
self._set(value)
finally:

@@ -322,43 +120,30 @@ self._release()

class SharedArray(object):
def __init__(self, format, sequence, lock):
wrapper = heap.BufferWrapper(_calcsize(format) * len(sequence))
self.__setstate__((wrapper, format, lock))
self[:] = sequence
def __init__(self, heap, format, sequence,
_lock=None, _start=None, _length=None):
self._heap = heap
self._format = format
self._itemsize = _calcsize(format)
def __getstate__(self):
assert sys.platform == 'win32'
return self._wrapper, self._format, self._lock
if _start is None:
sequence = tuple(sequence)
self._length = len(sequence)
self._start = heap.malloc(self._itemsize * self._length)
else:
self._start = _start
self._length = _length
self._stop = self._start + self._itemsize * self._length
self._lock = _lock or self._heap._lock
def __setstate__(self, state):
self._wrapper, self._format, self._lock = state
self._buffer = self._wrapper.getview()
self._acquire = self._lock.acquire
self._release = self._lock.release
self._mmap = self._heap._mmap
self._itemsize = _calcsize(self._format)
self._length, rem = divmod(len(self._buffer), self._itemsize)
assert rem == 0
if _start is None:
self[:] = sequence
process.Finalize(
self, self._heap.free, args=[self._start], atexit=True
)
def __len__(self):
return self._length
def __iter__(self):
return iter(array.array(self._format,
self._mmap[self._start:self._stop]))
def __getitem__(self, i):
self._acquire()
try:
assert 0 <= i < self._length
t = self._start + self._itemsize * i
return _unpack(self._format, self._mmap[t:t+self._itemsize])[0]
a = i * self._itemsize
b = a + self._itemsize
return _unpack(self._format, self._buffer[a:b])[0]
finally:

@@ -370,5 +155,5 @@ self._release()

try:
assert 0 <= i < self._length
t = self._start + self._itemsize * i
self._mmap[t:t+self._itemsize] = _pack(self._format, value)
a = i * self._itemsize
b = a + self._itemsize
self._buffer[a:b] = _pack(self._format, value)
finally:

@@ -380,6 +165,5 @@ self._release()

try:
at = self._start + self._itemsize * a
bt = self._start + self._itemsize * b
bt = min(bt, self._stop)
return array.array(self._format, self._mmap[at:bt])
at = self._itemsize * a
bt = self._itemsize * b
return array(self._format, self._buffer[at:bt])
finally:

@@ -391,6 +175,5 @@ self._release()

try:
at = self._start + self._itemsize * a
bt = self._start + self._itemsize * b
bt = min(bt, self._stop)
self._mmap[at:bt] = array.array(self._format, seq).tostring()
at = self._itemsize * a
bt = self._itemsize * b
self._buffer[at:bt] = array(self._format, seq).tostring()
finally:

@@ -402,3 +185,3 @@ self._release()

try:
return self._mmap[self._start:self._stop]
return self._buffer[:]
finally:

@@ -410,7 +193,6 @@ self._release()

try:
s = self._mmap[self._start:self._stop]
arr = array.array(self._format, s)
return list(arr)
arr = array(self._format, self._buffer[:])
finally:
self._release()
return list(arr)

@@ -425,8 +207,2 @@ def __repr__(self):

if sys.platform == 'win32':
def __reduce__(self):
return type(self), (self._heap, self._format, None,
self._lock, self._start, self._length)
#
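Both versions of `SharedArray` map item `i` onto the byte range `[i*itemsize, (i+1)*itemsize)` of one flat buffer. A self-contained sketch of that layout over a plain `bytearray` (the class name is invented for illustration):

```python
from struct import pack_into, unpack_from, calcsize

class FlatArray:
    """Fixed-width records indexed over one contiguous buffer."""

    def __init__(self, fmt, seq):
        self.fmt = fmt
        self.itemsize = calcsize(fmt)
        seq = list(seq)
        self.buf = bytearray(self.itemsize * len(seq))
        for i, v in enumerate(seq):
            self[i] = v

    def __len__(self):
        return len(self.buf) // self.itemsize

    def __getitem__(self, i):
        # item i lives at byte offset i * itemsize
        return unpack_from(self.fmt, self.buf, i * self.itemsize)[0]

    def __setitem__(self, i, v):
        pack_into(self.fmt, self.buf, i * self.itemsize, v)

a = FlatArray('i', [1, 2, 3])
assert a[1] == 2
a[1] = 42
assert a[1] == 42
```

The real class adds locking around every access and, in 0.38, an extra `self._start` base offset because the buffer is a slice of the shared heap rather than a private one.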

@@ -438,5 +214,4 @@ # LocalManager

def __init__(self, size=256):
def __init__(self):
self._lock = synchronize.RLock()
self._size = size

@@ -469,19 +244,10 @@ def start(self):

def _getheap(self):
self._lock.acquire()
try:
if not hasattr(self, '_heap'):
self._heap = Heap(size=self._size, lock=self._lock)
return self._heap
finally:
self._lock.release()
def SharedValue(self, format, value):
return SharedValue(self._getheap(), format, value)
return SharedValue(format, value, self._lock)
def SharedStruct(self, format, value):
return SharedStruct(self._getheap(), format, value)
return SharedStruct(format, value, self._lock)
def SharedArray(self, format, sequence):
return SharedArray(self._getheap(), format, sequence)
return SharedArray(format, sequence, self._lock)
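The 0.38 `_getheap()` helper removed here is lock-protected lazy initialization: the heap is only created on first use, under the manager's lock, so concurrent callers never build two heaps. The pattern in isolation (a sketch with an invented class name):

```python
import threading

class LazyResource:
    """Create an expensive resource at most once, thread-safely."""

    def __init__(self, factory):
        self._factory = factory
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            if not hasattr(self, '_value'):
                self._value = self._factory()
            return self._value

calls = []
r = LazyResource(lambda: calls.append(1) or 'heap')
assert r.get() == 'heap'
assert r.get() == 'heap'
assert len(calls) == 1  # the factory ran exactly once
```

In 0.39 the helper disappears because allocation moves into the shared `heap` module, so `LocalManager` no longer owns a heap at all.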

@@ -492,54 +258,26 @@ #

def test0():
m = LocalManager()
a = []
for i in range(10):
a.append(m.SharedValue('i', i))
print list(m._heap._get_empty_chunks())
del a[3:8]
print list(m._heap._get_empty_chunks())
for i in range(100):
a.append(m.SharedValue('i', 0x100 + i))
print list(m._heap._get_empty_chunks())
m._heap._dump()
def _test(x, y, z):
x.value = 42
y.value = (1729, 3.1415927)
for i in range(len(z)):
z[i] *= 2
def test():
'''
Randomly create and delete shared arrays of chars
from processing import Process
Check no corruption caused
'''
import random
m = LocalManager()
manager = LocalManager()
x = m.SharedValue('i', 0)
y = m.SharedStruct('id', (0, 0))
z = m.SharedArray('d', range(10))
L = []
maxcount = 10
p = Process(target=_test, args=(x, y, z))
p.start()
p.join()
for k in range(1000):
i = random.randrange(100)
arr = ''.join([chr(j) for j in range(i)])
sarr = manager.SharedArray('c', arr)
L.append((arr, sarr))
if len(L) > maxcount:
j = random.randrange(maxcount)
del L[j]
print 'Heap size =', manager._heap._size
print 'Empty chunks =', list(manager._getheap()._get_empty_chunks())
print x
print y
print z
for arr, sarr in L:
assert arr == ''.join(sarr)
print 'Tests passed'
if __name__ == '__main__':
test()

@@ -18,4 +18,3 @@ #

import threading, os, sys, weakref, traceback, array, copy_reg, cPickle
import process
from processing import process
from processing.connection import Listener, Client, AuthenticationError

@@ -384,6 +383,10 @@ from processing.connection import deliver_challenge, answer_challenge

info('manager received shutdown message')
Finalize._run_all_finalizers()
Finalize._run_all_finalizers(0)
for p in activeChildren():
debug('terminating a child process of manager')
p.terminate()
for p in activeChildren():
debug('terminating a child process of manager')
p.join()
Finalize._run_all_finalizers()
info('process exiting with `os._exit(0)`')

@@ -477,6 +480,6 @@ os._exit(0)

self._process.setAuthKey(self._authkey)
try:
self._process.setDaemon(True)
except AssertionError:
pass
## try:
## self._process.setDaemon(True)
## except AssertionError:
## pass
self._process.start()

@@ -483,0 +486,0 @@

Metadata-Version: 1.0
Name: processing
Version: 0.38
Version: 0.39
Summary: Package for using processes mimicking the `threading` module

@@ -9,13 +9,21 @@ Home-page: http://developer.berlios.de/projects/pyprocessing

License: BSD Licence
Description:
``processing`` is a package for the Python language which supports the
Description: ``processing`` is a package for the Python language which supports the
spawning of processes using the API of the standard library's
``threading`` module. It runs on both Unix and Windows.
Objects can be transferred between processes using pipes or queues,
and objects can be shared between processes using a server process or
(for simple data) shared memory. Equivalents of the synchronization
primitives in ``threading`` are also provided.
*Features*:
* Objects can be transferred between processes using pipes or
multi-producer/multi-consumer queues.
* Objects can be shared between processes using a server process or
(for simple data) shared memory.
* Equivalents of all the synchronization primitives in ``threading``
are available.
* A ``Pool`` class makes it easy to submit tasks to a pool of worker
processes.
Links

@@ -82,4 +90,15 @@ =====

Tasks can be offloaded to a pool of worker processes in various ways,
for example ::
>>> from processing import Pool
>>> def f(x): return x*x
...
>>> p = Pool(4)
>>> result = p.map_async(f, range(10))
>>> print result.get(timeout=1)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Platform: Unix and Windows

@@ -86,0 +105,0 @@ Classifier: Development Status :: 4 - Beta


@@ -20,4 +20,5 @@ #

import collections
import time
from processing.process import debug
from processing.process import debug, _sleep_until_neq

@@ -58,3 +59,3 @@ #

debug('worker got None -- exiting')
debug('worker got sentinel -- exiting')

@@ -111,5 +112,6 @@ #

self, Pool._terminate_pool,
args=[self._taskqueue, self._outqueue, self._cache, self._pool,
self._task_handler, self._result_handler],
atexit=True
args=[self._taskqueue, self._inqueue, self._outqueue,
self._cache, self._pool, self._task_handler,
self._result_handler],
atexit=True, priority=5
)

@@ -201,21 +203,25 @@

for taskseq, setlength in iter(taskqueue.get, None):
if thread._state:
debug('task handler found thread._state != RUN -- exiting')
return
i = -1
for i, task in enumerate(taskseq):
if thread._state:
debug('task handler found thread._state != RUN')
break
put(task)
if setlength:
setlength(i+1)
else:
if setlength:
debug('doing setlength()')
setlength(i+1)
continue
break
else:
debug('task handler got None')
debug('task handler got sentinel')
# tell result handler to finish when cache is empty
outqueue.put(None)
# tell workers there is no more work
debug('task handler sending None to workers')
debug('task handler sending sentinel to workers')
for p in pool:
put(None)
# tell result handler to finish when cache is empty
outqueue.put(None)
debug('task handler exiting')

@@ -238,8 +244,8 @@

else:
debug('result handler got None')
debug('result handler got sentinel')
while cache and thread._state == RUN:
while cache and thread._state != TERMINATE:
item = get()
if item is None:
debug('result handler ignoring None')
debug('result handler ignoring extra sentinel')
job, i, obj = item

@@ -276,2 +282,4 @@ try:

shutdown = terminate # deprecated alias
def join(self):

@@ -285,6 +293,4 @@ debug('joining pool')

shutdown = terminate # deprecated alias
@staticmethod
def _terminate_pool(taskqueue, outqueue, cache, pool,
def _terminate_pool(taskqueue, inqueue, outqueue, cache, pool,
task_handler, result_handler):

@@ -300,12 +306,27 @@ debug('finalizing pool')

result_handler._state = TERMINATE
debug('sending sentinels')
taskqueue.put(None)
outqueue.put(None)
debug('getting read lock on inqueue')
inqueue._rlock.acquire()
debug('terminating pool workers')
debug('terminating workers')
for p in pool:
if p.isAlive():
p.terminate()
p.terminate()
if task_handler.isAlive():
debug('removing tasks from inqueue until task handler finished')
while task_handler.isAlive() and inqueue._reader.poll():
inqueue._reader.recv()
time.sleep(0)
debug('joining result handler')
result_handler.join()
debug('joining task handler')
task_handler.join()
result_handler.join()
debug('joining pool workers')
for p in pool:
p.join()

@@ -312,0 +333,0 @@ #

@@ -56,3 +56,2 @@ #

def activeChildren():

@@ -111,4 +110,4 @@ '''

self._target = target
self._args = args
self._kwargs = kwargs
self._args = tuple(args)
self._kwargs = kwargs.copy()
self._stoppable = False

@@ -668,3 +667,3 @@ self._parent_pid = os.getpid()

@staticmethod
def _run_all_finalizers():
def _run_all_finalizers(minpriority=None):
'''

@@ -674,14 +673,17 @@ Run remaining callbacks (in reverse order of registration)

Finalize._exiting = True
L = Finalize._registry.keys()
L.sort(key=lambda f: f._orderkey, reverse=True)
if minpriority is None:
minkey = (-sys.maxint, 0)
else:
minkey = (minpriority, 0)
for finalizer in L:
if finalizer._atexit:
try:
finalizer()
except Exception:
import traceback
traceback.print_exc()
finalizers = [f for f in Finalize._registry.keys()
if f._atexit and f._orderkey >= minkey]
finalizers.sort(key=lambda f: f._orderkey, reverse=True)
_trylater = []
for finalizer in finalizers:
try:
finalizer()
except Exception:
import traceback
traceback.print_exc()

@@ -693,4 +695,4 @@ #

def _exit_func():
info('running all "atexit" finalizers')
Finalize._run_all_finalizers()
info('running all "atexit" finalizers with priority >= 0')
Finalize._run_all_finalizers(0)

@@ -718,8 +720,5 @@ for p in activeChildren():

for (func, args) in _trylater:
try:
func(*args)
except:
info('failed to do "trylater" call: %s', (func, args))
info('running all "atexit" finalizers with priority < 0')
Finalize._run_all_finalizers()
atexit.register(_exit_func)
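The reworked `_run_all_finalizers(minpriority)` lets `_exit_func` run cleanup in two phases: finalizers with priority >= 0 fire before child processes are joined, everything left (priority < 0) fires after. A minimal sketch of that threshold scheme (simplified names, no weakref registry):

```python
import sys

class Finalizer:
    registry = []

    def __init__(self, func, priority=0):
        self.func = func
        # registration index breaks ties, mirroring reverse-order execution
        self._orderkey = (priority, len(Finalizer.registry))
        Finalizer.registry.append(self)

def run_all_finalizers(minpriority=None):
    minkey = (-sys.maxsize, 0) if minpriority is None else (minpriority, 0)
    ready = [f for f in Finalizer.registry if f._orderkey >= minkey]
    ready.sort(key=lambda f: f._orderkey, reverse=True)
    for f in ready:
        Finalizer.registry.remove(f)
        f.func()

ran = []
Finalizer(lambda: ran.append('low'), priority=-5)
Finalizer(lambda: ran.append('high'), priority=5)
run_all_finalizers(0)   # first phase: only priority >= 0
assert ran == ['high']
run_all_finalizers()    # second phase: everything remaining
assert ran == ['high', 'low']
```

This is why the queue module below registers its flush finalizer with `priority=-5`: flushing must wait until after the children that might still read the pipe have been joined.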

@@ -22,2 +22,3 @@ #

from processing.process import debug, Finalize
from processing.connection import Pipe
from Queue import Empty, Full

@@ -39,13 +40,3 @@

def __init__(self, maxsize=0):
if sys.platform == 'win32':
from processing.connection import Listener, Client
l = Listener()
reader = Client(l.address)
writer = l.accept()
else:
rfd, wfd = os.pipe()
reader = _processing.SocketConnection(rfd)
writer = _processing.SocketConnection(wfd)
os.close(rfd), os.close(wfd)
reader, writer = Pipe(duplex=False)
rlock = Lock()

@@ -73,2 +64,3 @@ wlock = Lock()

self._thread = None
self._joincancelled = False

@@ -162,5 +154,5 @@ def __getstate__(self):

def close(self):
self._closed = True
if self._close:
self._close()
self._closed = True

@@ -171,5 +163,9 @@ def jointhread(self):

self._jointhread()
def canceljoin(self):
self._jointhread.cancel()
self._joincancelled = True
try:
self._jointhread.cancel()
except AttributeError:
pass

@@ -185,2 +181,16 @@ def _startthread(self):

# On process exit we will wait for data to be flushed to pipe.
#
# However, if this process created the queue then all
# processes which use the queue will be descendants of this
# process. Therefore waiting for the queue to be flushed
# is pointless once all the child processes have been joined.
created_by_this_process = (self._opid == os.getpid())
if not self._joincancelled and not created_by_this_process:
self._jointhread = Finalize(
self._thread, Queue._finalize_join,
[weakref.ref(self._thread)],
priority=-5, atexit=True
)
# Send sentinel to the thread when the queue object is garbage collected

@@ -193,13 +203,2 @@ self._close = Finalize(

# Flush data to pipe
self._jointhread = Finalize(
self._thread, Queue._finalize_join,
[weakref.ref(self._thread)],
atexit=True
)
# Don't bother joining on exit if this process created the queue
if self._opid == os.getpid():
self._jointhread.cancel()
@staticmethod

@@ -228,23 +227,32 @@ def _finalize_join(twr):

debug('starting thread to feed data to pipe')
while 1:
notempty.acquire()
try:
if not buffer:
notempty.wait()
finally:
notempty.release()
try:
while 1:
obj = buffer.popleft()
if obj is _sentinel:
debug('feeder thread got sentinel -- exiting')
return
try:
while 1:
notempty.acquire()
try:
if not buffer:
notempty.wait()
finally:
notempty.release()
try:
while 1:
obj = buffer.popleft()
if obj is _sentinel:
debug('feeder thread got sentinel -- exiting')
return
writelock.acquire()
try:
send(obj)
finally:
writelock.release()
except IndexError:
pass
writelock.acquire()
try:
send(obj)
finally:
writelock.release()
except IndexError:
pass
except Exception, e:
# Since this runs in a daemon thread the objects it uses
# may become unusable while the process is cleaning up.
# We ignore errors which happen after the process has
# started to clean up.
debug('error in queue thread: %s', e)
if not Finalize._exiting:
raise
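The feeder loop above can be exercised in miniature: a daemon thread drains a deque guarded by a condition variable and exits when it pops the sentinel. A sketch of that shutdown protocol with plain threading in place of the pipe (names invented for illustration):

```python
import collections
import threading

sentinel = object()
buffer = collections.deque()
notempty = threading.Condition()
sent = []  # stands in for send() down the pipe

def feed(obj):
    with notempty:
        buffer.append(obj)
        notempty.notify()

def feeder():
    while True:
        with notempty:
            while not buffer:
                notempty.wait()
        try:
            while True:
                obj = buffer.popleft()
                if obj is sentinel:
                    return          # sentinel terminates the thread
                sent.append(obj)
        except IndexError:
            pass                    # buffer drained; go back to waiting

t = threading.Thread(target=feeder, daemon=True)
t.start()
for item in (1, 2, 3):
    feed(item)
feed(sentinel)
t.join(timeout=5)
assert sent == [1, 2, 3]
```

Because the deque is FIFO and there is a single consumer, every queued object is delivered before the sentinel is seen, which is what `_finalize_join` relies on.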

@@ -260,15 +268,4 @@ _sentinel = object()

def __init__(self):
if sys.platform == 'win32':
from processing.connection import Listener, Client
l = Listener()
reader = Client(l.address)
writer = l.accept()
state = reader, writer, Lock(), None
else:
rfd, wfd = os.pipe()
reader = _processing.SocketConnection(rfd)
writer = _processing.SocketConnection(wfd)
os.close(rfd), os.close(wfd)
state = reader, writer, Lock(), Lock()
reader, writer = Pipe(duplex=False)
state = reader, writer, Lock(), Lock()
self.__setstate__(state)

@@ -275,0 +272,0 @@

@@ -0,1 +1,3 @@

.. include:: doc/version.txt
===================

@@ -8,5 +10,5 @@ Python processing

:Url: http://developer.berlios.de/projects/pyprocessing
:Version: |version|
:Licence: BSD Licence
``processing`` is a package for the Python language which supports the

@@ -16,8 +18,17 @@ spawning of processes using the API of the standard library's

Objects can be transferred between processes using pipes or queues,
and objects can be shared between processes using a server process or
(for simple data) shared memory. Equivalents of the synchronization
primitives in ``threading`` are also provided.
*Features*:
* Objects can be transferred between processes using pipes or
multi-producer/multi-consumer queues.
* Objects can be shared between processes using a server process or
(for simple data) shared memory.
* Equivalents of all the synchronization primitives in ``threading``
are available.
* A ``Pool`` class makes it easy to submit tasks to a pool of worker
processes.
Links

@@ -84,3 +95,14 @@ =====

Tasks can be offloaded to a pool of worker processes in various ways,
for example ::
>>> from processing import Pool
>>> def f(x): return x*x
...
>>> p = Pool(4)
>>> result = p.map_async(f, range(10))
>>> print result.get(timeout=1)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
.. raw:: html

@@ -87,0 +109,0 @@

@@ -14,4 +14,3 @@ #

import os, sys, copy_reg, socket
import _processing
import process
from processing import _processing, process

@@ -83,3 +82,5 @@ #

handle = rebuild_handle(reduced_handle)
return _processing.PipeConnection(handle)
conn = _processing.PipeConnection(handle)
_processing.CloseHandle(handle)
return conn

@@ -86,0 +87,0 @@ copy_reg.pickle(_processing.PipeConnection, reduce_pipe_connection)

@@ -137,7 +137,3 @@ #

data = ['*.txt', '*.html', 'doc/*.html', 'doc/*.css'] + glob('test/*.py')
if sys.version_info < (2, 5, 0):
data = [x for x in data if 'test_with.py' not in x]
data = ['*.txt', '*.html', 'doc/*.html', 'doc/*.css', 'test/*.py']
kwds['package_data'] = {'processing': data}

@@ -144,0 +140,0 @@

@@ -1,2 +0,2 @@

/*
/*
* Definition of a `Connection` type.

@@ -37,7 +37,7 @@ * Used by `socket_connection.h` and `pipe_connection.h`.

char *buffer;
int length;
int res;
Py_ssize_t length;
CHECKHANDLE(self);
if (!PyArg_ParseTuple(args, "s#", &buffer, &length))

@@ -60,3 +60,3 @@ return NULL;

char *freeme = NULL;
int res;
Py_ssize_t nbytes;
PyObject *result = NULL;

@@ -67,12 +67,12 @@

Py_BEGIN_ALLOW_THREADS
res = recv_string(self->handle, self->buffer, BUFFER_SIZE, &freeme);
nbytes = recv_string(self->handle, self->buffer, BUFFER_SIZE, &freeme);
Py_END_ALLOW_THREADS
if (res < 0) {
SetExcFromNumber(res);
if (nbytes < 0) {
SetExcFromNumber(nbytes);
} else {
if (freeme == NULL) {
result = Py_BuildValue("s#", self->buffer, res);
result = Py_BuildValue("s#", self->buffer, nbytes);
} else {
result = Py_BuildValue("s#", freeme, res);
result = Py_BuildValue("s#", freeme, nbytes);
free(freeme);

@@ -89,3 +89,4 @@ }

char *freeme = NULL, *buffer = NULL;
int res, length;
Py_ssize_t nbytes, length;
int offset=0;
PyObject *result = NULL;

@@ -95,19 +96,30 @@

if (!PyArg_ParseTuple(args, "w#", &buffer, &length))
if (!PyArg_ParseTuple(args, "w#|i", &buffer, &length, &offset))
return NULL;
if (offset < 0) {
PyErr_SetString(PyExc_ValueError, "negative offset");
return NULL;
}
if (offset > length) {
PyErr_SetString(PyExc_ValueError, "offset too large");
return NULL;
}
Py_BEGIN_ALLOW_THREADS
res = recv_string(self->handle, buffer, length, &freeme);
nbytes = recv_string(self->handle, buffer+offset, length-offset, &freeme);
Py_END_ALLOW_THREADS
if (res < 0) {
SetExcFromNumber(res);
if (nbytes < 0) {
SetExcFromNumber(nbytes);
} else {
if (freeme == NULL) {
result = Py_BuildValue("i", res);
result = Py_BuildValue("i", nbytes);
} else {
result = PyObject_CallFunction(BufferTooShort, "s#", freeme, res);
result = PyObject_CallFunction(BufferTooShort,
"s#", freeme, nbytes);
free(freeme);
PyErr_SetObject(BufferTooShort, result);
Py_DECREF(result);
Py_XDECREF(result);
return NULL;
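The hunk above adds an optional ``offset`` argument to the buffer-receiving function and range-checks it before reading. The same checks can be sketched in Python (illustrative only: the real C function reads from an OS handle, so ``data`` here stands in for the bytes received off the connection, and a plain ``BufferError`` stands in for the package's ``BufferTooShort`` exception):

```python
def recv_into(buf, data, offset=0):
    # Mirror the C-side validation: reject bad offsets up front.
    length = len(buf)
    if offset < 0:
        raise ValueError("negative offset")
    if offset > length:
        raise ValueError("offset too large")
    n = len(data)
    if n > length - offset:
        # The C code raises BufferTooShort carrying the whole message.
        raise BufferError("message longer than remaining buffer space")
    buf[offset:offset + n] = data
    return n

buf = bytearray(8)
nbytes = recv_into(buf, b"hi", offset=3)
print(nbytes, bytes(buf))
```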

@@ -121,3 +133,3 @@ }

/*
* Functions for transferringn objects
* Functions for transferring objects
*/

@@ -129,3 +141,4 @@

char *buffer;
int length, res;
int res;
Py_ssize_t length;
PyObject *obj = NULL, *pickled_string = NULL;

@@ -146,4 +159,9 @@

if (length > 0x7fffffff) {
PyErr_SetString(PyExc_ValueError, "string too long");
goto ERR;
}
Py_BEGIN_ALLOW_THREADS
res = send_string(self->handle, buffer, length);
res = send_string(self->handle, buffer, (int)length);
Py_END_ALLOW_THREADS

@@ -159,3 +177,3 @@

Py_XDECREF(pickled_string);
return FALSE;
return NULL;
}

@@ -167,3 +185,3 @@

char *freeme = NULL;
int res;
Py_ssize_t nbytes;
PyObject *result = NULL;

@@ -174,14 +192,14 @@

Py_BEGIN_ALLOW_THREADS
res = recv_string(self->handle, self->buffer, BUFFER_SIZE, &freeme);
nbytes = recv_string(self->handle, self->buffer, BUFFER_SIZE, &freeme);
Py_END_ALLOW_THREADS
if (res < 0) {
SetExcFromNumber(res);
if (nbytes < 0) {
SetExcFromNumber(nbytes);
} else {
if (freeme == NULL) {
result = PyObject_CallFunction(loadsFunction, "s#",
self->buffer, res);
self->buffer, nbytes);
} else {
result = PyObject_CallFunction(loadsFunction, "s#",
freeme, res);
freeme, nbytes);
free(freeme);

@@ -188,0 +206,0 @@ }

@@ -9,11 +9,5 @@ /*

#include <Python.h>
#include "processing_defs.h"
#include "structmember.h"
#ifndef Py_RETURN_NONE
#define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None
#define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True
#define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False
#endif
extern PyObject *dumpsFunction, *loadsFunction;

@@ -20,0 +14,0 @@ extern PyObject *BufferTooShort;

@@ -15,7 +15,7 @@ /*

#define TOO_LONG(n) (n > 0x7fffffff)
#define INVALID_HANDLE NULL
typedef HANDLE _HANDLE;
typedef unsigned uint32_t;
#define INVALID_HANDLE NULL
static _HANDLE

@@ -42,2 +42,3 @@ _duplicate(_HANDLE h)

#define ALREADY_SET_ERROR (-3)
#define BAD_MESSAGE_LENGTH (-7)

@@ -56,2 +57,5 @@ static PyObject *

break;
case BAD_MESSAGE_LENGTH:
PyErr_SetString(PyExc_IOError, "bad message length");
break;
default:

@@ -72,5 +76,9 @@ PyErr_SetString(PyExc_AssertionError, "unknown error number");

if (TOO_LONG(length))
return BAD_MESSAGE_LENGTH;
if (!WriteFile(pipe, string, length, &amount_written, NULL))
return STANDARD_ERROR;
assert(length == amount_written);
/* assert(length == amount_written); */
return SUCCESS;

@@ -85,3 +93,3 @@ }

static int
static Py_ssize_t
recv_string(_HANDLE pipe, char *buffer, size_t buflength, char **newbuffer)

@@ -94,2 +102,4 @@ {

if (ReadFile(pipe, buffer, buflength, &length, NULL)) {
if (TOO_LONG(length))
return BAD_MESSAGE_LENGTH;
return length;

@@ -103,4 +113,5 @@ } else if (GetLastError() != ERROR_MORE_DATA) {

full_length = length + left;
assert(full_length > 0);
full_length = length + left;
if (TOO_LONG(full_length))
return BAD_MESSAGE_LENGTH;

@@ -107,0 +118,0 @@ *newbuffer = malloc(full_length);

@@ -9,10 +9,5 @@ /*

#include "Python.h"
#include "processing_defs.h"
#include "processing.h"
#ifndef Py_RETURN_NONE
#define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None
#define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True
#define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False
#endif
extern PyTypeObject BlockerType;

@@ -131,23 +126,23 @@ extern PyTypeObject SocketConnectionType;

#endif /* !NO_SENDFD */
static PyMethodDef module_methods[] = {
#if !NO_SENDFD
{"sendfd", processing_sendfd, METH_VARARGS,
"sendfd(sockfd, fd): send file descriptor given by fd over\n"
"sendfd(sockfd, fd) -> None: send file descriptor given by fd over\n"
"the unix domain socket whose file descriptor is sockfd"},
{"recvfd", processing_recvfd, METH_VARARGS,
"recvfd(sockfd): returns a file descriptor over\n"
"recvfd(sockfd) -> fd: returns a file descriptor over\n"
"a unix domain socket whose file descriptor is sockfd"},
{NULL, NULL, 0, NULL}
};
#endif
{"rwbuffer", processing_rwbuffer, METH_VARARGS,
"rwbuffer(object [, offset[, size]]) -> read-write buffer"},
#else /* NO_SENDFD */
{"address_of_buffer", processing_address_of_buffer, METH_O,
"address_of_buffer(obj) -> (address, size)"},
static PyMethodDef module_methods[] = {
{NULL, NULL, 0, NULL}
};
#endif /* NO_SENDFD */
PyMODINIT_FUNC

@@ -154,0 +149,0 @@ init_processing(void)

@@ -1,10 +0,12 @@

#include "Python.h"
#include "structmember.h"
/*
* A type which wraps a posix queue
*
* posix_queue.c
*
* Copyright (c) 2006, 2007, R Oudkerk --- see COPYING.txt
*/
#ifndef Py_RETURN_NONE
#define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None
#define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True
#define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False
#endif
#include "processing_defs.h"
#include "structmember.h"
#include <mqueue.h>

@@ -205,3 +207,4 @@ #include <fcntl.h>

{
int res, length, block = 1;
int res, block = 1;
Py_ssize_t length;
char *string;

@@ -282,3 +285,4 @@ unsigned priority = 1;

{
int bytes, block = 1;
int block = 1;
Py_ssize_t bytes;
char *buffer;

@@ -285,0 +289,0 @@ PyObject *result, *timeout_obj = Py_None;

/*
* A type which wraps a posix named semaphore
*
* win_processing.c
* posix_processing.c
*

@@ -9,13 +9,6 @@ * Copyright (c) 2006, 2007, R Oudkerk --- see COPYING.txt

#include "Python.h"
#include "processing_defs.h"
#include "pythread.h"
#ifndef Py_RETURN_NONE
#define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None
#define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True
#define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False
#endif
#include <semaphore.h>
#include <pthread.h>
#include <fcntl.h>

@@ -22,0 +15,0 @@ #include <time.h>

@@ -9,11 +9,4 @@ /*

#include <Python.h>
#include "structmember.h"
#include "processing_defs.h"
#ifndef Py_RETURN_NONE
#define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None
#define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True
#define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False
#endif
extern PyObject *dumpsFunction, *loadsFunction;

@@ -49,3 +42,2 @@ extern PyObject *BufferTooShort;

/* should probably use "n" format for fd on Python 2.5 */
if (!PyArg_ParseTuple(args, "Oiii|i", &s, &fd, &family, &type, &proto))

@@ -52,0 +44,0 @@ return NULL;

@@ -12,2 +12,4 @@ /*

#define TOO_LONG(n) (n > 0x7fffffff)
#ifdef MS_WINDOWS

@@ -19,4 +21,2 @@

#define FALSE_SOCKET
#define WIN32_LEAN_AND_MEAN

@@ -70,3 +70,2 @@ #include <winsock2.h>

/*

@@ -83,3 +82,3 @@ * Error values

#define EARLY_END_OF_FILE (-6)
#define NEGATIVE_MESSAGE_LENGTH (-7)
#define BAD_MESSAGE_LENGTH (-7)
#define SOME_SOCKET_ERROR (-8)

@@ -110,12 +109,8 @@

break;
case SELECT_ERROR:
PyErr_SetString(PyExc_IOError, "select failed");
case BAD_MESSAGE_LENGTH:
PyErr_SetString(PyExc_IOError, "message length is bad");
break;
case NEGATIVE_MESSAGE_LENGTH:
PyErr_SetString(PyExc_IOError, "message length is negative");
break;
#ifdef MS_WINDOWS
case SOME_SOCKET_ERROR:
PyErr_Format(PyExc_IOError, "windows socket error %d",
WSAGetLastError());
PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
break;

@@ -137,3 +132,3 @@ #endif

char *p = string;
int res;
Py_ssize_t res;

@@ -158,3 +153,4 @@ while (length > 0) {

{
int remaining = length, temp;
size_t remaining = length;
Py_ssize_t temp;
char *p = buffer;

@@ -166,3 +162,3 @@

if (temp == 0)
return remaining == (int)length
return remaining == length
? END_OF_FILE : EARLY_END_OF_FILE;

@@ -186,11 +182,11 @@ else

{
if (length < 16384) {
char *message, *p;
if (length < 0x4000) {
char *message;
int res;
p = message = malloc(length+4);
if (p == NULL)
message = malloc(length+4);
if (message == NULL)
return MEMORY_ERROR;
*(uint32_t*)message = htonl(length);
*(uint32_t*)message = htonl((uint32_t)length);
memcpy(message+4, string, length);

@@ -201,5 +197,9 @@ res = _sendall(h, message, length+4);

} else {
char lenbuff[4];
*(uint32_t*)lenbuff = htonl(length);
return _sendall(h, lenbuff, 4) || _sendall(h, string, length);
uint32_t lenbuff;
if (TOO_LONG(length))
return BAD_MESSAGE_LENGTH;
lenbuff = htonl((uint32_t)length);
return _sendall(h, (char*)&lenbuff, 4) || _sendall(h, string, length);
}

@@ -214,27 +214,27 @@ }

static int
static Py_ssize_t
recv_string(_HANDLE h, char *buffer, size_t buflength, char **newbuffer)
{
int res, length;
char lenbuff[4];
int res;
uint32_t ulength;
*newbuffer = NULL;
res = _recvall(h, lenbuff, 4);
res = _recvall(h, (char*)&ulength, 4);
if (res < 0)
return res;
length = htonl(*(uint32_t*) lenbuff);
if (length < 0)
return NEGATIVE_MESSAGE_LENGTH;
if (length <= (int)buflength) {
res = _recvall(h, buffer, length);
return res < 0 ? res : length;
ulength = ntohl(ulength);
if (TOO_LONG(ulength))
return BAD_MESSAGE_LENGTH;
if (ulength <= buflength) {
res = _recvall(h, buffer, (size_t)ulength);
return res < 0 ? res : ulength;
} else {
*newbuffer = malloc(length > 0 ? length : 1);
*newbuffer = malloc(ulength > 0 ? (size_t)ulength : 1);
if (*newbuffer == NULL)
return MEMORY_ERROR;
res = _recvall(h, *newbuffer, length);
return res < 0 ? res : length;
res = _recvall(h, *newbuffer, (size_t)ulength);
return res < 0 ? (Py_ssize_t)res : (Py_ssize_t)ulength;
}
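As the hunk above shows, the socket transport frames every message with a 4-byte big-endian length header (the ``htonl``/``ntohl`` calls) and rejects anything over ``0x7fffffff``. The wire format can be sketched in Python with ``struct`` (an illustrative model over an in-memory stream, not the package's own code):

```python
import io
import struct

MAX_LEN = 0x7fffffff  # mirrors TOO_LONG() in the C source

def send_string(stream, payload):
    if len(payload) > MAX_LEN:
        raise ValueError("bad message length")
    # "!I" is a 4-byte unsigned int in network byte order, as htonl() produces.
    stream.write(struct.pack("!I", len(payload)) + payload)

def recv_string(stream):
    header = stream.read(4)
    if len(header) < 4:
        raise EOFError("early end of file")
    (length,) = struct.unpack("!I", header)
    if length > MAX_LEN:
        raise ValueError("bad message length")
    data = stream.read(length)
    if len(data) < length:
        raise EOFError("early end of file")
    return data

stream = io.BytesIO()
send_string(stream, b"hello")
stream.seek(0)
print(recv_string(stream))  # → b'hello'
```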

@@ -260,4 +260,4 @@ }

tv.tv_sec = (int)timeout;
tv.tv_usec = (int)((timeout - tv.tv_sec) * 1e6);
tv.tv_sec = (long)timeout;
tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6);
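The timeout conversion above splits a floating-point timeout into the whole-second and microsecond fields of a ``struct timeval`` for ``select()``. The arithmetic, sketched in Python:

```python
def to_timeval(timeout):
    # Split a float timeout into (tv_sec, tv_usec), as the C code does.
    sec = int(timeout)
    usec = int((timeout - sec) * 1e6)
    return sec, usec

print(to_timeval(2.5))   # → (2, 500000)
print(to_timeval(0.25))  # → (0, 250000)
```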

@@ -268,3 +268,3 @@ res = select(fd+1, &rfds, NULL, NULL, &tv);

#ifdef MS_WINDOWS
return SELECT_ERROR;
return SOME_SOCKET_ERROR;
#else

@@ -271,0 +271,0 @@ return STANDARD_ERROR;

@@ -9,13 +9,10 @@ /*

#include "Python.h"
#include "processing_defs.h"
#include "processing.h"
#ifndef Py_RETURN_NONE
#define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None
#define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True
#define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False
#endif
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#define PIPE_BUFFER_SIZE 8192
extern PyTypeObject BlockerType;

@@ -33,6 +30,4 @@ extern PyTypeObject PipeConnectionType;

#define PIPE_BUFFER_SIZE 8192
static PyObject *
processing_createpipe(PyObject *self, PyObject *args)
processing_createpipe(PyObject *self, PyObject *args, PyObject *kwds)
{

@@ -42,11 +37,17 @@ char *name;

DWORD timeout = NMPWAIT_WAIT_FOREVER;
DWORD openmode = PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT;
DWORD openmode = PIPE_ACCESS_DUPLEX;
DWORD pipemode = PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT;
if ( !PyArg_ParseTuple( args, "s|i", &name, &timeout) )
DWORD obufsz = openmode & PIPE_ACCESS_OUTBOUND ? PIPE_BUFFER_SIZE : 0;
DWORD ibufsz = openmode & PIPE_ACCESS_INBOUND ? PIPE_BUFFER_SIZE : 0;
static char *kwlist[] = {"name", "timeout", "openmode", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|ii", kwlist,
&name, &timeout, &openmode))
return NULL;
Py_BEGIN_ALLOW_THREADS
pipe = CreateNamedPipe(name, PIPE_ACCESS_DUPLEX, openmode,
PIPE_UNLIMITED_INSTANCES, PIPE_BUFFER_SIZE,
PIPE_BUFFER_SIZE, timeout, NULL);
pipe = CreateNamedPipe(name, openmode, pipemode, PIPE_UNLIMITED_INSTANCES,
obufsz, ibufsz, timeout, NULL);
Py_END_ALLOW_THREADS

@@ -56,4 +57,3 @@

CloseHandle(pipe);
PyErr_SetFromWindowsErr(0);
return NULL;
return PyErr_SetFromWindowsErr(0);
}

@@ -75,3 +75,3 @@

if ( !PyArg_ParseTuple( args, "i", &pipe ) )
if (!PyArg_ParseTuple(args, "i", &pipe))
return NULL;

@@ -85,4 +85,3 @@

CloseHandle(pipe);
PyErr_SetFromWindowsErr(0);
return NULL;
return PyErr_SetFromWindowsErr(0);
}

@@ -105,3 +104,3 @@

if ( !PyArg_ParseTuple( args, "si", &name, &timeout ) )
if (!PyArg_ParseTuple(args, "si", &name, &timeout))
return NULL;

@@ -113,6 +112,4 @@

if (!success) {
PyErr_SetFromWindowsErr(0);
return NULL;
}
if (!success)
return PyErr_SetFromWindowsErr(0);

@@ -129,9 +126,9 @@ Py_RETURN_NONE;

DWORD dwMode = PIPE_READMODE_MESSAGE;
DWORD access = GENERIC_READ | GENERIC_WRITE;
if ( !PyArg_ParseTuple( args, "s", &name ) )
if (!PyArg_ParseTuple(args, "s|I", &name, &access))
return NULL;
Py_BEGIN_ALLOW_THREADS
pipe = CreateFile(name, GENERIC_READ | GENERIC_WRITE,
0, NULL, OPEN_EXISTING, 0, NULL);
pipe = CreateFile(name, access, 0, NULL, OPEN_EXISTING, 0, NULL);
if (pipe == INVALID_HANDLE_VALUE)

@@ -146,4 +143,3 @@ success = FALSE;

CloseHandle(pipe);
PyErr_SetFromWindowsErr(0);
return NULL;
return PyErr_SetFromWindowsErr(0);
}

@@ -163,7 +159,14 @@

HANDLE pipe;
BOOL success;
if (!PyArg_ParseTuple(args, "i", &pipe))
return NULL;
if (!CloseHandle(pipe))
Py_BEGIN_ALLOW_THREADS
success = CloseHandle(pipe);
Py_END_ALLOW_THREADS
if (!success)
return PyErr_SetFromWindowsErr(0);
Py_RETURN_NONE;

@@ -183,4 +186,6 @@ }

return NULL;
if (!SetConsoleCtrlHandler(NULL, !value))
return PyErr_SetFromWindowsErr(0);
Py_RETURN_NONE;

@@ -196,4 +201,6 @@ }

return NULL;
if (!GenerateConsoleCtrlEvent(event, gid))
return PyErr_SetFromWindowsErr(0);
Py_RETURN_NONE;

@@ -265,3 +272,4 @@ }

{"createpipe", (PyCFunction)processing_createpipe, METH_VARARGS, ""},
{"createpipe",
(PyCFunction)processing_createpipe, METH_VARARGS | METH_KEYWORDS, ""},

@@ -276,2 +284,6 @@ {"connectpipe", (PyCFunction)processing_connectpipe, METH_VARARGS, ""},

{"rwbuffer", processing_rwbuffer, METH_VARARGS, ""},
{"address_of_buffer", processing_address_of_buffer, METH_O, ""},
{NULL} /* Sentinel */

@@ -318,2 +330,4 @@ };

NULL, NULL);
if (!BufferTooShort)
return;
Py_INCREF(BufferTooShort);

@@ -320,0 +334,0 @@ PyModule_AddObject(m, "BufferTooShort", BufferTooShort);

@@ -9,11 +9,5 @@ /*

#include "Python.h"
#include "processing_defs.h"
#include "pythread.h"
#ifndef Py_RETURN_NONE
#define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None
#define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True
#define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False
#endif
#define WIN32_LEAN_AND_MEAN

@@ -20,0 +14,0 @@ #include <windows.h>

@@ -83,3 +83,5 @@ #

# `self._block._close()` will be called before `os._exit()`.
process.Finalize(self, self._block._close, atexit=True)
process.Finalize(
self, self._block._close, atexit=True, priority=-10
)

@@ -223,3 +225,3 @@ self.acquire = self._block.acquire

try:
# wait for notification
# wait for notification or timeout
if timeout is None:

@@ -226,0 +228,0 @@ self._wait_semaphore.acquire()

@@ -28,4 +28,3 @@ #

from processing.test import test_processing, test_newtype, \
test_doc, test_speed, test_connection, test_reduction, test_stop, \
test_workers, test_pool
test_doc, test_speed, test_connection, test_reduction, test_stop

@@ -41,9 +40,9 @@ old_processes = set(activeChildren())

if HAVE_NATIVE_SEMAPHORE:
from processing.test import test_workers, test_pool
run_test(test_processing, ['processes', 'processes+server', 'threads'])
run_test(test_workers, ['processes'])
run_test(test_pool, ['processes'])
else:
run_test(test_processing, ['processes+server', 'threads'])
run_test(test_workers, ['processes'])
run_test(test_pool, ['processes'])
processes = set(activeChildren())

@@ -50,0 +49,0 @@ assert processes.issubset(old_processes)

@@ -103,5 +103,5 @@ #

t = time.time()
C = list(pool.imap(pow3, xrange(N), chunksize=10000))
print '\tlist(pool.imap(pow3, xrange(%d), chunksize=10000)):\n\t\t%s' \
' seconds' % (N, time.time() - t)
C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
' seconds' % (N, N//8, time.time() - t)

@@ -126,5 +126,5 @@ assert A == B == C, (len(A), len(B), len(C))

t = time.time()
C = list(pool.imap(noop, L, chunksize=10000))
print '\tlist(pool.imap(noop, L, chunksize=10000)):\n\t\t%s seconds' % \
(time.time() - t)
C = list(pool.imap(noop, L, chunksize=len(L)//8))
print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
(len(L)//8, time.time() - t)
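The benchmark now scales ``chunksize`` with the input instead of hard-coding 10000. ``chunksize`` controls how many consecutive tasks ``imap`` bundles into one message to the workers, so a larger chunk means fewer pickling and IPC round trips. A sketch of the grouping (simplified from Pool's internals, for illustration only):

```python
def chunked(seq, chunksize):
    # Roughly how imap() groups consecutive tasks into a single
    # inter-process message (simplified; real Pool code differs).
    for i in range(0, len(seq), chunksize):
        yield list(seq[i:i + chunksize])

N = 100
chunks = list(chunked(range(N), N // 8))
print(len(chunks))       # messages sent, instead of N individual ones
print(len(chunks[0]))    # tasks per full chunk
```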

@@ -131,0 +131,0 @@ assert A == B == C, (len(A), len(B), len(C))

@@ -16,3 +16,6 @@ #

from processing import *
Manager = LocalManager
try:
Manager = LocalManager
except NameError:
Manager = None
else:

@@ -19,0 +22,0 @@ raise ValueError, config

@@ -179,2 +179,7 @@ #

import processing
try:
from processing.sharedctypes import new_array
except ImportError:
new_array = None
manager = processing.Manager()

@@ -208,7 +213,10 @@ local_manager = processing.LocalManager()

test_seqspeed(range(10))
print '\n\t######## testing shared memory array\n'
print '\n\t######## testing LocalManager.SharedArray("i", ...)\n'
test_seqspeed(local_manager.SharedArray('i', range(10)))
print '\n\t######## testing list managed by server process\n'
test_seqspeed(manager.list(range(10)))
if new_array:
print '\n\t######## testing sharedctypes.new_array("i", ...)\n'
test_seqspeed(new_array('i', range(10)))
print

@@ -215,0 +223,0 @@

@@ -8,5 +8,6 @@ ========

Alexey Akimov, Michele Bertoldi, Josiah Carlson, Lisandro Dalcin,
Markus Gritsch, Doug Hellmann, Charlie Hull, Richard Jones, Gerald
John M. Manipon, Kevin Manley, Paul Rudin, Dominique Wahli.
Markus Gritsch, Doug Hellmann, Charlie Hull, Richard Jones, Alexy
Khrabrov, Gerald John M. Manipon, Kevin Manley, Paul Rudin,
Dominique Wahli.
Sorry if I have forgotten anyone.
#
# A test file for the processing package (and processing.dummy)
#
from __future__ import with_statement
import time, sys, random
from processing import *
from Queue import Empty
#### TEST_NAMESPACE
def namespace_func(running, mutex):
random.seed()
time.sleep(random.random()*4)
with mutex:
print '\n\t\t\t' + str(currentProcess()) + ' has finished'
running.value -= 1
def test_namespace(manager):
TASKS = 10
running = manager.SharedValue('i', TASKS)
mutex = manager.Lock()
for i in range(TASKS):
Process(target=namespace_func, args=[running, mutex]).start()
while running.value > 0:
time.sleep(0.08)
with mutex:
print running.value,
sys.stdout.flush()
print
print 'No more running processes'
#### TEST_QUEUE
def queue_func(queue):
for i in xrange(30):
time.sleep(0.5 * random.random())
queue.put(i*i)
queue.put('STOP')
def test_queue(manager):
q = manager.Queue()
p = Process(target=queue_func, args=[q])
p.start()
o = None
while o != 'STOP':
try:
o = q.get(timeout=0.3)
print o,
sys.stdout.flush()
except Empty:
print 'TIMEOUT'
print
#### TEST_CONDITION
def condition_func(cond):
with cond:
print '\t' + str(cond)
time.sleep(2)
print '\tchild is notifying'
cond.notify()
print '\t' + str(cond)
def test_condition(manager):
cond = manager.Condition()
p = Process(target=condition_func, args=[cond])
print cond
with cond:
print cond
with cond:
print cond
p.start()
print 'main is waiting'
cond.wait()
print 'main has woken up'
print cond
print cond
p.join()
print cond
#### TEST_SEMAPHORE
def semaphore_func(sema, mutex, running):
with sema:
with mutex:
running.value += 1
print running.value, 'tasks are running'
random.seed()
time.sleep(random.random()*2)
with mutex:
running.value -= 1
print '%s has finished' % currentProcess()
def test_semaphore(manager):
sema = manager.Semaphore(3)
mutex = manager.RLock()
running = manager.SharedValue('i', 0)
processes = [Process(target=semaphore_func, args=[sema, mutex, running])
for i in range(10)]
for p in processes:
p.start()
for p in processes:
p.join()
#### TEST_JOIN_TIMEOUT
def join_timeout_func():
print '\tchild sleeping'
time.sleep(5.5)
print '\n\tchild terminating'
def test_join_timeout(manager):
p = Process(target=join_timeout_func)
p.start()
print 'waiting for process to finish'
while 1:
p.join(timeout=1)
if not p.isAlive():
break
print '.',
sys.stdout.flush()
#### TEST_EVENT
def event_func(event):
print '\t%r is waiting' % currentProcess()
event.wait()
print '\t%r has woken up' % currentProcess()
def test_event(manager):
event = manager.Event()
processes = [Process(target=event_func, args=[event]) for i in range(5)]
for p in processes:
p.start()
print 'main is sleeping'
time.sleep(2)
print 'main is setting event'
event.set()
for p in processes:
p.join()
#### TEST_SHAREDVALUES
def sharedvalues_func(values, structs, arrays,
shared_values, shared_structs, shared_arrays):
for i in range(len(values)):
v = values[i][1]
sv = shared_values[i].value
assert v == sv
for i in range(len(structs)):
s = structs[i][1]
ss = shared_structs[i].value
assert s == ss, (s, ss)
for i in range(len(values)):
a = arrays[i][1]
sa = list(shared_arrays[i][:])
assert a == sa
print 'Tests passed'
def test_sharedvalues(manager):
if sys.platform == 'cygwin' and hasattr(manager, '_getheap'):
print >>sys.stderr, 'cygwin does not allow resizing of mmaps'
return
values = [
('i', 10),
('h', -2),
('16p', 'hello')
]
structs = [
('hd', (10, 0.75)),
('10d', tuple(0.375 * i for i in range(10))),
('cccc', ('a', 'b', 'c', 'd'))
]
arrays = [
('i', range(100)),
('d', [0.25 * i for i in range(100)]),
('H', range(1000))
]
shared_values = [manager.SharedValue(id, v) for id, v in values]
shared_structs = [manager.SharedStruct(id, s) for id, s in structs]
shared_arrays = [manager.SharedArray(id, a) for id, a in arrays]
p = Process(
target=sharedvalues_func,
args=(values, structs, arrays,
shared_values, shared_structs, shared_arrays)
)
p.start()
p.join()
assert p.getExitCode() == 0
####
def main():
with Manager() as manager:
for func in [ test_namespace, test_queue, test_condition,
test_semaphore, test_join_timeout, test_event,
test_sharedvalues ]:
print '\n\t######## %s\n' % func.__name__
func(manager)
ignore = activeChildren() # cleanup any old processes
info = manager._debug_info()
if info is not None:
print info
raise ValueError, 'there should be no positive refcounts left'
if __name__ == '__main__':
freezeSupport()
main()