Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
views.py 18.32 KiB
# -*- coding: utf-8 -*-
"""
app.network.views
~~~~~~~~~~~~~~~~~

This module implements the network blueprint.

:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import ipaddress
import sqlalchemy as sa
from flask import (Blueprint, render_template, jsonify, session,
                   redirect, url_for, request, flash, current_app)
from flask_login import login_required, current_user
from .forms import (HostForm, InterfaceForm, HostInterfaceForm, NetworkForm,
                    NetworkScopeForm, DomainForm, CreateVMForm)
from ..extensions import db
from ..decorators import login_groups_accepted
from .. import models, utils, helpers, tasks

# Blueprint grouping all the views of this module.
# Endpoints are referenced as 'network.<view function name>' in url_for().
bp = Blueprint('network', __name__)


@bp.route('/hosts')
@login_required
def list_hosts():
    """Render the hosts page (login required)."""
    template = 'network/hosts.html'
    return render_template(template)


@bp.route('/hosts/create', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def create_host():
    """Create a new host together with its first interface.

    The form is rendered when the request is not a valid submission.
    On a valid submission the host and its interface are committed:

    - on success, the user is redirected to the page of the new host
    - on integrity error, the error is flashed and the form is rendered
      again (the host was not saved, so there is no host page to show)

    The selected network is saved in the session to pre-fill the form
    the next time a host is created.
    """
    kwargs = {'random_mac': True}
    # Try to get the network_id from the session
    # to pre-fill the form with the same network
    if 'network_id' in session:
        kwargs['network_id'] = session['network_id']
    form = HostInterfaceForm(request.form, **kwargs)
    # Remove the host_id field inherited from the InterfaceForm
    # It's not used in this form
    del form.host_id
    if form.validate_on_submit():
        network_id = form.network_id.data
        host = models.Host(name=form.name.data,
                           device_type_id=form.device_type_id.data,
                           description=form.description.data or None)
        # The total number of tags will always be quite small
        # It's more efficient to retrieve all of them in one query
        # and do the filtering here
        all_tags = models.Tag.query.all()
        tags = [tag for tag in all_tags if str(tag.id) in form.tags.data]
        interface = models.Interface(name=form.interface_name.data,
                                     ip=form.ip.data,
                                     network_id=network_id,
                                     tags=tags)
        interface.cnames = [models.Cname(name=name) for name in form.cnames_string.data.split()]
        helpers.associate_mac_to_interface(form.mac_address.data, interface)
        host.interfaces = [interface]
        current_app.logger.debug(f'Trying to create: {host!r}')
        db.session.add(host)
        # Save network_id to the session to retrieve it on the next creation
        # (done whether the commit succeeds or not)
        session['network_id'] = network_id
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f'{e}')
            flash(f'{e}', 'error')
            # Don't redirect to the host page: the host wasn't created
            # and the view would return a 404. Fall through to render
            # the form again with the submitted values.
        else:
            flash(f'Host {host} created!', 'success')
            tasks.trigger_core_services_update()
            return redirect(url_for('network.view_host', name=host.name))
    return render_template('network/create_host.html', form=form)


@bp.route('/hosts/view/<name>', methods=('GET', 'POST'))
@login_required
def view_host(name):
    """Display a host and handle the VM creation form of its page.

    Returns a 404 if no host is named *name*. When the VM creation
    form is submitted and valid, only admin users can trigger the
    creation; they are then redirected to the page of the created task.
    """
    host = models.Host.query.filter_by(name=name).first_or_404()
    form = CreateVMForm()
    if host.is_ioc:
        # IOC hosts use dedicated choices for the cores and memory
        cores_choices = current_app.config['VIOC_CORES_CHOICES']
        form.cores.choices = utils.get_choices(cores_choices)
        memory_choices = current_app.config['VIOC_MEMORY_CHOICES']
        form.memory.choices = utils.get_choices(memory_choices)
    if not form.validate_on_submit():
        return render_template('network/view_host.html', host=host, form=form)
    if not current_user.is_admin:
        flash('Only admin users are allowed to create a VM!', 'info')
        return redirect(url_for('network.view_host', name=name))
    # The VM is created using the first interface of the host
    first_interface = host.interfaces[0]
    memory = int(form.memory.data) * 1000
    task = tasks.trigger_vm_creation(name, first_interface, memory, form.cores.data)
    current_app.logger.info(f'Creation of {name} requested: task {task.id}')
    flash(f'Creation of {name} requested! Refresh the page to update the status.', 'success')
    return redirect(url_for('task.view_task', id_=task.id))


@bp.route('/hosts/edit/<name>', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def edit_host(name):
    """Edit the name, device type and description of a host.

    Returns a 404 if no host is named *name*. After a valid
    submission the user is redirected to the host page, whether
    the commit succeeded or failed with an integrity error.
    """
    host = models.Host.query.filter_by(name=name).first_or_404()
    form = HostForm(request.form, obj=host)
    if not form.validate_on_submit():
        return render_template('network/edit_host.html', form=form)
    host.name = form.name.data
    host.device_type_id = form.device_type_id.data
    # An empty description is stored as None
    host.description = form.description.data or None
    current_app.logger.debug(f'Trying to update: {host!r}')
    try:
        db.session.commit()
    except sa.exc.IntegrityError as error:
        db.session.rollback()
        current_app.logger.warning(f'{error}')
        flash(f'{error}', 'error')
    else:
        flash(f'Host {host} updated!', 'success')
        tasks.trigger_core_services_update()
    host_url = url_for('network.view_host', name=host.name)
    return redirect(host_url)


@bp.route('/interfaces/create/<hostname>', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def create_interface(hostname):
    """Add a new interface to the host named *hostname*.

    Returns a 404 if the host doesn't exist. After a valid submission
    the user is redirected to this same page (to be able to add
    another interface), whether the commit succeeded or failed
    with an integrity error.
    """
    host = models.Host.query.filter_by(name=hostname).first_or_404()
    # A random MAC is proposed by default when the device type name
    # starts with 'Virtual'
    random_mac = host.device_type.name.startswith('Virtual')
    form = InterfaceForm(request.form, host_id=host.id, interface_name=host.name,
                         random_mac=random_mac)
    if form.validate_on_submit():
        # The total number of tags will always be quite small
        # It's more efficient to retrieve all of them in one query
        # and do the filtering here
        all_tags = models.Tag.query.all()
        tags = [tag for tag in all_tags if str(tag.id) in form.tags.data]
        interface = models.Interface(host_id=host.id,
                                     name=form.interface_name.data,
                                     ip=form.ip.data,
                                     network_id=form.network_id.data,
                                     tags=tags)
        interface.cnames = [models.Cname(name=name) for name in form.cnames_string.data.split()]
        helpers.associate_mac_to_interface(form.mac_address.data, interface)
        current_app.logger.debug(f'Trying to create: {interface!r}')
        db.session.add(interface)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f'{e}')
            flash(f'{e}', 'error')
        else:
            # An interface was created here, not a host
            flash(f'Interface {interface} created!', 'success')
            tasks.trigger_core_services_update()
        return redirect(url_for('network.create_interface', hostname=hostname))
    return render_template('network/create_interface.html', form=form, hostname=hostname)


@bp.route('/interfaces/edit/<name>', methods=('GET', 'POST'))
@login_groups_accepted('admin', 'create')
def edit_interface(name):
    """Edit an interface: name, IP, network, MAC address, cnames and tags.

    Returns a 404 if no interface is named *name*. After a valid
    submission the user is redirected to the page of the host the
    interface belongs to, whether the commit succeeded or failed
    with an integrity error.
    """
    interface = models.Interface.query.filter_by(name=name).first_or_404()
    cnames_string = ' '.join([str(cname) for cname in interface.cnames])
    try:
        mac_address = interface.mac.address
    except AttributeError:
        # No MAC associated to the interface
        mac_address = ''
    form = InterfaceForm(request.form, obj=interface,
                         interface_name=interface.name,
                         mac_address=mac_address,
                         cnames_string=cnames_string)
    # Remove the random_mac field (not used when editing)
    del form.random_mac
    # The current IP must be part of the choices
    # in addition to the available ones
    ips = [interface.ip]
    ips.extend([str(address) for address in interface.network.available_ips()])
    form.ip.choices = utils.get_choices(ips)
    # Passing tags as kwarg to the InterfaceForm doesn't work because
    # obj takes precedence (but interface.tags contain Tag instances and not id)
    # We need to update the default values. Calling process is required.
    # See https://stackoverflow.com/questions/5519729/wtforms-how-to-select-options-in-selectmultiplefield
    form.tags.default = [tag.id for tag in interface.tags]
    form.tags.process(request.form)
    if form.validate_on_submit():
        interface.name = form.interface_name.data
        interface.ip = form.ip.data
        interface.network_id = form.network_id.data
        if form.mac_address.data:
            if form.mac_address.data != mac_address:
                # The MAC changed - add the new one to the interface
                # that will remove the association to the previous one
                helpers.associate_mac_to_interface(form.mac_address.data, interface)
            # else: nothing to do (address didn't change)
        else:
            # No MAC associated
            interface.mac_id = None
        new_cnames = form.cnames_string.data.split()
        # Delete the cnames that have been removed
        # Iterate over a copy of the list: removing items from the list
        # being iterated would skip the item following each removal
        for cname in list(interface.cnames):
            if cname.name not in new_cnames:
                current_app.logger.debug(f'Deleting cname: {cname}')
                # Removing the cname from interface.cnames list will
                # delete it from the database due to the cascade
                # delete-orphan option defined on the model
                interface.cnames.remove(cname)
        # Add new cnames
        # Compare whole names: testing against the joined string would be
        # a substring match ('foo' would be found in 'foobar')
        known_cnames = set(cnames_string.split())
        for cname_name in new_cnames:
            if cname_name not in known_cnames:
                cname = models.Cname(name=cname_name)
                current_app.logger.debug(f'Creating cname: {cname}')
                interface.cnames.append(cname)
                # Don't create the same cname twice if it was entered twice
                known_cnames.add(cname_name)
        all_tags = models.Tag.query.all()
        tags = [tag for tag in all_tags if str(tag.id) in form.tags.data]
        interface.tags = tags
        current_app.logger.debug(f'Trying to update: {interface!r}')
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f'{e}')
            flash(f'{e}', 'error')
        else:
            flash(f'Interface {interface} updated!', 'success')
            tasks.trigger_core_services_update()
        return redirect(url_for('network.view_host', name=interface.host.name))
    return render_template('network/edit_interface.html', form=form, hostname=interface.host.name)


@bp.route('/interfaces/delete', methods=['POST'])
@login_groups_accepted('admin', 'create')
def delete_interface():
    """Delete the interface given by the 'interface_id' form field.

    Returns a 404 if the interface doesn't exist. The user is
    redirected to the page of the host the interface belonged to.
    """
    interface_id = request.form['interface_id']
    to_delete = models.Interface.query.get_or_404(interface_id)
    # Remember the host name for the redirect
    host_name = to_delete.host.name
    # The cascade delete option defined on the model
    # takes care of deleting the associated cnames
    db.session.delete(to_delete)
    db.session.commit()
    flash(f'Interface {to_delete.name} has been deleted', 'success')
    host_url = url_for('network.view_host', name=host_name)
    return redirect(host_url)


@bp.route('/domains')
@login_required
def list_domains():
    """Render the domains page (login required)."""
    template = 'network/domains.html'
    return render_template(template)


@bp.route('/domains/create', methods=('GET', 'POST'))
@login_groups_accepted('admin')
def create_domain():
    """Create a new domain (admin only).

    After a valid submission the user is redirected to this same
    page, whether the commit succeeded or failed with an integrity
    error.
    """
    form = DomainForm()
    if not form.validate_on_submit():
        return render_template('network/create_domain.html', form=form)
    new_domain = models.Domain(name=form.name.data)
    current_app.logger.debug(f'Trying to create: {new_domain!r}')
    db.session.add(new_domain)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as error:
        db.session.rollback()
        current_app.logger.warning(f'{error}')
        flash(f'{error}', 'error')
    else:
        flash(f'Domain {new_domain} created!', 'success')
    return redirect(url_for('network.create_domain'))


@bp.route('/scopes')
@login_required
def list_scopes():
    """Render the network scopes page (login required)."""
    template = 'network/scopes.html'
    return render_template(template)


@bp.route('/scopes/create', methods=('GET', 'POST'))
@login_groups_accepted('admin')
def create_scope():
    """Create a new network scope (admin only).

    After a valid submission the user is redirected to this same
    page, whether the commit succeeded or failed with an integrity
    error.
    """
    form = NetworkScopeForm()
    if not form.validate_on_submit():
        return render_template('network/create_scope.html', form=form)
    attributes = {
        'name': form.name.data,
        # An empty description is stored as None
        'description': form.description.data or None,
        'first_vlan': form.first_vlan.data,
        'last_vlan': form.last_vlan.data,
        'supernet': form.supernet.data,
        'domain_id': form.domain_id.data,
    }
    new_scope = models.NetworkScope(**attributes)
    current_app.logger.debug(f'Trying to create: {new_scope!r}')
    db.session.add(new_scope)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as error:
        db.session.rollback()
        current_app.logger.warning(f'{error}')
        flash(f'{error}', 'error')
    else:
        flash(f'Network Scope {new_scope} created!', 'success')
    return redirect(url_for('network.create_scope'))


@bp.route('/_retrieve_hosts')
@login_required
def retrieve_hosts():
    """Return all the hosts as JSON, one row per interface.

    A host without any interface produces no row.
    """
    rows = []
    for host in models.Host.query.all():
        for interface in host.interfaces:
            row = (host.name,
                   str(host.device_type),
                   host.description,
                   interface.name,
                   interface.ip,
                   str(interface.network))
            rows.append(row)
    return jsonify(data=rows)


@bp.route('/_retrieve_first_available_ip/<int:network_id>')
@login_required
def retrieve_first_available_ip(network_id):
    """Return the first available IP of a network as JSON.

    The returned data is an empty string when the network_id is
    invalid or unknown, or when the network has no available IP left.
    """
    data = ''
    try:
        network = models.Network.query.get(network_id)
    except sa.exc.DataError:
        current_app.logger.warning(f'Invalid network_id: {network_id}')
    else:
        if network is None:
            # query.get returns None when there is no such network
            current_app.logger.warning(f'Unknown network_id: {network_id}')
        else:
            available_ips = network.available_ips()
            # The network might be full
            if available_ips:
                data = str(available_ips[0])
    return jsonify(data=data)


@bp.route('/networks')
@login_required
def list_networks():
    """Render the networks page (login required)."""
    template = 'network/networks.html'
    return render_template(template)


@bp.route('/_retrieve_networks')
@login_required
def retrieve_networks():
    """Return all the networks as JSON, one row per network."""
    rows = []
    for network in models.Network.query.all():
        row = (str(network.scope),
               network.vlan_name,
               network.vlan_id,
               network.description,
               network.address,
               network.first_ip,
               network.last_ip,
               str(network.domain),
               network.admin_only)
        rows.append(row)
    return jsonify(data=rows)


@bp.route('/networks/create', methods=('GET', 'POST'))
@login_groups_accepted('admin')
def create_network():
    """Create a new network (admin only).

    The network scope used is saved in the session to pre-fill the
    form on the next creation. After a valid submission the user is
    redirected to this same page, whether the commit succeeded or
    failed with an integrity error.
    """
    if 'scope_id' in session:
        # Pre-fill the form with the network scope saved in the session
        form = NetworkForm(request.form, scope_id=session['scope_id'])
    else:
        # No need to pass request.form when no extra keywords are given
        form = NetworkForm()
    if not form.validate_on_submit():
        return render_template('network/create_network.html', form=form)
    scope_id = form.scope_id.data
    attributes = {
        'scope_id': scope_id,
        'vlan_name': form.vlan_name.data,
        'vlan_id': form.vlan_id.data,
        # An empty description is stored as None
        'description': form.description.data or None,
        'address': form.address.data,
        'first_ip': form.first_ip.data,
        'last_ip': form.last_ip.data,
        'domain_id': form.domain_id.data,
        'admin_only': form.admin_only.data,
    }
    new_network = models.Network(**attributes)
    current_app.logger.debug(f'Trying to create: {new_network!r}')
    db.session.add(new_network)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as error:
        db.session.rollback()
        current_app.logger.warning(f'{error}')
        flash(f'{error}', 'error')
    else:
        flash(f'Network {new_network} created!', 'success')
    # Keep the scope_id in the session so that it can be
    # retrieved after the redirect
    session['scope_id'] = scope_id
    return redirect(url_for('network.create_network'))


@bp.route('/_retrieve_scope_defaults/<int:scope_id>')
@login_required
def retrieve_scope_defaults(scope_id):
    """Return the default values to create a network in a scope as JSON.

    The data contains the available vlans, the prefixes, the vlan and
    prefix to select by default and the domain of the scope.
    Empty values are returned when the scope_id is invalid or unknown.
    The selected vlan (or prefix) is an empty string when the scope has
    no available vlan (or no prefix).
    """
    data = {'vlans': [], 'prefixes': [],
            'selected_vlan': '', 'selected_prefix': '',
            'domain_id': ''}
    try:
        scope = models.NetworkScope.query.get(scope_id)
    except sa.exc.DataError:
        current_app.logger.warning(f'Invalid scope_id: {scope_id}')
        return jsonify(data=data)
    if scope is None:
        # query.get returns None when there is no such scope
        current_app.logger.warning(f'Unknown scope_id: {scope_id}')
        return jsonify(data=data)
    vlans = list(scope.available_vlans())
    prefixes = scope.prefix_range()
    # Select the default prefix from the configuration if it's allowed
    # in this scope. Fall back to the first prefix otherwise.
    default_prefix = current_app.config['NETWORK_DEFAULT_PREFIX']
    if default_prefix in prefixes:
        selected_prefix = default_prefix
    elif prefixes:
        selected_prefix = prefixes[0]
    else:
        selected_prefix = ''
    data = {'vlans': vlans,
            'prefixes': prefixes,
            # All the vlans of the scope might be used
            'selected_vlan': vlans[0] if vlans else '',
            'selected_prefix': selected_prefix,
            'domain_id': scope.domain_id}
    return jsonify(data=data)


@bp.route('/_retrieve_subnets/<int:scope_id>/<int:prefix>')
@login_required
def retrieve_subnets(scope_id, prefix):
    """Return the available subnets of a scope for a prefix as JSON.

    The data contains the subnets and the one to select by default
    (the first one). Empty values are returned when the scope_id is
    invalid or unknown, or when there is no subnet available.
    """
    data = {'subnets': [], 'selected_subnet': ''}
    try:
        scope = models.NetworkScope.query.get(scope_id)
    except sa.exc.DataError:
        current_app.logger.warning(f'Invalid scope_id: {scope_id}')
    else:
        if scope is None:
            # query.get returns None when there is no such scope
            current_app.logger.warning(f'Unknown scope_id: {scope_id}')
        else:
            # prefix is already an int (converted by the route)
            subnets = list(scope.available_subnets(prefix))
            # There might be no subnet left for this prefix
            if subnets:
                data = {'subnets': subnets,
                        'selected_subnet': subnets[0]}
    return jsonify(data=data)


@bp.route('/_retrieve_ips/<subnet>/<int:prefix>')
@login_required
def retrieve_ips(subnet, prefix):
    """Return the host IPs of a subnet as JSON.

    The data contains the list of IPs and the first and last IP to
    select by default. Empty values are returned when the address is
    invalid or when the network has no host.
    """
    # Use the same 'selected_first' and 'selected_last' keys as
    # in the nominal case. The 'first' and 'last' keys are only kept
    # for backward compatibility.
    data = {'ips': [], 'first': '', 'last': '',
            'selected_first': '', 'selected_last': ''}
    try:
        address = ipaddress.ip_network(f'{subnet}/{prefix}')
    except ValueError:
        current_app.logger.warning(f'Invalid address: {subnet}/{prefix}')
    else:
        hosts = [str(ip) for ip in address.hosts()]
        if hosts:
            if len(hosts) > 17:
                # Skip the first 10 and the last 5 hosts by default
                first = hosts[10]
                last = hosts[-6]
            else:
                first = hosts[0]
                last = hosts[-1]
            data = {'ips': hosts,
                    'selected_first': first,
                    'selected_last': last}
    return jsonify(data=data)


@bp.route('/_retrieve_scopes')
@login_required
def retrieve_scopes():
    """Return all the network scopes as JSON, one row per scope."""
    rows = []
    for scope in models.NetworkScope.query.all():
        row = (scope.name,
               scope.description,
               scope.first_vlan,
               scope.last_vlan,
               scope.supernet,
               str(scope.domain))
        rows.append(row)
    return jsonify(data=rows)


@bp.route('/_retrieve_domains')
@login_required
def retrieve_domains():
    """Return all the domains as JSON, one single-element row per domain."""
    rows = []
    for domain in models.Domain.query.all():
        # Each row is a 1-tuple containing the domain name
        rows.append((domain.name,))
    return jsonify(data=rows)


@bp.route('/_generate_random_mac')
@login_required
def generate_random_mac():
    """Return the result of utils.random_mac() as JSON under the 'mac' key."""
    mac = utils.random_mac()
    return jsonify(data={'mac': mac})