数据读写
csv
- read
import csv
import sys
fileName = \'ch02-data.csv\'
data = []
try:
with open(fileName) as f:
# 读取csv数据
reader = csv.reader(f)
# 获取文件头
header = next(reader)
# 获取文件体
data = [row for row in reader]
# 第二种写法
# reader = csv.reader(f)
# c = 0
# for row in reader:
# if c == 0:
# header = row
# else:
# data.append(row)
# c += 1
except csv.Error as e:
print("读取csv文件错误所在行:%s,错误内容:%s" % (reader.line_num,e))
sys.exit(-1)
if header:
print(header)
print("=========")
for datarow in data:
print(datarow)
注意:
加载大数据文件可使用
# 速度快
numpy.loadtxt(\'ch02-data.csv\', dtype = \'string\', delimiter=\',\')
# 速读稍慢,但更好地处理缺失数据
numpy.genformtxt()
- write
import csv
filename = \'ch02-data-write.csv\'
# we open file with \'b\' flag
# for compatibility with non-linux users
with open(filename,\'wb\') as f:
writer = csv.writer(f)
for row in range(10):
writer.writerow([row + 1, \'2012-01-%s\' % (19 + row)])
excel
使用openpyxl
from openpyxl import load_workbook
file = \'ch02-xlsxdata.xlsx\'
wb = load_workbook(filename=file)
ws = wb.get_sheet_by_name(\'Sheet1\')
dataset = []
for r in ws.rows:
col = []
for c in r:
col.append(c.value)
dataset.append(col)
from pprint import pprint
pprint(dataset)
使用xlrd
import xlrd
file = \'data/ch02-xlsxdata.xlsx\'
# 读取xlsx文件,创建xlrd_book实例
wb = xlrd.open_workbook(filename=file)
# 通过表名获取工作表
# ws = wb.sheet_by_name(\'Sheet1\')
# 通过索引获得工作表
ws = wb.sheet_by_index(0)
dataSet = []
for r in range(ws.nrows):
col = []
for c in range(ws.ncols):
col.append(ws.cell(r, c).value)
dataSet.append(col)
# 美观打印数据结构
from pprint import pprint
pprint(dataSet)
xlrd检查日期类型
import xlrd
from xlrd.xldate import XLDateAmbiguous
file = \'ch02-xlsxdata.xlsx\'
wb = xlrd.open_workbook(filename=file)
ws = wb.sheet_by_name(\'Sheet1\')
dataset = []
for r in range(ws.nrows):
col = []
for c in range(ws.ncols):
col.append(ws.cell(r, c).value)
if ws.cell_type(r, c) == xlrd.XL_CELL_DATE:
try:
print ws.cell_type(r, c)
from datetime import datetime
date_value = xlrd.xldate_as_tuple(ws.cell(r, c).value, wb.datemode)
print datetime(*date_value)
except XLDateAmbiguous as e:
print e
dataset.append(col)
from pprint import pprint
pprint(dataset)
仅加载部分内容到内存
book = open_workbook(\'large.xls\', on_demand=True)
fixedWidth
事件的日志文件和基于时间序列的文件是数据可是话中最常见的数据源。有时可以以制表符分隔数据这种CSV格式来读取,但有时它们不是通过任何特殊字符来分隔的。实际上,这些文件中的字段是有固定宽度的,可以通过格式来匹配并提取数据
方法一:逐行读取,然后用字符串操作方法把字符串分隔成独立部分。若性能不是问题可以作为优选
方法二:若性能重要或文件较大(几百兆),用python中的struct模块(C实现)能提升性能。
步骤:
1.指定要读取的数据文件
2.定义数据读取的方式
3.逐行读取文件并根据格式把每行解析成单独的数据字段
4.按单独数据字段的形式打印每一行
实现
import struct
import string
datafile = \'ch02-fixed-width-1M.data\'
# 定义定宽数据格式
mask = \'9s14s5s\'
print \'formatstring {!r}, record size: {}\'.format(mask, struct.calcsize(mask))
with open(datafile, \'r\') as f:
for line in f:
fields = struct.Struct(mask).unpack_from(line)
print \'fields:\', [field.strip() for field in fields]
tab
- read
制表符分割的文件
import csv
import sys
fileName = \'ch02-data.csv\'
data = []
try:
with open(fileName) as f:
# 读取csv数据
reader = csv.reader(f)
# 获取文件头
header = next(reader)
# 获取文件体
data = [row for row in reader]
# 第二种写法
# reader = csv.reader(f)
# c = 0
# for row in reader:
# if c == 0:
# header = row
# else:
# data.append(row)
# c += 1
except csv.Error as e:
print("读取csv文件错误所在行:%s,错误内容:%s" % (reader.line_num,e))
sys.exit(-1)
if header:
print(header)
print("=========")
for datarow in data:
print(datarow)
对脏数据进行简单处理
datafile = \'ch02-data-dirty.tab\'
with open(datafile, \'r\') as f:
for line in f:
# remove next comment to see line before cleanup
# print \'DIRTY: \', line.split(\'\t\')
# we remove any space in line start or end
line = line.strip()
# now we split the line by tab delimiter
print line.split(\'\t\')
- write
import csv
filename = \'ch02-data-write.tab\'
# we open file with \'b\' flag
# for compatibility with non-linux users
with open(filename,\'wb\') as f:
writer = csv.writer(f, dialect=csv.excel_tab)
for row in range(10):
writer.writerow([row + 1, \'2012-01-%s\' % (19 + row)])
json
import requests
url = "https://github.com/timeline.json"
r = requests.get(url)
# 获取response.content,解析成json并加载至json对象中
json_obj = r.json()
repos = set()
for entry in json_obj:
try:
repos.add(entry[\'repository\'][\'url\'])
except KeyError as e:
print "No Key %s. Skipping..." % (e)
from pprint import pprint
pprint(repos)
价格
from decimal import Decimal
import json
jstring = \'{"name":"prod1", "price":12.50}\'
json.loads(jsting, parse_float=Decimal)
Export
import os
import sys
import argparse
try:
import cStringIO as StringIO
except:
import StringIO
import struct
import json
import csv
def import_data(import_file):
\'\'\'
Imports data from import_file.
Expects to find fixed width row
Sample row: 161322597 0386544351896 0042
\'\'\'
mask = \'9s14s5s\'
data = []
with open(import_file, \'r\') as f:
for line in f:
# unpack line to tuple
fields = struct.Struct(mask).unpack_from(line)
# strip any whitespace for each field
# pack everything in a list and add to full dataset
data.append(list([f.strip() for f in fields]))
return data
def write_data(data, export_format):
\'\'\'
Dispatches call to a specific transformer
and returns data set.
Exception is xlsx where we have to save data in a file.
\'\'\'
if export_format == \'csv\':
return write_csv(data)
elif export_format == \'json\':
return write_json(data)
elif export_format == \'xlsx\':
return write_xlsx(data)
else:
raise Exception("Illegal format defined")
def write_csv(data):
\'\'\'
Transforms data into csv.
Returns csv as string.
\'\'\'
# Using this to simulate file IO,
# as csv can only write to files.
f = StringIO.StringIO()
writer = csv.writer(f)
for row in data:
writer.writerow(row)
# Get the content of the file-like object
return f.getvalue()
def write_json(data):
\'\'\'
Transforms data into json.
Very straightforward.
\'\'\'
j = json.dumps(data)
return j
def write_xlsx(data):
\'\'\'
Writes data into xlsx file.
\'\'\'
from xlwt import Workbook
book = Workbook()
sheet1 = book.add_sheet("Sheet 1")
row = 0
for line in data:
col = 0
for datum in line:
print datum
sheet1.write(row, col, datum)
col += 1
row += 1
# We have hard limit here of 65535 rows
# that we are able to save in spreadsheet.
if row > 65535:
print >> sys.stderr, "Hit limit of # of rows in one sheet (65535)."
break
# XLS is special case where we have to
# save the file and just return 0
f = StringIO.StringIO()
book.save(f)
return f.getvalue()
if __name__ == \'__main__\':
# parse input arguments
parser = argparse.ArgumentParser()
parser.add_argument("import_file", help="Path to a fixed-width data file.")
parser.add_argument("export_format", help="Export format: json, csv, xlsx.")
args = parser.parse_args()
if args.import_file is None:
print >> sys.stderr, "You myst specify path to import from."
sys.exit(1)
if args.export_format not in (\'csv\',\'json\',\'xlsx\'):
print >> sys.stderr, "You must provide valid export file format."
sys.exit(1)
# verify given path is accesible file
if not os.path.isfile(args.import_file):
print >> sys.stderr, "Given path is not a file: %s" % args.import_file
sys.exit(1)
# read from formated fixed-width file
data = import_data(args.import_file)
# export data to specified format
# to make this Unix-lixe pipe-able
# we just print to stdout
print write_data(data, args.export_format)
SQLite
import
将SQL文件导入数据库中
import sqlite3
import sys
# 检查是否提供了SQL文件路径
if len(sys.argv) < 2:
print "Error: You must supply at least SQL script."
print "Usage: %s table.db ./sql-dump.sql" % (sys.argv[0])
sys.exit(1)
# 数据库文件路径
script_path = sys.argv[1]
# 数据库
if len(sys.argv) == 3:
db = sys.argv[2]
else:
# if DB is not defined
# create memory database
db = ":memory:"
try:
# 数据库连接
con = sqlite3.connect(db)
with con:
# 获取游标
cur = con.cursor()
with open(script_path,\'rb\') as f:
cur.executescript(f.read())
except sqlite3.Error as err:
print "Error occured: %s" % err
read
从数据库文件读取数据
import sqlite3
import sys
# 检查数据库文件
if len(sys.argv) != 2:
print "Please specify database file."
sys.exit(1)
db = sys.argv[1]
try:
# 数据库连接
con = sqlite3.connect(db)
with con:
# 获取游标
cur = con.cursor()
query = \'SELECT ID, Name, Population FROM City ORDER BY Population DESC LIMIT 1000\'
con.text_factory = str
# 执行查询语句
cur.execute(query)
# 获取所有查询结果
resultset = cur.fetchall()
# extract column names
col_names = [cn[0] for cn in cur.description]
print "%10s %30s %10s" % tuple(col_names)
print "="*(10+1+30+1+10)
for row in resultset:
print "%10s %30s %10s" % row
except sqlite3.Error as err:
print "[ERROR]:", err
chunk
大块文件读取
import sys
filename = sys.argv[1]
with open(filename, \'rb\') as hugefile:
chunksize = 1000
readable = \'\'
# if you want to stop after certain number of blocks
# put condition in the while
while hugefile:
# if you want to start not from 1st byte
# do a hugefile.seek(skipbytes) to skip
# skipbytes of bytes from the file start
start = hugefile.tell()
print "starting at:", start
file_block = \'\' # holds chunk_size of lines
for _ in range(start, start + chunksize):
# next在文件中读取一行,然后把文件指针移动到下一行
line = hugefile.next()
file_block = file_block + line
print \'file_block\', type(file_block), file_block
readable = readable + file_block
# tell where are we in file
# file IO is usually buffered so tell()
# will not be precise for every read.
stop = hugefile.tell()
print \'readable\', type(readable), readable
print \'reading bytes from %s to %s\' % (start, stop)
print \'read bytes total:\', len(readable)
# 若想暂停读取,则取消下面的注释
# raw_input()
其他方法
并行方法如MapReduce范式,可以以低成本获得更大的处理能力和内存空间
多进程处理(multiprocessing)有时也是可行方法
stream
若数据是来自一个连续的数据源,如输入是一个类文件对象或一个远程HTTP资源,就可以从远程服务读取输入信息,并持续地解析它,然后实时地更新图表,或更新到中间队列(intermediate queue)、缓冲或者数据库
import time
import os
import sys
if len(sys.argv) != 2:
print >> sys.stderr, "Please specify filename to read"
filename = sys.argv[1]
if not os.path.isfile(filename):
print >> sys.stderr, "Given file: \"%s\" is not a file" % filename
with open(filename,\'r\') as file:
# Move to the end of file
filesize = os.stat(filename)[6]
file.seek(filesize)
# endlessly loop
while True:
# 文件指针移动至末尾
where = file.tell()
# try reading a line
line = file.readline()
# if empty, go back
if not line:
time.sleep(1)
file.seek(where)
else:
# , at the end prevents print to add newline, as readline()
# already read that.
print line,
补充
若想读取文件的最后n行
file.seek(filesize - N * avg_line_len) # avg_line_len是金丝的平均行长度(约1024)
io模块非常适合用于流处理,python从2.6开始支持,作为文件模块的替代品,python3已是默认接口
在一些复杂的数据管道中,需要启用消息队列(message queue)。到达的连续数据会放在队列里一段时间,然后才能被接收。这样做的好处是作为数据的使用者,有能力在数据过载时暂停处理。而且,把数据放在通用的消息总线(message bus)中,能够让项目中的客户使用同样的数据,同时不会干扰到我们的软件
image
在科学计算中,图像被看做n维数组。图像一般是二维数组,在Python中可被视为NumPy数组数据结构。对图像执行的一些方法及操作被看做是矩阵操作。
从矩阵操作的意义上,图像不需要总是二维的,在医疗或生物科学领域,图像是更高维度的数据结构,如3D或4D
可以使用多种方法导入图像,取决于想对图像做的操作,也取决于使用的工具的生态系统和项目所运行的平台
lena图
import scipy.misc
import matplotlib.pyplot as plt
lena = scipy.misc.lena()
plt.gray()
plt.imshow(lena)
plt.colorbar()
plt.show()
PIL
import numpy
import Image
import matplotlib.pyplot as plt
bug = Image.open(\'stinkbug.png\')
arr = numpy.array(bug.getdata(), numpy.uint8).reshape(bug.size[1], bug.size[0], 3)
plt.gray()
plt.imshow(arr)
plt.colorbar()
plt.show()
matplotlib
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
bugfile = \'stinkbug.png\'
# read image
bug = mpimg.imread(bugfile)
imgplot = plt.imshow(bug)
放大图像
import matplotlib.pyplot as plt
import scipy
import numpy
# because the image we loaded is RGB image,
# http://en.wikipedia.org/wiki/Grayscale#Converting_color_to_grayscale
bug = scipy.misc.imread(\'stinkbug1.png\')
# if you want to inspect the shape of the loaded image
# uncomment following line
#print bug.shape
# convert to gray
bug = bug[:,:,0]
# show original image
plt.figure()
plt.gray()
plt.subplot(121)
plt.imshow(bug)
# show \'zoomed\' region
zbug = bug[100:350,140:350]
plt.subplot(122)
plt.imshow(zbug)
plt.show()
补充
对于大图像,推荐使用numpy.memmap来做图像的内存映射,可加快操作图像的速度
import numpy
file_name = \'stinkbug.png\'
# memap返回数组对象,shape定义了数组形状
image = numpy.memmap(file_name, dtype=numpy.uint8, shape=(375, 500))
数据生成
random
随机数样本(均匀分布)
import pylab
import random
# 样本数
SAMPLE_SIZE = 100
# seed random generator
# if no argument provided uses system current time
# 初始化伪随机数生成器,这样random()方法就能产生相同的期望随机值
random.seed()
# store generated random values here
real_rand_vars = []
# we don\'t need iterator value,
# so we can put call it \'_\'
for _ in range(SAMPLE_SIZE):
# get next random value
new_value = random.random()
# 1~6
# random.randint(min, max)
# float
# random.uniform(min, max)
real_rand_vars.append(new_value)
# create histogram from data in 10 buckets
pylab.hist(real_rand_vars, 10)
# define x and y labels
pylab.xlabel("Number range")
pylab.ylabel("Count")
# show figure
pylab.show()
systemrandom
若要避免随机生成的序列重复,推荐使用random.SystemRandom(),其底层使用os.urandom。os.urandom提供了更多熵源(entropy source)的访问。若使用这个随机数生成接口,seed()和setstate()没有影响。这样一来,样本就不是可重现的了
import random
import time
print \'Default initializiation:\n\'
r1 = random.SystemRandom()
r2 = random.SystemRandom()
for i in xrange(3):
print \'%04.3f %04.3f\' % (r1.random(), r2.random())
print \'\nSame seed:\n\'
seed = time.time()
r1 = random.SystemRandom(seed)
r2 = random.SystemRandom(seed)
for i in xrange(3):
print \'%04.3f %04.3f\' % (r1.random(), r2.random())
timeseries
虚拟价格增长数据的时序图,加上一些随机噪声
import pylab
import random
# days to generate data for
duration = 100
# mean value
mean_inc = 0.2
# standard deviation
std_dev_inc = 1.2
# time series
x = range(duration)
y = []
price_today = 0
for i in x:
# 正态分布,均值0.2,标准差1.2
next_delta = random.normalvariate(mean_inc, std_dev_inc)
price_today += next_delta
y.append(price_today)
pylab.plot(x,y)
pylab.xlabel("Time")
pylab.xlabel("Time")
pylab.ylabel("Value")
pylab.show()
distributions
不同的分布
# coding: utf-8
import random
import matplotlib
import matplotlib.pyplot as plt
# 样本量
SAMPLE_SIZE = 1000
# histogram buckets
buckets = 100
plt.figure()
# we need to update font size just for this example
matplotlib.rcParams.update({\'font.size\': 7})
# 定义一个6*2的网格显示所有的直方图
# 第一个图形是[0,1)之间分布的随机变量(normal distributed random variable)
plt.subplot(621)
plt.xlabel("random.random")
# Return the next random floating point number in the range [0.0, 1.0).
res = []
for _ in xrange(1, SAMPLE_SIZE):
res.append(random.random())
plt.hist(res, buckets)
# 第二个图形是均匀分布的随机变量(uniformly distributed random variable)
plt.subplot(622)
plt.xlabel("random.uniform")
# Return a random floating point number N such that a <= N <= b for a <= b and b <= N <= a for b < a.
# The end-point value b may or may not be included in the range depending on floating-point rounding in the equation a + (b-a) * random().
a = 1
b = SAMPLE_SIZE
res = []
for _ in xrange(1, SAMPLE_SIZE):
res.append(random.uniform(a, b))
plt.hist(res, buckets)
# 第三个图形是三角形分布(triangular distribution)
plt.subplot(623)
plt.xlabel("random.triangular")
# Return a random floating point number N such that low <= N <= high and with the specified mode between those bounds. The low and high bounds default to zero and one. The mode argument defaults to the midpoint between the bounds, giving a symmetric distribution.
low = 1
high = SAMPLE_SIZE
res = []
for _ in xrange(1, SAMPLE_SIZE):
res.append(random.triangular(low, high))
plt.hist(res, buckets)
# 第四个图形是beta分布(beta distribution),参数条件是alpha和beta都要大于0,返回值在0~1之间
plt.subplot(624)
plt.xlabel("random.betavariate")
# Beta distribution. Conditions on the parameters are alpha > 0 and beta > 0. Returned values range between 0 and 1.
alpha = 1
beta = 10
res = []
for _ in xrange(1, SAMPLE_SIZE):
res.append(random.betavariate(alpha, beta))
plt.hist(res, buckets)
# 第五个图形是指数分布(exponential distribution)
plt.subplot(625)
plt.xlabel("random.expovariate")
# Exponential distribution. lambd is 1.0 divided by the desired mean. It should be nonzero. (The parameter would be called “lambda”, but that is a reserved word in Python.) Returned values range from 0 to positive infinity if lambd is positive, and from negative infinity to 0 if lambd is negative.
lambd = 1.0 / ((SAMPLE_SIZE + 1) / 2.)
res = []
for _ in xrange(1, SAMPLE_SIZE):
res.append(random.expovariate(lambd))
plt.hist(res, buckets)
# 第六个图形是gamma分布(gamma distribution)
plt.subplot(626)
plt.xlabel("random.gammavariate")
# Gamma distribution. (Not the gamma function!) Conditions on the parameters are alpha > 0 and beta > 0.
# The probability distribution function is:
#
# x ** (alpha - 1) * math.exp(-x / beta)
# pdf(x) = --------------------------------------
# math.gamma(alpha) * beta ** alpha
alpha = 1
beta = 10
res = []
for _ in xrange(1, SAMPLE_SIZE):
res.append(random.gammavariate(alpha, beta))
plt.hist(res, buckets)
# 第七个图形是对数正态分布(Log normal distribution)
plt.subplot(627)
plt.xlabel("random.lognormvariate")
# Log normal distribution. If you take the natural logarithm of this distribution, you’ll get a normal distribution with mean mu and standard deviation sigma. mu can have any value, and sigma must be greater than zero.
mu = 1
sigma = 0.5
res = []
for _ in xrange(1, SAMPLE_SIZE):
res.append(random.lognormvariate(mu, sigma))
plt.hist(res, buckets)
# 第八个是正态分布(normal distribution)
plt.subplot(628)
plt.xlabel("random.normalvariate")
# Normal distribution. mu is the mean, and sigma is the standard deviation.
mu = 1
sigma = 0.5
res = []
for _ in xrange(1, SAMPLE_SIZE):
res.append(random.normalvariate(mu, sigma))
plt.hist(res, buckets)
# 第九个是帕累托分布(pareto distributionß)
plt.subplot(629)
plt.xlabel("random.paretovariate")
# Pareto distribution. alpha is the shape parameter.
alpha = 1
res = []
for _ in xrange(1, SAMPLE_SIZE):
res.append(random.paretovariate(alpha))
plt.hist(res, buckets)
plt.tight_layout()
plt.show()
数据清理
mad
中位数绝对偏差(Median absolute deviation)是用来描述单变量样本在定量数据中可变性的一种标准。常用来度量统计分布,因为它会落在一组稳健统计数据中,因此对异常值有抵抗能力
用MAD来检验数据中异常值步骤
1. 生成0~1之间的随机数据
2. 加入一些异常值
3. 用is_outlier()方法检测异常值
4. 绘制出两个数据集合(x和filtered)的图表,观察区别
样例
import numpy as np
import matplotlib.pyplot as plt
def is_outlier(points, threshold=3.5):
"""
Returns a boolean array with True if points are outliers and False
otherwise.
Data points with a modified z-score greater than this
# value will be classified as outliers.
"""
# transform into vector
if len(points.shape) == 1:
points = points[:,None]
# compute median value
median = np.median(points, axis=0)
# compute diff sums along the axis
diff = np.sum((points - median)**2, axis=-1)
diff = np.sqrt(diff)
# compute MAD
med_abs_deviation = np.median(diff)
# compute modified Z-score
# http://www.itl.nist.gov/div898/handbook/eda/section4/eda43.htm#Iglewicz
modified_z_score = 0.6745 * diff / med_abs_deviation
# return a mask for each outlier
return modified_z_score > threshold
# Random data
x = np.random.random(100)
# histogram buckets
buckets = 50
# Add in a few outliers
x = np.r_[x, -49, 95, 100, -100]
# Keep inlier data points
# Note here that
# "~" is logical NOT on boolean numpy arrays
filtered = x[~is_outlier(x)]
# plot histograms
plt.figure()
plt.subplot(211)
plt.hist(x, buckets)
plt.xlabel(\'Raw\')
plt.subplot(212)
plt.hist(filtered, buckets)
plt.xlabel(\'Cleaned\')
plt.show()
boxplot
使用箱图
from pylab import *
# fake up some data
spread= rand(50) * 100
center = ones(25) * 50
# generate some outliers high and low
flier_high = rand(10) * 100 + 100
flier_low = rand(10) * -100
# merge generated data set
data = concatenate((spread, center, flier_high, flier_low), 0)
subplot(311)
# basic plot
# \'gx\' defining the outlier plotting properties
boxplot(data, 0, \'gx\')
# compare this with similar scatter plot
subplot(312)
spread_1 = concatenate((spread, flier_high, flier_low), 0)
center_1 = ones(70) * 25
scatter(center_1, spread_1)
xlim([0, 50])
# and with another that is more appropriate for
# scatter plot
subplot(313)
center_2 = rand(70) * 50
scatter(center_2, spread_1)
xlim([0, 50])
show()
log
# coding: utf-8
from pylab import *
# generate uniform data points
x = 1e6*rand(1000)
y = rand(1000)
figure()
# crate first subplot
subplot(211)
# make scatter plot
scatter(x, y)
xscale(\'linear\')
# limit x axis
xlim([1e-6, 1e6])
# crate second subplot
subplot(212)
# make scatter plot
scatter(x,y)
# but make x axis logarithmic
xscale(\'log\')
# set same x axis limit
xlim([1e-6, 1e6])
show()
噪声平滑处理
moving-average
求平均值
对窗口(样本)求平均,然后仅仅绘制给定窗口(样本)的平均值,而不是所有数据点,是更高级算法的基础。
from pylab import *
from numpy import *
def moving_average(interval, window_size):
\'\'\'
Compute convoluted window for given size
\'\'\'
window = ones(int(window_size)) / float(window_size)
return convolve(interval, window, \'same\')
t = linspace(-4, 4, 100)
y = sin(t) + randn(len(t))*0.1
plot(t, y, "k.")
# compute moving average
y_av = moving_average(y, 10)
plot(t, y_av,"r")
#xlim(0,1000)
xlabel("Time")
ylabel("Value")
grid(True)
show()
median-filter
中值滤波
中值滤波的中心思想就是逐项遍历信号,并用相邻信号项的中值替换当前项。此方法使得滤波处理非常快速,而且对一维数据集合和二维数据集合(如图像)都适用
import numpy as np
import pylab as p
import scipy.signal as signal
# get some linear data
x = np.linspace (0, 1, 101)
# add some noisy signal
x[3::10] = 1.5
p.plot(x)
p.plot(signal.medfilt(x,3))
p.plot(signal.medfilt(x,5))
p.legend([\'original signal\', \'length 3\',\'length 5\'])
p.show ()
scipy-smooth
基于信号(数据点)窗口的卷积(函数的总和)。准备信号时向两端添加相同信号的副本并做反射。减小了数据的边界效应。
import numpy
from numpy import *
from pylab import *
# possible window type
WINDOWS = [\'flat\', \'hanning\', \'hamming\', \'bartlett\', \'blackman\']
# if you want to see just two window type, comment previous line,
# and uncomment the following one
# WINDOWS = [\'flat\', \'hanning\']
def smooth(x, window_len=11, window=\'hanning\'):
"""
Smooth the data using a window with requested size.
Returns smoothed signal.
x -- input signal
window_len -- lenght of smoothing window
window -- type of window: \'flat\', \'hanning\', \'hamming\',
\'bartlett\', \'blackman\'
flat window will produce a moving average smoothing.
"""
if x.ndim != 1:
raise ValueError, "smooth only accepts 1 dimension arrays."
if x.size < window_len:
raise ValueError, "Input vector needs to be bigger than window size."
if window_len < 3:
return x
if not window in WINDOWS:
raise ValueError("Window is one of \'flat\', \'hanning\', \'hamming\', "
"\'bartlett\', \'blackman\'")
# adding reflected windows in front and at the end
s=numpy.r_[x[window_len-1:0:-1], x, x[-1:-window_len:-1]]
# pick windows type and do averaging
if window == \'flat\': #moving average
w = numpy.ones(window_len, \'d\')
else:
# call appropriate function in numpy
w = eval(\'numpy.\' + window + \'(window_len)\')
# NOTE: length(output) != length(input), to correct this:
# return y[(window_len/2-1):-(window_len/2)] instead of just y.
y = numpy.convolve(w/w.sum(), s, mode=\'valid\')
return y
# Get some evenly spaced numbers over a specified interval.
t = linspace(-4, 4, 100)
# Make some noisy sinusoidal
x = sin(t)
xn = x + randn(len(t))*0.1
# Smooth it
y = smooth(x)
# windows
ws = 31
subplot(211)
plot(ones(ws))
# draw on the same axes
hold(True)
# plot for every windows
for w in WINDOWS[1:]:
eval(\'plot(\'+w+\'(ws) )\')
# configure axis properties
axis([0, 30, 0, 1.1])
# add legend for every window
legend(WINDOWS)
title("Smoothing windows")
# add second plot
subplot(212)
# draw original signal
plot(x)
# and signal with added noise
plot(xn)
# smooth signal with noise for every possible windowing algorithm
for w in WINDOWS:
plot(smooth(xn, 10, w))
# add legend for every graph
l=[\'original signal\', \'signal with noise\']
l.extend(WINDOWS)
legend(l)
title("Smoothed signal")
show()