summaryrefslogtreecommitdiff
path: root/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests
diff options
context:
space:
mode:
Diffstat (limited to 'tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests')
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/__init__.py59
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/default_auto.py36
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/detect_auto.py55
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/dev_null_auto.py50
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo.py59
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo_flow_control.py48
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastReceive.py25
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastSend.py30
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastReceive.py31
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastSend.py30
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoClient.py28
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoServer.py30
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoClient.py28
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoServer.py27
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/__init__.py16
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/hello_auto.py34
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_registry.py36
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_test.py397
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/__init__.py68
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py118
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_registry.py89
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_firefox.py76
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py71
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mps2.py107
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py64
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_silabs.py61
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mbed.py72
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mps2.py74
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_silabs.py66
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/mbedrpc.py287
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/midi.py72
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/net_test.py27
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/rpc.py56
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/rtc_auto.py50
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/stdio_auto.py56
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client.py57
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client_auto.py87
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server.py50
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_auto.py84
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_loop.py40
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udp_link_layer_auto.py145
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client.py55
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client_auto.py77
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server.py29
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server_auto.py68
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/wait_us_auto.py69
46 files changed, 3194 insertions, 0 deletions
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/__init__.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/__init__.py
new file mode 100644
index 0000000000..cae0e20908
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/__init__.py
@@ -0,0 +1,59 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from host_registry import HostRegistry
+
+# Host test supervisors
+from echo import EchoTest
+from rtc_auto import RTCTest
+from stdio_auto import StdioTest
+from hello_auto import HelloTest
+from detect_auto import DetectPlatformTest
+from default_auto import DefaultAuto
+from dev_null_auto import DevNullTest
+from wait_us_auto import WaitusTest
+from tcpecho_server_auto import TCPEchoServerTest
+from udpecho_server_auto import UDPEchoServerTest
+from tcpecho_client_auto import TCPEchoClientTest
+from udpecho_client_auto import UDPEchoClientTest
+
+# Populate registry with supervising objects
+HOSTREGISTRY = HostRegistry()
+HOSTREGISTRY.register_host_test("echo", EchoTest())
+HOSTREGISTRY.register_host_test("default", DefaultAuto())
+HOSTREGISTRY.register_host_test("rtc_auto", RTCTest())
+HOSTREGISTRY.register_host_test("hello_auto", HelloTest())
+HOSTREGISTRY.register_host_test("stdio_auto", StdioTest())
+HOSTREGISTRY.register_host_test("detect_auto", DetectPlatformTest())
+HOSTREGISTRY.register_host_test("default_auto", DefaultAuto())
+HOSTREGISTRY.register_host_test("wait_us_auto", WaitusTest())
+HOSTREGISTRY.register_host_test("dev_null_auto", DevNullTest())
+HOSTREGISTRY.register_host_test("tcpecho_server_auto", TCPEchoServerTest())
+HOSTREGISTRY.register_host_test("udpecho_server_auto", UDPEchoServerTest())
+HOSTREGISTRY.register_host_test("tcpecho_client_auto", TCPEchoClientTest())
+HOSTREGISTRY.register_host_test("udpecho_client_auto", UDPEchoClientTest())
+
+###############################################################################
+# Functional interface for test supervisor registry
+###############################################################################
+
+
+def get_host_test(ht_name):
+ return HOSTREGISTRY.get_host_test(ht_name)
+
+def is_host_test(ht_name):
+ return HOSTREGISTRY.is_host_test(ht_name)
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/default_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/default_auto.py
new file mode 100644
index 0000000000..0883d79d53
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/default_auto.py
@@ -0,0 +1,36 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from sys import stdout
+
+class DefaultAuto():
+ """ Simple, basic host test's test runner waiting for serial port
+ output from MUT, no supervision over test running in MUT is executed.
+ """
+ def test(self, selftest):
+ result = selftest.RESULT_SUCCESS
+ try:
+ while True:
+ c = selftest.mbed.serial_read(512)
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ stdout.write(c)
+ stdout.flush()
+ except KeyboardInterrupt, _:
+ selftest.notify("\r\n[CTRL+C] exit")
+ result = selftest.RESULT_ERROR
+ return result
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/detect_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/detect_auto.py
new file mode 100644
index 0000000000..2999946c08
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/detect_auto.py
@@ -0,0 +1,55 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import re
+
+class DetectPlatformTest():
+ PATTERN_MICRO_NAME = "Target '(\w+)'"
+ re_detect_micro_name = re.compile(PATTERN_MICRO_NAME)
+
+ def test(self, selftest):
+ result = True
+
+ c = selftest.mbed.serial_readline() # {{start}} preamble
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+
+ selftest.notify(c.strip())
+ selftest.notify("HOST: Detecting target name...")
+
+ c = selftest.mbed.serial_readline()
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ selftest.notify(c.strip())
+
+ # Check for target name
+ m = self.re_detect_micro_name.search(c)
+ if m and len(m.groups()):
+ micro_name = m.groups()[0]
+ micro_cmp = selftest.mbed.options.micro == micro_name
+ result = result and micro_cmp
+ selftest.notify("HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name,
+ selftest.mbed.options.micro,
+ "OK" if micro_cmp else "FAIL"))
+
+ for i in range(0, 2):
+ c = selftest.mbed.serial_readline()
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ selftest.notify(c.strip())
+
+ return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/dev_null_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/dev_null_auto.py
new file mode 100644
index 0000000000..4538f6d79e
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/dev_null_auto.py
@@ -0,0 +1,50 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+class DevNullTest():
+
+ def check_readline(self, selftest, text):
+ """ Reads line from serial port and checks if text was part of read string
+ """
+ result = False
+ c = selftest.mbed.serial_readline()
+ if c and text in c:
+ result = True
+ return result
+
+ def test(self, selftest):
+ result = True
+ # Test should print some text and later stop printing
+ # 'MBED: re-routing stdout to /null'
+ res = self.check_readline(selftest, "re-routing stdout to /null")
+ if not res:
+ # We haven't read preamble line
+ result = False
+ else:
+ # Check if there are printed characters
+ str = ''
+ for i in range(3):
+ c = selftest.mbed.serial_read(32)
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ else:
+ str += c
+ if len(str) > 0:
+ result = False
+ break
+ selftest.notify("Received %d bytes: %s"% (len(str), str))
+ return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo.py
new file mode 100644
index 0000000000..75e534fb84
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo.py
@@ -0,0 +1,59 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import sys
+import uuid
+from sys import stdout
+
+class EchoTest():
+
+ # Test parameters
+ TEST_SERIAL_BAUDRATE = 115200
+ TEST_LOOP_COUNT = 50
+
+ def test(self, selftest):
+ """ This host test will use mbed serial port with
+ baudrate 115200 to perform echo test on that port.
+ """
+ # Custom initialization for echo test
+ selftest.mbed.init_serial_params(serial_baud=self.TEST_SERIAL_BAUDRATE)
+ selftest.mbed.init_serial()
+
+ # Test function, return True or False to get standard test notification on stdout
+ selftest.mbed.flush()
+ selftest.notify("HOST: Starting the ECHO test")
+ result = True
+
+ """ This ensures that there are no parasites left in the serial buffer.
+ """
+ for i in range(0, 2):
+ selftest.mbed.serial_write("\n")
+ c = selftest.mbed.serial_readline()
+
+ for i in range(0, self.TEST_LOOP_COUNT):
+ TEST_STRING = str(uuid.uuid4()) + "\n"
+ selftest.mbed.serial_write(TEST_STRING)
+ c = selftest.mbed.serial_readline()
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ if c.strip() != TEST_STRING.strip():
+ selftest.notify('HOST: "%s" != "%s"'% (c, TEST_STRING))
+ result = False
+ else:
+ sys.stdout.write('.')
+ stdout.flush()
+ return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo_flow_control.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo_flow_control.py
new file mode 100644
index 0000000000..7ea11e9736
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo_flow_control.py
@@ -0,0 +1,48 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from host_test import Test
+
+
+class EchoTest(Test):
+ def __init__(self):
+ Test.__init__(self)
+ self.mbed.init_serial()
+ self.mbed.extra_serial.rtscts = True
+ self.mbed.reset()
+
+ def test(self):
+ self.mbed.flush()
+ self.notify("Starting the ECHO test")
+ TEST="longer serial test"
+ check = True
+ for i in range(1, 100):
+ self.mbed.extra_serial.write(TEST + "\n")
+ l = self.mbed.extra_serial.readline().strip()
+ if not l: continue
+
+ if l != TEST:
+ check = False
+ self.notify('"%s" != "%s"' % (l, TEST))
+ else:
+ if (i % 10) == 0:
+ self.notify('.')
+
+ return check
+
+
+if __name__ == '__main__':
+ EchoTest().run()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastReceive.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastReceive.py
new file mode 100644
index 0000000000..2e846ca19e
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastReceive.py
@@ -0,0 +1,25 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import socket
+
+BROADCAST_PORT = 58083
+
+s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+s.bind(('0.0.0.0', BROADCAST_PORT))
+
+while True:
+ print s.recvfrom(256)
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastSend.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastSend.py
new file mode 100644
index 0000000000..0a5f8c3201
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastSend.py
@@ -0,0 +1,30 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import socket
+from time import sleep, time
+
+BROADCAST_PORT = 58083
+
+s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+s.bind(('', 0))
+s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
+
+while True:
+ print "Broadcasting..."
+ data = 'Hello World: ' + repr(time()) + '\n'
+ s.sendto(data, ('<broadcast>', BROADCAST_PORT))
+ sleep(1)
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastReceive.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastReceive.py
new file mode 100644
index 0000000000..9001f40b7d
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastReceive.py
@@ -0,0 +1,31 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import socket
+import struct
+
+MCAST_GRP = '224.1.1.1'
+MCAST_PORT = 5007
+
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+sock.bind(('', MCAST_PORT))
+mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
+
+sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
+
+while True:
+ print sock.recv(10240)
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastSend.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastSend.py
new file mode 100644
index 0000000000..8efd4534ae
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastSend.py
@@ -0,0 +1,30 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import socket
+from time import sleep, time
+
+MCAST_GRP = '224.1.1.1'
+MCAST_PORT = 5007
+
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
+
+while True:
+ print "Multicast to group: %s\n" % MCAST_GRP
+ data = 'Hello World: ' + repr(time()) + '\n'
+ sock.sendto(data, (MCAST_GRP, MCAST_PORT))
+ sleep(1)
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoClient.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoClient.py
new file mode 100644
index 0000000000..dfa9bfdae7
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoClient.py
@@ -0,0 +1,28 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import socket
+
+ECHO_SERVER_ADDRESS = "10.2.202.45"
+ECHO_PORT = 7
+
+s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+s.connect((ECHO_SERVER_ADDRESS, ECHO_PORT))
+
+s.sendall('Hello, world')
+data = s.recv(1024)
+s.close()
+print 'Received', repr(data)
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoServer.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoServer.py
new file mode 100644
index 0000000000..1324edbe64
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoServer.py
@@ -0,0 +1,30 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import socket
+
+s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+s.bind(('', 7))
+s.listen(1)
+
+while True:
+ conn, addr = s.accept()
+ print 'Connected by', addr
+ while True:
+ data = conn.recv(1024)
+ if not data: break
+ conn.sendall(data)
+ conn.close()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoClient.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoClient.py
new file mode 100644
index 0000000000..6a6cf8c902
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoClient.py
@@ -0,0 +1,28 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import socket
+
+ECHO_SERVER_ADDRESS = '10.2.202.45'
+ECHO_PORT = 7
+
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+
+sock.sendto("Hello World\n", (ECHO_SERVER_ADDRESS, ECHO_PORT))
+response = sock.recv(256)
+sock.close()
+
+print response
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoServer.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoServer.py
new file mode 100644
index 0000000000..38503489ee
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoServer.py
@@ -0,0 +1,27 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import socket
+
+ECHO_PORT = 7
+
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+sock.bind(('', ECHO_PORT))
+
+while True:
+ data, address = sock.recvfrom(256)
+ print "datagram from", address
+ sock.sendto(data, address)
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/__init__.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/__init__.py
new file mode 100644
index 0000000000..10e7e1d1de
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/__init__.py
@@ -0,0 +1,16 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+""" \ No newline at end of file
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/hello_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/hello_auto.py
new file mode 100644
index 0000000000..69b39bf6bb
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/hello_auto.py
@@ -0,0 +1,34 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+class HelloTest():
+ HELLO_WORLD = "Hello World"
+
+ def test(self, selftest):
+ c = selftest.mbed.serial_readline()
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ selftest.notify("Read %d bytes:"% len(c))
+ selftest.notify(c.strip())
+
+ result = True
+ # Because we can have targetID here let's try to decode
+ if len(c) < len(self.HELLO_WORLD):
+ result = False
+ else:
+ result = self.HELLO_WORLD in c
+ return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_registry.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_registry.py
new file mode 100644
index 0000000000..d52384834a
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_registry.py
@@ -0,0 +1,36 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+class HostRegistry:
+ """ Class stores registry with host tests and objects representing them
+ """
+ HOST_TESTS = {} # host_test_name -> host_test_ojbect
+
+ def register_host_test(self, ht_name, ht_object):
+ if ht_name not in self.HOST_TESTS:
+ self.HOST_TESTS[ht_name] = ht_object
+
+ def unregister_host_test(self):
+ if ht_name in HOST_TESTS:
+ self.HOST_TESTS[ht_name] = None
+
+ def get_host_test(self, ht_name):
+ return self.HOST_TESTS[ht_name] if ht_name in self.HOST_TESTS else None
+
+ def is_host_test(self, ht_name):
+ return ht_name in self.HOST_TESTS
+ \ No newline at end of file
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_test.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_test.py
new file mode 100644
index 0000000000..4dd16505a2
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_test.py
@@ -0,0 +1,397 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+# Check if 'serial' module is installed
+try:
+ from serial import Serial
+except ImportError, e:
+ print "Error: Can't import 'serial' module: %s"% e
+ exit(-1)
+
+import os
+import re
+import types
+from sys import stdout
+from time import sleep, time
+from optparse import OptionParser
+
+import host_tests_plugins
+
+# This is a little tricky. We need to add upper directory to path so
+# we can find packages we want from the same level as other files do
+import sys
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
+
+
+class Mbed:
+ """ Base class for a host driven test
+ """
+ def __init__(self):
+ parser = OptionParser()
+
+ parser.add_option("-m", "--micro",
+ dest="micro",
+ help="The target microcontroller",
+ metavar="MICRO")
+
+ parser.add_option("-p", "--port",
+ dest="port",
+ help="The serial port of the target mbed",
+ metavar="PORT")
+
+ parser.add_option("-d", "--disk",
+ dest="disk",
+ help="The target disk path",
+ metavar="DISK_PATH")
+
+ parser.add_option("-f", "--image-path",
+ dest="image_path",
+ help="Path with target's image",
+ metavar="IMAGE_PATH")
+
+ parser.add_option("-c", "--copy",
+ dest="copy_method",
+ help="Copy method selector",
+ metavar="COPY_METHOD")
+
+ parser.add_option("-C", "--program_cycle_s",
+ dest="program_cycle_s",
+ help="Program cycle sleep. Define how many seconds you want wait after copying bianry onto target",
+ type="float",
+ metavar="COPY_METHOD")
+
+ parser.add_option("-t", "--timeout",
+ dest="timeout",
+ help="Timeout",
+ metavar="TIMEOUT")
+
+ parser.add_option("-r", "--reset",
+ dest="forced_reset_type",
+ help="Forces different type of reset")
+
+ parser.add_option("-R", "--reset-timeout",
+ dest="forced_reset_timeout",
+ metavar="NUMBER",
+ type="int",
+ help="When forcing a reset using option -r you can set up after reset timeout in seconds")
+
+ (self.options, _) = parser.parse_args()
+
+ self.DEFAULT_RESET_TOUT = 0
+ self.DEFAULT_TOUT = 10
+
+ if self.options.port is None:
+ raise Exception("The serial port of the target mbed have to be provided as command line arguments")
+
+ # Options related to copy / reset mbed device
+ self.port = self.options.port
+ self.disk = self.options.disk
+ self.image_path = self.options.image_path.strip('"')
+ self.copy_method = self.options.copy_method
+ self.program_cycle_s = float(self.options.program_cycle_s)
+
+ self.serial = None
+ self.serial_baud = 9600
+ self.serial_timeout = 1
+
+ self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout
+ print 'MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk)
+
+ def init_serial_params(self, serial_baud=9600, serial_timeout=1):
+ """ Initialize port parameters.
+ This parameters will be used by self.init_serial() function to open serial port
+ """
+ self.serial_baud = serial_baud
+ self.serial_timeout = serial_timeout
+
+ def init_serial(self, serial_baud=None, serial_timeout=None):
+ """ Initialize serial port.
+ Function will return error is port can't be opened or initialized
+ """
+ # Overload serial port configuration from default to parameters' values if they are specified
+ serial_baud = serial_baud if serial_baud is not None else self.serial_baud
+ serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout
+
+ # Clear serial port
+ if self.serial:
+ self.serial.close()
+ self.serial = None
+
+ # We will pool for serial to be re-mounted if it was unmounted after device reset
+ result = self.pool_for_serial_init(serial_baud, serial_timeout) # Blocking
+
+ # Port can be opened
+ if result:
+ self.flush()
+ return result
+
+ def pool_for_serial_init(self, serial_baud, serial_timeout, pooling_loops=40, init_delay=0.5, loop_delay=0.25):
+ """ Functions pools for serial port readiness
+ """
+ result = True
+ last_error = None
+ # This loop is used to check for serial port availability due to
+ # some delays and remounting when devices are being flashed with new software.
+ for i in range(pooling_loops):
+ sleep(loop_delay if i else init_delay)
+ try:
+ self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout)
+ except Exception as e:
+ result = False
+ last_error = "MBED: %s"% str(e)
+ stdout.write('.')
+ stdout.flush()
+ else:
+ print "...port ready!"
+ result = True
+ break
+ if not result and last_error:
+ print last_error
+ return result
+
+ def set_serial_timeout(self, timeout):
+ """ Wraps self.mbed.serial object timeout property
+ """
+ result = None
+ if self.serial:
+ self.serial.timeout = timeout
+ result = True
+ return result
+
+ def serial_read(self, count=1):
+ """ Wraps self.mbed.serial object read method
+ """
+ result = None
+ if self.serial:
+ try:
+ result = self.serial.read(count)
+ except:
+ result = None
+ return result
+
+ def serial_readline(self, timeout=5):
+ """ Wraps self.mbed.serial object read method to read one line from serial port
+ """
+ result = ''
+ start = time()
+ while (time() - start) < timeout:
+ if self.serial:
+ try:
+ c = self.serial.read(1)
+ result += c
+ except Exception as e:
+ print "MBED: %s"% str(e)
+ result = None
+ break
+ if c == '\n':
+ break
+ return result
+
+ def serial_write(self, write_buffer):
+ """ Wraps self.mbed.serial object write method
+ """
+ result = None
+ if self.serial:
+ try:
+ result = self.serial.write(write_buffer)
+ except:
+ result = None
+ return result
+
+ def reset_timeout(self, timeout):
+ """ Timeout executed just after reset command is issued
+ """
+ for n in range(0, timeout):
+ sleep(1)
+
+ def reset(self):
+ """ Calls proper reset plugin to do the job.
+ Please refer to host_test_plugins functionality
+ """
+ # Flush serials to get only input after reset
+ self.flush()
+ if self.options.forced_reset_type:
+ result = host_tests_plugins.call_plugin('ResetMethod', self.options.forced_reset_type, disk=self.disk)
+ else:
+ result = host_tests_plugins.call_plugin('ResetMethod', 'default', serial=self.serial)
+ # Give time to wait for the image loading
+ reset_tout_s = self.options.forced_reset_timeout if self.options.forced_reset_timeout is not None else self.DEFAULT_RESET_TOUT
+ self.reset_timeout(reset_tout_s)
+ return result
+
+ def copy_image(self, image_path=None, disk=None, copy_method=None):
+ """ Closure for copy_image_raw() method.
+ Method which is actually copying image to mbed
+ """
+ # Set closure environment
+ image_path = image_path if image_path is not None else self.image_path
+ disk = disk if disk is not None else self.disk
+ copy_method = copy_method if copy_method is not None else self.copy_method
+ # Call proper copy method
+ result = self.copy_image_raw(image_path, disk, copy_method)
+ sleep(self.program_cycle_s)
+ return result
+
+ def copy_image_raw(self, image_path=None, disk=None, copy_method=None):
+ """ Copy file depending on method you want to use. Handles exception
+ and return code from shell copy commands.
+ """
+ if copy_method is not None:
+ # image_path - Where is binary with target's firmware
+ result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk)
+ else:
+ copy_method = 'default'
+ result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk)
+ return result;
+
+ def flush(self):
+ """ Flush serial ports
+ """
+ result = False
+ if self.serial:
+ self.serial.flushInput()
+ self.serial.flushOutput()
+ result = True
+ return result
+
+
+class HostTestResults:
+ """ Test results set by host tests
+ """
+ def __init__(self):
+ self.RESULT_SUCCESS = 'success'
+ self.RESULT_FAILURE = 'failure'
+ self.RESULT_ERROR = 'error'
+ self.RESULT_IO_SERIAL = 'ioerr_serial'
+ self.RESULT_NO_IMAGE = 'no_image'
+ self.RESULT_IOERR_COPY = "ioerr_copy"
+ self.RESULT_PASSIVE = "passive"
+ self.RESULT_NOT_DETECTED = "not_detected"
+ self.RESULT_MBED_ASSERT = "mbed_assert"
+
+
+import workspace_tools.host_tests as host_tests
+
+
+class Test(HostTestResults):
+ """ Base class for host test's test runner
+ """
+ # Select default host_test supervision (replaced after autodetection)
+ test_supervisor = host_tests.get_host_test("default")
+
+ def __init__(self):
+ self.mbed = Mbed()
+
+ def detect_test_config(self, verbose=False):
+ """ Detects test case configuration
+ """
+ result = {}
+ while True:
+ line = self.mbed.serial_readline()
+ if "{start}" in line:
+ self.notify("HOST: Start test...")
+ break
+ else:
+ # Detect if this is property from TEST_ENV print
+ m = re.search('{([\w_]+);([\w\d\+ ]+)}}', line[:-1])
+ if m and len(m.groups()) == 2:
+ # This is most likely auto-detection property
+ result[m.group(1)] = m.group(2)
+ if verbose:
+ self.notify("HOST: Property '%s' = '%s'"% (m.group(1), m.group(2)))
+ else:
+ # We can check if this is TArget Id in mbed specific format
+ m2 = re.search('^([\$]+)([a-fA-F0-9]+)', line[:-1])
+ if m2 and len(m2.groups()) == 2:
+ if verbose:
+ target_id = m2.group(1) + m2.group(2)
+ self.notify("HOST: TargetID '%s'"% target_id)
+ self.notify(line[len(target_id):-1])
+ else:
+ self.notify("HOST: Unknown property: %s"% line.strip())
+ return result
+
+ def run(self):
+ """ Test runner for host test. This function will start executing
+ test and forward test result via serial port to test suite
+ """
+ # Copy image to device
+ self.notify("HOST: Copy image onto target...")
+ result = self.mbed.copy_image()
+ if not result:
+ self.print_result(self.RESULT_IOERR_COPY)
+
+ # Initialize and open target's serial port (console)
+ self.notify("HOST: Initialize serial port...")
+ result = self.mbed.init_serial()
+ if not result:
+ self.print_result(self.RESULT_IO_SERIAL)
+
+ # Reset device
+ self.notify("HOST: Reset target...")
+ result = self.mbed.reset()
+ if not result:
+ self.print_result(self.RESULT_IO_SERIAL)
+
+ # Run test
+ try:
+ CONFIG = self.detect_test_config(verbose=True) # print CONFIG
+
+ if "host_test_name" in CONFIG:
+ if host_tests.is_host_test(CONFIG["host_test_name"]):
+ self.test_supervisor = host_tests.get_host_test(CONFIG["host_test_name"])
+ result = self.test_supervisor.test(self) #result = self.test()
+
+ if result is not None:
+ self.print_result(result)
+ else:
+ self.notify("HOST: Passive mode...")
+ except Exception, e:
+ print str(e)
+ self.print_result(self.RESULT_ERROR)
+
+ def setup(self):
+ """ Setup and check if configuration for test is
+ correct. E.g. if serial port can be opened.
+ """
+ result = True
+ if not self.mbed.serial:
+ result = False
+ self.print_result(self.RESULT_IO_SERIAL)
+ return result
+
+ def notify(self, message):
+ """ On screen notification function
+ """
+ print message
+ stdout.flush()
+
+ def print_result(self, result):
+ """ Test result unified printing function
+ """
+ self.notify("\r\n{{%s}}\r\n{{end}}" % result)
+
+
+class DefaultTestSelector(Test):
+ """ Test class with serial port initialization
+ """
+ def __init__(self):
+ HostTestResults.__init__(self)
+ Test.__init__(self)
+
+if __name__ == '__main__':
+ DefaultTestSelector().run()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/__init__.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/__init__.py
new file mode 100644
index 0000000000..913da02648
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/__init__.py
@@ -0,0 +1,68 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import host_test_registry
+
+# This plugins provide 'flashing' methods to host test scripts
+import module_copy_mbed
+import module_copy_shell
+import module_copy_silabs
+#import module_copy_firefox
+#import module_copy_mps2
+
+# Plugins used to reset certain platform
+import module_reset_mbed
+import module_reset_silabs
+#import module_reset_mps2
+
+
+# Plugin registry instance
+HOST_TEST_PLUGIN_REGISTRY = host_test_registry.HostTestRegistry()
+
+# Static plugin registration
+# Some plugins are commented out if they are not stable or not commonly used
+HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_mbed.load_plugin())
+HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_shell.load_plugin())
+HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_mbed.load_plugin())
+#HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_firefox.load_plugin())
+
+# Extra platforms support
+#HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_mps2.load_plugin())
+#HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_mps2.load_plugin())
+HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_silabs.load_plugin())
+HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_silabs.load_plugin())
+
+# TODO: extend plugin loading to files with name module_*.py loaded ad-hoc
+
+###############################################################################
+# Functional interface for host test plugin registry
+###############################################################################
+def call_plugin(type, capability, *args, **kwargs):
+ """ Interface to call plugin registry functional way
+ """
+ return HOST_TEST_PLUGIN_REGISTRY.call_plugin(type, capability, *args, **kwargs)
+
+def get_plugin_caps(type):
+ """ Returns list of all capabilities for plugin family with the same type.
+ If there are no capabilities empty list is returned
+ """
+ return HOST_TEST_PLUGIN_REGISTRY.get_plugin_caps(type)
+
+def print_plugin_info():
+ """ Prints plugins' information in user friendly way
+ """
+ print HOST_TEST_PLUGIN_REGISTRY
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py
new file mode 100644
index 0000000000..8bc1da35d3
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py
@@ -0,0 +1,118 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from os import access, F_OK
+from sys import stdout
+from time import sleep
+from subprocess import call
+
+
+class HostTestPluginBase:
+ """ Base class for all plug-ins used with host tests.
+ """
+ ###########################################################################
+ # Interface:
+ ###########################################################################
+
+ ###########################################################################
+ # Interface attributes defining plugin name, type etc.
+ ###########################################################################
+ name = "HostTestPluginBase" # Plugin name, can be plugin class name
+ type = "BasePlugin" # Plugin type: ResetMethod, Copymethod etc.
+ capabilities = [] # Capabilities names: what plugin can achieve
+ # (e.g. reset using some external command line tool)
+ stable = False # Determine if plugin is stable and can be used
+
+ ###########################################################################
+ # Interface methods
+ ###########################################################################
+ def setup(self, *args, **kwargs):
+ """ Configure plugin, this function should be called before plugin execute() method is used.
+ """
+ return False
+
+ def execute(self, capabilitity, *args, **kwargs):
+ """ Executes capability by name.
+ Each capability e.g. may directly just call some command line
+ program or execute building pythonic function
+ """
+ return False
+
+ ###########################################################################
+ # Interface helper methods - overload only if you need to have custom behaviour
+ ###########################################################################
+ def print_plugin_error(self, text):
+ """ Function prints error in console and exits always with False
+ """
+ print "Plugin error: %s::%s: %s"% (self.name, self.type, text)
+ return False
+
+ def print_plugin_info(self, text, NL=True):
+ """ Function prints notification in console and exits always with True
+ """
+ if NL:
+ print "Plugin info: %s::%s: %s"% (self.name, self.type, text)
+ else:
+ print "Plugin info: %s::%s: %s"% (self.name, self.type, text),
+ return True
+
+ def print_plugin_char(self, char):
+ """ Function prints char on stdout
+ """
+ stdout.write(char)
+ stdout.flush()
+ return True
+
+ def check_mount_point_ready(self, destination_disk, init_delay=0.2, loop_delay=0.25):
+ """ Checks if destination_disk is ready and can be accessed by e.g. copy commands
+ @init_delay - Initial delay time before first access check
+ @loop_delay - pooling delay for access check
+ """
+ if not access(destination_disk, F_OK):
+ self.print_plugin_info("Waiting for mount point '%s' to be ready..."% destination_disk, NL=False)
+ sleep(init_delay)
+ while not access(destination_disk, F_OK):
+ sleep(loop_delay)
+ self.print_plugin_char('.')
+
+ def check_parameters(self, capabilitity, *args, **kwargs):
+ """ This function should be ran each time we call execute()
+ to check if none of the required parameters is missing.
+ """
+ missing_parameters = []
+ for parameter in self.required_parameters:
+ if parameter not in kwargs:
+ missing_parameters.append(parameter)
+ if len(missing_parameters) > 0:
+ self.print_plugin_error("execute parameter(s) '%s' missing!"% (', '.join(parameter)))
+ return False
+ return True
+
+ def run_command(self, cmd, shell=True):
+ """ Runs command from command line.
+ """
+ result = True
+ try:
+ ret = call(cmd, shell=shell)
+ if ret:
+ self.print_plugin_error("[ret=%d] Command: %s"% (int(ret), cmd))
+ return False
+ except Exception as e:
+ result = False
+ self.print_plugin_error("[ret=%d] Command: %s"% (int(ret), cmd))
+ self.print_plugin_error(str(e))
+ return result
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_registry.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_registry.py
new file mode 100644
index 0000000000..5237b9a254
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_registry.py
@@ -0,0 +1,89 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+class HostTestRegistry:
+ """ Simple class used to register and store
+ host test plugins for further usage
+ """
+ # Here we actually store all the plugins
+ PLUGINS = {} # 'Plugin Name' : Plugin Object
+
+ def print_error(self, text):
+ print "Plugin load failed. Reason: %s"% text
+
+ def register_plugin(self, plugin):
+ """ Registers and stores plugin inside registry for further use.
+ Method also calls plugin's setup() function to configure plugin if needed.
+
+ Note: Different groups of plugins may demand different extra parameter. Plugins
+ should be at least for one type of plugin configured with the same parameters
+ because we do not know which of them will actually use particular parameter.
+ """
+ # TODO:
+ # - check for unique caps for specified type
+ if plugin.name not in self.PLUGINS:
+ if plugin.setup(): # Setup plugin can be completed without errors
+ self.PLUGINS[plugin.name] = plugin
+ return True
+ else:
+ self.print_error("%s setup failed"% plugin.name)
+ else:
+ self.print_error("%s already loaded"% plugin.name)
+ return False
+
+ def call_plugin(self, type, capability, *args, **kwargs):
+ """ Execute plugin functionality respectively to its purpose
+ """
+ for plugin_name in self.PLUGINS:
+ plugin = self.PLUGINS[plugin_name]
+ if plugin.type == type and capability in plugin.capabilities:
+ return plugin.execute(capability, *args, **kwargs)
+ return False
+
+ def get_plugin_caps(self, type):
+ """ Returns list of all capabilities for plugin family with the same type.
+ If there are no capabilities empty list is returned
+ """
+ result = []
+ for plugin_name in self.PLUGINS:
+ plugin = self.PLUGINS[plugin_name]
+ if plugin.type == type:
+ result.extend(plugin.capabilities)
+ return sorted(result)
+
+ def load_plugin(self, name):
+ """ Used to load module from
+ """
+ mod = __import__("module_%s"% name)
+ return mod
+
+ def __str__(self):
+ """ User friendly printing method to show hooked plugins
+ """
+ from prettytable import PrettyTable
+ column_names = ['name', 'type', 'capabilities', 'stable']
+ pt = PrettyTable(column_names)
+ for column in column_names:
+ pt.align[column] = 'l'
+ for plugin_name in sorted(self.PLUGINS.keys()):
+ name = self.PLUGINS[plugin_name].name
+ type = self.PLUGINS[plugin_name].type
+ stable = self.PLUGINS[plugin_name].stable
+ capabilities = ', '.join(self.PLUGINS[plugin_name].capabilities)
+ row = [name, type, capabilities, stable]
+ pt.add_row(row)
+ return pt.get_string()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_firefox.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_firefox.py
new file mode 100644
index 0000000000..360835e498
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_firefox.py
@@ -0,0 +1,76 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from os.path import join, basename
+from host_test_plugins import HostTestPluginBase
+
+
+class HostTestPluginCopyMethod_Firefox(HostTestPluginBase):
+
+ def file_store_firefox(self, file_path, dest_disk):
+ try:
+ from selenium import webdriver
+ profile = webdriver.FirefoxProfile()
+ profile.set_preference('browser.download.folderList', 2) # custom location
+ profile.set_preference('browser.download.manager.showWhenStarting', False)
+ profile.set_preference('browser.download.dir', dest_disk)
+ profile.set_preference('browser.helperApps.neverAsk.saveToDisk', 'application/octet-stream')
+ # Launch browser with profile and get file
+ browser = webdriver.Firefox(profile)
+ browser.get(file_path)
+ browser.close()
+ except:
+ return False
+ return True
+
+ # Plugin interface
+ name = 'HostTestPluginCopyMethod_Firefox'
+ type = 'CopyMethod'
+ capabilities = ['firefox']
+ required_parameters = ['image_path', 'destination_disk']
+
+ def setup(self, *args, **kwargs):
+ """ Configure plugin, this function should be called before plugin execute() method is used.
+ """
+ try:
+ from selenium import webdriver
+ except ImportError, e:
+ self.print_plugin_error("Error: firefox copy method requires selenium library. %s"% e)
+ return False
+ return True
+
+ def execute(self, capabilitity, *args, **kwargs):
+ """ Executes capability by name.
+ Each capability may directly just call some command line
+ program or execute building pythonic function
+ """
+ result = False
+ if self.check_parameters(capabilitity, *args, **kwargs) is True:
+ image_path = kwargs['image_path']
+ destination_disk = kwargs['destination_disk']
+ # Prepare correct command line parameter values
+ image_base_name = basename(image_path)
+ destination_path = join(destination_disk, image_base_name)
+ if capabilitity == 'firefox':
+ self.file_store_firefox(image_path, destination_path)
+ return result
+
+
+def load_plugin():
+ """ Returns plugin available in this module
+ """
+ return HostTestPluginCopyMethod_Firefox()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py
new file mode 100644
index 0000000000..18fe0c42ae
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py
@@ -0,0 +1,71 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from shutil import copy
+from host_test_plugins import HostTestPluginBase
+
+
+class HostTestPluginCopyMethod_Mbed(HostTestPluginBase):
+
+ def generic_mbed_copy(self, image_path, destination_disk):
+ """ Generic mbed copy method for "mbed enabled" devices.
+ It uses standard python shuitl function to copy
+ image_file (target specific binary) to device's disk.
+ """
+ result = True
+ if not destination_disk.endswith('/') and not destination_disk.endswith('\\'):
+ destination_disk += '/'
+ try:
+ copy(image_path, destination_disk)
+ except Exception, e:
+ self.print_plugin_error("shutil.copy('%s', '%s')"% (image_path, destination_disk))
+ self.print_plugin_error("Error: %s"% str(e))
+ result = False
+ return result
+
+ # Plugin interface
+ name = 'HostTestPluginCopyMethod_Mbed'
+ type = 'CopyMethod'
+ stable = True
+ capabilities = ['default']
+ required_parameters = ['image_path', 'destination_disk']
+
+ def setup(self, *args, **kwargs):
+ """ Configure plugin, this function should be called before plugin execute() method is used.
+ """
+ return True
+
+ def execute(self, capabilitity, *args, **kwargs):
+ """ Executes capability by name.
+ Each capability may directly just call some command line
+ program or execute building pythonic function
+ """
+ result = False
+ if self.check_parameters(capabilitity, *args, **kwargs) is True:
+ if capabilitity == 'default':
+ image_path = kwargs['image_path']
+ destination_disk = kwargs['destination_disk']
+ # Wait for mount point to be ready
+ self.check_mount_point_ready(destination_disk) # Blocking
+ result = self.generic_mbed_copy(image_path, destination_disk)
+ return result
+
+
+def load_plugin():
+ """ Returns plugin available in this module
+ """
+ return HostTestPluginCopyMethod_Mbed()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mps2.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mps2.py
new file mode 100644
index 0000000000..f7768873f9
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mps2.py
@@ -0,0 +1,107 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import re
+from os.path import join
+from host_test_plugins import HostTestPluginBase
+
+
+class HostTestPluginCopyMethod_MPS2(HostTestPluginBase):
+
+ # MPS2 specific flashing / binary setup funcitons
+ def mps2_set_board_image_file(self, disk, images_cfg_path, image0file_path, image_name='images.txt'):
+ """ This function will alter image cfg file.
+ Main goal of this function is to change number of images to 1, comment all
+ existing image entries and append at the end of file new entry with test path.
+ @return True when all steps succeed.
+ """
+ MBED_SDK_TEST_STAMP = 'test suite entry'
+ image_path = join(disk, images_cfg_path, image_name)
+ new_file_lines = [] # New configuration file lines (entries)
+
+ # Check each line of the image configuration file
+ try:
+ with open(image_path, 'r') as file:
+ for line in file:
+ if re.search('^TOTALIMAGES', line):
+ # Check number of total images, should be 1
+ new_file_lines.append(re.sub('^TOTALIMAGES:[\t ]*[\d]+', 'TOTALIMAGES: 1', line))
+ elif re.search('; - %s[\n\r]*$'% MBED_SDK_TEST_STAMP, line):
+ # Look for test suite entries and remove them
+ pass # Omit all test suite entries
+ elif re.search('^IMAGE[\d]+FILE', line):
+ # Check all image entries and mark the ';'
+ new_file_lines.append(';' + line) # Comment non test suite lines
+ else:
+ # Append line to new file
+ new_file_lines.append(line)
+ except IOError as e:
+ return False
+
+ # Add new image entry with proper commented stamp
+ new_file_lines.append('IMAGE0FILE: %s ; - %s\r\n'% (image0file_path, MBED_SDK_TEST_STAMP))
+
+ # Write all lines to file
+ try:
+ with open(image_path, 'w') as file:
+ for line in new_file_lines:
+ file.write(line),
+ except IOError:
+ return False
+
+ return True
+
+ def mps2_select_core(self, disk, mobo_config_name=""):
+ """ Function selects actual core
+ """
+ # TODO: implement core selection
+ pass
+
+ def mps2_switch_usb_auto_mounting_after_restart(self, disk, usb_config_name=""):
+ """ Function alters configuration to allow USB MSD to be mounted after restarts
+ """
+ # TODO: implement USB MSD restart detection
+ pass
+
+ # Plugin interface
+ name = 'HostTestPluginCopyMethod_MPS2'
+ type = 'CopyMethod'
+ capabilities = ['mps2']
+ required_parameters = ['image_path', 'destination_disk']
+
+ def setup(self, *args, **kwargs):
+ """ Configure plugin, this function should be called before plugin execute() method is used.
+ """
+ return True
+
+ def execute(self, capabilitity, *args, **kwargs):
+ """ Executes capability by name.
+ Each capability may directly just call some command line
+ program or execute building pythonic function
+ """
+ result = False
+ if self.check_parameters(capabilitity, *args, **kwargs) is True:
+ if capabilitity == 'mps2':
+ # TODO: Implement MPS2 firmware setup here
+ pass
+ return result
+
+
+def load_plugin():
+ """ Returns plugin available in this module
+ """
+ return HostTestPluginCopyMethod_MPS2()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py
new file mode 100644
index 0000000000..f7fb23b0a7
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py
@@ -0,0 +1,64 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import os
+from os.path import join, basename
+from host_test_plugins import HostTestPluginBase
+
+
+class HostTestPluginCopyMethod_Shell(HostTestPluginBase):
+
+ # Plugin interface
+ name = 'HostTestPluginCopyMethod_Shell'
+ type = 'CopyMethod'
+ stable = True
+ capabilities = ['shell', 'cp', 'copy', 'xcopy']
+ required_parameters = ['image_path', 'destination_disk']
+
+ def setup(self, *args, **kwargs):
+ """ Configure plugin, this function should be called before plugin execute() method is used.
+ """
+ return True
+
+ def execute(self, capabilitity, *args, **kwargs):
+ """ Executes capability by name.
+ Each capability may directly just call some command line
+ program or execute building pythonic function
+ """
+ result = False
+ if self.check_parameters(capabilitity, *args, **kwargs) is True:
+ image_path = kwargs['image_path']
+ destination_disk = kwargs['destination_disk']
+ # Wait for mount point to be ready
+ self.check_mount_point_ready(destination_disk) # Blocking
+ # Prepare correct command line parameter values
+ image_base_name = basename(image_path)
+ destination_path = join(destination_disk, image_base_name)
+ if capabilitity == 'shell':
+ if os.name == 'nt': capabilitity = 'copy'
+ elif os.name == 'posix': capabilitity = 'cp'
+ if capabilitity == 'cp' or capabilitity == 'copy' or capabilitity == 'copy':
+ copy_method = capabilitity
+ cmd = [copy_method, image_path, destination_path]
+ result = self.run_command(cmd)
+ return result
+
+
+def load_plugin():
+ """ Returns plugin available in this module
+ """
+ return HostTestPluginCopyMethod_Shell()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_silabs.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_silabs.py
new file mode 100644
index 0000000000..1572bbc6ee
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_silabs.py
@@ -0,0 +1,61 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from host_test_plugins import HostTestPluginBase
+
+
+class HostTestPluginCopyMethod_Silabs(HostTestPluginBase):
+
+ # Plugin interface
+ name = 'HostTestPluginCopyMethod_Silabs'
+ type = 'CopyMethod'
+ capabilities = ['eACommander', 'eACommander-usb']
+ required_parameters = ['image_path', 'destination_disk']
+
+ def setup(self, *args, **kwargs):
+ """ Configure plugin, this function should be called before plugin execute() method is used.
+ """
+ self.EACOMMANDER_CMD = 'eACommander.exe'
+ return True
+
+ def execute(self, capabilitity, *args, **kwargs):
+ """ Executes capability by name.
+ Each capability may directly just call some command line
+ program or execute building pythonic function
+ """
+ result = False
+ if self.check_parameters(capabilitity, *args, **kwargs) is True:
+ image_path = kwargs['image_path']
+ destination_disk = kwargs['destination_disk']
+ if capabilitity == 'eACommander':
+ cmd = [self.EACOMMANDER_CMD,
+ '--serialno', destination_disk,
+ '--flash', image_path,
+ '--resettype', '2', '--reset']
+ result = self.run_command(cmd)
+ elif capabilitity == 'eACommander-usb':
+ cmd = [self.EACOMMANDER_CMD,
+ '--usb', destination_disk,
+ '--flash', image_path]
+ result = self.run_command(cmd)
+ return result
+
+
+def load_plugin():
+ """ Returns plugin available in this module
+ """
+ return HostTestPluginCopyMethod_Silabs()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mbed.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mbed.py
new file mode 100644
index 0000000000..0390d84ba6
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mbed.py
@@ -0,0 +1,72 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from host_test_plugins import HostTestPluginBase
+
+
+class HostTestPluginResetMethod_Mbed(HostTestPluginBase):
+
+ def safe_sendBreak(self, serial):
+ """ Wraps serial.sendBreak() to avoid serial::serialposix.py exception on Linux
+ Traceback (most recent call last):
+ File "make.py", line 189, in <module>
+ serial.sendBreak()
+ File "/usr/lib/python2.7/dist-packages/serial/serialposix.py", line 511, in sendBreak
+ termios.tcsendbreak(self.fd, int(duration/0.25))
+ error: (32, 'Broken pipe')
+ """
+ result = True
+ try:
+ serial.sendBreak()
+ except:
+ # In linux a termios.error is raised in sendBreak and in setBreak.
+ # The following setBreak() is needed to release the reset signal on the target mcu.
+ try:
+ serial.setBreak(False)
+ except:
+ result = False
+ return result
+
+ # Plugin interface
+ name = 'HostTestPluginResetMethod_Mbed'
+ type = 'ResetMethod'
+ stable = True
+ capabilities = ['default']
+ required_parameters = ['serial']
+
+ def setup(self, *args, **kwargs):
+ """ Configure plugin, this function should be called before plugin execute() method is used.
+ """
+ return True
+
+ def execute(self, capabilitity, *args, **kwargs):
+ """ Executes capability by name.
+ Each capability may directly just call some command line
+ program or execute building pythonic function
+ """
+ result = False
+ if self.check_parameters(capabilitity, *args, **kwargs) is True:
+ if capabilitity == 'default':
+ serial = kwargs['serial']
+ result = self.safe_sendBreak(serial)
+ return result
+
+
+def load_plugin():
+ """ Returns plugin available in this module
+ """
+ return HostTestPluginResetMethod_Mbed()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mps2.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mps2.py
new file mode 100644
index 0000000000..22938090bb
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mps2.py
@@ -0,0 +1,74 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import os
+from host_test_plugins import HostTestPluginBase
+
+# Note: This plugin is not fully functional, needs improvements
+
+class HostTestPluginResetMethod_MPS2(HostTestPluginBase):
+ """ Plugin used to reset ARM_MPS2 platform
+ Supports:
+ reboot.txt - startup from standby state, reboots when in run mode.
+ shutdown.txt - shutdown from run mode.
+ reset.txt - reset FPGA during run mode.
+ """
+ def touch_file(self, path):
+ """ Touch file and set timestamp to items
+ """
+ with open(path, 'a'):
+ os.utime(path, None)
+
+ # Plugin interface
+ name = 'HostTestPluginResetMethod_MPS2'
+ type = 'ResetMethod'
+ capabilities = ['reboot.txt', 'shutdown.txt', 'reset.txt']
+ required_parameters = ['disk']
+
+ def setup(self, *args, **kwargs):
+ """ Prepare / configure plugin to work.
+ This method can receive plugin specific parameters by kwargs and
+ ignore other parameters which may affect other plugins.
+ """
+ return True
+
+ def execute(self, capabilitity, *args, **kwargs):
+ """ Executes capability by name.
+ Each capability may directly just call some command line
+ program or execute building pythonic function
+ """
+ result = False
+ if self.check_parameters(capabilitity, *args, **kwargs) is True:
+
+ if capabilitity == 'reboot.txt':
+ # TODO: Implement touch file for reboot
+ pass
+
+ elif capabilitity == 'shutdown.txt':
+ # TODO: Implement touch file for shutdown
+ pass
+
+ elif capabilitity == 'reset.txt':
+ # TODO: Implement touch file for reset
+ pass
+
+ return result
+
+def load_plugin():
+ """ Returns plugin available in this module
+ """
+ return HostTestPluginResetMethod_MPS2()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_silabs.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_silabs.py
new file mode 100644
index 0000000000..2c05cb21c3
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_silabs.py
@@ -0,0 +1,66 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from host_test_plugins import HostTestPluginBase
+
+
+class HostTestPluginResetMethod_SiLabs(HostTestPluginBase):
+
+ # Plugin interface
+ name = 'HostTestPluginResetMethod_SiLabs'
+ type = 'ResetMethod'
+ stable = True
+ capabilities = ['eACommander', 'eACommander-usb']
+ required_parameters = ['disk']
+
+ def setup(self, *args, **kwargs):
+ """ Configure plugin, this function should be called before plugin execute() method is used.
+ """
+ # Note you need to have eACommander.exe on your system path!
+ self.EACOMMANDER_CMD = 'eACommander.exe'
+ return True
+
+ def execute(self, capabilitity, *args, **kwargs):
+ """ Executes capability by name.
+ Each capability may directly just call some command line
+ program or execute building pythonic function
+ """
+ result = False
+ if self.check_parameters(capabilitity, *args, **kwargs) is True:
+ disk = kwargs['disk'].rstrip('/\\')
+
+ if capabilitity == 'eACommander':
+ # For this copy method 'disk' will be 'serialno' for eACommander command line parameters
+ # Note: Commands are executed in the order they are specified on the command line
+ cmd = [self.EACOMMANDER_CMD,
+ '--serialno', disk,
+ '--resettype', '2', '--reset',]
+ result = self.run_command(cmd)
+ elif capabilitity == 'eACommander-usb':
+ # For this copy method 'disk' will be 'usb address' for eACommander command line parameters
+ # Note: Commands are executed in the order they are specified on the command line
+ cmd = [self.EACOMMANDER_CMD,
+ '--usb', disk,
+ '--resettype', '2', '--reset',]
+ result = self.run_command(cmd)
+ return result
+
+
+def load_plugin():
+ """ Returns plugin available in this module
+ """
+ return HostTestPluginResetMethod_SiLabs()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/mbedrpc.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/mbedrpc.py
new file mode 100644
index 0000000000..3fbc2e6f0d
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/mbedrpc.py
@@ -0,0 +1,287 @@
+"""
+mbed SDK
+Copyright (c) 2010-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+Example:
+> from mbedRPC import*
+> mbed = SerialRPC("COM5",9600);
+> myled = DigitalOut(mbed, LED1);
+> myled.write(1)
+>
+"""
+import serial, urllib2, time
+
+
+class pin():
+ def __init__(self, id):
+ self.name = id
+
+LED1 = pin("LED1")
+LED2 = pin("LED2")
+LED3 = pin("LED3")
+LED4 = pin("LED4")
+
+p5 = pin("p5")
+p6 = pin("p6")
+p7 = pin("p7")
+p8 = pin("p8")
+p9 = pin("p9")
+p10 = pin("p10")
+p11 = pin("p11")
+p12 = pin("p12")
+p13 = pin("p13")
+p14 = pin("p14")
+p15 = pin("p15")
+p16 = pin("p16")
+p17 = pin("p17")
+p18 = pin("p18")
+p19 = pin("p19")
+p20 = pin("p20")
+p21 = pin("p21")
+p22 = pin("p22")
+p23 = pin("p23")
+p24 = pin("p24")
+p25 = pin("p25")
+p26 = pin("p26")
+p27 = pin("p27")
+p28 = pin("p28")
+p29 = pin("p29")
+p30 = pin("p30")
+
+
+#mbed super class
+class mbed:
+ def __init__(self):
+ print("This will work as a demo but no transport mechanism has been selected")
+
+ def rpc(self, name, method, args):
+ print("Superclass method not overridden")
+
+#Transport mechanisms, derived from mbed
+
+class SerialRPC(mbed):
+ def __init__(self, port, baud=9600, reset=True, debug=False):
+ self.ser = serial.Serial(port)
+ self.ser.setBaudrate(baud)
+ self.ser.flushInput()
+ self.ser.flushOutput()
+ self.debug = debug
+ if reset:
+ if debug:
+ print "Reset mbed"
+ self.ser.sendBreak()
+ time.sleep(2)
+
+ def rpc(self, name, method, args):
+ request = "/" + name + "/" + method + " " + " ".join(args)
+ if self.debug:
+ print "[RPC::TX] %s" % request
+ self.ser.write(request + "\n")
+
+ while True:
+ response = self.ser.readline().strip()
+ if self.debug:
+ print "[RPC::RX] %s" % response
+
+ # Ignore comments
+ if not response.startswith('#'): break
+ return response
+
+
+class HTTPRPC(mbed):
+ def __init__(self, ip):
+ self.host = "http://" + ip
+
+ def rpc(self, name, method, args):
+ response = urllib2.urlopen(self.host + "/rpc/" + name + "/" + method + "," + ",".join(args))
+ return response.read().strip()
+
+
+#mbed Interfaces
+
+class DigitalOut():
+ def __init__(self, this_mbed , mpin):
+ self.mbed = this_mbed
+ if isinstance(mpin, str):
+ self.name = mpin
+ elif isinstance(mpin, pin):
+ self.name = self.mbed.rpc("DigitalOut", "new", [mpin.name])
+
+ def __del__(self):
+ r = self.mbed.rpc(self.name, "delete", [])
+
+ def write(self, value):
+ r = self.mbed.rpc(self.name, "write", [str(value)])
+
+ def read(self):
+ r = self.mbed.rpc(self.name, "read", [])
+ return int(r)
+
+
+class AnalogIn():
+ def __init__(self, this_mbed , mpin):
+ self.mbed = this_mbed
+ if isinstance(mpin, str):
+ self.name = mpin
+ elif isinstance(mpin, pin):
+ self.name = self.mbed.rpc("AnalogIn", "new", [mpin.name])
+
+ def __del__(self):
+ r = self.mbed.rpc(self.name, "delete", [])
+
+ def read(self):
+ r = self.mbed.rpc(self.name, "read", [])
+ return float(r)
+
+ def read_u16(self):
+ r = self.mbed.rpc(self.name, "read_u16", [])
+ return int(r)
+
+
+class AnalogOut():
+ def __init__(self, this_mbed , mpin):
+ self.mbed = this_mbed
+ if isinstance(mpin, str):
+ self.name = mpin
+ elif isinstance(mpin, pin):
+ self.name = self.mbed.rpc("AnalogOut", "new", [mpin.name])
+
+ def __del__(self):
+ r = self.mbed.rpc(self.name, "delete", [])
+
+ def write(self, value):
+ r = self.mbed.rpc(self.name, "write", [str(value)])
+
+ def write_u16(self, value):
+ r = self.mbed.rpc(self.name, "write_u16", [str(value)])
+
+ def read(self):
+ r = self.mbed.rpc(self.name, "read", [])
+ return float(r)
+
+
+class DigitalIn():
+ def __init__(self, this_mbed , mpin):
+ self.mbed = this_mbed
+ if isinstance(mpin, str):
+ self.name = mpin
+ elif isinstance(mpin, pin):
+ self.name = self.mbed.rpc("DigitalIn", "new", [mpin.name])
+
+ def __del__(self):
+ r = self.mbed.rpc(self.name, "delete", [])
+
+ def read(self):
+ r = self.mbed.rpc(self.name, "read", [])
+ return int(r)
+
+
+class PwmOut():
+ def __init__(self, this_mbed , mpin):
+ self.mbed = this_mbed
+ if isinstance(mpin, str):
+ self.name = mpin
+ elif isinstance(mpin, pin):
+ self.name = self.mbed.rpc("PwmOut", "new", [mpin.name])
+
+ def __del__(self):
+ r = self.mbed.rpc(self.name, "delete", [])
+
+ def write(self, value):
+ r = self.mbed.rpc(self.name, "write", [str(value)])
+
+ def read(self):
+ r = self.mbed.rpc(self.name, "read", [])
+ return float(r)
+
+ def period(self, value):
+ r = self.mbed.rpc(self.name, "period", [str(value)])
+
+ def period_ms(self, value):
+ r = self.mbed.rpc(self.name, "period_ms", [str(value)])
+
+ def period_us(self, value):
+ r = self.mbed.rpc(self.name, "period_us", [str(value)])
+
+ def puslewidth(self, value):
+ r = self.mbed.rpc(self.name, "pulsewidth", [str(value)])
+
+ def puslewidth_ms(self, value):
+ r = self.mbed.rpc(self.name, "pulsewidth_ms", [str(value)])
+
+ def puslewidth_us(self, value):
+ r = self.mbed.rpc(self.name, "pulsewidth_us", [str(value)])
+
+
+class Serial():
+ def __init__(self, this_mbed , tx, rx = ""):
+ self.mbed = this_mbed
+ if isinstance(tx, str):
+ self.name = mpin
+ elif isinstance(mpin, pin):
+ self.name = self.mbed.rpc("Serial", "new", [tx.name, rx.name])
+
+ def __del__(self):
+ r = self.mbed.rpc(self.name, "delete", [])
+
+ def putc(self, value):
+ r = self.mbed.rpc(self.name, "putc", [str(value)])
+
+ def puts(self, value):
+ r = self.mbed.rpc(self.name, "puts", [ "\"" + str(value) + "\""])
+
+ def getc(self):
+ r = self.mbed.rpc(self.name, "getc", [])
+ return int(r)
+
+
+class RPCFunction():
+ def __init__(self, this_mbed , name):
+ self.mbed = this_mbed
+ if isinstance(name, str):
+ self.name = name
+
+ def __del__(self):
+ r = self.mbed.rpc(self.name, "delete", [])
+
+ def read(self):
+ r = self.mbed.rpc(self.name, "read", [])
+ return int(r)
+
+ def run(self, input):
+ r = self.mbed.rpc(self.name, "run", [input])
+ return r
+
+
+class RPCVariable():
+ def __init__(self, this_mbed , name):
+ self.mbed = this_mbed
+ if isinstance(name, str):
+ self.name = name
+
+ def __del__(self):
+ r = self.mbed.rpc(self.name, "delete", [])
+
+ def write(self, value):
+ self.mbed.rpc(self.name, "write", [str(value)])
+
+ def read(self):
+ r = self.mbed.rpc(self.name, "read", [])
+ return r
+
+
+def wait(s):
+ time.sleep(s)
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/midi.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/midi.py
new file mode 100644
index 0000000000..67f34ea6f8
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/midi.py
@@ -0,0 +1,72 @@
+from __future__ import print_function
+import sys
+import re
+import time
+import mido
+from mido import Message
+
+
+def test_midi_in(port):
+ expected_messages_count=0
+ while expected_messages_count < 7:
+ for message in port.iter_pending():
+ if message.type in ('note_on', 'note_off', 'program_change', 'sysex'):
+ yield message
+ expected_messages_count+=1
+ time.sleep(0.1)
+
+def test_midi_loopback(input_port):
+ expected_messages_count=0
+ while expected_messages_count < 1:
+ for message in input_port.iter_pending():
+ print('Test MIDI OUT loopback received {}'.format(message.hex()))
+ expected_messages_count+=1
+
+def test_midi_out_loopback(output_port,input_port):
+ print("Test MIDI OUT loopback")
+ output_port.send(Message('program_change', program=1))
+ test_midi_loopback(input_port)
+
+ output_port.send(Message('note_on', note=21))
+ test_midi_loopback(input_port)
+
+ output_port.send(Message('note_off', note=21))
+ test_midi_loopback(input_port)
+
+ output_port.send(Message('sysex', data=[0x7E,0x7F,0x09,0x01]))
+ test_midi_loopback(input_port)
+
+ output_port.send(Message('sysex', data=[0x7F,0x7F,0x04,0x01,0x7F,0x7F]))
+ test_midi_loopback(input_port)
+
+ output_port.send(Message('sysex', data=[0x41,0x10,0x42,0x12,0x40,0x00,0x7F,0x00,0x41]))
+ test_midi_loopback(input_port)
+
+ output_port.send(Message('sysex', data=[0x41,0x10,0x42,0x12,0x40,0x00,0x04,0x7F,0x3D]))
+ test_midi_loopback(input_port)
+
+portname=""
+
+while portname=="":
+ print("Wait for MIDI IN plug ...")
+ for name in mido.get_input_names():
+ matchObj = re.match( r'Mbed', name)
+
+ if matchObj:
+ portname=name
+ time.sleep( 1 )
+
+try:
+ input_port = mido.open_input(portname)
+ output_port = mido.open_output(portname)
+
+ print('Using {}'.format(input_port))
+
+ print("Test MIDI IN")
+
+ for message in test_midi_in(input_port):
+ print('Test MIDI IN received {}'.format(message.hex()))
+
+ test_midi_out_loopback(output_port,input_port)
+except KeyboardInterrupt:
+ pass \ No newline at end of file
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/net_test.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/net_test.py
new file mode 100644
index 0000000000..01b4541abb
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/net_test.py
@@ -0,0 +1,27 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from host_test import Test, Simple
+from sys import stdout
+
+class NETTest(Simple):
+ def __init__(self):
+ Test.__init__(self)
+ self.mbed.init_serial(115200)
+ self.mbed.reset()
+
+if __name__ == '__main__':
+ NETTest().run()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/rpc.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/rpc.py
new file mode 100644
index 0000000000..84b85d2cc6
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/rpc.py
@@ -0,0 +1,56 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from host_test import Test
+from mbedrpc import SerialRPC, DigitalOut, DigitalIn, pin
+
+
+class RpcTest(Test):
+ def test(self):
+ self.notify("RPC Test")
+ s = SerialRPC(self.mbed.port, debug=True)
+
+ self.notify("Init remote objects")
+
+ p_out = pin("p10")
+ p_in = pin("p11")
+
+ if hasattr(self.mbed.options, 'micro'):
+ if self.mbed.options.micro == 'M0+':
+ print "Freedom Board: PTA12 <-> PTC4"
+ p_out = pin("PTA12")
+ p_in = pin("PTC4")
+
+ self.output = DigitalOut(s, p_out);
+ self.input = DigitalIn(s, p_in);
+
+ self.check = True
+ self.write_read_test(1)
+ self.write_read_test(0)
+ return self.check
+
+ def write_read_test(self, v):
+ self.notify("Check %d" % v)
+ self.output.write(v)
+ if self.input.read() != v:
+ self.notify("ERROR")
+ self.check = False
+ else:
+ self.notify("OK")
+
+
+if __name__ == '__main__':
+ RpcTest().run()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/rtc_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/rtc_auto.py
new file mode 100644
index 0000000000..d267936517
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/rtc_auto.py
@@ -0,0 +1,50 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import re
+from time import time, strftime, gmtime
+
+class RTCTest():
+ PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]"
+ re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE)
+
+ def test(self, selftest):
+ test_result = True
+ start = time()
+ sec_prev = 0
+ for i in range(0, 5):
+ # Timeout changed from default: we need to wait longer for some boards to start-up
+ c = selftest.mbed.serial_readline(timeout=10)
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ selftest.notify(c.strip())
+ delta = time() - start
+ m = self.re_detect_rtc_value.search(c)
+ if m and len(m.groups()):
+ sec = int(m.groups()[0])
+ time_str = m.groups()[1]
+ correct_time_str = strftime("%Y-%m-%d %H:%M:%S %p", gmtime(float(sec)))
+ single_result = time_str == correct_time_str and sec > 0 and sec > sec_prev
+ test_result = test_result and single_result
+ result_msg = "OK" if single_result else "FAIL"
+ selftest.notify("HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg))
+ sec_prev = sec
+ else:
+ test_result = False
+ break
+ start = time()
+ return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/stdio_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/stdio_auto.py
new file mode 100644
index 0000000000..1fe18906aa
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/stdio_auto.py
@@ -0,0 +1,56 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import re
+import random
+from time import time
+
+class StdioTest():
+ PATTERN_INT_VALUE = "Your value was: (-?\d+)"
+ re_detect_int_value = re.compile(PATTERN_INT_VALUE)
+
+ def test(self, selftest):
+ test_result = True
+
+ c = selftest.mbed.serial_readline() # {{start}} preamble
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ selftest.notify(c)
+
+ for i in range(0, 10):
+ random_integer = random.randint(-99999, 99999)
+ selftest.notify("HOST: Generated number: " + str(random_integer))
+ start = time()
+ selftest.mbed.serial_write(str(random_integer) + "\n")
+
+ serial_stdio_msg = selftest.mbed.serial_readline()
+ if serial_stdio_msg is None:
+ return selftest.RESULT_IO_SERIAL
+ delay_time = time() - start
+ selftest.notify(serial_stdio_msg.strip())
+
+ # Searching for reply with scanned values
+ m = self.re_detect_int_value.search(serial_stdio_msg)
+ if m and len(m.groups()):
+ int_value = m.groups()[0]
+ int_value_cmp = random_integer == int(int_value)
+ test_result = test_result and int_value_cmp
+ selftest.notify("HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL"))
+ else:
+ test_result = False
+ break
+ return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client.py
new file mode 100644
index 0000000000..303f002ab8
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client.py
@@ -0,0 +1,57 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import socket
+import string, random
+from time import time
+
+from private_settings import SERVER_ADDRESS
+
+ECHO_PORT = 7
+
+LEN_PACKET = 127
+N_PACKETS = 5000
+TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2
+MEGA = float(1024 * 1024)
+UPDATE_STEP = (N_PACKETS/10)
+
+class TCP_EchoClient:
+ def __init__(self, host):
+ self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.s.connect((host, ECHO_PORT))
+ self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET))
+
+ def __packet(self):
+ # Comment out the checks when measuring the throughput
+ # self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET))
+ self.s.send(self.packet)
+ data = self.s.recv(LEN_PACKET)
+ # assert self.packet == data, "packet error:\n%s\n%s\n" % (self.packet, data)
+
+ def test(self):
+ start = time()
+ for i in range(N_PACKETS):
+ if (i % UPDATE_STEP) == 0: print '%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.)
+ self.__packet()
+ t = time() - start
+ print 'Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA)
+
+ def __del__(self):
+ self.s.close()
+
+while True:
+ e = TCP_EchoClient(SERVER_ADDRESS)
+ e.test()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client_auto.py
new file mode 100644
index 0000000000..fe915a1ce9
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client_auto.py
@@ -0,0 +1,87 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import sys
+import socket
+from sys import stdout
+from SocketServer import BaseRequestHandler, TCPServer
+
+class TCPEchoClient_Handler(BaseRequestHandler):
+ def handle(self):
+ """ One handle per connection
+ """
+ print "HOST: Connection received...",
+ count = 1;
+ while True:
+ data = self.request.recv(1024)
+ if not data: break
+ self.request.sendall(data)
+ if '{{end}}' in str(data):
+ print
+ print str(data)
+ else:
+ if not count % 10:
+ sys.stdout.write('.')
+ count += 1
+ stdout.flush()
+
+class TCPEchoClientTest():
+ def send_server_ip_port(self, selftest, ip_address, port_no):
+ """ Set up network host. Reset target and and send server IP via serial to Mbed
+ """
+ c = selftest.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...'
+ if c is None:
+ self.print_result(selftest.RESULT_IO_SERIAL)
+ return
+
+ selftest.notify(c.strip())
+ selftest.notify("HOST: Sending server IP Address to target...")
+
+ connection_str = ip_address + ":" + str(port_no) + "\n"
+ selftest.mbed.serial_write(connection_str)
+ selftest.notify(connection_str)
+
+ # Two more strings about connection should be sent by MBED
+ for i in range(0, 2):
+ c = selftest.mbed.serial_readline()
+ if c is None:
+ selftest.print_result(self.RESULT_IO_SERIAL)
+ return
+ selftest.notify(c.strip())
+
+ def test(self, selftest):
+ # We need to discover SERVEP_IP and set up SERVER_PORT
+ # Note: Port 7 is Echo Protocol:
+ #
+ # Port number rationale:
+ #
+ # The Echo Protocol is a service in the Internet Protocol Suite defined
+ # in RFC 862. It was originally proposed for testing and measurement
+ # of round-trip times[citation needed] in IP networks.
+ #
+ # A host may connect to a server that supports the Echo Protocol using
+ # the Transmission Control Protocol (TCP) or the User Datagram Protocol
+ # (UDP) on the well-known port number 7. The server sends back an
+ # identical copy of the data it received.
+ SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
+ SERVER_PORT = 7
+
+ # Returning none will suppress host test from printing success code
+ server = TCPServer((SERVER_IP, SERVER_PORT), TCPEchoClient_Handler)
+ print "HOST: Listening for TCP connections: " + SERVER_IP + ":" + str(SERVER_PORT)
+ self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT)
+ server.serve_forever()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server.py
new file mode 100644
index 0000000000..4a68bd9ee7
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server.py
@@ -0,0 +1,50 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from SocketServer import BaseRequestHandler, TCPServer
+from time import time
+
+from private_settings import LOCALHOST
+
+MAX_INDEX = 126
+MEGA = float(1024 * 1024)
+
+class TCP_EchoHandler(BaseRequestHandler):
+ def handle(self):
+ print "\nconnection received"
+ start = time()
+ bytes = 0
+ index = 0
+ while True:
+ data = self.request.recv(1024)
+ if not data: break
+
+ bytes += len(data)
+ for n in map(ord, data):
+ if n != index:
+ print "data error %d != %d" % (n , index)
+ index += 1
+ if index > MAX_INDEX:
+ index = 0
+
+ self.request.sendall(data)
+ t = time() - start
+ b = float(bytes * 8) * 2
+ print "Throughput: (%.2f)Mbits/s" % ((b/t)/MEGA)
+
+server = TCPServer((LOCALHOST, 7), TCP_EchoHandler)
+print "listening for connections"
+server.serve_forever()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_auto.py
new file mode 100644
index 0000000000..8bc0e300d7
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_auto.py
@@ -0,0 +1,84 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import re
+import sys
+import uuid
+import socket
+from sys import stdout
+
+class TCPEchoServerTest():
+ ECHO_SERVER_ADDRESS = ""
+ ECHO_PORT = 0
+ ECHO_LOOPs = 100
+ s = None # Socket
+
+ PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
+ re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
+
+ def test(self, selftest):
+ result = False
+ c = selftest.mbed.serial_readline()
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ selftest.notify(c)
+
+ m = self.re_detect_server_ip.search(c)
+ if m and len(m.groups()):
+ self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
+ self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
+ selftest.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
+
+ # We assume this test fails so can't send 'error' message to server
+ try:
+ self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
+ except Exception, e:
+ self.s = None
+ selftest.notify("HOST: Socket error: %s"% e)
+ return selftest.RESULT_ERROR
+
+ print 'HOST: Sending %d echo strings...'% self.ECHO_LOOPs,
+ for i in range(0, self.ECHO_LOOPs):
+ TEST_STRING = str(uuid.uuid4())
+ try:
+ self.s.sendall(TEST_STRING)
+ data = self.s.recv(128)
+ except Exception, e:
+ self.s = None
+ selftest.notify("HOST: Socket error: %s"% e)
+ return selftest.RESULT_ERROR
+
+ received_str = repr(data)[1:-1]
+ if TEST_STRING == received_str: # We need to cut not needed single quotes from the string
+ sys.stdout.write('.')
+ stdout.flush()
+ result = True
+ else:
+ print "Expected: "
+ print "'%s'"% TEST_STRING
+ print "received: "
+ print "'%s'"% received_str
+ result = False
+ break
+
+ if self.s is not None:
+ self.s.close()
+ else:
+ selftest.notify("HOST: TCP Server not found")
+ result = False
+ return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_loop.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_loop.py
new file mode 100644
index 0000000000..df483974aa
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_loop.py
@@ -0,0 +1,40 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+# Be sure that the tools directory is in the search path
+import sys
+from os.path import join, abspath, dirname
+ROOT = abspath(join(dirname(__file__), "..", ".."))
+sys.path.insert(0, ROOT)
+
+from workspace_tools.private_settings import LOCALHOST
+from SocketServer import BaseRequestHandler, TCPServer
+
+
+class TCP_EchoHandler(BaseRequestHandler):
+ def handle(self):
+ print "\nHandle connection from:", self.client_address
+ while True:
+ data = self.request.recv(1024)
+ if not data: break
+ self.request.sendall(data)
+ self.request.close()
+ print "socket closed"
+
+if __name__ == '__main__':
+ server = TCPServer((LOCALHOST, 7), TCP_EchoHandler)
+ print "listening for connections on:", (LOCALHOST, 7)
+ server.serve_forever()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udp_link_layer_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udp_link_layer_auto.py
new file mode 100644
index 0000000000..cb0578fdf6
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udp_link_layer_auto.py
@@ -0,0 +1,145 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+"""
+How to use:
+make.py -m LPC1768 -t ARM -d E:\ -n NET_14
+udp_link_layer_auto.py -p COM20 -d E:\ -t 10
+"""
+
+import re
+import uuid
+import socket
+import thread
+from sys import stdout
+from time import time, sleep
+from host_test import DefaultTest
+from SocketServer import BaseRequestHandler, UDPServer
+
+
+# Received datagrams (with time)
+dict_udp_recv_datagrams = dict()
+
+# Sent datagrams (with time)
+dict_udp_sent_datagrams = dict()
+
+
+class UDPEchoClient_Handler(BaseRequestHandler):
+ def handle(self):
+ """ One handle per connection
+ """
+ _data, _socket = self.request
+ # Process received datagram
+ data_str = repr(_data)[1:-1]
+ dict_udp_recv_datagrams[data_str] = time()
+
+
+def udp_packet_recv(threadName, server_ip, server_port):
+ """ This function will receive packet stream from mbed device
+ """
+ server = UDPServer((server_ip, server_port), UDPEchoClient_Handler)
+ print "[UDP_COUNTER] Listening for connections... %s:%d"% (server_ip, server_port)
+ server.serve_forever()
+
+
+class UDPEchoServerTest(DefaultTest):
+ ECHO_SERVER_ADDRESS = "" # UDP IP of datagram bursts
+ ECHO_PORT = 0 # UDP port for datagram bursts
+ CONTROL_PORT = 23 # TCP port used to get stats from mbed device, e.g. counters
+ s = None # Socket
+
+ TEST_PACKET_COUNT = 1000 # how many packets should be send
+ TEST_STRESS_FACTOR = 0.001 # stress factor: 10 ms
+ PACKET_SATURATION_RATIO = 29.9 # Acceptable packet transmission in %
+
+ PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
+ re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
+
+ def get_control_data(self, command="stat\n"):
+ BUFFER_SIZE = 256
+ try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT))
+ except Exception, e:
+ data = None
+ s.send(command)
+ data = s.recv(BUFFER_SIZE)
+ s.close()
+ return data
+
+ def test(self):
+ serial_ip_msg = self.mbed.serial_readline()
+ if serial_ip_msg is None:
+ return self.RESULT_IO_SERIAL
+ stdout.write(serial_ip_msg)
+ stdout.flush()
+ # Searching for IP address and port prompted by server
+ m = self.re_detect_server_ip.search(serial_ip_msg)
+ if m and len(m.groups()):
+ self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
+ self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
+ self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
+
+ # Open client socket to burst datagrams to UDP server in mbed
+ try:
+ self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ except Exception, e:
+ self.s = None
+ self.notify("HOST: Error: %s"% e)
+ return self.RESULT_ERROR
+
+ # UDP replied receiver works in background to get echoed datagrams
+ SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
+ SERVER_PORT = self.ECHO_PORT + 1
+ thread.start_new_thread(udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT))
+ sleep(0.5)
+
+ # Burst part
+ for no in range(self.TEST_PACKET_COUNT):
+ TEST_STRING = str(uuid.uuid4())
+ payload = str(no) + "__" + TEST_STRING
+ self.s.sendto(payload, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
+ dict_udp_sent_datagrams[payload] = time()
+ sleep(self.TEST_STRESS_FACTOR)
+
+ if self.s is not None:
+ self.s.close()
+
+ # Wait 5 seconds for packets to come
+ result = True
+ self.notify("HOST: Test Summary:")
+ for d in range(5):
+ sleep(1.0)
+ summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0
+ self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d,
+ summary_datagram_success,
+ len(dict_udp_recv_datagrams),
+ self.TEST_PACKET_COUNT,
+ self.TEST_STRESS_FACTOR))
+ result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO)
+ stdout.flush()
+
+ # Getting control data from test
+ self.notify("...")
+ self.notify("HOST: Mbed Summary:")
+ mbed_stats = self.get_control_data()
+ self.notify(mbed_stats)
+ return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
+
+
+if __name__ == '__main__':
+ UDPEchoServerTest().run()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client.py
new file mode 100644
index 0000000000..1ff833f175
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client.py
@@ -0,0 +1,55 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from socket import socket, AF_INET, SOCK_DGRAM
+import string, random
+from time import time
+
+from private_settings import CLIENT_ADDRESS
+
+ECHO_PORT = 7
+
+LEN_PACKET = 127
+N_PACKETS = 5000
+TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2
+MEGA = float(1024 * 1024)
+UPDATE_STEP = (N_PACKETS/10)
+
+class UDP_EchoClient:
+ s = socket(AF_INET, SOCK_DGRAM)
+
+ def __init__(self, host):
+ self.host = host
+ self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET))
+
+ def __packet(self):
+ # Comment out the checks when measuring the throughput
+ # packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET))
+ UDP_EchoClient.s.sendto(packet, (self.host, ECHO_PORT))
+ data = UDP_EchoClient.s.recv(LEN_PACKET)
+ # assert packet == data, "packet error:\n%s\n%s\n" % (packet, data)
+
+ def test(self):
+ start = time()
+ for i in range(N_PACKETS):
+ if (i % UPDATE_STEP) == 0: print '%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.)
+ self.__packet()
+ t = time() - start
+ print 'Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA)
+
+while True:
+ e = UDP_EchoClient(CLIENT_ADDRESS)
+ e.test()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client_auto.py
new file mode 100644
index 0000000000..7896127077
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client_auto.py
@@ -0,0 +1,77 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import sys
+import socket
+from sys import stdout
+from SocketServer import BaseRequestHandler, UDPServer
+
+class UDPEchoClient_Handler(BaseRequestHandler):
+ def handle(self):
+ """ One handle per connection
+ """
+ data, socket = self.request
+ socket.sendto(data, self.client_address)
+ if '{{end}}' in data:
+ print
+ print data
+ else:
+ sys.stdout.write('.')
+ stdout.flush()
+
+class UDPEchoClientTest():
+
+ def send_server_ip_port(self, selftest, ip_address, port_no):
+ c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
+ if c is None:
+ selftest.print_result(selftest.RESULT_IO_SERIAL)
+ return
+ selftest.notify(c.strip())
+
+ selftest.notify("HOST: Sending server IP Address to target...")
+ connection_str = ip_address + ":" + str(port_no) + "\n"
+ selftest.mbed.serial_write(connection_str)
+
+ c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
+ if c is None:
+ self.print_result(selftest.RESULT_IO_SERIAL)
+ return
+ selftest.notify(c.strip())
+ return selftest.RESULT_PASSIVE
+
+ def test(self, selftest):
+ # We need to discover SERVEP_IP and set up SERVER_PORT
+ # Note: Port 7 is Echo Protocol:
+ #
+ # Port number rationale:
+ #
+ # The Echo Protocol is a service in the Internet Protocol Suite defined
+ # in RFC 862. It was originally proposed for testing and measurement
+ # of round-trip times[citation needed] in IP networks.
+ #
+ # A host may connect to a server that supports the Echo Protocol using
+ # the Transmission Control Protocol (TCP) or the User Datagram Protocol
+ # (UDP) on the well-known port number 7. The server sends back an
+ # identical copy of the data it received.
+ SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
+ SERVER_PORT = 7
+
+ # Returning none will suppress host test from printing success code
+ server = UDPServer((SERVER_IP, SERVER_PORT), UDPEchoClient_Handler)
+ print "HOST: Listening for UDP connections..."
+ self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT)
+ server.serve_forever()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server.py
new file mode 100644
index 0000000000..f6074332e4
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server.py
@@ -0,0 +1,29 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from SocketServer import BaseRequestHandler, UDPServer
+from private_settings import SERVER_ADDRESS
+
+class UDP_EchoHandler(BaseRequestHandler):
+ def handle(self):
+ data, socket = self.request
+ print "client:", self.client_address
+ print "data:", data
+ socket.sendto(data, self.client_address)
+
+server = UDPServer((SERVER_ADDRESS, 7195), UDP_EchoHandler)
+print "listening for connections"
+server.serve_forever()
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server_auto.py
new file mode 100644
index 0000000000..a7ee026306
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server_auto.py
@@ -0,0 +1,68 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import re
+import sys
+import uuid
+from sys import stdout
+from socket import socket, AF_INET, SOCK_DGRAM
+
+class UDPEchoServerTest():
+ ECHO_SERVER_ADDRESS = ""
+ ECHO_PORT = 0
+ s = None # Socket
+
+ PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
+ re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
+
+ def test(self, selftest):
+ result = True
+ serial_ip_msg = selftest.mbed.serial_readline()
+ if serial_ip_msg is None:
+ return selftest.RESULT_IO_SERIAL
+ selftest.notify(serial_ip_msg)
+ # Searching for IP address and port prompted by server
+ m = self.re_detect_server_ip.search(serial_ip_msg)
+ if m and len(m.groups()):
+ self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
+ self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
+ selftest.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
+
+ # We assume this test fails so can't send 'error' message to server
+ try:
+ self.s = socket(AF_INET, SOCK_DGRAM)
+ except Exception, e:
+ self.s = None
+ selftest.notify("HOST: Socket error: %s"% e)
+ return selftest.RESULT_ERROR
+
+ for i in range(0, 100):
+ TEST_STRING = str(uuid.uuid4())
+ self.s.sendto(TEST_STRING, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
+ data = self.s.recv(len(TEST_STRING))
+ received_str = repr(data)[1:-1]
+ if TEST_STRING != received_str:
+ result = False
+ break
+ sys.stdout.write('.')
+ stdout.flush()
+ else:
+ result = False
+
+ if self.s is not None:
+ self.s.close()
+ return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/wait_us_auto.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/wait_us_auto.py
new file mode 100644
index 0000000000..2ab66a3b51
--- /dev/null
+++ b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/host_tests/wait_us_auto.py
@@ -0,0 +1,69 @@
+"""
+mbed SDK
+Copyright (c) 2011-2013 ARM Limited
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from time import time
+
+class WaitusTest():
+ """ This test is reading single characters from stdio
+ and measures time between their occurrences.
+ """
+ TICK_LOOP_COUNTER = 13
+ TICK_LOOP_SUCCESSFUL_COUNTS = 10
+ DEVIATION = 0.10 # +/-10%
+
+ def test(self, selftest):
+ test_result = True
+ # First character to start test (to know after reset when test starts)
+ if selftest.mbed.set_serial_timeout(None) is None:
+ return selftest.RESULT_IO_SERIAL
+ c = selftest.mbed.serial_read(1)
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6
+ #Read additional 39 bytes of TargetID
+ if selftest.mbed.serial_read(39) is None:
+ return selftest.RESULT_IO_SERIAL
+ c = selftest.mbed.serial_read(1) # Re-read first 'tick'
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ start_serial_pool = time()
+ start = time()
+
+ success_counter = 0
+
+ for i in range(0, self.TICK_LOOP_COUNTER):
+ c = selftest.mbed.serial_read(1)
+ if c is None:
+ return selftest.RESULT_IO_SERIAL
+ delta = time() - start
+ deviation = abs(delta - 1)
+ # Round values
+ delta = round(delta, 2)
+ deviation = round(deviation, 2)
+ # Check if time measurements are in given range
+ deviation_ok = True if delta > 0 and deviation <= self.DEVIATION else False
+ success_counter = success_counter+1 if deviation_ok else 0
+ msg = "OK" if deviation_ok else "FAIL"
+ selftest.notify("%s in %.2f sec (%.2f) [%s]"% (c, delta, deviation, msg))
+ start = time()
+ if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS:
+ break
+ measurement_time = time() - start_serial_pool
+ selftest.notify("Consecutive OK timer reads: %d"% success_counter)
+ selftest.notify("Completed in %.2f sec" % (measurement_time))
+ test_result = True if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS else False
+ return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE