Add scheduling, machine info, caching strategies and devices (#9)
Reviewed-on: Rubydragon/MetagraphOptimization.jl#9 Co-authored-by: Anton Reinhard <anton.reinhard@proton.me> Co-committed-by: Anton Reinhard <anton.reinhard@proton.me>
This commit is contained in:
@@ -1,126 +1,157 @@
|
||||
using DataStructures
|
||||
|
||||
"""
|
||||
gen_code(graph::DAG)
|
||||
|
||||
Generate the code for a given graph. The return value is a tuple of:
|
||||
Generate the code for a given graph. The return value is a named tuple of:
|
||||
|
||||
- `code::Expr`: The julia expression containing the code for the whole graph.
|
||||
- `inputSymbols::Dict{String, Symbol}`: A dictionary of symbols mapping the names of the input nodes of the graph to the symbols their inputs should be provided on.
|
||||
- `inputSymbols::Dict{String, Vector{Symbol}}`: A dictionary of symbols mapping the names of the input nodes of the graph to the symbols their inputs should be provided on.
|
||||
- `outputSymbol::Symbol`: The symbol of the final calculated value
|
||||
|
||||
See also: [`execute`](@ref)
|
||||
"""
|
||||
function gen_code(graph::DAG)
|
||||
code = Vector{Expr}()
|
||||
sizehint!(code, length(graph.nodes))
|
||||
function gen_code(graph::DAG, machine::Machine)
|
||||
sched = schedule_dag(GreedyScheduler(), graph, machine)
|
||||
|
||||
nodeQueue = PriorityQueue{Node, Int}()
|
||||
inputSyms = Dict{String, Symbol}()
|
||||
codeAcc = Vector{Expr}()
|
||||
sizehint!(codeAcc, length(graph.nodes))
|
||||
|
||||
# use a priority equal to the number of unseen children -> 0 are nodes that can be added
|
||||
for node in sched
|
||||
# TODO: this is kind of ugly, should init nodes be scheduled differently from the rest?
|
||||
if (node isa DataTaskNode && length(node.children) == 0)
|
||||
push!(codeAcc, get_init_expression(node, entry_device(machine)))
|
||||
continue
|
||||
end
|
||||
push!(codeAcc, get_expression(node))
|
||||
end
|
||||
|
||||
# get inSymbols
|
||||
inputSyms = Dict{String, Vector{Symbol}}()
|
||||
for node in get_entry_nodes(graph)
|
||||
enqueue!(nodeQueue, node => 0)
|
||||
push!(inputSyms, node.name => Symbol("data_$(to_var_name(node.id))_in"))
|
||||
if !haskey(inputSyms, node.name)
|
||||
inputSyms[node.name] = Vector{Symbol}()
|
||||
end
|
||||
|
||||
push!(inputSyms[node.name], Symbol("$(to_var_name(node.id))_in"))
|
||||
end
|
||||
|
||||
node = nothing
|
||||
while !isempty(nodeQueue)
|
||||
@assert peek(nodeQueue)[2] == 0
|
||||
node = dequeue!(nodeQueue)
|
||||
# get outSymbol
|
||||
outSym = Symbol(to_var_name(get_exit_node(graph).id))
|
||||
|
||||
push!(code, get_expression(node))
|
||||
for parent in node.parents
|
||||
# reduce the priority of all parents by one
|
||||
if (!haskey(nodeQueue, parent))
|
||||
enqueue!(nodeQueue, parent => length(parent.children) - 1)
|
||||
else
|
||||
nodeQueue[parent] = nodeQueue[parent] - 1
|
||||
end
|
||||
return (code = Expr(:block, codeAcc...), inputSymbols = inputSyms, outputSymbol = outSym)
|
||||
end
|
||||
|
||||
function gen_cache_init_code(machine::Machine)
|
||||
initializeCaches = Vector{Expr}()
|
||||
|
||||
for device in machine.devices
|
||||
push!(initializeCaches, gen_cache_init_code(device))
|
||||
end
|
||||
|
||||
return Expr(:block, initializeCaches...)
|
||||
end
|
||||
|
||||
function gen_input_assignment_code(
|
||||
inputSymbols::Dict{String, Vector{Symbol}},
|
||||
processDescription::AbstractProcessDescription,
|
||||
machine::Machine,
|
||||
processInputSymbol::Symbol = :input,
|
||||
)
|
||||
@assert length(inputSymbols) >=
|
||||
sum(values(in_particles(processDescription))) + sum(values(out_particles(processDescription))) "Number of input Symbols is smaller than the number of particles in the process description"
|
||||
|
||||
assignInputs = Vector{Expr}()
|
||||
for (name, symbols) in inputSymbols
|
||||
type = type_from_name(name)
|
||||
index = parse(Int, name[2:end])
|
||||
|
||||
p = nothing
|
||||
|
||||
if (index > in_particles(processDescription)[type])
|
||||
index -= in_particles(processDescription)[type]
|
||||
@assert index <= out_particles(processDescription)[type] "Too few particles of type $type in input particles for this process"
|
||||
|
||||
p = "filter(x -> typeof(x) <: $type, out_particles($(processInputSymbol)))[$(index)]"
|
||||
else
|
||||
p = "filter(x -> typeof(x) <: $type, in_particles($(processInputSymbol)))[$(index)]"
|
||||
end
|
||||
|
||||
for symbol in symbols
|
||||
# TODO: how to get the "default" cpu device?
|
||||
device = entry_device(machine)
|
||||
evalExpr = eval(gen_access_expr(device, symbol))
|
||||
push!(assignInputs, Meta.parse("$(evalExpr) = ParticleValue($p, 1.0)"))
|
||||
end
|
||||
end
|
||||
|
||||
# node is now the last node we looked at -> the output node
|
||||
outSym = Symbol("data_$(to_var_name(node.id))")
|
||||
return Expr(:block, assignInputs...)
|
||||
end
|
||||
|
||||
return (
|
||||
code = Expr(:block, code...),
|
||||
inputSymbols = inputSyms,
|
||||
outputSymbol = outSym,
|
||||
"""
|
||||
get_compute_function(graph::DAG, process::AbstractProcessDescription, machine::Machine)
|
||||
|
||||
Return a function of signature `compute_<id>(input::AbstractProcessInput)`, which will return the result of the DAG computation on the given input.
|
||||
"""
|
||||
function get_compute_function(graph::DAG, process::AbstractProcessDescription, machine::Machine)
|
||||
(code, inputSymbols, outputSymbol) = gen_code(graph, machine)
|
||||
|
||||
initCaches = gen_cache_init_code(machine)
|
||||
assignInputs = gen_input_assignment_code(inputSymbols, process, machine, :input)
|
||||
|
||||
functionId = to_var_name(UUIDs.uuid1(rng[1]))
|
||||
resSym = eval(gen_access_expr(entry_device(machine), outputSymbol))
|
||||
expr = Meta.parse(
|
||||
"function compute_$(functionId)(input::AbstractProcessInput) $initCaches; $assignInputs; $code; return $resSym; end",
|
||||
)
|
||||
func = eval(expr)
|
||||
|
||||
return func
|
||||
end
|
||||
|
||||
"""
|
||||
execute(generated_code, input::Dict{ParticleType, Vector{Particle}})
|
||||
execute(graph::DAG, process::AbstractProcessDescription, machine::Machine, input::AbstractProcessInput)
|
||||
|
||||
Execute the given `generated_code` (as returned by [`gen_code`](@ref)) on the given input particles.
|
||||
Execute the code of the given `graph` on the given input particles.
|
||||
|
||||
This is essentially shorthand for
|
||||
```julia
|
||||
compute_graph = get_compute_function(graph, process)
|
||||
result = compute_graph(particles)
|
||||
```
|
||||
|
||||
See also: [`parse_dag`](@ref), [`parse_process`](@ref), [`gen_process_input`](@ref)
|
||||
"""
|
||||
function execute(generated_code, input::Dict{ParticleType, Vector{Particle}})
|
||||
(code, inputSymbols, outputSymbol) = generated_code
|
||||
function execute(graph::DAG, process::AbstractProcessDescription, machine::Machine, input::AbstractProcessInput)
|
||||
(code, inputSymbols, outputSymbol) = gen_code(graph, machine)
|
||||
|
||||
assignInputs = Vector{Expr}()
|
||||
for (name, symbol) in inputSymbols
|
||||
type = nothing
|
||||
if startswith(name, "A")
|
||||
type = A
|
||||
elseif startswith(name, "B")
|
||||
type = B
|
||||
else
|
||||
type = C
|
||||
initCaches = gen_cache_init_code(machine)
|
||||
assignInputs = gen_input_assignment_code(inputSymbols, process, machine, :input)
|
||||
|
||||
|
||||
functionId = to_var_name(UUIDs.uuid1(rng[1]))
|
||||
resSym = eval(gen_access_expr(entry_device(machine), outputSymbol))
|
||||
expr = Meta.parse(
|
||||
"function compute_$(functionId)(input::AbstractProcessInput) $initCaches; $assignInputs; $code; return $resSym; end",
|
||||
)
|
||||
func = eval(expr)
|
||||
|
||||
result = 0
|
||||
try
|
||||
result = @eval $func($input)
|
||||
catch e
|
||||
println("Error while evaluating: $e")
|
||||
|
||||
# if we find a uuid in the exception we can color it in so it's easier to spot
|
||||
uuidRegex = r"[0-9a-f]{8}_[0-9a-f]{4}_[0-9a-f]{4}_[0-9a-f]{4}_[0-9a-f]{12}"
|
||||
m = match(uuidRegex, string(e))
|
||||
|
||||
functionStr = string(expr)
|
||||
if (isa(m, RegexMatch))
|
||||
functionStr = replace(functionStr, m.match => "\033[31m$(m.match)\033[0m")
|
||||
end
|
||||
index = parse(Int, name[2:end])
|
||||
|
||||
push!(
|
||||
assignInputs,
|
||||
Meta.parse(
|
||||
"$(symbol) = ParticleValue(Particle($(input[type][index]).P0, $(input[type][index]).P1, $(input[type][index]).P2, $(input[type][index]).P3, $(type)), 1.0)",
|
||||
),
|
||||
)
|
||||
println("Function:\n$functionStr")
|
||||
@assert false
|
||||
end
|
||||
|
||||
assignInputs = Expr(:block, assignInputs...)
|
||||
eval(assignInputs)
|
||||
eval(code)
|
||||
|
||||
eval(Meta.parse("result = $outputSymbol"))
|
||||
return result
|
||||
end
|
||||
|
||||
"""
|
||||
execute(graph::DAG, input::Dict{ParticleType, Vector{Particle}})
|
||||
|
||||
Execute the given `generated_code` (as returned by [`gen_code`](@ref)) on the given input particles.
|
||||
The input particles should be sorted correctly into the dictionary to their according [`ParticleType`](@ref)s.
|
||||
|
||||
See also: [`gen_particles`](@ref)
|
||||
"""
|
||||
function execute(graph::DAG, input::Dict{ParticleType, Vector{Particle}})
|
||||
(code, inputSymbols, outputSymbol) = gen_code(graph)
|
||||
|
||||
assignInputs = Vector{Expr}()
|
||||
for (name, symbol) in inputSymbols
|
||||
type = nothing
|
||||
if startswith(name, "A")
|
||||
type = A
|
||||
elseif startswith(name, "B")
|
||||
type = B
|
||||
else
|
||||
type = C
|
||||
end
|
||||
index = parse(Int, name[2:end])
|
||||
|
||||
push!(
|
||||
assignInputs,
|
||||
Meta.parse(
|
||||
"$(symbol) = ParticleValue(Particle($(input[type][index]).P0, $(input[type][index]).P1, $(input[type][index]).P2, $(input[type][index]).P3, $(type)), 1.0)",
|
||||
),
|
||||
)
|
||||
end
|
||||
|
||||
assignInputs = Expr(:block, assignInputs...)
|
||||
eval(assignInputs)
|
||||
eval(code)
|
||||
|
||||
eval(Meta.parse("result = $outputSymbol"))
|
||||
return result
|
||||
end
|
||||
|
Reference in New Issue
Block a user