Below is some of the code of my Pipeline Algo:
Am I not entering the Parameters for SimpleMovingAverage correctly?
It says it requires a dtype? I am working from a course that was using zipline, but I am making an attempt.
Error in strategy - Traceback (most recent call last):
File "L111_Pipeline_Trading_Algo.py", line 33, in initialize
File "L111_Pipeline_Trading_Algo.py", line 95, in make_pipeline
zipline_pipeline.errors.DTypeNotSpecified: SimpleMovingAverage requires a dtype, but no dtype was passed.
Blueshift Alert[2022-03-01 00:00:00-05:00] ERROR in algorithm:Fatal error, will exit.
Blueshift Alert[2022-06-14 07:44:12.445447-04:00] WARNING :Shutting down Blueshift gracefully.
def make_pipeline():
    """
    Build the pipeline that screens and classifies the tradable universe.

    Output columns:
        'longs'     - True where the 10-day mean close is above the 30-day mean
        'shorts'    - True where the 10-day mean close is below the 30-day mean
        'perc_diff' - (mean_10 - mean_30) / mean_30

    The screen keeps assets that are in the top 5% by 30-day average dollar
    volume AND are flagged as either long or short.
    """
    # Dollar Volume (30 Days): liquidity measure used for the universe filter
    dollar_volume = AverageDollarVolume(window_length=30)
    # Grab the top 5% in avg dollar volume
    high_dollar_volume = dollar_volume.percentile_between(95, 100)

    # 10 day and 30 day mean close
    mean_10 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
    mean_30 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=30)

    # Percent difference between the fast and slow averages
    percent_difference = (mean_10 - mean_30) / mean_30

    shorts = percent_difference < 0
    longs = percent_difference > 0
    # Anything that is in shorts or longs
    securities_to_trade = shorts | longs

    pipe = Pipeline()
    pipe.add(longs, 'longs')
    # The column must be called 'shorts': before_trading_start looks up
    # output['shorts'], so the previous name 'short' made that lookup fail.
    pipe.add(shorts, 'shorts')
    pipe.add(percent_difference, 'perc_diff')
    # A second set_screen call does not add to the first one (zipline rejects
    # it unless overwrite=True), so both filters go into a single screen.
    pipe.set_screen(high_dollar_volume & securities_to_trade)
    return pipe
Here is all the code in its entirety:
"""
Title: L111_Pipeline_Trading_Algo
Description: A Pipeline trading algo
Style tags: Systematic
Asset class: Equities
Dataset: US Equities
"""
from blueshift.api import( symbol,
order_target_percent,
schedule_function,
date_rules,
time_rules,
pipeline_output,
attach_pipeline,
get_datetime,
)
from blueshift.pipeline import Pipeline, CustomFilter
from blueshift.pipeline.data import EquityPricing
from blueshift.pipeline.factors import AverageDollarVolume,SimpleMovingAverage
#from blueshift_library.technicals.indicators import sma
def initialize(context):
    """
    Set up the strategy at start-up.

    Registers my_rebalance with schedule_function using
    date_rules.week_start() and time_rules.market_open(minutes=0, hours=1),
    then attaches the pipeline built by make_pipeline() under the name
    'my_pipeline'.
    """
    rebalance_date_rule = date_rules.week_start()
    rebalance_time_rule = time_rules.market_open(minutes=0, hours=1)
    schedule_function(my_rebalance, rebalance_date_rule, rebalance_time_rule)

    strategy_pipeline = make_pipeline()
    attach_pipeline(strategy_pipeline, 'my_pipeline')
def my_rebalance(context, data):
    """
    Re-align the portfolio with the current long and short lists.

    Held positions that are in neither context.longs nor context.shorts are
    targeted to 0%. Every asset in context.longs is targeted to
    context.long_weight and every asset in context.shorts to
    context.short_weight. An order is only placed when
    data.can_trade(asset) is truthy.
    """
    # Exit holdings that are no longer on either list
    for held in context.portfolio.positions:
        still_wanted = held in context.longs or held in context.shorts
        if not still_wanted and data.can_trade(held):
            order_target_percent(held, 0)

    # Long side
    for long_asset in context.longs:
        if not data.can_trade(long_asset):
            continue
        order_target_percent(long_asset, context.long_weight)

    # Short side
    for short_asset in context.shorts:
        if not data.can_trade(short_asset):
            continue
        order_target_percent(short_asset, context.short_weight)
def my_compute_weights(context):
    """
    Compute the per-asset target weights for the long and short books.

    Half of the capital is split equally across context.longs and half
    across context.shorts. The short weight is negative so that
    order_target_percent opens a short position rather than a long one.

    Returns:
        (long_weight, short_weight); a side with no assets gets weight 0.
    """
    long_count = len(context.longs)
    short_count = len(context.shorts)

    long_weight = 0.5 / long_count if long_count else 0
    # Negative sign: a positive target would buy the "shorts" instead of
    # shorting them.
    short_weight = -0.5 / short_count if short_count else 0
    return (long_weight, short_weight)
def before_trading_start(context, data):
    """
    Refresh the long/short lists and their weights from the pipeline output.

    Stores the result of pipeline_output('my_pipeline') on context.output,
    selects the index entries whose 'longs' / 'shorts' column is set into
    context.longs / context.shorts, and stores the weights returned by
    my_compute_weights on context.long_weight / context.short_weight.
    """
    output = pipeline_output('my_pipeline')
    context.output = output

    # LONG
    long_mask = output['longs']
    context.longs = output[long_mask].index.tolist()

    # SHORT
    short_mask = output['shorts']
    context.shorts = output[short_mask].index.tolist()

    weights = my_compute_weights(context)
    context.long_weight, context.short_weight = weights
def make_pipeline():
    """
    Build the pipeline that screens and classifies the tradable universe.

    Output columns:
        'longs'     - True where the 10-day mean close is above the 30-day mean
        'shorts'    - True where the 10-day mean close is below the 30-day mean
        'perc_diff' - (mean_10 - mean_30) / mean_30

    The screen keeps assets that are in the top 5% by 30-day average dollar
    volume AND are flagged as either long or short.
    """
    # Dollar Volume (30 Days): liquidity measure used for the universe filter
    dollar_volume = AverageDollarVolume(window_length=30)
    # Grab the top 5% in avg dollar volume
    high_dollar_volume = dollar_volume.percentile_between(95, 100)

    # 10 day and 30 day mean close
    mean_10 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
    mean_30 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=30)

    # Percent difference between the fast and slow averages
    percent_difference = (mean_10 - mean_30) / mean_30

    shorts = percent_difference < 0
    longs = percent_difference > 0
    # Anything that is in shorts or longs
    securities_to_trade = shorts | longs

    pipe = Pipeline()
    pipe.add(longs, 'longs')
    # The column must be called 'shorts': before_trading_start looks up
    # output['shorts'], so the previous name 'short' made that lookup fail.
    pipe.add(shorts, 'shorts')
    pipe.add(percent_difference, 'perc_diff')
    # A second set_screen call does not add to the first one (zipline rejects
    # it unless overwrite=True), so both filters go into a single screen.
    pipe.set_screen(high_dollar_volume & securities_to_trade)
    return pipe
Thank you,
Philip
Quite a few of the pipeline factors (and some other features) are not yet available in Blueshift directly. For these, you can try importing them directly from the zipline_pipeline module. This is the original zipline pipeline module adapted for the latest Python releases (and packages) and our datasets. So please try "from zipline_pipeline.pipeline.factors import AverageDollarVolume,SimpleMovingAverage" instead of "from blueshift.pipeline.factors import AverageDollarVolume,SimpleMovingAverage". In most cases importing from Blueshift will work though (more so in the future).
Also please see this thread for your "can_trade" calls.
Thanks again Propdita, also thanks for reminding me about can_trade().
I've updated the code and I am getting results, which is good.
Backtest results, with Logs:
https://blueshift.quantinsti.com/research/api/share/public/W6mzUkpiF
However in the Error log I'm getting:
/usr/local/lib/python3.6/dist-packages/zipline_pipeline/pipeline/factors/basic.py:113: RuntimeWarning:
Mean of empty slice
/usr/local/lib/python3.6/dist-packages/zipline_pipeline/pipeline/factors/basic.py:113: RuntimeWarning:
Mean of empty slice
Is there a bug with the SimpleMovingAverage import?
I'll put all my code in my next post:
from blueshift.api import( symbol,
order_target_percent,
schedule_function,
date_rules,
time_rules,
pipeline_output,
attach_pipeline,
get_datetime,
)
from blueshift.pipeline import Pipeline, CustomFilter
from blueshift.pipeline.data import EquityPricing
#from blueshift.pipeline.factors import AverageDollarVolume,SimpleMovingAverage
from zipline_pipeline.pipeline.factors import AverageDollarVolume,SimpleMovingAverage
import pandas as pd
import numpy as np
def can_trade(asset):
    """
    Tell whether an asset may still be traded on the current date.

    Returns True only when asset.auto_close_date is set and is on or after
    the date of get_datetime(); returns False otherwise, including when
    auto_close_date is empty.
    """
    close_date = asset.auto_close_date
    if not close_date:
        return False
    today = get_datetime().date()
    return bool(close_date >= today)
def initialize(context):
    """
    Set up the strategy at start-up.

    Registers my_rebalance with schedule_function using
    date_rules.week_start() and time_rules.market_open(minutes=0, hours=1),
    then attaches the pipeline built by make_pipeline() under the name
    'my_pipeline'.
    """
    rebalance_date_rule = date_rules.week_start()
    rebalance_time_rule = time_rules.market_open(minutes=0, hours=1)
    schedule_function(my_rebalance, rebalance_date_rule, rebalance_time_rule)

    strategy_pipeline = make_pipeline()
    attach_pipeline(strategy_pipeline, 'my_pipeline')
def my_rebalance(context, data):
    """
    Re-align the portfolio with the current long and short lists.

    Held positions that are in neither context.longs nor context.shorts are
    targeted to 0%. Every asset in context.longs is targeted to
    context.long_weight and every asset in context.shorts to
    context.short_weight. An order is only placed when can_trade(asset)
    is truthy; the data argument is not used.
    """
    # Exit holdings that are no longer on either list
    for held in context.portfolio.positions:
        still_wanted = held in context.longs or held in context.shorts
        if not still_wanted and can_trade(held):
            order_target_percent(held, 0)

    # Long side
    for long_asset in context.longs:
        if not can_trade(long_asset):
            continue
        order_target_percent(long_asset, context.long_weight)

    # Short side
    for short_asset in context.shorts:
        if not can_trade(short_asset):
            continue
        order_target_percent(short_asset, context.short_weight)
def my_compute_weights(context):
    """
    Compute the per-asset target weights for the long and short books.

    Half of the capital is split equally across context.longs and half
    across context.shorts. The short weight is negative so that
    order_target_percent opens a short position rather than a long one.

    Returns:
        (long_weight, short_weight); a side with no assets gets weight 0.
    """
    long_count = len(context.longs)
    short_count = len(context.shorts)

    long_weight = 0.5 / long_count if long_count else 0
    # Negative sign: a positive target would buy the "shorts" instead of
    # shorting them.
    short_weight = -0.5 / short_count if short_count else 0
    return (long_weight, short_weight)
def before_trading_start(context, data):
    """
    Refresh the long/short lists and their weights from the pipeline output.

    Stores the result of pipeline_output('my_pipeline') on context.output,
    selects the index entries whose 'longs' / 'shorts' column is set into
    context.longs / context.shorts, and stores the weights returned by
    my_compute_weights on context.long_weight / context.short_weight.
    """
    output = pipeline_output('my_pipeline')
    context.output = output

    # LONG
    long_mask = output['longs']
    context.longs = output[long_mask].index.tolist()

    # SHORT
    short_mask = output['shorts']
    context.shorts = output[short_mask].index.tolist()

    weights = my_compute_weights(context)
    context.long_weight, context.short_weight = weights
def make_pipeline():
    """
    Assemble the screening pipeline used by the strategy.

    Output columns, in order: 'mean_10', 'mean_30', 'longs', 'shorts',
    'perc_diff'. The screen keeps assets that are in the top 5% by 30-day
    average dollar volume and are flagged as either long or short.
    """
    # Liquidity filter: top 5% by 30-day average dollar volume
    liquidity = AverageDollarVolume(window_length=30)
    is_liquid = liquidity.percentile_between(95, 100)

    # Fast (10 day) and slow (30 day) mean close
    fast_mean = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
    slow_mean = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=30)

    # Relative gap between the fast and slow means
    gap = (fast_mean - slow_mean) / slow_mean
    is_short = gap < 0
    is_long = gap > 0
    has_signal = is_short | is_long

    pipe = Pipeline()
    columns = (
        ('mean_10', fast_mean),
        ('mean_30', slow_mean),
        ('longs', is_long),
        ('shorts', is_short),
        ('perc_diff', gap),
    )
    for column_name, term in columns:
        pipe.add(term, column_name)

    pipe.set_screen(is_liquid & has_signal)
    return pipe
This is not an error but a warning. In this case, it signifies input pricing data for an asset are all NaNs. This may happen on Blueshift if an asset leaves the benchmark. I would not worry too much about it.
Thank you so much Propdita,
thanks to you I have managed to finish my course on Udemy, that originally used Quantopian, and recommended we used Blueshift. They however did not provide any new material for us.
I will paste the Correct Fully Working Pipeline Blueshift 2.0 code, for any student that is interested in learning.
DataType: US Equities
- It filters the top 5% of stocks by 30-day Average Dollar Volume
- It sorts stocks to go long or short based on the criteria:
short = (10sma-30sma)/30sma < 0
long = (10sma-30sma)/30sma > 0
- At any time capital will be allocated 50% long and 50% short,
divided equally between the number of stocks on each side.
- No Leverage (hence weights will be 0.5 long, -0.5 short, the abs values sum to 1.00)
"""
Title: L111_Pipeline_Trading_Algo
Description: A Pipeline trading algo
Style tags: Systematic
Asset class: Equities
Dataset: US Equities
"""
from blueshift.api import( symbol,
order_target_percent,
schedule_function,
date_rules,
time_rules,
pipeline_output,
attach_pipeline,
get_datetime,
)
from blueshift.pipeline import Pipeline, CustomFilter
from blueshift.pipeline.data import EquityPricing
#from blueshift.pipeline.factors import AverageDollarVolume,SimpleMovingAverage
from zipline_pipeline.pipeline.factors import AverageDollarVolume,SimpleMovingAverage
import pandas as pd
import numpy as np
def can_trade(asset):
    """
    Tell whether an asset may still be traded on the current date.

    Returns True only when asset.auto_close_date is set and is on or after
    the date of get_datetime(); returns False otherwise, including when
    auto_close_date is empty.
    """
    close_date = asset.auto_close_date
    if not close_date:
        return False
    today = get_datetime().date()
    return bool(close_date >= today)
def initialize(context):
    """
    Set up the strategy at start-up.

    Registers my_rebalance with schedule_function using
    date_rules.week_start() and time_rules.market_open(minutes=0, hours=1),
    then attaches the pipeline built by make_pipeline() under the name
    'my_pipeline'.
    """
    rebalance_date_rule = date_rules.week_start()
    rebalance_time_rule = time_rules.market_open(minutes=0, hours=1)
    schedule_function(my_rebalance, rebalance_date_rule, rebalance_time_rule)

    strategy_pipeline = make_pipeline()
    attach_pipeline(strategy_pipeline, 'my_pipeline')
def my_rebalance(context, data):
    """
    Re-align the portfolio with the current long and short lists.

    Held positions that are in neither context.longs nor context.shorts are
    targeted to 0%. Every asset in context.longs is targeted to
    context.long_weight and every asset in context.shorts to
    context.short_weight. An order is only placed when can_trade(asset)
    is truthy; the data argument is not used.
    """
    # Exit holdings that are no longer on either list
    for held in context.portfolio.positions:
        still_wanted = held in context.longs or held in context.shorts
        if not still_wanted and can_trade(held):
            order_target_percent(held, 0)

    # Long side
    for long_asset in context.longs:
        if not can_trade(long_asset):
            continue
        order_target_percent(long_asset, context.long_weight)

    # Short side
    for short_asset in context.shorts:
        if not can_trade(short_asset):
            continue
        order_target_percent(short_asset, context.short_weight)
def my_compute_weights(context):
    """
    Compute the per-asset target weights for the long and short books.

    The long weight is 0.5 divided by the number of assets in context.longs
    and the short weight is -0.5 divided by the number of assets in
    context.shorts. A side with no assets gets weight 0.

    Returns:
        (long_weight, short_weight)
    """
    long_count = len(context.longs)
    long_weight = 0.5 / long_count if long_count else 0

    short_count = len(context.shorts)
    short_weight = -0.5 / short_count if short_count else 0

    return (long_weight, short_weight)
def before_trading_start(context, data):
    """
    Refresh the long/short lists and their weights from the pipeline output.

    Stores the result of pipeline_output('my_pipeline') on context.output,
    selects the index entries whose 'longs' / 'shorts' column is set into
    context.longs / context.shorts, and stores the weights returned by
    my_compute_weights on context.long_weight / context.short_weight.
    """
    output = pipeline_output('my_pipeline')
    context.output = output

    # LONG
    long_mask = output['longs']
    context.longs = output[long_mask].index.tolist()

    # SHORT
    short_mask = output['shorts']
    context.shorts = output[short_mask].index.tolist()

    weights = my_compute_weights(context)
    context.long_weight, context.short_weight = weights
def make_pipeline():
    """
    Assemble the screening pipeline used by the strategy.

    Output columns, in order: 'mean_10', 'mean_30', 'percent_difference',
    'shorts', 'longs'. The screen keeps assets that are in the top 5% by
    30-day average dollar volume and are flagged as either long or short.
    """
    # Liquidity filter: top 5% by 30-day average dollar volume
    liquidity = AverageDollarVolume(window_length=30)
    is_liquid = liquidity.percentile_between(95, 100)

    # Fast (10 day) and slow (30 day) mean close
    fast_mean = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
    slow_mean = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=30)

    # Relative gap between the fast and slow means
    gap = (fast_mean - slow_mean) / slow_mean
    is_short = gap < 0
    is_long = gap > 0
    has_signal = is_short | is_long

    pipe = Pipeline()
    columns = (
        ('mean_10', fast_mean),
        ('mean_30', slow_mean),
        ('percent_difference', gap),
        ('shorts', is_short),
        ('longs', is_long),
    )
    for column_name, term in columns:
        pipe.add(term, column_name)

    pipe.set_screen(is_liquid & has_signal)
    return pipe