Skip to content
Snippets Groups Projects
models.py 43 KiB
Newer Older
Benjamin Bertrand's avatar
Benjamin Bertrand committed
# -*- coding: utf-8 -*-
"""
app.models
~~~~~~~~~~

This module implements the models used in the app.

:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import ipaddress
Benjamin Bertrand's avatar
Benjamin Bertrand committed
import qrcode
import urllib.parse
import sqlalchemy as sa
Benjamin Bertrand's avatar
Benjamin Bertrand committed
from enum import Enum
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import validates
from sqlalchemy_continuum import make_versioned, version_class
from citext import CIText
from flask import current_app
from flask_login import UserMixin, current_user
from wtforms import ValidationError
Benjamin Bertrand's avatar
Benjamin Bertrand committed
from rq import Queue
from .extensions import db, login_manager, ldap_manager, cache
from .plugins import FlaskUserPlugin
Benjamin Bertrand's avatar
Benjamin Bertrand committed
from .validators import (
    ICS_ID_RE,
    HOST_NAME_RE,
    VLAN_NAME_RE,
    MAC_ADDRESS_RE,
    DEVICE_TYPE_RE,
    TAG_RE,
)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
from . import utils


make_versioned(plugins=[FlaskUserPlugin()])
# See http://docs.sqlalchemy.org/en/latest/core/compiler.html#utc-timestamp-function
class utcnow(sa.sql.expression.FunctionElement):
    type = sa.types.DateTime()


Benjamin Bertrand's avatar
Benjamin Bertrand committed
@sa.ext.compiler.compiles(utcnow, "postgresql")
def pg_utcnow(element, compiler, **kw):
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    return "TIMEZONE('utc', CURRENT_TIMESTAMP)"
def temporary_ics_ids():
    """Generator that returns the full list of temporary ICS ids"""
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    return (
        f'{current_app.config["TEMPORARY_ICS_ID"]}{letter}{number:0=3d}'
        for letter in string.ascii_uppercase
        for number in range(0, 1000)
    )
    """Return a set with the temporary ICS ids used"""
    temporary_items = Item.query.filter(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        Item.ics_id.startswith(current_app.config["TEMPORARY_ICS_ID"])
    ).all()
    return {item.ics_id for item in temporary_items}


def get_temporary_ics_id():
    """Return a temporary ICS id that is available"""
    used_temp_ics_ids = used_temporary_ics_ids()
    for ics_id in temporary_ics_ids():
        if ics_id not in used_temp_ics_ids:
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        raise ValueError("No temporary ICS id available")
Benjamin Bertrand's avatar
Benjamin Bertrand committed
@login_manager.user_loader
Benjamin Bertrand's avatar
Benjamin Bertrand committed
@cache.memoize(timeout=1800)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
def load_user(user_id):
    """User loader callback for flask-login

    :param str user_id: unicode ID of a user
    :returns: corresponding user object or None
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    """
    return User.query.get(int(user_id))


@ldap_manager.save_user
def save_user(dn, username, data, memberships):
    """User saver for flask-ldap3-login

    This method is called whenever a LDAPLoginForm()
    successfully validates.
    """
    user = User.query.filter_by(username=username).first()
    if user is None:
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        user = User(
            username=username,
            display_name=utils.attribute_to_string(data["cn"]),
            email=utils.attribute_to_string(data["mail"]),
        )
    # Always update the user groups to keep them up-to-date
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    user.groups = sorted(
        [utils.attribute_to_string(group["cn"]) for group in memberships]
    )
    db.session.add(user)
    db.session.commit()
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    return user


# Tables required for Many-to-Many relationships between users and favorites attributes
favorite_manufacturers_table = db.Table(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    "favorite_manufacturers",
    db.Column(
        "user_id", db.Integer, db.ForeignKey("user_account.id"), primary_key=True
    ),
    db.Column(
        "manufacturer_id",
        db.Integer,
        db.ForeignKey("manufacturer.id"),
        primary_key=True,
    ),
)
favorite_models_table = db.Table(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    "favorite_models",
    db.Column(
        "user_id", db.Integer, db.ForeignKey("user_account.id"), primary_key=True
    ),
    db.Column("model_id", db.Integer, db.ForeignKey("model.id"), primary_key=True),
)
favorite_locations_table = db.Table(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    "favorite_locations",
    db.Column(
        "user_id", db.Integer, db.ForeignKey("user_account.id"), primary_key=True
    ),
    db.Column(
        "location_id", db.Integer, db.ForeignKey("location.id"), primary_key=True
    ),
)
favorite_statuses_table = db.Table(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    "favorite_statuses",
    db.Column(
        "user_id", db.Integer, db.ForeignKey("user_account.id"), primary_key=True
    ),
    db.Column("status_id", db.Integer, db.ForeignKey("status.id"), primary_key=True),
)
favorite_actions_table = db.Table(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    "favorite_actions",
    db.Column(
        "user_id", db.Integer, db.ForeignKey("user_account.id"), primary_key=True
    ),
    db.Column("action_id", db.Integer, db.ForeignKey("action.id"), primary_key=True),
Benjamin Bertrand's avatar
Benjamin Bertrand committed
class User(db.Model, UserMixin):
    # "user" is a reserved word in postgresql
    # so let's use another name
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    __tablename__ = "user_account"
Benjamin Bertrand's avatar
Benjamin Bertrand committed

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.Text, nullable=False, unique=True)
    display_name = db.Column(db.Text, nullable=False)
    email = db.Column(db.Text)
    groups = db.Column(postgresql.ARRAY(db.Text), default=[])
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    tokens = db.relationship("Token", backref="user")
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    tasks = db.relationship("Task", backref="user")
    # The favorites won't be accessed very often so we load them
    # only when necessary (lazy=True)
    favorite_manufacturers = db.relationship(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        "Manufacturer",
        secondary=favorite_manufacturers_table,
        lazy=True,
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        backref=db.backref("favorite_users", lazy=True),
    )
    favorite_models = db.relationship(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        "Model",
        secondary=favorite_models_table,
        lazy=True,
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        backref=db.backref("favorite_users", lazy=True),
    )
    favorite_locations = db.relationship(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        "Location",
        secondary=favorite_locations_table,
        lazy=True,
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        backref=db.backref("favorite_users", lazy=True),
    )
    favorite_statuses = db.relationship(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        "Status",
        secondary=favorite_statuses_table,
        lazy=True,
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        backref=db.backref("favorite_users", lazy=True),
    )
    favorite_actions = db.relationship(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        "Action",
        secondary=favorite_actions_table,
        lazy=True,
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        backref=db.backref("favorite_users", lazy=True),
    )
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    def get_id(self):
        """Return the user id as unicode

        Required by flask-login
        """
        return str(self.id)

    @property
    def csentry_groups(self):
        groups = []
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        for key, values in current_app.config["CSENTRY_LDAP_GROUPS"].items():
            for value in values:
                if value in self.groups:
                    groups.append(key)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    @property
    def is_admin(self):
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        for group in current_app.config["CSENTRY_LDAP_GROUPS"]["admin"]:
            if group in self.groups:
                return True
        return False

    def is_member_of_one_group(self, groups):
        """Return True if the user is at least member of one of the given groups"""
        names = []
        for group in groups:
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            names.extend(current_app.config["CSENTRY_LDAP_GROUPS"].get(group))
        return bool(set(self.groups) & set(names))

    def favorite_attributes(self):
        """Return all user's favorite attributes"""
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        favorites_list = [
            self.favorite_manufacturers,
            self.favorite_models,
            self.favorite_locations,
            self.favorite_statuses,
            self.favorite_actions,
        ]
        return [favorite for favorites in favorites_list for favorite in favorites]

Benjamin Bertrand's avatar
Benjamin Bertrand committed
    def launch_task(self, name, func, *args, **kwargs):
        """Launch a task in the background using RQ

        The task is added to the session but not committed.
        """
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        q = Queue()
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        job = q.enqueue(f"app.tasks.{func}", *args, **kwargs)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        # The status will be set to QUEUED or DEFERRED
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        task = Task(
            id=job.id,
            name=name,
            command=job.get_call_string(),
            status=JobStatus(job.status),
            user=self,
        )
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        db.session.add(task)
        return task

    def get_tasks(self, all=False):
        """Return all tasks created by the current user

        If the user is admin and all is set to True, will return all tasks
        """
        if all and self.is_admin:
            return Task.query.order_by(Task.created_at).all()
        return Task.query.filter_by(user=self).order_by(Task.created_at).all()

    def get_tasks_in_progress(self, name):
        """Return all the <name> tasks not finished or failed"""
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        return (
            Task.query.filter_by(name=name)
            .filter(~Task.status.in_([JobStatus.FINISHED, JobStatus.FAILED]))
            .all()
        )
Benjamin Bertrand's avatar
Benjamin Bertrand committed

    def get_task_started(self, name):
        """Return the <name> task currently running or None"""
        return Task.query.filter_by(name=name, status=JobStatus.STARTED).first()

    def is_task_waiting(self, name):
        """Return True if a <name> task is waiting (queued or deferred)"""
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        count = (
            Task.query.filter_by(name=name)
            .filter(Task.status.in_([JobStatus.DEFERRED, JobStatus.QUEUED]))
            .count()
        )
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        return count > 0

Benjamin Bertrand's avatar
Benjamin Bertrand committed
    def __str__(self):

    def to_dict(self):
        return {
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            "id": self.id,
            "username": self.username,
            "display_name": self.display_name,
            "email": self.email,
            "groups": self.csentry_groups,
Benjamin Bertrand's avatar
Benjamin Bertrand committed
class Token(db.Model):
    """Table to store valid tokens"""
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    id = db.Column(db.Integer, primary_key=True)
    jti = db.Column(postgresql.UUID, nullable=False)
    token_type = db.Column(db.Text, nullable=False)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    user_id = db.Column(db.Integer, db.ForeignKey("user_account.id"), nullable=False)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    issued_at = db.Column(db.DateTime, nullable=False)
    # expires can be set to None for tokens that never expire
    expires = db.Column(db.DateTime)
    description = db.Column(db.Text)

Benjamin Bertrand's avatar
Benjamin Bertrand committed
    __table_args__ = (sa.UniqueConstraint(jti, user_id),)
Benjamin Bertrand's avatar
Benjamin Bertrand committed

    def __str__(self):
        return self.jti


Benjamin Bertrand's avatar
Benjamin Bertrand committed
class QRCodeMixin:
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(CIText, nullable=False, unique=True)
    description = db.Column(db.Text)
Benjamin Bertrand's avatar
Benjamin Bertrand committed

    def image(self):
        """Return a QRCode image to identify a record

        The QRCode includes:
             - CSE prefix
             - the table name
             - the name of the record
        """
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        data = ":".join(["CSE", self.__tablename__, self.name])
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        return qrcode.make(data, version=1, box_size=5)

    @cache.memoize(timeout=0)
    def base64_image(self):
        """Return the QRCode image as base64 string"""
        return utils.image_to_base64(self.image())

    def is_user_favorite(self):
        """Return True if the attribute is part of the current user favorites"""
        return current_user in self.favorite_users
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    def __str__(self):
        return self.name

    def __repr__(self):
        # The cache.memoize decorator performs a repr() on the passed in arguments
        # __repr__ is used as part of the cache key and shall be a uniquely identifying string
        # See https://flask-caching.readthedocs.io/en/latest/#memoization
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        return f"{self.__class__.__name__}(id={self.id}, name={self.name})"
    def to_dict(self):
        return {
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "qrcode": self.base64_image(),
Benjamin Bertrand's avatar
Benjamin Bertrand committed

class Action(QRCodeMixin, db.Model):
class Manufacturer(QRCodeMixin, db.Model):
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    items = db.relationship("Item", back_populates="manufacturer")
Benjamin Bertrand's avatar
Benjamin Bertrand committed


class Model(QRCodeMixin, db.Model):
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    items = db.relationship("Item", back_populates="model")
Benjamin Bertrand's avatar
Benjamin Bertrand committed


class Location(QRCodeMixin, db.Model):
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    items = db.relationship("Item", back_populates="location")
Benjamin Bertrand's avatar
Benjamin Bertrand committed


class Status(QRCodeMixin, db.Model):
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    items = db.relationship("Item", back_populates="status")
class CreatedMixin:
    id = db.Column(db.Integer, primary_key=True)
    created_at = db.Column(db.DateTime, default=utcnow())
    updated_at = db.Column(db.DateTime, default=utcnow(), onupdate=utcnow())

    # Using ForeignKey and relationship in mixin requires the @declared_attr decorator
    # See http://docs.sqlalchemy.org/en/latest/orm/extensions/declarative/mixins.html
    @declared_attr
    def user_id(cls):
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        return db.Column(
            db.Integer,
            db.ForeignKey("user_account.id"),
            nullable=False,
            default=utils.fetch_current_user_id,
        )
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        return db.relationship("User")
    def __init__(self, **kwargs):
        # Automatically convert created_at/updated_at strings
        # to datetime object
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        for key in ("created_at", "updated_at"):
            if key in kwargs:
                if isinstance(kwargs[key], str):
                    kwargs[key] = utils.parse_to_utc(kwargs[key])
        super().__init__(**kwargs)

    def to_dict(self):
        return {
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            "id": self.id,
            "created_at": utils.format_field(self.created_at),
            "updated_at": utils.format_field(self.updated_at),
            "user": str(self.user),

class Item(CreatedMixin, db.Model):
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        "exclude": [
            "created_at",
            "user_id",
            "ics_id",
            "serial_number",
            "manufacturer_id",
            "model_id",
        ]
    # WARNING! Inheriting id from CreatedMixin doesn't play well with
    # SQLAlchemy-Continuum. It has to be defined here.
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    id = db.Column(db.Integer, primary_key=True)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    ics_id = db.Column(
        db.Text, unique=True, nullable=False, index=True, default=get_temporary_ics_id
    )
    serial_number = db.Column(db.Text, nullable=False)
    quantity = db.Column(db.Integer, nullable=False, default=1)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    manufacturer_id = db.Column(db.Integer, db.ForeignKey("manufacturer.id"))
    model_id = db.Column(db.Integer, db.ForeignKey("model.id"))
    location_id = db.Column(db.Integer, db.ForeignKey("location.id"))
    status_id = db.Column(db.Integer, db.ForeignKey("status.id"))
    parent_id = db.Column(db.Integer, db.ForeignKey("item.id"))
    host_id = db.Column(db.Integer, db.ForeignKey("host.id"))
    stack_member = db.Column(db.SmallInteger)
Benjamin Bertrand's avatar
Benjamin Bertrand committed

Benjamin Bertrand's avatar
Benjamin Bertrand committed
    manufacturer = db.relationship("Manufacturer", back_populates="items")
    model = db.relationship("Model", back_populates="items")
    location = db.relationship("Location", back_populates="items")
    status = db.relationship("Status", back_populates="items")
    children = db.relationship("Item", backref=db.backref("parent", remote_side=[id]))
    macs = db.relationship("Mac", backref="item")
    comments = db.relationship("ItemComment", backref="item")
Benjamin Bertrand's avatar
Benjamin Bertrand committed

    __table_args__ = (
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        sa.CheckConstraint(
            "stack_member >= 0 AND stack_member <=9", name="stack_member_range"
        ),
        sa.UniqueConstraint(host_id, stack_member, name="uq_item_host_id_stack_member"),
    def __init__(self, **kwargs):
        # Automatically convert manufacturer/model/location/status to an
        # instance of their class if passed as a string
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        for key, cls in [
            ("manufacturer", Manufacturer),
            ("model", Model),
            ("location", Location),
            ("status", Status),
        ]:
            if key in kwargs:
                kwargs[key] = utils.convert_to_model(kwargs[key], cls)
        super().__init__(**kwargs)
Benjamin Bertrand's avatar
Benjamin Bertrand committed

    def __str__(self):
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    @validates("ics_id")
    def validate_ics_id(self, key, string):
        """Ensure the ICS id field matches the required format"""
        if string is not None:
            if ICS_ID_RE.fullmatch(string) is None:
Benjamin Bertrand's avatar
Benjamin Bertrand committed
                raise ValidationError("ICS id shall match [A-Z]{3}[0-9]{3}")
    def to_dict(self):
        d = super().to_dict()
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        d.update(
            {
                "ics_id": self.ics_id,
                "serial_number": self.serial_number,
                "quantity": self.quantity,
                "manufacturer": utils.format_field(self.manufacturer),
                "model": utils.format_field(self.model),
                "location": utils.format_field(self.location),
                "status": utils.format_field(self.status),
                "parent": utils.format_field(self.parent),
                "children": [str(child) for child in self.children],
                "macs": [str(mac) for mac in self.macs],
                "host": utils.format_field(self.host),
                "stack_member": utils.format_field(self.stack_member),
                "history": self.history(),
                "comments": [str(comment) for comment in self.comments],
            }
        )
    def to_row_dict(self):
        """Convert to a dict that can easily be exported to an excel row

        All values should be a string
        """
        d = self.to_dict().copy()
        d["children"] = " ".join(d["children"])
        d["macs"] = " ".join(d["macs"])
        d["comments"] = "\n\n".join(d["comments"])
        d["history"] = "\n".join([str(version) for version in d["history"]])
        return d

    def history(self):
        versions = []
        for version in self.versions:
            # parent is an attribute used by SQLAlchemy-Continuum
            # version.parent refers to an ItemVersion instance (and has no link with
            # the item parent_id)
            # We need to retrieve the parent "manually"
            if version.parent_id is None:
                parent = None
            else:
                parent = Item.query.get(version.parent_id)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            versions.append(
                {
                    "updated_at": utils.format_field(version.updated_at),
                    "quantity": version.quantity,
                    "location": utils.format_field(version.location),
                    "status": utils.format_field(version.status),
                    "parent": utils.format_field(parent),
                }
            )
        return versions
class ItemComment(CreatedMixin, db.Model):
    body = db.Column(db.Text, nullable=False)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    item_id = db.Column(db.Integer, db.ForeignKey("item.id"), nullable=False)
    def __str__(self):
        return self.body

    def to_dict(self):
        d = super().to_dict()
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        d.update({"body": self.body, "item": str(self.item)})
class Network(CreatedMixin, db.Model):
    vlan_name = db.Column(CIText, nullable=False, unique=True)
    vlan_id = db.Column(db.Integer, nullable=False, unique=True)
    address = db.Column(postgresql.CIDR, nullable=False, unique=True)
    first_ip = db.Column(postgresql.INET, nullable=False, unique=True)
    last_ip = db.Column(postgresql.INET, nullable=False, unique=True)
    description = db.Column(db.Text)
    admin_only = db.Column(db.Boolean, nullable=False, default=False)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    scope_id = db.Column(db.Integer, db.ForeignKey("network_scope.id"), nullable=False)
    domain_id = db.Column(db.Integer, db.ForeignKey("domain.id"), nullable=False)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    interfaces = db.relationship("Interface", backref="network")

    __table_args__ = (
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        sa.CheckConstraint("first_ip < last_ip", name="first_ip_less_than_last_ip"),
        sa.CheckConstraint("first_ip << address", name="first_ip_in_network"),
        sa.CheckConstraint("last_ip << address", name="last_ip_in_network"),
    def __init__(self, **kwargs):
        # Automatically convert scope to an instance of NetworkScope if it was passed
        # as a string
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        if "scope" in kwargs:
            kwargs["scope"] = utils.convert_to_model(
                kwargs["scope"], NetworkScope, "name"
            )
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            # If domain_id is not passed, we set it to the network scope value
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            if "domain_id" not in kwargs:
                kwargs["domain_id"] = kwargs["scope"].domain_id
        super().__init__(**kwargs)

    def __str__(self):
        return str(self.vlan_name)
    @property
    def network_ip(self):
        return ipaddress.ip_network(self.address)
    @property
    def netmask(self):
        return self.network_ip.netmask

    def first(self):
        return ipaddress.ip_address(self.first_ip)
    def last(self):
        return ipaddress.ip_address(self.last_ip)

    def ip_range(self):
        """Return the list of IP addresses that can be assigned for this network

        The range is defined by the first and last IP
        """
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        return [
            addr for addr in self.network_ip.hosts() if self.first <= addr <= self.last
        ]

    def used_ips(self):
        """Return the list of IP addresses in use

        The list is sorted
        """
        return sorted(interface.address for interface in self.interfaces)

    def available_ips(self):
        """Return the list of IP addresses available"""
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        return [addr for addr in self.ip_range() if addr not in self.used_ips()]
    def gateway(self):
        """Return the network gateway IP"""
        return list(self.network_ip.hosts())[-1]
    def ip_in_network(ip, address):
        """Ensure the IP is in the network

        :param str user_id: unicode ID of a user
        :returns: a tuple with the IP and network as (IPv4Address, IPv4Network)
        :raises: ValidationError if the IP is not in the network
        """
        addr = ipaddress.ip_address(ip)
        net = ipaddress.ip_network(address)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            raise ValidationError(f"IP address {ip} is not in network {address}")
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    @validates("first_ip")
    def validate_first_ip(self, key, ip):
        """Ensure the first IP is in the network"""
        self.ip_in_network(ip, self.address)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    @validates("last_ip")
    def validate_last_ip(self, key, ip):
        """Ensure the last IP is in the network and greater than first_ip"""
        addr, net = self.ip_in_network(ip, self.address)
        if addr < self.first:
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            raise ValidationError(
                f"Last IP address {ip} is less than the first address {self.first}"
            )
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    @validates("interfaces")
    def validate_interfaces(self, key, interface):
        """Ensure the interface IP is in the network range"""
        addr, net = self.ip_in_network(interface.ip, self.address)
        # Admin user can create IP outside the defined range
        try:
            # current_user is a local proxy and is not
            # valid outside of a request context.
            is_admin = current_user.is_admin
        except AttributeError:
            is_admin = False
        if not is_admin:
            if addr < self.first or addr > self.last:
Benjamin Bertrand's avatar
Benjamin Bertrand committed
                raise ValidationError(
                    f"IP address {interface.ip} is not in range {self.first} - {self.last}"
                )
        return interface
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    @validates("vlan_name")
    def validate_vlan_name(self, key, string):
        """Ensure the name matches the required format"""
        if string is None:
            return None
        if VLAN_NAME_RE.fullmatch(string) is None:
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            raise ValidationError("Vlan name shall match [A-Za-z0-9\-]{3,25}")
    def to_dict(self):
        d = super().to_dict()
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        d.update(
            {
                "vlan_name": self.vlan_name,
                "vlan_id": self.vlan_id,
                "address": self.address,
                "netmask": str(self.netmask),
                "first_ip": self.first_ip,
                "last_ip": self.last_ip,
                "description": self.description,
                "admin_only": self.admin_only,
                "scope": utils.format_field(self.scope),
                "domain": str(self.domain),
                "interfaces": [str(interface) for interface in self.interfaces],
            }
        )
        return d
# Table required for Many-to-Many relationships between interfaces and tags
interfacetags_table = db.Table(
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    "interfacetags",
    db.Column("tag_id", db.Integer, db.ForeignKey("tag.id"), primary_key=True),
    db.Column(
        "interface_id", db.Integer, db.ForeignKey("interface.id"), primary_key=True
    ),
Benjamin Bertrand's avatar
Benjamin Bertrand committed
)


class Tag(QRCodeMixin, db.Model):
    admin_only = db.Column(db.Boolean, nullable=False, default=False)
Benjamin Bertrand's avatar
Benjamin Bertrand committed

Benjamin Bertrand's avatar
Benjamin Bertrand committed
    @validates("name")
    def validate_name(self, key, string):
        """Ensure the name field matches the required format"""
        if string is not None:
            if TAG_RE.fullmatch(string) is None:
                raise ValidationError(f"'{string}' is an invalid tag name")
        return string

Benjamin Bertrand's avatar
Benjamin Bertrand committed

class DeviceType(db.Model):
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    __tablename__ = "device_type"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(CIText, nullable=False, unique=True)

Benjamin Bertrand's avatar
Benjamin Bertrand committed
    hosts = db.relationship("Host", backref="device_type")
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    @validates("name")
    def validate_name(self, key, string):
        """Ensure the name field matches the required format"""
        if string is not None:
            if DEVICE_TYPE_RE.fullmatch(string) is None:
                raise ValidationError(f"'{string}' is an invalid device type name")
        return string

    def __str__(self):
        return self.name

    def to_dict(self):
        return {
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            "id": self.id,
            "name": self.name,
            "hosts": [str(host) for host in self.hosts],
# Table required for Many-to-Many relationships between Ansible parent and child groups
ansible_groups_parent_child_table = db.Table(
    "ansible_groups_parent_child",
    db.Column(
        "parent_group_id",
        db.Integer,
        db.ForeignKey("ansible_group.id"),
        primary_key=True,
    ),
    db.Column(
        "child_group_id",
        db.Integer,
        db.ForeignKey("ansible_group.id"),
        primary_key=True,
    ),
)


# Table required for Many-to-Many relationships between Ansible groups and hosts
ansible_groups_hosts_table = db.Table(
    "ansible_groups_hosts",
    db.Column(
        "ansible_group_id",
        db.Integer,
        db.ForeignKey("ansible_group.id"),
        primary_key=True,
    ),
    db.Column("host_id", db.Integer, db.ForeignKey("host.id"), primary_key=True),
)


class AnsibleGroupType(Enum):
    STATIC = "STATIC"
    NETWORK_SCOPE = "NETWORK_SCOPE"
    NETWORK = "NETWORK"
    DEVICE_TYPE = "DEVICE_TYPE"

    def __str__(self):
        return self.name

    @classmethod
    def choices(cls):
        return [(item, item.name) for item in AnsibleGroupType]

    @classmethod
    def coerce(cls, value):
        return value if type(value) == AnsibleGroupType else AnsibleGroupType[value]


class AnsibleGroup(CreatedMixin, db.Model):
    __versioned__ = {}
    __tablename__ = "ansible_group"
    # Define id here so that it can be used in the primary and secondary join
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(CIText, nullable=False, unique=True)
    vars = db.Column(postgresql.JSONB)
    type = db.Column(
        db.Enum(AnsibleGroupType, name="ansible_group_type"),
        default=AnsibleGroupType.STATIC,
        nullable=False,
    )
    children = db.relationship(
        "AnsibleGroup",
        secondary=ansible_groups_parent_child_table,
        primaryjoin=id == ansible_groups_parent_child_table.c.parent_group_id,
        secondaryjoin=id == ansible_groups_parent_child_table.c.child_group_id,
        backref=db.backref("parents"),
    )

    def __str__(self):
        return str(self.name)

    @property
    def is_dynamic(self):
        return self.type != AnsibleGroupType.STATIC

    @property
    def hosts(self):
        if self.type == AnsibleGroupType.STATIC:
            return self._hosts
        if self.type == AnsibleGroupType.NETWORK_SCOPE:
            return (
                Host.query.join(Host.interfaces)
                .join(Interface.network)
                .join(Network.scope)
                .filter(NetworkScope.name == self.name)
                .order_by(Host.name)
                .all()
            )
        if self.type == AnsibleGroupType.NETWORK:
            return (
                Host.query.join(Host.interfaces)
                .join(Interface.network)
                .filter(Network.vlan_name == self.name)
                .order_by(Host.name)
                .all()
            )
        if self.type == AnsibleGroupType.DEVICE_TYPE:
            return (
                Host.query.join(Host.device_type)
                .filter(DeviceType.name == self.name)
                .order_by(Host.name)
                .all()
            )

    @hosts.setter
    def hosts(self, value):
        # For dynamic group type, _hosts can only be set to []
        if self.is_dynamic and value:
            raise AttributeError("can't set dynamic hosts")
        self._hosts = value

    def to_dict(self):
        d = super().to_dict()
        d.update(
            {
                "name": self.name,
                "type": self.type.name,
                "hosts": [str(host) for host in self.hosts],
                "children": [str(child) for child in self.children],
class Host(CreatedMixin, db.Model):
    __versioned__ = {}

    # id shall be defined here to be used by SQLAlchemy-Continuum
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.Text, nullable=False, unique=True)
    description = db.Column(db.Text)
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    device_type_id = db.Column(
        db.Integer, db.ForeignKey("device_type.id"), nullable=False
    )
    ansible_vars = db.Column(postgresql.JSONB)
    # Set cascade to all (to add delete) and delete-orphan to delete all interfaces
    # when deleting a host
    interfaces = db.relationship(
        "Interface", backref="host", cascade="all, delete-orphan"
    )
Benjamin Bertrand's avatar
Benjamin Bertrand committed
    items = db.relationship("Item", backref="host")
    ansible_groups = db.relationship(
        "AnsibleGroup",
        secondary=ansible_groups_hosts_table,
        lazy=True,
        backref=db.backref("_hosts"),
    def __init__(self, **kwargs):
        # Automatically convert device_type as an instance of its class if passed as a string
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        if "device_type" in kwargs:
            kwargs["device_type"] = utils.convert_to_model(
                kwargs["device_type"], DeviceType
            )
        # Automatically convert items to a list of instances if passed as a list of ics_id
Benjamin Bertrand's avatar
Benjamin Bertrand committed
        if "items" in kwargs:
            kwargs["items"] = [
                utils.convert_to_model(item, Item, filter="ics_id")
                for item in kwargs["items"]
            ]
        # Automatically convert ansible groups to a list of instances if passed as a list of strings
        if "ansible_groups" in kwargs:
            kwargs["ansible_groups"] = [
                utils.convert_to_model(group, AnsibleGroup)
                for group in kwargs["ansible_groups"]
            ]
        super().__init__(**kwargs)

Benjamin Bertrand's avatar
Benjamin Bertrand committed
    @property
    def is_ioc(self):
        for interface in self.interfaces:
            if interface.is_ioc:
                return True
        return False

    @property
    def model(self):
        """Return the model of the first linked item"""
        try:
            return utils.format_field(self.items[0].model)
        except IndexError:
            return None

    @property
    def main_interface(self):
        """Return the host main interface

        The main interface is the one that has the same name as the host
        or the first one found
        """
        for interface in self.interfaces:
            if interface.name == self.name:
                return interface
        # No interface with the same name found...
        # Return the first one
        try:
            return self.interfaces[0]
        except IndexError:
            return None

    @property
    def fqdn(self):
        """Return the host fully qualified domain name

        The domain is based on the main interface
        """
        if self.main_interface:
            return f"{self.name}.{self.main_interface.network.domain}"
        else:
            return self.name

    def __str__(self):
        return str(self.name)

Benjamin Bertrand's avatar
Benjamin Bertrand committed
    @validates("name")
    def validate_name(self, key, string):
        """Ensure the name matches the required format"""
        if string is None:
            return None
        # Force the string to lowercase
        lower_string = string.lower()
        if HOST_NAME_RE.fullmatch(lower_string) is None:
Benjamin Bertrand's avatar
Benjamin Bertrand committed
            raise ValidationError("Interface name shall match [a-z0-9\-]{2,20}")
        return lower_string

    def stack_members(self):
        """Return all items part of the stack sorted by stack member number"""
        members = [item for item in self.items if item.stack_member is not None]
        return sorted(members, key=lambda x: x.stack_member)

    def stack_members_numbers(self):
        """Return the list of stack member numbers"""
        return [item.stack_member for item in self.stack_members()]