-
Notifications
You must be signed in to change notification settings - Fork 0
/
test.py
148 lines (118 loc) · 4.38 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# working_iteration.py
import streamlit as st
import numpy as np
import time
from datetime import datetime
import asyncio
from dataclasses import dataclass, asdict
from typing import List
@dataclass
class DataPoint:
timestamp: datetime
price: float
volume: int
# Mock data generator to simulate yielded data
async def mock_data_generator():
while True:
timestamp = datetime.now()
price = np.random.randint(100, 200) + np.random.random()
volume = np.random.randint(1000, 10000)
yield DataPoint(timestamp, price, volume)
await asyncio.sleep(1) # Simulate delay between data points
# Function to save snapshot
def save_snapshot(data):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
st.session_state.snapshots[timestamp] = data.copy()
return timestamp
# Initialize session state
if "data" not in st.session_state:
st.session_state.data = []
if "snapshots" not in st.session_state:
st.session_state.snapshots = {}
if "current_view" not in st.session_state:
st.session_state.current_view = "Real-time Stream"
if "snapshot_buttons" not in st.session_state:
st.session_state.snapshot_buttons = []
# Streamlit app
st.title("Data Stream with Snapshots")
# Sidebar for snapshots and navigation
st.sidebar.header("Snapshots and Navigation")
snapshot_interval = st.sidebar.slider("Snapshot Interval (seconds)", 5, 60, 30)
# Take snapshot button
if st.sidebar.button("Take Snapshot Now"):
new_snapshot = save_snapshot(st.session_state.data)
st.session_state.snapshot_buttons.append(new_snapshot)
st.sidebar.success(f"Snapshot taken at {new_snapshot}")
# Real-time stream button
if st.sidebar.button("Return to Mempool Block"):
st.session_state.current_view = "pending_block"
# Display current view
st.sidebar.write(f"Currently viewing: {st.session_state.current_view}")
# Snapshot navigation buttons
st.sidebar.header("Snapshot Navigation")
for timestamp in st.session_state.snapshot_buttons:
if st.sidebar.button(f"View Snapshot: {timestamp}"):
st.session_state.current_view = timestamp
# Main content
col1, col2 = st.columns(2)
with col1:
st.subheader("Data View")
stream_placeholder = st.empty()
with col2:
st.subheader("Summary Statistics")
stats_placeholder = st.empty()
chart_placeholder = st.empty()
# Function to update display
def update_display(data):
# Update stream display
stream_placeholder.table(data[-10:])
# Update summary statistics
if data:
prices = [d.price for d in data]
volumes = [d.volume for d in data]
stats = {
"Price": {
"mean": np.mean(prices),
"std": np.std(prices),
"min": np.min(prices),
"max": np.max(prices),
},
"Volume": {
"mean": np.mean(volumes),
"std": np.std(volumes),
"min": np.min(volumes),
"max": np.max(volumes),
},
}
stats_placeholder.table(stats)
# Update chart
if data:
chart_data = {
"Timestamp": [d.timestamp for d in data],
"Price": [d.price for d in data],
}
chart_placeholder.line_chart(chart_data, x="Timestamp", y="Price")
# Async function to update data
async def update_data():
async for new_data in mock_data_generator():
if st.session_state.current_view == "Real-time Stream":
# Append new data
st.session_state.data.append(new_data)
# Keep only the last 100 records
st.session_state.data = st.session_state.data[-100:]
# Update display with real-time data
update_display(st.session_state.data)
# Save snapshot if interval has passed
if len(st.session_state.data) % snapshot_interval == 0:
new_snapshot = save_snapshot(st.session_state.data)
if new_snapshot not in st.session_state.snapshot_buttons:
st.session_state.snapshot_buttons.append(new_snapshot)
else:
# Display selected snapshot
snapshot_data = st.session_state.snapshots[st.session_state.current_view]
update_display(snapshot_data)
# Use st.experimental_rerun() to update the UI and check for button presses
st.rerun()
# Run the async function
if __name__ == "__main__":
asyncio.run(update_data())