Python Quantitative Investing: Stock Analysis
2025-09-30 19:38:22 Editor: 小OO

Python Stock Data Analysis

I have recently been learning stock data analysis with Python, mainly using tushare and seaborn. tushare is a financial data API package with fairly complete coverage of China's A-share market.

  Modules imported:

import matplotlib.pyplot as plt

  import seaborn as sns

  import seaborn.linearmodels as snsl # note: this module only exists in old seaborn releases

from datetime import datetime

  import tushare as ts

Code:

  Closing-price trend curve

  sns.set_style("whitegrid")

  end = datetime.today() # start and end dates: use the most recent year of data

  start = datetime(end.year-1,end.month,end.day)

  end = str(end)[0:10]

  start = str(start)[0:10]

stock = ts.get_hist_data('300104', start, end) # pick one stock

  stock['close'].plot(legend=True ,figsize=(10,4))

  plt.show()

Daily closing price

Similarly, the 5-day, 10-day and 20-day moving averages can be plotted:

  stock[['close','ma5','ma10','ma20']].plot(legend=True ,figsize=(10,4))

Daily close with the 5-, 10- and 20-day moving averages

Daily percentage change

  stock['Daily Return'] = stock['close'].pct_change()

  stock['Daily Return'].plot(legend=True,figsize=(10,4))

Daily return

Kernel density estimate

  sns.kdeplot(stock['Daily Return'].dropna())

Kernel density estimate

Kernel density estimate plus histogram

  sns.distplot(stock['Daily Return'].dropna(),bins=100)

KDE with histogram

Pearson correlation of two return series (here the same stock is plotted against itself for illustration):

  sns.jointplot(stock['Daily Return'], stock['Daily Return'], alpha=0.2)

Pearson correlation

Correlation across multiple stocks

  stock_lis = ['300113','300343','300295','300315'] # four internet-related stocks, chosen arbitrarily

  import pandas as pd

  df = pd.DataFrame()

  for stock in stock_lis:
      closing_df = ts.get_hist_data(stock, start, end)['close']
      df = df.join(pd.DataFrame({stock: closing_df}), how='outer')

  tech_rets = df.pct_change()

  snsl.corrplot(tech_rets.dropna())
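Note: seaborn.linearmodels.corrplot only exists in old seaborn releases. On a current seaborn, a roughly equivalent picture can be drawn with a correlation heatmap (a substitute sketch, not the original article's call):

  sns.heatmap(tech_rets.dropna().corr(), annot=True, cmap='coolwarm')
  plt.show()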

  Correlation

A simple way to gauge a stock's return and risk is the mean and the standard deviation of its daily changes: a positive mean indicates a positive return, and a larger standard deviation means bigger swings and therefore higher risk.

  rets = tech_rets.dropna()

  plt.scatter(rets.mean(),rets.std())

  plt.xlabel('Expected Return')

  plt.ylabel('Risk')

  for label, x, y in zip(rets.columns, rets.mean(), rets.std()): # add annotations
      plt.annotate(label, xy=(x, y), xytext=(15, 15), textcoords='offset points',
                   arrowprops=dict(arrowstyle='-', connectionstyle='arc3,rad=-0.3'))


Using Python to Screen Public Data for Stocks with High Send-Transfer Expectations

Based on past experience, stocks with high send-transfer (bonus-share) expectations rally toward the end of every year. Today Migo (米哥) walks through how to implement a send-transfer-expectation screen with tushare.

This article mainly describes the screening approach; you can adjust the conditions and parameters in the code to suit yourself.

1. Screening rationale

Generally, stocks with high send-transfer expectations share these traits: low total market capitalization, high capital reserve per share, high earnings per share, and a small float. Other factors matter as well, such as the current share price, changes in operating results, and the company's past dividend and bonus-share habits.

Here we consider only four factors: capital reserve per share, earnings per share, float, and total market cap. The screen requires a capital reserve per share of at least 5 yuan, earnings per share of at least 0.5 yuan, a float under 300 million shares, and a total market cap under 10 billion yuan (adjust these parameters to your own experience).

2. Data preparation

First, import tushare:

import tushare as ts

Fetch the fundamental data and the quote data:

# fundamentals

basic = ts.get_stock_basics()

# quotes and market-cap data

hq = ts.get_today_all()

3. Data cleaning

#current price; if a stock is suspended (trade == 0), use the previous trading day's settlement price

hq['trade'] = hq.apply(lambda x: x.settlement if x.trade == 0 else x.trade, axis=1)

#select float shares, total shares, capital reserve per share, and earnings per share

basedata = basic[['outstanding', 'totals', 'reservedPerShare', 'esp']]

#select stock code, name, current price, total market cap, and float market cap

hqdata = hq[['code', 'name', 'trade', 'mktcap', 'nmc']]

#use the code column of the quote data as its index

hqdata = hqdata.set_index('code')

#merge the two tables

data = basedata.merge(hqdata, left_index=True, right_index=True)

4. Screening conditions

Using the parameters and conditions above, we process the data further.

Convert total and float market cap to units of 100 million yuan:

data['mktcap'] = data['mktcap'] / 10000

data['nmc'] = data['nmc'] / 10000

Set the parameters and filter thresholds (adjust these as you see fit):

#capital reserve per share >= 5

res = data.reservedPerShare >= 5

#float <= 300 million shares

out = data.outstanding <= 30000

#earnings per share >= 0.5 yuan

eps = data.esp >= 0.5

#total market cap <= 10 billion yuan

mktcap = data.mktcap <= 100

Take the intersection of the conditions:

allcrit = res & out & eps & mktcap

selected = data[allcrit]
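To view the screen's output (a hedged illustration; the original article showed the result only as a table image), the selected columns can simply be printed:

print selected[['name', 'trade', 'reservedPerShare', 'outstanding', 'esp', 'mktcap', 'nmc']]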

The resulting stocks with high send-transfer expectations:

The fields above are: stock name, closing price, capital reserve per share, float, earnings per share (the column should be eps; esp is a typo carried over from the original release), total market cap, and float market cap. https://zhuanlan.zhihu.com/p/23829205

Python: Golden-Cross Detection

def jincha(context, bar_dict, his):

#above the 5-day moving average

def zs5(context, bar_dict, his):

ma_n = pd.rolling_mean(his, 5)

temp = his - ma_n

#temp_s holds the codes of stocks that closed above the 5-day MA on the previous day

temp_s = list(temp[temp>0].iloc[-1,:].dropna().index)

return temp_s

#above the 10-day moving average

def zs10(context, bar_dict, his):

ma_n = pd.rolling_mean(his, 10)

temp = his - ma_n

temp_s = list(temp[temp>0].iloc[-1,:].dropna().index)

return temp_s

#golden-cross breakout

def jc(context, bar_dict, his):

mas = pd.rolling_mean(his,5)

mal = pd.rolling_mean(his, 10)

temp = mas - mal

#temp_jc: codes where MA5 - MA10 > 0 yesterday

#temp_r: codes where MA5 - MA10 > 0 the day before yesterday

temp_jc = list(temp[temp>0].iloc[-1,:].dropna().index)

temp_r = list(temp[temp>0].iloc[-2,:].dropna().index)

temp = []

for stock in temp_jc:

if stock not in temp_r:

temp.append(stock)

return temp

#intersect the stock codes satisfying all three conditions

con1 = zs5(context, bar_dict, his)

con2 = zs10(context, bar_dict, his)

con3 = jc(context, bar_dict, his)

tar_list=[con1,con2,con3]

tarstock = tar_list[0]

for i in tar_list:

tarstock = list(set(tarstock).intersection(set(i)))

return tarstock
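A minimal sketch of how jincha might be wired into a daily loop on the RiceQuant-style platform this code targets. The shape of the history() result (a DataFrame of closes with one column per stock, as the filter code below also assumes), the filcon call, and the order sizing are illustrative assumptions, not part of the original article:

def handle_bar(context, bar_dict):
    his = history(20, '1d', 'close')  # assumed: daily closes, one column per stock in the universe
    candidates = jincha(context, bar_dict, his)  # stocks whose MA5 just crossed above MA10
    candidates = filcon(context, bar_dict, candidates)  # drop sub-new, suspended and limit-move stocks (defined below)
    for stock in candidates:
        order_percent(stock, 0.05)  # hypothetical sizing: 5% of the portfolio per signal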

Python: Filtering Sub-New Stocks, Suspensions, and Limit Moves

#filter out sub-new stocks and stocks that are suspended or at their limit price

def filcon(context,bar_dict,tar_list):

def zdt_trade(stock, context, bar_dict):

yesterday = history(2,'1d', 'close')[stock].values[-1]

zt = round(1.10 * yesterday, 2) #limit-up price (+10%)

dt = round(0.90 * yesterday, 2) #limit-down price (-10%)

#last is the latest traded price

return dt < bar_dict[stock].last < zt

filstock = []

for stock in tar_list:

con1 = ipo_days(stock,context.now) > 60

con2 = bar_dict[stock].is_trading

con3 = zdt_trade(stock,context,bar_dict)

if con1 & con2 & con3:

filstock.append(stock)

return filstock

Python: Rebalancing to an Equal Position Value

# rebalance so that every holding has the same market value

def for_balance(context, bar_dict):

#mvalues = context.portfolio.market_value

#avalues = context.portfolio.portfolio_value

#per = mvalues / avalues

hlist = []

for stock in context.portfolio.positions:

#record each stock and the market value of its position

hlist.append([stock,bar_dict[stock].last * context.portfolio.positions[stock].quantity])

if hlist:

#sort by position value, largest first

hlist = sorted(hlist,key=lambda x:x[1], reverse=True)

temp = 0

for li in hlist:

#total market value of all positions

temp += li[1]

for li in hlist:

#set each position to the average value

if bar_dict[li[0]].is_trading:

order_target_value(li[0], temp/len(hlist))

return
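A hedged sketch of how for_balance might be scheduled on the RiceQuant-style platform (the init/scheduler wiring is an assumption for illustration, not part of the original):

def init(context):
    # rebalance to equal position values once a week
    scheduler.run_weekly(for_balance, weekday=1)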

Python: PCA (Principal Component Analysis)

PCA extracts the principal eigenvectors of a sample set and projects the data onto them, reducing the dimensionality of the data.

# -*- coding: utf-8 -*-

"""

Created on Sun Feb 28 10:04:26 2016

PCA source code

@author: liudiwei

"""

import numpy as np

import pandas as pd

import matplotlib.pyplot as plt

#compute the column means; the input is a numpy matrix with rows = samples and columns = features

def meanX(dataX):

return np.mean(dataX,axis=0) #axis=0 averages down each column; if the input is a plain list, use axis=1

#compute the per-feature variance; the input is a numpy matrix with rows = samples and columns = features

def variance(X):

m, n = np.shape(X)

mu = meanX(X)

muAll = np.tile(mu, (m, 1))

X1 = X - muAll

variance = 1./m * np.diag(X1.T * X1)

return variance

#normalize; the input is a numpy matrix with rows = samples and columns = features

def normalize(X):

m, n = np.shape(X)

mu = meanX(X)

muAll = np.tile(mu, (m, 1))

X1 = X - muAll

X2 = np.tile(np.diag(X.T * X), (m, 1))

XNorm = X1/X2

return XNorm

"""

Parameters: XMat is a numpy matrix (rows = samples, columns = features)

- k: keep the eigenvectors corresponding to the k largest eigenvalues

Returns:

- finalData: the returned low-dimensional matrix

- reconData: the matrix reconstructed after shifting the coordinate axes back

"""

def pca(XMat, k):

average = meanX(XMat)

m, n = np.shape(XMat)

data_adjust = []

avgs = np.tile(average, (m, 1))

data_adjust = XMat - avgs

covX = np.cov(data_adjust.T) #covariance matrix

featValue, featVec= np.linalg.eig(covX) #eigenvalues and eigenvectors of the covariance matrix

index = np.argsort(-featValue) #indices of the eigenvalues sorted from largest to smallest

finalData = []

if k > n:

print"k must lower than feature number"

return

else:

#note: the eigenvectors are column vectors, whereas indexing a numpy 2-D array a[m][n] with a[i] selects a row

selectVec = np.matrix(featVec.T[index[:k]]) #hence the transpose here

finalData = data_adjust * selectVec.T

reconData = (finalData * selectVec) + average

return finalData, reconData

def loaddata(datafile):

return np.array(pd.read_csv(datafile, sep="\t", header=None)) # assumes a tab-separated data.txt; adjust sep if needed

def plotBestFit(data1, data2):

dataArr1 = np.array(data1)

dataArr2 = np.array(data2)

m = np.shape(dataArr1)[0]

axis_x1 = []

axis_y1 = []

axis_x2 = []

axis_y2 = []

for i in range(m):

axis_x1.append(dataArr1[i,0])

axis_y1.append(dataArr1[i,1])

axis_x2.append(dataArr2[i,0])

axis_y2.append(dataArr2[i,1])

fig = plt.figure()

ax = fig.add_subplot(111)

ax.scatter(axis_x1, axis_y1, s=50, c='red', marker='s')

ax.scatter(axis_x2, axis_y2, s=50, c='blue')

plt.xlabel('x1'); plt.ylabel('x2');

plt.savefig("outfile.png")

plt.show()

#quick test

#data source: http://www.cnblogs.com/jerrylead/archive/2011/04/18/2020209.html

def test():

X = [[2.5, 0.5, 2.2, 1.9, 3.1, 2.3, 2, 1, 1.5, 1.1],

[2.4, 0.7, 2.9, 2.2, 3.0, 2.7, 1.6, 1.1, 1.6, 0.9]]

XMat = np.matrix(X).T

k = 2

return pca(XMat, k)

#run on the dataset data.txt

def main():

datafile = "data.txt"

XMat = loaddata(datafile)

k = 2

return pca(XMat, k)

if __name__ == "__main__":

finalData, reconMat = main()

plotBestFit(finalData, reconMat)

In the resulting plot the dimensionality-reduced data are shown in red and the reconstructed original data in blue; after the reduction the differences between samples stand out more clearly.
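To reproduce the toy example without preparing data.txt, the test() helper defined above can be used directly (a usage note, not part of the original listing):

finalData, reconData = test()  # PCA with k=2 on the 10-sample toy dataset
plotBestFit(finalData, reconData)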

Python: K-Nearest-Neighbor (KNN) Classification

The KNN algorithm classifies a sample by the distances between feature vectors.

Steps:

Step 1: compute the distance between the new sample and every labeled sample.

Step 2: sort those distances from smallest to largest.

Step 3: take the k samples with the smallest distances.

Step 4: assign the new sample to the majority class among those k samples.

The Python code for KNN classification follows:

Part 1: the KNN classifier

# -*- coding: utf-8 -*-

"""

Created on Mon Feb 22 13:21:22 2016

K-NearestNeighbor

"""

import numpy as np

import operator

class KNNClassifier():

"""This is a Nearest Neighbor classifier. """

#the value of k

def __init__(self, k=3):

self._k = k

#compute the distances from the new sample to all labeled samples and sort them in ascending order

def _calEDistance(self, inSample, dataset):

m = dataset.shape[0]

diffMat = np.tile(inSample, (m,1)) - dataset

sqDiffMat = diffMat**2 #square each element

sqDistances = sqDiffMat.sum(axis = 1) #sum over the features

distances = sqDistances ** 0.5 #square root

return distances.argsort() #indices sorted by ascending distance

def _classify0(self, inX, dataSet, labels):

k = self._k

dataSetSize = dataSet.shape[0]

diffMat = np.tile(inX, (dataSetSize,1)) - dataSet

sqDiffMat = diffMat**2

sqDistances = sqDiffMat.sum(axis=1)

distances = sqDistances**0.5

sortedDistIndicies = distances.argsort()

classCount={}

for i in range(k):

voteIlabel = labels[sortedDistIndicies[i]]

classCount[voteIlabel] = classCount.get(voteIlabel,0) + 1

sortedClassCount = sorted(classCount.iteritems(), key=operator.itemgetter(1), reverse=True)

return sortedClassCount[0][0]

#classify a single sample

def _classify(self, sample, train_X, train_y):

#type check

if isinstance(sample, np.ndarray) and isinstance(train_X, np.ndarray) \

and isinstance(train_y, np.ndarray):

pass

else:

try:

sample = np.array(sample)

train_X = np.array(train_X)

train_y = np.array(train_y)

except:

raise TypeError("numpy.ndarray required for train_X and ..")

sortedDistances = self._calEDistance(sample, train_X)

classCount = {}

for i in range(self._k):

oneVote = train_y[sortedDistances[i]] #label of the i-th nearest point

classCount[oneVote] = classCount.get(oneVote, 0) + 1

sortedClassCount = sorted(classCount.iteritems(),\

key=operator.itemgetter(1), reverse=True)

#print "the sample :

return sortedClassCount[0][0]

def classify(self, test_X, train_X, train_y):

results = []

#type check

if isinstance(test_X, np.ndarray) and isinstance(train_X, np.ndarray) \

and isinstance(train_y, np.ndarray):

pass

else:

try:

test_X = np.array(test_X)

train_X = np.array(train_X)

train_y = np.array(train_y)

except:

raise TypeError("numpy.ndarray required for train_X and ..")

d = len(np.shape(test_X))

if d == 1:

sample = test_X

result = self._classify(sample, train_X, train_y)

results.append(result)

else:

for i in range(len(test_X)):

sample = test_X[i]

result = self._classify(sample, train_X, train_y)

results.append(result)

return results

if __name__=="__main__":

train_X = [[1, 2, 0, 1, 0],

[0, 1, 1, 0, 1],

[1, 0, 0, 0, 1],

[2, 1, 1, 0, 1],

[1, 1, 0, 1, 1]]

train_y = [1, 1, 0, 0, 0]

clf = KNNClassifier(k = 3)

sample = [[1,2,0,1,0],[1,2,0,1,1]]

result = clf.classify(sample, train_X, train_y)


Part 2: KNN test code

# -*- coding: utf-8 -*-

"""

Created on Mon Feb 22 13:21:22 2016

K-NearestNeighbor

"""

import numpy as np

import operator

class KNNClassifier():

"""This is a Nearest Neighbor classifier. """

#定义k的值

def __init__(self, k=3):

self._k = k

#计算新样本与已知分类样本的距离并从⼩到⼤排列

def _calEDistance(self, inSample, dataset):

m = dataset.shape[0]

diffMat = np.tile(inSample, (m,1)) - dataset

sqDiffMat = diffMat**2 #每个元素平⽅

sqDistances = sqDiffMat.sum(axis = 1) #求和

distances = sqDistances ** 0.5 #开根号

return distances.argsort() #按距离的从⼩到达排列的下标值

def _classify0(self, inX, dataSet, labels):

k = self._k

dataSetSize = dataSet.shape[0]

diffMat = np.tile(inX, (dataSetSize,1)) - dataSet

sqDiffMat = diffMat**2

sqDistances = sqDiffMat.sum(axis=1)

distances = sqDistances**0.5

sortedDistIndicies = distances.argsort()

classCount={}

for i in range(k):

voteIlabel = labels[sortedDistIndicies[i]]

classCount[voteIlabel] = classCount.get(voteIlabel,0) + 1

sortedClassCount = sorted(classCount.iteritems(), key=operator.itemgetter(1), reverse=True)

return sortedClassCount[0][0]

#对⼀个样本进⾏分类

def _classify(self, sample, train_X, train_y):

#数据类型检测

if isinstance(sample, np.ndarray) and isinstance(train_X, np.ndarray) \

and isinstance(train_y, np.ndarray):

pass

else:

try:

sample = np.array(sample)

train_X = np.array(train_X)

train_y = np.array(train_y)

except:

raise TypeError("numpy.ndarray required for train_X and ..")

sortedDistances = self._calEDistance(sample, train_X)

classCount = {}

for i in range(self._k):

oneVote = train_y[sortedDistances[i]] #获取最近的第i个点的类别

classCount[oneVote] = classCount.get(oneVote, 0) + 1

sortedClassCount = sorted(classCount.iteritems(),\

key=operator.itemgetter(1), reverse=True)

#print "the sample :

return sortedClassCount[0][0]

def classify(self, test_X, train_X, train_y):

results = []

#数据类型检测

if isinstance(test_X, np.ndarray) and isinstance(train_X, np.ndarray) \

and isinstance(train_y, np.ndarray):

pass

else:

try:

test_X = np.array(test_X)

train_X = np.array(train_X)

train_y = np.array(train_y)

except:

raise TypeError("numpy.ndarray required for train_X and ..")

d = len(np.shape(test_X))

if d == 1:

sample = test_X

result = self._classify(sample, train_X, train_y)

results.append(result)

else:

for i in range(len(test_X)):

sample = test_X[i]

result = self._classify(sample, train_X, train_y)

results.append(result)

return results

if __name__=="__main__":

train_X = [[1, 2, 0, 1, 0],

[0, 1, 1, 0, 1],

[1, 0, 0, 0, 1],

[2, 1, 1, 0, 1],

[1, 1, 0, 1, 1]]

train_y = [1, 1, 0, 0, 0]

clf = KNNClassifier(k = 3)

sample = [[1,2,0,1,0],[1,2,0,1,1]]

result = clf.classify(sample, train_X, train_y)


Python: Decision Tree (ID3 & C4.5)

A decision tree classifies samples by splitting on their attributes step by step, so that classification becomes faster and more effective. Each new split attribute is chosen either by information gain (IG), which gives the basic ID3 algorithm, or by information gain ratio (IGR), which gives the improved C4.5 algorithm.
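As a concrete illustration of the two criteria, here is a small self-contained sketch (not part of the original article) that computes the entropy, information gain, and gain ratio for one candidate split, matching the formulas the class below implements:

import numpy as np

def entropy(y):
    # Shannon entropy H(y) = -sum(p_i * log2(p_i))
    _, counts = np.unique(y, return_counts=True)
    p = counts / float(counts.sum())
    return -(p * np.log2(p)).sum()

def gain_and_ratio(x, y):
    # information gain and gain ratio of splitting labels y on feature column x
    base = entropy(y)
    cond, split_info = 0.0, 0.0
    for v in np.unique(x):
        sub_y = y[x == v]
        p = len(sub_y) / float(len(y))
        cond += p * entropy(sub_y)      # conditional entropy H(y|x)
        split_info -= p * np.log2(p)    # intrinsic information of the split
    ig = base - cond                    # ID3 criterion
    igr = ig / split_info if split_info else 0.0  # C4.5 criterion
    return ig, igr

x = np.array([0, 0, 1, 1, 1])  # one feature column
y = np.array([0, 0, 1, 1, 0])  # labels
print(gain_and_ratio(x, y))    # roughly (0.42, 0.43)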

Taking ID3 as an example, the training procedure is programmed as follows:

(1) Input x and y (x are the samples, y the labels); rows are samples and columns are features.

(2) Compute the information gain IG of every feature and pick the feature with the largest IG.

(3) Remove the chosen feature's column from the sample array.

(4) Partition the updated samples by the values of the chosen feature:

value 1 -> (x1, y1), value 2 -> (x2, y2), value 3 -> (x3, y3)

(5) Repeat the steps above on each partition (a recursive call) until a leaf node is reached.

A node is a leaf when:

(1) all of its labels y are identical, or

(2) there are no features left to split on.

Classification then walks the tree recursively from the root until it reaches a leaf.

The full code follows; the trained tree is a recursively nested dictionary built from feature indices, feature values, and the labels at the leaves.

# -*- coding: utf-8 -*-

"""

Created on Mon Nov 07 09:06:37 2016

@author: yehx

"""

# -*- coding: utf-8 -*-

"""

Created on Sun Feb 21 12:17:10 2016

Decision Tree Source Code

@author: liudiwei

"""

import os

import numpy as np

class DecitionTree():

"""This is a decision tree classifier. """

def __init__(self, criteria='ID3'):

self._tree = None

if criteria == 'ID3'or criteria == 'C4.5':

self._criteria = criteria

else:

raise Exception("criterion should be ID3 or C4.5")

def _calEntropy(self, y):

'''

_calEntropy computes the Shannon entropy e = -sum(pi * log pi)

Parameter: y is an array of labels

Returns: the entropy

'''

n = y.shape[0]

labelCounts = {}

for label in y:

if label not in labelCounts.keys():

labelCounts[label] = 1

else:

labelCounts[label] += 1

entropy = 0.0

for key in labelCounts:

prob = float(labelCounts[key])/n

entropy -= prob * np.log2(prob)

return entropy

def _splitData(self, X, y, axis, cutoff):

"""

Parameters: X are the features, y the labels, axis the index of a feature, cutoff a value of that feature

Returns: the subset of the data whose feature at index axis equals cutoff

The feature column is first removed from the sample matrix, then the rows whose value equals cutoff are collected

"""

ret = []

featVec = X[:,axis]

n = X.shape[1] #number of features

#sample matrix with the axis-th feature column removed

X = X[:,[i for i in range(n) if i!=axis]]

for i in range(len(featVec)):

if featVec[i] == cutoff:

ret.append(i)

return X[ret, :], y[ret]

def _chooseBestSplit(self, X, y):

"""ID3 & C4.5

参数:X为特征,y为label

功能:根据信息增益或者信息增益率来获取最好的划分特征

输出:返回最好划分特征的下标

"""

numFeat = X.shape[1]

baseEntropy = self._calEntropy(y)

bestSplit = 0.0

best_idx = -1

for i in range(numFeat):

featlist = X[:,i] #the i-th feature column

uniqueVals = set(featlist)

curEntropy = 0.0

splitInfo = 0.0

for value in uniqueVals:

sub_X, sub_y = self._splitData(X, y, i, value)

prob = len(sub_y)/float(len(y)) #probability of this value of the feature

curEntropy += prob * self._calEntropy(sub_y) #accumulate the conditional entropy

splitInfo -= prob * np.log2(prob) #intrinsic information, used for the gain ratio

IG = baseEntropy - curEntropy

if self._criteria=="ID3":

if IG > bestSplit:

bestSplit = IG

best_idx = i

if self._criteria=="C4.5":

if splitInfo == 0.0:

continue

IGR = IG/splitInfo

if IGR > bestSplit:

bestSplit = IGR

best_idx = i

return best_idx

def _majorityCnt(self, labellist):

"""

Parameter: labellist is a list of class labels

Returns: the label that occurs most often in labellist

"""

labelCount={}

for vote in labellist:

if vote not in labelCount.keys():

labelCount[vote] = 0

labelCount[vote] += 1

sortedClassCount = sorted(labelCount.iteritems(), key=lambda x:x[1], \

reverse=True)

return sortedClassCount[0][0]

def _createTree(self, X, y, featureIndex):

"""

Parameters: X are the features, y the labels; featureIndex is a tuple recording each feature's index in the original data

Returns: a complete tree built recursively from the current featureIndex

"""

labelList = list(y)

#if all labels are identical (a leaf node), return that label

if labelList.count(labelList[0]) == len(labelList):

return labelList[0]

#if no features are left to split on, return the majority label

if len(featureIndex) == 0:

return self._majorityCnt(labelList)

#index of the best split feature within the current X

bestFeatIndex = self._chooseBestSplit(X,y)

#index of the best split feature in the original data

bestFeatAxis = featureIndex[bestFeatIndex]

featureIndex = list(featureIndex)

#remove the chosen feature's index from the index list

featureIndex.remove(bestFeatAxis)

featureIndex = tuple(featureIndex)

myTree = {bestFeatAxis:{}}

featValues = X[:, bestFeatIndex]

uniqueVals = set(featValues)

for value in uniqueVals:

#recursively build a subtree for each value

sub_X, sub_y = self._splitData(X,y, bestFeatIndex, value)

myTree[bestFeatAxis][value] = self._createTree(sub_X, sub_y, \

featureIndex)

return myTree

def fit(self, X, y):

"""

Parameters: X are the features, y the class labels

Note: X and y are type-checked to ensure they are arrays

Returns: self

"""

if isinstance(X, np.ndarray) and isinstance(y, np.ndarray):

pass

else:

try:

X = np.array(X)

y = np.array(y)

except:

raise TypeError("numpy.ndarray required for X,y")

featureIndex = tuple(['x'+str(i) for i in range(X.shape[1])])

self._tree = self._createTree(X,y,featureIndex)

return self #allow using: clf.fit().predict()

def _classify(self, tree, sample):

"""

Classify one sample by walking the trained tree recursively

"""

featIndex = tree.keys()[0] #key of the (sub)tree's root

secondDict = tree[featIndex] #the subtrees keyed by the values of that feature

axis=featIndex[1:] #index of the root feature in the original data (keys look like 'x0', 'x1', ...)

key = sample[int(axis)] #the sample's value for that feature

valueOfKey = secondDict[key] #the subtree (or label) stored under that value

if type(valueOfKey).__name__=='dict': #if it is a dict, keep recursing

return self._classify(valueOfKey, sample)

else:

return valueOfKey

def predict(self, X):

if self._tree==None:

raise NotImplementedError("Estimator not fitted, call `fit` first")

#check whether X is an array

if isinstance(X, np.ndarray):

pass

else:

try:

X = np.array(X)

except:

raise TypeError("numpy.ndarray required for X")

if len(X.shape) == 1:

return self._classify(self._tree, X)

else:

result = []

for i in range(X.shape[0]):

value = self._classify(self._tree, X[i])

print str(i+1)+"-th sample is classified as: ", value

result.append(value)

return np.array(result)

def show(self, outpdf):

if self._tree==None:

pass

#plot the tree using matplotlib

import treePlotter

treePlotter.createPlot(self._tree, outpdf)

if __name__=="__main__":

trainfile=r"data\rain.txt"

testfile=r"data\est.txt"

import sys

sys.path.append(r"F:\\CSU\\Github\\MachineLearning\\lib")

import dataload as dload

train_x, train_y = dload.loadData(trainfile)

test_x, test_y = dload.loadData(testfile)

clf = DecitionTree(criteria="C4.5")

clf.fit(train_x, train_y)

result = clf.predict(test_x)

outpdf = r"tree.pdf"

clf.show(outpdf)

Python: K-Means Clustering

K-means clustering is an unsupervised machine-learning algorithm that groups samples automatically.

The algorithm's steps are:

(1) Randomly generate K cluster centers, usually called centroids.

(2) Assign every sample to the cluster whose centroid is nearest (the distance may be Euclidean, Manhattan, cosine, and so on).

(3) Recompute each cluster's centroid, for example as the mean of all samples in the cluster.

(4) Repeat steps (2) and (3) until either no sample changes its cluster assignment or the number of iterations reaches the configured maximum.

A common variant is bisecting (two-way) K-means, whose steps are:

(1) Start with all samples in a single cluster.

(2) Split that cluster into two using ordinary K-means.

(3) Try splitting each of the resulting clusters into two, compute the error of each candidate split, and keep only the split with the smaller error; that is, one of the two clusters from step (2) is kept and the other is split again.

(4) Keep bisecting the existing clusters in this way, each time keeping the split with the smallest error, until the desired number of clusters is reached.

The Python code is as follows:

# -*- coding: utf-8 -*-

"""

Created on Tue Nov 08 14:01:44 2016

K - means cluster

"""

import numpy as np

class KMeansClassifier():

"this is a k-means classifier"

def __init__(self, k=3, initCent='random', max_iter=500):

self._k = k

self._initCent = initCent

self._max_iter = max_iter

self._clusterAssment = None

self._labels = None

self._sse = None

def _calEDist(self, arrA, arrB):

"""

Computes the Euclidean distance

Input: two 1-D arrays

"""

return np.math.sqrt(sum(np.power(arrA-arrB, 2)))

def _calMDist(self, arrA, arrB):

"""

Computes the Manhattan distance

Input: two 1-D arrays

"""

return sum(np.abs(arrA-arrB))

def _randCent(self, data_X, k):

"""

Randomly pick k centroids

Returns: centroids, a k*n matrix of centroids

"""

n = data_X.shape[1] #number of features

centroids = np.empty((k,n)) #a k*n matrix to hold the centroids

for j in range(n):

minJ = min(data_X[:, j])

rangeJ = float(max(data_X[:, j] - minJ))

#flatten the nested array with flatten()

centroids[:, j] = (minJ + rangeJ * np.random.rand(k, 1)).flatten()

return centroids

def fit(self, data_X):

"""

Input: an m*n matrix

"""

if not isinstance(data_X, np.ndarray) or \

isinstance(data_X, np.matrixlib.defmatrix.matrix):

try:

data_X = np.asarray(data_X)

except:

raise TypeError("numpy.ndarray resuired for data_X")

m = data_X.shape[0] #number of samples

#an m*2 matrix: column 0 stores the index of the cluster each sample belongs to,

#column 1 stores the squared error between the sample and its cluster's centroid

self._clusterAssment = np.zeros((m,2))

if self._initCent == 'random':

self._centroids = self._randCent(data_X, self._k)

clusterChanged = True

for _ in range(self._max_iter): #'_' because the loop index is not used

clusterChanged = False

for i in range(m): #assign each sample to the cluster of its nearest centroid

minDist = np.inf #start with an infinitely large distance

minIndex = -1 #index of the nearest centroid, -1 initially

for j in range(self._k): #this loop finds the nearest centroid

arrA = self._centroids[j,:]

arrB = data_X[i,:]

distJI = self._calEDist(arrA, arrB) #distance to centroid j

if distJI < minDist:

minDist = distJI

minIndex = j

if self._clusterAssment[i,0] !=minIndex:

clusterChanged = True

self._clusterAssment[i,:] = minIndex, minDist**2

if not clusterChanged: #if no assignment changed, the algorithm has converged; stop iterating

break

for i in range(self._k): #update each centroid to the mean of the points in its cluster

index_all = self._clusterAssment[:,0] #cluster index of every sample

value = np.nonzero(index_all==i) #indices of the samples in cluster i

ptsInClust = data_X[value[0]] #all samples in cluster i

self._centroids[i,:] = np.mean(ptsInClust, axis=0) #their mean

self._labels = self._clusterAssment[:,0]

self._sse = sum(self._clusterAssment[:,1])

def predict(self, X): #predict the cluster of new data from the fitted centroids

#type check

if not isinstance(X,np.ndarray):

try:

X = np.asarray(X)

except:

raise TypeError("numpy.ndarray required for X")

m = X.shape[0] #number of samples

preds = np.empty((m,))

for i in range(m): #assign each sample to the cluster of its nearest centroid

minDist = np.inf

for j in range(self._k):

distJI = self._calEDist(self._centroids[j,:], X[i,:])

if distJI < minDist:

minDist = distJI

preds[i] = j

return preds

class biKMeansClassifier():

"this is a binary k-means classifier"

def __init__(self, k=3):

self._k = k

self._centroids = None

self._clusterAssment = None

self._labels = None

self._sse = None

def _calEDist(self, arrA, arrB):

"""

Computes the Euclidean distance

Input: two 1-D arrays

"""

return np.math.sqrt(sum(np.power(arrA-arrB, 2)))

def fit(self, X):

m = X.shape[0]

self._clusterAssment = np.zeros((m,2))

centroid0 = np.mean(X, axis=0).tolist()

centList =[centroid0]

for j in range(m): #initial squared error between each sample and the single centroid

self._clusterAssment[j,1] = self._calEDist(np.asarray(centroid0), \

X[j,:])**2

while (len(centList) < self._k):

lowestSSE = np.inf

#try splitting every cluster and keep the split that minimizes the total error

for i in range(len(centList)):

index_all = self._clusterAssment[:,0] #cluster index of every sample

value = np.nonzero(index_all==i) #indices of the samples in cluster i

ptsInCurrCluster = X[value[0],:] #all samples in cluster i

clf = KMeansClassifier(k=2)

clf.fit(ptsInCurrCluster)

#centroids, assignments and errors obtained by splitting this cluster

centroidMat, splitClustAss = clf._centroids, clf._clusterAssment

sseSplit = sum(splitClustAss[:,1])

index_all = self._clusterAssment[:,0]

value = np.nonzero(index_all==i)

sseNotSplit = sum(self._clusterAssment[value[0],1])

if (sseSplit + sseNotSplit) < lowestSSE:

bestCentToSplit = i

bestNewCents = centroidMat

bestClustAss = splitClustAss.copy()

lowestSSE = sseSplit + sseNotSplit

#after the split, one sub-cluster keeps the original cluster's index

#and the other gets index len(centList) and is appended to centList

bestClustAss[np.nonzero(bestClustAss[:,0]==1)[0],0]=len(centList)

bestClustAss[np.nonzero(bestClustAss[:,0]==0)[0],0]=bestCentToSplit

centList[bestCentToSplit] = bestNewCents[0,:].tolist()

centList.append(bestNewCents[1,:].tolist())

self._clusterAssment[np.nonzero(self._clusterAssment[:,0] == \

bestCentToSplit)[0],:]= bestClustAss

self._labels = self._clusterAssment[:,0]

self._sse = sum(self._clusterAssment[:,1])

self._centroids = np.asarray(centList)

def predict(self, X): #predict the cluster of new data from the fitted centroids

#type check

if not isinstance(X,np.ndarray):

try:

X = np.asarray(X)

except:

raise TypeError("numpy.ndarray required for X")

m = X.shape[0] #number of samples

preds = np.empty((m,))

for i in range(m): #assign each sample to the cluster of its nearest centroid

minDist = np.inf

for j in range(self._k):

distJI = self._calEDist(self._centroids[j,:],X[i,:])

if distJI < minDist:

minDist = distJI

preds[i] = j

return preds
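Neither class above ships with a demo, so here is a hedged usage sketch of KMeansClassifier on toy 2-D data (the data and k are invented for illustration):

import numpy as np

# two obvious blobs
data = np.array([[1.0, 1.1], [0.9, 1.0], [1.2, 0.8],
                 [8.0, 8.2], [7.9, 8.1], [8.3, 7.8]])

clf = KMeansClassifier(k=2, max_iter=100)
clf.fit(data)
print(clf._centroids)            # typically one centroid near (1, 1) and the other near (8, 8)
print(clf._labels)               # cluster index assigned to each training sample
print(clf.predict([[0.5, 1.0]])) # a new point near the first blob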

Python: Fetching Historical Stock Return Data

Historical price-change data is one of the basic materials for learning quantitative investing. The Python code below obtains the required history. The main steps are:

(1) get the codes of N stocks, ordered from smallest to largest market cap;

(2) loop over these 100 stocks one by one;

(3) fetch their daily price-change data from 2016-05-01 to 2016-11-17;

(4) keep only stocks with more than 40 records, which removes recently listed (sub-new) stocks;

(5) name each output file "<stock code>.csv".

The code is as follows:

# -*- coding: utf-8 -*-

"""

Created on Thu Nov 17 23:04:33 2016

获取股票的历史涨跌幅,并分别存为csv格式

@author: yehx

"""

import numpy as np

import pandas as pd

#get the codes of the 100 smallest stocks by market cap

df = get_fundamentals(query(fundamentals.eod_derivative_indicator.market_cap)

.order_by(fundamentals.eod_derivative_indicator.market_cap.asc())

.limit(100),'2016-11-17', '1y'

)

#loop over the 100 stocks one by one

#fetch the price-change data from 2016-05-01 to 2016-11-17

#keep only stocks with more than 40 records (drop sub-new stocks)

#name each file "<stock code>.csv"

for stock in range(100):

priceChangeRate = get_price_change_rate(df['market_cap'].columns[stock], '20160501', '20161117')

if priceChangeRate is None:

openDays = 0

else:

openDays = len(priceChangeRate)

if openDays > 40:

tempPrice = priceChangeRate[39:(openDays - 1)]

for rate in range(len(tempPrice)):

tempPrice[rate] = "%.3f" %tempPrice[rate]

fileName = ''

fileName = fileName.join(df['market_cap'].columns[stock].split('.')) + '.csv'

tempPrice.to_csv(fileName)

Python: Logistic Regression Classification

Logistic regression can be viewed as an extension of linear regression: it is trained on two-class samples and then used to predict the class of new samples.

Suppose we have an MxN matrix X of samples with known classes, where M is the number of samples and N the feature dimension, together with an Mx1 matrix Y of class labels. The idea of logistic regression is:

(1) Apply a weight vector W (Nx1) to the features of X as a linear transform, producing X' (Mx1); the goal is that the samples of the two classes are separated by a clear one-dimensional boundary in X'.

(2) Apply a further function (the sigmoid) to X' so that values on either side of the boundary are mapped into the corresponding ranges.

(3) Training adjusts W so that as many transformed values as possible fall on the correct side of the boundary, consistent with the known classes.

(4) For logistic regression, the boundary is mapped to x = 0.

Typical logistic-regression code follows:

# -*- coding: utf-8 -*-

"""

Created on Wed Nov 09 15:21:48 2016

Logistic回归分类

"""

import numpy as np

class LogisticRegressionClassifier():

def __init__(self):

self._alpha = None

#the sigmoid function

def _sigmoid(self, fx):

return 1.0/(1 + np.exp(-fx))

#alpha is the step size (learning rate); maxCycles is the maximum number of iterations

def _gradDescent(self, featData, labelData, alpha, maxCycles):

dataMat = np.mat(featData) #size: m*n

labelMat = np.mat(labelData).transpose() #size: m*1

m, n = np.shape(dataMat)

weigh = np.ones((n, 1))

for i in range(maxCycles):

hx = self._sigmoid(dataMat * weigh)

error = labelMat - hx #size:m*1

weigh = weigh + alpha * dataMat.transpose() * error #update the regression weights from the error

return weigh

#train the model with gradient descent; change this call to use another optimization method

def fit(self, train_x, train_y, alpha=0.01, maxCycles=100):

return self._gradDescent(train_x, train_y, alpha, maxCycles)

#classify with the learned weights

def predict(self, test_X, test_y, weigh):

dataMat = np.mat(test_X)

labelMat = np.mat(test_y).transpose() #transpose to a column vector

hx = self._sigmoid(dataMat*weigh) #size: m*1

m = len(hx)

error = 0.0

for i in range(m):

if hx[i] > 0.5:

print str(i+1)+'-th sample ', int(labelMat[i]), 'is classfied as: 1'

if int(labelMat[i]) != 1:

error += 1.0

print"classify error."

else:

print str(i+1)+'-th sample ', int(labelMat[i]), 'is classfied as: 0'

if int(labelMat[i]) != 0:

error += 1.0

print"classify error."

error_rate = error/m

print"error rate is:

return error_rate
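The class above has no demo, so here is a hedged usage sketch on a tiny, linearly separable dataset (the data, the bias column, and the hyper-parameters are invented for illustration):

import numpy as np

# two features plus a constant bias column of 1s
train_X = np.array([[1.0, 1.0, 2.0],
                    [1.0, 2.0, 3.0],
                    [1.0, 3.0, 3.0],
                    [1.0, 6.0, 5.0],
                    [1.0, 7.0, 8.0],
                    [1.0, 8.0, 6.0]])
train_y = np.array([0, 0, 0, 1, 1, 1])

clf = LogisticRegressionClassifier()
weigh = clf.fit(train_X, train_y, alpha=0.01, maxCycles=500)

# predict() prints each sample's predicted class and returns the error rate
err = clf.predict(train_X, train_y, weigh)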


Python: Naive Bayes Classification

The core of Naive Bayes classification is the conditional probability P(y|x), where y is a class and x a feature vector: the probability that a sample with features x belongs to class y. The probability is computed for every class and the sample is assigned to the class with the largest probability.

From the definition of conditional probability:

P(y|x) = P(y) * P(x|y) / P(x).

Since the denominator is the same for every class, only the numerators need to be compared; and if the individual features are independent, P(x|y) equals the product of the per-feature probabilities P(xi|y).
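A tiny worked example of that comparison (the priors and per-word likelihoods are invented for illustration), including the log trick the code below uses to avoid underflow:

import numpy as np

p_y = {0: 0.5, 1: 0.5}                             # priors P(y)
p_x_given_y = {0: [0.20, 0.01], 1: [0.05, 0.30]}   # P(x_i | y) for the two words in a sample

scores = {}
for y in (0, 1):
    # log P(y) + sum_i log P(x_i | y): the product becomes a sum, avoiding underflow
    scores[y] = np.log(p_y[y]) + np.sum(np.log(p_x_given_y[y]))

print(max(scores, key=scores.get))  # class 1 has the larger posterior numerator here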

Text classification is used below to illustrate Naive Bayes. The approach is:

(1) Build the vocabulary, i.e. a list of the distinct words.

(2) Compute the prior probability P(y) of each class label.

(3) Compute the probability P(xi|y) of each word under each class label.

(4) For a sample to be classified, multiply the probabilities P(xi|y) of its features under each class, then multiply by the corresponding P(y).

(5) Compare the values from step (4) across the classes and assign the sample to the class with the largest value.

The Python code for Naive Bayes text classification follows; for convenience it uses the log function to turn the products into sums.

# -*- coding: utf-8 -*-

"""

Created on Mon Nov 14 11:15:47 2016

Naive Bayes Clssification

"""

# -*- coding: utf-8 -*-

import numpy as np

class NaiveBayes:

def __init__(self):

self._creteria = "NB"

def _createVocabList(self, dataList):

"""

Build the vocabulary (the list of distinct words)

"""

vocabSet = set([])

for line in dataList:

print set(line)

vocabSet = vocabSet | set(line)

return list(vocabSet)

#set-of-words document model

def _setOfWords2Vec(self, vocabList, inputSet):

"""

Map a list of words onto the vocabulary: 1 if the word appears, 0 otherwise

"""

outputVec = [0] * len(vocabList)

for word in inputSet:

if word in vocabList:

outputVec[vocabList.index(word)] = 1

else:

print"the word:%s is not in my vocabulary!" % word

return outputVec

# bag-of-words variant of _setOfWords2Vec

def _bagOfWords2VecMN(self, vocabList, inputSet):

"""

For each document, use the second counting strategy: count the occurrences of every word and map the counts onto the vocabulary

Returns: an n-dimensional vector, where n is the vocabulary size and each entry is a word count

"""

returnVec = [0]*len(vocabList)

for word in inputSet:

if word in vocabList:

returnVec[vocabList.index(word)] += 1 # count occurrences instead of just flagging presence

return returnVec

def _trainNB(self, trainMatrix, trainLabel):

"""

Input: the training matrix and the class labels, as numpy arrays

Computes the conditional probabilities and the class prior

"""

numTrainDocs = len(trainMatrix) #number of documents

numWords = len(trainMatrix[0]) #number of features, i.e. the vocabulary size

pNeg = sum(trainLabel)/float(numTrainDocs) #probability of the negative (label 1) class

p0Num = np.ones(numWords) #start the counts at 1 so that no conditional probability is 0

p1Num = np.ones(numWords) #same as above

p0InAll = 2.0 #denominators start at 2 (Laplace smoothing)

p1InAll = 2.0

# accumulate the per-word counts for the positive and negative classes over all documents

for i in range(numTrainDocs):

if trainLabel[i] == 1:

p1Num += trainMatrix[i]

p1InAll += sum(trainMatrix[i])

else:

p0Num += trainMatrix[i]

p0InAll += sum(trainMatrix[i])

print p1InAll

#probability of each vocabulary word given the class,

#then take logs so that the product of conditional probabilities does not underflow

p0Vect = np.log(p0Num/p0InAll) #conditional probabilities of the words given class 0

p1Vect = np.log(p1Num/p1InAll) #log uses base e by default

return p0Vect, p1Vect, pNeg

def _classifyNB(self, vecSample, p0Vec, p1Vec, pNeg):

"""

Classify with Naive Bayes; the result is 0 or 1

"""

prob_y0 = sum(vecSample * p0Vec) + np.log(1-pNeg)

prob_y1 = sum(vecSample * p1Vec) + np.log(pNeg) #natural log

if prob_y0 < prob_y1:

return 1

else:

return 0

# test the NB algorithm

def testingNB(self, testSample):

listOPosts, listClasses = loadDataSet()

myVocabList = self._createVocabList(listOPosts)

# print myVocabList

trainMat=[]

for postinDoc in listOPosts:

trainMat.append(self._bagOfWords2VecMN(myVocabList, postinDoc))

p0V,p1V,pAb = self._trainNB(np.array(trainMat), np.array(listClasses))

print trainMat

thisSample = np.array(self._bagOfWords2VecMN(myVocabList, testSample))

result = self._classifyNB(thisSample, p0V, p1V, pAb)

print testSample,'classified as: ', result

return result

###############################################################################

def loadDataSet():

wordsList=[['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],

['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'],

['my', 'dalmation', 'is', 'so', 'cute', ' and', 'I', 'love', 'him'],

['stop', 'posting', 'stupid', 'worthless', 'garbage'],

['mr', 'licks','ate','my', 'steak', 'how', 'to', 'stop', 'him'],

['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']]

classLable = [0,1,0,1,0,1] # 0:good; 1:bad

return wordsList, classLable

if __name__=="__main__":

clf = NaiveBayes()

testEntry = [['love', 'my', 'girl', 'friend'],

['stupid', 'garbage'],

['Haha', 'I', 'really', "Love

['This', 'is', "my

clf.testingNB(testEntry[0])

# for item in testEntry:

# clf.testingNB(item)


Python: Preprocessing Historical Stock Data (1)

The steps are:

(1) Build the stock pool; here stocks are chosen by market capitalization.

(2) Read the historical price changes of every stock in the pool.

(3) Store the price changes of all stocks in a DataFrame, one column per stock; for dates before a stock was listed, its change is set to 0.

(4) Set the last row of the DataFrame to each stock's number of trading days.

(5) Save the DataFrame to a csv file.

The code is as follows:

# -*- coding: utf-8 -*-

"""

Created on Thu Nov 17 23:04:33 2016

获取股票的历史涨跌幅,先合并为DataFrame后存为csv格式

@author: yehx

"""

import numpy as np

import pandas as pd

#get the codes of the 50 smallest stocks by market cap

df = get_fundamentals(

query(fundamentals.eod_derivative_indicator.market_cap)

.order_by(fundamentals.eod_derivative_indicator.market_cap.asc())

.limit(50),'2016-11-17', '1y'

)

b1= {}

priceChangeRate_300 = get_price_change_rate('000300.XSHG', '20060101', '20161118')

df300 = pd.DataFrame(priceChangeRate_300)

lenReference = len(priceChangeRate_300)

dfout = df300

dflen = pd.DataFrame()

dflen['000300.XSHG'] = [lenReference]

#loop over the 50 stocks one by one

#fetch the price-change data from 2006-01-01 to 2016-11-17

#store the data in a DataFrame

#save the DataFrame as a csv file

for stock in range(50):

priceChangeRate = get_price_change_rate(df['market_cap'].columns[stock], '20150101', '20161118')

if priceChangeRate is None:

openDays = 0

else:

openDays = len(priceChangeRate)

dftempPrice = pd.DataFrame(priceChangeRate)

tempArr = []

for i in range(lenReference):

if df300.index[i] in list(dftempPrice.index):

#keep 4 decimal places

tempArr.append( "%.4f" %((dftempPrice.loc[str(df300.index[i])][0])))

pass

else:

tempArr.append(float(0.0))

fileName = ''

fileName = fileName.join(df['market_cap'].columns[stock].split('.'))

dfout[fileName] = tempArr

dflen[fileName] = [len(priceChangeRate)]

dfout = dfout.append(dflen)

dfout.to_csv('00050.csv')

Python: Preprocessing Historical Stock Data (2)

The main steps (csv reading and writing in Python) are:

#read the historical price-change data from the csv file;

#randomly sample segments of 30 consecutive price changes;

#build our own dataset;

#save the result as a new csv file.

The code is as follows:

# -*- coding: utf-8 -*-

"""

Created on Thu Nov 17 23:04:33 2016

csv格式股票历史涨跌幅数据处理

@author: yehx

"""

import numpy as np

import pandas as pd

import random

import csv

import sys

reload(sys)

sys.setdefaultencoding('utf-8')

'''

- load data in csv format

'''

def loadCSVfile1(datafile):

filelist = []

with open(datafile) as file:

lines = csv.reader(file)

for oneline in lines:

filelist.append(oneline)

filelist = np.array(filelist)

return filelist

#data processing:

#randomly sample segments of 30 historical price changes

#to build our own dataset

def dataProcess(dataArr, subLen):

totLen, totWid = np.shape(dataArr)

print totLen, totWid

lenArr = dataArr[totLen-1,2:totWid]

columnCnt = 1

dataOut = []

for lenData in lenArr:

columnCnt = columnCnt + 1

N60 = int(lenData) / (2 * subLen)

print N60

if N60 > 0:

randIndex = random.sample(range(totLen-int(lenData)-1, totLen-subLen), N60)

for i in randIndex:

dataOut.append(dataArr[i:(i+subLen),columnCnt])

dataOut = np.array(dataOut)

return dataOut

if __name__=="__main__":

datafile = "00100 (3).csv"

data = loadCSVfile1(datafile)

df = pd.DataFrame(data)

m, n = np.shape(data)

dataOut = dataProcess(data, 30)

m, n = np.shape(dataOut)

#save the processed result

csvfile = file('csvtest.csv', 'wb')

writer = csv.writer(csvfile)

writer.writerows(dataOut)

csvfile.close()