Files
asknavidrome-cafofo/skill/app.py

1190 lines
43 KiB
Python
Raw Normal View History

from datetime import datetime
2022-08-05 16:37:56 +01:00
from flask import Flask, render_template
import logging
from multiprocessing import Process
from multiprocessing.managers import BaseManager
2022-08-05 16:37:56 +01:00
import os
import random
import sys
from ask_sdk_core.skill_builder import SkillBuilder
from ask_sdk_core.dispatch_components import AbstractRequestHandler, AbstractRequestInterceptor, AbstractResponseInterceptor
from ask_sdk_core.utils import is_request_type, is_intent_name, get_slot_value_v2, get_intent_name, get_request_type
from ask_sdk_core.handler_input import HandlerInput
from ask_sdk_model import Response
from ask_sdk_core.dispatch_components import AbstractExceptionHandler
from flask_ask_sdk.skill_adapter import SkillAdapter
import asknavidrome.subsonic_api as api
import asknavidrome.media_queue as queue
import asknavidrome.controller as controller
# Create web service
app = Flask(__name__)
# Create skill object
sb = SkillBuilder()
# Setup Logging
logger = logging.getLogger() # Create logger
level = logging.getLevelName('DEBUG')
logger.setLevel(level) # Set logger log level
log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(level)
handler.setFormatter(log_formatter)
logger.addHandler(handler)
#
# Get service configuration
#
2022-10-22 15:50:38 +01:00
logger.info('AskNavidrome 0.6!')
2025-09-08 13:10:45 +01:00
logger.debug('Getting configuration from the environment...')
2022-08-05 16:37:56 +01:00
try:
if 'NAVI_SKILL_ID' in os.environ:
# Set skill ID, this is available on the Alexa Developer Console
# if this is not set the web service will respond to any skill.
sb.skill_id = os.getenv('NAVI_SKILL_ID')
logger.info(f'Skill ID set to: {sb.skill_id}')
else:
raise NameError
except NameError as err:
logger.error(f'The Alexa skill ID was not found! {err}')
raise
try:
if 'NAVI_SONG_COUNT' in os.environ:
min_song_count = os.getenv('NAVI_SONG_COUNT')
logger.info(f'Minimum song count is set to: {min_song_count}')
else:
raise NameError
except NameError as err:
logger.error(f'The minimum song count was not found! {err}')
raise
try:
if 'NAVI_URL' in os.environ:
navidrome_url = os.getenv('NAVI_URL')
logger.info(f'The URL for Navidrome is set to: {navidrome_url}')
else:
raise NameError
except NameError as err:
logger.error(f'The URL of the Navidrome server was not found! {err}')
raise
try:
if 'NAVI_USER' in os.environ:
navidrome_user = os.getenv('NAVI_USER')
logger.info(f'The Navidrome user name is set to: {navidrome_user}')
else:
raise NameError
except NameError as err:
logger.error(f'The Navidrome user name was not found! {err}')
raise
try:
if 'NAVI_PASS' in os.environ:
navidrome_passwd = os.getenv('NAVI_PASS')
logger.info('The Navidrome password is set')
else:
raise NameError
except NameError as err:
logger.error(f'The Navidrome password was not found! {err}')
raise
try:
if 'NAVI_PORT' in os.environ:
navidrome_port = os.getenv('NAVI_PORT')
logger.info(f'The Navidrome port is set to: {navidrome_port}')
else:
raise NameError
except NameError as err:
logger.error(f'The Navidrome port was not found! {err}')
raise
try:
if 'NAVI_API_PATH' in os.environ:
navidrome_api_location = os.getenv('NAVI_API_PATH')
logger.info(f'The Navidrome API path is set to: {navidrome_api_location}')
else:
raise NameError
except NameError as err:
logger.error(f'The Navidrome API path was not found! {err}')
raise
try:
if 'NAVI_API_VER' in os.environ:
navidrome_api_version = os.getenv('NAVI_API_VER')
logger.info(f'The Navidrome API version is set to: {navidrome_api_version}')
else:
raise NameError
except NameError as err:
logger.error(f'The Navidrome API version was not found! {err}')
raise
logger.debug('Configuration has been successfully loaded')
2022-11-19 16:24:13 +00:00
# Set log level based on config value
if 'NAVI_DEBUG' in os.environ:
navidrome_log_level = int(os.getenv('NAVI_DEBUG'))
if navidrome_log_level == 0:
# Warnings and higher
logger.setLevel(logging.WARNING)
logger.warning('Log level set to WARNING')
elif navidrome_log_level == 1:
# Info messages and higher
logger.setLevel(logging.INFO)
logger.info('Log level set to INFO')
elif navidrome_log_level == 2:
# Debug with request and response interceptors
logger.setLevel(logging.DEBUG)
logger.debug('Log level set to DEBUG')
elif navidrome_log_level == 3:
# Debug with request / response interceptors and Web GUI
logger.setLevel(logging.DEBUG)
logger.debug('Log level set to DEBUG')
else:
# Invalid value provided - set to WARNING
navidrome_log_level = 0
logger.setLevel(logging.WARNING)
logger.warning('Log level set to WARNING')
2025-09-08 13:10:45 +01:00
# Create a shareable queue than can be updated by multiple threads to enable larger playlists
# to be returned in the back ground avoiding the Amazon 8 second timeout
BaseManager.register('MediaQueue', queue.MediaQueue)
manager = BaseManager()
manager.start()
play_queue = manager.MediaQueue()
2022-08-05 16:37:56 +01:00
logger.debug('MediaQueue object created...')
# Variable to store the additional thread used to populate large playlists
2025-09-08 13:10:45 +01:00
# this is used to avoid concurrency issues if there is an attempt to load multiple playlists
# at the same time.
backgroundProcess = None
2022-08-05 16:37:56 +01:00
# Connect to Navidrome
connection = api.SubsonicConnection(navidrome_url,
navidrome_user,
navidrome_passwd,
navidrome_port,
navidrome_api_location,
navidrome_api_version)
try:
connection.ping()
except:
raise RuntimeError('Could not connect to SubSonic API!')
logger.info('AskNavidrome Web Service is ready to start!')
#
# Handler Classes
#
class LaunchRequestHandler(AbstractRequestHandler):
"""Handle LaunchRequest and NavigateHomeIntent"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return (
is_request_type('LaunchRequest')(handler_input) or
is_intent_name('AMAZON.NavigateHomeIntent')(handler_input)
)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In LaunchRequestHandler')
connection.ping()
speech = sanitise_speech_output('Ready!')
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(speech).ask(speech)
return handler_input.response_builder.response
class CheckAudioInterfaceHandler(AbstractRequestHandler):
"""Check if device supports audio play.
This can be used as the first handler to be checked, before invoking
other handlers, thus making the skill respond to unsupported devices
without doing much processing.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
if handler_input.request_envelope.context.system.device:
# Since skill events won't have device information
return handler_input.request_envelope.context.system.device.supported_interfaces.audio_player is None
else:
return False
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In CheckAudioInterfaceHandler')
_ = handler_input.attributes_manager.request_attributes['_']
handler_input.response_builder.speak('This device is not supported').set_should_end_session(True)
return handler_input.response_builder.response
class SkillEventHandler(AbstractRequestHandler):
"""Close session for skill events or when session ends.
Handler to handle session end or skill events (SkillEnabled,
SkillDisabled etc.)
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return (handler_input.request_envelope.request.object_type.startswith(
'AlexaSkillEvent') or
is_request_type('SessionEndedRequest')(handler_input))
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In SkillEventHandler')
return handler_input.response_builder.response
class HelpHandler(AbstractRequestHandler):
"""Handle HelpIntent"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('AMAZON.HelpIntent')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In HelpHandler')
text = sanitise_speech_output('AskNavidrome lets you interact with media servers that offer a Subsonic compatible A.P.I.')
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(text)
return handler_input.response_builder.response
class NaviSonicPlayMusicByArtist(AbstractRequestHandler):
"""Handle NaviSonicPlayMusicByArtist
Play a selection of songs for the given artist
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayMusicByArtist')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
global backgroundProcess
2022-08-05 16:37:56 +01:00
logger.debug('In NaviSonicPlayMusicByArtist')
# Check if a background process is already running, if it is then terminate the process
# in favour of the new process.
2025-09-08 15:47:49 +01:00
if backgroundProcess is not None:
backgroundProcess.terminate()
backgroundProcess.join()
2022-08-05 16:37:56 +01:00
# Get the requested artist
artist = get_slot_value_v2(handler_input, 'artist')
# Search for an artist
artist_lookup = connection.search_artist(artist.value)
if artist_lookup is None:
text = sanitise_speech_output(f"I couldn't find the artist {artist.value} in the collection.")
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
# Get a list of albums by the artist
artist_album_lookup = connection.albums_by_artist(artist_lookup[0].get('id'))
# Build a list of songs to play
song_id_list = connection.build_song_list_from_albums(artist_album_lookup, min_song_count)
play_queue.clear()
controller.enqueue_songs(connection, play_queue, [song_id_list[0], song_id_list[1]]) # When generating the playlist return the first two tracks.
2025-09-08 15:47:49 +01:00
backgroundProcess = Process(target=queue_worker_thread, args=(connection, play_queue, song_id_list[2:])) # Create a thread to enqueue the remaining tracks
backgroundProcess.start() # Start the additional thread
speech = sanitise_speech_output(f'Playing music by: {artist.value}')
2022-08-05 16:37:56 +01:00
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
play_queue.shuffle()
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicPlayAlbumByArtist(AbstractRequestHandler):
"""Handle NaviSonicPlayAlbumByArtist
Play a given album by a given artist
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayAlbumByArtist')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
global backgroundProcess
2022-08-05 16:37:56 +01:00
logger.debug('In NaviSonicPlayAlbumByArtist')
# Check if a background process is already running, if it is then terminate the process
# in favour of the new process.
2025-09-08 15:47:49 +01:00
if backgroundProcess is not None:
backgroundProcess.terminate()
backgroundProcess.join()
2022-08-05 16:37:56 +01:00
# Get variables from intent
artist = get_slot_value_v2(handler_input, 'artist')
album = get_slot_value_v2(handler_input, 'album')
if artist is not None and album is not None:
# Play album by artist method
logger.debug(f'Searching for the album {album.value} by {artist.value}')
# Search for an artist
artist_lookup = connection.search_artist(artist.value)
if artist_lookup is None:
text = sanitise_speech_output(f"I couldn't find the artist {artist.value} in the collection.")
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
artist_album_lookup = connection.albums_by_artist(artist_lookup[0].get('id'))
# Search the list of dictionaries for the requested album
# Strings are all converted to lower case to minimise matching errors
result = [album_result for album_result in artist_album_lookup if album_result.get('name').lower() == album.value.lower()]
2022-08-05 16:37:56 +01:00
if not result:
text = sanitise_speech_output(f"I couldn't find an album called {album.value} by {artist.value} in the collection.")
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
# At this point we have found an album that matches
song_id_list = connection.build_song_list_from_albums(result, -1)
2022-08-05 16:37:56 +01:00
play_queue.clear()
# Work around the Amazon / Alexa 8 second timeout.
controller.enqueue_songs(connection, play_queue, [song_id_list[0], song_id_list[1]]) # When generating the playlist return the first two tracks.
2025-09-08 15:47:49 +01:00
backgroundProcess = Process(target=queue_worker_thread, args=(connection, play_queue, song_id_list[2:])) # Create a thread to enqueue the remaining tracks
backgroundProcess.start() # Start the additional thread
2022-08-05 16:37:56 +01:00
speech = sanitise_speech_output(f'Playing {album.value} by: {artist.value}')
2022-08-05 16:37:56 +01:00
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
elif artist is None and album:
# Play album method
logger.debug(f'Searching for the album {album.value}')
result = connection.search_album(album.value)
if result is None:
text = sanitise_speech_output(f"I couldn't find the album {album.value} in the collection.")
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
song_id_list = connection.build_song_list_from_albums(result, -1)
2022-08-05 16:37:56 +01:00
play_queue.clear()
# Work around the Amazon / Alexa 8 second timeout.
controller.enqueue_songs(connection, play_queue, [song_id_list[0], song_id_list[1]]) # When generating the playlist return the first two tracks.
2025-09-08 15:47:49 +01:00
backgroundProcess = Process(target=queue_worker_thread, args=(connection, play_queue, song_id_list[2:])) # Create a thread to enqueue the remaining tracks
backgroundProcess.start() # Start the additional thread
speech = sanitise_speech_output(f'Playing {album.value}')
2022-08-05 16:37:56 +01:00
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicPlaySongByArtist(AbstractRequestHandler):
"""Handle the NaviSonicPlaySongByArtist intent
Play the given song by the given artist if it exists in the
collection.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlaySongByArtist')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicPlaySongByArtist')
# Get variables from intent
artist = get_slot_value_v2(handler_input, 'artist')
song = get_slot_value_v2(handler_input, 'song')
logger.debug(f'Searching for the song {song.value} by {artist.value}')
# Search for the artist
artist_lookup = connection.search_artist(artist.value)
if artist_lookup is None:
text = sanitise_speech_output(f"I couldn't find the artist {artist.value} in the collection.")
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
artist_id = artist_lookup[0].get('id')
# Search for song
song_list = connection.search_song(song.value)
# Search for song by given artist.
song_dets = [item.get('id') for item in song_list if item.get('artistId') == artist_id]
if not song_dets:
text = sanitise_speech_output(f"I couldn't find a song called {song.value} by {artist.value} in the collection.")
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
play_queue.clear()
controller.enqueue_songs(connection, play_queue, song_dets)
speech = sanitise_speech_output(f'Playing {song.value} by {artist.value}')
2022-08-05 16:37:56 +01:00
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicPlayPlaylist(AbstractRequestHandler):
"""Handle NaviSonicPlayPlaylist
Play the given playlist
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayPlaylist')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
global backgroundProcess
2022-08-05 16:37:56 +01:00
logger.debug('In NaviSonicPlayPlaylist')
# Check if a background process is already running, if it is then terminate the process
# in favour of the new process.
2025-09-08 15:47:49 +01:00
if backgroundProcess is not None:
backgroundProcess.terminate()
backgroundProcess.join()
2022-08-05 16:37:56 +01:00
# Get the requested playlist
playlist = get_slot_value_v2(handler_input, 'playlist')
# Search for a playlist
playlist_id = connection.search_playlist(playlist.value)
if playlist_id is None:
text = sanitise_speech_output("I couldn't find the playlist " + str(playlist.value) + ' in the collection.')
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
song_id_list = connection.build_song_list_from_playlist(playlist_id)
play_queue.clear()
# Work around the Amazon / Alexa 8 second timeout.
controller.enqueue_songs(connection, play_queue, [song_id_list[0], song_id_list[1]]) # When generating the playlist return the first two tracks.
2025-09-08 15:47:49 +01:00
backgroundProcess = Process(target=queue_worker_thread, args=(connection, play_queue, song_id_list[2:])) # Create a thread to enqueue the remaining tracks
backgroundProcess.start() # Start the additional thread
2025-09-08 13:10:45 +01:00
speech = sanitise_speech_output('Playing playlist ' + str(playlist.value))
2022-08-05 16:37:56 +01:00
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicPlayMusicByGenre(AbstractRequestHandler):
2025-09-08 13:10:45 +01:00
""" Play songs from the given genre
2022-08-05 16:37:56 +01:00
2025-09-08 13:10:45 +01:00
50 tracks from the given genre are shuffled and played
2022-08-05 16:37:56 +01:00
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayMusicByGenre')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
global backgroundProcess
2022-08-05 16:37:56 +01:00
logger.debug('In NaviSonicPlayMusicByGenre')
# Check if a background process is already running, if it is then terminate the process
# in favour of the new process.
2025-09-08 15:47:49 +01:00
if backgroundProcess is not None:
backgroundProcess.terminate()
backgroundProcess.join()
2022-08-05 16:37:56 +01:00
# Get the requested genre
genre = get_slot_value_v2(handler_input, 'genre')
song_id_list = connection.build_song_list_from_genre(genre.value, min_song_count)
if song_id_list is None:
text = sanitise_speech_output(f"I couldn't find any {genre.value} songs in the collection.")
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
random.shuffle(song_id_list)
play_queue.clear()
# Work around the Amazon / Alexa 8 second timeout.
controller.enqueue_songs(connection, play_queue, [song_id_list[0], song_id_list[1]]) # When generating the playlist return the first two tracks.
2025-09-08 15:47:49 +01:00
backgroundProcess = Process(target=queue_worker_thread, args=(connection, play_queue, song_id_list[2:])) # Create a thread to enqueue the remaining tracks
backgroundProcess.start() # Start the additional thread
2022-08-05 16:37:56 +01:00
speech = sanitise_speech_output(f'Playing {genre.value} music')
2022-08-05 16:37:56 +01:00
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicPlayMusicRandom(AbstractRequestHandler):
"""Handle the NaviSonicPlayMusicRandom intent
Play a random selection of music.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayMusicRandom')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
global backgroundProcess
2022-08-05 16:37:56 +01:00
logger.debug('In NaviSonicPlayMusicRandom')
# Check if a background process is already running, if it is then terminate the process
# in favour of the new process.
2025-09-08 15:47:49 +01:00
if backgroundProcess is not None:
backgroundProcess.terminate()
backgroundProcess.join()
2022-08-05 16:37:56 +01:00
song_id_list = connection.build_random_song_list(min_song_count)
if song_id_list is None:
text = sanitise_speech_output("I couldn't find any songs in the collection.")
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
random.shuffle(song_id_list)
play_queue.clear()
# Work around the Amazon / Alexa 8 second timeout.
controller.enqueue_songs(connection, play_queue, [song_id_list[0], song_id_list[1]]) # When generating the playlist return the first two tracks.
2025-09-08 15:47:49 +01:00
backgroundProcess = Process(target=queue_worker_thread, args=(connection, play_queue, song_id_list[2:])) # Create a thread to enqueue the remaining tracks
backgroundProcess.start() # Start the additional thread
2022-08-05 16:37:56 +01:00
speech = sanitise_speech_output('Playing random music')
2022-08-05 16:37:56 +01:00
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicPlayFavouriteSongs(AbstractRequestHandler):
"""Handle the NaviSonicPlayFavouriteSongs intent
Play all starred / liked songs, songs are automatically shuffled.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayFavouriteSongs')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
global backgroundProcess
2022-08-05 16:37:56 +01:00
logger.debug('In NaviSonicPlayFavouriteSongs')
# Check if a background process is already running, if it is then terminate the process
# in favour of the new process.
2025-09-08 15:47:49 +01:00
if backgroundProcess is not None:
backgroundProcess.terminate()
backgroundProcess.join()
2022-08-05 16:37:56 +01:00
song_id_list = connection.build_song_list_from_favourites()
if song_id_list is None:
text = sanitise_speech_output("You don't have any favourite songs in the collection.")
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
random.shuffle(song_id_list)
play_queue.clear()
# Work around the Amazon / Alexa 8 second timeout.
controller.enqueue_songs(connection, play_queue, [song_id_list[0], song_id_list[1]]) # When generating the playlist return the first two tracks.
2025-09-08 15:47:49 +01:00
backgroundProcess = Process(target=queue_worker_thread, args=(connection, play_queue, song_id_list[2:])) # Create a thread to enqueue the remaining tracks
backgroundProcess.start() # Start the additional thread
2022-08-05 16:37:56 +01:00
speech = sanitise_speech_output('Playing your favourite tracks.')
2022-08-05 16:37:56 +01:00
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
2022-10-22 15:50:38 +01:00
class NaviSonicRandomiseQueue(AbstractRequestHandler):
"""Handle NaviSonicRandomiseQueue Intent
Shuffle the current play queue
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicRandomiseQueue')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicRandomiseQueue Handler')
play_queue.shuffle()
play_queue.sync()
return handler_input.response_builder.response
2022-08-05 16:37:56 +01:00
class NaviSonicSongDetails(AbstractRequestHandler):
"""Handle NaviSonicSongDetails Intent
Returns information on the track that is currently playing
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicSongDetails')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicSongDetails Handler')
current_track = play_queue.get_current_track()
title = sanitise_speech_output(current_track.title)
artist = sanitise_speech_output(current_track.artist)
album = sanitise_speech_output(current_track.album)
2022-08-05 16:37:56 +01:00
text = f'This is {title} by {artist}, from the album {album}'
handler_input.response_builder.speak(text)
return handler_input.response_builder.response
class NaviSonicStarSong(AbstractRequestHandler):
"""Handle NaviSonicStarSong Intent
Star / favourite the current song
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicStarSong')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicStarSong Handler')
current_track = play_queue.get_current_track()
song_id = current_track.id
2022-08-05 16:37:56 +01:00
connection.star_entry(song_id, 'song')
return handler_input.response_builder.response
class NaviSonicUnstarSong(AbstractRequestHandler):
"""Handle NaviSonicUnstarSong Intent
Star / favourite the current song
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicUnstarSong')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicUnstarSong Handler')
current_track = play_queue.get_current_track()
song_id = current_track.id
2022-08-05 16:37:56 +01:00
connection.star_entry(song_id, 'song')
connection.unstar_entry(song_id, 'song')
return handler_input.response_builder.response
#
# AudioPlayer Handlers
#
class PlaybackStartedHandler(AbstractRequestHandler):
"""AudioPlayer.PlaybackStarted Directive received.
Confirming that the requested audio file began playing.
Do not send any specific response.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_request_type('AudioPlayer.PlaybackStarted')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PlaybackStartedHandler')
logger.info('Playback started')
return handler_input.response_builder.response
class PlaybackStoppedHandler(AbstractRequestHandler):
"""AudioPlayer.PlaybackStopped Directive received.
Confirming that the requested audio file stopped playing.
Do not send any specific response.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_request_type('AudioPlayer.PlaybackStopped')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PlaybackStoppedHandler')
# store the current offset for later resumption
play_queue.set_current_track_offset(handler_input.request_envelope.request.offset_in_milliseconds)
current_track = play_queue.get_current_track()
logger.debug(f'Stored track offset of: {current_track.offset} ms for {current_track.title}')
2022-08-05 16:37:56 +01:00
logger.info('Playback stopped')
return handler_input.response_builder.response
class PlaybackNearlyFinishedHandler(AbstractRequestHandler):
"""AudioPlayer.PlaybackNearlyFinished Directive received.
Replacing queue with the URL again. This should not happen on live streams.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_request_type('AudioPlayer.PlaybackNearlyFinished')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PlaybackNearlyFinishedHandler')
logger.info('Queuing next track...')
track_details = play_queue.enqueue_next_track()
return controller.start_playback('continue', None, None, track_details, handler_input)
class PlaybackFinishedHandler(AbstractRequestHandler):
"""AudioPlayer.PlaybackFinished Directive received.
Confirming that the requested audio file completed playing.
Do not send any specific response.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_request_type('AudioPlayer.PlaybackFinished')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PlaybackFinishedHandler')
# Generate a timestamp in milliseconds for scrobbling
timestamp_ms = datetime.now().timestamp()
current_track = play_queue.get_current_track()
connection.scrobble(current_track.id, timestamp_ms)
2022-08-05 16:37:56 +01:00
play_queue.get_next_track()
return handler_input.response_builder.response
class PausePlaybackHandler(AbstractRequestHandler):
"""Handler for stopping audio.
Handles Stop, Cancel and Pause Intents and PauseCommandIssued event.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return (is_intent_name('AMAZON.StopIntent')(handler_input) or
is_intent_name('AMAZON.CancelIntent')(handler_input) or
is_intent_name('AMAZON.PauseIntent')(handler_input))
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PausePlaybackHandler')
play_queue.sync()
return controller.stop(handler_input)
class ResumePlaybackHandler(AbstractRequestHandler):
"""Handler for resuming audio on different events.
Handles PlayAudio Intent, Resume Intent.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return (is_intent_name('AMAZON.ResumeIntent')(handler_input) or
is_intent_name('PlayAudio')(handler_input))
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In ResumePlaybackHandler')
current_track = play_queue.get_current_track()
if current_track.offset > 0:
2022-08-05 16:37:56 +01:00
# There is a paused track, continue
logger.info('Resuming ' + str(current_track.title))
logger.info('Offset ' + str(current_track.offset))
2022-08-05 16:37:56 +01:00
return controller.start_playback('play', None, None, current_track, handler_input)
2022-08-05 16:37:56 +01:00
elif play_queue.get_queue_count() > 0 and current_track.offset == 0:
2022-08-05 16:37:56 +01:00
# No paused tracks but tracks in queue
logger.info('Resuming - There was no paused track, getting next track from queue')
track_details = play_queue.get_next_track()
return controller.start_playback('play', None, None, track_details, handler_input)
class NextPlaybackHandler(AbstractRequestHandler):
"""Handle NextIntent"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('AMAZON.NextIntent')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NextPlaybackHandler')
track_details = play_queue.get_next_track()
# Set the offset to 0 as we are skipping we want to start at the beginning
track_details.offset = 0
return controller.start_playback('play', None, None, track_details, handler_input)
class PreviousPlaybackHandler(AbstractRequestHandler):
"""Handle PreviousIntent"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('AMAZON.PreviousIntent')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PreviousPlaybackHandler')
2025-09-08 13:10:45 +01:00
track_details = play_queue.get_previous_track()
2022-08-05 16:37:56 +01:00
# Set the offset to 0 as we are skipping we want to start at the beginning
track_details.offset = 0
return controller.start_playback('play', None, None, track_details, handler_input)
class PlaybackFailedEventHandler(AbstractRequestHandler):
"""AudioPlayer.PlaybackFailed Directive received.
Logging the error and restarting playing with no output speech.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_request_type('AudioPlayer.PlaybackFailed')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PlaybackFailedHandler')
current_track = play_queue.get_current_track()
song_id = current_track.id
# Log failure and track ID
2022-08-05 16:37:56 +01:00
logger.error(f'Playback Failed: {handler_input.request_envelope.request.error}')
logger.error(f'Failed playing track with ID: {song_id}')
2022-08-05 16:37:56 +01:00
# Skip to the next track instead of stopping
track_details = play_queue.get_next_track()
# Set the offset to 0 as we are skipping we want to start at the beginning
track_details.offset = 0
return controller.start_playback('play', None, None, track_details, handler_input)
2022-08-05 16:37:56 +01:00
#
# Exception Handers
#
class SystemExceptionHandler(AbstractExceptionHandler):
"""Handle System.ExceptionEncountered
Handles exceptions and prints error information
in the log
"""
def can_handle(self, handler_input: HandlerInput, exception: Exception) -> bool:
return is_request_type('System.ExceptionEncountered')(handler_input)
def handle(self, handler_input: HandlerInput, exception: Exception) -> Response:
logger.debug('In SystemExceptionHandler')
# Log the exception
logger.error(f'System Exception: {exception}')
logger.error(f'Request Type Was: {get_request_type(handler_input)}')
error = handler_input.request_envelope.request.to_dict()
logger.error(f"Details: {error.get('error').get('message')}")
if get_request_type(handler_input) == 'IntentRequest':
logger.error(f'Intent Name Was: {get_intent_name(handler_input)}')
speech = sanitise_speech_output("Sorry, I didn't get that. Can you please say it again!!")
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(speech).ask(speech)
return handler_input.response_builder.response
class GeneralExceptionHandler(AbstractExceptionHandler):
"""Handle general exceptions
Handles exceptions and prints error information
in the log
"""
def can_handle(self, handler_input: HandlerInput, exception: Exception) -> bool:
return True
def handle(self, handler_input: HandlerInput, exception: Exception) -> Response:
logger.debug('In GeneralExceptionHandler')
# Log the exception
logger.error(f'General Exception: {exception}')
logger.error(f'Request Type Was: {get_request_type(handler_input)}')
if get_request_type(handler_input) == 'IntentRequest':
logger.error(f'Intent Name Was: {get_intent_name(handler_input)}')
speech = sanitise_speech_output("Sorry, I didn't get that. Can you please say it again!!")
2022-08-05 16:37:56 +01:00
handler_input.response_builder.speak(speech).ask(speech)
return handler_input.response_builder.response
#
# Request Interceptors
#
class LoggingRequestInterceptor(AbstractRequestInterceptor):
"""Intercept all requests
Intercepts all requests sent to the skill and prints them in the log
"""
def process(self, handler_input: HandlerInput):
logger.debug(f'Request received: {handler_input.request_envelope.request}')
class LoggingResponseInterceptor(AbstractResponseInterceptor):
"""Intercept all responses
Intercepts all responses sent from the skill and prints them in the log
"""
def process(self, handler_input: HandlerInput, response: Response):
logger.debug(f'Response sent: {response}')
#
# Functions
#
2025-09-08 15:47:49 +01:00
def sanitise_speech_output(speech_string: str) -> str:
"""Sanitise speech output inline with the SSML standard
Speech Synthesis Markup Language (SSML) has certain ASCII characters that are
reserved. This function replaces them with alternatives.
:param speech_string: The string to process
:type speech_string: str
:return: The processed SSML compliant string
:rtype: str
"""
logger.debug('In sanitise_speech_output()')
if '&' in speech_string:
speech_string = speech_string.replace('&', 'and')
if '/' in speech_string:
speech_string = speech_string.replace('/', 'and')
if '\\' in speech_string:
speech_string = speech_string.replace('\\', 'and')
if '"' in speech_string:
speech_string = speech_string.replace('"', '')
if "'" in speech_string:
speech_string = speech_string.replace("'", "")
if "<" in speech_string:
speech_string = speech_string.replace('<', '')
if ">" in speech_string:
speech_string = speech_string.replace('>', '')
return speech_string
2022-08-05 16:37:56 +01:00
2025-09-08 15:47:49 +01:00
def queue_worker_thread(connection: object, play_queue: object, song_id_list: list) -> None:
"""Media queue worker
This function allows media queues to be populated in the background enabling multithreading
and increasing skill response times.
:param connection: A SubSonic API connection object
:type connection: object
:param play_queue: A MediaQueue object
:type play_queue: object
:param song_id_list: A list containing Navidrome song IDs
:type song_id_list: list
"""
logger.debug('In playlist processing thread!')
controller.enqueue_songs(connection, play_queue, song_id_list)
play_queue.sync()
logger.debug('Finished playlist processing!')
2022-08-05 16:37:56 +01:00
# Register Intent Handlers
sb.add_request_handler(LaunchRequestHandler())
sb.add_request_handler(CheckAudioInterfaceHandler())
sb.add_request_handler(SkillEventHandler())
sb.add_request_handler(HelpHandler())
sb.add_request_handler(NaviSonicPlayMusicByArtist())
sb.add_request_handler(NaviSonicPlayAlbumByArtist())
sb.add_request_handler(NaviSonicPlaySongByArtist())
sb.add_request_handler(NaviSonicPlayPlaylist())
sb.add_request_handler(NaviSonicPlayFavouriteSongs())
sb.add_request_handler(NaviSonicPlayMusicByGenre())
sb.add_request_handler(NaviSonicPlayMusicRandom())
sb.add_request_handler(NaviSonicRandomiseQueue())
2022-08-05 16:37:56 +01:00
sb.add_request_handler(NaviSonicSongDetails())
sb.add_request_handler(NaviSonicStarSong())
sb.add_request_handler(NaviSonicUnstarSong())
# Register AutoPlayer Handlers
sb.add_request_handler(PlaybackStartedHandler())
sb.add_request_handler(PlaybackStoppedHandler())
sb.add_request_handler(PlaybackNearlyFinishedHandler())
sb.add_request_handler(PlaybackFinishedHandler())
sb.add_request_handler(PausePlaybackHandler())
sb.add_request_handler(NextPlaybackHandler())
sb.add_request_handler(PreviousPlaybackHandler())
sb.add_request_handler(ResumePlaybackHandler())
sb.add_request_handler(PlaybackFailedEventHandler())
# Register Exception Handlers
sb.add_exception_handler(SystemExceptionHandler())
sb.add_exception_handler(GeneralExceptionHandler())
2022-11-19 16:24:13 +00:00
if navidrome_log_level >= 2:
# Register Interceptors (log all requests)
sb.add_global_request_interceptor(LoggingRequestInterceptor())
sb.add_global_response_interceptor(LoggingResponseInterceptor())
2022-08-05 16:37:56 +01:00
sa = SkillAdapter(skill=sb.create(), skill_id='test', app=app)
sa.register(app=app, route='/')
# Enable queue and history diagnostics
2022-11-19 16:24:13 +00:00
if navidrome_log_level == 3:
2022-08-05 16:37:56 +01:00
logger.warning('AskNavidrome debugging has been enabled, this should only be used when testing!')
logger.warning('The /buffer, /queue and /history http endpoints are available publicly!')
@app.route('/queue')
def view_queue():
"""View the contents of play_queue.queue
Creates a tabulated page containing the contents of the play_queue.queue deque.
2022-08-05 16:37:56 +01:00
"""
current_track = play_queue.get_current_track()
2022-08-05 16:37:56 +01:00
return render_template('table.html', title='AskNavidrome - Queued Tracks',
tracks=play_queue.get_current_queue(), current=current_track)
2022-08-05 16:37:56 +01:00
@app.route('/history')
def view_history():
"""View the contents of play_queue.history
2025-09-08 13:10:45 +01:00
Creates a tabulated page containing the contents of the play_queue.history deque.
2022-08-05 16:37:56 +01:00
"""
current_track = play_queue.get_current_track()
2022-08-05 16:37:56 +01:00
return render_template('table.html', title='AskNavidrome - Track History',
tracks=play_queue.get_history(), current=current_track)
2022-08-05 16:37:56 +01:00
@app.route('/buffer')
def view_buffer():
"""View the contents of play_queue.buffer
2025-09-08 13:10:45 +01:00
Creates a tabulated page containing the contents of the play_queue.buffer deque.
2022-08-05 16:37:56 +01:00
"""
current_track = play_queue.get_current_track()
2022-08-05 16:37:56 +01:00
return render_template('table.html', title='AskNavidrome - Buffered Tracks',
tracks=play_queue.get_buffer(), current=current_track)
2022-08-05 16:37:56 +01:00
# Run web app by default when file is executed.
if __name__ == '__main__':
# Start the web service
app.run(host='0.0.0.0')