Source code for aiobosest.aiobosest
# Copyright 2016 Wagner Sartori Junior
#
# This file is part of aiobosest.
#
# aiobosest is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License.
#
# aiobosest is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with aiobosest. If not, see <http://www.gnu.org/licenses/>.
from .helpers import (
Key,
NowPlaying,
Volume,
)
from .log import logger
from .connection import (
Connection,
)
from .errors import (
RESTConnectionError,
)
import asyncio
import logging
import aiohttp
__all__ = [
'BoseSoundTouch',
]
[docs]class BoseSoundTouch:
"""Main Bose SoundTouch Class.
Args:
address: Bose SoundTouch IP address
is_updated: asyncio.Event() is set when an update happens
loop: asyncio loop if you want to provide one
Attributes:
key: :class:`.Key` Class
nowplaying: :class:`.NowPlaying` Class
volume: :class:`.Volume` Class
"""
def __init__(self, address, is_updated=None, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
if is_updated is None:
is_updated = asyncio.Event()
self._connection = Connection(address, loop)
self._loop = loop
self.is_updated = is_updated
self._running = False
self.key = Key(connection=self._connection, is_updated=self.is_updated)
self.nowplaying = NowPlaying(connection=self._connection, is_updated=self.is_updated)
self.volume = Volume(connection=self._connection, is_updated=self.is_updated)
self._reader_task = asyncio.ensure_future(self._read_data(), loop=self._loop)
[docs] async def shutdown(self):
"""Needs to be called before application quit."""
await self._connection.shutdown()
if not self._reader_task.done():
self._reader_task.cancel()
[docs] async def is_running(self):
"""Wait for reader task running.
Returns:
True when reader task is running"""
while not self._running:
await asyncio.sleep(.1)
return True
async def _read_data(self):
"""Always running reader task on the websocket port."""
try:
await self.key.get()
await self.nowplaying.get()
await self.volume.get()
except RESTConnectionError as e:
logging.critical('Error connecting to your BoseSoundTouch system, '
'read loop from websocket will not run')
return
logging.info('Connecting to the BoseSoundTouch WebSocket...')
await self._connection.connect_websocket()
while True:
self._running = True
try:
msg = await self._connection.websocket.receive()
except asyncio.CancelledError as e:
logging.info('Closing WebSocket.')
self._running = False
break
except Exception as e:
logging.critical('Exception receiving from websocket: {!r}'.format(e))
self._running = False
break
logging.debug('<<< WEBSOCKET: {}'.format(msg))
if msg.type == aiohttp.WSMsgType.TEXT:
await self.key.parse(msg.data)
await self.nowplaying.parse(msg.data)
await self.volume.parse(msg.data)