This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

nricciardi/py-busline


Important

Migrate to the Orbitalis framework.

Busline for Python

Agnostic eventbus for Python.

Official eventbus library for Orbitalis

Local EventBus

Using Publisher/Subscriber

import asyncio

from busline.eventbus.async_local_eventbus import AsyncLocalEventBus
from busline.eventbus_client.publisher.local_eventbus_publisher import LocalEventBusPublisher
from busline.event.event import Event
from busline.eventbus_client.subscriber.local_eventbus_closure_subscriber import LocalEventBusClosureSubscriber

local_eventbus_instance = AsyncLocalEventBus()  # singleton


def callback(topic_name: str, event: Event):
    # called for every event received on a subscribed topic
    print(event)


async def main():
    subscriber = LocalEventBusClosureSubscriber(local_eventbus_instance, callback)
    publisher = LocalEventBusPublisher(local_eventbus_instance)

    await subscriber.subscribe("test-topic")

    await publisher.publish("test-topic", Event())  # publish an empty event


asyncio.run(main())  # `await` is only valid inside a coroutine
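To make the flow above concrete, here is a minimal, self-contained sketch of how an in-process, topic-based bus might route events from a publisher to its subscribers. It does not use or reproduce busline's internals: `TinyLocalBus`, `Handler`, and `demo` are hypothetical names introduced only for illustration, and the real `AsyncLocalEventBus` may well work differently.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List, Tuple

# Hypothetical handler type: an async callable receiving (topic_name, event).
Handler = Callable[[str, Any], Awaitable[None]]


class TinyLocalBus:
    """Illustrative in-memory bus; NOT busline's AsyncLocalEventBus."""

    def __init__(self) -> None:
        # topic name -> handlers registered on that topic
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic_name: str, handler: Handler) -> None:
        self._handlers[topic_name].append(handler)

    async def publish(self, topic_name: str, event: Any) -> None:
        # Deliver the event, in registration order, to every handler on the topic.
        for handler in list(self._handlers.get(topic_name, [])):
            await handler(topic_name, event)


async def demo() -> List[Tuple[str, Any]]:
    received: List[Tuple[str, Any]] = []

    async def on_event(topic_name: str, event: Any) -> None:
        received.append((topic_name, event))

    bus = TinyLocalBus()
    bus.subscribe("test-topic", on_event)
    await bus.publish("test-topic", {"payload": 1})
    await bus.publish("other-topic", {"payload": 2})  # no subscribers: nothing happens
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Only the event published on a subscribed topic reaches the handler; events on topics without subscribers are silently dropped in this sketch.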

Using EventBusClient

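import asyncio

from busline.event.event import Event
from busline.eventbus_client.local_eventbus_client import LocalEventBusClient


def callback(topic_name: str, event: Event):
    # called for every event received on a subscribed topic
    print(event)


async def main():
    client = LocalEventBusClient(callback)

    await client.subscribe("test")

    await client.publish("test", Event())


asyncio.run(main())  # `await` is only valid inside a coroutine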

Create Agnostic EventBus

Implement the business logic of your own Publisher and Subscriber, and you are done. Nothing more is required.

from busline.event.event import Event
from busline.eventbus_client.publisher.publisher import Publisher


class YourEventBusPublisher(Publisher):

    async def _internal_publish(self, topic_name: str, event: Event, **kwargs):
        pass  # send events to your eventbus (maybe in cloud?)

from busline.eventbus_client.subscriber.subscriber import Subscriber
from busline.event.event import Event


class YourEventBusSubscriber(Subscriber):

    async def on_event(self, topic_name: str, event: Event, **kwargs):
        pass  # receive your events
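The two classes above each fill in a single hook. As a rough illustration of that idea, the sketch below shows a base class whose public `publish` delegates to an abstract `_internal_publish`, plus a toy subclass that records events in a list instead of sending them anywhere. `SketchPublisher`, `RecordingPublisher`, and `demo` are hypothetical names, and the base class is an assumption about the general pattern, not busline's actual `Publisher`.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, List, Tuple


class SketchPublisher(ABC):
    """Hypothetical stand-in for a publisher base class (NOT busline's Publisher)."""

    async def publish(self, topic_name: str, event: Any, **kwargs: Any) -> None:
        # Shared, transport-independent logic could live here; then delegate.
        await self._internal_publish(topic_name, event, **kwargs)

    @abstractmethod
    async def _internal_publish(self, topic_name: str, event: Any, **kwargs: Any) -> None:
        """Transport-specific delivery, supplied by the subclass."""


class RecordingPublisher(SketchPublisher):
    """Toy transport: appends to a list instead of sending over a network."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any]] = []

    async def _internal_publish(self, topic_name: str, event: Any, **kwargs: Any) -> None:
        self.sent.append((topic_name, event))


async def demo() -> List[Tuple[str, Any]]:
    publisher = RecordingPublisher()
    await publisher.publish("orders", {"id": 1})
    return publisher.sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Swapping `RecordingPublisher` for a subclass that talks to a remote broker would leave callers of `publish` unchanged, which is presumably what "agnostic" refers to here.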

You can also create a client, so that components use it to publish and subscribe instead of becoming publishers or subscribers themselves.

from busline.eventbus_client.eventbus_client import EventBusClient
from busline.event.event import Event


def client_callback(topic_name: str, e: Event):
    print(e)


subscriber = YourEventBusSubscriber(...)
publisher = YourEventBusPublisher(...)

# ClosureEventListener must be imported too; its module path is not shown in this README
client = EventBusClient(publisher, subscriber, ClosureEventListener(client_callback))
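The composition above can be pictured as a small facade: one object that forwards `publish` to a publisher and `subscribe` to a subscriber. The sketch below is self-contained and uses only hypothetical names (`InMemoryTransport`, `SketchSubscriber`, `SketchPublisher`, `SketchClient`, `demo`); it assumes plain synchronous callbacks and is not a reproduction of busline's `EventBusClient`.

```python
import asyncio
from typing import Any, Callable, Dict, List

Callback = Callable[[str, Any], None]


class InMemoryTransport:
    """Hypothetical shared state standing in for a real event bus."""

    def __init__(self) -> None:
        self.routes: Dict[str, List[Callback]] = {}


class SketchSubscriber:
    def __init__(self, transport: InMemoryTransport, callback: Callback) -> None:
        self._transport = transport
        self._callback = callback

    async def subscribe(self, topic_name: str) -> None:
        self._transport.routes.setdefault(topic_name, []).append(self._callback)


class SketchPublisher:
    def __init__(self, transport: InMemoryTransport) -> None:
        self._transport = transport

    async def publish(self, topic_name: str, event: Any) -> None:
        for callback in list(self._transport.routes.get(topic_name, [])):
            callback(topic_name, event)


class SketchClient:
    """Facade: a single object exposing both roles by delegation."""

    def __init__(self, publisher: SketchPublisher, subscriber: SketchSubscriber) -> None:
        self._publisher = publisher
        self._subscriber = subscriber

    async def subscribe(self, topic_name: str) -> None:
        await self._subscriber.subscribe(topic_name)

    async def publish(self, topic_name: str, event: Any) -> None:
        await self._publisher.publish(topic_name, event)


async def demo() -> List[Any]:
    seen: List[Any] = []
    transport = InMemoryTransport()
    client = SketchClient(
        SketchPublisher(transport),
        SketchSubscriber(transport, lambda topic_name, event: seen.append(event)),
    )
    await client.subscribe("test")
    await client.publish("test", "hello")
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Components that hold a `SketchClient` never need to know which publisher or subscriber implementation sits behind it.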
