版本:v1.1
摘要
本文主要介绍了我自己crifan的Python函数库crifanLib.py,包括解释crifanLib.py由来,以及其中各种函数的功能和用法示例。
2013-09-29
| 修订历史 | ||
|---|---|---|
| 修订 1.1 | 2013-09-29 | crl |
| ||
版权 © 2013 Crifan, http://crifan.com
目录
范例清单
之前在折腾将(新版)百度空间,网易163,新浪sina,QQ空间,人人网,CSDN,搜狐Sohu,博客大巴Blogbus,天涯博客,点点轻博客等博客搬家到WordPress的过程中,先后遇到很多个问题,然后基本上也都自己解决了。对应的也写了相应的代码和函数。
后来就把其中比较常用或通用的功能,整理提取出来,放到一个单独的文件中,即crifanLib.py。
该文件,之前是以帖子的方式贴出来的:crifan的Python库:crifanLib.py
现在已放到google code中的crifanLib中的crifanLib.py,并且以后会同步保持最新版本的。
目录
下面把所有的函数的用法,都简单解释一下:
from datetime import datetime,timedelta;
#------------------------------------------------------------------------------
# get current time's timestamp
def getCurTimestamp() :
    """Return the current local time as an integer Unix timestamp."""
    return datetimeToTimestamp(datetime.now())
#------------------------------------------------------------------------------
# convert datetime value to timestamp
# from "2006-06-01 00:00:00" to 1149091200
def datetimeToTimestamp(datetimeVal) :
    """Convert a datetime value into an integer Unix timestamp.

    e.g. datetime for "2006-06-01 00:00:00" -> 1149091200
    (interpreted in the local timezone, via time.mktime)
    """
    asTimeTuple = datetimeVal.timetuple()
    return int(time.mktime(asTimeTuple))
from datetime import datetime,timedelta;
#------------------------------------------------------------------------------
# convert timestamp to datetime value
# from 1149091200 to "2006-06-01 00:00:00"
def timestampToDatetime(timestamp) :
    """Convert a Unix timestamp into a local datetime value.

    e.g. 1149091200 -> datetime for "2006-06-01 00:00:00"
    Accepts int, float or numeric string (coerced via float()).
    """
    asFloat = float(timestamp)
    return datetime.fromtimestamp(asFloat)
例 2.2. timestampToDatetime使用范例
createtimeFloat = float(createtimeMillisecond)/1000;
localTime = timestampToDatetime(createtimeFloat);
import time;
#------------------------------------------------------------------------------
#init for calculate elapsed time
def calcTimeStart(uniqueKey) :
    """Start an elapsed-time measurement under uniqueKey.

    Stores the current time in gVal['calTimeKeyDict']; read it back later
    with calcTimeEnd(uniqueKey).
    """
    global gVal
    # remember the start instant for this key
    gVal['calTimeKeyDict'][uniqueKey] = time.time()
#------------------------------------------------------------------------------
# to get elapsed time, before call this, should use calcTimeStart to init
def calcTimeEnd(uniqueKey) :
    """Return seconds elapsed since calcTimeStart(uniqueKey) was called.

    calcTimeStart must have been called with the same key beforehand.
    """
    global gVal
    startInstant = gVal['calTimeKeyDict'][uniqueKey]
    return time.time() - startInstant
例 2.3. calcTimeStart和calcTimeEnd的使用范例
calcTimeStart("export_head");
exportHead(blogInfoDic);
gVal['statInfoDict']['exportHeadTime'] = calcTimeEnd("export_head");
from datetime import datetime,timedelta;
#------------------------------------------------------------------------------
# convert local GMT8 to GMT time
# note: input should be 'datetime' type, not 'time' type
def convertLocalToGmt(localTime) :
    """Shift a local GMT+8 datetime back to GMT.

    note: input should be 'datetime' type, not 'time' type.
    """
    eightHours = timedelta(hours=8)
    return localTime - eightHours
#------------------------------------------------------------------------------
# got python script self file name
# extract out xxx from:
# D:\yyy\zzz\xxx.py
# xxx.py
def extractFilename(inputStr) :
    """Extract the script base name from a path or file name.

    e.g. 'D:\\yyy\\zzz\\xxx.py' -> 'xxx', 'xxx.py' -> 'xxx'.
    A trailing '.py' suffix is stripped; other names come back unchanged.
    """
    baseName = inputStr.split("\\")[-1]  # last backslash-separated component
    if baseName[-3:] == ".py" :
        baseName = baseName[:-3]  # drop the ".py" suffix
    return baseName
例 2.5. extractFilename的使用范例
if __name__=="__main__":
# for : python xxx.py -s yyy # -> sys.argv[0]=xxx.py
# for : xxx.py -s yyy # -> sys.argv[0]=D:\yyy\zzz\xxx.py
scriptSelfName = extractFilename(sys.argv[0]);
#------------------------------------------------------------------------------
# replace the &#N; (N is digit number, N > 1) to unicode char
# eg: replace "&#39;" with "'" in "Creepin&#39; up on you"
def repUniNumEntToChar(text):
    """Replace each decimal numeric entity &#N; (N is a digit number) in text
    with the corresponding unicode character.

    e.g. "&#39;" becomes "'" in "Creepin&#39; up on you".
    note: uses the Python 2 builtin unichr.
    """
    def _entityToChar(match):
        # strip the leading '&#' and trailing ';', then convert the number
        codePoint = int(match.group(0)[2:-1])
        return unichr(codePoint)
    return re.compile('&#[0-9]+;').sub(_entityToChar, text)
#------------------------------------------------------------------------------
# generate the full url, which include the main url plus the parameter list
# Note:
# normally just use urllib.urlencode is OK.
# only use this if you do NOT want urllib.urlencode convert some special chars($,:,{,},...) into %XX
def genFullUrl(mainUrl, paraDict) :
    """Join mainUrl and the parameters in paraDict into a full url.

    Note:
    normally urllib.urlencode is the right tool; use this only when special
    chars ($ : { } ...) must NOT be converted into %XX escapes.
    """
    paraPairs = [str(key) + '=' + str(paraDict[key]) for key in paraDict.keys()]
    return mainUrl + '?' + '&'.join(paraPairs)
例 2.7. genFullUrl的使用范例
# Note: here not use urllib.urlencode to encode para,
# for the encoded result will convert some special chars($,:,{,},...) into %XX
paraDict = {
'asyn' : '1',
'thread_id_enc' : '',
'start' : '',
'count' : '',
'orderby_type' : '0',
};
paraDict['thread_id_enc'] = str(threadIdEnc);
paraDict['start'] = str(startCmtIdx);
paraDict['count'] = str(reqCmtNum);
paraDict['t'] = str(cmtReqTime);
mainUrl = "http://hi.baidu.com/cmt/spcmt/get_thread";
getCmtUrl = genFullUrl(mainUrl, paraDict);
#------------------------------------------------------------------------------
# check whether two url is similar
# note: input two url both should be str type
def urlIsSimilar(url1, url2) :
    """Heuristically decide whether two urls (both str type) are 'similar'.

    Similar means all of:
      - same number of '/'-separated parts (same structure),
      - same suffix after the last '.',
      - identical text up to the last '/' (only the file name may differ).
    """
    url1 = str(url1)
    url2 = str(url2)
    # different structure -> cannot be similar
    if len(url1.split('/')) != len(url2.split('/')) :
        return False
    # at least, the suffix (text after the last '.') must match
    if url1[url1.rfind('.') + 1 :] != url2[url2.rfind('.') + 1 :] :
        return False
    # except the file name, all other parts must be identical
    return url1[: url1.rfind('/')] == url2[: url2.rfind('/')]
如果相似,返回True和相似的地址;
如果不相似,返回False。
#------------------------------------------------------------------------------
# found whether the url is similar in urlList
# if found, return True, similarSrcUrl
# if not found, return False, ''
def findSimilarUrl(url, urlList) :
    """Search urlList for a url similar to url (see urlIsSimilar).

    Returns (True, matchedSrcUrl) for the first match found,
    otherwise (False, '').
    """
    for candidateUrl in urlList :
        if urlIsSimilar(url, candidateUrl) :
            return (True, candidateUrl)
    return (False, '')
例 2.9. findSimilarUrl的使用范例
# to check similarity, only when checking is needed and the list is not empty
if ((gCfg['omitSimErrUrl'] == 'yes') and gVal['errorUrlList']):
(isSimilar, simSrcUrl) = findSimilarUrl(curUrl, gVal['errorUrlList']);
if isSimilar :
logging.warning(" Omit process %s for similar with previous error url", curUrl);
logging.warning(" %s", simSrcUrl);
continue;
#------------------------------------------------------------------------------
# remove non-word char == only retain alphanumeric characters (letters+numbers) and underscore
# eg:
# from againinput4@yeah to againinput4yeah
# from green-waste to greenwaste
def removeNonWordChar(inputString) :
    """Strip every non-word character, keeping only [a-zA-Z0-9_].

    e.g. 'againinput4@yeah' -> 'againinput4yeah',
         'green-waste' -> 'greenwaste'
    """
    return re.sub(r"\W", "", inputString)  # \W is the same as [^\w]
例 2.10. removeNonWordChar的使用范例
wxrValidUsername = removeNonWordChar(gVal['blogUser']);
wxrValidUsername = wxrValidUsername.replace("_", "");
logging.info("Generated WXR safe username is %s", wxrValidUsername);
使得处理后的字符串,在XML都是合法的了。
#------------------------------------------------------------------------------
# remove control character from input string
# otherwise will cause wordpress importer import failed
# for wordpress importer, if contains contrl char, will fail to import wxr
# eg:
# 1. http://againinput4.blog.163.com/blog/static/172799491201110111145259/
# content contains some invalid ascii control chars
# 2. http://hi.baidu.com/notebookrelated/blog/item/8bd88e351d449789a71e12c2.html
# 165th comment contains invalid control char: ETX
# 3. http://green-waste.blog.163.com/blog/static/32677678200879111913911/
# title contains control char:DC1, BS, DLE, DLE, DLE, DC1
def removeCtlChr(inputString) :
    """Strip ASCII control characters from inputString.

    The wordpress importer fails to import WXR content that contains control
    chars (seen in practice: ETX in comments, DC1/BS/DLE in titles), so drop
    every char below 32 except tab/LF/CR, and also drop DEL (0x7F).

    Improvements over the old version: the whitelist is hoisted out of the
    per-character loop, and the result is assembled with join() instead of
    quadratic string concatenation.
    """
    # the only control chars that are kept (loop-invariant, built once)
    validCtlSet = frozenset([
        9,   # \t = tab
        10,  # \n = LF = line feed
        13,  # \r = CR = carriage return
    ])
    def _isValidChr(c) :
        # filter out DEL (0x7F) and any control char not whitelisted above
        asciiVal = ord(c)
        if asciiVal == 0x7F :
            return False
        if (asciiVal < 32) and (asciiVal not in validCtlSet) :
            return False
        return True
    return ''.join([c for c in inputString if _isValidChr(c)])
例 2.11. removeCtlChr的使用范例
# remove the control char in title:
# eg;
# http://green-waste.blog.163.com/blog/static/32677678200879111913911/
# title contains control char:DC1, BS, DLE, DLE, DLE, DC1
infoDict['title'] = removeCtlChr(infoDict['title']);
注意:关于控制字符
如果不了解什么是控制字符,请参考:ASCII字符集中的功能/控制字符
#------------------------------------------------------------------------------
# convert the string entity to unicode number entity
# refer: http://www.htmlhelp.com/reference/html40/entities/latin1.html
# TODO: need later use this htmlentitydefs instead following
def replaceStrEntToNumEnt(text) :
    """Convert HTML named (string) entities in text to numeric entities.

    e.g. "&nbsp;" -> "&#160;", "&amp;" -> "&#38;"
    refer: http://www.htmlhelp.com/reference/html40/entities/latin1.html
    TODO: need later use htmlentitydefs instead of this hand-written table
    NOTE(review): the previous table had been corrupted by charset mangling
    (entity names were rendered into the literal characters, making each key
    identical to its value and breaking the quoting); rebuilt here from the
    HTML 4.0 Latin-1 and Special entity lists.
    """
    strToNumEntDict = {
        # Latin-1 Entities
        "&nbsp;"   : "&#160;",
        "&iexcl;"  : "&#161;",
        "&cent;"   : "&#162;",
        "&pound;"  : "&#163;",
        "&curren;" : "&#164;",
        "&yen;"    : "&#165;",
        "&brvbar;" : "&#166;",
        "&sect;"   : "&#167;",
        "&uml;"    : "&#168;",
        "&copy;"   : "&#169;",
        "&ordf;"   : "&#170;",
        "&laquo;"  : "&#171;",
        "&not;"    : "&#172;",
        "&shy;"    : "&#173;",
        "&reg;"    : "&#174;",
        "&macr;"   : "&#175;",
        "&deg;"    : "&#176;",
        "&plusmn;" : "&#177;",
        "&sup2;"   : "&#178;",
        "&sup3;"   : "&#179;",
        "&acute;"  : "&#180;",
        "&micro;"  : "&#181;",
        "&para;"   : "&#182;",
        "&middot;" : "&#183;",
        "&cedil;"  : "&#184;",
        "&sup1;"   : "&#185;",
        "&ordm;"   : "&#186;",
        "&raquo;"  : "&#187;",
        "&frac14;" : "&#188;",
        "&frac12;" : "&#189;",
        "&frac34;" : "&#190;",
        "&iquest;" : "&#191;",
        "&Agrave;" : "&#192;",
        "&Aacute;" : "&#193;",
        "&Acirc;"  : "&#194;",
        "&Atilde;" : "&#195;",
        "&Auml;"   : "&#196;",
        "&Aring;"  : "&#197;",
        "&AElig;"  : "&#198;",
        "&Ccedil;" : "&#199;",
        "&Egrave;" : "&#200;",
        "&Eacute;" : "&#201;",
        "&Ecirc;"  : "&#202;",
        "&Euml;"   : "&#203;",
        "&Igrave;" : "&#204;",
        "&Iacute;" : "&#205;",
        "&Icirc;"  : "&#206;",
        "&Iuml;"   : "&#207;",
        "&ETH;"    : "&#208;",
        "&Ntilde;" : "&#209;",
        "&Ograve;" : "&#210;",
        "&Oacute;" : "&#211;",
        "&Ocirc;"  : "&#212;",
        "&Otilde;" : "&#213;",
        "&Ouml;"   : "&#214;",
        "&times;"  : "&#215;",
        "&Oslash;" : "&#216;",
        "&Ugrave;" : "&#217;",
        "&Uacute;" : "&#218;",
        "&Ucirc;"  : "&#219;",
        "&Uuml;"   : "&#220;",
        "&Yacute;" : "&#221;",
        "&THORN;"  : "&#222;",
        "&szlig;"  : "&#223;",
        "&agrave;" : "&#224;",
        "&aacute;" : "&#225;",
        "&acirc;"  : "&#226;",
        "&atilde;" : "&#227;",
        "&auml;"   : "&#228;",
        "&aring;"  : "&#229;",
        "&aelig;"  : "&#230;",
        "&ccedil;" : "&#231;",
        "&egrave;" : "&#232;",
        "&eacute;" : "&#233;",
        "&ecirc;"  : "&#234;",
        "&euml;"   : "&#235;",
        "&igrave;" : "&#236;",
        "&iacute;" : "&#237;",
        "&icirc;"  : "&#238;",
        "&iuml;"   : "&#239;",
        "&eth;"    : "&#240;",
        "&ntilde;" : "&#241;",
        "&ograve;" : "&#242;",
        "&oacute;" : "&#243;",
        "&ocirc;"  : "&#244;",
        "&otilde;" : "&#245;",
        "&ouml;"   : "&#246;",
        "&divide;" : "&#247;",
        "&oslash;" : "&#248;",
        "&ugrave;" : "&#249;",
        "&uacute;" : "&#250;",
        "&ucirc;"  : "&#251;",
        "&uuml;"   : "&#252;",
        "&yacute;" : "&#253;",
        "&thorn;"  : "&#254;",
        "&yuml;"   : "&#255;",
        # http://www.htmlhelp.com/reference/html40/entities/special.html
        # Special Entities
        "&quot;"   : "&#34;",
        "&amp;"    : "&#38;",
        "&lt;"     : "&#60;",
        "&gt;"     : "&#62;",
        "&OElig;"  : "&#338;",
        "&oelig;"  : "&#339;",
        "&Scaron;" : "&#352;",
        "&scaron;" : "&#353;",
        "&Yuml;"   : "&#376;",
        "&circ;"   : "&#710;",
        "&tilde;"  : "&#732;",
        "&ensp;"   : "&#8194;",
        "&emsp;"   : "&#8195;",
        "&thinsp;" : "&#8201;",
        "&zwnj;"   : "&#8204;",
        "&zwj;"    : "&#8205;",
        "&lrm;"    : "&#8206;",
        "&rlm;"    : "&#8207;",
        "&ndash;"  : "&#8211;",
        "&mdash;"  : "&#8212;",
        "&lsquo;"  : "&#8216;",
        "&rsquo;"  : "&#8217;",
        "&sbquo;"  : "&#8218;",
        "&ldquo;"  : "&#8220;",
        "&rdquo;"  : "&#8221;",
        "&bdquo;"  : "&#8222;",
        "&dagger;" : "&#8224;",
        "&Dagger;" : "&#8225;",
        "&permil;" : "&#8240;",
        "&lsaquo;" : "&#8249;",
        "&rsaquo;" : "&#8250;",
        "&euro;"   : "&#8364;",
    }
    replacedText = text
    # entity names contain no regex metacharacters, so plain sub is safe
    for strEnt in strToNumEntDict.keys() :
        replacedText = re.compile(strEnt).sub(strToNumEntDict[strEnt], replacedText)
    return replacedText
#------------------------------------------------------------------------------
# convert the xxx=yyy into tuple('xxx', yyy), then return the tuple value
# [makesure input string]
# (1) is not include whitespace
# (2) include '='
# (3) last is no ';'
# [possible input string]
# blogUserName="againinput4"
# publisherEmail=""
# synchMiniBlog=false
# publishTime=1322129849397
# publisherName=null
# publisherNickname="\u957F\u5927\u662F\u70E6\u607C"
def convertToTupleVal(equationStr) :
    """Parse an 'xxx=yyy' equation string into a (key, value) tuple.

    [makesure input string]
    (1) is not include whitespace
    (2) include '='
    (3) last is no ';'
    [possible input string]
    blogUserName="againinput4"      -> ('blogUserName', 'againinput4')
    publisherEmail=""               -> ('publisherEmail', '')
    synchMiniBlog=false             -> ('synchMiniBlog', False)
    publishTime=1322129849397       -> ('publishTime', 1322129849397)
    publisherName=null              -> ('publisherName', None)

    Returns ('', None) on any parse failure (bare except below).
    """
    (key, value) = ('', None);
    try :
        # Note:
        # here should not use split with '=', for maybe input string contains string like this:
        # http://img.bimg.126.net/photo/hmZoNQaqzZALvVp0rE7faA==/0.jpg
        # so use find('=') instead
        firstEqualPos = equationStr.find("=");
        key = equationStr[0:firstEqualPos];
        valuePart = equationStr[(firstEqualPos + 1):];
        # string type
        valLen = len(valuePart);
        if valLen >= 2 :
            # maybe string
            if valuePart[0] == '"' and valuePart[-1] == '"' :
                # is string type: strip the surrounding double quotes
                value = str(valuePart[1:-1]);
            elif (valuePart.lower() == 'null'):
                value = None;
            elif (valuePart.lower() == 'false'):
                value = False;
            elif (valuePart.lower() == 'true') :
                value = True;
            else :
                # must int value (raises on anything else -> caught below)
                value = int(valuePart);
        else :
            # len=1 -> must be value
            # NOTE(review): an empty valuePart also lands here and int('')
            # raises, so 'xxx=' yields ('', None) via the except branch
            value = int(valuePart);
        #print "Convert %s to [%s]=%s"%(equationStr, key, value);
    except :
        (key, value) = ('', None);
        print "Fail of convert the equal string %s to value"%(equationStr);
    return (key, value);
例 2.13. convertToTupleVal的使用范例
# (4) convert to value
for equation in equationList :
(key, value) = convertToTupleVal(equation);
#------------------------------------------------------------------------------
# remove the empty ones in list
def removeEmptyInList(list) :
    """Return a copy of the input list with all falsy (empty) items removed.

    e.g. [u''] becomes [] -- useful because a list holding only u'' is
    truthy itself but its content is meaningless.
    (parameter name 'list' kept as-is for interface compatibility)
    """
    return [item for item in list if item]
例 2.14. removeEmptyInList的使用范例
# Note: some list contain [u''], so is not meaningful, remove it here
# for only [] is empty, [u''] is not empty -> error while exporting to WXR
infoDict['tags'] = removeEmptyInList(infoDict['tags']);
#------------------------------------------------------------------------------
# remove overlapped item in the list
def uniqueList(old_list):
    """Return old_list with duplicated (overlapped) items removed.

    First occurrences are kept, original order preserved. The membership
    test is list-based (O(n^2)) so unhashable items also work.
    """
    deduped = []
    for item in old_list:
        if item in deduped:
            continue  # already seen, skip the duplicate
        deduped.append(item)
    return deduped
#------------------------------------------------------------------------------
# for listToFilter, remove the ones which is in listToCompare
# also return the ones which is already exist in listToCompare
def filterList(listToFilter, listToCompare) :
    """Partition listToFilter by membership in listToCompare.

    Returns (filteredList, existedList):
      filteredList - items of listToFilter NOT present in listToCompare
      existedList  - items of listToFilter already present in listToCompare
    """
    filteredList = [one for one in listToFilter if one not in listToCompare]
    existedList = [one for one in listToFilter if one in listToCompare]
    return (filteredList, existedList)
例 2.16. filterList的使用范例
# remove processed and got ones that has been processed
(filteredPicList, existedList) = filterList(nonOverlapList, gVal['processedUrlList']);
#------------------------------------------------------------------------------
# generated the random digits number string
# max digit number is 12
def randDigitsStr(digitNum = 12) :
    """Generate a random string of digitNum decimal digits.

    digitNum is capped at 12 (values above 12 return 12 digits), matching
    the documented behavior of the original.

    Bugfix: the old implementation sliced str(random.random()), which can
    yield fewer digits than requested (e.g. for values like 0.1, or ones
    whose repr uses scientific notation); only the len==11 case was patched.
    Generating a zero-padded random integer guarantees the exact length.
    """
    if digitNum > 12 :
        digitNum = 12
    # 12 zero-padded random digits, then take the requested prefix
    twelveDigits = "%012d" % random.randint(0, 10**12 - 1)
    return twelveDigits[0:digitNum]
#------------------------------------------------------------------------------
# convert tuple list to dict value
# [(u'type', u'text/javascript'), (u'src', u'http://partner.googleadservices.com/gampad/google_service.js')]
# { u'type':u'text/javascript', u'src':u'http://partner.googleadservices.com/gampad/google_service.js' }
def tupleListToDict(tupleList):
    """Convert a list of (key, value) tuples into a dict.

    e.g. [(u'type', u'text/javascript'), (u'src', u'http://...')]
      -> { u'type': u'text/javascript', u'src': u'http://...' }
    As with the loop version, a later duplicate key overwrites an earlier one.
    """
    return dict(tupleList)
例 2.18. tupleListToDict 的使用范例
#singleContent: name=script, attrMap=None, attrs=[(u'type', u'text/javascript'), (u'src', u'http://partner.googleadservices.com/gampad/google_service.js')]
attrsDict = tupleListToDict(singleContent.attrs);
#------------------------------------------------------------------------------
# save binary data into file
def saveBinDataToFile(binaryData, fileToSave):
    """Write binaryData into fileToSave (created if missing, else overwritten).

    Returns True on success, False on any failure.

    Bugfix: use a 'with' block so the file handle is closed even when
    write() raises (the old open()/write()/close() sequence leaked the
    handle on error); also narrow the bare except to Exception so
    KeyboardInterrupt/SystemExit are not swallowed.
    """
    try:
        with open(fileToSave, "wb") as savedBinFile:
            savedBinFile.write(binaryData)
        return True
    except Exception:
        # keep the best-effort contract: any failure just reports False
        return False
例 2.19. saveBinDataToFile的使用范例
# if url is invalid, then add timeout can avoid dead
respHtml = getUrlRespHtml(realUrl, useGzip=False, timeout=gConst['defaultTimeout']);
isDownOK = saveBinDataToFile(respHtml, fileToSave);
#------------------------------------------------------------------------------
# check file validation:
# open file url to check return info is match or not
# with exception support
# note: should handle while the file url is redirect
# eg :
# http://publish.it168.com/2007/0627/images/500754.jpg ->
# http://img.publish.it168.com/2007/0627/images/500754.jpg
# other special one:
# sina pic url:
# http://s14.sinaimg.cn/middle/3d55a9b7g9522d474a84d&690
# http://s14.sinaimg.cn/orignal/3d55a9b7g9522d474a84d
# the real url is same with above url
def isFileValid(fileUrl) :
fileIsValid = False;
errReason = "Unknown error";
try :
#print "original fileUrl=",fileUrl;
origFileName = fileUrl.split('/')[-1];
#print "origFileName=",origFileName;
#old: https://ie2zeq.bay.livefilestore.com/y1mo7UWr-TrmqbBhkw52I0ii__WE6l2UtMRSTZHSky66-uDxnCdKPr3bdqVrpUcQHcoJLedlFXa43bvCp_O0zEGF3JdG_yZ4wRT-c2AQmJ_TNcWvVZIXfBDgGerouWyx19WpA4I0XQR1syRJXjDNpwAbQ/IMG_5214_thumb[1].jpg
#new: https://kxoqva.bay.livefilestore.com/y1mQlGjwNAYiHKoH5Aw6TMNhsCmX2YDR3vPKnP86snuqQEtnZgy3dHkwUvZ61Ah8zU3AGiS4whmm_ADrvxdufEAfMGo56KjLdhIbosn9F34olQ/IMG_5214_thumb%5b1%5d.jpg
unquotedOrigFilenname = urllib.unquote(origFileName);
#print "unquotedOrigFilenname=",unquotedOrigFilenname
lowUnquotedOrigFilename = unquotedOrigFilenname.lower();
#print "lowUnquotedOrigFilename=",lowUnquotedOrigFilename;
resp = urllib2.urlopen(fileUrl, timeout=gConst['defaultTimeout']); # note: Python 2.6 has added timeout support.
#print "resp=",resp;
realUrl = resp.geturl();
#print "realUrl=",realUrl;
newFilename = realUrl.split('/')[-1];
#print "newFilename=",newFilename;
#http://blog.sina.com.cn/s/blog_696e50390100ntxs.html
unquotedNewFilename = urllib.unquote(newFilename);
#print "unquotedNewFilename=",unquotedNewFilename;
unquotedLowNewFilename = unquotedNewFilename.lower();
#print "unquotedLowNewFilename=",unquotedLowNewFilename;
respInfo = resp.info();
#print "respInfo=",respInfo;
respCode = resp.getcode();
#print "respCode=",respCode;
# special:
# http://116.img.pp.sohu.com/images/blog/2007/5/24/17/24/11355bf42a9.jpg
# return no content-length
#contentLen = respInfo['Content-Length'];
# for redirect, if returned size>0 and filename is same, also should be considered valid
#if (origFileName == newFilename) and (contentLen > 0):
# for redirect, if returned response code is 200(OK) and filename is same, also should be considered valid
#if (origFileName == newFilename) and (respCode == 200):
if (lowUnquotedOrigFilename == unquotedLowNewFilename) and (respCode == 200):
fileIsValid = True;
else :
fileIsValid = False;
# eg: Content-Type= image/gif, ContentTypes : audio/mpeg
# more ContentTypes can refer: http://kenya.bokee.com/3200033.html
contentType = respInfo['Content-Type'];
errReason = "file url returned info: type=%s, len=%d, realUrl=%s"%(contentType, contentLen, realUrl);
except urllib2.URLError,reason :
fileIsValid = False;
errReason = reason;
except urllib2.HTTPError,code :
fileIsValid = False;
errReason = code;
except :
fileIsValid = False;
errReason = "Unknown error";
# here type(errReason)= <class 'urllib2.HTTPError'>, so just convert it to str
errReason = str(errReason);
return (fileIsValid, errReason);
#------------------------------------------------------------------------------
# download from fileUrl then save to fileToSave
# with exception support
# note: the caller should make sure the fileUrl is a valid internet resource/file
def downloadFile(fileUrl, fileToSave, needReport = False) :
isDownOK = False;
downloadingFile = '';
#---------------------------------------------------------------------------
# note: totalFileSize -> may be -1 on older FTP servers which do not return a file size in response to a retrieval request
def reportHook(copiedBlocks, blockSize, totalFileSize) :
#global downloadingFile
if copiedBlocks == 0 : # 1st call : once on establishment of the network connection
print 'Begin to download %s, total size=%d'%(downloadingFile, totalFileSize);
else : # rest call : once after each block read thereafter
print 'Downloaded bytes: %d' % ( blockSize * copiedBlocks);
return;
#---------------------------------------------------------------------------
try :
if fileUrl :
downloadingFile = fileUrl;
if needReport :
urllib.urlretrieve(fileUrl, fileToSave, reportHook);
else :
urllib.urlretrieve(fileUrl, fileToSave);
isDownOK = True;
else :
print "Input download file url is NULL";
except urllib.ContentTooShortError(msg) :
isDownOK = False;
except :
isDownOK = False;
return isDownOK;
例 2.21. downloadFile的使用范例
if dstPicFile and downloadFile(curUrl, dstPicFile) :
# replace old url with new url
#------------------------------------------------------------------------------
# manually download fileUrl then save to fileToSave
def manuallyDownloadFile(fileUrl, fileToSave) :
isDownOK = False;
downloadingFile = '';
try :
if fileUrl :
# 1. find real address
#print "fileUrl=",fileUrl;
resp = urllib2.urlopen(fileUrl, timeout=gConst['defaultTimeout']);
#print "resp=",resp;
realUrl = resp.geturl(); # not same with original file url if redirect
# if url is invalid, then add timeout can avoid dead
respHtml = getUrlRespHtml(realUrl, useGzip=False, timeout=gConst['defaultTimeout']);
isDownOK = saveBinDataToFile(respHtml, fileToSave);
else :
print "Input download file url is NULL";
except urllib.ContentTooShortError(msg) :
isDownOK = False;
except :
isDownOK = False;
return isDownOK;
例 2.22. manuallyDownloadFile的使用范例
#if dstPicFile and downloadFile(curUrl, dstPicFile) :
# urlretrieve in downloadFile is too slow while download QQ Space Picture
# so here use manuallyDownloadFile instead
if dstPicFile and manuallyDownloadFile(curUrl, dstPicFile) :
# replace old url with new url
#------------------------------------------------------------------------------
# get response from url
# note: if you have already used cookiejar, then here will automatically use it
# while using urllib2.Request
def getUrlResponse(url, postDict={}, headerDict={}, timeout=0, useGzip=False) :
    """Open url and return the urllib2 response object.

    note: if a cookiejar opener has already been installed, it is used
    automatically by urllib2.Request.
    postDict  : if non-empty, the request becomes a form-encoded POST
    headerDict: extra headers; added last so they overwrite the defaults
    timeout   : in seconds, applied only when > 0
    useGzip   : ask the server for gzip/deflate encoded content
    (postDict/headerDict are never mutated, so the mutable defaults are safe)

    Fix: the old code added the custom headers TWICE -- once before the
    defaults and once after. Since add_header replaces by header name, the
    first pass was redundant and has been removed; behavior is unchanged.
    """
    # makesure url is string, not unicode, otherwise urllib2.urlopen will error
    url = str(url);
    if (postDict) :
        postData = urllib.urlencode(postDict);
        req = urllib2.Request(url, postData);
        req.add_header('Content-Type', "application/x-www-form-urlencoded");
    else :
        req = urllib2.Request(url);
    defHeaderDict = {
        'User-Agent'    : gConst['userAgentIE9'],
        'Cache-Control' : 'no-cache',
        'Accept'        : '*/*',
        'Connection'    : 'Keep-Alive',
    };
    # add default headers firstly
    for eachDefHd in defHeaderDict.keys() :
        req.add_header(eachDefHd, defHeaderDict[eachDefHd]);
    if(useGzip) :
        req.add_header('Accept-Encoding', 'gzip, deflate');
    # add customized header later -> allow overwrite default header
    if(headerDict) :
        for key in headerDict.keys() :
            req.add_header(key, headerDict[key]);
    if(timeout > 0) :
        # set timeout value if necessary
        resp = urllib2.urlopen(req, timeout=timeout);
    else :
        resp = urllib2.urlopen(req);
    return resp;
例 2.23. getUrlResponse的使用范例
resp = getUrlResponse(url, postDict, headerDict, timeout, useGzip);
respHtml = resp.read();
#------------------------------------------------------------------------------
# get response html==body from url
#def getUrlRespHtml(url, postDict={}, headerDict={}, timeout=0, useGzip=False) :
def getUrlRespHtml(url, postDict={}, headerDict={}, timeout=0, useGzip=True) :
    """Fetch url and return the response body (html), un-gzipping if needed.

    All parameters are passed straight through to getUrlResponse; the only
    difference from it is that the body is read out and decompressed here.
    """
    resp = getUrlResponse(url, postDict, headerDict, timeout, useGzip);
    respHtml = resp.read();
    if(useGzip) :
        # sometime the request asks for gzip,deflate but the returned body
        # is NOT gzipped -> then the response info lacks the header line
        # "Content-Encoding: gzip"
        # eg: http://blog.sina.com.cn/s/comment_730793bf010144j7_3.html
        # -> so only decode when the data is indeed gziped
        respInfo = resp.info();
        isGzipped = ("Content-Encoding" in respInfo) and (respInfo['Content-Encoding'] == "gzip");
        if isGzipped :
            # 16+MAX_WBITS tells zlib to expect a gzip wrapper
            respHtml = zlib.decompress(respHtml, 16+zlib.MAX_WBITS);
    return respHtml;
例 2.25. getUrlRespHtml的使用范例:带额外参数
modifyUrl = gVal['blogEntryUrl'] + "/blog/submit/modifyblog";
#logging.debug("Modify Url is %s", modifyUrl);
#http://hi.baidu.com/wwwhaseecom/blog/item/79188d1b4fa36f068718bf79.html
foundSpBlogID = re.search(r"blog/item/(?P<spBlogID>\w+?).html", url);
if(foundSpBlogID) :
spBlogID = foundSpBlogID.group("spBlogID");
logging.debug("Extracted spBlogID=%s", spBlogID);
else :
modifyOk = False;
errInfo = "Can't extract post spBlogID !";
return (modifyOk, errInfo);
newPostContentGb18030 = newPostContentUni.encode("GB18030");
categoryGb18030 = infoDict['category'].encode("GB18030");
titleGb18030 = infoDict['title'].encode("GB18030");
postDict = {
"bdstoken" : gVal['spToken'],
"ct" : "1",
"mms_flag" : "0",
"cm" : "2",
"spBlogID" : spBlogID,
"spBlogCatName_o": categoryGb18030, # old catagory
"edithid" : "",
"previewImg" : "",
"spBlogTitle" : titleGb18030,
"spBlogText" : newPostContentGb18030,
"spBlogCatName" : categoryGb18030, # new catagory
"spBlogPower" : "0",
"spIsCmtAllow" : "1",
"spShareNotAllow":"0",
"spVcode" : "",
"spVerifyKey" : "",
}
headerDict = {
# 如果不添加Referer,则返回的html则会出现错误:"数据添加的一般错误"
"Referer" : gVal['blogEntryUrl'] + "/blog/modify/" + spBlogID,
}
respHtml = getUrlRespHtml(modifyUrl, postDict, headerDict);
因为成功登录某网页后,一般都会有对应的cookie返回,所以常用此函数去判断是否成功登录某网页。
#------------------------------------------------------------------------------
# check all cookies in cookiesDict is exist in cookieJar or not
def checkAllCookiesExist(cookieNameList, cookieJar) :
    """Return True if every cookie named in cookieNameList exists in cookieJar.

    Commonly used to verify a login succeeded, since a successful login
    normally sets a known group of cookies.
    """
    wantedNames = set(cookieNameList)
    foundNames = set()
    for cookie in cookieJar :
        if cookie.name in wantedNames :
            foundNames.add(cookie.name)
    # all wanted cookies found <=> the two sets are equal
    return foundNames == wantedNames
例 2.26. checkAllCookiesExist的使用范例
#http://www.darlingtree.com/wordpress/archives/242
gVal['cj'] = cookielib.CookieJar();
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(gVal['cj']));
urllib2.install_opener(opener);
resp = urllib2.urlopen(baiduSpaceEntryUrl);
loginBaiduUrl = "https://passport.baidu.com/?login";
#username=%D0%C4%C7%E9%C6%DC%CF%A2%B5%D8&password=xxx&mem_pass=on
postDict = {
'username' : username,
'password' : password,
'mem_pass' : 'on',
};
resp = getUrlResponse(loginBaiduUrl, postDict);
# check whether the cookie is OK
cookieNameList = ["USERID", "PTOKEN", "STOKEN"];
loginOk = checkAllCookiesExist(cookieNameList, gVal['cj']);
if (not loginOk) :
logging.error("Login fail for not all expected cookies exist !");
return loginOk;
#------------------------------------------------------------------------------
# depend on chardet
# check whether the strToDect is ASCII string
def strIsAscii(strToDect) :
    """Return True if chardet detects strToDect as 'ascii' with
    confidence above 0.9.

    depend on: chardet
    """
    encInfo = chardet.detect(strToDect)
    return (encInfo['confidence'] > 0.9) and (encInfo['encoding'] == 'ascii')
例 2.27. strIsAscii的使用范例
if(not strIsAscii(extractedBlogUser)) :
# if is: http://hi.baidu.com/资料收集
# then should quote it, otherwise later output to WXR will fail !
extractedBlogUser = urllib.quote(extractedBlogUser);
此代码中是判断是否大于0.5来决定是否是可能的字符串类型。使用者可根据自己需要,改为自己想要的概率,比如0.8等。
#------------------------------------------------------------------------------
# get the possible(possiblility > 0.5) charset of input string
def getStrPossibleCharset(inputStr) :
    """Return the likely charset of inputStr, falling back to "ascii".

    depend on: chardet
    A detection is accepted when chardet's confidence exceeds 0.5; adjust
    the threshold (eg to 0.8) if a stricter guess is needed.
    """
    encInfo = chardet.detect(inputStr)
    if encInfo['confidence'] > 0.5 :
        return encInfo['encoding']
    return "ascii"
例 2.28. getStrPossibleCharset的使用范例
validCharset = getStrPossibleCharset(dataJsonStr);
logging.debug("Now try use the detected charset %s to decode it again", validCharset);
此函数支持多种语言。
如无额外参数,则默认是将中文翻译为英文。
#------------------------------------------------------------------------------
# depend on BeautifulSoup
# translate strToTranslate from fromLanguage to toLanguage
# return the translated unicode string
# some frequently used language abbrv:
# Chinese Simplified: zh-CN
# Chinese Traditional: zh-TW
# English: en
# German: de
# Japanese: ja
# Korean: ko
# French: fr
# more can be found at:
# http://code.google.com/intl/ru/apis/language/translate/v2/using_rest.html#language-params
def translateString(strToTranslate, fromLanguage="zh-CN", toLanguage="en"):
    """Translate strToTranslate from fromLanguage to toLanguage via Google.

    depend on: BeautifulSoup, getUrlRespHtml
    Frequently used language abbreviations: zh-CN, zh-TW, en, de, ja, ko, fr
    (full list: http://code.google.com/intl/ru/apis/language/translate/v2/using_rest.html#language-params)

    Returns (True, translatedUnicodeStr) on success,
            (False, errorReasonStr) on failure.
    """
    transOK = False;
    translatedStr = strToTranslate;
    transErr = '';
    try :
        # following refer: http://python.u85.us/viewnews-335.html
        postDict = {'hl':'zh-CN', 'ie':'UTF-8', 'text':strToTranslate, 'langpair':"%s|%s"%(fromLanguage, toLanguage)};
        googleTranslateUrl = 'http://translate.google.cn/translate_t';
        resp = getUrlRespHtml(googleTranslateUrl, postDict);
        #logging.debug("---------------google translate resp html:\n%s", resp);
    except urllib2.URLError,reason :
        transOK = False;
        transErr = reason;
    except urllib2.HTTPError,code :
        transOK = False;
        transErr = code;
    else :
        # only parse the page when the fetch above raised nothing
        soup = BeautifulSoup(resp);
        resultBoxSpan = soup.find(id='result_box');
        if resultBoxSpan and resultBoxSpan.span and resultBoxSpan.span.string :
            transOK = True;
            #translatedStr = resultBoxSpan.span.string.encode('utf-8');
            googleRetTransStr = resultBoxSpan.span.string;
            translatedStr = unicode(googleRetTransStr);
            # just record some special one:
            # from:
            #【转载】[SEP4020 u-boot] start.s 注释
            # to:
            # The 【reserved] [the SEP4020 u-boot] start.s comment
        else :
            transOK = False;
            transErr = "can not extract translated string from returned result";
    # transErr may be an exception object -> always stringify before returning
    transErr = str(transErr);
    if transOK :
        return (transOK, translatedStr);
    else :
        return (transOK, transErr);
#------------------------------------------------------------------------------
# translate the Chinese Simplified(Zh-cn) string to English(en)
def transZhcnToEn(strToTrans) :
    """Translate a Chinese Simplified (zh-CN) string to English (en).

    Pure-ascii input is returned unchanged (it is already "English").
    Returns (transOK, translatedStrOrErrReason).
    """
    if strIsAscii(strToTrans) :
        return (True, strToTrans)
    return translateString(strToTrans, "zh-CN", "en")
#------------------------------------------------------------------------------
# Remove every tag matching tagName[tagAttrKey]=tagAttrVal from soup contents
# (a list of BeautifulSoup.Tag / BeautifulSoup.NavigableString).
# eg:
# (1)
# removeSoupContentsTagAttr(soupContents, "p", "class", "cc-lisence")
# to remove <p class="cc-lisence" style="line-height:180%;">......</p>, from
# [
# u'\n',
# <p class="cc-lisence" style="line-height:180%;">......</p>,
# u'\u5bf9......\u3002',
# <p>......</p>,
# <br />,
# u'\n',
# <div class="clear"></div>,
# ]
# (2)
# contents = removeSoupContentsTagAttr(contents, "div", "class", "addfav", True);
# remove <div class="addfav">.....</div> from:
# [u'\n',
# <div class="postFooter">......</div>,
# <div style="padding-left:2em">
#   ...
#   <div class="addfav">......</div>
#   ...
# </div>,
# u'\n']
def removeSoupContentsTagAttr(soupContents, tagName, tagAttrKey, tagAttrVal="", recursive=False) :
    """Filter soupContents, dropping tags whose name is tagName and whose
    attribute tagAttrKey equals tagAttrVal.

    Args:
        soupContents: list of BeautifulSoup Tag / NavigableString items.
        tagName:      tag name to match, e.g. "div".
        tagAttrKey:   attribute name to match, e.g. "class".
        tagAttrVal:   attribute value to match; if empty (default), any tag
                      that merely HAS tagAttrKey is removed.
        recursive:    if True, also filter each non-matching tag's children
                      (the child tag's .contents list is rewritten in place).

    Returns:
        New list with matching tags removed; non-Tag items are kept as-is.
    """
    global gVal;
    filtedContents = [];
    for singleContent in soupContents:
        if(isinstance(singleContent, Tag)):
            # NOTE: Tag.attrMap can be None even when attrs is populated
            # (seen for e.g. <script type=... src=...>), so build the lookup
            # dict from the attrs tuple list instead of trusting attrMap.
            attrsDict = tupleListToDict(singleContent.attrs);
            # Empty tagAttrVal means "match on key presence alone".
            if( (singleContent.name == tagName)
                and (singleContent.attrs)
                and (tagAttrKey in attrsDict)
                and ( (tagAttrVal and (attrsDict[tagAttrKey]==tagAttrVal)) or (not tagAttrVal) ) ):
                # Matched: drop this tag by not appending it.
                logging.debug("found %s[%s]=%s in %s", tagName, tagAttrKey, tagAttrVal, attrsDict);
            else:
                if(recursive):
                    # Track recursion depth in the shared gVal dict (used by
                    # the library's debug logging elsewhere).
                    gVal['currentLevel'] = gVal['currentLevel'] + 1;
                    filteredSingleContent = singleContent;
                    filteredSubContentList = removeSoupContentsTagAttr(filteredSingleContent.contents, tagName, tagAttrKey, tagAttrVal, recursive);
                    gVal['currentLevel'] = gVal['currentLevel'] - 1;
                    # Rewrite the child list in place so nested matches are
                    # removed from the kept tag.
                    filteredSingleContent.contents = filteredSubContentList;
                    filtedContents.append(filteredSingleContent);
                else:
                    filtedContents.append(singleContent);
        else:
            # NavigableString (plain text) items are never filtered.
            filtedContents.append(singleContent);
    return filtedContents;
例 2.31. removeSoupContentsTagAttr 的使用范例
foundPostbody = soup.find(attrs={"class":"postBody"});
contents = foundPostbody.contents;
contents = removeSoupContentsTagAttr(contents, "p", "class", "cc-lisence", True); #版权声明
contents = removeSoupContentsTagAttr(contents, "div", "class", "relpost", True); #历史上的今天, 相关帖子
contents = removeSoupContentsTagAttr(contents, "div", "class", "addfav", True); #收藏到
#------------------------------------------------------------------------------
# find the first BeautifulSoup.NavigableString from soup contents
def findFirstNavigableString(soupContents):
    """Return the first NavigableString item in soupContents, or None.

    Note: NavigableString must be imported from BeautifulSoup for the
    isinstance check to work.
    """
    for eachItem in soupContents:
        if isinstance(eachItem, NavigableString):
            return eachItem;
    return None;
#------------------------------------------------------------------------------
# convert soup contents into unicode string
def soupContentsToUnicode(soupContents) :
    """Concatenate a list of soup contents into one unicode string.

    Args:
        soupContents: list of BeautifulSoup Tag / NavigableString items.

    Returns:
        Unicode string: all items rendered and joined in document order.
    """
    # Wrapping each item in CData renders it to its text form; joining the
    # mapped sequence concatenates all pieces. (An equivalent approach of
    # accumulating unicode(content) per item produced the same result.)
    mappedContents = map(CData, soupContents);
    contentUni = ''.join(mappedContents);
    return contentUni;
例 2.32. soupContentsToUnicode 的使用范例
postmetadataUni = soupContentsToUnicode(foundPostmetadata.contents);