Skip to content

Python Kubernetes Client

hokiegeek2 edited this page May 12, 2022 · 3 revisions
from kubernetes.client.rest import ApiException
from kubernetes.client import Configuration, ApiClient, CoreV1Api, AppsV1Api, V1Pod, \
     V1Service, V1ServiceSpec, V1ServicePort, V1Endpoints, V1EndpointSubset, \
     V1EndpointAddress, CoreV1EndpointPort, V1ObjectMeta
 
# Configuration and Client Generation Methods
def getConfiguration(k8sApiUrl : str, keyFilePath : str, certFilePath : str) -> Configuration:
    """Build a client Configuration that authenticates with a key/cert pair.

    Args:
        k8sApiUrl: URL assigned to the configuration's ``host``.
        keyFilePath: path assigned to the configuration's ``key_file``.
        certFilePath: path assigned to the configuration's ``cert_file``.

    Returns:
        A Configuration with ``verify_ssl`` set to False.
    """
    from urllib3 import disable_warnings

    # verify_ssl is turned off below, so urllib3 warnings are silenced up
    # front to keep them from being emitted on every request.
    disable_warnings()

    k8sConfig = Configuration()
    k8sConfig.host = k8sApiUrl
    k8sConfig.key_file = keyFilePath
    k8sConfig.cert_file = certFilePath
    k8sConfig.verify_ssl = False
    return k8sConfig
 
def getCoreK8sClient(config : Configuration) -> CoreV1Api:
    """Return a CoreV1Api backed by a new ApiClient built from *config*."""
    return CoreV1Api(api_client=ApiClient(configuration=config))
 
def getAppsK8sClient(config : Configuration) -> AppsV1Api:
    """Return an AppsV1Api backed by a new ApiClient built from *config*."""
    return AppsV1Api(api_client=ApiClient(configuration=config))
def createService(coreClient : CoreV1Api, serviceName : str, appName: str, port : int, targetPort : int,
                  namespace : str='default', protocol : str='TCP') -> None:
    """Create a Service that selects pods labelled ``app=<appName>``.

    Args:
        coreClient: CoreV1Api client used to submit the request.
        serviceName: name placed in the Service's metadata.
        appName: value of the ``app`` label used as the Service selector.
        port: port set on the Service port entry.
        targetPort: target port set on the Service port entry.
        namespace: namespace the Service is created in.
        protocol: protocol set on the Service port entry.

    Raises:
        ValueError: if the API call raises ApiException; the original
            exception is attached as the cause.
    """
    service = V1Service()
    service.kind = "Service"
    service.metadata = V1ObjectMeta(name=serviceName)
    spec = V1ServiceSpec()
    spec.selector = {"app": appName}
    spec.ports = [V1ServicePort(protocol=protocol, port=port,
                                target_port=targetPort)]
    service.spec = spec
    try:
        # This is a module-level function: the client is the coreClient
        # argument, there is no ``self`` to read it from.
        coreClient.create_namespaced_service(namespace=namespace,
                                             body=service)
    except ApiException as e:
        raise ValueError(e) from e
  
def updateService(coreClient : CoreV1Api, serviceName : str, servicePort : int, targetPort : int,
                  appName : str, protocol : str='TCP', namespace : str=None) -> None:
    """Patch an existing Service with a new selector and port mapping.

    Args:
        coreClient: CoreV1Api client used to submit the request.
        serviceName: name of the Service to patch.
        servicePort: port set on the Service port entry.
        targetPort: target port set on the Service port entry.
        appName: value of the ``app`` label used as the Service selector.
        protocol: protocol set on the Service port entry.
        namespace: namespace of the Service; ``None`` falls back to
            ``'default'``, matching createService and deleteService.

    Raises:
        ValueError: if the API call raises ApiException; the original
            exception is attached as the cause.
    """
    # The namespace is a required argument of the patch call, so the None
    # default is mapped to the same default the sibling functions use.
    if namespace is None:
        namespace = 'default'

    service = V1Service()
    service.kind = "Service"
    service.metadata = V1ObjectMeta(name=serviceName)
    spec = V1ServiceSpec()
    spec.selector = {"app": appName}
    spec.ports = [V1ServicePort(protocol=protocol, port=servicePort,
                                target_port=targetPort)]
    service.spec = spec
    try:
        # Send the whole Service object so the selector/ports changes sit
        # under its ``spec`` field rather than at the top level of the patch.
        coreClient.patch_namespaced_service(name=serviceName,
                                            namespace=namespace,
                                            body=service)
    except ApiException as e:
        raise ValueError(e) from e
         
def deleteService(coreClient : CoreV1Api, serviceName : str, namespace : str='default') -> None:
    """Delete the Service named *serviceName* from *namespace*.

    Args:
        coreClient: CoreV1Api client used to submit the request.
        serviceName: name of the Service to delete.
        namespace: namespace the Service is deleted from.

    Raises:
        ValueError: if the API call raises ApiException.
    """
    try:
        coreClient.delete_namespaced_service(namespace=namespace,
                                             name=serviceName)
    except ApiException as apiError:
        raise ValueError(apiError)
Clone this wiki locally