Files
asknavidrome-cafofo/skill/app.py
Ross Stewart 15fbdfdeec - Reordered imports
- Updated comments for multi-thread queue processing
2025-08-30 13:48:55 +01:00

1056 lines
36 KiB
Python
Executable File

from flask import Flask, render_template
import logging
from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager
import os
import random
import sys
from ask_sdk_core.skill_builder import SkillBuilder
from ask_sdk_core.dispatch_components import AbstractRequestHandler, AbstractRequestInterceptor, AbstractResponseInterceptor
from ask_sdk_core.utils import is_request_type, is_intent_name, get_slot_value_v2, get_intent_name, get_request_type
from ask_sdk_core.handler_input import HandlerInput
from ask_sdk_model import Response
from ask_sdk_core.dispatch_components import AbstractExceptionHandler
from flask_ask_sdk.skill_adapter import SkillAdapter
import asknavidrome.subsonic_api as api
import asknavidrome.media_queue as queue
import asknavidrome.controller as controller
# Create web service
app = Flask(__name__)
# Create skill object
sb = SkillBuilder()
# Setup Logging
logger = logging.getLogger() # Create logger
level = logging.getLevelName('DEBUG')
logger.setLevel(level) # Set logger log level
log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(level)
handler.setFormatter(log_formatter)
logger.addHandler(handler)
#
# Get service configuration
#
logger.info('AskNavidrome 0.6!')
logger.debug('Getting configutration from the environment...')
try:
if 'NAVI_SKILL_ID' in os.environ:
# Set skill ID, this is available on the Alexa Developer Console
# if this is not set the web service will respond to any skill.
sb.skill_id = os.getenv('NAVI_SKILL_ID')
logger.info(f'Skill ID set to: {sb.skill_id}')
else:
raise NameError
except NameError as err:
logger.error(f'The Alexa skill ID was not found! {err}')
raise
try:
if 'NAVI_SONG_COUNT' in os.environ:
min_song_count = os.getenv('NAVI_SONG_COUNT')
logger.info(f'Minimum song count is set to: {min_song_count}')
else:
raise NameError
except NameError as err:
logger.error(f'The minimum song count was not found! {err}')
raise
try:
if 'NAVI_URL' in os.environ:
navidrome_url = os.getenv('NAVI_URL')
logger.info(f'The URL for Navidrome is set to: {navidrome_url}')
else:
raise NameError
except NameError as err:
logger.error(f'The URL of the Navidrome server was not found! {err}')
raise
try:
if 'NAVI_USER' in os.environ:
navidrome_user = os.getenv('NAVI_USER')
logger.info(f'The Navidrome user name is set to: {navidrome_user}')
else:
raise NameError
except NameError as err:
logger.error(f'The Navidrome user name was not found! {err}')
raise
try:
if 'NAVI_PASS' in os.environ:
navidrome_passwd = os.getenv('NAVI_PASS')
logger.info('The Navidrome password is set')
else:
raise NameError
except NameError as err:
logger.error(f'The Navidrome password was not found! {err}')
raise
try:
if 'NAVI_PORT' in os.environ:
navidrome_port = os.getenv('NAVI_PORT')
logger.info(f'The Navidrome port is set to: {navidrome_port}')
else:
raise NameError
except NameError as err:
logger.error(f'The Navidrome port was not found! {err}')
raise
try:
if 'NAVI_API_PATH' in os.environ:
navidrome_api_location = os.getenv('NAVI_API_PATH')
logger.info(f'The Navidrome API path is set to: {navidrome_api_location}')
else:
raise NameError
except NameError as err:
logger.error(f'The Navidrome API path was not found! {err}')
raise
try:
if 'NAVI_API_VER' in os.environ:
navidrome_api_version = os.getenv('NAVI_API_VER')
logger.info(f'The Navidrome API version is set to: {navidrome_api_version}')
else:
raise NameError
except NameError as err:
logger.error(f'The Navidrome API version was not found! {err}')
raise
logger.debug('Configuration has been successfully loaded')
# Set log level based on config value
if 'NAVI_DEBUG' in os.environ:
navidrome_log_level = int(os.getenv('NAVI_DEBUG'))
if navidrome_log_level == 0:
# Warnings and higher
logger.setLevel(logging.WARNING)
logger.warning('Log level set to WARNING')
elif navidrome_log_level == 1:
# Info messages and higher
logger.setLevel(logging.INFO)
logger.info('Log level set to INFO')
elif navidrome_log_level == 2:
# Debug with request and response interceptors
logger.setLevel(logging.DEBUG)
logger.debug('Log level set to DEBUG')
elif navidrome_log_level == 3:
# Debug with request / response interceptors and Web GUI
logger.setLevel(logging.DEBUG)
logger.debug('Log level set to DEBUG')
else:
# Invalid value provided - set to WARNING
navidrome_log_level = 0
logger.setLevel(logging.WARNING)
logger.warning('Log level set to WARNING')
# Create a sharable queue than can be updated by multiple threads to enable larger playlists
# to be returned in the back ground avoiding the Amazon 8 second timeout
BaseManager.register('MediaQueue', queue.MediaQueue)
manager = BaseManager()
manager.start()
play_queue = manager.MediaQueue()
logger.debug('MediaQueue object created...')
# Variable to store the additional thread used to populate large playlists
# this is used to avoid concurency issues if there is an attempt to load multiple playlists
# at the same time.
backgroundProcess = None
# Connect to Navidrome
connection = api.SubsonicConnection(navidrome_url,
navidrome_user,
navidrome_passwd,
navidrome_port,
navidrome_api_location,
navidrome_api_version)
try:
connection.ping()
except:
raise RuntimeError('Could not connect to SubSonic API!')
logger.info('AskNavidrome Web Service is ready to start!')
#
# Handler Classes
#
class LaunchRequestHandler(AbstractRequestHandler):
"""Handle LaunchRequest and NavigateHomeIntent"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return (
is_request_type('LaunchRequest')(handler_input) or
is_intent_name('AMAZON.NavigateHomeIntent')(handler_input)
)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In LaunchRequestHandler')
connection.ping()
speech = 'Ready!'
handler_input.response_builder.speak(speech).ask(speech)
return handler_input.response_builder.response
class CheckAudioInterfaceHandler(AbstractRequestHandler):
"""Check if device supports audio play.
This can be used as the first handler to be checked, before invoking
other handlers, thus making the skill respond to unsupported devices
without doing much processing.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
if handler_input.request_envelope.context.system.device:
# Since skill events won't have device information
return handler_input.request_envelope.context.system.device.supported_interfaces.audio_player is None
else:
return False
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In CheckAudioInterfaceHandler')
_ = handler_input.attributes_manager.request_attributes['_']
handler_input.response_builder.speak('This device is not supported').set_should_end_session(True)
return handler_input.response_builder.response
class SkillEventHandler(AbstractRequestHandler):
"""Close session for skill events or when session ends.
Handler to handle session end or skill events (SkillEnabled,
SkillDisabled etc.)
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return (handler_input.request_envelope.request.object_type.startswith(
'AlexaSkillEvent') or
is_request_type('SessionEndedRequest')(handler_input))
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In SkillEventHandler')
return handler_input.response_builder.response
class HelpHandler(AbstractRequestHandler):
"""Handle HelpIntent"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('AMAZON.HelpIntent')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In HelpHandler')
text = 'AskNavidrome lets you interact with media servers that offer a Subsonic compatible A.P.I.'
handler_input.response_builder.speak(text)
return handler_input.response_builder.response
class NaviSonicPlayMusicByArtist(AbstractRequestHandler):
"""Handle NaviSonicPlayMusicByArtist
Play a selection of songs for the given artist
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayMusicByArtist')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicPlayMusicByArtist')
# Get the requested artist
artist = get_slot_value_v2(handler_input, 'artist')
# Search for an artist
artist_lookup = connection.search_artist(artist.value)
if artist_lookup is None:
text = f"I couldn't find the artist {artist.value} in the collection."
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
# Get a list of albums by the artist
artist_album_lookup = connection.albums_by_artist(artist_lookup[0].get('id'))
# Build a list of songs to play
song_id_list = connection.build_song_list_from_albums(artist_album_lookup, min_song_count)
play_queue.clear()
controller.enqueue_songs(connection, play_queue, song_id_list)
speech = f'Playing music by: {artist.value}'
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
play_queue.shuffle()
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicPlayAlbumByArtist(AbstractRequestHandler):
"""Handle NaviSonicPlayAlbumByArtist
Play a given album by a given artist
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayAlbumByArtist')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicPlayAlbumByArtist')
# Get variables from intent
artist = get_slot_value_v2(handler_input, 'artist')
album = get_slot_value_v2(handler_input, 'album')
if artist is not None and album is not None:
# Play album by artist method
logger.debug(f'Searching for the album {album.value} by {artist.value}')
# Search for an artist
artist_lookup = connection.search_artist(artist.value)
if artist_lookup is None:
text = f"I couldn't find the artist {artist.value} in the collection."
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
artist_album_lookup = connection.albums_by_artist(artist_lookup[0].get('id'))
# Search the list of dictionaries for the requested album
# Strings are all converted to lower case to minimise matching errors
result = [album_result for album_result in artist_album_lookup if album_result.get('title').lower() == album.value.lower()]
if not result:
text = f"I couldn't find an album called {album.value} by {artist.value} in the collection."
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
# At this point we have found an album that matches
songs = connection.build_song_list_from_albums(result, -1)
play_queue.clear()
controller.enqueue_songs(connection, play_queue, songs)
speech = f'Playing {album.value} by: {artist.value}'
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
elif artist is None and album:
# Play album method
logger.debug(f'Searching for the album {album.value}')
result = connection.search_album(album.value)
if result is None:
text = f"I couldn't find the album {album.value} in the collection."
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
songs = connection.build_song_list_from_albums(result, -1)
play_queue.clear()
controller.enqueue_songs(connection, play_queue, songs)
speech = f'Playing {album.value}'
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicPlaySongByArtist(AbstractRequestHandler):
"""Handle the NaviSonicPlaySongByArtist intent
Play the given song by the given artist if it exists in the
collection.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlaySongByArtist')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicPlaySongByArtist')
# Get variables from intent
artist = get_slot_value_v2(handler_input, 'artist')
song = get_slot_value_v2(handler_input, 'song')
logger.debug(f'Searching for the song {song.value} by {artist.value}')
# Search for the artist
artist_lookup = connection.search_artist(artist.value)
if artist_lookup is None:
text = f"I couldn't find the artist {artist.value} in the collection."
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
artist_id = artist_lookup[0].get('id')
# Search for song
song_list = connection.search_song(song.value)
# Search for song by given artist.
song_dets = [item.get('id') for item in song_list if item.get('artistId') == artist_id]
if not song_dets:
text = f"I couldn't find a song called {song.value} by {artist.value} in the collection."
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
play_queue.clear()
controller.enqueue_songs(connection, play_queue, song_dets)
speech = f'Playing {song.value} by {artist.value}'
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicPlayPlaylist(AbstractRequestHandler):
"""Handle NaviSonicPlayPlaylist
Play the given playlist
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayPlaylist')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
global backgroundProcess
logger.debug('In NaviSonicPlayPlaylist')
# Check if a background process is already running, if it is then terminate the process
# in favour of the new process.
if backgroundProcess != None:
backgroundProcess.terminate()
backgroundProcess.join()
# Get the requested playlist
playlist = get_slot_value_v2(handler_input, 'playlist')
# Search for a playlist
playlist_id = connection.search_playlist(playlist.value)
if playlist_id is None:
text = "I couldn't find the playlist " + str(playlist.value) + ' in the collection.'
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
song_id_list = connection.build_song_list_from_playlist(playlist_id)
play_queue.clear()
# Work around the Amazon / Alexa 8 second timeout.
controller.enqueue_songs(connection, play_queue, [song_id_list[0], song_id_list[1]]) # When generating the playlist return the first two tracks.
backgroundProcess = Process(target=queueWorkerThread, args=(connection, play_queue, song_id_list[2:])) # Create a thread to enqueue the remaining tracks
backgroundProcess.start() # Start the additional thread
speech = 'Playing playlist ' + str(playlist.value)
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
def queueWorkerThread(connection, play_queue, song_id_list):
logger.debug('In playlist processing thread!')
controller.enqueue_songs(connection, play_queue, song_id_list)
play_queue.sync()
logger.debug('Finished playlist processing!')
class NaviSonicPlayMusicByGenre(AbstractRequestHandler):
""" Play songs from the given genere
50 tracks from the given genere are shuffled and played
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayMusicByGenre')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicPlayMusicByGenre')
# Get the requested genre
genre = get_slot_value_v2(handler_input, 'genre')
song_id_list = connection.build_song_list_from_genre(genre.value, min_song_count)
if song_id_list is None:
text = f"I couldn't find any {genre.value} songs in the collection."
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
random.shuffle(song_id_list)
play_queue.clear()
controller.enqueue_songs(connection, play_queue, song_id_list)
speech = f'Playing {genre.value} music'
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicPlayMusicRandom(AbstractRequestHandler):
"""Handle the NaviSonicPlayMusicRandom intent
Play a random selection of music.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayMusicRandom')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicPlayMusicRandom')
song_id_list = connection.build_random_song_list(min_song_count)
if song_id_list is None:
text = "I couldn't find any songs in the collection."
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
random.shuffle(song_id_list)
play_queue.clear()
controller.enqueue_songs(connection, play_queue, song_id_list)
speech = 'Playing random music'
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicPlayFavouriteSongs(AbstractRequestHandler):
"""Handle the NaviSonicPlayFavouriteSongs intent
Play all starred / liked songs, songs are automatically shuffled.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicPlayFavouriteSongs')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicPlayFavouriteSongs')
song_id_list = connection.build_song_list_from_favourites()
if song_id_list is None:
text = "You don't have any favourite songs in the collection."
handler_input.response_builder.speak(text).ask(text)
return handler_input.response_builder.response
else:
random.shuffle(song_id_list)
play_queue.clear()
controller.enqueue_songs(connection, play_queue, song_id_list)
speech = 'Playing your favourite tracks.'
logger.info(speech)
card = {'title': 'AskNavidrome',
'text': speech
}
track_details = play_queue.get_next_track()
return controller.start_playback('play', speech, card, track_details, handler_input)
class NaviSonicRandomiseQueue(AbstractRequestHandler):
"""Handle NaviSonicRandomiseQueue Intent
Shuffle the current play queue
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicRandomiseQueue')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicRandomiseQueue Handler')
play_queue.shuffle()
play_queue.sync()
return handler_input.response_builder.response
class NaviSonicSongDetails(AbstractRequestHandler):
"""Handle NaviSonicSongDetails Intent
Returns information on the track that is currently playing
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicSongDetails')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicSongDetails Handler')
title = play_queue.current_track.title
artist = play_queue.current_track.artist
album = play_queue.current_track.album
text = f'This is {title} by {artist}, from the album {album}'
handler_input.response_builder.speak(text)
return handler_input.response_builder.response
class NaviSonicStarSong(AbstractRequestHandler):
"""Handle NaviSonicStarSong Intent
Star / favourite the current song
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicStarSong')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicStarSong Handler')
song_id = play_queue.current_track.id
connection.star_entry(song_id, 'song')
return handler_input.response_builder.response
class NaviSonicUnstarSong(AbstractRequestHandler):
"""Handle NaviSonicUnstarSong Intent
Star / favourite the current song
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('NaviSonicUnstarSong')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NaviSonicUnstarSong Handler')
song_id = play_queue.current_track.id
connection.star_entry(song_id, 'song')
connection.unstar_entry(song_id, 'song')
return handler_input.response_builder.response
#
# AudioPlayer Handlers
#
class PlaybackStartedHandler(AbstractRequestHandler):
"""AudioPlayer.PlaybackStarted Directive received.
Confirming that the requested audio file began playing.
Do not send any specific response.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_request_type('AudioPlayer.PlaybackStarted')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PlaybackStartedHandler')
logger.info('Playback started')
return handler_input.response_builder.response
class PlaybackStoppedHandler(AbstractRequestHandler):
"""AudioPlayer.PlaybackStopped Directive received.
Confirming that the requested audio file stopped playing.
Do not send any specific response.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_request_type('AudioPlayer.PlaybackStopped')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PlaybackStoppedHandler')
# store the current offset for later resumption
play_queue.current_track.offset = handler_input.request_envelope.request.offset_in_milliseconds
logger.debug(f'Stored track offset of: {play_queue.current_track.offset} ms for {play_queue.current_track.title}')
logger.info('Playback stopped')
return handler_input.response_builder.response
class PlaybackNearlyFinishedHandler(AbstractRequestHandler):
"""AudioPlayer.PlaybackNearlyFinished Directive received.
Replacing queue with the URL again. This should not happen on live streams.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_request_type('AudioPlayer.PlaybackNearlyFinished')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PlaybackNearlyFinishedHandler')
logger.info('Queuing next track...')
track_details = play_queue.enqueue_next_track()
return controller.start_playback('continue', None, None, track_details, handler_input)
class PlaybackFinishedHandler(AbstractRequestHandler):
"""AudioPlayer.PlaybackFinished Directive received.
Confirming that the requested audio file completed playing.
Do not send any specific response.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_request_type('AudioPlayer.PlaybackFinished')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PlaybackFinishedHandler')
play_queue.get_next_track()
return handler_input.response_builder.response
class PausePlaybackHandler(AbstractRequestHandler):
"""Handler for stopping audio.
Handles Stop, Cancel and Pause Intents and PauseCommandIssued event.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return (is_intent_name('AMAZON.StopIntent')(handler_input) or
is_intent_name('AMAZON.CancelIntent')(handler_input) or
is_intent_name('AMAZON.PauseIntent')(handler_input))
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PausePlaybackHandler')
play_queue.sync()
return controller.stop(handler_input)
class ResumePlaybackHandler(AbstractRequestHandler):
"""Handler for resuming audio on different events.
Handles PlayAudio Intent, Resume Intent.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return (is_intent_name('AMAZON.ResumeIntent')(handler_input) or
is_intent_name('PlayAudio')(handler_input))
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In ResumePlaybackHandler')
if play_queue.current_track.offset > 0:
# There is a paused track, continue
logger.info('Resuming ' + str(play_queue.current_track.title))
logger.info('Offset ' + str(play_queue.current_track.offset))
return controller.start_playback('play', None, None, play_queue.current_track, handler_input)
elif play_queue.get_queue_count() > 0 and play_queue.current_track.offset == 0:
# No paused tracks but tracks in queue
logger.info('Resuming - There was no paused track, getting next track from queue')
track_details = play_queue.get_next_track()
return controller.start_playback('play', None, None, track_details, handler_input)
class NextPlaybackHandler(AbstractRequestHandler):
"""Handle NextIntent"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('AMAZON.NextIntent')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In NextPlaybackHandler')
track_details = play_queue.get_next_track()
# Set the offset to 0 as we are skipping we want to start at the beginning
track_details.offset = 0
return controller.start_playback('play', None, None, track_details, handler_input)
class PreviousPlaybackHandler(AbstractRequestHandler):
"""Handle PreviousIntent"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_intent_name('AMAZON.PreviousIntent')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PreviousPlaybackHandler')
track_details = play_queue.get_prevous_track()
# Set the offset to 0 as we are skipping we want to start at the beginning
track_details.offset = 0
return controller.start_playback('play', None, None, track_details, handler_input)
class PlaybackFailedEventHandler(AbstractRequestHandler):
"""AudioPlayer.PlaybackFailed Directive received.
Logging the error and restarting playing with no output speech.
"""
def can_handle(self, handler_input: HandlerInput) -> bool:
return is_request_type('AudioPlayer.PlaybackFailed')(handler_input)
def handle(self, handler_input: HandlerInput) -> Response:
logger.debug('In PlaybackFailedHandler')
song_id = play_queue.current_track.id
# Log failure and track ID
logger.error(f'Playback Failed: {handler_input.request_envelope.request.error}')
logger.error(f'Failed playing track with ID: {song_id}')
# Skip to the next track instead of stopping
track_details = play_queue.get_next_track()
# Set the offset to 0 as we are skipping we want to start at the beginning
track_details.offset = 0
return controller.start_playback('play', None, None, track_details, handler_input)
#
# Exception Handers
#
class SystemExceptionHandler(AbstractExceptionHandler):
"""Handle System.ExceptionEncountered
Handles exceptions and prints error information
in the log
"""
def can_handle(self, handler_input: HandlerInput, exception: Exception) -> bool:
return is_request_type('System.ExceptionEncountered')(handler_input)
def handle(self, handler_input: HandlerInput, exception: Exception) -> Response:
logger.debug('In SystemExceptionHandler')
# Log the exception
logger.error(f'System Exception: {exception}')
logger.error(f'Request Type Was: {get_request_type(handler_input)}')
error = handler_input.request_envelope.request.to_dict()
logger.error(f"Details: {error.get('error').get('message')}")
if get_request_type(handler_input) == 'IntentRequest':
logger.error(f'Intent Name Was: {get_intent_name(handler_input)}')
speech = "Sorry, I didn't get that. Can you please say it again!!"
handler_input.response_builder.speak(speech).ask(speech)
return handler_input.response_builder.response
class GeneralExceptionHandler(AbstractExceptionHandler):
"""Handle general exceptions
Handles exceptions and prints error information
in the log
"""
def can_handle(self, handler_input: HandlerInput, exception: Exception) -> bool:
return True
def handle(self, handler_input: HandlerInput, exception: Exception) -> Response:
logger.debug('In GeneralExceptionHandler')
# Log the exception
logger.error(f'General Exception: {exception}')
logger.error(f'Request Type Was: {get_request_type(handler_input)}')
if get_request_type(handler_input) == 'IntentRequest':
logger.error(f'Intent Name Was: {get_intent_name(handler_input)}')
speech = "Sorry, I didn't get that. Can you please say it again!!"
handler_input.response_builder.speak(speech).ask(speech)
return handler_input.response_builder.response
#
# Request Interceptors
#
class LoggingRequestInterceptor(AbstractRequestInterceptor):
"""Intercept all requests
Intercepts all requests sent to the skill and prints them in the log
"""
def process(self, handler_input: HandlerInput):
logger.debug(f'Request received: {handler_input.request_envelope.request}')
class LoggingResponseInterceptor(AbstractResponseInterceptor):
"""Intercept all responses
Intercepts all responses sent from the skill and prints them in the log
"""
def process(self, handler_input: HandlerInput, response: Response):
logger.debug(f'Response sent: {response}')
# Register Intent Handlers
sb.add_request_handler(LaunchRequestHandler())
sb.add_request_handler(CheckAudioInterfaceHandler())
sb.add_request_handler(SkillEventHandler())
sb.add_request_handler(HelpHandler())
sb.add_request_handler(NaviSonicPlayMusicByArtist())
sb.add_request_handler(NaviSonicPlayAlbumByArtist())
sb.add_request_handler(NaviSonicPlaySongByArtist())
sb.add_request_handler(NaviSonicPlayPlaylist())
sb.add_request_handler(NaviSonicPlayFavouriteSongs())
sb.add_request_handler(NaviSonicPlayMusicByGenre())
sb.add_request_handler(NaviSonicPlayMusicRandom())
sb.add_request_handler(NaviSonicRandomiseQueue())
sb.add_request_handler(NaviSonicSongDetails())
sb.add_request_handler(NaviSonicStarSong())
sb.add_request_handler(NaviSonicUnstarSong())
# Register AutoPlayer Handlers
sb.add_request_handler(PlaybackStartedHandler())
sb.add_request_handler(PlaybackStoppedHandler())
sb.add_request_handler(PlaybackNearlyFinishedHandler())
sb.add_request_handler(PlaybackFinishedHandler())
sb.add_request_handler(PausePlaybackHandler())
sb.add_request_handler(NextPlaybackHandler())
sb.add_request_handler(PreviousPlaybackHandler())
sb.add_request_handler(ResumePlaybackHandler())
sb.add_request_handler(PlaybackFailedEventHandler())
# Register Exception Handlers
sb.add_exception_handler(SystemExceptionHandler())
sb.add_exception_handler(GeneralExceptionHandler())
if navidrome_log_level >= 2:
# Register Interceptors (log all requests)
sb.add_global_request_interceptor(LoggingRequestInterceptor())
sb.add_global_response_interceptor(LoggingResponseInterceptor())
sa = SkillAdapter(skill=sb.create(), skill_id='test', app=app)
sa.register(app=app, route='/')
# Enable queue and history diagnostics
if navidrome_log_level == 3:
logger.warning('AskNavidrome debugging has been enabled, this should only be used when testing!')
logger.warning('The /buffer, /queue and /history http endpoints are available publicly!')
@app.route('/queue')
def view_queue():
"""View the contents of play_queue.queue
Creates a tabulated page contining the contents of the play_queue.queue deque.
"""
return render_template('table.html', title='AskNavidrome - Queued Tracks',
tracks=play_queue.queue, current=play_queue.current_track)
@app.route('/history')
def view_history():
"""View the contents of play_queue.history
Creates a tabulated page contining the contents of the play_queue.history deque.
"""
return render_template('table.html', title='AskNavidrome - Track History',
tracks=play_queue.history, current=play_queue.current_track)
@app.route('/buffer')
def view_buffer():
"""View the contents of play_queue.buffer
Creates a tabulated page contining the contents of the play_queue.buffer deque.
"""
return render_template('table.html', title='AskNavidrome - Buffered Tracks',
tracks=play_queue.buffer, current=play_queue.current_track)
# Run web app by default when file is executed.
if __name__ == '__main__':
# Start the web service
app.run(host='0.0.0.0')