mirror of
https://github.com/beetbox/beets.git
synced 2026-01-06 07:53:40 +01:00
commit
c4347960ea
4 changed files with 389 additions and 0 deletions
|
|
@ -116,12 +116,19 @@ def resource(name, patchable=False):
|
|||
entities = [entity for entity in entities if entity]
|
||||
|
||||
if get_method() == "DELETE":
|
||||
|
||||
if app.config.get('READONLY', True):
|
||||
return flask.abort(405)
|
||||
|
||||
for entity in entities:
|
||||
entity.remove(delete=is_delete())
|
||||
|
||||
return flask.make_response(jsonify({'deleted': True}), 200)
|
||||
|
||||
elif get_method() == "PATCH" and patchable:
|
||||
if app.config.get('READONLY', True):
|
||||
return flask.abort(405)
|
||||
|
||||
for entity in entities:
|
||||
entity.update(flask.request.get_json())
|
||||
entity.try_sync(True, False) # write, don't move
|
||||
|
|
@ -162,12 +169,19 @@ def resource_query(name, patchable=False):
|
|||
entities = query_func(queries)
|
||||
|
||||
if get_method() == "DELETE":
|
||||
|
||||
if app.config.get('READONLY', True):
|
||||
return flask.abort(405)
|
||||
|
||||
for entity in entities:
|
||||
entity.remove(delete=is_delete())
|
||||
|
||||
return flask.make_response(jsonify({'deleted': True}), 200)
|
||||
|
||||
elif get_method() == "PATCH" and patchable:
|
||||
if app.config.get('READONLY', True):
|
||||
return flask.abort(405)
|
||||
|
||||
for entity in entities:
|
||||
entity.update(flask.request.get_json())
|
||||
entity.try_sync(True, False) # write, don't move
|
||||
|
|
@ -428,6 +442,7 @@ class WebPlugin(BeetsPlugin):
|
|||
'cors_supports_credentials': False,
|
||||
'reverse_proxy': False,
|
||||
'include_paths': False,
|
||||
'readonly': True,
|
||||
})
|
||||
|
||||
def commands(self):
|
||||
|
|
@ -447,6 +462,7 @@ class WebPlugin(BeetsPlugin):
|
|||
app.config['JSONIFY_PRETTYPRINT_REGULAR'] = False
|
||||
|
||||
app.config['INCLUDE_PATHS'] = self.config['include_paths']
|
||||
app.config['READONLY'] = self.config['readonly']
|
||||
|
||||
# Enable CORS if required.
|
||||
if self.config['cors']:
|
||||
|
|
|
|||
|
|
@ -335,6 +335,9 @@ Fixes:
|
|||
* :doc:`/plugins/chroma`: Fixed submitting AcoustID information for tracks
|
||||
that already have a fingerprint.
|
||||
:bug:`3834`
|
||||
* :doc:`/plugins/web`: DELETE and PATCH methods are disallowed by default.
|
||||
Set ``readonly: no`` web config option to enable them.
|
||||
:bug:`3870`
|
||||
|
||||
For plugin developers:
|
||||
|
||||
|
|
|
|||
|
|
@ -66,6 +66,8 @@ configuration file. The available options are:
|
|||
Default: false.
|
||||
- **include_paths**: If true, includes paths in item objects.
|
||||
Default: false.
|
||||
- **readonly**: If true, DELETE and PATCH operations are not allowed. Only GET is permitted.
|
||||
Default: true.
|
||||
|
||||
Implementation
|
||||
--------------
|
||||
|
|
@ -189,6 +191,8 @@ code.
|
|||
Removes the item with id *6* from the beets library. If the *?delete* query string is included,
|
||||
the matching file will be deleted from disk.
|
||||
|
||||
Only allowed if ``readonly`` configuration option is set to ``no``.
|
||||
|
||||
``PATCH /item/6``
|
||||
++++++++++++++++++
|
||||
|
||||
|
|
@ -203,6 +207,8 @@ Returns the updated JSON representation. ::
|
|||
...
|
||||
}
|
||||
|
||||
Only allowed if ``readonly`` configuration option is set to ``no``.
|
||||
|
||||
``GET /item/6,12,13``
|
||||
+++++++++++++++++++++
|
||||
|
||||
|
|
@ -279,6 +285,7 @@ or ``/album/5,7``. In addition we can request the cover art of an album with
|
|||
``GET /album/5/art``.
|
||||
You can also add the '?expand' flag to get the individual items of an album.
|
||||
|
||||
``DELETE`` is only allowed if ``readonly`` configuration option is set to ``no``.
|
||||
|
||||
``GET /stats``
|
||||
++++++++++++++
|
||||
|
|
|
|||
363
test/test_web.py
363
test/test_web.py
|
|
@ -8,6 +8,7 @@ import json
|
|||
import unittest
|
||||
import os.path
|
||||
from six import assertCountEqual
|
||||
import shutil
|
||||
|
||||
from test import _common
|
||||
from beets.library import Item, Album
|
||||
|
|
@ -65,6 +66,7 @@ class WebPluginTest(_common.LibTestCase):
|
|||
web.app.config['TESTING'] = True
|
||||
web.app.config['lib'] = self.lib
|
||||
web.app.config['INCLUDE_PATHS'] = False
|
||||
web.app.config['READONLY'] = True
|
||||
self.client = web.app.test_client()
|
||||
|
||||
def test_config_include_paths_true(self):
|
||||
|
|
@ -308,6 +310,367 @@ class WebPluginTest(_common.LibTestCase):
|
|||
self.assertEqual(res_json['items'], 3)
|
||||
self.assertEqual(res_json['albums'], 2)
|
||||
|
||||
def test_delete_item_id(self):
|
||||
|
||||
web.app.config['READONLY'] = False
|
||||
|
||||
# Create a temporary item
|
||||
item_id = self.lib.add(Item(title=u'test_delete_item_id',
|
||||
test_delete_item_id=1))
|
||||
|
||||
# Check we can find the temporary item we just created
|
||||
response = self.client.get('/item/' + str(item_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], item_id)
|
||||
|
||||
# Delete item by id
|
||||
response = self.client.delete('/item/' + str(item_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check the item has gone
|
||||
response = self.client.get('/item/' + str(item_id))
|
||||
self.assertEqual(response.status_code, 404)
|
||||
# Note: if this fails, the item may still be around
|
||||
# and may cause other tests to fail
|
||||
|
||||
def test_delete_item_without_file(self):
|
||||
|
||||
web.app.config['READONLY'] = False
|
||||
|
||||
# Create an item with a file
|
||||
ipath = os.path.join(self.temp_dir, b'testfile1.mp3')
|
||||
shutil.copy(os.path.join(_common.RSRC, b'full.mp3'), ipath)
|
||||
self.assertTrue(os.path.exists(ipath))
|
||||
item_id = self.lib.add(Item.from_path(ipath))
|
||||
|
||||
# Check we can find the temporary item we just created
|
||||
response = self.client.get('/item/' + str(item_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], item_id)
|
||||
|
||||
# Delete item by id, without deleting file
|
||||
response = self.client.delete('/item/' + str(item_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check the item has gone
|
||||
response = self.client.get('/item/' + str(item_id))
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
# Check the file has not gone
|
||||
self.assertTrue(os.path.exists(ipath))
|
||||
os.remove(ipath)
|
||||
|
||||
def test_delete_item_with_file(self):
|
||||
|
||||
web.app.config['READONLY'] = False
|
||||
|
||||
# Create an item with a file
|
||||
ipath = os.path.join(self.temp_dir, b'testfile2.mp3')
|
||||
shutil.copy(os.path.join(_common.RSRC, b'full.mp3'), ipath)
|
||||
self.assertTrue(os.path.exists(ipath))
|
||||
item_id = self.lib.add(Item.from_path(ipath))
|
||||
|
||||
# Check we can find the temporary item we just created
|
||||
response = self.client.get('/item/' + str(item_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], item_id)
|
||||
|
||||
# Delete item by id, with file
|
||||
response = self.client.delete('/item/' + str(item_id) + '?delete')
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check the item has gone
|
||||
response = self.client.get('/item/' + str(item_id))
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
# Check the file has gone
|
||||
self.assertFalse(os.path.exists(ipath))
|
||||
|
||||
def test_delete_item_query(self):
|
||||
|
||||
web.app.config['READONLY'] = False
|
||||
|
||||
# Create a temporary item
|
||||
self.lib.add(Item(title=u'test_delete_item_query',
|
||||
test_delete_item_query=1))
|
||||
|
||||
# Check we can find the temporary item we just created
|
||||
response = self.client.get('/item/query/test_delete_item_query')
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(res_json['results']), 1)
|
||||
|
||||
# Delete item by query
|
||||
response = self.client.delete('/item/query/test_delete_item_query')
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check the item has gone
|
||||
response = self.client.get('/item/query/test_delete_item_query')
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(res_json['results']), 0)
|
||||
|
||||
def test_delete_item_all_fails(self):
|
||||
""" DELETE is not supported for list all """
|
||||
|
||||
web.app.config['READONLY'] = False
|
||||
|
||||
# Delete all items
|
||||
response = self.client.delete('/item/')
|
||||
self.assertEqual(response.status_code, 405)
|
||||
|
||||
# Note: if this fails, all items have gone and rest of
|
||||
# tests wil fail!
|
||||
|
||||
def test_delete_item_id_readonly(self):
|
||||
|
||||
web.app.config['READONLY'] = True
|
||||
|
||||
# Create a temporary item
|
||||
item_id = self.lib.add(Item(title=u'test_delete_item_id_ro',
|
||||
test_delete_item_id_ro=1))
|
||||
|
||||
# Check we can find the temporary item we just created
|
||||
response = self.client.get('/item/' + str(item_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], item_id)
|
||||
|
||||
# Try to delete item by id
|
||||
response = self.client.delete('/item/' + str(item_id))
|
||||
self.assertEqual(response.status_code, 405)
|
||||
|
||||
# Check the item has not gone
|
||||
response = self.client.get('/item/' + str(item_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], item_id)
|
||||
|
||||
# Remove it
|
||||
self.lib.get_item(item_id).remove()
|
||||
|
||||
def test_delete_item_query_readonly(self):
|
||||
|
||||
web.app.config['READONLY'] = True
|
||||
|
||||
# Create a temporary item
|
||||
item_id = self.lib.add(Item(title=u'test_delete_item_q_ro',
|
||||
test_delete_item_q_ro=1))
|
||||
|
||||
# Check we can find the temporary item we just created
|
||||
response = self.client.get('/item/query/test_delete_item_q_ro')
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(res_json['results']), 1)
|
||||
|
||||
# Try to delete item by query
|
||||
response = self.client.delete('/item/query/test_delete_item_q_ro')
|
||||
self.assertEqual(response.status_code, 405)
|
||||
|
||||
# Check the item has not gone
|
||||
response = self.client.get('/item/query/test_delete_item_q_ro')
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(res_json['results']), 1)
|
||||
|
||||
# Remove it
|
||||
self.lib.get_item(item_id).remove()
|
||||
|
||||
def test_delete_album_id(self):
|
||||
|
||||
web.app.config['READONLY'] = False
|
||||
|
||||
# Create a temporary album
|
||||
album_id = self.lib.add(Album(album=u'test_delete_album_id',
|
||||
test_delete_album_id=1))
|
||||
|
||||
# Check we can find the temporary album we just created
|
||||
response = self.client.get('/album/' + str(album_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], album_id)
|
||||
|
||||
# Delete album by id
|
||||
response = self.client.delete('/album/' + str(album_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check the album has gone
|
||||
response = self.client.get('/album/' + str(album_id))
|
||||
self.assertEqual(response.status_code, 404)
|
||||
# Note: if this fails, the album may still be around
|
||||
# and may cause other tests to fail
|
||||
|
||||
def test_delete_album_query(self):
|
||||
|
||||
web.app.config['READONLY'] = False
|
||||
|
||||
# Create a temporary album
|
||||
self.lib.add(Album(album=u'test_delete_album_query',
|
||||
test_delete_album_query=1))
|
||||
|
||||
# Check we can find the temporary album we just created
|
||||
response = self.client.get('/album/query/test_delete_album_query')
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(res_json['results']), 1)
|
||||
|
||||
# Delete album
|
||||
response = self.client.delete('/album/query/test_delete_album_query')
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check the album has gone
|
||||
response = self.client.get('/album/query/test_delete_album_query')
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(res_json['results']), 0)
|
||||
|
||||
def test_delete_album_all_fails(self):
|
||||
""" DELETE is not supported for list all """
|
||||
|
||||
web.app.config['READONLY'] = False
|
||||
|
||||
# Delete all albums
|
||||
response = self.client.delete('/album/')
|
||||
self.assertEqual(response.status_code, 405)
|
||||
|
||||
# Note: if this fails, all albums have gone and rest of
|
||||
# tests wil fail!
|
||||
|
||||
def test_delete_album_id_readonly(self):
|
||||
|
||||
web.app.config['READONLY'] = True
|
||||
|
||||
# Create a temporary album
|
||||
album_id = self.lib.add(Album(album=u'test_delete_album_id_ro',
|
||||
test_delete_album_id_ro=1))
|
||||
|
||||
# Check we can find the temporary album we just created
|
||||
response = self.client.get('/album/' + str(album_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], album_id)
|
||||
|
||||
# Try to delete album by id
|
||||
response = self.client.delete('/album/' + str(album_id))
|
||||
self.assertEqual(response.status_code, 405)
|
||||
|
||||
# Check the item has not gone
|
||||
response = self.client.get('/album/' + str(album_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], album_id)
|
||||
|
||||
# Remove it
|
||||
self.lib.get_album(album_id).remove()
|
||||
|
||||
def test_delete_album_query_readonly(self):
|
||||
|
||||
web.app.config['READONLY'] = True
|
||||
|
||||
# Create a temporary album
|
||||
album_id = self.lib.add(Album(album=u'test_delete_album_query_ro',
|
||||
test_delete_album_query_ro=1))
|
||||
|
||||
# Check we can find the temporary album we just created
|
||||
response = self.client.get('/album/query/test_delete_album_query_ro')
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(res_json['results']), 1)
|
||||
|
||||
# Try to delete album
|
||||
response = self.client.delete(
|
||||
'/album/query/test_delete_album_query_ro'
|
||||
)
|
||||
self.assertEqual(response.status_code, 405)
|
||||
|
||||
# Check the album has not gone
|
||||
response = self.client.get('/album/query/test_delete_album_query_ro')
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(res_json['results']), 1)
|
||||
|
||||
# Remove it
|
||||
self.lib.get_album(album_id).remove()
|
||||
|
||||
def test_patch_item_id(self):
|
||||
# Note: PATCH is currently only implemented for track items, not albums
|
||||
|
||||
web.app.config['READONLY'] = False
|
||||
|
||||
# Create a temporary item
|
||||
item_id = self.lib.add(Item(title=u'test_patch_item_id',
|
||||
test_patch_f1=1,
|
||||
test_patch_f2="Old"))
|
||||
|
||||
# Check we can find the temporary item we just created
|
||||
response = self.client.get('/item/' + str(item_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], item_id)
|
||||
self.assertEqual(
|
||||
[res_json['test_patch_f1'], res_json['test_patch_f2']],
|
||||
['1', 'Old'])
|
||||
|
||||
# Patch item by id
|
||||
# patch_json = json.JSONEncoder().encode({"test_patch_f2": "New"}]})
|
||||
response = self.client.patch('/item/' + str(item_id),
|
||||
json={"test_patch_f2": "New"})
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], item_id)
|
||||
self.assertEqual(
|
||||
[res_json['test_patch_f1'], res_json['test_patch_f2']],
|
||||
['1', 'New'])
|
||||
|
||||
# Check the update has really worked
|
||||
response = self.client.get('/item/' + str(item_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], item_id)
|
||||
self.assertEqual(
|
||||
[res_json['test_patch_f1'], res_json['test_patch_f2']],
|
||||
['1', 'New'])
|
||||
|
||||
# Remove the item
|
||||
self.lib.get_item(item_id).remove()
|
||||
|
||||
def test_patch_item_id_readonly(self):
|
||||
# Note: PATCH is currently only implemented for track items, not albums
|
||||
|
||||
web.app.config['READONLY'] = True
|
||||
|
||||
# Create a temporary item
|
||||
item_id = self.lib.add(Item(title=u'test_patch_item_id_ro',
|
||||
test_patch_f1=2,
|
||||
test_patch_f2="Old"))
|
||||
|
||||
# Check we can find the temporary item we just created
|
||||
response = self.client.get('/item/' + str(item_id))
|
||||
res_json = json.loads(response.data.decode('utf-8'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(res_json['id'], item_id)
|
||||
self.assertEqual(
|
||||
[res_json['test_patch_f1'], res_json['test_patch_f2']],
|
||||
['2', 'Old'])
|
||||
|
||||
# Patch item by id
|
||||
# patch_json = json.JSONEncoder().encode({"test_patch_f2": "New"})
|
||||
response = self.client.patch('/item/' + str(item_id),
|
||||
json={"test_patch_f2": "New"})
|
||||
self.assertEqual(response.status_code, 405)
|
||||
|
||||
# Remove the item
|
||||
self.lib.get_item(item_id).remove()
|
||||
|
||||
|
||||
def suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
|
|
|||
Loading…
Reference in a new issue