Source code for renga_deployer.engines

# -*- coding: utf-8 -*-
# Copyright 2017 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
"""Engine sub-module."""

import logging
import os
import re
import shlex
import time
from enum import Enum
from functools import wraps

from flask import abort, current_app
from werkzeug.exceptions import NotFound
from werkzeug.utils import cached_property

from renga_deployer.serializers import ContextSchema, ExecutionSchema

from .models import Context, Execution, ExecutionStates
from .utils import decode_bytes, resource_available

context_schema = ContextSchema()
execution_schema = ExecutionSchema()

[docs]class Engine(object): """Base engine class."""
[docs] def launch(self, context, **kwargs): """Create new execution environment for a given context.""" raise NotImplemented
[docs] def stop(self, execution, remove=False): """Stop an execution.""" raise NotImplemented
[docs] def get_logs(self, execution): """Extract logs for a container.""" raise NotImplemented
[docs] def get_host_port(self, execution): """Retrieve the host/port where the application can be reached.""" raise NotImplemented
[docs] def get_execution_environment(self, execution) -> dict: """Retrieve the environment specified for an execution container.""" raise NotImplemented
[docs] def get_state(self, execution): """Check the state of an execution.""" raise NotImplemented
[docs]class DockerEngine(Engine): """Class for deploying contexts on docker."""
[docs] class EXECUTION_STATE_MAPPING(Enum): """State mappings for the Docker engine.""" running = ExecutionStates.RUNNING exited = ExecutionStates.EXITED restarting = ExecutionStates.UNAVAILABLE paused = ExecutionStates.UNAVAILABLE
def __init__(self): """Initialize the docker engine.""" import docker self._docker = docker
[docs] @cached_property def logger(self): """Create a logger instance.""" return logging.getLogger('renga.deployer.engines.docker')
[docs] @cached_property def client(self): """Create a docker client from local environment.""" return self._docker.from_env()
[docs] def launch(self, execution, **kwargs): """Launch a docker container with the context image.""" context = execution.context execution.environment.setdefault( 'DEPLOYER_BASE_URL', current_app.config.get('DEPLOYER_DEFAULT_BASE_URL') ) if context.spec.get('ports'): ports = {port: None for port in context.spec.get('ports')} else: ports = None # Fix an unexpected behaviour of the python docker client which # leads to all images being downloaded when no tag is specified. image = context.spec['image'] if ':' not in image: image += ':latest' container = image=image, ports=ports, command=context.spec.get('command'), detach=True, environment=execution.environment or None) 'Launched container for execution {1} of context {0}'.format(,, extra={ 'container_attrs': container.attrs, 'execution': execution_schema.dump(execution).data, 'context': context_schema.dump(execution.context).data }) execution.engine_id = return execution
[docs] def stop(self, execution, remove=False): """Stop a running container, optionally removing it.""" from docker.errors import NotFound try: container = self.client.containers.get(execution.engine_id) except NotFound: return execution container.stop() if remove: container.remove() 'Stopped execution {0} of context {1}'.format(,, extra={ 'container_attrs': container.attrs, 'execution': execution_schema.dump(execution).data, 'context': context_schema.dump(execution.context).data }) return execution
[docs] def get_logs(self, execution): """Extract logs for a container.""" try: return decode_bytes( self.client.containers.get(execution.engine_id).logs)() except self._docker.errors.NotFound: # FIXME: implement proper exception handling and propagation raise NotFound('Execution container not found.')
[docs] def get_host_ports(self, execution): """Return host ip and port bindings for the running execution.""" if not execution.check_state(ExecutionStates.RUNNING, self): return {'ports': []} container = self.client.containers.get(execution.engine_id) port_bindings = container.attrs['NetworkSettings'].get('Ports', {}) return { 'ports': [{ 'specified': container_port.split('/')[0], 'protocol': container_port.split('/')[1].upper(), 'host': current_app.config[ 'DEPLOYER_DOCKER_CONTAINER_IP'] or host_spec['HostIp'], 'exposed': host_spec['HostPort'], } for container_port, host_specs in port_bindings.items() for host_spec in host_specs], }
[docs] def get_execution_environment(self, execution) -> dict: """Retrieve the environment specified for an execution container.""" container = self.client.containers.get(execution.engine_id) return { k: v for ( k, v) in [e.split('=') for e in container.attrs['Config']['Env']] }
[docs] def get_state(self, execution): """Return the status of an execution.""" try: return getattr( self.__class__.EXECUTION_STATE_MAPPING, self.client.containers.get(execution.engine_id).status).value except self._docker.errors.NotFound: return ExecutionStates.UNAVAILABLE
[docs]class K8SEngine(Engine): """Class for deploying contexts on Kubernetes."""
[docs] class EXECUTION_STATE_MAPPING(Enum): """State mappings for the K8S engine.""" running = ExecutionStates.RUNNING terminated = ExecutionStates.EXITED waiting = ExecutionStates.UNAVAILABLE
def __init__(self, config=None, timeout=10): """Create a K8SNode instance.""" # FIXME add super import kubernetes self._kubernetes = kubernetes self.timeout = timeout self.config = config if self.config is None: kubernetes.config.load_kube_config() self.config = kubernetes.config.kube_config.Configuration() self.logger.debug( 'Loaded k8s configuration.', extra=self.config.__dict__)
[docs] @cached_property def logger(self): """Create a logger instance.""" return logging.getLogger('renga.deployer.engines.k8s')
[docs] def launch(self, execution, engine=None, **kwargs): """Launch a Kubernetes Job with the context spec.""" context = execution.context execution.environment.setdefault( 'DEPLOYER_BASE_URL', '/interactive/{0}'.format( if current_app.config.get('DEPLOYER_K8S_INGRESS') else current_app.config.get('DEPLOYER_DEFAULT_BASE_URL') ) batch = self._kubernetes.client.BatchV1Api() namespace = kwargs.pop('namespace', 'default') job_spec = self._k8s_job_template(namespace, execution) self.logger.debug('Context spec: {}'.format(context.spec)) self.logger.debug('Job spec created: {}'.format(job_spec)) job = batch.create_namespaced_job(namespace, job_spec) uid = job.metadata.labels['controller-uid'] 'Created job for execution {0} of context {1}'.format(,, extra={ 'job': job.to_dict(), 'execution': execution_schema.dump(execution).data, 'context': context_schema.dump(execution.context).data }) # assume that if the user specified a port to open, they want # it available from the outside if context.spec.get('ports'): # To expose an interactive job, we need to start a service. # We use the job controller-uid to link the service. api = self._kubernetes.client.CoreV1Api() service_spec = self._k8s_service_template(namespace, context, uid) service = api.create_namespaced_service(namespace, service_spec) 'Created service for namespaced job {}'.format(uid), extra={'service': service.to_dict()}) # if using an ingress, need to make an additional object if current_app.config.get( 'DEPLOYER_K8S_INGRESS'): beta_api = self._kubernetes.client.ExtensionsV1beta1Api() ingress = beta_api.create_namespaced_ingress( namespace, self._k8s_ingress_template(uid, service, execution)) 'Created ingress for service {}'.format(, extra={'ingress': ingress.to_dict()}) execution.engine_id = uid execution.namespace = namespace return execution
[docs] def stop(self, execution, remove=False): """Stop a running job.""" if not execution.check_state( {ExecutionStates.RUNNING, ExecutionStates.EXITED}, self): return execution api = self._kubernetes.client.CoreV1Api() batch = self._kubernetes.client.BatchV1Api() if execution.context.spec.get('ports'): service = api.list_namespaced_service( execution.namespace, label_selector='job-uid={0}'.format(execution.engine_id)) if service.items: service = service.items[0] api.delete_namespaced_service(, execution.namespace, ) 'Deleted namespaced service {}'.format(, extra={'service': service.to_dict()}) if current_app.config.get( 'DEPLOYER_K8S_INGRESS'): beta_api = self._kubernetes.client.ExtensionsV1beta1Api() ingress = beta_api.list_namespaced_ingress( execution.namespace, label_selector='job-uid={0}'.format( execution.engine_id)).items[0] beta_api.delete_namespaced_ingress(, execution.namespace, self._kubernetes.client.V1DeleteOptions()) 'Deleted namespaced ingress {0} for service {1}'. format(ingress.metadata.uid,, extra={'ingress': ingress.to_dict()}) batch.delete_collection_namespaced_job( execution.namespace, label_selector='controller-uid={0}'.format(execution.engine_id)) 'Deleted namespaced job for execution {}'.format( execution.engine_id), extra={ 'execution': execution_schema.dump(execution).data, 'context': context_schema.dump(execution.context).data }) if remove: api.delete_collection_namespaced_pod( execution.namespace, label_selector='controller-uid={0}'.format( execution.engine_id))'Deleted namespaced pod for execution {}'.format( execution.engine_id)) return execution
[docs] def get_state(self, execution): """Get status of a running job.""" api = self._kubernetes.client.CoreV1Api() pod = api.list_namespaced_pod( execution.namespace, label_selector='controller-uid={}'.format(execution.engine_id)) if not pod.items: return ExecutionStates.UNAVAILABLE status = list( filter(lambda c: == str(, pod.items[0] .status.container_statuses))[0] return getattr( self.__class__.EXECUTION_STATE_MAPPING, list(filter(lambda x: x[1], status.state.to_dict().items()))[0][ 0]).value
@staticmethod def _k8s_job_template(namespace, execution): """Return simple kubernetes job JSON.""" # required spec context = execution.context context_spec = context.spec.copy() spec = { "containers": [{ "name": "{0}".format(, "image": "{0}".format(context_spec.pop('image')) }], "restartPolicy": "Never" } # optional spec if context_spec.get('ports'): spec['containers'][0]['ports'] = [{ 'containerPort': int(port) } for port in context_spec.pop('ports')] if context_spec.get('command'): command = shlex.split(context_spec.pop('command')) spec['containers'][0]['command'] = [command[0]] if len(command) > 1: spec['containers'][0]['args'] = command[1:] spec['containers'][0]['env'] = [{ 'name': k, 'value': str(v) } for k, v in execution.environment.items()] spec['containers'][0]['env'] += context_spec.pop('env', []) spec['volumes'] = context_spec.pop('volumes', []) # add all other stuff to spec spec['containers'][0].update(context_spec) # finalize job template template = { "kind": "Job", "metadata": { "namespace": "{0}".format(namespace), "generateName": "{0}-".format( }, "spec": { "template": { "spec": spec } } } return template @staticmethod def _k8s_service_template(namespace, context, uid): """Return simple kubernetes service JSON.""" return { 'apiVersion': 'v1', 'kind': 'Service', 'metadata': { 'generateName': re.sub('[^\w]+', '-', context.spec['image']), 'namespace': namespace, 'labels': { 'job-uid': "{0}".format(uid) } }, 'spec': { 'hostNetwork': 'true', 'ports': [{ 'port': int(port) } for port in context.spec['ports']], 'selector': { 'controller-uid': "{0}".format(uid) }, 'type': 'NodePort' } } @staticmethod def _k8s_ingress_template(uid, service, execution): """Return kubernetes ingress JSON.""" service_name = service_port = execution.context.spec['ports'][0] return { 'apiVersion': 'extensions/v1beta1', 'kind': 'Ingress', 'metadata': { 'generateName': 'interactive', 'annotations': { '': current_app.config.get( 'DEPLOYER_K8S_INGRESS_CLASS'), }, 'labels': { 'job-uid': "{0}".format(uid) } }, 'spec': { 'rules': [ { 'http': { 'paths': [ { 'path': '/interactive/{0}'.format(, 'backend': { 'serviceName': service_name, 'servicePort': int(service_port) } } ] } } ] } }
[docs] def get_logs(self, execution, timeout=None, **kwargs): """Extract logs for the Job from the Pod.""" api = self._kubernetes.client.CoreV1Api() namespace = execution.namespace pod = api.list_namespaced_pod( namespace, label_selector='controller-uid={0}'.format(execution.engine_id)) if not pod.items: # FIXME: implement proper exception handling and propagation raise NotFound('Execution container not found.') timein = time.time() while not resource_available(api.read_namespaced_pod_log)( pod.items[0], namespace): if time.time() - timein > (timeout or self.timeout): raise RuntimeError("Timeout while fetching logs.") return api.read_namespaced_pod_log(pod.items[0], namespace)
[docs] def get_host_ports(self, execution): """Return host ip and port bindings for the running execution.""" api = self._kubernetes.client.CoreV1Api() service = api.list_namespaced_service( execution.namespace, label_selector='job-uid={0}'.format(execution.engine_id)) pod = api.list_namespaced_pod( execution.namespace, label_selector='controller-uid={0}'.format(execution.engine_id)) if not service.items or not execution.check_state( ExecutionStates.RUNNING, self): # this service doesn't exist or job isn't running yet return {'ports': []} else: if current_app.config.get( 'DEPLOYER_K8S_INGRESS'): beta_api = self._kubernetes.client.ExtensionsV1beta1Api() ingress = beta_api.list_namespaced_ingress( execution.namespace, label_selector='job-uid={0}'.format( execution.engine_id)).items[0] if not ingress.status.load_balancer.ingress or \ not ingress.status.load_balancer.ingress[0].ip: host = None else: host = ingress.status.load_balancer.ingress[0].ip return { 'ports': [{ 'specified': port.port, 'host': current_app.config[ 'DEPLOYER_K8S_CONTAINER_IP'] or host, 'path': ingress.spec.rules[0].http.paths[0].path, 'exposed': '443', 'protocol': port.protocol, } for port in service.items[0].spec.ports] } return { 'ports': [{ 'specified': port.port, 'host': current_app.config[ 'DEPLOYER_K8S_CONTAINER_IP'] or pod.items[ 0].status.host_ip, 'exposed': port.node_port, 'protocol': port.protocol, } for port in service.items[0].spec.ports] }
[docs] def get_execution_environment(self, execution) -> dict: """Retrieve the environment specified for an execution container.""" client = self._kubernetes.client.BatchV1Api() job = client.list_namespaced_job( execution.namespace, label_selector='controller-uid={0}'.format(execution.engine_id)) if not job.items: # FIXME: implement proper exception handling and propagation raise NotFound('Execution container not found.') return { e.value for e in job.items[0].spec.template.spec.containers[0].env }