Lesson 8: Pub/Sub
The second control flow mechanism of Orchestra is the pub/sub mechanism, where outlets are used as event queue. see more at <https://docs.ledr.io/en/the-orchestra-platfrom/subscribers-events-and-pub-sub>
python src/module/lesson_8_pubsub.py
Code
1import time
2from threading import Thread
3import avesterra as av
4from dotenv import find_dotenv, load_dotenv
5from orchestra import env
6import lesson_7_adapter as number_adapter
7
8"""
9You are the referee of a soccer match. You mission is count the score of both
10team.
11You can re-use the "number adapter" to create two entities, each keeping track
12of the score of one team.
13There are two entities "Team 1 goal" and "Team 2 goal", you must listen to
14events about ball entering the net.
15Every time the "Team 1 goal" has a "Ball entered goal" event, we shall
16increment the number of the "Team 2 score" entity, and vice-versa.
17"""
18
19shutdown = False
20
21
22def listen_event_loop(
23 referee_outlet: av.AvEntity,
24 auth: av.AvAuthorization,
25 team_1_score: av.AvEntity,
26 team_2_score: av.AvEntity,
27):
28 """This function counts points !"""
29 # Every call to `av.wait_event` will only receive one event.
30 # We typically have some kind of infinite loop to keep listening to events
31 while True:
32 # This will block until there is a new event on the outlet
33 # A timeout of `0` means no timeout
34 try:
35 event = av.wait_event(referee_outlet, timeout=0, authorization=auth)
36 except Exception:
37 if shutdown:
38 break
39 raise
40
41 # Note: Here we don't check the nature of the event since there's only
42 # one kind of event we expect.
43 # In a real system though, you probably would check what kind of event
44 # it was
45
46 # We get the name of the entity onto which an event was published
47 entity_name = av.entity_name(event.entity, auth)
48 if entity_name == "Team 1 goal":
49 score = number_adapter.get_number(team_2_score, auth)
50 score += 1
51 number_adapter.set_number(team_2_score, score, auth)
52 elif entity_name == "Team 2 goal":
53 score = number_adapter.get_number(team_1_score, auth)
54 score += 1
55 number_adapter.set_number(team_1_score, score, auth)
56
57
58def demo_pubsub():
59 global team_1_score, team_2_score, shutdown
60 load_dotenv(find_dotenv())
61 av.initialize(
62 server=env.get_or_raise(env.AVESTERRA_HOST),
63 directory=env.get_or_raise(env.AVESTERRA_CERTIFICATE_DIR_PATH),
64 )
65 auth = env.get_or_raise(env.AVESTERRA_AUTH, av.AvAuthorization)
66
67 time.sleep(0.1) # Give time to the number adapter to start
68
69 team_1_score = number_adapter.create_number_entity("Team 1 score", auth)
70 team_2_score = number_adapter.create_number_entity("Team 2 score", auth)
71
72 # For simplicity, we don't set any metadata on these entities.
73 # In a real system, you should
74 team_1_goal = av.create_entity("Team 1 goal", authorization=auth)
75 team_2_goal = av.create_entity("Team 2 goal", authorization=auth)
76
77 # This outlet will be the event queue from which we will be getting events
78 referee_outlet = av.create_outlet("Referee outlet", authorization=auth)
79
80 # We subscribe both goals to the outlet, so the outlet receive any event
81 # published on either of these entities
82 av.subscribe_event(team_1_goal, referee_outlet, authorization=auth)
83 av.subscribe_event(team_2_goal, referee_outlet, authorization=auth)
84
85 # Run the listen loop in background
86 Thread(
87 target=listen_event_loop,
88 args=(referee_outlet, auth, team_1_score, team_2_score),
89 ).start()
90
91 assert number_adapter.get_number(team_1_score, auth) == 0
92 assert number_adapter.get_number(team_2_score, auth) == 0
93
94 av.publish_event(team_1_goal, name="Ball entered the net!", authorization=auth)
95 time.sleep(0.1) # Time for the referee to update scores
96
97 assert number_adapter.get_number(team_1_score, auth) == 0
98 assert number_adapter.get_number(team_2_score, auth) == 1
99
100 av.publish_event(team_1_goal, name="Ball entered the net!", authorization=auth)
101 time.sleep(0.1) # Time for the referee to update scores
102
103 assert number_adapter.get_number(team_1_score, auth) == 0
104 assert number_adapter.get_number(team_2_score, auth) == 2
105
106 av.publish_event(team_2_goal, name="Ball entered the net!", authorization=auth)
107 time.sleep(0.1) # Time for the referee to update scores
108
109 assert number_adapter.get_number(team_1_score, auth) == 1
110 assert number_adapter.get_number(team_2_score, auth) == 2
111
112 av.publish_event(team_2_goal, name="Ball entered the net!", authorization=auth)
113 time.sleep(0.1) # Time for the referee to update scores
114
115 assert number_adapter.get_number(team_1_score, auth) == 2
116 assert number_adapter.get_number(team_2_score, auth) == 2
117
118 av.publish_event(team_1_goal, name="Ball entered the net!", authorization=auth)
119 time.sleep(0.1) # Time for the referee to update scores
120
121 assert number_adapter.get_number(team_1_score, auth) == 2
122 assert number_adapter.get_number(team_2_score, auth) == 3
123
124 # team 2 wins :)
125
126 shutdown = True
127 number_adapter.adapter.shutdown()
128
129
130if __name__ == "__main__":
131 Thread(target=demo_pubsub).start()
132
133 # Remember our friend the "number adapter" from last lesson?
134 number_adapter.run_number_adapter()