cm-dfs/osc/main.py

65 lines
1.4 KiB
Python

from graphviz import Digraph
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer
import threading
import time
edge_attrs = {
'arrowhead': 'vee',
}
edge_colors = {
'T': 'red',
'F': 'yellow',
'B': 'green',
'X': 'blue',
'L': 'purple',
}
g = Digraph('G', filename='graph.gv', format='png', engine='circo', strict=True, edge_attr = edge_attrs)
g_lock = threading.Lock()
def print_vertex(address, *args):
global g
v = args[args.index('vertex') + 1]
if v == -1:
with g_lock:
g = Digraph('G', filename='graph.gv', format='pdf', engine='circo', strict=True, edge_attr = edge_attrs)
return
with g_lock:
g.node(str(v))
print(args)
def print_edge(address, *args):
v = args[args.index('v') + 1]
w = args[args.index('w') + 1]
t = args[args.index('edge_type') + 1]
if v == -1:
return
with g_lock:
g.edge(str(v), str(w), color=edge_colors[t])
print(args)
def update_loop():
while True:
g.render("Test")
time.sleep(0.4)
ul_thread = threading.Thread(target=update_loop)
ul_thread.start()
dispatcher = Dispatcher()
dispatcher.map("/edge", print_edge)
dispatcher.map("/vertex", print_vertex)
ip = "127.0.0.1"
port = 5050
server = BlockingOSCUDPServer((ip, port), dispatcher)
server.serve_forever() # Blocks forever