Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Add more operations to the client #179

Merged
merged 5 commits into from
May 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,43 @@ def create_retention_policy(self, name, duration, replication,

self.query(query_string)

def alter_retention_policy(self, name, database=None,
duration=None, replication=None, default=None):
"""Mofidy an existing retention policy for a database.

:param name: the name of the retention policy to modify
:type name: str
:param database: the database for which the retention policy is
modified. Defaults to current client's database
:type database: str
:param duration: the new duration of the existing retention policy.
Durations such as 1h, 90m, 12h, 7d, and 4w, are all supported
and mean 1 hour, 90 minutes, 12 hours, 7 day, and 4 weeks,
respectively. For infinite retention – meaning the data will
never be deleted – use 'INF' for duration.
The minimum retention period is 1 hour.
:type duration: str
:param replication: the new replication of the existing
retention policy
:type replication: str
:param default: whether or not to set the modified policy as default
:type default: bool

.. note:: at least one of duration, replication, or default flag
should be set. Otherwise the operation will fail.
"""
query_string = (
"ALTER RETENTION POLICY {} ON {}"
).format(name, database or self._database)
if duration:
query_string += " DURATION {}".format(duration)
if replication:
query_string += " REPLICATION {}".format(replication)
if default is True:
query_string += " DEFAULT"

self.query(query_string)

def get_list_retention_policies(self, database=None):
"""Get the list of retention policies for a database.

Expand Down Expand Up @@ -581,6 +618,62 @@ def delete_series(self, name, database=None):
database = database or self._database
self.query('DROP SERIES \"%s\"' % name, database=database)

def grant_admin_privileges(self, username):
"""Grant cluster administration privileges to an user.

:param username: the username to grant privileges to
:type username: str

.. note:: Only a cluster administrator can create/ drop databases
and manage users.
"""
text = "GRANT ALL PRIVILEGES TO {}".format(username)
self.query(text)

def revoke_admin_privileges(self, username):
"""Revoke cluster administration privileges from an user.

:param username: the username to revoke privileges from
:type username: str

.. note:: Only a cluster administrator can create/ drop databases
and manage users.
"""
text = "REVOKE ALL PRIVILEGES FROM {}".format(username)
self.query(text)

def grant_privilege(self, privilege, database, username):
"""Grant a privilege on a database to an user.

:param privilege: the privilege to grant, one of 'read', 'write'
or 'all'. The string is case-insensitive
:type privilege: str
:param database: the database to grant the privilege on
:type database: str
:param username: the username to grant the privilege to
:type username: str
"""
text = "GRANT {} ON {} TO {}".format(privilege,
database,
username)
self.query(text)

def revoke_privilege(self, privilege, database, username):
"""Revoke a privilege on a database from an user.

:param privilege: the privilege to revoke, one of 'read', 'write'
or 'all'. The string is case-insensitive
:type privilege: str
:param database: the database to revoke the privilege on
:type database: str
:param username: the username to revoke the privilege from
:type username: str
"""
text = "REVOKE {} ON {} FROM {}".format(privilege,
database,
username)
self.query(text)

def send_packet(self, packet):
"""Send an UDP packet.

Expand Down
126 changes: 126 additions & 0 deletions tests/influxdb/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,44 @@ def test_create_retention_policy(self):
'db duration 1d replication 4'
)

def test_alter_retention_policy(self):
example_response = '{"results":[{}]}'

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
text=example_response
)
# Test alter duration
self.cli.alter_retention_policy('somename', 'db',
duration='4d')
self.assertEqual(
m.last_request.qs['q'][0],
'alter retention policy somename on db duration 4d'
)
# Test alter replication
self.cli.alter_retention_policy('somename', 'db',
replication=4)
self.assertEqual(
m.last_request.qs['q'][0],
'alter retention policy somename on db replication 4'
)

# Test alter default
self.cli.alter_retention_policy('somename', 'db',
default=True)
self.assertEqual(
m.last_request.qs['q'][0],
'alter retention policy somename on db default'
)

@raises(Exception)
def test_alter_retention_policy_invalid(self):
cli = InfluxDBClient('host', 8086, 'username', 'password')
with _mocked_session(cli, 'get', 400):
self.cli.alter_retention_policy('somename', 'db')

def test_get_list_retention_policies(self):
example_response = \
'{"results": [{"series": [{"values": [["fsfdsdf", "24h0m0s", 2]],'\
Expand Down Expand Up @@ -586,6 +624,94 @@ def test_get_list_users_empty(self):

self.assertListEqual(self.cli.get_list_users(), [])

def test_grant_admin_privileges(self):
example_response = '{"results":[{}]}'

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
text=example_response
)
self.cli.grant_admin_privileges('test')

self.assertEqual(
m.last_request.qs['q'][0],
'grant all privileges to test'
)

@raises(Exception)
def test_grant_admin_privileges_invalid(self):
cli = InfluxDBClient('host', 8086, 'username', 'password')
with _mocked_session(cli, 'get', 400):
self.cli.grant_admin_privileges('')

def test_revoke_admin_privileges(self):
example_response = '{"results":[{}]}'

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
text=example_response
)
self.cli.revoke_admin_privileges('test')

self.assertEqual(
m.last_request.qs['q'][0],
'revoke all privileges from test'
)

@raises(Exception)
def test_revoke_admin_privileges_invalid(self):
cli = InfluxDBClient('host', 8086, 'username', 'password')
with _mocked_session(cli, 'get', 400):
self.cli.revoke_admin_privileges('')

def test_grant_privilege(self):
example_response = '{"results":[{}]}'

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
text=example_response
)
self.cli.grant_privilege('read', 'testdb', 'test')

self.assertEqual(
m.last_request.qs['q'][0],
'grant read on testdb to test'
)

@raises(Exception)
def test_grant_privilege_invalid(self):
cli = InfluxDBClient('host', 8086, 'username', 'password')
with _mocked_session(cli, 'get', 400):
self.cli.grant_privilege('', 'testdb', 'test')

def test_revoke_privilege(self):
example_response = '{"results":[{}]}'

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
text=example_response
)
self.cli.revoke_privilege('read', 'testdb', 'test')

self.assertEqual(
m.last_request.qs['q'][0],
'revoke read on testdb from test'
)

@raises(Exception)
def test_revoke_privilege_invalid(self):
cli = InfluxDBClient('host', 8086, 'username', 'password')
with _mocked_session(cli, 'get', 400):
self.cli.revoke_privilege('', 'testdb', 'test')


class FakeClient(InfluxDBClient):
fail = False
Expand Down
116 changes: 116 additions & 0 deletions tests/influxdb/client_test_with_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,67 @@ def test_drop_user_invalid(self):
'found invalid, expected',
ctx.exception.content)

def test_grant_admin_privileges(self):
self.cli.create_user('test', 'test')
self.assertEqual([{'user': 'test', 'admin': False}],
self.cli.get_list_users())
self.cli.grant_admin_privileges('test')
self.assertEqual([{'user': 'test', 'admin': True}],
self.cli.get_list_users())

def test_grant_admin_privileges_invalid(self):
with self.assertRaises(InfluxDBClientError) as ctx:
self.cli.grant_admin_privileges('')
self.assertEqual(400, ctx.exception.code)
self.assertIn('{"error":"error parsing query: ',
ctx.exception.content)

def test_revoke_admin_privileges(self):
self.cli.create_user('test', 'test')
self.cli.grant_admin_privileges('test')
self.assertEqual([{'user': 'test', 'admin': True}],
self.cli.get_list_users())
self.cli.revoke_admin_privileges('test')
self.assertEqual([{'user': 'test', 'admin': False}],
self.cli.get_list_users())

def test_revoke_admin_privileges_invalid(self):
with self.assertRaises(InfluxDBClientError) as ctx:
self.cli.revoke_admin_privileges('')
self.assertEqual(400, ctx.exception.code)
self.assertIn('{"error":"error parsing query: ',
ctx.exception.content)

def test_grant_privilege(self):
self.cli.create_user('test', 'test')
self.cli.create_database('testdb')
self.cli.grant_privilege('all', 'testdb', 'test')
# TODO: when supported by InfluxDB, check if privileges are granted

def test_grant_privilege_invalid(self):
self.cli.create_user('test', 'test')
self.cli.create_database('testdb')
with self.assertRaises(InfluxDBClientError) as ctx:
self.cli.grant_privilege('', 'testdb', 'test')
self.assertEqual(400, ctx.exception.code)
self.assertIn('{"error":"error parsing query: ',
ctx.exception.content)

def test_revoke_privilege(self):
self.cli.create_user('test', 'test')
self.cli.create_database('testdb')
self.cli.revoke_privilege('all', 'testdb', 'test')
# TODO: when supported by InfluxDB, check if privileges are revoked

def test_revoke_privilege_invalid(self):
self.cli.create_user('test', 'test')
self.cli.create_database('testdb')
with self.assertRaises(InfluxDBClientError) as ctx:
self.cli.revoke_privilege('', 'testdb', 'test')
self.assertEqual(400, ctx.exception.code)
self.assertIn('{"error":"error parsing query: ',
ctx.exception.content)


############################################################################

Expand Down Expand Up @@ -657,6 +718,61 @@ def test_create_retention_policy(self):
rsp
)

def test_alter_retention_policy(self):
self.cli.create_retention_policy('somename', '1d', 1)

# Test alter duration
self.cli.alter_retention_policy('somename', 'db',
duration='4d')
rsp = self.cli.get_list_retention_policies()
self.assertEqual(
[{'duration': '0', 'default': True,
'replicaN': 1, 'name': 'default'},
{'duration': '96h0m0s', 'default': False,
'replicaN': 1, 'name': 'somename'}],
rsp
)

# Test alter replication
self.cli.alter_retention_policy('somename', 'db',
replication=4)
rsp = self.cli.get_list_retention_policies()
self.assertEqual(
[{'duration': '0', 'default': True,
'replicaN': 1, 'name': 'default'},
{'duration': '96h0m0s', 'default': False,
'replicaN': 4, 'name': 'somename'}],
rsp
)

# Test alter default
self.cli.alter_retention_policy('somename', 'db',
default=True)
rsp = self.cli.get_list_retention_policies()
self.assertEqual(
[{'duration': '0', 'default': False,
'replicaN': 1, 'name': 'default'},
{'duration': '96h0m0s', 'default': True,
'replicaN': 4, 'name': 'somename'}],
rsp
)

def test_alter_retention_policy_invalid(self):
self.cli.create_retention_policy('somename', '1d', 1)
with self.assertRaises(InfluxDBClientError) as ctx:
self.cli.alter_retention_policy('somename', 'db')
self.assertEqual(400, ctx.exception.code)
self.assertIn('{"error":"error parsing query: ',
ctx.exception.content)
rsp = self.cli.get_list_retention_policies()
self.assertEqual(
[{'duration': '0', 'default': True,
'replicaN': 1, 'name': 'default'},
{'duration': '24h0m0s', 'default': False,
'replicaN': 1, 'name': 'somename'}],
rsp
)

def test_issue_143(self):
pt = partial(point, 'a_serie_name', timestamp='2015-03-30T16:16:37Z')
pts = [
Expand Down