Skip to content

Commit

Permalink
Update 0.5.5 (#11)
Browse files Browse the repository at this point in the history
- Added real-time reconfiguration feature
- Fixed errors related to queues
- Fixed errors of simultaneous access of several streams to sources
  • Loading branch information
romanin-rf authored Dec 30, 2024
2 parents ddf2c5e + d39123c commit a304143
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "seaplayer-audio"
version = "0.4.0"
version = "0.5.5"
description = "A library for async/sync playback audio."
repository = "https://github.com/romanin-rf/seaplayer-audio"
authors = ["Romanin <[email protected]>"]
Expand Down
36 changes: 28 additions & 8 deletions seaplayer_audio/audiosources/fileaudiosource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
from numpy import ndarray
from soundfile import SoundFile
from threading import Semaphore
# * Typing
from types import TracebackType
from typing_extensions import Optional, Type
Expand Down Expand Up @@ -31,6 +32,7 @@ def __init__(
) -> None:
self.name = os.path.abspath(str(filepath))
self.sfio = SoundFile(self.name, 'r', samplerate, channels, subtype, endian, format, closefd=closefd)
self.semaphore = Semaphore(1)
self.minfo = get_mutagen_info(self.name)
self.metadata = get_audio_metadata(self.sfio, self.minfo)
self.closefd = closefd
Expand Down Expand Up @@ -133,7 +135,10 @@ def read(
Returns:
ndarray: If out is specified, the data is written into the given array instead of creating a new array. In this case, the arguments *dtype* and *always_2d* are silently ignored! If *frames* is not given, it is obtained from the length of out.
"""
return self.sfio.read(frames, dtype, always_2d, **extra)
self.semaphore.acquire()
result = self.sfio.read(frames, dtype, always_2d, **extra)
self.semaphore.release()
return result

def readline(self, seconds: float=-1.0, dtype: AudioDType='float32', always_2d: bool=False, **extra: object):
"""Read from the file and return data (*1 second*) as NumPy array.
Expand All @@ -144,7 +149,10 @@ def readline(self, seconds: float=-1.0, dtype: AudioDType='float32', always_2d:
Returns:
ndarray: If out is specified, the data is written into the given array instead of creating a new array. In this case, the arguments *dtype* and *always_2d* are silently ignored! If *frames* is not given, it is obtained from the length of out.
"""
return self.sfio.read(int(seconds * self.sfio.samplerate), dtype, always_2d, **extra)
self.semaphore.acquire()
result = self.sfio.read(int(seconds * self.sfio.samplerate), dtype, always_2d, **extra)
self.semaphore.release()
return result

def seek(self, frames: int, whence=0) -> int:
"""Set the read position.
Expand All @@ -159,7 +167,10 @@ def seek(self, frames: int, whence=0) -> int:
Returns:
int: The new absolute read position in frames.
"""
return self.sfio.seek(frames, whence)
self.semaphore.acquire()
result = self.sfio.seek(frames, whence)
self.semaphore.release()
return result

def tell(self) -> int:
"""Return the current read position.
Expand Down Expand Up @@ -207,10 +218,10 @@ async def __aexit__(
if self.closefd:
await self.close()

async def seekable(self):
def seekable(self):
return super().seekable()

async def readable(self):
def readable(self):
return not self.closed

async def read(
Expand All @@ -220,13 +231,22 @@ async def read(
always_2d: bool=False,
**extra: object
) -> ndarray:
return await aiorun(self.loop, super().read, frames, dtype, always_2d, **extra)
self.semaphore.acquire()
result = await aiorun(self.loop, super().read, frames, dtype, always_2d, **extra)
self.semaphore.release()
return result

async def readline(self, seconds: float=-1.0, dtype: AudioDType='float32', always_2d: bool=False, **extra) -> ndarray:
return await aiorun(self.loop, super().readline, seconds, dtype, always_2d, **extra)
self.semaphore.acquire()
result = await aiorun(self.loop, super().readline, seconds, dtype, always_2d, **extra)
self.semaphore.release()
return result

async def seek(self, frames: int, whence=0) -> int:
return await aiorun(self.loop, super().seek, frames, whence)
self.semaphore.acquire()
result = await aiorun(self.loop, super().seek, frames, whence)
self.semaphore.release()
return result

async def tell(self) -> int:
return await aiorun(self.loop, super().tell)
Expand Down
19 changes: 16 additions & 3 deletions seaplayer_audio/audiosources/urlaudiosource.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from numpy import ndarray
from soundfile import SoundFile
from threading import Semaphore
# > Typing
from typing_extensions import Optional, Self
# > Local Imports
from .._types import AudioSamplerate, AudioChannels, AudioSubType, AudioFormat, AudioEndians, AudioDType
from ..base import AudioSourceBase
from ..functions import get_mutagen_info, get_audio_metadata
Expand All @@ -27,6 +30,7 @@ def __init__(
self.sfio = SoundFile(self.urlio, 'r', samplerate, channels, subtype, endian, format, closefd=closefd)
self.minfo = get_mutagen_info(self.name)
self.metadata = get_audio_metadata(self.sfio, self.minfo)
self.semaphore = Semaphore(1)
self.closefd = closefd

# ^ Dander Methods
Expand Down Expand Up @@ -122,7 +126,10 @@ def read(
Returns:
ndarray: If out is specified, the data is written into the given array instead of creating a new array. In this case, the arguments *dtype* and *always_2d* are silently ignored! If *frames* is not given, it is obtained from the length of out.
"""
return self.sfio.read(frames, dtype, always_2d, **extra)
self.semaphore.acquire()
result = self.sfio.read(frames, dtype, always_2d, **extra)
self.semaphore.release()
return result

def readline(self, seconds: float=-1.0, dtype: AudioDType='float32', always_2d: bool=False, **extra: object):
"""Read from the file and return data (*1 second*) as NumPy array.
Expand All @@ -133,7 +140,10 @@ def readline(self, seconds: float=-1.0, dtype: AudioDType='float32', always_2d:
Returns:
ndarray: If out is specified, the data is written into the given array instead of creating a new array. In this case, the arguments *dtype* and *always_2d* are silently ignored! If *frames* is not given, it is obtained from the length of out.
"""
return self.sfio.read(int(seconds * self.sfio.samplerate), dtype, always_2d, **extra)
self.semaphore.acquire()
result = self.sfio.read(int(seconds * self.sfio.samplerate), dtype, always_2d, **extra)
self.semaphore.release()
return result

def seek(self, frames: int, whence=0) -> int:
"""Set the read position.
Expand All @@ -148,7 +158,10 @@ def seek(self, frames: int, whence=0) -> int:
Returns:
int: The new absolute read position in frames.
"""
return self.sfio.seek(frames, whence)
self.semaphore.acquire()
result = self.sfio.seek(frames, whence)
self.semaphore.release()
return result

def tell(self) -> int:
"""Return the current read position.
Expand Down
2 changes: 1 addition & 1 deletion seaplayer_audio/base/audiosource.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def __anext__(self) -> NoReturn:
"""!!! NOT IMPLEMENTED !!!"""
raise NotImplementedError

async def writable(self) -> bool:
def writable(self) -> bool:
return False

@deprecated('!!! NOT IMPLEMENTED !!!')
Expand Down
13 changes: 11 additions & 2 deletions seaplayer_audio/base/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ def set_lock(self, __value: bool) -> None:
def is_busy(self) -> bool:
return False

def reconfigure(self,
samplerate: Optional[AudioSamplerate]=None,
channels: Optional[AudioChannels]=None,
dtype: Optional[AudioDType]=None,
) -> None:
self.samplerate = samplerate or 44100
self.channels = channels or 2
self.dtype = dtype or 'float32'

def run(self) -> None:
raise NotImplementedError

Expand Down Expand Up @@ -119,13 +128,13 @@ async def __aexit__(
await self.abort()
await self.stop()

async def set_lock(self, __value: bool) -> None:
def set_lock(self, __value: bool) -> None:
if __value:
self.state |= StreamerState.LOCKED
else:
self.state &= ~StreamerState.LOCKED

async def is_busy(self) -> bool:
def is_busy(self) -> bool:
return False

def run(self) -> None:
Expand Down
60 changes: 53 additions & 7 deletions seaplayer_audio/streamers/callbackstreamersnd.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import queue
import asyncio
from queue import Queue
from asyncio import AbstractEventLoop, Queue as AsyncQueue
Expand All @@ -8,6 +9,7 @@
from ..base import AsyncSoundDeviceStreamerBase, SoundDeviceStreamerBase, StreamerState

# ! Main Class

class CallbackSoundDeviceStreamer(SoundDeviceStreamerBase):
__steamer_type__ = 'callback-sounddevice'

Expand Down Expand Up @@ -37,7 +39,7 @@ def __callback__(self, outdata: ndarray, frames: int, time, status):
if self.buffer is None:
try:
d = self.queue.get_nowait()
except:
except queue.Empty:
return
if len(d) >= frames:
wdata = d[:frames]
Expand All @@ -50,7 +52,7 @@ def __callback__(self, outdata: ndarray, frames: int, time, status):
elif (len(self.buffer) < frames) and (self.queue.qsize() >= 1):
try:
d = self.queue.get_nowait()
except:
except queue.Empty:
return
wdata = self.buffer.copy()
self.buffer = None
Expand All @@ -72,6 +74,28 @@ def __callback__(self, outdata: ndarray, frames: int, time, status):
def is_busy(self) -> bool:
return self.queue.qsize() >= 1

def reconfigure(
self,
samplerate: Optional[AudioSamplerate]=None,
channels: Optional[AudioChannels]=None,
dtype: Optional[AudioDType]=None,
*,
restore_state: bool=True
) -> None:
super().reconfigure(samplerate, channels, dtype)
state = self.state
if StreamerState.RUNNING in self.state:
self.stop()
self.stream = OutputStream(
samplerate=self.samplerate,
channels=self.channels,
dtype=self.dtype,
device=self.device,
callback=self.__callback__
)
if restore_state and (StreamerState.RUNNING in state):
self.start()

@deprecated("!!! NOT IMPLEMENTED !!!")
def run(self) -> NoReturn:
raise NotImplementedError
Expand Down Expand Up @@ -132,8 +156,8 @@ def __callback__(self, outdata: ndarray, frames: int, time, status):
self.precallback(frames)
if self.buffer is None:
try:
d = asyncio.run_coroutine_threadsafe(self.queue.get_nowait(), self.loop).result()
except:
d = self.queue.get_nowait()
except queue.Empty:
return
if len(d) >= frames:
wdata = d[:frames]
Expand All @@ -145,8 +169,8 @@ def __callback__(self, outdata: ndarray, frames: int, time, status):
self.buffer = self.buffer[frames:]
elif (len(self.buffer) < frames) and (self.queue.qsize() >= 1):
try:
d = asyncio.run_coroutine_threadsafe(self.queue.get_nowait(), self.loop).result()
except:
d = self.queue.get_nowait()
except queue.Empty:
return
wdata = self.buffer.copy()
self.buffer = None
Expand All @@ -165,9 +189,31 @@ def __callback__(self, outdata: ndarray, frames: int, time, status):
wdata = npzeros((frames, self.channels), dtype=outdata.dtype)
outdata[:] = wdata

async def is_busy(self) -> bool:
def is_busy(self) -> bool:
return self.queue.qsize() >= self.queue.maxsize

def reconfigure(
self,
samplerate: Optional[AudioSamplerate]=None,
channels: Optional[AudioChannels]=None,
dtype: Optional[AudioDType]=None,
*,
restore_state: bool=True
) -> None:
super().reconfigure(samplerate, channels, dtype)
state = self.state
if StreamerState.RUNNING in self.state:
asyncio.run_coroutine_threadsafe(self.stop(), self.loop).result()
self.stream = OutputStream(
samplerate=self.samplerate,
channels=self.channels,
dtype=self.dtype,
device=self.device,
callback=self.__callback__
)
if restore_state and (StreamerState.RUNNING in state):
asyncio.run_coroutine_threadsafe(self.start(), self.loop).result()

@deprecated("!!! NOT IMPLEMENTED !!!")
async def run(self) -> NoReturn:
raise NotImplementedError
Expand Down
48 changes: 45 additions & 3 deletions seaplayer_audio/streamers/threadstreamersnd.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,34 @@ def __init__(
def is_busy(self) -> bool:
return self.queue.qsize() >= self.queue.maxsize

def reconfigure(
self,
samplerate: Optional[AudioSamplerate]=None,
channels: Optional[AudioChannels]=None,
dtype: Optional[AudioDType]=None,
*,
restore_state: bool=True
) -> None:
super().reconfigure(samplerate, channels, dtype)
state = self.state
if StreamerState.RUNNING in self.state:
self.stop()
self.stream = sd.OutputStream(
samplerate=self.samplerate,
channels=self.channels,
dtype=self.dtype,
device=self.device
)
if restore_state and (StreamerState.RUNNING in state):
self.start()

def run(self) -> None:
if not self.stream.active:
self.stream.start()
self.state |= StreamerState.STARTED
while StreamerState.RUNNING in self.state:
try:
data = self.queue.get(timeout=0.01)
data = self.queue.get_nowait()
self.stream.write(data)
except queue.Empty:
pass
Expand Down Expand Up @@ -106,16 +127,37 @@ def __init__(
self.state = StreamerState(StreamerState.LOCKED)
self.queue: AsyncQueue[np.ndarray] = AsyncQueue(1)

async def is_busy(self) -> bool:
def is_busy(self) -> bool:
return self.queue.qsize() >= self.queue.maxsize

def reconfigure(
self,
samplerate: Optional[AudioSamplerate]=None,
channels: Optional[AudioChannels]=None,
dtype: Optional[AudioDType]=None,
*,
restore_state: bool=True
) -> None:
super().reconfigure(samplerate, channels, dtype)
state = self.state
if StreamerState.RUNNING in self.state:
asyncio.run_coroutine_threadsafe(self.stop(), self.loop).result()
self.stream = sd.OutputStream(
samplerate=self.samplerate,
channels=self.channels,
dtype=self.dtype,
device=self.device
)
if restore_state and (StreamerState.RUNNING in state):
asyncio.run_coroutine_threadsafe(self.start(), self.loop).result()

def run(self) -> None:
if not self.stream.active:
self.stream.start()
self.state |= StreamerState.STARTED
while StreamerState.RUNNING in self.state:
try:
data = asyncio.run_coroutine_threadsafe(self.queue.get(timeout=0.01), self.loop).result()
data = self.queue.get_nowait()
self.stream.write(data)
except queue.Empty:
pass
Expand Down

0 comments on commit a304143

Please sign in to comment.