Skip to content

Commit

Permalink
simulation: Add cocotb python library for XLS
Browse files Browse the repository at this point in the history
The library contains XLSChannel, XLSChannelDriver and XLSChannelMonitor classes.

* XLSChannel - provides a mechanism for wrapping all signals related to XLS channels into one object.
* XLSChannelDriver - may be used to send data to XLS channel
* XLSChannelMonitor - may be used to monitor transaction taking place in XLS Channel

Internal-tag: [#46586]
Signed-off-by: Robert Winkler <rwinkler@antmicro.com>
  • Loading branch information
rw1nkler committed Aug 30, 2023
1 parent cf68778 commit bd3d696
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
30 changes: 30 additions & 0 deletions xls/simulation/cocotb/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2023 The XLS Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

load("@xls_pip_deps//:requirements.bzl", "requirement")

package(
default_visibility = ["//xls:xls_public"],
licenses = ["notice"], # Apache 2.0
)

py_library(
name = "cocotb_xls",
srcs = ["cocotb_xls.py"],
imports = ["."],
deps = [
requirement("cocotb"),
requirement("cocotb_bus"),
],
)
80 changes: 80 additions & 0 deletions xls/simulation/cocotb/cocotb_xls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright 2023 The XLS Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Sequence, Union

import cocotb
from cocotb.binary import BinaryValue
from cocotb.handle import SimHandleBase
from cocotb.triggers import RisingEdge
from cocotb_bus.bus import Bus
from cocotb_bus.drivers import BusDriver
from cocotb_bus.monitors import BusMonitor

Transaction = Union[BinaryValue, Sequence[BinaryValue]]

XLS_CHANNEL_SIGNALS = ["data", "rdy", "vld"]
XLS_CHANNEL_OPTIONAL_SIGNALS = []


class XLSChannel(Bus):
_signals = XLS_CHANNEL_SIGNALS
_optional_signals = XLS_CHANNEL_OPTIONAL_SIGNALS

def __init__(self, entity, name, **kwargs: Any):
super().__init__(entity, name, self._signals, self._optional_signals, **kwargs)


class XLSChannelDriver(BusDriver):
_signals = XLS_CHANNEL_SIGNALS
_optional_signals = XLS_CHANNEL_OPTIONAL_SIGNALS

def __init__(self, entity: SimHandleBase, name: str, clock: SimHandleBase, **kwargs: Any):
BusDriver.__init__(self, entity, name, clock, **kwargs)

self.bus.data.setimmediatevalue(0)
self.bus.vld.setimmediatevalue(0)

async def _driver_send(self, transaction: Transaction, sync: bool = True, **kwargs: Any) -> None:
if sync:
await RisingEdge(self.clock)

data_to_send = (transaction if isinstance(transaction, Sequence) else [transaction])

for word in data_to_send:
self.bus.vld.value = 1
self.bus.data.value = word

while True:
await RisingEdge(self.clock)
if self.bus.rdy.value:
break

self.bus.vld.value = 0


class XLSChannelMonitor(BusMonitor):
_signals = XLS_CHANNEL_SIGNALS
_optional_signals = XLS_CHANNEL_OPTIONAL_SIGNALS

def __init__(self, entity: SimHandleBase, name: str, clock: SimHandleBase, **kwargs: Any):
BusMonitor.__init__(self, entity, name, clock, **kwargs)

@cocotb.coroutine
async def _monitor_recv(self) -> None:
while True:
await RisingEdge(self.clock)
if self.bus.rdy.value:
vec = self.bus.data.value
self._recv(vec)

0 comments on commit bd3d696

Please sign in to comment.