Skip to content
Snippets Groups Projects
Commit 84c2183c authored by Benjamin Bertrand's avatar Benjamin Bertrand
Browse files

Add hosts to API

parent b52cb0f8
No related branches found
No related tags found
No related merge requests found
...@@ -14,7 +14,7 @@ from flask import (current_app, Blueprint, jsonify, request) ...@@ -14,7 +14,7 @@ from flask import (current_app, Blueprint, jsonify, request)
from flask_jwt_extended import create_access_token, jwt_required from flask_jwt_extended import create_access_token, jwt_required
from flask_ldap3_login import AuthenticationResponseStatus from flask_ldap3_login import AuthenticationResponseStatus
from ..extensions import ldap_manager, db from ..extensions import ldap_manager, db
from ..models import Item, Manufacturer, Model, Location, Status, Action, Network from ..models import Item, Manufacturer, Model, Location, Status, Action, Network, Host
from .. import utils from .. import utils
from ..decorators import jwt_groups_accepted from ..decorators import jwt_groups_accepted
...@@ -255,3 +255,21 @@ def get_networks(): ...@@ -255,3 +255,21 @@ def get_networks():
def create_network(): def create_network():
"""Create a new network""" """Create a new network"""
return create_generic_model(Network, mandatory_fields=('prefix', 'first', 'last')) return create_generic_model(Network, mandatory_fields=('prefix', 'first', 'last'))
@bp.route('/hosts')
@jwt_required
def get_hosts():
# TODO: add pagination
query = utils.get_query(Host.query, request.args)
hosts = query.order_by(Host.ip)
data = [host.to_dict() for host in hosts]
return jsonify(data)
@bp.route('/hosts', methods=['POST'])
@jwt_required
@jwt_groups_accepted('admin', 'create')
def create_host():
"""Create a new host"""
return create_generic_model(Host, mandatory_fields=('network', 'ip'))
...@@ -388,6 +388,21 @@ class Host(db.Model): ...@@ -388,6 +388,21 @@ class Host(db.Model):
mac = db.relationship('Mac', backref='host') mac = db.relationship('Mac', backref='host')
def __init__(self, **kwargs):
# Automatically convert network to an instance of Network if it was passed
# as a prefix string
if 'network' in kwargs:
kwargs['network'] = utils.convert_to_model(kwargs['network'], Network, 'prefix')
# WARNING! Setting self.network will call validates_hosts in the Network class
# For the validation to work, self.ip must be set before!
# Ensure that ip is passed before network
try:
ip = kwargs.pop('ip')
except KeyError:
super().__init__(**kwargs)
else:
super().__init__(ip=ip, **kwargs)
@property @property
def address(self): def address(self):
return ipaddress.ip_address(self.ip) return ipaddress.ip_address(self.ip)
......
...@@ -59,7 +59,7 @@ def format_field(field): ...@@ -59,7 +59,7 @@ def format_field(field):
return str(field) return str(field)
def convert_to_model(item, model): def convert_to_model(item, model, filter='name'):
"""Convert item to an instance of model """Convert item to an instance of model
Allow to convert a string to an instance of model Allow to convert a string to an instance of model
...@@ -70,7 +70,8 @@ def convert_to_model(item, model): ...@@ -70,7 +70,8 @@ def convert_to_model(item, model):
if item is None: if item is None:
return None return None
if not isinstance(item, model): if not isinstance(item, model):
instance = model.query.filter_by(name=item).first() kwarg = {filter: item}
instance = model.query.filter_by(**kwarg).first()
if instance is None: if instance is None:
raise CSEntryError(f'{item} is not a valid {model.__name__.lower()}') raise CSEntryError(f'{item} is not a valid {model.__name__.lower()}')
return instance return instance
......
...@@ -22,9 +22,10 @@ ENDPOINT_MODEL = { ...@@ -22,9 +22,10 @@ ENDPOINT_MODEL = {
'status': models.Status, 'status': models.Status,
'items': models.Item, 'items': models.Item,
'networks': models.Network, 'networks': models.Network,
'hosts': models.Host,
} }
GENERIC_GET_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() if key not in ('items', 'networks')] GENERIC_GET_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() if key not in ('items', 'networks', 'hosts')]
GENERIC_CREATE_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() if key not in ('items', 'actions', 'networks')] GENERIC_CREATE_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() if key not in ('items', 'actions', 'networks', 'hosts')]
CREATE_AUTH_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() if key != 'actions'] CREATE_AUTH_ENDPOINTS = [key for key in ENDPOINT_MODEL.keys() if key != 'actions']
...@@ -571,3 +572,88 @@ def test_create_network_invalid_range(client, session, admin_token): ...@@ -571,3 +572,88 @@ def test_create_network_invalid_range(client, session, admin_token):
'last': '172.16.1.9'} 'last': '172.16.1.9'}
response = post(client, '/api/networks', data=data, token=admin_token) response = post(client, '/api/networks', data=data, token=admin_token)
check_response_message(response, 'Last IP address 172.16.1.9 is less than the first address 172.16.1.10', 422) check_response_message(response, 'Last IP address 172.16.1.9 is less than the first address 172.16.1.10', 422)
def test_get_hosts(client, session, readonly_token):
# Create some hosts
network1 = models.Network(prefix='192.168.1.0/24', first='192.168.1.10', last='192.168.1.250')
network2 = models.Network(prefix='192.168.2.0/24', first='192.168.2.10', last='192.168.2.250')
session.add(network1)
session.add(network2)
session.flush()
host1 = models.Host(network_id=network1.id, ip='192.168.1.10')
host2 = models.Host(network_id=network1.id, ip='192.168.1.11', name='hostname2')
host3 = models.Host(network_id=network2.id, ip='192.168.2.10')
for host in (host1, host2, host3):
session.add(host)
session.commit()
response = get(client, '/api/hosts', token=readonly_token)
assert response.status_code == 200
assert len(response.json) == 3
check_input_is_subset_of_response(response, (host1.to_dict(), host2.to_dict(), host3.to_dict()))
# test filtering by network_id
response = get(client, f'/api/hosts?network_id={network2.id}', token=readonly_token)
assert response.status_code == 200
assert len(response.json) == 1
check_input_is_subset_of_response(response, (host3.to_dict(),))
def test_create_host(client, session, user_token):
network = models.Network(prefix='192.168.1.0/24', first='192.168.1.10', last='192.168.1.250')
session.add(network)
session.commit()
# check that network_id and ip are mandatory
response = post(client, '/api/hosts', data={}, token=user_token)
check_response_message(response, "Missing mandatory field 'network'", 422)
response = post(client, '/api/hosts', data={'ip': '192.168.1.20'}, token=user_token)
check_response_message(response, "Missing mandatory field 'network'", 422)
response = post(client, '/api/hosts', data={'network': network.prefix}, token=user_token)
check_response_message(response, "Missing mandatory field 'ip'", 422)
data = {'network': network.prefix,
'ip': '192.168.1.20'}
response = post(client, '/api/hosts', data=data, token=user_token)
assert response.status_code == 201
assert {'id', 'network_id', 'ip', 'name'} == set(response.json.keys())
assert response.json['network_id'] == network.id
assert response.json['ip'] == '192.168.1.20'
# Check that IP shall be unique
response = post(client, '/api/hosts', data=data, token=user_token)
check_response_message(response, '(psycopg2.IntegrityError) duplicate key value violates unique constraint', 422)
# Check that all parameters can be passed
data2 = {'network': network.prefix,
'ip': '192.168.1.21',
'name': 'myhostname'}
response = post(client, '/api/hosts', data=data2, token=user_token)
assert response.status_code == 201
assert response.json['name'] == 'myhostname'
# check all items that were created
assert models.Host.query.count() == 2
@pytest.mark.parametrize('ip', ('', 'foo', '192.168'))
def test_create_host_invalid_ip(ip, client, session, user_token):
network = models.Network(prefix='192.168.1.0/24', first='192.168.1.10', last='192.168.1.250')
session.add(network)
session.commit()
# invalid IP address
data = {'network': network.prefix,
'ip': ip}
response = post(client, '/api/hosts', data=data, token=user_token)
check_response_message(response, f"'{ip}' does not appear to be an IPv4 or IPv6 address", 422)
def test_create_host_ip_not_in_network(client, session, user_token):
network = models.Network(prefix='192.168.1.0/24', first='192.168.1.10', last='192.168.1.250')
session.add(network)
session.commit()
# IP address not in range
data = {'network': network.prefix,
'ip': '192.168.2.4'}
response = post(client, '/api/hosts', data=data, token=user_token)
check_response_message(response, 'IP address 192.168.2.4 is not in network 192.168.1.0/24', 422)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment