Add full node bench example

examples/full_node_bench.jl (new file, 221 lines)
@@ -0,0 +1,221 @@
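# Full-node benchmark: distributes evaluation of a generated compute function
# over all CPU threads and all visible CUDA devices via a shared chunk counter,
# and records throughput per process and chunk size into a CSV file.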
using MetagraphOptimization
using CUDA
using UUIDs
using DataFrames
using CSV
using Random
using BenchmarkTools
using Dates
using Statistics # for median and std over the raw benchmark times

using Base.Threads

# timestamped print helper that flushes immediately (shadows Base.log here)
function log(x...)
    println(now(), " ", join(x, " "))
    return flush(stdout)
end

results_filename = "full_node_bench.csv"

# one row per benchmarked (process, chunk size) configuration
df = DataFrame(
    process_name = String[],
    cpu_threads = Int[],
    gpu_devices = Int[],
    n_inputs = Int[],
    chunk_size = Int[],
    time = Float64[],
    std = Float64[],
    rate = Float64[],
    ratio_cpu = Float64[],
    ratio_gpu = Float64[],
)

# if a results file already exists, read it in so new results are appended
if isfile(results_filename)
    df = CSV.read(results_filename, DataFrame)
end

nInputs = 100_000_000

# lock guarding the shared work-distribution state below
lck = SpinLock()

progress = 1
cpu_chunks = 0
gpu_chunks = 0

chunkSizes = [100, 1_000, 10_000, 50_000, 100_000]

function cpu_worker(compute_func, inputs, chunk_size)
    global progress
    global cpu_chunks
    quit = false
    work_start = 0
    work_end = 0
    while true
        # atomically claim the next chunk [work_start, work_end] of the inputs
        lock(lck) do
            if progress > nInputs
                quit = true
            else
                work_start = progress
                work_end = min(progress + chunk_size - 1, nInputs)
                progress = work_end + 1
                cpu_chunks += 1
            end
        end
        if quit
            break
        end

        for i in work_start:work_end
            compute_func(inputs[i])
        end
    end

    return nothing
end

# called from a task with a specific CUDA device already selected
function gpu_worker(compute_func, inputs, chunk_size)
    global progress
    global gpu_chunks
    quit = false
    work_start = 0
    work_end = 0
    while true
        lock(lck) do
            if progress > nInputs
                quit = true
            else
                work_start = progress
                work_end = min(progress + chunk_size - 1, nInputs)
                progress = work_end + 1
                gpu_chunks += 1
            end
        end
        if quit
            break
        end

        # upload the chunk, broadcast the compute function over it, and wait
        # for the kernel to finish before claiming the next chunk
        cuInputs = CuVector(inputs[work_start:work_end])
        CUDA.@sync compute_func.(cuInputs)
    end

    return nothing
end

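# Design note: both worker types pull chunks from the same shared counter, so
# the CPU/GPU work split adapts dynamically to whichever resource is faster.
# Larger chunks reduce lock contention; smaller ones improve load balancing.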
# one (cpu_chunks, gpu_chunks) tuple per full_compute call
cpu_gpu_ratio = Vector{Tuple{Int, Int}}()

function full_compute(compute_func, inputs, chunk_size)
    global progress = 1
    global cpu_chunks = 0
    global gpu_chunks = 0

    tasks = Vector{Task}()

    # one worker task per visible CUDA device
    for dev in CUDA.devices()
        t = Threads.@spawn begin
            device!(dev) do
                gpu_worker(compute_func, inputs, chunk_size)
            end
            nothing
        end
        push!(tasks, t)
    end

    # one worker task per CPU thread
    for _ in 1:Threads.nthreads()
        t = Threads.@spawn cpu_worker(compute_func, inputs, chunk_size)
        push!(tasks, t)
    end

    for t in tasks
        wait(t)
    end

    push!(cpu_gpu_ratio, (cpu_chunks, gpu_chunks))
    return nothing
end

function bench(compute_function, inputs, chunk_size)
    # drop samples from previous runs so the medians below reflect only this run
    empty!(cpu_gpu_ratio)

    # gcsample = true runs the GC before each sample; sample for up to 600 s
    b = @benchmark begin
        full_compute($compute_function, $inputs, $chunk_size)
    end gcsample = true seconds = 600

    # BenchmarkTools reports times in nanoseconds
    time = median(b.times) / 1e9
    s = std(b.times) / 1e9
    rate = length(inputs) / time

    med_cpu_chunks = median(getindex.(cpu_gpu_ratio, 1))
    med_gpu_chunks = median(getindex.(cpu_gpu_ratio, 2))

    return (time, rate, s, med_cpu_chunks, med_gpu_chunks)
end

function full_node_bench(process::MetagraphOptimization.AbstractProcessDescription, func, chunk_size)
    process_name = string(process)
    log("\n--- Benchmarking $(process_name) on $(nInputs) inputs with chunk size $(chunk_size) ---")

    log("Available CUDA Devices:")
    display.(CUDA.devices())
    log("Generating $nInputs inputs with $(Threads.nthreads()) threads...")

    inputs = Vector{typeof(gen_process_input(process))}(undef, nInputs)
    # give each thread its own copy of the process to avoid sharing state
    processes = [copy(process) for _ in 1:Threads.nthreads()]

    Threads.@threads for i in eachindex(inputs)
        inputs[i] = gen_process_input(processes[Threads.threadid()])
    end

    log("Benchmarking full node...")
    (time, rate, s, med_cpu_chunks, med_gpu_chunks) = bench(func, inputs, chunk_size)

    push!(
        df,
        Dict(
            :process_name => process_name,
            :cpu_threads => Threads.nthreads(),
            :gpu_devices => length(CUDA.devices()),
            :n_inputs => nInputs,
            :chunk_size => chunk_size,
            :time => time,
            :std => s,
            :rate => rate,
            :ratio_cpu => med_cpu_chunks / (med_cpu_chunks + med_gpu_chunks),
            :ratio_gpu => med_gpu_chunks / (med_cpu_chunks + med_gpu_chunks),
        ),
    )

    return nothing
end

# use "mock" machine that only uses cpu for compilation
|
||||
machine = Machine(
|
||||
[
|
||||
MetagraphOptimization.NumaNode(
|
||||
0,
|
||||
1,
|
||||
MetagraphOptimization.default_strategy(MetagraphOptimization.NumaNode),
|
||||
-1.0,
|
||||
UUIDs.uuid1(),
|
||||
),
|
||||
],
|
||||
[-1.0;;],
|
||||
)
|
||||
|
||||
optimizer = ReductionOptimizer()
|
||||
processes = ["ke->ke", "ke->kke", "ke->kkke", "ke->kkkke"]
|
||||
|
||||
for proc in processes
|
||||
process = parse_process(proc, QEDModel())
|
||||
graph = gen_graph(process)
|
||||
optimize_to_fixpoint!(optimizer, graph)
|
||||
func_gen_time = @elapsed compute_func = get_compute_function(graph, process, machine)
|
||||
|
||||
for chunk_size in chunkSizes
|
||||
full_node_bench(process, compute_func, chunk_size)
|
||||
CSV.write(results_filename, df)
|
||||
end
|
||||
end;
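
# To run this benchmark (a sketch; the exact project setup is an assumption,
# e.g. an environment providing MetagraphOptimization, CUDA, DataFrames, CSV
# and BenchmarkTools, with GPU visibility controlled by the caller):
#
#   JULIA_NUM_THREADS=16 julia --project examples/full_node_bench.jl
#
# Results accumulate in full_node_bench.csv, one row per process and chunk size.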