
How to Create 10,000 ML Features From Raw Data in Under Four Hours

by Andrew Engel on 4/26/2022

To start, I have a confession: I enjoy creating features too much. In almost all of my data science projects, I end up creating thousands of features. Once this has occurred, I am left with the challenge of how to select a small subset of these features for my final modeling (a topic for a different blog).

I find this aspect of the project my best opportunity to use my creativity. I express this creativity in two forms. The first is to apply my understanding of the business and data to build features that capture business information and previously identified patterns in the data. The second is the application of my mathematics and science training to build features that have shown benefit in related problems.

These features are my best opportunity to improve the performance of my final models and drive real business value with my work. Sadly, this aspect of the predictive analytics lifecycle is rarely discussed. When I talk with individuals new to the field, they often neither understand its importance nor know how to go about engineering features.

In this blog, I will show how to sequentially build meaningful features for a project to predict time to failure in hard drives using data provided by Backblaze. I will also be using the open-source package RasgoQL to execute SQL on my data warehouse directly from my local machine. For this analysis, we will assume we are interested in making this prediction once a week, and so we will aggregate the data weekly.

Once this is done, I show how this can be applied to generate 10,000 features from the Backblaze data in under four hours. If you would like to try this yourself, Backblaze makes this data freely available here.

One hundred features from one column

The Backblaze data contains the S.M.A.R.T. data for each hard drive each day. Backblaze has stated that they actively monitor five S.M.A.R.T. stats to help identify drives that are likely to fail: SMART 5, 187, 188, 197, and 198. We will start by building features from the first of these, SMART 5.

First, we need to connect with our data warehouse and get the table that holds the Backblaze data. I use Snowflake, but other warehouses are supported.


import rasgoql
# Connect to warehouse
creds = rasgoql.SnowflakeCredentials.from_env()
rql = rasgoql.connect(credentials=creds)
# Get a connection to the hard drive data
backblaze = rql.dataset('RASGOQL.PUBLIC.BACKBLAZE_DAILY')

Next, before we aggregate the data to a weekly level, we want to extract the week from the date and rename the resulting column to WEEK.


data = backblaze.datetrunc(dates={
                     'DATE':'week'}).rename(
                     renames={'DATE_WEEK': 'WEEK'})

Now we can aggregate the data by SERIAL_NUMBER (hard drive) and WEEK. We will calculate the weekly minimum, median, maximum, mean, and standard deviation for the first SMART value Backblaze mentioned: SMART_5_RAW (Reallocated Sectors Count).


aggds = data.aggregate(
             group_by=['SERIAL_NUMBER', 'WEEK'],             
             aggregations={'SMART_5_RAW':['MIN', 'MEDIAN', 'AVG',    
                                          'MAX', 'STDDEV']
                          })

This captures the most recent week of behavior for the reallocated sectors count, but in problems like these, the trend of this information over time is often the most powerful signal. One of the most common ways of capturing this trend is to look at moving or rolling windows of data. In this case, we will calculate the minimum, average, and maximum for all of the weekly features created in the prior step over the last four and twelve weeks. In addition, we will calculate the rolling standard deviation of the weekly mean.


movaggsds = aggds.rolling_agg(aggregations={
                  'SMART_5_RAW_AVG': ['MIN', 'AVG', 'MAX', 
                                      'STDDEV'],
                  'SMART_5_RAW_MIN': ['MIN', 'AVG', 'MAX'],                                                             
                  'SMART_5_RAW_MEDIAN': ['MIN', 'AVG', 'MAX'],
                  'SMART_5_RAW_MAX': ['MIN', 'AVG', 'MAX'],
                  'SMART_5_RAW_STDDEV': ['MIN', 'AVG', 'MAX']
                                           },
                              offsets=[4, 12],
                              partition=['WEEK'],
                              order_by=['SERIAL_NUMBER'])

The other common approach to capturing trends is using lagged features to include prior weeks' values in this week's data. The advantage of calculating the moving values first is that they can also be lagged. We won't lag all of these calculated values, but we will lag the moving average of both the weekly average and weekly median over the prior one through four weeks, eight weeks, twelve weeks, and sixteen weeks.


lagds = movaggsds.lag(columns=['AVG_SMART_5_RAW_AVG_4', 
                               'AVG_SMART_5_RAW_AVG_12', 
                               'AVG_SMART_5_RAW_MEDIAN_4', 
                               'AVG_SMART_5_RAW_MEDIAN_12'],
                      amounts=[1, 2, 3, 4, 8, 12, 16],
                      order_by=['WEEK', 'SERIAL_NUMBER'],
                      partition=['SERIAL_NUMBER'])

Finally, these lag values can be combined with the current week's values to capture trends through the use of differences, ratios, and weighted moving averages. First, we define the mathematical operations we want to run.


mathlist = ['(SMART_5_RAW_AVG - AVG_SMART_5_RAW_AVG_4) / '
            'NULLIF(STDDEV_SMART_5_RAW_AVG_4, 0)',
            'SMART_5_RAW_MIN/NULLIF(AVG_SMART_5_RAW_MIN_4, 0)',
            'SMART_5_RAW_MEDIAN/NULLIF(AVG_SMART_5_RAW_MEDIAN_4, 0)',
            'SMART_5_RAW_AVG/NULLIF(AVG_SMART_5_RAW_AVG_4, 0)',
            'SMART_5_RAW_MAX/NULLIF(AVG_SMART_5_RAW_MAX_4, 0)',
            'SMART_5_RAW_STDDEV/NULLIF(AVG_SMART_5_RAW_STDDEV_4, 0)',
            '(SMART_5_RAW_AVG - AVG_SMART_5_RAW_AVG_12) / '
            'NULLIF(STDDEV_SMART_5_RAW_AVG_12, 0)',
            'SMART_5_RAW_MIN/NULLIF(AVG_SMART_5_RAW_MIN_12, 0)',
            'SMART_5_RAW_MEDIAN/NULLIF(AVG_SMART_5_RAW_MEDIAN_12, 0)',
            'SMART_5_RAW_AVG/NULLIF(AVG_SMART_5_RAW_AVG_12, 0)',
            'SMART_5_RAW_MAX/NULLIF(AVG_SMART_5_RAW_MAX_12, 0)',
            'SMART_5_RAW_STDDEV/NULLIF(AVG_SMART_5_RAW_STDDEV_12, 0)',
            '(5*AVG_SMART_5_RAW_MEDIAN_4 + '
            '4*LAG_AVG_SMART_5_RAW_MEDIAN_4_1 + '
            '3*LAG_AVG_SMART_5_RAW_MEDIAN_4_2 + '
            '2*LAG_AVG_SMART_5_RAW_MEDIAN_4_3 + '
            'LAG_AVG_SMART_5_RAW_MEDIAN_4_4)/15',
            '(5*AVG_SMART_5_RAW_MEDIAN_4 + '
            '4*LAG_AVG_SMART_5_RAW_MEDIAN_4_4 + '
            '3*LAG_AVG_SMART_5_RAW_MEDIAN_4_8 + '
            '2*LAG_AVG_SMART_5_RAW_MEDIAN_4_12 + '
            'LAG_AVG_SMART_5_RAW_MEDIAN_4_16)/15']

Then we define the names we want these features to have.


mathnames = ['SMART_5_RAW_mahalanobis_4', 
             'SMART_5_RAW_MIN_4_ratio', 
             'SMART_5_RAW_MEDIAN_4_ratio', 
             'SMART_5_RAW_AVG_4_ratio', 
             'SMART_5_RAW_MAX_4_ratio', 
             'SMART_5_RAW_STDDEV_4_ratio', 
             'SMART_5_RAW_mahalanobis_12', 
             'SMART_5_RAW_MIN_12_ratio', 
             'SMART_5_RAW_MEDIAN_12_ratio', 
             'SMART_5_RAW_AVG_12_ratio', 
             'SMART_5_RAW_MAX_12_ratio', 
             'SMART_5_RAW_STDDEV_12_ratio', 
             'moving_average_SMART_5_RAW_MEDIAN_weekly', 
             'moving_average_SMART_5_RAW_MEDIAN_monthly']

Then execute these math operations on the data.


featureds = lagds.math(math_ops=mathlist,
                       names=mathnames)

We can now save the results of this work back to our data warehouse as either a view or a table.


featureds.save(table_name="BACKBLAZE_SMART_5_RAW_FEATURES", 
               table_type="VIEW")
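
The call above creates a view. Since the results can also be saved as a table, the same call would presumably just swap the table_type value; the "TABLE" string below is an assumption on my part, as "VIEW" is the only value shown in this post.


featureds.save(table_name="BACKBLAZE_SMART_5_RAW_FEATURES",
               table_type="TABLE")  # "TABLE" value assumed; only "VIEW" is shown in this post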

A sample of the records can be extracted by running preview, or the entire dataset can be downloaded as a pandas dataframe for further work as


df = featureds.to_df()
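
For a quick look without downloading the full table, the preview call mentioned above can be used instead. This is a minimal sketch that assumes, like to_df, it returns a small sample of rows as a pandas dataframe.


# Pull only a small sample of rows for a quick sanity check
# (return type assumed to be a pandas dataframe, like to_df above)
sample = featureds.preview()
print(sample)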

This all happens in only a few minutes in the data warehouse, and I am ready to move on with the rest of my modeling. In the past, simply extracting this data (93 million observations) from the warehouse would have taken much longer, let alone the time to perform these calculations in pandas (if I didn't run out of RAM first).

Additionally, the generated SQL can be checked as


featureds.sql()

Descriptive statistics about these features can be generated and printed as


print(featureds.describe().to_df())

Finally, one of the most painful parts of feature engineering in the past was working with the data engineering team to convert my Python feature engineering code to SQL so they could run it in their dbt workflow. But with RasgoQL, I can export a dbt model that the data engineers can run in production.


featureds.to_dbt(project_directory='smart_5_dbt_model')

Scale to 10,000 features

Because this approach was systematic and each step built on the prior step, I can easily write a function to perform all of these steps automatically for any given column.


import numpy as np


def create_features(dataset, features, window_sizes, lag_amounts, table):
    weekly_agg_dict = {}
    moving_columns_dict = {}
    lag_features = []
    math_list = []
    math_names = []
    for v in features:
        weekly_agg_dict[v] = ['MIN', 'MEDIAN', 
                              'AVG', 'MAX', 'STDDEV']
        moving_columns_dict[f"{v}_AVG"] = ['MIN', 'AVG', 'MAX', 
                                           'STDDEV']
        for postfix in ['MIN', 'MEDIAN', 'MAX', 'STDDEV']:
            moving_columns_dict[f"{v}_{postfix}"] = ['MIN', 'AVG', 
                                                     'MAX']
        for window in window_sizes:
            window = np.abs(window)
            lag_features.append(f"AVG_{v}_AVG_{window}")
            lag_features.append(f"AVG_{v}_MEDIAN_{window}")
            math_list.append(f"({v}_AVG - AVG_{v}_AVG_{window}) / "
                             f"NULLIF(STDDEV_{v}_AVG_{window}, 0)")
            math_names.append(f"{v}_mahalanobis_{window}")
            for agg in ['MIN', 'MEDIAN', 'AVG', 'MAX', 'STDDEV']:
                math_list.append(f"{v}_{agg} / "
                                 f"NULLIF(AVG_{v}_{agg}_{window}, 0)")
                math_names.append(f"{v}_{agg}_{window}_ratio")
        
        for agg in ['MEDIAN', 'AVG']:
            math_list.append(
                 f"(5*AVG_{v}_{agg}_4 + 4*LAG_AVG_{v}_{agg}_4_1 + "
                 f"3*LAG_AVG_{v}_{agg}_4_2 + "
                 f"2*LAG_AVG_{v}_{agg}_4_3 + "
                 f"LAG_AVG_{v}_{agg}_4_4)/15")
            math_names.append(f"moving_average_{v}_{agg}_weekly")
            math_list.append(
                 f"(5*AVG_{v}_{agg}_4 + 4*LAG_AVG_{v}_{agg}_4_4 + "
                 f"3*LAG_AVG_{v}_{agg}_4_8 + "
                 f"2*LAG_AVG_{v}_{agg}_4_12 + "
                 f"LAG_AVG_{v}_{agg}_4_16)/15")
            math_names.append(f"moving_average_{v}_{agg}_monthly")
            for window in window_sizes:
                window = np.abs(window)
                for lag in lag_amounts:
                    math_list.append(
                         f"(AVG_{v}_{agg}_{window} - "
                         f"LAG_AVG_{v}_{agg}_{window}_{lag}) "
                         f"/ {lag}")
                    math_names.append(
                     f"AVG_{v}_{agg}_{window}_rate_of_change_{lag}")
    # Setup for weekly aggregation
    print("\tweek setup", flush=True)
    data = dataset.datetrunc(
                   dates={'DATE':'week'}).rename(
                   renames={'DATE_WEEK': 'WEEK'})
    # Aggregate to week level
    print("\taggregate", flush=True)
    aggds = data.aggregate(group_by=['SERIAL_NUMBER', 'WEEK'], 
                           aggregations=weekly_agg_dict)
    
    # Moving Window aggs
    print("\tmoving window", flush=True)
    movaggsds = aggds.rolling_agg(aggregations=moving_columns_dict,
                                  offsets=window_sizes,
                                  partition=['WEEK'],
                                  order_by=['SERIAL_NUMBER'])
    
    # Lag Variables
    print("\tlags", flush=True)
    lagds = movaggsds.lag(columns=lag_features,
                          amounts=lag_amounts,
                          order_by=['WEEK', 'SERIAL_NUMBER'],
                          partition=['SERIAL_NUMBER'])
    
    # Final features
    print("\tmath", flush=True)
    featureds = lagds.math(math_ops=math_list,
                           names=math_names)
    
    # publish and return dataset
    print("\tpublish", flush=True)
    return featureds.save(table_name=table, table_type="VIEW")
    

Backblaze uses the five listed stats to identify soon-to-fail drives by flagging any drive that has a nonzero value in them. To identify failures earlier, I want to explore other features to see if there is any signal to be found. At roughly one hundred engineered features per raw column, running this function over all 124 raw SMART columns yields well over 10,000 features. For this reason, I run this function for all columns as


# Window sizes and lag amounts match the single-column example above
window_sizes = [4, 12]
lag_amounts = [1, 2, 3, 4, 8, 12, 16]
# feature_columns holds the names of all 124 raw SMART columns in the table

datasets = []
for feature in feature_columns:
    table = f"BACKBLAZE_{feature}"
    ds = create_features(dataset=backblaze, 
                         features=[feature], 
                         window_sizes=window_sizes, 
                         lag_amounts=lag_amounts,
                         table=table)
    datasets.append(ds)

And finally, I can extract all of the new features for modeling, or export dbt models for the data engineering team to place into production, as sketched below.
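
As a minimal sketch of that last step, here is one way to loop over the saved datasets using the same to_df and to_dbt calls shown earlier; the dbt project directory name is just a placeholder.


for ds in datasets:
    # Download each saved feature view as a pandas dataframe for modeling
    df = ds.to_df()
    # Or export a dbt model for the data engineering team
    # (the directory name is hypothetical)
    ds.to_dbt(project_directory='backblaze_dbt_models')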

If you want to check out RasgoQL, the documentation can be found here and the repository here.