Code Example - Advanced Model Orchestration

While the previous model orchestrator example is useful, it is not very configurable: the code has to be altered whenever additional models are required in the chain. We can take this a step further and use the asset library’s Flow view to configure the order in which models execute, including models that execute at the same time (using async programming). The code can also be easily modified to handle custom situations, such as additional asset parameters that could influence the flow.
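As a rough illustration of the idea, the sketch below runs groups of models one group at a time, with the models inside each group running concurrently. It is not the orchestrator itself: `run_model` is a hypothetical stand-in that sleeps instead of calling Akumen, and the model names are made up.

```python
import asyncio

# Hypothetical stand-in for executing one model. The real orchestrator
# queues a scenario and polls its status instead of sleeping.
async def run_model(name, log):
    log.append(f"start {name}")
    await asyncio.sleep(0.01)
    if name == "bad":
        raise RuntimeError(f"{name} failed")
    log.append(f"end {name}")
    return name

async def run_stages(stages):
    log = []
    for stage in stages:
        # Models in the same stage run concurrently; the next stage
        # only starts once every model in this stage has finished.
        results = await asyncio.gather(
            *(run_model(name, log) for name in stage),
            return_exceptions=True,
        )
        # isinstance also matches subclasses of Exception
        failures = [r for r in results if isinstance(r, Exception)]
        if failures:
            raise RuntimeError(f"Stage {stage} failed: {failures[0]}")
    return log

log = asyncio.run(run_stages([["extract"], ["clean", "enrich"], ["report"]]))
print(log)
```

With `return_exceptions=True`, a failure in one model does not cancel the others in the same stage; the failure is reported after the stage completes and later stages are not started.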

Prerequisites

There are no prerequisites: the model creates all the necessary views and asset templates the first time it is run.

Building the Advanced Orchestration model

Create a new Python model called Orchestrator, and create 3 additional files called asset_setup.py, assets.py and orchestrator.py. Copy and paste the code from below into each file. Also overwrite main.py with the code from below.

main.py

import asset_setup
import assets
import orchestrator

def akumen(view, **kwargs):
    """
    !! This akumen() function must exist in the execution file!

    Parameters:
        !! These lines define parameters, and a line must exist per input (or output).

        - Input: view [assetview]
    """
    print('Running Akumen model...')
    
    # Perform any configuration (if any)
    asset_setup.configure(view)
    
    # Check for empty view, means it's an initial setup
    if view is None or view == '':
        return {}
    
    # Get the assets from the view, along with the expected order we need to execute them in    
    view_assets = assets.get(view)
    
    # Start the orchestrator
    orchestrator.start(view, view_assets)

    # The akumen() function must return a dictionary including keys relating to outputs.
    return {

    }


if __name__ == '__main__':
    """
    Any local test code can be used in an import guard
    when developing on a local machine, you can put code here that won't
    get run by Akumen.
    """
    print('Running local tests...')
    # This model takes a single view input and returns no outputs, so an
    # empty view name should run the initial setup and return an empty dictionary.
    # Note that this still calls the Akumen API via asset_setup.configure().
    assert akumen('') == {}
    print('Tests completed!')

asset_setup.py

import akumen_api
import requests
from akumen_api import progress

# Sets up the asset templates required for orchestration to function correctly
def configure(view):
    progress('Configuring asset templates')
    
    response = requests.post(f"{akumen_api.AKUMEN_API_URL}assets/template/getlist", 
      headers = { 'Authorization': akumen_api.API_KEY }, json=['Model'])
    response.raise_for_status()
    
    template_list = response.json()
    model_template = next((template for template in template_list if template['name'] == 'Model'), None)
    
    if model_template is None:
        progress('Could not find asset template Model.  Setting up from scratch')
    else:
        progress('Asset template Model exists - confirming attributes')
        
    template = {
      "name": "Model",
      "description": None,
      "ordinal": None,
      "image": "object.png",
      "attributes": [
        {
          "name": "Last Run",
          "description": "",
          "type": "string",
          "format_string": None,
          "eng_unit_name": "",
          "enum_name": "",
          "category": None,
          "default_value": None
        },
        {
          "name": "Model Name",
          "description": "",
          "type": "string",
          "format_string": None,
          "eng_unit_name": "",
          "enum_name": "",
          "category": None,
          "default_value": None
        },
        {
          "name": "Link to Model",
          "description": "",
          "type": "url",
          "format_string": None,
          "eng_unit_name": "",
          "enum_name": "",
          "category": None,
          "default_value": None
        },
        {
          "name": "Log",
          "description": "",
          "type": "richtext",
          "format_string": None,
          "eng_unit_name": "",
          "enum_name": "",
          "category": None,
          "default_value": None
        },
        {
          "name": "Runtime",
          "description": "",
          "type": "float",
          "format_string": None,
          "eng_unit_name": "",
          "enum_name": "",
          "category": None,
          "default_value": None
        }
      ]
    }
    
    response = requests.post(f"{akumen_api.AKUMEN_API_URL}assets/template/create", headers = { 'Authorization': akumen_api.API_KEY }, json = template)
    print(response.text)
    response.raise_for_status()
        
    progress('Asset templates have been configured')
    
    if view is None or view == '':
        # Does not exist, so create it
        view = 'Orchestration'
    
    response = requests.get(f"{akumen_api.AKUMEN_API_URL}assets/view?search_text={view}", headers = { 'Authorization': akumen_api.API_KEY })
    response.raise_for_status()
    
    if len(response.json()) == 0:
        progress(f"Asset view {view} does not exist, creating")
        
        response = requests.post(f"{akumen_api.AKUMEN_API_URL}assets/view/create",  headers = { 'Authorization': akumen_api.API_KEY }, json = { 'name': view, 'description': 'Orchestration View', 'asset_view_type': 'Flow'})
        response.raise_for_status()
        
        progress("Asset view has been created")
        

assets.py

import requests
import akumen_api
from akumen_api import progress

# Finds the first asset, which is the starting point of the model orchestration.  Note that there may be more than one starting point
def find_first_assets(view):
    first_assets = []
    
    for asset in view['assets']:
        relationship = next((relationship for relationship in view['asset_view_relationships'] if asset['name'] == relationship['to_asset_name']), None)
        if relationship is None:
            first_assets.append(asset['name'])
        
    return first_assets
    
# Finds all of the assets in the view in the appropriate order of execution
def find_assets_in_order(assets, relationships, assets_in_order, used_assets):
    for asset in assets:
        next_assets = [relationship['to_asset_name'] for relationship in relationships if relationship['from_asset_name'] == asset]
        # Check if we've already used an asset, if so we've detected a loop
        next_assets_to_execute = []
        for next_asset in next_assets:
            if next_asset not in used_assets:
                # And add to the used list
                used_assets.append(next_asset)
                next_assets_to_execute.append(next_asset)
                
        # And add the updated array to the assets_in_order
        if len(next_assets_to_execute) > 0:
            assets_in_order.append(next_assets_to_execute)
                
            # Got here ok, so continue recursing
            find_assets_in_order(next_assets_to_execute, relationships, assets_in_order, used_assets)

# Fetches the assets and relationships from the view
def get(view):
    progress('Fetching assets from view')
    
    response = requests.get(f"{akumen_api.AKUMEN_API_URL}assets/view/{view}", headers = { 'Authorization': akumen_api.API_KEY })
    response.raise_for_status()
    
    view = response.json()
    
    # Now we need to go through the relationships and verify that we don't have any loops (where assets are called more than once).  If that is so, then this
    # model will never finish.  First we need to look for the first asset, that is, the one with no incoming links.  If we don't find one, then we can consider
    # that a loop.  Secondly, we need to see if there's any loops within the system, that'll be where assets appear twice
    
    first_assets = find_first_assets(view)
    if len(first_assets) == 0:
        raise Exception("Could not find starting point for model orchestration.  One or more assets must have no incoming connections")
    
    # Setup an array, where each element is an array of assets.  The order of execution will be each group of assets within the array
    assets_in_order = []
    # Add the starting points
    assets_in_order.append(first_assets)
    
    # And populate assets_in_order with the recursive function
    find_assets_in_order(first_assets, view['asset_view_relationships'], assets_in_order, [])
    
    # Return a dictionary of the assets in the order they should execute, as well as the assets themselves so we know the attribute details
    return { 'assets_in_order': assets_in_order, 'assets': view['assets'] }

orchestrator.py

import requests
import akumen_api
from akumen_api import progress

import asyncio

from dateutil.parser import parse

# Sets the asset status based on the results of execution
def set_asset_status(view, asset, colour):
    response = requests.post(f"{akumen_api.AKUMEN_API_URL}assets/view/{view}/update_colour/{asset['name']}", headers={'authorization': akumen_api.API_KEY }, json={ 'view_name': view, 'asset_name': asset['name'], 'colour': colour })
    response.raise_for_status()

# Checks the study status every few seconds
async def check_scenario_status(view, asset, model, study, scenario_name):
    finished = False
    
    status = 'lightblue'
    
    while not finished:
        # Make an API call to get the status of the executed scenario to determine if it was successful
        url = f"{akumen_api.AKUMEN_API_URL}models/{model['name']}/{study['name']}/{scenario_name}"
        response = requests.get(f"{url}?include_scenario_logs=false", headers={'authorization': akumen_api.API_KEY })
        # Any HTTP error is raised to the caller, which ends the polling loop
        response.raise_for_status()

        scenario = response.json()
        if scenario is None:
            # Must have been deleted or something, drop out
            finished = True
            status = 'red'
        elif scenario['run_status_string'] == 'Complete':
            status = 'lightgreen'
            finished = True
        elif scenario['run_status_string'] == 'Warning':
            status = 'lightyellow'
            finished = True
        elif scenario['run_status_string'] == 'Error':
            status = 'pink'
            finished = True
            
        await asyncio.sleep(3)
            
    # Out of the loop, so set the appropriate asset colour
    set_asset_status(view, asset, status)
    
    # Final fetch of the scenario, so we can get the log to add to the asset
    response = requests.get(f"{url}?include_scenario_logs=true", headers={'authorization': akumen_api.API_KEY })
    response.raise_for_status()
    scenario = response.json()
    
    asset['attributes'] = {
        'Last Run': scenario['executed_end'],
        'Runtime': (parse(scenario['executed_end']) - parse(scenario['executed_start'])).total_seconds(),
        'Link to Model': f"/ModelBuilder/Detail/{model['id']}?StudyId={study['id']}",
        'Log': scenario['output_log'].replace('\n', '<br />')
    }
    
    # Write some of the stats to the asset
    response = requests.put(f"{akumen_api.AKUMEN_API_URL_V2}assets", headers={'authorization': akumen_api.API_KEY }, json=[ asset ] )
    response.raise_for_status()

    # And return the scenario so we can utilise the run status to determine if we proceed with the rest of the orchestrator
    return scenario

# Executes a study
async def execute_study(view, asset):
    if asset['object_type_name'] != 'Model':
        set_asset_status(view, asset, 'lightpink')
        raise Exception("Could not execute model, as the asset template type is not of type model")
    model_name = next((attribute['value'] for attribute in asset['attributes'] if attribute['name'] == 'Model Name'), None)
    if model_name is None or model_name == '':
        raise Exception("Could not execute model, as the Model Name attribute was not set")
        
    # Get a reference to the model
    response = requests.get(f"{akumen_api.AKUMEN_API_URL}models/{model_name}?include_scenario_logs=false&include_parameters=false", headers={'authorization': akumen_api.API_KEY })
    response.raise_for_status()
    model = response.json()
    
    # Now we need to send an execution for the first scenario in the first study (this can be changed at a later date, but the orchestrator is more for utility models that only have a single scenario)
    study = model['studies'][0]
    scenario = study['scenarios'][0]
    
    # Firstly set the asset colour in light blue to indicate it's being queued
    set_asset_status(view, asset, 'lightblue')
    
    # And queue up the model
    response = requests.post(f"{akumen_api.AKUMEN_API_URL}execute/{model['name']}/{study['name']}", headers={'authorization': akumen_api.API_KEY }, json={ 'scenario_names': [scenario['name']], 'clear_outputs': True })
    response.raise_for_status()
    
    # And wait until the model is completed, returning a reference to the scenario
    return await check_scenario_status(view, asset, model, study, scenario['name'])

async def run(view, view_assets):
    # Preliminary pass - go through all models, and reset run status and asset colours
    colours = {
        'view_name': view,
        'colours': {
            
        }
    }
    for asset_order in view_assets['assets_in_order']:
        for asset_name in asset_order:
            asset = next((asset for asset in view_assets['assets'] if asset['name'] == asset_name), None)
            if asset is None:
                # Don't throw an exception here, we still need to continue resetting everything
                continue
            colours['colours'][asset_name] = 'white'
            model_name = next((attribute['value'] for attribute in asset['attributes'] if attribute['name'] == 'Model Name'), None)
            response = requests.put(f"{akumen_api.AKUMEN_API_URL}models/{model_name}/clear_outputs", headers={'authorization': akumen_api.API_KEY })
            # Don't do a raise for status, we want to continue without throwing any exceptions
            
    # Clear the asset colours in one hit
    response = requests.post(f"{akumen_api.AKUMEN_API_URL}assets/view/{view}/update_colours", headers={'authorization': akumen_api.API_KEY }, json=colours)
    # Again, don't raise for status, allow this reset to continue
    
    for asset_order in view_assets['assets_in_order']:
        execution_tasks = []
        for asset_name in asset_order:
            asset = next((asset for asset in view_assets['assets'] if asset['name'] == asset_name), None)
            if asset is None:
                raise Exception(f"Could not find asset {asset_name}")
                
            # Build a list of tasks to execute based on the assets in order
            execution_tasks.append(asyncio.create_task(execute_study(view, asset)))
    
        # And run these tasks asynchronously - this waits until all the tasks in the list are completed before moving on to the next list
        results = await asyncio.gather(*execution_tasks, return_exceptions=True)
        for result in results:
            # Use isinstance so that subclasses of Exception (such as HTTP errors raised by requests) are also detected
            if isinstance(result, Exception):
                # There's been a problem, so we need to signal that the orchestration has failed.  Leave all scenarios in their current
                # state so the failure can be diagnosed
                raise Exception(f"The orchestration model has failed with error {result}.  Check the execution models to determine the cause")

# Starts the orchestrator process, executing the studies in the order defined in the asset view
def start(view, view_assets):
    asyncio.run(run(view, view_assets))

Once the code has been created, run the model once. It will create the necessary asset template (called Model) and an empty view called Orchestration.

Use of the model

To use this model, create one or more “worker” models, that is, models that need to be executed in a particular order. A worker model could, for example, fetch results from the previous model, but only once the previous model has run successfully. This needs to be coded into each model. Alternatively, a model could populate additional attributes in the asset library that the destination model can then use.

The next step is to create assets of type Model in the Asset Library Master View. Fill in the Model Name field only; the rest of the fields will be populated by the orchestrator model.

Figure: Master View

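Move over to the Orchestration view, and drag and drop the assets onto the surface, connecting them in the order they need to be executed. At least one asset must have no incoming connections, as this is where the orchestration starts.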

Figure: Orchestration View
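To see how a layout in the Orchestration view turns into an execution order, the sketch below applies the same grouping logic as assets.py to a hand-written view dictionary. The asset names are hypothetical, and the dictionary contains only the fields the grouping reads (`assets` and `asset_view_relationships`); a real view returned by the API will contain more.

```python
# Hypothetical view: Extract feeds Clean and Enrich, which both feed Report
view = {
    "assets": [{"name": n} for n in ["Extract", "Clean", "Enrich", "Report"]],
    "asset_view_relationships": [
        {"from_asset_name": "Extract", "to_asset_name": "Clean"},
        {"from_asset_name": "Extract", "to_asset_name": "Enrich"},
        {"from_asset_name": "Clean", "to_asset_name": "Report"},
        {"from_asset_name": "Enrich", "to_asset_name": "Report"},
    ],
}

def first_assets(view):
    # Starting points are the assets with no incoming connections
    targets = {r["to_asset_name"] for r in view["asset_view_relationships"]}
    return [a["name"] for a in view["assets"] if a["name"] not in targets]

def execution_groups(view):
    relationships = view["asset_view_relationships"]
    groups = [first_assets(view)]
    used = []

    def walk(names):
        for name in names:
            next_names = []
            for r in relationships:
                target = r["to_asset_name"]
                # An asset that has already been placed is skipped, which
                # also stops a loop in the view from recursing forever
                if r["from_asset_name"] == name and target not in used:
                    used.append(target)
                    next_names.append(target)
            if next_names:
                groups.append(next_names)
                walk(next_names)

    walk(groups[0])
    return groups

groups = execution_groups(view)
print(groups)  # [['Extract'], ['Clean', 'Enrich'], ['Report']]
```

Each inner list is one group: the assets in a group run at the same time, and the groups run one after another.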

Once complete, run the orchestration model. The assets change colour as the orchestration model executes: light blue while a model is queued or running, light green on success, light yellow on a warning and pink on failure.
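The colour scheme can be summarised as a simple lookup. This is a condensed sketch of the mapping used in orchestrator.py rather than a copy of it; the helper name `colour_for` is hypothetical, and treating every other status as "still in progress" is an assumption.

```python
# Run status strings and colours as used in orchestrator.py
STATUS_COLOURS = {
    "Complete": "lightgreen",
    "Warning": "lightyellow",
    "Error": "pink",
}

def colour_for(run_status, default="lightblue"):
    # Assumption: any status not listed above means the scenario is
    # still queued or running, so the asset stays light blue
    return STATUS_COLOURS.get(run_status, default)

print(colour_for("Complete"))  # lightgreen
print(colour_for("Queued"))    # lightblue
```

To use different colours, or to react to other statuses, this mapping is the only place that would need to change.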

Also examine the asset attributes, which include information such as the log, the runtime and the time of the last run.
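The values written to these attributes are derived from the finished scenario. The sketch below shows the derivation on a hand-written scenario dictionary. The field names match those read in orchestrator.py, but the timestamps and log text are made up, and the ISO 8601 timestamp format is an assumption (the orchestrator itself uses `dateutil` to parse whatever format the API returns).

```python
from datetime import datetime

def asset_stats(scenario):
    # Assumption: timestamps are ISO 8601 strings
    start = datetime.fromisoformat(scenario["executed_start"])
    end = datetime.fromisoformat(scenario["executed_end"])
    return {
        "Last Run": scenario["executed_end"],
        # Runtime is stored in seconds
        "Runtime": (end - start).total_seconds(),
        # The Log attribute is rich text, so newlines become HTML line breaks
        "Log": scenario["output_log"].replace("\n", "<br />"),
    }

# Hypothetical scenario data for illustration only
scenario = {
    "executed_start": "2024-05-01T09:00:00",
    "executed_end": "2024-05-01T09:02:30",
    "output_log": "Running model\nDone",
}

stats = asset_stats(scenario)
print(stats["Runtime"])  # 150.0
```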