Shuffle files and functions around for more consistent naming and smaller files

This commit is contained in:
Anton Reinhard 2023-08-24 15:11:54 +02:00
parent 1b4030d633
commit 0f5f475cb4
34 changed files with 573 additions and 571 deletions

View File

@ -21,29 +21,45 @@ import Base.insert!
import Base.collect
include("tasks.jl")
include("nodes.jl")
include("graph.jl")
include("task/type.jl")
include("node/type.jl")
include("diff/type.jl")
include("operation/type.jl")
include("graph/type.jl")
include("trie.jl")
include("utility.jl")
include("task_functions.jl")
include("node_functions.jl")
include("graph_functions.jl")
include("diff/print.jl")
include("diff/properties.jl")
include("operations/utility.jl")
include("operations/apply.jl")
include("operations/clean.jl")
include("operations/find.jl")
include("operations/get.jl")
include("operations/print.jl")
include("operations/validate.jl")
include("graph/compare.jl")
include("graph/interface.jl")
include("graph/mute.jl")
include("graph/print.jl")
include("graph/properties.jl")
include("graph/validate.jl")
include("graph_interface.jl")
include("node/compare.jl")
include("node/create.jl")
include("node/print.jl")
include("node/properties.jl")
include("node/validate.jl")
include("abc_model/tasks.jl")
include("abc_model/task_functions.jl")
include("abc_model/parse.jl")
include("operation/utility.jl")
include("operation/apply.jl")
include("operation/clean.jl")
include("operation/find.jl")
include("operation/get.jl")
include("operation/print.jl")
include("operation/validate.jl")
include("task/compare.jl")
include("task/print.jl")
include("task/properties.jl")
include("models/abc/types.jl")
include("models/abc/properties.jl")
include("models/abc/parse.jl")
end # module MetagraphOptimization

6
src/diff/print.jl Normal file
View File

@ -0,0 +1,6 @@
function show(io::IO, diff::Diff)
print(io, "Nodes: ")
print(io, length(diff.addedNodes) + length(diff.removedNodes))
print(io, " Edges: ")
print(io, length(diff.addedEdges) + length(diff.removedEdges))
end

9
src/diff/properties.jl Normal file
View File

@ -0,0 +1,9 @@
# return a namedtuple of the lengths of the added/removed nodes/edges
function length(diff::Diff)
return (
addedNodes = length(diff.addedNodes),
removedNodes = length(diff.removedNodes),
addedEdges = length(diff.addedEdges),
removedEdges = length(diff.removedEdges)
)
end

13
src/diff/type.jl Normal file
View File

@ -0,0 +1,13 @@
const Diff = NamedTuple{
(:addedNodes, :removedNodes, :addedEdges, :removedEdges),
Tuple{Vector{Node}, Vector{Node}, Vector{Edge}, Vector{Edge}}
}
function Diff()
return (
addedNodes = Vector{Node}(),
removedNodes = Vector{Node}(),
addedEdges = Vector{Edge}(),
removedEdges = Vector{Edge}()
)::Diff
end

14
src/graph/compare.jl Normal file
View File

@ -0,0 +1,14 @@
in(node::Node, graph::DAG) = node in graph.nodes
in(edge::Edge, graph::DAG) = edge in graph.edges
function ==(n1::Node, n2::Node, g::DAG)
if typeof(n1) != typeof(n2)
return false
end
if !(n1 in g) || !(n2 in g)
return false
end
return n1.task == n2.task && children(n1) == children(n2)
end

View File

@ -19,6 +19,7 @@ function pop_operation!(graph::DAG)
else
error("No more operations to pop!")
end
return nothing
end

162
src/graph/mute.jl Normal file
View File

@ -0,0 +1,162 @@
# for graph mutating functions we need to do a few things
# 1: mute the graph (duh)
# 2: keep track of what was changed for the diff (if track == true)
# 3: invalidate operation caches
function insert_node!(graph::DAG, node::Node, track=true, invalidate_cache=true)
# 1: mute
push!(graph.nodes, node)
# 2: keep track
if (track) push!(graph.diff.addedNodes, node) end
# 3: invalidate caches
if (!invalidate_cache) return node end
push!(graph.dirtyNodes, node)
return node
end
function insert_edge!(graph::DAG, node1::Node, node2::Node, track=true, invalidate_cache=true)
# @assert (node2 ∉ node1.parents) && (node1 ∉ node2.children) "Edge to insert already exists"
# 1: mute
# edge points from child to parent
push!(node1.parents, node2)
push!(node2.children, node1)
# 2: keep track
if (track) push!(graph.diff.addedEdges, make_edge(node1, node2)) end
# 3: invalidate caches
if (!invalidate_cache) return nothing end
invalidate_operation_caches!(graph, node1)
invalidate_operation_caches!(graph, node2)
push!(graph.dirtyNodes, node1)
push!(graph.dirtyNodes, node2)
return nothing
end
function remove_node!(graph::DAG, node::Node, track=true, invalidate_cache=true)
# @assert node in graph.nodes "Trying to remove a node that's not in the graph"
# 1: mute
delete!(graph.nodes, node)
# 2: keep track
if (track) push!(graph.diff.removedNodes, node) end
# 3: invalidate caches
if (!invalidate_cache) return nothing end
invalidate_operation_caches!(graph, node)
delete!(graph.dirtyNodes, node)
return nothing
end
function remove_edge!(graph::DAG, node1::Node, node2::Node, track=true, invalidate_cache=true)
# 1: mute
pre_length1 = length(node1.parents)
pre_length2 = length(node2.children)
filter!(x -> x != node2, node1.parents)
filter!(x -> x != node1, node2.children)
#=@assert begin
removed = pre_length1 - length(node1.parents)
removed <= 1
end "removed more than one node from node1's parents"=#
#=@assert begin
removed = pre_length2 - length(node2.children)
removed <= 1
end "removed more than one node from node2's children"=#
# 2: keep track
if (track) push!(graph.diff.removedEdges, make_edge(node1, node2)) end
# 3: invalidate caches
if (!invalidate_cache) return nothing end
invalidate_operation_caches!(graph, node1)
invalidate_operation_caches!(graph, node2)
if (node1 in graph)
push!(graph.dirtyNodes, node1)
end
if (node2 in graph)
push!(graph.dirtyNodes, node2)
end
return nothing
end
# return the graph "difference" since last time this function was called
function get_snapshot_diff(graph::DAG)
return swapfield!(graph, :diff, Diff())
end
# function to invalidate the operation caches for a given NodeFusion
function invalidate_caches!(graph::DAG, operation::NodeFusion)
delete!(graph.possibleOperations, operation)
# delete the operation from all caches of nodes involved in the operation
filter!(!=(operation), operation.input[1].nodeFusions)
filter!(!=(operation), operation.input[3].nodeFusions)
operation.input[2].nodeFusion = missing
return nothing
end
# function to invalidate the operation caches for a given NodeReduction
function invalidate_caches!(graph::DAG, operation::NodeReduction)
delete!(graph.possibleOperations, operation)
for node in operation.input
node.nodeReduction = missing
end
return nothing
end
# function to invalidate the operation caches for a given NodeSplit
function invalidate_caches!(graph::DAG, operation::NodeSplit)
delete!(graph.possibleOperations, operation)
# delete the operation from all caches of nodes involved in the operation
# for node split there is only one node
operation.input.nodeSplit = missing
return nothing
end
# function to invalidate the operation caches of a ComputeTaskNode
function invalidate_operation_caches!(graph::DAG, node::ComputeTaskNode)
if !ismissing(node.nodeReduction)
invalidate_caches!(graph, node.nodeReduction)
end
if !ismissing(node.nodeSplit)
invalidate_caches!(graph, node.nodeSplit)
end
while !isempty(node.nodeFusions)
invalidate_caches!(graph, pop!(node.nodeFusions))
end
return nothing
end
# function to invalidate the operation caches of a DataTaskNode
function invalidate_operation_caches!(graph::DAG, node::DataTaskNode)
if !ismissing(node.nodeReduction)
invalidate_caches!(graph, node.nodeReduction)
end
if !ismissing(node.nodeSplit)
invalidate_caches!(graph, node.nodeSplit)
end
if !ismissing(node.nodeFusion)
invalidate_caches!(graph, node.nodeFusion)
end
return nothing
end

55
src/graph/print.jl Normal file
View File

@ -0,0 +1,55 @@
function show_nodes(io, graph::DAG)
print(io, "[")
first = true
for n in graph.nodes
if first
first = false
else
print(io, ", ")
end
print(io, n)
end
print(io, "]")
end
function show(io::IO, graph::DAG)
println(io, "Graph:")
print(io, " Nodes: ")
nodeDict = Dict{Type, Int64}()
noEdges = 0
for node in graph.nodes
if haskey(nodeDict, typeof(node.task))
nodeDict[typeof(node.task)] = nodeDict[typeof(node.task)] + 1
else
nodeDict[typeof(node.task)] = 1
end
noEdges += length(parents(node))
end
if length(graph.nodes) <= 20
show_nodes(io, graph)
else
print("Total: ", length(graph.nodes), ", ")
first = true
i = 0
for (type, number) in zip(keys(nodeDict), values(nodeDict))
i += 1
if first
first = false
else
print(", ")
end
if (i % 3 == 0)
print("\n ")
end
print(type, ": ", number)
end
end
println(io)
println(io, " Edges: ", noEdges)
properties = graph_properties(graph)
println(io, " Total Compute Effort: ", properties.compute_effort)
println(io, " Total Data Transfer: ", properties.data)
println(io, " Total Compute Intensity: ", properties.compute_intensity)
end

31
src/graph/properties.jl Normal file
View File

@ -0,0 +1,31 @@
function graph_properties(graph::DAG)
# make sure the graph is fully generated
apply_all!(graph)
d = 0
ce = 0
ed = 0
for node in graph.nodes
d += data(node.task) * length(node.parents)
ce += compute_effort(node.task)
ed += length(node.parents)
end
ci = ce / d
result = (data = d,
compute_effort = ce,
compute_intensity = ci,
nodes = length(graph.nodes),
edges = ed)
return result
end
function get_exit_node(graph::DAG)
for node in graph.nodes
if (is_exit_node(node))
return node
end
end
@assert false "The given graph has no exit node! It is either empty or not acyclic!"
end

View File

@ -1,54 +1,5 @@
using DataStructures
const Diff = NamedTuple{
(:addedNodes, :removedNodes, :addedEdges, :removedEdges),
Tuple{Vector{Node}, Vector{Node}, Vector{Edge}, Vector{Edge}}
}
function Diff()
return (
addedNodes = Vector{Node}(),
removedNodes = Vector{Node}(),
addedEdges = Vector{Edge}(),
removedEdges = Vector{Edge}()
)::Diff
end
# An abstract base class for operations
# an operation can be applied to a DAG
abstract type Operation end
# An abstract base class for already applied operations
# an applied operation can be reversed iff it is the last applied operation on the DAG
abstract type AppliedOperation end
struct NodeFusion <: Operation
input::Tuple{ComputeTaskNode, DataTaskNode, ComputeTaskNode}
end
struct AppliedNodeFusion <: AppliedOperation
operation::NodeFusion
diff::Diff
end
struct NodeReduction <: Operation
input::Vector{Node}
end
struct AppliedNodeReduction <: AppliedOperation
operation::NodeReduction
diff::Diff
end
struct NodeSplit <: Operation
input::Node
end
struct AppliedNodeSplit <: AppliedOperation
operation::NodeSplit
diff::Diff
end
mutable struct PossibleOperations
nodeFusions::Set{NodeFusion}
nodeReductions::Set{NodeReduction}

52
src/graph/validate.jl Normal file
View File

@ -0,0 +1,52 @@
# check whether the given graph is connected
function is_connected(graph::DAG)
nodeQueue = Deque{Node}()
push!(nodeQueue, get_exit_node(graph))
seenNodes = Set{Node}()
while !isempty(nodeQueue)
current = pop!(nodeQueue)
push!(seenNodes, current)
for child in current.children
push!(nodeQueue, child)
end
end
return length(seenNodes) == length(graph.nodes)
end
function is_valid(graph::DAG)
for node in graph.nodes
@assert is_valid(graph, node)
end
for op in graph.operationsToApply
@assert is_valid(graph, op)
end
for nr in graph.possibleOperations.nodeReductions
@assert is_valid(graph, nr)
end
for ns in graph.possibleOperations.nodeSplits
@assert is_valid(graph, ns)
end
for nf in graph.possibleOperations.nodeFusions
@assert is_valid(graph, nf)
end
for node in graph.dirtyNodes
@assert node in graph "Dirty Node is not part of the graph!"
@assert ismissing(node.nodeReduction) "Dirty Node has a NodeReduction!"
@assert ismissing(node.nodeSplit) "Dirty Node has a NodeSplit!"
if (typeof(node) <: DataTaskNode)
@assert ismissing(node.nodeFusion) "Dirty DataTaskNode has a Node Fusion!"
elseif (typeof(node) <: ComputeTaskNode)
@assert isempty(node.nodeFusions) "Dirty ComputeTaskNode has Node Fusions!"
end
end
@assert is_connected(graph) "Graph is not connected!"
return true
end

View File

@ -1,389 +0,0 @@
using DataStructures
in(node::Node, graph::DAG) = node in graph.nodes
in(edge::Edge, graph::DAG) = edge in graph.edges
function is_parent(potential_parent, node)
return potential_parent in node.parents
end
function is_child(potential_child, node)
return potential_child in node.children
end
function ==(n1::Node, n2::Node, g::DAG)
if typeof(n1) != typeof(n2)
return false
end
if !(n1 in g) || !(n2 in g)
return false
end
return n1.task == n2.task && children(n1) == children(n2)
end
# children = prerequisite nodes, nodes that need to execute before the task, edges point into this task
function children(node::Node)
return copy(node.children)
end
# parents = subsequent nodes, nodes that need this node to execute, edges point from this task
function parents(node::Node)
return copy(node.parents)
end
# siblings = all children of any parents, no duplicates, includes the node itself
function siblings(node::Node)
result = Set{Node}()
push!(result, node)
for parent in node.parents
union!(result, parent.children)
end
return result
end
# partners = all parents of any children, no duplicates, includes the node itself
function partners(node::Node)
result = Set{Node}()
push!(result, node)
for child in node.children
union!(result, child.parents)
end
return result
end
# alternative version to partners(Node), avoiding allocation of a new set
# works on the given set and returns nothing
function partners(node::Node, set::Set{Node})
push!(set, node)
for child in node.children
union!(set, child.parents)
end
return nothing
end
is_entry_node(node::Node) = length(node.children) == 0
is_exit_node(node::Node) = length(node.parents) == 0
# function to invalidate the operation caches for a given NodeFusion
function invalidate_caches!(graph::DAG, operation::NodeFusion)
delete!(graph.possibleOperations, operation)
# delete the operation from all caches of nodes involved in the operation
filter!(!=(operation), operation.input[1].nodeFusions)
filter!(!=(operation), operation.input[3].nodeFusions)
operation.input[2].nodeFusion = missing
return nothing
end
# function to invalidate the operation caches for a given NodeReduction
function invalidate_caches!(graph::DAG, operation::NodeReduction)
delete!(graph.possibleOperations, operation)
for node in operation.input
node.nodeReduction = missing
end
return nothing
end
# function to invalidate the operation caches for a given NodeSplit
function invalidate_caches!(graph::DAG, operation::NodeSplit)
delete!(graph.possibleOperations, operation)
# delete the operation from all caches of nodes involved in the operation
# for node split there is only one node
operation.input.nodeSplit = missing
return nothing
end
# function to invalidate the operation caches of a ComputeTaskNode
function invalidate_operation_caches!(graph::DAG, node::ComputeTaskNode)
if !ismissing(node.nodeReduction)
invalidate_caches!(graph, node.nodeReduction)
end
if !ismissing(node.nodeSplit)
invalidate_caches!(graph, node.nodeSplit)
end
while !isempty(node.nodeFusions)
invalidate_caches!(graph, pop!(node.nodeFusions))
end
return nothing
end
# function to invalidate the operation caches of a DataTaskNode
function invalidate_operation_caches!(graph::DAG, node::DataTaskNode)
if !ismissing(node.nodeReduction)
invalidate_caches!(graph, node.nodeReduction)
end
if !ismissing(node.nodeSplit)
invalidate_caches!(graph, node.nodeSplit)
end
if !ismissing(node.nodeFusion)
invalidate_caches!(graph, node.nodeFusion)
end
return nothing
end
# for graph mutating functions we need to do a few things
# 1: mute the graph (duh)
# 2: keep track of what was changed for the diff (if track == true)
# 3: invalidate operation caches
function insert_node!(graph::DAG, node::Node, track=true, invalidate_cache=true)
# 1: mute
push!(graph.nodes, node)
# 2: keep track
if (track) push!(graph.diff.addedNodes, node) end
# 3: invalidate caches
if (!invalidate_cache) return node end
push!(graph.dirtyNodes, node)
return node
end
function insert_edge!(graph::DAG, node1::Node, node2::Node, track=true, invalidate_cache=true)
# @assert (node2 ∉ node1.parents) && (node1 ∉ node2.children) "Edge to insert already exists"
# 1: mute
# edge points from child to parent
push!(node1.parents, node2)
push!(node2.children, node1)
# 2: keep track
if (track) push!(graph.diff.addedEdges, make_edge(node1, node2)) end
# 3: invalidate caches
if (!invalidate_cache) return nothing end
invalidate_operation_caches!(graph, node1)
invalidate_operation_caches!(graph, node2)
push!(graph.dirtyNodes, node1)
push!(graph.dirtyNodes, node2)
return nothing
end
function remove_node!(graph::DAG, node::Node, track=true, invalidate_cache=true)
# @assert node in graph.nodes "Trying to remove a node that's not in the graph"
# 1: mute
delete!(graph.nodes, node)
# 2: keep track
if (track) push!(graph.diff.removedNodes, node) end
# 3: invalidate caches
if (!invalidate_cache) return nothing end
invalidate_operation_caches!(graph, node)
delete!(graph.dirtyNodes, node)
return nothing
end
function remove_edge!(graph::DAG, node1::Node, node2::Node, track=true, invalidate_cache=true)
# 1: mute
pre_length1 = length(node1.parents)
pre_length2 = length(node2.children)
filter!(x -> x != node2, node1.parents)
filter!(x -> x != node1, node2.children)
#=@assert begin
removed = pre_length1 - length(node1.parents)
removed <= 1
end "removed more than one node from node1's parents"=#
#=@assert begin
removed = pre_length2 - length(node2.children)
removed <= 1
end "removed more than one node from node2's children"=#
# 2: keep track
if (track) push!(graph.diff.removedEdges, make_edge(node1, node2)) end
# 3: invalidate caches
if (!invalidate_cache) return nothing end
invalidate_operation_caches!(graph, node1)
invalidate_operation_caches!(graph, node2)
if (node1 in graph)
push!(graph.dirtyNodes, node1)
end
if (node2 in graph)
push!(graph.dirtyNodes, node2)
end
return nothing
end
# return the graph "difference" since last time this function was called
function get_snapshot_diff(graph::DAG)
return swapfield!(graph, :diff, Diff())
end
function graph_properties(graph::DAG)
# make sure the graph is fully generated
apply_all!(graph)
d = 0
ce = 0
ed = 0
for node in graph.nodes
d += data(node.task) * length(node.parents)
ce += compute_effort(node.task)
ed += length(node.parents)
end
ci = ce / d
result = (data = d,
compute_effort = ce,
compute_intensity = ci,
nodes = length(graph.nodes),
edges = ed)
return result
end
function get_exit_node(graph::DAG)
for node in graph.nodes
if (is_exit_node(node))
return node
end
end
@assert false "The given graph has no exit node! It is either empty or not acyclic!"
end
# check whether the given graph is connected
function is_connected(graph::DAG)
nodeQueue = Deque{Node}()
push!(nodeQueue, get_exit_node(graph))
seenNodes = Set{Node}()
while !isempty(nodeQueue)
current = pop!(nodeQueue)
push!(seenNodes, current)
for child in current.children
push!(nodeQueue, child)
end
end
return length(seenNodes) == length(graph.nodes)
end
function show_nodes(io, graph::DAG)
print(io, "[")
first = true
for n in graph.nodes
if first
first = false
else
print(io, ", ")
end
print(io, n)
end
print(io, "]")
end
function show(io::IO, graph::DAG)
println(io, "Graph:")
print(io, " Nodes: ")
nodeDict = Dict{Type, Int64}()
noEdges = 0
for node in graph.nodes
if haskey(nodeDict, typeof(node.task))
nodeDict[typeof(node.task)] = nodeDict[typeof(node.task)] + 1
else
nodeDict[typeof(node.task)] = 1
end
noEdges += length(parents(node))
end
if length(graph.nodes) <= 20
show_nodes(io, graph)
else
print("Total: ", length(graph.nodes), ", ")
first = true
i = 0
for (type, number) in zip(keys(nodeDict), values(nodeDict))
i += 1
if first
first = false
else
print(", ")
end
if (i % 3 == 0)
print("\n ")
end
print(type, ": ", number)
end
end
println(io)
println(io, " Edges: ", noEdges)
properties = graph_properties(graph)
println(io, " Total Compute Effort: ", properties.compute_effort)
println(io, " Total Data Transfer: ", properties.data)
println(io, " Total Compute Intensity: ", properties.compute_intensity)
end
function show(io::IO, diff::Diff)
print(io, "Nodes: ")
print(io, length(diff.addedNodes) + length(diff.removedNodes))
print(io, " Edges: ")
print(io, length(diff.addedEdges) + length(diff.removedEdges))
end
# return a namedtuple of the lengths of the added/removed nodes/edges
function length(diff::Diff)
return (
addedNodes = length(diff.addedNodes),
removedNodes = length(diff.removedNodes),
addedEdges = length(diff.addedEdges),
removedEdges = length(diff.removedEdges)
)
end
function is_valid(graph::DAG)
for node in graph.nodes
@assert is_valid(graph, node)
end
for op in graph.operationsToApply
@assert is_valid(graph, op)
end
for nr in graph.possibleOperations.nodeReductions
@assert is_valid(graph, nr)
end
for ns in graph.possibleOperations.nodeSplits
@assert is_valid(graph, ns)
end
for nf in graph.possibleOperations.nodeFusions
@assert is_valid(graph, nf)
end
for node in graph.dirtyNodes
@assert node in graph "Dirty Node is not part of the graph!"
@assert ismissing(node.nodeReduction) "Dirty Node has a NodeReduction!"
@assert ismissing(node.nodeSplit) "Dirty Node has a NodeSplit!"
if (typeof(node) <: DataTaskNode)
@assert ismissing(node.nodeFusion) "Dirty DataTaskNode has a Node Fusion!"
elseif (typeof(node) <: ComputeTaskNode)
@assert isempty(node.nodeFusions) "Dirty ComputeTaskNode has Node Fusions!"
end
end
@assert is_connected(graph) "Graph is not connected!"
return true
end

15
src/node/compare.jl Normal file
View File

@ -0,0 +1,15 @@
function ==(e1::Edge, e2::Edge)
return e1.edge[1] == e2.edge[1] && e1.edge[2] == e2.edge[2]
end
function ==(n1::Node, n2::Node)
return false
end
function ==(n1::ComputeTaskNode, n2::ComputeTaskNode)
return n1.id == n2.id
end
function ==(n1::DataTaskNode, n2::DataTaskNode)
return n1.id == n2.id
end

23
src/node/create.jl Normal file
View File

@ -0,0 +1,23 @@
function make_node(t::AbstractTask)
error("Cannot make a node from this task type")
end
function make_node(t::AbstractDataTask)
return DataTaskNode(t)
end
function make_node(t::AbstractComputeTask)
return ComputeTaskNode(t)
end
function make_edge(n1::Node, n2::Node)
error("Can only create edges from compute to data node or reverse")
end
function make_edge(n1::ComputeTaskNode, n2::DataTaskNode)
return Edge((n1, n2))
end
function make_edge(n1::DataTaskNode, n2::ComputeTaskNode)
return Edge((n1, n2))
end

7
src/node/print.jl Normal file
View File

@ -0,0 +1,7 @@
function show(io::IO, n::Node)
print(io, "Node(", n.task, ")")
end
function show(io::IO, e::Edge)
print(io, "Edge(", e.edge[1], ", ", e.edge[2], ")")
end

52
src/node/properties.jl Normal file
View File

@ -0,0 +1,52 @@
is_entry_node(node::Node) = length(node.children) == 0
is_exit_node(node::Node) = length(node.parents) == 0
# children = prerequisite nodes, nodes that need to execute before the task, edges point into this task
function children(node::Node)
return copy(node.children)
end
# parents = subsequent nodes, nodes that need this node to execute, edges point from this task
function parents(node::Node)
return copy(node.parents)
end
# siblings = all children of any parents, no duplicates, includes the node itself
function siblings(node::Node)
result = Set{Node}()
push!(result, node)
for parent in node.parents
union!(result, parent.children)
end
return result
end
# partners = all parents of any children, no duplicates, includes the node itself
function partners(node::Node)
result = Set{Node}()
push!(result, node)
for child in node.children
union!(result, child.parents)
end
return result
end
# alternative version to partners(Node), avoiding allocation of a new set
# works on the given set and returns nothing
function partners(node::Node, set::Set{Node})
push!(set, node)
for child in node.children
union!(set, child.parents)
end
return nothing
end
function is_parent(potential_parent, node)
return potential_parent in node.parents
end
function is_child(potential_child, node)
return potential_child in node.children
end

View File

@ -54,3 +54,7 @@ struct Edge
# edge points from child to parent
edge::Union{Tuple{DataTaskNode, ComputeTaskNode}, Tuple{ComputeTaskNode, DataTaskNode}}
end
copy(m::Missing) = missing
copy(n::ComputeTaskNode) = ComputeTaskNode(copy(n.task), copy(n.parents), copy(n.children), UUIDs.uuid1(rng[threadid()]), copy(n.nodeReduction), copy(n.nodeSplit), copy(n.nodeFusions))
copy(n::DataTaskNode) = DataTaskNode(copy(n.task), copy(n.parents), copy(n.children), UUIDs.uuid1(rng[threadid()]), copy(n.nodeReduction), copy(n.nodeSplit), copy(n.nodeFusion))

43
src/node/validate.jl Normal file
View File

@ -0,0 +1,43 @@
function is_valid_node(graph::DAG, node::Node)
@assert node in graph "Node is not part of the given graph!"
for parent in node.parents
@assert typeof(parent) != typeof(node) "Node's type is the same as its parent's!"
@assert parent in graph "Node's parent is not in the same graph!"
@assert node in parent.children "Node is not a child of its parent!"
end
for child in node.children
@assert typeof(child) != typeof(node) "Node's type is the same as its child's!"
@assert child in graph "Node's child is not in the same graph!"
@assert node in child.parents "Node is not a parent of its child!"
end
if !ismissing(node.nodeReduction)
@assert is_valid(graph, node.nodeReduction)
end
if !ismissing(node.nodeSplit)
@assert is_valid(graph, node.nodeSplit)
end
return true
end
# call with @assert
function is_valid(graph::DAG, node::ComputeTaskNode)
@assert is_valid_node(graph, node)
for nf in node.nodeFusions
@assert is_valid(graph, nf)
end
return true
end
# call with @assert
function is_valid(graph::DAG, node::DataTaskNode)
@assert is_valid_node(graph, node)
if !ismissing(node.nodeFusion)
@assert is_valid(graph, node.nodeFusion)
end
return true
end

View File

@ -1,95 +0,0 @@
function make_node(t::AbstractTask)
error("Cannot make a node from this task type")
end
function make_node(t::AbstractDataTask)
return DataTaskNode(t)
end
function make_node(t::AbstractComputeTask)
return ComputeTaskNode(t)
end
function make_edge(n1::Node, n2::Node)
error("Can only create edges from compute to data node or reverse")
end
function make_edge(n1::ComputeTaskNode, n2::DataTaskNode)
return Edge((n1, n2))
end
function make_edge(n1::DataTaskNode, n2::ComputeTaskNode)
return Edge((n1, n2))
end
function show(io::IO, n::Node)
print(io, "Node(", n.task, ")")
end
function show(io::IO, e::Edge)
print(io, "Edge(", e.edge[1], ", ", e.edge[2], ")")
end
function ==(e1::Edge, e2::Edge)
return e1.edge[1] == e2.edge[1] && e1.edge[2] == e2.edge[2]
end
function ==(n1::Node, n2::Node)
return false
end
function ==(n1::ComputeTaskNode, n2::ComputeTaskNode)
return n1.id == n2.id
end
function ==(n1::DataTaskNode, n2::DataTaskNode)
return n1.id == n2.id
end
function is_valid_node(graph::DAG, node::Node)
@assert node in graph "Node is not part of the given graph!"
for parent in node.parents
@assert typeof(parent) != typeof(node) "Node's type is the same as its parent's!"
@assert parent in graph "Node's parent is not in the same graph!"
@assert node in parent.children "Node is not a child of its parent!"
end
for child in node.children
@assert typeof(child) != typeof(node) "Node's type is the same as its child's!"
@assert child in graph "Node's child is not in the same graph!"
@assert node in child.parents "Node is not a parent of its child!"
end
if !ismissing(node.nodeReduction)
@assert is_valid(graph, node.nodeReduction)
end
if !ismissing(node.nodeSplit)
@assert is_valid(graph, node.nodeSplit)
end
return true
end
# call with @assert
function is_valid(graph::DAG, node::ComputeTaskNode)
@assert is_valid_node(graph, node)
for nf in node.nodeFusions
@assert is_valid(graph, nf)
end
return true
end
# call with @assert
function is_valid(graph::DAG, node::DataTaskNode)
@assert is_valid_node(graph, node)
if !ismissing(node.nodeFusion)
@assert is_valid(graph, node.nodeFusion)
end
return true
end
copy(m::Missing) = missing
copy(n::ComputeTaskNode) = ComputeTaskNode(copy(n.task), copy(n.parents), copy(n.children), UUIDs.uuid1(rng[threadid()]), copy(n.nodeReduction), copy(n.nodeSplit), copy(n.nodeFusions))
copy(n::DataTaskNode) = DataTaskNode(copy(n.task), copy(n.parents), copy(n.children), UUIDs.uuid1(rng[threadid()]), copy(n.nodeReduction), copy(n.nodeSplit), copy(n.nodeFusion))

34
src/operation/type.jl Normal file
View File

@ -0,0 +1,34 @@
# An abstract base class for operations
# an operation can be applied to a DAG
abstract type Operation end
# An abstract base class for already applied operations
# an applied operation can be reversed iff it is the last applied operation on the DAG
abstract type AppliedOperation end
struct NodeFusion <: Operation
input::Tuple{ComputeTaskNode, DataTaskNode, ComputeTaskNode}
end
struct AppliedNodeFusion <: AppliedOperation
operation::NodeFusion
diff::Diff
end
struct NodeReduction <: Operation
input::Vector{Node}
end
struct AppliedNodeReduction <: AppliedOperation
operation::NodeReduction
diff::Diff
end
struct NodeSplit <: Operation
input::Node
end
struct AppliedNodeSplit <: AppliedOperation
operation::NodeSplit
diff::Diff
end

11
src/task/compare.jl Normal file
View File

@ -0,0 +1,11 @@
function ==(t1::AbstractTask, t2::AbstractTask)
return false
end
function ==(t1::AbstractComputeTask, t2::AbstractComputeTask)
return typeof(t1) == typeof(t2)
end
function ==(t1::AbstractDataTask, t2::AbstractDataTask)
return data(t1) == data(t2)
end

4
src/task/print.jl Normal file
View File

@ -0,0 +1,4 @@
function show(io::IO, t::FusedComputeTask)
(T1, T2) = get_types(t)
print(io, "ComputeFuse(", T1(), ", ", T2(), ")")
end

View File

@ -30,24 +30,4 @@ function compute_intensity(t::AbstractTask)::UInt64
return typemax(UInt64)
end
return compute_effort(t) / data(t)
end
function show(io::IO, t::FusedComputeTask)
(T1, T2) = get_types(t)
print(io, "ComputeFuse(", T1(), ", ", T2(), ")")
end
function ==(t1::AbstractTask, t2::AbstractTask)
return false
end
function ==(t1::AbstractComputeTask, t2::AbstractComputeTask)
return typeof(t1) == typeof(t2)
end
function ==(t1::AbstractDataTask, t2::AbstractDataTask)
return data(t1) == data(t2)
end
copy(t::AbstractDataTask) = error("Need to implement copying for your data tasks!")
copy(t::AbstractComputeTask) = typeof(t)()
end

View File

@ -7,3 +7,6 @@ struct FusedComputeTask{T1<:AbstractComputeTask, T2<:AbstractComputeTask} <: Abs
end
get_types(::FusedComputeTask{T1, T2}) where {T1, T2} = (T1, T2)
copy(t::AbstractDataTask) = error("Need to implement copying for your data tasks!")
copy(t::AbstractComputeTask) = typeof(t)()