Distributed Computing

Distributed computing (or parallelism) is the practice of dividing computing tasks among multiple processing resources to speed up computations.

In apsimNGpy, this is achieved through the MultiCoreManager API, which abstracts away most of the setup required for distributing tasks.

Below, we’ll walk through a step-by-step example of using this API.

from apsimNGpy.core.multi_core import MultiCoreManager
from apsimNGpy.core.apsim import ApsimModel
from pathlib import Path

In this example (v0.39.03.11+), we assume your APSIM files are already prepared (or available in various locations) and you simply want a speed boost when running them and processing results. For demonstration purposes, we’ll generate some example jobs:

# Best to supply a generator so jobs are created on the fly
create_jobs = (
    ApsimModel('Maize').path
    for _ in range(100)
)

Here we use the ApsimModel class to clone the template Maize model shipped with APSIM. If you do not specify the out_path argument, each file is assigned a random filename. This is critical in multiprocessing: no two processes may share the same filename, otherwise the run will fail.

To explicitly set unique filenames for each simulation:

create_jobs = (ApsimModel('Maize', out_path=Path(f"_{i}.apsimx").resolve()).path for i in range(100))

Tip

The key idea: every file must have a unique identifier to avoid race conditions during parallel execution.
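
If the index-based names above could clash with files left over from a previous run, a random token such as a UUID is another option. The snippet below is a minimal sketch of that idea using Python's standard-library uuid module; it relies only on the ApsimModel and out_path usage already shown above:

import uuid

# every job receives a practically collision-proof filename
create_jobs = (
    ApsimModel('Maize', out_path=Path(f"maize_{uuid.uuid4().hex}.apsimx").resolve()).path
    for _ in range(100)
)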

Instantiating and Running the Simulations

if __name__ == '__main__':  # a guard is required when using multiprocessing
    # initialize
    task_manager = MultiCoreManager(db_path=Path('test.db').resolve(), agg_func=None)
    # Run all the jobs
    task_manager.run_all_jobs(create_jobs, n_cores=16, threads=False, clear_db=True)
    # progress information is printed as the jobs run, e.g.:
    # Processing all jobs. Please wait!: :  |██████████| 100.0%| [100/100]| Complete | 1.07s/iteration | Elapsed time: 00:01:46.850
    # get the results
    df = task_manager.get_simulated_output(axis=0)
    # same as
    data = task_manager.results  # the default is axis=0
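    # Optionally persist the combined results with plain pandas
    # (not part of the MultiCoreManager API; assumes `df` is a pandas DataFrame)
    df.to_csv(Path('simulated_results.csv').resolve(), index=False)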

If agg_func is specified, it can be one of: mean, median, sum, min, or max. Each results table will then be summarized using the selected aggregation function (a short sketch follows these parameter notes).

clear_db clears any existing database tables before the new entries are added.

threads (bool): If True, use threads; if False, use processes. For CPU-bound tasks like this one, processes are preferred because they avoid the resource contention and blocking inherent in threads.

n_cores (int): Specifies the number of worker cores to use for the task. The workload will be divided among these workers. If the number of cores is large but the number of tasks is small, some scheduling overhead will occur, and workers may remain idle while waiting for available tasks.
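
For instance, to get back summarized tables rather than raw rows, only agg_func needs to change. The call below is a minimal sketch reusing the parameters documented above; note that the generator from earlier is single-use, so fresh jobs are created here:

if __name__ == '__main__':
    # generators are exhausted after one pass, so build a fresh set of jobs
    jobs = (ApsimModel('Maize').path for _ in range(100))
    manager = MultiCoreManager(db_path=Path('test_agg.db').resolve(), agg_func='mean')
    manager.run_all_jobs(jobs, n_cores=16, threads=False, clear_db=True)
    summarized = manager.results  # each results table is summarized with the mean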

Customization

If you don't want to use the above API, you can still build things from scratch:

from pathlib import Path
from apsimNGpy.core.apsim import ApsimModel
from apsimNGpy.core_utils.database_utils import read_db_table
from apsimNGpy.parallel.process import custom_parallel
import pandas as pd
from sqlalchemy import create_engine


DATABASE = str(Path('test_custom.db').resolve())

# define a function to insert results into the SQLite database
def insert_results(db_path, results, table_name):
    """
    Insert a pandas DataFrame into a SQLite table.

    Parameters
    ----------
    db_path : str or Path
        Path to the SQLite database file.
    results : pandas.DataFrame
        DataFrame to insert into the database.
    table_name : str
        Name of the table to insert the data into.
    """
    if not isinstance(results, pd.DataFrame):
        raise TypeError("`results` must be a pandas DataFrame")

    engine = create_engine(f"sqlite:///{db_path}")
    results.to_sql(table_name, con=engine, if_exists='append', index=False)


def worker(nitrogen_rate, model):
    out_path = Path(f"_{nitrogen_rate}.apsimx").resolve()
    model = ApsimModel(model, out_path=out_path)
    model.edit_model("Models.Manager", model_name='Fertilise at sowing', Amount=nitrogen_rate)
    model.run(report_name="Report")
    df = model.results
    # we can even create a column identifying each simulation
    df['nitrogen rate'] = nitrogen_rate
    insert_results(DATABASE, df, 'Report')
    model.clean_up()


if __name__ == '__main__':

    for _ in custom_parallel(worker, range(0, 400, 10), 'Maize', n_cores=6, use_threads=False):
        pass
    # get the results
    data = read_db_table(DATABASE, report_name="Report")

Processing via 'worker' please wait!:  |██████████| 100.0%| [40/40]| Complete | 0.76s/iteration | Elapsed time: 00:00:30.591

print(data)
    SimulationName  SimulationID  ...  source_table nitrogen rate
0       Simulation             1  ...        Report            20
1       Simulation             1  ...        Report            20
2       Simulation             1  ...        Report            20
3       Simulation             1  ...        Report            20
4       Simulation             1  ...        Report            20
..             ...           ...  ...           ...           ...
395     Simulation             1  ...        Report           380
396     Simulation             1  ...        Report           380
397     Simulation             1  ...        Report           380
398     Simulation             1  ...        Report           380
399     Simulation             1  ...        Report           380
[400 rows x 18 columns]

All 40 simulations completed in about 30 seconds, roughly 0.76 seconds per simulation.
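
With everything collected into a single DataFrame, per-treatment summaries are a one-liner in pandas. The snippet below is purely illustrative; 'Yield' is a hypothetical column name, so substitute whichever variables your Report table actually contains:

# mean of a (hypothetical) 'Yield' column for each nitrogen rate
summary = data.groupby('nitrogen rate')['Yield'].mean()
print(summary)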

Note

Performance can vary between systems depending on hardware specifications, such as RAM, processor clock speed, and the number of CPU cores.