## ITC.L.AA0.5.NB.1 Startup with control and monitoring

### Test implemented based on:
- [ITC.L.AA0.5.NB.1 Startup with control and monitoring](https://skaoffice.jamacloud.com/perspective.req?docId=1121575&projectId=335)


References:
- [LOW.CSP LMC Documentation](https://developer.skatelescope.org/projects/ska-csp-lmc-low/en/latest/lmc/low_csp_lmc.html)
- [LOW.CSP LMC Tango Clients Examples](https://developer.skatelescope.org/projects/ska-csp-lmc-low/en/latest/example/example.html)
- [CSP LMC commands for AA05](https://confluence.skatelescope.org/display/SE/CSP+LMC+commands+for+AA05)

The LOW CSP release is deployed onto the Kubernetes (k8s) cluster as a software release with the underlying assumption that the CBF hardware is installed and available on the network.
After deployment the LOW CSP and associated subsystems are DISABLED and the TMC has to set the LOW CSP to adminMode=ONLINE to establish communication and enable command and control of the system.

Note: There is no direct ON command for LOW CSP. When the TMC sets adminMode=ONLINE, the controllers and subarrays of LOW CSP.LMC and LOW CBF transition to the ON state. The PST and PSS beams (when available) remain in the OFF state.
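
The expected outcome described in the note can be written down as a small lookup table, which may help when many devices are checked at once. This is a minimal sketch in plain Python: the group names (`csp_controller`, `pst_beam`, and so on) and the helper `unexpected_states` are hypothetical and not part of the LOW CSP software, and states are plain strings rather than `DevState` values so the snippet runs without a Tango connection.

```python
from __future__ import annotations

# Expected Tango state per device group once adminMode=ONLINE has been set.
# Group names are illustrative only (assumption), not taken from LOW CSP code.
EXPECTED_STATE_AFTER_ONLINE = {
    "csp_controller": "ON",
    "csp_subarray": "ON",
    "cbf_controller": "ON",
    "cbf_subarray": "ON",
    "pst_beam": "OFF",  # beams remain OFF (when available)
    "pss_beam": "OFF",
}


def unexpected_states(observed: dict[str, str]) -> dict[str, tuple[str, str]]:
    """Return {group: (observed, expected)} for every group that deviates."""
    return {
        group: (state, EXPECTED_STATE_AFTER_ONLINE[group])
        for group, state in observed.items()
        if group in EXPECTED_STATE_AFTER_ONLINE
        and state != EXPECTED_STATE_AFTER_ONLINE[group]
    }
```

An empty result means every observed group is in the state the note predicts.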

### Viewing Tango attributes

The notebook will interrogate device states and report back attribute values as part of the verification output.    
For visual inspection of device attributes, the Taranta interface is used.    
You can access it from a web browser at the URL for the appropriate namespace on your k8s cluster:    
`http://<k8s_CLUSTER>/<KUBE_NAMESPACE>/taranta/devices/low-csp/`    
e.g. for a deployment on the CLP    
`http://k8s.clp.skao.int/ska-low-csp-baseline/taranta/devices/low-csp/`
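
The URL pattern above can be assembled programmatically, for example to print a clickable link from the notebook. A small sketch; the function name `taranta_devices_url` and its `domain` parameter are hypothetical, and only the path layout is taken from the example above.

```python
def taranta_devices_url(cluster: str, namespace: str, domain: str = "low-csp") -> str:
    """Build the Taranta device-list URL for a cluster host and k8s namespace."""
    return f"http://{cluster}/{namespace}/taranta/devices/{domain}/"


# Reproduces the CLP example given above.
print(taranta_devices_url("k8s.clp.skao.int", "ska-low-csp-baseline"))
```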

### Prerequisites
- All necessary equipment is installed and verified
- A network is available and all equipment/systems are powered
- The P4 switch is configured so that the CBF can be controlled
- Fresh deployment of LOW CSP on the k8s cluster

### Imports

In [1]:
%pip install --extra-index-url https://artefact.skao.int/repository/pypi-internal/simple pytango ska-control-model colorama

Looking in indexes: https://pypi.org/simple, https://artefact.skao.int/repository/pypi-internal/simple
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import time
from typing import Any

from colorama import Fore
from ska_control_model import AdminMode, ObsState
from tango import Database, DeviceProxy, DevState

### Change next block with your data:
This section links the notebook execution to the Tango devices on the cluster and requires user updates when the notebook is run on a system other than the Digital Signal PSI in Eindhoven.

Ensure the KUBE_NAMESPACE parameter correctly identifies the k8s namespace with which you intend to interact.

In [3]:
# Choose AdminMode. Can be either ONLINE or ENGINEERING
ADMIN_MODE = AdminMode.ONLINE

VIS_FW_VERSION = "vis:0.0.4-main.e408a416"

# specify here the namespace to connect in this cluster
LOW_CSP_BASELINE = "ska-low-csp-baseline"  # run on deployment with HW

# select the namespace you want to connect to
# either choose one of the namespace variables provided above or supply your own
KUBE_NAMESPACE = LOW_CSP_BASELINE

# set the name of the databaseds service
DATABASEDS_NAME = "ska-low-csp-databaseds"

# finally set the TANGO_HOST
os.environ["TANGO_HOST"] = f"{DATABASEDS_NAME}.{KUBE_NAMESPACE}.svc.cluster.local:10000"
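
To make the structure of the `TANGO_HOST` value explicit when switching between deployments, the address can be composed by a small function. This is only a sketch: `build_tango_host` is a hypothetical helper, and the `svc.cluster.local` suffix and port 10000 are simply copied from the cell above.

```python
def build_tango_host(databaseds: str, namespace: str, port: int = 10000) -> str:
    """Compose the in-cluster address of the Tango database service."""
    # Pattern used in the cell above: <service>.<namespace>.svc.cluster.local:<port>
    return f"{databaseds}.{namespace}.svc.cluster.local:{port}"


print(build_tango_host("ska-low-csp-databaseds", "ska-low-csp-baseline"))
```

The result could then be assigned to `os.environ["TANGO_HOST"]` before any `Database()` or `DeviceProxy` object is created.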

### Tango proxy devices

In [4]:
# Tango Database
tango_db = Database()

# Low CSP controller
csp_ctl_fqdns = tango_db.get_device_exported_for_class("LowCspController").value_string
csp_controller = DeviceProxy(csp_ctl_fqdns[0])
print(f"Using {csp_controller.name()}")

# Low CSP subarray
csp_subarr_fqdns = tango_db.get_device_exported_for_class("LowCspSubarray").value_string
print("CSP Subarrays:")
print(*[" - " + each for each in csp_subarr_fqdns], sep="\n")
csp_subarrays = [DeviceProxy(fqdn) for fqdn in csp_subarr_fqdns]
csp_subarray_1 = csp_subarrays[0]

# CBF controller
cbf_ctl_fqdns = tango_db.get_device_exported_for_class("LowCbfController").value_string
cbf_controller = DeviceProxy(cbf_ctl_fqdns[0])
print(f"Using {cbf_controller.name()}")

# CBF Subarray
cbf_subarr_fqdns = tango_db.get_device_exported_for_class("LowCbfSubarray").value_string
print("CBF Subarrays:")
print(*[" - " + each for each in cbf_subarr_fqdns], sep="\n")
cbf_subarrays = [DeviceProxy(fqdn) for fqdn in cbf_subarr_fqdns]

# CBF allocator and processors
cbf_allocator = DeviceProxy(tango_db.get_device_exported_for_class("LowCbfAllocator").value_string[0])
cbf_processors = [DeviceProxy(fqdn) for fqdn in tango_db.get_device_exported_for_class("LowCbfProcessor").value_string]

Using low-csp/control/0
CSP Subarrays:
 - low-csp/subarray/01
 - low-csp/subarray/02
 - low-csp/subarray/03
 - low-csp/subarray/04
Using low-cbf/control/0
CBF Subarrays:
 - low-cbf/subarray/01
 - low-cbf/subarray/02
 - low-cbf/subarray/03
 - low-cbf/subarray/04


### Helper Functions

In [5]:
def wait_for_attribute_value(
    device: DeviceProxy,  # pylint: disable = redefined-outer-name
    attribute: str,
    value: Any = True,
    failure_message: str = "Timed out waiting for attribute value",
    timeout_sec: int = 120,
) -> None:
    """
    Wait until an attribute has a certain value.

    :param device: Tango device proxy with the attribute to check
    :param attribute: The name of the attribute
    :param value: Expected value (defaults to True)
    :param failure_message: Message for the exception on failure.
    Defaults to "Timed out waiting for attribute value".
    A note about duration is appended.
    :param timeout_sec: Approximate time-out period  in seconds (in reality
    it could be longer due to delays waiting for each attribute read)
    :raises RuntimeError: if expected value not seen before timing out
    """
    deadline = time.time() + timeout_sec
    poll_interval_seconds = 2
    while time.time() < deadline:
        if device.read_attribute(attribute).value == value:
            break
        time.sleep(poll_interval_seconds)
    else:
        raise RuntimeError(f"{failure_message} after {timeout_sec} sec")


# Coloured printing functions for strings that use universal ANSI escape sequences.
# fail: red, pass: green


def print_fail(message, start="", end="\n"):
    """Print coloured fail message."""
    print(f"{start}{Fore.RED}{message}{Fore.RESET}", end=end)


def print_pass(message, start="", end="\n"):
    """Print coloured pass message."""
    print(f"{start}{Fore.GREEN}{message}{Fore.RESET}", end=end)
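
The verification cells further down repeat the same if/else pattern for every attribute. One possible way to condense it is a single helper that prints the coloured line and returns the verdict. This is a sketch, not part of the notebook: `report_check` is a hypothetical name, and the ANSI codes are written out directly (they are the sequences behind `Fore.GREEN`, `Fore.RED` and `Fore.RESET`) so the snippet has no third-party dependency.

```python
GREEN, RED, RESET = "\033[32m", "\033[31m", "\033[39m"


def report_check(device_name: str, what: str, actual, expected) -> bool:
    """Print a coloured pass/fail line and return True if the check passed."""
    passed = actual == expected
    colour = GREEN if passed else RED
    verdict = "Check passed" if passed else f"Check failed, should be {expected}"
    print(f"{colour}{device_name} {what} is {actual}. {verdict}{RESET}")
    return passed


# Example with literal values; in the notebook the arguments would come
# from the device proxies, e.g. csp_controller.state() and DevState.ON.
results = [
    report_check("low-csp/control/0", "state", "ON", "ON"),
    report_check("low-csp/control/0", "healthState", "UNKNOWN", "OK"),
]
print(f"{sum(results)} of {len(results)} checks passed")
```

Returning a boolean allows the results to be collected and summarised at the end of a run instead of being read line by line.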

### Current state of LOW CSP devices

**Note**:
Should any of the AdminMode.OFFLINE assertions in the following cell fail, please redeploy your system and run this notebook again.

In [6]:
assert csp_controller.state() == DevState.DISABLE, "State is not DISABLE"
for csp_subarray in csp_subarrays:
    assert csp_subarray.obsState == ObsState.EMPTY, f"LOW CSP Subarray {csp_subarray.name()} obsState is not EMPTY"
assert csp_controller.adminmode == AdminMode.OFFLINE, "Admin mode of low csp controller is not OFFLINE"
assert str(csp_controller.healthState) == "healthState.UNKNOWN"

assert cbf_controller.state() == DevState.DISABLE, "State is not DISABLE"
for cbf_subarray in cbf_subarrays:
    assert cbf_subarray.obsState == ObsState.EMPTY, f"LOW CBF Subarray {cbf_subarray.name()} obsState is not EMPTY"
assert cbf_controller.adminmode == AdminMode.OFFLINE, "Admin mode of low cbf controller is not OFFLINE"
assert str(cbf_controller.healthState) == "healthState.UNKNOWN"
for proc_cbf in cbf_processors:
    assert proc_cbf.state() == DevState.ON, f"CBF processor {proc_cbf.name()} should be ON"

print_pass("All checks passed")

All checks passed


### Under TMC control initiate LOW CSP by setting adminMode

In [7]:
csp_controller.adminMode = ADMIN_MODE
wait_for_attribute_value(csp_controller, "isCommunicating", True)
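
The cell above waits on a single attribute. If the checks that follow are run immediately, some devices might still be in transition (an assumption; the source does not state this), so waiting on a combined condition can be useful. A minimal sketch of a predicate-based poll; `wait_until` is a hypothetical helper, not part of the notebook or of PyTango.

```python
import time
from typing import Callable


def wait_until(
    condition: Callable[[], bool],
    timeout_sec: float = 120.0,
    poll_interval_sec: float = 2.0,
) -> float:
    """Poll `condition` until it returns True and return the elapsed seconds.

    :raises TimeoutError: if the condition is still False at the deadline
    """
    start = time.monotonic()
    deadline = start + timeout_sec
    while True:
        if condition():
            return time.monotonic() - start
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Condition not met after {timeout_sec} sec")
        time.sleep(poll_interval_sec)
```

With the proxies defined earlier, a call could look like `wait_until(lambda: all(s.state() == DevState.ON for s in csp_subarrays))`. The condition is evaluated at least once, even with a zero timeout.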

### Verify that LOW CSP controller and LOW CSP subarrays report ON status

In [8]:
if csp_controller.state() == DevState.ON:
    print_pass(f"{csp_controller.name()} state is {csp_controller.state()}")
    print_pass("Check passed")
else:
    print_fail(f"{csp_controller.name()} state is {csp_controller.state()}")
    print_fail("Check Failed should be ON")

if csp_controller.adminmode == ADMIN_MODE:
    print_pass(f"{csp_controller.name()} adminmode is {csp_controller.adminmode.value}/{csp_controller.adminmode.name}")
    print_pass("Check passed")
else:
    print_fail(f"{csp_controller.name()} adminmode is {csp_controller.adminmode.value}/{csp_controller.adminmode.name}")
    print_fail(f"Check Failed should be {ADMIN_MODE.name}")

if csp_controller.healthState.name == "OK":
    print_pass(f"{csp_controller.name()} healthState is {csp_controller.healthState.value}/{csp_controller.healthState.name}")
    print_pass("Check passed")
else:
    print_fail(f"{csp_controller.name()} healthState is {csp_controller.healthState.value}/{csp_controller.healthState.name}")
    print_fail("Check Failed should be OK")

for csp_subarray in csp_subarrays:
    if csp_subarray.state() == DevState.ON:
        print_pass(f"{csp_subarray.name()} state is {csp_subarray.state()}")
        print_pass(f"Check passed for CSP Subarray {csp_subarray.name()}")
    else:
        print_fail(f"{csp_subarray.name()} state is {csp_subarray.state()}")
        print_fail(f"Check Failed should be ON for CSP subarray: {csp_subarray.name()}")

low-csp/control/0 state is ON
Check passed
low-csp/control/0 adminmode is 0/ONLINE
Check passed
low-csp/control/0 healthState is 3/UNKNOWN
Check Failed should be OK
low-csp/subarray/01 state is ON
Check passed for CSP Subarray low-csp/subarray/01
low-csp/subarray/02 state is ON
Check passed for CSP Subarray low-csp/subarray/02
low-csp/subarray/03 state is ON
Check passed for CSP Subarray low-csp/subarray/03
low-csp/subarray/04 state is ON
Check passed for CSP Subarray low-csp/subarray/04


### Verify that LOW CBF controller and LOW CBF subarrays report ON status

In [9]:
if cbf_controller.state() == DevState.ON:
    print_pass(f"{cbf_controller.name()} state is {cbf_controller.state()}")
    print_pass("Check passed")
else:
    print_fail(f"{cbf_controller.name()} state is {cbf_controller.state()}")
    print_fail("Check Failed should be ON")

if cbf_controller.adminmode == ADMIN_MODE:
    print_pass(f"{cbf_controller.name()} adminmode is {cbf_controller.adminmode.value}/{cbf_controller.adminmode.name}")
    print_pass("Check passed")
else:
    print_fail(f"{cbf_controller.name()} adminmode is {cbf_controller.adminmode.value}/{cbf_controller.adminmode.name}")
    print_fail(f"Check Failed should be {ADMIN_MODE.name}")

if cbf_controller.healthState.name == "OK":
    print_pass(f"{cbf_controller.name()} healthState is {cbf_controller.healthState.value}/{cbf_controller.healthState.name}")
    print_pass("Check passed")
else:
    print_fail(f"{cbf_controller.name()} healthState is {cbf_controller.healthState.value}/{cbf_controller.healthState.name}")
    print_fail("Check Failed should be OK")

for cbf_subarray in cbf_subarrays:
    if cbf_subarray.state() == DevState.ON:
        print_pass(f"{cbf_subarray.name()} state is {cbf_subarray.state()}")
        print_pass(f"Check passed for CBF subarray: {cbf_subarray.name()}")
    else:
        print_fail(f"{cbf_subarray.name()} state is {cbf_subarray.state()}")
        print_fail(f"Check Failed should be ON for CBF subarray: {cbf_subarray.name()}")

low-cbf/control/0 state is ON
Check passed
low-cbf/control/0 adminmode is 0/ONLINE
Check passed
low-cbf/control/0 healthState is 3/UNKNOWN
Check Failed should be OK
low-cbf/subarray/01 state is ON
Check passed for CBF subarray: low-cbf/subarray/01
low-cbf/subarray/02 state is ON
Check passed for CBF subarray: low-cbf/subarray/02
low-cbf/subarray/03 state is ON
Check passed for CBF subarray: low-cbf/subarray/03
low-cbf/subarray/04 state is ON
Check passed for CBF subarray: low-cbf/subarray/04
