From 71219f101ebb94c16a856ec2bedeb74a96e00f47 Mon Sep 17 00:00:00 2001
From: Anton Reinhard
Date: Mon, 4 Mar 2024 23:54:47 +0100
Subject: [PATCH] Add full node bench example

---
 examples/full_node_bench.jl     | 254 ++++++++++++++++++++++++++++++++
 experiments/full_node.sh        |  25 ++++
 experiments/full_node_hemera.sh |  25 ++++
 3 files changed, 304 insertions(+)
 create mode 100644 examples/full_node_bench.jl
 create mode 100755 experiments/full_node.sh
 create mode 100755 experiments/full_node_hemera.sh

diff --git a/examples/full_node_bench.jl b/examples/full_node_bench.jl
new file mode 100644
index 0000000..7fd4704
--- /dev/null
+++ b/examples/full_node_bench.jl
@@ -0,0 +1,254 @@
+using MetagraphOptimization
+using CUDA
+using UUIDs
+using DataFrames
+using CSV
+using Random
+using BenchmarkTools
+using Dates
+using Statistics
+
+using Base.Threads
+
+
+"""
+    log(x...)
+
+Print the current timestamp followed by the space-separated arguments and flush `stdout`.
+"""
+function log(x...)
+    println(now(), " ", join(x, " "))
+    return flush(stdout)
+end
+
+results_filename = "full_node_bench.csv"
+
+df = DataFrame(
+    process_name = String[],
+    cpu_threads = Int[],
+    gpu_devices = Int[],
+    n_inputs = Int[],
+    chunk_size = Int[],
+    time = Float64[],
+    std = Float64[],
+    rate = Float64[],
+    ratio_cpu = Float64[],
+    ratio_gpu = Float64[],
+)
+
+# if a results file already exists, load it so that new results are appended
+if isfile(results_filename)
+    df = CSV.read(results_filename, DataFrame)
+end
+
+nInputs = 100_000_000
+
+# guards the shared work counters below
+lck = SpinLock()
+
+# index of the next unclaimed input, and number of chunks claimed per worker kind
+progress = 1
+cpu_chunks = 0
+gpu_chunks = 0
+
+chunkSizes = [100, 1_000, 10_000, 50_000, 100_000]
+
+"""
+    cpu_worker(compute_func, inputs, chunk_size)
+
+Claim chunks of up to `chunk_size` inputs from the shared `progress` counter and call
+`compute_func` on every input of each claimed chunk, until no inputs are left.
+"""
+function cpu_worker(compute_func, inputs, chunk_size)
+    n = length(inputs)
+    while true
+        chunk = lock(lck) do
+            global progress, cpu_chunks
+            progress > n && return nothing
+            chunk_start = progress
+            progress = chunk_start + chunk_size
+            cpu_chunks = cpu_chunks + 1
+            # the next chunk starts at the new `progress`, so this one ends just before it
+            return chunk_start:min(progress - 1, n)
+        end
+        chunk === nothing && break
+
+        for i in chunk
+            compute_func(inputs[i])
+        end
+    end
+
+    return nothing
+end
+
+"""
+    gpu_worker(compute_func, inputs, chunk_size)
+
+GPU counterpart of `cpu_worker`: every claimed chunk is copied into a `CuVector` and
+`compute_func` is broadcast over it. Must be called with a specific device selected.
+"""
+function gpu_worker(compute_func, inputs, chunk_size)
+    n = length(inputs)
+    while true
+        chunk = lock(lck) do
+            global progress, gpu_chunks
+            progress > n && return nothing
+            chunk_start = progress
+            progress = chunk_start + chunk_size
+            gpu_chunks = gpu_chunks + 1
+            return chunk_start:min(progress - 1, n)
+        end
+        chunk === nothing && break
+
+        cuInputs = CuVector(inputs[chunk])
+        # wait for the kernel so that a chunk is finished before the next one is claimed
+        CUDA.@sync compute_func.(cuInputs)
+    end
+
+    return nothing
+end
+
+# (cpu_chunks, gpu_chunks) of every `full_compute` run since the last reset in `bench`
+cpu_gpu_ratio = Vector{Tuple{Int, Int}}()
+
+"""
+    full_compute(compute_func, inputs, chunk_size)
+
+Process all `inputs` once, using one GPU worker task per CUDA device and one CPU worker
+task per Julia thread, and record how many chunks each side claimed in `cpu_gpu_ratio`.
+"""
+function full_compute(compute_func, inputs, chunk_size)
+    global progress = 1
+    global cpu_chunks = 0
+    global gpu_chunks = 0
+
+    tasks = Task[]
+
+    # `Threads.@spawn` instead of `@task` + `schedule`: the latter creates sticky tasks,
+    # which stay on the thread that scheduled them
+    for dev in CUDA.devices()
+        t = Threads.@spawn begin
+            device!(dev) do
+                gpu_worker(compute_func, inputs, chunk_size)
+                return nothing
+            end
+        end
+        push!(tasks, t)
+    end
+
+    for i in 1:Threads.nthreads()
+        t = Threads.@spawn cpu_worker(compute_func, inputs, chunk_size)
+        push!(tasks, t)
+    end
+
+    foreach(wait, tasks)
+
+    push!(cpu_gpu_ratio, (cpu_chunks, gpu_chunks))
+    return nothing
+end
+
+"""
+    bench(compute_function, inputs, chunk_size)
+
+Benchmark `full_compute` and return `(time, rate, s, med_cpu_chunks, med_gpu_chunks)`:
+median time in seconds, inputs per second, standard deviation of the time in seconds,
+and the median number of chunks claimed by CPU and GPU workers per run.
+"""
+function bench(compute_function, inputs, chunk_size)
+    # forget runs of previous configurations so the medians only describe this one
+    empty!(cpu_gpu_ratio)
+
+    trial = @benchmark begin
+        full_compute($compute_function, $inputs, $chunk_size)
+    end gcsample = true seconds = 600
+
+    time = median(trial.times) / 1e9
+    s = std(trial.times) / 1e9
+    rate = length(inputs) / time
+
+    med_cpu_chunks = median(getindex.(cpu_gpu_ratio, 1))
+    med_gpu_chunks = median(getindex.(cpu_gpu_ratio, 2))
+
+    return (time, rate, s, med_cpu_chunks, med_gpu_chunks)
+end
+
+"""
+    full_node_bench(process, func, chunk_size)
+
+Generate `nInputs` inputs for `process`, benchmark `func` on them with the given
+`chunk_size` and append the result as a new row to the global `df`.
+"""
+function full_node_bench(process::MetagraphOptimization.AbstractProcessDescription, func, chunk_size)
+    process_name = string(process)
+    log("\n--- Benchmarking $(process_name) on $(nInputs) with chunk size $(chunk_size) ---")
+
+    log("Available Cuda Devices:")
+    foreach(display, CUDA.devices())
+    log("Generating $nInputs inputs with $(Threads.nthreads()) threads...")
+
+    inputs = Vector{typeof(gen_process_input(process))}()
+    resize!(inputs, nInputs)
+    processes = Vector{typeof(process)}()
+    for i in 1:Threads.nthreads()
+        push!(processes, copy(process))
+    end
+
+    # one task per process copy, each filling its own contiguous slice of `inputs`
+    slice_len = max(1, cld(length(inputs), length(processes)))
+    slices = Iterators.partition(eachindex(inputs), slice_len)
+    @sync for (proc, slice) in zip(processes, slices)
+        Threads.@spawn for i in slice
+            inputs[i] = gen_process_input(proc)
+        end
+    end
+
+    log("Benchmarking full node...")
+    (time, rate, s, med_cpu_chunks, med_gpu_chunks) = bench(func, inputs, chunk_size)
+
+    push!(
+        df,
+        Dict(
+            :process_name => process_name,
+            :cpu_threads => Threads.nthreads(),
+            :gpu_devices => length(CUDA.devices()),
+            :n_inputs => nInputs,
+            :chunk_size => chunk_size,
+            :time => time,
+            :std => s,
+            :rate => rate,
+            :ratio_cpu => med_cpu_chunks / (med_cpu_chunks + med_gpu_chunks),
+            :ratio_gpu => med_gpu_chunks / (med_cpu_chunks + med_gpu_chunks),
+        ),
+    )
+
+    return nothing
+end
+
+# use "mock" machine that only uses cpu for compilation
+machine = Machine(
+    [
+        MetagraphOptimization.NumaNode(
+            0,
+            1,
+            MetagraphOptimization.default_strategy(MetagraphOptimization.NumaNode),
+            -1.0,
+            UUIDs.uuid1(),
+        ),
+    ],
+    [-1.0;;],
+)
+
+optimizer = ReductionOptimizer()
+processes = ["ke->ke", "ke->kke", "ke->kkke", "ke->kkkke"]
+
+for proc in processes
+    process = parse_process(proc, QEDModel())
+    graph = gen_graph(process)
+    optimize_to_fixpoint!(optimizer, graph)
+    func_gen_time = @elapsed compute_func = get_compute_function(graph, process, machine)
+
+    for chunk_size in chunkSizes
+        full_node_bench(process, compute_func, chunk_size)
+        CSV.write(results_filename, df)
+    end
+end;
diff --git a/experiments/full_node.sh b/experiments/full_node.sh
new file mode 100755
index 0000000..05b10dc
--- /dev/null
+++ b/experiments/full_node.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+LOG_FILE="$SCRIPT_DIR/../julia_full_node.log"
+
+cd "$SCRIPT_DIR/.." || exit 1
+
+echo "Writing system info..."
+
+# collect some information of the used node and system
+mkdir -p results
+uname -a > results/system_full_node.txt
+julia --version > results/julia_full_node.txt
+lscpu > results/cpu_full_node.txt
+nvidia-smi > results/cuda_gpu_full_node.txt
+lsblk > results/storage_full_node.txt
+lspci > results/pci_full_node.txt
+
+#echo "Initiating julia..."
+#julia --threads=8 --project=./ -e 'using Pkg; Pkg.instantiate(); Pkg.add(url="https://github.com/QEDjl-project/QEDprocesses.jl/")' >> $LOG_FILE 2>&1 || exit 1 # need current dev version of QEDprocesses
+#julia --threads=8 -e 'using Pkg; Pkg.add("CSV"); Pkg.add("DataFrames"); Pkg.add("CUDA"); Pkg.add("Random"); Pkg.add("BenchmarkTools"); Pkg.add("Dates")' >> $LOG_FILE 2>&1 || exit 1 # add requirements for the bench script
+julia --project -e 'using CUDA; CUDA.set_runtime_version!(VersionNumber("12.1"))' >> "$LOG_FILE" 2>&1 || echo "Failed to set CUDA version number"
+
+echo "Benchmarking Reduction 128 Threads, *GPU*"
+julia --project --threads=128 examples/full_node_bench.jl >> "$LOG_FILE" 2>&1 || echo "-- Something went wrong, check logs --"
diff --git a/experiments/full_node_hemera.sh b/experiments/full_node_hemera.sh
new file mode 100755
index 0000000..679fe23
--- /dev/null
+++ b/experiments/full_node_hemera.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+#SBATCH --job-name=qed_bench
+#SBATCH --partition=casus_a100
+#SBATCH --account=casus
+#SBATCH --time=10:00:00
+#SBATCH --nodes=1
+#SBATCH --ntasks=1
+#SBATCH --cpus-per-task=128
+#SBATCH --gres=gpu:4
+#SBATCH --mem=256GB
+#SBATCH --output=simulation-%A-%a.out
+#SBATCH --error=simulation-%A-%a.err
+
+cd "$HOME/repos/metagraph_optimization" || exit 1
+
+module load singularity
+module load git
+module load cuda/12.1
+
+mkdir -p results
+printf "Current git commit hash: " > results/git_reduce_bench_gpu.txt
+git rev-parse HEAD >> results/git_reduce_bench_gpu.txt
+git status >> results/git_reduce_bench_gpu.txt
+
+singularity exec --nv experiments/CUDA_container.sif ./experiments/full_node.sh