Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,4 @@ leaderboard/rtd_token.txt
# locally pre-trained models
pyhealth/medcode/pretrained_embeddings/kg_emb/examples/pretrained_model
data/physionet.org/
.vscode
2 changes: 2 additions & 0 deletions pyhealth/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def __init__(self, *args, **kwargs):
from .mimic4 import MIMIC4CXRDataset, MIMIC4Dataset, MIMIC4EHRDataset, MIMIC4NoteDataset
from .mimicextract import MIMICExtractDataset
from .omop import OMOPDataset
from .pendulum_data import PendulumData
from .sample_dataset import SampleDataset
from .sample_dataset import SampleBuilder, SampleDataset, create_sample_dataset
from .shhs import SHHSDataset
from .sleepedf import SleepEDFDataset
Expand Down
154 changes: 154 additions & 0 deletions pyhealth/datasets/pendulum_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Author(s): Esteban Benitez, Chloe Yang
# NetID(s): benitez5, junkey2
# Paper Title: Generative ODE Modeling with Known Unknowns
# Paper Link: https://arxiv.org/abs/2003.10775
# Description:
# Implements a Gymnasium-based Pendulum dataset generator for sequence modeling,
# reproducing the environment, parameterization, and data layout described in the paper.

import numpy as np
import gymnasium as gym
import skimage.transform
from tqdm import trange
from .base_dataset import BaseDataset
import logging

logger = logging.getLogger(__name__)


class PendulumData(BaseDataset):
    """
    Synthetic data generator for the OpenAI Gymnasium Pendulum environment.

    This class uses the Pendulum environment from OpenAI's Gymnasium to generate
    sequences of pendulum states and rendered images. The resulting dataset is
    suitable for learning and evaluating dynamical system models, such as
    generative ODEs.

    All configuration is passed as keyword arguments and kept, unmodified, in
    ``self.args`` so the module-level helpers can read it.

    Parameters
    ----------
    env_name : str, optional
        The name of the gymnasium environment to use (default: 'Pendulum-v1').
        The legacy key ``env`` is still accepted when ``env_name`` is absent.
    render_mode : str
        The render mode for the environment (e.g., "rgb_array"). Required.
    seed : int, optional
        Seed passed to the environment's first ``reset`` (default: 2).
    data_size : int
        Number of trajectories to simulate (number of pendulum sequences).
    seq_len : int
        Number of time steps per trajectory.
    side : int
        Size (pixels) of each rendered image (output shape: [side, side]).
    friction : float, optional
        Friction flag read by ``step_env`` during simulation.

    Attributes
    ----------
    data : numpy.ndarray
        Frame buffer of shape ``(data_size, seq_len, side, side)``.
    latent_data : numpy.ndarray
        State buffer of shape ``(data_size, seq_len, 2)``.
    params_data : list
        One parameter dict per simulated trajectory.

    References
    ----------
    .. [1] Linial, Ori, Neta Ravid, Danny Eytan, and Uri Shalit.
       "Generative ODE Modeling with Known Unknowns."
       arXiv preprint arXiv:2003.10775 [cs.LG], 2020. https://arxiv.org/abs/2003.10775
    .. [2] GOKU GitHub repository: https://github.com/orilinial/GOKU

    Example
    -------
    >>> args = {
    ...     "env_name": "Pendulum-v1",
    ...     "render_mode": "rgb_array",
    ...     "seed": 42,
    ...     "data_size": 100,
    ...     "seq_len": 50,
    ...     "side": 64,
    ...     "friction": 0.0,
    ... }
    >>> dataset = PendulumData(**args)
    """

    def __init__(self, **args):
        # NOTE(review): BaseDataset.__init__ is not invoked here; confirm that
        # nothing downstream relies on state the base class would set up.

        # The documented key is ``env_name``; the original code only looked at
        # ``env``, so a caller-supplied ``env_name`` was silently ignored.
        env_id = args.get("env_name", args.get("env", "Pendulum-v1"))
        self.env = gym.make(env_id, render_mode=args["render_mode"]).unwrapped

        # NOTE(review): this seeds only the environment. ``get_params`` draws
        # from the global ``np.random`` state, which is not seeded here.
        self.env.reset(seed=args.get("seed", 2))

        self.args = args
        self.data_size = args["data_size"]
        self.seq_len = args["seq_len"]
        self.side = args["side"]

        self.data = np.zeros(
            (self.data_size, self.seq_len, self.side, self.side)
        )
        self.latent_data = np.zeros((self.data_size, self.seq_len, 2))
        self.params_data = []

    def create_pendulum_data(self):
        """Simulate ``data_size`` trajectories and fill the dataset buffers.

        For every trajectory the environment is re-initialised with
        ``reset_env`` and a fresh parameter dict is drawn with ``get_params``.
        At each step the current frame is rendered and stored *before* the
        dynamics are advanced; the latent state stored at the same index is
        the one observed *after* that step.

        The environment is closed when this method finishes, including when
        generation fails part-way, so the method is meant to be called once
        per instance.

        Returns
        -------
        tuple
            ``(data, latent_data, params_data)`` where ``data`` has shape
            ``(data_size, seq_len, side, side)``, ``latent_data`` has shape
            ``(data_size, seq_len, 2)`` holding ``get_theta(obs)`` and the
            last observation component, and ``params_data`` is a list with
            one parameter dict per trajectory.
        """
        try:
            for trial in trange(self.data_size):
                reset_env(self.env, self.args)
                params = get_params()
                unlearned_params = get_unlearned_params()

                for step in range(self.seq_len):
                    self.data[trial, step] = preproc(self.env.render(), self.side)
                    obs = step_env(
                        self.args, self.env, [0.0], params, unlearned_params
                    )

                    self.latent_data[trial, step, 0] = get_theta(obs)
                    self.latent_data[trial, step, 1] = obs[-1]

                self.params_data.append(params)
        finally:
            # Release the renderer even if a step or render call raised.
            self.env.close()

        return self.data, self.latent_data, self.params_data


def get_theta(obs):
    """Convert a pendulum observation into an angle in the shifted basis.

    The angle is ``arctan2(obs[0], -obs[1])`` shifted by ``pi / 2`` and then
    wrapped back by one full turn if it falls outside ``[-pi, pi]``.

    Parameters
    ----------
    obs : sequence
        Observation whose first two entries are read.

    Returns
    -------
    float
        The transformed angle.
    """
    angle = np.arctan2(obs[0], -obs[1]) + np.pi / 2

    # Wrap into [-pi, pi]; the two checks are applied in sequence.
    if angle < -np.pi:
        angle = angle + 2 * np.pi
    if angle > np.pi:
        angle = angle - 2 * np.pi
    return angle


def preproc(X, side):
    """Crop, desaturate and downsample an RGB pendulum frame.

    Parameters
    ----------
    X : array-like
        Rendered frame whose last axis holds the colour channels.
    side : int
        Edge length of the square output image.

    Returns
    -------
    The resized ``(side, side)`` image divided by 255.
    """
    # Fixed crop window applied to both channels.
    # NOTE(review): presumably tuned to the default render size - confirm.
    rows = slice(220, -110)
    cols = slice(165, -165)

    red = X[..., 0][rows, cols]
    green = X[..., 1][rows, cols]

    # NOTE(review): if X is an unsigned integer array this subtraction wraps
    # around whenever green > red; verify against the rendered palette.
    diff = red - green

    # NOTE(review): resize may already rescale integer input to [0, 1] before
    # the division below - confirm the intended output range.
    return skimage.transform.resize(diff, [int(side), side]) / 255.0


def step_env(args, env, u, params, additional_params):
    """Advance the pendulum by one explicit Euler step.

    The environment's own ``step`` is bypassed: the new state is computed
    here and written straight into ``env.state``.

    Parameters
    ----------
    args : dict
        Run configuration. Only ``"friction"`` is read; a missing or falsy
        value disables the damping term.
    env : object
        Environment exposing ``state`` (angle, angular velocity), ``dt``,
        ``max_speed`` and ``_get_obs()``.
    u : sequence
        Control input. Kept for signature compatibility; it is not used.
    params : dict
        Must contain ``"l"`` (presumably the pendulum length).
    additional_params : dict
        Must contain ``"b"`` (damping coefficient) when friction is enabled.

    Returns
    -------
    The value of ``env._get_obs()`` for the updated state.
    """
    th, thdot = env.state

    g = 10.0
    m = 1.0
    length = params["l"]
    dt = env.dt

    accel = (-g / length) * np.sin(th + np.pi)
    # ``friction`` is documented as optional, so tolerate its absence instead
    # of raising KeyError; the damping coefficient is only needed here.
    if args.get("friction"):
        accel = accel - (additional_params["b"] / m) * thdot

    newthdot = thdot + accel * dt

    # The angle is integrated with the unclipped velocity; clipping is applied
    # only to the velocity that gets stored.
    newth = th + newthdot * dt
    newthdot = np.clip(newthdot, -env.max_speed, env.max_speed)

    env.state = np.array([newth, newthdot])
    return env._get_obs()


def get_params():
    """Draw the per-trajectory parameters.

    Returns
    -------
    dict
        ``{"l": value}`` with ``value`` sampled uniformly from ``[1.0, 2.0)``
        using the global ``np.random`` state.
    """
    return {"l": np.random.uniform(1.0, 2.0)}


def get_unlearned_params():
    """Return the fixed parameters that are not sampled per trajectory.

    Returns
    -------
    dict
        A new ``{"b": 0.7}`` dict on every call.
    """
    return dict(b=0.7)


def reset_env(env, args, min_angle=0.0, max_angle=np.pi / 6):
    """Reset ``env`` repeatedly until the start angle is inside the window.

    The environment is reset, the absolute value of ``get_theta`` of the
    returned observation is taken, and the draw is accepted only if it lies
    strictly between ``min_angle`` and ``max_angle``. There is no retry
    limit, so the loop runs until a draw is accepted.

    Parameters
    ----------
    env : object
        Environment whose ``reset()`` returns ``(obs, info)``.
    args : dict
        Not used by this function.
    min_angle : float
        Exclusive lower bound on the absolute start angle.
    max_angle : float
        Exclusive upper bound on the absolute start angle.

    Returns
    -------
    None
    """
    while True:
        observation, _ = env.reset()
        start_angle = np.abs(get_theta(observation))
        if min_angle < start_angle < max_angle:
            return
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ dependencies = [
"polars~=1.35.2",
"pandas~=2.3.1",
"pydantic~=2.11.7",
"gymnasium",
"scikit-image",
"pygame",
"dask[complete]~=2025.11.0",
"litdata~=0.2.59",
"pyarrow~=22.0.0",
Expand Down
52 changes: 52 additions & 0 deletions tests/core/test_pendulum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import unittest
from pyhealth.datasets import PendulumData


class TestPendulumData(unittest.TestCase):
    """Unit tests for the Gymnasium pendulum dataset generator."""

    def setUp(self):
        # Tiny configuration: 10 trajectories, 2 steps each, 1x1 frames.
        self.args = dict(
            env_name="Pendulum-v1",
            render_mode="rgb_array",
            seed=1,
            data_size=10,
            seq_len=2,
            side=1,
            friction=0,
        )

    def test_args(self):
        """The fixture holds exactly the expected configuration."""
        expected = {
            "env_name": "Pendulum-v1",
            "render_mode": "rgb_array",
            "seed": 1,
            "data_size": 10,
            "seq_len": 2,
            "side": 1,
            "friction": 0,
        }
        self.assertDictEqual(self.args, expected)

    def test_get_data(self):
        """Generated frames, latent shape and parameters match expectations."""
        dataset = PendulumData(**self.args)
        frames, latents, params = dataset.create_pendulum_data()

        # The first nine trajectories are expected to be all-zero frames.
        blank_trajectory = [[[0.0]], [[0.0]]]
        last_trajectory = [
            [[0.00021478441553081915]],
            [[0.00021086596582142346]],
        ]
        expected_frames = [blank_trajectory] * 9 + [last_trajectory]

        self.assertListEqual(frames.tolist(), expected_frames)
        self.assertEqual(latents.shape, (10, 2, 2))
        self.assertGreater(params[0]["l"], 0)