Lesson 8: Pub/Sub

The second control flow mechanism of Orchestra is the pub/sub mechanism, where outlets are used as event queue. see more at <https://docs.ledr.io/en/the-orchestra-platfrom/subscribers-events-and-pub-sub>

python src/module/lesson_8_pubsub.py

Code

  1import time
  2from threading import Thread
  3import avesterra as av
  4from dotenv import find_dotenv, load_dotenv
  5from orchestra import env
  6import lesson_7_adapter as number_adapter
  7
  8"""
  9You are the referee of a soccer match. You mission is count the score of both
 10team.
 11You can re-use the "number adapter" to create two entities, each keeping track
 12of the score of one team.
 13There are two entities "Team 1 goal" and "Team 2 goal", you must listen to
 14events about ball entering the net.
 15Every time the "Team 1 goal" has a "Ball entered goal" event, we shall 
 16increment the number of the "Team 2 score" entity, and vice-versa.
 17"""
 18
 19shutdown = False
 20
 21
 22def listen_event_loop(
 23    referee_outlet: av.AvEntity,
 24    auth: av.AvAuthorization,
 25    team_1_score: av.AvEntity,
 26    team_2_score: av.AvEntity,
 27):
 28    """This function counts points !"""
 29    # Every call to `av.wait_event` will only receive one event.
 30    # We typically have some kind of infinite loop to keep listening to events
 31    while True:
 32        # This will block until there is a new event on the outlet
 33        # A timeout of `0` means no timeout
 34        try:
 35            event = av.wait_event(referee_outlet, timeout=0, authorization=auth)
 36        except Exception:
 37            if shutdown:
 38                break
 39            raise
 40
 41        # Note: Here we don't check the nature of the event since there's only
 42        # one kind of event we expect.
 43        # In a real system though, you probably would check what kind of event
 44        # it was
 45
 46        # We get the name of the entity onto which an event was published
 47        entity_name = av.entity_name(event.entity, auth)
 48        if entity_name == "Team 1 goal":
 49            score = number_adapter.get_number(team_2_score, auth)
 50            score += 1
 51            number_adapter.set_number(team_2_score, score, auth)
 52        elif entity_name == "Team 2 goal":
 53            score = number_adapter.get_number(team_1_score, auth)
 54            score += 1
 55            number_adapter.set_number(team_1_score, score, auth)
 56
 57
 58def demo_pubsub():
 59    global team_1_score, team_2_score, shutdown
 60    load_dotenv(find_dotenv())
 61    av.initialize(
 62        server=env.get_or_raise(env.AVESTERRA_HOST),
 63        directory=env.get_or_raise(env.AVESTERRA_CERTIFICATE_DIR_PATH),
 64    )
 65    auth = env.get_or_raise(env.AVESTERRA_AUTH, av.AvAuthorization)
 66
 67    time.sleep(0.1)  # Give time to the number adapter to start
 68
 69    team_1_score = number_adapter.create_number_entity("Team 1 score", auth)
 70    team_2_score = number_adapter.create_number_entity("Team 2 score", auth)
 71
 72    # For simplicity, we don't set any metadata on these entities.
 73    # In a real system, you should
 74    team_1_goal = av.create_entity("Team 1 goal", authorization=auth)
 75    team_2_goal = av.create_entity("Team 2 goal", authorization=auth)
 76
 77    # This outlet will be the event queue from which we will be getting events
 78    referee_outlet = av.create_outlet("Referee outlet", authorization=auth)
 79
 80    # We subscribe both goals to the outlet, so the outlet receive any event
 81    # published on either of these entities
 82    av.subscribe_event(team_1_goal, referee_outlet, authorization=auth)
 83    av.subscribe_event(team_2_goal, referee_outlet, authorization=auth)
 84
 85    # Run the listen loop in background
 86    Thread(
 87        target=listen_event_loop,
 88        args=(referee_outlet, auth, team_1_score, team_2_score),
 89    ).start()
 90
 91    assert number_adapter.get_number(team_1_score, auth) == 0
 92    assert number_adapter.get_number(team_2_score, auth) == 0
 93
 94    av.publish_event(team_1_goal, name="Ball entered the net!", authorization=auth)
 95    time.sleep(0.1)  # Time for the referee to update scores
 96
 97    assert number_adapter.get_number(team_1_score, auth) == 0
 98    assert number_adapter.get_number(team_2_score, auth) == 1
 99
100    av.publish_event(team_1_goal, name="Ball entered the net!", authorization=auth)
101    time.sleep(0.1)  # Time for the referee to update scores
102
103    assert number_adapter.get_number(team_1_score, auth) == 0
104    assert number_adapter.get_number(team_2_score, auth) == 2
105
106    av.publish_event(team_2_goal, name="Ball entered the net!", authorization=auth)
107    time.sleep(0.1)  # Time for the referee to update scores
108
109    assert number_adapter.get_number(team_1_score, auth) == 1
110    assert number_adapter.get_number(team_2_score, auth) == 2
111
112    av.publish_event(team_2_goal, name="Ball entered the net!", authorization=auth)
113    time.sleep(0.1)  # Time for the referee to update scores
114
115    assert number_adapter.get_number(team_1_score, auth) == 2
116    assert number_adapter.get_number(team_2_score, auth) == 2
117
118    av.publish_event(team_1_goal, name="Ball entered the net!", authorization=auth)
119    time.sleep(0.1)  # Time for the referee to update scores
120
121    assert number_adapter.get_number(team_1_score, auth) == 2
122    assert number_adapter.get_number(team_2_score, auth) == 3
123
124    # team 2 wins :)
125
126    shutdown = True
127    number_adapter.adapter.shutdown()
128
129
130if __name__ == "__main__":
131    Thread(target=demo_pubsub).start()
132
133    # Remember our friend the "number adapter" from last lesson?
134    number_adapter.run_number_adapter()