From fc7ac34aced133bb47c8b102dff4ebf56ef9788c Mon Sep 17 00:00:00 2001 From: Benjamin Bertrand <benjamin.bertrand@esss.se> Date: Mon, 5 Nov 2018 18:10:32 +0100 Subject: [PATCH] Make host/interface/cname unique - Host names are unique (no 2 hosts with same name) - A host name shall not conflict with a cname or interface (if not main interface) - Interface names are unique (no 2 interfaces with same name) - An interface shall not conflict with a cname or host (if not assigned host) - cnames are only unique by domain (2 cnames can have the same name if different domains) - A cname shall not conflict with a host or interface - An interface has to be linked to a host Validation is implemented at the model level. Some is also implemented at the form level as it gives nicer feedback, but not all as it's a bit more complex when it requires check on several fields. JIRA INFRA-245 --- app/api/network.py | 5 +- app/models.py | 85 +++++++++- app/network/forms.py | 9 +- app/network/views.py | 160 +++++++++++------- app/static/js/hosts.js | 12 -- app/templates/network/create_host.html | 1 - app/validators.py | 28 +++ ...31_cname_shall_only_be_unique_by_domain.py | 23 +++ ...e6_interface_host_shall_not_be_nullable.py | 24 +++ tests/functional/factories.py | 33 ++-- tests/functional/test_api.py | 120 ++++++++----- tests/functional/test_models.py | 81 ++++++++- tests/functional/test_web.py | 30 ++-- 13 files changed, 448 insertions(+), 163 deletions(-) create mode 100644 migrations/versions/6b65e1309431_cname_shall_only_be_unique_by_domain.py create mode 100644 migrations/versions/7c38e78b6de6_interface_host_shall_not_be_nullable.py diff --git a/app/api/network.py b/app/api/network.py index ff90b87..58fc19d 100644 --- a/app/api/network.py +++ b/app/api/network.py @@ -129,14 +129,15 @@ def create_interface(): :jsonparam network: network name :jsonparam ip: interface IP :jsonparam name: interface name + :jsonparam host: host name :jsonparam mac: (optional) MAC address - :jsonparam host_id: (optional) linked host primary key """ # The validate_interfaces method from the Network class is called when # setting interface.network. This is why we don't pass network_id here # but network (as vlan_name string) + # Same for host return utils.create_generic_model( - models.Interface, mandatory_fields=("network", "ip", "name") + models.Interface, mandatory_fields=("network", "ip", "name", "host") ) diff --git a/app/models.py b/app/models.py index 7795899..a3d28ec 100644 --- a/app/models.py +++ b/app/models.py @@ -1103,7 +1103,15 @@ class Host(CreatedMixin, SearchableMixin, db.Model): # Force the string to lowercase lower_string = string.lower() if HOST_NAME_RE.fullmatch(lower_string) is None: - raise ValidationError("Interface name shall match [a-z0-9\-]{2,20}") + raise ValidationError("Host name shall match [a-z0-9\-]{2,20}") + existing_cname = Cname.query.filter_by(name=lower_string).first() + if existing_cname: + raise ValidationError(f"Host name matches an existing cname") + existing_interface = Interface.query.filter( + Interface.name == lower_string, Interface.host_id != self.id + ).first() + if existing_interface: + raise ValidationError(f"Host name matches an existing interface") return lower_string def stack_members(self): @@ -1145,7 +1153,7 @@ class Interface(CreatedMixin, db.Model): ip = db.Column(postgresql.INET, nullable=False, unique=True) name = db.Column(db.Text, nullable=False, unique=True) mac = db.Column(postgresql.MACADDR, nullable=True, unique=True) - host_id = db.Column(db.Integer, db.ForeignKey("host.id")) + host_id = db.Column(db.Integer, db.ForeignKey("host.id"), nullable=False) # Add delete and delete-orphan options to automatically delete cnames when: # - deleting an interface @@ -1164,6 +1172,16 @@ class Interface(CreatedMixin, db.Model): ) def __init__(self, **kwargs): + # Always set self.host and not self.host_id to call validate_name + host_id = kwargs.pop("host_id", None) + if host_id is not None: + host = Host.query.get(host_id) + elif "host" in kwargs: + # Automatically convert host to an instance of Host if it was passed + # as a string + host = utils.convert_to_model(kwargs.pop("host"), Host, "name") + else: + host = None # Always set self.network and not self.network_id to call validate_interfaces network_id = kwargs.pop("network_id", None) if network_id is not None: @@ -1180,9 +1198,9 @@ class Interface(CreatedMixin, db.Model): try: ip = kwargs.pop("ip") except KeyError: - super().__init__(**kwargs) + super().__init__(host=host, **kwargs) else: - super().__init__(ip=ip, **kwargs) + super().__init__(host=host, ip=ip, **kwargs) @validates("name") def validate_name(self, key, string): @@ -1193,6 +1211,18 @@ class Interface(CreatedMixin, db.Model): lower_string = string.lower() if HOST_NAME_RE.fullmatch(lower_string) is None: raise ValidationError("Interface name shall match [a-z0-9\-]{2,20}") + if self.host and not lower_string.startswith(self.host.name): + raise ValidationError( + f"Interface name shall start with the host name '{self.host}'" + ) + existing_cname = Cname.query.filter_by(name=lower_string).first() + if existing_cname: + raise ValidationError(f"Interface name matches an existing cname") + existing_host = Host.query.filter( + Host.name == lower_string, Host.id != self.host.id + ).first() + if existing_host: + raise ValidationError(f"Interface name matches an existing host") return lower_string @validates("mac") @@ -1204,6 +1234,17 @@ class Interface(CreatedMixin, db.Model): raise ValidationError(f"'{string}' does not appear to be a MAC address") return string + @validates("cnames") + def validate_cnames(self, key, cname): + """Ensure the cname is unique by domain""" + existing_cnames = Cname.query.filter_by(name=cname.name).all() + for existing_cname in existing_cnames: + if existing_cname.domain == str(self.network.domain): + raise ValidationError( + f"Duplicate cname on the {self.network.domain} domain" + ) + return cname + @property def address(self): return ipaddress.ip_address(self.ip) @@ -1270,12 +1311,46 @@ class Mac(db.Model): class Cname(CreatedMixin, db.Model): - name = db.Column(db.Text, nullable=False, unique=True) + name = db.Column(db.Text, nullable=False) interface_id = db.Column(db.Integer, db.ForeignKey("interface.id"), nullable=False) + def __init__(self, **kwargs): + # Always set self.interface and not self.interface_id to call validate_cnames + interface_id = kwargs.pop("interface_id", None) + if interface_id is not None: + kwargs["interface"] = Interface.query.get(interface_id) + super().__init__(**kwargs) + def __str__(self): return str(self.name) + @property + def domain(self): + """Return the cname domain name""" + return str(self.interface.network.domain) + + @property + def fqdn(self): + """Return the cname fully qualified domain name""" + return f"{self.name}.{self.domain}" + + @validates("name") + def validate_name(self, key, string): + """Ensure the name matches the required format""" + if string is None: + return None + # Force the string to lowercase + lower_string = string.lower() + if HOST_NAME_RE.fullmatch(lower_string) is None: + raise ValidationError("cname shall match [a-z0-9\-]{2,20}") + existing_interface = Interface.query.filter_by(name=lower_string).first() + if existing_interface: + raise ValidationError(f"cname matches an existing interface") + existing_host = Host.query.filter_by(name=lower_string).first() + if existing_host: + raise ValidationError(f"cname matches an existing host") + return lower_string + def to_dict(self): d = super().to_dict() d.update({"name": self.name, "interface": str(self.interface)}) diff --git a/app/network/forms.py b/app/network/forms.py index 83d478c..e218479 100644 --- a/app/network/forms.py +++ b/app/network/forms.py @@ -24,6 +24,7 @@ from wtforms import ( from ..helpers import CSEntryForm from ..validators import ( Unique, + UniqueAccrossModels, RegexpList, IPNetwork, HOST_NAME_RE, @@ -135,6 +136,7 @@ class HostForm(CSEntryForm): validators.InputRequired(), validators.Regexp(HOST_NAME_RE), Unique(models.Host), + UniqueAccrossModels([models.Cname]), ], filters=[utils.lowercase_field], ) @@ -179,6 +181,7 @@ class InterfaceForm(CSEntryForm): validators.Regexp(HOST_NAME_RE), Unique(models.Interface), starts_with_hostname, + UniqueAccrossModels([models.Cname]), ], filters=[utils.lowercase_field], ) @@ -194,7 +197,11 @@ class InterfaceForm(CSEntryForm): cnames_string = StringField( "Cnames", description="space separated list of cnames (must be 2-20 characters long and contain only letters, numbers and dash)", - validators=[validators.Optional(), RegexpList(HOST_NAME_RE)], + validators=[ + validators.Optional(), + RegexpList(HOST_NAME_RE), + UniqueAccrossModels([models.Host, models.Interface]), + ], ) tags = SelectMultipleField("Tags", coerce=utils.coerce_to_str_or_none) diff --git a/app/network/views.py b/app/network/views.py index 555c094..5d61312 100644 --- a/app/network/views.py +++ b/app/network/views.py @@ -23,6 +23,7 @@ from flask import ( current_app, ) from flask_login import login_required, current_user +from wtforms import ValidationError from .forms import ( HostForm, InterfaceForm, @@ -69,34 +70,42 @@ def create_host(): # Remove the host_id field inherited from the InterfaceForm # It's not used in this form del form.host_id + # First interface name shall be identical to host name + del form.interface_name if form.validate_on_submit(): network_id = form.network_id.data ansible_groups = [ models.AnsibleGroup.query.get(id_) for id_ in form.ansible_groups.data ] - host = models.Host( - name=form.name.data, - device_type=models.DeviceType.query.get(form.device_type_id.data), - description=form.description.data or None, - ansible_vars=form.ansible_vars.data or None, - ansible_groups=ansible_groups, - ) - # The total number of tags will always be quite small - # It's more efficient to retrieve all of them in one query - # and do the filtering here - all_tags = models.Tag.query.all() - tags = [tag for tag in all_tags if str(tag.id) in form.tags.data] - interface = models.Interface( - name=form.interface_name.data, - ip=form.ip.data, - mac=form.mac.data, - network=models.Network.query.get(network_id), - tags=tags, - ) - interface.cnames = [ - models.Cname(name=name) for name in form.cnames_string.data.split() - ] - host.interfaces = [interface] + try: + host = models.Host( + name=form.name.data, + device_type=models.DeviceType.query.get(form.device_type_id.data), + description=form.description.data or None, + ansible_vars=form.ansible_vars.data or None, + ansible_groups=ansible_groups, + ) + # The total number of tags will always be quite small + # It's more efficient to retrieve all of them in one query + # and do the filtering here + all_tags = models.Tag.query.all() + tags = [tag for tag in all_tags if str(tag.id) in form.tags.data] + interface = models.Interface( + host=host, + name=form.name.data, + ip=form.ip.data, + mac=form.mac.data, + network=models.Network.query.get(network_id), + tags=tags, + ) + interface.cnames = [ + models.Cname(name=name) for name in form.cnames_string.data.split() + ] + except ValidationError as e: + # Check for error raised by model validation (not implemented in form vaildation) + current_app.logger.warning(f"{e}") + flash(f"{e}", "error") + return render_template("network/create_host.html", form=form) current_app.logger.debug(f"Trying to create: {host!r}") db.session.add(host) try: @@ -193,13 +202,22 @@ def edit_host(name): form.ansible_groups.default = [group.id for group in host.ansible_groups] form.ansible_groups.process(request.form) if form.validate_on_submit(): - host.name = form.name.data - host.device_type = models.DeviceType.query.get(form.device_type_id.data) - host.description = form.description.data or None - host.ansible_vars = form.ansible_vars.data or None - host.ansible_groups = [ - models.AnsibleGroup.query.get(id_) for id_ in form.ansible_groups.data - ] + try: + host.name = form.name.data + host.device_type = models.DeviceType.query.get(form.device_type_id.data) + host.description = form.description.data or None + host.ansible_vars = form.ansible_vars.data or None + host.ansible_groups = [ + models.AnsibleGroup.query.get(id_) for id_ in form.ansible_groups.data + ] + # Interface names shall always start with the host name + for interface in host.interfaces: + interface.name = interface.name.replace(name, host.name, 1) + except ValidationError as e: + # Check for error raised by model validation (not implemented in form vaildation) + current_app.logger.warning(f"{e}") + flash(f"{e}", "error") + return render_template("network/edit_host.html", form=form) current_app.logger.debug(f"Trying to update: {host!r}") try: db.session.commit() @@ -227,19 +245,25 @@ def create_interface(hostname): # and do the filtering here all_tags = models.Tag.query.all() tags = [tag for tag in all_tags if str(tag.id) in form.tags.data] - interface = models.Interface( - name=form.interface_name.data, - ip=form.ip.data, - mac=form.mac.data, - network=models.Network.query.get(form.network_id.data), - tags=tags, - ) - interface.cnames = [ - models.Cname(name=name) for name in form.cnames_string.data.split() - ] - # Make sure to update host.interfaces instead of using interface.host_id - # to force the host to be added to the session for indexing - host.interfaces.append(interface) + try: + interface = models.Interface( + host=host, + name=form.interface_name.data, + ip=form.ip.data, + mac=form.mac.data, + network=models.Network.query.get(form.network_id.data), + tags=tags, + ) + interface.cnames = [ + models.Cname(name=name) for name in form.cnames_string.data.split() + ] + except ValidationError as e: + # Check for error raised by model validation (not implemented in form vaildation) + current_app.logger.warning(f"{e}") + flash(f"{e}", "error") + return render_template( + "network/create_interface.html", form=form, hostname=hostname + ) current_app.logger.debug(f"Trying to create: {interface!r}") db.session.add(interface) try: @@ -279,27 +303,35 @@ def edit_interface(name): form.tags.default = [tag.id for tag in interface.tags] form.tags.process(request.form) if form.validate_on_submit(): - interface.name = form.interface_name.data - interface.ip = form.ip.data - interface.mac = form.mac.data - # Setting directly network_id doesn't update the relationship and bypass the checks - # performed on the model - interface.network = models.Network.query.get(form.network_id.data) - # Delete the cnames that have been removed - new_cnames_string = form.cnames_string.data.split() - for (index, cname) in enumerate(interface.cnames): - if cname.name not in new_cnames_string: - current_app.logger.debug(f"Deleting cname: {cname}") - # Removing the cname from interface.cnames list will - # delete it from the database due to the cascade - # delete-orphan option defined on the model - del interface.cnames[index] - # Add new cnames - for name in new_cnames_string: - if name not in cnames_string: - cname = models.Cname(name=name) - current_app.logger.debug(f"Creating cname: {cname}") - interface.cnames.append(cname) + try: + interface.name = form.interface_name.data + interface.ip = form.ip.data + interface.mac = form.mac.data + # Setting directly network_id doesn't update the relationship and bypass the checks + # performed on the model + interface.network = models.Network.query.get(form.network_id.data) + # Delete the cnames that have been removed + new_cnames_string = form.cnames_string.data.split() + for (index, cname) in enumerate(interface.cnames): + if cname.name not in new_cnames_string: + current_app.logger.debug(f"Deleting cname: {cname}") + # Removing the cname from interface.cnames list will + # delete it from the database due to the cascade + # delete-orphan option defined on the model + del interface.cnames[index] + # Add new cnames + for name in new_cnames_string: + if name not in cnames_string: + cname = models.Cname(name=name) + current_app.logger.debug(f"Creating cname: {cname}") + interface.cnames.append(cname) + except ValidationError as e: + # Check for error raised by model validation (not implemented in form vaildation) + current_app.logger.warning(f"{e}") + flash(f"{e}", "error") + return render_template( + "network/edit_interface.html", form=form, hostname=interface.host.name + ) all_tags = models.Tag.query.all() tags = [tag for tag in all_tags if str(tag.id) in form.tags.data] interface.tags = tags diff --git a/app/static/js/hosts.js b/app/static/js/hosts.js index f40cb01..872d6cb 100644 --- a/app/static/js/hosts.js +++ b/app/static/js/hosts.js @@ -118,14 +118,6 @@ $(document).ready(function() { }); } - // The interface name field is hidden in the create host form - // because it should be forced to the hostname - // Do that when submitting the form - $("#hostForm").submit(function(event) { - var hostname = $("#name").val(); - $("#interface_name").val(hostname); - }); - // Fill MAC address on page load if( $("#random_mac").length ) { fill_mac_address(); @@ -136,10 +128,6 @@ $(document).ready(function() { fill_mac_address(); }); - // Force the first interface to have the hostname as name - // This only applies to the hostForm (not when editing or adding interfaces) - $("#hostForm input[name=interface_name]").prop("readonly", true); - var hosts_table = $("#hosts_table").DataTable({ dom: "<'row'<'col-sm-12 col-md-6'l><'col-sm-12 col-md-3 text-right'><'col-sm-12 col-md-3'f>>" + "<'row'<'col-sm-12'tr>>" + diff --git a/app/templates/network/create_host.html b/app/templates/network/create_host.html index b7eeb21..15d844c 100644 --- a/app/templates/network/create_host.html +++ b/app/templates/network/create_host.html @@ -6,7 +6,6 @@ {% block hosts_main %} <form id="hostForm" method="POST"> {{ form.hidden_tag() }} - {{ form.interface_name(type="hidden") }} {{ render_field(form.name, class_="text-lowercase") }} {{ render_field(form.device_type_id, class_="selectize-default") }} {{ render_field(form.description) }} diff --git a/app/validators.py b/app/validators.py index d5d78ac..0248e7a 100644 --- a/app/validators.py +++ b/app/validators.py @@ -82,6 +82,34 @@ class Unique(object): pass +class UniqueAccrossModels: + """Checks field value unicity against other tables + + :param models: the models to check unicity against + :param column: the unique column + :param message: the error message + """ + + def __init__(self, models, column="name", message=None): + self.models = models + self.column = column + self.message = message + + def __call__(self, form, field): + # databases allow multiple NULL values for unique columns + if field.data is None: + return + for string in field.data.split(): + kwargs = {self.column: string} + for model in self.models: + obj = model.query.filter_by(**kwargs).first() + if obj is not None: + message = ( + self.message or f"{string} already exists as {model.__table__}." + ) + raise ValidationError(message) + + class RegexpList: """Validates a list of strings against a user provided regexp. diff --git a/migrations/versions/6b65e1309431_cname_shall_only_be_unique_by_domain.py b/migrations/versions/6b65e1309431_cname_shall_only_be_unique_by_domain.py new file mode 100644 index 0000000..df0c235 --- /dev/null +++ b/migrations/versions/6b65e1309431_cname_shall_only_be_unique_by_domain.py @@ -0,0 +1,23 @@ +"""Cname shall only be unique by domain + +Revision ID: 6b65e1309431 +Revises: 9184cc675b4e +Create Date: 2018-10-26 12:36:20.971032 + +""" +from alembic import op + + +# revision identifiers, used by Alembic. +revision = "6b65e1309431" +down_revision = "9184cc675b4e" +branch_labels = None +depends_on = None + + +def upgrade(): + op.drop_constraint("uq_cname_name", "cname", type_="unique") + + +def downgrade(): + op.create_unique_constraint("uq_cname_name", "cname", ["name"]) diff --git a/migrations/versions/7c38e78b6de6_interface_host_shall_not_be_nullable.py b/migrations/versions/7c38e78b6de6_interface_host_shall_not_be_nullable.py new file mode 100644 index 0000000..a75b3cb --- /dev/null +++ b/migrations/versions/7c38e78b6de6_interface_host_shall_not_be_nullable.py @@ -0,0 +1,24 @@ +"""Interface host shall not be nullable + +Revision ID: 7c38e78b6de6 +Revises: 6b65e1309431 +Create Date: 2018-11-05 17:08:05.396668 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "7c38e78b6de6" +down_revision = "6b65e1309431" +branch_labels = None +depends_on = None + + +def upgrade(): + op.alter_column("interface", "host_id", existing_type=sa.INTEGER(), nullable=False) + + +def downgrade(): + op.alter_column("interface", "host_id", existing_type=sa.INTEGER(), nullable=True) diff --git a/tests/functional/factories.py b/tests/functional/factories.py index aa7b35d..cd1fdc1 100644 --- a/tests/functional/factories.py +++ b/tests/functional/factories.py @@ -145,21 +145,6 @@ class NetworkFactory(factory.alchemy.SQLAlchemyModelFactory): return str(hosts[-5]) -class InterfaceFactory(factory.alchemy.SQLAlchemyModelFactory): - class Meta: - model = models.Interface - sqlalchemy_session = common.Session - sqlalchemy_session_persistence = "commit" - - name = factory.Sequence(lambda n: f"host{n}") - network = factory.SubFactory(NetworkFactory) - ip = factory.LazyAttributeSequence( - lambda o, n: str(ipaddress.ip_address(o.network.first_ip) + n) - ) - mac = factory.Faker("mac_address") - user = factory.SubFactory(UserFactory) - - class DeviceTypeFactory(factory.alchemy.SQLAlchemyModelFactory): class Meta: model = models.DeviceType @@ -190,6 +175,22 @@ class HostFactory(factory.alchemy.SQLAlchemyModelFactory): device_type = factory.SubFactory(DeviceTypeFactory) +class InterfaceFactory(factory.alchemy.SQLAlchemyModelFactory): + class Meta: + model = models.Interface + sqlalchemy_session = common.Session + sqlalchemy_session_persistence = "commit" + + host = factory.SubFactory(HostFactory) + name = factory.LazyAttributeSequence(lambda o, n: f"{o.host.name}-{n}") + network = factory.SubFactory(NetworkFactory) + ip = factory.LazyAttributeSequence( + lambda o, n: str(ipaddress.ip_address(o.network.first_ip) + n) + ) + mac = factory.Faker("mac_address") + user = factory.SubFactory(UserFactory) + + class MacFactory(factory.alchemy.SQLAlchemyModelFactory): class Meta: model = models.Mac @@ -205,7 +206,7 @@ class CnameFactory(factory.alchemy.SQLAlchemyModelFactory): sqlalchemy_session = common.Session sqlalchemy_session_persistence = "commit" - name = factory.Sequence(lambda n: f"host{n}") + name = factory.Sequence(lambda n: f"cname{n}") interface = factory.SubFactory(InterfaceFactory) user = factory.SubFactory(UserFactory) diff --git a/tests/functional/test_api.py b/tests/functional/test_api.py index 45f2fe1..c678b84 100644 --- a/tests/functional/test_api.py +++ b/tests/functional/test_api.py @@ -848,7 +848,7 @@ def test_get_interfaces(client, network_factory, interface_factory, readonly_tok ) interface1 = interface_factory(network=network1, ip="192.168.1.10") interface2 = interface_factory( - network=network1, ip="192.168.1.11", name="interface2" + network=network1, ip="192.168.1.11", host=interface1.host ) interface3 = interface_factory(network=network2, ip="192.168.2.10") @@ -889,9 +889,7 @@ def test_get_interfaces_by_domain( domain=domain2, ) interface1 = interface_factory(network=network1, ip="192.168.1.10") - interface2 = interface_factory( - network=network1, ip="192.168.1.11", name="interface2" - ) + interface2 = interface_factory(network=network1, ip="192.168.1.11") interface3 = interface_factory(network=network2, ip="192.168.2.10") # test filtering by domain @@ -933,9 +931,7 @@ def test_get_interfaces_by_network( last_ip="192.168.2.250", ) interface1 = interface_factory(network=network1, ip="192.168.1.10") - interface2 = interface_factory( - network=network1, ip="192.168.1.11", name="interface2" - ) + interface2 = interface_factory(network=network1, ip="192.168.1.11") interface3 = interface_factory(network=network2, ip="192.168.2.10") # test filtering by network name @@ -962,12 +958,12 @@ def test_get_interfaces_with_model( host1 = host_factory() model1 = model_factory(name="EX3400") item_factory(model=model1, host_id=host1.id) - interface_factory(host_id=host1.id) + interface_factory(host=host1) response = get(client, f"{API_URL}/network/interfaces", token=readonly_token) assert response.get_json()[0]["model"] == "EX3400" -def test_create_interface(client, network_factory, user_token): +def test_create_interface_fails(client, host, network_factory, user_token): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) @@ -993,6 +989,30 @@ def test_create_interface(client, network_factory, user_token): response = post( client, f"{API_URL}/network/interfaces", data=data, token=user_token ) + check_response_message(response, "Missing mandatory field 'host'", 422) + + data["host"] = host.name + response = post( + client, f"{API_URL}/network/interfaces", data=data, token=user_token + ) + check_response_message( + response, f"Interface name shall start with the host name '{host.name}'", 422 + ) + + +def test_create_interface(client, host, network_factory, user_token): + network = network_factory( + address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" + ) + data = { + "network": network.vlan_name, + "ip": "192.168.1.20", + "name": host.name, + "host": host.name, + } + response = post( + client, f"{API_URL}/network/interfaces", data=data, token=user_token + ) assert response.status_code == 201 assert { "id", @@ -1012,23 +1032,14 @@ def test_create_interface(client, network_factory, user_token): } == set(response.get_json().keys()) assert response.get_json()["network"] == network.vlan_name assert response.get_json()["ip"] == "192.168.1.20" - assert response.get_json()["name"] == "interface1" - - # Check that IP and name shall be unique - response = post( - client, f"{API_URL}/network/interfaces", data=data, token=user_token - ) - check_response_message( - response, - "(psycopg2.IntegrityError) duplicate key value violates unique constraint", - 422, - ) + assert response.get_json()["name"] == host.name # Check that all parameters can be passed data2 = { "network": network.vlan_name, "ip": "192.168.1.21", - "name": "myhostname", + "name": host.name + "-2", + "host": host.name, "mac": "7c:e2:ca:64:d0:68", } response = post( @@ -1039,14 +1050,29 @@ def test_create_interface(client, network_factory, user_token): # check all items that were created assert models.Interface.query.count() == 2 + # Check that IP and name shall be unique + response = post( + client, f"{API_URL}/network/interfaces", data=data, token=user_token + ) + check_response_message( + response, + "(psycopg2.IntegrityError) duplicate key value violates unique constraint", + 422, + ) + @pytest.mark.parametrize("ip", ("", "foo", "192.168")) -def test_create_interface_invalid_ip(ip, client, network_factory, user_token): +def test_create_interface_invalid_ip(ip, client, host, network_factory, user_token): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) # invalid IP address - data = {"network": network.vlan_name, "ip": ip, "name": "hostname"} + data = { + "network": network.vlan_name, + "ip": ip, + "name": host.name, + "host": host.name, + } response = post( client, f"{API_URL}/network/interfaces", data=data, token=user_token ) @@ -1055,12 +1081,17 @@ def test_create_interface_invalid_ip(ip, client, network_factory, user_token): ) -def test_create_interface_ip_not_in_network(client, network_factory, user_token): +def test_create_interface_ip_not_in_network(client, host, network_factory, user_token): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) # IP address not in range - data = {"network": network.vlan_name, "ip": "192.168.2.4", "name": "hostname"} + data = { + "network": network.vlan_name, + "ip": "192.168.2.4", + "name": host.name, + "host": host.name, + } response = post( client, f"{API_URL}/network/interfaces", data=data, token=user_token ) @@ -1069,12 +1100,17 @@ def test_create_interface_ip_not_in_network(client, network_factory, user_token) ) -def test_create_interface_ip_not_in_range(client, network_factory, user_token): +def test_create_interface_ip_not_in_range(client, host, network_factory, user_token): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) # IP address not in range - data = {"network": network.vlan_name, "ip": "192.168.1.4", "name": "hostname"} + data = { + "network": network.vlan_name, + "ip": "192.168.1.4", + "name": host.name, + "host": host.name, + } response = post( client, f"{API_URL}/network/interfaces", data=data, token=user_token ) @@ -1086,13 +1122,18 @@ def test_create_interface_ip_not_in_range(client, network_factory, user_token): def test_create_interface_ip_not_in_range_as_admin( - client, network_factory, admin_token + client, host, network_factory, admin_token ): network = network_factory( address="192.168.1.0/24", first_ip="192.168.1.10", last_ip="192.168.1.250" ) # IP address not in range - data = {"network": network.vlan_name, "ip": "192.168.1.4", "name": "hostname"} + data = { + "network": network.vlan_name, + "ip": "192.168.1.4", + "name": host.name, + "host": host.name, + } response = post( client, f"{API_URL}/network/interfaces", data=data, token=admin_token ) @@ -1392,15 +1433,12 @@ def test_delete_host_invalid_id(client, host_factory, admin_token): assert len(models.Host.query.all()) == 1 -def test_delete_host_with_interfaces( - client, interface_factory, host_factory, admin_token -): - interface1 = interface_factory() - interface2 = interface_factory() - host1 = host_factory(interfaces=[interface1, interface2]) - assert len(host1.interfaces) == 2 +def test_delete_host_with_interfaces(client, interface_factory, host, admin_token): + interface_factory(host=host) + interface_factory(host=host) + assert len(host.interfaces) == 2 assert len(models.Interface.query.all()) == 2 - response = delete(client, f"{API_URL}/network/hosts/{host1.id}", token=admin_token) + response = delete(client, f"{API_URL}/network/hosts/{host.id}", token=admin_token) assert response.status_code == 204 assert len(models.Host.query.all()) == 0 assert len(models.Interface.query.all()) == 0 @@ -1522,7 +1560,7 @@ def test_create_cname(client, interface, admin_token): ) check_response_message(response, "Missing mandatory field 'name'", 422) - data = {"name": "myhost.tn.esss.lu.se", "interface_id": interface.id} + data = {"name": "myhost", "interface_id": interface.id} response = post(client, f"{API_URL}/network/cnames", data=data, token=admin_token) assert response.status_code == 201 assert {"id", "name", "interface", "created_at", "updated_at", "user"} == set( @@ -1530,10 +1568,8 @@ def test_create_cname(client, interface, admin_token): ) assert response.get_json()["name"] == data["name"] - # Check that name shall be unique + # Check that name shall be unique by domain response = post(client, f"{API_URL}/network/cnames", data=data, token=admin_token) check_response_message( - response, - "(psycopg2.IntegrityError) duplicate key value violates unique constraint", - 422, + response, f"Duplicate cname on the {interface.network.domain} domain", 422 ) diff --git a/tests/functional/test_models.py b/tests/functional/test_models.py index e35603c..9600bda 100644 --- a/tests/functional/test_models.py +++ b/tests/functional/test_models.py @@ -363,10 +363,83 @@ def test_host_indexed(db, host_factory): def test_host_with_interfaces_indexed(db, host_factory, interface_factory): - interface1 = interface_factory(name="myinterface1") - interface2 = interface_factory(name="myinterface2") - host1 = host_factory(name="myhost", interfaces=[interface1, interface2]) - for name in ("myhost", "myinterface1", "myinterface2"): + host1 = host_factory(name="myhost") + interface_factory(name="myhost", host=host1) + interface_factory(name="myhost2", host=host1) + for name in ("myhost", "myhost2"): res = db.app.elasticsearch.search(index="host-test", q=name) assert res["hits"]["total"] == 1 assert res["hits"]["hits"][0]["_id"] == str(host1.id) + + +def test_interface_name_starts_with_host_name(db, host_factory, interface_factory): + host1 = host_factory(name="myhost") + interface = interface_factory(name="myhost-1", host=host1) + assert host1.interfaces[0] == interface + with pytest.raises(ValidationError) as excinfo: + interface_factory(name="myinterface", host=host1) + assert "Interface name shall start with the host name 'myhost'" in str( + excinfo.value + ) + + +def test_interface_name_existing_host(db, host_factory, interface_factory): + host1 = host_factory(name="myhost") + host_factory(name="myhost2") + with pytest.raises(ValidationError) as excinfo: + interface_factory(name="myhost2", host=host1) + assert "Interface name matches an existing host" in str(excinfo.value) + + +def test_interface_name_existing_cname( + db, host_factory, interface_factory, cname_factory +): + host1 = host_factory(name="myhost") + cname_factory(name="myhost2") + with pytest.raises(ValidationError) as excinfo: + interface_factory(name="myhost2", host=host1) + assert "Interface name matches an existing cname" in str(excinfo.value) + + +def test_host_existing_interface(db, host_factory, interface): + with pytest.raises(ValidationError) as excinfo: + host_factory(name=interface.name) + assert "Host name matches an existing interface" in str(excinfo.value) + + +def test_host_existing_cname(db, host_factory, cname): + with pytest.raises(ValidationError) as excinfo: + host_factory(name=cname.name) + assert "Host name matches an existing cname" in str(excinfo.value) + + +def test_cname_existing_host(db, host_factory, cname_factory): + host_factory(name="myhost") + with pytest.raises(ValidationError) as excinfo: + cname_factory(name="myhost") + assert "cname matches an existing host" in str(excinfo.value) + + +def test_cname_existing_interface(db, interface, cname_factory): + with pytest.raises(ValidationError) as excinfo: + cname_factory(name=interface.name) + assert "cname matches an existing interface" in str(excinfo.value) + + +def test_cname_unique_by_domain(db, interface_factory, network_factory, cname_factory): + network1 = network_factory() + network2 = network_factory() + assert network1.domain.name != network2.domain.name + interface1 = interface_factory(network=network1) + interface2 = interface_factory(network=network2) + # We can have identical cname on different domains + cname1 = cname_factory(name="mycname", interface=interface1) + cname2 = cname_factory(name="mycname", interface=interface2) + assert cname1.fqdn == f"mycname.{network1.domain}" + assert cname2.fqdn == f"mycname.{network2.domain}" + assert cname1.fqdn != cname2.fqdn + # cname must be unique by domain + interface3 = interface_factory(network=network1) + with pytest.raises(ValidationError) as excinfo: + cname_factory(name="mycname", interface=interface3) + assert f"Duplicate cname on the {network1.domain} domain" in str(excinfo.value) diff --git a/tests/functional/test_web.py b/tests/functional/test_web.py index a03ebd5..8d4267f 100644 --- a/tests/functional/test_web.py +++ b/tests/functional/test_web.py @@ -176,10 +176,10 @@ def test_generate_random_mac(logged_client): def test_retrieve_hosts(logged_client, interface_factory, host_factory): response = logged_client.post("/network/_retrieve_hosts") assert response.get_json()["data"] == [] - interface1 = interface_factory(name="myinterface1") - interface2 = interface_factory(name="myinterface2") - host1 = host_factory(name="host1", interfaces=[interface1]) - host2 = host_factory(name="host2", interfaces=[interface2]) + host1 = host_factory(name="host1") + host2 = host_factory(name="host2") + interface_factory(name="host1", host=host1) + interface_factory(name="host2", host=host2) response = logged_client.post("/network/_retrieve_hosts") hosts = response.get_json()["data"] assert {host1.name, host2.name} == set(host["name"] for host in hosts) @@ -187,11 +187,9 @@ def test_retrieve_hosts(logged_client, interface_factory, host_factory): assert len(hosts[0]["interfaces"][0]) == 14 -def test_retrieve_hosts_by_ip(logged_client, interface_factory, host_factory): - interface1 = interface_factory(name="myinterface1") - interface2 = interface_factory(name="myinterface2") - host1 = host_factory(name="host1", interfaces=[interface1]) - host_factory(name="host2", interfaces=[interface2]) +def test_retrieve_hosts_by_ip(logged_client, interface_factory): + interface1 = interface_factory() + interface_factory() response = logged_client.post( "/network/_retrieve_hosts", data={"draw": "50", "length": 20, "start": 0, "search[value]": interface1.ip}, @@ -200,15 +198,15 @@ def test_retrieve_hosts_by_ip(logged_client, interface_factory, host_factory): assert r["recordsTotal"] == 2 assert r["recordsFiltered"] == 1 assert len(r["data"]) == 1 - assert r["data"][0]["name"] == host1.name + assert r["data"][0]["name"] == interface1.host.name def test_delete_interface_from_index(logged_rw_client, interface_factory, host_factory): - interface1 = interface_factory(name="myinterface1") - interface2 = interface_factory(name="myinterface2") - host1 = host_factory(name="host1", interfaces=[interface1, interface2]) + host1 = host_factory(name="host1") + interface_factory(name="host1", host=host1) + interface2 = interface_factory(name="host1b", host=host1) # The interface is in the index - instances, nb = models.Host.search("myinterface2") + instances, nb = models.Host.search("host1b") assert list(instances) == [host1] assert nb == 1 # Delete the interface @@ -219,7 +217,7 @@ def test_delete_interface_from_index(logged_rw_client, interface_factory, host_f # It's not in the database anymore assert models.Interface.query.get(interface2.id) is None # Neither in the index - instances, nb = models.Host.search("myinterface2") + instances, nb = models.Host.search("host1b") assert list(instances) == [] assert nb == 0 # But host1 is still in the index @@ -267,7 +265,6 @@ def test_create_host(logged_rw_client, network_factory, device_type): "tags": [], "random_mac": False, "cnames_string": "", - "interface_name": "myhost", } response = logged_rw_client.post(f"/network/hosts/create", data=form) assert response.status_code == 302 @@ -276,3 +273,4 @@ def test_create_host(logged_rw_client, network_factory, device_type): assert host is not None assert host.interfaces[0].ip == ip assert host.interfaces[0].mac == mac + assert host.interfaces[0].name == name -- GitLab