stomper
Advanced tools
| """ | ||
| This is a python client implementation of the STOMP protocol. | ||
| It aims to be transport layer neutral. This module provides functions to | ||
| create and parse STOMP messages in a programmatic fashion. | ||
| The examples package contains two examples using twisted as the transport | ||
| framework. Other frameworks can be used and I may add other examples as | ||
| time goes on. | ||
| The STOMP protocol specification maybe found here: | ||
| * http://stomp.codehaus.org/Protocol | ||
| I've looked at the stomp client by Jason R. Briggs and have based the message | ||
| generation on how his client does it. The client can be found at the follow | ||
| address however it isn't a dependancy. | ||
| * http://www.briggs.net.nz/log/projects/stomppy | ||
| In testing this library I run against ActiveMQ project. The server runs | ||
| in java, however its fairly standalone and easy to set up. The projects | ||
| page is here: | ||
| * http://activemq.apache.org/ | ||
| (c) Oisin Mulvihill, 2007-07-26. | ||
| License: http://www.apache.org/licenses/LICENSE-2.0 | ||
| """ | ||
| import re | ||
| import uuid | ||
| import types | ||
| import logging | ||
| import utils | ||
| import stompbuffer | ||
| # This is used as a return from message responses functions. | ||
| # It is used more for readability more then anything or reason. | ||
| NO_RESPONSE_NEEDED = '' | ||
| # For backwards compatibility | ||
| NO_REPONSE_NEEDED = '' | ||
| # The version of the protocol we implement. | ||
| STOMP_VERSION = '1.0' | ||
| # Message terminator: | ||
| NULL = '\x00' | ||
| # STOMP Spec v1.0 valid commands: | ||
| VALID_COMMANDS = [ | ||
| 'ABORT', 'ACK', 'BEGIN', 'COMMIT', | ||
| 'CONNECT', 'CONNECTED', 'DISCONNECT', 'MESSAGE', | ||
| 'SEND', 'SUBSCRIBE', 'UNSUBSCRIBE', | ||
| 'RECEIPT', 'ERROR', | ||
| ] | ||
| def get_log(): | ||
| return logging.getLogger("stomper") | ||
| class FrameError(Exception): | ||
| """Raise for problem with frame generation or parsing. | ||
| """ | ||
| class Frame(object): | ||
| """This class is used to create or read STOMP message frames. | ||
| The method pack() is used to create a STOMP message ready | ||
| for transmission. | ||
| The method unpack() is used to read a STOMP message into | ||
| a frame instance. It uses the unpack_frame(...) function | ||
| to do the initial parsing. | ||
| The frame has three important member variables: | ||
| * cmd | ||
| * headers | ||
| * body | ||
| The 'cmd' is a property that represents the STOMP message | ||
| command. When you assign this a check is done to make sure | ||
| its one of the VALID_COMMANDS. If not then FrameError will | ||
| be raised. | ||
| The 'headers' is a dictionary which the user can added to | ||
| if needed. There are no restrictions or checks imposed on | ||
| what values are inserted. | ||
| The 'body' is just a member variable that the body text | ||
| is assigned to. | ||
| """ | ||
| def __init__(self): | ||
| """Setup the internal state.""" | ||
| self._cmd = '' | ||
| self.body = '' | ||
| self.headers = {} | ||
| def getCmd(self): | ||
| """Don't use _cmd directly!""" | ||
| return self._cmd | ||
| def setCmd(self, cmd): | ||
| """Check the cmd is valid, FrameError will be raised if its not.""" | ||
| cmd = cmd.upper() | ||
| if cmd not in VALID_COMMANDS: | ||
| raise FrameError("The cmd '%s' is not valid! It must be one of '%s' (STOMP v%s)." % ( | ||
| cmd, VALID_COMMANDS, STOMP_VERSION) | ||
| ) | ||
| else: | ||
| self._cmd = cmd | ||
| cmd = property(getCmd, setCmd) | ||
| def pack(self): | ||
| """Called to create a STOMP message from the internal values. | ||
| """ | ||
| headers = ''.join( | ||
| ['%s:%s\n' % (f, v) for f, v in sorted(self.headers.items())] | ||
| ) | ||
| stomp_message = "%s\n%s\n%s%s\n" % (self._cmd, headers, self.body, NULL) | ||
| # import pprint | ||
| # print "stomp_message: ", pprint.pprint(stomp_message) | ||
| return stomp_message | ||
| def unpack(self, message): | ||
| """Called to extract a STOMP message into this instance. | ||
| message: | ||
| This is a text string representing a valid | ||
| STOMP (v1.0) message. | ||
| This method uses unpack_frame(...) to extract the | ||
| information, before it is assigned internally. | ||
| retuned: | ||
| The result of the unpack_frame(...) call. | ||
| """ | ||
| if not message: | ||
| raise FrameError("Unpack error! The given message isn't valid '%s'!" % message) | ||
| msg = unpack_frame(message) | ||
| self.cmd = msg['cmd'] | ||
| self.headers = msg['headers'] | ||
| # Assign directly as the message will have the null | ||
| # character in the message already. | ||
| self.body = msg['body'] | ||
| return msg | ||
| def unpack_frame(message): | ||
| """Called to unpack a STOMP message into a dictionary. | ||
| returned = { | ||
| # STOMP Command: | ||
| 'cmd' : '...', | ||
| # Headers e.g. | ||
| 'headers' : { | ||
| 'destination' : 'xyz', | ||
| 'message-id' : 'some event', | ||
| : | ||
| etc, | ||
| } | ||
| # Body: | ||
| 'body' : '...1234...\x00', | ||
| } | ||
| """ | ||
| body = [] | ||
| returned = dict(cmd='', headers={}, body='') | ||
| breakdown = message.split('\n') | ||
| # Get the message command: | ||
| returned['cmd'] = breakdown[0] | ||
| breakdown = breakdown[1:] | ||
| def headD(field): | ||
| # find the first ':' everything to the left of this is a | ||
| # header, everything to the right is data: | ||
| index = field.find(':') | ||
| if index: | ||
| header = field[:index].strip() | ||
| data = field[index+1:].strip() | ||
| # print "header '%s' data '%s'" % (header, data) | ||
| returned['headers'][header.strip()] = data.strip() | ||
| def bodyD(field): | ||
| field = field.strip() | ||
| if field: | ||
| body.append(field) | ||
| # Recover the header fields and body data | ||
| handler = headD | ||
| for field in breakdown: | ||
| # print "field:", field | ||
| if field.strip() == '': | ||
| # End of headers, it body data next. | ||
| handler = bodyD | ||
| continue | ||
| handler(field) | ||
| # Stich the body data together: | ||
| # print "1. body: ", body | ||
| body = "".join(body) | ||
| returned['body'] = body.replace('\x00', '') | ||
| # print "2. body: <%s>" % returned['body'] | ||
| return returned | ||
| def abort(transactionid): | ||
| """STOMP abort transaction command. | ||
| Rollback whatever actions in this transaction. | ||
| transactionid: | ||
| This is the id that all actions in this transaction. | ||
| """ | ||
| return "ABORT\ntransaction: %s\n\n\x00\n" % transactionid | ||
| def ack(messageid, transactionid=None): | ||
| """STOMP acknowledge command. | ||
| Acknowledge receipt of a specific message from the server. | ||
| messageid: | ||
| This is the id of the message we are acknowledging, | ||
| what else could it be? ;) | ||
| transactionid: | ||
| This is the id that all actions in this transaction | ||
| will have. If this is not given then a random UUID | ||
| will be generated for this. | ||
| """ | ||
| header = 'message-id: %s' % messageid | ||
| if transactionid: | ||
| header = 'message-id: %s\ntransaction: %s' % (messageid, transactionid) | ||
| return "ACK\n%s\n\n\x00\n" % header | ||
| def begin(transactionid=None): | ||
| """STOMP begin command. | ||
| Start a transaction... | ||
| transactionid: | ||
| This is the id that all actions in this transaction | ||
| will have. If this is not given then a random UUID | ||
| will be generated for this. | ||
| """ | ||
| if not transactionid: | ||
| # Generate a random UUID: | ||
| transactionid = uuid.uuid4() | ||
| return "BEGIN\ntransaction: %s\n\n\x00\n" % transactionid | ||
| def commit(transactionid): | ||
| """STOMP commit command. | ||
| Do whatever is required to make the series of actions | ||
| permanent for this transactionid. | ||
| transactionid: | ||
| This is the id that all actions in this transaction. | ||
| """ | ||
| return "COMMIT\ntransaction: %s\n\n\x00\n" % transactionid | ||
| def connect(username, password): | ||
| """STOMP connect command. | ||
| username, password: | ||
| These are the needed auth details to connect to the | ||
| message server. | ||
| After sending this we will receive a CONNECTED | ||
| message which will contain our session id. | ||
| """ | ||
| return "CONNECT\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password) | ||
| def disconnect(): | ||
| """STOMP disconnect command. | ||
| Tell the server we finished and we'll be closing the | ||
| socket soon. | ||
| """ | ||
| return "DISCONNECT\n\n\x00\n" | ||
| def send(dest, msg, transactionid=None): | ||
| """STOMP send command. | ||
| dest: | ||
| This is the channel we wish to subscribe to | ||
| msg: | ||
| This is the message body to be sent. | ||
| transactionid: | ||
| This is an optional field and is not needed | ||
| by default. | ||
| """ | ||
| transheader = '' | ||
| if transactionid: | ||
| transheader = 'transaction: %s\n' % transactionid | ||
| return "SEND\ndestination: %s\n%s\n%s\x00\n" % (dest, transheader, msg) | ||
| def subscribe(dest, ack='auto'): | ||
| """STOMP subscribe command. | ||
| dest: | ||
| This is the channel we wish to subscribe to | ||
| ack: 'auto' | 'client' | ||
| If the ack is set to client, then messages received will | ||
| have to have an acknowledge as a reply. Otherwise the server | ||
| will assume delivery failure. | ||
| """ | ||
| return "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack) | ||
| def unsubscribe(dest): | ||
| """STOMP unsubscribe command. | ||
| dest: | ||
| This is the channel we wish to subscribe to | ||
| Tell the server we no longer wish to receive any | ||
| further messages for the given subscription. | ||
| """ | ||
| return "UNSUBSCRIBE\ndestination:%s\n\n\x00\n" % dest | ||
| class Engine(object): | ||
| """This is a simple state machine to return a response to received | ||
| message if needed. | ||
| """ | ||
| def __init__(self, testing=False): | ||
| self.testing = testing | ||
| self.log = logging.getLogger("stomper.Engine") | ||
| self.sessionId = '' | ||
| # Entry Format: | ||
| # | ||
| # COMMAND : Handler_Function | ||
| # | ||
| self.states = { | ||
| 'CONNECTED' : self.connected, | ||
| 'MESSAGE' : self.ack, | ||
| 'ERROR' : self.error, | ||
| 'RECEIPT' : self.receipt, | ||
| } | ||
| def react(self, msg): | ||
| """Called to provide a response to a message if needed. | ||
| msg: | ||
| This is a dictionary as returned by unpack_frame(...) | ||
| or it can be a straight STOMP message. This function | ||
| will attempt to determine which an deal with it. | ||
| returned: | ||
| A message to return or an empty string. | ||
| """ | ||
| returned = "" | ||
| # If its not a string assume its a dict. | ||
| mtype = type(msg) | ||
| if mtype in types.StringTypes: | ||
| msg = unpack_frame(msg) | ||
| elif mtype == types.DictType: | ||
| pass | ||
| else: | ||
| raise FrameError("Unknown message type '%s', I don't know what to do with this!" % mtype) | ||
| if self.states.has_key(msg['cmd']): | ||
| # print("reacting to message - %s" % msg['cmd']) | ||
| returned = self.states[msg['cmd']](msg) | ||
| return returned | ||
| def connected(self, msg): | ||
| """No response is needed to a connected frame. | ||
| This method stores the session id as the | ||
| member sessionId for later use. | ||
| returned: | ||
| NO_RESPONSE_NEEDED | ||
| """ | ||
| self.sessionId = msg['headers']['session'] | ||
| #print "connected: session id '%s'." % self.sessionId | ||
| return NO_RESPONSE_NEEDED | ||
| def ack(self, msg): | ||
| """Called when a MESSAGE has been received. | ||
| Override this method to handle received messages. | ||
| This function will generate an acknowledge message | ||
| for the given message and transaction (if present). | ||
| """ | ||
| message_id = msg['headers']['message-id'] | ||
| transaction_id = None | ||
| if msg['headers'].has_key('transaction-id'): | ||
| transaction_id = msg['headers']['transaction-id'] | ||
| # print "acknowledging message id <%s>." % message_id | ||
| return ack(message_id, transaction_id) | ||
| def error(self, msg): | ||
| """Called to handle an error message received from the server. | ||
| This method just logs the error message | ||
| returned: | ||
| NO_RESPONSE_NEEDED | ||
| """ | ||
| body = msg['body'].replace(NULL, '') | ||
| brief_msg = "" | ||
| if msg['headers'].has_key('message'): | ||
| brief_msg = msg['headers']['message'] | ||
| self.log.error("Received server error - message%s\n\n%s" % (brief_msg, body)) | ||
| returned = NO_RESPONSE_NEEDED | ||
| if self.testing: | ||
| returned = 'error' | ||
| return returned | ||
| def receipt(self, msg): | ||
| """Called to handle a receipt message received from the server. | ||
| This method just logs the receipt message | ||
| returned: | ||
| NO_RESPONSE_NEEDED | ||
| """ | ||
| body = msg['body'].replace(NULL, '') | ||
| brief_msg = "" | ||
| if msg['headers'].has_key('receipt-id'): | ||
| brief_msg = msg['headers']['receipt-id'] | ||
| self.log.info("Received server receipt message - receipt-id:%s\n\n%s" % (brief_msg, body)) | ||
| returned = NO_RESPONSE_NEEDED | ||
| if self.testing: | ||
| returned = 'receipt' | ||
| return returned | ||
| """ | ||
| This is a python client implementation of the STOMP protocol. | ||
| It aims to be transport layer neutral. This module provides functions to | ||
| create and parse STOMP messages in a programmatic fashion. | ||
| The examples package contains two examples using twisted as the transport | ||
| framework. Other frameworks can be used and I may add other examples as | ||
| time goes on. | ||
| The STOMP protocol specification maybe found here: | ||
| * http://stomp.codehaus.org/Protocol | ||
| I've looked at the stomp client by Jason R. Briggs and have based the message | ||
| generation on how his client does it. The client can be found at the follow | ||
| address however it isn't a dependancy. | ||
| * http://www.briggs.net.nz/log/projects/stomppy | ||
| In testing this library I run against ActiveMQ project. The server runs | ||
| in java, however its fairly standalone and easy to set up. The projects | ||
| page is here: | ||
| * http://activemq.apache.org/ | ||
| (c) Oisin Mulvihill, 2007-07-26. | ||
| Ralph Bean, 2014-09-09. | ||
| License: http://www.apache.org/licenses/LICENSE-2.0 | ||
| """ | ||
| import re | ||
| import uuid | ||
| import types | ||
| import logging | ||
| import utils | ||
| import stompbuffer | ||
| # This is used as a return from message responses functions. | ||
| # It is used more for readability more then anything or reason. | ||
| NO_RESPONSE_NEEDED = '' | ||
| # For backwards compatibility | ||
| NO_REPONSE_NEEDED = '' | ||
| # The version of the protocol we implement. | ||
| STOMP_VERSION = '1.1' | ||
| # Message terminator: | ||
| NULL = '\x00' | ||
| # STOMP Spec v1.1 valid commands: | ||
| VALID_COMMANDS = [ | ||
| 'ABORT', 'ACK', 'BEGIN', 'COMMIT', | ||
| 'CONNECT', 'CONNECTED', 'DISCONNECT', 'MESSAGE', | ||
| 'NACK', 'SEND', 'SUBSCRIBE', 'UNSUBSCRIBE', | ||
| 'RECEIPT', 'ERROR', | ||
| ] | ||
| def get_log(): | ||
| return logging.getLogger("stomper") | ||
| class FrameError(Exception): | ||
| """Raise for problem with frame generation or parsing. | ||
| """ | ||
| class Frame(object): | ||
| """This class is used to create or read STOMP message frames. | ||
| The method pack() is used to create a STOMP message ready | ||
| for transmission. | ||
| The method unpack() is used to read a STOMP message into | ||
| a frame instance. It uses the unpack_frame(...) function | ||
| to do the initial parsing. | ||
| The frame has three important member variables: | ||
| * cmd | ||
| * headers | ||
| * body | ||
| The 'cmd' is a property that represents the STOMP message | ||
| command. When you assign this a check is done to make sure | ||
| its one of the VALID_COMMANDS. If not then FrameError will | ||
| be raised. | ||
| The 'headers' is a dictionary which the user can added to | ||
| if needed. There are no restrictions or checks imposed on | ||
| what values are inserted. | ||
| The 'body' is just a member variable that the body text | ||
| is assigned to. | ||
| """ | ||
| def __init__(self): | ||
| """Setup the internal state.""" | ||
| self._cmd = '' | ||
| self.body = '' | ||
| self.headers = {} | ||
| def getCmd(self): | ||
| """Don't use _cmd directly!""" | ||
| return self._cmd | ||
| def setCmd(self, cmd): | ||
| """Check the cmd is valid, FrameError will be raised if its not.""" | ||
| cmd = cmd.upper() | ||
| if cmd not in VALID_COMMANDS: | ||
| raise FrameError("The cmd '%s' is not valid! It must be one of '%s' (STOMP v%s)." % ( | ||
| cmd, VALID_COMMANDS, STOMP_VERSION) | ||
| ) | ||
| else: | ||
| self._cmd = cmd | ||
| cmd = property(getCmd, setCmd) | ||
| def pack(self): | ||
| """Called to create a STOMP message from the internal values. | ||
| """ | ||
| headers = ''.join( | ||
| ['%s:%s\n' % (f, v) for f, v in sorted(self.headers.items())] | ||
| ) | ||
| stomp_message = "%s\n%s\n%s%s\n" % (self._cmd, headers, self.body, NULL) | ||
| return stomp_message | ||
| def unpack(self, message): | ||
| """Called to extract a STOMP message into this instance. | ||
| message: | ||
| This is a text string representing a valid | ||
| STOMP (v1.1) message. | ||
| This method uses unpack_frame(...) to extract the | ||
| information, before it is assigned internally. | ||
| retuned: | ||
| The result of the unpack_frame(...) call. | ||
| """ | ||
| if not message: | ||
| raise FrameError("Unpack error! The given message isn't valid '%s'!" % message) | ||
| msg = unpack_frame(message) | ||
| self.cmd = msg['cmd'] | ||
| self.headers = msg['headers'] | ||
| # Assign directly as the message will have the null | ||
| # character in the message already. | ||
| self.body = msg['body'] | ||
| return msg | ||
| def unpack_frame(message): | ||
| """Called to unpack a STOMP message into a dictionary. | ||
| returned = { | ||
| # STOMP Command: | ||
| 'cmd' : '...', | ||
| # Headers e.g. | ||
| 'headers' : { | ||
| 'destination' : 'xyz', | ||
| 'message-id' : 'some event', | ||
| : | ||
| etc, | ||
| } | ||
| # Body: | ||
| 'body' : '...1234...\x00', | ||
| } | ||
| """ | ||
| body = [] | ||
| returned = dict(cmd='', headers={}, body='') | ||
| breakdown = message.split('\n') | ||
| # Get the message command: | ||
| returned['cmd'] = breakdown[0] | ||
| breakdown = breakdown[1:] | ||
| def headD(field): | ||
| # find the first ':' everything to the left of this is a | ||
| # header, everything to the right is data: | ||
| index = field.find(':') | ||
| if index: | ||
| header = field[:index].strip() | ||
| data = field[index+1:].strip() | ||
| # print "header '%s' data '%s'" % (header, data) | ||
| returned['headers'][header.strip()] = data.strip() | ||
| def bodyD(field): | ||
| field = field.strip() | ||
| if field: | ||
| body.append(field) | ||
| # Recover the header fields and body data | ||
| handler = headD | ||
| for field in breakdown: | ||
| # print "field:", field | ||
| if field.strip() == '': | ||
| # End of headers, it body data next. | ||
| handler = bodyD | ||
| continue | ||
| handler(field) | ||
| # Stich the body data together: | ||
| # print "1. body: ", body | ||
| body = "".join(body) | ||
| returned['body'] = body.replace('\x00', '') | ||
| # print "2. body: <%s>" % returned['body'] | ||
| return returned | ||
| def abort(transactionid): | ||
| """STOMP abort transaction command. | ||
| Rollback whatever actions in this transaction. | ||
| transactionid: | ||
| This is the id that all actions in this transaction. | ||
| """ | ||
| return "ABORT\ntransaction:%s\n\n\x00\n" % transactionid | ||
| def ack(messageid, subscriptionid, transactionid=None): | ||
| """STOMP acknowledge command. | ||
| Acknowledge receipt of a specific message from the server. | ||
| messageid: | ||
| This is the id of the message we are acknowledging, | ||
| what else could it be? ;) | ||
| subscriptionid: | ||
| This is the id of the subscription that applies to the message. | ||
| transactionid: | ||
| This is the id that all actions in this transaction | ||
| will have. If this is not given then a random UUID | ||
| will be generated for this. | ||
| """ | ||
| header = 'subscription:%s\nmessage-id:%s' % (subscriptionid, messageid) | ||
| if transactionid: | ||
| header += '\ntransaction:%s' % transactionid | ||
| return "ACK\n%s\n\n\x00\n" % header | ||
| def nack(messageid, subscriptionid, transactionid=None): | ||
| """STOMP negative acknowledge command. | ||
| NACK is the opposite of ACK. It is used to tell the server that the client | ||
| did not consume the message. The server can then either send the message to | ||
| a different client, discard it, or put it in a dead letter queue. The exact | ||
| behavior is server specific. | ||
| messageid: | ||
| This is the id of the message we are acknowledging, | ||
| what else could it be? ;) | ||
| subscriptionid: | ||
| This is the id of the subscription that applies to the message. | ||
| transactionid: | ||
| This is the id that all actions in this transaction | ||
| will have. If this is not given then a random UUID | ||
| will be generated for this. | ||
| """ | ||
| header = 'subscription:%s\nmessage-id:%s' % (subscriptionid, messageid) | ||
| if transactionid: | ||
| header += '\ntransaction:%s' % transactionid | ||
| return "NACK\n%s\n\n\x00\n" % header | ||
| def begin(transactionid=None): | ||
| """STOMP begin command. | ||
| Start a transaction... | ||
| transactionid: | ||
| This is the id that all actions in this transaction | ||
| will have. If this is not given then a random UUID | ||
| will be generated for this. | ||
| """ | ||
| if not transactionid: | ||
| # Generate a random UUID: | ||
| transactionid = uuid.uuid4() | ||
| return "BEGIN\ntransaction:%s\n\n\x00\n" % transactionid | ||
| def commit(transactionid): | ||
| """STOMP commit command. | ||
| Do whatever is required to make the series of actions | ||
| permanent for this transactionid. | ||
| transactionid: | ||
| This is the id that all actions in this transaction. | ||
| """ | ||
| return "COMMIT\ntransaction:%s\n\n\x00\n" % transactionid | ||
| def connect(username, password, host, heartbeats=(0,0)): | ||
| """STOMP connect command. | ||
| username, password: | ||
| These are the needed auth details to connect to the | ||
| message server. | ||
| After sending this we will receive a CONNECTED | ||
| message which will contain our session id. | ||
| """ | ||
| if len(heartbeats) != 2: | ||
| raise ValueError('Invalid heartbeat %r' % heartbeats) | ||
| cx, cy = heartbeats | ||
| return "CONNECT\naccept-version:1.1\nhost:%s\nheart-beat:%i,%i\nlogin:%s\npasscode:%s\n\n\x00\n" % (host, cx, cy, username, password) | ||
| def disconnect(receipt=None): | ||
| """STOMP disconnect command. | ||
| Tell the server we finished and we'll be closing the | ||
| socket soon. | ||
| """ | ||
| if not receipt: | ||
| receipt = uuid.uuid4() | ||
| return "DISCONNECT\nreceipt:%s\n\x00\n" % receipt | ||
| def send(dest, msg, transactionid=None, content_type='text/plain'): | ||
| """STOMP send command. | ||
| dest: | ||
| This is the channel we wish to subscribe to | ||
| msg: | ||
| This is the message body to be sent. | ||
| transactionid: | ||
| This is an optional field and is not needed | ||
| by default. | ||
| """ | ||
| transheader = '' | ||
| if transactionid: | ||
| transheader = 'transaction:%s\n' % transactionid | ||
| return "SEND\ndestination:%s\ncontent-type:%s\n%s\n%s\x00\n" % ( | ||
| dest, content_type, transheader, msg) | ||
| def subscribe(dest, idx, ack='auto'): | ||
| """STOMP subscribe command. | ||
| dest: | ||
| This is the channel we wish to subscribe to | ||
| idx: | ||
| The ID that should uniquely identify the subscription | ||
| ack: 'auto' | 'client' | ||
| If the ack is set to client, then messages received will | ||
| have to have an acknowledge as a reply. Otherwise the server | ||
| will assume delivery failure. | ||
| """ | ||
| return "SUBSCRIBE\nid:%s\ndestination:%s\nack:%s\n\n\x00\n" % ( | ||
| idx, dest, ack) | ||
| def unsubscribe(idx): | ||
| """STOMP unsubscribe command. | ||
| idx: | ||
| This is the id of the subscription | ||
| Tell the server we no longer wish to receive any | ||
| further messages for the given subscription. | ||
| """ | ||
| return "UNSUBSCRIBE\nid:%s\n\n\x00\n" % idx | ||
| class Engine(object): | ||
| """This is a simple state machine to return a response to received | ||
| message if needed. | ||
| """ | ||
| def __init__(self, testing=False): | ||
| self.testing = testing | ||
| self.log = logging.getLogger("stomper.Engine") | ||
| self.sessionId = '' | ||
| # Entry Format: | ||
| # | ||
| # COMMAND : Handler_Function | ||
| # | ||
| self.states = { | ||
| 'CONNECTED' : self.connected, | ||
| 'MESSAGE' : self.ack, | ||
| 'ERROR' : self.error, | ||
| 'RECEIPT' : self.receipt, | ||
| } | ||
| def react(self, msg): | ||
| """Called to provide a response to a message if needed. | ||
| msg: | ||
| This is a dictionary as returned by unpack_frame(...) | ||
| or it can be a straight STOMP message. This function | ||
| will attempt to determine which an deal with it. | ||
| returned: | ||
| A message to return or an empty string. | ||
| """ | ||
| returned = "" | ||
| # If its not a string assume its a dict. | ||
| mtype = type(msg) | ||
| if mtype in types.StringTypes: | ||
| msg = unpack_frame(msg) | ||
| elif mtype == types.DictType: | ||
| pass | ||
| else: | ||
| raise FrameError("Unknown message type '%s', I don't know what to do with this!" % mtype) | ||
| if self.states.has_key(msg['cmd']): | ||
| # print("reacting to message - %s" % msg['cmd']) | ||
| returned = self.states[msg['cmd']](msg) | ||
| return returned | ||
| def connected(self, msg): | ||
| """No response is needed to a connected frame. | ||
| This method stores the session id as the | ||
| member sessionId for later use. | ||
| returned: | ||
| NO_RESPONSE_NEEDED | ||
| """ | ||
| self.sessionId = msg['headers']['session'] | ||
| #print "connected: session id '%s'." % self.sessionId | ||
| return NO_RESPONSE_NEEDED | ||
| def ack(self, msg): | ||
| """Called when a MESSAGE has been received. | ||
| Override this method to handle received messages. | ||
| This function will generate an acknowledge message | ||
| for the given message and transaction (if present). | ||
| """ | ||
| message_id = msg['headers']['message-id'] | ||
| subscription = msg['headers']['subscription'] | ||
| transaction_id = None | ||
| if msg['headers'].has_key('transaction-id'): | ||
| transaction_id = msg['headers']['transaction-id'] | ||
| # print "acknowledging message id <%s>." % message_id | ||
| return ack(message_id, subscription, transaction_id) | ||
| def error(self, msg): | ||
| """Called to handle an error message received from the server. | ||
| This method just logs the error message | ||
| returned: | ||
| NO_RESPONSE_NEEDED | ||
| """ | ||
| body = msg['body'].replace(NULL, '') | ||
| brief_msg = "" | ||
| if msg['headers'].has_key('message'): | ||
| brief_msg = msg['headers']['message'] | ||
| self.log.error("Received server error - message%s\n\n%s" % (brief_msg, body)) | ||
| returned = NO_RESPONSE_NEEDED | ||
| if self.testing: | ||
| returned = 'error' | ||
| return returned | ||
| def receipt(self, msg): | ||
| """Called to handle a receipt message received from the server. | ||
| This method just logs the receipt message | ||
| returned: | ||
| NO_RESPONSE_NEEDED | ||
| """ | ||
| body = msg['body'].replace(NULL, '') | ||
| brief_msg = "" | ||
| if msg['headers'].has_key('receipt-id'): | ||
| brief_msg = msg['headers']['receipt-id'] | ||
| self.log.info("Received server receipt message - receipt-id:%s\n\n%s" % (brief_msg, body)) | ||
| returned = NO_RESPONSE_NEEDED | ||
| if self.testing: | ||
| returned = 'receipt' | ||
| return returned | ||
| """ | ||
| This is the unittest to verify my stomper module. | ||
| The STOMP protocol specification maybe found here: | ||
| * http://stomp.codehaus.org/Protocol | ||
| I've looked and the stomp client by Jason R. Briggs and have based the message | ||
| generation on how his client did it. The client can be found at the follow | ||
| address however it isn't a dependancy. | ||
| * http://www.briggs.net.nz/log/projects/stomppy | ||
| (c) Oisin Mulvihill, 2007-07-26. | ||
| License: http://www.apache.org/licenses/LICENSE-2.0 | ||
| """ | ||
| import pprint | ||
| import unittest | ||
| import stomper.stomp_10 as stomper | ||
| class TestEngine(stomper.Engine): | ||
| """Test that these methods are called by the default engine. | ||
| """ | ||
| def __init__(self): | ||
| super(TestEngine, self).__init__() | ||
| self.ackCalled = False | ||
| self.errorCalled = False | ||
| self.receiptCalled = False | ||
| def ack(self, msg): | ||
| super(TestEngine, self).ack(msg) | ||
| self.ackCalled = True | ||
| return 'ack' | ||
| def error(self, msg): | ||
| super(TestEngine, self).error(msg) | ||
| self.errorCalled = True | ||
| return 'error' | ||
| def receipt(self, msg): | ||
| super(TestEngine, self).receipt(msg) | ||
| self.receiptCalled = True | ||
| return 'receipt' | ||
| class Stomper10Test(unittest.TestCase): | ||
| def testEngineToServerMessages(self): | ||
| """Test the state machines reaction | ||
| """ | ||
| e = TestEngine() | ||
| # React to a message which should be an ack: | ||
| msg = stomper.Frame() | ||
| msg.cmd = 'MESSAGE' | ||
| msg.headers = { | ||
| 'destination:': '/queue/a', | ||
| 'message-id:': 'some-message-id' | ||
| } | ||
| msg.body = "hello queue a" | ||
| rc = e.react(msg.pack()) | ||
| self.assertEquals(rc, 'ack') | ||
| self.assertEquals(e.ackCalled, True) | ||
| # React to an error: | ||
| error = stomper.Frame() | ||
| error.cmd = 'ERROR' | ||
| error.headers = {'message:': 'malformed packet received!'} | ||
| error.body = """The message: | ||
| ----- | ||
| MESSAGE | ||
| destined:/queue/a | ||
| Hello queue a! | ||
| ----- | ||
| Did not contain a destination header, which is required for message propagation. | ||
| \x00 | ||
| """ | ||
| rc = e.react(error.pack()) | ||
| self.assertEquals(rc, 'error') | ||
| self.assertEquals(e.errorCalled, True) | ||
| # React to an receipt: | ||
| receipt = stomper.Frame() | ||
| receipt.cmd = 'RECEIPT' | ||
| receipt.headers = {'receipt-id:': 'message-12345'} | ||
| rc = e.react(receipt.pack()) | ||
| self.assertEquals(rc, 'receipt') | ||
| self.assertEquals(e.receiptCalled, True) | ||
| def testEngine(self): | ||
| """Test the basic state machine. | ||
| """ | ||
| e = stomper.Engine(testing=True) | ||
| # test session connected message: | ||
| msg = """CONNECTED | ||
| session:ID:snorky.local-49191-1185461799654-3:18 | ||
| \x00 | ||
| """ | ||
| result = stomper.unpack_frame(msg) | ||
| correct = '' | ||
| returned = e.react(result) | ||
| self.assertEquals(returned, correct) | ||
| # test message: | ||
| msg = """MESSAGE | ||
| destination: /queue/a | ||
| message-id: some-message-id | ||
| hello queue a | ||
| \x00 | ||
| """ | ||
| returned = e.react(msg) | ||
| correct = 'ACK\nmessage-id: some-message-id\n\n\x00\n' | ||
| self.assertEquals(returned, correct) | ||
| # test error: | ||
| msg = """ERROR | ||
| message:some error | ||
| There was a problem with your last message | ||
| \x00 | ||
| """ | ||
| returned = e.react(msg) | ||
| correct = 'error' | ||
| self.assertEquals(returned, correct) | ||
| # test receipt: | ||
| msg = """RECEIPT | ||
| message-id: some-message-id | ||
| \x00 | ||
| """ | ||
| returned = e.react(msg) | ||
| correct = 'receipt' | ||
| self.assertEquals(returned, correct) | ||
| def testFramepack1(self): | ||
| """Testing pack, unpacking and the Frame class. | ||
| """ | ||
| # Check bad frame generation: | ||
| frame = stomper.Frame() | ||
| def bad(): | ||
| frame.cmd = 'SOME UNNOWN CMD' | ||
| self.assertRaises(stomper.FrameError, bad) | ||
| # Generate a MESSAGE frame: | ||
| frame = stomper.Frame() | ||
| frame.cmd = 'MESSAGE' | ||
| frame.headers['destination'] = '/queue/a' | ||
| frame.headers['message-id'] = 'card_data' | ||
| frame.body = "hello queue a" | ||
| result = frame.pack() | ||
| # print "\n-- result " + "----" * 10 | ||
| # pprint.pprint(result) | ||
| # Try bad message unpack catching: | ||
| bad_frame = stomper.Frame() | ||
| self.assertRaises(stomper.FrameError, bad_frame.unpack, None) | ||
| self.assertRaises(stomper.FrameError, bad_frame.unpack, '') | ||
| # Try to read the generated frame back in | ||
| # and then check the variables are set up | ||
| # correctly: | ||
| frame2 = stomper.Frame() | ||
| frame2.unpack(result) | ||
| self.assertEquals(frame2.cmd, 'MESSAGE') | ||
| self.assertEquals(frame2.headers['destination'], '/queue/a') | ||
| self.assertEquals(frame2.headers['message-id'], 'card_data') | ||
| self.assertEquals(frame2.body, 'hello queue a') | ||
| result = frame2.pack() | ||
| correct = "MESSAGE\ndestination:/queue/a\nmessage-id:card_data\n\nhello queue a\x00\n" | ||
| # print "result: " | ||
| # pprint.pprint(result) | ||
| # print "correct: " | ||
| # pprint.pprint(correct) | ||
| # | ||
| self.assertEquals(result, correct) | ||
| result = stomper.unpack_frame(result) | ||
| self.assertEquals(result['cmd'], 'MESSAGE') | ||
| self.assertEquals(result['headers']['destination'], '/queue/a') | ||
| self.assertEquals(result['headers']['message-id'], 'card_data') | ||
| self.assertEquals(result['body'], 'hello queue a') | ||
| def testFramepack2(self): | ||
| """Testing pack, unpacking and the Frame class. | ||
| """ | ||
| # Check bad frame generation: | ||
| frame = stomper.Frame() | ||
| frame.cmd = 'DISCONNECT' | ||
| result = frame.pack() | ||
| correct = 'DISCONNECT\n\n\x00\n' | ||
| self.assertEquals(result, correct) | ||
| def testFrameUnpack2(self): | ||
| """Testing unpack frame function against MESSAGE | ||
| """ | ||
| msg = """MESSAGE | ||
| destination:/queue/a | ||
| message-id: card_data | ||
| hello queue a""" | ||
| result = stomper.unpack_frame(msg) | ||
| self.assertEquals(result['cmd'], 'MESSAGE') | ||
| self.assertEquals(result['headers']['destination'], '/queue/a') | ||
| self.assertEquals(result['headers']['message-id'], 'card_data') | ||
| self.assertEquals(result['body'], 'hello queue a') | ||
| def testFrameUnpack3(self): | ||
| """Testing unpack frame function against CONNECTED | ||
| """ | ||
| msg = """CONNECTED | ||
| session:ID:snorky.local-49191-1185461799654-3:18 | ||
| """ | ||
| result = stomper.unpack_frame(msg) | ||
| self.assertEquals(result['cmd'], 'CONNECTED') | ||
| self.assertEquals(result['headers']['session'], 'ID:snorky.local-49191-1185461799654-3:18') | ||
| self.assertEquals(result['body'], '') | ||
| def testBugInFrameUnpack1(self): | ||
| msg = """MESSAGE | ||
| destination:/queue/a | ||
| message-id: card_data | ||
| hello queue a | ||
| \x00 | ||
| """ | ||
| result = stomper.unpack_frame(msg) | ||
| self.assertEquals(result['cmd'], 'MESSAGE') | ||
| self.assertEquals(result['headers']['destination'], '/queue/a') | ||
| self.assertEquals(result['headers']['message-id'], 'card_data') | ||
| self.assertEquals(result['body'], 'hello queue a') | ||
| def testCommit(self): | ||
| transactionid = '1234' | ||
| correct = "COMMIT\ntransaction: %s\n\n\x00\n" % transactionid | ||
| self.assertEquals(stomper.commit(transactionid), correct) | ||
| def testAbort(self): | ||
| transactionid = '1234' | ||
| correct = "ABORT\ntransaction: %s\n\n\x00\n" % transactionid | ||
| self.assertEquals(stomper.abort(transactionid), correct) | ||
| def testBegin(self): | ||
| transactionid = '1234' | ||
| correct = "BEGIN\ntransaction: %s\n\n\x00\n" % transactionid | ||
| self.assertEquals(stomper.begin(transactionid), correct) | ||
| def testAck(self): | ||
| messageid = '1234' | ||
| transactionid = '9876' | ||
| header = 'message-id: %s\ntransaction: %s' % (messageid, transactionid) | ||
| correct = "ACK\n%s\n\n\x00\n" % header | ||
| self.assertEquals(stomper.ack(messageid, transactionid), correct) | ||
| messageid = '1234' | ||
| correct = "ACK\nmessage-id: %s\n\n\x00\n" % messageid | ||
| self.assertEquals(stomper.ack(messageid), correct) | ||
| def testUnsubscribe(self): | ||
| dest = '/queue/all' | ||
| correct = "UNSUBSCRIBE\ndestination:%s\n\n\x00\n" % dest | ||
| self.assertEquals(stomper.unsubscribe(dest), correct) | ||
| def testSubscribe(self): | ||
| dest, ack = '/queue/all', 'client' | ||
| correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack) | ||
| self.assertEquals(stomper.subscribe(dest, ack), correct) | ||
| dest, ack = '/queue/all', 'auto' | ||
| correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack) | ||
| self.assertEquals(stomper.subscribe(dest, ack), correct) | ||
| correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack) | ||
| self.assertEquals(stomper.subscribe(dest), correct) | ||
| def testConnect(self): | ||
| username, password = 'bob', '123' | ||
| correct = "CONNECT\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password) | ||
| self.assertEquals(stomper.connect(username, password), correct) | ||
| def testDisconnect(self): | ||
| correct = "DISCONNECT\n\n\x00\n" | ||
| self.assertEquals(stomper.disconnect(), correct) | ||
| def testSend(self): | ||
| dest, transactionid, msg = '/queue/myplace', '', '123 456 789' | ||
| correct = "SEND\ndestination: %s\n\n%s\x00\n" % (dest, msg) | ||
| result = stomper.send(dest, msg, transactionid) | ||
| # print "result: " | ||
| # pprint.pprint(result) | ||
| # print "correct: " | ||
| # pprint.pprint(correct) | ||
| self.assertEquals(result, correct) | ||
| dest, transactionid, msg = '/queue/myplace', '987', '123 456 789' | ||
| correct = "SEND\ndestination: %s\ntransaction: %s\n\n%s\x00\n" % (dest, transactionid, msg) | ||
| self.assertEquals(stomper.send(dest, msg, transactionid), correct) | ||
| if __name__ == "__main__": | ||
| unittest.main() |
| """ | ||
| This is the unittest to verify my stomper module. | ||
| The STOMP protocol specification maybe found here: | ||
| * http://stomp.codehaus.org/Protocol | ||
| I've looked and the stomp client by Jason R. Briggs and have based the message | ||
| generation on how his client did it. The client can be found at the follow | ||
| address however it isn't a dependancy. | ||
| * http://www.briggs.net.nz/log/projects/stomppy | ||
| (c) Oisin Mulvihill, 2007-07-26. | ||
| License: http://www.apache.org/licenses/LICENSE-2.0 | ||
| """ | ||
| import pprint | ||
| import unittest | ||
| import stomper | ||
| #.stomp_11 as stomper | ||
| class TestEngine(stomper.Engine): | ||
| """Test that these methods are called by the default engine. | ||
| """ | ||
| def __init__(self): | ||
| super(TestEngine, self).__init__() | ||
| self.ackCalled = False | ||
| self.errorCalled = False | ||
| self.receiptCalled = False | ||
| def ack(self, msg): | ||
| super(TestEngine, self).ack(msg) | ||
| self.ackCalled = True | ||
| return 'ack' | ||
| def error(self, msg): | ||
| super(TestEngine, self).error(msg) | ||
| self.errorCalled = True | ||
| return 'error' | ||
| def receipt(self, msg): | ||
| super(TestEngine, self).receipt(msg) | ||
| self.receiptCalled = True | ||
| return 'receipt' | ||
| class Stomper11Test(unittest.TestCase): | ||
| def testEngineToServerMessages(self): | ||
| """Test the state machines reaction | ||
| """ | ||
| e = TestEngine() | ||
| # React to a message which should be an ack: | ||
| msg = stomper.Frame() | ||
| msg.cmd = 'MESSAGE' | ||
| msg.headers = { | ||
| 'subscription': 1, | ||
| 'destination:': '/queue/a', | ||
| 'message-id:': 'some-message-id', | ||
| 'content-type': 'text/plain', | ||
| } | ||
| msg.body = "hello queue a" | ||
| rc = e.react(msg.pack()) | ||
| self.assertEquals(rc, 'ack') | ||
| self.assertEquals(e.ackCalled, True) | ||
| # React to an error: | ||
| error = stomper.Frame() | ||
| error.cmd = 'ERROR' | ||
| error.headers = {'message:': 'malformed packet received!'} | ||
| error.body = """The message: | ||
| ----- | ||
| MESSAGE | ||
| destined:/queue/a | ||
| Hello queue a! | ||
| ----- | ||
| Did not contain a destination header, which is required for message propagation. | ||
| \x00 | ||
| """ | ||
| rc = e.react(error.pack()) | ||
| self.assertEquals(rc, 'error') | ||
| self.assertEquals(e.errorCalled, True) | ||
| # React to an receipt: | ||
| receipt = stomper.Frame() | ||
| receipt.cmd = 'RECEIPT' | ||
| receipt.headers = {'receipt-id:': 'message-12345'} | ||
| rc = e.react(receipt.pack()) | ||
| self.assertEquals(rc, 'receipt') | ||
| self.assertEquals(e.receiptCalled, True) | ||
| def testEngine(self): | ||
| """Test the basic state machine. | ||
| """ | ||
| e = stomper.Engine(testing=True) | ||
| # test session connected message: | ||
| msg = """CONNECTED | ||
| version:1.1 | ||
| session:ID:snorky.local-49191-1185461799654-3:18 | ||
| \x00 | ||
| """ | ||
| result = stomper.unpack_frame(msg) | ||
| correct = '' | ||
| returned = e.react(result) | ||
| self.assertEquals(returned, correct) | ||
| # test message: | ||
| msg = """MESSAGE | ||
| subscription:1 | ||
| destination:/queue/a | ||
| message-id:some-message-id | ||
| content-type:text/plain | ||
| hello queue a | ||
| \x00 | ||
| """ | ||
| returned = e.react(msg) | ||
| correct = 'ACK\nsubscription:1\nmessage-id:some-message-id\n\n\x00\n' | ||
| self.assertEquals(returned, correct) | ||
| # test error: | ||
| msg = """ERROR | ||
| message:some error | ||
| There was a problem with your last message | ||
| \x00 | ||
| """ | ||
| returned = e.react(msg) | ||
| correct = 'error' | ||
| self.assertEquals(returned, correct) | ||
| # test receipt: | ||
| msg = """RECEIPT | ||
| message-id:some-message-id | ||
| \x00 | ||
| """ | ||
| returned = e.react(msg) | ||
| correct = 'receipt' | ||
| self.assertEquals(returned, correct) | ||
| def testFramepack1(self): | ||
| """Testing pack, unpacking and the Frame class. | ||
| """ | ||
| # Check bad frame generation: | ||
| frame = stomper.Frame() | ||
| def bad(): | ||
| frame.cmd = 'SOME UNNOWN CMD' | ||
| self.assertRaises(stomper.FrameError, bad) | ||
| # Generate a MESSAGE frame: | ||
| frame = stomper.Frame() | ||
| frame.cmd = 'MESSAGE' | ||
| frame.headers['destination'] = '/queue/a' | ||
| frame.headers['message-id'] = 'card_data' | ||
| frame.body = "hello queue a" | ||
| result = frame.pack() | ||
| # print "\n-- result " + "----" * 10 | ||
| # pprint.pprint(result) | ||
| # Try bad message unpack catching: | ||
| bad_frame = stomper.Frame() | ||
| self.assertRaises(stomper.FrameError, bad_frame.unpack, None) | ||
| self.assertRaises(stomper.FrameError, bad_frame.unpack, '') | ||
| # Try to read the generated frame back in | ||
| # and then check the variables are set up | ||
| # correctly: | ||
| frame2 = stomper.Frame() | ||
| frame2.unpack(result) | ||
| self.assertEquals(frame2.cmd, 'MESSAGE') | ||
| self.assertEquals(frame2.headers['destination'], '/queue/a') | ||
| self.assertEquals(frame2.headers['message-id'], 'card_data') | ||
| self.assertEquals(frame2.body, 'hello queue a') | ||
| result = frame2.pack() | ||
| correct = "MESSAGE\ndestination:/queue/a\nmessage-id:card_data\n\nhello queue a\x00\n" | ||
| self.assertEquals(result, correct) | ||
| result = stomper.unpack_frame(result) | ||
| self.assertEquals(result['cmd'], 'MESSAGE') | ||
| self.assertEquals(result['headers']['destination'], '/queue/a') | ||
| self.assertEquals(result['headers']['message-id'], 'card_data') | ||
| self.assertEquals(result['body'], 'hello queue a') | ||
| def testFramepack2(self): | ||
| """Testing pack, unpacking and the Frame class. | ||
| """ | ||
| # Check bad frame generation: | ||
| frame = stomper.Frame() | ||
| frame.cmd = 'DISCONNECT' | ||
| result = frame.pack() | ||
| correct = 'DISCONNECT\n\n\x00\n' | ||
| self.assertEquals(result, correct) | ||
| def testFrameUnpack2(self): | ||
| """Testing unpack frame function against MESSAGE | ||
| """ | ||
| msg = """MESSAGE | ||
| destination:/queue/a | ||
| message-id:card_data | ||
| hello queue a""" | ||
| result = stomper.unpack_frame(msg) | ||
| self.assertEquals(result['cmd'], 'MESSAGE') | ||
| self.assertEquals(result['headers']['destination'], '/queue/a') | ||
| self.assertEquals(result['headers']['message-id'], 'card_data') | ||
| self.assertEquals(result['body'], 'hello queue a') | ||
| def testFrameUnpack3(self): | ||
| """Testing unpack frame function against CONNECTED | ||
| """ | ||
| msg = """CONNECTED | ||
| version:1.1 | ||
| session:ID:snorky.local-49191-1185461799654-3:18 | ||
| """ | ||
| result = stomper.unpack_frame(msg) | ||
| self.assertEquals(result['cmd'], 'CONNECTED') | ||
| self.assertEquals(result['headers']['session'], 'ID:snorky.local-49191-1185461799654-3:18') | ||
| self.assertEquals(result['body'], '') | ||
| def testBugInFrameUnpack1(self): | ||
| msg = """MESSAGE | ||
| destination:/queue/a | ||
| message-id:card_data | ||
| hello queue a | ||
| \x00 | ||
| """ | ||
| result = stomper.unpack_frame(msg) | ||
| self.assertEquals(result['cmd'], 'MESSAGE') | ||
| self.assertEquals(result['headers']['destination'], '/queue/a') | ||
| self.assertEquals(result['headers']['message-id'], 'card_data') | ||
| self.assertEquals(result['body'], 'hello queue a') | ||
| def testCommit(self): | ||
| transactionid = '1234' | ||
| correct = "COMMIT\ntransaction:%s\n\n\x00\n" % transactionid | ||
| self.assertEquals(stomper.commit(transactionid), correct) | ||
| def testAbort(self): | ||
| transactionid = '1234' | ||
| correct = "ABORT\ntransaction:%s\n\n\x00\n" % transactionid | ||
| self.assertEquals(stomper.abort(transactionid), correct) | ||
| def testBegin(self): | ||
| transactionid = '1234' | ||
| correct = "BEGIN\ntransaction:%s\n\n\x00\n" % transactionid | ||
| self.assertEquals(stomper.begin(transactionid), correct) | ||
| def testAck(self): | ||
| subscription = '1' | ||
| messageid = '1234' | ||
| transactionid = '9876' | ||
| header = 'subscription:%s\nmessage-id:%s\ntransaction:%s' % ( | ||
| subscription, messageid, transactionid) | ||
| correct = "ACK\n%s\n\n\x00\n" % header | ||
| actual = stomper.ack(messageid, subscription, transactionid) | ||
| self.assertEquals(actual, correct) | ||
| subscription = '1' | ||
| messageid = '1234' | ||
| correct = "ACK\nsubscription:%s\nmessage-id:%s\n\n\x00\n" % ( | ||
| subscription, messageid) | ||
| self.assertEquals(stomper.ack(messageid, subscription), correct) | ||
| def testNack(self): | ||
| subscription = '1' | ||
| messageid = '1234' | ||
| transactionid = '9876' | ||
| header = 'subscription:%s\nmessage-id:%s\ntransaction:%s' % ( | ||
| subscription, messageid, transactionid) | ||
| correct = "NACK\n%s\n\n\x00\n" % header | ||
| actual = stomper.nack(messageid, subscription, transactionid) | ||
| self.assertEquals(actual, correct) | ||
| subscription = '1' | ||
| messageid = '1234' | ||
| correct = "NACK\nsubscription:%s\nmessage-id:%s\n\n\x00\n" % ( | ||
| subscription, messageid) | ||
| self.assertEquals(stomper.nack(messageid, subscription), correct) | ||
| def testUnsubscribe(self): | ||
| subscription = '1' | ||
| correct = "UNSUBSCRIBE\nid:%s\n\n\x00\n" % subscription | ||
| self.assertEquals(stomper.unsubscribe(subscription), correct) | ||
| def testSubscribe(self): | ||
| dest, ack = '/queue/all', 'client' | ||
| correct = "SUBSCRIBE\nid:0\ndestination:%s\nack:%s\n\n\x00\n" % (dest, ack) | ||
| self.assertEquals(stomper.subscribe(dest, 0, ack), correct) | ||
| dest, ack = '/queue/all', 'auto' | ||
| correct = "SUBSCRIBE\nid:0\ndestination:%s\nack:%s\n\n\x00\n" % (dest, ack) | ||
| self.assertEquals(stomper.subscribe(dest, 0, ack), correct) | ||
| correct = "SUBSCRIBE\nid:0\ndestination:%s\nack:%s\n\n\x00\n" % (dest, ack) | ||
| self.assertEquals(stomper.subscribe(dest, 0), correct) | ||
| def testConnect(self): | ||
| username, password = 'bob', '123' | ||
| correct = "CONNECT\naccept-version:1.1\nhost:localhost\nheart-beat:0,0\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password) | ||
| self.assertEquals(stomper.connect(username, password, 'localhost'), correct) | ||
| def testConnectWithHeartbeats(self): | ||
| username, password = 'bob', '123' | ||
| heartbeats = (1000, 1000) | ||
| correct = "CONNECT\naccept-version:1.1\nhost:localhost\nheart-beat:1000,1000\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password) | ||
| self.assertEquals(stomper.connect(username, password, 'localhost', heartbeats=heartbeats), correct) | ||
| def testDisconnect(self): | ||
| correct = "DISCONNECT\nreceipt:77\n\x00\n" | ||
| self.assertEquals(stomper.disconnect(77), correct) | ||
| def testSend(self): | ||
| dest, transactionid, msg = '/queue/myplace', '', '123 456 789' | ||
| correct = "SEND\ndestination:%s\ncontent-type:text/plain\n\n%s\x00\n" % (dest, msg) | ||
| result = stomper.send(dest, msg, transactionid) | ||
| self.assertEquals(result, correct) | ||
| dest, transactionid, msg = '/queue/myplace', '987', '123 456 789' | ||
| correct = "SEND\ndestination:%s\ncontent-type:text/plain\ntransaction:%s\n\n%s\x00\n" % (dest, transactionid, msg) | ||
| self.assertEquals(stomper.send(dest, msg, transactionid), correct) | ||
| if __name__ == "__main__": | ||
| unittest.main() |
@@ -1,4 +0,4 @@ | ||
| Metadata-Version: 1.0 | ||
| Metadata-Version: 1.1 | ||
| Name: stomper | ||
| Version: 0.2.8 | ||
| Version: 0.3.0 | ||
| Summary: This is a transport neutral client implementation of the STOMP protocol. | ||
@@ -25,2 +25,3 @@ Home-page: https://github.com/oisinmulvihill/stomper | ||
| - Daniele Varrazzo <https://github.com/dvarrazzo> | ||
| - Ralph Bean <http://threebean.org> | ||
@@ -36,6 +37,4 @@ | ||
| messages can be easily generated and parsed, however its up to the user to do | ||
| the sending and receiving. The STOMP protocol specification can be found here: | ||
| the sending and receiving. | ||
| - `Stomp Protocol <http://stomp.codehaus.org/Protocol/>`_ | ||
| I've looked at the stomp client by Jason R. Briggs. I've based some of the | ||
@@ -83,6 +82,46 @@ 'function to message' generation on how his client does it. The client can | ||
| Supported STOMP Versions | ||
| ------------------------ | ||
| 1.1 | ||
| ~~~ | ||
| This is the default version of the of STOMP used in stomper versions 0.3.x. | ||
| * https://stomp.github.io/stomp-specification-1.1.html | ||
| 1.0 | ||
| ~~~ | ||
| This is no longer the default protocol version. To use it you can import it as | ||
| follows:: | ||
| import stomper.stomp_10 as stomper | ||
| This is the default version used in stomper version 0.2.x. | ||
| * https://stomp.github.io/stomp-specification-1.0.html | ||
| Version History | ||
| --------------- | ||
| 0.3.0 | ||
| ~~~~~ | ||
| This release makes STOMP v1.1 the default protocol. To stick with STOMP v1.0 | ||
| you can continue to use stomper v0.2.9 or change the import in your code to:: | ||
| import stomper.stomp_10 as stomper | ||
| **Note** Any fixes to STOMP v1.0 will only be applied to version >= 0.3. | ||
| 0.2.9 | ||
| ~~~~~ | ||
| Thanks to Ralph Bean for contributing the new protocol 1.1 support: | ||
| * https://github.com/oisinmulvihill/stomper/issues/6 | ||
| * https://github.com/oisinmulvihill/stomper/pull/7 | ||
| 0.2.8 | ||
@@ -89,0 +128,0 @@ ~~~~~ |
@@ -6,2 +6,4 @@ MANIFEST.in | ||
| lib/stomper/__init__.py | ||
| lib/stomper/stomp_10.py | ||
| lib/stomper/stomp_11.py | ||
| lib/stomper/stompbuffer.py | ||
@@ -21,2 +23,3 @@ lib/stomper/utils.py | ||
| lib/stomper/tests/teststompbuffer.py | ||
| lib/stomper/tests/teststomper.py | ||
| lib/stomper/tests/teststomper_10.py | ||
| lib/stomper/tests/teststomper_11.py |
+20
-505
@@ -1,508 +0,23 @@ | ||
| """ | ||
| This is a python client implementation of the STOMP protocol. | ||
| from stomp_11 import ( | ||
| Engine, | ||
| Frame, | ||
| FrameError, | ||
| It aims to be transport layer neutral. This module provides functions to | ||
| create and parse STOMP messages in a programmatic fashion. | ||
| abort, | ||
| ack, | ||
| nack, | ||
| begin, | ||
| commit, | ||
| connect, | ||
| disconnect, | ||
| send, | ||
| subscribe, | ||
| unpack_frame, | ||
| unsubscribe, | ||
| The examples package contains two examples using twisted as the transport | ||
| framework. Other frameworks can be used and I may add other examples as | ||
| time goes on. | ||
| VALID_COMMANDS, | ||
| The STOMP protocol specification maybe found here: | ||
| * http://stomp.codehaus.org/Protocol | ||
| I've looked at the stomp client by Jason R. Briggs and have based the message | ||
| generation on how his client does it. The client can be found at the follow | ||
| address however it isn't a dependancy. | ||
| * http://www.briggs.net.nz/log/projects/stomppy | ||
| In testing this library I run against ActiveMQ project. The server runs | ||
| in java, however its fairly standalone and easy to set up. The projects | ||
| page is here: | ||
| * http://activemq.apache.org/ | ||
| (c) Oisin Mulvihill, 2007-07-26. | ||
| License: http://www.apache.org/licenses/LICENSE-2.0 | ||
| """ | ||
| import re | ||
| import uuid | ||
| import types | ||
| import logging | ||
| import utils | ||
| import stompbuffer | ||
| # This is used as a return from message responses functions. | ||
| # It is used more for readability more then anything or reason. | ||
| NO_RESPONSE_NEEDED = '' | ||
| # For backwards compatibility | ||
| NO_REPONSE_NEEDED = '' | ||
| # The version of the protocol we implement. | ||
| STOMP_VERSION = '1.0' | ||
| # Message terminator: | ||
| NULL = '\x00' | ||
| # STOMP Spec v1.0 valid commands: | ||
| VALID_COMMANDS = [ | ||
| 'ABORT', 'ACK', 'BEGIN', 'COMMIT', | ||
| 'CONNECT', 'CONNECTED', 'DISCONNECT', 'MESSAGE', | ||
| 'SEND', 'SUBSCRIBE', 'UNSUBSCRIBE', | ||
| 'RECEIPT', 'ERROR', | ||
| ] | ||
| def get_log(): | ||
| return logging.getLogger("stomper") | ||
| class FrameError(Exception): | ||
| """Raise for problem with frame generation or parsing. | ||
| """ | ||
| class Frame(object): | ||
| """This class is used to create or read STOMP message frames. | ||
| The method pack() is used to create a STOMP message ready | ||
| for transmission. | ||
| The method unpack() is used to read a STOMP message into | ||
| a frame instance. It uses the unpack_frame(...) function | ||
| to do the initial parsing. | ||
| The frame has three important member variables: | ||
| * cmd | ||
| * headers | ||
| * body | ||
| The 'cmd' is a property that represents the STOMP message | ||
| command. When you assign this a check is done to make sure | ||
| its one of the VALID_COMMANDS. If not then FrameError will | ||
| be raised. | ||
| The 'headers' is a dictionary which the user can added to | ||
| if needed. There are no restrictions or checks imposed on | ||
| what values are inserted. | ||
| The 'body' is just a member variable that the body text | ||
| is assigned to. | ||
| """ | ||
| def __init__(self): | ||
| """Setup the internal state.""" | ||
| self._cmd = '' | ||
| self.body = '' | ||
| self.headers = {} | ||
| def getCmd(self): | ||
| """Don't use _cmd directly!""" | ||
| return self._cmd | ||
| def setCmd(self, cmd): | ||
| """Check the cmd is valid, FrameError will be raised if its not.""" | ||
| cmd = cmd.upper() | ||
| if cmd not in VALID_COMMANDS: | ||
| raise FrameError("The cmd '%s' is not valid! It must be one of '%s' (STOMP v%s)." % ( | ||
| cmd, VALID_COMMANDS, STOMP_VERSION) | ||
| ) | ||
| else: | ||
| self._cmd = cmd | ||
| cmd = property(getCmd, setCmd) | ||
| def pack(self): | ||
| """Called to create a STOMP message from the internal values. | ||
| """ | ||
| headers = ''.join( | ||
| ['%s:%s\n' % (f, v) for f, v in sorted(self.headers.items())] | ||
| ) | ||
| stomp_message = "%s\n%s\n%s%s\n" % (self._cmd, headers, self.body, NULL) | ||
| # import pprint | ||
| # print "stomp_message: ", pprint.pprint(stomp_message) | ||
| return stomp_message | ||
| def unpack(self, message): | ||
| """Called to extract a STOMP message into this instance. | ||
| message: | ||
| This is a text string representing a valid | ||
| STOMP (v1.0) message. | ||
| This method uses unpack_frame(...) to extract the | ||
| information, before it is assigned internally. | ||
| retuned: | ||
| The result of the unpack_frame(...) call. | ||
| """ | ||
| if not message: | ||
| raise FrameError("Unpack error! The given message isn't valid '%s'!" % message) | ||
| msg = unpack_frame(message) | ||
| self.cmd = msg['cmd'] | ||
| self.headers = msg['headers'] | ||
| # Assign directly as the message will have the null | ||
| # character in the message already. | ||
| self.body = msg['body'] | ||
| return msg | ||
| def unpack_frame(message): | ||
| """Called to unpack a STOMP message into a dictionary. | ||
| returned = { | ||
| # STOMP Command: | ||
| 'cmd' : '...', | ||
| # Headers e.g. | ||
| 'headers' : { | ||
| 'destination' : 'xyz', | ||
| 'message-id' : 'some event', | ||
| : | ||
| etc, | ||
| } | ||
| # Body: | ||
| 'body' : '...1234...\x00', | ||
| } | ||
| """ | ||
| body = [] | ||
| returned = dict(cmd='', headers={}, body='') | ||
| breakdown = message.split('\n') | ||
| # Get the message command: | ||
| returned['cmd'] = breakdown[0] | ||
| breakdown = breakdown[1:] | ||
| def headD(field): | ||
| # find the first ':' everything to the left of this is a | ||
| # header, everything to the right is data: | ||
| index = field.find(':') | ||
| if index: | ||
| header = field[:index].strip() | ||
| data = field[index+1:].strip() | ||
| # print "header '%s' data '%s'" % (header, data) | ||
| returned['headers'][header.strip()] = data.strip() | ||
| def bodyD(field): | ||
| field = field.strip() | ||
| if field: | ||
| body.append(field) | ||
| # Recover the header fields and body data | ||
| handler = headD | ||
| for field in breakdown: | ||
| # print "field:", field | ||
| if field.strip() == '': | ||
| # End of headers, it body data next. | ||
| handler = bodyD | ||
| continue | ||
| handler(field) | ||
| # Stich the body data together: | ||
| # print "1. body: ", body | ||
| body = "".join(body) | ||
| returned['body'] = body.replace('\x00', '') | ||
| # print "2. body: <%s>" % returned['body'] | ||
| return returned | ||
| def abort(transactionid): | ||
| """STOMP abort transaction command. | ||
| Rollback whatever actions in this transaction. | ||
| transactionid: | ||
| This is the id that all actions in this transaction. | ||
| """ | ||
| return "ABORT\ntransaction: %s\n\n\x00\n" % transactionid | ||
| def ack(messageid, transactionid=None): | ||
| """STOMP acknowledge command. | ||
| Acknowledge receipt of a specific message from the server. | ||
| messageid: | ||
| This is the id of the message we are acknowledging, | ||
| what else could it be? ;) | ||
| transactionid: | ||
| This is the id that all actions in this transaction | ||
| will have. If this is not given then a random UUID | ||
| will be generated for this. | ||
| """ | ||
| header = 'message-id: %s' % messageid | ||
| if transactionid: | ||
| header = 'message-id: %s\ntransaction: %s' % (messageid, transactionid) | ||
| return "ACK\n%s\n\n\x00\n" % header | ||
| def begin(transactionid=None): | ||
| """STOMP begin command. | ||
| Start a transaction... | ||
| transactionid: | ||
| This is the id that all actions in this transaction | ||
| will have. If this is not given then a random UUID | ||
| will be generated for this. | ||
| """ | ||
| if not transactionid: | ||
| # Generate a random UUID: | ||
| transactionid = uuid.uuid4() | ||
| return "BEGIN\ntransaction: %s\n\n\x00\n" % transactionid | ||
| def commit(transactionid): | ||
| """STOMP commit command. | ||
| Do whatever is required to make the series of actions | ||
| permanent for this transactionid. | ||
| transactionid: | ||
| This is the id that all actions in this transaction. | ||
| """ | ||
| return "COMMIT\ntransaction: %s\n\n\x00\n" % transactionid | ||
| def connect(username, password): | ||
| """STOMP connect command. | ||
| username, password: | ||
| These are the needed auth details to connect to the | ||
| message server. | ||
| After sending this we will receive a CONNECTED | ||
| message which will contain our session id. | ||
| """ | ||
| return "CONNECT\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password) | ||
| def disconnect(): | ||
| """STOMP disconnect command. | ||
| Tell the server we finished and we'll be closing the | ||
| socket soon. | ||
| """ | ||
| return "DISCONNECT\n\n\x00\n" | ||
| def send(dest, msg, transactionid=None): | ||
| """STOMP send command. | ||
| dest: | ||
| This is the channel we wish to subscribe to | ||
| msg: | ||
| This is the message body to be sent. | ||
| transactionid: | ||
| This is an optional field and is not needed | ||
| by default. | ||
| """ | ||
| transheader = '' | ||
| if transactionid: | ||
| transheader = 'transaction: %s\n' % transactionid | ||
| return "SEND\ndestination: %s\n%s\n%s\x00\n" % (dest, transheader, msg) | ||
| def subscribe(dest, ack='auto'): | ||
| """STOMP subscribe command. | ||
| dest: | ||
| This is the channel we wish to subscribe to | ||
| ack: 'auto' | 'client' | ||
| If the ack is set to client, then messages received will | ||
| have to have an acknowledge as a reply. Otherwise the server | ||
| will assume delivery failure. | ||
| """ | ||
| return "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack) | ||
| def unsubscribe(dest): | ||
| """STOMP unsubscribe command. | ||
| dest: | ||
| This is the channel we wish to subscribe to | ||
| Tell the server we no longer wish to receive any | ||
| further messages for the given subscription. | ||
| """ | ||
| return "UNSUBSCRIBE\ndestination:%s\n\n\x00\n" % dest | ||
| class Engine(object): | ||
| """This is a simple state machine to return a response to received | ||
| message if needed. | ||
| """ | ||
| def __init__(self, testing=False): | ||
| self.testing = testing | ||
| self.log = logging.getLogger("stomper.Engine") | ||
| self.sessionId = '' | ||
| # Entry Format: | ||
| # | ||
| # COMMAND : Handler_Function | ||
| # | ||
| self.states = { | ||
| 'CONNECTED' : self.connected, | ||
| 'MESSAGE' : self.ack, | ||
| 'ERROR' : self.error, | ||
| 'RECEIPT' : self.receipt, | ||
| } | ||
| def react(self, msg): | ||
| """Called to provide a response to a message if needed. | ||
| msg: | ||
| This is a dictionary as returned by unpack_frame(...) | ||
| or it can be a straight STOMP message. This function | ||
| will attempt to determine which an deal with it. | ||
| returned: | ||
| A message to return or an empty string. | ||
| """ | ||
| returned = "" | ||
| # If its not a string assume its a dict. | ||
| mtype = type(msg) | ||
| if mtype in types.StringTypes: | ||
| msg = unpack_frame(msg) | ||
| elif mtype == types.DictType: | ||
| pass | ||
| else: | ||
| raise FrameError("Unknown message type '%s', I don't know what to do with this!" % mtype) | ||
| if self.states.has_key(msg['cmd']): | ||
| # print("reacting to message - %s" % msg['cmd']) | ||
| returned = self.states[msg['cmd']](msg) | ||
| return returned | ||
| def connected(self, msg): | ||
| """No response is needed to a connected frame. | ||
| This method stores the session id as the | ||
| member sessionId for later use. | ||
| returned: | ||
| NO_RESPONSE_NEEDED | ||
| """ | ||
| self.sessionId = msg['headers']['session'] | ||
| #print "connected: session id '%s'." % self.sessionId | ||
| return NO_RESPONSE_NEEDED | ||
| def ack(self, msg): | ||
| """Called when a MESSAGE has been received. | ||
| Override this method to handle received messages. | ||
| This function will generate an acknowledge message | ||
| for the given message and transaction (if present). | ||
| """ | ||
| message_id = msg['headers']['message-id'] | ||
| transaction_id = None | ||
| if msg['headers'].has_key('transaction-id'): | ||
| transaction_id = msg['headers']['transaction-id'] | ||
| # print "acknowledging message id <%s>." % message_id | ||
| return ack(message_id, transaction_id) | ||
| def error(self, msg): | ||
| """Called to handle an error message received from the server. | ||
| This method just logs the error message | ||
| returned: | ||
| NO_RESPONSE_NEEDED | ||
| """ | ||
| body = msg['body'].replace(NULL, '') | ||
| brief_msg = "" | ||
| if msg['headers'].has_key('message'): | ||
| brief_msg = msg['headers']['message'] | ||
| self.log.error("Received server error - message%s\n\n%s" % (brief_msg, body)) | ||
| returned = NO_RESPONSE_NEEDED | ||
| if self.testing: | ||
| returned = 'error' | ||
| return returned | ||
| def receipt(self, msg): | ||
| """Called to handle a receipt message received from the server. | ||
| This method just logs the receipt message | ||
| returned: | ||
| NO_RESPONSE_NEEDED | ||
| """ | ||
| body = msg['body'].replace(NULL, '') | ||
| brief_msg = "" | ||
| if msg['headers'].has_key('receipt-id'): | ||
| brief_msg = msg['headers']['receipt-id'] | ||
| self.log.info("Received server receipt message - receipt-id:%s\n\n%s" % (brief_msg, body)) | ||
| returned = NO_RESPONSE_NEEDED | ||
| if self.testing: | ||
| returned = 'receipt' | ||
| return returned | ||
| NO_RESPONSE_NEEDED, | ||
| NO_REPONSE_NEEDED, | ||
| NULL, | ||
| ) |
+44
-5
@@ -1,4 +0,4 @@ | ||
| Metadata-Version: 1.0 | ||
| Metadata-Version: 1.1 | ||
| Name: stomper | ||
| Version: 0.2.8 | ||
| Version: 0.3.0 | ||
| Summary: This is a transport neutral client implementation of the STOMP protocol. | ||
@@ -25,2 +25,3 @@ Home-page: https://github.com/oisinmulvihill/stomper | ||
| - Daniele Varrazzo <https://github.com/dvarrazzo> | ||
| - Ralph Bean <http://threebean.org> | ||
@@ -36,6 +37,4 @@ | ||
| messages can be easily generated and parsed, however its up to the user to do | ||
| the sending and receiving. The STOMP protocol specification can be found here: | ||
| the sending and receiving. | ||
| - `Stomp Protocol <http://stomp.codehaus.org/Protocol/>`_ | ||
| I've looked at the stomp client by Jason R. Briggs. I've based some of the | ||
@@ -83,6 +82,46 @@ 'function to message' generation on how his client does it. The client can | ||
| Supported STOMP Versions | ||
| ------------------------ | ||
| 1.1 | ||
| ~~~ | ||
| This is the default version of the of STOMP used in stomper versions 0.3.x. | ||
| * https://stomp.github.io/stomp-specification-1.1.html | ||
| 1.0 | ||
| ~~~ | ||
| This is no longer the default protocol version. To use it you can import it as | ||
| follows:: | ||
| import stomper.stomp_10 as stomper | ||
| This is the default version used in stomper version 0.2.x. | ||
| * https://stomp.github.io/stomp-specification-1.0.html | ||
| Version History | ||
| --------------- | ||
| 0.3.0 | ||
| ~~~~~ | ||
| This release makes STOMP v1.1 the default protocol. To stick with STOMP v1.0 | ||
| you can continue to use stomper v0.2.9 or change the import in your code to:: | ||
| import stomper.stomp_10 as stomper | ||
| **Note** Any fixes to STOMP v1.0 will only be applied to version >= 0.3. | ||
| 0.2.9 | ||
| ~~~~~ | ||
| Thanks to Ralph Bean for contributing the new protocol 1.1 support: | ||
| * https://github.com/oisinmulvihill/stomper/issues/6 | ||
| * https://github.com/oisinmulvihill/stomper/pull/7 | ||
| 0.2.8 | ||
@@ -89,0 +128,0 @@ ~~~~~ |
+42
-3
@@ -17,2 +17,3 @@ ======= | ||
| - Daniele Varrazzo <https://github.com/dvarrazzo> | ||
| - Ralph Bean <http://threebean.org> | ||
@@ -28,6 +29,4 @@ | ||
| messages can be easily generated and parsed, however its up to the user to do | ||
| the sending and receiving. The STOMP protocol specification can be found here: | ||
| the sending and receiving. | ||
| - `Stomp Protocol <http://stomp.codehaus.org/Protocol/>`_ | ||
| I've looked at the stomp client by Jason R. Briggs. I've based some of the | ||
@@ -75,6 +74,46 @@ 'function to message' generation on how his client does it. The client can | ||
| Supported STOMP Versions | ||
| ------------------------ | ||
| 1.1 | ||
| ~~~ | ||
| This is the default version of the of STOMP used in stomper versions 0.3.x. | ||
| * https://stomp.github.io/stomp-specification-1.1.html | ||
| 1.0 | ||
| ~~~ | ||
| This is no longer the default protocol version. To use it you can import it as | ||
| follows:: | ||
| import stomper.stomp_10 as stomper | ||
| This is the default version used in stomper version 0.2.x. | ||
| * https://stomp.github.io/stomp-specification-1.0.html | ||
| Version History | ||
| --------------- | ||
| 0.3.0 | ||
| ~~~~~ | ||
| This release makes STOMP v1.1 the default protocol. To stick with STOMP v1.0 | ||
| you can continue to use stomper v0.2.9 or change the import in your code to:: | ||
| import stomper.stomp_10 as stomper | ||
| **Note** Any fixes to STOMP v1.0 will only be applied to version >= 0.3. | ||
| 0.2.9 | ||
| ~~~~~ | ||
| Thanks to Ralph Bean for contributing the new protocol 1.1 support: | ||
| * https://github.com/oisinmulvihill/stomper/issues/6 | ||
| * https://github.com/oisinmulvihill/stomper/pull/7 | ||
| 0.2.8 | ||
@@ -81,0 +120,0 @@ ~~~~~ |
+1
-1
@@ -14,3 +14,3 @@ """ | ||
| ProjectUrl = "https://github.com/oisinmulvihill/stomper" | ||
| Version = '0.2.8' | ||
| Version = '0.3.0' | ||
| Author = 'Oisin Mulvihill' | ||
@@ -17,0 +17,0 @@ AuthorEmail = 'oisin dot mulvihill at gmail com' |
| """ | ||
| This is the unittest to verify my stomper module. | ||
| The STOMP protocol specification maybe found here: | ||
| * http://stomp.codehaus.org/Protocol | ||
| I've looked and the stomp client by Jason R. Briggs and have based the message | ||
| generation on how his client did it. The client can be found at the follow | ||
| address however it isn't a dependancy. | ||
| * http://www.briggs.net.nz/log/projects/stomppy | ||
| (c) Oisin Mulvihill, 2007-07-26. | ||
| License: http://www.apache.org/licenses/LICENSE-2.0 | ||
| """ | ||
| import pprint | ||
| import unittest | ||
| import stomper | ||
| class TestEngine(stomper.Engine): | ||
| """Test that these methods are called by the default engine. | ||
| """ | ||
| def __init__(self): | ||
| super(TestEngine, self).__init__() | ||
| self.ackCalled = False | ||
| self.errorCalled = False | ||
| self.receiptCalled = False | ||
| def ack(self, msg): | ||
| super(TestEngine, self).ack(msg) | ||
| self.ackCalled = True | ||
| return 'ack' | ||
| def error(self, msg): | ||
| super(TestEngine, self).error(msg) | ||
| self.errorCalled = True | ||
| return 'error' | ||
| def receipt(self, msg): | ||
| super(TestEngine, self).receipt(msg) | ||
| self.receiptCalled = True | ||
| return 'receipt' | ||
| class StomperTest(unittest.TestCase): | ||
| def testEngineToServerMessages(self): | ||
| """Test the state machines reaction | ||
| """ | ||
| e = TestEngine() | ||
| # React to a message which should be an ack: | ||
| msg = stomper.Frame() | ||
| msg.cmd = 'MESSAGE' | ||
| msg.headers = { | ||
| 'destination:': '/queue/a', | ||
| 'message-id:': 'some-message-id' | ||
| } | ||
| msg.body = "hello queue a" | ||
| rc = e.react(msg.pack()) | ||
| self.assertEquals(rc, 'ack') | ||
| self.assertEquals(e.ackCalled, True) | ||
| # React to an error: | ||
| error = stomper.Frame() | ||
| error.cmd = 'ERROR' | ||
| error.headers = {'message:': 'malformed packet received!'} | ||
| error.body = """The message: | ||
| ----- | ||
| MESSAGE | ||
| destined:/queue/a | ||
| Hello queue a! | ||
| ----- | ||
| Did not contain a destination header, which is required for message propagation. | ||
| \x00 | ||
| """ | ||
| rc = e.react(error.pack()) | ||
| self.assertEquals(rc, 'error') | ||
| self.assertEquals(e.errorCalled, True) | ||
| # React to an receipt: | ||
| receipt = stomper.Frame() | ||
| receipt.cmd = 'RECEIPT' | ||
| receipt.headers = {'receipt-id:': 'message-12345'} | ||
| rc = e.react(receipt.pack()) | ||
| self.assertEquals(rc, 'receipt') | ||
| self.assertEquals(e.receiptCalled, True) | ||
| def testEngine(self): | ||
| """Test the basic state machine. | ||
| """ | ||
| e = stomper.Engine(testing=True) | ||
| # test session connected message: | ||
| msg = """CONNECTED | ||
| session:ID:snorky.local-49191-1185461799654-3:18 | ||
| \x00 | ||
| """ | ||
| result = stomper.unpack_frame(msg) | ||
| correct = '' | ||
| returned = e.react(result) | ||
| self.assertEquals(returned, correct) | ||
| # test message: | ||
| msg = """MESSAGE | ||
| destination: /queue/a | ||
| message-id: some-message-id | ||
| hello queue a | ||
| \x00 | ||
| """ | ||
| returned = e.react(msg) | ||
| correct = 'ACK\nmessage-id: some-message-id\n\n\x00\n' | ||
| self.assertEquals(returned, correct) | ||
| # test error: | ||
| msg = """ERROR | ||
| message:some error | ||
| There was a problem with your last message | ||
| \x00 | ||
| """ | ||
| returned = e.react(msg) | ||
| correct = 'error' | ||
| self.assertEquals(returned, correct) | ||
| # test receipt: | ||
| msg = """RECEIPT | ||
| message-id: some-message-id | ||
| \x00 | ||
| """ | ||
| returned = e.react(msg) | ||
| correct = 'receipt' | ||
| self.assertEquals(returned, correct) | ||
| def testFramepack1(self): | ||
| """Testing pack, unpacking and the Frame class. | ||
| """ | ||
| # Check bad frame generation: | ||
| frame = stomper.Frame() | ||
| def bad(): | ||
| frame.cmd = 'SOME UNNOWN CMD' | ||
| self.assertRaises(stomper.FrameError, bad) | ||
| # Generate a MESSAGE frame: | ||
| frame = stomper.Frame() | ||
| frame.cmd = 'MESSAGE' | ||
| frame.headers['destination'] = '/queue/a' | ||
| frame.headers['message-id'] = 'card_data' | ||
| frame.body = "hello queue a" | ||
| result = frame.pack() | ||
| # print "\n-- result " + "----" * 10 | ||
| # pprint.pprint(result) | ||
| # Try bad message unpack catching: | ||
| bad_frame = stomper.Frame() | ||
| self.assertRaises(stomper.FrameError, bad_frame.unpack, None) | ||
| self.assertRaises(stomper.FrameError, bad_frame.unpack, '') | ||
| # Try to read the generated frame back in | ||
| # and then check the variables are set up | ||
| # correctly: | ||
| frame2 = stomper.Frame() | ||
| frame2.unpack(result) | ||
| self.assertEquals(frame2.cmd, 'MESSAGE') | ||
| self.assertEquals(frame2.headers['destination'], '/queue/a') | ||
| self.assertEquals(frame2.headers['message-id'], 'card_data') | ||
| self.assertEquals(frame2.body, 'hello queue a') | ||
| result = frame2.pack() | ||
| correct = "MESSAGE\ndestination:/queue/a\nmessage-id:card_data\n\nhello queue a\x00\n" | ||
| # print "result: " | ||
| # pprint.pprint(result) | ||
| # print "correct: " | ||
| # pprint.pprint(correct) | ||
| # | ||
| self.assertEquals(result, correct) | ||
| result = stomper.unpack_frame(result) | ||
| self.assertEquals(result['cmd'], 'MESSAGE') | ||
| self.assertEquals(result['headers']['destination'], '/queue/a') | ||
| self.assertEquals(result['headers']['message-id'], 'card_data') | ||
| self.assertEquals(result['body'], 'hello queue a') | ||
| def testFramepack2(self): | ||
| """Testing pack, unpacking and the Frame class. | ||
| """ | ||
| # Check bad frame generation: | ||
| frame = stomper.Frame() | ||
| frame.cmd = 'DISCONNECT' | ||
| result = frame.pack() | ||
| correct = 'DISCONNECT\n\n\x00\n' | ||
| self.assertEquals(result, correct) | ||
| def testFrameUnpack2(self): | ||
| """Testing unpack frame function against MESSAGE | ||
| """ | ||
| msg = """MESSAGE | ||
| destination:/queue/a | ||
| message-id: card_data | ||
| hello queue a""" | ||
| result = stomper.unpack_frame(msg) | ||
| self.assertEquals(result['cmd'], 'MESSAGE') | ||
| self.assertEquals(result['headers']['destination'], '/queue/a') | ||
| self.assertEquals(result['headers']['message-id'], 'card_data') | ||
| self.assertEquals(result['body'], 'hello queue a') | ||
| def testFrameUnpack3(self): | ||
| """Testing unpack frame function against CONNECTED | ||
| """ | ||
| msg = """CONNECTED | ||
| session:ID:snorky.local-49191-1185461799654-3:18 | ||
| """ | ||
| result = stomper.unpack_frame(msg) | ||
| self.assertEquals(result['cmd'], 'CONNECTED') | ||
| self.assertEquals(result['headers']['session'], 'ID:snorky.local-49191-1185461799654-3:18') | ||
| self.assertEquals(result['body'], '') | ||
| def testBugInFrameUnpack1(self): | ||
| msg = """MESSAGE | ||
| destination:/queue/a | ||
| message-id: card_data | ||
| hello queue a | ||
| \x00 | ||
| """ | ||
| result = stomper.unpack_frame(msg) | ||
| self.assertEquals(result['cmd'], 'MESSAGE') | ||
| self.assertEquals(result['headers']['destination'], '/queue/a') | ||
| self.assertEquals(result['headers']['message-id'], 'card_data') | ||
| self.assertEquals(result['body'], 'hello queue a') | ||
| def testCommit(self): | ||
| transactionid = '1234' | ||
| correct = "COMMIT\ntransaction: %s\n\n\x00\n" % transactionid | ||
| self.assertEquals(stomper.commit(transactionid), correct) | ||
| def testAbort(self): | ||
| transactionid = '1234' | ||
| correct = "ABORT\ntransaction: %s\n\n\x00\n" % transactionid | ||
| self.assertEquals(stomper.abort(transactionid), correct) | ||
| def testBegin(self): | ||
| transactionid = '1234' | ||
| correct = "BEGIN\ntransaction: %s\n\n\x00\n" % transactionid | ||
| self.assertEquals(stomper.begin(transactionid), correct) | ||
| def testAck(self): | ||
| messageid = '1234' | ||
| transactionid = '9876' | ||
| header = 'message-id: %s\ntransaction: %s' % (messageid, transactionid) | ||
| correct = "ACK\n%s\n\n\x00\n" % header | ||
| self.assertEquals(stomper.ack(messageid, transactionid), correct) | ||
| messageid = '1234' | ||
| correct = "ACK\nmessage-id: %s\n\n\x00\n" % messageid | ||
| self.assertEquals(stomper.ack(messageid), correct) | ||
| def testUnsubscribe(self): | ||
| dest = '/queue/all' | ||
| correct = "UNSUBSCRIBE\ndestination:%s\n\n\x00\n" % dest | ||
| self.assertEquals(stomper.unsubscribe(dest), correct) | ||
| def testSubscribe(self): | ||
| dest, ack = '/queue/all', 'client' | ||
| correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack) | ||
| self.assertEquals(stomper.subscribe(dest, ack), correct) | ||
| dest, ack = '/queue/all', 'auto' | ||
| correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack) | ||
| self.assertEquals(stomper.subscribe(dest, ack), correct) | ||
| correct = "SUBSCRIBE\ndestination: %s\nack: %s\n\n\x00\n" % (dest, ack) | ||
| self.assertEquals(stomper.subscribe(dest), correct) | ||
| def testConnect(self): | ||
| username, password = 'bob', '123' | ||
| correct = "CONNECT\nlogin:%s\npasscode:%s\n\n\x00\n" % (username, password) | ||
| self.assertEquals(stomper.connect(username, password), correct) | ||
| def testDisconnect(self): | ||
| correct = "DISCONNECT\n\n\x00\n" | ||
| self.assertEquals(stomper.disconnect(), correct) | ||
| def testSend(self): | ||
| dest, transactionid, msg = '/queue/myplace', '', '123 456 789' | ||
| correct = "SEND\ndestination: %s\n\n%s\x00\n" % (dest, msg) | ||
| result = stomper.send(dest, msg, transactionid) | ||
| # print "result: " | ||
| # pprint.pprint(result) | ||
| # print "correct: " | ||
| # pprint.pprint(correct) | ||
| self.assertEquals(result, correct) | ||
| dest, transactionid, msg = '/queue/myplace', '987', '123 456 789' | ||
| correct = "SEND\ndestination: %s\ntransaction: %s\n\n%s\x00\n" % (dest, transactionid, msg) | ||
| self.assertEquals(stomper.send(dest, msg, transactionid), correct) | ||
| if __name__ == "__main__": | ||
| unittest.main() |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
117733
32.94%25
13.64%2309
41.22%