Source code for module.lesson_8_adapters

import time
import traceback

from dotenv import load_dotenv, find_dotenv
import avesterra as av
import orchestra.env as env
from orchestra import mount
import json
import random
import threading
from orchestra.orchestra_adapter import OrchestraAdapter, ValueType

load_dotenv(find_dotenv())
HOST = env.get_or_raise("AVESTERRA_HOST")
CERT_DIR = env.get_or_raise("AVESTERRA_CERTIFICATE_DIR_PATH")
AUTH = env.get_or_raise(env.AVESTERRA_AUTH, av.AvAuthorization)
MOUNT_KEY = "number_adapter"


def set_count(entity: av.AvEntity, count: int, auth: av.AvAuthorization):
    """
    Driver/client helper that calls the adapter route which sets the count.

    Sends a SET invocation on the COUNT attribute of ``entity``, carrying
    ``count`` encoded as an integer value.

    :param entity: (AvEntity) of the number
    :param count: (int) count to be set
    :param auth: The Orchestra server authorization that allows entity operations.
    """
    encoded_count = av.AvValue.encode_integer(count)
    av.invoke_entity_retry_bo(
        entity=entity,
        method=av.AvMethod.SET,
        attribute=av.AvAttribute.COUNT,
        value=encoded_count,
        authorization=auth,
    )
def get_count(entity: av.AvEntity, auth: av.AvAuthorization) -> int:
    """
    Driver/client helper that calls the adapter route which gets the count.

    Sends a GET invocation on the COUNT attribute of ``entity`` and decodes
    the returned value as an integer.

    :param entity: (AvEntity) of the number
    :param auth: The Orchestra server authorization that allows entity operations.
    :return: (int) the count decoded from the invocation result
    """
    return av.invoke_entity_retry_bo(
        entity=entity,
        method=av.AvMethod.GET,
        attribute=av.AvAttribute.COUNT,
        authorization=auth,
    ).decode_integer()
def create_number(name: str, auth: av.AvAuthorization) -> av.AvEntity:
    """
    Create a number entity and connect it to this module's adapter outlet.

    ``create_entity`` is used on purpose instead of ``create_object``:
    ``create_object`` automatically connects the new entity to the object
    adapter, which is not wanted here.

    :param name: (str) The number entity name; its lower-cased form is used as the key
    :param auth: The Orchestra server authorization that allows entity operations.
    :return: (AvEntity) The number entity
    """
    entity = av.create_entity(
        name=name,
        key=name.lower(),
        context=av.AvContext.ORCHESTRA,
        category=av.AvCategory.MATHEMATICS,
        klass=av.AvClass.MATHEMATICS,
        authorization=auth,
    )

    # Look up the outlet registered under MOUNT_KEY and route the entity's
    # method invocations to it.
    adapter_outlet = mount.get_outlet(MOUNT_KEY, auth=auth)
    av.connect_method(entity=entity, outlet=adapter_outlet, authorization=auth)

    print(f" The entity ID of the number entity is: {entity}")
    return entity
def main():
    """
    Main function of the driver code.

    Creates a number entity connected to the adapter outlet, then sets its
    count to 1, 42 and 100 in turn, reading the count back after each set and
    asserting that it matches. Any exception raised during the set/get
    sequence is caught and reported by printing its traceback, so the
    function itself does not raise for those failures.
    """
    # Create a number entity whose count can be updated through an outlet
    number_entity = create_number(name="My Number", auth=AUTH)

    try:
        set_count(entity=number_entity, count=1, auth=AUTH)
        # The first round also echoes the value that was read back.
        print(f'{get_count(entity=number_entity, auth=AUTH)}')
        assert get_count(entity=number_entity, auth=AUTH) == 1

        for expected in (42, 100):
            set_count(entity=number_entity, count=expected, auth=AUTH)
            assert get_count(entity=number_entity, auth=AUTH) == expected
    except Exception:
        # format_exc() already carries the active exception, so no binding
        # of the exception object is needed here.
        print(f"Problem with set and get count: {traceback.format_exc()}")
if __name__ == "__main__":
    adapter = OrchestraAdapter(
        mount_key=MOUNT_KEY,
        version="1.0.0",
        description="Exposes interface for updating count",
    )

    # In-memory store of counts, keyed by entity; contents are lost when the
    # process exits.
    number_count = {}

    @adapter.route("Set number count")
    @adapter.method(av.AvMethod.SET)
    @adapter.attribute(av.AvAttribute.COUNT)
    @adapter.value_in(ValueType.integer())
    @adapter.value_out(ValueType.null())
    def set_number_count(entity: av.AvEntity, value: av.AvValue) -> av.AvValue:
        """
        Store the count for an entity.

        :param entity: (AvEntity) the entity whose count is being set
        :param value: (AvValue) that encodes the integer count
        :return: (AvValue) with Null
        """
        number_count[entity] = value.decode_integer()
        return av.NULL_VALUE

    @adapter.route("Get number count")
    @adapter.method(av.AvMethod.GET)
    @adapter.attribute(av.AvAttribute.COUNT)
    @adapter.value_out(ValueType.integer())
    def get_number_count(entity: av.AvEntity) -> av.AvValue:
        """
        Return the stored count for an entity.

        :param entity: (AvEntity) the entity whose count is requested
        :return: (AvValue) with the count, or 0 if none was ever set
        """
        return av.AvValue.encode_integer(number_count.get(entity, 0))

    # The driver code runs on its own thread and is started before the
    # adapter is run on the main thread.
    # NOTE(review): presumably adapter.run() blocks while serving routes - confirm.
    threading.Thread(target=main).start()
    adapter.run()