Pipeline: SimpleMovingAverage and EquityPricing.close

Below is some of the code of my Pipeline Algo:

Am I not entering the Parameters for SimpleMovingAverage correctly?

It says it requires a dtype? I am working from a course that was using zipline, but I am making an attempt.

Error in strategy - Traceback (most recent call last):
File "L111_Pipeline_Trading_Algo.py", line 33, in initialize
File "L111_Pipeline_Trading_Algo.py", line 95, in make_pipeline
zipline_pipeline.errors.DTypeNotSpecified: SimpleMovingAverage requires a dtype, but no dtype was passed.
Blueshift Alert[2022-03-01 00:00:00-05:00] ERROR in algorithm:Fatal error, will exit.
Blueshift Alert[2022-06-14 07:44:12.445447-04:00] WARNING :Shutting down Blueshift gracefully.

 
def make_pipeline():
    """
        Build the pipeline that flags long and short candidates.

        Columns added: 'longs' and 'shorts' (boolean filters) and
        'perc_diff', the relative gap between the 10-day and 30-day
        mean close. The screen keeps assets in the top 5% by 30-day
        average dollar volume that are flagged long or short.
    """
    # Universe
    pipe = Pipeline()

    # Dollar Volume (30 Days) Grab the Info
    dollar_volume = AverageDollarVolume(window_length=30)

    # Grab the top 5% in avg dollar volume
    high_dollar_volume = dollar_volume.percentile_between(95, 100)

    # 10 day mean close
    mean_10 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)

    # 30 day mean close
    mean_30 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=30)

    # Percent Difference
    percent_difference = (mean_10 - mean_30) / mean_30

    # List of Shorts
    shorts = percent_difference < 0

    # List of Longs
    longs = percent_difference > 0

    # Final Mask/Filter for anything in shorts or longs
    securities_to_trade = (shorts | longs)

    pipe.add(longs, 'longs')
    # The column must be called 'shorts': before_trading_start reads
    # context.output['shorts'].
    pipe.add(shorts, 'shorts')
    pipe.add(percent_difference, 'perc_diff')

    # A pipeline holds a single screen, so the liquidity filter and the
    # long/short filter are AND-ed together instead of being passed to two
    # separate set_screen calls.
    pipe.set_screen(high_dollar_volume & securities_to_trade)

    return pipe

 

 

Here is all the code in its entirety:



 

"""
    Title: L111_Pipeline_Trading_Algo
    Description: A Pipeline trading algo
    Style tags: Systematic
    Asset class: Equities
    Dataset: US Equities
"""
from blueshift.api import(    symbol,
                            order_target_percent,
                            schedule_function,
                            date_rules,
                            time_rules,
                            pipeline_output,
                            attach_pipeline,
                            get_datetime,
                       )

from blueshift.pipeline import Pipeline, CustomFilter
from blueshift.pipeline.data import EquityPricing
from blueshift.pipeline.factors import AverageDollarVolume,SimpleMovingAverage
#from blueshift_library.technicals.indicators import sma

def initialize(context):
    """
        Set up the strategy: schedule my_rebalance and register the
        pipeline under the name 'my_pipeline'.
    """
    # Rebalance at the start of every week, using a market-open offset of 1 hour
    rebalance_day = date_rules.week_start()
    rebalance_time = time_rules.market_open(hours=1, minutes=0)
    schedule_function(my_rebalance, rebalance_day, rebalance_time)

    # Register the pipeline so pipeline_output('my_pipeline') can fetch it
    strategy_pipeline = make_pipeline()
    attach_pipeline(strategy_pipeline, 'my_pipeline')
    
def my_rebalance(context, data):
    """
        Scheduled rebalance: exit holdings that are in neither list, then
        target context.long_weight / context.short_weight for every
        tradable long and short pick.
    """
    # Close out anything we hold that is no longer a long or short pick
    for held in context.portfolio.positions:
        if held in context.longs or held in context.shorts:
            continue
        if data.can_trade(held):
            order_target_percent(held, 0)

    # Target the long weight for each tradable long pick
    for pick in context.longs:
        if not data.can_trade(pick):
            continue
        order_target_percent(pick, context.long_weight)

    # Target the short weight for each tradable short pick
    for pick in context.shorts:
        if not data.can_trade(pick):
            continue
        order_target_percent(pick, context.short_weight)




def my_compute_weights(context):
    """
        Split the capital 50% long / 50% short, equally across the names
        on each side.

        Returns (long_weight, short_weight). The short weight is negative
        so that the percent target passed to order_target_percent is a
        short position; a side with no names gets a weight of 0.
    """
    if len(context.longs) == 0:
        long_weight = 0
    else:
        long_weight = 0.5 / len(context.longs)

    if len(context.shorts) == 0:
        short_weight = 0
    else:
        # Negative target: a positive weight would buy the short candidates.
        short_weight = -0.5 / len(context.shorts)

    return (long_weight, short_weight)



def before_trading_start(context, data):
    """
        Fetch the 'my_pipeline' results and store on the context the
        long list, the short list and the per-name weights.
    """
    results = pipeline_output('my_pipeline')
    context.output = results

    # Rows whose boolean 'longs' column is set
    long_mask = results['longs']
    context.longs = results[long_mask].index.tolist()

    # Rows whose boolean 'shorts' column is set
    short_mask = results['shorts']
    context.shorts = results[short_mask].index.tolist()

    weights = my_compute_weights(context)
    context.long_weight, context.short_weight = weights



def make_pipeline():
    """
        Build the pipeline that flags long and short candidates.

        Columns added: 'longs' and 'shorts' (boolean filters) and
        'perc_diff', the relative gap between the 10-day and 30-day
        mean close. The screen keeps assets in the top 5% by 30-day
        average dollar volume that are flagged long or short.
    """
    # Universe
    pipe = Pipeline()

    # Dollar Volume (30 Days) Grab the Info
    dollar_volume = AverageDollarVolume(window_length=30)

    # Grab the top 5% in avg dollar volume
    high_dollar_volume = dollar_volume.percentile_between(95, 100)

    # 10 day mean close
    mean_10 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)

    # 30 day mean close
    mean_30 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=30)

    # Percent Difference
    percent_difference = (mean_10 - mean_30) / mean_30

    # List of Shorts
    shorts = percent_difference < 0

    # List of Longs
    longs = percent_difference > 0

    # Final Mask/Filter for anything in shorts or longs
    securities_to_trade = (shorts | longs)

    pipe.add(longs, 'longs')
    # The column must be called 'shorts': before_trading_start reads
    # context.output['shorts'].
    pipe.add(shorts, 'shorts')
    pipe.add(percent_difference, 'perc_diff')

    # A pipeline holds a single screen, so the liquidity filter and the
    # long/short filter are AND-ed together instead of being passed to two
    # separate set_screen calls.
    pipe.set_screen(high_dollar_volume & securities_to_trade)

    return pipe

Thank you,

Philip

Quite a few of the pipeline factors (and some other features) are not yet available in Blueshift directly. For these, you can try importing them directly from the zipline_pipeline module. This is the original zipline pipeline module adapted for the latest Python releases (and packages) and our datasets. So please try "from zipline_pipeline.pipeline.factors import AverageDollarVolume,SimpleMovingAverage" instead of "from blueshift.pipeline.factors import AverageDollarVolume,SimpleMovingAverage". In most cases importing from Blueshift will work though (more so in the future).



Also please see this thread for your "can_trade" calls.

Thanks again Propdita, also thanks for reminding me about can_trade().



I've updated the code and I am getting results,  which is good.



Backtest results, with Logs:



https://blueshift.quantinsti.com/research/api/share/public/W6mzUkpiF



However in the Error log I'm getting:

 

/usr/local/lib/python3.6/dist-packages/zipline_pipeline/pipeline/factors/basic.py:113: RuntimeWarning:

Mean of empty slice

/usr/local/lib/python3.6/dist-packages/zipline_pipeline/pipeline/factors/basic.py:113: RuntimeWarning:

Mean of empty slice


Is there a bug with the SimpleMovingAverage import?

I'll put all my code in my next post:

 


from blueshift.api import(    symbol,
                            order_target_percent,
                            schedule_function,
                            date_rules,
                            time_rules,
                            pipeline_output,
                            attach_pipeline,
                            get_datetime,
                       )

from blueshift.pipeline import Pipeline, CustomFilter
from blueshift.pipeline.data import EquityPricing
#from blueshift.pipeline.factors import AverageDollarVolume,SimpleMovingAverage
from zipline_pipeline.pipeline.factors import AverageDollarVolume,SimpleMovingAverage


import pandas as pd
import numpy as np

def can_trade(asset):
    """
        Return True when the asset has an auto_close_date that is on or
        after the date reported by get_datetime(), otherwise False.
    """
    # No auto_close_date set: treated as not tradable
    if not asset.auto_close_date:
        return False
    return bool(asset.auto_close_date >= get_datetime().date())


def initialize(context):
    """
        Set up the strategy: schedule my_rebalance and register the
        pipeline under the name 'my_pipeline'.
    """
    # Week-start rebalance, using a market-open offset of 1 hour
    schedule_function(
        my_rebalance,
        date_rules.week_start(),
        time_rules.market_open(hours=1, minutes=0),
    )

    # Register the pipeline so pipeline_output('my_pipeline') can fetch it
    strategy_pipeline = make_pipeline()
    attach_pipeline(strategy_pipeline, 'my_pipeline')


    
def my_rebalance(context, data):
    """
        Scheduled rebalance: exit holdings that are in neither list, then
        target context.long_weight / context.short_weight for every
        long and short pick that passes can_trade.
    """
    # Close out anything we hold that is no longer a long or short pick
    for held in context.portfolio.positions:
        wanted = held in context.longs or held in context.shorts
        if not wanted and can_trade(held):
            order_target_percent(held, 0)

    # Target the long weight for each tradable long pick
    for pick in context.longs:
        if not can_trade(pick):
            continue
        order_target_percent(pick, context.long_weight)

    # Target the short weight for each tradable short pick
    for pick in context.shorts:
        if not can_trade(pick):
            continue
        order_target_percent(pick, context.short_weight)




def my_compute_weights(context):
    """
        Split the capital 50% long / 50% short, equally across the names
        on each side.

        Returns (long_weight, short_weight). The short weight is negative
        so that the percent target passed to order_target_percent is a
        short position; a side with no names gets a weight of 0.
    """
    if len(context.longs) == 0:
        long_weight = 0
    else:
        long_weight = 0.5 / len(context.longs)

    if len(context.shorts) == 0:
        short_weight = 0
    else:
        # Negative target: a positive weight would buy the short candidates.
        short_weight = -0.5 / len(context.shorts)

    return (long_weight, short_weight)



def before_trading_start(context, data):
    """
        Fetch the 'my_pipeline' results and store on the context the
        long list, the short list and the per-name weights.
    """
    frame = pipeline_output('my_pipeline')
    context.output = frame

    def flagged(column):
        # Index labels of the rows whose boolean `column` is set
        return frame[frame[column]].index.tolist()

    context.longs = flagged('longs')
    context.shorts = flagged('shorts')

    long_weight, short_weight = my_compute_weights(context)
    context.long_weight = long_weight
    context.short_weight = short_weight



def make_pipeline():
    """
        Build the long/short pipeline.

        Columns, in order: 'mean_10', 'mean_30', 'longs', 'shorts',
        'perc_diff'. The screen keeps assets in the top 5% by 30-day
        average dollar volume that are flagged long or short.
    """
    # Liquidity filter: top 5% by 30-day average dollar volume
    liquid = AverageDollarVolume(window_length=30).percentile_between(95, 100)

    # Fast and slow moving averages of the close
    fast_sma = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
    slow_sma = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=30)

    # Relative gap between the two averages; its sign picks the side
    gap = (fast_sma - slow_sma) / slow_sma
    go_short = gap < 0
    go_long = gap > 0
    tradable = go_short | go_long

    pipeline = Pipeline()
    output_columns = (
        (fast_sma, 'mean_10'),
        (slow_sma, 'mean_30'),
        (go_long, 'longs'),
        (go_short, 'shorts'),
        (gap, 'perc_diff'),
    )
    for term, label in output_columns:
        pipeline.add(term, label)

    pipeline.set_screen(liquid & tradable)
    return pipeline

 

This is not an error but a warning. In this case, it signifies input pricing data for an asset are all NaNs. This may happen on Blueshift if an asset leaves the benchmark. I would not worry too much about it. 

Thank you so much Propdita,

Thanks to you I have managed to finish my course on Udemy, which originally used Quantopian and recommended we use Blueshift. They did not, however, provide any new material for us.



I will paste the Correct Fully Working Pipeline Blueshift 2.0 code, for any student that is interested in learning.



DataType: US Equities


  1. It filters the top 5% of Average Volume Traded
  2. It sorts stocks to go long or short based on the criteria:

                short = (10sma-30sma)/30sma < 0

                 long = (10sma-30sma)/30sma > 0
  3. At any time capital will be allocated 50% long and 50% short,

    divided equally between number of stocks.
  4. No Leverage (hence weights will be 0.5 long, -0.5 short, the abs values sum to 1.00)

     

     
"""
    Title: L111_Pipeline_Trading_Algo
    Description: A Pipeline trading algo
    Style tags: Systematic
    Asset class: Equities
    Dataset: US Equities
"""
from blueshift.api import(    symbol,
                            order_target_percent,
                            schedule_function,
                            date_rules,
                            time_rules,
                            pipeline_output,
                            attach_pipeline,
                            get_datetime,
                       )

from blueshift.pipeline import Pipeline, CustomFilter
from blueshift.pipeline.data import EquityPricing
#from blueshift.pipeline.factors import AverageDollarVolume,SimpleMovingAverage
from zipline_pipeline.pipeline.factors import AverageDollarVolume,SimpleMovingAverage


import pandas as pd
import numpy as np

def can_trade(asset):
    """
        Return True when the asset has an auto_close_date that is on or
        after the date reported by get_datetime(), otherwise False.
    """
    close_date = asset.auto_close_date
    # A missing auto_close_date short-circuits to False before any comparison
    return bool(close_date and close_date >= get_datetime().date())


def initialize(context):
    """
        Set up the strategy: schedule my_rebalance and register the
        pipeline under the name 'my_pipeline'.
    """
    # Rebalance at the start of every week, using a market-open offset of 1 hour
    when_day = date_rules.week_start()
    when_time = time_rules.market_open(hours=1, minutes=0)
    schedule_function(my_rebalance, when_day, when_time)

    # Register the pipeline so pipeline_output('my_pipeline') can fetch it
    attach_pipeline(make_pipeline(), 'my_pipeline')


    
def my_rebalance(context, data):
    """
        Scheduled rebalance: exit holdings that are in neither list, then
        target context.long_weight / context.short_weight for every
        long and short pick that passes can_trade.
    """
    # Close out anything we hold that is no longer a long or short pick
    for held in context.portfolio.positions:
        if held in context.longs or held in context.shorts:
            continue
        if can_trade(held):
            order_target_percent(held, 0)

    # Long side: positive target weight
    for pick in context.longs:
        if can_trade(pick):
            order_target_percent(pick, context.long_weight)

    # Short side: context.short_weight carries the sign
    for pick in context.shorts:
        if can_trade(pick):
            order_target_percent(pick, context.short_weight)




def my_compute_weights(context):
    """
        Split the capital 50% long / 50% short, equally across the names
        on each side.

        Returns (long_weight, short_weight); the short weight is negative
        and a side with no names gets a weight of 0.
    """
    n_long = len(context.longs)
    long_weight = 0.5 / n_long if n_long != 0 else 0

    n_short = len(context.shorts)
    short_weight = -0.5 / n_short if n_short != 0 else 0

    return long_weight, short_weight



def before_trading_start(context, data):
    """
        Fetch the 'my_pipeline' results and store on the context the
        long list, the short list and the per-name weights.
    """
    output = pipeline_output('my_pipeline')
    context.output = output

    # Keep the index labels of the rows flagged in each boolean column
    is_long = output['longs']
    is_short = output['shorts']
    context.longs = output[is_long].index.tolist()
    context.shorts = output[is_short].index.tolist()

    weights = my_compute_weights(context)
    context.long_weight, context.short_weight = weights



def make_pipeline():
    """
        Build the long/short pipeline.

        Columns, in order: 'mean_10', 'mean_30', 'percent_difference',
        'shorts', 'longs'. The screen keeps assets in the top 5% by
        30-day average dollar volume that are flagged long or short.
    """
    pipeline = Pipeline()

    # Liquidity filter: top 5% by 30-day average dollar volume
    avg_dollar_volume = AverageDollarVolume(window_length=30)
    top_liquidity = avg_dollar_volume.percentile_between(95, 100)

    # 10-day and 30-day simple moving averages of the close
    sma_short = SimpleMovingAverage(
        inputs=[EquityPricing.close],
        window_length=10,
    )
    sma_long = SimpleMovingAverage(
        inputs=[EquityPricing.close],
        window_length=30,
    )

    # Relative gap between the averages: negative -> short, positive -> long
    spread = (sma_short - sma_long) / sma_long
    short_side = spread < 0
    long_side = spread > 0
    either_side = short_side | long_side

    pipeline.add(sma_short, 'mean_10')
    pipeline.add(sma_long, 'mean_30')
    pipeline.add(spread, 'percent_difference')
    pipeline.add(short_side, 'shorts')
    pipeline.add(long_side, 'longs')

    pipeline.set_screen(top_liquidity & either_side)

    return pipeline