diff options
author | Steven Koo <steven.koo@ni.com> | 2022-06-20 10:47:18 -0500 |
---|---|---|
committer | skooNI <60897865+skooNI@users.noreply.github.com> | 2022-07-20 15:57:20 -0500 |
commit | 770711c40a482d1e87d75393dd3fe95d75efa379 (patch) | |
tree | 87e877dffdc4740c47904caa1118fa267b89f0c5 | |
parent | a3ca7796b75ab17df4980e76749dfaa3cf6ea110 (diff) | |
download | uhd-770711c40a482d1e87d75393dd3fe95d75efa379.tar.gz uhd-770711c40a482d1e87d75393dd3fe95d75efa379.tar.bz2 uhd-770711c40a482d1e87d75393dd3fe95d75efa379.zip |
ci: add devtest e320 support
This commit adds devtest support for e320 via tftp. The e320 has
a hardware incompatibility with sdmuxes that we use for the n3xx
devices, which makes them unreliable. Instead this loads a small
Linux OS into the e320 system memory and reimages the sd card
from there.
Signed-off-by: Steven Koo <steven.koo@ni.com>
-rw-r--r-- | .ci/templates/job-uhd-devtest-rhombus.yml | 19 | ||||
-rw-r--r-- | .ci/templates/job-uhd-devtest.yml | 41 | ||||
-rw-r--r-- | .ci/templates/stages-uhd-pipeline.yml | 6 | ||||
-rwxr-xr-x | .ci/templates/tests/rhombus-labgrid/crossbar/places.yaml | 16 | ||||
-rw-r--r-- | .ci/templates/tests/rhombus-labgrid/device-configs/rhombus-e320-0.yml | 14 | ||||
-rwxr-xr-x | .ci/templates/tests/rhombus-labgrid/exporter-conf/exporter.yaml | 13 | ||||
-rw-r--r-- | .ci/utils/httpd.py | 63 | ||||
-rw-r--r-- | .ci/utils/mutex_hardware.py | 124 | ||||
-rw-r--r-- | .ci/utils/tftp.py | 98 |
9 files changed, 380 insertions, 14 deletions
diff --git a/.ci/templates/job-uhd-devtest-rhombus.yml b/.ci/templates/job-uhd-devtest-rhombus.yml index 1395b0de0..3348e4fff 100644 --- a/.ci/templates/job-uhd-devtest-rhombus.yml +++ b/.ci/templates/job-uhd-devtest-rhombus.yml @@ -10,7 +10,7 @@ parameters: default: current - name: testDevices type: string - default: 'x3xx,b2xx,n3xx' + default: 'x3xx,b2xx,n3xx,e320' jobs: - template: job-uhd-devtest.yml @@ -136,3 +136,20 @@ jobs: devtestPattern: 'n3x0' devSDImage: gnuradio-image-ni-sulfur-rev11-mender.sdimg.bz2 devLabgridConfig: .ci/templates/tests/rhombus-labgrid/device-configs/rhombus-n321-0.yml + + ${{ if contains(parameters.testDevices, 'e320') }}: + rhombus-e320-0: + devAgent: rhombus-e320-0 + devType: 'e3xx' + devModel: 'e320' + devName: rhombus-e320-0 + devSerial: '31A8171' + devHostname: 'ni-e320-31a8171' + devBus: 'ip' + devAddr: '192.168.20.7' + sfpAddrs: '192.168.20.7' + devFpga: 'XG' + devtestPattern: 'e320' + devInitramfsImage: fitImage-manufacturing + devSDImage: gnuradio-image-ni-neon-rev2-mender.sdimg.bz2 + devLabgridConfig: .ci/templates/tests/rhombus-labgrid/device-configs/rhombus-e320-0.yml diff --git a/.ci/templates/job-uhd-devtest.yml b/.ci/templates/job-uhd-devtest.yml index 2b284c7ab..3e095c5f3 100644 --- a/.ci/templates/job-uhd-devtest.yml +++ b/.ci/templates/job-uhd-devtest.yml @@ -67,18 +67,30 @@ jobs: cleanDestinationFolder: true - download: ${{ parameters.uhdArtifactSource }} - artifact: $(devType)-images + artifact: n3xx-images # Only sync the bz2 sdimg since the bmap # is incompatible with mender - patterns: '**/*.bz2' + patterns: | + **/*.bz2 + fitImage-manufacturing displayName: Download $(devType)-images artifact condition: and(succeeded(), eq(variables.devType, 'n3xx')) + - download: ${{ parameters.uhdArtifactSource }} + artifact: e320-images + # Only sync the bz2 sdimg since the bmap + # is incompatible with mender + patterns: | + **/*.bz2 + fitImage-manufacturing + displayName: Download $(devType)-images artifact + condition: and(succeeded(), eq(variables.devModel, 'e320')) + - script: | cd $(Build.BinariesDirectory)/uhddev/build mkdir -p fpga_images rm -rf fpga_images/* - python3 utils/uhd_images_downloader.py -t $(devModel) -i fpga_images \ + python3 utils/uhd_images_downloader.py -t $(devModel)_fpga -i fpga_images \ -b $(sdr-fileserver) if [ "$(devType)" = "b200" ]; then python3 utils/uhd_images_downloader.py -t b2xx_common -i fpga_images \ @@ -93,7 +105,7 @@ jobs: export LD_LIBRARY_PATH=$(Build.BinariesDirectory)/uhddev/build/lib:$LD_LIBRARY_PATH export UHD_IMAGES_DIR=$(Build.BinariesDirectory)/uhddev/build/fpga_images python3 ${{ parameters.uhdSrcDir }}/.ci/utils/mutex_hardware.py \ - --sdimage $(devType),$(devModel),$(uhd_artifact_directory)/$(devType)-images/$(devSDImage),${{ parameters.uhdSrcDir }}/$(devLabgridConfig),$(devHostname) \ + --sdimage_sdmux $(devType),$(devModel),$(uhd_artifact_directory)/$(devType)-images/$(devSDImage),${{ parameters.uhdSrcDir }}/$(devLabgridConfig),$(devHostname) \ --fpgas $(devFpga) \ --sfp_addrs $(sfpAddrs) \ ${{ parameters.redisHost }} $(devName) \ @@ -114,6 +126,27 @@ jobs: export LD_LIBRARY_PATH=$(Build.BinariesDirectory)/uhddev/build/lib:$LD_LIBRARY_PATH export UHD_IMAGES_DIR=$(Build.BinariesDirectory)/uhddev/build/fpga_images python3 ${{ parameters.uhdSrcDir }}/.ci/utils/mutex_hardware.py \ + --sdimage_tftp $(devType),$(devModel),$(uhd_artifact_directory)/e320-images/$(devSDImage),$(uhd_artifact_directory)/e320-images/$(devInitramfsImage),${{ parameters.uhdSrcDir }}/$(devLabgridConfig) \ + --fpgas $(devFpga) \ + --sfp_addrs $(sfpAddrs) \ + ${{ parameters.redisHost }} $(devName) \ + "$(Build.BinariesDirectory)/uhddev/build/utils/uhd_usrp_probe --args addr=$(devAddr)" \ + "python3 ${{ parameters.uhdSrcDir }}/host/tests/devtest/run_testsuite.py \ + --src-dir ${{ parameters.uhdSrcDir }}/host/tests/devtest \ + --devtest-pattern $(devtestPattern) --args addr=$(devAddr),type=$(devType) \ + --build-type Release --build-dir $(Build.BinariesDirectory)/uhddev/build \ + --python-interp python3 --xml" + continueOnError: true + condition: and(succeeded(), eq(variables.devModel, 'e320'), eq(variables.devBus, 'ip')) + displayName: Run e320 devtest on $(devName) + + - script: | + mkdir -p $(Common.TestResultsDirectory)/devtest + cd $(Common.TestResultsDirectory)/devtest + export PATH=$(Build.BinariesDirectory)/uhddev/build/utils:$(Build.BinariesDirectory)/uhddev/build/examples:$PATH + export LD_LIBRARY_PATH=$(Build.BinariesDirectory)/uhddev/build/lib:$LD_LIBRARY_PATH + export UHD_IMAGES_DIR=$(Build.BinariesDirectory)/uhddev/build/fpga_images + python3 ${{ parameters.uhdSrcDir }}/.ci/utils/mutex_hardware.py \ ${{ parameters.redisHost }} $(devName) \ "$(Build.BinariesDirectory)/uhddev/build/utils/uhd_usrp_probe --args serial=$(devSerial)" \ "$(Build.BinariesDirectory)/uhddev/build/utils/uhd_image_loader --args serial=$(devSerial),type=$(devType)" \ diff --git a/.ci/templates/stages-uhd-pipeline.yml b/.ci/templates/stages-uhd-pipeline.yml index 0b30012b4..a93ec5185 100644 --- a/.ci/templates/stages-uhd-pipeline.yml +++ b/.ci/templates/stages-uhd-pipeline.yml @@ -238,8 +238,8 @@ stages: uhdSrcDir: $(Build.SourcesDirectory) testDevices: 'x3xx,b2xx' -- stage: devtest_uhd_n3xx_stage - displayName: devtest UHD n3xx +- stage: devtest_uhd_n3xx_e320_stage + displayName: devtest UHD n3xx e320 dependsOn: - build_uhd_stage_linux - build_uhd_embedded_system_images @@ -248,7 +248,7 @@ stages: parameters: testOS: ubuntu2004 uhdSrcDir: $(Build.SourcesDirectory) - testDevices: 'n3xx' + testDevices: 'n3xx,e320' - stage: test_uhd_x4xx_stage displayName: Test UHD x4xx diff --git a/.ci/templates/tests/rhombus-labgrid/crossbar/places.yaml b/.ci/templates/tests/rhombus-labgrid/crossbar/places.yaml index baf68bbd1..2fbf5052b 100755 --- a/.ci/templates/tests/rhombus-labgrid/crossbar/places.yaml +++ b/.ci/templates/tests/rhombus-labgrid/crossbar/places.yaml @@ -32,3 +32,19 @@ rhombus-n321-0: reservation: null tags: {} +rhombus-e320-0: + acquired: null + acquired_resources: [] + aliases: [] + allowed: [] + changed: 1654034475.1935894 + comment: '' + created: 1654034136.0077882 + matches: + - cls: '*' + exporter: '*' + group: rhombus-e320-0-group + name: null + rename: null + reservation: null + tags: {} diff --git a/.ci/templates/tests/rhombus-labgrid/device-configs/rhombus-e320-0.yml b/.ci/templates/tests/rhombus-labgrid/device-configs/rhombus-e320-0.yml new file mode 100644 index 000000000..e9c934817 --- /dev/null +++ b/.ci/templates/tests/rhombus-labgrid/device-configs/rhombus-e320-0.yml @@ -0,0 +1,14 @@ +targets: + main: + resources: + RemotePlace: + name: 'rhombus-e320-0' + drivers: + - SerialDriver: + name: 'linux_serial_driver' + bindings: + port: 'console-linux' + - SerialDriver: + name: 'scu_serial_driver' + bindings: + port: 'console-scu' diff --git a/.ci/templates/tests/rhombus-labgrid/exporter-conf/exporter.yaml b/.ci/templates/tests/rhombus-labgrid/exporter-conf/exporter.yaml index f13ebbe07..c15385aac 100755 --- a/.ci/templates/tests/rhombus-labgrid/exporter-conf/exporter.yaml +++ b/.ci/templates/tests/rhombus-labgrid/exporter-conf/exporter.yaml @@ -32,3 +32,16 @@ rhombus-n321-0-group: match: ID_SERIAL_SHORT: '000000001140' +rhombus-e320-0-group: + console-scu: + cls: USBSerialPort + match: + ID_SERIAL: 'Silicon_Labs_CP2105_Dual_USB_to_UART_Bridge_Controller_0097841B' + ID_USB_INTERFACE_NUM: '00' + speed: 115200 + console-linux: + cls: USBSerialPort + match: + ID_SERIAL: 'Silicon_Labs_CP2105_Dual_USB_to_UART_Bridge_Controller_0097841B' + ID_USB_INTERFACE_NUM: '01' + speed: 115200 diff --git a/.ci/utils/httpd.py b/.ci/utils/httpd.py new file mode 100644 index 000000000..53741566e --- /dev/null +++ b/.ci/utils/httpd.py @@ -0,0 +1,63 @@ +import http.server +import time +import os +import pyroute2 +import socket +import socketserver +import threading +from functools import partial +from pathlib import Path + +class ThreadingHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer): + pass + +class HTTPServer: + def __init__(self, path, remote_ip): + self.path = path + self.port = None + self.old_path = None + self.httpd = None + + with pyroute2.IPRoute() as ipr: + r = ipr.route('get', dst=remote_ip) + for attr in r[0]['attrs']: + if attr[0] == 'RTA_PREFSRC': + self.ip = attr[1] + with socket.socket() as s: + s.bind(('', 0)) + self.port = s.getsockname()[1] + + def get_url(self, filename): + path = Path(self.path) / filename + assert path.exists() + return f"http://{self.ip}:{self.port}/{filename}" + + def __enter__(self): + def start_server(): + Handler = http.server.SimpleHTTPRequestHandler + self.httpd = ThreadingHTTPServer(("", self.port), Handler) + self.httpd.serve_forever() + + # Kind of annoying, but to work with older pythons where + # SimpleHTTPRequestHandler doesn't take a directory parameter but only + # serves the current directory: + self.old_path = os.getcwd() + os.chdir(self.path) + + self.thread = threading.Thread(target=start_server) + self.thread.start() + return self + + def __exit__(self, type, value, exc): + if self.httpd is not None: + self.httpd.shutdown() + self.httpd.server_close() + if self.old_path is not None: + os.chdir(self.old_path) + +if __name__ == '__main__': + with HTTPServer("/tmp", "127.0.0.1") as server: + print("server ip", server.ip) + print("server port", server.port) + time.sleep(300) + diff --git a/.ci/utils/mutex_hardware.py b/.ci/utils/mutex_hardware.py index 485f0fcbd..9f48ed22e 100644 --- a/.ci/utils/mutex_hardware.py +++ b/.ci/utils/mutex_hardware.py @@ -8,17 +8,22 @@ import labgrid import os import pathlib import shlex +import socket import subprocess import sys import time from fabric import Connection +from httpd import HTTPServer from pottery import Redlock from redis import Redis +from tftp import TFTPServer bitfile_name = "usrp_{}_fpga_{}.bit" def jtag_x3xx(dev_type, dev_model, jtag_server, jtag_serial, fpga_folder, fpga, redis_server): + if dev_model not in ["x300", "x310"]: + raise RuntimeError(f'{dev_type} not supported with jtag_x3xx') remote_working_dir = "pipeline_fpga" vivado_program_jtag = "/opt/Xilinx/Vivado_Lab/2020.1/bin/vivado_lab -mode batch -source {}/viv_hardware_utils.tcl -nolog -nojournal -tclargs program".format( remote_working_dir) @@ -50,7 +55,12 @@ def set_sfp_addrs(mgmt_addr, sfp_addrs): dut.run(f"ip link set sfp{idx} up") time.sleep(30) -def flash_sdimage(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs): +def flash_sdimage_sdmux(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs): + """ This method uses an sdmux (https://linux-automation.com/en/products/usb-sd-mux.html) + to reimage the sd card. + """ + if dev_model not in ["n300", "n310", "n320", "n321"]: + raise RuntimeError(f'{dev_model} not supported with sdimage_sdmux') subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release --kick")) subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} acquire")) env = labgrid.Environment(labgrid_device_yaml) @@ -101,6 +111,97 @@ def flash_sdimage(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_a subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release")) +def flash_sdimage_tftp(dev_model, sdimage_path, initramfs_path, labgrid_device_yaml, sfp_addrs, redis_server): + """ This method uses tftp to boot the device into a small Linux envionment to + write to the device's sd card. This method is used on the E320 since it has + a hardware incompatibility with sdmuxes. + """ + if dev_model not in ["e320"]: + raise RuntimeError(f'{dev_model} not supported with sdimage_tftp') + + if dev_model == "e320": + dev_ram_address = '0x20000000' + dev_bootm_config = 'conf@zynq-ni-${mboard}.dtb' + + subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release --kick")) + subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} acquire")) + env = labgrid.Environment(labgrid_device_yaml) + target = env.get_target() + + cp_scu = target.get_driver(labgrid.protocol.ConsoleProtocol, name="scu_serial_driver") + cp_linux = target.get_driver(labgrid.protocol.ConsoleProtocol, name="linux_serial_driver") + + print("Powering down DUT", flush=True) + cp_scu.write("\napshutdown\n".encode()) + time.sleep(10) + + print("Powering on DUT", flush=True) + cp_scu.write("\npowerbtn\n".encode()) + # Sometimes it requires multiple powerbtn calls to turn on device + try: + cp_linux.expect("Enter 'noautoboot' to enter prompt without timeout", timeout=5) + except Exception: + print("Device didn't power on with first attempt. Trying again...", flush=True) + cp_scu.write("\npowerbtn\n".encode()) + cp_linux.expect("Enter 'noautoboot' to enter prompt without timeout", timeout=5) + + print("Attempting to get into uboot console", flush=True) + cp_linux.write("noautoboot".encode()) + # Handle if the watchdog triggers + try: + cp_linux.expect("Enter 'noautoboot' to enter prompt without timeout", timeout=30) + cp_linux.write("noautoboot".encode()) + except Exception: + pass + cp_linux.expect("uboot>") + print("Waiting for NIC to come up", flush=True) + time.sleep(10) + cp_linux.write(f"setenv autoload no; dhcp;\n".encode()) + cp_linux.expect("DHCP client bound to address") + expect_index, expect_before, expect_match , expect_after = cp_linux.expect(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b") + mgmt_addr = expect_match[0].decode() + print(f"Dev got IP Address {mgmt_addr}") + + with TFTPServer(initramfs_path, mgmt_addr) as server: + time.sleep(10) + cp_linux.expect("uboot>") + cp_linux.write(f"setenv tftpdstp {server.port}\n".encode()) + cp_linux.expect("uboot>") + print("TFTPing initramfs image", flush=True) + cp_linux.write(f"tftpboot {dev_ram_address} {server.ip}:{os.path.basename(initramfs_path)}\n".encode()) + cp_linux.expect("uboot>", timeout=120) + print("Booting into initramfs", flush=True) + cp_linux.write(f"bootm {dev_ram_address}#{dev_bootm_config}\n".encode()) + cp_linux.expect("mender login:", timeout=120) + print("Logging into Linux", flush=True) + cp_linux.write("root\n".encode()) + cp_linux.expect("mender:~#") + print("Waiting for NIC to DHCP", flush=True) + time.sleep(10) + + with HTTPServer(os.path.dirname(sdimage_path), mgmt_addr) as server: + print(f"Writing SD Card using {sdimage_path}", flush=True) + print("Running bmaptool... This will take awhile", flush=True) + cp_linux.write(f"bmaptool copy --nobmap {server.get_url(os.path.basename(sdimage_path))} /dev/mmcblk0\n".encode()) + cp_linux.expect("mender:~#", timeout=1800) + cp_linux.write("echo bmaptool exit code: $?\n".encode()) + cp_linux.expect("bmaptool exit code: 0", timeout=10) + time.sleep(10) + print("Rebooting into new image from sd card", flush=True) + cp_linux.write("reboot\n".encode()) + + print("Waiting 2 minutes for device to boot", flush=True) + time.sleep(120) + cp_linux.expect("login:", timeout=30) + known_hosts_path = os.path.expanduser("~/.ssh/known_hosts") + subprocess.run(shlex.split(f"ssh-keygen -f \"{known_hosts_path}\" -R \"{mgmt_addr}\"")) + + if sfp_addrs: + set_sfp_addrs(mgmt_addr, sfp_addrs) + + subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release")) + return mgmt_addr + def main(args): redis_server = {Redis.from_url( "redis://{}:6379/0".format(args.redis_server))} @@ -108,13 +209,21 @@ def main(args): with Redlock(key=args.dut_name, masters=redis_server, auto_release_time=1000 * 60 * args.dut_timeout): print("Got mutex for {}".format(args.dut_name), flush=True) - if args.sdimage: - dev_type, dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr = args.sdimage.split(',') + if args.sdimage_sdmux: + dev_type, dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr = args.sdimage_sdmux.split(',') + if args.sfp_addrs: + sfp_addrs = args.sfp_addrs.split(',') + else: + sfp_addrs = None + flash_sdimage_sdmux(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs) + + if args.sdimage_tftp: + dev_type, dev_model, sdimage_path, initramfs_path, labgrid_device_yaml = args.sdimage_tftp.split(',') if args.sfp_addrs: sfp_addrs = args.sfp_addrs.split(',') else: sfp_addrs = None - flash_sdimage(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs) + mgmt_addr = flash_sdimage_tftp(dev_model, sdimage_path, initramfs_path, labgrid_device_yaml, sfp_addrs, redis_server) if args.fpgas: working_dir = os.getcwd() @@ -146,14 +255,17 @@ def main(args): if __name__ == "__main__": parser = argparse.ArgumentParser() + group = parser.add_mutually_exclusive_group() # jtag_x3xx will flash the fpga for a given jtag_serial using # Vivado on jtag_server. It uses SSH to control jtag_server. # Provide fpga_path as a local path and it will be copied # to jtag_server. - parser.add_argument("--jtag_x3xx", type=str, + group.add_argument("--jtag_x3xx", type=str, help="dev_type,dev_model,user@jtag_server,jtag_serial,fpga_folder") - parser.add_argument("--sdimage", type=str, + group.add_argument("--sdimage_sdmux", type=str, help="dev_type,dev_model,sdimg_path,labgrid_device_yaml,mgmt_addr") + group.add_argument("--sdimage_tftp", type=str, + help="dev_type,dev_model,sdimg_path,initramfs_path,labgrid_device_yaml") parser.add_argument("--sfp_addrs", type=str, help="sfp0ip,sfp1ip,...") parser.add_argument("--fpgas", type=str, diff --git a/.ci/utils/tftp.py b/.ci/utils/tftp.py new file mode 100644 index 000000000..44291cd93 --- /dev/null +++ b/.ci/utils/tftp.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 + +import asyncio +import py3tftp.protocols +import pyroute2 +import socket +import threading +from pathlib import Path + + +class FileReaderSingle: + def __init__(self, path, fname_req, chunk_size=0): + self.path = path + # TODO: Should check fname_req against actual name + self.chunk_size = chunk_size + self._f = None + self._f = open(self.path, 'rb') + self.finished = False + + def file_size(self): + return self.path.stat().st_size + + def read_chunk(self, size=None): + size = size or self.chunk_size + if self.finished: + return b'' + + data = self._f.read(size) + if not data or (size > 0 and len(data) < size): + self._f.close() + self.finished = True + + return data + + def __del__(self): + if self._f and not self._f.closed: + self._f.close() + + +class TFTPServerSingle(py3tftp.protocols.BaseTFTPServerProtocol): + def __init__(self, path, host_interface, loop, extra_opts): + super().__init__(host_interface, loop, extra_opts) + self.path = path + + def select_protocol(self, packet): + if packet.is_rrq(): + return py3tftp.protocols.RRQProtocol + raise py3tftp.protocols.ProtocolException("Unhandled protocol") + + def select_file_handler(self, packet): + if packet.is_rrq(): + return lambda filename, opts: FileReaderSingle(self.path, filename, opts) + + +class TFTPServer: + """ + Simple TFTP server, meant to be short-lived and capable of serving a single + file only + """ + def __init__(self, filename, remote_ip, port=None): + self.path = Path(filename).absolute() + assert self.path.exists() + assert self.path.is_file() + + self.filename = self.path.name + + if port == None: + with socket.socket() as s: + s.bind(('', 0)) + self.port = s.getsockname()[1] + else: + self.port = port + + with pyroute2.IPRoute() as ipr: + r = ipr.route('get', dst=remote_ip) + for attr in r[0]['attrs']: + if attr[0] == 'RTA_PREFSRC': + self.ip = attr[1] + + def __enter__(self): + self.loop = asyncio.new_event_loop() + listen = self.loop.create_datagram_endpoint( + lambda: TFTPServerSingle(self.path, self.ip, self.loop, {}), + local_addr=(self.ip, self.port)) + + def start_loop(loop): + asyncio.set_event_loop(loop) + loop.run_forever() + + self.transport, protocol = self.loop.run_until_complete(listen) + self.thread = threading.Thread(target=start_loop, args=(self.loop,)) + self.thread.start() + return self + + def __exit__(self, type, value, exc): + self.transport.close() + self.loop.call_soon_threadsafe(self.loop.stop) + self.thread.join() |