Patch language with osc/noise/trig/seq/adsr/filter/delay/poly + voice templates and inline live values. Two runtimes: - code_sinth/ — Python engine (numpy + sounddevice). Hot-reload via mtime watcher. Offline render to WAV. Static-HTTP+WS visualizer (viz/) that injects waveforms next to each `node X = ...` line. - web/ — port of the engine to JS running in AudioWorklet. Single static page with CodeMirror 6 editor (line widgets for live waveforms) and a control surface on the right with knobs/faders/step_seq/piano_roll declared from the patch. State preserved across hot-reload. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
369 lines
11 KiB
Python
369 lines
11 KiB
Python
import numpy as np
|
|
|
|
|
|
class Node:
|
|
def __init__(self):
|
|
self.inputs = []
|
|
self.output_buffer = None
|
|
|
|
def process(self, n, sr):
|
|
raise NotImplementedError
|
|
|
|
|
|
class Const(Node):
|
|
def __init__(self, value):
|
|
super().__init__()
|
|
self.value = float(value)
|
|
|
|
def process(self, n, sr):
|
|
return np.full(n, self.value, dtype=np.float32)
|
|
|
|
|
|
class Osc(Node):
|
|
WAVEFORMS = ('sine', 'saw', 'square', 'tri')
|
|
|
|
def __init__(self, waveform, freq):
|
|
super().__init__()
|
|
if waveform not in self.WAVEFORMS:
|
|
raise ValueError(f'Unknown waveform: {waveform!r}')
|
|
self.waveform = waveform
|
|
self.freq = freq
|
|
self.inputs = [freq]
|
|
self.phase = 0.0
|
|
|
|
def process(self, n, sr):
|
|
freq = self.freq.output_buffer
|
|
phase_inc = freq / sr
|
|
phases = self.phase + np.cumsum(phase_inc)
|
|
self.phase = float(phases[-1] % 1.0)
|
|
p = phases % 1.0
|
|
if self.waveform == 'sine':
|
|
out = np.sin(2.0 * np.pi * p)
|
|
elif self.waveform == 'saw':
|
|
out = 2.0 * p - 1.0
|
|
elif self.waveform == 'square':
|
|
out = np.where(p < 0.5, 1.0, -1.0)
|
|
else: # tri
|
|
out = 4.0 * np.abs(p - 0.5) - 1.0
|
|
return out.astype(np.float32)
|
|
|
|
|
|
class Trig(Node):
|
|
"""Periodic gate: high for `duration` seconds every `period` seconds, starting at t=0."""
|
|
def __init__(self, period, duration):
|
|
super().__init__()
|
|
self.period = period
|
|
self.duration = duration
|
|
self.inputs = [period, duration]
|
|
self.t = 0.0
|
|
|
|
def process(self, n, sr):
|
|
period = float(self.period.output_buffer[0])
|
|
duration = float(self.duration.output_buffer[0])
|
|
dt = 1.0 / sr
|
|
times = self.t + np.arange(n, dtype=np.float64) * dt
|
|
self.t = float(times[-1] + dt)
|
|
phase = np.mod(times, period)
|
|
return (phase < duration).astype(np.float32)
|
|
|
|
|
|
class Adsr(Node):
|
|
def __init__(self, a, d, s, r, gate):
|
|
super().__init__()
|
|
self.a, self.d, self.s, self.r = a, d, s, r
|
|
self.gate = gate
|
|
self.inputs = [a, d, s, r, gate]
|
|
self.state = 'idle'
|
|
self.value = 0.0
|
|
self.last_gate = 0.0
|
|
self.release_start = 0.0
|
|
|
|
def process(self, n, sr):
|
|
a = float(self.a.output_buffer[0])
|
|
d = float(self.d.output_buffer[0])
|
|
s_lvl = float(self.s.output_buffer[0])
|
|
r = float(self.r.output_buffer[0])
|
|
gate = self.gate.output_buffer
|
|
|
|
att_inc = 1.0 / max(a * sr, 1.0)
|
|
dec_dec = (1.0 - s_lvl) / max(d * sr, 1.0)
|
|
rel_norm = 1.0 / max(r * sr, 1.0)
|
|
|
|
out = np.empty(n, dtype=np.float32)
|
|
for i in range(n):
|
|
g = gate[i]
|
|
if g > 0.5 and self.last_gate <= 0.5:
|
|
self.state = 'attack'
|
|
elif g <= 0.5 and self.last_gate > 0.5:
|
|
self.state = 'release'
|
|
self.release_start = self.value
|
|
self.last_gate = g
|
|
|
|
if self.state == 'attack':
|
|
self.value += att_inc
|
|
if self.value >= 1.0:
|
|
self.value = 1.0
|
|
self.state = 'decay'
|
|
elif self.state == 'decay':
|
|
self.value -= dec_dec
|
|
if self.value <= s_lvl:
|
|
self.value = s_lvl
|
|
self.state = 'sustain'
|
|
elif self.state == 'sustain':
|
|
self.value = s_lvl
|
|
elif self.state == 'release':
|
|
self.value -= rel_norm * self.release_start
|
|
if self.value <= 0.0:
|
|
self.value = 0.0
|
|
self.state = 'idle'
|
|
out[i] = self.value
|
|
return out
|
|
|
|
|
|
class Seq(Node):
|
|
"""Step sequencer: emits steps[i] held continuously, advancing at `rate` steps per second.
|
|
Pair with trig(period=1/rate) for a synced gate."""
|
|
def __init__(self, rate, steps):
|
|
super().__init__()
|
|
if not isinstance(steps, list) or not steps:
|
|
raise ValueError('seq() needs steps=[v1, v2, ...] with at least one element')
|
|
self.rate = rate
|
|
self.steps = np.asarray(steps, dtype=np.float32)
|
|
self.inputs = [rate]
|
|
self.t = 0.0
|
|
|
|
def process(self, n, sr):
|
|
rate = float(self.rate.output_buffer[0])
|
|
dt = 1.0 / sr
|
|
times = self.t + np.arange(n, dtype=np.float64) * dt
|
|
self.t = float(times[-1] + dt)
|
|
idx = (times * rate).astype(np.int64) % len(self.steps)
|
|
return self.steps[idx]
|
|
|
|
|
|
class Noise(Node):
|
|
"""White noise in [-1, 1]."""
|
|
def __init__(self, seed=None):
|
|
super().__init__()
|
|
seed_val = int(seed.value) if hasattr(seed, 'value') else None
|
|
self.rng = np.random.default_rng(seed_val)
|
|
|
|
def process(self, n, sr):
|
|
return (self.rng.random(n).astype(np.float32) * 2.0 - 1.0)
|
|
|
|
|
|
class Filter(Node):
|
|
"""RBJ biquad: lp / hp / bp. Coefficients recomputed per sample so cutoff and q can be audio-rate modulated."""
|
|
KINDS = ('lp', 'hp', 'bp')
|
|
|
|
def __init__(self, kind, in_, cutoff, q):
|
|
super().__init__()
|
|
if kind not in self.KINDS:
|
|
raise ValueError(f'Unknown filter kind: {kind!r}')
|
|
self.kind = kind
|
|
self.in_ = in_
|
|
self.cutoff = cutoff
|
|
self.q = q
|
|
self.inputs = [in_, cutoff, q]
|
|
self.x1 = self.x2 = 0.0
|
|
self.y1 = self.y2 = 0.0
|
|
|
|
def process(self, n, sr):
|
|
x_buf = self.in_.output_buffer
|
|
c_buf = self.cutoff.output_buffer
|
|
q_buf = self.q.output_buffer
|
|
out = np.empty(n, dtype=np.float32)
|
|
nyq = 0.499 * sr
|
|
kind = self.kind
|
|
x1, x2, y1, y2 = self.x1, self.x2, self.y1, self.y2
|
|
two_pi_over_sr = 2.0 * np.pi / sr
|
|
for i in range(n):
|
|
cutoff = float(c_buf[i])
|
|
if cutoff > nyq:
|
|
cutoff = nyq
|
|
elif cutoff < 1.0:
|
|
cutoff = 1.0
|
|
qv = float(q_buf[i])
|
|
if qv < 0.001:
|
|
qv = 0.001
|
|
w = two_pi_over_sr * cutoff
|
|
cw = np.cos(w)
|
|
sw = np.sin(w)
|
|
alpha = sw / (2.0 * qv)
|
|
if kind == 'lp':
|
|
b0 = (1.0 - cw) * 0.5
|
|
b1 = 1.0 - cw
|
|
b2 = b0
|
|
elif kind == 'hp':
|
|
b0 = (1.0 + cw) * 0.5
|
|
b1 = -(1.0 + cw)
|
|
b2 = b0
|
|
else: # bp
|
|
b0 = sw * 0.5
|
|
b1 = 0.0
|
|
b2 = -b0
|
|
a0 = 1.0 + alpha
|
|
a1 = -2.0 * cw
|
|
a2 = 1.0 - alpha
|
|
x0 = float(x_buf[i])
|
|
y0 = (b0 * x0 + b1 * x1 + b2 * x2 - a1 * y1 - a2 * y2) / a0
|
|
out[i] = y0
|
|
x2 = x1
|
|
x1 = x0
|
|
y2 = y1
|
|
y1 = y0
|
|
self.x1, self.x2, self.y1, self.y2 = x1, x2, y1, y2
|
|
return out
|
|
|
|
|
|
class Delay(Node):
|
|
"""Delay line with feedback. `time` in seconds, `feedback` 0..0.99, `mix` dry/wet 0..1.
|
|
`max_time` (literal) sets the buffer size; defaults to 2.0s."""
|
|
def __init__(self, in_, time, feedback, mix, max_time=2.0):
|
|
super().__init__()
|
|
self.in_ = in_
|
|
self.time = time
|
|
self.feedback = feedback
|
|
self.mix = mix
|
|
self.inputs = [in_, time, feedback, mix]
|
|
# max_time is treated as compile-time constant (sets buffer size).
|
|
self.max_t = float(max_time.value) if hasattr(max_time, 'value') else float(max_time)
|
|
self.buffer = None
|
|
self.size = 0
|
|
self.write_idx = 0
|
|
|
|
def process(self, n, sr):
|
|
if self.buffer is None:
|
|
self.size = max(int(self.max_t * sr), 1)
|
|
self.buffer = np.zeros(self.size, dtype=np.float32)
|
|
x = self.in_.output_buffer
|
|
t = float(self.time.output_buffer[0])
|
|
fb = float(self.feedback.output_buffer[0])
|
|
if fb > 0.99:
|
|
fb = 0.99
|
|
elif fb < 0.0:
|
|
fb = 0.0
|
|
mix = float(self.mix.output_buffer[0])
|
|
d = int(t * sr)
|
|
if d < 1:
|
|
d = 1
|
|
elif d >= self.size:
|
|
d = self.size - 1
|
|
out = np.empty(n, dtype=np.float32)
|
|
buf = self.buffer
|
|
size = self.size
|
|
w = self.write_idx
|
|
for i in range(n):
|
|
r = (w - d) % size
|
|
delayed = buf[r]
|
|
buf[w] = x[i] + fb * delayed
|
|
out[i] = (1.0 - mix) * x[i] + mix * delayed
|
|
w = (w + 1) % size
|
|
self.write_idx = w
|
|
return out
|
|
|
|
|
|
class BinOpNode(Node):
|
|
def __init__(self, op, a, b):
|
|
super().__init__()
|
|
self.op = op
|
|
self.a = a
|
|
self.b = b
|
|
self.inputs = [a, b]
|
|
|
|
def process(self, n, sr):
|
|
a = self.a.output_buffer
|
|
b = self.b.output_buffer
|
|
if self.op == '+':
|
|
return a + b
|
|
if self.op == '-':
|
|
return a - b
|
|
if self.op == '*':
|
|
return a * b
|
|
if self.op == '/':
|
|
return a / b
|
|
raise ValueError(self.op)
|
|
|
|
|
|
class Negate(Node):
|
|
def __init__(self, a):
|
|
super().__init__()
|
|
self.a = a
|
|
self.inputs = [a]
|
|
|
|
def process(self, n, sr):
|
|
return -self.a.output_buffer
|
|
|
|
|
|
class Poly(Node):
|
|
"""Polyphonic voice allocator. Holds N independent VoiceInstances and dispatches notes
|
|
from a step sequence at `rate` steps per second. Each note triggers an LRU voice for
|
|
`gate_duration` seconds. notes[i] == 0 is treated as a rest (no trigger)."""
|
|
def __init__(self, rate, gate_duration, instances, notes):
|
|
super().__init__()
|
|
self.rate = rate
|
|
self.gate_duration = gate_duration
|
|
self.instances = instances
|
|
self.notes = list(notes)
|
|
self.inputs = [rate, gate_duration]
|
|
self.t = 0.0
|
|
self.last_step = -1
|
|
|
|
def _alloc_voice(self):
|
|
# Pick the voice idle the longest (lowest last_on_at). If tie, first found.
|
|
best = self.instances[0]
|
|
for v in self.instances[1:]:
|
|
if v.last_on_at < best.last_on_at:
|
|
best = v
|
|
return best
|
|
|
|
def process(self, n, sr):
|
|
rate = float(self.rate.output_buffer[0])
|
|
gd = float(self.gate_duration.output_buffer[0])
|
|
|
|
# Trigger any step boundaries that fell into this block (catch up if multiple).
|
|
cur_step = int(self.t * rate)
|
|
while self.last_step < cur_step:
|
|
self.last_step += 1
|
|
note = self.notes[self.last_step % len(self.notes)]
|
|
if note > 0.0:
|
|
v = self._alloc_voice()
|
|
v.freq_slot.value = float(note)
|
|
v.gate_slot.value = 1.0
|
|
step_t = self.last_step / rate
|
|
v.gate_off_at = step_t + gd
|
|
v.last_on_at = step_t
|
|
|
|
# Release gates whose timeout has elapsed (block-rate granularity).
|
|
for v in self.instances:
|
|
if v.gate_off_at > 0.0 and self.t >= v.gate_off_at:
|
|
v.gate_slot.value = 0.0
|
|
v.gate_off_at = -1.0
|
|
|
|
# Render every voice's sub-graph and sum.
|
|
out = np.zeros(n, dtype=np.float32)
|
|
for v in self.instances:
|
|
for node in v.order:
|
|
node.output_buffer = node.process(n, sr)
|
|
out += v.output.output_buffer
|
|
|
|
self.t += n / sr
|
|
return out
|
|
|
|
|
|
NODE_REGISTRY = {
|
|
'osc': Osc,
|
|
'trig': Trig,
|
|
'adsr': Adsr,
|
|
'noise': Noise,
|
|
'filter': Filter,
|
|
'seq': Seq,
|
|
'delay': Delay,
|
|
}
|
|
|
|
# Positional arg indices that must be bare identifiers (treated as strings).
|
|
SYMBOLIC_ARGS = {
|
|
'osc': [0], # waveform name
|
|
'filter': [0], # filter kind
|
|
}
|