Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

stomper

Package Overview
Dependencies
Maintainers
2
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

stomper - npm Package Compare versions

Comparing version
0.2.8
to
0.3.0
+508
lib/stomper/stomp_10.py
"""
This is a python client implementation of the STOMP protocol.
It aims to be transport layer neutral. This module provides functions to
create and parse STOMP messages in a programmatic fashion.
The examples package contains two examples using twisted as the transport
framework. Other frameworks can be used and I may add other examples as
time goes on.
The STOMP protocol specification maybe found here:
* http://stomp.codehaus.org/Protocol
I've looked at the stomp client by Jason R. Briggs and have based the message
generation on how his client does it. The client can be found at the follow
address however it isn't a dependancy.
* http://www.briggs.net.nz/log/projects/stomppy
In testing this library I run against ActiveMQ project. The server runs
in java, however its fairly standalone and easy to set up. The projects
page is here:
* http://activemq.apache.org/
(c) Oisin Mulvihill, 2007-07-26.
License: http://www.apache.org/licenses/LICENSE-2.0
"""
import re
import uuid
import types
import logging
import utils
import stompbuffer
# This is used as a return from message responses functions.
# It is used more for readability more then anything or reason.
NO_RESPONSE_NEEDED = ''
# For backwards compatibility
NO_REPONSE_NEEDED = ''
# The version of the protocol we implement.
STOMP_VERSION = '1.0'
# Message terminator:
NULL = '\x00'
# STOMP Spec v1.0 valid commands:
VALID_COMMANDS = [
'ABORT', 'ACK', 'BEGIN', 'COMMIT',
'CONNECT', 'CONNECTED', 'DISCONNECT', 'MESSAGE',
'SEND', 'SUBSCRIBE', 'UNSUBSCRIBE',
'RECEIPT', 'ERROR',
]
def get_log():
return logging.getLogger("stomper")
class FrameError(Exception):
"""Raise for problem with frame generation or parsing.
"""
class Frame(object):
"""This class is used to create or read STOMP message frames.
The method pack() is used to create a STOMP message ready
for transmission.
The method unpack() is used to read a STOMP message into
a frame instance. It uses the unpack_frame(...) function
to do the initial parsing.
The frame has three important member variables:
* cmd
* headers
* body
The 'cmd' is a property that represents the STOMP message
command. When you assign this a check is done to make sure
its one of the VALID_COMMANDS. If not then FrameError will
be raised.
The 'headers' is a dictionary which the user can added to
if needed. There are no restrictions or checks imposed on
what values are inserted.
The 'body' is just a member variable that the body text
is assigned to.
"""
def __init__(self):
"""Setup the internal state."""
self._cmd = ''
self.body = ''
self.headers = {}
def getCmd(self):
"""Don't use _cmd directly!"""
return self._cmd
def setCmd(self, cmd):
"""Check the cmd is valid, FrameError will be raised if its not."""
cmd = cmd.upper()
if cmd not in VALID_COMMANDS:
raise FrameError("The cmd '%s' is not valid! It must be one of '%s' (STOMP v%s)." % (
cmd, VALID_COMMANDS, STOMP_VERSION)
)
else:
self._cmd = cmd
cmd = property(getCmd, setCmd)
def pack(self):
"""Called to create a STOMP message from the internal values.
"""
headers = ''.join(
['%s:%s\n' % (f, v) for f, v in sorted(self.headers.items())]
)
stomp_message = "%s\n%s\n%s%s\n" % (self._cmd, headers, self.body, NULL)
# import pprint
# print "stomp_message: ", pprint.pprint(stomp_message)
return stomp_message
def unpack(self, message):
"""Called to extract a STOMP message into this instance.
message:
This is a text string representing a valid
STOMP (v1.0) message.
This method uses unpack_frame(...) to extract the
information, before it is assigned internally.
retuned:
The result of the unpack_frame(...) call.
"""
if not message:
raise FrameError("Unpack error! The given message isn't valid '%s'!" % message)
msg = unpack_frame(message)
self.cmd = msg['cmd']
self.headers = msg['headers']
# Assign directly as the message will have the null
# character in the message already.
self.body = msg['body']
return msg
def unpack_frame(message):
"""Called to unpack a STOMP message into a dictionary.
returned = {
# STOMP Command:
'cmd' : '...',
# Headers e.g.
'headers' : {
'destination' : 'xyz',
'message-id' : 'some event',
:
etc,
}
# Body:
'body' : '...1234...\x00',
}
"""
body = []
returned = dict(cmd='', headers={}, body='')
breakdown = message.split('\n')
# Get the message command:
returned['cmd'] = breakdown[0]
breakdown = breakdown[1:]
def headD(field):
# find the first ':' everything to the left of this is a
# header, everything to the right is data:
index = field.find(':')
if index:
header = field[:index].strip()
data = field[index+1:].strip()
# print "header '%s' data '%s'" % (header, data)
returned['headers'][header.strip()] = data.strip()
def bodyD(field):
field = field.strip()
if field:
body.append(field)
# Recover the header fields and body data
handler = headD
for field in breakdown:
# print "field:", field
if field.strip() == '':
# End of headers, it body data next.
handler = bodyD
continue
handler(field)
# Stich the body data together:
# print "1. body: ", body
body = "".join(body)
returned['body'] = body.replace('\x00', '')
# print "2. body: <%s>" % returned['body']
return returned
def abort(transactionid):
"""STOMP abort transaction command.
Rollback whatever actions in this transaction.
transactionid:
This is the id that all actions in this transaction.
"""
return "ABORT\ntransaction: %s\n\n\x00\n" % transactionid
def ack(messageid, transactionid=None):
"""STOMP acknowledge command.
Acknowledge receipt of a specific message from the server.
messageid:
This is the id of the message we are acknowledging,
what else could it be? ;)
transactionid:
This is the id that all actions in this transaction
will have. If this is not given then a random UUID
will be generated for this.
"""
header = 'message-id: %s' % messageid
if transactionid:
header = 'message-id: %s\ntransaction: %s' % (messageid, transactionid)
return "ACK\n%s\n\n\x00\n" % header
def begin(transactionid=None):
"""STOMP begin command.
Start a transaction...
transactionid:
This is the id that all actions in this transaction
will have. If this is not given then a random UUID
will be generated for this.
"""
if not transactionid:
# Generate a random UUID:
transactionid = uuid.uuid4()
return "BEGIN\ntransaction: %s\n\n\x00\n" % transactionid
def commit(transactionid):
"""STOMP commit command.
Do whatever is required to make the series of actions
permanent for this transactionid.
transactionid:
This is the id that all actions in this transaction.
"""
return "COMMIT\ntransaction: %s\n\n\x00\n" % transactionid
def connect(username, password):
"""STOMP connect command.
username, password:
These are the needed auth details to connect to the
message server.
After sending this we will receive a CONNECTED
message which will contain our session id.
"""
return "CONNECT\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password)
def disconnect():
"""STOMP disconnect command.
Tell the server we finished and we'll be closing the
socket soon.
"""
return "DISCONNECT\n\n\x00\n"
def send(dest, msg, transactionid=None):
"""STOMP send command.
dest:
This is the channel we wish to subscribe to
msg:
This is the message body to be sent.
transactionid:
This is an optional field and is not needed
by default.
"""
transheader = ''
if transactionid:
transheader = 'transaction: %s\n' % transactionid
return "SEND\ndestination: %s\n%s\n%s\x00\n" % (dest, transheader, msg)
def subscribe(dest, ack='auto'):
"""STOMP subscribe command.
dest:
This is the channel we wish to subscribe to
ack: 'auto' | 'client'
If the ack is set to client, then messages received will
have to have an acknowledge as a reply. Otherwise the server
will assume delivery failure.
"""
return "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack)
def unsubscribe(dest):
"""STOMP unsubscribe command.
dest:
This is the channel we wish to subscribe to
Tell the server we no longer wish to receive any
further messages for the given subscription.
"""
return "UNSUBSCRIBE\ndestination:%s\n\n\x00\n" % dest
class Engine(object):
"""This is a simple state machine to return a response to received
message if needed.
"""
def __init__(self, testing=False):
self.testing = testing
self.log = logging.getLogger("stomper.Engine")
self.sessionId = ''
# Entry Format:
#
# COMMAND : Handler_Function
#
self.states = {
'CONNECTED' : self.connected,
'MESSAGE' : self.ack,
'ERROR' : self.error,
'RECEIPT' : self.receipt,
}
def react(self, msg):
"""Called to provide a response to a message if needed.
msg:
This is a dictionary as returned by unpack_frame(...)
or it can be a straight STOMP message. This function
will attempt to determine which an deal with it.
returned:
A message to return or an empty string.
"""
returned = ""
# If its not a string assume its a dict.
mtype = type(msg)
if mtype in types.StringTypes:
msg = unpack_frame(msg)
elif mtype == types.DictType:
pass
else:
raise FrameError("Unknown message type '%s', I don't know what to do with this!" % mtype)
if self.states.has_key(msg['cmd']):
# print("reacting to message - %s" % msg['cmd'])
returned = self.states[msg['cmd']](msg)
return returned
def connected(self, msg):
"""No response is needed to a connected frame.
This method stores the session id as the
member sessionId for later use.
returned:
NO_RESPONSE_NEEDED
"""
self.sessionId = msg['headers']['session']
#print "connected: session id '%s'." % self.sessionId
return NO_RESPONSE_NEEDED
def ack(self, msg):
"""Called when a MESSAGE has been received.
Override this method to handle received messages.
This function will generate an acknowledge message
for the given message and transaction (if present).
"""
message_id = msg['headers']['message-id']
transaction_id = None
if msg['headers'].has_key('transaction-id'):
transaction_id = msg['headers']['transaction-id']
# print "acknowledging message id <%s>." % message_id
return ack(message_id, transaction_id)
def error(self, msg):
"""Called to handle an error message received from the server.
This method just logs the error message
returned:
NO_RESPONSE_NEEDED
"""
body = msg['body'].replace(NULL, '')
brief_msg = ""
if msg['headers'].has_key('message'):
brief_msg = msg['headers']['message']
self.log.error("Received server error - message%s\n\n%s" % (brief_msg, body))
returned = NO_RESPONSE_NEEDED
if self.testing:
returned = 'error'
return returned
def receipt(self, msg):
"""Called to handle a receipt message received from the server.
This method just logs the receipt message
returned:
NO_RESPONSE_NEEDED
"""
body = msg['body'].replace(NULL, '')
brief_msg = ""
if msg['headers'].has_key('receipt-id'):
brief_msg = msg['headers']['receipt-id']
self.log.info("Received server receipt message - receipt-id:%s\n\n%s" % (brief_msg, body))
returned = NO_RESPONSE_NEEDED
if self.testing:
returned = 'receipt'
return returned
"""
This is a python client implementation of the STOMP protocol.
It aims to be transport layer neutral. This module provides functions to
create and parse STOMP messages in a programmatic fashion.
The examples package contains two examples using twisted as the transport
framework. Other frameworks can be used and I may add other examples as
time goes on.
The STOMP protocol specification maybe found here:
* http://stomp.codehaus.org/Protocol
I've looked at the stomp client by Jason R. Briggs and have based the message
generation on how his client does it. The client can be found at the follow
address however it isn't a dependancy.
* http://www.briggs.net.nz/log/projects/stomppy
In testing this library I run against ActiveMQ project. The server runs
in java, however its fairly standalone and easy to set up. The projects
page is here:
* http://activemq.apache.org/
(c) Oisin Mulvihill, 2007-07-26.
Ralph Bean, 2014-09-09.
License: http://www.apache.org/licenses/LICENSE-2.0
"""
import re
import uuid
import types
import logging
import utils
import stompbuffer
# This is used as a return from message responses functions.
# It is used more for readability more then anything or reason.
NO_RESPONSE_NEEDED = ''
# For backwards compatibility
NO_REPONSE_NEEDED = ''
# The version of the protocol we implement.
STOMP_VERSION = '1.1'
# Message terminator:
NULL = '\x00'
# STOMP Spec v1.1 valid commands:
VALID_COMMANDS = [
'ABORT', 'ACK', 'BEGIN', 'COMMIT',
'CONNECT', 'CONNECTED', 'DISCONNECT', 'MESSAGE',
'NACK', 'SEND', 'SUBSCRIBE', 'UNSUBSCRIBE',
'RECEIPT', 'ERROR',
]
def get_log():
return logging.getLogger("stomper")
class FrameError(Exception):
"""Raise for problem with frame generation or parsing.
"""
class Frame(object):
"""This class is used to create or read STOMP message frames.
The method pack() is used to create a STOMP message ready
for transmission.
The method unpack() is used to read a STOMP message into
a frame instance. It uses the unpack_frame(...) function
to do the initial parsing.
The frame has three important member variables:
* cmd
* headers
* body
The 'cmd' is a property that represents the STOMP message
command. When you assign this a check is done to make sure
its one of the VALID_COMMANDS. If not then FrameError will
be raised.
The 'headers' is a dictionary which the user can added to
if needed. There are no restrictions or checks imposed on
what values are inserted.
The 'body' is just a member variable that the body text
is assigned to.
"""
def __init__(self):
"""Setup the internal state."""
self._cmd = ''
self.body = ''
self.headers = {}
def getCmd(self):
"""Don't use _cmd directly!"""
return self._cmd
def setCmd(self, cmd):
"""Check the cmd is valid, FrameError will be raised if its not."""
cmd = cmd.upper()
if cmd not in VALID_COMMANDS:
raise FrameError("The cmd '%s' is not valid! It must be one of '%s' (STOMP v%s)." % (
cmd, VALID_COMMANDS, STOMP_VERSION)
)
else:
self._cmd = cmd
cmd = property(getCmd, setCmd)
def pack(self):
"""Called to create a STOMP message from the internal values.
"""
headers = ''.join(
['%s:%s\n' % (f, v) for f, v in sorted(self.headers.items())]
)
stomp_message = "%s\n%s\n%s%s\n" % (self._cmd, headers, self.body, NULL)
return stomp_message
def unpack(self, message):
"""Called to extract a STOMP message into this instance.
message:
This is a text string representing a valid
STOMP (v1.1) message.
This method uses unpack_frame(...) to extract the
information, before it is assigned internally.
retuned:
The result of the unpack_frame(...) call.
"""
if not message:
raise FrameError("Unpack error! The given message isn't valid '%s'!" % message)
msg = unpack_frame(message)
self.cmd = msg['cmd']
self.headers = msg['headers']
# Assign directly as the message will have the null
# character in the message already.
self.body = msg['body']
return msg
def unpack_frame(message):
"""Called to unpack a STOMP message into a dictionary.
returned = {
# STOMP Command:
'cmd' : '...',
# Headers e.g.
'headers' : {
'destination' : 'xyz',
'message-id' : 'some event',
:
etc,
}
# Body:
'body' : '...1234...\x00',
}
"""
body = []
returned = dict(cmd='', headers={}, body='')
breakdown = message.split('\n')
# Get the message command:
returned['cmd'] = breakdown[0]
breakdown = breakdown[1:]
def headD(field):
# find the first ':' everything to the left of this is a
# header, everything to the right is data:
index = field.find(':')
if index:
header = field[:index].strip()
data = field[index+1:].strip()
# print "header '%s' data '%s'" % (header, data)
returned['headers'][header.strip()] = data.strip()
def bodyD(field):
field = field.strip()
if field:
body.append(field)
# Recover the header fields and body data
handler = headD
for field in breakdown:
# print "field:", field
if field.strip() == '':
# End of headers, it body data next.
handler = bodyD
continue
handler(field)
# Stich the body data together:
# print "1. body: ", body
body = "".join(body)
returned['body'] = body.replace('\x00', '')
# print "2. body: <%s>" % returned['body']
return returned
def abort(transactionid):
"""STOMP abort transaction command.
Rollback whatever actions in this transaction.
transactionid:
This is the id that all actions in this transaction.
"""
return "ABORT\ntransaction:%s\n\n\x00\n" % transactionid
def ack(messageid, subscriptionid, transactionid=None):
"""STOMP acknowledge command.
Acknowledge receipt of a specific message from the server.
messageid:
This is the id of the message we are acknowledging,
what else could it be? ;)
subscriptionid:
This is the id of the subscription that applies to the message.
transactionid:
This is the id that all actions in this transaction
will have. If this is not given then a random UUID
will be generated for this.
"""
header = 'subscription:%s\nmessage-id:%s' % (subscriptionid, messageid)
if transactionid:
header += '\ntransaction:%s' % transactionid
return "ACK\n%s\n\n\x00\n" % header
def nack(messageid, subscriptionid, transactionid=None):
"""STOMP negative acknowledge command.
NACK is the opposite of ACK. It is used to tell the server that the client
did not consume the message. The server can then either send the message to
a different client, discard it, or put it in a dead letter queue. The exact
behavior is server specific.
messageid:
This is the id of the message we are acknowledging,
what else could it be? ;)
subscriptionid:
This is the id of the subscription that applies to the message.
transactionid:
This is the id that all actions in this transaction
will have. If this is not given then a random UUID
will be generated for this.
"""
header = 'subscription:%s\nmessage-id:%s' % (subscriptionid, messageid)
if transactionid:
header += '\ntransaction:%s' % transactionid
return "NACK\n%s\n\n\x00\n" % header
def begin(transactionid=None):
"""STOMP begin command.
Start a transaction...
transactionid:
This is the id that all actions in this transaction
will have. If this is not given then a random UUID
will be generated for this.
"""
if not transactionid:
# Generate a random UUID:
transactionid = uuid.uuid4()
return "BEGIN\ntransaction:%s\n\n\x00\n" % transactionid
def commit(transactionid):
"""STOMP commit command.
Do whatever is required to make the series of actions
permanent for this transactionid.
transactionid:
This is the id that all actions in this transaction.
"""
return "COMMIT\ntransaction:%s\n\n\x00\n" % transactionid
def connect(username, password, host, heartbeats=(0,0)):
"""STOMP connect command.
username, password:
These are the needed auth details to connect to the
message server.
After sending this we will receive a CONNECTED
message which will contain our session id.
"""
if len(heartbeats) != 2:
raise ValueError('Invalid heartbeat %r' % heartbeats)
cx, cy = heartbeats
return "CONNECT\naccept-version:1.1\nhost:%s\nheart-beat:%i,%i\nlogin:%s\npasscode:%s\n\n\x00\n" % (host, cx, cy, username, password)
def disconnect(receipt=None):
"""STOMP disconnect command.
Tell the server we finished and we'll be closing the
socket soon.
"""
if not receipt:
receipt = uuid.uuid4()
return "DISCONNECT\nreceipt:%s\n\x00\n" % receipt
def send(dest, msg, transactionid=None, content_type='text/plain'):
"""STOMP send command.
dest:
This is the channel we wish to subscribe to
msg:
This is the message body to be sent.
transactionid:
This is an optional field and is not needed
by default.
"""
transheader = ''
if transactionid:
transheader = 'transaction:%s\n' % transactionid
return "SEND\ndestination:%s\ncontent-type:%s\n%s\n%s\x00\n" % (
dest, content_type, transheader, msg)
def subscribe(dest, idx, ack='auto'):
"""STOMP subscribe command.
dest:
This is the channel we wish to subscribe to
idx:
The ID that should uniquely identify the subscription
ack: 'auto' | 'client'
If the ack is set to client, then messages received will
have to have an acknowledge as a reply. Otherwise the server
will assume delivery failure.
"""
return "SUBSCRIBE\nid:%s\ndestination:%s\nack:%s\n\n\x00\n" % (
idx, dest, ack)
def unsubscribe(idx):
"""STOMP unsubscribe command.
idx:
This is the id of the subscription
Tell the server we no longer wish to receive any
further messages for the given subscription.
"""
return "UNSUBSCRIBE\nid:%s\n\n\x00\n" % idx
class Engine(object):
"""This is a simple state machine to return a response to received
message if needed.
"""
def __init__(self, testing=False):
self.testing = testing
self.log = logging.getLogger("stomper.Engine")
self.sessionId = ''
# Entry Format:
#
# COMMAND : Handler_Function
#
self.states = {
'CONNECTED' : self.connected,
'MESSAGE' : self.ack,
'ERROR' : self.error,
'RECEIPT' : self.receipt,
}
def react(self, msg):
"""Called to provide a response to a message if needed.
msg:
This is a dictionary as returned by unpack_frame(...)
or it can be a straight STOMP message. This function
will attempt to determine which an deal with it.
returned:
A message to return or an empty string.
"""
returned = ""
# If its not a string assume its a dict.
mtype = type(msg)
if mtype in types.StringTypes:
msg = unpack_frame(msg)
elif mtype == types.DictType:
pass
else:
raise FrameError("Unknown message type '%s', I don't know what to do with this!" % mtype)
if self.states.has_key(msg['cmd']):
# print("reacting to message - %s" % msg['cmd'])
returned = self.states[msg['cmd']](msg)
return returned
def connected(self, msg):
"""No response is needed to a connected frame.
This method stores the session id as the
member sessionId for later use.
returned:
NO_RESPONSE_NEEDED
"""
self.sessionId = msg['headers']['session']
#print "connected: session id '%s'." % self.sessionId
return NO_RESPONSE_NEEDED
def ack(self, msg):
"""Called when a MESSAGE has been received.
Override this method to handle received messages.
This function will generate an acknowledge message
for the given message and transaction (if present).
"""
message_id = msg['headers']['message-id']
subscription = msg['headers']['subscription']
transaction_id = None
if msg['headers'].has_key('transaction-id'):
transaction_id = msg['headers']['transaction-id']
# print "acknowledging message id <%s>." % message_id
return ack(message_id, subscription, transaction_id)
def error(self, msg):
"""Called to handle an error message received from the server.
This method just logs the error message
returned:
NO_RESPONSE_NEEDED
"""
body = msg['body'].replace(NULL, '')
brief_msg = ""
if msg['headers'].has_key('message'):
brief_msg = msg['headers']['message']
self.log.error("Received server error - message%s\n\n%s" % (brief_msg, body))
returned = NO_RESPONSE_NEEDED
if self.testing:
returned = 'error'
return returned
def receipt(self, msg):
"""Called to handle a receipt message received from the server.
This method just logs the receipt message
returned:
NO_RESPONSE_NEEDED
"""
body = msg['body'].replace(NULL, '')
brief_msg = ""
if msg['headers'].has_key('receipt-id'):
brief_msg = msg['headers']['receipt-id']
self.log.info("Received server receipt message - receipt-id:%s\n\n%s" % (brief_msg, body))
returned = NO_RESPONSE_NEEDED
if self.testing:
returned = 'receipt'
return returned
"""
This is the unittest to verify my stomper module.
The STOMP protocol specification maybe found here:
* http://stomp.codehaus.org/Protocol
I've looked and the stomp client by Jason R. Briggs and have based the message
generation on how his client did it. The client can be found at the follow
address however it isn't a dependancy.
* http://www.briggs.net.nz/log/projects/stomppy
(c) Oisin Mulvihill, 2007-07-26.
License: http://www.apache.org/licenses/LICENSE-2.0
"""
import pprint
import unittest
import stomper.stomp_10 as stomper
class TestEngine(stomper.Engine):
"""Test that these methods are called by the default engine.
"""
def __init__(self):
super(TestEngine, self).__init__()
self.ackCalled = False
self.errorCalled = False
self.receiptCalled = False
def ack(self, msg):
super(TestEngine, self).ack(msg)
self.ackCalled = True
return 'ack'
def error(self, msg):
super(TestEngine, self).error(msg)
self.errorCalled = True
return 'error'
def receipt(self, msg):
super(TestEngine, self).receipt(msg)
self.receiptCalled = True
return 'receipt'
class Stomper10Test(unittest.TestCase):
def testEngineToServerMessages(self):
"""Test the state machines reaction
"""
e = TestEngine()
# React to a message which should be an ack:
msg = stomper.Frame()
msg.cmd = 'MESSAGE'
msg.headers = {
'destination:': '/queue/a',
'message-id:': 'some-message-id'
}
msg.body = "hello queue a"
rc = e.react(msg.pack())
self.assertEquals(rc, 'ack')
self.assertEquals(e.ackCalled, True)
# React to an error:
error = stomper.Frame()
error.cmd = 'ERROR'
error.headers = {'message:': 'malformed packet received!'}
error.body = """The message:
-----
MESSAGE
destined:/queue/a
Hello queue a!
-----
Did not contain a destination header, which is required for message propagation.
\x00
"""
rc = e.react(error.pack())
self.assertEquals(rc, 'error')
self.assertEquals(e.errorCalled, True)
# React to an receipt:
receipt = stomper.Frame()
receipt.cmd = 'RECEIPT'
receipt.headers = {'receipt-id:': 'message-12345'}
rc = e.react(receipt.pack())
self.assertEquals(rc, 'receipt')
self.assertEquals(e.receiptCalled, True)
def testEngine(self):
"""Test the basic state machine.
"""
e = stomper.Engine(testing=True)
# test session connected message:
msg = """CONNECTED
session:ID:snorky.local-49191-1185461799654-3:18
\x00
"""
result = stomper.unpack_frame(msg)
correct = ''
returned = e.react(result)
self.assertEquals(returned, correct)
# test message:
msg = """MESSAGE
destination: /queue/a
message-id: some-message-id
hello queue a
\x00
"""
returned = e.react(msg)
correct = 'ACK\nmessage-id: some-message-id\n\n\x00\n'
self.assertEquals(returned, correct)
# test error:
msg = """ERROR
message:some error
There was a problem with your last message
\x00
"""
returned = e.react(msg)
correct = 'error'
self.assertEquals(returned, correct)
# test receipt:
msg = """RECEIPT
message-id: some-message-id
\x00
"""
returned = e.react(msg)
correct = 'receipt'
self.assertEquals(returned, correct)
def testFramepack1(self):
"""Testing pack, unpacking and the Frame class.
"""
# Check bad frame generation:
frame = stomper.Frame()
def bad():
frame.cmd = 'SOME UNNOWN CMD'
self.assertRaises(stomper.FrameError, bad)
# Generate a MESSAGE frame:
frame = stomper.Frame()
frame.cmd = 'MESSAGE'
frame.headers['destination'] = '/queue/a'
frame.headers['message-id'] = 'card_data'
frame.body = "hello queue a"
result = frame.pack()
# print "\n-- result " + "----" * 10
# pprint.pprint(result)
# print
# Try bad message unpack catching:
bad_frame = stomper.Frame()
self.assertRaises(stomper.FrameError, bad_frame.unpack, None)
self.assertRaises(stomper.FrameError, bad_frame.unpack, '')
# Try to read the generated frame back in
# and then check the variables are set up
# correctly:
frame2 = stomper.Frame()
frame2.unpack(result)
self.assertEquals(frame2.cmd, 'MESSAGE')
self.assertEquals(frame2.headers['destination'], '/queue/a')
self.assertEquals(frame2.headers['message-id'], 'card_data')
self.assertEquals(frame2.body, 'hello queue a')
result = frame2.pack()
correct = "MESSAGE\ndestination:/queue/a\nmessage-id:card_data\n\nhello queue a\x00\n"
# print "result: "
# pprint.pprint(result)
# print
# print "correct: "
# pprint.pprint(correct)
# print
#
self.assertEquals(result, correct)
result = stomper.unpack_frame(result)
self.assertEquals(result['cmd'], 'MESSAGE')
self.assertEquals(result['headers']['destination'], '/queue/a')
self.assertEquals(result['headers']['message-id'], 'card_data')
self.assertEquals(result['body'], 'hello queue a')
def testFramepack2(self):
"""Testing pack, unpacking and the Frame class.
"""
# Check bad frame generation:
frame = stomper.Frame()
frame.cmd = 'DISCONNECT'
result = frame.pack()
correct = 'DISCONNECT\n\n\x00\n'
self.assertEquals(result, correct)
def testFrameUnpack2(self):
"""Testing unpack frame function against MESSAGE
"""
msg = """MESSAGE
destination:/queue/a
message-id: card_data
hello queue a"""
result = stomper.unpack_frame(msg)
self.assertEquals(result['cmd'], 'MESSAGE')
self.assertEquals(result['headers']['destination'], '/queue/a')
self.assertEquals(result['headers']['message-id'], 'card_data')
self.assertEquals(result['body'], 'hello queue a')
def testFrameUnpack3(self):
"""Testing unpack frame function against CONNECTED
"""
msg = """CONNECTED
session:ID:snorky.local-49191-1185461799654-3:18
"""
result = stomper.unpack_frame(msg)
self.assertEquals(result['cmd'], 'CONNECTED')
self.assertEquals(result['headers']['session'], 'ID:snorky.local-49191-1185461799654-3:18')
self.assertEquals(result['body'], '')
def testBugInFrameUnpack1(self):
msg = """MESSAGE
destination:/queue/a
message-id: card_data
hello queue a
\x00
"""
result = stomper.unpack_frame(msg)
self.assertEquals(result['cmd'], 'MESSAGE')
self.assertEquals(result['headers']['destination'], '/queue/a')
self.assertEquals(result['headers']['message-id'], 'card_data')
self.assertEquals(result['body'], 'hello queue a')
def testCommit(self):
transactionid = '1234'
correct = "COMMIT\ntransaction: %s\n\n\x00\n" % transactionid
self.assertEquals(stomper.commit(transactionid), correct)
def testAbort(self):
transactionid = '1234'
correct = "ABORT\ntransaction: %s\n\n\x00\n" % transactionid
self.assertEquals(stomper.abort(transactionid), correct)
def testBegin(self):
transactionid = '1234'
correct = "BEGIN\ntransaction: %s\n\n\x00\n" % transactionid
self.assertEquals(stomper.begin(transactionid), correct)
def testAck(self):
messageid = '1234'
transactionid = '9876'
header = 'message-id: %s\ntransaction: %s' % (messageid, transactionid)
correct = "ACK\n%s\n\n\x00\n" % header
self.assertEquals(stomper.ack(messageid, transactionid), correct)
messageid = '1234'
correct = "ACK\nmessage-id: %s\n\n\x00\n" % messageid
self.assertEquals(stomper.ack(messageid), correct)
def testUnsubscribe(self):
dest = '/queue/all'
correct = "UNSUBSCRIBE\ndestination:%s\n\n\x00\n" % dest
self.assertEquals(stomper.unsubscribe(dest), correct)
def testSubscribe(self):
dest, ack = '/queue/all', 'client'
correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack)
self.assertEquals(stomper.subscribe(dest, ack), correct)
dest, ack = '/queue/all', 'auto'
correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack)
self.assertEquals(stomper.subscribe(dest, ack), correct)
correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack)
self.assertEquals(stomper.subscribe(dest), correct)
def testConnect(self):
username, password = 'bob', '123'
correct = "CONNECT\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password)
self.assertEquals(stomper.connect(username, password), correct)
def testDisconnect(self):
correct = "DISCONNECT\n\n\x00\n"
self.assertEquals(stomper.disconnect(), correct)
def testSend(self):
dest, transactionid, msg = '/queue/myplace', '', '123 456 789'
correct = "SEND\ndestination: %s\n\n%s\x00\n" % (dest, msg)
result = stomper.send(dest, msg, transactionid)
# print "result: "
# pprint.pprint(result)
# print
# print "correct: "
# pprint.pprint(correct)
# print
self.assertEquals(result, correct)
dest, transactionid, msg = '/queue/myplace', '987', '123 456 789'
correct = "SEND\ndestination: %s\ntransaction: %s\n\n%s\x00\n" % (dest, transactionid, msg)
self.assertEquals(stomper.send(dest, msg, transactionid), correct)
if __name__ == "__main__":
unittest.main()
"""
This is the unittest to verify my stomper module.
The STOMP protocol specification maybe found here:
* http://stomp.codehaus.org/Protocol
I've looked and the stomp client by Jason R. Briggs and have based the message
generation on how his client did it. The client can be found at the follow
address however it isn't a dependancy.
* http://www.briggs.net.nz/log/projects/stomppy
(c) Oisin Mulvihill, 2007-07-26.
License: http://www.apache.org/licenses/LICENSE-2.0
"""
import pprint
import unittest
import stomper
#.stomp_11 as stomper
class TestEngine(stomper.Engine):
"""Test that these methods are called by the default engine.
"""
def __init__(self):
super(TestEngine, self).__init__()
self.ackCalled = False
self.errorCalled = False
self.receiptCalled = False
def ack(self, msg):
super(TestEngine, self).ack(msg)
self.ackCalled = True
return 'ack'
def error(self, msg):
super(TestEngine, self).error(msg)
self.errorCalled = True
return 'error'
def receipt(self, msg):
super(TestEngine, self).receipt(msg)
self.receiptCalled = True
return 'receipt'
class Stomper11Test(unittest.TestCase):
def testEngineToServerMessages(self):
"""Test the state machines reaction
"""
e = TestEngine()
# React to a message which should be an ack:
msg = stomper.Frame()
msg.cmd = 'MESSAGE'
msg.headers = {
'subscription': 1,
'destination:': '/queue/a',
'message-id:': 'some-message-id',
'content-type': 'text/plain',
}
msg.body = "hello queue a"
rc = e.react(msg.pack())
self.assertEquals(rc, 'ack')
self.assertEquals(e.ackCalled, True)
# React to an error:
error = stomper.Frame()
error.cmd = 'ERROR'
error.headers = {'message:': 'malformed packet received!'}
error.body = """The message:
-----
MESSAGE
destined:/queue/a
Hello queue a!
-----
Did not contain a destination header, which is required for message propagation.
\x00
"""
rc = e.react(error.pack())
self.assertEquals(rc, 'error')
self.assertEquals(e.errorCalled, True)
# React to an receipt:
receipt = stomper.Frame()
receipt.cmd = 'RECEIPT'
receipt.headers = {'receipt-id:': 'message-12345'}
rc = e.react(receipt.pack())
self.assertEquals(rc, 'receipt')
self.assertEquals(e.receiptCalled, True)
def testEngine(self):
"""Test the basic state machine.
"""
e = stomper.Engine(testing=True)
# test session connected message:
msg = """CONNECTED
version:1.1
session:ID:snorky.local-49191-1185461799654-3:18
\x00
"""
result = stomper.unpack_frame(msg)
correct = ''
returned = e.react(result)
self.assertEquals(returned, correct)
# test message:
msg = """MESSAGE
subscription:1
destination:/queue/a
message-id:some-message-id
content-type:text/plain
hello queue a
\x00
"""
returned = e.react(msg)
correct = 'ACK\nsubscription:1\nmessage-id:some-message-id\n\n\x00\n'
self.assertEquals(returned, correct)
# test error:
msg = """ERROR
message:some error
There was a problem with your last message
\x00
"""
returned = e.react(msg)
correct = 'error'
self.assertEquals(returned, correct)
# test receipt:
msg = """RECEIPT
message-id:some-message-id
\x00
"""
returned = e.react(msg)
correct = 'receipt'
self.assertEquals(returned, correct)
def testFramepack1(self):
"""Testing pack, unpacking and the Frame class.
"""
# Check bad frame generation:
frame = stomper.Frame()
def bad():
frame.cmd = 'SOME UNNOWN CMD'
self.assertRaises(stomper.FrameError, bad)
# Generate a MESSAGE frame:
frame = stomper.Frame()
frame.cmd = 'MESSAGE'
frame.headers['destination'] = '/queue/a'
frame.headers['message-id'] = 'card_data'
frame.body = "hello queue a"
result = frame.pack()
# print "\n-- result " + "----" * 10
# pprint.pprint(result)
# print
# Try bad message unpack catching:
bad_frame = stomper.Frame()
self.assertRaises(stomper.FrameError, bad_frame.unpack, None)
self.assertRaises(stomper.FrameError, bad_frame.unpack, '')
# Try to read the generated frame back in
# and then check the variables are set up
# correctly:
frame2 = stomper.Frame()
frame2.unpack(result)
self.assertEquals(frame2.cmd, 'MESSAGE')
self.assertEquals(frame2.headers['destination'], '/queue/a')
self.assertEquals(frame2.headers['message-id'], 'card_data')
self.assertEquals(frame2.body, 'hello queue a')
result = frame2.pack()
correct = "MESSAGE\ndestination:/queue/a\nmessage-id:card_data\n\nhello queue a\x00\n"
self.assertEquals(result, correct)
result = stomper.unpack_frame(result)
self.assertEquals(result['cmd'], 'MESSAGE')
self.assertEquals(result['headers']['destination'], '/queue/a')
self.assertEquals(result['headers']['message-id'], 'card_data')
self.assertEquals(result['body'], 'hello queue a')
def testFramepack2(self):
"""Testing pack, unpacking and the Frame class.
"""
# Check bad frame generation:
frame = stomper.Frame()
frame.cmd = 'DISCONNECT'
result = frame.pack()
correct = 'DISCONNECT\n\n\x00\n'
self.assertEquals(result, correct)
def testFrameUnpack2(self):
"""Testing unpack frame function against MESSAGE
"""
msg = """MESSAGE
destination:/queue/a
message-id:card_data
hello queue a"""
result = stomper.unpack_frame(msg)
self.assertEquals(result['cmd'], 'MESSAGE')
self.assertEquals(result['headers']['destination'], '/queue/a')
self.assertEquals(result['headers']['message-id'], 'card_data')
self.assertEquals(result['body'], 'hello queue a')
def testFrameUnpack3(self):
"""Testing unpack frame function against CONNECTED
"""
msg = """CONNECTED
version:1.1
session:ID:snorky.local-49191-1185461799654-3:18
"""
result = stomper.unpack_frame(msg)
self.assertEquals(result['cmd'], 'CONNECTED')
self.assertEquals(result['headers']['session'], 'ID:snorky.local-49191-1185461799654-3:18')
self.assertEquals(result['body'], '')
def testBugInFrameUnpack1(self):
msg = """MESSAGE
destination:/queue/a
message-id:card_data
hello queue a
\x00
"""
result = stomper.unpack_frame(msg)
self.assertEquals(result['cmd'], 'MESSAGE')
self.assertEquals(result['headers']['destination'], '/queue/a')
self.assertEquals(result['headers']['message-id'], 'card_data')
self.assertEquals(result['body'], 'hello queue a')
def testCommit(self):
transactionid = '1234'
correct = "COMMIT\ntransaction:%s\n\n\x00\n" % transactionid
self.assertEquals(stomper.commit(transactionid), correct)
def testAbort(self):
transactionid = '1234'
correct = "ABORT\ntransaction:%s\n\n\x00\n" % transactionid
self.assertEquals(stomper.abort(transactionid), correct)
def testBegin(self):
transactionid = '1234'
correct = "BEGIN\ntransaction:%s\n\n\x00\n" % transactionid
self.assertEquals(stomper.begin(transactionid), correct)
def testAck(self):
subscription = '1'
messageid = '1234'
transactionid = '9876'
header = 'subscription:%s\nmessage-id:%s\ntransaction:%s' % (
subscription, messageid, transactionid)
correct = "ACK\n%s\n\n\x00\n" % header
actual = stomper.ack(messageid, subscription, transactionid)
self.assertEquals(actual, correct)
subscription = '1'
messageid = '1234'
correct = "ACK\nsubscription:%s\nmessage-id:%s\n\n\x00\n" % (
subscription, messageid)
self.assertEquals(stomper.ack(messageid, subscription), correct)
def testNack(self):
subscription = '1'
messageid = '1234'
transactionid = '9876'
header = 'subscription:%s\nmessage-id:%s\ntransaction:%s' % (
subscription, messageid, transactionid)
correct = "NACK\n%s\n\n\x00\n" % header
actual = stomper.nack(messageid, subscription, transactionid)
self.assertEquals(actual, correct)
subscription = '1'
messageid = '1234'
correct = "NACK\nsubscription:%s\nmessage-id:%s\n\n\x00\n" % (
subscription, messageid)
self.assertEquals(stomper.nack(messageid, subscription), correct)
def testUnsubscribe(self):
subscription = '1'
correct = "UNSUBSCRIBE\nid:%s\n\n\x00\n" % subscription
self.assertEquals(stomper.unsubscribe(subscription), correct)
def testSubscribe(self):
dest, ack = '/queue/all', 'client'
correct = "SUBSCRIBE\nid:0\ndestination:%s\nack:%s\n\n\x00\n" % (dest, ack)
self.assertEquals(stomper.subscribe(dest, 0, ack), correct)
dest, ack = '/queue/all', 'auto'
correct = "SUBSCRIBE\nid:0\ndestination:%s\nack:%s\n\n\x00\n" % (dest, ack)
self.assertEquals(stomper.subscribe(dest, 0, ack), correct)
correct = "SUBSCRIBE\nid:0\ndestination:%s\nack:%s\n\n\x00\n" % (dest, ack)
self.assertEquals(stomper.subscribe(dest, 0), correct)
def testConnect(self):
username, password = 'bob', '123'
correct = "CONNECT\naccept-version:1.1\nhost:localhost\nheart-beat:0,0\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password)
self.assertEquals(stomper.connect(username, password, 'localhost'), correct)
def testConnectWithHeartbeats(self):
username, password = 'bob', '123'
heartbeats = (1000, 1000)
correct = "CONNECT\naccept-version:1.1\nhost:localhost\nheart-beat:1000,1000\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password)
self.assertEquals(stomper.connect(username, password, 'localhost', heartbeats=heartbeats), correct)
def testDisconnect(self):
correct = "DISCONNECT\nreceipt:77\n\x00\n"
self.assertEquals(stomper.disconnect(77), correct)
def testSend(self):
dest, transactionid, msg = '/queue/myplace', '', '123 456 789'
correct = "SEND\ndestination:%s\ncontent-type:text/plain\n\n%s\x00\n" % (dest, msg)
result = stomper.send(dest, msg, transactionid)
self.assertEquals(result, correct)
dest, transactionid, msg = '/queue/myplace', '987', '123 456 789'
correct = "SEND\ndestination:%s\ncontent-type:text/plain\ntransaction:%s\n\n%s\x00\n" % (dest, transactionid, msg)
self.assertEquals(stomper.send(dest, msg, transactionid), correct)
if __name__ == "__main__":
unittest.main()
+44
-5

@@ -1,4 +0,4 @@

Metadata-Version: 1.0
Metadata-Version: 1.1
Name: stomper
Version: 0.2.8
Version: 0.3.0
Summary: This is a transport neutral client implementation of the STOMP protocol.

@@ -25,2 +25,3 @@ Home-page: https://github.com/oisinmulvihill/stomper

- Daniele Varrazzo <https://github.com/dvarrazzo>
- Ralph Bean <http://threebean.org>

@@ -36,6 +37,4 @@

messages can be easily generated and parsed, however its up to the user to do
the sending and receiving. The STOMP protocol specification can be found here:
the sending and receiving.
- `Stomp Protocol <http://stomp.codehaus.org/Protocol/>`_
I've looked at the stomp client by Jason R. Briggs. I've based some of the

@@ -83,6 +82,46 @@ 'function to message' generation on how his client does it. The client can

Supported STOMP Versions
------------------------
1.1
~~~
This is the default version of the of STOMP used in stomper versions 0.3.x.
* https://stomp.github.io/stomp-specification-1.1.html
1.0
~~~
This is no longer the default protocol version. To use it you can import it as
follows::
import stomper.stomp_10 as stomper
This is the default version used in stomper version 0.2.x.
* https://stomp.github.io/stomp-specification-1.0.html
Version History
---------------
0.3.0
~~~~~
This release makes STOMP v1.1 the default protocol. To stick with STOMP v1.0
you can continue to use stomper v0.2.9 or change the import in your code to::
import stomper.stomp_10 as stomper
**Note** Any fixes to STOMP v1.0 will only be applied to version >= 0.3.
0.2.9
~~~~~
Thanks to Ralph Bean for contributing the new protocol 1.1 support:
* https://github.com/oisinmulvihill/stomper/issues/6
* https://github.com/oisinmulvihill/stomper/pull/7
0.2.8

@@ -89,0 +128,0 @@ ~~~~~

+4
-1

@@ -6,2 +6,4 @@ MANIFEST.in

lib/stomper/__init__.py
lib/stomper/stomp_10.py
lib/stomper/stomp_11.py
lib/stomper/stompbuffer.py

@@ -21,2 +23,3 @@ lib/stomper/utils.py

lib/stomper/tests/teststompbuffer.py
lib/stomper/tests/teststomper.py
lib/stomper/tests/teststomper_10.py
lib/stomper/tests/teststomper_11.py

@@ -1,508 +0,23 @@

"""
This is a python client implementation of the STOMP protocol.
from stomp_11 import (
Engine,
Frame,
FrameError,
It aims to be transport layer neutral. This module provides functions to
create and parse STOMP messages in a programmatic fashion.
abort,
ack,
nack,
begin,
commit,
connect,
disconnect,
send,
subscribe,
unpack_frame,
unsubscribe,
The examples package contains two examples using twisted as the transport
framework. Other frameworks can be used and I may add other examples as
time goes on.
VALID_COMMANDS,
The STOMP protocol specification maybe found here:
* http://stomp.codehaus.org/Protocol
I've looked at the stomp client by Jason R. Briggs and have based the message
generation on how his client does it. The client can be found at the follow
address however it isn't a dependancy.
* http://www.briggs.net.nz/log/projects/stomppy
In testing this library I run against ActiveMQ project. The server runs
in java, however its fairly standalone and easy to set up. The projects
page is here:
* http://activemq.apache.org/
(c) Oisin Mulvihill, 2007-07-26.
License: http://www.apache.org/licenses/LICENSE-2.0
"""
import re
import uuid
import types
import logging
import utils
import stompbuffer
# This is used as a return from message responses functions.
# It is used more for readability more then anything or reason.
NO_RESPONSE_NEEDED = ''
# For backwards compatibility
NO_REPONSE_NEEDED = ''
# The version of the protocol we implement.
STOMP_VERSION = '1.0'
# Message terminator:
NULL = '\x00'
# STOMP Spec v1.0 valid commands:
VALID_COMMANDS = [
'ABORT', 'ACK', 'BEGIN', 'COMMIT',
'CONNECT', 'CONNECTED', 'DISCONNECT', 'MESSAGE',
'SEND', 'SUBSCRIBE', 'UNSUBSCRIBE',
'RECEIPT', 'ERROR',
]
def get_log():
return logging.getLogger("stomper")
class FrameError(Exception):
"""Raise for problem with frame generation or parsing.
"""
class Frame(object):
"""This class is used to create or read STOMP message frames.
The method pack() is used to create a STOMP message ready
for transmission.
The method unpack() is used to read a STOMP message into
a frame instance. It uses the unpack_frame(...) function
to do the initial parsing.
The frame has three important member variables:
* cmd
* headers
* body
The 'cmd' is a property that represents the STOMP message
command. When you assign this a check is done to make sure
its one of the VALID_COMMANDS. If not then FrameError will
be raised.
The 'headers' is a dictionary which the user can added to
if needed. There are no restrictions or checks imposed on
what values are inserted.
The 'body' is just a member variable that the body text
is assigned to.
"""
def __init__(self):
"""Setup the internal state."""
self._cmd = ''
self.body = ''
self.headers = {}
def getCmd(self):
"""Don't use _cmd directly!"""
return self._cmd
def setCmd(self, cmd):
"""Check the cmd is valid, FrameError will be raised if its not."""
cmd = cmd.upper()
if cmd not in VALID_COMMANDS:
raise FrameError("The cmd '%s' is not valid! It must be one of '%s' (STOMP v%s)." % (
cmd, VALID_COMMANDS, STOMP_VERSION)
)
else:
self._cmd = cmd
cmd = property(getCmd, setCmd)
def pack(self):
"""Called to create a STOMP message from the internal values.
"""
headers = ''.join(
['%s:%s\n' % (f, v) for f, v in sorted(self.headers.items())]
)
stomp_message = "%s\n%s\n%s%s\n" % (self._cmd, headers, self.body, NULL)
# import pprint
# print "stomp_message: ", pprint.pprint(stomp_message)
return stomp_message
def unpack(self, message):
"""Called to extract a STOMP message into this instance.
message:
This is a text string representing a valid
STOMP (v1.0) message.
This method uses unpack_frame(...) to extract the
information, before it is assigned internally.
retuned:
The result of the unpack_frame(...) call.
"""
if not message:
raise FrameError("Unpack error! The given message isn't valid '%s'!" % message)
msg = unpack_frame(message)
self.cmd = msg['cmd']
self.headers = msg['headers']
# Assign directly as the message will have the null
# character in the message already.
self.body = msg['body']
return msg
def unpack_frame(message):
"""Called to unpack a STOMP message into a dictionary.
returned = {
# STOMP Command:
'cmd' : '...',
# Headers e.g.
'headers' : {
'destination' : 'xyz',
'message-id' : 'some event',
:
etc,
}
# Body:
'body' : '...1234...\x00',
}
"""
body = []
returned = dict(cmd='', headers={}, body='')
breakdown = message.split('\n')
# Get the message command:
returned['cmd'] = breakdown[0]
breakdown = breakdown[1:]
def headD(field):
# find the first ':' everything to the left of this is a
# header, everything to the right is data:
index = field.find(':')
if index:
header = field[:index].strip()
data = field[index+1:].strip()
# print "header '%s' data '%s'" % (header, data)
returned['headers'][header.strip()] = data.strip()
def bodyD(field):
field = field.strip()
if field:
body.append(field)
# Recover the header fields and body data
handler = headD
for field in breakdown:
# print "field:", field
if field.strip() == '':
# End of headers, it body data next.
handler = bodyD
continue
handler(field)
# Stich the body data together:
# print "1. body: ", body
body = "".join(body)
returned['body'] = body.replace('\x00', '')
# print "2. body: <%s>" % returned['body']
return returned
def abort(transactionid):
"""STOMP abort transaction command.
Rollback whatever actions in this transaction.
transactionid:
This is the id that all actions in this transaction.
"""
return "ABORT\ntransaction: %s\n\n\x00\n" % transactionid
def ack(messageid, transactionid=None):
"""STOMP acknowledge command.
Acknowledge receipt of a specific message from the server.
messageid:
This is the id of the message we are acknowledging,
what else could it be? ;)
transactionid:
This is the id that all actions in this transaction
will have. If this is not given then a random UUID
will be generated for this.
"""
header = 'message-id: %s' % messageid
if transactionid:
header = 'message-id: %s\ntransaction: %s' % (messageid, transactionid)
return "ACK\n%s\n\n\x00\n" % header
def begin(transactionid=None):
"""STOMP begin command.
Start a transaction...
transactionid:
This is the id that all actions in this transaction
will have. If this is not given then a random UUID
will be generated for this.
"""
if not transactionid:
# Generate a random UUID:
transactionid = uuid.uuid4()
return "BEGIN\ntransaction: %s\n\n\x00\n" % transactionid
def commit(transactionid):
"""STOMP commit command.
Do whatever is required to make the series of actions
permanent for this transactionid.
transactionid:
This is the id that all actions in this transaction.
"""
return "COMMIT\ntransaction: %s\n\n\x00\n" % transactionid
def connect(username, password):
"""STOMP connect command.
username, password:
These are the needed auth details to connect to the
message server.
After sending this we will receive a CONNECTED
message which will contain our session id.
"""
return "CONNECT\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password)
def disconnect():
"""STOMP disconnect command.
Tell the server we finished and we'll be closing the
socket soon.
"""
return "DISCONNECT\n\n\x00\n"
def send(dest, msg, transactionid=None):
"""STOMP send command.
dest:
This is the channel we wish to subscribe to
msg:
This is the message body to be sent.
transactionid:
This is an optional field and is not needed
by default.
"""
transheader = ''
if transactionid:
transheader = 'transaction: %s\n' % transactionid
return "SEND\ndestination: %s\n%s\n%s\x00\n" % (dest, transheader, msg)
def subscribe(dest, ack='auto'):
"""STOMP subscribe command.
dest:
This is the channel we wish to subscribe to
ack: 'auto' | 'client'
If the ack is set to client, then messages received will
have to have an acknowledge as a reply. Otherwise the server
will assume delivery failure.
"""
return "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack)
def unsubscribe(dest):
"""STOMP unsubscribe command.
dest:
This is the channel we wish to subscribe to
Tell the server we no longer wish to receive any
further messages for the given subscription.
"""
return "UNSUBSCRIBE\ndestination:%s\n\n\x00\n" % dest
class Engine(object):
"""This is a simple state machine to return a response to received
message if needed.
"""
def __init__(self, testing=False):
self.testing = testing
self.log = logging.getLogger("stomper.Engine")
self.sessionId = ''
# Entry Format:
#
# COMMAND : Handler_Function
#
self.states = {
'CONNECTED' : self.connected,
'MESSAGE' : self.ack,
'ERROR' : self.error,
'RECEIPT' : self.receipt,
}
def react(self, msg):
"""Called to provide a response to a message if needed.
msg:
This is a dictionary as returned by unpack_frame(...)
or it can be a straight STOMP message. This function
will attempt to determine which an deal with it.
returned:
A message to return or an empty string.
"""
returned = ""
# If its not a string assume its a dict.
mtype = type(msg)
if mtype in types.StringTypes:
msg = unpack_frame(msg)
elif mtype == types.DictType:
pass
else:
raise FrameError("Unknown message type '%s', I don't know what to do with this!" % mtype)
if self.states.has_key(msg['cmd']):
# print("reacting to message - %s" % msg['cmd'])
returned = self.states[msg['cmd']](msg)
return returned
def connected(self, msg):
"""No response is needed to a connected frame.
This method stores the session id as the
member sessionId for later use.
returned:
NO_RESPONSE_NEEDED
"""
self.sessionId = msg['headers']['session']
#print "connected: session id '%s'." % self.sessionId
return NO_RESPONSE_NEEDED
def ack(self, msg):
"""Called when a MESSAGE has been received.
Override this method to handle received messages.
This function will generate an acknowledge message
for the given message and transaction (if present).
"""
message_id = msg['headers']['message-id']
transaction_id = None
if msg['headers'].has_key('transaction-id'):
transaction_id = msg['headers']['transaction-id']
# print "acknowledging message id <%s>." % message_id
return ack(message_id, transaction_id)
def error(self, msg):
"""Called to handle an error message received from the server.
This method just logs the error message
returned:
NO_RESPONSE_NEEDED
"""
body = msg['body'].replace(NULL, '')
brief_msg = ""
if msg['headers'].has_key('message'):
brief_msg = msg['headers']['message']
self.log.error("Received server error - message%s\n\n%s" % (brief_msg, body))
returned = NO_RESPONSE_NEEDED
if self.testing:
returned = 'error'
return returned
def receipt(self, msg):
"""Called to handle a receipt message received from the server.
This method just logs the receipt message
returned:
NO_RESPONSE_NEEDED
"""
body = msg['body'].replace(NULL, '')
brief_msg = ""
if msg['headers'].has_key('receipt-id'):
brief_msg = msg['headers']['receipt-id']
self.log.info("Received server receipt message - receipt-id:%s\n\n%s" % (brief_msg, body))
returned = NO_RESPONSE_NEEDED
if self.testing:
returned = 'receipt'
return returned
NO_RESPONSE_NEEDED,
NO_REPONSE_NEEDED,
NULL,
)

@@ -1,4 +0,4 @@

Metadata-Version: 1.0
Metadata-Version: 1.1
Name: stomper
Version: 0.2.8
Version: 0.3.0
Summary: This is a transport neutral client implementation of the STOMP protocol.

@@ -25,2 +25,3 @@ Home-page: https://github.com/oisinmulvihill/stomper

- Daniele Varrazzo <https://github.com/dvarrazzo>
- Ralph Bean <http://threebean.org>

@@ -36,6 +37,4 @@

messages can be easily generated and parsed, however its up to the user to do
the sending and receiving. The STOMP protocol specification can be found here:
the sending and receiving.
- `Stomp Protocol <http://stomp.codehaus.org/Protocol/>`_
I've looked at the stomp client by Jason R. Briggs. I've based some of the

@@ -83,6 +82,46 @@ 'function to message' generation on how his client does it. The client can

Supported STOMP Versions
------------------------
1.1
~~~
This is the default version of the of STOMP used in stomper versions 0.3.x.
* https://stomp.github.io/stomp-specification-1.1.html
1.0
~~~
This is no longer the default protocol version. To use it you can import it as
follows::
import stomper.stomp_10 as stomper
This is the default version used in stomper version 0.2.x.
* https://stomp.github.io/stomp-specification-1.0.html
Version History
---------------
0.3.0
~~~~~
This release makes STOMP v1.1 the default protocol. To stick with STOMP v1.0
you can continue to use stomper v0.2.9 or change the import in your code to::
import stomper.stomp_10 as stomper
**Note** Any fixes to STOMP v1.0 will only be applied to version >= 0.3.
0.2.9
~~~~~
Thanks to Ralph Bean for contributing the new protocol 1.1 support:
* https://github.com/oisinmulvihill/stomper/issues/6
* https://github.com/oisinmulvihill/stomper/pull/7
0.2.8

@@ -89,0 +128,0 @@ ~~~~~

@@ -17,2 +17,3 @@ =======

- Daniele Varrazzo <https://github.com/dvarrazzo>
- Ralph Bean <http://threebean.org>

@@ -28,6 +29,4 @@

messages can be easily generated and parsed, however its up to the user to do
the sending and receiving. The STOMP protocol specification can be found here:
the sending and receiving.
- `Stomp Protocol <http://stomp.codehaus.org/Protocol/>`_
I've looked at the stomp client by Jason R. Briggs. I've based some of the

@@ -75,6 +74,46 @@ 'function to message' generation on how his client does it. The client can

Supported STOMP Versions
------------------------
1.1
~~~
This is the default version of the of STOMP used in stomper versions 0.3.x.
* https://stomp.github.io/stomp-specification-1.1.html
1.0
~~~
This is no longer the default protocol version. To use it you can import it as
follows::
import stomper.stomp_10 as stomper
This is the default version used in stomper version 0.2.x.
* https://stomp.github.io/stomp-specification-1.0.html
Version History
---------------
0.3.0
~~~~~
This release makes STOMP v1.1 the default protocol. To stick with STOMP v1.0
you can continue to use stomper v0.2.9 or change the import in your code to::
import stomper.stomp_10 as stomper
**Note** Any fixes to STOMP v1.0 will only be applied to version >= 0.3.
0.2.9
~~~~~
Thanks to Ralph Bean for contributing the new protocol 1.1 support:
* https://github.com/oisinmulvihill/stomper/issues/6
* https://github.com/oisinmulvihill/stomper/pull/7
0.2.8

@@ -81,0 +120,0 @@ ~~~~~

@@ -14,3 +14,3 @@ """

ProjectUrl = "https://github.com/oisinmulvihill/stomper"
Version = '0.2.8'
Version = '0.3.0'
Author = 'Oisin Mulvihill'

@@ -17,0 +17,0 @@ AuthorEmail = 'oisin dot mulvihill at gmail com'

"""
This is the unittest to verify my stomper module.
The STOMP protocol specification maybe found here:
* http://stomp.codehaus.org/Protocol
I've looked and the stomp client by Jason R. Briggs and have based the message
generation on how his client did it. The client can be found at the follow
address however it isn't a dependancy.
* http://www.briggs.net.nz/log/projects/stomppy
(c) Oisin Mulvihill, 2007-07-26.
License: http://www.apache.org/licenses/LICENSE-2.0
"""
import pprint
import unittest
import stomper
class TestEngine(stomper.Engine):
"""Test that these methods are called by the default engine.
"""
def __init__(self):
super(TestEngine, self).__init__()
self.ackCalled = False
self.errorCalled = False
self.receiptCalled = False
def ack(self, msg):
super(TestEngine, self).ack(msg)
self.ackCalled = True
return 'ack'
def error(self, msg):
super(TestEngine, self).error(msg)
self.errorCalled = True
return 'error'
def receipt(self, msg):
super(TestEngine, self).receipt(msg)
self.receiptCalled = True
return 'receipt'
class StomperTest(unittest.TestCase):
def testEngineToServerMessages(self):
"""Test the state machines reaction
"""
e = TestEngine()
# React to a message which should be an ack:
msg = stomper.Frame()
msg.cmd = 'MESSAGE'
msg.headers = {
'destination:': '/queue/a',
'message-id:': 'some-message-id'
}
msg.body = "hello queue a"
rc = e.react(msg.pack())
self.assertEquals(rc, 'ack')
self.assertEquals(e.ackCalled, True)
# React to an error:
error = stomper.Frame()
error.cmd = 'ERROR'
error.headers = {'message:': 'malformed packet received!'}
error.body = """The message:
-----
MESSAGE
destined:/queue/a
Hello queue a!
-----
Did not contain a destination header, which is required for message propagation.
\x00
"""
rc = e.react(error.pack())
self.assertEquals(rc, 'error')
self.assertEquals(e.errorCalled, True)
# React to an receipt:
receipt = stomper.Frame()
receipt.cmd = 'RECEIPT'
receipt.headers = {'receipt-id:': 'message-12345'}
rc = e.react(receipt.pack())
self.assertEquals(rc, 'receipt')
self.assertEquals(e.receiptCalled, True)
def testEngine(self):
"""Test the basic state machine.
"""
e = stomper.Engine(testing=True)
# test session connected message:
msg = """CONNECTED
session:ID:snorky.local-49191-1185461799654-3:18
\x00
"""
result = stomper.unpack_frame(msg)
correct = ''
returned = e.react(result)
self.assertEquals(returned, correct)
# test message:
msg = """MESSAGE
destination: /queue/a
message-id: some-message-id
hello queue a
\x00
"""
returned = e.react(msg)
correct = 'ACK\nmessage-id: some-message-id\n\n\x00\n'
self.assertEquals(returned, correct)
# test error:
msg = """ERROR
message:some error
There was a problem with your last message
\x00
"""
returned = e.react(msg)
correct = 'error'
self.assertEquals(returned, correct)
# test receipt:
msg = """RECEIPT
message-id: some-message-id
\x00
"""
returned = e.react(msg)
correct = 'receipt'
self.assertEquals(returned, correct)
def testFramepack1(self):
"""Testing pack, unpacking and the Frame class.
"""
# Check bad frame generation:
frame = stomper.Frame()
def bad():
frame.cmd = 'SOME UNNOWN CMD'
self.assertRaises(stomper.FrameError, bad)
# Generate a MESSAGE frame:
frame = stomper.Frame()
frame.cmd = 'MESSAGE'
frame.headers['destination'] = '/queue/a'
frame.headers['message-id'] = 'card_data'
frame.body = "hello queue a"
result = frame.pack()
# print "\n-- result " + "----" * 10
# pprint.pprint(result)
# print
# Try bad message unpack catching:
bad_frame = stomper.Frame()
self.assertRaises(stomper.FrameError, bad_frame.unpack, None)
self.assertRaises(stomper.FrameError, bad_frame.unpack, '')
# Try to read the generated frame back in
# and then check the variables are set up
# correctly:
frame2 = stomper.Frame()
frame2.unpack(result)
self.assertEquals(frame2.cmd, 'MESSAGE')
self.assertEquals(frame2.headers['destination'], '/queue/a')
self.assertEquals(frame2.headers['message-id'], 'card_data')
self.assertEquals(frame2.body, 'hello queue a')
result = frame2.pack()
correct = "MESSAGE\ndestination:/queue/a\nmessage-id:card_data\n\nhello queue a\x00\n"
# print "result: "
# pprint.pprint(result)
# print
# print "correct: "
# pprint.pprint(correct)
# print
#
self.assertEquals(result, correct)
result = stomper.unpack_frame(result)
self.assertEquals(result['cmd'], 'MESSAGE')
self.assertEquals(result['headers']['destination'], '/queue/a')
self.assertEquals(result['headers']['message-id'], 'card_data')
self.assertEquals(result['body'], 'hello queue a')
def testFramepack2(self):
"""Testing pack, unpacking and the Frame class.
"""
# Check bad frame generation:
frame = stomper.Frame()
frame.cmd = 'DISCONNECT'
result = frame.pack()
correct = 'DISCONNECT\n\n\x00\n'
self.assertEquals(result, correct)
def testFrameUnpack2(self):
"""Testing unpack frame function against MESSAGE
"""
msg = """MESSAGE
destination:/queue/a
message-id: card_data
hello queue a"""
result = stomper.unpack_frame(msg)
self.assertEquals(result['cmd'], 'MESSAGE')
self.assertEquals(result['headers']['destination'], '/queue/a')
self.assertEquals(result['headers']['message-id'], 'card_data')
self.assertEquals(result['body'], 'hello queue a')
def testFrameUnpack3(self):
"""Testing unpack frame function against CONNECTED
"""
msg = """CONNECTED
session:ID:snorky.local-49191-1185461799654-3:18
"""
result = stomper.unpack_frame(msg)
self.assertEquals(result['cmd'], 'CONNECTED')
self.assertEquals(result['headers']['session'], 'ID:snorky.local-49191-1185461799654-3:18')
self.assertEquals(result['body'], '')
def testBugInFrameUnpack1(self):
msg = """MESSAGE
destination:/queue/a
message-id: card_data
hello queue a
\x00
"""
result = stomper.unpack_frame(msg)
self.assertEquals(result['cmd'], 'MESSAGE')
self.assertEquals(result['headers']['destination'], '/queue/a')
self.assertEquals(result['headers']['message-id'], 'card_data')
self.assertEquals(result['body'], 'hello queue a')
def testCommit(self):
transactionid = '1234'
correct = "COMMIT\ntransaction: %s\n\n\x00\n" % transactionid
self.assertEquals(stomper.commit(transactionid), correct)
def testAbort(self):
transactionid = '1234'
correct = "ABORT\ntransaction: %s\n\n\x00\n" % transactionid
self.assertEquals(stomper.abort(transactionid), correct)
def testBegin(self):
transactionid = '1234'
correct = "BEGIN\ntransaction: %s\n\n\x00\n" % transactionid
self.assertEquals(stomper.begin(transactionid), correct)
def testAck(self):
messageid = '1234'
transactionid = '9876'
header = 'message-id: %s\ntransaction: %s' % (messageid, transactionid)
correct = "ACK\n%s\n\n\x00\n" % header
self.assertEquals(stomper.ack(messageid, transactionid), correct)
messageid = '1234'
correct = "ACK\nmessage-id: %s\n\n\x00\n" % messageid
self.assertEquals(stomper.ack(messageid), correct)
def testUnsubscribe(self):
dest = '/queue/all'
correct = "UNSUBSCRIBE\ndestination:%s\n\n\x00\n" % dest
self.assertEquals(stomper.unsubscribe(dest), correct)
def testSubscribe(self):
dest, ack = '/queue/all', 'client'
correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack)
self.assertEquals(stomper.subscribe(dest, ack), correct)
dest, ack = '/queue/all', 'auto'
correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack)
self.assertEquals(stomper.subscribe(dest, ack), correct)
correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack)
self.assertEquals(stomper.subscribe(dest), correct)
def testConnect(self):
username, password = 'bob', '123'
correct = "CONNECT\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password)
self.assertEquals(stomper.connect(username, password), correct)
def testDisconnect(self):
correct = "DISCONNECT\n\n\x00\n"
self.assertEquals(stomper.disconnect(), correct)
def testSend(self):
dest, transactionid, msg = '/queue/myplace', '', '123 456 789'
correct = "SEND\ndestination: %s\n\n%s\x00\n" % (dest, msg)
result = stomper.send(dest, msg, transactionid)
# print "result: "
# pprint.pprint(result)
# print
# print "correct: "
# pprint.pprint(correct)
# print
self.assertEquals(result, correct)
dest, transactionid, msg = '/queue/myplace', '987', '123 456 789'
correct = "SEND\ndestination: %s\ntransaction: %s\n\n%s\x00\n" % (dest, transactionid, msg)
self.assertEquals(stomper.send(dest, msg, transactionid), correct)
if __name__ == "__main__":
unittest.main()