Sophie

Sophie

distrib > Mageia > 5 > i586 > media > core-release > by-pkgid > a493cfa5485edbef625a7abe38b7db75 > files > 23

python-stomper-0.2.8-5.mga5.noarch.rpm

"""
A simple twisted STOMP message sender.

(c) Oisin Mulvihill, 2007-07-26.
License: http://www.apache.org/licenses/LICENSE-2.0

"""
import logging

from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from twisted.internet.protocol import Protocol, ReconnectingClientFactory

import stomper
from stomper import stompbuffer


stomper.utils.log_init(logging.DEBUG)

DESTINATION="/topic/inbox"


class StompProtocol(Protocol, stomper.Engine):

    def __init__(self, username='', password=''):
        stomper.Engine.__init__(self)
        self.username = username
        self.password = password
        self.counter = 1
        self.log = logging.getLogger("sender")
        self.stompBuffer = stompbuffer.StompBuffer()


    def connected(self, msg):
        """Once I've connected I want to subscribe to my the message queue.
        """
        stomper.Engine.connected(self, msg)

        self.log.info("Connected: session %s. Beginning say hello." % msg['headers']['session'])
        
        def setup_looping_call():
            lc = LoopingCall(self.send)
            lc.start(2)
            
        reactor.callLater(1, setup_looping_call)

        f = stomper.Frame()
        f.unpack(stomper.subscribe(DESTINATION))

        # ActiveMQ specific headers:
        #
        # prevent the messages we send comming back to us.
        f.headers['activemq.noLocal'] = 'true'
        
        return f.pack()

        
    def ack(self, msg):
        """Processes the received message. I don't need to 
        generate an ack message.
        
        """
        self.log.info("SENDER - received: %s " % msg['body'])
        return stomper.NO_REPONSE_NEEDED


    def send(self):
        """Send out a hello message periodically.
        """
        self.log.info("Saying hello (%d)." % self.counter)

        f = stomper.Frame()
        f.unpack(stomper.send(DESTINATION, 'hello there (%d)' % self.counter))

        self.counter += 1        

        # ActiveMQ specific headers:
        #
        #f.headers['persistent'] = 'true'

        self.transport.write(f.pack())


    def connectionMade(self):
        """Register with stomp server.
        """
        cmd = stomper.connect(self.username, self.password)
        self.transport.write(cmd)


    def dataReceived(self, data):
        """Use stompbuffer to determine when a complete message has been received. 
        """
        self.stompBuffer.appendData(data)

        while True:
           msg = self.stompBuffer.getOneMessage()
           if msg is None:
               break

           returned = self.react(msg)
           if returned:
               self.transport.write(returned)




class StompClientFactory(ReconnectingClientFactory):
    
    # Will be set up before the factory is created.
    username, password = '', ''
    
    def buildProtocol(self, addr):
        return StompProtocol(self.username, self.password)
    
    
    def clientConnectionLost(self, connector, reason):
        """Lost connection
        """
        print 'Lost connection.  Reason:', reason
    
    
    def clientConnectionFailed(self, connector, reason):
        """Connection failed
        """
        print 'Connection failed. Reason:', reason        
        ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)


def start(host='localhost', port=61613, username='', password=''):
    """Start twisted event loop and the fun should begin...
    """
    StompClientFactory.username = username
    StompClientFactory.password = password
    reactor.connectTCP(host, port, StompClientFactory())
    reactor.run()


if __name__ == '__main__':
    start()