Files
code-sinth/code_sinth/nodes.py
Jose Luis Montañes 7debc7436e initial: code-sinth — DSL-driven modular synth (Python engine + web app)
Patch language with osc/noise/trig/seq/adsr/filter/delay/poly + voice templates
and inline live values. Two runtimes:

- code_sinth/ — Python engine (numpy + sounddevice). Hot-reload via mtime
  watcher. Offline render to WAV. Static-HTTP+WS visualizer (viz/) that
  injects waveforms next to each `node X = ...` line.
- web/ — port of the engine to JS running in AudioWorklet. Single static
  page with CodeMirror 6 editor (line widgets for live waveforms) and a
  control surface on the right with knobs/faders/step_seq/piano_roll
  declared from the patch. State preserved across hot-reload.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 17:37:06 +02:00

369 lines
11 KiB
Python

import numpy as np
class Node:
    """Base class for every unit in the audio graph.

    Attributes:
        inputs: upstream nodes this node reads from (empty by default).
        output_buffer: slot for the most recently rendered block; starts
            out as ``None``.
    """

    def __init__(self):
        self.output_buffer = None
        self.inputs = []

    def process(self, n, sr):
        """Render ``n`` samples at sample rate ``sr``; subclasses override."""
        raise NotImplementedError
class Const(Node):
    """Node that outputs one fixed number for every sample."""

    def __init__(self, value):
        super().__init__()
        # Coerce once so `process` never has to deal with ints or strings.
        self.value = float(value)

    def process(self, n, sr):
        """Return a float32 block of length ``n`` filled with ``self.value``."""
        return np.full(shape=n, fill_value=self.value, dtype=np.float32)
class Osc(Node):
    """Phase-accumulating oscillator: sine, saw, square or triangle.

    ``freq`` is an upstream node; its ``output_buffer`` supplies one frequency
    value per sample (cycles per second, given that ``sr`` is samples per
    second), so pitch can be modulated at audio rate.

    Raises:
        ValueError: if ``waveform`` is not one of ``WAVEFORMS``.
    """

    WAVEFORMS = ('sine', 'saw', 'square', 'tri')

    def __init__(self, waveform, freq):
        super().__init__()
        if waveform not in self.WAVEFORMS:
            raise ValueError(f'Unknown waveform: {waveform!r}')
        self.waveform = waveform
        self.freq = freq
        self.inputs = [freq]
        # Normalised phase in [0, 1), carried from one block to the next.
        self.phase = 0.0

    def process(self, n, sr):
        """Render one block; the length follows the frequency buffer."""
        # Accumulate in float64: upstream buffers are float32, and summing
        # hundreds of small increments at that precision adds phase jitter.
        freq = np.asarray(self.freq.output_buffer, dtype=np.float64)
        if freq.size == 0:
            # Nothing to render; also avoids indexing phases[-1] below.
            return np.zeros(0, dtype=np.float32)

        phases = self.phase + np.cumsum(freq / sr)
        carry = float(phases[-1] % 1.0)
        # A NaN/inf frequency sample would otherwise poison the phase forever.
        self.phase = carry if np.isfinite(carry) else 0.0

        p = phases % 1.0
        if self.waveform == 'sine':
            out = np.sin(2.0 * np.pi * p)
        elif self.waveform == 'saw':
            out = 2.0 * p - 1.0
        elif self.waveform == 'square':
            out = np.where(p < 0.5, 1.0, -1.0)
        else:  # tri
            out = 4.0 * np.abs(p - 0.5) - 1.0
        return out.astype(np.float32)
class Trig(Node):
    """Repeating gate signal.

    Emits 1.0 during the first `duration` seconds of every `period`-second
    cycle and 0.0 otherwise; the first cycle begins at t=0. Only the first
    sample of each control buffer is read per block.
    """

    def __init__(self, period, duration):
        super().__init__()
        self.period, self.duration = period, duration
        self.inputs = [period, duration]
        # Running clock in seconds: time stamp of the next sample to render.
        self.t = 0.0

    def process(self, n, sr):
        """Return a float32 block of 0.0/1.0 gate values of length ``n``."""
        cycle_len = float(self.period.output_buffer[0])
        high_len = float(self.duration.output_buffer[0])
        step = 1.0 / sr
        stamps = self.t + np.arange(n, dtype=np.float64) * step
        # Advance the clock to one sample past the end of this block.
        self.t = float(stamps[-1] + step)
        gate_open = np.mod(stamps, cycle_len) < high_len
        return gate_open.astype(np.float32)
class Adsr(Node):
    """Linear attack / decay / sustain / release envelope driven by a gate.

    ``a``, ``d``, ``s`` and ``r`` are upstream nodes read once per block (first
    sample only); ``gate`` is read per sample. A rising edge through 0.5
    starts the attack, a falling edge starts the release from whatever level
    the envelope has at that moment.
    """

    def __init__(self, a, d, s, r, gate):
        super().__init__()
        self.a, self.d, self.s, self.r = a, d, s, r
        self.gate = gate
        self.inputs = [a, d, s, r, gate]
        self.state = 'idle'
        self.value = 0.0
        self.last_gate = 0.0
        # Envelope level captured at the moment the gate dropped.
        self.release_start = 0.0

    def process(self, n, sr):
        """Return the envelope for the next ``n`` samples as float32."""
        attack_s = float(self.a.output_buffer[0])
        decay_s = float(self.d.output_buffer[0])
        sustain = float(self.s.output_buffer[0])
        release_s = float(self.r.output_buffer[0])
        gate_buf = self.gate.output_buffer

        # Per-sample slopes; each stage lasts at least one sample.
        attack_step = 1.0 / max(attack_s * sr, 1.0)
        decay_step = (1.0 - sustain) / max(decay_s * sr, 1.0)
        release_rate = 1.0 / max(release_s * sr, 1.0)

        # Work on locals inside the loop and write the state back once.
        state = self.state
        level = self.value
        prev_gate = self.last_gate
        release_from = self.release_start

        env = np.empty(n, dtype=np.float32)
        for i in range(n):
            g = gate_buf[i]
            rising = g > 0.5 and prev_gate <= 0.5
            falling = g <= 0.5 and prev_gate > 0.5
            if rising:
                state = 'attack'
            elif falling:
                state = 'release'
                release_from = level
            prev_gate = g

            if state == 'attack':
                level += attack_step
                if level >= 1.0:
                    level, state = 1.0, 'decay'
            elif state == 'decay':
                level -= decay_step
                if level <= sustain:
                    level, state = sustain, 'sustain'
            elif state == 'sustain':
                level = sustain
            elif state == 'release':
                # Slope is scaled by the starting level, so the release
                # always takes the same time regardless of where it began.
                level -= release_rate * release_from
                if level <= 0.0:
                    level, state = 0.0, 'idle'
            env[i] = level

        self.state = state
        self.value = level
        self.last_gate = prev_gate
        self.release_start = release_from
        return env
class Seq(Node):
    """Step sequencer that holds each value of `steps` in turn.

    The output moves to the next step `rate` times per second and wraps
    around at the end of the list. Use trig(period=1/rate) alongside it to
    get a gate that lines up with the steps.
    """

    def __init__(self, rate, steps):
        super().__init__()
        has_steps = isinstance(steps, list) and bool(steps)
        if not has_steps:
            raise ValueError('seq() needs steps=[v1, v2, ...] with at least one element')
        self.rate = rate
        self.steps = np.asarray(steps, dtype=np.float32)
        self.inputs = [rate]
        # Running clock in seconds: time stamp of the next sample to render.
        self.t = 0.0

    def process(self, n, sr):
        """Return the step value for each of the next ``n`` samples."""
        steps_per_sec = float(self.rate.output_buffer[0])
        step = 1.0 / sr
        stamps = self.t + np.arange(n, dtype=np.float64) * step
        self.t = float(stamps[-1] + step)
        # Absolute step number per sample, wrapped into the step table.
        positions = (stamps * steps_per_sec).astype(np.int64)
        wrapped = positions % len(self.steps)
        return self.steps[wrapped]
class Noise(Node):
    """White noise in [-1, 1].

    ``seed`` makes the stream reproducible. It may be an object exposing a
    numeric ``.value`` (such as ``Const``) or a plain number; ``None`` or any
    other object leaves the generator unseeded.
    """

    def __init__(self, seed=None):
        super().__init__()
        if hasattr(seed, 'value'):
            seed_val = int(seed.value)
        elif isinstance(seed, (int, float, np.integer, np.floating)):
            # A bare number used to be ignored silently, which made
            # Noise(seed=42) non-deterministic.
            seed_val = int(seed)
        else:
            seed_val = None
        self.rng = np.random.default_rng(seed_val)

    def process(self, n, sr):
        """Return ``n`` float32 samples drawn uniformly and mapped to [-1, 1]."""
        return (self.rng.random(n).astype(np.float32) * 2.0 - 1.0)
class Filter(Node):
    """RBJ biquad: low-pass ('lp'), high-pass ('hp') or band-pass ('bp').

    A separate coefficient set is derived for every sample, so ``cutoff`` and
    ``q`` can be modulated at audio rate. Cutoff is clamped to
    [1, 0.499 * sr] and q to at least 0.001.

    Raises:
        ValueError: if ``kind`` is not one of ``KINDS``.
    """

    KINDS = ('lp', 'hp', 'bp')

    def __init__(self, kind, in_, cutoff, q):
        super().__init__()
        if kind not in self.KINDS:
            raise ValueError(f'Unknown filter kind: {kind!r}')
        self.kind = kind
        self.in_ = in_
        self.cutoff = cutoff
        self.q = q
        self.inputs = [in_, cutoff, q]
        # Direct-form-I history: previous two inputs and outputs.
        self.x1 = self.x2 = 0.0
        self.y1 = self.y2 = 0.0

    def process(self, n, sr):
        """Filter the next ``n`` input samples and return them as float32."""
        x = np.asarray(self.in_.output_buffer[:n], dtype=np.float64)
        cutoff = np.asarray(self.cutoff.output_buffer[:n], dtype=np.float64)
        q = np.asarray(self.q.output_buffer[:n], dtype=np.float64)

        # Coefficients do not depend on the filter state, so compute them for
        # the whole block at once instead of once per loop iteration.
        cutoff = np.clip(cutoff, 1.0, 0.499 * sr)
        q = np.maximum(q, 0.001)
        w = (2.0 * np.pi / sr) * cutoff
        cw = np.cos(w)
        sw = np.sin(w)
        alpha = sw / (2.0 * q)

        if self.kind == 'lp':
            b1 = 1.0 - cw
            b0 = b2 = 0.5 * b1
        elif self.kind == 'hp':
            b1 = -(1.0 + cw)
            b0 = b2 = -0.5 * b1
        else:  # bp
            b0 = 0.5 * sw
            b1 = np.zeros_like(sw)
            b2 = -b0

        # Normalise by a0 up front (a0 = 1 + alpha is always > 1 here).
        inv_a0 = 1.0 / (1.0 + alpha)
        per_sample = zip(
            x.tolist(),
            (b0 * inv_a0).tolist(),
            (b1 * inv_a0).tolist(),
            (b2 * inv_a0).tolist(),
            (-2.0 * cw * inv_a0).tolist(),
            ((1.0 - alpha) * inv_a0).tolist(),
        )

        # Only the recursion itself has to run sample by sample; plain Python
        # floats keep it cheap.
        x1, x2, y1, y2 = self.x1, self.x2, self.y1, self.y2
        ys = []
        for x0, c0, c1, c2, d1, d2 in per_sample:
            y0 = c0 * x0 + c1 * x1 + c2 * x2 - d1 * y1 - d2 * y2
            ys.append(y0)
            x2, x1 = x1, x0
            y2, y1 = y1, y0
        self.x1, self.x2, self.y1, self.y2 = x1, x2, y1, y2
        return np.asarray(ys, dtype=np.float32)
class Delay(Node):
    """Feedback delay line.

    `time` is the delay in seconds, `feedback` is limited to 0..0.99 and
    `mix` blends dry (0) against delayed (1) signal. `max_time` (a literal)
    fixes the length of the internal buffer and defaults to 2.0s.
    """

    def __init__(self, in_, time, feedback, mix, max_time=2.0):
        super().__init__()
        self.in_ = in_
        self.time = time
        self.feedback = feedback
        self.mix = mix
        self.inputs = [in_, time, feedback, mix]
        # max_time is a build-time constant: accept either a bare number or
        # an object carrying the number in `.value`.
        self.max_t = float(getattr(max_time, 'value', max_time))
        # The ring buffer is allocated on the first process() call, once the
        # sample rate is known.
        self.buffer = None
        self.size = 0
        self.write_idx = 0

    def process(self, n, sr):
        """Run ``n`` samples through the delay line; returns float32."""
        if self.buffer is None:
            self.size = max(int(self.max_t * sr), 1)
            self.buffer = np.zeros(self.size, dtype=np.float32)

        dry_in = self.in_.output_buffer
        delay_s = float(self.time.output_buffer[0])
        fb = float(self.feedback.output_buffer[0])
        fb = 0.99 if fb > 0.99 else (0.0 if fb < 0.0 else fb)
        wet = float(self.mix.output_buffer[0])

        ring = self.buffer
        length = self.size

        # Delay in whole samples, at least 1 and shorter than the buffer.
        lag = int(delay_s * sr)
        lag = 1 if lag < 1 else min(lag, length - 1)

        result = np.empty(n, dtype=np.float32)
        pos = self.write_idx
        for i in range(n):
            tap = ring[(pos - lag) % length]
            sample = dry_in[i]
            ring[pos] = sample + fb * tap
            result[i] = (1.0 - wet) * sample + wet * tap
            pos = (pos + 1) % length
        self.write_idx = pos
        return result
class BinOpNode(Node):
    """Element-wise arithmetic between the output buffers of two nodes.

    Raises:
        ValueError: if ``op`` is not one of ``OPS``.
    """

    OPS = ('+', '-', '*', '/')

    def __init__(self, op, a, b):
        super().__init__()
        # Reject bad operators at build time, like Osc and Filter do for
        # their symbolic argument, instead of failing while rendering.
        if op not in self.OPS:
            raise ValueError(f'Unknown operator: {op!r}')
        self.op = op
        self.a = a
        self.b = b
        self.inputs = [a, b]

    def process(self, n, sr):
        """Return ``a <op> b`` computed on the operands' current buffers."""
        a = self.a.output_buffer
        b = self.b.output_buffer
        if self.op == '+':
            return a + b
        if self.op == '-':
            return a - b
        if self.op == '*':
            return a * b
        if self.op == '/':
            return a / b
        # Only reachable if `op` was reassigned after construction.
        raise ValueError(self.op)
class Negate(Node):
    """Unary minus: flips the sign of its operand's output."""

    def __init__(self, a):
        super().__init__()
        self.a = a
        self.inputs = [a]

    def process(self, n, sr):
        """Return the negated copy of the operand's current buffer."""
        operand = self.a.output_buffer
        return -operand
class Poly(Node):
    """Polyphonic voice allocator.

    Holds N independent voice instances and dispatches notes from a step
    sequence at `rate` steps per second. Each note triggers the least recently
    used voice for `gate_duration` seconds. notes[i] == 0 is treated as a rest
    (no trigger).

    Each voice is expected to expose ``freq_slot`` and ``gate_slot`` (objects
    with a writable ``.value``), ``gate_off_at``, ``last_on_at``, ``order``
    (nodes to render, in sequence) and ``output`` (the node whose buffer is
    mixed into the result).

    Raises:
        ValueError: if ``notes`` is empty.
    """

    def __init__(self, rate, gate_duration, instances, notes):
        super().__init__()
        self.notes = list(notes)
        if not self.notes:
            # Without this the first render fails with a modulo-by-zero.
            raise ValueError('poly() needs notes=[n1, n2, ...] with at least one element')
        self.rate = rate
        self.gate_duration = gate_duration
        self.instances = instances
        self.inputs = [rate, gate_duration]
        self.t = 0.0
        # Index of the last step already dispatched; -1 means none yet.
        self.last_step = -1

    def _alloc_voice(self):
        # Pick the voice idle the longest (lowest last_on_at). If tie, first found.
        best = self.instances[0]
        for v in self.instances[1:]:
            if v.last_on_at < best.last_on_at:
                best = v
        return best

    def process(self, n, sr):
        """Dispatch due notes, render every voice and return their sum."""
        rate = float(self.rate.output_buffer[0])
        gd = float(self.gate_duration.output_buffer[0])

        # Trigger any step boundaries that fell into this block (catch up if multiple).
        cur_step = int(self.t * rate)
        while self.last_step < cur_step:
            self.last_step += 1
            note = self.notes[self.last_step % len(self.notes)]
            if note > 0.0:
                v = self._alloc_voice()
                v.freq_slot.value = float(note)
                v.gate_slot.value = 1.0
                # With rate == 0 only step 0 can be reached here; place it at
                # t=0 instead of dividing by zero.
                step_t = self.last_step / rate if rate != 0.0 else 0.0
                v.gate_off_at = step_t + gd
                v.last_on_at = step_t

        # Release gates whose timeout has elapsed (block-rate granularity).
        for v in self.instances:
            if v.gate_off_at > 0.0 and self.t >= v.gate_off_at:
                v.gate_slot.value = 0.0
                v.gate_off_at = -1.0

        # Render every voice's sub-graph and sum.
        out = np.zeros(n, dtype=np.float32)
        for v in self.instances:
            for node in v.order:
                node.output_buffer = node.process(n, sr)
            out += v.output.output_buffer
        self.t += n / sr
        return out
# Lookup table from a node's short lowercase name to the class implementing it.
NODE_REGISTRY = dict(
    osc=Osc,
    trig=Trig,
    adsr=Adsr,
    noise=Noise,
    filter=Filter,
    seq=Seq,
    delay=Delay,
)
# For each node name, the positions of the arguments that have to be written
# as bare identifiers and are handed over as strings.
SYMBOLIC_ARGS = dict(
    osc=[0],     # waveform name
    filter=[0],  # filter kind
)