"master/fsm_slave.h" did not exist on "2873c5447d02ea28d0efaacab805a2cbe0d0fbe1"
Newer
Older
"""
app.network.views
~~~~~~~~~~~~~~~~~

This module implements the network blueprint.

:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
from flask import (
Blueprint,
render_template,
jsonify,
session,
redirect,
url_for,
request,
flash,
current_app,
from flask_login import login_required, current_user
from .forms import (
HostForm,
InterfaceForm,
HostInterfaceForm,
NetworkForm,
NetworkScopeForm,
DomainForm,
CreateVMForm,
from ..extensions import db
from ..decorators import login_groups_accepted
@bp.route("/_retrieve_hosts", methods=["POST"])
@login_required
def retrieve_hosts():
    """Answer a DataTables request for hosts.

    The request values are passed unchanged to
    ``utils.retrieve_data_for_datatables`` together with the ``Host`` model
    and ``filter_sensitive=True``; whatever that helper returns is the
    response.
    """
    datatables_params = request.values
    return utils.retrieve_data_for_datatables(
        datatables_params, models.Host, filter_sensitive=True
    )
# View creating a new host together with its first interface.
# NOTE(review): this extract has lost its indentation and appears to be missing
# several lines -- at least the ``def`` line, the initialisation of ``kwargs``,
# the form-validation guard and the assignments of ``network_id`` and
# ``ansible_groups``. Confirm against the original file before editing.
@bp.route("/hosts/create", methods=("GET", "POST"))
@login_groups_accepted("admin", "network")
# Try to get the network_id from the session
# to pre-fill the form with the same network
if session.get("network_id"):
kwargs["network_id"] = session["network_id"]
# Same for the device_type
if session.get("device_type_id"):
kwargs["device_type_id"] = session["device_type_id"]
form = HostInterfaceForm(request.form, **kwargs)
# Remove the host_id field inherited from the InterfaceForm
# It's not used in this form
del form.host_id
# First interface name shall be identical to host name
del form.interface_name
device_type_id = form.device_type_id.data
# NOTE(review): ``network_id`` is not assigned anywhere in the visible lines.
network = models.Network.query.get(network_id)
# Users may only create hosts on networks they have access to.
if not current_user.has_access_to_network(network):
abort(403)
# NOTE(review): dangling comprehension body; the surrounding
# ``ansible_groups = [`` ... ``]`` lines are presumably missing.
models.AnsibleGroup.query.get(id_) for id_ in form.ansible_groups.data
try:
host = models.Host(
name=form.name.data,
device_type=models.DeviceType.query.get(device_type_id),
is_ioc=form.is_ioc.data,
description=form.description.data or None,
ansible_vars=form.ansible_vars.data or None,
ansible_groups=ansible_groups,
)
# The first interface gets the same name as the host.
# NOTE(review): ``network`` is looked up above but not passed to the
# interface in the visible lines -- verify.
interface = models.Interface(
host=host,
name=form.name.data,
ip=form.ip.data,
mac=form.mac.data,
)
# One Cname per whitespace-separated entry of the cnames field.
interface.cnames = [
models.Cname(name=name) for name in form.cnames_string.data.split()
]
except ValidationError as e:
# Check for error raised by model validation (not implemented in form validation)
current_app.logger.warning(f"{e}")
flash(f"{e}", "error")
return render_template("network/create_host.html", form=form)
current_app.logger.debug(f"Trying to create: {host!r}")
db.session.add(host)
try:
db.session.commit()
except sa.exc.IntegrityError as e:
db.session.rollback()
current_app.logger.warning(f"{e}")
flash(f"{e}", "error")
# NOTE(review): as visible, the lines below also run after a failed commit;
# an ``else:`` branch may be missing here -- verify.
if host.is_ioc:
utils.trigger_ioc_repository_creation(host)
db.session.commit()
# Save network_id and device_type_id to the session to retrieve them after the redirect
# NOTE(review): only device_type_id is stored in the visible lines.
session["device_type_id"] = device_type_id
return redirect(url_for("network.view_host", name=host.name))
return render_template("network/create_host.html", form=form)
@bp.route("/hosts/delete", methods=["POST"])
@login_required
def delete_host():
    """Delete the host identified by the ``host_id`` form field.

    Responds with 404 when the host does not exist and with 403 when the
    current user is not allowed to delete it.  After the deletion the user is
    redirected to the host list.
    """
    host = models.Host.query.get_or_404(request.form["host_id"])
    if not current_user.can_delete_host(host):
        abort(403)
    # Read everything needed for the flash messages *before* the delete is
    # committed, so that nothing has to be loaded from an instance whose row
    # is already gone.
    host_name = host.name
    is_virtual_machine = host.device_type.name == "VirtualMachine"
    # Deleting the host will also delete all
    # associated interfaces due to the cascade delete option
    # defined on the model
    db.session.delete(host)
    db.session.commit()
    flash(f"Host {host_name} has been deleted", "success")
    if is_virtual_machine:
        flash(
            "Note that the host was only removed from CSEntry. "
            "To delete a running VM, please contact an administrator.",
            "info",
        )
    return redirect(url_for("network.list_hosts"))
# View showing one host. Depending on the device type it also handles a form:
# a boot-profile form for physical machines, a VM-creation form for virtual ones.
# NOTE(review): this extract has lost its indentation and some lines appear to
# be missing (flagged below). Confirm against the original file before editing.
@bp.route("/hosts/view/<name>", methods=("GET", "POST"))
@login_required
def view_host(name):
host = models.Host.query.filter_by(name=name).first_or_404()
if not current_user.is_admin and host.sensitive:
# Only admin users can see hosts on sensitive networks
abort(403)
# Warn about hosts without interface or with a badly named main interface.
if host.main_interface is None:
flash(f"Host {host.name} has no interface! Add one or delete it.", "warning")
elif host.main_interface.name != host.name:
flash(
f"The main interface '{host.main_interface.name}' shall have the same name as the host!"
f" Please rename it '{host.name}'.",
"warning",
)
# NOTE(review): the body of this branch is not visible; presumably it creates
# the form whose ``boot_profile`` field is read further down -- verify.
if host.device_type.name == "PhysicalMachine":
elif host.device_type.name.startswith("Virtual"):
form = CreateVMForm()
if host.is_ioc:
# IOC hosts take their choices from the VIOC_*_CHOICES configuration keys.
for field in ("cores", "memory", "disk", "osversion"):
key = f"VIOC_{field.upper()}_CHOICES"
getattr(form, field).choices = utils.get_choices(
current_app.config[key]
)
else:
form = None
if form is not None and form.validate_on_submit():
if host.device_type.name == "PhysicalMachine":
if not current_user.can_set_boot_profile(host):
flash(
f"You don't have the proper permissions to set the boot profile. Please contact an admin user.",
"warning",
)
return redirect(url_for("network.view_host", name=name))
else:
boot_profile = form.boot_profile.data
task = utils.trigger_set_network_boot_profile(
host, boot_profile=boot_profile
)
if boot_profile != "localboot":
# For localboot, there is no need to update the variable
# csentry_autoinstall_boot_profile is used for DHCP options
if utils.update_ansible_vars(
host, {"csentry_autoinstall_boot_profile": boot_profile}
):
# If a change occured, force DHCP update
utils.trigger_core_services_update()
db.session.commit()
current_app.logger.info(
f"Set network boot profile to {boot_profile} for {name} requested: task {task.id}"
)
flash(
f"Set network boot profile to {boot_profile} for {name} requested! "
"Refresh the page to update the status. You can reboot the machine when the task is done.",
"success",
)
return redirect(url_for("task.view_task", id_=task.id))
# NOTE(review): the header of the virtual-machine branch seems to be missing
# before the permission check below -- verify.
if not current_user.can_create_vm(host):
flash(
f"You don't have the proper permissions to create this VM. Please contact an admin user.",
"warning",
)
return redirect(url_for("network.view_host", name=name))
else:
# NOTE(review): the opening of the call that receives these arguments and
# assigns ``task`` is not visible in this extract.
host,
vm_disk_size=int(form.disk.data),
vm_cores=int(form.cores.data),
# The form value is multiplied by 1024 before being passed on.
vm_memory=int(form.memory.data) * 1024,
vm_osversion=form.osversion.data,
skip_post_install_job=form.skip_post_install_job.data,
)
db.session.commit()
current_app.logger.info(f"Creation of {name} requested: task {task.id}")
flash(
f"Creation of {name} requested! Refresh the page to update the status.",
"success",
)
return redirect(url_for("task.view_task", id_=task.id))
return render_template("network/view_host.html", host=host, form=form)
@bp.route("/hosts/edit/<name>", methods=("GET", "POST"))
@login_groups_accepted("admin", "network")
def edit_host(name):
    """Edit the host called *name*.

    Responds with 404 when no such host exists and with 403 when the current
    user has no access to the host's main network.  A valid submission
    updates the host, renames its interfaces to follow the new host name and
    redirects to the host page; otherwise the edit form is rendered.
    """
    host = models.Host.query.filter_by(name=name).first_or_404()
    if not current_user.has_access_to_network(host.main_network):
        abort(403)
    form = HostForm(request.form, obj=host)
    # Passing ansible_groups as kwarg to the HostForm doesn't work because
    # obj takes precedence (but host.ansible_groups contain AnsibleGroup instances and not id)
    # We need to update the default values. Calling process is required.
    # See https://stackoverflow.com/questions/5519729/wtforms-how-to-select-options-in-selectmultiplefield
    form.ansible_groups.default = [group.id for group in host.ansible_groups]
    form.ansible_groups.process(request.form)
    if form.validate_on_submit():
        try:
            host.name = form.name.data
            host.device_type = models.DeviceType.query.get(form.device_type_id.data)
            host.is_ioc = form.is_ioc.data
            host.description = form.description.data or None
            host.ansible_vars = form.ansible_vars.data or None
            host.ansible_groups = [
                models.AnsibleGroup.query.get(id_) for id_ in form.ansible_groups.data
            ]
            # Interface names shall always start with the host name
            for interface in host.interfaces:
                interface.name = interface.name.replace(name, host.name, 1)
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template("network/edit_host.html", form=form)
        current_app.logger.debug(f"Trying to update: {host!r}")
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # Only request the IOC repository once the update is really
            # stored; after a rollback there is nothing to create it for.
            if host.is_ioc:
                utils.trigger_ioc_repository_creation(host)
                db.session.commit()
        return redirect(url_for("network.view_host", name=host.name))
    return render_template("network/edit_host.html", form=form)
@bp.route("/interfaces/create/<hostname>", methods=("GET", "POST"))
@login_groups_accepted("admin", "network")
def create_interface(hostname):
    """Add an interface to the host called *hostname*.

    Responds with 404 when the host does not exist and with 403 when the
    current user has no access to the host's main network or to the network
    selected in the form.  After a valid submission the user is redirected
    back to this page (to add another interface); otherwise the form is
    rendered.
    """
    host = models.Host.query.filter_by(name=hostname).first_or_404()
    # User shall have access to the host main interface domain
    if not current_user.has_access_to_network(host.main_network):
        abort(403)
    random_mac = host.device_type.name.startswith("Virtual")
    form = InterfaceForm(
        request.form, host_id=host.id, interface_name=host.name, random_mac=random_mac
    )
    if not current_user.is_admin and host.main_network is not None:
        # Restrict the networks to the same network scope as the main interface
        form.network_id.choices = [
            (str(network.id), network.vlan_name)
            for network in models.Network.query.filter_by(scope=host.main_network.scope)
            .order_by(models.Network.vlan_name)
            .all()
            if current_user.has_access_to_network(network)
        ]
    if form.validate_on_submit():
        # User shall have access to the new interface domain
        network = models.Network.query.get(form.network_id.data)
        if not current_user.has_access_to_network(network):
            abort(403)
        try:
            # Attach the network selected in the form; without it the
            # selection would only be access-checked and then dropped.
            interface = models.Interface(
                host=host,
                network=network,
                name=form.interface_name.data,
                ip=form.ip.data,
                mac=form.mac.data,
            )
            interface.cnames = [
                models.Cname(name=name) for name in form.cnames_string.data.split()
            ]
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template(
                "network/create_interface.html", form=form, hostname=hostname
            )
        current_app.logger.debug(f"Trying to create: {interface!r}")
        db.session.add(interface)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # Announce success only when the commit went through
            flash(f"Interface {interface} created!", "success")
        return redirect(url_for("network.create_interface", hostname=hostname))
    return render_template(
        "network/create_interface.html", form=form, hostname=hostname
    )
@bp.route("/interfaces/edit/<name>", methods=("GET", "POST"))
@login_groups_accepted("admin", "network")
def edit_interface(name):
    """Edit the interface called *name*.

    Responds with 404 when the interface does not exist and with 403 when the
    current user has no access to its current network or to the network
    selected in the form.  A valid submission updates name, IP, MAC, network
    and cnames and redirects to the host page; otherwise the edit form is
    rendered.
    """
    interface = models.Interface.query.filter_by(name=name).first_or_404()
    if not current_user.has_access_to_network(interface.network):
        abort(403)
    cnames_string = " ".join([str(cname) for cname in interface.cnames])
    form = InterfaceForm(
        request.form,
        obj=interface,
        interface_name=interface.name,
        cnames_string=cnames_string,
    )
    if not current_user.is_admin and not interface.is_main:
        # Restrict the networks to the same network scope as the main interface
        form.network_id.choices = [
            (str(network.id), network.vlan_name)
            for network in models.Network.query.filter_by(
                scope=interface.host.main_network.scope
            )
            .order_by(models.Network.vlan_name)
            .all()
            if current_user.has_access_to_network(network)
        ]
    # Remove the random_mac field (not used when editing)
    del form.random_mac
    # The current IP must stay selectable next to the free ones
    ips = [interface.ip]
    ips.extend(str(address) for address in interface.network.available_ips())
    form.ip.choices = utils.get_choices(ips)
    if form.validate_on_submit():
        network = models.Network.query.get(form.network_id.data)
        if not current_user.has_access_to_network(network):
            abort(403)
        try:
            interface.name = form.interface_name.data
            interface.ip = form.ip.data
            interface.mac = form.mac.data
            # Setting directly network_id doesn't update the relationship and bypass the checks
            # performed on the model
            interface.network = network
            requested_cnames = form.cnames_string.data.split()
            known_cnames = {cname.name for cname in interface.cnames}
            # Delete the cnames that have been removed.
            # Iterate over a copy: removing from the list being iterated
            # would skip the element following each removal.
            for cname in list(interface.cnames):
                if cname.name not in requested_cnames:
                    current_app.logger.debug(f"Deleting cname: {cname}")
                    # Removing the cname from interface.cnames list will
                    # delete it from the database due to the cascade
                    # delete-orphan option defined on the model
                    interface.cnames.remove(cname)
            # Add new cnames.
            # Compare whole names against the set of existing ones (a
            # substring test on the joined string would wrongly treat "foo"
            # as present when "foobar" exists).
            for cname_name in requested_cnames:
                if cname_name not in known_cnames:
                    cname = models.Cname(name=cname_name)
                    current_app.logger.debug(f"Creating cname: {cname}")
                    interface.cnames.append(cname)
                    # Guards against the same name being entered twice
                    known_cnames.add(cname_name)
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template(
                "network/edit_interface.html", form=form, hostname=interface.host.name
            )
        # Mark the host as "dirty" to add it to the session so that it will
        # be re-indexed
        sa.orm.attributes.flag_modified(interface.host, "interfaces")
        current_app.logger.debug(f"Trying to update: {interface!r}")
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # Announce success only when the commit went through
            flash(f"Interface {interface} updated!", "success")
        return redirect(url_for("network.view_host", name=interface.host.name))
    return render_template(
        "network/edit_interface.html", form=form, hostname=interface.host.name
    )
@bp.route("/interfaces/delete", methods=["POST"])
@login_groups_accepted("admin", "network")
def delete_interface():
    """Delete the interface given by the ``interface_id`` form field.

    Responds with 404 when the interface does not exist and with 403 when the
    current user has no access to its network.  Afterwards the user is sent
    back to the page of the host the interface belonged to.
    """
    interface_id = request.form["interface_id"]
    interface = models.Interface.query.get_or_404(interface_id)
    if not current_user.has_access_to_network(interface.network):
        abort(403)
    owner = interface.host
    hostname = owner.name
    # Detach the interface from its host explicitly so that the host is
    # picked up for re-indexing
    owner.interfaces.remove(interface)
    # The cascade delete option defined on the model takes care of the
    # cnames attached to the interface
    db.session.delete(interface)
    db.session.commit()
    flash(f"Interface {interface.name} has been deleted", "success")
    host_page = url_for("network.view_host", name=hostname)
    return redirect(host_page)
@bp.route("/groups")
@login_required
def list_ansible_groups():
    """Render the Ansible groups page (the template receives no context)."""
    template_name = "network/groups.html"
    return render_template(template_name)
@bp.route("/groups/view/<name>", methods=("GET", "POST"))
@login_required
def view_ansible_group(name):
    """Render the page of the Ansible group called *name* (404 if unknown)."""
    matching_groups = models.AnsibleGroup.query.filter_by(name=name)
    group = matching_groups.first_or_404()
    return render_template("network/view_group.html", group=group)
@bp.route("/groups/delete", methods=["POST"])
@login_groups_accepted("admin")
def delete_ansible_group():
    """Delete the Ansible group given by the ``group_id`` form field.

    Responds with 404 when the group does not exist; otherwise the group is
    deleted and the user is redirected to the group list.
    """
    group_id = request.form["group_id"]
    group = models.AnsibleGroup.query.get_or_404(group_id)
    db.session.delete(group)
    db.session.commit()
    flash(f"Group {group.name} has been deleted", "success")
    groups_page = url_for("network.list_ansible_groups")
    return redirect(groups_page)
@bp.route("/groups/edit/<name>", methods=("GET", "POST"))
@login_groups_accepted("admin")
def edit_ansible_group(name):
    """Edit the Ansible group called *name*.

    Responds with 404 when the group does not exist.  A valid submission
    updates the group and redirects to its page; any other request only
    renders the edit form and leaves the group untouched.
    """
    group = models.AnsibleGroup.query.filter_by(name=name).first_or_404()
    form = AnsibleGroupForm(request.form, obj=group)
    # Restrict the children that can be added
    # We don't check parents of parents, but that will be catched by the validate_children
    # and raise a ValidationError
    form.children.choices = [
        (str(ansible_group.id), ansible_group.name)
        for ansible_group in models.AnsibleGroup.query.order_by(
            models.AnsibleGroup.name
        ).all()
        if (ansible_group not in group.parents)
        and (ansible_group.name != "all")
        and (ansible_group != group)
    ]
    # Passing hosts as kwarg to the AnsibleGroupForm doesn't work because
    # obj takes precedence (but group.hosts contain Host instances and not id)
    # We need to update the default values. Calling process is required.
    # See https://stackoverflow.com/questions/5519729/wtforms-how-to-select-options-in-selectmultiplefield
    form.hosts.default = [host.id for host in group.hosts]
    form.hosts.process(request.form)
    # Same for AnsibleGroup children
    form.children.default = [child.id for child in group.children]
    form.children.process(request.form)
    # Only touch the group for a submitted and valid form: without this guard
    # merely opening the page would overwrite the group and commit
    if form.validate_on_submit():
        try:
            group.name = form.name.data
            group.vars = form.vars.data or None
            group.type = form.type.data
            group.hosts = [models.Host.query.get(id_) for id_ in form.hosts.data]
            group.children = [
                models.AnsibleGroup.query.get(id_) for id_ in form.children.data
            ]
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template("network/edit_group.html", form=form)
        current_app.logger.debug(f"Trying to update: {group!r}")
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            flash(f"Group {group} updated!", "success")
        return redirect(url_for("network.view_ansible_group", name=group.name))
    return render_template("network/edit_group.html", form=form)
@bp.route("/groups/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_ansible_group():
    """Create a new Ansible group.

    A valid submission that can be committed redirects to the page of the new
    group.  In every other case (plain GET, invalid form, model validation
    error, integrity error) the creation form is rendered, with the error
    flashed where there is one.
    """
    form = AnsibleGroupForm()
    if form.validate_on_submit():
        hosts = [models.Host.query.get(id_) for id_ in form.hosts.data]
        children = [models.AnsibleGroup.query.get(id_) for id_ in form.children.data]
        try:
            group = models.AnsibleGroup(
                name=form.name.data,
                vars=form.vars.data or None,
                type=form.type.data,
                hosts=hosts,
                children=children,
            )
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template("network/create_group.html", form=form)
        current_app.logger.debug(f"Trying to create: {group!r}")
        db.session.add(group)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            flash(f"Group {group} created!", "success")
            # Redirect only on success: after a rollback the new group does
            # not exist, so its page must not be the target
            return redirect(url_for("network.view_ansible_group", name=group.name))
    return render_template("network/create_group.html", form=form)
# NOTE(review): these two lines look like the body of the domain-list view;
# its decorators and ``def`` line are not visible in this extract -- confirm
# against the original file.
# Load every Domain and hand the list to the domains template.
domains = models.Domain.query.all()
return render_template("network/domains.html", domains=domains)
@bp.route("/domains/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_domain():
    """Create a new domain.

    After a valid submission the user is redirected back to this page, with
    either a success message or the integrity error flashed.  Otherwise the
    creation form is rendered.
    """
    form = DomainForm()
    if form.validate_on_submit():
        domain = models.Domain(name=form.name.data)
        current_app.logger.debug(f"Trying to create: {domain!r}")
        db.session.add(domain)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # Announce success only when the commit went through
            flash(f"Domain {domain} created!", "success")
        return redirect(url_for("network.create_domain"))
    return render_template("network/create_domain.html", form=form)
# NOTE(review): this looks like the network-scope list view; its route
# decorator and ``def`` line are not visible in this extract -- confirm
# against the original file.
@login_groups_accepted("admin")
# Load every NetworkScope and hand the list to the scopes template.
scopes = models.NetworkScope.query.all()
return render_template("network/scopes.html", scopes=scopes)
@login_groups_accepted("admin")
def view_scope(name):
    """Render the page of the network scope called *name* (404 if unknown)."""
    matching_scopes = models.NetworkScope.query.filter_by(name=name)
    scope = matching_scopes.first_or_404()
    return render_template("network/view_scope.html", scope=scope)
@bp.route("/scopes/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_scope():
    """Create a new network scope.

    After a valid submission the user is redirected back to this page, with
    either a success message or the integrity error flashed.  A model
    validation error re-renders the form with the error; any other request
    renders the empty creation form.
    """
    form = NetworkScopeForm()
    if form.validate_on_submit():
        try:
            scope = models.NetworkScope(
                name=form.name.data,
                description=form.description.data or None,
                first_vlan=form.first_vlan.data,
                last_vlan=form.last_vlan.data,
                supernet=form.supernet.data,
                domain=models.Domain.query.get(form.domain_id.data),
            )
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template("network/create_scope.html", form=form)
        current_app.logger.debug(f"Trying to create: {scope!r}")
        db.session.add(scope)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # Announce success only when the commit went through
            flash(f"Network Scope {scope} created!", "success")
        return redirect(url_for("network.create_scope"))
    return render_template("network/create_scope.html", form=form)
@bp.route("/_retrieve_first_available_ip/<int:network_id>")
@login_required
def retrieve_first_available_ip(network_id):
    """Return the first available IP of a network as JSON.

    The response is ``{"data": "<ip>"}``.  ``data`` is an empty string when
    the network id is invalid or unknown, or when the network has no
    available IP left.
    """
    try:
        network = models.Network.query.get(network_id)
    except sa.exc.DataError:
        current_app.logger.warning(f"Invalid network_id: {network_id}")
        network = None
    data = ""
    # ``query.get`` yields None for an unknown id
    if network is not None:
        try:
            data = str(network.available_ips()[0])
        except IndexError:
            # No free address: keep the empty default
            pass
    return jsonify(data=data)
# NOTE(review): this looks like the network list view; its route decorator and
# ``def`` line are not visible in this extract -- confirm against the
# original file.
@login_groups_accepted("admin", "network")
networks = models.Network.query.all()
# Non-admin users only get the networks they have access to.
if not current_user.is_admin:
networks = [
network
for network in networks
if current_user.has_access_to_network(network)
]
return render_template("network/networks.html", networks=networks)
@login_groups_accepted("admin", "network")
def view_network(vlan_name):
    """Render the page of the network with the given *vlan_name*.

    Responds with 404 when no such network exists and with 403 when the
    current user has no access to it.
    """
    matching_networks = models.Network.query.filter_by(vlan_name=vlan_name)
    network = matching_networks.first_or_404()
    access_granted = current_user.has_access_to_network(network)
    if not access_granted:
        abort(403)
    return render_template("network/view_network.html", network=network)
@bp.route("/networks/create", methods=("GET", "POST"))
@login_groups_accepted("admin")
def create_network():
    """Create a new network.

    The form is pre-filled with the scope stored in the session by a previous
    creation, if any.  After a valid submission the scope is remembered in
    the session and the user is redirected back to this page, with either a
    success message or the integrity error flashed.  Otherwise the creation
    form is rendered.
    """
    # Try to get the scope_id from the session
    # to pre-fill the form with the same network scope
    try:
        scope_id = session["scope_id"]
    except KeyError:
        # No need to pass request.form when no extra keywords are given
        form = NetworkForm()
    else:
        form = NetworkForm(request.form, scope_id=scope_id)
    if form.validate_on_submit():
        scope_id = form.scope_id.data
        try:
            network = models.Network(
                scope=models.NetworkScope.query.get(scope_id),
                vlan_name=form.vlan_name.data,
                vlan_id=form.vlan_id.data,
                description=form.description.data or None,
                address=form.address.data,
                first_ip=form.first_ip.data,
                last_ip=form.last_ip.data,
                gateway=form.gateway.data,
                domain=models.Domain.query.get(form.domain_id.data),
                admin_only=form.admin_only.data,
            )
        except ValidationError as e:
            # Check for error raised by model validation (not implemented in form validation)
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
            return render_template("network/create_network.html", form=form)
        current_app.logger.debug(f"Trying to create: {network!r}")
        db.session.add(network)
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            # Announce success only when the commit went through
            flash(f"Network {network} created!", "success")
        # Save scope_id to the session to retrieve it after the redirect
        session["scope_id"] = scope_id
        return redirect(url_for("network.create_network"))
    else:
        current_app.logger.info(form.errors)
    return render_template("network/create_network.html", form=form)
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
@bp.route("/networks/edit/<vlan_name>", methods=("GET", "POST"))
@login_groups_accepted("admin")
def edit_network(vlan_name):
    """Edit the network with the given *vlan_name*.

    Responds with 404 when no such network exists.  A valid submission copies
    the form fields onto the network, commits and redirects to the network
    page (flashing the integrity error if the commit fails); otherwise the
    edit form is rendered.
    """
    network = models.Network.query.filter_by(vlan_name=vlan_name).first_or_404()
    form = EditNetworkForm(request.form, obj=network)
    if not form.validate_on_submit():
        return render_template("network/edit_network.html", form=form)

    # Form fields copied one-to-one onto the model, in this order
    copied_fields = (
        "vlan_name",
        "vlan_id",
        "description",
        "address",
        "first_ip",
        "last_ip",
        "gateway",
        "domain_id",
        "admin_only",
        "sensitive",
    )
    try:
        for field_name in copied_fields:
            submitted_value = getattr(form, field_name).data
            setattr(network, field_name, submitted_value)
    except ValidationError as e:
        # Raised by the model validation (not covered by the form validation)
        current_app.logger.warning(f"{e}")
        flash(f"{e}", "error")
        return render_template("network/edit_network.html", form=form)

    current_app.logger.debug(f"Trying to update: {network!r}")
    try:
        db.session.commit()
    except sa.exc.IntegrityError as e:
        db.session.rollback()
        current_app.logger.warning(f"{e}")
        flash(f"{e}", "error")
    else:
        flash(f"Network {network} updated!", "success")
    network_page = url_for("network.view_network", vlan_name=network.vlan_name)
    return redirect(network_page)
# Endpoint computing default form values (vlans, prefixes, domain) for a scope.
# NOTE(review): this extract has lost its indentation and appears to be missing
# lines -- at least the ``def`` line, the branch separating the error case
# from the normal case, and the statement returning ``data``. Confirm against
# the original file before editing.
@bp.route("/_retrieve_scope_defaults/<int:scope_id>")
try:
scope = models.NetworkScope.query.get(scope_id)
except sa.exc.DataError:
current_app.logger.warning(f"Invalid scope_id: {scope_id}")
# Empty defaults used when the scope id is invalid.
data = {
"vlans": [],
"prefixes": [],
"selected_vlan": "",
"selected_prefix": "",
"domain_id": "",
}
vlans = [vlan_id for vlan_id in scope.available_vlans()]
# Pre-select the first available vlan, if there is one.
if vlans:
selected_vlan = vlans[0]
else:
selected_vlan = ""
prefixes = scope.prefix_range()
# Pre-select the configured default prefix when the scope allows it,
# the first prefix of the range otherwise.
default_prefix = current_app.config["NETWORK_DEFAULT_PREFIX"]
if default_prefix in prefixes:
selected_prefix = default_prefix
else:
selected_prefix = prefixes[0]
data = {
"vlans": vlans,
"prefixes": prefixes,
"selected_vlan": selected_vlan,
"selected_prefix": selected_prefix,
"domain_id": scope.domain_id,
}
@bp.route("/_retrieve_subnets/<int:scope_id>/<int:prefix>")
@login_required
def retrieve_subnets(scope_id, prefix):
    """Return the available subnets of a scope for a prefix length as JSON.

    The response is ``{"data": {"subnets": [...], "selected_subnet": ...}}``
    where the selected subnet is the first available one.  The list is empty
    and the selection an empty string when the scope id is invalid or
    unknown, or when no subnet is available.
    """
    try:
        scope = models.NetworkScope.query.get(scope_id)
    except sa.exc.DataError:
        current_app.logger.warning(f"Invalid scope_id: {scope_id}")
        scope = None
    # ``query.get`` yields None for an unknown id
    subnets = [] if scope is None else list(scope.available_subnets(int(prefix)))
    data = {
        "subnets": subnets,
        # Nothing to pre-select when the list is empty
        "selected_subnet": subnets[0] if subnets else "",
    }
    return jsonify(data=data)
@bp.route("/_retrieve_ips/<subnet>/<int:prefix>")
@login_required
def retrieve_ips(subnet, prefix):
    """Return the host addresses of ``subnet/prefix`` with default selections.

    The response is ``{"data": {...}}`` with the keys ``ips``,
    ``selected_first``, ``selected_last`` and ``selected_gateway``.  The
    gateway defaults to the last host address.  In networks with more than 17
    hosts the default range skips the first ten and the last five addresses;
    smaller networks use everything up to the address before the gateway.
    An invalid address, or a network without host addresses, yields an empty
    list and empty selections.
    """
    try:
        address = ipaddress.ip_network(f"{subnet}/{prefix}")
    except ValueError:
        # Covers malformed addresses, bad prefixes and set host bits
        current_app.logger.warning(f"Invalid address: {subnet}/{prefix}")
        hosts = []
    else:
        hosts = [str(ip) for ip in address.hosts()]
    if not hosts:
        data = {
            "ips": [],
            "selected_first": "",
            "selected_last": "",
            "selected_gateway": "",
        }
        return jsonify(data=data)
    # The gateway is set to the last IP by default
    gateway = hosts[-1]
    if len(hosts) > 17:
        first = hosts[10]
        last = hosts[-6]
    else:
        first = hosts[0]
        # A single-host network has no address before the gateway
        last = hosts[-2] if len(hosts) > 1 else hosts[0]
    data = {
        "ips": hosts,
        "selected_first": first,
        "selected_last": last,
        "selected_gateway": gateway,
    }
    return jsonify(data=data)
# Endpoint answering a DataTables request for Ansible groups.
# NOTE(review): the ``def`` line (and possibly another decorator) is not
# visible in this extract -- confirm against the original file.
@bp.route("/_retrieve_groups", methods=["POST"])
# Request values are passed unchanged to the datatables helper with the
# AnsibleGroup model.
return utils.retrieve_data_for_datatables(request.values, models.AnsibleGroup)
@login_required
def generate_random_mac():