1
0
Fork 0
mirror of https://github.com/shlomif/PySolFC.git synced 2025-04-05 00:02:29 -04:00
PySolFC/pysollib/pysolaudio.py
2025-03-09 15:13:54 +02:00

626 lines
18 KiB
Python

#!/usr/bin/env python
# -*- mode: python; coding: utf-8; -*-
# ---------------------------------------------------------------------------##
#
# Copyright (C) 1998-2003 Markus Franz Xaver Johannes Oberhumer
# Copyright (C) 2003 Mt. Hood Playing Card Co.
# Copyright (C) 2005-2009 Skomoroh
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# ---------------------------------------------------------------------------##
# imports
import os
import traceback
from threading import Thread
try:
import pysolsoundserver
except ImportError:
pysolsoundserver = None
# ************************************************************************
# * basic audio client
# ************************************************************************
class AbstractAudioClient:
    """Base class shared by all audio back-ends.

    It owns the connect/destroy life cycle and the priority/loop
    bookkeeping for samples.  Subclasses override the ``_xxx`` hooks in
    the "core implementation" section (and optionally the music
    extensions) to drive an actual device.
    """

    # regular expression matching the file extensions this client handles
    EXTENSIONS = r"\.((wav)|(it)|(mod)|(mp3)|(pym)|(s3m)|(xm))$"

    CAN_PLAY_SOUND = False
    CAN_PLAY_MUSIC = False

    def __init__(self):
        self.server = None
        self.audiodev = None
        self.connected = 0
        self.app = None
        # priority / loop flag of the most recently accepted sample
        self.sample_priority = -1
        self.sample_loop = 0
        self.music_priority = -1
        self.music_loop = 0

    def __del__(self):
        self.destroy()

    # start server - set self.server on success (may also set self.audiodev)
    def startServer(self):
        pass

    # connect to server - set self.audiodev on success
    def connectServer(self, app):
        """Remember *app* and, if a server was started, connect to it.

        Any exception raised by the ``_connectServer`` hook is printed
        and the client is torn down via ``destroy()``.
        """
        assert app
        self.app = app
        if self.server is not None:
            try:
                if self._connectServer():
                    self.connected = 1
            except Exception:
                if traceback:
                    traceback.print_exc()
                self.destroy()

    # disconnect and stop server
    def destroy(self):
        """Run the back-end teardown hook and reset the client state.

        The state is reset even when ``_destroy()`` raises, so a second
        call (for instance from ``__del__``) does not retry a teardown
        that already failed.  The exception itself still propagates.
        """
        try:
            if self.audiodev:
                self._destroy()
        finally:
            self.server = None
            self.audiodev = None
            self.connected = 0
            self.app = None

    #
    # high-level interface
    #

    def playSample(self, name, priority=0, loop=0, volume=-1):
        """Play the sample registered under *name*.

        Returns 1 when the back-end hook reports success, otherwise 0.
        The request is refused when there is no device or application,
        when sound is switched off in the options, when a looping sample
        of equal or higher priority is active, or when the sample
        manager has no file for *name*.
        """
        if self.audiodev is None or not self.app or not self.app.opt.sound:
            return 0
        # a looping sample is only replaced by a higher priority one
        if priority <= self.sample_priority and self.sample_loop:
            return 0
        obj = self.app.sample_manager.getByName(name)
        if not obj or not obj.absname:
            return 0
        try:
            if self._playSample(obj.absname, priority, loop, volume):
                self.sample_priority = priority
                self.sample_loop = loop
                return 1
        except Exception:
            if traceback:
                traceback.print_exc()
        return 0

    def stopSamples(self):
        """Stop all samples and forget the current priority/loop state."""
        self._stopWith(self._stopSamples)

    def stopSamplesLoop(self):
        """Stop looping samples and forget the current priority/loop state."""
        self._stopWith(self._stopSamplesLoop)

    def _stopWith(self, stopper):
        # Shared body of stopSamples()/stopSamplesLoop(): does nothing
        # without a device; otherwise calls the hook, prints (but does not
        # propagate) any error and resets the bookkeeping.
        if self.audiodev is None:
            return
        try:
            stopper()
        except Exception:
            if traceback:
                traceback.print_exc()
        self.sample_priority = -1
        self.sample_loop = 0

    #
    # subclass - core implementation
    #

    def _connectServer(self):
        return 0

    def _destroy(self):
        pass

    def _playSample(self, filename, priority, loop, volume):
        return 0

    def _stopSamples(self):
        pass

    def _stopSamplesLoop(self):
        self._stopSamples()

    #
    # subclass - extensions
    #

    def getMusicInfo(self):
        return -1

    def playContinuousMusic(self, music_list):
        pass

    def playNextMusic(self):
        pass

    def updateSettings(self):
        pass
# ************************************************************************
# * pysolsoundserver module
# ************************************************************************
class PysolSoundServerModuleClient(AbstractAudioClient):
    """Audio client that drives the ``pysolsoundserver`` extension module.

    All requests are sent to the module as text commands through
    ``cmd()``.
    """

    CAN_PLAY_SOUND = True
    CAN_PLAY_MUSIC = True

    def __init__(self):
        AbstractAudioClient.__init__(self)

    def startServer(self):
        """Import and initialise the module; leave the client unset on error."""
        # use the module
        try:
            import pysolsoundserver
            self.audiodev = pysolsoundserver
            self.audiodev.init()
            self.server = 1
        except Exception:
            if traceback:
                traceback.print_exc()
            self.server = None
            self.audiodev = None

    def cmd(self, cmd):
        """Send one command string to the module and return its result."""
        return self.audiodev.cmd(cmd)

    # connect to server
    def _connectServer(self):
        # a non-zero result is treated as a failed connection
        r = self.cmd("protocol 6")
        if r != 0:
            return 0
        # self.cmd("debug 1")
        return 1

    # disconnect and stop server
    def _destroy(self):
        self.audiodev.exit()

    #
    #
    #

    def _playSample(self, filename, priority, loop, volume):
        self.cmd("playwav '%s' %d %d %d %d"
                 % (filename, -1, priority, loop, volume))
        return 1

    def _stopSamples(self):
        self.cmd("stopwav")

    def _stopSamplesLoop(self):
        self.cmd("stopwavloop")

    def getMusicInfo(self):
        if self.audiodev:
            return self.audiodev.getMusicInfo()
        return -1

    def playContinuousMusic(self, music_list):
        """Queue every entry of *music_list* that has a file and start it.

        Does nothing without a device or application, or when music is
        switched off in the options.  Errors are printed, not raised.
        """
        if self.audiodev is None or not self.app or not self.app.opt.music:
            return
        try:
            loop = 999999
            for music in music_list:
                if music.absname:
                    self.cmd("queuemus '%s' %d %d %d %d"
                             % (music.absname, music.index, 0, loop,
                                music.volume))
            self.cmd("startqueue")
        except Exception:
            if traceback:
                traceback.print_exc()

    def playNextMusic(self):
        """Skip to the next queued music track.

        Like the other high-level methods this is a no-op without a
        device, and errors from the module are printed, not raised.
        """
        if self.audiodev is None:
            return
        try:
            self.cmd("nextmus")
        except Exception:
            if traceback:
                traceback.print_exc()

    def updateSettings(self):
        """Push the sample and music volumes from the options to the module.

        A volume of 0 is sent for whichever of sound/music is disabled.
        """
        if self.audiodev is None or not self.app:
            return
        s, m = 0, 0
        if self.app.opt.sound:
            s = self.app.opt.sound_sample_volume
        if self.app.opt.music:
            m = self.app.opt.sound_music_volume
        try:
            self.cmd("setwavvol %d" % s)
            self.cmd("setmusvol %d" % m)
        except Exception:
            if traceback:
                traceback.print_exc()
# ************************************************************************
# * Kivy audio
# ************************************************************************
class KivyAudioClient(AbstractAudioClient):
    """Sample player built on the Kivy sound loader.

    Requested files are appended to ``self.queue`` and played one at a
    time; loaded sound objects are cached in ``self.sounds`` by filename.
    """

    CAN_PLAY_SOUND = True
    CAN_PLAY_MUSIC = False

    def __init__(self):
        AbstractAudioClient.__init__(self)
        from pysollib.kivy.LApp import LSoundLoader
        self.audiodev = LSoundLoader
        self.sound = None       # sound currently playing, if any
        self.queue = []         # filenames waiting to be played
        self.sounds = {}        # filename -> loaded sound object

    def startServer(self):
        pass

    def _condPlaySample(self):
        """Start the next queued sound, or stop the one that is playing.

        When a sound is active it is only stopped here; the ``on_stop``
        handler bound in ``_playSample`` (``_endSample``) then calls this
        method again to start the next entry.
        """
        if self.sound:
            self.sound.stop()
            return
        if self.sound is None and self.queue:
            filename = self.queue.pop(0)
            self.sound = self.sounds.get(filename)
            if self.sound:
                vol = float(self.app.opt.sound_sample_volume)/100.0
                self.sound.volume = vol
                self.sound.play()
                print("Sound plays %s" % self.sound.source)

    def _playSample(self, filename, priority, loop, volume):
        """Load *filename* if necessary, queue it and trigger playback.

        The file is loaded before it is queued, so a file that cannot be
        loaded never leaves an entry in the queue that has no matching
        sound object.  *priority*, *loop* and *volume* are not used.
        """
        if filename not in self.sounds:
            sound = self.audiodev.load(filename)
            if sound is None:
                # NOTE(review): assumes the loader signals an unsupported
                # file by returning None - confirm against the Kivy docs.
                return 0
            sound.bind(on_stop=self._endSample)
            self.sounds[filename] = sound
        self.queue.append(filename)
        self._condPlaySample()

    def _endSample(self, a):
        # on_stop handler: the active sound has finished or was stopped
        print('Sound: stopped, %s' % self.sound)
        self.sound = None
        self._condPlaySample()

    def _stopSamples(self):
        print("Sound play stop")
        self.queue = []
        if self.sound:
            self.sound.stop()
        self.sound = None
# ************************************************************************
# * Win32 winsound audio
# ************************************************************************
class Win32AudioClient(AbstractAudioClient):
    """Sample player that uses the ``winsound`` module."""

    CAN_PLAY_SOUND = True
    CAN_PLAY_MUSIC = False

    def __init__(self):
        AbstractAudioClient.__init__(self)
        import winsound
        self.audiodev = winsound

    def startServer(self):
        pass

    def _playSample(self, filename, priority, loop, volume):
        """Play *filename* asynchronously; return 1 on success, else 0.

        ``SND_LOOP`` is added for looping samples and ``SND_NOSTOP`` when
        *priority* does not exceed the priority of the current sample.
        *volume* is not used.
        """
        snd = self.audiodev
        mode = (snd.SND_FILENAME | snd.SND_NODEFAULT |
                snd.SND_NOWAIT | snd.SND_ASYNC)
        if loop:
            mode |= snd.SND_LOOP
        if priority <= self.sample_priority:
            mode |= snd.SND_NOSTOP
        try:
            snd.PlaySound(filename, mode)
        except Exception:
            # best effort: a failed playback is only reported as 0
            return 0
        return 1

    def _stopSamples(self):
        snd = self.audiodev
        snd.PlaySound(None, snd.SND_NODEFAULT | snd.SND_PURGE)
# ************************************************************************
# * OSS audio
# ************************************************************************
class OSSAudioServer:
    """Reads play requests from a pipe and plays them through OSS.

    ``mainLoop()`` reads messages of NUL-separated fields
    (``filename\\0priority\\0loop\\0``) from the file descriptor
    *pipe*.  The message ``break`` ends a running loop and an empty
    first field terminates the process.
    """

    def __init__(self, pipe):
        self.pipe = pipe
        self.sound_priority = -1
        self._busy = False
        # set while a looping sample should keep playing
        self._play_loop = False

    def mainLoop(self):
        """Serve requests forever; leaves only through ``os._exit(0)``."""
        # NOTE(review): only the first message of each chunk returned by
        # os.read() is looked at, and a message longer than 256 bytes is
        # cut - confirm whether requests that arrive together (for
        # example a loop start directly followed by "break") need to be
        # handled individually.
        while True:
            s = os.read(self.pipe, 256)
            ss = s.split(b'\0')
            if not ss[0]:
                # empty field: end of pipe or explicit shutdown message
                os._exit(0)
            if ss[0] == b'break':
                self._play_loop = False
                continue
            filename, priority, loop = ss[0].decode(), int(ss[1]), int(ss[2])
            if loop:
                self._play_loop = True
                th = Thread(target=self.playLoop, args=(filename,))
                th.start()
            else:
                if not self._busy:
                    self.play(filename, priority)

    def _getParameters(self, filename):
        """Return ``(frames, fmt, nch, rate)`` for the wave file *filename*.

        The format is always reported as ``AFMT_U8``.
        """
        import ossaudiodev
        import wave
        # the context manager closes the file once the frames are read
        with wave.open(filename) as w:
            fmt = ossaudiodev.AFMT_U8
            nch = w.getnchannels()
            rate = w.getframerate()
            frames = w.readframes(w.getnframes())
        return (frames, fmt, nch, rate)

    def playLoop(self, filename, priority=None):
        """Play *filename* repeatedly while ``self._play_loop`` is set.

        Returns 1 after the loop was ended, 0 when anything failed
        (the error is printed).  *priority* is not used.
        """
        audiodev = None
        try:
            # imported inside the try so that a missing module is reported
            # like any other playback failure
            import ossaudiodev
            audiodev = ossaudiodev.open('w')
            frames, fmt, nch, rate = self._getParameters(filename)
            audiodev.setparameters(fmt, nch, rate)
            while self._play_loop:
                audiodev.write(frames)
            audiodev.reset()
            return 1
        except Exception:
            if traceback:
                traceback.print_exc()
            return 0
        finally:
            if audiodev is not None:
                audiodev.close()

    def play(self, filename, priority):
        """Play *filename* once.

        ``self._busy`` is set for the duration of the call.  Returns 1
        and records *priority* on success, 0 when anything failed (the
        error is printed).
        """
        audiodev = None
        self._busy = True
        try:
            # imported inside the try so that a missing module is reported
            # like any other playback failure
            import ossaudiodev
            audiodev = ossaudiodev.open('w')
            frames, fmt, nch, rate = self._getParameters(filename)
            audiodev.setparameters(fmt, nch, rate)
            audiodev.write(frames)
            self.sound_priority = priority
            return 1
        except Exception:
            if traceback:
                traceback.print_exc()
            return 0
        finally:
            self._busy = False
            if audiodev is not None:
                audiodev.close()
class OSSAudioClient(AbstractAudioClient):
    """Sample player that forks an ``OSSAudioServer`` child process.

    Requests are written to the child through a pipe as NUL-separated
    fields.
    """

    CAN_PLAY_SOUND = True
    CAN_PLAY_MUSIC = False

    def __init__(self):
        AbstractAudioClient.__init__(self)
        # write end of the pipe to the server; None until startServer()
        self.pout = None
        import ossaudiodev
        if not os.path.exists('/dev/dsp'):
            raise RuntimeError('OSS interface not available')
        self.audiodev = ossaudiodev

    def startServer(self):
        """Fork the server process and keep the write end of its pipe."""
        pin, pout = os.pipe()
        server = OSSAudioServer(pin)
        pid = os.fork()
        if pid == 0:
            # child: drop the write end so that the read end reports
            # end-of-file once the parent's copy is closed
            os.close(pout)
            try:
                server.mainLoop()
            finally:
                # never let the child return into the application code
                os._exit(1)
        # parent: only the write end is needed
        os.close(pin)
        self.pout = pout

    def _playSample(self, filename, priority, loop, volume):
        # *volume* is not transmitted to the server
        if self.pout is None:
            return 0
        os.write(self.pout, b'%s\0%s\0%s\0' %
                 (filename.encode(),
                  str(priority).encode(),
                  str(loop).encode()))
        return 1

    def _stopSamples(self):
        if self.pout is None:
            return
        os.write(self.pout, b'break\0\0\0')

    def _destroy(self):
        """Send the shutdown message to the server and close the pipe."""
        if self.pout is None:
            return
        try:
            os.write(self.pout, b'\0\0\0')
        except OSError:
            # e.g. the server process is already gone
            if traceback:
                traceback.print_exc()
        finally:
            os.close(self.pout)
            self.pout = None
# ************************************************************************
# * PyGame
# ************************************************************************
class PyGameAudioClient(AbstractAudioClient):
    """Sample and music player built on ``pygame.mixer``.

    Samples are played directly; music is played by a background thread
    running ``_playMusicLoop`` for as long as ``self.music`` is set and
    music is enabled in the options.
    """

    EXTENSIONS = r'\.((ogg)|(mp3)|(wav)|(it)|(mod)|(s3m)|(xm)|(mid)|(midi))$'

    CAN_PLAY_SOUND = True
    CAN_PLAY_MUSIC = True

    def __init__(self):
        print("print PyGameAudioClient", flush=True)
        AbstractAudioClient.__init__(self)
        import pygame.mixer
        import pygame.mixer_music
        import pygame.time
        if os.name == 'nt':
            # for py2exe
            import pygame.base
            import pygame.rwobject
            import pygame.mixer_music
        self.mixer = pygame.mixer
        self.time = pygame.time
        self.mixer_music = pygame.mixer_music
        # self.music is set to None to tell the music thread to stop
        self.music = self.mixer_music
        self.audiodev = self.mixer
        self.sound = None
        self.sound_channel = None
        self.sound_priority = -1
        # defined up front so that updateSettings() can start the music
        # thread before playContinuousMusic() was ever called
        self.music_list = None

    def startServer(self):
        pass

    def connectServer(self, app):
        """Remember *app* and initialise the mixer with the configured buffer."""
        AbstractAudioClient.connectServer(self, app)
        # http://www.pygame.org/docs/ref/mixer.html
        # NOTE: there is currently a bug on some windows machines which
        # makes sound play back 'scratchy'.  There is not enough cpu in
        # the sound thread to feed the buffer to the sound api.  To get
        # around this you can increase the buffer size.  However this
        # means that there is more of a delay between the time you ask to
        # play the sound and when it gets played.  Try calling this before
        # the pygame.init or pygame.mixer.init calls.
        # pygame.mixer.pre_init(44100,-16,2, 1024 * 3)
        buff_size = self.app.opt.sound_sample_buffer_size
        self.mixer.pre_init(44100, -16, 2, 1024*buff_size)
        self.mixer.init()

    def _playSample(self, filename, priority, loop, volume):
        """Play *filename*; return 1 when playback started, else 0.

        A sample that is still playing is only interrupted by one of
        strictly higher priority.  The volume comes from the options,
        *volume* is not used.  Errors while loading or starting the
        sample are printed and reported as 0, so the caller does not
        record a priority for a sample that never played.
        """
        if self.sound_channel and self.sound_channel.get_busy():
            if self.sound_priority >= priority:
                return 0
            self.sound.stop()
        vol = self.app.opt.sound_sample_volume/128.0
        try:
            self.sound = self.mixer.Sound(filename)
            self.sound.set_volume(vol)
            # NOTE(review): *loop* is handed to pygame as the number of
            # extra repeats - confirm this matches the callers' intent.
            self.sound_channel = self.sound.play(loop)
        except Exception:
            if traceback:
                traceback.print_exc()
            return 0
        self.sound_priority = priority
        return 1

    def _stopSamples(self):
        if self.sound:
            self.sound.stop()
        self.sound = None
        self.sound_channel = None

    def _playMusicLoop(self):
        """Thread body: cycle through ``self.music_list`` until stopped.

        The loop ends as soon as ``self.music`` is cleared or music is
        switched off in the options.
        """
        music_list = self.music_list
        if not music_list:
            return
        while True:
            if not self.music or not self.app.opt.music:
                break
            for m in music_list:
                if not self.music or not self.app.opt.music:
                    break
                vol = self.app.opt.sound_music_volume/128.0
                try:
                    self.music.load(m.absname)
                    self.music.set_volume(vol)
                    self.music.play()
                    # poll until the track ends or the music is stopped
                    while self.music and self.music.get_busy():
                        self._wait(200)
                    self._wait(300)
                except Exception:
                    # self.music may have been cleared by another thread,
                    # or the track failed; back off before the next one
                    self._wait(1000)

    def _destroy(self):
        self.mixer.stop()
        self.mixer.quit()
        self.music = None

    def _wait(self, s):
        # sometime time or time.wait is None (threading)
        if self.time and self.time.wait:
            self.time.wait(s)

    def playContinuousMusic(self, music_list):
        """Remember *music_list* and start the music thread for it."""
        self.music_list = music_list
        if not music_list:
            return
        th = Thread(target=self._playMusicLoop)
        th.start()

    def updateSettings(self):
        """Apply the sound/music options to the running music.

        Music is stopped when sound or music is disabled or the music
        volume is 0; otherwise the music thread is restarted if it was
        stopped, or the volume of the running music is adjusted.
        """
        if not self.app:
            # not connected (or already destroyed): nothing to apply
            return
        if (not self.app.opt.sound or not self.app.opt.music or
                self.app.opt.sound_music_volume == 0):
            if self.music:
                self.music.stop()
                self.music = None
        else:
            if not self.music:
                self.music = self.mixer_music
                th = Thread(target=self._playMusicLoop)
                th.start()
            else:
                vol = self.app.opt.sound_music_volume/128.0
                self.music.set_volume(vol)

    def playNextMusic(self):
        # stopping the current track lets the music thread move on
        if self.music:
            self.music.stop()