Skip to content
Snippets Groups Projects
  • Benjamin Bertrand's avatar
    d9516a98
    Use sqlalchemy events hook to trigger tasks · d9516a98
    Benjamin Bertrand authored
    Use the before_flush hook to trigger the core services update.
    It's much nicer than having to call a method in several places.
    
    The core services update is triggered on creation/modification/deletion
    of an Interface.
    
    In the before_flush we can manipulate the session (add objects) but we
    shouldn't call commit.
    The db.session.commit() was removed from the launch_task method.
    d9516a98
    History
    Use sqlalchemy events hook to trigger tasks
    Benjamin Bertrand authored
    Use the before_flush hook to trigger the core services update.
    It's much nicer than having to call a method in several places.
    
    The core services update is triggered on creation/modification/deletion
    of an Interface.
    
    In the before_flush we can manipulate the session (add objects) but we
    shouldn't call commit.
    The db.session.commit() was removed from the launch_task method.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
views.py 18.01 KiB
# -*- coding: utf-8 -*-
"""
app.network.views
~~~~~~~~~~~~~~~~~

This module implements the network blueprint.

:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import ipaddress
import sqlalchemy as sa
from flask import (
    Blueprint,
    render_template,
    jsonify,
    session,
    redirect,
    url_for,
    request,
    flash,
    current_app,
)
from flask_login import login_required, current_user
from .forms import (
    HostForm,
    InterfaceForm,
    HostInterfaceForm,
    NetworkForm,
    NetworkScopeForm,
    DomainForm,
    CreateVMForm,
)
from ..extensions import db
from ..decorators import login_groups_accepted
from .. import models, utils, helpers, tasks

# Blueprint collecting all the views defined in this module.
# Its name ("network") is the endpoint prefix used in the url_for calls below.
bp = Blueprint("network", __name__)


@bp.route("/hosts")
@login_required
def list_hosts():
    """Render the ``network/hosts.html`` template (login required)."""
    page = render_template("network/hosts.html")
    return page


@bp.route("/hosts/create", methods=("GET", "POST"))
@login_groups_accepted("admin", "create")
def create_host():
    """Create a new host together with its first interface.

    On GET (or invalid submission) the creation form is rendered. The form
    is pre-filled with the ``network_id`` stored in the session, if any,
    and with ``random_mac`` enabled.

    On a valid POST, a Host with one Interface (plus its cnames, tags and
    MAC address) is added to the database. On success the user is redirected
    to the page of the new host. If the commit fails with an IntegrityError,
    the transaction is rolled back, the error is flashed and the user is
    redirected back to the creation form (the host doesn't exist, so its
    view page can't be displayed).
    """
    kwargs = {"random_mac": True}
    # Try to get the network_id from the session
    # to pre-fill the form with the same network
    try:
        network_id = session["network_id"]
    except KeyError:
        pass
    else:
        kwargs["network_id"] = network_id
    form = HostInterfaceForm(request.form, **kwargs)
    # Remove the host_id field inherited from the InterfaceForm
    # It's not used in this form
    del form.host_id
    if form.validate_on_submit():
        network_id = form.network_id.data
        host = models.Host(
            name=form.name.data,
            device_type_id=form.device_type_id.data,
            description=form.description.data or None,
        )
        # The total number of tags will always be quite small
        # It's more efficient to retrieve all of them in one query
        # and do the filtering here
        all_tags = models.Tag.query.all()
        tags = [tag for tag in all_tags if str(tag.id) in form.tags.data]
        interface = models.Interface(
            name=form.interface_name.data,
            ip=form.ip.data,
            network_id=network_id,
            tags=tags,
        )
        interface.cnames = [
            models.Cname(name=name) for name in form.cnames_string.data.split()
        ]
        helpers.associate_mac_to_interface(form.mac_address.data, interface)
        host.interfaces = [interface]
        current_app.logger.debug(f"Trying to create: {host!r}")
        db.session.add(host)
        # Save network_id to the session to retrieve it after the redirect
        # (done whether the creation succeeds or not)
        session["network_id"] = network_id
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            # The host was not created: redirecting to its view page would
            # end in a 404 (or show another host). Go back to the form.
            return redirect(url_for("network.create_host"))
        flash(f"Host {host} created!", "success")
        return redirect(url_for("network.view_host", name=host.name))
    return render_template("network/create_host.html", form=form)


@bp.route("/hosts/view/<name>", methods=("GET", "POST"))
@login_required
def view_host(name):
    """Display the host called *name* and handle the VM creation form.

    Returns a 404 if no host with that name exists.

    On a valid POST of the CreateVMForm:

    - non admin users get an info message and are redirected to this page
    - if the host has no interface, an error is flashed and the user is
      redirected to this page (a VM can't be created without interface)
    - otherwise a VM creation task is triggered using the first interface
      of the host and the user is redirected to the page of the task
    """
    host = models.Host.query.filter_by(name=name).first_or_404()
    form = CreateVMForm()
    if host.is_ioc:
        # Different cores/memory choices are offered for an IOC
        form.cores.choices = utils.get_choices(current_app.config["VIOC_CORES_CHOICES"])
        form.memory.choices = utils.get_choices(
            current_app.config["VIOC_MEMORY_CHOICES"]
        )
    if form.validate_on_submit():
        if not current_user.is_admin:
            flash("Only admin users are allowed to create a VM!", "info")
            return redirect(url_for("network.view_host", name=name))
        if not host.interfaces:
            # All the interfaces of a host can be deleted (see delete_interface)
            # Indexing the empty list would raise an IndexError
            flash(f"Host {name} has no interface! The VM can't be created.", "error")
            return redirect(url_for("network.view_host", name=name))
        interface = host.interfaces[0]
        # NOTE(review): memory is multiplied by 1000 before being passed to the
        # task - presumably a GB to MB conversion, to be confirmed
        task = tasks.trigger_vm_creation(
            name, interface, int(form.memory.data) * 1000, form.cores.data
        )
        # Commit the session after the task has been triggered
        # (presumably required to save the task - to be confirmed)
        db.session.commit()
        current_app.logger.info(f"Creation of {name} requested: task {task.id}")
        flash(
            f"Creation of {name} requested! Refresh the page to update the status.",
            "success",
        )
        return redirect(url_for("task.view_task", id_=task.id))
    return render_template("network/view_host.html", host=host, form=form)


@bp.route("/hosts/edit/<name>", methods=("GET", "POST"))
@login_groups_accepted("admin", "create")
def edit_host(name):
    """Edit the name, device type and description of the host called *name*.

    A 404 is returned if the host doesn't exist. The edit form is rendered
    until a valid submission is received. The changes are then committed
    (an IntegrityError is rolled back and flashed) and the user is
    redirected to the page of the host.
    """
    host = models.Host.query.filter_by(name=name).first_or_404()
    form = HostForm(request.form, obj=host)
    if not form.validate_on_submit():
        # GET request or invalid submission: (re)display the form
        return render_template("network/edit_host.html", form=form)
    host.name = form.name.data
    host.device_type_id = form.device_type_id.data
    # An empty description is stored as NULL
    host.description = form.description.data or None
    current_app.logger.debug(f"Trying to update: {host!r}")
    try:
        db.session.commit()
    except sa.exc.IntegrityError as err:
        db.session.rollback()
        message = f"{err}"
        current_app.logger.warning(message)
        flash(message, "error")
    else:
        flash(f"Host {host} updated!", "success")
    host_page = url_for("network.view_host", name=host.name)
    return redirect(host_page)


@bp.route("/interfaces/create/<hostname>", methods=("GET", "POST"))
@login_groups_accepted("admin", "create")
def create_interface(hostname):
    """Add a new interface to the host called *hostname*.

    A 404 is returned if the host doesn't exist. The form is pre-filled with
    the host name as interface name. The ``random_mac`` option is enabled by
    default when the name of the host device type starts with "Virtual".

    On a valid POST, the interface (with its cnames, tags and MAC address)
    is committed. An IntegrityError is rolled back and flashed. In both
    cases the user is redirected to this same creation page.
    """
    host = models.Host.query.filter_by(name=hostname).first_or_404()
    random_mac = host.device_type.name.startswith("Virtual")
    form = InterfaceForm(
        request.form, host_id=host.id, interface_name=host.name, random_mac=random_mac
    )
    if form.validate_on_submit():
        # The total number of tags will always be quite small
        # It's more efficient to retrieve all of them in one query
        # and do the filtering here
        all_tags = models.Tag.query.all()
        tags = [tag for tag in all_tags if str(tag.id) in form.tags.data]
        interface = models.Interface(
            host_id=host.id,
            name=form.interface_name.data,
            ip=form.ip.data,
            network_id=form.network_id.data,
            tags=tags,
        )
        interface.cnames = [
            models.Cname(name=name) for name in form.cnames_string.data.split()
        ]
        helpers.associate_mac_to_interface(form.mac_address.data, interface)
        current_app.logger.debug(f"Trying to create: {interface!r}")
        db.session.add(interface)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # An interface was created (not a host)
            flash(f"Interface {interface} created!", "success")
        return redirect(url_for("network.create_interface", hostname=hostname))
    return render_template(
        "network/create_interface.html", form=form, hostname=hostname
    )


@bp.route("/interfaces/edit/<name>", methods=("GET", "POST"))
@login_groups_accepted("admin", "create")
def edit_interface(name):
    """Edit the interface called *name*.

    A 404 is returned if the interface doesn't exist. The form is pre-filled
    with the current values of the interface (name, MAC address, cnames as a
    space separated string, tags). The IP choices are the current IP followed
    by the available IPs of the interface network.

    On a valid POST the name, IP, network, MAC address, cnames and tags are
    updated and committed. An IntegrityError is rolled back and flashed.
    The user is then redirected to the page of the host.
    """
    interface = models.Interface.query.filter_by(name=name).first_or_404()
    cnames_string = " ".join([str(cname) for cname in interface.cnames])
    try:
        mac_address = interface.mac.address
    except AttributeError:
        # No MAC associated to the interface (interface.mac is None)
        mac_address = ""
    form = InterfaceForm(
        request.form,
        obj=interface,
        interface_name=interface.name,
        mac_address=mac_address,
        cnames_string=cnames_string,
    )
    # Remove the random_mac field (not used when editing)
    del form.random_mac
    ips = [interface.ip]
    ips.extend(str(address) for address in interface.network.available_ips())
    form.ip.choices = utils.get_choices(ips)
    # Passing tags as kwarg to the InterfaceForm doesn't work because
    # obj takes precedence (but interface.tags contain Tag instances and not id)
    # We need to update the default values. Calling process is required.
    # See https://stackoverflow.com/questions/5519729/wtforms-how-to-select-options-in-selectmultiplefield
    form.tags.default = [tag.id for tag in interface.tags]
    form.tags.process(request.form)
    if form.validate_on_submit():
        interface.name = form.interface_name.data
        interface.ip = form.ip.data
        interface.network_id = form.network_id.data
        if form.mac_address.data:
            if form.mac_address.data != mac_address:
                # The MAC changed - add the new one to the interface
                # that will remove the association to the previous one
                helpers.associate_mac_to_interface(form.mac_address.data, interface)
            # else: nothing to do (address didn't change)
        else:
            # No MAC associated
            interface.mac_id = None
        new_cnames = form.cnames_string.data.split()
        # Delete the cnames that have been removed
        # Iterate over a copy: removing items from the list being iterated
        # would skip the element following each removed one
        for cname in list(interface.cnames):
            if cname.name not in new_cnames:
                current_app.logger.debug(f"Deleting cname: {cname}")
                # Removing the cname from interface.cnames list will
                # delete it from the database due to the cascade
                # delete-orphan option defined on the model
                interface.cnames.remove(cname)
        # Add new cnames
        # Compare to the set of existing names (and not to the joined string,
        # which would perform a substring check: "foo" would never be added
        # if "foobar" already exists)
        existing_names = {cname.name for cname in interface.cnames}
        for cname_name in new_cnames:
            if cname_name not in existing_names:
                cname = models.Cname(name=cname_name)
                current_app.logger.debug(f"Creating cname: {cname}")
                interface.cnames.append(cname)
                # Don't add twice a name repeated in the submitted string
                existing_names.add(cname_name)
        all_tags = models.Tag.query.all()
        tags = [tag for tag in all_tags if str(tag.id) in form.tags.data]
        interface.tags = tags
        current_app.logger.debug(f"Trying to update: {interface!r}")
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            flash(f"Interface {interface} updated!", "success")
        return redirect(url_for("network.view_host", name=interface.host.name))
    return render_template(
        "network/edit_interface.html", form=form, hostname=interface.host.name
    )


@bp.route("/interfaces/delete", methods=["POST"])
@login_groups_accepted("admin", "create")
def delete_interface():
    """Delete the interface identified by the ``interface_id`` form field.

    A 404 is returned if the interface doesn't exist. After the deletion,
    the user is redirected to the page of the host the interface belonged to.
    """
    interface_id = request.form["interface_id"]
    interface = models.Interface.query.get_or_404(interface_id)
    # Remember the host name before the deletion (needed for the redirect)
    host_name = interface.host.name
    # The cnames associated to the interface are deleted as well
    # (cascade delete option defined on the model)
    db.session.delete(interface)
    db.session.commit()
    message = f"Interface {interface.name} has been deleted"
    flash(message, "success")
    host_page = url_for("network.view_host", name=host_name)
    return redirect(host_page)


@bp.route("/domains")
@login_required
def list_domains():
    """Render the ``network/domains.html`` template (login required)."""
    page = render_template("network/domains.html")
    return page


@bp.route("/domains/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_domain():
    """Create a new domain from the submitted DomainForm.

    The form is rendered until a valid submission is received. The domain
    is then committed (an IntegrityError is rolled back and flashed) and
    the user is redirected to this same creation page.
    """
    form = DomainForm()
    if not form.validate_on_submit():
        # GET request or invalid submission: (re)display the form
        return render_template("network/create_domain.html", form=form)
    domain = models.Domain(name=form.name.data)
    current_app.logger.debug(f"Trying to create: {domain!r}")
    db.session.add(domain)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as err:
        db.session.rollback()
        message = f"{err}"
        current_app.logger.warning(message)
        flash(message, "error")
    else:
        flash(f"Domain {domain} created!", "success")
    return redirect(url_for("network.create_domain"))


@bp.route("/scopes")
@login_required
def list_scopes():
    """Render the ``network/scopes.html`` template (login required)."""
    page = render_template("network/scopes.html")
    return page


@bp.route("/scopes/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_scope():
    """Create a new network scope from the submitted NetworkScopeForm.

    The form is rendered until a valid submission is received. The scope
    is then committed (an IntegrityError is rolled back and flashed) and
    the user is redirected to this same creation page.
    """
    form = NetworkScopeForm()
    if not form.validate_on_submit():
        # GET request or invalid submission: (re)display the form
        return render_template("network/create_scope.html", form=form)
    scope_fields = {
        "name": form.name.data,
        # An empty description is stored as NULL
        "description": form.description.data or None,
        "first_vlan": form.first_vlan.data,
        "last_vlan": form.last_vlan.data,
        "supernet": form.supernet.data,
        "domain_id": form.domain_id.data,
    }
    scope = models.NetworkScope(**scope_fields)
    current_app.logger.debug(f"Trying to create: {scope!r}")
    db.session.add(scope)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as err:
        db.session.rollback()
        message = f"{err}"
        current_app.logger.warning(message)
        flash(message, "error")
    else:
        flash(f"Network Scope {scope} created!", "success")
    return redirect(url_for("network.create_scope"))


@bp.route("/_retrieve_hosts")
@login_required
def retrieve_hosts():
    """Return as JSON one row per interface of every host.

    Each row contains: host name, device type, host description,
    interface name, interface IP and network.
    A host without interface doesn't produce any row.
    """
    rows = []
    for host in models.Host.query.all():
        for interface in host.interfaces:
            row = (
                host.name,
                str(host.device_type),
                host.description,
                interface.name,
                interface.ip,
                str(interface.network),
            )
            rows.append(row)
    return jsonify(data=rows)


@bp.route("/_retrieve_first_available_ip/<int:network_id>")
@login_required
def retrieve_first_available_ip(network_id):
    """Return as JSON the first available IP of the given network.

    An empty string is returned when the network id is invalid, when no
    network with this id exists or when the network has no available IP
    (instead of failing with an AttributeError / IndexError).
    """
    data = ""
    try:
        network = models.Network.query.get(network_id)
    except sa.exc.DataError:
        current_app.logger.warning(f"Invalid network_id: {network_id}")
    else:
        if network is None:
            # query.get returns None when no row matches the primary key
            current_app.logger.warning(f"Unknown network_id: {network_id}")
        else:
            available_ips = network.available_ips()
            if available_ips:
                data = str(available_ips[0])
    return jsonify(data=data)


@bp.route("/networks")
@login_required
def list_networks():
    """Render the ``network/networks.html`` template (login required)."""
    page = render_template("network/networks.html")
    return page


@bp.route("/_retrieve_networks")
@login_required
def retrieve_networks():
    """Return as JSON one row per network.

    Each row contains: scope, vlan name, vlan id, description, address,
    first IP, last IP, domain and the admin_only flag.
    """
    rows = []
    for network in models.Network.query.all():
        row = (
            str(network.scope),
            network.vlan_name,
            network.vlan_id,
            network.description,
            network.address,
            network.first_ip,
            network.last_ip,
            str(network.domain),
            network.admin_only,
        )
        rows.append(row)
    return jsonify(data=rows)


@bp.route("/networks/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_network():
    """Create a new network from the submitted NetworkForm.

    The form is pre-filled with the ``scope_id`` stored in the session,
    if any. It is rendered until a valid submission is received. The network
    is then committed (an IntegrityError is rolled back and flashed), the
    submitted scope_id is saved to the session and the user is redirected to
    this same creation page.
    """
    # Pre-fill the form with the network scope used previously
    # (when it was saved in the session)
    if "scope_id" in session:
        form = NetworkForm(request.form, scope_id=session["scope_id"])
    else:
        # No need to pass request.form when no extra keywords are given
        form = NetworkForm()
    if not form.validate_on_submit():
        # GET request or invalid submission: (re)display the form
        return render_template("network/create_network.html", form=form)
    scope_id = form.scope_id.data
    network_fields = {
        "scope_id": scope_id,
        "vlan_name": form.vlan_name.data,
        "vlan_id": form.vlan_id.data,
        # An empty description is stored as NULL
        "description": form.description.data or None,
        "address": form.address.data,
        "first_ip": form.first_ip.data,
        "last_ip": form.last_ip.data,
        "domain_id": form.domain_id.data,
        "admin_only": form.admin_only.data,
    }
    network = models.Network(**network_fields)
    current_app.logger.debug(f"Trying to create: {network!r}")
    db.session.add(network)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as err:
        db.session.rollback()
        message = f"{err}"
        current_app.logger.warning(message)
        flash(message, "error")
    else:
        flash(f"Network {network} created!", "success")
    # Keep the scope_id in the session so that it can be
    # retrieved after the redirect
    session["scope_id"] = scope_id
    return redirect(url_for("network.create_network"))


@bp.route("/_retrieve_scope_defaults/<int:scope_id>")
@login_required
def retrieve_scope_defaults(scope_id):
    """Return as JSON the default values to create a network in a scope.

    The returned data contain the available vlans and the prefix range of
    the scope, the vlan and prefix to select by default and the domain id
    of the scope. The default prefix is the NETWORK_DEFAULT_PREFIX setting
    when it's part of the prefix range, the first prefix otherwise.

    Empty values are returned when the scope id is invalid or when no scope
    with this id exists. An empty string is used as selected value when
    there is no vlan / prefix available (instead of failing with an
    AttributeError / IndexError).
    """
    data = {
        "vlans": [],
        "prefixes": [],
        "selected_vlan": "",
        "selected_prefix": "",
        "domain_id": "",
    }
    try:
        scope = models.NetworkScope.query.get(scope_id)
    except sa.exc.DataError:
        current_app.logger.warning(f"Invalid scope_id: {scope_id}")
        return jsonify(data=data)
    if scope is None:
        # query.get returns None when no row matches the primary key
        current_app.logger.warning(f"Unknown scope_id: {scope_id}")
        return jsonify(data=data)
    vlans = list(scope.available_vlans())
    prefixes = scope.prefix_range()
    default_prefix = current_app.config["NETWORK_DEFAULT_PREFIX"]
    if default_prefix in prefixes:
        selected_prefix = default_prefix
    elif prefixes:
        selected_prefix = prefixes[0]
    else:
        selected_prefix = ""
    data = {
        "vlans": vlans,
        "prefixes": prefixes,
        "selected_vlan": vlans[0] if vlans else "",
        "selected_prefix": selected_prefix,
        "domain_id": scope.domain_id,
    }
    return jsonify(data=data)


@bp.route("/_retrieve_subnets/<int:scope_id>/<int:prefix>")
@login_required
def retrieve_subnets(scope_id, prefix):
    """Return as JSON the available subnets of a scope for the given prefix.

    The first subnet is the one to select by default.

    An empty list and an empty selected subnet are returned when the scope
    id is invalid, when no scope with this id exists or when no subnet is
    available (instead of failing with an AttributeError / IndexError).
    """
    data = {"subnets": [], "selected_subnet": ""}
    try:
        scope = models.NetworkScope.query.get(scope_id)
    except sa.exc.DataError:
        current_app.logger.warning(f"Invalid scope_id: {scope_id}")
        return jsonify(data=data)
    if scope is None:
        # query.get returns None when no row matches the primary key
        current_app.logger.warning(f"Unknown scope_id: {scope_id}")
        return jsonify(data=data)
    # prefix is already an int (int converter in the route)
    subnets = list(scope.available_subnets(prefix))
    if subnets:
        data = {"subnets": subnets, "selected_subnet": subnets[0]}
    return jsonify(data=data)


@bp.route("/_retrieve_ips/<subnet>/<int:prefix>")
@login_required
def retrieve_ips(subnet, prefix):
    """Return as JSON the host addresses of the network ``subnet/prefix``.

    The returned data contain the list of host IPs ("ips") and the first
    and last IP to select by default ("selected_first", "selected_last").
    When the network has more than 17 hosts, the default selection starts
    at the 11th host and ends at the 6th host from the end. Otherwise the
    first and last hosts are selected.

    The same keys, with empty values, are returned when the address is
    invalid or when the network has no host address.
    """
    # Same keys as the success case so that the client
    # can always read the same fields
    data = {"ips": [], "selected_first": "", "selected_last": ""}
    try:
        address = ipaddress.ip_network(f"{subnet}/{prefix}")
    except ValueError:
        current_app.logger.warning(f"Invalid address: {subnet}/{prefix}")
        return jsonify(data=data)
    # NOTE(review): all the hosts are materialized in memory. A very small
    # prefix (huge network) would produce a very large list - consider
    # enforcing a minimum prefix.
    hosts = [str(ip) for ip in address.hosts()]
    if not hosts:
        # hosts() can be empty (a /32 network on some Python versions)
        return jsonify(data=data)
    if len(hosts) > 17:
        first = hosts[10]
        last = hosts[-6]
    else:
        first = hosts[0]
        last = hosts[-1]
    data = {"ips": hosts, "selected_first": first, "selected_last": last}
    return jsonify(data=data)


@bp.route("/_retrieve_scopes")
@login_required
def retrieve_scopes():
    """Return as JSON one row per network scope.

    Each row contains: name, description, first vlan, last vlan,
    supernet and domain.
    """
    rows = []
    for scope in models.NetworkScope.query.all():
        row = (
            scope.name,
            scope.description,
            scope.first_vlan,
            scope.last_vlan,
            scope.supernet,
            str(scope.domain),
        )
        rows.append(row)
    return jsonify(data=rows)


@bp.route("/_retrieve_domains")
@login_required
def retrieve_domains():
    """Return as JSON one single-element row (the name) per domain."""
    rows = []
    for domain in models.Domain.query.all():
        rows.append((domain.name,))
    return jsonify(data=rows)


@bp.route("/_generate_random_mac")
@login_required
def generate_random_mac():
    """Return as JSON the value of ``utils.random_mac()`` under the "mac" key."""
    mac = utils.random_mac()
    return jsonify(data={"mac": mac})