import os import time import numpy as np from .graph import topo_sort # State-bearing attributes for each Node type. Used by transfer_state() to keep # phase / time / filter memory continuous across hot-reloads. STATE_ATTRS = { 'Osc': ('phase',), 'Trig': ('t',), 'Seq': ('t',), 'Adsr': ('state', 'value', 'last_gate', 'release_start'), 'Filter': ('x1', 'x2', 'y1', 'y2'), 'Delay': ('buffer', 'size', 'write_idx'), 'Poly': ('t', 'last_step'), } def transfer_state(old_graph, new_graph): """Copy state from old.named[name] to new.named[name] when the class matches. Returns count of nodes whose state was preserved.""" n = 0 for name, new_node in new_graph.named.items(): old_node = old_graph.named.get(name) if old_node is None or type(old_node) is not type(new_node): continue attrs = STATE_ATTRS.get(type(new_node).__name__) if not attrs: continue for a in attrs: if hasattr(old_node, a): setattr(new_node, a, getattr(old_node, a)) n += 1 return n class Engine: def __init__(self, graph=None, sr=48000, block_size=512, gain=0.3): self.sr = sr self.block_size = block_size self.gain = gain # `live` is a single tuple (graph, order). Replacing it is one atomic ref-store # under the GIL, which is enough to swap from the audio callback's point of view. self.live = None # Per-block snapshot of named-node output buffers, replaced atomically each block. # Readers (e.g. WS thread) see either the pre- or post-block dict, never a torn one. self.taps_snapshot = {} # Latest patch source loaded into the engine. Updated by watch_and_reload. self.current_patch_text = '' if graph is not None: self.set_graph(graph) @property def graph(self): return self.live[0] if self.live is not None else None def set_graph(self, graph, preserve_from=None): n_kept = 0 if preserve_from is not None: n_kept = transfer_state(preserve_from, graph) order = topo_sort(graph.all) self.live = (graph, order) return n_kept def render_block(self, n): graph, order = self.live # snapshot — survives a mid-block swap for node in order: node.output_buffer = node.process(n, self.sr) # Cheap snapshot of named-node outputs (refs only — process() returns a fresh # array each block so old refs stay valid until GC). Single dict-ref swap. self.taps_snapshot = {name: node.output_buffer for name, node in graph.named.items() if node.output_buffer is not None} return graph.out.output_buffer def render_offline(self, duration_s): if self.live is None: raise RuntimeError('No graph set. Pass graph= to Engine() or call set_graph().') total = int(duration_s * self.sr) out = np.zeros(total, dtype=np.float32) offset = 0 while offset < total: n = min(self.block_size, total - offset) out[offset:offset + n] = self.render_block(n) * self.gain offset += n return out def _audio_callback(self, outdata, frames, time_info, status): if status: print(f'[stream] {status}', flush=True) offset = 0 while offset < frames: n = min(self.block_size, frames - offset) block = self.render_block(n) * self.gain outdata[offset:offset + n, 0] = block if outdata.shape[1] > 1: outdata[offset:offset + n, 1] = block offset += n def run(self, duration_s): import sounddevice as sd if self.live is None: raise RuntimeError('No graph set.') with sd.OutputStream(samplerate=self.sr, channels=2, callback=self._audio_callback, blocksize=self.block_size, dtype='float32'): sd.sleep(int(duration_s * 1000)) def watch_and_reload(self, patch_path, build_fn, poll_interval=0.15, stop_event=None): """Poll patch_path's mtime; on change, parse+build+swap. Audio-agnostic. Stops when stop_event (threading.Event) is set, or on KeyboardInterrupt.""" try: last_mtime = os.path.getmtime(patch_path) except FileNotFoundError: last_mtime = 0 try: while stop_event is None or not stop_event.is_set(): time.sleep(poll_interval) try: mt = os.path.getmtime(patch_path) except FileNotFoundError: continue if mt == last_mtime: continue last_mtime = mt try: with open(patch_path, 'r', encoding='utf-8') as f: src = f.read() new_graph = build_fn(src) except Exception as e: print(f'[reload error] {type(e).__name__}: {e}') continue n_kept = self.set_graph(new_graph, preserve_from=self.graph) self.current_patch_text = src ts = time.strftime('%H:%M:%S') print(f'[{ts}] reloaded ({n_kept} nodes kept state)') except KeyboardInterrupt: pass def run_live(self, patch_path, build_fn, poll_interval=0.15): """Play indefinitely, hot-reloading patch_path whenever its mtime changes. build_fn(src_text) -> Graph. Ctrl+C to stop. Reload errors are reported on stdout; the previous graph keeps playing.""" import sounddevice as sd if self.live is None: raise RuntimeError('No graph set.') print(f'[live] watching {patch_path} (Ctrl+C to stop)') with sd.OutputStream(samplerate=self.sr, channels=2, callback=self._audio_callback, blocksize=self.block_size, dtype='float32'): self.watch_and_reload(patch_path, build_fn, poll_interval) print('\n[live] stopping')