# SKA SDP Receive Workflow Testing

## Summary

This notebook covers how to configure and deploy an SKA SDP receive workflow, followed by methods for testing the pipeline output.

## Installation

### Docker
* https://docs.docker.com/get-docker/

### Minikube and Kubectl
* https://minikube.sigs.k8s.io/docs/start/

### Testing Packages
* `pip3 install ska-sdp-cbf-emulator --extra-index-url=https://artefact.skao.int/repository/pypi-all/simple`
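
Before running the cells below, it can help to confirm that the command-line tools they shell out to are on `PATH`. The following is a minimal sketch; the tool list is an assumption drawn from the commands used later in this notebook (`helm` is used below although it is not listed above), and `missing_tools` is a hypothetical helper name.

```python
import shutil

# Tools invoked via os.system/subprocess later in this notebook (assumed list).
REQUIRED_TOOLS = ["docker", "minikube", "kubectl", "helm"]


def missing_tools(tools=REQUIRED_TOOLS):
    """Return the subset of `tools` that cannot be found on PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


missing = missing_tools()
if missing:
    print(f"missing tools: {', '.join(missing)}")
else:
    print("all required tools found")
```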


## Deploying SKA SDP Standalone and Receive Workflow

Refer to:

https://developer.skao.int/projects/ska-sdp-integration/en/latest/running/standalone.html

https://developer.skao.int/projects/ska-sdp-science-pipelines/en/latest/workflows/vis_receive.html

![Deployment Architecture](architecture.png)

In [54]:
# Start Minikube and bind-mount the data directory.
# The Kubernetes cluster runs inside the minikube Docker container, so the local
# data directory must first be mounted into that container.

import os
import subprocess

# Replace the following path string with the measurement set data location

data_dir = "/data/data"

os.system("minikube delete")
os.system(f"minikube start --driver=docker --mount=true --mount-string {data_dir}:/mnt/data/")

# minikube creates the mount directory if it doesn't exist, so check that the data actually mounted
result = subprocess.run("docker exec minikube ls /mnt/data/".split(), stdout=subprocess.PIPE)
assert result.returncode == 0
mounted_data_filenames = result.stdout.decode("unicode_escape").split()
assert len(mounted_data_filenames) > 0
print(mounted_data_filenames)

* Deleting "minikube" in docker ...
* Deleting container "minikube" ...
* Removing /home/azureuser/.minikube/machines/minikube ...
* Removed all traces of the "minikube" cluster.
* minikube v1.21.0 on Debian 10.11
* Using the docker driver based on user configuration
* Starting control plane node minikube in cluster minikube
* Pulling base image ...
* Creating docker container (CPUs=2, Memory=16100MB) ...
* Preparing Kubernetes v1.20.7 on Docker 20.10.7 ...
  - Generating certificates and keys ...
  - Booting up control plane ...
  - Configuring RBAC rules ...
* Verifying Kubernetes components...
  - Using image gcr.io/k8s-minikube/storage-provisioner:v5
* Enabled addons: storage-provisioner, default-storageclass
* Done! kubectl is now configured to use "minikube" cluster and "default" namespace by default
['AA05LOW.ms', 'SKA_MID_SIM_custom_B2_dec_-45.0_nominal_nchan100_nominal.ms', 'functest.ms', 'gaussian_beams.ms', 'low_sim.ms', 'low_sim.ms.tar.gz', 'mid_16_tiled.ms', 'mssplit-onech
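
The start command above is assembled as a shell string. One possible alternative, sketched here under the assumption that the same flags are wanted, is to build it as an argument list so that a `data_dir` containing spaces is passed through intact. `minikube_start_cmd` is a hypothetical helper name, not part of minikube.

```python
import shlex


def minikube_start_cmd(data_dir, mount_point="/mnt/data/"):
    """Build the `minikube start` argument list used above, without a shell."""
    return [
        "minikube", "start",
        "--driver=docker",
        "--mount=true",
        "--mount-string", f"{data_dir}:{mount_point}",
    ]


cmd = minikube_start_cmd("/data/data")
# Show the equivalent shell command for inspection.
print(shlex.join(cmd))
# To actually start the cluster: subprocess.run(cmd, check=True)
```

Passing the list to `subprocess.run(..., check=True)` would also raise an exception if minikube fails to start, whereas `os.system` only returns the exit status.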

In [60]:
# Cleanup Minikube SKA SDP
import os
import time
os.system("helm uninstall sdp-standalone")
os.system("kubectl --namespace sdp delete pod --all")
os.system("kubectl --namespace sdp delete pvc --all")
os.system("kubectl --namespace sdp delete pv --all")
os.system("kubectl delete namespace sdp")

release "sdp-standalone" uninstalled
pod "proc-pb-sdpcli-20220223-00000-receive-0" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-1" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-10" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-11" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-12" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-13" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-14" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-15" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-16" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-17" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-18" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-19" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-2" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-3" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-4" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-5" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-6" deleted
pod "proc-pb-sdpcli-20220223-00000-



persistentvolume "local-pv" deleted
namespace "sdp" deleted


0

In [61]:
# Create Namespace and Persistent Volume
import os
import yaml

def kubectl_create(filename, config):
    with open(filename, 'w') as file:
        yaml.dump(config, file, default_flow_style=False)
    os.system(f"kubectl create -f {filename} -n sdp")
    os.remove(filename)
    

# Create Namespace
os.system("kubectl create namespace sdp")
    
# Create Persistent Volume and Persistent Volume Claim
pv_config_filename = "data-pv.yaml"
pv_config = \
{
  "apiVersion": "v1",
  "kind": "PersistentVolume",
  "metadata": {
    "name": "local-pv",
    "namespace": "sdp",
    "labels": {
      "type": "local"
    }
  },
  "spec": {
    "storageClassName": "local-storage",
    "capacity": {
      "storage": "5Gi"
    },
    "volumeMode": "Filesystem",
    "accessModes": [
      "ReadWriteOnce"
    ],
    "local": {
      "path": "/mnt/data/"
    },
    "nodeAffinity": { "required": { "nodeSelectorTerms": [{ "matchExpressions": [{
                "key": "kubernetes.io/hostname",
                "operator": "In",
                "values": [
                  "minikube"
    ]}]}]}}
  }
}
kubectl_create(pv_config_filename, pv_config)

pvc_config_filename = "data-pvc.yaml"
pvc_config = \
{
  "apiVersion": "v1",
  "kind": "PersistentVolumeClaim",
  "metadata": {
    "name": "local-pvc",
    "namespace": "sdp"
  },
  "spec": {
    "storageClassName": "local-storage",
    "accessModes": [
      "ReadWriteOnce"
    ],
    "resources": {
      "requests": {
        "storage": "5Gi"
      }
    },
    "selector": {
      "matchLabels": {
        "type": "local"
      }
    }
  }
}
kubectl_create(pvc_config_filename, pvc_config)

namespace/sdp created
persistentvolume/local-pv created
persistentvolumeclaim/local-pvc created
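
A claim only binds when its fields are compatible with the volume. The sketch below checks the fields that this notebook sets (storage class, access modes, selector labels, and size) on trimmed copies of the two configurations. It is a simplification: Kubernetes applies further binding rules, and `pvc_binding_problems` is a hypothetical helper.

```python
def pvc_binding_problems(pv, pvc):
    """List reasons why `pvc` might not bind to `pv` (empty list if none found).

    Only the fields used in this notebook are compared.
    """
    problems = []
    pv_spec, pvc_spec = pv["spec"], pvc["spec"]

    if pv_spec.get("storageClassName") != pvc_spec.get("storageClassName"):
        problems.append("storageClassName differs")

    if not set(pvc_spec["accessModes"]) <= set(pv_spec["accessModes"]):
        problems.append("PV does not offer every requested access mode")

    wanted = pvc_spec.get("selector", {}).get("matchLabels", {})
    labels = pv["metadata"].get("labels", {})
    if any(labels.get(key) != value for key, value in wanted.items()):
        problems.append("PV labels do not satisfy the PVC selector")

    # Simplification: sizes are compared only when both use the "Gi" suffix.
    capacity = pv_spec["capacity"]["storage"]
    request = pvc_spec["resources"]["requests"]["storage"]
    if capacity.endswith("Gi") and request.endswith("Gi"):
        if float(request[:-2]) > float(capacity[:-2]):
            problems.append("requested storage exceeds PV capacity")

    return problems


# Trimmed copies of pv_config and pvc_config from the cell above.
pv = {
    "metadata": {"name": "local-pv", "labels": {"type": "local"}},
    "spec": {
        "storageClassName": "local-storage",
        "capacity": {"storage": "5Gi"},
        "accessModes": ["ReadWriteOnce"],
    },
}
pvc = {
    "metadata": {"name": "local-pvc"},
    "spec": {
        "storageClassName": "local-storage",
        "accessModes": ["ReadWriteOnce"],
        "resources": {"requests": {"storage": "5Gi"}},
        "selector": {"matchLabels": {"type": "local"}},
    },
}
print(pvc_binding_problems(pv, pvc))  # prints []
```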


In [62]:
# Deploy SKA SDP Standalone and wait for the console to become available

import os
import time

os.system("helm install sdp-standalone ska/ska-sdp")
time.sleep(10)
# This can take a long time on a fresh minikube, ~4mins
os.system("kubectl wait --for=condition=Ready pod/sdp-console-0 --timeout=300s")

NAME: sdp-standalone
LAST DEPLOYED: Wed Feb 23 03:12:57 2022
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
SKA SDP prototype deployed!

You can connect to the configuration database by running a shell in the
console pod. To start a bash shell, use:

    $ kubectl -n default exec -it sdp-console-0 -- bash

and from there you can use the ska-sdp command, e.g.:

    # ska-sdp list -a

Alternatively to start an iPython shell, use:

    $ kubectl -n default exec -it sdp-console-0 -- ipython

and from there you can use the ska_sdp_config package, e.g.:

    import ska_sdp_config
    config = ska_sdp_config.Config()
pod/sdp-console-0 condition met


0
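
The cell above sleeps for a fixed 10 seconds before calling `kubectl wait`, presumably so that the pod exists by the time the wait starts. A polling loop is one way to avoid guessing the delay. The helper below is a generic sketch (the name `wait_until` and its parameters are assumptions) that retries any check until it succeeds or a timeout passes.

```python
import time


def wait_until(predicate, timeout=300.0, interval=5.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call `predicate` until it returns a truthy value or `timeout` seconds pass.

    Returns True on success and False on timeout. `sleep` and `clock` are
    injectable so the loop can be exercised without real delays.
    """
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


# Demonstration with a stand-in check that succeeds on the third attempt.
attempts = []


def ready():
    attempts.append(1)
    return len(attempts) >= 3


print(wait_until(ready, timeout=10, interval=0))  # prints True
```

In this notebook the predicate could, for example, run `kubectl get pod sdp-console-0` through `subprocess.run` and test for a zero return code before handing over to `kubectl wait`.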

## Receiver Configuration

In [71]:
# Create a SKA SDP Scheduling Block and Receive Processing Block

import subprocess
import os
import time
import json

# ICRS configuration

sbi_config = \
{
  "scan_types": [
            {
                "id": "science_A",
                "coordinate_system": "ICRS",
                "ra": "02:42:40.771",
                "dec": "-00:00:47.84",
                "channels": [
                    { "count": 4, "start": 0, "stride": 2, "freq_min": 0.35e9, "freq_max": 0.368e9, "link_map": [[0,0], [200,1], [744,2], [944,3]] }
                ]
            },
            {
                "id": "calibration_B",
                "coordinate_system": "ICRS", "ra": "12:29:06.699", "dec": "02:03:08.598",
                "channels": [
                { "count": 4, "start": 0, "stride": 2, "freq_min": 0.35e9, "freq_max": 0.368e9, "link_map": [[0,0], [200,1], [744,2], [944,3]] }
                ]
            }
  ]
}



sbi_config_for_recv = \
{
  "scan_types": [
    {
      "id": "science_A",
      "coordinate_system": "ICRS", "ra": "02:42:40.771", "dec": "-00:00:47.84",
      "channels": [
        { "count": 13824, "start": 0, "stride": 2, "freq_min": 2.61984e+08 , "freq_max": 3.38016e+08, "link_map": [[0,0], [200,1], [744,2], [944,3]] }
      ]
    },
    {
      "id": "calibration_B",
      "coordinate_system": "ICRS", "ra": "12:29:06.699", "dec": "02:03:08.598",
      "channels": [
        { "count": 13824, "start": 0, "stride": 2, "freq_min": 2.61984e+08, "freq_max": 3.38016e+08, "link_map": [[0,0], [200,1], [744,2], [944,3]] }
      ]
    }
  ],
  "mccs":
    { 
        "station_ids":[0,1,2,3]
    }
}


# Write into the mounted data directory so the receiver sees it at /mnt/data/sbi_config.json
with open(f'{data_dir}/sbi_config.json', 'w') as fp:
    json.dump(sbi_config_for_recv, fp)
    

# Calculate receive parameters based on the model.
# These must also be used to configure the sender.

# helpers
total_channels = 13824
total_timesteps = 6
total_streams = 4

# required
num_repeats         = 2
rate                = 10416667
channels_per_stream = total_channels // total_streams
max_payloads        = total_timesteps * total_streams * num_repeats
max_payload_misses  = 100
max_ms              = 1 # -1 to continuously run
input_ms = "AA05LOW.ms"
output_ms = "output.ms"
realtime_pb_parameters = \
{
  "image": "artefact.skatelescope.org/ska-sdp-cbf-emulator",
  "version": "1.6.11",
  "model": {
    "name": input_ms
  },
  "transmission": {
    "channels_per_stream": channels_per_stream,
    "rate": str(rate)
  },
  "payload.method": "icd",
  "reader": {
    "num_timestamps": 0,
    "start_chan": 0,
    "num_chan": 0,
    "num_repeats": num_repeats,
  },
  "reception": {
    "outputfilename": f"/mnt/{output_ms}",
    "receiver_port_start": "41000",
    "schedblock": f"/mnt/data/sbi_config.json",  
    #"num_ports": 1,
    "ring_heaps": 133,
    "consumer": "mswriter",
    "continuous_mode": True,
    "max_payloads": max_payloads,
    "timestamp_output": True,
    "command_template": f"mv %%s /mnt/data/%%s" 
  },
  #"rclone.configurl": "https://www.dropbox.com/s/yqmzfs8ovtnonbe/rclone.conf?dl=1",
  #"rclone.command": "gcs://yan-486-bucket/demo.ms",
  "pvc.name": "local-pvc",
  "pvc.path": "/mnt/data",
    
  #"plasmaEnabled": False, # True TODO: Plasma defaults can't be overridden at the moment
  #"plasmaStoreCommand": "/usr/local/bin/plasma_store -s /plasma/socket -m 2000000000",
  #"plasmaProcessorCommand": "plasma_mswriter"
  # Workaround
  
  
}

# Delete existing processing blocks as only a single receive pod can run at a time
p = subprocess.Popen("kubectl exec -it sdp-console-0 -c console -- ska-sdp delete -a pb".split(), stdout=subprocess.PIPE, stdin=subprocess.PIPE)
# Answer the confirmation prompt; communicate() sends the input, closes stdin and waits for exit
p.communicate(input=b'yes')
# Remove leftover jobs and pods, then give the processing block time to be deleted
os.system("kubectl -n sdp delete job --all --grace-period=5")
os.system("kubectl -n sdp delete pod --all --grace-period=5")
time.sleep(5)

# Create SBI and PB
print("waiting for receive pod creation...")
result = subprocess.run(f"kubectl exec sdp-console-0 -c console -- ska-sdp create pb realtime:vis_receive:0.3.6 \"{realtime_pb_parameters}\" --sbi=\"{sbi_config}\"",
                        shell=True,
                        stdout=subprocess.PIPE)
assert result.returncode == 0
output = result.stdout.decode("unicode_escape").rstrip()
pb_id = output[output.find('pb-sdp'):]
print(f"pb_id: {pb_id}")
time.sleep(20)
print("waiting for receive pod to be ready...")
os.system(f"kubectl -n sdp wait --for=condition=Ready pod/proc-{pb_id}-receive-0 --timeout=20s")

# Get DNS Name
result = subprocess.run(f"kubectl exec sdp-console-0 -c console -- ska-sdp get /pb/{pb_id}/state".split(), stdout=subprocess.PIPE)
assert result.returncode == 0
full_string = result.stdout.decode("unicode_escape")
json_string = full_string[full_string.find('{'):]
pb_status = json.loads(json_string)
receive_address = pb_status['receive_addresses']['calibration_B']['host'][0][1]
print(f"receive_address: {receive_address}")


os.system(f"kubectl get pods -n sdp proc-{pb_id}-receive-0")

Unable to use a TTY - input is not a terminal or the right kind of file


job.batch "proc-pb-sdpcli-20220223-00000-workflow" deleted
job.batch "sendtest" deleted
pod "proc-pb-sdpcli-20220223-00000-receive-0" deleted
pod "proc-pb-sdpcli-20220223-00000-workflow-ktkg2" deleted
pod "sendtest-4qpk9" deleted
waiting for receive pod creation...
pb_id: pb-sdpcli-20220223-00000
waiting for receive pod to be ready...
pod/proc-pb-sdpcli-20220223-00000-receive-0 condition met
receive_address: proc-pb-sdpcli-20220223-00000-receive-0.receive.sdp.svc.cluster.local
NAME                                      READY   STATUS    RESTARTS   AGE
proc-pb-sdpcli-20220223-00000-receive-0   1/1     Running   0          15s


0
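
The derived receive parameters follow directly from the helper values in the cell above. The sketch below repeats that arithmetic as a function and adds a divisibility check. The consecutive port list is an assumption based on `receiver_port_start` and the receiver log, and `receive_parameters` is a hypothetical helper.

```python
def receive_parameters(total_channels, total_streams, total_timesteps,
                       num_repeats, port_start=41000):
    """Derive the per-stream values shared by the receiver and the sender."""
    if total_channels % total_streams != 0:
        raise ValueError("total_channels must divide evenly across the streams")
    return {
        "channels_per_stream": total_channels // total_streams,
        "max_payloads": total_timesteps * total_streams * num_repeats,
        # Assumption: one UDP port per stream, counted up from port_start.
        "ports": list(range(port_start, port_start + total_streams)),
    }


print(receive_parameters(13824, 4, 6, 2))
# prints {'channels_per_stream': 3456, 'max_payloads': 48, 'ports': [41000, 41001, 41002, 41003]}
```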

In [66]:
# Debug Pods here
os.system(f"kubectl -n sdp logs proc-{pb_id}-receive-0 receiver")


1|2022-02-23T03:20:48.368Z|INFO|MainThread|__init__|spead2_receivers.py#89||Expecting 13824 channels
1|2022-02-23T03:20:48.369Z|INFO|MainThread|__init__|spead2_receivers.py#91||Creating stream with 4 UDP readers to receive data for 3456 channels
1|2022-02-23T03:20:48.369Z|INFO|MainThread|_setup_streams|spead2_receivers.py#111||Started udp_reader on port 41000
1|2022-02-23T03:20:48.369Z|INFO|MainThread|_setup_streams|spead2_receivers.py#111||Started udp_reader on port 41001
1|2022-02-23T03:20:48.370Z|INFO|MainThread|_setup_streams|spead2_receivers.py#111||Started udp_reader on port 41002
1|2022-02-23T03:20:48.370Z|INFO|MainThread|_setup_streams|spead2_receivers.py#111||Started udp_reader on port 41003


0
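
The receiver log lines above are pipe-delimited, so the ports the receiver is listening on can be extracted programmatically and compared with the sender settings. This is a sketch only: the field names in `LOG_FIELDS` are assumed labels for the eight columns seen in the output, and both functions are hypothetical helpers.

```python
import re

# Assumed labels for the eight pipe-separated columns in the log output above.
LOG_FIELDS = ["version", "timestamp", "level", "thread",
              "function", "location", "tags", "message"]


def parse_log_line(line):
    """Split one log line into a dict, or return None if it has too few fields."""
    parts = line.rstrip("\n").split("|", len(LOG_FIELDS) - 1)
    if len(parts) != len(LOG_FIELDS):
        return None
    return dict(zip(LOG_FIELDS, parts))


def listening_ports(log_text):
    """Collect the port numbers from 'Started udp_reader on port N' messages."""
    ports = []
    for line in log_text.splitlines():
        record = parse_log_line(line)
        if record is None:
            continue
        match = re.search(r"Started udp_reader on port (\d+)", record["message"])
        if match:
            ports.append(int(match.group(1)))
    return ports


sample = """\
1|2022-02-23T03:20:48.369Z|INFO|MainThread|__init__|spead2_receivers.py#91||Creating stream with 4 UDP readers to receive data for 3456 channels
1|2022-02-23T03:20:48.369Z|INFO|MainThread|_setup_streams|spead2_receivers.py#111||Started udp_reader on port 41000
1|2022-02-23T03:20:48.369Z|INFO|MainThread|_setup_streams|spead2_receivers.py#111||Started udp_reader on port 41001
"""
print(listening_ports(sample))  # prints [41000, 41001]
```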

## Sender Configuration

In [72]:
# Create test pod to send data to receive pod
import os
import yaml
import json

test_filename = 'test-AA05LOW.yaml'
test_config = \
{
  "apiVersion": "batch/v1",
  "kind": "Job",
  "metadata": {
    "labels": {
      "app.kubernetes.io/name": "sendtest"
    },
    "name": "sendtest",
    "namespace": "sdp"
  },
  "spec": { "template": { "spec": {
    "volumes": [
      {
        "name": "local-pvc",
        "persistentVolumeClaim": {
          "claimName": "local-pvc"
        }
      }
    ],
    "containers": [
      {
        "command": ["/bin/bash", "-c", " ".join([
          'rm -rf /mnt/data/output.ms',
          '&&',
          'emu-send',
          '-o transmission.method=spead2_transmitters',
          f'-o transmission.channels_per_stream={channels_per_stream}',
          f'-o transmission.rate={rate}',
          '-o transmission.target_start_port=41000',
          f' -o transmission.target_host={receive_address}',
          f' -o reader.num_repeats={num_repeats}',
          ' /mnt/data/AA05LOW.ms',
          #'&&',
          #'sleep 5',
          #'&&',
          #f'ms-asserter /mnt/data/AA05LOW.ms/ /mnt/data/output.ms/ --minimal true --repeats {num_repeats}',
        ])],
        "image": "artefact.skao.int/ska-sdp-cbf-emulator:1.6.11",
        "imagePullPolicy": "IfNotPresent",
        "name": "sendtest",
        "resources": {},
        "terminationMessagePath": "/dev/termination-log",
        "terminationMessagePolicy": "File",
        "volumeMounts": [
          {
            "mountPath": "/mnt/data",
            "name": "local-pvc"
          }
        ]
      }
    ],
    "restartPolicy": "Never"
  }}}
}

os.system("kubectl -n sdp delete job sendtest")
kubectl_create(test_filename, test_config)

job.batch/sendtest created


Error from server (NotFound): jobs.batch "sendtest" not found
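
The `emu-send` invocation in the job above is a sequence of `-o key=value` overrides followed by the measurement set path. As a sketch, the same string can be generated from a dictionary, which keeps the sender options in one place next to the receiver parameters. `emu_send_command` is a hypothetical helper; the option names are the ones used in the cell above.

```python
import shlex


def emu_send_command(input_ms, options):
    """Render an emu-send command line from a dict of `-o key=value` overrides."""
    args = ["emu-send"]
    for key, value in options.items():
        args += ["-o", f"{key}={value}"]
    args.append(input_ms)
    return shlex.join(args)


options = {
    "transmission.method": "spead2_transmitters",
    "transmission.channels_per_stream": 3456,
    "transmission.rate": 10416667,
    "transmission.target_start_port": 41000,
    "transmission.target_host": "proc-pb-sdpcli-20220223-00000-receive-0.receive.sdp.svc.cluster.local",
    "reader.num_repeats": 2,
}
print(emu_send_command("/mnt/data/AA05LOW.ms", options))
```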


In [68]:
os.system("kubectl -n sdp wait --for=condition=complete job/sendtest --timeout=300s")

print("check status with:")
print("kubectl -n sdp logs job.batch/sendtest")
print(f"kubectl -n sdp logs proc-{pb_id}-receive-0")

print(f"input measurement set: {data_dir}/AA05LOW.ms")
print(f"output measurement set: {data_dir}/output.ms")

job.batch/sendtest condition met
check status with:
kubectl -n sdp logs job.batch/sendtest
kubectl -n sdp logs proc-pb-sdpcli-20220223-00000-receive-0
input measurement set: /data/data/AA05LOW.ms
output measurement set: /data/data/output.ms
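
Waiting on `condition=complete` only returns early when the job succeeds; a failed job would presumably hold the cell until the timeout. One way to tell the cases apart is to inspect the job's status conditions, for example from the JSON printed by `kubectl -n sdp get job sendtest -o json`. The sketch below classifies such a status dictionary; `job_outcome` is a hypothetical helper and the sample dictionaries are hand-written, not captured output.

```python
def job_outcome(job):
    """Classify a Kubernetes Job object (as a dict) from its status conditions.

    Returns "complete", "failed", or "running".
    """
    for condition in job.get("status", {}).get("conditions", []):
        if condition.get("status") == "True" and condition.get("type") in ("Complete", "Failed"):
            return condition["type"].lower()
    return "running"


# Hand-written examples of the relevant part of a Job object.
finished = {"status": {"conditions": [{"type": "Complete", "status": "True"}]}}
broken = {"status": {"conditions": [{"type": "Failed", "status": "True"}]}}
in_progress = {"status": {"active": 1}}

print(job_outcome(finished), job_outcome(broken), job_outcome(in_progress))
# prints complete failed running
```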


## Test MS Output
Both the input and output measurement sets are available on the local filesystem at the paths printed above.

In [None]:
# Test output
os.system(f"showtableinfo in={data_dir}/AA05LOW.ms/")
os.system(f"showtableinfo in={data_dir}/output.ms/")
os.system(f"ms-asserter {data_dir}/AA05LOW.ms {data_dir}/output.ms --minimal true --repeats {num_repeats}")
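
`os.system` returns the exit status, but the cell above does not inspect it, so a failing comparison could go unnoticed. A minimal sketch of a stricter wrapper follows, assuming the tools signal failure through a non-zero exit code. `run_checked` is a hypothetical helper, and the Python interpreter stands in for `ms-asserter` in the demonstration.

```python
import subprocess
import sys


def run_checked(args):
    """Run a command, echo its output, and raise CalledProcessError on failure."""
    result = subprocess.run(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, text=True)
    print(result.stdout, end="")
    result.check_returncode()
    return result.stdout


# Demonstration: a command that succeeds, then one that exits with status 3.
run_checked([sys.executable, "-c", "print('ok')"])
try:
    run_checked([sys.executable, "-c", "import sys; sys.exit(3)"])
except subprocess.CalledProcessError as error:
    print(f"command failed with exit code {error.returncode}")
```

With this wrapper, the comparison step could be written as `run_checked(["ms-asserter", f"{data_dir}/AA05LOW.ms", f"{data_dir}/output.ms", "--minimal", "true", "--repeats", str(num_repeats)])`, reusing the arguments from the cell above.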