1030 lines
35 KiB
Python
Executable File
1030 lines
35 KiB
Python
Executable File
from flask import Flask, render_template
|
|
import logging
|
|
import os
|
|
import random
|
|
import sys
|
|
|
|
from ask_sdk_core.skill_builder import SkillBuilder
|
|
from ask_sdk_core.dispatch_components import AbstractRequestHandler, AbstractRequestInterceptor, AbstractResponseInterceptor
|
|
from ask_sdk_core.utils import is_request_type, is_intent_name, get_slot_value_v2, get_intent_name, get_request_type
|
|
from ask_sdk_core.handler_input import HandlerInput
|
|
from ask_sdk_model import Response
|
|
from ask_sdk_core.dispatch_components import AbstractExceptionHandler
|
|
from flask_ask_sdk.skill_adapter import SkillAdapter
|
|
|
|
import asknavidrome.subsonic_api as api
|
|
import asknavidrome.media_queue as queue
|
|
import asknavidrome.controller as controller
|
|
|
|
# Create web service
|
|
app = Flask(__name__)
|
|
|
|
# Create skill object
|
|
sb = SkillBuilder()
|
|
|
|
# Setup Logging
|
|
logger = logging.getLogger() # Create logger
|
|
level = logging.getLevelName('DEBUG')
|
|
logger.setLevel(level) # Set logger log level
|
|
|
|
log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
|
|
handler = logging.StreamHandler(sys.stdout)
|
|
handler.setLevel(level)
|
|
handler.setFormatter(log_formatter)
|
|
|
|
logger.addHandler(handler)
|
|
|
|
#
|
|
# Get service configuration
|
|
#
|
|
|
|
logger.info('AskNavidrome 0.6!')
|
|
logger.debug('Getting configutration from the environment...')
|
|
|
|
try:
|
|
if 'NAVI_SKILL_ID' in os.environ:
|
|
# Set skill ID, this is available on the Alexa Developer Console
|
|
# if this is not set the web service will respond to any skill.
|
|
sb.skill_id = os.getenv('NAVI_SKILL_ID')
|
|
|
|
logger.info(f'Skill ID set to: {sb.skill_id}')
|
|
|
|
else:
|
|
raise NameError
|
|
except NameError as err:
|
|
logger.error(f'The Alexa skill ID was not found! {err}')
|
|
raise
|
|
|
|
try:
|
|
if 'NAVI_SONG_COUNT' in os.environ:
|
|
min_song_count = os.getenv('NAVI_SONG_COUNT')
|
|
|
|
logger.info(f'Minimum song count is set to: {min_song_count}')
|
|
|
|
else:
|
|
raise NameError
|
|
except NameError as err:
|
|
logger.error(f'The minimum song count was not found! {err}')
|
|
raise
|
|
|
|
try:
|
|
if 'NAVI_URL' in os.environ:
|
|
navidrome_url = os.getenv('NAVI_URL')
|
|
|
|
logger.info(f'The URL for Navidrome is set to: {navidrome_url}')
|
|
|
|
else:
|
|
raise NameError
|
|
except NameError as err:
|
|
logger.error(f'The URL of the Navidrome server was not found! {err}')
|
|
raise
|
|
|
|
try:
|
|
if 'NAVI_USER' in os.environ:
|
|
navidrome_user = os.getenv('NAVI_USER')
|
|
|
|
logger.info(f'The Navidrome user name is set to: {navidrome_user}')
|
|
|
|
else:
|
|
raise NameError
|
|
except NameError as err:
|
|
logger.error(f'The Navidrome user name was not found! {err}')
|
|
raise
|
|
|
|
try:
|
|
if 'NAVI_PASS' in os.environ:
|
|
navidrome_passwd = os.getenv('NAVI_PASS')
|
|
|
|
logger.info('The Navidrome password is set')
|
|
|
|
else:
|
|
raise NameError
|
|
except NameError as err:
|
|
logger.error(f'The Navidrome password was not found! {err}')
|
|
raise
|
|
|
|
try:
|
|
if 'NAVI_PORT' in os.environ:
|
|
navidrome_port = os.getenv('NAVI_PORT')
|
|
|
|
logger.info(f'The Navidrome port is set to: {navidrome_port}')
|
|
|
|
else:
|
|
raise NameError
|
|
except NameError as err:
|
|
logger.error(f'The Navidrome port was not found! {err}')
|
|
raise
|
|
|
|
try:
|
|
if 'NAVI_API_PATH' in os.environ:
|
|
navidrome_api_location = os.getenv('NAVI_API_PATH')
|
|
|
|
logger.info(f'The Navidrome API path is set to: {navidrome_api_location}')
|
|
|
|
else:
|
|
raise NameError
|
|
except NameError as err:
|
|
logger.error(f'The Navidrome API path was not found! {err}')
|
|
raise
|
|
|
|
try:
|
|
if 'NAVI_API_VER' in os.environ:
|
|
navidrome_api_version = os.getenv('NAVI_API_VER')
|
|
|
|
logger.info(f'The Navidrome API version is set to: {navidrome_api_version}')
|
|
|
|
else:
|
|
raise NameError
|
|
except NameError as err:
|
|
logger.error(f'The Navidrome API version was not found! {err}')
|
|
raise
|
|
|
|
logger.debug('Configuration has been successfully loaded')
|
|
|
|
# Set log level based on config value
|
|
if 'NAVI_DEBUG' in os.environ:
|
|
navidrome_log_level = int(os.getenv('NAVI_DEBUG'))
|
|
|
|
if navidrome_log_level == 0:
|
|
# Warnings and higher
|
|
logger.setLevel(logging.WARNING)
|
|
logger.warning('Log level set to WARNING')
|
|
|
|
elif navidrome_log_level == 1:
|
|
# Info messages and higher
|
|
logger.setLevel(logging.INFO)
|
|
logger.info('Log level set to INFO')
|
|
|
|
elif navidrome_log_level == 2:
|
|
# Debug with request and response interceptors
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.debug('Log level set to DEBUG')
|
|
|
|
elif navidrome_log_level == 3:
|
|
# Debug with request / response interceptors and Web GUI
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.debug('Log level set to DEBUG')
|
|
|
|
else:
|
|
# Invalid value provided - set to WARNING
|
|
navidrome_log_level = 0
|
|
logger.setLevel(logging.WARNING)
|
|
logger.warning('Log level set to WARNING')
|
|
|
|
# Create a queue
|
|
play_queue = queue.MediaQueue()
|
|
logger.debug('MediaQueue object created...')
|
|
|
|
# Connect to Navidrome
|
|
connection = api.SubsonicConnection(navidrome_url,
|
|
navidrome_user,
|
|
navidrome_passwd,
|
|
navidrome_port,
|
|
navidrome_api_location,
|
|
navidrome_api_version)
|
|
|
|
try:
|
|
connection.ping()
|
|
|
|
except:
|
|
raise RuntimeError('Could not connect to SubSonic API!')
|
|
|
|
logger.info('AskNavidrome Web Service is ready to start!')
|
|
|
|
|
|
#
|
|
# Handler Classes
|
|
#
|
|
|
|
class LaunchRequestHandler(AbstractRequestHandler):
|
|
"""Handle LaunchRequest and NavigateHomeIntent"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return (
|
|
is_request_type('LaunchRequest')(handler_input) or
|
|
is_intent_name('AMAZON.NavigateHomeIntent')(handler_input)
|
|
)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In LaunchRequestHandler')
|
|
|
|
connection.ping()
|
|
speech = 'Ready!'
|
|
|
|
handler_input.response_builder.speak(speech).ask(speech)
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
class CheckAudioInterfaceHandler(AbstractRequestHandler):
|
|
"""Check if device supports audio play.
|
|
|
|
This can be used as the first handler to be checked, before invoking
|
|
other handlers, thus making the skill respond to unsupported devices
|
|
without doing much processing.
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
if handler_input.request_envelope.context.system.device:
|
|
# Since skill events won't have device information
|
|
return handler_input.request_envelope.context.system.device.supported_interfaces.audio_player is None
|
|
else:
|
|
return False
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In CheckAudioInterfaceHandler')
|
|
|
|
_ = handler_input.attributes_manager.request_attributes['_']
|
|
handler_input.response_builder.speak('This device is not supported').set_should_end_session(True)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
class SkillEventHandler(AbstractRequestHandler):
|
|
"""Close session for skill events or when session ends.
|
|
|
|
Handler to handle session end or skill events (SkillEnabled,
|
|
SkillDisabled etc.)
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return (handler_input.request_envelope.request.object_type.startswith(
|
|
'AlexaSkillEvent') or
|
|
is_request_type('SessionEndedRequest')(handler_input))
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In SkillEventHandler')
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
class HelpHandler(AbstractRequestHandler):
|
|
"""Handle HelpIntent"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('AMAZON.HelpIntent')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In HelpHandler')
|
|
|
|
text = 'AskNavidrome lets you interact with media servers that offer a Subsonic compatible A.P.I.'
|
|
handler_input.response_builder.speak(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
class NaviSonicPlayMusicByArtist(AbstractRequestHandler):
|
|
"""Handle NaviSonicPlayMusicByArtist
|
|
|
|
Play a selection of songs for the given artist
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('NaviSonicPlayMusicByArtist')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NaviSonicPlayMusicByArtist')
|
|
|
|
# Get the requested artist
|
|
artist = get_slot_value_v2(handler_input, 'artist')
|
|
|
|
# Search for an artist
|
|
artist_lookup = connection.search_artist(artist.value)
|
|
|
|
if artist_lookup is None:
|
|
text = f"I couldn't find the artist {artist.value} in the collection."
|
|
handler_input.response_builder.speak(text).ask(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
else:
|
|
# Get a list of albums by the artist
|
|
artist_album_lookup = connection.albums_by_artist(artist_lookup[0].get('id'))
|
|
|
|
# Build a list of songs to play
|
|
song_id_list = connection.build_song_list_from_albums(artist_album_lookup, min_song_count)
|
|
play_queue.clear()
|
|
controller.enqueue_songs(connection, play_queue, song_id_list)
|
|
speech = f'Playing music by: {artist.value}'
|
|
logger.info(speech)
|
|
|
|
card = {'title': 'AskNavidrome',
|
|
'text': speech
|
|
}
|
|
|
|
play_queue.shuffle()
|
|
track_details = play_queue.get_next_track()
|
|
return controller.start_playback('play', speech, card, track_details, handler_input)
|
|
|
|
|
|
class NaviSonicPlayAlbumByArtist(AbstractRequestHandler):
|
|
"""Handle NaviSonicPlayAlbumByArtist
|
|
|
|
Play a given album by a given artist
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('NaviSonicPlayAlbumByArtist')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NaviSonicPlayAlbumByArtist')
|
|
|
|
# Get variables from intent
|
|
artist = get_slot_value_v2(handler_input, 'artist')
|
|
album = get_slot_value_v2(handler_input, 'album')
|
|
|
|
if artist is not None and album is not None:
|
|
# Play album by artist method
|
|
logger.debug(f'Searching for the album {album.value} by {artist.value}')
|
|
|
|
# Search for an artist
|
|
artist_lookup = connection.search_artist(artist.value)
|
|
|
|
if artist_lookup is None:
|
|
text = f"I couldn't find the artist {artist.value} in the collection."
|
|
handler_input.response_builder.speak(text).ask(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
else:
|
|
artist_album_lookup = connection.albums_by_artist(artist_lookup[0].get('id'))
|
|
|
|
# Search the list of dictionaries for the requested album
|
|
# Strings are all converted to lower case to minimise matching errors
|
|
result = [album_result for album_result in artist_album_lookup if album_result.get('title').lower() == album.value.lower()]
|
|
|
|
if not result:
|
|
text = f"I couldn't find an album called {album.value} by {artist.value} in the collection."
|
|
handler_input.response_builder.speak(text).ask(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
# At this point we have found an album that matches
|
|
songs = connection.build_song_list_from_albums(result, -1)
|
|
play_queue.clear()
|
|
controller.enqueue_songs(connection, play_queue, songs)
|
|
|
|
speech = f'Playing {album.value} by: {artist.value}'
|
|
logger.info(speech)
|
|
card = {'title': 'AskNavidrome',
|
|
'text': speech
|
|
}
|
|
track_details = play_queue.get_next_track()
|
|
|
|
return controller.start_playback('play', speech, card, track_details, handler_input)
|
|
|
|
elif artist is None and album:
|
|
# Play album method
|
|
logger.debug(f'Searching for the album {album.value}')
|
|
|
|
result = connection.search_album(album.value)
|
|
|
|
if result is None:
|
|
text = f"I couldn't find the album {album.value} in the collection."
|
|
handler_input.response_builder.speak(text).ask(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
else:
|
|
songs = connection.build_song_list_from_albums(result, -1)
|
|
play_queue.clear()
|
|
controller.enqueue_songs(connection, play_queue, songs)
|
|
|
|
speech = f'Playing {album.value}'
|
|
logger.info(speech)
|
|
card = {'title': 'AskNavidrome',
|
|
'text': speech
|
|
}
|
|
track_details = play_queue.get_next_track()
|
|
|
|
return controller.start_playback('play', speech, card, track_details, handler_input)
|
|
|
|
|
|
class NaviSonicPlaySongByArtist(AbstractRequestHandler):
|
|
"""Handle the NaviSonicPlaySongByArtist intent
|
|
|
|
Play the given song by the given artist if it exists in the
|
|
collection.
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('NaviSonicPlaySongByArtist')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NaviSonicPlaySongByArtist')
|
|
|
|
# Get variables from intent
|
|
artist = get_slot_value_v2(handler_input, 'artist')
|
|
song = get_slot_value_v2(handler_input, 'song')
|
|
|
|
logger.debug(f'Searching for the song {song.value} by {artist.value}')
|
|
|
|
# Search for the artist
|
|
artist_lookup = connection.search_artist(artist.value)
|
|
|
|
if artist_lookup is None:
|
|
text = f"I couldn't find the artist {artist.value} in the collection."
|
|
handler_input.response_builder.speak(text).ask(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
else:
|
|
artist_id = artist_lookup[0].get('id')
|
|
|
|
# Search for song
|
|
song_list = connection.search_song(song.value)
|
|
|
|
# Search for song by given artist.
|
|
song_dets = [item.get('id') for item in song_list if item.get('artistId') == artist_id]
|
|
|
|
if not song_dets:
|
|
text = f"I couldn't find a song called {song.value} by {artist.value} in the collection."
|
|
handler_input.response_builder.speak(text).ask(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
play_queue.clear()
|
|
controller.enqueue_songs(connection, play_queue, song_dets)
|
|
|
|
speech = f'Playing {song.value} by {artist.value}'
|
|
logger.info(speech)
|
|
card = {'title': 'AskNavidrome',
|
|
'text': speech
|
|
}
|
|
track_details = play_queue.get_next_track()
|
|
|
|
return controller.start_playback('play', speech, card, track_details, handler_input)
|
|
|
|
|
|
class NaviSonicPlayPlaylist(AbstractRequestHandler):
|
|
"""Handle NaviSonicPlayPlaylist
|
|
|
|
Play the given playlist
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('NaviSonicPlayPlaylist')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NaviSonicPlayPlaylist')
|
|
|
|
# Get the requested playlist
|
|
playlist = get_slot_value_v2(handler_input, 'playlist')
|
|
|
|
# Search for a playlist
|
|
playlist_id = connection.search_playlist(playlist.value)
|
|
|
|
if playlist_id is None:
|
|
text = "I couldn't find the playlist " + str(playlist.value) + ' in the collection.'
|
|
handler_input.response_builder.speak(text).ask(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
else:
|
|
song_id_list = connection.build_song_list_from_playlist(playlist_id)
|
|
play_queue.clear()
|
|
controller.enqueue_songs(connection, play_queue, song_id_list)
|
|
|
|
speech = 'Playing playlist ' + str(playlist.value)
|
|
logger.info(speech)
|
|
card = {'title': 'AskNavidrome',
|
|
'text': speech
|
|
}
|
|
track_details = play_queue.get_next_track()
|
|
|
|
return controller.start_playback('play', speech, card, track_details, handler_input)
|
|
|
|
|
|
class NaviSonicPlayMusicByGenre(AbstractRequestHandler):
|
|
""" Play songs from the given genere
|
|
|
|
50 tracks from the given genere are shuffled and played
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('NaviSonicPlayMusicByGenre')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NaviSonicPlayMusicByGenre')
|
|
|
|
# Get the requested genre
|
|
genre = get_slot_value_v2(handler_input, 'genre')
|
|
|
|
song_id_list = connection.build_song_list_from_genre(genre.value, min_song_count)
|
|
|
|
if song_id_list is None:
|
|
text = f"I couldn't find any {genre.value} songs in the collection."
|
|
handler_input.response_builder.speak(text).ask(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
else:
|
|
random.shuffle(song_id_list)
|
|
play_queue.clear()
|
|
controller.enqueue_songs(connection, play_queue, song_id_list)
|
|
|
|
speech = f'Playing {genre.value} music'
|
|
logger.info(speech)
|
|
card = {'title': 'AskNavidrome',
|
|
'text': speech
|
|
}
|
|
track_details = play_queue.get_next_track()
|
|
|
|
return controller.start_playback('play', speech, card, track_details, handler_input)
|
|
|
|
|
|
class NaviSonicPlayMusicRandom(AbstractRequestHandler):
|
|
"""Handle the NaviSonicPlayMusicRandom intent
|
|
|
|
Play a random selection of music.
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('NaviSonicPlayMusicRandom')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NaviSonicPlayMusicRandom')
|
|
|
|
song_id_list = connection.build_random_song_list(min_song_count)
|
|
|
|
if song_id_list is None:
|
|
text = "I couldn't find any songs in the collection."
|
|
handler_input.response_builder.speak(text).ask(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
else:
|
|
random.shuffle(song_id_list)
|
|
play_queue.clear()
|
|
controller.enqueue_songs(connection, play_queue, song_id_list)
|
|
|
|
speech = 'Playing random music'
|
|
logger.info(speech)
|
|
card = {'title': 'AskNavidrome',
|
|
'text': speech
|
|
}
|
|
track_details = play_queue.get_next_track()
|
|
|
|
return controller.start_playback('play', speech, card, track_details, handler_input)
|
|
|
|
|
|
class NaviSonicPlayFavouriteSongs(AbstractRequestHandler):
|
|
"""Handle the NaviSonicPlayFavouriteSongs intent
|
|
|
|
Play all starred / liked songs, songs are automatically shuffled.
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('NaviSonicPlayFavouriteSongs')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NaviSonicPlayFavouriteSongs')
|
|
|
|
song_id_list = connection.build_song_list_from_favourites()
|
|
|
|
if song_id_list is None:
|
|
text = "You don't have any favourite songs in the collection."
|
|
handler_input.response_builder.speak(text).ask(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
else:
|
|
random.shuffle(song_id_list)
|
|
play_queue.clear()
|
|
controller.enqueue_songs(connection, play_queue, song_id_list)
|
|
|
|
speech = 'Playing your favourite tracks.'
|
|
logger.info(speech)
|
|
card = {'title': 'AskNavidrome',
|
|
'text': speech
|
|
}
|
|
track_details = play_queue.get_next_track()
|
|
|
|
return controller.start_playback('play', speech, card, track_details, handler_input)
|
|
|
|
|
|
class NaviSonicRandomiseQueue(AbstractRequestHandler):
|
|
"""Handle NaviSonicRandomiseQueue Intent
|
|
|
|
Shuffle the current play queue
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('NaviSonicRandomiseQueue')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NaviSonicRandomiseQueue Handler')
|
|
|
|
play_queue.shuffle()
|
|
play_queue.sync()
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
class NaviSonicSongDetails(AbstractRequestHandler):
|
|
"""Handle NaviSonicSongDetails Intent
|
|
|
|
Returns information on the track that is currently playing
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('NaviSonicSongDetails')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NaviSonicSongDetails Handler')
|
|
|
|
title = play_queue.current_track.title
|
|
artist = play_queue.current_track.artist
|
|
album = play_queue.current_track.album
|
|
|
|
text = f'This is {title} by {artist}, from the album {album}'
|
|
handler_input.response_builder.speak(text)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
class NaviSonicStarSong(AbstractRequestHandler):
|
|
"""Handle NaviSonicStarSong Intent
|
|
|
|
Star / favourite the current song
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('NaviSonicStarSong')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NaviSonicStarSong Handler')
|
|
|
|
song_id = play_queue.current_track.id
|
|
connection.star_entry(song_id, 'song')
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
class NaviSonicUnstarSong(AbstractRequestHandler):
|
|
"""Handle NaviSonicUnstarSong Intent
|
|
|
|
Star / favourite the current song
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('NaviSonicUnstarSong')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NaviSonicUnstarSong Handler')
|
|
|
|
song_id = play_queue.current_track.id
|
|
connection.star_entry(song_id, 'song')
|
|
connection.unstar_entry(song_id, 'song')
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
#
|
|
# AudioPlayer Handlers
|
|
#
|
|
|
|
|
|
class PlaybackStartedHandler(AbstractRequestHandler):
|
|
"""AudioPlayer.PlaybackStarted Directive received.
|
|
|
|
Confirming that the requested audio file began playing.
|
|
Do not send any specific response.
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_request_type('AudioPlayer.PlaybackStarted')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In PlaybackStartedHandler')
|
|
logger.info('Playback started')
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
class PlaybackStoppedHandler(AbstractRequestHandler):
|
|
"""AudioPlayer.PlaybackStopped Directive received.
|
|
|
|
Confirming that the requested audio file stopped playing.
|
|
Do not send any specific response.
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_request_type('AudioPlayer.PlaybackStopped')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In PlaybackStoppedHandler')
|
|
|
|
# store the current offset for later resumption
|
|
play_queue.current_track.offset = handler_input.request_envelope.request.offset_in_milliseconds
|
|
logger.debug(f'Stored track offset of: {play_queue.current_track.offset} ms for {play_queue.current_track.title}')
|
|
logger.info('Playback stopped')
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
class PlaybackNearlyFinishedHandler(AbstractRequestHandler):
|
|
"""AudioPlayer.PlaybackNearlyFinished Directive received.
|
|
|
|
Replacing queue with the URL again. This should not happen on live streams.
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_request_type('AudioPlayer.PlaybackNearlyFinished')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In PlaybackNearlyFinishedHandler')
|
|
logger.info('Queuing next track...')
|
|
track_details = play_queue.enqueue_next_track()
|
|
|
|
return controller.start_playback('continue', None, None, track_details, handler_input)
|
|
|
|
|
|
class PlaybackFinishedHandler(AbstractRequestHandler):
|
|
"""AudioPlayer.PlaybackFinished Directive received.
|
|
|
|
Confirming that the requested audio file completed playing.
|
|
Do not send any specific response.
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_request_type('AudioPlayer.PlaybackFinished')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In PlaybackFinishedHandler')
|
|
play_queue.get_next_track()
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
class PausePlaybackHandler(AbstractRequestHandler):
|
|
"""Handler for stopping audio.
|
|
|
|
Handles Stop, Cancel and Pause Intents and PauseCommandIssued event.
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return (is_intent_name('AMAZON.StopIntent')(handler_input) or
|
|
is_intent_name('AMAZON.CancelIntent')(handler_input) or
|
|
is_intent_name('AMAZON.PauseIntent')(handler_input))
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In PausePlaybackHandler')
|
|
play_queue.sync()
|
|
|
|
return controller.stop(handler_input)
|
|
|
|
|
|
class ResumePlaybackHandler(AbstractRequestHandler):
|
|
"""Handler for resuming audio on different events.
|
|
|
|
Handles PlayAudio Intent, Resume Intent.
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return (is_intent_name('AMAZON.ResumeIntent')(handler_input) or
|
|
is_intent_name('PlayAudio')(handler_input))
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In ResumePlaybackHandler')
|
|
|
|
if play_queue.current_track.offset > 0:
|
|
# There is a paused track, continue
|
|
logger.info('Resuming ' + str(play_queue.current_track.title))
|
|
logger.info('Offset ' + str(play_queue.current_track.offset))
|
|
|
|
return controller.start_playback('play', None, None, play_queue.current_track, handler_input)
|
|
|
|
elif play_queue.get_queue_count() > 0 and play_queue.current_track.offset == 0:
|
|
# No paused tracks but tracks in queue
|
|
logger.info('Resuming - There was no paused track, getting next track from queue')
|
|
track_details = play_queue.get_next_track()
|
|
|
|
return controller.start_playback('play', None, None, track_details, handler_input)
|
|
|
|
|
|
class NextPlaybackHandler(AbstractRequestHandler):
|
|
"""Handle NextIntent"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('AMAZON.NextIntent')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In NextPlaybackHandler')
|
|
|
|
track_details = play_queue.get_next_track()
|
|
|
|
# Set the offset to 0 as we are skipping we want to start at the beginning
|
|
track_details.offset = 0
|
|
|
|
return controller.start_playback('play', None, None, track_details, handler_input)
|
|
|
|
|
|
class PreviousPlaybackHandler(AbstractRequestHandler):
|
|
"""Handle PreviousIntent"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_intent_name('AMAZON.PreviousIntent')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In PreviousPlaybackHandler')
|
|
track_details = play_queue.get_prevous_track()
|
|
|
|
# Set the offset to 0 as we are skipping we want to start at the beginning
|
|
track_details.offset = 0
|
|
|
|
return controller.start_playback('play', None, None, track_details, handler_input)
|
|
|
|
|
|
class PlaybackFailedEventHandler(AbstractRequestHandler):
|
|
"""AudioPlayer.PlaybackFailed Directive received.
|
|
|
|
Logging the error and restarting playing with no output speech.
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput) -> bool:
|
|
return is_request_type('AudioPlayer.PlaybackFailed')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput) -> Response:
|
|
logger.debug('In PlaybackFailedHandler')
|
|
|
|
song_id = play_queue.current_track.id
|
|
|
|
# Log failure and track ID
|
|
logger.error(f'Playback Failed: {handler_input.request_envelope.request.error}')
|
|
logger.error(f'Failed playing track with ID: {song_id}')
|
|
|
|
# Skip to the next track instead of stopping
|
|
track_details = play_queue.get_next_track()
|
|
|
|
# Set the offset to 0 as we are skipping we want to start at the beginning
|
|
track_details.offset = 0
|
|
|
|
return controller.start_playback('play', None, None, track_details, handler_input)
|
|
|
|
|
|
#
|
|
# Exception Handers
|
|
#
|
|
|
|
|
|
class SystemExceptionHandler(AbstractExceptionHandler):
|
|
"""Handle System.ExceptionEncountered
|
|
|
|
Handles exceptions and prints error information
|
|
in the log
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput, exception: Exception) -> bool:
|
|
return is_request_type('System.ExceptionEncountered')(handler_input)
|
|
|
|
def handle(self, handler_input: HandlerInput, exception: Exception) -> Response:
|
|
logger.debug('In SystemExceptionHandler')
|
|
|
|
# Log the exception
|
|
logger.error(f'System Exception: {exception}')
|
|
logger.error(f'Request Type Was: {get_request_type(handler_input)}')
|
|
error = handler_input.request_envelope.request.to_dict()
|
|
logger.error(f"Details: {error.get('error').get('message')}")
|
|
|
|
if get_request_type(handler_input) == 'IntentRequest':
|
|
logger.error(f'Intent Name Was: {get_intent_name(handler_input)}')
|
|
|
|
speech = "Sorry, I didn't get that. Can you please say it again!!"
|
|
handler_input.response_builder.speak(speech).ask(speech)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
class GeneralExceptionHandler(AbstractExceptionHandler):
|
|
"""Handle general exceptions
|
|
|
|
Handles exceptions and prints error information
|
|
in the log
|
|
"""
|
|
|
|
def can_handle(self, handler_input: HandlerInput, exception: Exception) -> bool:
|
|
return True
|
|
|
|
def handle(self, handler_input: HandlerInput, exception: Exception) -> Response:
|
|
logger.debug('In GeneralExceptionHandler')
|
|
|
|
# Log the exception
|
|
logger.error(f'General Exception: {exception}')
|
|
logger.error(f'Request Type Was: {get_request_type(handler_input)}')
|
|
|
|
if get_request_type(handler_input) == 'IntentRequest':
|
|
logger.error(f'Intent Name Was: {get_intent_name(handler_input)}')
|
|
|
|
speech = "Sorry, I didn't get that. Can you please say it again!!"
|
|
handler_input.response_builder.speak(speech).ask(speech)
|
|
|
|
return handler_input.response_builder.response
|
|
|
|
|
|
#
|
|
# Request Interceptors
|
|
#
|
|
|
|
|
|
class LoggingRequestInterceptor(AbstractRequestInterceptor):
|
|
"""Intercept all requests
|
|
|
|
Intercepts all requests sent to the skill and prints them in the log
|
|
"""
|
|
|
|
def process(self, handler_input: HandlerInput):
|
|
logger.debug(f'Request received: {handler_input.request_envelope.request}')
|
|
|
|
|
|
class LoggingResponseInterceptor(AbstractResponseInterceptor):
|
|
"""Intercept all responses
|
|
|
|
Intercepts all responses sent from the skill and prints them in the log
|
|
"""
|
|
|
|
def process(self, handler_input: HandlerInput, response: Response):
|
|
logger.debug(f'Response sent: {response}')
|
|
|
|
|
|
# Register Intent Handlers
|
|
sb.add_request_handler(LaunchRequestHandler())
|
|
sb.add_request_handler(CheckAudioInterfaceHandler())
|
|
sb.add_request_handler(SkillEventHandler())
|
|
sb.add_request_handler(HelpHandler())
|
|
sb.add_request_handler(NaviSonicPlayMusicByArtist())
|
|
sb.add_request_handler(NaviSonicPlayAlbumByArtist())
|
|
sb.add_request_handler(NaviSonicPlaySongByArtist())
|
|
sb.add_request_handler(NaviSonicPlayPlaylist())
|
|
sb.add_request_handler(NaviSonicPlayFavouriteSongs())
|
|
sb.add_request_handler(NaviSonicPlayMusicByGenre())
|
|
sb.add_request_handler(NaviSonicPlayMusicRandom())
|
|
sb.add_request_handler(NaviSonicRandomiseQueue())
|
|
sb.add_request_handler(NaviSonicSongDetails())
|
|
sb.add_request_handler(NaviSonicStarSong())
|
|
sb.add_request_handler(NaviSonicUnstarSong())
|
|
|
|
# Register AutoPlayer Handlers
|
|
sb.add_request_handler(PlaybackStartedHandler())
|
|
sb.add_request_handler(PlaybackStoppedHandler())
|
|
sb.add_request_handler(PlaybackNearlyFinishedHandler())
|
|
sb.add_request_handler(PlaybackFinishedHandler())
|
|
sb.add_request_handler(PausePlaybackHandler())
|
|
sb.add_request_handler(NextPlaybackHandler())
|
|
sb.add_request_handler(PreviousPlaybackHandler())
|
|
sb.add_request_handler(ResumePlaybackHandler())
|
|
sb.add_request_handler(PlaybackFailedEventHandler())
|
|
|
|
|
|
# Register Exception Handlers
|
|
sb.add_exception_handler(SystemExceptionHandler())
|
|
sb.add_exception_handler(GeneralExceptionHandler())
|
|
|
|
if navidrome_log_level >= 2:
|
|
# Register Interceptors (log all requests)
|
|
sb.add_global_request_interceptor(LoggingRequestInterceptor())
|
|
sb.add_global_response_interceptor(LoggingResponseInterceptor())
|
|
|
|
sa = SkillAdapter(skill=sb.create(), skill_id='test', app=app)
|
|
sa.register(app=app, route='/')
|
|
|
|
# Enable queue and history diagnostics
|
|
if navidrome_log_level == 3:
|
|
logger.warning('AskNavidrome debugging has been enabled, this should only be used when testing!')
|
|
logger.warning('The /buffer, /queue and /history http endpoints are available publicly!')
|
|
|
|
@app.route('/queue')
|
|
def view_queue():
|
|
"""View the contents of play_queue.queue
|
|
|
|
Creates a tabulated page contining the contents of the play_queue.queue deque.
|
|
"""
|
|
|
|
return render_template('table.html', title='AskNavidrome - Queued Tracks',
|
|
tracks=play_queue.queue, current=play_queue.current_track)
|
|
|
|
@app.route('/history')
|
|
def view_history():
|
|
"""View the contents of play_queue.history
|
|
|
|
Creates a tabulated page contining the contents of the play_queue.history deque.
|
|
"""
|
|
|
|
return render_template('table.html', title='AskNavidrome - Track History',
|
|
tracks=play_queue.history, current=play_queue.current_track)
|
|
|
|
@app.route('/buffer')
|
|
def view_buffer():
|
|
"""View the contents of play_queue.buffer
|
|
|
|
Creates a tabulated page contining the contents of the play_queue.buffer deque.
|
|
"""
|
|
|
|
return render_template('table.html', title='AskNavidrome - Buffered Tracks',
|
|
tracks=play_queue.buffer, current=play_queue.current_track)
|
|
|
|
|
|
# Run web app by default when file is executed.
|
|
if __name__ == '__main__':
|
|
# Start the web service
|
|
app.run(host='0.0.0.0')
|