except ValueError:
self._coordinator = None
else:
self._coordinator = None
if self._coordinator == self:
_LOGGER.warning("Coordinator loop on: %s", self.unique_id)
self._coordinator = None
track_info = None
if self._last_avtransport_event:
variables = self._last_avtransport_event.variables
current_track_metadata = variables.get(
"current_track_meta_data", {}
)
self._status = variables.get("transport_state")
if current_track_metadata:
// no need to ask speaker for information we already have
current_track_metadata = current_track_metadata.__dict__
track_info = {
"uri": variables.get("current_track_uri"),
"artist": current_track_metadata.get("creator"),
"album": current_track_metadata.get("album"),
"title": current_track_metadata.get("title"),
"playlist_position": variables.get("current_track"),
"duration": variables.get("current_track_duration")
}
else:
self._player_volume = self._player.volume
self._player_volume_muted = self._player.mute
transport_info = self._player.get_current_transport_info()
self._status = transport_info.get("current_transport_state")
if not track_info:
track_info = self._player.get_current_track_info()
if not self._coordinator:
is_playing_tv = self._player.is_playing_tv
is_playing_line_in = self._player.is_playing_line_in
media_info = self._player.avTransport.GetMediaInfo(
[("InstanceID", 0)]
)
current_media_uri = media_info["CurrentURI"]
media_artist = track_info.get("artist")
media_album_name = track_info.get("album")
media_title = track_info.get("title")
media_image_url = track_info.get("album_art", None)
media_position = None
media_position_updated_at = None
source_name = None
is_radio_stream = \
current_media_uri.startswith("x-sonosapi-stream:") or \
current_media_uri.startswith("x-rincon-mp3radio:")
if is_playing_tv or is_playing_line_in:
// playing from line-in/tv.
support_previous_track = False
support_next_track = False
support_stop = False
support_pause = False
if is_playing_tv:
media_artist = SUPPORT_SOURCE_TV
else:
media_artist = SUPPORT_SOURCE_LINEIN
source_name = media_artist
media_album_name = None
media_title = None
media_image_url = None
elif is_radio_stream:
media_image_url = self._format_media_image_url(
media_image_url,
current_media_uri
)
support_previous_track = False
support_next_track = False
support_stop = False
support_pause = False
source_name = "Radio"
// Check if currently playing radio station is in favorites
favs = self._player.get_sonos_favorites()["favorites"]
favc = [
fav for fav in favs if fav["uri"] == current_media_uri
]
if len(favc) == 1:
src = favc.pop()
source_name = src["title"]
// for radio streams we set the radio station name as the
// title.
if media_artist and media_title:
// artist and album name are in the data, concatenate
// that do display as artist.
// "Information" field in the sonos pc app
media_artist = "{artist} - {title}".format(
artist=media_artist,
title=media_title
)
else:
// "On Now" field in the sonos pc app
media_artist = self._media_radio_show
current_uri_metadata = media_info["CurrentURIMetaData"]
if current_uri_metadata not in \
("", "NOT_IMPLEMENTED", None):
// currently soco does not have an API for this
import soco
current_uri_metadata = soco.xml.XML.fromstring(
soco.utils.really_utf8(current_uri_metadata))
md_title = current_uri_metadata.findtext(
".//{http://purl.org/dc/elements/1.1/}title")
if md_title not in ("", "NOT_IMPLEMENTED", None):
media_title = md_title
if media_artist and media_title:
// some radio stations put their name into the artist
// name, e.g.:
// media_title = "Station"
// media_artist = "Station - Artist - Title"
// detect this case and trim from the front of
// media_artist for cosmetics
str_to_trim = "{title} - ".format(
title=media_title
)
chars = min(len(media_artist), len(str_to_trim))
if media_artist[:chars].upper() == \
str_to_trim[:chars].upper():
media_artist = media_artist[chars:]
else:
// not a radio stream
media_image_url = self._format_media_image_url(
media_image_url,
track_info["uri"]
)
support_previous_track = True
support_next_track = True
support_stop = True
support_pause = True
position_info = self._player.avTransport.GetPositionInfo(
[("InstanceID", 0),
("Channel", "Master")]
)
rel_time = _parse_timespan(
position_info.get("RelTime")
)
// player no longer reports position?
update_media_position = rel_time is None and \
self._media_position is not None
// player started reporting position?
update_media_position |= rel_time is not None and \
self._media_position is None
// used only if a media is playing
if self.state != STATE_PLAYING:
update_media_position = None
// position changed?
if rel_time is not None and \
self._media_position is not None:
time_diff = utcnow() - self._media_position_updated_at
time_diff = time_diff.total_seconds()
calculated_position = \
self._media_position + \
time_diff
update_media_position = \
abs(calculated_position - rel_time) > 1.5
if update_media_position:
media_position = rel_time
media_position_updated_at = utcnow()
else:
// don"t update media_position (don"t want unneeded
// state transitions)
media_position = self._media_position
media_position_updated_at = \
self._media_position_updated_at
playlist_position = track_info.get("playlist_position")
if playlist_position in ("", "NOT_IMPLEMENTED", None):
playlist_position = None
else:
playlist_position = int(playlist_position)
playlist_size = media_info.get("NrTracks")
if playlist_size in ("", "NOT_IMPLEMENTED", None):
playlist_size = None
else:
playlist_size = int(playlist_size)
if playlist_position is not None and \
playlist_size is not None:
if playlist_position == 1:
support_previous_track = False
if playlist_position == playlist_size:
support_next_track = False
self._media_content_id = track_info.get("title")
self._media_duration = _parse_timespan(
track_info.get("duration")
)
self._media_position = media_position
self._media_position_updated_at = media_position_updated_at
self._media_image_url = media_image_url
self._media_artist = media_artist
self._media_album_name = media_album_name
self._media_title = media_title
self._current_track_uri = track_info["uri"]
self._current_track_is_radio_stream = is_radio_stream
self._support_previous_track = support_previous_track
self._support_next_track = support_next_track
self._support_stop = support_stop
self._support_pause = support_pause
self._is_playing_tv = is_playing_tv
self._is_playing_line_in = is_playing_line_in
self._source_name = source_name
// update state of the whole group
for device in [x for x in self.hass.data[DATA_SONOS]
if x.coordinator == self]:
if device.entity_id is not self.entity_id:
self.schedule_update_ha_state()
if self._queue is None and self.entity_id is not None:
self._subscribe_to_player_events()
favs = self._player.get_sonos_favorites().get("favorites", [])