Create Slave App Part 1

The second step is to create the slave application.

  • Access the App manager by clicking on the Apps button
  • Create a new Python application called Slave
  • When in the application, create a new main.py file
  • Copy the following code into main.py, overwriting all contents
Tip

All of the code is listed below with full code comments so that once it’s created under your client it can remain as a reference on how Akumen apps can talk to one another.

This application does the following:

  • Demonstrates how to handle errors coming out of any api call
  • Demonstrates how to setup authorization and the API token
  • Gets a list of input parameters from a different model
  • Modifies the value of an input parameter from another model
  • Kick off execution of one model from another model
  • Create a loop to check if another model is currently executing (timing out if it takes too long)
  • Once the Master app is executed, fetch the results so they can be used
Tip

Make sure to follow through with “Create Slave App Part 2” or this code will not work!

# Use the requests module to interact with other Akumen apps
import requests

# Use the akumen_api model to access additional variables etc from Akumen
# See https://docs.akumen.io/apps/build/code/helpers/ for documentation on available variables
import akumen_api

import os
import json
import numpy as np

import time

from akumen_api import progress

def akumen(scenario, **kwargs):
    """
    We can use a scenario parameter to allow the end user to select the model/study/scenario, rather than baking it into the model via code
    This also transfers to the Pages templates as a series of selection drop down lists
    - Input: scenario [scenario]
    """
    print('Running Akumen model...')
    # First we want to get the input parameters that make up the Master model
    url = os.path.join(akumen_api.AKUMEN_API_URL, 'models', scenario['model_name'], scenario['study_name'], scenario['scenario_name'], 'input_parameters')

    # We need an authorization token to allow us to access the other app using the currently executing user.  In this case, we use an API key available
    # in the akumen_api.  Note that if a model is scheduled, the API token is of the runas user, which could be a service user
    headers = { 'authorization': akumen_api.API_KEY }
    
    response = requests.get(url, headers=headers)
    response_json = json.loads(response.text)
    if response.status_code >= 400:
        # Any errors that come through are returned as json in the error field.  You can trap for these and raise them.  Akumen will
        # automatically set the run status of the model to error, returning the result to the end user.
        # 500 errors are when there is an exception
        # 404 errors are when the url is not formed correctly, or it cannot find the model/study/scenario
        raise Exception(response_json['error'])
        
    progress("Fetched input values")
        
    # If we get here successfully, ie no errors, we can look at the input parameters - it comes through as an array of input parameters from the model
    print(response_json)
    for i in range(len(response_json)):
        if response_json[i]['parameter_name'] == 'first':
            # Here we're checking we have the right parameters, under the hood, outputs could also come through here, or you could have have multiple
            # parameters you need to handle, so we need to make sure we get the correct one
            # Lets just give it a random value
            response_json[i]['expression'] = np.random.randint(0,100)
            
    # Ok, we've set the input parameter, lets save it back using another api call - we can send the entire list back, or an individual one - these
    # are two separate calls
    url = os.path.join(akumen_api.AKUMEN_API_URL, 'models', scenario['model_name'], scenario['study_name'], scenario['scenario_name'], 'SaveInputParameters')
    # We can can feed back the updated data as a json parameter to the requests - if you flick back to the master model, you'll see the value
    # change in the research grid, and the run status get reset if the model has previously run
    # Note we also pass in the scope - this is to ensure the scope is correctly set as required
    response = requests.post(url, headers=headers, json={'input_parameters': response_json, 'scope': 'Model', 'reset_run_status': True})
    if response.status_code >= 400:
        # Note the response json call is inside here - that's because setting input parameters doesn't actually return anything
        response_json = json.loads(response.text)
        raise Exception(response_json['error'])
        
    progress("Written changed input values")
        
    # Now that we've set a value, lets kick off the model execution.  This is a fire and forget, so we're going to have to sit and wait
    # for it to finish.  We could've set the "dependent" checkbox in the research grid and let Akumen handle it, but then we woulnd't be able to 
    # demonstrate this

    # Note we now use the execute part of the API - there are two execute functions.  The one we're calling is where we simply execute an entire
    # study, which executes every scenario within that study.  The second is the execute where we don't pass a study name.  In this case
    # none of the scenarios run, it simply evaluates the model with the supplied inputs on the fly - it doesn't actually save anything to the database
    # either, but returns the results immediately.  This is super handy for those "micro" type Python models (eg the blast furnace model
    # could do this, getting data from a bunch of other models in Akumen).  But in this case, we're simply going to execute the study
    url = os.path.join(akumen_api.AKUMEN_API_URL, 'execute', scenario['model_name'], scenario['study_name'])
    # We pass in an empty list of scenario names, indicating we want to execute them all
    response = requests.post(url, headers=headers, json={'scenario_names': []})
    if response.status_code >= 400:
        # Note the response json call is inside here - that's because setting input parameters doesn't actually return anything
        response_json = json.loads(response.text)
        raise Exception(response_json['errors'])
        
    progress("Initiated model execution")
        
    # You can flick across to the Master model to see if it's queued or executed at this point
    # Now lets setup a loop, waiting for a successful (or not) execution)
    executed = False
    loop_counter = 0
    url = os.path.join(akumen_api.AKUMEN_API_URL, 'models', scenario['model_name'], scenario['study_name'], scenario['scenario_name'])
    while not executed:
        if loop_counter > 5: 
            # Make sure we don't go into an indefinite loop
            raise Exception('Exceeded loop counter - something went wrong')
        response = requests.get(url, headers=headers)
        response_json = json.loads(response.text)
        if response.status_code >= 400:
            raise Exception(response_json['error'])
        if response_json['run_status_string'] == "Complete" or response_json['run_status_string'] == 'Error':
            executed = True
        else:
            # Wait for 5 seconds before trying again
            time.sleep(5)
            # We can use the progress call - which sends the run logs back to Akumen during model execution, so we can get feedback
            # on what's going on while we're waiting
            progress('Not yet executed, waiting')
            loop_counter = loop_counter + 1
            
    # If we make it here, we're all good, we can now proceed to fetch the results from the results view.  We won't do anything with this,
    # we'll simply print them out.  The results view are available in the data tab of the Master model
    url = os.path.join(akumen_api.AKUMEN_API_URL, 'models', scenario['model_name'], scenario['study_name'], 'results_vw', 'results')
    response = requests.post(url, headers=headers, json={'scenario_names': []})
    response_json = json.loads(response.text)
    if response.status_code >= 400:
        raise Exception(response_json['error'])
        
    progress("Fetching results")
            
    # And spit out the results
    print(response_json)

    # The akumen() function must return a dictionary including keys relating to outputs.
    return {

    }