65 lines
1.4 KiB
Python
65 lines
1.4 KiB
Python
from graphviz import Digraph
|
|
from pythonosc.dispatcher import Dispatcher
|
|
from pythonosc.osc_server import BlockingOSCUDPServer
|
|
import threading
|
|
import time
|
|
|
|
edge_attrs = {
|
|
'arrowhead': 'vee',
|
|
}
|
|
|
|
edge_colors = {
|
|
'T': 'red',
|
|
'F': 'yellow',
|
|
'B': 'green',
|
|
'X': 'blue',
|
|
'L': 'purple',
|
|
}
|
|
|
|
|
|
g = Digraph('G', filename='graph.gv', format='png', engine='circo', strict=True, edge_attr = edge_attrs)
|
|
|
|
g_lock = threading.Lock()
|
|
|
|
def print_vertex(address, *args):
|
|
global g
|
|
v = args[args.index('vertex') + 1]
|
|
if v == -1:
|
|
with g_lock:
|
|
g = Digraph('G', filename='graph.gv', format='pdf', engine='circo', strict=True, edge_attr = edge_attrs)
|
|
return
|
|
with g_lock:
|
|
g.node(str(v))
|
|
print(args)
|
|
|
|
def print_edge(address, *args):
|
|
v = args[args.index('v') + 1]
|
|
w = args[args.index('w') + 1]
|
|
t = args[args.index('edge_type') + 1]
|
|
if v == -1:
|
|
return
|
|
with g_lock:
|
|
g.edge(str(v), str(w), color=edge_colors[t])
|
|
print(args)
|
|
|
|
def update_loop():
|
|
while True:
|
|
g.render("Test")
|
|
time.sleep(0.4)
|
|
|
|
|
|
ul_thread = threading.Thread(target=update_loop)
|
|
|
|
ul_thread.start()
|
|
|
|
dispatcher = Dispatcher()
|
|
dispatcher.map("/edge", print_edge)
|
|
dispatcher.map("/vertex", print_vertex)
|
|
|
|
ip = "127.0.0.1"
|
|
port = 5050
|
|
|
|
server = BlockingOSCUDPServer((ip, port), dispatcher)
|
|
server.serve_forever() # Blocks forever
|
|
|