Sophie

Sophie

distrib > Mageia > 7 > armv7hl > media > core-release > by-pkgid > 16b6c7fca2fc9f56193b382cc05af140 > files > 148

python3-zope-component-4.4.1-3.mga7.noarch.rpm

Persistent Registries
=====================

.. testsetup::

   from zope.component.testing import setUp
   setUp()

Conforming Adapter Lookup
-------------------------
Here, we'll demonstrate that registry changes are seen correctly even when
the registries are stored in a database and accessed from multiple
connections.

Start by setting up a database and creating two transaction
managers and database connections to work with.

.. doctest::

    >>> import ZODB.tests.util
    >>> db = ZODB.tests.util.DB()
    >>> import transaction
    >>> t1 = transaction.TransactionManager()
    >>> c1 = db.open(transaction_manager=t1)
    >>> r1 = c1.root()
    >>> t2 = transaction.TransactionManager()
    >>> c2 = db.open(transaction_manager=t2)
    >>> r2 = c2.root()

Create a set of component registries in the database, alternating between
the two connections.

.. doctest::

    >>> from zope.component.persistentregistry import PersistentComponents
    >>> from zope.component.tests.examples import I1
    >>> from zope.component.tests.examples import I2
    >>> from zope.component.tests.examples import U
    >>> from zope.component.tests.examples import U1
    >>> from zope.component.tests.examples import U12
    >>> from zope.component.tests.examples import handle1
    >>> from zope.component.tests.examples import handle2
    >>> from zope.component.tests.examples import handle3
    >>> from zope.component.tests.examples import handle4

    >>> _ = t1.begin()
    >>> r1[1] = PersistentComponents('1')
    >>> t1.commit()

    >>> _ = t2.begin()
    >>> r2[2] = PersistentComponents('2', (r2[1], ))
    >>> t2.commit()

    >>> _ = t1.begin()
    >>> r1[3] = PersistentComponents('3', (r1[1], ))
    >>> t1.commit()

    >>> _ = t2.begin()
    >>> r2[4] = PersistentComponents('4', (r2[2], r2[3]))
    >>> t2.commit()

    >>> _ = t1.begin()
    >>> r1[1].__bases__
    ()
    >>> r1[2].__bases__ == (r1[1], )
    True

    >>> r1[1].registerUtility(U1(1))
    >>> r1[1].queryUtility(I1)
    U1(1)
    >>> r1[2].queryUtility(I1)
    U1(1)
    >>> t1.commit()

    >>> _ = t2.begin()
    >>> r2[1].registerUtility(U1(2))
    >>> r2[2].queryUtility(I1)
    U1(2)

    >>> r2[4].queryUtility(I1)
    U1(2)
    >>> t2.commit()


    >>> _ = t1.begin()
    >>> r1[1].registerUtility(U12(1), I2)
    >>> r1[4].queryUtility(I2)
    U12(1)
    >>> t1.commit()


    >>> _ = t2.begin()
    >>> r2[3].registerUtility(U12(3), I2)
    >>> r2[4].queryUtility(I2)
    U12(3)
    >>> t2.commit()

    >>> _ = t1.begin()

    >>> r1[1].registerHandler(handle1, info="First handler")
    >>> r1[2].registerHandler(handle2, required=[U])

    >>> r1[3].registerHandler(handle3)

    >>> r1[4].registerHandler(handle4)

    >>> r1[4].handle(U1(1))
    handle1 U1(1)
    handle3 U1(1)
    handle2 (U1(1),)
    handle4 U1(1)

    >>> t1.commit()

    >>> _ = t2.begin()
    >>> r2[4].handle(U1(1))
    handle1 U1(1)
    handle3 U1(1)
    handle2 (U1(1),)
    handle4 U1(1)
    >>> t2.abort()

    >>> db.close()

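Subscription to Events in Persistent Registries
-----------------------------------------------

Handlers and subscription adapters that are registered, unregistered, and
then registered again through one connection must show up exactly once when
the registry is read through a second connection.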

.. doctest::

    >>> import ZODB.tests.util
    >>> db = ZODB.tests.util.DB()
    >>> import transaction
    >>> t1 = transaction.TransactionManager()
    >>> c1 = db.open(transaction_manager=t1)
    >>> r1 = c1.root()
    >>> t2 = transaction.TransactionManager()
    >>> c2 = db.open(transaction_manager=t2)
    >>> r2 = c2.root()

    >>> from zope.component.persistentregistry import PersistentComponents

    >>> _ = t1.begin()
    >>> r1[1] = PersistentComponents('1')
    >>> r1[1].registerHandler(handle1)
    >>> r1[1].registerSubscriptionAdapter(handle1, provided=I2)
    >>> _ = r1[1].unregisterHandler(handle1)
    >>> _ = r1[1].unregisterSubscriptionAdapter(handle1, provided=I2)
    >>> t1.commit()
    >>> _ = t1.begin()
    >>> r1[1].registerHandler(handle1)
    >>> r1[1].registerSubscriptionAdapter(handle1, provided=I2)
    >>> t1.commit()

    >>> _ = t2.begin()
    >>> len(list(r2[1].registeredHandlers()))
    1
    >>> len(list(r2[1].registeredSubscriptionAdapters()))
    1
    >>> t2.abort()

Adapter Registrations after Serialization / Deserialization
-----------------------------------------------------------

We want to make sure that we see updates correctly after registries have
been stored in, and loaded back from, the database.

.. doctest::

    >>> import persistent
    >>> import transaction
    >>> from zope.interface import Interface
    >>> from zope.interface import implementer
    >>> class IFoo(Interface):
    ...     pass
    >>> @implementer(IFoo)
    ... class Foo(persistent.Persistent):
    ...     name = ''
    ...     def __init__(self, name=''):
    ...         self.name = name
    ...     def __repr__(self):
    ...         return 'Foo(%r)' % self.name

    >>> from zope.component.tests.examples import base
    >>> from zope.component.tests.examples import clear_base
    >>> len(base._v_subregistries)
    0

    >>> import ZODB.tests.util
    >>> db = ZODB.tests.util.DB()
    >>> tm1 = transaction.TransactionManager()
    >>> c1 = db.open(transaction_manager=tm1)
    >>> from zope.component.persistentregistry import PersistentAdapterRegistry
    >>> r1 = PersistentAdapterRegistry((base,))
    >>> r2 = PersistentAdapterRegistry((r1,))
    >>> c1.root()[1] = r1
    >>> c1.root()[2] = r2
    >>> tm1.commit()
    >>> r1._p_deactivate()

    >>> len(base._v_subregistries)
    0

    >>> tm2 = transaction.TransactionManager()
    >>> c2 = db.open(transaction_manager=tm2)
    >>> r1 = c2.root()[1]
    >>> r2 = c2.root()[2]

    >>> r1.lookup((), IFoo, '')

    >>> base.register((), IFoo, '', Foo(''))
    >>> r1.lookup((), IFoo, '')
    Foo('')

    >>> r2.lookup((), IFoo, '1')

    >>> r1.register((), IFoo, '1', Foo('1'))

    >>> r2.lookup((), IFoo, '1')
    Foo('1')

    >>> r1.lookup((), IFoo, '2')
    >>> r2.lookup((), IFoo, '2')

    >>> base.register((), IFoo, '2', Foo('2'))

    >>> r1.lookup((), IFoo, '2')
    Foo('2')

    >>> r2.lookup((), IFoo, '2')
    Foo('2')

    >>> db.close()
    >>> clear_base()

.. testcleanup::

   from zope.component.testing import tearDown
   tearDown()