full node bench testing
This commit is contained in:
@ -12,7 +12,8 @@ using Base.Threads
|
||||
|
||||
function log(x...)
|
||||
println(now(), " ", join(x, " ")...)
|
||||
return flush(stdout)
|
||||
#flush(stdout)
|
||||
return nothing
|
||||
end
|
||||
|
||||
results_filename = "full_node_bench.csv"
|
||||
@ -26,8 +27,8 @@ df = DataFrame(
|
||||
time = Float64[],
|
||||
std = Float64[],
|
||||
rate = Float64[],
|
||||
ratio_cpu = Float64[],
|
||||
ratio_gpu = Float64[],
|
||||
cpu_chunks = Float64[],
|
||||
gpu_chunks = Float64[],
|
||||
)
|
||||
|
||||
# if they exist, read existing results and append new ones
|
||||
@ -35,19 +36,20 @@ if isfile(results_filename)
|
||||
df = CSV.read(results_filename, DataFrame)
|
||||
end
|
||||
|
||||
nInputs = 100_000_000
|
||||
nInputs = 1_073_741_824 # 2^30
|
||||
|
||||
lck = SpinLock()
|
||||
lck = ReentrantLock()
|
||||
|
||||
progress = 1
|
||||
cpu_chunks = 0
|
||||
gpu_chunks = 0
|
||||
|
||||
chunkSizes = [100, 1_000, 10_000, 50_000, 100_000]
|
||||
chunkSizes = [1024, 4096, 16384, 65536, 262144, 1048576] # 2^10 to 2^20
|
||||
|
||||
function cpu_worker(compute_func, inputs, chunk_size)
|
||||
global progress
|
||||
global cpu_chunks
|
||||
global lck
|
||||
quit = false
|
||||
work_start = 0
|
||||
work_end = 0
|
||||
@ -58,8 +60,9 @@ function cpu_worker(compute_func, inputs, chunk_size)
|
||||
else
|
||||
work_start = progress
|
||||
progress = progress + chunk_size
|
||||
work_end = min(progress, nInputs)
|
||||
work_end = min(progress - 1, nInputs)
|
||||
cpu_chunks = cpu_chunks + 1
|
||||
#log("CPU Worker $(Threads.threadid()) computing $(cpu_chunks)th cpu chunk ($work_start, $work_end)")
|
||||
end
|
||||
end
|
||||
if quit
|
||||
@ -71,6 +74,8 @@ function cpu_worker(compute_func, inputs, chunk_size)
|
||||
end
|
||||
end
|
||||
|
||||
#log("CPU Worker on $(Threads.threadid()) finished!")
|
||||
|
||||
return nothing
|
||||
end
|
||||
|
||||
@ -78,6 +83,7 @@ end
|
||||
function gpu_worker(compute_func, inputs, chunk_size)
|
||||
global progress
|
||||
global gpu_chunks
|
||||
global lck
|
||||
quit = false
|
||||
work_start = 0
|
||||
work_end = 0
|
||||
@ -88,8 +94,9 @@ function gpu_worker(compute_func, inputs, chunk_size)
|
||||
else
|
||||
work_start = progress
|
||||
progress = progress + chunk_size
|
||||
work_end = min(progress, nInputs)
|
||||
work_end = min(progress - 1, nInputs)
|
||||
gpu_chunks = gpu_chunks + 1
|
||||
#log("GPU Worker $(CUDA.device()) computing $(gpu_chunks)th gpu chunk ($work_start, $work_end)")
|
||||
end
|
||||
end
|
||||
if quit
|
||||
@ -100,30 +107,33 @@ function gpu_worker(compute_func, inputs, chunk_size)
|
||||
compute_func.(cuInputs)
|
||||
end
|
||||
|
||||
#log("GPU Worker on Device $(CUDA.device()) finished!")
|
||||
|
||||
return nothing
|
||||
end
|
||||
|
||||
cpu_gpu_ratio = Vector{Tuple{Int, Int}}()
|
||||
|
||||
function full_compute(compute_func, inputs, chunk_size)
|
||||
global progress = 1
|
||||
global cpu_chunks = 0
|
||||
global gpu_chunks = 0
|
||||
global progress
|
||||
progress = 1
|
||||
global cpu_chunks
|
||||
cpu_chunks = 0
|
||||
global gpu_chunks
|
||||
gpu_chunks = 0
|
||||
|
||||
tasks = Vector()
|
||||
|
||||
for dev in CUDA.devices()
|
||||
t = @task device!(dev) do
|
||||
t = Threads.@spawn device!(dev) do
|
||||
gpu_worker(compute_func, inputs, chunk_size)
|
||||
return nothing
|
||||
end
|
||||
schedule(t)
|
||||
push!(tasks, t)
|
||||
end
|
||||
|
||||
for i in 1:Threads.nthreads()
|
||||
t = @task cpu_worker(compute_func, inputs, chunk_size)
|
||||
schedule(t)
|
||||
for i in 1:(Threads.nthreads() - length(CUDA.devices()))
|
||||
t = Threads.@spawn cpu_worker(compute_func, inputs, chunk_size)
|
||||
push!(tasks, t)
|
||||
end
|
||||
|
||||
@ -136,6 +146,9 @@ function full_compute(compute_func, inputs, chunk_size)
|
||||
end
|
||||
|
||||
function bench(compute_function, inputs, chunk_size)
|
||||
global cpu_gpu_ratio
|
||||
empty!(cpu_gpu_ratio)
|
||||
|
||||
bench = @benchmark begin
|
||||
full_compute($compute_function, $inputs, $chunk_size)
|
||||
end gcsample = true seconds = 600
|
||||
@ -147,44 +160,37 @@ function bench(compute_function, inputs, chunk_size)
|
||||
med_cpu_chunks = median(getindex.(cpu_gpu_ratio, 1))
|
||||
med_gpu_chunks = median(getindex.(cpu_gpu_ratio, 2))
|
||||
|
||||
log("CPU/GPU ratios: $(cpu_gpu_ratio)")
|
||||
|
||||
return (time, rate, s, med_cpu_chunks, med_gpu_chunks)
|
||||
end
|
||||
|
||||
function full_node_bench(process::MetagraphOptimization.AbstractProcessDescription, func, chunk_size)
|
||||
function full_node_bench(process::MetagraphOptimization.AbstractProcessDescription, func, chunk_size, inputs)
|
||||
process_name = string(process)
|
||||
log("\n--- Benchmarking $(process_name) on $(nInputs) with chunk size $(chunk_size) ---")
|
||||
|
||||
log("Available Cuda Devices:")
|
||||
display.(CUDA.devices())
|
||||
log("Generating $nInputs inputs with $(Threads.nthreads()) threads...")
|
||||
|
||||
inputs = Vector{typeof(gen_process_input(process))}()
|
||||
resize!(inputs, nInputs)
|
||||
processes = Vector{typeof(process)}()
|
||||
for i in 1:Threads.nthreads()
|
||||
push!(processes, copy(process))
|
||||
end
|
||||
|
||||
@inbounds Threads.@threads for i in eachindex(inputs)
|
||||
inputs[i] = gen_process_input(processes[Threads.nthreads()])
|
||||
end
|
||||
|
||||
log("Benchmarking full node...")
|
||||
(time, rate, s, med_cpu_chunks, med_gpu_chunks) = bench(func, inputs, chunk_size)
|
||||
log(
|
||||
"Benchmarking complete with median time $(time), $(med_cpu_chunks) cpu chunks, and $(med_gpu_chunks) gpu chunks.",
|
||||
)
|
||||
|
||||
push!(
|
||||
df,
|
||||
Dict(
|
||||
:process_name => process_name,
|
||||
:cpu_threads => Threads.nthreads(),
|
||||
:cpu_threads => Threads.nthreads() - length(CUDA.devices()),
|
||||
:gpu_devices => length(CUDA.devices()),
|
||||
:n_inputs => nInputs,
|
||||
:chunk_size => chunk_size,
|
||||
:time => time,
|
||||
:std => s,
|
||||
:rate => rate,
|
||||
:ratio_cpu => med_cpu_chunks / (med_cpu_chunks + med_gpu_chunks),
|
||||
:ratio_gpu => med_gpu_chunks / (med_cpu_chunks + med_gpu_chunks),
|
||||
:cpu_chunks => med_cpu_chunks,
|
||||
:gpu_chunks => med_gpu_chunks,
|
||||
),
|
||||
)
|
||||
|
||||
@ -206,7 +212,7 @@ machine = Machine(
|
||||
)
|
||||
|
||||
optimizer = ReductionOptimizer()
|
||||
processes = ["ke->ke", "ke->kke", "ke->kkke", "ke->kkkke"]
|
||||
processes = ["ke->ke", "ke->kke", "ke->kkke", "ke->kkkke", "ke->kkkkke"]
|
||||
|
||||
for proc in processes
|
||||
process = parse_process(proc, QEDModel())
|
||||
@ -214,8 +220,21 @@ for proc in processes
|
||||
optimize_to_fixpoint!(optimizer, graph)
|
||||
func_gen_time = @elapsed compute_func = get_compute_function(graph, process, machine)
|
||||
|
||||
|
||||
log("Generating $nInputs inputs with $(Threads.nthreads()) threads...")
|
||||
inputs = Vector{typeof(gen_process_input(process))}()
|
||||
resize!(inputs, nInputs)
|
||||
procs = Vector{typeof(process)}()
|
||||
for i in 1:Threads.nthreads()
|
||||
push!(procs, copy(process))
|
||||
end
|
||||
|
||||
@inbounds Threads.@threads for i in eachindex(inputs)
|
||||
inputs[i] = gen_process_input(procs[Threads.nthreads()])
|
||||
end
|
||||
|
||||
for chunk_size in chunkSizes
|
||||
full_node_bench(process, compute_func, chunk_size)
|
||||
full_node_bench(process, compute_func, chunk_size, inputs)
|
||||
CSV.write(results_filename, df)
|
||||
end
|
||||
end;
|
||||
|
Reference in New Issue
Block a user