import numpy as np


class Node:
    """Base class for every unit in the audio graph.

    Attributes:
        inputs: Upstream nodes this node reads from.
        output_buffer: Result of the most recent ``process`` call. Nodes do not
            store it themselves; the code driving the graph assigns it (see
            ``Poly.process`` for an example).
    """

    def __init__(self):
        self.inputs = []
        self.output_buffer = None

    def process(self, n, sr):
        """Render ``n`` samples at sample rate ``sr``. Subclasses override."""
        raise NotImplementedError


class Const(Node):
    """Constant signal: every rendered sample equals ``value``."""

    def __init__(self, value):
        super().__init__()
        self.value = float(value)

    def process(self, n, sr):
        """Return a float32 array of length ``n`` filled with ``self.value``."""
        return np.full(n, self.value, dtype=np.float32)


class Osc(Node):
    """Phase-accumulating oscillator with audio-rate frequency input.

    Args:
        waveform: One of ``WAVEFORMS``.
        freq: Node whose ``output_buffer`` holds the per-sample frequency
            (cycles per ``sr`` samples).

    Raises:
        ValueError: If ``waveform`` is not one of ``WAVEFORMS``.
    """

    WAVEFORMS = ('sine', 'saw', 'square', 'tri')

    def __init__(self, waveform, freq):
        super().__init__()
        if waveform not in self.WAVEFORMS:
            raise ValueError(f'Unknown waveform: {waveform!r}')
        self.waveform = waveform
        self.freq = freq
        self.inputs = [freq]
        self.phase = 0.0  # normalised phase in [0, 1), carried across blocks

    def process(self, n, sr):
        """Render ``n`` samples and advance the stored phase.

        The phase increment of a sample is applied before that sample is
        evaluated, so sample ``i`` is rendered at the phase *after* adding
        ``freq[i] / sr``.
        """
        freq = self.freq.output_buffer
        phase_inc = freq / sr
        phases = self.phase + np.cumsum(phase_inc)
        self.phase = float(phases[-1] % 1.0)
        p = phases % 1.0

        if self.waveform == 'sine':
            out = np.sin(2.0 * np.pi * p)
        elif self.waveform == 'saw':
            out = 2.0 * p - 1.0
        elif self.waveform == 'square':
            out = np.where(p < 0.5, 1.0, -1.0)
        else:  # tri: +1 at p == 0, -1 at p == 0.5
            out = 4.0 * np.abs(p - 0.5) - 1.0
        return out.astype(np.float32)


class Trig(Node):
    """Periodic gate: high for `duration` seconds every `period` seconds,
    starting at t=0.

    ``period`` and ``duration`` are read once per block (first sample of the
    respective input buffers).
    """

    def __init__(self, period, duration):
        super().__init__()
        self.period = period
        self.duration = duration
        self.inputs = [period, duration]
        self.t = 0.0  # time in seconds of the next sample to render

    def process(self, n, sr):
        """Return a float32 gate (1.0 high / 0.0 low) of length ``n``."""
        period = float(self.period.output_buffer[0])
        duration = float(self.duration.output_buffer[0])
        dt = 1.0 / sr
        times = self.t + np.arange(n, dtype=np.float64) * dt
        self.t = float(times[-1] + dt)
        phase = np.mod(times, period)
        return (phase < duration).astype(np.float32)


class Adsr(Node):
    """Linear attack / decay / sustain / release envelope driven by a gate.

    Args:
        a: Attack time node (seconds to ramp by 1.0).
        d: Decay time node (seconds to fall from 1.0 to the sustain level).
        s: Sustain level node.
        r: Release time node (seconds to fall from the release-start level
            to 0).
        gate: Gate node, evaluated per sample; above 0.5 counts as "on".

    ``a``, ``d``, ``s`` and ``r`` are read once per block. Each stage lasts at
    least one sample because the sample counts are clamped to >= 1.
    """

    def __init__(self, a, d, s, r, gate):
        super().__init__()
        self.a, self.d, self.s, self.r = a, d, s, r
        self.gate = gate
        self.inputs = [a, d, s, r, gate]
        self.state = 'idle'
        self.value = 0.0
        self.last_gate = 0.0
        self.release_start = 0.0

    def process(self, n, sr):
        """Return ``n`` float32 envelope samples and update envelope state."""
        a = float(self.a.output_buffer[0])
        d = float(self.d.output_buffer[0])
        s_lvl = float(self.s.output_buffer[0])
        r = float(self.r.output_buffer[0])
        gate = self.gate.output_buffer

        # Per-sample step sizes of the linear segments.
        att_inc = 1.0 / max(a * sr, 1.0)
        dec_dec = (1.0 - s_lvl) / max(d * sr, 1.0)
        rel_norm = 1.0 / max(r * sr, 1.0)

        out = np.empty(n, dtype=np.float32)
        for i in range(n):
            g = gate[i]
            # Edge detection: a rising edge (re)starts the attack from the
            # current level; a falling edge starts the release from it.
            if g > 0.5 and self.last_gate <= 0.5:
                self.state = 'attack'
            elif g <= 0.5 and self.last_gate > 0.5:
                self.state = 'release'
                self.release_start = self.value
            self.last_gate = g

            if self.state == 'attack':
                self.value += att_inc
                if self.value >= 1.0:
                    self.value = 1.0
                    self.state = 'decay'
            elif self.state == 'decay':
                self.value -= dec_dec
                if self.value <= s_lvl:
                    self.value = s_lvl
                    self.state = 'sustain'
            elif self.state == 'sustain':
                self.value = s_lvl
            elif self.state == 'release':
                # Scaled by the start level so the release always takes
                # `r` seconds regardless of where it began.
                self.value -= rel_norm * self.release_start
                if self.value <= 0.0:
                    self.value = 0.0
                    self.state = 'idle'
            out[i] = self.value
        return out


class Seq(Node):
    """Step sequencer: emits steps[i] held continuously, advancing at `rate`
    steps per second. Pair with trig(period=1/rate) for a synced gate.

    Raises:
        ValueError: If ``steps`` is not a non-empty list.
    """

    def __init__(self, rate, steps):
        super().__init__()
        if not isinstance(steps, list) or not steps:
            raise ValueError('seq() needs steps=[v1, v2, ...] with at least one element')
        self.rate = rate
        self.steps = np.asarray(steps, dtype=np.float32)
        self.inputs = [rate]
        self.t = 0.0  # time in seconds of the next sample to render

    def process(self, n, sr):
        """Return ``n`` float32 samples of the current step values.

        ``rate`` is read once per block; the sequence wraps around.
        """
        rate = float(self.rate.output_buffer[0])
        dt = 1.0 / sr
        times = self.t + np.arange(n, dtype=np.float64) * dt
        self.t = float(times[-1] + dt)
        idx = (times * rate).astype(np.int64) % len(self.steps)
        return self.steps[idx]


class Noise(Node):
    """White noise in [-1, 1].

    Args:
        seed: ``None`` for an unseeded generator, a plain number, or a
            Const-like object exposing ``.value``. The seed is truncated to
            ``int``. Any other type falls back to an unseeded generator.
    """

    def __init__(self, seed=None):
        super().__init__()
        if hasattr(seed, 'value'):
            seed_val = int(seed.value)
        elif isinstance(seed, (int, float, np.integer, np.floating)):
            # Honour literal seeds too, mirroring how Delay accepts either a
            # node or a literal for `max_time`.
            seed_val = int(seed)
        else:
            seed_val = None
        self.rng = np.random.default_rng(seed_val)

    def process(self, n, sr):
        """Return ``n`` uniformly distributed float32 samples."""
        return (self.rng.random(n).astype(np.float32) * 2.0 - 1.0)


class Filter(Node):
    """RBJ biquad: lp / hp / bp.

    Coefficients are derived from every sample of the `cutoff` and `q` inputs,
    so both can be audio-rate modulated. `cutoff` is clamped to
    [1.0, 0.499 * sr] and `q` to >= 0.001.

    Raises:
        ValueError: If ``kind`` is not one of ``KINDS``.
    """

    KINDS = ('lp', 'hp', 'bp')

    def __init__(self, kind, in_, cutoff, q):
        super().__init__()
        if kind not in self.KINDS:
            raise ValueError(f'Unknown filter kind: {kind!r}')
        self.kind = kind
        self.in_ = in_
        self.cutoff = cutoff
        self.q = q
        self.inputs = [in_, cutoff, q]
        # Direct-form I history: previous two inputs and outputs.
        self.x1 = self.x2 = 0.0
        self.y1 = self.y2 = 0.0

    def process(self, n, sr):
        """Filter ``n`` input samples; returns a float32 array of length ``n``.

        The per-sample coefficients are computed vectorized up front; only the
        recursion itself (which depends on previous outputs) runs in a Python
        loop.
        """
        x = np.asarray(self.in_.output_buffer[:n], dtype=np.float64)
        cutoff = np.asarray(self.cutoff.output_buffer[:n], dtype=np.float64)
        q = np.asarray(self.q.output_buffer[:n], dtype=np.float64)

        nyq = 0.499 * sr
        cutoff = np.where(cutoff > nyq, nyq,
                          np.where(cutoff < 1.0, 1.0, cutoff))
        q = np.where(q < 0.001, 0.001, q)

        w = (2.0 * np.pi / sr) * cutoff
        cw = np.cos(w)
        sw = np.sin(w)
        alpha = sw / (2.0 * q)

        if self.kind == 'lp':
            b0 = (1.0 - cw) * 0.5
            b1 = 1.0 - cw
            b2 = b0
        elif self.kind == 'hp':
            b0 = (1.0 + cw) * 0.5
            b1 = -(1.0 + cw)
            b2 = b0
        else:  # bp
            b0 = sw * 0.5
            b1 = np.zeros_like(sw)
            b2 = -b0
        a0 = 1.0 + alpha
        a1 = -2.0 * cw
        a2 = 1.0 - alpha

        # Plain Python floats are much cheaper to index in the loop below
        # than numpy scalars.
        xs = x.tolist()
        b0s, b1s, b2s = b0.tolist(), b1.tolist(), b2.tolist()
        a0s, a1s, a2s = a0.tolist(), a1.tolist(), a2.tolist()

        out = np.empty(n, dtype=np.float32)
        x1, x2, y1, y2 = self.x1, self.x2, self.y1, self.y2
        for i in range(n):
            x0 = xs[i]
            y0 = (b0s[i] * x0 + b1s[i] * x1 + b2s[i] * x2
                  - a1s[i] * y1 - a2s[i] * y2) / a0s[i]
            out[i] = y0
            x2, x1 = x1, x0
            y2, y1 = y1, y0
        self.x1, self.x2, self.y1, self.y2 = x1, x2, y1, y2
        return out


class Delay(Node):
    """Delay line with feedback.

    `time` in seconds, `feedback` 0..0.99, `mix` dry/wet 0..1.
    `max_time` (literal or Const-like with ``.value``) sets the buffer size;
    defaults to 2.0s. `time`, `feedback` and `mix` are read once per block.
    The delay is clamped to between 1 sample and the full buffer length, so
    ``time == max_time`` is honoured exactly.
    """

    def __init__(self, in_, time, feedback, mix, max_time=2.0):
        super().__init__()
        self.in_ = in_
        self.time = time
        self.feedback = feedback
        self.mix = mix
        self.inputs = [in_, time, feedback, mix]
        # max_time is treated as compile-time constant (sets buffer size).
        self.max_t = float(max_time.value) if hasattr(max_time, 'value') else float(max_time)
        self.buffer = None  # allocated lazily: the size depends on `sr`
        self.size = 0
        self.write_idx = 0

    def process(self, n, sr):
        """Return ``n`` float32 samples of the dry/wet mix."""
        if self.buffer is None:
            self.size = max(int(self.max_t * sr), 1)
            self.buffer = np.zeros(self.size, dtype=np.float32)

        x = self.in_.output_buffer
        t = float(self.time.output_buffer[0])
        fb = float(self.feedback.output_buffer[0])
        if fb > 0.99:
            fb = 0.99
        elif fb < 0.0:
            fb = 0.0
        mix = float(self.mix.output_buffer[0])

        d = int(t * sr)
        if d < 1:
            d = 1
        elif d > self.size:
            # The read below happens before the write, so a delay of exactly
            # `size` samples is valid: slot `w` still holds the sample
            # written `size` steps ago.
            d = self.size

        out = np.empty(n, dtype=np.float32)
        buf = self.buffer
        size = self.size
        w = self.write_idx
        for i in range(n):
            r = (w - d) % size
            delayed = buf[r]
            buf[w] = x[i] + fb * delayed
            out[i] = (1.0 - mix) * x[i] + mix * delayed
            w = (w + 1) % size
        self.write_idx = w
        return out


class BinOpNode(Node):
    """Element-wise binary arithmetic (`+`, `-`, `*`, `/`) on two nodes."""

    def __init__(self, op, a, b):
        super().__init__()
        self.op = op
        self.a = a
        self.b = b
        self.inputs = [a, b]

    def process(self, n, sr):
        """Combine both input buffers.

        Raises:
            ValueError: If ``op`` is not a supported operator.
        """
        a = self.a.output_buffer
        b = self.b.output_buffer
        if self.op == '+':
            return a + b
        if self.op == '-':
            return a - b
        if self.op == '*':
            return a * b
        if self.op == '/':
            return a / b
        raise ValueError(self.op)


class Negate(Node):
    """Element-wise negation of one input node."""

    def __init__(self, a):
        super().__init__()
        self.a = a
        self.inputs = [a]

    def process(self, n, sr):
        """Return the negated input buffer."""
        return -self.a.output_buffer


class Poly(Node):
    """Polyphonic voice allocator.

    Holds N independent VoiceInstances and dispatches notes from a step
    sequence at `rate` steps per second. Each note triggers an LRU voice for
    `gate_duration` seconds. notes[i] == 0 is treated as a rest (no trigger).

    Each voice instance is expected to provide ``last_on_at``,
    ``gate_off_at``, ``freq_slot`` / ``gate_slot`` (objects with a ``value``),
    ``order`` (nodes in evaluation order) and ``output`` (the node whose
    buffer is mixed into the result).

    Raises:
        ValueError: If ``notes`` is empty.
    """

    def __init__(self, rate, gate_duration, instances, notes):
        super().__init__()
        self.rate = rate
        self.gate_duration = gate_duration
        self.instances = instances
        self.notes = list(notes)
        if not self.notes:
            # Fail early instead of dividing by zero in process().
            raise ValueError('poly() needs notes=[n1, n2, ...] with at least one element')
        self.inputs = [rate, gate_duration]
        self.t = 0.0  # time in seconds at the start of the next block
        self.last_step = -1  # index of the last step already dispatched

    def _alloc_voice(self):
        """Return the voice with the lowest ``last_on_at`` (first on ties)."""
        return min(self.instances, key=lambda v: v.last_on_at)

    def process(self, n, sr):
        """Dispatch due notes, release expired gates, render and sum voices.

        Triggering and gate release are evaluated once per block, against the
        time at the start of the block.
        """
        rate = float(self.rate.output_buffer[0])
        gd = float(self.gate_duration.output_buffer[0])

        # Trigger every step that is due by the start of this block
        # (catching up if several steps elapsed since the last call).
        cur_step = int(self.t * rate)
        while self.last_step < cur_step:
            self.last_step += 1
            note = self.notes[self.last_step % len(self.notes)]
            if note > 0.0:
                v = self._alloc_voice()
                v.freq_slot.value = float(note)
                v.gate_slot.value = 1.0
                step_t = self.last_step / rate
                v.gate_off_at = step_t + gd
                v.last_on_at = step_t

        # Release gates whose timeout has elapsed (block-rate granularity).
        for v in self.instances:
            if v.gate_off_at > 0.0 and self.t >= v.gate_off_at:
                v.gate_slot.value = 0.0
                v.gate_off_at = -1.0

        # Render every voice's sub-graph and sum.
        out = np.zeros(n, dtype=np.float32)
        for v in self.instances:
            for node in v.order:
                node.output_buffer = node.process(n, sr)
            out += v.output.output_buffer

        self.t += n / sr
        return out


# Maps a node-constructor name to its implementing class.
NODE_REGISTRY = {
    'osc': Osc,
    'trig': Trig,
    'adsr': Adsr,
    'noise': Noise,
    'filter': Filter,
    'seq': Seq,
    'delay': Delay,
}

# Positional arg indices that must be bare identifiers (treated as strings).
SYMBOLIC_ARGS = {
    'osc': [0],     # waveform name
    'filter': [0],  # filter kind
}