元派遣プログラマの自称技術系ブログです。雑記とか自作のオープンソースプロジェクトの話とか。
Javaとか組込とかできます。お仕事ください。

Pythonでテクニカル指標を計算する

オレオレ指標を作り続けて数年、何をやっても損をするので、初心に帰ってPythonでテクニカル指標のRSI、RCIMACD、SMA(単純移動平均)、WMA加重移動平均)、EMA(指数平滑移動平均)を計算するコードを書いた。

二度と書きたくないのでここに記録する。

class Indicator:
    def __init__(self):
        pass
    @classmethod
    def _filledPrices(cls,prices):
        """
        [0]以外をfillした配列を作成する。
        """
        nprices=[i for i in prices]
        for i in range(1,len(nprices)):
            if nprices[i] is None:
                nprices[i]=nprices[i-1]
        return nprices
    @classmethod
    def rsi(cls,prices,term=14):
        """
        RSI値を計算します。
        """
        def rsi_first(d):
            """
            rsi[term]のパラメータを計算する。
            """
            up=0
            down=0
            l=len(d)
            for i in range(1,term+1):
                if d[i] is None:
                    continue
                if d[i]>0:
                    up=up+d[i]
                elif d[i]==0:
                    pass
                else:
                    down=down-d[i]
            return up/term,down/term

        #2番目以降のRSI
        def rsi_next(prev_up,prev_down,diff):
            """
            [term+1]移行の計算
            """
            up=prev_up*(term-1)
            down=prev_down*(term-1)
            if diff>0:
                up=up+diff
            elif diff==0:
                pass
            else:
                down=down-diff
            return up/term,down/term
        def makeDiffArray(prices):
            """
            [t]-[t-1]の差分配列を生成する。
            [t-1]がNoneの場合は最後に有効だった[t-n]の値を使う。
            [0]がNoneの場合には計算不能
            """
            f=None
            r=[None]
            for i in range(0,len(prices)):
                if prices[i] is not None:
                    f=i
                    break
                r.append(None)
            if f is None:
                return None
            last=prices[i]
            for i in range(f+1,len(prices)):
                if prices[i] is None:
                    #途中にNoneが挟まってた時は差分0として計算
                    r.append(0)
                    continue
                r.append(prices[i]-last)
                last=prices[i]

            return r
        da=makeDiffArray(prices)
        if da is None:
            return None
        r=[None]*(term)
        rsi=rsi_first(da)
        r.append(0.5 if rsi[0]+rsi[1]==0 else rsi[0]/(rsi[0]+rsi[1]))
        for i in range(term+1,len(prices)):
            rsi=rsi_next(rsi[0],rsi[1],da[i])
            r.append(0.5 if rsi[0]+rsi[1]==0 else rsi[0]/(rsi[0]+rsi[1]))
        return r
    @classmethod
    def rci(cls,prices,term=3):
        """
        RCI値を計算します。
        """
        #中間をfillした配列を作る
        nprices=cls._filledPrices(prices)

        def rci_(idx):
            if idx+1<term:
                return None
            #価格,日付Rank,価格Rankの配列
            if nprices[idx-term+1] is None:
                return None
            rank=[[nprices[idx-i],i+1,None] for i in range(term)]            
            rank.sort(key=lambda x:x[0])
            #価格Rankの平均化
            for i in range(len(rank)):
                rank[i][2]=i+1
            #print(rank)
            i=0
            while i<term:
                n=1
                s=rank[i][2]
                for j in range(i+1,len(rank)):
                    if rank[i][0]!=rank[j][0]:
                        break
                    n=n+1
                    s=s+rank[j][2]
                for j in range(n):
                    rank[i+j][2]=s/n
                i=i+n
            
            return (1-((6*sum([(i[1]-i[2])**2 for i in rank]))/(term*(term**2-1))))*100
        return [rci_(i) for i in range(len(nprices))] 
    @classmethod
    def sma(cls,prices,term=6):
        """
        単純移動平均を計算します。
        """
        assert(len(prices)>=term)
        nprices=cls._filledPrices(prices)
        r=[None]*(term-1)
        for i in range(term-1,len(nprices)):
            ma=nprices[i-term+1:i+1]
            if ma[0] is None:
                r.append(None)
            else:
                r.append(sum(ma)/term)
        return r
    @classmethod
    def wma(cls,prices,term=3):
        """
        加重移動平均を計算します。
        """
        assert(len(prices)>=term)
        nprices=cls._filledPrices(prices)
        r=[None]*(term-1)
        for i in range(term-1,len(nprices)):
            ma=nprices[i-term+1:i+1]
            if ma[0] is None:
                r.append(None)
            else:
                r.append(sum([(j+1)*ma[j] for j in range(term)])/(term*(2+(term-1))/2))
        return r
    @classmethod
    def ema(cls,prices,term=3,normalized=False):
        """
        指数何とか移動平均を計算します。
        """        
        assert(len(prices)>=term)
        nprices=prices if normalized else cls._filledPrices(prices)
        #1日目
        ret=[None]*(term-1)
        for i in range(term-1,len(nprices)):
            ma=nprices[i-term+1:i+1] 
            if ma[0] is None:
                ret.append(None)
                continue
            first=sum(ma)/term
            ret.append(first)
            break
        for i in range(len(ret),len(nprices)):
            ret.append((ret[-1]*(term-1)+nprices[i]*2)/(term+1))
        return ret
    @classmethod
    def macd(cls,prices,short_term=12,long_term=26,signal_term=26):
        """
        macdを計算します。
        returns
           [ diff,signal,histogram]
        """
        assert(len(prices)>=short_term)
        assert(len(prices)>=signal_term)
        assert(short_term<=long_term)
        nprices=cls._filledPrices(prices)
        s=cls.ema(nprices,short_term,True)
        l=cls.ema(nprices,long_term,True)
        d=[None if i is None or j is None else i-j for i,j in zip(s,l)]
        sig=cls.ema(d,signal_term)
        return d,sig,[None if i is None or j is None else i-j for i,j in zip(d,sig)]

入力値は一次配列。このように使う。

    sma_testpatt=[1,10,100,1000,10000,5000,500]
    print(Indicator.rsi(sma_testpatt,3))
    print(Indicator.rci(sma_testpatt,3))
    print(Indicator.sma(sma_testpatt,3))
    print(Indicator.wma(sma_testpatt,3))
    print(Indicator.ema(sma_testpatt,3))
    print(Indicator.macd(sma_testpatt,3,4,3))

出力

[None, None, None, 1.0, 1.0, 0.5630898287312128, 0.35418269759261295]
[None, None, -100.0, -100.0, -100.0, -50.0, 100.0]
[None, None, 37.0, 370.0, 3700.0, 5333.333333333333, 5166.666666666667]
[None, None, 53.5, 535.0, 5350.0, 6000.0, 3583.3333333333335]
[None, None, 37.0, 518.5, 5259.25, 5129.625, 2814.8125]
([None, None, None, 240.75, 1092.6000000000004, 629.6350000000002, -85.18149999999969], [None, None, None, None, None, 654.3283333333335, 284.5734166666669], [None, None, None, None, None, -24.69333333333327, -369.7549166666666])