From b228d70c1d1be9a3a0edb77c6089b201d43c374b Mon Sep 17 00:00:00 2001 From: Matthew Kaney Date: Wed, 5 Jul 2023 16:44:16 -0400 Subject: [PATCH 1/4] Move bus handshake to per-target implementation --- src/Sound/Tidal/Stream.hs | 204 +++++++++++++++++++------------------- 1 file changed, 101 insertions(+), 103 deletions(-) diff --git a/src/Sound/Tidal/Stream.hs b/src/Sound/Tidal/Stream.hs index 9366cd7c0..3d9c117fc 100644 --- a/src/Sound/Tidal/Stream.hs +++ b/src/Sound/Tidal/Stream.hs @@ -65,7 +65,6 @@ import Sound.Tidal.Version import Sound.Tidal.StreamTypes as Sound.Tidal.Stream data Stream = Stream {sConfig :: Config, - sBusses :: MVar [Int], sStateMV :: MVar ValueMap, -- sOutput :: MVar ControlSignal, sLink :: Link.AbletonLink, @@ -80,9 +79,9 @@ data Cx = Cx {cxTarget :: Target, cxUDP :: O.Udp, cxOSCs :: [OSC], cxAddr :: N.AddrInfo, - cxBusAddr :: Maybe N.AddrInfo + cxBusAddr :: Maybe N.AddrInfo, + cxBusses :: Maybe (MVar [Int]) } - deriving (Show) data StampStyle = BundleStamp | MessageStamp @@ -213,7 +212,6 @@ startStream :: Config -> [(Target, [OSC])] -> IO Stream startStream config oscmap = do sMapMV <- newMVar Map.empty pMapMV <- newMVar Map.empty - bussesMV <- newMVar [] globalFMV <- newMVar id actionsMV <- newEmptyMVar @@ -221,20 +219,20 @@ startStream config oscmap verbose config $ "Listening for external controls on " ++ cCtrlAddr config ++ ":" ++ show (cCtrlPort config) listen <- openListener config - cxs <- mapM (\(target, os) -> do remote_addr <- resolve (oAddress target) (show $ oPort target) - remote_bus_addr <- if isJust $ oBusPort target - then Just <$> resolve (oAddress target) (show $ fromJust $ oBusPort target) - else return Nothing + cxs <- mapM (\(target, os) -> do remote_addr <- resolve (oAddress target) (oPort target) + remote_bus_addr <- sequence (resolve (oAddress target) <$> (oBusPort target)) + remote_busses <- sequence (oBusPort target >> (Just $ newMVar [])) let broadcast = if cCtrlBroadcast config then 1 else 0 u <- O.udp_socket (\sock sockaddr -> do N.setSocketOption sock N.Broadcast broadcast N.connect sock sockaddr ) (oAddress target) (oPort target) - return $ Cx {cxUDP = u, cxAddr = remote_addr, cxBusAddr = remote_bus_addr, cxTarget = target, cxOSCs = os} + let cx = Cx {cxUDP = u, cxAddr = remote_addr, cxBusAddr = remote_bus_addr, cxBusses = remote_busses, cxTarget = target, cxOSCs = os} + handshake cx config + return cx ) oscmap let bpm = (coerce defaultCps) * 60 * (cBeatsPerCycle config) abletonLink <- Link.create bpm let stream = Stream {sConfig = config, - sBusses = bussesMV, sStateMV = sMapMV, sLink = abletonLink, sListen = listen, @@ -243,7 +241,6 @@ startStream config oscmap sGlobalFMV = globalFMV, sCxs = cxs } - sendHandshakes stream let ac = T.ActionHandler { T.onTick = onTick stream, T.onSingleTick = onSingleTick stream, @@ -252,35 +249,51 @@ startStream config oscmap -- Spawn a thread that acts as the clock _ <- T.clocked config sMapMV pMapMV actionsMV ac abletonLink -- Spawn a thread to handle OSC control messages - _ <- forkIO $ ctrlResponder 0 config stream + _ <- forkIO $ ctrlResponder config stream return stream --- It only really works to handshake with one target at the moment.. -sendHandshakes :: Stream -> IO () -sendHandshakes stream = mapM_ sendHandshake $ filter (oHandshake . cxTarget) (sCxs stream) - where sendHandshake cx = if (isJust $ sListen stream) - then - do -- send it _from_ the udp socket we're listening to, so the - -- replies go back there - sendO False (sListen stream) cx $ O.Message "/dirt/handshake" [] - else - hPutStrLn stderr "Can't handshake with SuperCollider without control port." - -sendO :: Bool -> (Maybe O.Udp) -> Cx -> O.Message -> IO () -sendO isBusMsg (Just listen) cx msg = O.sendTo listen (O.Packet_Message msg) (N.addrAddress addr) +handshake :: Cx -> Config -> IO () +handshake Cx { cxUDP = udp, cxBusses = Just bussesMV } c = sendHandshake >> listen 0 + where + sendHandshake :: IO () + sendHandshake = O.sendMessage udp (O.Message "/dirt/handshake" []) + listen :: Int -> IO () + listen waits = do ms <- recvMessagesTimeout 2 udp + if (null ms) + then do checkHandshake waits -- there was a timeout, check handshake + listen (waits+1) + else do mapM_ respond ms + listen 0 + checkHandshake :: Int -> IO () + checkHandshake waits = do busses <- readMVar bussesMV + when (null busses) $ do when (waits == 0) $ verbose c $ "Waiting for SuperDirt (v.1.7.2 or higher).." + sendHandshake + respond :: O.Message -> IO () + respond (O.Message "/dirt/hello" _) = sendHandshake + respond (O.Message "/dirt/handshake/reply" xs) = do prev <- swapMVar bussesMV $ bufferIndices xs + -- Only report the first time.. + when (null prev) $ verbose c $ "Connected to SuperDirt." + return () + respond _ = return () + bufferIndices :: [O.Datum] -> [Int] + bufferIndices [] = [] + bufferIndices (x:xs') | x == (O.AsciiString $ O.ascii "&controlBusIndices") = catMaybes $ takeWhile isJust $ map O.datum_integral xs' + | otherwise = bufferIndices xs' +handshake _ _ = return () + +sendO :: Bool -> Cx -> O.Message -> IO () +sendO isBusMsg cx msg = O.sendTo (cxUDP cx) (O.Packet_Message msg) (N.addrAddress addr) where addr | isBusMsg && isJust (cxBusAddr cx) = fromJust $ cxBusAddr cx | otherwise = cxAddr cx -sendO _ Nothing cx msg = O.sendMessage (cxUDP cx) msg -sendBndl :: Bool -> (Maybe O.Udp) -> Cx -> O.Bundle -> IO () -sendBndl isBusMsg (Just listen) cx bndl = O.sendTo listen (O.Packet_Bundle bndl) (N.addrAddress addr) +sendBndl :: Bool -> Cx -> O.Bundle -> IO () +sendBndl isBusMsg cx bndl = O.sendTo (cxUDP cx) (O.Packet_Bundle bndl) (N.addrAddress addr) where addr | isBusMsg && isJust (cxBusAddr cx) = fromJust $ cxBusAddr cx | otherwise = cxAddr cx -sendBndl _ Nothing cx bndl = O.sendBundle (cxUDP cx) bndl -resolve :: String -> String -> IO N.AddrInfo +resolve :: String -> Int -> IO N.AddrInfo resolve host port = do let hints = N.defaultHints { N.addrSocketType = N.Stream } - addr:_ <- N.getAddrInfo (Just hints) (Just host) (Just port) + addr:_ <- N.getAddrInfo (Just hints) (Just host) (Just $ show port) return addr -- Start an instance of Tidal with superdirt OSC @@ -348,8 +361,8 @@ playStack pMap = stack $ map pattern activeStates else not (mute pState) ) $ Map.elems pMap -toOSC :: [Int] -> ProcessedEvent -> OSC -> [(Double, Bool, O.Message)] -toOSC busses pe osc@(OSC _ _) +toOSC :: Maybe [Int] -> ProcessedEvent -> OSC -> [(Double, Bool, O.Message)] +toOSC maybeBusses pe osc@(OSC _ _) = catMaybes (playmsg:busmsgs) -- playmap is a ValueMap where the keys don't start with ^ and are not "" -- busmap is a ValueMap containing the rest of the keys from the event value @@ -384,8 +397,8 @@ toOSC busses pe osc@(OSC _ _) O.Message mungedPath vs ) | otherwise = Nothing - toBus n | null busses = n - | otherwise = busses !!! n + toBus n | Just busses <- maybeBusses, (not . null) busses = busses !!! n + | otherwise = n busmsgs = map (\(('^':k), (VI b)) -> do v <- Map.lookup k playmap return $ (tsPart, @@ -516,7 +529,6 @@ doTick stream st ops sMap = setPreviousPatternOrSilence stream return sMap) (do pMap <- readMVar (sPMapMV stream) - busses <- readMVar (sBusses stream) sGlobalF <- readMVar (sGlobalFMV stream) bpm <- (T.getTempo ops) let @@ -534,14 +546,15 @@ doTick stream st ops sMap = (sMap'', es') = resolveState sMap' es tes <- processCps ops es' -- For each OSC target - forM_ cxs $ \cx@(Cx target _ oscs _ _) -> do + forM_ cxs $ \cx@(Cx target _ oscs _ _ bussesMV) -> do + busses <- mapM readMVar bussesMV -- Latency is configurable per target. -- Latency is only used when sending events live. let latency = oLatency target ms = concatMap (\e -> concatMap (toOSC busses e) oscs) tes -- send the events to the OSC target forM_ ms $ \ m -> (do - send (sListen stream) cx latency extraLatency m) `E.catch` \ (e :: E.SomeException) -> do + send cx latency extraLatency m) `E.catch` \ (e :: E.SomeException) -> do hPutStrLn stderr $ "Failed to send. Is the '" ++ oName target ++ "' target running? " ++ show e sMap'' `seq` return sMap'') @@ -557,13 +570,13 @@ setPreviousPatternOrSilence stream = -- Send events early using timestamp in the OSC bundle - used by Superdirt -- Send events early by adding timestamp to the OSC message - used by Dirt -- Send events live by delaying the thread -send :: Maybe O.Udp -> Cx -> Double -> Double -> (Double, Bool, O.Message) -> IO () -send listen cx latency extraLatency (time, isBusMsg, m) - | oSchedule target == Pre BundleStamp = sendBndl isBusMsg listen cx $ O.Bundle timeWithLatency [m] - | oSchedule target == Pre MessageStamp = sendO isBusMsg listen cx $ addtime m +send :: Cx -> Double -> Double -> (Double, Bool, O.Message) -> IO () +send cx latency extraLatency (time, isBusMsg, m) + | oSchedule target == Pre BundleStamp = sendBndl isBusMsg cx $ O.Bundle timeWithLatency [m] + | oSchedule target == Pre MessageStamp = sendO isBusMsg cx $ addtime m | otherwise = do _ <- forkOS $ do now <- O.time threadDelay $ floor $ (timeWithLatency - now) * 1000000 - sendO isBusMsg listen cx m + sendO isBusMsg cx m return () where addtime (O.Message mpath params) = O.Message mpath ((O.int32 sec):((O.int32 usec):params)) ut = O.ntpr_to_posix timeWithLatency @@ -684,66 +697,51 @@ openListener c catchAny = E.catch -- Listen to and act on OSC control messages -ctrlResponder :: Int -> Config -> Stream -> IO () -ctrlResponder waits c (stream@(Stream {sListen = Just sock})) - = do ms <- recvMessagesTimeout 2 sock - if (null ms) - then do checkHandshake -- there was a timeout, check handshake - ctrlResponder (waits+1) c stream - else do mapM_ act ms - ctrlResponder 0 c stream - where - checkHandshake = do busses <- readMVar (sBusses stream) - when (null busses) $ do when (waits == 0) $ verbose c $ "Waiting for SuperDirt (v.1.7.2 or higher).." - sendHandshakes stream - - act (O.Message "/dirt/hello" _) = sendHandshakes stream - act (O.Message "/dirt/handshake/reply" xs) = do prev <- swapMVar (sBusses stream) $ bufferIndices xs - -- Only report the first time.. - when (null prev) $ verbose c $ "Connected to SuperDirt." - return () - where - bufferIndices [] = [] - bufferIndices (x:xs') | x == (O.AsciiString $ O.ascii "&controlBusIndices") = catMaybes $ takeWhile isJust $ map O.datum_integral xs' - | otherwise = bufferIndices xs' - -- External controller commands - act (O.Message "/ctrl" (O.Int32 k:v:[])) - = act (O.Message "/ctrl" [O.string $ show k,v]) - act (O.Message "/ctrl" (O.AsciiString k:v@(O.Float _):[])) - = add (O.ascii_to_string k) (VF (fromJust $ O.datum_floating v)) - act (O.Message "/ctrl" (O.AsciiString k:O.AsciiString v:[])) - = add (O.ascii_to_string k) (VS (O.ascii_to_string v)) - act (O.Message "/ctrl" (O.AsciiString k:O.Int32 v:[])) - = add (O.ascii_to_string k) (VI (fromIntegral v)) - -- Stream playback commands - act (O.Message "/mute" (k:[])) - = withID k $ streamMute stream - act (O.Message "/unmute" (k:[])) - = withID k $ streamUnmute stream - act (O.Message "/solo" (k:[])) - = withID k $ streamSolo stream - act (O.Message "/unsolo" (k:[])) - = withID k $ streamUnsolo stream - act (O.Message "/muteAll" []) - = streamMuteAll stream - act (O.Message "/unmuteAll" []) - = streamUnmuteAll stream - act (O.Message "/unsoloAll" []) - = streamUnsoloAll stream - act (O.Message "/hush" []) - = streamHush stream - act (O.Message "/silence" (k:[])) - = withID k $ streamSilence stream - act m = hPutStrLn stderr $ "Unhandled OSC: " ++ show m - add :: String -> Value -> IO () - add k v = do sMap <- takeMVar (sStateMV stream) - putMVar (sStateMV stream) $ Map.insert k v sMap - return () - withID :: O.Datum -> (ID -> IO ()) -> IO () - withID (O.AsciiString k) func = func $ (ID . O.ascii_to_string) k - withID (O.Int32 k) func = func $ (ID . show) k - withID _ _ = return () -ctrlResponder _ _ _ = return () +ctrlResponder :: Config -> Stream -> IO () +ctrlResponder c (stream@(Stream {sListen = Just sock})) = loop + where + loop :: IO () + loop = do O.recvMessages sock >>= mapM_ act + loop + -- External controller commands + act :: O.Message -> IO () + act (O.Message "/ctrl" (O.Int32 k:v:[])) + = act (O.Message "/ctrl" [O.string $ show k,v]) + act (O.Message "/ctrl" (O.AsciiString k:v@(O.Float _):[])) + = add (O.ascii_to_string k) (VF (fromJust $ O.datum_floating v)) + act (O.Message "/ctrl" (O.AsciiString k:O.AsciiString v:[])) + = add (O.ascii_to_string k) (VS (O.ascii_to_string v)) + act (O.Message "/ctrl" (O.AsciiString k:O.Int32 v:[])) + = add (O.ascii_to_string k) (VI (fromIntegral v)) + -- Stream playback commands + act (O.Message "/mute" (k:[])) + = withID k $ streamMute stream + act (O.Message "/unmute" (k:[])) + = withID k $ streamUnmute stream + act (O.Message "/solo" (k:[])) + = withID k $ streamSolo stream + act (O.Message "/unsolo" (k:[])) + = withID k $ streamUnsolo stream + act (O.Message "/muteAll" []) + = streamMuteAll stream + act (O.Message "/unmuteAll" []) + = streamUnmuteAll stream + act (O.Message "/unsoloAll" []) + = streamUnsoloAll stream + act (O.Message "/hush" []) + = streamHush stream + act (O.Message "/silence" (k:[])) + = withID k $ streamSilence stream + act m = hPutStrLn stderr $ "Unhandled OSC: " ++ show m + add :: String -> Value -> IO () + add k v = do sMap <- takeMVar (sStateMV stream) + putMVar (sStateMV stream) $ Map.insert k v sMap + return () + withID :: O.Datum -> (ID -> IO ()) -> IO () + withID (O.AsciiString k) func = func $ (ID . O.ascii_to_string) k + withID (O.Int32 k) func = func $ (ID . show) k + withID _ _ = return () +ctrlResponder _ _ = return () verbose :: Config -> String -> IO () verbose c s = when (cVerbose c) $ putStrLn s From 8f6e3f72a7c1ad2bbd87637921577c19cb2e39d6 Mon Sep 17 00:00:00 2001 From: Matthew Kaney Date: Thu, 6 Jul 2023 18:45:06 -0400 Subject: [PATCH 2/4] Start adding tests for handshake --- test/Sound/Tidal/StreamTest.hs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/test/Sound/Tidal/StreamTest.hs b/test/Sound/Tidal/StreamTest.hs index 098208487..61b1dc8ee 100644 --- a/test/Sound/Tidal/StreamTest.hs +++ b/test/Sound/Tidal/StreamTest.hs @@ -4,13 +4,15 @@ module Sound.Tidal.StreamTest where import Test.Microspec +import Sound.Tidal.Config import Sound.Tidal.Stream import Sound.Tidal.Types import qualified Sound.Osc.Fd as O import qualified Data.Map.Strict as M +import Control.Concurrent.MVar -run :: Microspec () -run = +main :: Microspec () +main = describe "Sound.Tidal.Stream" $ do describe "toDatum" $ do it "should convert VN to osc float" $ do @@ -35,3 +37,9 @@ run = getString (M.singleton "s" (VS "sn")) "s=bd" `shouldBe` Just "sn" it "should work for missing params with fallback expressions" $ do getString M.empty "s=bd" `shouldBe` Just "bd" + + describe "handshake" $ do + it "should only handshake when a busPort is set" $ monadicIO $ do + superdirtHandshake <- run $ newMVar False + run $ startStream defaultConfig [(superdirtTarget, [superdirtShape])] + (== True) <$> run (readMVar superdirtHandshake) \ No newline at end of file From 8b4b5fa7e82a955affb9eae3f0de05dfb0d0cf9e Mon Sep 17 00:00:00 2001 From: Matthew Kaney Date: Mon, 17 Jul 2023 17:10:32 -0400 Subject: [PATCH 3/4] Fix bugs with per-context busmaps --- src/Sound/Tidal/Stream.hs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Sound/Tidal/Stream.hs b/src/Sound/Tidal/Stream.hs index 3d9c117fc..b62bec765 100644 --- a/src/Sound/Tidal/Stream.hs +++ b/src/Sound/Tidal/Stream.hs @@ -224,10 +224,10 @@ startStream config oscmap remote_busses <- sequence (oBusPort target >> (Just $ newMVar [])) let broadcast = if cCtrlBroadcast config then 1 else 0 u <- O.udp_socket (\sock sockaddr -> do N.setSocketOption sock N.Broadcast broadcast - N.connect sock sockaddr + -- N.connect sock sockaddr ) (oAddress target) (oPort target) let cx = Cx {cxUDP = u, cxAddr = remote_addr, cxBusAddr = remote_bus_addr, cxBusses = remote_busses, cxTarget = target, cxOSCs = os} - handshake cx config + _ <- forkIO $ handshake cx config return cx ) oscmap let bpm = (coerce defaultCps) * 60 * (cBeatsPerCycle config) @@ -253,10 +253,10 @@ startStream config oscmap return stream handshake :: Cx -> Config -> IO () -handshake Cx { cxUDP = udp, cxBusses = Just bussesMV } c = sendHandshake >> listen 0 +handshake Cx { cxUDP = udp, cxBusses = Just bussesMV, cxAddr = addr } c = sendHandshake >> listen 0 where sendHandshake :: IO () - sendHandshake = O.sendMessage udp (O.Message "/dirt/handshake" []) + sendHandshake = O.sendTo udp (O.Packet_Message $ O.Message "/dirt/handshake" []) (N.addrAddress addr) listen :: Int -> IO () listen waits = do ms <- recvMessagesTimeout 2 udp if (null ms) From cf6377e478a2e6dcbcc61a0204c8fb286dfa5109 Mon Sep 17 00:00:00 2001 From: Matthew Kaney Date: Mon, 17 Jul 2023 20:50:46 -0400 Subject: [PATCH 4/4] Remove handshake tests for now This reverts commit 8f6e3f72a7c1ad2bbd87637921577c19cb2e39d6. --- test/Sound/Tidal/StreamTest.hs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/test/Sound/Tidal/StreamTest.hs b/test/Sound/Tidal/StreamTest.hs index 61b1dc8ee..098208487 100644 --- a/test/Sound/Tidal/StreamTest.hs +++ b/test/Sound/Tidal/StreamTest.hs @@ -4,15 +4,13 @@ module Sound.Tidal.StreamTest where import Test.Microspec -import Sound.Tidal.Config import Sound.Tidal.Stream import Sound.Tidal.Types import qualified Sound.Osc.Fd as O import qualified Data.Map.Strict as M -import Control.Concurrent.MVar -main :: Microspec () -main = +run :: Microspec () +run = describe "Sound.Tidal.Stream" $ do describe "toDatum" $ do it "should convert VN to osc float" $ do @@ -37,9 +35,3 @@ main = getString (M.singleton "s" (VS "sn")) "s=bd" `shouldBe` Just "sn" it "should work for missing params with fallback expressions" $ do getString M.empty "s=bd" `shouldBe` Just "bd" - - describe "handshake" $ do - it "should only handshake when a busPort is set" $ monadicIO $ do - superdirtHandshake <- run $ newMVar False - run $ startStream defaultConfig [(superdirtTarget, [superdirtShape])] - (== True) <$> run (readMVar superdirtHandshake) \ No newline at end of file