Code Example - Model Orchestration
There are occasions where models need to run in a specific order, or only when certain conditions are met. This can be achieved by creating a “model orchestration” model, that is, a model whose sole purpose is to control the execution of other models. The orchestrator can be run manually, on a schedule, or triggered from an external orchestration framework such as Airflow using the code below.
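As a minimal illustration of the pattern (the model names and the injected callables here are placeholders, not part of Akumen's API), an orchestration model is ordinary code that decides which models run, and in what order:

```python
def orchestrate(run_model, condition_met):
    """Run Model A, then run Model B only if a condition on A's result holds.

    run_model and condition_met are injected callables, so the ordering
    logic can be exercised without touching any real models.
    """
    results = [run_model('Model A')]
    if condition_met(results[0]):
        results.append(run_model('Model B'))
    return results


# Example: only run Model B when Model A completed successfully.
runs = orchestrate(
    lambda name: {'name': name, 'status': 'Complete'},
    lambda result: result['status'] == 'Complete',
)
print([r['name'] for r in runs])  # prints: ['Model A', 'Model B']
```

In the real orchestrator below, the `run_model` role is played by the Akumen execute API, and the condition is whatever business logic you need.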
Prerequisites
Create two Python models: one called Model Orchestrator, and a second one based on the default Python template.
About the model
The input to the model is of type Scenario. This parameter provides a popup to select the Model, Study and Scenario (the scenario itself is only used by the picker, and not by the logic below). The value could be fed in from Airflow, or hard-coded into the model. When the model runs, it executes the requested model and study (if the study has multiple scenarios, all of them are executed), waiting for them all to complete before returning a successful run.
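For reference, the scenario parameter arrives as a dictionary; the two keys read by the orchestration code are shown below (the values are hypothetical examples, and a real scenario dictionary may carry additional keys):

```python
# Hypothetical example of the dictionary a Scenario input provides to akumen().
# Only the keys actually read by the orchestration code are shown here.
scenario = {
    'model_name': 'Production Forecast',  # model selected in the picker
    'study_name': 'Baseline',             # study selected in the picker
}

modelname = scenario['model_name']
studyname = scenario['study_name']
print(modelname, studyname)  # prints: Production Forecast Baseline
```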
Additional features can be added to the model, such as returning the run log, or kicking off additional runs.
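For example, because the study endpoint used below is called with include_scenario_logs=true, the run log could be read straight out of the polled response. Note that the 'scenario_name' and 'logs' field names in this sketch are assumptions about the response shape, not a documented contract:

```python
def scenario_logs(study):
    """Collect per-scenario logs from a study response fetched with
    include_scenario_logs=true.

    The 'scenario_name' and 'logs' keys are assumed field names - check an
    actual response from your Akumen instance before relying on them.
    """
    return {s.get('scenario_name', ''): s.get('logs', '')
            for s in study.get('scenarios', [])}


# Usage with a fabricated response:
study = {'scenarios': [{'scenario_name': 'Base', 'logs': 'Run complete'}]}
print(scenario_logs(study))  # prints: {'Base': 'Run complete'}
```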
Building the Orchestration Model
import time

import requests

import akumen_api
from akumen_api import progress


def akumen(scenario, **kwargs):
    """
    !! This akumen() function must exist in the execution file!

    Parameters:
        !! These lines define parameters, and a line must exist per input (or output).

        - Input: scenario [scenario]
    """
    print('Running Akumen model...')

    modelname = scenario['model_name']
    studyname = scenario['study_name']
    authkey = akumen_api.API_KEY

    # --- Execute the selected study defined by the inputs
    url = f"{akumen_api.AKUMEN_API_URL}execute/{modelname}/{studyname}/"
    response = requests.post(url, headers={'authorization': authkey}, json={'clear_outputs': True})
    response.raise_for_status()

    status = 'Running'
    while status == 'Running':
        progress('Checking status')

        # --- Get the model and study, including the logs
        url = f"{akumen_api.AKUMEN_API_URL}models/{modelname}/{studyname}?include_scenario_logs=true&include_parameters=false"
        response = requests.get(url, headers={'authorization': authkey})
        response.raise_for_status()
        study = response.json()

        # --- Loop through all of the scenarios and check the run status - every
        # scenario in the study must finish before we can stop polling
        scenario_count = len(study['scenarios'])
        completed_count = len([s for s in study['scenarios']
                               if s['run_status_string'] in ('Complete', 'Warning')])
        error_count = len([s for s in study['scenarios']
                           if s['run_status_string'] == 'Error'])

        if completed_count + error_count == scenario_count:
            status = 'Error' if error_count > 0 else 'Complete'
        else:
            time.sleep(5)

    # Fail the orchestrator run if any scenario errored, so callers
    # (e.g. Airflow) see the failure rather than a successful run.
    if status == 'Error':
        raise RuntimeError(f'One or more scenarios failed in {modelname}/{studyname}')

    # The akumen() function must return a dictionary including keys relating
    # to outputs. This model defines no outputs, so the dictionary is empty.
    return {}


if __name__ == '__main__':
    """
    Any local test code can be used in an import guard - when developing on
    a local machine, you can put code here that won't get run by Akumen.
    """
    print('Running local tests...')
    # Example invocation (requires valid Akumen credentials and an existing
    # model and study):
    # akumen({'model_name': 'My Model', 'study_name': 'My Study'})
    print('Tests completed!')
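The completion check inside the polling loop can also be pulled out into a small pure function, which makes it easy to unit-test locally without calling the API. This is a sketch using the same 'run_status_string' values as the model above:

```python
def study_status(scenarios):
    """Classify a study as 'Running', 'Error' or 'Complete' from its
    scenarios' run_status_string values, mirroring the polling loop above."""
    completed = sum(1 for s in scenarios
                    if s['run_status_string'] in ('Complete', 'Warning'))
    errors = sum(1 for s in scenarios if s['run_status_string'] == 'Error')
    if completed + errors < len(scenarios):
        return 'Running'  # at least one scenario is still executing
    return 'Error' if errors else 'Complete'


print(study_status([{'run_status_string': 'Complete'},
                    {'run_status_string': 'Warning'}]))   # prints: Complete
print(study_status([{'run_status_string': 'Running'},
                    {'run_status_string': 'Complete'}]))  # prints: Running
```

Structuring the check this way keeps the network calls at the edge of the model, with the decision logic testable in the import guard.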