Source code for torchdyno.data.datasets.hhar

import os
import pickle
from typing import Optional

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from scipy.stats import zscore

PHONES = ["nexus4", "s3", "s3mini"]  # , 'samsungold']
WATCHES = ["gear", "lgwatch"]
LABELS = {"stand": 0, "sit": 1, "walk": 2, "stairsup": 3, "stairsdown": 4, "bike": 5}
FREQ = {
    "nexus4": 200,
    "s3": 150,
    "s3mini": 100,
    "gear": 100,
    "lgwatch": 200,
}  #'samsungold': 50
TARGET_FREQ = 100
TOLERANCE = 5


[docs] class HHARDataset(torch.utils.data.Dataset): """HHAR dataset. The HHAR dataset is a dataset of human activity recognition (HAR) collected from smartphones and smartwatches. The dataset contains accelerometer and gyroscope data from different devices and users. The dataset is used to classify the activity of the user based on the sensor data. """ USERS = { "all": ["a", "b", "c", "d", "e", "f", "g", "h", "i"], "train": { 25: ["a", "b"], 50: ["a", "b", "c"], 75: ["a", "b", "c", "e"], 100: ["a", "b", "c", "e", "h"], }, "eval": ["f", "i"], "test": ["d", "g"], } CONTEXTS = ["nexus4", "s3", "s3mini", "lgwatch"]
[docs] def __init__( self, root: str, user: str, context: int, seq_length: Optional[int] = 200 ): """Initialize the HHAR dataset. Args: root (str): Root directory of the dataset. user (str): User to load. context (int): Context to load. seq_length (Optional[int], optional): Sequence length. If None, the dataset is not split into sequences. Defaults to 200. """ super().__init__() if user not in HHARDataset.USERS["all"]: raise ValueError(f"User {user} not found in HHAR dataset") if context < 0 or context >= len(HHARDataset.CONTEXTS): raise ValueError(f"Context {context} not found in HHAR dataset") self.root = root self.user = user self.context = HHARDataset.CONTEXTS[context] self.freq = TARGET_FREQ self._seq_length = seq_length self.path = os.path.join(self.processed_path, self.user, f"{self.context}.pkl") if not os.path.exists(self.path): os.makedirs(os.path.join(self.processed_path, self.user), exist_ok=True) self.preprocess() self.data = pickle.load(open(self.path, "rb")) self.features, self.targets = self.data["X"], self.data["Y"] if seq_length is not None: self.features, self.targets = self._to_sequence_chunks(seq_length)
def __len__(self): """Return the length of the dataset.""" if len(self.features.shape) == 2: return 1 return self.features.shape[1] def __getitem__(self, i: int): """Return the item at the given index.""" if len(self.features.shape) == 2: return self.features, self.targets return self.features[:, i], self.targets[:, i] def _to_sequence_chunks(self, length: int): X, Y = self.data["X"], self.data["Y"] X, Y = torch.split(X, length, dim=0), torch.split(Y, length, dim=0) if X[-1].shape[0] != length: X, Y = X[:-1], Y[:-1] return torch.stack(X, dim=1), torch.stack(Y, dim=1)
[docs] def preprocess(self): print(f"Preprocessing {self.user} - {self.context} - {self.freq}Hz...") if self.context in PHONES: acc_path, gyr_path = "Phones_accelerometer.csv", "Phones_gyroscope.csv" else: acc_path, gyr_path = "Watch_accelerometer.csv", "Watch_gyroscope.csv" rdfs = { "acc": pd.read_csv(os.path.join(self.raw_path, acc_path)).dropna( subset=["gt"] ), "gyr": pd.read_csv(os.path.join(self.raw_path, gyr_path)).dropna( subset=["gt"] ), } dev = {"1": None, "2": None} for i in ["1", "2"]: dev[i] = self._merge_user_device( rdfs["acc"], rdfs["gyr"], self.user, f"{self.context}_{i}", TOLERANCE ) dev_1_perc, dev_2_perc = dev["1"].isna().sum().max() / len(dev["1"]), dev[ "2" ].isna().sum().max() / len(dev["2"]) chosen_dev, perc = ( ("1", dev_1_perc) if (dev_1_perc <= dev_2_perc) else ("2", dev_2_perc) ) df_dev = dev[chosen_dev] if self.freq != FREQ[self.context]: down_idx = np.around( np.arange(0, len(df_dev) - 1, FREQ[self.context] / self.freq) ) df_dev = df_dev.iloc[down_idx] df_dev = df_dev.dropna() values = ["x_acc", "y_acc", "z_acc", "x_gyr", "y_gyr", "z_gyr"] to_save = { "X": torch.tensor(df_dev[values].apply(zscore).values), "Y": F.one_hot( torch.tensor(df_dev["gt"].apply(lambda x: LABELS[x]).values), num_classes=6, ), } pickle.dump(to_save, open(self.path, "wb+")) print(f"{self.user} - {self.context} - {self.freq}Hz preprocessed!")
def _merge_user_device(self, acc_df, gyr_df, user, device, tolerance): def aux(df, user, device, sd): return ( df.loc[ ((df["User"] == user) & (df["Device"] == device)), ["Arrival_Time", "x", "y", "z", "gt"], ] .sort_values("Arrival_Time") .rename(columns={"x": f"x_{sd}", "y": f"y_{sd}", "z": f"z_{sd}"}) ) ud_acc, ud_gyr = aux(acc_df, user, device, f"acc"), aux( gyr_df, user, device, f"gyr" ) no_gt = ["Arrival_Time", "x_gyr", "y_gyr", "z_gyr"] lu_df = pd.merge_asof( ud_acc, ud_gyr[no_gt], on="Arrival_Time", direction="nearest", tolerance=tolerance, ) no_gt = ["Arrival_Time", "x_acc", "y_acc", "z_acc"] ru_df = pd.merge_asof( ud_gyr, ud_acc[no_gt], on="Arrival_Time", direction="nearest", tolerance=tolerance, ) u_df = pd.concat([lu_df, ru_df[ru_df["x_acc"].isnull()]]).sort_values( "Arrival_Time" ) u_df = u_df[ ["Arrival_Time", "x_acc", "y_acc", "z_acc", "x_gyr", "y_gyr", "z_gyr", "gt"] ] return u_df @property def seq_length(self): """Return the sequence length.""" return self._seq_length @seq_length.setter def seq_length(self, new_length: int): """Set the sequence length. If the sequence length is different from the current one, the dataset is split into sequences of the new length. """ if self._seq_length is None or new_length != self._seq_length: self.features, self.targets = self._to_sequence_chunks(new_length) self._seq_length = new_length @property def raw_path(self): return os.path.join(self.root, "raw", "HHAR") @property def processed_path(self): return os.path.join(self.root, "processed", "HHAR")