In [29]:
# pylint: disable=broad-exception-caught

# Shutdown integration test
- Sequence of steps to shut down CSP (https://skaoffice.jamacloud.com/perspective.req#/testCases/1088698?projectId=328)
- Check what to expect from the commands available via the `csp_controller`

References:
- [LOW.CSP LMC Documentation](https://developer.skatelescope.org/projects/ska-csp-lmc-low/en/latest/lmc/low_csp_lmc.html)
- [LOW.CSP LMC Tango Clients Examples](https://developer.skatelescope.org/projects/ska-csp-lmc-low/en/latest/example/example.html)
- [CSP LMC commands for AA05](https://confluence.skatelescope.org/display/SE/CSP+LMC+commands+for+AA05)

**User note**:
- As stated in https://confluence.skatelescope.org/display/SE/Standby, the Standby command is not yet supported in the current version of Low CSP.LMC and this notebook will fail    
The choice is to retain the notebook test describing the expected functionality and allow the steps that are still under development to fail.
Failures in LOW CSP test cases are allowed and considered valid when they deviate from the expected/anticipated behaviour.
In the case of this notebook test the failure does not prevent integration, as workarounds are available; all workarounds and alternatives are provided in separate notebooks to allow integration to continue.
- Currently the test is restricted to testing only a single subarray: subarray 1

Although this test fails, it does not currently affect the fundamental operation of the LOW CSP.    
These mode status changes are not currently a requirement for the correlator functionality tests.

### Viewing Tango attributes
The notebook will interrogate device states and report back attribute values as part of the verification output.    
For visual inspection of device attributes the Taranta API interface is used.    
You can access the interface via a web browser by pointing the URL to the appropriate namespace on your k8s cluster    
`http://<k8s_CLUSTER>/<KUBE_NAMESPACE>/taranta/devices/low-csp/`    
e.g. for a deployment on the CLP    
`http://k8s.clp.skao.int/ska-low-csp-baseline/taranta/devices/low-csp/`

### Prerequisites

- All necessary equipment is installed and verified
- Assume a network is available and all equipment/systems are powered
- P4 switch is configured in order to control CBF
- LOW CSP has been deployed to the k8s cluster

### Imports

In [1]:
import json
import os
import time
from contextlib import suppress
from typing import Any, Callable, Optional

from ska_control_model import AdminMode, ObsState
from tango import ConnectionFailed, DeviceProxy, DevState

### Tango config

This section links the notebook execution to the tango devices on the cluster.
The most important parameter is the namespace name: KUBE_NAMESPACE    
This identifies the k8s namespace with which you intend to interact.
For running notebooks on the CLP k8s cluster this needs to be "ska-low-csp-baseline".

In [2]:
# Namespace of the Low CSP deployment to connect to in this cluster.
# Other namespaces used with this notebook: "ska-low-csp-baseline" (CLP cluster)
# and "ska-low-csp-baseline-no-hw". Assign the name exactly once so that the
# value in effect is never hidden behind an earlier, overwritten assignment.
KUBE_NAMESPACE = "ci-ska-low-csp-top-683-init-function-investigation"
# name of the databaseds service inside that namespace
DATABASEDS_NAME = "ska-low-csp-databaseds"

# TANGO_HOST is built from the values above and exported through the process
# environment; port 10000 is fixed here.
os.environ["TANGO_HOST"] = f"{DATABASEDS_NAME}.{KUBE_NAMESPACE}.svc.cluster.local:10000"

### Tango proxy devices

In [3]:
# CSP.LMC side: controller and subarray 1.
csp_controller = DeviceProxy("low-csp/control/0")
csp_subarray_1 = DeviceProxy("low-csp/subarray/01")
csp_devices = (csp_controller, csp_subarray_1)

# CBF side: the subarray device name is derived from the zero-padded subarray id.
cbf_subarray_id = 1
cbf_subarray_str = f"low-cbf/subarray/{cbf_subarray_id:02}"
cbf_controller = DeviceProxy("low-cbf/control/0")
cbf_subarray_1 = DeviceProxy(cbf_subarray_str)
cbf_devices = (cbf_controller, cbf_subarray_1)

# Every proxy used by this notebook, CSP devices first.
all_devices = (*csp_devices, *cbf_devices)

### Helper functions

In [4]:
def wait_until(
    predicate: Callable[[], Any],
    message_on_fail: Optional[str] = None,
    timeout: int = 300,
    poll_frequency: int = 2,
) -> Any:
    """
    Poll ``predicate`` until it returns a truthy value.

    An exception raised by ``predicate`` is treated as "not ready yet" and the
    polling continues. If the final poll before the time-out raised, that
    exception is attached as the cause of the ``TimeoutError`` so that the
    reason for the failure is not lost.

    :param predicate: Zero-argument callable that is evaluated on every poll
    :param message_on_fail: Text included in the ``TimeoutError`` message
    :param timeout: Approximate time-out period in seconds (the predicate is
    always evaluated at least once)
    :param poll_frequency: Delay between two polls, in seconds
    :return: The first truthy value returned by ``predicate``
    :raises TimeoutError: if no truthy value is seen before timing out
    """
    # A monotonic clock keeps the time-out unaffected by system clock changes.
    start = time.monotonic()
    while True:
        last_error = None
        try:
            return_val = predicate()
            if return_val:
                return return_val
        except Exception as error:
            # Keep polling; the delay at the end of the loop already paces the retries.
            last_error = error
        if time.monotonic() - start > timeout:
            raise TimeoutError(f"Timeout occurred: {message_on_fail}") from last_error
        time.sleep(poll_frequency)

In [5]:
def wait_for_attribute_value(
    device: DeviceProxy,
    attribute: str,
    value: Any = True,
    failure_message: str = "Timed out waiting for attribute value",
    timeout_sec: int = 120,
) -> None:
    """
    Block until a device attribute reads back as the expected value.

    The attribute is read every 2 seconds until it compares equal to ``value``
    or the time-out period has elapsed.

    :param device: Tango device proxy that owns the attribute
    :param attribute: Name of the attribute to read
    :param value: Value to wait for (defaults to True)
    :param failure_message: Start of the exception message used on failure;
    the time-out duration is appended to it.
    Defaults to "Timed out waiting for attribute value".
    :param timeout_sec: Approximate time-out period in seconds (each attribute
    read adds its own delay, so the real wait can be longer)
    :raises RuntimeError: if the expected value is not read before timing out
    """
    poll_interval_seconds = 2
    give_up_at = time.time() + timeout_sec
    while time.time() < give_up_at:
        current_value = device.read_attribute(attribute).value
        if current_value == value:
            return
        time.sleep(poll_interval_seconds)
    raise RuntimeError(f"{failure_message} after {timeout_sec} sec")

In [6]:
def wait_for_device_response(
    device: DeviceProxy,
    failure_message: str = "Timed out waiting for device to respond",
    timeout_sec: int = 120,
) -> None:
    """
    Block until a device answers a ping.

    The device is pinged again every 2 seconds for as long as the ping fails
    with ``ConnectionFailed``; any other exception is propagated to the caller.

    :param device: Tango device proxy to ping
    :param failure_message: Start of the exception message used on failure;
    the time-out duration is appended to it.
    Defaults to "Timed out waiting for device to respond".
    :param timeout_sec: Approximate time-out period in seconds
    :raises RuntimeError: if no ping succeeds before timing out
    """
    retry_delay_seconds = 2
    give_up_at = time.time() + timeout_sec
    while time.time() < give_up_at:
        try:
            device.ping()
        except ConnectionFailed:
            time.sleep(retry_delay_seconds)
        else:
            return
    raise RuntimeError(f"{failure_message} after {timeout_sec} sec")

In [7]:
# Colored printing functions for strings that use universal ANSI escape sequences.
# fail: bold red, pass: bold green, warn: bold yellow,
# debug: bold blue, bold: bold white, info: plain text (no styling)


def _print_styled(message, style, start, end):
    """
    Print a message with surrounding whitespace stripped, optionally styled.

    :param message: Text to print; non-string values are converted with str()
    :param style: ANSI SGR parameters (e.g. "1;31"), or None for plain text
    :param start: Text printed unstyled in front of the message
    :param end: Line terminator handed to print()
    """
    text = str(message).strip()
    if style is not None:
        # The reset sequence keeps the style from leaking into later output.
        text = f"\x1b[{style}m{text}\x1b[0m"
    print(f"{start}{text}", end=end)


def print_fail(message, start="", end="\n"):
    """Print ``message`` in bold red."""
    _print_styled(message, "1;31", start, end)


def print_pass(message, start="", end="\n"):
    """Print ``message`` in bold green."""
    _print_styled(message, "1;32", start, end)


def print_warn(message, start="", end="\n"):
    """Print ``message`` in bold yellow."""
    _print_styled(message, "1;33", start, end)


def print_debug(message, start="", end="\n"):
    """Print ``message`` in bold blue."""
    _print_styled(message, "1;34", start, end)


def print_info(message, start="", end="\n"):
    """Print ``message`` without any styling."""
    _print_styled(message, None, start, end)


def print_bold(message, start="", end="\n"):
    """Print ``message`` in bold white."""
    _print_styled(message, "1;37", start, end)

In [8]:
# show current state of device


def color_print(device):
    """
    Print the status text of a device, colored according to its state.

    The status is printed as a failure when the device is in FAULT, as a
    warning when it is in ALARM and as plain text otherwise. The call first
    waits for the device to respond.

    :param device: Tango device proxy to report on
    :raises RuntimeError: if the device does not respond in time
    """
    wait_for_device_response(device)
    # Read the state once so both comparisons below see the same value.
    state = device.state()
    status = device.status()
    if state == DevState.FAULT:
        print_fail(status, start="\t")
    elif state == DevState.ALARM:
        print_warn(status, start="\t")
    else:
        print_info(status, start="\t")


def show_state():
    """
    Print a short report for every proxy in ``all_devices``.

    Each report lists the device name, its colored status text, adminMode,
    healthState and, for devices that expose it, obsState.
    """
    for proxy in all_devices:
        print(f"TANGO device: {proxy.name()}")
        color_print(proxy)
        try:
            admin_mode_line = f"\t{proxy.adminMode!s}"
        except Exception:
            # Reading adminMode can fail; report that instead of aborting.
            admin_mode_line = "raises error in this state"
        print(admin_mode_line)
        print(f"\t{proxy.healthState!s}")
        # Devices without an obsState attribute are simply reported without it.
        with suppress(AttributeError):
            print(f"\t{proxy.obsState!s}")

In [9]:
# Baseline report of all devices before any test step is executed.
show_state()

TANGO device: low-csp/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-csp/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY
TANGO device: low-cbf/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-cbf/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY


### Init devices

**WARNING**:    
Initialisation of the system only happens once after a fresh deployment.    
Only run the cell below if the test is running on a freshly deployed system that has not been initialised yet.
Rerunning Init() on an already deployed system may lead to errors and FAULT conditions

In [10]:
# Init() is only issued when the CSP controller still reports adminMode
# OFFLINE, which this notebook takes as the sign of a fresh deployment.
INIT_TIMEOUT_MS = 60_000
needs_init = csp_controller.adminMode == AdminMode.OFFLINE
if needs_init:
    for device in all_devices:
        print(f"Initializing TANGO device: {device.name()}")
        # Raise the client time-out before calling Init().
        device.set_timeout_millis(INIT_TIMEOUT_MS)
        device.Init()

In [11]:
# Report all devices again after the (conditional) Init step.
show_state()

TANGO device: low-csp/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-csp/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY
TANGO device: low-cbf/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-cbf/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY


### AdminMode ONLINE
Low CSP.LMC Controller and Subarrays adminMode have to be set to MAINTENANCE or ONLINE to start the connection with the subordinate Low CBF TANGO Devices.

In [12]:
# Put the CSP controller ONLINE, then block until it reports that it is
# communicating before printing the device report.
csp_controller.adminMode = AdminMode.ONLINE
wait_for_attribute_value(
    device=csp_controller,
    attribute="isCommunicating",
    value=True,
)
show_state()

TANGO device: low-csp/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-csp/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY
TANGO device: low-cbf/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-cbf/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY


### Transit LOW CSP to ON state

In [13]:
def _on_has_completed():
    """Tell whether the controller's long-running command result mentions "on completed"."""
    return "on completed" in csp_controller.longRunningCommandResult[1]


# Issue the On command and poll the long-running command result until done.
csp_controller.on([])
wait_until(
    predicate=_on_has_completed,
    message_on_fail="On is not completed after 300s",
)
show_state()

TANGO device: low-csp/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-csp/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY
TANGO device: low-cbf/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-cbf/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY


### Prove ON state
Prove ON state by assigning resources to subarray.   
This step is selected since it can be executed even without HW in the loop.

In [14]:
print("Assign resources")
# Resources can only be assigned if the subarray is empty. The obsState is
# read once so that the value printed is exactly the value that is checked.
initial_obs_state = csp_subarray_1.obsState
print(f"{csp_subarray_1.dev_name()} in {str(initial_obs_state)}")
assert initial_obs_state == ObsState.EMPTY, (
    f"{csp_subarray_1.dev_name()} must be EMPTY before assigning resources, "
    f"but is in {str(initial_obs_state)}"
)

# AssignResources payload for the subarray identified by cbf_subarray_id;
# the "lowcbf" section is left empty.
assign_resources_json = {
    "interface": "https://schema.skao.int/ska-low-csp-assignresources/2.0",
    "common": {
        "subarray_id": cbf_subarray_id,
    },
    "lowcbf": {},
}
print(assign_resources_json)

Assign resources
low-csp/subarray/01 in obsState.EMPTY
{'interface': 'https://schema.skao.int/ska-low-csp-assignresources/2.0', 'common': {'subarray_id': 1}, 'lowcbf': {}}


In [15]:
# Send the payload as a JSON string, then block until the CSP subarray
# reports obsState IDLE.
assign_resources_payload = json.dumps(assign_resources_json)
csp_subarray_1.AssignResources(assign_resources_payload)
print(f"Verify subarray {cbf_subarray_id} moved from EMPTY to IDLE")
wait_for_attribute_value(
    device=csp_subarray_1,
    attribute="obsState",
    value=ObsState.IDLE,
    failure_message="Assignment not finished",
)
print(f"{csp_subarray_1.dev_name()} in {csp_subarray_1.obsState!s}")

Verify subarray 1 moved from EMPTY to IDLE
low-csp/subarray/01 in obsState.IDLE


In [16]:
# Report all devices after the AssignResources step.
show_state()

TANGO device: low-csp/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-csp/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.IDLE
TANGO device: low-cbf/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-cbf/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.IDLE


In [17]:
# Verdict for the ON test: the CSP subarray must have reached obsState IDLE.
# All lines of the verdict are printed in the same (pass or fail) style.
reached_idle = csp_subarray_1.obsState == ObsState.IDLE
report = print_pass if reached_idle else print_fail
report(f"{csp_controller.name()} is {csp_controller.state()}")
report(f"{csp_subarray_1.name()} is {csp_subarray_1.state()}")
if reached_idle:
    report(f"{csp_subarray_1.name()} transitioned from EMPTY to {csp_subarray_1.obsState!s}")
    report("Test passed")
else:
    report(f"{csp_subarray_1.name()} unsuccessfully transitioned from EMPTY to IDLE")
    report(f"{csp_subarray_1.name()} in {csp_subarray_1.obsState!s} state")
    report("Test failed")

[1;32mlow-csp/control/0 is ON[0m
[1;32mlow-csp/subarray/01 is ON[0m
[1;32mlow-csp/subarray/01 transitioned from EMPTY to obsState.IDLE[0m
[1;32mTest passed[0m


### Transit LOW CSP to OFF state
OFF: power is disconnected. This state cannot be reported by CSP itself.

The Off command disables any signal processing capability of a subarray and all its allocated resources are also released. As per ADR-8, this command can be issued from any observing state.    
https://confluence.skatelescope.org/pages/viewpage.action?pageId=105416556

In [18]:
def _off_has_completed():
    """Tell whether the controller's long-running command result mentions "off completed"."""
    return "off completed" in csp_controller.longRunningCommandResult[1]


# Issue the Off command and poll the long-running command result until done.
csp_controller.off([])
wait_until(
    predicate=_off_has_completed,
    message_on_fail="Off is not completed after 300s",
)
# Display the final command result as the cell output.
csp_controller.longRunningCommandResult

('1697205564.0378373_244397026655425_Off', '[3, "off completed  1/1"]')

### Prove OFF state

In [19]:
# Verdict for the OFF test. The DevState value itself is compared: its string
# form never equals the DevState.OFF enum member, so comparing str(...) would
# report a failure even when the controller is OFF.
if csp_controller.state() == DevState.OFF:
    # Each line pairs a controller's own name with its own state.
    print_pass(f"{csp_controller.name()} is {csp_controller.state()}")
    print_pass(f"{cbf_controller.name()} is {cbf_controller.state()}")
    print_pass("LOW CSP reports OFF")
    print_pass("Test passed")
else:
    print_fail(f"{csp_controller.name()} is {csp_controller.state()}")
    print_fail(f"{cbf_controller.name()} is {cbf_controller.state()}")
    print_fail("LOW CSP not in OFF")
    print_fail("Test failed")

[1;31mlow-csp/control/0 is ON[0m
[1;31mlow-cbf/control/0 is ON[0m
[1;31mLOW CSP not in OFF[0m
[1;31mTest failed[0m


In [20]:
# Verdict on the subarray after Off: it must be back in obsState EMPTY.
subarray_is_empty = csp_subarray_1.obsState == ObsState.EMPTY
report = print_pass if subarray_is_empty else print_fail
if subarray_is_empty:
    report(f"{csp_subarray_1.name()} transitioned from IDLE/READY to {csp_subarray_1.obsState!s}")
    report("Test passed")
else:
    report(f"{csp_subarray_1.name()} failed transitioned from IDLE/READY to EMPTY")
    report(f"{csp_subarray_1.name()} in {csp_subarray_1.obsState!s} state")
    report("Test failed")

[1;32mlow-csp/subarray/01 transitioned from IDLE/READY to obsState.EMPTY[0m
[1;32mTest passed[0m


In [21]:
# Report all devices after the Off command.
show_state()

TANGO device: low-csp/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-csp/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY
TANGO device: low-cbf/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-cbf/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY


### Transit LOW CSP to STANDBY state
STANDBY: Low‐power state, Low.CSP uses < 5% of nominal power. Basic monitor and control functionality is available, including the commands to request state transition to ON, OFF, DISABLE, or INIT. Signal processing functionality and related commands are not available. All sub‐arrays are empty (OFF) and IDLE; all resources (receptors, tied‐array beams) are placed in the pool of unused resources.

In [22]:
def _standby_has_completed():
    """Tell whether the controller's long-running command result mentions "standby completed"."""
    return "standby completed" in csp_controller.longRunningCommandResult[1]


# Issue the Standby command and poll the long-running command result until done.
csp_controller.standby([])
wait_until(
    predicate=_standby_has_completed,
    message_on_fail="Standby is not completed after 300s",
)
# Show both result attributes, one per line.
print(csp_controller.commandResult, csp_controller.longRunningCommandResult, sep="\n")

('standby', '3')
('1697205686.626241_178833921088301_Standby', '[3, "standby completed  1/1"]')


In [23]:
# Verdict on the subarray after Standby: it must be in obsState EMPTY.
if csp_subarray_1.obsState == ObsState.EMPTY:
    print_pass(f"{csp_controller.name()} is {csp_controller.state()}")
    print_pass(f"{csp_subarray_1.name()} is {csp_subarray_1.state()}")
    print_pass(f"{csp_subarray_1.name()} transitioned from IDLE to EMPTY")
    print_pass("Test passed")
else:
    print_fail(f"{csp_controller.name()} is {csp_controller.state()}")
    print_fail(f"{csp_subarray_1.name()} is {csp_subarray_1.state()}")
    print_fail(f"{csp_subarray_1.name()} failed transitioned from IDLE to EMPTY")
    # str() is used as in the other cells so the obsState label is printed;
    # plain f-string formatting of an integer-based enum can give its number.
    print_fail(f"{csp_subarray_1.name()} in {str(csp_subarray_1.obsState)} state")
    print_fail("Test failed")

[1;32mlow-csp/control/0 is ON[0m
[1;32mlow-csp/subarray/01 is ON[0m
[1;32mlow-csp/subarray/01 transitioned from IDLE to EMPTY[0m
[1;32mTest passed[0m


`LOW CSP.LMC and LOW CBF controllers and subarrays report STANDBY state`  
`LOW CSP.LMC and LOW CBF obsState reports EMPTY`   
`The LOW CSP healthState is OK`

In [24]:
# Report all devices after the Standby command.
show_state()

TANGO device: low-csp/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-csp/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY
TANGO device: low-cbf/control/0
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
TANGO device: low-cbf/subarray/01
	The device is in ON state.
	adminMode.ONLINE
	healthState.UNKNOWN
	obsState.EMPTY


### Prove STANDBY state

In [25]:
# Repeat the resource assignment after the Standby command.
assign_resources_payload = json.dumps(assign_resources_json)
csp_subarray_1.AssignResources(assign_resources_payload)
print(f"Verify subarray {cbf_subarray_id} do not moved from EMPTY to IDLE")
# TODO change waiter when it starts working
wait_for_attribute_value(
    device=csp_subarray_1,
    attribute="obsState",
    value=ObsState.IDLE,
    failure_message="Assignment not finished",
)
print(f"{csp_subarray_1.dev_name()} in {csp_subarray_1.obsState!s}")

Verify subarray 1 do not moved from EMPTY to IDLE
low-csp/subarray/01 in obsState.IDLE


In [26]:
# Verdict for the STANDBY test: the CSP controller must report STANDBY.
if csp_controller.state() == DevState.STANDBY:
    print_pass(f"{csp_controller.name()} is {csp_controller.state()}")
    print_pass(f"{csp_subarray_1.name()} is {csp_subarray_1.state()}")
    # obsState is an attribute value, not a method: it is read, never called.
    print_pass(f"{csp_subarray_1.name()} in {str(csp_subarray_1.obsState)}")
    print_pass("Test passed")
else:
    print_fail(f"{csp_controller.name()} is {csp_controller.state()}")
    print_fail(f"{csp_subarray_1.name()} is {csp_subarray_1.state()}")
    print_fail(f"{csp_subarray_1.name()} in {str(csp_subarray_1.obsState)} state")
    print_fail("Test failed")

[1;31mlow-csp/control/0 is ON[0m
[1;31mlow-csp/subarray/01 is ON[0m
[1;31mlow-csp/subarray/01 in obsState.IDLE state[0m
[1;31mTest failed[0m


### Return back to initial state

In [27]:
# temporary step while STANDBY is not working
csp_subarray_1.ReleaseAllResources()
csp_controller.adminMode = AdminMode.OFFLINE
wait_for_attribute_value(csp_controller, "isCommunicating", False)
show_state()

TANGO device: low-csp/control/0
	The device is in DISABLE state.
	adminMode.OFFLINE
	healthState.UNKNOWN
TANGO device: low-csp/subarray/01
	The device is in DISABLE state.
	adminMode.OFFLINE
	healthState.UNKNOWN
	obsState.EMPTY
TANGO device: low-cbf/control/0
	The device is in DISABLE state.
	adminMode.OFFLINE
	healthState.UNKNOWN
TANGO device: low-cbf/subarray/01
	The device is in DISABLE state.
	adminMode.OFFLINE
	healthState.UNKNOWN
	obsState.EMPTY
