Forked from
ICS Control System Infrastructure / csentry
299 commits behind the upstream repository.
-
Benjamin Bertrand authored
Use the before_flush hook to trigger the core services update. It's much nicer than having to call a method in several places. The core services update is triggered on creation/modification/deletion of an Interface. In the before_flush we can manipulate the session (add objects) but we shouldn't call commit. The db.session.commit() was removed from the launch_task method.
Benjamin Bertrand authored: Use the before_flush hook to trigger the core services update. It's much nicer than having to call a method in several places. The core services update is triggered on creation/modification/deletion of an Interface. In the before_flush hook we can manipulate the session (add objects) but we shouldn't call commit. The db.session.commit() was removed from the launch_task method.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
views.py 18.01 KiB
# -*- coding: utf-8 -*-
"""
app.network.views
~~~~~~~~~~~~~~~~~
This module implements the network blueprint.
:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.
"""
import ipaddress
import sqlalchemy as sa
from flask import (
Blueprint,
render_template,
jsonify,
session,
redirect,
url_for,
request,
flash,
current_app,
)
from flask_login import login_required, current_user
from .forms import (
HostForm,
InterfaceForm,
HostInterfaceForm,
NetworkForm,
NetworkScopeForm,
DomainForm,
CreateVMForm,
)
from ..extensions import db
from ..decorators import login_groups_accepted
from .. import models, utils, helpers, tasks
# Blueprint named "network": every view of this module is registered on it
# (endpoints are referenced as "network.<view name>" in url_for calls)
bp = Blueprint("network", __name__)
@bp.route("/hosts")
@login_required
def list_hosts():
    """Render the page listing the hosts."""
    template = "network/hosts.html"
    return render_template(template)
@bp.route("/hosts/create", methods=("GET", "POST"))
@login_groups_accepted("admin", "create")
def create_host():
    """Show the host creation form and register the submitted host.

    The host is created together with its first interface (including
    the cnames, tags and MAC address given in the form).

    :returns: the rendered form when nothing valid was submitted,
              otherwise a redirect to the page of the host
    """
    # Pre-fill the form with the network used for the previous creation
    # when one was saved in the session
    form_defaults = {"random_mac": True}
    if "network_id" in session:
        form_defaults["network_id"] = session["network_id"]
    form = HostInterfaceForm(request.form, **form_defaults)
    # host_id is inherited from the InterfaceForm but is of no use here
    del form.host_id
    if not form.validate_on_submit():
        return render_template("network/create_host.html", form=form)
    network_id = form.network_id.data
    host = models.Host(
        name=form.name.data,
        device_type_id=form.device_type_id.data,
        description=form.description.data or None,
    )
    # There are only a few tags: fetching all of them with one query
    # and filtering in Python is the most efficient
    selected_tag_ids = form.tags.data
    tags = [
        tag for tag in models.Tag.query.all() if str(tag.id) in selected_tag_ids
    ]
    interface = models.Interface(
        name=form.interface_name.data,
        ip=form.ip.data,
        network_id=network_id,
        tags=tags,
    )
    interface.cnames = [
        models.Cname(name=cname) for cname in form.cnames_string.data.split()
    ]
    helpers.associate_mac_to_interface(form.mac_address.data, interface)
    host.interfaces = [interface]
    current_app.logger.debug(f"Trying to create: {host!r}")
    db.session.add(host)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as e:
        db.session.rollback()
        message = f"{e}"
        current_app.logger.warning(message)
        flash(message, "error")
    else:
        flash(f"Host {host} created!", "success")
    # Keep the network_id in the session so that it can be
    # retrieved after the redirect
    session["network_id"] = network_id
    return redirect(url_for("network.view_host", name=host.name))
@bp.route("/hosts/view/<name>", methods=("GET", "POST"))
@login_required
def view_host(name):
    """Display a host and handle the VM creation form.

    :param name: name of the host (404 if no host has this name)
    :returns: the rendered host page, or a redirect once the form
              was submitted
    """
    host = models.Host.query.filter_by(name=name).first_or_404()
    form = CreateVMForm()
    if host.is_ioc:
        # IOC hosts get their cores/memory choices from the app config
        config = current_app.config
        form.cores.choices = utils.get_choices(config["VIOC_CORES_CHOICES"])
        form.memory.choices = utils.get_choices(config["VIOC_MEMORY_CHOICES"])
    if not form.validate_on_submit():
        return render_template("network/view_host.html", host=host, form=form)
    if not current_user.is_admin:
        flash(f"Only admin users are allowed to create a VM!", "info")
        return redirect(url_for("network.view_host", name=name))
    # The VM is created using the first interface of the host
    first_interface = host.interfaces[0]
    memory = int(form.memory.data) * 1000
    task = tasks.trigger_vm_creation(name, first_interface, memory, form.cores.data)
    db.session.commit()
    current_app.logger.info(f"Creation of {name} requested: task {task.id}")
    flash(
        f"Creation of {name} requested! Refresh the page to update the status.",
        "success",
    )
    return redirect(url_for("task.view_task", id_=task.id))
@bp.route("/hosts/edit/<name>", methods=("GET", "POST"))
@login_groups_accepted("admin", "create")
def edit_host(name):
    """Show the host edition form and apply the submitted changes.

    :param name: name of the host to edit (404 if no host has this name)
    :returns: the rendered form when nothing valid was submitted,
              otherwise a redirect to the page of the host
    """
    host = models.Host.query.filter_by(name=name).first_or_404()
    form = HostForm(request.form, obj=host)
    if not form.validate_on_submit():
        return render_template("network/edit_host.html", form=form)
    host.name = form.name.data
    host.device_type_id = form.device_type_id.data
    # An empty description is stored as NULL
    host.description = form.description.data or None
    current_app.logger.debug(f"Trying to update: {host!r}")
    try:
        db.session.commit()
    except sa.exc.IntegrityError as e:
        db.session.rollback()
        message = f"{e}"
        current_app.logger.warning(message)
        flash(message, "error")
    else:
        flash(f"Host {host} updated!", "success")
    return redirect(url_for("network.view_host", name=host.name))
@bp.route("/interfaces/create/<hostname>", methods=("GET", "POST"))
@login_groups_accepted("admin", "create")
def create_interface(hostname):
    """Show the interface creation form and register the submitted interface.

    :param hostname: name of the host the interface is added to
                     (404 if no host has this name)
    :returns: the rendered form when nothing valid was submitted,
              otherwise a redirect to this same creation page
    """
    host = models.Host.query.filter_by(name=hostname).first_or_404()
    # A random MAC is proposed by default for "Virtual..." device types
    random_mac = host.device_type.name.startswith("Virtual")
    form = InterfaceForm(
        request.form, host_id=host.id, interface_name=host.name, random_mac=random_mac
    )
    if form.validate_on_submit():
        # The total number of tags will always be quite small
        # It's more efficient to retrieve all of them in one query
        # and do the filtering here
        all_tags = models.Tag.query.all()
        tags = [tag for tag in all_tags if str(tag.id) in form.tags.data]
        interface = models.Interface(
            host_id=host.id,
            name=form.interface_name.data,
            ip=form.ip.data,
            network_id=form.network_id.data,
            tags=tags,
        )
        interface.cnames = [
            models.Cname(name=name) for name in form.cnames_string.data.split()
        ]
        helpers.associate_mac_to_interface(form.mac_address.data, interface)
        current_app.logger.debug(f"Trying to create: {interface!r}")
        db.session.add(interface)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # The created object is an interface, not a host
            flash(f"Interface {interface} created!", "success")
        return redirect(url_for("network.create_interface", hostname=hostname))
    return render_template(
        "network/create_interface.html", form=form, hostname=hostname
    )
@bp.route("/interfaces/edit/<name>", methods=("GET", "POST"))
@login_groups_accepted("admin", "create")
def edit_interface(name):
    """Show the interface edition form and apply the submitted changes.

    The name, ip, network, MAC address, cnames and tags of the interface
    are updated from the form.

    :param name: name of the interface to edit (404 if it doesn't exist)
    :returns: the rendered form when nothing valid was submitted,
              otherwise a redirect to the page of the host
    """
    interface = models.Interface.query.filter_by(name=name).first_or_404()
    cnames_string = " ".join([str(cname) for cname in interface.cnames])
    try:
        mac_address = interface.mac.address
    except AttributeError:
        # No mac associated to the interface
        mac_address = ""
    form = InterfaceForm(
        request.form,
        obj=interface,
        interface_name=interface.name,
        mac_address=mac_address,
        cnames_string=cnames_string,
    )
    # Remove the random_mac field (not used when editing)
    del form.random_mac
    # The current ip must remain selectable along with the available ones
    ips = [interface.ip]
    ips.extend([str(address) for address in interface.network.available_ips()])
    form.ip.choices = utils.get_choices(ips)
    # Passing tags as kwarg to the InterfaceForm doesn't work because
    # obj takes precedence (but interface.tags contain Tag instances and not id)
    # We need to update the default values. Calling process is required.
    # See https://stackoverflow.com/questions/5519729/wtforms-how-to-select-options-in-selectmultiplefield
    form.tags.default = [tag.id for tag in interface.tags]
    form.tags.process(request.form)
    if form.validate_on_submit():
        interface.name = form.interface_name.data
        interface.ip = form.ip.data
        interface.network_id = form.network_id.data
        if form.mac_address.data:
            if form.mac_address.data != mac_address:
                # The MAC changed - add the new one to the interface
                # that will remove the association to the previous one
                helpers.associate_mac_to_interface(form.mac_address.data, interface)
            # else: nothing to do (address didn't change)
        else:
            # No MAC associated
            interface.mac_id = None
        new_cnames = form.cnames_string.data.split()
        # Exact names of the cnames attached before any modification.
        # Comparing against the space-joined string would be a substring
        # test ("foo" would be considered present if "foobar" exists).
        existing_names = {cname.name for cname in interface.cnames}
        # Delete the cnames that have been removed.
        # Iterate over a copy: deleting from the list being iterated
        # would skip the element following each deletion.
        for cname in list(interface.cnames):
            if cname.name not in new_cnames:
                current_app.logger.debug(f"Deleting cname: {cname}")
                # Removing the cname from interface.cnames list will
                # delete it from the database due to the cascade
                # delete-orphan option defined on the model
                interface.cnames.remove(cname)
        # Add new cnames
        for cname_name in new_cnames:
            if cname_name not in existing_names:
                cname = models.Cname(name=cname_name)
                current_app.logger.debug(f"Creating cname: {cname}")
                interface.cnames.append(cname)
        all_tags = models.Tag.query.all()
        tags = [tag for tag in all_tags if str(tag.id) in form.tags.data]
        interface.tags = tags
        current_app.logger.debug(f"Trying to update: {interface!r}")
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            flash(f"Interface {interface} updated!", "success")
        return redirect(url_for("network.view_host", name=interface.host.name))
    return render_template(
        "network/edit_interface.html", form=form, hostname=interface.host.name
    )
@bp.route("/interfaces/delete", methods=["POST"])
@login_groups_accepted("admin", "create")
def delete_interface():
    """Delete the interface whose id is posted as "interface_id".

    :returns: a redirect to the page of the host that owned the interface
              (404 if the interface doesn't exist)
    """
    interface_id = request.form["interface_id"]
    interface = models.Interface.query.get_or_404(interface_id)
    # Remember the host name to build the redirect after the deletion
    hostname = interface.host.name
    # The cnames associated to the interface are deleted as well
    # (cascade delete option defined on the model)
    db.session.delete(interface)
    db.session.commit()
    flash(f"Interface {interface.name} has been deleted", "success")
    host_url = url_for("network.view_host", name=hostname)
    return redirect(host_url)
@bp.route("/domains")
@login_required
def list_domains():
    """Render the page listing the domains."""
    template = "network/domains.html"
    return render_template(template)
@bp.route("/domains/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_domain():
    """Show the domain creation form and register the submitted domain.

    :returns: the rendered form when nothing valid was submitted,
              otherwise a redirect to this same creation page
    """
    form = DomainForm()
    if not form.validate_on_submit():
        return render_template("network/create_domain.html", form=form)
    domain = models.Domain(name=form.name.data)
    current_app.logger.debug(f"Trying to create: {domain!r}")
    db.session.add(domain)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as e:
        db.session.rollback()
        message = f"{e}"
        current_app.logger.warning(message)
        flash(message, "error")
    else:
        flash(f"Domain {domain} created!", "success")
    return redirect(url_for("network.create_domain"))
@bp.route("/scopes")
@login_required
def list_scopes():
    """Render the page listing the network scopes."""
    template = "network/scopes.html"
    return render_template(template)
@bp.route("/scopes/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_scope():
    """Show the network scope creation form and register the submitted scope.

    :returns: the rendered form when nothing valid was submitted,
              otherwise a redirect to this same creation page
    """
    form = NetworkScopeForm()
    if not form.validate_on_submit():
        return render_template("network/create_scope.html", form=form)
    scope_fields = {
        "name": form.name.data,
        # An empty description is stored as NULL
        "description": form.description.data or None,
        "first_vlan": form.first_vlan.data,
        "last_vlan": form.last_vlan.data,
        "supernet": form.supernet.data,
        "domain_id": form.domain_id.data,
    }
    scope = models.NetworkScope(**scope_fields)
    current_app.logger.debug(f"Trying to create: {scope!r}")
    db.session.add(scope)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as e:
        db.session.rollback()
        message = f"{e}"
        current_app.logger.warning(message)
        flash(message, "error")
    else:
        flash(f"Network Scope {scope} created!", "success")
    return redirect(url_for("network.create_scope"))
@bp.route("/_retrieve_hosts")
@login_required
def retrieve_hosts():
    """Return all the hosts as JSON, one row per interface.

    Each row contains the host name, device type and description
    followed by the interface name, ip and network.
    """
    rows = []
    for host in models.Host.query.all():
        device_type = str(host.device_type)
        # A host without any interface produces no row
        for interface in host.interfaces:
            row = (
                host.name,
                device_type,
                host.description,
                interface.name,
                interface.ip,
                str(interface.network),
            )
            rows.append(row)
    return jsonify(data=rows)
@bp.route("/_retrieve_first_available_ip/<int:network_id>")
@login_required
def retrieve_first_available_ip(network_id):
    """Return the first available IP of a network as JSON.

    :param network_id: primary key of the network
    :returns: JSON object whose "data" is the IP as a string, or an empty
              string if the network is invalid, unknown or has no
              available IP
    """
    try:
        network = models.Network.query.get(network_id)
    except sa.exc.DataError:
        current_app.logger.warning(f"Invalid network_id: {network_id}")
        return jsonify(data="")
    if network is None:
        # query.get returns None (it doesn't raise) when no row matches
        current_app.logger.warning(f"Unknown network_id: {network_id}")
        return jsonify(data="")
    available_ips = network.available_ips()
    try:
        data = str(available_ips[0])
    except IndexError:
        # All the IPs of the network are already in use
        data = ""
    return jsonify(data=data)
@bp.route("/networks")
@login_required
def list_networks():
    """Render the page listing the networks."""
    template = "network/networks.html"
    return render_template(template)
@bp.route("/_retrieve_networks")
@login_required
def retrieve_networks():
    """Return all the networks as JSON, one row per network.

    Each row contains the scope, vlan name, vlan id, description,
    address, first ip, last ip, domain and admin_only flag.
    """
    rows = []
    for network in models.Network.query.all():
        row = (
            str(network.scope),
            network.vlan_name,
            network.vlan_id,
            network.description,
            network.address,
            network.first_ip,
            network.last_ip,
            str(network.domain),
            network.admin_only,
        )
        rows.append(row)
    return jsonify(data=rows)
@bp.route("/networks/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_network():
    """Show the network creation form and register the submitted network.

    :returns: the rendered form when nothing valid was submitted,
              otherwise a redirect to this same creation page
    """
    # Pre-fill the form with the network scope used for the previous
    # creation when one was saved in the session
    if "scope_id" in session:
        form = NetworkForm(request.form, scope_id=session["scope_id"])
    else:
        # request.form doesn't have to be passed without extra keywords
        form = NetworkForm()
    if not form.validate_on_submit():
        return render_template("network/create_network.html", form=form)
    scope_id = form.scope_id.data
    network_fields = {
        "scope_id": scope_id,
        "vlan_name": form.vlan_name.data,
        "vlan_id": form.vlan_id.data,
        # An empty description is stored as NULL
        "description": form.description.data or None,
        "address": form.address.data,
        "first_ip": form.first_ip.data,
        "last_ip": form.last_ip.data,
        "domain_id": form.domain_id.data,
        "admin_only": form.admin_only.data,
    }
    network = models.Network(**network_fields)
    current_app.logger.debug(f"Trying to create: {network!r}")
    db.session.add(network)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as e:
        db.session.rollback()
        message = f"{e}"
        current_app.logger.warning(message)
        flash(message, "error")
    else:
        flash(f"Network {network} created!", "success")
    # Keep the scope_id in the session so that it can be
    # retrieved after the redirect
    session["scope_id"] = scope_id
    return redirect(url_for("network.create_network"))
@bp.route("/_retrieve_scope_defaults/<int:scope_id>")
@login_required
def retrieve_scope_defaults(scope_id):
    """Return as JSON the default values to create a network in a scope.

    :param scope_id: primary key of the network scope
    :returns: JSON object whose "data" contains the available vlans, the
              prefixes, the pre-selected vlan and prefix and the domain id
              of the scope. Lists are empty and other values are empty
              strings if the scope is invalid or unknown. A selected value
              is an empty string when there is nothing to select.
    """
    empty = {
        "vlans": [],
        "prefixes": [],
        "selected_vlan": "",
        "selected_prefix": "",
        "domain_id": "",
    }
    try:
        scope = models.NetworkScope.query.get(scope_id)
    except sa.exc.DataError:
        current_app.logger.warning(f"Invalid scope_id: {scope_id}")
        return jsonify(data=empty)
    if scope is None:
        # query.get returns None (it doesn't raise) when no row matches
        current_app.logger.warning(f"Unknown scope_id: {scope_id}")
        return jsonify(data=empty)
    vlans = list(scope.available_vlans())
    prefixes = scope.prefix_range()
    default_prefix = current_app.config["NETWORK_DEFAULT_PREFIX"]
    if default_prefix in prefixes:
        selected_prefix = default_prefix
    else:
        # Fall back to the first prefix (nothing selected if there is none)
        selected_prefix = next(iter(prefixes), "")
    data = {
        "vlans": vlans,
        "prefixes": prefixes,
        # No vlan can be selected when all of them are already used
        "selected_vlan": vlans[0] if vlans else "",
        "selected_prefix": selected_prefix,
        "domain_id": scope.domain_id,
    }
    return jsonify(data=data)
@bp.route("/_retrieve_subnets/<int:scope_id>/<int:prefix>")
@login_required
def retrieve_subnets(scope_id, prefix):
    """Return as JSON the subnets available in a scope for a prefix.

    :param scope_id: primary key of the network scope
    :param prefix: prefix length of the subnets to retrieve
    :returns: JSON object whose "data" contains the list of subnets and
              the pre-selected one (the first). The list is empty and the
              selection an empty string if the scope is invalid or
              unknown, or if no subnet is available.
    """
    empty = {"subnets": [], "selected_subnet": ""}
    try:
        scope = models.NetworkScope.query.get(scope_id)
    except sa.exc.DataError:
        current_app.logger.warning(f"Invalid scope_id: {scope_id}")
        return jsonify(data=empty)
    if scope is None:
        # query.get returns None (it doesn't raise) when no row matches
        current_app.logger.warning(f"Unknown scope_id: {scope_id}")
        return jsonify(data=empty)
    subnets = list(scope.available_subnets(int(prefix)))
    if not subnets:
        # Nothing left to allocate for this prefix
        return jsonify(data=empty)
    data = {"subnets": subnets, "selected_subnet": subnets[0]}
    return jsonify(data=data)
@bp.route("/_retrieve_ips/<subnet>/<int:prefix>")
@login_required
def retrieve_ips(subnet, prefix):
    """Return as JSON the host IPs of a subnet with a default first/last IP.

    :param subnet: network address of the subnet
    :param prefix: prefix length of the subnet
    :returns: JSON object whose "data" contains the list of host IPs
              ("ips") and the pre-selected range ("selected_first" and
              "selected_last"). The list is empty and the selections are
              empty strings if the address is invalid or has no host.
    """
    # Same keys as the successful answer so that clients can always
    # read "selected_first" and "selected_last"
    empty = {"ips": [], "selected_first": "", "selected_last": ""}
    try:
        address = ipaddress.ip_network(f"{subnet}/{prefix}")
    except ValueError:
        current_app.logger.warning(f"Invalid address: {subnet}/{prefix}")
        return jsonify(data=empty)
    hosts = [str(ip) for ip in address.hosts()]
    if not hosts:
        # Some prefixes yield no usable host depending on the Python version
        return jsonify(data=empty)
    if len(hosts) > 17:
        # Big enough network: leave out the first 10 and the last 5 hosts
        # NOTE(review): presumably reserved addresses - confirm
        first = hosts[10]
        last = hosts[-6]
    else:
        first = hosts[0]
        last = hosts[-1]
    data = {"ips": hosts, "selected_first": first, "selected_last": last}
    return jsonify(data=data)
@bp.route("/_retrieve_scopes")
@login_required
def retrieve_scopes():
    """Return all the network scopes as JSON, one row per scope.

    Each row contains the name, description, first vlan, last vlan,
    supernet and domain of the scope.
    """
    rows = []
    for scope in models.NetworkScope.query.all():
        row = (
            scope.name,
            scope.description,
            scope.first_vlan,
            scope.last_vlan,
            scope.supernet,
            str(scope.domain),
        )
        rows.append(row)
    return jsonify(data=rows)
@bp.route("/_retrieve_domains")
@login_required
def retrieve_domains():
    """Return all the domains as JSON, one single-element row per domain."""
    rows = []
    for domain in models.Domain.query.all():
        # Each row only contains the name of the domain
        rows.append((domain.name,))
    return jsonify(data=rows)
@bp.route("/_generate_random_mac")
@login_required
def generate_random_mac():
    """Return as JSON a MAC address generated by utils.random_mac."""
    mac = utils.random_mac()
    return jsonify(data={"mac": mac})