runner_m.f90 Source File


This file depends on

sourcefile~~runner_m.f90~~EfferentGraph sourcefile~runner_m.f90 runner_m.f90 sourcefile~dag_m.f90 dag_m.f90 sourcefile~runner_m.f90->sourcefile~dag_m.f90 sourcefile~payload_m.f90 payload_m.f90 sourcefile~runner_m.f90->sourcefile~payload_m.f90 sourcefile~vertex_m.f90 vertex_m.f90 sourcefile~dag_m.f90->sourcefile~vertex_m.f90 sourcefile~task_m.f90 task_m.f90 sourcefile~vertex_m.f90->sourcefile~task_m.f90 sourcefile~task_m.f90->sourcefile~payload_m.f90

Contents

Source Code


Source Code

module runner_m
    !! Compute-image/Scheduler-image abstraction
    use dag_m, only: dag_t
    use iso_fortran_env, only: event_type, atomic_int_kind
    use payload_m, only: payload_t

    implicit none
    private
    public :: run
contains
    subroutine run(dag)
        type(dag_t), intent(in) :: dag

        integer(atomic_int_kind), parameter :: UNCLAIMED = 0_atomic_int_kind
        integer, parameter :: NONE_AVAILABLE = 0
        integer :: n_tasks, n_imgs, num_dependencies
        integer(atomic_int_kind) :: me
        integer(atomic_int_kind), allocatable :: taken_on(:)[:] !! Records who executed a given task
        integer(atomic_int_kind), allocatable :: num_tasks_completed[:] !! keep count of the tasks completed
        type(event_type), allocatable :: task_completed(:)[:] !! To notify the other images when task has been completed
        type(payload_t), allocatable :: mailbox(:)[:] !! storage for communicating inputs/outputs between tasks
        integer, allocatable :: dependencies(:)

        call initialize
        do while (not_all_tasks_completed())
            associate(next_task => find_next_task())
                if (next_task /= NONE_AVAILABLE) then
                    if (claim_task(next_task)) then
                        call execute_task(next_task)
                        call notify_task_completed(next_task)
                    end if
                end if
            end associate
        end do
    contains
        subroutine initialize()
            integer :: i
            n_tasks = size(dag%vertices)
            n_imgs = num_images()
            me = this_image()
            allocate( &
                taken_on(n_tasks)[*], &
                task_completed(n_tasks)[*], &
                mailbox(n_tasks)[*], &
                num_tasks_completed[*])
            if (me == 1) then
                do i = 1, n_tasks
                    call atomic_define(taken_on(i), UNCLAIMED)
                end do
                call atomic_define(num_tasks_completed, 0)
            end if
            sync all
        end subroutine

        function not_all_tasks_completed()
            logical :: not_all_tasks_completed

            integer :: tasks_completed

            call atomic_ref(tasks_completed, num_tasks_completed[1])
            not_all_tasks_completed = tasks_completed < n_tasks
        end function

        function find_next_task()
            integer :: find_next_task

            integer :: task, dependency, dependency_completed, taken_by

            task_loop: &
            do task = 1, n_tasks
                call atomic_ref(taken_by, taken_on(task)[1])
                if (taken_by == UNCLAIMED) then
                    dependencies = dag%dependencies_for(task)
                    num_dependencies = size(dependencies)
                    do dependency = 1, num_dependencies
                        call event_query(task_completed(dependencies(dependency)), dependency_completed)
                        if (dependency_completed == 0) cycle task_loop
                    end do
                    find_next_task = task
                    return
                end if
            end do task_loop
            find_next_task = NONE_AVAILABLE
        end function

        function claim_task(task)
            integer, intent(in) :: task
            logical :: claim_task

            integer(atomic_int_kind) :: old

            call atomic_cas(taken_on(task)[1], old, UNCLAIMED, me)
            claim_task = old == UNCLAIMED
        end function

        subroutine execute_task(task)
            integer, intent(in) :: task

            integer :: dependency
            type(payload_t), allocatable :: arguments(:)

            allocate(arguments(num_dependencies))
            do dependency = 1, num_dependencies
                event wait (task_completed(dependencies(dependency)))
            end do
            do concurrent (dependency = 1 : num_dependencies)
                arguments(dependency) = mailbox(dependencies(dependency))[taken_on(dependencies(dependency))[1]]
            end do
            mailbox(task) = dag%vertices(task)%task%execute(arguments)
        end subroutine

        subroutine notify_task_completed(task)
            integer, intent(in) :: task

            integer :: img, num_downstream, n

            num_downstream = size(dag%depends_on(task))
            do img = 1, n_imgs
                do n = 1, num_downstream
                    event post (task_completed(task)[img])
                end do
            end do
            call atomic_add(num_tasks_completed[1], 1)
        end subroutine
    end subroutine
end module