1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import logging, os, socket, time, types
21 from heapq import heappush, heappop, nsmallest
22 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
23 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
24 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
25 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
26 from select import select
27 from proton.handlers import OutgoingMessageHandler
28 from proton import unicode2utf8, utf82unicode
29
30 import traceback
31 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
32 from .wrapper import Wrapper, PYCTX
33 from cproton import *
34 from . import _compat
35
36 try:
37 import Queue
38 except ImportError:
39 import queue as Queue
40
41 -class Task(Wrapper):
42
43 @staticmethod
45 if impl is None:
46 return None
47 else:
48 return Task(impl)
49
52
55
57 pn_task_cancel(self._impl)
58
60
63
def set_ssl_domain(self, ssl_domain):
    """
    Attach an SSL configuration to this acceptor.

    @param ssl_domain: object whose ``_domain`` attribute is handed to
        ``pn_acceptor_set_ssl_domain`` together with this acceptor's
        ``_impl``.
    """
    acceptor_impl = self._impl
    domain_impl = ssl_domain._domain
    pn_acceptor_set_ssl_domain(acceptor_impl, domain_impl)
66
68 pn_acceptor_close(self._impl)
69
71
72 @staticmethod
74 if impl is None:
75 return None
76 else:
77 record = pn_reactor_attachments(impl)
78 attrs = pn_void2py(pn_record_get(record, PYCTX))
79 if attrs and 'subclass' in attrs:
80 return attrs['subclass'](impl=impl)
81 else:
82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
88
91
93 self.errors.append(info)
94 self.yield_()
95
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
100 impl = _chandler(handler, self.on_error)
101 pn_reactor_set_global_handler(self._impl, impl)
102 pn_decref(impl)
103
104 global_handler = property(_get_global, _set_global)
105
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111
112 timeout = property(_get_timeout, _set_timeout)
113
115 pn_reactor_yield(self._impl)
116
118 return pn_reactor_mark(self._impl)
119
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
124 impl = _chandler(handler, self.on_error)
125 pn_reactor_set_handler(self._impl, impl)
126 pn_decref(impl)
127
128 handler = property(_get_handler, _set_handler)
129
138
140 n = pn_reactor_wakeup(self._impl)
141 if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
142
144 pn_reactor_start(self._impl)
145
146 @property
148 return pn_reactor_quiesced(self._impl)
149
151 if self.errors:
152 for exc, value, tb in self.errors[:-1]:
153 traceback.print_exception(exc, value, tb)
154 exc, value, tb = self.errors[-1]
155 _compat.raise_(exc, value, tb)
156
158 result = pn_reactor_process(self._impl)
159 self._check_errors()
160 return result
161
163 pn_reactor_stop(self._impl)
164 self._check_errors()
165
167 impl = _chandler(task, self.on_error)
168 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
169 pn_decref(impl)
170 return task
171
def acceptor(self, host, port, handler=None):
    """
    Create an acceptor for incoming connections on the given host
    and port.

    The handler is wrapped with ``_chandler`` (using ``self.on_error``
    as its error callback) and passed to the reactor; the local
    reference to the wrapped handler is released afterwards.

    @param host: interface to listen on (passed through unicode2utf8)

    @param port: port to listen on (converted with str())

    @param handler: optional handler passed to the reactor for the
    new acceptor

    @return: an Acceptor wrapping the acceptor created by the reactor

    @raise IOError: if the reactor did not return an acceptor; the
    message contains the reactor's error text followed by host and port
    """
    impl = _chandler(handler, self.on_error)
    aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
    pn_decref(impl)
    if aimpl:
        return Acceptor(aimpl)
    # All three values must be formatted as one tuple: '%' binds tighter
    # than ',', so without the parentheses the formatting itself raised
    # TypeError and the IOError was never reported.
    error_text = pn_error_text(pn_reactor_error(self._impl))
    raise IOError("%s (%s:%s)" % (error_text, host, port))
180
182 """Deprecated: use connection_to_host() instead
183 """
184 impl = _chandler(handler, self.on_error)
185 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
186 if impl: pn_decref(impl)
187 return result
188
190 """Create an outgoing Connection that will be managed by the reactor.
191 The reactor's pn_iohandler will create a socket connection to the host
192 once the connection is opened.
193 """
194 conn = self.connection(handler)
195 self.set_connection_host(conn, host, port)
196 return conn
197
199 """Change the address used by the connection. The address is
200 used by the reactor's iohandler to create an outgoing socket
201 connection. This must be set prior to opening the connection.
202 """
203 pn_reactor_set_connection_host(self._impl,
204 connection._impl,
205 unicode2utf8(str(host)),
206 unicode2utf8(str(port)))
207
209 """This may be used to retrieve the remote peer address.
210 @return: string containing the address in URL format or None if no
211 address is available. Use the proton.Url class to create a Url object
212 from the returned value.
213 """
214 _url = pn_reactor_get_connection_address(self._impl, connection._impl)
215 return utf82unicode(_url)
216
218 impl = _chandler(handler, self.on_error)
219 result = Selectable.wrap(pn_reactor_selectable(self._impl))
220 if impl:
221 record = pn_selectable_attachments(result._impl)
222 pn_record_set_handler(record, impl)
223 pn_decref(impl)
224 return result
225
227 pn_reactor_update(self._impl, sel._impl)
228
230 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
231
from proton import wrappers as _wrappers

# Register, per native type name, a callable that casts the raw object
# and wraps it in the matching Python class.
_wrappers["pn_reactor"] = lambda raw: Reactor.wrap(pn_cast_pn_reactor(raw))
_wrappers["pn_task"] = lambda raw: Task.wrap(pn_cast_pn_task(raw))
238 """
239 Can be added to a reactor to allow events to be triggered by an
240 external thread but handled on the event thread associated with
241 the reactor. An instance of this class can be passed to the
242 Reactor.selectable() method of the reactor in order to activate
243 it. The close() method should be called when it is no longer
244 needed, to allow the event loop to end if needed.
245 """
247 self.queue = Queue.Queue()
248 self.pipe = os.pipe()
249 self._closed = False
250
252 """
253 Request that the given event be dispatched on the event thread
254 of the reactor to which this EventInjector was added.
255 """
256 self.queue.put(event)
257 os.write(self.pipe[1], _compat.str2bin("!"))
258
260 """
261 Request that this EventInjector be closed. Existing events
262 will be dispatched on the reactor's event dispatch thread,
263 then this will be removed from the set of interest.
264 """
265 self._closed = True
266 os.write(self.pipe[1], _compat.str2bin("!"))
267
270
276
286
289 """
290 Application defined event, which can optionally be associated with
291 an engine object and or an arbitrary subject
292 """
293 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
306
310
312 """
313 Class to track state of an AMQP 1.0 transaction.
314 """
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
    """
    Store the control link and handler, reset the transaction state
    and then call declare().

    @param txn_ctrl: stored as ``self.txn_ctrl``

    @param handler: stored as ``self.handler``

    @param settle_before_discharge: stored as
    ``self.settle_before_discharge``; defaults to False
    """
    self.txn_ctrl, self.handler = txn_ctrl, handler
    # Nothing has been declared or discharged yet, so there is no id.
    self.id = self._declare = self._discharge = None
    self.failed = False
    self._pending = []
    self.settle_before_discharge = settle_before_discharge
    self.declare()
325
328
331
333 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
334
338
343
344 - def send(self, sender, msg, tag=None):
349
356
357 - def update(self, delivery, state=None):
361
367
370
393
395 """
396 Abstract interface for link configuration options
397 """
399 """
400 Subclasses will implement any configuration logic in this
401 method
402 """
403 pass
def test(self, link):
    """
    Decide whether this option should be applied to the given link.

    This base implementation accepts every link. Subclasses can
    override it to apply an option selectively, e.g. based on some
    link criteria.

    @param link: the link being considered (unused here)

    @return: always True
    """
    return True
410
414
419
def apply(self, sender):
    """
    Do nothing with the given sender; returns None.
    """
    return None
423
def apply(self, receiver):
    """
    Do nothing with the given receiver; returns None.
    """
    return None
427
442
445 self.filter_set = filter_set
446
447 - def apply(self, receiver):
449
451 """
452 Configures a link with a message selector filter
453 """
454 - def __init__(self, value, name='selector'):
456
458 - def apply(self, receiver):
461
462 -class Move(ReceiverOption):
463 - def apply(self, receiver):
465
466 -class Copy(ReceiverOption):
467 - def apply(self, receiver):
469
477
482
489
492 self._default_session = None
493
495 if not self._default_session:
496 self._default_session = _create_session(connection)
497 return self._default_session
498
500 """
501 Internal handler that triggers the necessary socket connect for an
502 opened connection.
503 """
506
508 if not self._override(event):
509 event.dispatch(self.base)
510
512 conn = event.connection
513 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
514
516 """
517 Internal handler that triggers the necessary socket connect for an
518 opened connection.
519 """
533
534 - def _connect(self, connection, reactor):
565
568
574
577
592
595
598
600 """
601 A reconnect strategy involving an increasing delay between
602 retries, up to a maximum of 10 seconds.
603 """
606
609
617
620 self.values = [Url(v) for v in values]
621 self.i = iter(self.values)
622
625
627 try:
628 return next(self.i)
629 except StopIteration:
630 self.i = iter(self.values)
631 return next(self.i)
632
645
648 """A representation of the AMQP concept of a 'container', which
649 loosely speaking is something that establishes links to or from
650 another container, over which messages are transferred. This is
651 an extension to the Reactor class that adds convenience methods
652 for creating connections and sender- or receiver- links.
653 """
654 - def __init__(self, *handlers, **kwargs):
670
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
    """
    Initiates the establishment of an AMQP connection. Returns an
    instance of proton.Connection.

    @param url: URL string of the process to connect to

    @param urls: list of URL strings of processes to try to connect to

    @param address: an address object stored on the connector as-is;
    only consulted when neither url nor urls is given

    The first truthy value among url, urls and address is used; a
    ValueError is raised if none of them is supplied.

    @param handler: a connection scoped handler that will be
    called to process any events in the scope of this connection
    or its child links

    @param reconnect: a truthy value is stored on the connector as
    its reconnect strategy; None (the default) installs a Backoff
    instance; any other falsy value, such as False, leaves the
    connector's reconnect setting untouched, which prevents the
    library from automatically trying to reconnect if the underlying
    socket is disconnected before the connection has been closed.

    @param heartbeat: the desired frequency of heartbeats used to
    test that the underlying socket is alive; only stored on the
    connector when truthy. (Earlier documentation gave the unit as
    milliseconds -- TODO confirm against the connector.)

    @param ssl_domain: SSL configuration in the form of an
    instance of proton.SSLDomain. When not given, the container's
    own client configuration (self.ssl.client) is used if self.ssl
    is set.

    @param kwargs: 'sasl_enabled', which determines whether a sasl
    layer is used for the connection; 'allowed_mechs', an optional
    list of SASL mechanisms to allow if sasl is enabled;
    'allow_insecure_mechs', a flag indicating whether insecure
    mechanisms, such as PLAIN over a non-encrypted socket, are
    allowed; 'user' and 'password'; these five default to the
    container's own attributes of the same name. 'virtual_host' is
    the hostname to set in the Open performative, used by the peer
    to determine the correct back-end service for the client; it is
    only applied to the connection when non-empty. Also read, with
    no default: 'sni', 'offered_capabilities',
    'desired_capabilities' and 'properties'.

    @raise ValueError: if none of url, urls or address is given
    """
    option = kwargs.get

    conn = self.connection(handler)
    conn.container = self.container_id or str(generate_uuid())
    conn.offered_capabilities = option('offered_capabilities')
    conn.desired_capabilities = option('desired_capabilities')
    conn.properties = option('properties')

    connector = Connector(conn)
    # Per-connection settings fall back to the container-wide values.
    connector.allow_insecure_mechs = option('allow_insecure_mechs', self.allow_insecure_mechs)
    connector.allowed_mechs = option('allowed_mechs', self.allowed_mechs)
    connector.sasl_enabled = option('sasl_enabled', self.sasl_enabled)
    connector.user = option('user', self.user)
    connector.password = option('password', self.password)
    connector.virtual_host = option('virtual_host')
    if connector.virtual_host:
        # Leave the connection hostname alone for an empty/missing value.
        conn.hostname = connector.virtual_host
    connector.ssl_sni = option('sni')

    conn._overrides = connector
    if url:
        connector.address = Urls([url])
    elif urls:
        connector.address = Urls(urls)
    elif address:
        connector.address = address
    else:
        raise ValueError("One of url, urls or address required")

    if heartbeat:
        connector.heartbeat = heartbeat

    if reconnect:
        connector.reconnect = reconnect
    elif reconnect is None:
        connector.reconnect = Backoff()

    # An explicit ssl_domain wins over the container's client config.
    if not ssl_domain:
        ssl_domain = self.ssl and self.ssl.client
    connector.ssl_domain = ssl_domain

    conn._session_policy = SessionPerConnection()
    conn.open()
    return conn
742
743 - def _get_id(self, container, remote, local):
744 if local and remote: "%s-%s-%s" % (container, remote, local)
745 elif local: return "%s-%s" % (container, local)
746 elif remote: return "%s-%s" % (container, remote)
747 else: return "%s-%s" % (container, str(generate_uuid()))
748
761
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a Url, or a URL string (strings
    are converted to Url)

    @param target: target address of the link

    @param source: source address of the link

    @param name: name of the link; generated when not given

    @param handler: link scoped handler; set on the sender whenever
    it is not None

    @param tags: assigned to the sender's tag_generator when truthy

    @param options: passed to _apply_link_options together with the
    new sender
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not target:
        target = context.path
    session = self._get_session(context)
    snd = session.sender(name or self._get_id(session.connection.container, target, source))
    if source:
        snd.source.address = source
    if target:
        snd.target.address = target
    # Identity test rather than '!=': a supplied handler is honoured
    # regardless of how it implements comparison.
    if handler is not None:
        snd.handler = handler
    if tags:
        snd.tag_generator = tags
    _apply_link_options(options, snd)
    snd.open()
    return snd
801
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a Url, or a URL string (strings
    are converted to Url)

    @param source: source address of the link

    @param target: target address of the link

    @param name: name of the link; generated when not given

    @param dynamic: when truthy, the receiver's source is marked
    dynamic

    @param handler: link scoped handler; set on the receiver
    whenever it is not None

    @param options: passed to _apply_link_options together with the
    new receiver
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not source:
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    # Identity test rather than '!=': a supplied handler is honoured
    # regardless of how it implements comparison.
    if handler is not None:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
842
844 if not _get_attr(context, '_txn_ctrl'):
845 class InternalTransactionHandler(OutgoingMessageHandler):
846 def __init__(self):
847 super(InternalTransactionHandler, self).__init__(auto_settle=True)
848
849 def on_settled(self, event):
850 if hasattr(event.delivery, "transaction"):
851 event.transaction = event.delivery.transaction
852 event.delivery.transaction.handle_outcome(event)
853 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
854 context._txn_ctrl.target.type = Terminus.COORDINATOR
855 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
856 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
857
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    @param url: URL (string or Url) giving the host and port to
    listen on; its scheme is checked for 'amqps'

    @param ssl_domain: SSL configuration to set on the acceptor. When
    not given and the URL scheme is 'amqps', the container's server
    configuration (self.ssl.server) is used instead.

    @return: the acceptor created for the given host and port

    @raise SSLUnavailable: if the scheme is 'amqps', no ssl_domain
    was given and the container has no SSL configuration
    """
    url = Url(url)
    # Work out the SSL configuration before creating the acceptor, so
    # that no listening acceptor is left behind when SSL is unavailable.
    ssl_config = ssl_domain
    if not ssl_config and url.scheme == 'amqps':
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        ssl_config = self.ssl.server
    acceptor = self.acceptor(url.host, url.port)
    if ssl_config:
        acceptor.set_ssl_domain(ssl_config)
    return acceptor
875
880