Move to GitHub

This commit is contained in:
Ross Stewart
2022-08-05 16:37:56 +01:00
commit 58885be096
18 changed files with 2092 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""The AskNavidrome Module!"""

View File

@@ -0,0 +1,195 @@
import logging
from typing import Union
from ask_sdk_core.handler_input import HandlerInput
from ask_sdk_model import Response
from ask_sdk_model.ui import StandardCard
from ask_sdk_model.interfaces.audioplayer import (
PlayDirective, PlayBehavior, AudioItem, Stream, AudioItemMetadata,
StopDirective)
from ask_sdk_model.interfaces import display
from .track import Track
from .subsonic_api import SubsonicConnection
from .media_queue import MediaQueue
logger = logging.getLogger(__name__)
#
# Functions
#
def start_playback(mode: str, text: str, card_data: dict, track_details: Track, handler_input: HandlerInput) -> Response:
"""Function to play audio.
Begin playing audio when:
- Play Audio Intent is invoked.
- Resuming audio when stopped / paused.
- Next / Previous commands issues.
.. note ::
- https://developer.amazon.com/docs/custom-skills/audioplayer-interface-reference.html#play
- REPLACE_ALL: Immediately begin playback of the specified stream,
and replace current and enqueued streams.
:param str mode: play | continue - Play immediately or enqueue a track
:param str text: Text which should be spoken before playback starts
:param dict card_data: Data to display on a card
:param Track track_details: A Track object containing details of the track to use
:param HandlerInput handler_input: The Amazon Alexa HandlerInput object
:return: Amazon Alexa Response class
:rtype: Response
"""
if mode == 'play':
# Starting playback
logger.debug('In start_playback() - play mode')
if card_data:
# Cards are only supported if we are starting a new session
handler_input.response_builder.set_card(
StandardCard(
title=card_data['title'], text=card_data['text'],
# image=Image(
# small_image_url=card_data['small_image_url'],
# large_image_url=card_data['large_image_url'])
)
)
handler_input.response_builder.add_directive(
PlayDirective(
play_behavior=PlayBehavior.REPLACE_ALL,
audio_item=AudioItem(
stream=Stream(
token=track_details.id,
url=track_details.uri,
offset_in_milliseconds=track_details.offset,
expected_previous_token=None),
metadata=add_screen_background(card_data) if card_data else None
)
)
).set_should_end_session(True)
if text:
# Text is not supported if we are continuing an existing play list
handler_input.response_builder.speak(text)
logger.debug(f'Track ID: {track_details.id}')
logger.debug(f'Track Previous ID: {track_details.previous_id}')
logger.info(f'Playing track: {track_details.title} by: {track_details.artist}')
elif mode == 'continue':
# Continuing Playback
logger.debug('In start_playback() - continue mode')
handler_input.response_builder.add_directive(
PlayDirective(
play_behavior=PlayBehavior.ENQUEUE,
audio_item=AudioItem(
stream=Stream(
token=track_details.id,
url=track_details.uri,
# Offset is 0 to allow playing of the next track from the beginning
# if the Previous intent is used
offset_in_milliseconds=0,
expected_previous_token=track_details.previous_id),
metadata=None
)
)
).set_should_end_session(True)
logger.debug(f'Track ID: {track_details.id}')
logger.debug(f'Track Previous ID: {track_details.previous_id}')
logger.info(f'Enqueuing track: {track_details.title} by: {track_details.artist}')
return handler_input.response_builder.response
def stop(handler_input: HandlerInput) -> Response:
"""Stop playback
:param HandlerInput handler_input: The Amazon Alexa HandlerInput object
:return: Amazon Alexa Response class
:rtype: Response
"""
logger.debug('In stop()')
handler_input.response_builder.add_directive(StopDirective())
return handler_input.response_builder.response
def add_screen_background(card_data: dict) -> Union[AudioItemMetadata, None]:
"""Add background to card.
Cards are viewable on devices with screens and in the Alexa
app.
:param dict card_data: Dictionary containing card data
:return: An Amazon AudioItemMetadata object or None if card data is not present
:rtype: AudioItemMetadata | None
"""
logger.debug('In add_screen_background()')
if card_data:
metadata = AudioItemMetadata(
title=card_data['title'],
subtitle=card_data['text'],
art=display.Image(
content_description=card_data['title'],
sources=[
display.ImageInstance(
url='https://github.com/navidrome/navidrome/raw/master/resources/logo-192x192.png'
)
]
),
background_image=display.Image(
content_description=card_data['title'],
sources=[
display.ImageInstance(
url='https://github.com/navidrome/navidrome/raw/master/resources/logo-192x192.png'
)
]
)
)
return metadata
else:
return None
def enqueue_songs(api: SubsonicConnection, queue: MediaQueue, song_id_list: list) -> None:
"""Enqueue songs
Add Track objects to the queue deque
:param SubsonicConnection api: A SubsonicConnection object to allow access to the Navidrome API
:param MediaQueue queue: A MediaQueue object
:param list song_id_list: A list of song IDs to enqueue
:return: None
"""
for song_id in song_id_list:
song_details = api.get_song_details(song_id)
song_uri = api.get_song_uri(song_id)
# Create track object from song details
new_track = Track(song_details.get('song').get('id'),
song_details.get('song').get('title'),
song_details.get('song').get('artist'),
song_details.get('song').get('artistId'),
song_details.get('song').get('album'),
song_details.get('song').get('albumId'),
song_details.get('song').get('track'),
song_details.get('song').get('year'),
song_details.get('song').get('genre'),
song_details.get('song').get('duration'),
song_details.get('song').get('bitRate'),
song_uri,
0,
None)
# Add track object to queue
queue.add_track(new_track)

View File

@@ -0,0 +1,214 @@
from collections import deque
from copy import deepcopy
import logging
import random
from .track import Track
class MediaQueue:
""" The MediaQueue class
This class provides a queue based on a Python deque. This is used to store
the tracks in the current play queue
"""
def __init__(self) -> None:
"""
:return: None
"""
self.logger = logging.getLogger(__name__)
"""Logger"""
self.queue: deque = deque()
"""Deque containing tracks still to be played"""
self.history: deque = deque()
"""Deque to hold tracks that have already been played"""
self.buffer: deque = deque()
"""Deque to contain the list of tracks to be enqueued
This deque is created from self.queue when actions such as next or
previous are performed. This is because Amazon can send the
PlaybackNearlyFinished request early. Without self.buffer, this would
change self.current_track causing us to lose the real position of the
queue.
"""
self.current_track: Track = Track()
"""Property to hold the current track object"""
def add_track(self, track: Track) -> None:
"""Add tracks to the queue
:param Track track: A Track object containing details of the track to be played
:return: None
"""
self.logger.debug('In add_track()')
if not self.queue:
# This is the first track in the queue
self.queue.append(track)
else:
# There are already tracks in the queue, ensure previous_id is set
# Get the last track from the deque
prev_track = self.queue.pop()
# Set the previous_id attribute
track.previous_id = prev_track.id
# Return the previous track to the deque
self.queue.append(prev_track)
# Add the new track to the deque
self.queue.append(track)
self.logger.debug(f'In add_track() - there are {len(self.queue)} tracks in the queue')
def shuffle(self) -> None:
"""Shuffle the queue
Shuffles the queue and resets the previous track IDs required for the ENQUEUE PlayBehaviour
:return: None
"""
self.logger.debug('In shuffle()')
# Copy the original queue
orig = self.queue
new_queue = deque()
# Randomise the queue
random.shuffle(orig)
track_id = None
for t in orig:
if not new_queue:
# This is the first track, get the ID and add it
track_id = t.id
new_queue.append(t)
else:
# Set the tracks previous_id
t.previous_id = track_id
# Get the track ID to use as the next previous_id
track_id = t.id
# Add the track to the queue
new_queue.append(t)
# Replace the original queue with the new shuffled one
self.queue = new_queue
def get_next_track(self) -> Track:
"""Get the next track
Get the next track from self.queue and add it to the history deque
:return: The next track object
:rtype: Track
"""
self.logger.debug('In get_next_track()')
if self.current_track.id == '' or self.current_track.id is None:
# This is the first track
self.current_track = self.queue.popleft()
else:
# This is not the first track
self.history.append(self.current_track)
self.current_track = self.queue.popleft()
# Set the buffer to match the queue
self.sync()
return self.current_track
def get_prevous_track(self) -> Track:
"""Get the previous track
Get the last track added to the history deque and
add it to the front of the play queue
:return: The previous track object
:rtype: Track
"""
self.logger.debug('In get_prevous_track()')
# Return the current track to the queue
self.queue.appendleft(self.current_track)
# Set the new current track
self.current_track = self.history.pop()
# Set the buffer to match the queue
self.sync()
return self.current_track
def enqueue_next_track(self) -> Track:
"""Get the next buffered track
Get the next track from the buffer without updating the current track
attribute. This allows Amazon to send the PlaybackNearlyFinished
request early to queue the next track while maintaining the playlist
:return: The next track to be played
:rtype: Track
"""
self.logger.debug('In enqueue_next_track()')
return self.buffer.popleft()
def clear(self) -> None:
"""Clear queue, history and buffer deques
:return: None
"""
self.logger.debug('In clear()')
self.queue.clear()
self.history.clear()
self.buffer.clear()
def get_queue_count(self) -> int:
"""Get the number of tracks in the queue
:return: The number of tracks in the queue deque
:rtype: int
"""
self.logger.debug('In get_queue_count()')
return len(self.queue)
def get_history_count(self) -> int:
"""Get the number of tracks in the history deque
:return: The number of tracks in the history deque
:rtype: int
"""
self.logger.debug('In get_history_count()')
return len(self.history)
def sync(self) -> None:
"""Syncronise the buffer with the queue
Overwrite the buffer with the current queue.
This is useful when pausing or stopping to ensure
the resulting PlaybackNearlyFinished request gets
the correct. In practice this will have already
queued and there for missing from the current buffer
:return: None
"""
self.buffer = deepcopy(self.queue)

View File

@@ -0,0 +1,399 @@
from hashlib import md5
from typing import Union
import logging
import random
import secrets
import libsonic
class SubsonicConnection:
"""Class with methods to interact with Subsonic API compatible media servers
"""
def __init__(self, server_url: str, user: str, passwd: str, port: int, api_location: str, api_version: str) -> None:
"""
:param str server_url: The URL of the Subsonic API compatible media server
:param str user: Username to authenticate against the API
:param str passwd: Password to authenticate against the API
:param int port: Port the Subsonic compatibe server is listening on
:param str api_location: Path to the API, this is appended to server_url
:param str api_version: The version of the Subsonic API that is in use
:return: None
"""
self.logger = logging.getLogger(__name__)
self.server_url = server_url
self.user = user
self.passwd = passwd
self.port = port
self.api_location = api_location
self.api_version = api_version
self.conn = libsonic.Connection(self.server_url,
self.user,
self.passwd,
self.port,
self.api_location,
'AskNavidrome',
self.api_version,
False)
self.logger.debug('Connected to Navidrome')
def ping(self) -> bool:
"""Ping a Subsonic API server
Verify the connection to a Subsonic compatible API server
is working
:return: True if the connection works, False if it does not
:rtype: bool
"""
self.logger.debug('In function ping()')
status = self.conn.ping()
if status:
# Success
self.logger.info('Successfully connected to Navidrome')
else:
# Fail
self.logger.error('Failed to connect to Navidrome')
return self.conn.ping()
def search_playlist(self, term: str) -> Union[str, None]:
"""Search the media server for the given playlist
:param str term: The name of the playlist
:return: The ID of the playlist or None if the playlist is not found
:rtype: str | None
"""
self.logger.debug('In function search_playlist()')
playlist_dict = self.conn.getPlaylists()
# Search the list of dictionaries for a playlist with a name that matches the search term
playlist_id_list = [item.get('id') for item in playlist_dict['playlists']['playlist'] if item.get('name').lower() == term.lower()]
if len(playlist_id_list) == 1:
# We have matched the playlist return it
self.logger.debug(f'Found playlist {playlist_id_list[0]}')
return playlist_id_list[0]
elif len(playlist_id_list) > 1:
# More than one result was returned, this should not be possible
self.logger.error(f'More than one playlist called {term} was found, multiple playlists with the same name are not supported')
return None
elif len(playlist_id_list) == 0:
self.logger.error(f'No playlist matching the name {term} was found!')
return None
def search_artist(self, term: str) -> Union[dict, None]:
"""Search the media server for the given artist
:param str term: The name of the artist
:return: A dictionary of artists or None if no results are found
:rtype: dict | None
"""
self.logger.debug('In function search_artist()')
result_dict = self.conn.search3(term)
if len(result_dict['searchResult3']) > 0:
# Results found
result_count = len(result_dict['searchResult3']['artist'])
self.logger.debug(f'Searching artists for term: {term} found {result_count} entries.')
if result_count > 0:
# Results were found
return result_dict['searchResult3']['artist']
# No results were found
return None
def search_album(self, term: str) -> Union[dict, None]:
"""Search the media server for the given album
:param str term: The name of the album
:return: A dictionary of albums or None if no results are found
:rtype: dict | None
"""
self.logger.debug('In function search_album()')
result_dict = self.conn.search3(term)
if len(result_dict['searchResult3']) > 0:
# Results found
result_count = len(result_dict['searchResult3']['album'])
self.logger.debug(f'Searching albums for term: {term} found {result_count} entries.')
if result_count > 0:
# Results were found
return result_dict['searchResult3']['album']
# No results were found
return None
def search_song(self, term: str) -> Union[dict, None]:
"""Search the media server for the given song
:param str term: The name of the song
:return: A dictionary of songs or None if no results are found
:rtype: dict | None
"""
self.logger.debug('In function search_song()')
result_dict = self.conn.search3(term)
if len(result_dict['searchResult3']) > 0:
# Results found
result_count = len(result_dict['searchResult3']['song'])
self.logger.debug(f'Searching songs for term: {term}, found {result_count} entries.')
if result_count > 0:
# Results were found
return result_dict['searchResult3']['song']
# No results were found
return None
def albums_by_artist(self, id: str) -> 'list[dict]':
"""Get the albums for a given artist
:param str id: The artist ID
:return: A list of albums
:rtype: list of dict
"""
self.logger.debug('In function albums_by_artist()')
result_dict = self.conn.getArtist(id)
album_list = result_dict['artist'].get('album')
# Shuffle the album list to keep generic requests fresh
random.shuffle(album_list)
return album_list
def build_song_list_from_albums(self, albums: 'list[dict]', length: int) -> list:
"""Get a list of songs from given albums
Build a list of songs from the given albums, keep adding tracks
until song_count is greater than of equal to length
:param list[dict] albums: A list of dictionaries containing album information
:param int length: The minimum number of songs that should be returned, if -1 there is no limit
:return: A list of song IDs
:rtype: list
"""
self.logger.debug('In function build_song_list_from_albums()')
song_id_list = []
if length != -1:
song_count = 0
album_id_list = []
# The list of songs should be limited by length
for album in albums:
if song_count < int(length):
# We need more songs
album_id_list.append(album.get('id'))
song_count = song_count + album.get('songCount')
else:
# We have enough songs, stop iterating
break
else:
# The list of songs should not be limited
album_id_list = [album.get('id') for album in albums]
# Get a song listing for each album
for album_id in album_id_list:
album_details = self.conn.getAlbum(album_id)
for song_detail in album_details['album']['song']:
# Capture the song ID
song_id_list.append(song_detail.get('id'))
return song_id_list
def build_song_list_from_playlist(self, id: str) -> list:
"""Build a list of songs from a given playlist
:param str id: The playlist ID
:return: A list of song IDs
:rtype: list
"""
self.logger.debug('In function build_song_list_from_playlist()')
song_id_list = []
playlist_details = self.conn.getPlaylist(id)
song_id_list = [song_detail.get('id') for song_detail in playlist_details.get('playlist').get('entry')]
return song_id_list
def build_song_list_from_favourites(self) -> Union[list, None]:
"""Build a shuffled list favourite songs
:return: A list of song IDs or None if no favourite tracks are found.
:rtype: list | None
"""
self.logger.debug('In function build_song_list_from_favourites()')
favourite_songs = self.conn.getStarred2().get('starred2').get('song')
if len(favourite_songs) > 0:
song_id_list = [song.get('id') for song in favourite_songs]
return song_id_list
else:
return None
def build_song_list_from_genre(self, genre: str, count: int) -> Union[list, None]:
"""Build a shuffled list songs of songs from the given genre.
:param str genre: The genre, acceptible values are with the getGenres Subsonic API call.
:param int count: The number of songs to return
:return: A list of song IDs or None if no tracks are found.
:rtype: list | None
"""
self.logger.debug('In function build_song_list_from_genre()')
# Note the use of title() to captalise the first letter of each word in the genre
# without this the genres do not match the strings returned by the API.
self.logger.debug(f'Searching for {genre.title()} music')
songs_from_genre = self.conn.getSongsByGenre(genre.title(), count).get('songsByGenre').get('song')
if len(songs_from_genre) > 0:
song_id_list = [song.get('id') for song in songs_from_genre]
return song_id_list
else:
return None
def build_random_song_list(self, count: int) -> Union[list, None]:
"""Build a shuffled list of random songs
:param int count: The number of songs to return
:return: A list of song IDs or None if no tracks are found.
:rtype: list | None
"""
self.logger.debug('In function build_random_song_list()')
random_songs = self.conn.getRandomSongs(count).get('randomSongs').get('song')
if len(random_songs) > 0:
song_id_list = [song.get('id') for song in random_songs]
return song_id_list
else:
return None
def get_song_details(self, id: str) -> dict:
"""Get details about a given song ID
:param str id: A song ID
:return: A dictionary of details about the given song.
:rtype: dict
"""
self.logger.debug('In function get_song_details()')
song_details = self.conn.getSong(id)
return song_details
def get_song_uri(self, id: str) -> str:
"""Create a URI for a given song
Creates a URI for the song represented by the given ID. Authentication details are
embedded in the URI
:param str id: A song ID
:return: A properly formatted URI
:rtype: str
"""
self.logger.debug('In function get_song_uri()')
salt = secrets.token_hex(16)
auth_token = md5(self.passwd.encode() + salt.encode())
# This creates a multiline f string, uri contains a single line with both
# f strings.
uri = (
f'{self.server_url}:{self.port}{self.api_location}/stream.view?f=json&v={self.api_version}&c=AskNavidrome&u='
f'{self.user}&s={salt}&t={auth_token.hexdigest()}&id={id}'
)
return uri
def star_entry(self, id: str, mode: str) -> None:
"""Add a star to the given entity
:param str id: The Navidrome ID of the entity.
:param str mode: The type of entity, must be song, artist or album
:return: None.
"""
# Convert id to list
id_list = [id]
if mode == 'song':
self.conn.star(id_list, None, None)
return None
elif mode == 'album':
self.conn.star(None, id_list, None)
return None
elif mode == 'artist':
self.conn.star(None, None, id_list)
return None
def unstar_entry(self, id: str, mode: str) -> None:
"""Remove a star from the given entity
:param str id: The Navidrome ID of the entity.
:param str mode: The type of entity, must be song, artist or album
:return: None.
"""
# Convert id to list
id_list = [id]
if mode == 'song':
self.conn.unstar(id_list, None, None)
return None
elif mode == 'album':
self.conn.unstar(None, id_list, None)
return None
elif mode == 'artist':
self.conn.unstar(None, None, id_list)
return None

View File

@@ -0,0 +1,41 @@
class Track:
"""An object that represents an audio track
"""
def __init__(self,
id: str = '', title: str = '', artist: str = '', artist_id: str = '',
album: str = '', album_id: str = '', track_no: int = 0, year: int = 0,
genre: str = '', duration: int = 0, bitrate: int = 0, uri: str = '',
offset: int = 0, previous_id: str = '') -> None:
"""
:param str id: The song ID. Defaults to ''
:param str title: The song title. Defaults to ''
:param str artist: The artist name. Defaults to ''
:param str artist_id: The artist ID. Defaults to ''
:param str album: The album name. Defaults to ''
:param str album_id: The album ID. Defaults to ''
:param int track_no: The track number. Defaults to 0
:param int year: The release year. Defaults to 0
:param str genre: The music genre. Defaults to ''
:param int duration: The length of the track in seconds. Defaults to 0
:param int bitrate: The bit rate in kbps. Defaults to 0
:param str uri: The song's URI for streaming. Defaults to ''
:param int offset: The position in the track to start playback in milliseconds. Defaults to 0
:param str previous_id: The ID of the previous song in the playlist. Defaults to ''
:return: None
"""
self.id: str = id
self.artist: str = artist
self.artist_id: str = artist_id
self.title: str = title
self.album: str = album
self.album_id: str = album_id
self.track_no: int = track_no
self.year: int = year
self.genre: str = genre
self.duration: int = duration
self.bitrate: int = bitrate
self.uri: str = uri
self.offset: int = offset
self.previous_id: str = previous_id