Skip to content

Commit

Permalink
Merge pull request #2 from AlexanderBraml/kei2600_buffering
Browse files Browse the repository at this point in the history
Implement Execution Buffering for KEI2600
  • Loading branch information
AlexanderBraml authored Feb 1, 2024
2 parents 2a47178 + 1c094dd commit 37c2940
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 9 deletions.
4 changes: 2 additions & 2 deletions py_instrument_control_lib/device_types/FunctionGenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@


class FuncChannel(Enum):
CHANNEL_A = ""
CHANNEL_B = ""
CHANNEL_A = "A"
CHANNEL_B = "B"


class FGUnit(Enum):
Expand Down
128 changes: 127 additions & 1 deletion py_instrument_control_lib/devices/KEI2600.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
This code is automatically generated from '../../../specifications/KEI2600.json'.
This code is automatically generated from '../../specifications/KEI2600.json'.
Any changes made to this file are overwritten if you regenerate this module.
Only make changes in the source file.
"""
Expand All @@ -16,10 +16,40 @@

_, _, _, _, _, _, _, _, _ = (SMU, FunctionGenerator, PowerSupply, Oscilloscope, TCPDevice,
KeysightDevice, KeithleyDevice, AbstractSwitchMatrix, FloDevice)
import requests
from py_instrument_control_lib.device_base.DeviceException import DeviceException
import time
import copy
from typing import Optional


class KEI2600(SMU, KeithleyDevice):

def execute(self, command: str, check_errors: bool = False) \
-> None:
if self._buffering_enabled:
self._buffered_script.append(command)
else:
super().execute(command)

def query(self, query: str, check_errors: bool = False) \
-> Optional[str]:
if self._buffering_enabled:
buffer_name = 'A_M_BUFFER' if query.startswith('smua') else 'B_M_BUFFER'
query = query[:-1] + buffer_name + ')'
self._buffered_script.append(query)
return "inf"
else:
command = 'reading = ' + query
self.execute(command)
self.execute('print(reading)')
response = self._socket.recv(1024)
response_decoded = response.decode()
if 'TSP>' in response_decoded:
raise DeviceException(msg='Do not use the web interface before or during use of this control lib! '
'You have to restart your device in order to continue.')
return response_decoded

def measure(self, unit: Unit, channel: SMUChannel, check_errors: bool = False) \
-> float:
"""
Expand Down Expand Up @@ -166,3 +196,99 @@ def beep(self, duration: float, frequency: int, check_errors: bool = False) \
self.execute(f'beeper.beep({duration}, {frequency})')
if check_errors:
self.check_error_buffer()

def toggle_buffering(self, enable: bool, check_errors: bool = False) \
-> None:
self._buffering_enabled = enable
if not hasattr(self, '_buffered_script'):
self._buffered_script = []

def execute_buffered_script(self, blocking: bool = True, check_errors: bool = False) \
-> None:
self.toggle_buffering(False)

buffer_entries_a = len([line for line in self._buffered_script if 'A_M_BUFFER' in line])
buffer_entries_b = len([line for line in self._buffered_script if 'B_M_BUFFER' in line])
default_script = ["loadscript pyBuff",
f'A_M_BUFFER = smua.makebuffer({buffer_entries_a})',
f'B_M_BUFFER = smub.makebuffer({buffer_entries_b})',
'A_M_BUFFER.appendmode = 1',
'B_M_BUFFER.appendmode = 1']
self._buffered_script = default_script + self._buffered_script
self._buffered_script.append('endscript')

exit_payload: dict = {'command': 'keyInput', 'value': 'K'}
payloads: list[dict] = []

n = 32
chunks = [self._buffered_script[i:i + n] for i in range(0, len(self._buffered_script), n)]

payloads += [self.__make_payload('\n'.join(chunk)) for chunk in chunks]
payloads += [self.__make_payload('pyBuff.save()'), exit_payload]

for payload in payloads:
response = requests.post('http://' + self._config.ip + '/HttpCommand', json=payload)
if response.status_code != 200:
raise DeviceException(msg='Failed to send and execute buffered script')
time.sleep(0.5)

self.execute('pyBuff()')

print('Waiting for script to complete')
if blocking:
status = requests.post('http://' + self._config.ip + '/HttpCommand', json={
"command": "shellOutput",
"timeout": 3,
"acceptsMultiple": True
})
while status.json()['status']['value'] == 'timeout':
status = requests.post('http://' + self._config.ip + '/HttpCommand', json={
"command": "shellOutput",
"timeout": 3,
"acceptsMultiple": True
})
print('Script finished')

def __make_payload(self, value: str, check_errors: bool = False) \
-> dict:
return {'command': 'shellInput', 'value': value}

def read_buffer(self, check_errors: bool = False) \
-> None:
buffer_a = self.__read_channel_buffer(SMUChannel.CHANNEL_A)
buffer_b = self.__read_channel_buffer(SMUChannel.CHANNEL_B)
self._buffer = (buffer_a, buffer_b)

def __read_channel_buffer(self, channel: SMUChannel, check_errors: bool = False) \
-> list[str]:
buffer_name = ('A' if channel == SMUChannel.CHANNEL_A else 'B') + '_M_BUFFER'
buffer_size = len([line for line in self._buffered_script if buffer_name in line]) - 2
batch_size = 1024 // 15

buffer = []
offset = 1
while offset + batch_size <= buffer_size:
buffer += self.__get_buffer_content(offset, batch_size, buffer_name)
offset += batch_size

if (remaining := buffer_size % batch_size) > 0:
buffer += self.__get_buffer_content(offset, remaining, buffer_name)

return buffer

def get_buffer(self, check_errors: bool = False) \
-> tuple[list[str], list[str]]:
return copy.deepcopy(self._buffer)

def __get_buffer_content(self, offset: int, batch_size: int, buffer_name: str, check_errors: bool = False) \
-> list[str]:
print_query = f'printbuffer({offset}, {offset + batch_size - 1}, {buffer_name})'
self.execute(print_query)
return self._socket.recv(1024).decode().replace('\n', '').split(', ')

def next_buffer_element(self, channel: SMUChannel, check_errors: bool = False) \
-> float:
if len(self._buffer) == 0:
raise Exception('Buffer is empty!')
buffer_idx = 0 if channel == SMUChannel.CHANNEL_A else 1
return float(self._buffer[buffer_idx].pop(0))
2 changes: 1 addition & 1 deletion py_instrument_control_lib/devices/KST3000.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
This code is automatically generated from '../../../specifications/KST3000.json'.
This code is automatically generated from '../../specifications/KST3000.json'.
Any changes made to this file are overwritten if you regenerate this module.
Only make changes in the source file.
"""
Expand Down
2 changes: 1 addition & 1 deletion py_instrument_control_lib/devices/KST33500.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
This code is automatically generated from '../../../specifications/KST33500.json'.
This code is automatically generated from '../../specifications/KST33500.json'.
Any changes made to this file are overwritten if you regenerate this module.
Only make changes in the source file.
"""
Expand Down
2 changes: 1 addition & 1 deletion py_instrument_control_lib/devices/SPD1305X.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
This code is automatically generated from '../../../specifications/SPD1305X.json'.
This code is automatically generated from '../../specifications/SPD1305X.json'.
Any changes made to this file are overwritten if you regenerate this module.
Only make changes in the source file.
"""
Expand Down
2 changes: 1 addition & 1 deletion py_instrument_control_lib/devices/SwitchMatrix.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
This code is automatically generated from '../../../specifications/SwitchMatrix.json'.
This code is automatically generated from '../../specifications/SwitchMatrix.json'.
Any changes made to this file are overwritten if you regenerate this module.
Only make changes in the source file.
"""
Expand Down
5 changes: 3 additions & 2 deletions py_instrument_control_lib/playground/Emulator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import socket
import sys

host = ''
port = 5025
host = sys.argv[1]
port = int(sys.argv[2])

if __name__ == '__main__':
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand Down
7 changes: 7 additions & 0 deletions py_instrument_control_lib/playground/emulator.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

while :
do
python3 Emulator.py 127.0.0.1 $1
done

48 changes: 48 additions & 0 deletions py_instrument_control_lib/playground/test_kei2600_buffering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from py_instrument_control_lib.device_base.DeviceConfigs import TCPDeviceConfig
from py_instrument_control_lib.device_types.SMU import SMUChannel, Unit, SMUDisplay, SMUMode, SourceFunction, \
SourceOffMode, SourceSettling, Autozero
from py_instrument_control_lib.devices.KEI2600 import KEI2600

smu_config = TCPDeviceConfig(ip='132.231.14.169', port=5025, timeout=5)
smu = KEI2600(smu_config)
smu.connect()
smu.toggle_buffering(True)

smu.set_limit(Unit.CURRENT, SMUChannel.CHANNEL_A, 1)
smu.set_limit(Unit.CURRENT, SMUChannel.CHANNEL_B, 1)
smu.display_measure_function(SMUChannel.CHANNEL_A, SMUDisplay.MEASURE_DC_AMPS)
smu.display_measure_function(SMUChannel.CHANNEL_B, SMUDisplay.MEASURE_DC_VOLTS)
for channel in (SMUChannel.CHANNEL_A, SMUChannel.CHANNEL_B):
smu.toggle_measure_analog_filter(channel, True)
smu.toggle_autorange(Unit.VOLTAGE, channel, SMUMode.MEASURE, True)
smu.set_measure_plc(channel, 2)
smu.set_measure_auto_zero(channel, Autozero.ONCE)
smu.set_source_function(channel, SourceFunction.DC_VOLTS)
smu.set_source_off_mode(channel, SourceOffMode.OUTPUT_ZERO)
smu.set_source_settling(channel, SourceSettling.SMOOTH)
smu.toggle_source_sink(channel, True)
smu.toggle_channel(SMUChannel.CHANNEL_A, True)
smu.toggle_channel(SMUChannel.CHANNEL_B, True)
smu.set_level(Unit.VOLTAGE, SMUChannel.CHANNEL_A, 1)
smu.set_level(Unit.VOLTAGE, SMUChannel.CHANNEL_B, 1)
for i in range(0, 10):
smu.measure(Unit.CURRENT, SMUChannel.CHANNEL_A)
smu.measure(Unit.VOLTAGE, SMUChannel.CHANNEL_A)
smu.measure(Unit.VOLTAGE, SMUChannel.CHANNEL_B)
smu.measure(Unit.CURRENT, SMUChannel.CHANNEL_A)
smu.measure(Unit.VOLTAGE, SMUChannel.CHANNEL_B)
smu.toggle_channel(SMUChannel.CHANNEL_A, False)
smu.toggle_channel(SMUChannel.CHANNEL_B, False)

smu.execute_buffered_script(blocking=True)
smu.read_buffer()
buff = smu.get_buffer()

for i in range(0, 10):
print(f'smu.measure(Unit.CURRENT, SMUChannel.CHANNEL_A) = {smu.next_buffer_element(SMUChannel.CHANNEL_A)}\n')
print(f'smu.measure(Unit.VOLTAGE, SMUChannel.CHANNEL_A) = {smu.next_buffer_element(SMUChannel.CHANNEL_A)}\n')
print(f'smu.measure(Unit.VOLTAGE, SMUChannel.CHANNEL_B) = {smu.next_buffer_element(SMUChannel.CHANNEL_B)}\n')
print(f'smu.measure(Unit.CURRENT, SMUChannel.CHANNEL_A) = {smu.next_buffer_element(SMUChannel.CHANNEL_A)}\n')
print(f'smu.measure(Unit.VOLTAGE, SMUChannel.CHANNEL_B) = {smu.next_buffer_element(SMUChannel.CHANNEL_B)}\n')

pass
67 changes: 67 additions & 0 deletions specifications/KEI2600.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,21 @@
"name": "KEI2600",
"devicetype": "SMU",
"superclass": "KeithleyDevice",
"imports": "import requests\nfrom py_instrument_control_lib.device_base.DeviceException import DeviceException\nimport time\nimport copy\nfrom typing import Optional",
"commands": [
{
"name": "execute",
"type": "code",
"signature": "command: str",
"code": "if self._buffering_enabled:\n self._buffered_script.append(command)\nelse:\n super().execute(command)"
},
{
"name": "query",
"type": "code",
"signature": "query: str",
"return": "Optional[str]",
"code": "if self._buffering_enabled:\n buffer_name = 'A_M_BUFFER' if query.startswith('smua') else 'B_M_BUFFER'\n query = query[:-1] + buffer_name + ')'\n self._buffered_script.append(query)\n return \"inf\"\nelse:\n command = 'reading = ' + query\n self.execute(command)\n self.execute('print(reading)')\n response = self._socket.recv(1024)\n response_decoded = response.decode()\n if 'TSP>' in response_decoded:\n raise DeviceException(msg='Do not use the web interface before or during use of this control lib! '\n 'You have to restart your device in order to continue.')\n return response_decoded"
},
{
"name": "measure",
"docs": "This function measures a certain unit on a specific channel. Can be used to measure voltage, current, power or resistance.\n:param unit: The thing to measure.\n:param channel: The channel to measure on.\n:param check_errors: Whether to check the error buffer after the execution.\n:return: None.",
Expand Down Expand Up @@ -125,6 +139,59 @@
"docs": "Send a beeping sound with a specific duration and frequency to the SMU.\n:param duration: Time in seconds to play the beeping sound.\n:param frequency: Frequency in HZ of the sound to play.\n:param check_errors: Whether to check the error buffer after the execution.\n:return: None.",
"signature": "duration: float, frequency: int",
"command": "f'beeper.beep({duration}, {frequency})'"
},
{
"name": "toggle_buffering",
"type": "code",
"signature": "enable: bool",
"code": "self._buffering_enabled = enable\nif not hasattr(self, '_buffered_script'):\n self._buffered_script = []"
},
{
"name": "execute_buffered_script",
"type": "code",
"signature": "blocking: bool = True",
"code": "self.toggle_buffering(False)\n\nbuffer_entries_a = len([line for line in self._buffered_script if 'A_M_BUFFER' in line])\nbuffer_entries_b = len([line for line in self._buffered_script if 'B_M_BUFFER' in line])\ndefault_script = [\"loadscript pyBuff\",\n f'A_M_BUFFER = smua.makebuffer({buffer_entries_a})',\n f'B_M_BUFFER = smub.makebuffer({buffer_entries_b})',\n 'A_M_BUFFER.appendmode = 1',\n 'B_M_BUFFER.appendmode = 1']\nself._buffered_script = default_script + self._buffered_script\nself._buffered_script.append('endscript')\n\nexit_payload: dict = {'command': 'keyInput', 'value': 'K'}\npayloads: list[dict] = []\n\nn = 32\nchunks = [self._buffered_script[i:i + n] for i in range(0, len(self._buffered_script), n)]\n\npayloads += [self.__make_payload('\\n'.join(chunk)) for chunk in chunks]\npayloads += [self.__make_payload('pyBuff.save()'), exit_payload]\n\nfor payload in payloads:\n response = requests.post('http://' + self._config.ip + '/HttpCommand', json=payload)\n if response.status_code != 200:\n raise DeviceException(msg='Failed to send and execute buffered script')\n time.sleep(0.5)\n\nself.execute('pyBuff()')\n\nprint('Waiting for script to complete')\nif blocking:\n status = requests.post('http://' + self._config.ip + '/HttpCommand', json={\n \"command\": \"shellOutput\",\n \"timeout\": 3,\n \"acceptsMultiple\": True\n })\n while status.json()['status']['value'] == 'timeout':\n status = requests.post('http://' + self._config.ip + '/HttpCommand', json={\n \"command\": \"shellOutput\",\n \"timeout\": 3,\n \"acceptsMultiple\": True\n })\n print('Script finished')"
},
{
"name": "__make_payload",
"type": "code",
"return": "dict",
"signature": "value: str",
"code": "return {'command': 'shellInput', 'value': value}"
},
{
"name": "read_buffer",
"type": "code",
"signature": "",
"code": "buffer_a = self.__read_channel_buffer(SMUChannel.CHANNEL_A)\nbuffer_b = self.__read_channel_buffer(SMUChannel.CHANNEL_B)\nself._buffer = (buffer_a, buffer_b)"
},
{
"name": "__read_channel_buffer",
"type": "code",
"signature": "channel: SMUChannel",
"return": "list[str]",
"code": "buffer_name = ('A' if channel == SMUChannel.CHANNEL_A else 'B') + '_M_BUFFER'\nbuffer_size = len([line for line in self._buffered_script if buffer_name in line]) - 2\nbatch_size = 1024 // 15\n\nbuffer = []\noffset = 1\nwhile offset + batch_size <= buffer_size:\n buffer += self.__get_buffer_content(offset, batch_size, buffer_name)\n offset += batch_size\n\nif (remaining := buffer_size % batch_size) > 0:\n buffer += self.__get_buffer_content(offset, remaining, buffer_name)\n\nreturn buffer"
},
{
"name": "get_buffer",
"type": "code",
"signature": "",
"return": "tuple[list[str], list[str]]",
"code": "return copy.deepcopy(self._buffer)"
},
{
"name": "__get_buffer_content",
"type": "code",
"signature": "offset: int, batch_size: int, buffer_name: str",
"return": "list[str]",
"code": "print_query = f'printbuffer({offset}, {offset + batch_size - 1}, {buffer_name})'\nself.execute(print_query)\nreturn self._socket.recv(1024).decode().replace('\\n', '').split(', ')"
},
{
"name": "next_buffer_element",
"type": "code",
"signature": "channel: SMUChannel",
"return": "float",
"code": "if len(self._buffer) == 0:\n raise Exception('Buffer is empty!')\nbuffer_idx = 0 if channel == SMUChannel.CHANNEL_A else 1\nreturn float(self._buffer[buffer_idx].pop(0))"
}
]
}

0 comments on commit 37c2940

Please sign in to comment.