Skip to content

Commit

Permalink
Merge pull request #1062 from daslyfe/osc_time_fix
Browse files Browse the repository at this point in the history
fix OSC timing for recent scheduler updates
  • Loading branch information
daslyfe authored Apr 21, 2024
2 parents 0a3694f + 57ad278 commit 9348a80
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 121 deletions.
5 changes: 5 additions & 0 deletions packages/core/util.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ export const valueToMidi = (value, fallbackValue) => {
return fallbackValue;
};

// used to schedule external event like midi and osc out
export const getEventOffsetMs = (targetTimeSeconds, currentTimeSeconds) => {
return (targetTimeSeconds - currentTimeSeconds) * 1000;
};

/**
* @deprecated does not appear to be referenced or invoked anywhere in the codebase
* @noAutocomplete
Expand Down
6 changes: 3 additions & 3 deletions packages/desktopbridge/midibridge.mjs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Invoke } from './utils.mjs';
import { Pattern, noteToMidi } from '@strudel/core';
import { Pattern, getEventOffsetMs, noteToMidi } from '@strudel/core';

const ON_MESSAGE = 0x90;
const OFF_MESSAGE = 0x80;
Expand All @@ -9,8 +9,8 @@ Pattern.prototype.midi = function (output) {
return this.onTrigger((time_deprecate, hap, currentTime, cps, targetTime) => {
let { note, nrpnn, nrpv, ccn, ccv, velocity = 0.9, gain = 1 } = hap.value;
//magic number to get audio engine to line up, can probably be calculated somehow
const latency = 0.034;
const offset = (targetTime - currentTime + latency) * 1000;
const latencyMs = 34;
const offset = getEventOffsetMs(targetTime, currentTime) + latencyMs;
velocity = Math.floor(gain * velocity * 100);
const duration = Math.floor((hap.duration.valueOf() / cps) * 1000 - 10);
const roundedOffset = Math.round(offset);
Expand Down
6 changes: 3 additions & 3 deletions packages/desktopbridge/oscbridge.mjs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { parseNumeral, Pattern } from '@strudel/core';
import { parseNumeral, Pattern, getEventOffsetMs } from '@strudel/core';
import { Invoke } from './utils.mjs';

Pattern.prototype.osc = function () {
return this.onTrigger(async (time, hap, currentTime, cps = 1) => {
return this.onTrigger(async (time, hap, currentTime, cps = 1, targetTime) => {
hap.ensureObjectValue();
const cycle = hap.wholeOrPart().begin.valueOf();
const delta = hap.duration.valueOf();
Expand All @@ -13,7 +13,7 @@ Pattern.prototype.osc = function () {

const params = [];

const timestamp = Math.round(Date.now() + (time - currentTime) * 1000);
const timestamp = Math.round(Date.now() + getEventOffsetMs(targetTime, currentTime));

Object.keys(controls).forEach((key) => {
const val = controls[key];
Expand Down
7 changes: 3 additions & 4 deletions packages/midi/midi.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This program is free software: you can redistribute it and/or modify it under th
*/

import * as _WebMidi from 'webmidi';
import { Pattern, isPattern, logger, ref } from '@strudel/core';
import { Pattern, getEventOffsetMs, isPattern, logger, ref } from '@strudel/core';
import { noteToMidi } from '@strudel/core';
import { Note } from 'webmidi';
// if you use WebMidi from outside of this package, make sure to import that instance:
Expand Down Expand Up @@ -120,10 +120,9 @@ Pattern.prototype.midi = function (output) {
const device = getDevice(output, WebMidi.outputs);
hap.ensureObjectValue();
//magic number to get audio engine to line up, can probably be calculated somehow
const latency = 0.034;
const latencyMs = 34;
// passing a string with a +num into the webmidi api adds an offset to the current time https://webmidijs.org/api/classes/Output
const timeOffsetString = `+${(targetTime - currentTime + latency) * 1000}`;

const timeOffsetString = `+${getEventOffsetMs(targetTime, currentTime) + latencyMs}`;
// destructure value
let { note, nrpnn, nrpv, ccn, ccv, midichan = 1, midicmd, gain = 1, velocity = 0.9 } = hap.value;

Expand Down
7 changes: 4 additions & 3 deletions packages/osc/osc.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This program is free software: you can redistribute it and/or modify it under th

import OSC from 'osc-js';

import { logger, parseNumeral, Pattern } from '@strudel/core';
import { logger, parseNumeral, Pattern, getEventOffsetMs } from '@strudel/core';

let connection; // Promise<OSC>
function connect() {
Expand Down Expand Up @@ -44,7 +44,7 @@ function connect() {
* @returns Pattern
*/
Pattern.prototype.osc = function () {
return this.onTrigger(async (time, hap, currentTime, cps = 1) => {
return this.onTrigger(async (time, hap, currentTime, cps = 1, targetTime) => {
hap.ensureObjectValue();
const osc = await connect();
const cycle = hap.wholeOrPart().begin.valueOf();
Expand All @@ -56,7 +56,8 @@ Pattern.prototype.osc = function () {
const keyvals = Object.entries(controls).flat();
// time should be audio time of onset
// currentTime should be current time of audio context (slightly before time)
const offset = (time - currentTime) * 1000;
const offset = getEventOffsetMs(targetTime, currentTime);

// timestamp in milliseconds used to trigger the osc bundle at a precise moment
const ts = Math.floor(Date.now() + offset);
const message = new OSC.Message('/dirt/play', ...keyvals);
Expand Down
2 changes: 1 addition & 1 deletion packages/osc/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Copyright (C) 2022 Strudel contributors - see <https://github.com/tidalcycles/st
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

const OSC = require('osc-js');
import OSC from 'osc-js';

const config = {
receiver: 'ws', // @param {string} Where messages sent via 'send' method will be delivered to, 'ws' for Websocket clients, 'udp' for udp client
Expand Down
222 changes: 115 additions & 107 deletions src-tauri/src/oscbridge.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use rosc::{ encoder, OscTime };
use rosc::{ OscMessage, OscPacket, OscType, OscBundle };
use rosc::{encoder, OscTime};
use rosc::{OscBundle, OscMessage, OscPacket, OscType};

use std::net::UdpSocket;

use std::time::Duration;
use std::sync::Arc;
use tokio::sync::{ mpsc, Mutex };
use serde::Deserialize;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};

use crate::loggerbridge::Logger;
pub struct OscMsg {
pub msg_buf: Vec<u8>,
pub timestamp: u64,
pub msg_buf: Vec<u8>,
pub timestamp: u64,
}

pub struct AsyncInputTransmit {
pub inner: Mutex<mpsc::Sender<Vec<OscMsg>>>,
pub inner: Mutex<mpsc::Sender<Vec<OscMsg>>>,
}

const UNIX_OFFSET: u64 = 2_208_988_800; // 70 years in seconds
Expand All @@ -25,133 +25,141 @@ const NANOS_PER_SECOND: f64 = 1.0e9;
const SECONDS_PER_NANO: f64 = 1.0 / NANOS_PER_SECOND;

pub fn init(
logger: Logger,
async_input_receiver: mpsc::Receiver<Vec<OscMsg>>,
mut async_output_receiver: mpsc::Receiver<Vec<OscMsg>>,
async_output_transmitter: mpsc::Sender<Vec<OscMsg>>
logger: Logger,
async_input_receiver: mpsc::Receiver<Vec<OscMsg>>,
mut async_output_receiver: mpsc::Receiver<Vec<OscMsg>>,
async_output_transmitter: mpsc::Sender<Vec<OscMsg>>,
) {
tauri::async_runtime::spawn(async move { async_process_model(async_input_receiver, async_output_transmitter).await });
let message_queue: Arc<Mutex<Vec<OscMsg>>> = Arc::new(Mutex::new(Vec::new()));
/* ...........................................................
Listen For incoming messages and add to queue
............................................................*/
let message_queue_clone = Arc::clone(&message_queue);
tauri::async_runtime::spawn(async move {
loop {
if let Some(package) = async_output_receiver.recv().await {
let mut message_queue = message_queue_clone.lock().await;
let messages = package;
for message in messages {
(*message_queue).push(message);
}
}
}
});

let message_queue_clone = Arc::clone(&message_queue);
tauri::async_runtime::spawn(async move {
/* ...........................................................
Open OSC Ports
............................................................*/
let sock = UdpSocket::bind("127.0.0.1:57122").unwrap();
let to_addr = String::from("127.0.0.1:57120");
sock.set_nonblocking(true).unwrap();
sock.connect(to_addr).expect("could not connect to OSC address");

tauri::async_runtime::spawn(async move {
async_process_model(async_input_receiver, async_output_transmitter).await
});
let message_queue: Arc<Mutex<Vec<OscMsg>>> = Arc::new(Mutex::new(Vec::new()));
/* ...........................................................
Process queued messages
Listen For incoming messages and add to queue
............................................................*/

loop {
let mut message_queue = message_queue_clone.lock().await;

message_queue.retain(|message| {
let result = sock.send(&message.msg_buf);
if result.is_err() {
logger.log(
format!("OSC Message failed to send, the server might no longer be available"),
"error".to_string()
);
let message_queue_clone = Arc::clone(&message_queue);
tauri::async_runtime::spawn(async move {
loop {
if let Some(package) = async_output_receiver.recv().await {
let mut message_queue = message_queue_clone.lock().await;
let messages = package;
for message in messages {
(*message_queue).push(message);
}
}
}
return false;
});
});

sleep(Duration::from_millis(1));
}
});
let message_queue_clone = Arc::clone(&message_queue);
tauri::async_runtime::spawn(async move {
/* ...........................................................
Open OSC Ports
............................................................*/
let sock = UdpSocket::bind("127.0.0.1:57122").unwrap();
let to_addr = String::from("127.0.0.1:57120");
sock.set_nonblocking(true).unwrap();
sock.connect(to_addr)
.expect("could not connect to OSC address");

/* ...........................................................
Process queued messages
............................................................*/

loop {
let mut message_queue = message_queue_clone.lock().await;

message_queue.retain(|message| {
let result = sock.send(&message.msg_buf);
if result.is_err() {
logger.log(
format!(
"OSC Message failed to send, the server might no longer be available"
),
"error".to_string(),
);
}
return false;
});

sleep(Duration::from_millis(1));
}
});
}

pub async fn async_process_model(
mut input_reciever: mpsc::Receiver<Vec<OscMsg>>,
output_transmitter: mpsc::Sender<Vec<OscMsg>>
mut input_reciever: mpsc::Receiver<Vec<OscMsg>>,
output_transmitter: mpsc::Sender<Vec<OscMsg>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
while let Some(input) = input_reciever.recv().await {
let output = input;
output_transmitter.send(output).await?;
}
Ok(())
while let Some(input) = input_reciever.recv().await {
let output = input;
output_transmitter.send(output).await?;
}
Ok(())
}

#[derive(Deserialize)]
pub struct Param {
name: String,
value: String,
valueisnumber: bool,
name: String,
value: String,
valueisnumber: bool,
}
#[derive(Deserialize)]
pub struct MessageFromJS {
params: Vec<Param>,
timestamp: u64,
target: String,
params: Vec<Param>,
timestamp: u64,
target: String,
}
// Called from JS
#[tauri::command]
pub async fn sendosc(
messagesfromjs: Vec<MessageFromJS>,
state: tauri::State<'_, AsyncInputTransmit>
messagesfromjs: Vec<MessageFromJS>,
state: tauri::State<'_, AsyncInputTransmit>,
) -> Result<(), String> {
let async_proc_input_tx = state.inner.lock().await;
let mut messages_to_process: Vec<OscMsg> = Vec::new();
for m in messagesfromjs {
let mut args = Vec::new();
for p in m.params {
args.push(OscType::String(p.name));
if p.valueisnumber {
args.push(OscType::Float(p.value.parse().unwrap()));
} else {
args.push(OscType::String(p.value));
}
}
let async_proc_input_tx = state.inner.lock().await;
let mut messages_to_process: Vec<OscMsg> = Vec::new();
for m in messagesfromjs {
let mut args = Vec::new();
for p in m.params {
args.push(OscType::String(p.name));
if p.valueisnumber {
args.push(OscType::Float(p.value.parse().unwrap()));
} else {
args.push(OscType::String(p.value));
}
}

let duration_since_epoch = Duration::from_millis(m.timestamp) + Duration::new(UNIX_OFFSET, 0);
let duration_since_epoch =
Duration::from_millis(m.timestamp) + Duration::new(UNIX_OFFSET, 0);

let seconds = u32
::try_from(duration_since_epoch.as_secs())
.map_err(|_| "bit conversion failed for osc message timetag")?;
let seconds = u32::try_from(duration_since_epoch.as_secs())
.map_err(|_| "bit conversion failed for osc message timetag")?;

let nanos = duration_since_epoch.subsec_nanos() as f64;
let fractional = (nanos * SECONDS_PER_NANO * TWO_POW_32).round() as u32;
let nanos = duration_since_epoch.subsec_nanos() as f64;
let fractional = (nanos * SECONDS_PER_NANO * TWO_POW_32).round() as u32;

let timetag = OscTime::from((seconds, fractional));
let timetag = OscTime::from((seconds, fractional));

let packet = OscPacket::Message(OscMessage {
addr: m.target,
args,
});
let packet = OscPacket::Message(OscMessage {
addr: m.target,
args,
});

let bundle = OscBundle {
content: vec![packet],
timetag,
};
let bundle = OscBundle {
content: vec![packet],
timetag,
};

let msg_buf = encoder::encode(&OscPacket::Bundle(bundle)).unwrap();
let msg_buf = encoder::encode(&OscPacket::Bundle(bundle)).unwrap();

let message_to_process = OscMsg {
msg_buf,
timestamp: m.timestamp,
};
messages_to_process.push(message_to_process);
}
let message_to_process = OscMsg {
msg_buf,
timestamp: m.timestamp,
};
messages_to_process.push(message_to_process);
}

async_proc_input_tx.send(messages_to_process).await.map_err(|e| e.to_string())
async_proc_input_tx
.send(messages_to_process)
.await
.map_err(|e| e.to_string())
}

0 comments on commit 9348a80

Please sign in to comment.