Code Example - Asset Library

Akumen’s Asset Library is a powerful tool for storing and contextualising data. As well as storing individual values such as numerics, text and dates against the attributes of an asset, an individual attribute can be linked to a database table and filtered down to retrieve only the data relevant to that asset. Database tables can be created from standard model outputs, or created through Akumen’s API in the v2\Database section (see Swagger).
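
As a rough sketch of the latter (using the same v2 endpoint the Consumer model later in this example relies on), listing the available database tables from within a Python model looks something like this:

import akumen_api
import requests

# List the database tables available to this Akumen instance via the v2 API
# (the same endpoint the Consumer model below uses to check a table exists)
response = requests.get(f"{akumen_api.AKUMEN_API_URL_V2}database/table_names",
                        headers={'Authorization': akumen_api.API_KEY})
response.raise_for_status()
print(response.json())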

This code example shows how to fetch weather station information from the asset library, then write the current temperature and GPS coordinates back to each individual asset. An output table is generated with the last 72 hours of observation history, which we then use to create a link back to each asset. Note that the link to the table could be created through the Akumen UI, but we will do it programmatically as part of the weather model.

Finally, we will write a second Python model that retrieves data from the weather output table, filtering it down to the weather station we want to access. The filter information is stored against the weather station’s attributes.

Build the assets

The assets we need to build must use a template of type Weather Station (see here for information regarding templates). Each weather station needs the following attributes, with the data types shown, for the model below to work:

  • BOM_Id (Text)
  • Current Temperature (Decimal Number)
  • Last Updated (Text)
  • Latitude (Decimal Number)
  • Longitude (Decimal Number)
  • Observations (Database Table)
  • WMO (Whole Number)

Once the template is created, create 3 assets using this template, called Jandakot, Perth and Perth AP. Note that we do not need to fill in all of the attributes, only enough for the model to populate them from BOM. For each asset, configure the following (sourced from BOM); a quick way to sanity-check these values is sketched after this list:

  • Jandakot
    • BOM_Id = IDW60901
    • WMO = 94609
  • Perth
    • BOM_Id = IDW60901
    • WMO = 94608
  • Perth AP
    • BOM_Id = IDW60901
    • WMO = 94610
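
If you want to sanity-check a BOM_Id/WMO pair before building the model, a minimal standalone sketch along these lines works. It uses the same BOM URL pattern and response structure that the Weather Loader model below relies on (Perth values shown):

import requests
import pandas as pd

# Quick check of the Perth feed (BOM_Id = IDW60901, WMO = 94608) using the
# same BOM URL pattern as the Weather Loader model
bom_id, wmo = 'IDW60901', '94608'
response = requests.get(f"http://bom.gov.au/fwo/{bom_id}/{bom_id}.{wmo}.json")
response.raise_for_status()
df = pd.DataFrame(response.json()['observations']['data'])
print(df[['wmo', 'name', 'local_date_time_full', 'air_temp']].head())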

Once the assets are built, create a new Python model called Weather Loader, replace the contents of main.py with the following code and execute the model. Once the model has executed successfully, you will notice that the rest of the attributes for the assets are populated. Running the model a number of times updates the values; Current Temperature, for example, contains the latest value, while its history shows the previous values. Also note the Observations database table attribute: clicking View should display a popup window with the data filtered to the currently selected asset. We will use this configuration in the next section to fetch data from the table for use in a new Python model.

Build the weather model

import akumen_api
from akumen_api import progress
import requests
import pandas as pd
from datetime import datetime
import json

def akumen(**kwargs):
    """
    !! This akumen() function must exist in the execution file!

    Parameters:
        !! These lines define parameters, and a line must exist per input (or output).

        - Output: weather [tabular] {wmo:string}
    """
    progress('Running Akumen model...')

    # Get all of the assets of type "Weather Station"
    response = requests.get(f"{akumen_api.AKUMEN_API_URL}assets/template/Weather Station", headers={ 'Authorization': akumen_api.API_KEY })
    response.raise_for_status()
    assets = response.json()
    progress('Fetched weather stations from asset library')
    
    weather = None
    
    asset_updates = []
    
    # This is where we link the output table of the model to individual assets
    # Note: the table name used to link the output back to an asset is the model name (lower case, spaces replaced with _), followed by _, the output name, and _vw
    # so this will be weather_loader_weather_vw
    # Also note the use of {} as a string within the json.  The purpose of this is to utilise the value of the asset attributes, rather than hardcoding the name in. 
    # This means that the asset value can change, but our configuration doesn't
    observations = {
        'table_name': f"{akumen_api.MODEL_NAME.replace(' ', '_').lower()}_weather_vw",
        'fields': {
            'FieldSelection1': {
                'column_name': 'history_product',
                'filter_value': '{BOM_Id}'
            },
            'FieldSelection2': {
                'column_name': 'WMO',
                'filter_value': '{WMO}'
            }
        },
        'sort_order': 'date desc'
    }
    
    # Loop through each asset, fetching its id and wmo so we can get the data from BOM
    for asset in assets:
        id = next((attribute['value'] for attribute in asset['attributes'] if attribute['name'] == 'BOM_Id'), None)
        wmo = next((attribute['value'] for attribute in asset['attributes'] if attribute['name'] == 'WMO'), None)
        if id is None or wmo is None:
            # We don't have all the data for the weather station, so ignore
            continue
        
        response = requests.get(f"http://bom.gov.au/fwo/{id}/{id}.{wmo}.json")
        response.raise_for_status()
        
        # Return a dataframe of the observation data
        df = pd.DataFrame(response.json()['observations']['data'])

        if weather is None:
            weather = df
        else:
            # We've already populated from one weather station, so merge in
            weather = pd.concat([weather, df], ignore_index=True)

        # And populate the asset_updates, which sets the current temperature and other values in the asset library
        asset_updates.append({
            'name': asset['name'],
            'attributes': {
                'Current Temperature': df.iloc[0]['air_temp'],
                'Last Updated': datetime.now().isoformat(),
                'Latitude': df.iloc[0]['lat'],
                'Longitude': df.iloc[0]['lon'],
                'Observations': json.dumps(observations)
            }
        })
            
        progress(f"Populated weather for {asset['name']}")
        
    # Send the asset updates to the asset library
    response = requests.put(f"{akumen_api.AKUMEN_API_URL_V2}assets", headers={ 'Authorization': akumen_api.API_KEY }, json=asset_updates)
    response.raise_for_status()
    
    # And return only the relevant columns of the output data table
    weather['date'] = pd.to_datetime(weather['local_date_time_full'], format='%Y%m%d%H%M%S')
    weather = weather[['wmo', 'name', 'history_product', 'date', 'air_temp']]

    # The akumen() function must return a dictionary including keys relating to outputs.
    return {
        'weather': weather
    }


if __name__ == '__main__':
    """
    Any local test code can be used in an import guard
    when developing on a local machine, you can put code here that won't
    get run by Akumen.
    """
    print('Running local tests...')
    # Note: the default template asserts don't apply to this model; running
    # locally requires access to the Akumen API via akumen_api
    results = akumen()
    print(results['weather'].head())
    print('Tests completed!')
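
After a successful run, the Observations attribute on each asset should hold a JSON payload along these lines (the table name follows the model-name convention described in the code comments, and the braced values are resolved against each asset's own attributes rather than being hardcoded):

{
    "table_name": "weather_loader_weather_vw",
    "fields": {
        "FieldSelection1": {"column_name": "history_product", "filter_value": "{BOM_Id}"},
        "FieldSelection2": {"column_name": "WMO", "filter_value": "{WMO}"}
    },
    "sort_order": "date desc"
}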

Build the Consumer model

The consumer model is simple: its sole job is to fetch the asset from the asset library, retrieve the configuration stored in the database table attribute, and use that configuration to fetch the data from the database table. Create a Python model called Consumer and replace main.py with the following code.

import akumen_api
import requests
import json
import pandas as pd

def akumen(**kwargs):
    """
    !! This akumen() function must exist in the execution file!

    Parameters:
        !! These lines define parameters, and a line must exist per input (or output).

        - Output: results [tabular]
    """
    print('Running Akumen model...')

    # Request the Perth asset - for this particular model, we only want the data for one station
    response = requests.post(f"{akumen_api.AKUMEN_API_URL}assets/read/Perth", headers={ 'Authorization': akumen_api.API_KEY })
    response.raise_for_status()
    asset = response.json()
    
    # Get the Observations value, which contains the json required to locate and filter the database table
    database_config = json.loads(next(attribute for attribute in asset['attributes'] if attribute['name'] == 'Observations')['value'])

    if 'table_name' not in database_config:
        raise Exception('Could not find database config, has the asset been configured?')
        
    # Check if the table exists
    response = requests.get(f"{akumen_api.AKUMEN_API_URL_V2}database/table_names", headers={ 'Authorization': akumen_api.API_KEY })
    response.raise_for_status()
    table_names = response.json()
    
    table = next((table for table in table_names if table == database_config['table_name']), None)
    if table is None:
        raise Exception('Table not found in database, indicating the Weather Loader model has not been run')
        
    # Build a url and query body to return the results as csv
    url = f"{akumen_api.AKUMEN_API_URL_V2}database/{database_config['table_name']}/csv"
    field1 = f"{database_config['fields']['FieldSelection1']['column_name']}"
    field1Value = f"{database_config['fields']['FieldSelection1']['filter_value']}"
    
    if '{' in field1Value and '}' in field1Value:
        # We need to look at the attribute value on the asset, rather than the value itself
        field1Value = next(attribute for attribute in asset['attributes'] if attribute['name'] == field1Value.replace('{', '').replace('}', ''))['value']
        
    field2 = f"{database_config['fields']['FieldSelection2']['column_name']}"
    field2Value = f"{database_config['fields']['FieldSelection2']['filter_value']}"
    if '{' in field2Value and '}' in field2Value:
        # We need to look at the attribute value on the asset, rather than the value itself
        field2Value = next(attribute for attribute in asset['attributes'] if attribute['name'] == field2Value.replace('{', '').replace('}', ''))['value']
        
    # Now that we have the field values, we can construct the query
    body = {
        'query': f"{field1} = '{field1Value}' and {field2} = '{field2Value}'"
    }

    # And send the request
    response = requests.post(url, headers={ 'Authorization': akumen_api.API_KEY }, json=body)
    response.raise_for_status()
    
    # Download the resulting csv
    with open('results.csv', 'wb') as f:
        f.write(response.content)
    
    # And open in a pandas df for further processing if required, then return the results
    df = pd.read_csv('results.csv')
    
    # We only need the data columns, not any Akumen columns, otherwise the model will error with duplicate columns when Akumen adds its own columns back in
    df = df[['wmo', 'name', 'history_product', 'date', 'air_temp']]

    # The akumen() function must return a dictionary including keys relating to outputs.
    return {
        'results': df
    }


if __name__ == '__main__':
    """
    Any local test code can be used in an import guard
    when developing on a local machine, you can put code here that won't
    get run by Akumen.
    """
    print('Running local tests...')
    # Note: the default template asserts don't apply to this model; running
    # locally requires access to the Akumen API via akumen_api
    results = akumen()
    print(results['results'].head())
    print('Tests completed!')

This model constructs a dataframe from a data table produced by another model, using the configuration stored against an asset. The results are returned to the data tab.
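
For example, with the Perth asset configured as above (BOM_Id = IDW60901, WMO = 94608), the braced placeholders resolve against the asset's attributes and the body posted to the database endpoint ends up roughly as:

body = {
    'query': "history_product = 'IDW60901' and WMO = '94608'"
}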