Hello,
Im coding a cointegration strategy between KO and PEP the last code to graph the PNL return me and error. i put all the code and the error. i couldnt find why that error
get_ipython().system(‘pip install --upgrade yfinance’)
import yfinance as yf
import numpy as np
import pandas as pd
import statsmodels
import matplotlib.pyplot as plt
import seaborn
For cointegration
import statsmodels.api as stat
import statsmodels.tsa.stattools as ts
import warnings
warnings.filterwarnings(“ignore”)
#im going to check the cointegration at 90% confidence with dickey-fuller test
import pyexcel
from datetime import datetime
In[329]:
get_ipython().system(‘pip install pandas-datareader’)
In[330]:
x=yf.download(“KO”,start=“2019-01-01”, end=“2022-12-31”,auto_adjust=True)
y= yf.download(“PEP”, start=“2019-01-01”, end=“2022-12-31”,auto_adjust=True)
In[331]:
x.index = pd.to_datetime(x.index)
y.index = pd.to_datetime(y.index)
y.isna().sum()
x.isna().sum()
x.duplicated().value_counts()
y.duplicated().value_counts()
In[367]:
def cointegration_test(x,y):
result=stat.OLS(x["Close"],y["Close"]).fit()
x_const = stat.add_constant(x["Close"])
##adf_results=ts.adfuller(result.resid)
if adf_results[0]<=adf_results[4][“10%”]:
##return "Pair is cointegrated"
else:
##return "Pair is not cointegrated"
return ts.adfuller(result.resid)
In[368]:
print(“Cointegration test”,cointegration_test(x,y))
In[369]:
#New dataframe
In:
In[370]:
data=
headers=[“Date”,“stock1”,“stock2”,“zscore”,“signal”,“status”,“buyprice”,“sellprice”,“mtm”,“cointegration test”]
data.append(headers)
#tolerance for the stdev
threshold=1.75
start=20
end=90
prev_status= “”
mtm=“”
prev_sell_price=“”
sell_price=“”
prev_buy_price=“”
buy_price=“”
SL=-10000
TP=20000
#lotaje
#Data 1
N=5000
#data2
M=5000
In[419]:
def zscore_cal(data1, data2, start, end):
s1 = (data1["Close"].iloc[start:end].values)
s2 = (data2["Close"].iloc[start:end].values)
valid_prices = (s1 > 0) & (s2 > 0)
s1 = s1[valid_prices]
s2 = s2[valid_prices]
if len(s1) == 0 or len(s2) == 0:
return 0
mvavg_old = np.mean(np.log(s1/s2))
std_old = np.std(np.log(s1/s2))
current_spread = np.log(
data1["Close"][end]/data2["Close"][end])
zscore = (current_spread - mvavg_old) / \
std_old if std_old > 0 else 0
return (zscore)
In:
In[420]:
print(type(x[“Close”]))
print(x[“Close”].shape)
In[421]:
def signal_cal(zscore,threshold, adfest):
if zscore> threshold and adfest=="Yes":
signal="SELL"
elif zscore<-threshold and adfest=="Yes":
signal="BUY"
else:
signal=""
return signal
In[422]:
def status_cal(prev_status, mtm, SL, TP, signal, adftest):
if prev_status in["","SL","TP","CB"]:
status = signal
else:
if adftest=="No":
status="CB"
else:
if mtm == "":
status = ""
else:
if mtm < SL:
status = "SL"
else:
if mtm>TP:
status="TP"
else:
status=prev_status
return status
In[423]:
def buy_price_cal(prev_status,prev_buy_price, buy_price,signal,status,data1,data2,end):
if status ==prev_status:
buy_price = prev_buy_price
else:
if status in ["SL", "TP", "CB", ""]:
buy_price = ""
else:
if signal=="BUY":
buy_price=data1["Close"][end]
else:
if signal =="SELL":
buy_price=data2["Close"][end]
else:
buy_price= ""
return buy_price
In[424]:
def sell_price_cal(prev_status,prev_sell_price,sell_price,signal,status,data1,data2,end):
if status==prev_status:
sell_price=prev_sell_price
else:
if status in [“SL”,“TP”,“CB”,“”]:
sell_price=“”
else:
if signal==“SELL”:
sell_price=data1[“Close”][end]
else:
if signal==“BUY”:
sell_price=data2[“Close”][end]
else:
sell_price=“”
return sell_price
In[425]:
def mtm_cal(data1, data2, prev_status, prev_sell_price, prev_buy_price, M, N, end):
if prev_status == "BUY":
mtm = (prev_sell_price-data2['Close'][end])*M + (data1['Close'][end] -
prev_buy_price)*N
else:
if prev_status == "SELL":
mtm = (prev_sell_price-data1['Close'][end])*N + (data2['Close'][end]
- prev_buy_price)*M
else:
mtm = ""
return mtm
In[426]:
def strategy (data1, data2, threshold, start, end, prev_status, mtm, prev_sell_price,sell_price,prev_buy_price,buy_price,SL,TP,M,N):
pnl=0
i=0
while end < len(data1) and end < len(data2):
for row in data1[90:].iterrows():
d1=data1[i:90+i]
d2=data2[i:90+i]
c_t=cointegration_test(d1,d2)
if c_t[0] <= c_t[4]["10%"] and c_t[1]<=0.1:
adftest="Yes"
else:
adftest="No"
zscore=zscore_cal(data1,data2,start,end)
signal=signal_cal(zscore,threshold,adftest)
mtm=mtm_cal(data1,data2,prev_status,prev_sell_price,prev_buy_price,M,N,end)
status = status_cal(prev_status, mtm, SL, TP, signal, adftest)
buy_price=buy_price_cal(prev_status,prev_buy_price,buy_price,signal,status,data1,data2,end)
sell_price=sell_price_cal(prev_status,prev_sell_price,sell_price,signal,status,data1,data2,end)
pnl=(pnl+mtm) if status in ["TP","SL","CB"] else pnl
prev_sell_price = sell_price
prev_status = status
prev_buy_price = buy_price
data.append([
str(data1.index[end])[:10],
data1["Close"][end],
data2["Close"][end],
zscore,
signal,
status,
buy_price if buy_price != "" else np.nan,
sell_price if sell_price != "" else np.nan,
mtm if mtm != "" else 0,
pnl,
adftest])
end = end+1
start = start+1
i = i+1
pyexcel.save_as(array=data,dest_file_name="pair_strategy_withoutzipline-90.csv")
In:
In[427]:
fig, ax=plt.subplots(1,1, figsize=(10,7))
ax.plot(x.index.to_numpy(), x[‘Close’].to_numpy(), label=“Coke”)
ax.plot(y.index.to_numpy(), y[‘Close’].to_numpy(), label=“Pepsi”)
ax.legend()
ax.set_xlabel(“Date”, fontsize=12)
ax.set_ylabel(“Close Prices”, fontsize=12)
ax.set_title(“Coke and Pepsi”, fontsize=14)
ax.grid()
plt.show()
In[428]:
fig, ax = plt.subplots(1, 1, figsize=(10, 7))
Plot the spread
ax.plot(x.index.to_numpy(), np.log(
x.Close.values/y.Close.values), label=‘Spread’)
plt.legend()
plt.xlabel(‘Date’, fontsize=12)
plt.ylabel(‘Spread’, fontsize=12)
plt.title(‘Spread of x and Lead y Prices’, fontsize=14)
plt.grid()
plt.show()
In[429]:
strategy (x, y, threshold, start, end, prev_status, mtm, prev_sell_price,sell_price,prev_buy_price,buy_price,SL,TP,M,N)
result = pd.DataFrame(data, columns=headers)[1:]
result.set_index(‘Date’,inplace=True)
result.index = pd.to_datetime(result.index)
fig, ax = plt.subplots(1, 1, figsize=(12, 7))
ax.plot(result_df.index, result_df[‘pnl’], label=‘Profit and Loss’, color=‘blue’)
ax.axhline(y=0, color=‘black’, linestyle=‘-’) # Línea de referencia en 0
ax.set_title(“Strategy Performance - KO vs PEP Pair Trading”, fontsize=14)
plt.legend()
plt.xlabel(‘Date’, fontsize=12)
plt.ylabel(‘PnL’, fontsize=12)
plt.title(‘PnL’, fontsize=14)
plt.grid()
plt.show()
KeyError Traceback (most recent call last)
File ~\anaconda3\envs\quantra_py\lib\site-packages\pandas\core\indexes\base.py:3805, in Index.get_loc(self, key)
3804 try:
→ 3805 return self._engine.get_loc(casted_key)
3806 except KeyError as err:
File index.pyx:167, in pandas._libs.index.IndexEngine.get_loc()
File index.pyx:196, in pandas._libs.index.IndexEngine.get_loc()
File pandas\_libs\hashtable_class_helper.pxi:7081, in pandas._libs.hashtable.PyObjectHashTable.get_item()
File pandas\_libs\hashtable_class_helper.pxi:7089, in pandas._libs.hashtable.PyObjectHashTable.get_item()
KeyError: 90
The above exception was the direct cause of the following exception:
KeyError Traceback (most recent call last)
Cell In[429], line 1
----> 1 strategy (x, y, threshold, start, end, prev_status, mtm, prev_sell_price,sell_price,prev_buy_price,buy_price,SL,TP,M,N)
3 result = pd.DataFrame(data, columns=headers)[1:]
5 result.set_index(‘Date’,inplace=True)
Cell In[426], line 14, in strategy(data1, data2, threshold, start, end, prev_status, mtm, prev_sell_price, sell_price, prev_buy_price, buy_price, SL, TP, M, N)
11 else:
12 adftest=“No”
—> 14 zscore=zscore_cal(data1,data2,start,end)
16 signal=signal_cal(zscore,threshold,adftest)
18 mtm=mtm_cal(data1,data2,prev_status,prev_sell_price,prev_buy_price,M,N,end)
Cell In[419], line 19, in zscore_cal(data1, data2, start, end)
13 mvavg_old = np.mean(np.log(s1/s2))
14 std_old = np.std(np.log(s1/s2))
18 current_spread = np.log(
—> 19 data1[“Close”][end]/data2[“Close”][end])
23 zscore = (current_spread - mvavg_old) /
24 std_old if std_old > 0 else 0
26 return (zscore)
File ~\anaconda3\envs\quantra_py\lib\site-packages\pandas\core\frame.py:4102, in DataFrame.getitem(self, key)
4100 if self.columns.nlevels > 1:
4101 return self._getitem_multilevel(key)
→ 4102 indexer = self.columns.get_loc(key)
4103 if is_integer(indexer):
4104 indexer = [indexer]
File ~\anaconda3\envs\quantra_py\lib\site-packages\pandas\core\indexes\base.py:3812, in Index.get_loc(self, key)
3807 if isinstance(casted_key, slice) or (
3808 isinstance(casted_key, abc.Iterable)
3809 and any(isinstance(x, slice) for x in casted_key)
3810 ):
3811 raise InvalidIndexError(key)
→ 3812 raise KeyError(key) from err
3813 except TypeError:
3814 # If we have a listlike key, _check_indexing_error will raise
3815 # InvalidIndexError. Otherwise we fall through and re-raise
3816 # the TypeError.
3817 self._check_indexing_error(key)
KeyError: 90