# -*- coding: utf-8 -*-
"""
app.network.views
~~~~~~~~~~~~~~~~~
This module implements the network blueprint.

:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import ipaddress

import sqlalchemy as sa
from flask import (
    Blueprint,
    render_template,
    jsonify,
    session,
    redirect,
    url_for,
    request,
    flash,
    current_app,
    abort,
)
from flask_login import login_required, current_user
from wtforms import ValidationError

from .forms import (
    HostForm,
    InterfaceForm,
    HostInterfaceForm,
    NetworkForm,
    EditNetworkForm,
    NetworkScopeForm,
    DomainForm,
    CreateVMForm,
    AnsibleGroupForm,
    BootProfileForm,
)
from ..extensions import db
from ..decorators import login_groups_accepted
from .. import models, utils

# Blueprint on which every view of this module is registered (via @bp.route).
bp = Blueprint("network", __name__)
@bp.route("/_retrieve_hosts", methods=["POST"])
@login_required
def retrieve_hosts():
    """Return the Host data requested by a DataTables query.

    The request values are handed to the shared DataTables helper with
    ``filter_sensitive`` switched on.
    """
    datatables_params = request.values
    return utils.retrieve_data_for_datatables(
        datatables_params, models.Host, filter_sensitive=True
    )
@bp.route("/hosts")
@login_required
def list_hosts():
    """Render the page listing the hosts."""
    return render_template("network/hosts.html")


@bp.route("/hosts/create", methods=("GET", "POST"))
@login_groups_accepted("admin", "network")
def create_host():
    """Create a host together with its first interface.

    On GET (or invalid submission) the creation form is rendered, pre-filled
    with the network and device type stored in the session, if any.
    On a valid submission the host and its interface (named after the host)
    are created and the user is redirected to the host page.

    Aborts with 403 when the current user has no access to the selected network.
    """
    kwargs = {"random_mac": True}
    # Try to get the network_id from the session
    # to pre-fill the form with the same network
    if session.get("network_id"):
        kwargs["network_id"] = session["network_id"]
    # Same for the device_type
    if session.get("device_type_id"):
        kwargs["device_type_id"] = session["device_type_id"]
    form = HostInterfaceForm(request.form, **kwargs)
    # Remove the host_id field inherited from the InterfaceForm
    # It's not used in this form
    del form.host_id
    # First interface name shall be identical to host name
    del form.interface_name
    if form.validate_on_submit():
        device_type_id = form.device_type_id.data
        network_id = form.network_id.data
        network = models.Network.query.get(network_id)
        if not current_user.has_access_to_network(network):
            abort(403)
        ansible_groups = [
            models.AnsibleGroup.query.get(id_) for id_ in form.ansible_groups.data
        ]
        try:
            host = models.Host(
                name=form.name.data,
                device_type=models.DeviceType.query.get(device_type_id),
                is_ioc=form.is_ioc.data,
                description=form.description.data or None,
                ansible_vars=form.ansible_vars.data or None,
                ansible_groups=ansible_groups,
            )
            interface = models.Interface(
                host=host,
                name=form.name.data,
                ip=form.ip.data,
                mac=form.mac.data,
                network=network,
            )
            interface.cnames = [
                models.Cname(name=name) for name in form.cnames_string.data.split()
            ]
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template("network/create_host.html", form=form)
        current_app.logger.debug(f"Trying to create: {host!r}")
        db.session.add(host)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            flash(f"Host {host} created!", "success")
            if host.is_ioc:
                utils.trigger_ioc_repository_creation(host)
                db.session.commit()
        # Save network_id and device_type_id to the session to retrieve them after the redirect
        session["network_id"] = network_id
        session["device_type_id"] = device_type_id
        # NOTE(review): this redirect is also taken after an IntegrityError,
        # i.e. possibly towards a host that was not created - confirm intended.
        return redirect(url_for("network.view_host", name=host.name))
    return render_template("network/create_host.html", form=form)


@bp.route("/hosts/delete", methods=["POST"])
@login_required
def delete_host():
    """Delete the host whose id is given in the ``host_id`` form field.

    Responds with 404 when the host doesn't exist and 403 when the current
    user is not allowed to delete it. Redirects to the list of hosts.
    """
    host = models.Host.query.get_or_404(request.form["host_id"])
    if not current_user.can_delete_host(host):
        abort(403)
    # Read everything needed for the messages *before* deleting: once the
    # deletion is committed the instance is detached from the session and
    # relationships like device_type can no longer be lazy-loaded.
    host_name = host.name
    is_virtual_machine = host.device_type.name == "VirtualMachine"
    # Deleting the host will also delete all
    # associated interfaces due to the cascade delete option
    # defined on the model
    db.session.delete(host)
    db.session.commit()
    flash(f"Host {host_name} has been deleted", "success")
    if is_virtual_machine:
        flash(
            "Note that the host was only removed from CSEntry. "
            "To delete a running VM, please contact an administrator.",
            "info",
        )
    return redirect(url_for("network.list_hosts"))


@bp.route("/hosts/view/<name>", methods=("GET", "POST"))
@login_required
def view_host(name):
    """Display a host and process its boot profile / VM creation form.

    A ``BootProfileForm`` is used for a "PhysicalMachine" host, a
    ``CreateVMForm`` for a host whose device type starts with "Virtual" and
    no form at all for the other device types. A valid submission triggers
    the matching task and redirects to the page of that task.

    :param name: name of the host to display
    """
    host = models.Host.query.filter_by(name=name).first_or_404()
    if not current_user.is_admin and host.sensitive:
        # Only admin users can see hosts on sensitive networks
        abort(403)
    if host.main_interface is None:
        flash(f"Host {host.name} has no interface! Add one or delete it.", "warning")
    elif host.main_interface.name != host.name:
        flash(
            f"The main interface '{host.main_interface.name}' shall have the same name as the host!"
            f" Please rename it '{host.name}'.",
            "warning",
        )
    if host.device_type.name == "PhysicalMachine":
        form = BootProfileForm()
    elif host.device_type.name.startswith("Virtual"):
        form = CreateVMForm()
        if host.is_ioc:
            # Virtual IOCs get their choices from the VIOC_* configuration keys
            for field in ("cores", "memory", "disk", "osversion"):
                key = f"VIOC_{field.upper()}_CHOICES"
                getattr(form, field).choices = utils.get_choices(
                    current_app.config[key]
                )
    else:
        form = None
    if form is not None and form.validate_on_submit():
        if host.device_type.name == "PhysicalMachine":
            if not current_user.can_set_boot_profile(host):
                flash(
                    "You don't have the proper permissions to set the boot profile. Please contact an admin user.",
                    "warning",
                )
                return redirect(url_for("network.view_host", name=name))
            boot_profile = form.boot_profile.data
            task = utils.trigger_set_network_boot_profile(
                host, boot_profile=boot_profile
            )
            if boot_profile != "localboot":
                # For localboot, there is no need to update the variable
                # csentry_autoinstall_boot_profile is used for DHCP options
                if utils.update_ansible_vars(
                    host, {"csentry_autoinstall_boot_profile": boot_profile}
                ):
                    # If a change occurred, force DHCP update
                    utils.trigger_core_services_update()
            db.session.commit()
            current_app.logger.info(
                f"Set network boot profile to {boot_profile} for {name} requested: task {task.id}"
            )
            flash(
                f"Set network boot profile to {boot_profile} for {name} requested! "
                "Refresh the page to update the status. You can reboot the machine when the task is done.",
                "success",
            )
            return redirect(url_for("task.view_task", id_=task.id))
        else:
            # The only other device types with a form are the virtual ones
            if not current_user.can_create_vm(host):
                flash(
                    "You don't have the proper permissions to create this VM. Please contact an admin user.",
                    "warning",
                )
                return redirect(url_for("network.view_host", name=name))
            task = utils.trigger_vm_creation(
                host,
                vm_disk_size=int(form.disk.data),
                vm_cores=int(form.cores.data),
                vm_memory=int(form.memory.data) * 1024,
                vm_osversion=form.osversion.data,
                skip_post_install_job=form.skip_post_install_job.data,
            )
            db.session.commit()
            current_app.logger.info(f"Creation of {name} requested: task {task.id}")
            flash(
                f"Creation of {name} requested! Refresh the page to update the status.",
                "success",
            )
            return redirect(url_for("task.view_task", id_=task.id))
    return render_template("network/view_host.html", host=host, form=form)


@bp.route("/hosts/edit/<name>", methods=("GET", "POST"))
@login_groups_accepted("admin", "network")
def edit_host(name):
    """Edit an existing host.

    The interfaces of the host are renamed along with it. Aborts with 403
    when the current user has no access to the host main network.

    :param name: current name of the host to edit
    """
    host = models.Host.query.filter_by(name=name).first_or_404()
    if not current_user.has_access_to_network(host.main_network):
        abort(403)
    form = HostForm(request.form, obj=host)
    # Passing ansible_groups as kwarg to the HostForm doesn't work because
    # obj takes precedence (but host.ansible_groups contain AnsibleGroup instances and not id)
    # We need to update the default values. Calling process is required.
    # See https://stackoverflow.com/questions/5519729/wtforms-how-to-select-options-in-selectmultiplefield
    form.ansible_groups.default = [group.id for group in host.ansible_groups]
    form.ansible_groups.process(request.form)
    if form.validate_on_submit():
        try:
            host.name = form.name.data
            host.device_type = models.DeviceType.query.get(form.device_type_id.data)
            host.is_ioc = form.is_ioc.data
            host.description = form.description.data or None
            host.ansible_vars = form.ansible_vars.data or None
            host.ansible_groups = [
                models.AnsibleGroup.query.get(id_) for id_ in form.ansible_groups.data
            ]
            # Interface names shall always start with the host name
            for interface in host.interfaces:
                interface.name = interface.name.replace(name, host.name, 1)
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template("network/edit_host.html", form=form)
        current_app.logger.debug(f"Trying to update: {host!r}")
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # Only report success (and trigger the IOC repository creation)
            # when the commit actually went through
            flash(f"Host {host} updated!", "success")
            if host.is_ioc:
                utils.trigger_ioc_repository_creation(host)
                db.session.commit()
        return redirect(url_for("network.view_host", name=host.name))
    return render_template("network/edit_host.html", form=form)


@bp.route("/interfaces/create/<hostname>", methods=("GET", "POST"))
@login_groups_accepted("admin", "network")
def create_interface(hostname):
    """Add an interface to an existing host.

    Aborts with 403 when the current user has no access to the host main
    network or to the network selected for the new interface.

    :param hostname: name of the host the interface is added to
    """
    host = models.Host.query.filter_by(name=hostname).first_or_404()
    # User shall have access to the host main interface domain
    if not current_user.has_access_to_network(host.main_network):
        abort(403)
    random_mac = host.device_type.name.startswith("Virtual")
    form = InterfaceForm(
        request.form, host_id=host.id, interface_name=host.name, random_mac=random_mac
    )
    if not current_user.is_admin and host.main_network is not None:
        # Restrict the networks to the same network scope as the main interface
        form.network_id.choices = [
            (str(network.id), network.vlan_name)
            for network in models.Network.query.filter_by(scope=host.main_network.scope)
            .order_by(models.Network.vlan_name)
            .all()
            if current_user.has_access_to_network(network)
        ]
    if form.validate_on_submit():
        # User shall have access to the new interface domain
        network = models.Network.query.get(form.network_id.data)
        if not current_user.has_access_to_network(network):
            abort(403)
        try:
            interface = models.Interface(
                host=host,
                name=form.interface_name.data,
                ip=form.ip.data,
                mac=form.mac.data,
                network=network,
            )
            interface.cnames = [
                models.Cname(name=name) for name in form.cnames_string.data.split()
            ]
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template(
                "network/create_interface.html", form=form, hostname=hostname
            )
        current_app.logger.debug(f"Trying to create: {interface!r}")
        db.session.add(interface)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # Only report success when the commit actually went through
            flash(f"Interface {interface} created!", "success")
        return redirect(url_for("network.create_interface", hostname=hostname))
    return render_template(
        "network/create_interface.html", form=form, hostname=hostname
    )


@bp.route("/interfaces/edit/<name>", methods=("GET", "POST"))
@login_groups_accepted("admin", "network")
def edit_interface(name):
    """Edit an existing interface, including its cnames.

    Aborts with 403 when the current user has no access to the current
    network of the interface or to the newly selected one.

    :param name: current name of the interface to edit
    """
    interface = models.Interface.query.filter_by(name=name).first_or_404()
    if not current_user.has_access_to_network(interface.network):
        abort(403)
    cnames_string = " ".join([str(cname) for cname in interface.cnames])
    form = InterfaceForm(
        request.form,
        obj=interface,
        interface_name=interface.name,
        cnames_string=cnames_string,
    )
    if not current_user.is_admin and not interface.is_main:
        # Restrict the networks to the same network scope as the main interface
        form.network_id.choices = [
            (str(network.id), network.vlan_name)
            for network in models.Network.query.filter_by(
                scope=interface.host.main_network.scope
            )
            .order_by(models.Network.vlan_name)
            .all()
            if current_user.has_access_to_network(network)
        ]
    # Remove the random_mac field (not used when editing)
    del form.random_mac
    # The current IP must remain selectable next to the free ones
    ips = [interface.ip]
    ips.extend([str(address) for address in interface.network.available_ips()])
    form.ip.choices = utils.get_choices(ips)
    if form.validate_on_submit():
        network = models.Network.query.get(form.network_id.data)
        if not current_user.has_access_to_network(network):
            abort(403)
        try:
            interface.name = form.interface_name.data
            interface.ip = form.ip.data
            interface.mac = form.mac.data
            # Setting directly network_id doesn't update the relationship and bypass the checks
            # performed on the model
            interface.network = network
            new_cnames = form.cnames_string.data.split()
            # Names present before the update; compared as whole names so
            # that "foo" isn't considered existing because "foobar" is
            existing_cnames = {cname.name for cname in interface.cnames}
            # Delete the cnames that have been removed
            # Iterate over a copy: removing from the list being iterated
            # would skip the element following each removed one
            for cname in list(interface.cnames):
                if cname.name not in new_cnames:
                    current_app.logger.debug(f"Deleting cname: {cname}")
                    # Removing the cname from interface.cnames list will
                    # delete it from the database due to the cascade
                    # delete-orphan option defined on the model
                    interface.cnames.remove(cname)
            # Add new cnames
            for cname_name in new_cnames:
                if cname_name not in existing_cnames:
                    cname = models.Cname(name=cname_name)
                    current_app.logger.debug(f"Creating cname: {cname}")
                    interface.cnames.append(cname)
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template(
                "network/edit_interface.html", form=form, hostname=interface.host.name
            )
        # Mark the host as "dirty" to add it to the session so that it will
        # be re-indexed
        sa.orm.attributes.flag_modified(interface.host, "interfaces")
        current_app.logger.debug(f"Trying to update: {interface!r}")
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # Only report success when the commit actually went through
            flash(f"Interface {interface} updated!", "success")
        return redirect(url_for("network.view_host", name=interface.host.name))
    return render_template(
        "network/edit_interface.html", form=form, hostname=interface.host.name
    )


@bp.route("/interfaces/delete", methods=["POST"])
@login_groups_accepted("admin", "network")
def delete_interface():
    """Delete the interface whose id is given in the ``interface_id`` form field.

    Responds with 404 when the interface doesn't exist and 403 when the
    current user has no access to its network. Redirects to the page of the
    host the interface belonged to.
    """
    interface = models.Interface.query.get_or_404(request.form["interface_id"])
    if not current_user.has_access_to_network(interface.network):
        abort(403)
    # Keep the names needed after the deletion has been committed
    hostname = interface.host.name
    interface_name = interface.name
    # Explicitly remove the interface from the host to make sure
    # it will be re-indexed
    interface.host.interfaces.remove(interface)
    # Deleting the interface will also delete all
    # associated cnames due to the cascade delete option
    # defined on the model
    db.session.delete(interface)
    db.session.commit()
    flash(f"Interface {interface_name} has been deleted", "success")
    return redirect(url_for("network.view_host", name=hostname))


@bp.route("/groups")
@login_required
def list_ansible_groups():
    """Render the page listing the Ansible groups."""
    template = "network/groups.html"
    return render_template(template)


@bp.route("/groups/view/<name>", methods=("GET", "POST"))
@login_required
def view_ansible_group(name):
    """Render the page of the Ansible group called *name* (404 if unknown).

    :param name: name of the Ansible group to display
    """
    matching_groups = models.AnsibleGroup.query.filter_by(name=name)
    ansible_group = matching_groups.first_or_404()
    return render_template("network/view_group.html", group=ansible_group)


@bp.route("/groups/delete", methods=["POST"])
@login_groups_accepted("admin")
def delete_ansible_group():
    """Delete the Ansible group whose id is given in the ``group_id`` form field.

    Responds with 404 when the group doesn't exist, otherwise redirects to
    the list of Ansible groups.
    """
    group_id = request.form["group_id"]
    ansible_group = models.AnsibleGroup.query.get_or_404(group_id)
    db.session.delete(ansible_group)
    db.session.commit()
    flash(f"Group {ansible_group.name} has been deleted", "success")
    groups_url = url_for("network.list_ansible_groups")
    return redirect(groups_url)


@bp.route("/groups/edit/<name>", methods=("GET", "POST"))
@login_groups_accepted("admin")
def edit_ansible_group(name):
    """Edit an existing Ansible group (name, vars, type, hosts and children).

    :param name: current name of the Ansible group to edit
    """
    group = models.AnsibleGroup.query.filter_by(name=name).first_or_404()
    form = AnsibleGroupForm(request.form, obj=group)
    # Restrict the children that can be added
    # We don't check parents of parents, but that will be caught by the validate_children
    # and raise a ValidationError
    form.children.choices = [
        (str(ansible_group.id), ansible_group.name)
        for ansible_group in models.AnsibleGroup.query.order_by(
            models.AnsibleGroup.name
        ).all()
        if (ansible_group not in group.parents)
        and (ansible_group.name != "all")
        and (ansible_group != group)
    ]
    # Passing hosts as kwarg to the AnsibleGroupForm doesn't work because
    # obj takes precedence (but group.hosts contain Host instances and not id)
    # We need to update the default values. Calling process is required.
    # See https://stackoverflow.com/questions/5519729/wtforms-how-to-select-options-in-selectmultiplefield
    form.hosts.default = [host.id for host in group.hosts]
    form.hosts.process(request.form)
    # Same for AnsibleGroup children
    form.children.default = [child.id for child in group.children]
    form.children.process(request.form)
    if form.validate_on_submit():
        try:
            group.name = form.name.data
            group.vars = form.vars.data or None
            group.type = form.type.data
            group.hosts = [models.Host.query.get(id_) for id_ in form.hosts.data]
            group.children = [
                models.AnsibleGroup.query.get(id_) for id_ in form.children.data
            ]
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template("network/edit_group.html", form=form)
        current_app.logger.debug(f"Trying to update: {group!r}")
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            flash(f"Group {group} updated!", "success")
        return redirect(url_for("network.view_ansible_group", name=group.name))
    return render_template("network/edit_group.html", form=form)


@bp.route("/groups/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_ansible_group():
    """Create a new Ansible group from the submitted form.

    Renders the creation form on GET, on invalid submission and when the
    model validation fails; otherwise redirects to the page of the group.
    """
    form = AnsibleGroupForm()
    if form.validate_on_submit():
        hosts = [models.Host.query.get(id_) for id_ in form.hosts.data]
        children = [models.AnsibleGroup.query.get(id_) for id_ in form.children.data]
        try:
            group = models.AnsibleGroup(
                name=form.name.data,
                vars=form.vars.data or None,
                type=form.type.data,
                hosts=hosts,
                children=children,
            )
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template("network/create_group.html", form=form)
        current_app.logger.debug(f"Trying to create: {group!r}")
        db.session.add(group)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            flash(f"Group {group} created!", "success")
        # NOTE(review): this redirect is also taken after an IntegrityError,
        # i.e. possibly towards a group that was not created - confirm intended.
        return redirect(url_for("network.view_ansible_group", name=group.name))
    return render_template("network/create_group.html", form=form)


@bp.route("/domains")
@login_required
def list_domains():
    """Render the page listing all the domains."""
    domains = models.Domain.query.all()
    return render_template("network/domains.html", domains=domains)


@bp.route("/domains/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_domain():
    """Create a new domain from the submitted form.

    After a valid submission the user is redirected back to the creation
    page, with either a success or an error message flashed.
    """
    form = DomainForm()
    if form.validate_on_submit():
        domain = models.Domain(name=form.name.data)
        current_app.logger.debug(f"Trying to create: {domain!r}")
        db.session.add(domain)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            flash(f"Domain {domain} created!", "success")
        return redirect(url_for("network.create_domain"))
    return render_template("network/create_domain.html", form=form)


@bp.route("/scopes")
@login_groups_accepted("admin")
def list_scopes():
    """Render the page listing all the network scopes."""
    scopes = models.NetworkScope.query.all()
    return render_template("network/scopes.html", scopes=scopes)


@bp.route("/scopes/view/<name>")
@login_groups_accepted("admin")
def view_scope(name):
    """Render the page of the network scope called *name* (404 if unknown).

    :param name: name of the network scope to display
    """
    matching_scopes = models.NetworkScope.query.filter_by(name=name)
    network_scope = matching_scopes.first_or_404()
    return render_template("network/view_scope.html", scope=network_scope)


@bp.route("/scopes/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_scope():
    """Create a new network scope from the submitted form.

    After a valid submission the user is redirected back to the creation
    page, with either a success or an error message flashed.
    """
    form = NetworkScopeForm()
    if form.validate_on_submit():
        try:
            scope = models.NetworkScope(
                name=form.name.data,
                description=form.description.data or None,
                first_vlan=form.first_vlan.data,
                last_vlan=form.last_vlan.data,
                supernet=form.supernet.data,
                domain=models.Domain.query.get(form.domain_id.data),
            )
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template("network/create_scope.html", form=form)
        current_app.logger.debug(f"Trying to create: {scope!r}")
        db.session.add(scope)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # Only report success when the commit actually went through
            flash(f"Network Scope {scope} created!", "success")
        return redirect(url_for("network.create_scope"))
    return render_template("network/create_scope.html", form=form)


@bp.route("/_retrieve_first_available_ip/<int:network_id>")
@login_required
def retrieve_first_available_ip(network_id):
    """Return as JSON the first available IP of the given network.

    The ``data`` key of the response is an empty string when the network
    can't be retrieved or when it has no available IP.

    :param network_id: primary key of the network
    """
    data = ""
    try:
        network = models.Network.query.get(network_id)
    except sa.exc.DataError:
        current_app.logger.warning(f"Invalid network_id: {network_id}")
    else:
        if network is None:
            # query.get returns None for an unknown primary key
            current_app.logger.warning(f"Invalid network_id: {network_id}")
        else:
            available_ips = network.available_ips()
            # Don't index an empty list when the network is full
            if available_ips:
                data = str(available_ips[0])
    return jsonify(data=data)


@bp.route("/networks")
@login_groups_accepted("admin", "network")
def list_networks():
    """Render the page listing the networks.

    Admin users get every network; other users only get the networks they
    have access to.
    """
    all_networks = models.Network.query.all()
    if current_user.is_admin:
        visible_networks = all_networks
    else:
        visible_networks = [
            candidate
            for candidate in all_networks
            if current_user.has_access_to_network(candidate)
        ]
    return render_template("network/networks.html", networks=visible_networks)
@bp.route("/networks/view/<vlan_name>")
@login_groups_accepted("admin", "network")
def view_network(vlan_name):
    """Render the page of the network identified by its vlan name.

    Responds with 404 for an unknown vlan name and 403 when the current
    user has no access to the network.

    :param vlan_name: vlan name of the network to display
    """
    matching_networks = models.Network.query.filter_by(vlan_name=vlan_name)
    network = matching_networks.first_or_404()
    access_granted = current_user.has_access_to_network(network)
    if not access_granted:
        abort(403)
    return render_template("network/view_network.html", network=network)


@bp.route("/networks/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_network():
    """Create a new network from the submitted form.

    The form is pre-filled with the network scope stored in the session,
    if any. After a valid submission the user is redirected back to the
    creation page, with either a success or an error message flashed.
    """
    # Try to get the scope_id from the session
    # to pre-fill the form with the same network scope
    try:
        scope_id = session["scope_id"]
    except KeyError:
        # No need to pass request.form when no extra keywords are given
        form = NetworkForm()
    else:
        form = NetworkForm(request.form, scope_id=scope_id)
    if form.validate_on_submit():
        scope_id = form.scope_id.data
        try:
            network = models.Network(
                scope=models.NetworkScope.query.get(scope_id),
                vlan_name=form.vlan_name.data,
                vlan_id=form.vlan_id.data,
                description=form.description.data or None,
                address=form.address.data,
                first_ip=form.first_ip.data,
                last_ip=form.last_ip.data,
                gateway=form.gateway.data,
                domain=models.Domain.query.get(form.domain_id.data),
                admin_only=form.admin_only.data,
            )
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template("network/create_network.html", form=form)
        current_app.logger.debug(f"Trying to create: {network!r}")
        db.session.add(network)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # Only report success when the commit actually went through
            flash(f"Network {network} created!", "success")
        # Save scope_id to the session to retrieve it after the redirect
        session["scope_id"] = scope_id
        return redirect(url_for("network.create_network"))
    else:
        current_app.logger.info(form.errors)
    return render_template("network/create_network.html", form=form)


@bp.route("/networks/edit/<vlan_name>", methods=("GET", "POST"))
@login_groups_accepted("admin")
def edit_network(vlan_name):
    """Display and process the form used to edit a network.

    The network is looked up by its vlan name (404 if there is no match).
    Access is restricted by ``login_groups_accepted("admin")``.

    On a valid submission the form values are copied to the network and
    committed, then the user is redirected to the network view. Otherwise
    the edit form is rendered.

    :param vlan_name: vlan name of the network to edit
    :returns: the rendered edit form or a redirect to the network view
    """
    network = models.Network.query.filter_by(vlan_name=vlan_name).first_or_404()
    form = EditNetworkForm(request.form, obj=network)
    if not form.validate_on_submit():
        return render_template("network/edit_network.html", form=form)

    editable_fields = (
        "vlan_name",
        "vlan_id",
        "description",
        "address",
        "first_ip",
        "last_ip",
        "gateway",
        "domain_id",
        "admin_only",
        "sensitive",
    )
    try:
        for name in editable_fields:
            form_field = getattr(form, name)
            setattr(network, name, form_field.data)
    except ValidationError as validation_error:
        # Model validation can reject values that the form validation accepted
        current_app.logger.warning(f"{validation_error}")
        flash(f"{validation_error}", "error")
        return render_template("network/edit_network.html", form=form)

    current_app.logger.debug(f"Trying to update: {network!r}")
    try:
        db.session.commit()
    except sa.exc.IntegrityError as integrity_error:
        db.session.rollback()
        current_app.logger.warning(f"{integrity_error}")
        flash(f"{integrity_error}", "error")
    else:
        flash(f"Network {network} updated!", "success")
    # The redirect happens whether the commit succeeded or not
    return redirect(url_for("network.view_network", vlan_name=network.vlan_name))


Benjamin Bertrand's avatar
Benjamin Bertrand committed
@bp.route("/_retrieve_scope_defaults/<int:scope_id>")
@login_required
def retrieve_scope_defaults(scope_id):
    """Return as JSON the default values to create a network in a scope.

    The response has the form ``{"data": {...}}`` with the keys ``vlans``,
    ``prefixes``, ``selected_vlan``, ``selected_prefix`` and ``domain_id``.
    Empty values are returned when the scope can't be retrieved.

    :param scope_id: primary key of the network scope
    :returns: JSON response
    """
    scope = None
    try:
        scope = models.NetworkScope.query.get(scope_id)
    except sa.exc.DataError:
        current_app.logger.warning(f"Invalid scope_id: {scope_id}")
    else:
        # Query.get returns None (it doesn't raise) when no row matches
        if scope is None:
            current_app.logger.warning(f"Unknown scope_id: {scope_id}")
    if scope is None:
        return jsonify(
            data={
                "vlans": [],
                "prefixes": [],
                "selected_vlan": "",
                "selected_prefix": "",
                "domain_id": "",
            }
        )

    vlans = list(scope.available_vlans())
    selected_vlan = vlans[0] if vlans else ""
    prefixes = scope.prefix_range()
    default_prefix = current_app.config["NETWORK_DEFAULT_PREFIX"]
    if default_prefix in prefixes:
        selected_prefix = default_prefix
    elif prefixes:
        selected_prefix = prefixes[0]
    else:
        # No prefix available: same convention as for an empty vlan list
        selected_prefix = ""
    data = {
        "vlans": vlans,
        "prefixes": prefixes,
        "selected_vlan": selected_vlan,
        "selected_prefix": selected_prefix,
        "domain_id": scope.domain_id,
    }
    return jsonify(data=data)


Benjamin Bertrand's avatar
Benjamin Bertrand committed
@bp.route("/_retrieve_subnets/<int:scope_id>/<int:prefix>")
@login_required
def retrieve_subnets(scope_id, prefix):
    """Return as JSON the subnets available in a scope for a given prefix.

    The response has the form
    ``{"data": {"subnets": [...], "selected_subnet": ...}}``.
    The first available subnet is selected by default. An empty list and
    an empty selection are returned when the scope can't be retrieved or
    when no subnet is available.

    :param scope_id: primary key of the network scope
    :param prefix: prefix length of the subnets to list
    :returns: JSON response
    """
    data = {"subnets": [], "selected_subnet": ""}
    try:
        scope = models.NetworkScope.query.get(scope_id)
    except sa.exc.DataError:
        current_app.logger.warning(f"Invalid scope_id: {scope_id}")
    else:
        if scope is None:
            # Query.get returns None (it doesn't raise) when no row matches
            current_app.logger.warning(f"Unknown scope_id: {scope_id}")
        else:
            subnets = list(scope.available_subnets(int(prefix)))
            # Indexing an empty list would raise IndexError
            if subnets:
                data = {"subnets": subnets, "selected_subnet": subnets[0]}
    return jsonify(data=data)


Benjamin Bertrand's avatar
Benjamin Bertrand committed
@bp.route("/_retrieve_ips/<subnet>/<int:prefix>")
@login_required
def retrieve_ips(subnet, prefix):
    """Return as JSON the host IPs of a subnet with default selections.

    The response has the form ``{"data": {...}}`` with the keys ``ips``,
    ``selected_first``, ``selected_last`` and ``selected_gateway``.
    Empty values are returned when ``subnet/prefix`` isn't a valid network
    or when the network has no host address.

    :param subnet: network address of the subnet
    :param prefix: prefix length of the subnet
    :returns: JSON response
    """
    data = {
        "ips": [],
        "selected_first": "",
        "selected_last": "",
        "selected_gateway": "",
    }
    try:
        address = ipaddress.ip_network(f"{subnet}/{prefix}")
    except ValueError:
        current_app.logger.warning(f"Invalid address: {subnet}/{prefix}")
    else:
        hosts = [str(ip) for ip in address.hosts()]
        if hosts:
            # The gateway is set to the last IP by default
            gateway = hosts[-1]
            if len(hosts) > 17:
                # Large network: skip the first 10 and the last 5 hosts
                first = hosts[10]
                last = hosts[-6]
            else:
                # NOTE(review): small-network defaults (first host up to
                # the host before the gateway) - confirm this is the
                # intended range.
                first = hosts[0]
                # hosts[-2] doesn't exist when there is a single host
                last = hosts[-2] if len(hosts) > 1 else hosts[0]
            data = {
                "ips": hosts,
                "selected_first": first,
                "selected_last": last,
                "selected_gateway": gateway,
            }
    return jsonify(data=data)
@bp.route("/_retrieve_groups", methods=["POST"])
@login_required
def retrieve_groups():
    """Return the Ansible groups data requested by a DataTables table.

    The request values are passed as is to
    ``utils.retrieve_data_for_datatables`` with the ``AnsibleGroup`` model.
    """
    datatables_params = request.values
    return utils.retrieve_data_for_datatables(
        datatables_params, models.AnsibleGroup
    )
Benjamin Bertrand's avatar
Benjamin Bertrand committed
@bp.route("/_generate_random_mac")
@login_required
def generate_random_mac():
    """Return as JSON a MAC address generated by ``utils.random_mac``.

    The response has the form ``{"data": {"mac": ...}}``.
    """
    data = {"mac": utils.random_mac()}
    return jsonify(data=data)