<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
日常生活中常會遇到一些小任務,如果人工處理會很麻煩。
用python做些小指令碼處理,能夠提高不少效率。或者可以把python當工具使用,輔助提高一下辦公效率。(比如我常拿python當計算器,計算和字元轉換用)
以下總結下個人用到的一些python小指令碼留作備忘。
用途:通訊報文中的hex資料不好看,可以列印為16進位制的字串顯示出來。
#coding=utf-8 #name: myutil.py def print_hex1(s,prev='0x'): for c in s: print '%s%02x' %(prev,ord(c)), print def print_hex(s): for c in s: print '%02x' %(ord(c)), print print 'myutil' def print_hex3(s,prev='0x'): i = 0 for c in s: print '%s%s,' %(prev,s[i:i+2]), i += 2 print
之前搞微控制器時生成的hex應用程式檔案不能直接刷到微控制器裡,還需要把iap程式合併成一個檔案才能燒寫到微控制器。每次打包麻煩,做個指令碼處理:
#path='C:\Users\test\IAP_CZ_v204w.hex' #file=open(path,'r') #for ll in file.readlines() # print ll #coding=gb18030 import time import os def prr(): print 'file combination begin..' path0=os.getcwd() print path0 path=path0 #path1=path0 path2=path0 path+='\IAP_CZ_v204w.hex' #path1+='\NC_armStaSystem.hex' path2+='\' print path s=raw_input('enter file path:') path1=s #path1+='\NC_armStaSystem.hex' print path1 s=raw_input('enter file name:') path2+=s path2+=time.strftime('_%y%m%d%H%M%S') path2+='.hex' print path2 prr() try: f1=open(path,'r') count=0 for l in f1.readlines(): # print l count+=1 #print count f1.close() f1=open(path,'r') f2=open(path1,'r') f3=open(path2,'w') while(count>1): l=f1.readline() # print l f3.write(l) count-=1 # print count f3.flush() for l in f2.readlines(): f3.write(l) f3.flush() f3.close() print 'combination success!' except Exception,ex: print 'excettion occured!' print ex s=raw_input('press any key to continue...') finally: f1.close() f2.close() s=raw_input('press any key to continue...')
網上好看的動漫圖集,如果手工下載太費時了。簡單分析下網頁地址規律,寫個多執行緒指令碼搞定。
#!/usr/bin/python # -*- coding: utf-8 -*- # filename: paxel.py '''It is a multi-thread downloading tool It was developed follow axel. Author: volans E-mail: volansw [at] gmail.com ''' import sys import os import time import urllib from threading import Thread local_proxies = {'http': 'http://131.139.58.200:8080'} class AxelPython(Thread, urllib.FancyURLopener): '''Multi-thread downloading class. run() is a vitural method of Thread. ''' def __init__(self, threadname, url, filename, ranges=0, proxies={}): Thread.__init__(self, name=threadname) urllib.FancyURLopener.__init__(self, proxies) self.name = threadname self.url = url self.filename = filename self.ranges = ranges self.downloaded = 0 def run(self): '''vertual function in Thread''' try: self.downloaded = os.path.getsize( self.filename ) except OSError: #print 'never downloaded' self.downloaded = 0 # rebuild start poind self.startpoint = self.ranges[0] + self.downloaded # This part is completed if self.startpoint >= self.ranges[1]: print 'Part %s has been downloaded over.' % self.filename return self.oneTimeSize = 16384 #16kByte/time print 'task %s will download from %d to %d' % (self.name, self.startpoint, self.ranges[1]) self.addheader("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1])) self.urlhandle = self.open( self.url ) data = self.urlhandle.read( self.oneTimeSize ) while data: filehandle = open( self.filename, 'ab+' ) filehandle.write( data ) filehandle.close() self.downloaded += len( data ) #print "%s" % (self.name) #progress = u'r...' data = self.urlhandle.read( self.oneTimeSize ) def GetUrlFileSize(url, proxies={}): urlHandler = urllib.urlopen( url, proxies=proxies ) headers = urlHandler.info().headers length = 0 for header in headers: if header.find('Length') != -1: length = header.split(':')[-1].strip() length = int(length) return length def SpliteBlocks(totalsize, blocknumber): blocksize = totalsize/blocknumber ranges = [] for i in range(0, blocknumber-1): ranges.append((i*blocksize, i*blocksize +blocksize - 1)) ranges.append(( blocksize*(blocknumber-1), totalsize -1 )) return ranges def islive(tasks): for task in tasks: if task.isAlive(): return True return False def paxel(url, output, blocks=6, proxies=local_proxies): ''' paxel ''' size = GetUrlFileSize( url, proxies ) ranges = SpliteBlocks( size, blocks ) threadname = [ "thread_%d" % i for i in range(0, blocks) ] filename = [ "tmpfile_%d" % i for i in range(0, blocks) ] tasks = [] for i in range(0,blocks): task = AxelPython( threadname[i], url, filename[i], ranges[i] ) task.setDaemon( True ) task.start() tasks.append( task ) time.sleep( 2 ) while islive(tasks): downloaded = sum( [task.downloaded for task in tasks] ) process = downloaded/float(size)*100 show = u'rFilesize:%d Downloaded:%d Completed:%.2f%%' % (size, downloaded, process) sys.stdout.write(show) sys.stdout.flush() time.sleep( 0.5 ) filehandle = open( output, 'wb+' ) for i in filename: f = open( i, 'rb' ) filehandle.write( f.read() ) f.close() try: os.remove(i) pass except: pass filehandle.close() if __name__ == '__main__': url = "https://s3.ap-northeast-1.wasabisys.com/img.it145.com/202207/001qwbch3eqg0h.jpg" output = '001.jpg' paxel( url, output, blocks=4, proxies={} )
多執行緒下載圖片並儲存到指定目錄中,若目錄不存在則自動建立。
# -*- coding: UTF-8 -*- ''' import re import urllib urls='https://s3.ap-northeast-1.wasabisys.com/img.it145.com/202207/01sfxwes4tzoe.jpg' def getHtml(url): page = urllib.urlopen(url) html = page.read() return html def getImg(html): reg = r'src="(.+?.jpg)" pic_ext' imgre = re.compile(reg) imglist = imgre.findall(html) x = 0 for imgurl in imglist: urllib.urlretrieve(imgurl,'%s.jpg' % x) x = x + 1 html = getHtml("http://tieba.baidu.com/p/2460150866") getImg(html) ''' import re import urllib import threading import time import socket socket.setdefaulttimeout(30) urls=[] j=0 for i in xrange(1,81): if (i-1)%4 == 0: j += 1 if ((j-1)%5) == 0 : j=1 site='http://xz%d.mm667.com/xz%02d/images/' %(j,i) urls.append(site) print urls[i-1] #print urls ''' urls.append('http://xz1.mm667.com/xz01/images/') urls.append('http://xz1.mm667.com/xz02/images/') urls.append('http://xz1.mm667.com/xz03/images/') urls.append('http://xz1.mm667.com/xz04/images/') urls.append('http://xz1.mm667.com/xz84/images/') urls.append('http://xz2.mm667.com/xz85/images/') urls.append('http://xz3.mm667.com/xz86/images/') urls.append('http://xz1.mm667.com/s/') urls.append('http://xz1.mm667.com/p/') ''' def mkdir(path): # 引入模組 import os # 去除首位空格 path=path.strip() # 去除尾部 符號 path=path.rstrip("\") # 判斷路徑是否存在 # 存在 True # 不存在 False isExists=os.path.exists(path) # 判斷結果 if not isExists: # 如果不存在則建立目錄 print path+u' 建立成功' # 建立目錄操作函數 os.makedirs(path) return True else: # 如果目錄存在則不建立,並提示目錄已存在 print path+u' 目錄已存在' return False def cbk(a,b,c): '''''回撥函數 @a: 已經下載的資料塊 @b: 資料塊的大小 @c: 遠端檔案的大小 ''' per = 100.0 * a * b / c if per > 100: per = 100 print '%.2f%%' % per #url = 'http://www.sina.com.cn' local = 'd:\mysite\pic1\' d=0 mutex = threading.Lock() # mutex1 = threading.Lock() class MyThread(threading.Thread): def __init__(self, url, name): threading.Thread.__init__(self) self.url=url self.name=name def run(self): mutex.acquire() print print 'down from %s' % self.url time.sleep(1) mutex.release() try: urllib.urlretrieve(self.url, self.name) except Exception,e: print e time.sleep(1) urllib.urlretrieve(self.url, self.name) threads=[] for u in urls[84:]: d += 1 local = 'd:\mysite\pic1\%d\' %d mkdir(local) print 'download begin...' for i in xrange(40): lcal = local url=u url += '%03d.jpg' %i lcal += '%03d.jpg' %i th = MyThread(url,lcal) threads.append(th) th.start() # for t in threads: # t.join() print 'over! download finished'
#!/usr/bin/env python # -*- coding:utf-8 -*- """ Python爬蟲,抓取一卡通相關企業資訊 Anthor: yangyongzhen Version: 0.0.2 Date: 2014-12-14 Language: Python2.7.5 Editor: Sublime Text2 """ import urllib2, re, string import threading, Queue, time import sys import os from bs4 import BeautifulSoup #from pprint import pprint reload(sys) sys.setdefaultencoding('utf8') _DATA = [] FILE_LOCK = threading.Lock() SHARE_Q = Queue.Queue() #構造一個不限制大小的的佇列 _WORKER_THREAD_NUM = 3 #設定執行緒的個數 _Num = 0 #總條數 class MyThread(threading.Thread) : def __init__(self, func,num) : super(MyThread, self).__init__() #呼叫父類別的建構函式 self.func = func #傳入執行緒函數邏輯 self.thread_num = num def run(self) : self.func() #print u'執行緒ID:',self.thread_num def worker() : global SHARE_Q while not SHARE_Q.empty(): url = SHARE_Q.get() #獲得任務 my_page = get_page(url) find_data(my_page) #獲得當前頁面的資料 #write_into_file(temp_data) time.sleep(1) SHARE_Q.task_done() def get_page(url) : """ 根據所給的url爬取網頁HTML Args: url: 表示當前要爬取頁面的url Returns: 返回抓取到整個頁面的HTML(unicode編碼) Raises: URLError:url引發的異常 """ try : html = urllib2.urlopen(url).read() my_page = html.decode("gbk",'ignore') #my_page = unicode(html,'utf-8','ignore').encode('utf-8','ignore') #my_page = urllib2.urlopen(url).read().decode("utf8") except urllib2.URLError, e : if hasattr(e, "code"): print "The server couldn't fulfill the request." print "Error code: %s" % e.code elif hasattr(e, "reason"): print "We failed to reach a server. Please check your url and read the Reason" print "Reason: %s" % e.reason return my_page def find_data(my_page) : """ 通過返回的整個網頁HTML, 正則匹配名稱 Args: my_page: 傳入頁面的HTML文字用於正則匹配 """ global _Num temp_data = [] items = BeautifulSoup(my_page).find_all("div", style="width:96%;margin:10px;border-bottom:1px #CCC dashed;padding-bottom:10px;") for index, item in enumerate(items) : #print item #print item.h1 #print h.group() #temp_data.append(item) #print item.find(re.compile("^a")) href = item.find(re.compile("^a")) #soup = BeautifulSoup(item) #公司名稱 if item.a: data = item.a.string.encode("gbk","ignore") print data temp_data.append(data) goods = item.find_all("div", style="font-size:12px;") #經營產品與聯絡方式 for i in goods: data = i.get_text().encode("gbk","ignore") temp_data.append(data) print data #b = item.find_all("b") #print b #連結地址 pat = re.compile(r'href="([^"]*)"') h = pat.search(str(item)) if h: #print h.group(0) href = h.group(1) print href temp_data.append(h.group(1)) _Num += 1 #b = item.find_all(text=re.compile("Dormouse")) #pprint(goods) #print href #pat = re.compile(r'title="([^"]*)"') #h = pat.search(str(href)) #if h: #print h.group(1) #temp_data.append(h.group(1)) _DATA.append(temp_data) #headers = {'User-Agent':"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}##瀏覽器請求頭(大部分網站沒有這個請求頭會報錯、請務必加上哦) #all_url = 'http://www.mzitu.com/all' ##開始的URL地址 #start_html = requests.get(all_url, headers=headers) ##使用requests中的get方法來獲取all_url(就是:http://www.mzitu.com/all這個地址)的內容 headers為上面設定的請求頭、請務必參考requests官方檔案解釋 #print(start_html.text) ##列印出start_html (請注意,concent是二進位制的資料,一般用於下載圖片、視訊、音訊、等多媒體內容是才使用concent, 對於列印網頁內容請使用text) def main() : global SHARE_Q threads = [] start = time.clock() douban_url = "http://company.yktworld.com/comapny_search.asp?page={page}" #向佇列中放入任務, 真正使用時, 應該設定為可持續的放入任務 for index in xrange(20) : SHARE_Q.put(douban_url.format(page = index * 1)) for i in xrange(_WORKER_THREAD_NUM) : thread = MyThread(worker,i) thread.start() #執行緒開始處理任務 threads.append(thread) for thread in threads : thread.join() SHARE_Q.join() i = 0 with open("down.txt", "w+") as my_file : for page in _DATA : i += 1 for name in page: my_file.write(name + "n") print "Spider Successful!!!" end = time.clock() print u'抓取完成!' print u'總頁數:',i print u'總條數:',_Num print u'一共用時:',end-start,u'秒' if __name__ == '__main__': main()
#!/usr/bin/env python # -*- coding:utf-8 -*- """ Python爬蟲 Anthor: yangyongzhen Version: 0.0.2 Date: 2014-12-14 Language: Python2.7.8 Editor: Sublime Text2 """ import urllib2, re, string import threading, Queue, time import sys import os from bs4 import BeautifulSoup reload(sys) sys.setdefaultencoding('utf8') _DATA = [] FILE_LOCK = threading.Lock() SHARE_Q = Queue.Queue() #構造一個不限制大小的的佇列 _WORKER_THREAD_NUM = 3 #設定執行緒的個數 rootpath = os.getcwd()+u'/抓取的內容/' def makedir(path): if not os.path.isdir(path): os.makedirs(path) #建立抓取的根目錄 #makedir(rootpath) #顯示下載進度 def Schedule(a,b,c): ''''' a:已經下載的資料塊 b:資料塊的大小 c:遠端檔案的大小 ''' per = 100.0 * a * b / c if per > 100 : per = 100 print '%.2f%%' % per class MyThread(threading.Thread) : def __init__(self, func) : super(MyThread, self).__init__() #呼叫父類別的建構函式 self.func = func #傳入執行緒函數邏輯 def run(self) : self.func() def worker() : print 'work thread start...n' global SHARE_Q while not SHARE_Q.empty(): url = SHARE_Q.get() #獲得任務 my_page = get_page(url) find_title(my_page) #獲得當前頁面的電影名 #write_into_file(temp_data) time.sleep(1) SHARE_Q.task_done() def get_page(url) : """ 根據所給的url爬取網頁HTML Args: url: 表示當前要爬取頁面的url Returns: 返回抓取到整個頁面的HTML(unicode編碼) Raises: URLError:url引發的異常 """ try : html = urllib2.urlopen(url).read() my_page = html.decode("utf8") #my_page = unicode(html,'utf-8','ignore').encode('utf-8','ignore') #my_page = urllib2.urlopen(url).read().decode("utf8") except urllib2.URLError, e : if hasattr(e, "code"): print "The server couldn't fulfill the request." print "Error code: %s" % e.code elif hasattr(e, "reason"): print "We failed to reach a server. Please check your url and read the Reason" print "Reason: %s" % e.reason return my_page def find_title(my_page) : """ 通過返回的整個網頁HTML, 正則匹配前100的電影名稱 Args: my_page: 傳入頁面的HTML文字用於正則匹配 """ temp_data = [] movie_items = BeautifulSoup(my_page).findAll('h1') for index, item in enumerate(movie_items) : #print item #print item.h1 pat = re.compile(r'href="([^"]*)"') h = pat.search(str(item)) if h: #print h.group(0) href = h.group(1) print href temp_data.append(h.group(1)) #print h.group() #temp_data.append(item) #print item.find(re.compile("^a")) href = item.find(re.compile("^a")) #soup = BeautifulSoup(item) if item.a: #print item.a.string temp_data.append(item.a.string) #print href #pat = re.compile(r'title="([^"]*)"') #h = pat.search(str(href)) #if h: #print h.group(1) #temp_data.append(h.group(1)) _DATA.append(temp_data) def main() : global SHARE_Q threads = [] start = time.clock() douban_url = "http://movie.misszm.com/page/{page}" #向佇列中放入任務, 真正使用時, 應該設定為可持續的放入任務 for index in xrange(5) : SHARE_Q.put(douban_url.format(page = index * 1)) for i in xrange(_WORKER_THREAD_NUM) : thread = MyThread(worker) thread.start() #執行緒開始處理任務 threads.append(thread) for thread in threads : thread.join() SHARE_Q.join() with open("movie.txt", "w+") as my_file : for page in _DATA : for movie_name in page: my_file.write(movie_name + "n") print "Spider Successful!!!" end = time.clock() print u'抓取完成!' print u'一共用時:',end-start,u'秒' if __name__ == '__main__': main()
#coding=utf-8 #author:yangyongzhen #QQ:534117529 #'CardTest TcpServer - Simple Test Card Tool 1.00' import sys,threading,time; import serial; import binascii,encodings; import re; import os; from socket import * from struct import *; #from myutil import *; #name: myutil.py mylock = threading.RLock() Server_IP = '' Srever_Port = '' def print_hex1(s,prev='0x'): for c in s: print '%s%02x' %(prev,ord(c)), print def print_hex(s): for c in s: print '%02x' %(ord(c)), print def hexto_str(s): r ='' for c in s: r += '%02x' %(ord(c)) return r def strto_hex(s): r = s.decode('hex') return r #''代表伺服器為localhost #在一個非保留埠號上進行監聽 class ComThread: def __init__(self, Port=0): self.l_serial = None; self.alive = False; self.waitEnd = None; self.port = Port; #TCP部分 #self.sockobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.connection = None #資料 self.snddata = '' self.rcvdata = '' def waiting(self): if not self.waitEnd is None: self.waitEnd.wait(); def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set(); self.alive = False; self.stop(); def start(self): self.l_serial = serial.Serial(); self.l_serial.port = self.port; self.l_serial.baudrate = 115200; self.l_serial.timeout = 2; #秒 self.l_serial.open(); if self.l_serial.isOpen(): self.waitEnd = threading.Event(); self.alive = True; print 'open serial port %d ok!n' %(self.port+1) print 'baudrate:115200 n' self.thread_read = None; self.thread_read = threading.Thread(target=self.FirstReader); self.thread_read.setDaemon(1); self.thread_read.start(); self.thread_write = None; self.thread_write = threading.Thread(target=self.FirstWriter); self.thread_write.setDaemon(1); self.thread_write.start(); #TCP部分 self.thread_TcpClient = None; self.thread_TcpClient = threading.Thread(target=self.TcpClient); self.thread_TcpClient.setDaemon(1); self.thread_TcpClient.start(); self.thread_TcpSend = None; self.thread_TcpSend = threading.Thread(target=self.TcpSend); self.thread_TcpSend.setDaemon(1); self.thread_TcpSend.start(); return True; else: return False; def FirstReader(self): while self.alive: # 接收間隔 time.sleep(0.1); try: data = ''; n = self.l_serial.inWaiting(); if n: data = data+self.l_serial.read(n); #for l in xrange(len(data)): #print '%02X' % ord(data[l]), # 傳送資料 print u'->請求:' print data; mylock.acquire() self.snddata = data mylock.release() #print_hex(data); # 判斷結束 except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def FirstWriter(self): while self.alive: # 接收間隔 time.sleep(0.1); try: #snddata = raw_input('nenter data send:n') if self.rcvdata!='': self.l_serial.write(self.rcvdata); print u'-<應答:' print self.rcvdata; mylock.acquire() self.rcvdata = ''; mylock.release() #print_hex(snddata); except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def TcpClient(self): while True: # 接收間隔 time.sleep(0.1); self.connection = socket(AF_INET, SOCK_STREAM); self.connection.connect((Server_IP, int(Server_Port))); print 'Connect to Server OK!'; self.snddata = '' self.rcvdata = '' while True: #讀取使用者端通訊端的下一行 data = self.connection.recv(1024) #如果沒有數量的話,那麼跳出迴圈 if not data: break #傳送一個回覆至使用者端 mylock.acquire() self.snddata = '' self.rcvdata = data mylock.release() #connection.send('Echo=>' + data) self.connection.close() self.waitEnd.set(); self.alive = False; def TcpSend(self): while True: # 接收間隔 time.sleep(0.1); while True: time.sleep(0.1); try: if not self.connection is None: if self.snddata != '': self.connection.send(self.snddata) mylock.acquire() self.rcvdata = '' self.snddata = '' mylock.release() except Exception, ex: pass def stop(self): self.alive = False; self.thread_read.join(); if self.l_serial.isOpen(): self.l_serial.close(); #測試用部分 if __name__ == '__main__': print 'Serial to Tcp Tool 1.00n' print 'Author:yangyongzhenn' print 'QQ:534117529n' print 'Copyright (c) **cap 2015-2016.n' Server_IP = raw_input('please enter ServerIP:') print 'Server_IP: %s' %(Server_IP) Server_Port = raw_input('please enter ServerPort:') print 'Server_Port: %s' %(Server_Port) com =raw_input('please enter com port(1-9):') rt = ComThread(int(com)-1); try: if rt.start(): rt.waiting(); rt.stop(); else: pass; except Exception,se: print str(se); if rt.alive: rt.stop(); os.system("pause") print ''; print 'End OK .'; del rt;
很早之前做過一個遠端讀卡器工具,原理就是在現場客服電腦上裝個python做的tcpserver伺服器端,操控現場的讀卡器。在公司內部做個使用者端連線過去,這樣實現在公司偵錯現場的卡片業務。
這個就是伺服器端工具的實現:
#coding=utf-8 #author:yangyongzhen #QQ:534117529 #'CardTest TcpServer - Simple Test Card Tool 1.00' import sys,threading,time; import serial; import binascii,encodings; import re; import os; from socket import * from struct import *; #from myutil import *; #name: myutil.py mylock = threading.RLock() def print_hex1(s,prev='0x'): for c in s: print '%s%02x' %(prev,ord(c)), print def print_hex(s): for c in s: print '%02x' %(ord(c)), print def hexto_str(s): r ='' for c in s: r += '%02x' %(ord(c)) return r def strto_hex(s): r = s.decode('hex') return r #''代表伺服器為localhost #在一個非保留埠號上進行監聽 class ComThread: def __init__(self, Port=0): self.l_serial = None; self.alive = False; self.waitEnd = None; self.port = Port; #TCP部分 self.myHost = '' self.myPort = 5050 self.sockobj = socket(AF_INET, SOCK_STREAM) self.connection = None #資料 self.snddata = '' self.rcvdata = '' def waiting(self): if not self.waitEnd is None: self.waitEnd.wait(); def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set(); self.alive = False; self.stop(); def start(self): self.l_serial = serial.Serial(); self.l_serial.port = self.port; self.l_serial.baudrate = 115200; self.l_serial.timeout = 2; #秒 self.l_serial.open(); if self.l_serial.isOpen(): self.waitEnd = threading.Event(); self.alive = True; print 'open serial port %d ok!n' %(self.port+1) print 'baudrate:115200 n' self.thread_read = None; self.thread_read = threading.Thread(target=self.FirstReader); self.thread_read.setDaemon(1); self.thread_read.start(); self.thread_write = None; self.thread_write = threading.Thread(target=self.FirstWriter); self.thread_write.setDaemon(1); self.thread_write.start(); #TCP部分 self.thread_TcpServer = None; self.thread_TcpServer = threading.Thread(target=self.TcpServer); self.thread_TcpServer.setDaemon(1); self.thread_TcpServer.start(); self.thread_TcpSend = None; self.thread_TcpSend = threading.Thread(target=self.TcpSend); self.thread_TcpSend.setDaemon(1); self.thread_TcpSend.start(); return True; else: return False; def FirstReader(self): while self.alive: # 接收間隔 time.sleep(0.1); try: data = ''; n = self.l_serial.inWaiting(); if n: data = data+self.l_serial.read(n); #for l in xrange(len(data)): #print '%02X' % ord(data[l]), # 傳送資料 print 'serial recv:' print data; mylock.acquire() self.snddata = data mylock.release() #print_hex(data); # 判斷結束 except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def FirstWriter(self): while self.alive: # 接收間隔 time.sleep(0.1); try: #snddata = raw_input('nenter data send:n') if self.rcvdata!='': self.l_serial.write(self.rcvdata); print 'serial send:' print self.rcvdata; mylock.acquire() self.rcvdata = ''; mylock.release() #print_hex(snddata); except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def TcpServer(self): self.sockobj.bind((self.myHost, self.myPort)) self.sockobj.listen(10) print 'TcpServer listen at 5050 oK!n' print 'Waiting for connect...n' while True: # 接收間隔 time.sleep(0.1); self.connection, address = self.sockobj.accept() print 'Server connected by', address self.snddata = '' self.rcvdata = '' try: while True: #讀取使用者端通訊端的下一行 data = self.connection.recv(1024) #如果沒有數量的話,那麼跳出迴圈 if not data: break #傳送一個回覆至使用者端 mylock.acquire() self.snddata = '' self.rcvdata = data mylock.release() #connection.send('Echo=>' + data) self.connection.close() except Exception, ex: self.connection.close() self.waitEnd.set(); self.alive = False; def TcpSend(self): while True: # 接收間隔 time.sleep(0.1); while True: time.sleep(0.1); try: if not self.connection is None: if self.snddata != '': self.connection.send(self.snddata) mylock.acquire() self.rcvdata = '' self.snddata = '' mylock.release() except Exception, ex: pass def stop(self): self.alive = False; self.thread_read.join(); if self.l_serial.isOpen(): self.l_serial.close(); #測試用部分 if __name__ == '__main__': print 'CardTest TcpServer - Simple Test Card Tool 1.00n' print 'Author:yangyongzhenn' print 'QQ:534117529n' print 'Copyright (c) **** 2015-2016.n' com =raw_input('please enter com port(1-9):') rt = ComThread(int(com)-1); try: if rt.start(): rt.waiting(); rt.stop(); else: pass; except Exception,se: print str(se); if rt.alive: rt.stop(); os.system("pause") print ''; print 'End OK .'; del rt;
# -*- coding: utf-8 -*- ''' filename:rtcp.py @desc: 利用python的socket埠轉發,用於遠端維護 如果連線不到遠端,會sleep 36s,最多嘗試200(即兩小時) @usage: ./rtcp.py stream1 stream2 stream為:l:port或c:host:port l:port表示監聽指定的本地埠 c:host:port表示監聽遠端指定的埠 @author: watercloud, zd, knownsec team @web: www.knownsec.com, blog.knownsec.com @date: 2009-7 ''' import socket import sys import threading import time streams = [None, None] # 存放需要進行資料轉發的兩個資料流(都是SocketObj物件) debug = 1 # 偵錯狀態 0 or 1 def print_hex(s): for c in s: print '%02x' %(ord(c)), print def _usage(): print 'Usage: ./rtcp.py stream1 stream2nstream : L:port or C:host:port' def _get_another_stream(num): ''' 從streams獲取另外一個流物件,如果當前為空,則等待 ''' if num == 0: num = 1 elif num == 1: num = 0 else: raise "ERROR" while True: if streams[num] == 'quit': print("can't connect to the target, quit now!") sys.exit(1) if streams[num] != None: return streams[num] else: time.sleep(1) def _xstream(num, s1, s2): ''' 交換兩個流的資料 num為當前流編號,主要用於偵錯目的,區分兩個迴路狀態用。 ''' try: while True: #注意,recv函數會阻塞,直到對端完全關閉(close後還需要一定時間才能關閉,最快關閉方法是shutdow) buff = s1.recv(1024) if debug > 0: print num,"recv" if len(buff) == 0: #對端關閉連線,讀不到資料 print num,"one closed" break s2.sendall(buff) if debug > 0: print num,"sendall" print_hex(buff) except : print num,"one connect closed." try: s1.shutdown(socket.SHUT_RDWR) s1.close() except: pass try: s2.shutdown(socket.SHUT_RDWR) s2.close() except: pass streams[0] = None streams[1] = None print num, "CLOSED" def _server(port, num): ''' 處理服務情況,num為流編號(第0號還是第1號) ''' srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.bind(('0.0.0.0', port)) srv.listen(1) #print 'local listening at port %d' (%(port)) while True: conn, addr = srv.accept() print "connected from:", addr streams[num] = conn # 放入本端流物件 s2 = _get_another_stream(num) # 獲取另一端流物件 _xstream(num, conn, s2) def _connect(host, port, num): ''' 處理連線,num為流編號(第0號還是第1號) @note: 如果連線不到遠端,會sleep 36s,最多嘗試200(即兩小時) ''' not_connet_time = 0 wait_time = 36 try_cnt = 199 while True: if not_connet_time > try_cnt: streams[num] = 'quit' print('not connected') return None conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: conn.connect((host, port)) except Exception, e: print ('can not connect %s:%s!' % (host, port)) not_connet_time += 1 time.sleep(wait_time) continue print "connected to %s:%i" % (host, port) streams[num] = conn #放入本端流物件 s2 = _get_another_stream(num) #獲取另一端流物件 _xstream(num, conn, s2) if __name__ == '__main__': print 'Tcp to Tcp Tool 1.00n' print 'Author:yangyongzhenn' print 'QQ:534117529n' print 'Copyright (c) Newcapec 2015-2016.n' Server_IP = raw_input('please enter Server IP:') print 'Server_IP: %s' %(Server_IP) Server_Port = raw_input('please enter Server Port:') print 'Server_Port: %s' %(Server_Port) com =raw_input('please enter Local Port:') tlist = [] # 執行緒列表,最終存放兩個執行緒物件 #targv = [sys.argv[1], sys.argv[2] ] t = threading.Thread(target=_server, args=(int(com), 0)) tlist.append(t) t = threading.Thread(target=_connect, args=(Server_IP, int(Server_Port), 1)) tlist.append(t) for t in tlist: t.start() for t in tlist: t.join() sys.exit(0)
# -*- coding:utf8 -*- from ctypes import * from binascii import unhexlify as unhex import os dll = cdll.LoadLibrary('mydll.dll'); print 'begin load mydll..' #key #str1='x9BxEDx98x89x15x80xC3xB2' str1=unhex('0000556677222238') #data str2=unhex('002d2000000100015566772222383CD881604D0D286A556677222238000020141214181427') #output str3='x12x34x56x78x12x34x56x78' pstr1=c_char_p() pstr2=c_char_p() pstr3=c_char_p() pstr1.value=str1 pstr2.value=str2 pstr3.value=str3 dll.CurCalc_DES_MAC64(805306481,pstr1,0,pstr2,13,pstr3) print pstr1 print pstr2 print pstr3 stro= pstr3.value print stro strtemp='' for c in stro: print "%02x" % (ord(c)) strtemp+="{0:02x}".format(ord(c)) print strtemp os.execlp("E:\RSA.exe",'') s=raw_input('press any key to continue...')
# -*- coding: utf-8 -*- import socket from myutil import * from binascii import unhexlify as unhex from ctypes import * dll = cdll.LoadLibrary('mydll.dll') print 'begin load mydll..' HOST, PORT = "192.168.51.28", 5800 sd ="1234567812345678" # Create a socket (SOCK_STREAM means a TCP socket) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: # Connect to server and send data sock.connect((HOST, int(PORT)) print "Sent1 OK:" print sd # Receive data from the server and shut down received = sock.recv(1024) print "Received:" print_hex(received) print 'received len is 0x%02x' %(len(received)) print 'received data analysis...' re1=received[0:4] print_hex(re1) re1=received[4:6] print_hex(re1) re1=received[6:10] print_hex(re1) re1=received[10:16] print_hex(re1) #pack2 send sock.send(sd2.decode('hex')) print "Sent2 OK:" print sd2 # Receive data from the server and shut down received1 = sock.recv(1024) print "Received1:" print_hex(received1) print 'received1 len is 0x%02x' %(len(received1)) finally: sock.close() s=raw_input('press any key to continue...')
# -*- coding: gb2312 -*- import socket from myutil import * from binascii import unhexlify as unhex from ctypes import * dll = cdll.LoadLibrary('mydll.dll') print 'begin load mydll..' #key key='xF1xE2xD3xC4xF1xE2xD3xC4' #output MAC mac='x00'*8 data='x00'*8 pkey=c_char_p() pdata=c_char_p() pmac=c_char_p() pkey.value=key pdata.value=data pmac.value=mac #pack1 class pack: pass pk=pack() pk.len='00000032' pk.ID='0001' pk.slnum='00000004' pk.poscode='123456781234' pk.rand='1122334455667788' pk.psam='313233343536' pk.kind='0000' pk.ver='000001' pk.time='20140805135601' pk.mac='06cc571e6d96e12d' data=unhex(pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time) #print_hex(data) pdata.value=data #cacl MAC dll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,42,pmac) stro= pmac.value strtemp='' for c in stro: strtemp+="{0:02x}".format(ord(c)) #print strtemp pk.mac=strtemp #data to send sd=pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time+pk.mac print 'send1 len is 0x%02x' %(len(sd)/2) print sd #pack2 class pack2: pass pk2=pack2() pk2.len='0000006E' pk2.ID='0012' pk2.slnum='00000005' pk2.fatCode='00' pk2.cardASN='0000000000000000' pk2.cardType='00' pk2.userNO= '0000000000000000' pk2.fileName1='00000000000000000000000000000015' pk2.dataLen1='00' pk2.dataArea1='00000000000000319999990800FB2014080620240806FFFFFFFFFFFFFFFFFFFF' pk2.fileName2='00000000000000000000000000000016' pk2.dataLen2='00' pk2.dataArea2='000003E800FFFF16' pk2.mac='06cc571e6d96e12d' data2=unhex(pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2) pdata.value=data2 #cacl MAC dll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,102,pmac) stro= pmac.value strtemp='' for c in stro: strtemp+="{0:02x}".format(ord(c)) #print strtemp pk2.mac=strtemp #data to send sd2=pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2+pk2.mac print 'send2 len is 0x%02x' %(len(sd2)/2) print sd2 #PORT="192.168.60.37" #PORT="localhost" HOST, PORT = "192.168.51.28", 5800 # Create a socket (SOCK_STREAM means a TCP socket) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: # Connect to server and send data sock.connect((HOST, int(PORT)) #data= "123456789" #s = struct.pack('bbb',1,2,3) sock.send(sd.decode('hex')) print "Sent1 OK:" print sd # Receive data from the server and shut down received = sock.recv(1024) print "Received:" print_hex(received) print 'received len is 0x%02x' %(len(received)) print 'received data analysis...' re1=received[0:4] print_hex(re1) re1=received[4:6] print_hex(re1) re1=received[6:10] print_hex(re1) re1=received[10:16] print_hex(re1) #pack2 send sock.send(sd2.decode('hex')) print "Sent2 OK:" print sd2 # Receive data from the server and shut down received1 = sock.recv(1024) print "Received1:" print_hex(received1) print 'received1 len is 0x%02x' %(len(received1)) finally: sock.close() s=raw_input('press any key to continue...')
# -*- coding: utf-8 -*- from myutil import * from binascii import unhexlify as unhex import os path=os.getcwd() path+='\rec04.bin' #print path print "begin ans......" f1=open(path,'rb') for i in range(1,35): s=f1.read(280) print "data:",i print_hex(s) print 'read data is:' print_hex(s) recstatadd = 187 print "終端編號:" print_hex(s[recstatadd:recstatadd+10]) print "卡號長度:" print_hex(s[10]) print "卡號: " print_hex(s[11:11+10]) print "持卡序號1+所屬地城市程式碼2+交易地城市程式碼2" print_hex(s[recstatadd+22:recstatadd+22+5]) print "應用交易計數器" print_hex(s[92:92+2]) print "交易前餘額4,交易金額3" print_hex(s[recstatadd+29:recstatadd+29+7]) print "交易日期:" print_hex(s[99:99+3]) print "交易時間:" print_hex(s[44:44+3]) print "終端編號" print_hex(s[21:21+8]) print "商戶編號" print_hex(s[21+8:21+8+15]) print "批次號" print_hex(s[5:5+3]) print "應用密文" print_hex(s[47:47+8]) print "授權金額" print_hex(s[103:103+6]) print "其他金額" print_hex(s[115:115+6]) print "終端驗證結果" print_hex(s[94:5+94]) print "應用交易計數器" print_hex(s[92:92+4]) print "卡片驗證結果" print_hex(s[56:56+32]) print "卡片序列號:" print_hex(s[131]) f1.close()
# -*- coding:utf8 -*- # 2013.12.36 19:41 # 抓取dbmei.com的圖片。 from bs4 import BeautifulSoup import os, sys, urllib2,time,random # 建立資料夾 path = os.getcwd() # 獲取此指令碼所在目錄 new_path = os.path.join(path,u'暴走漫畫') if not os.path.isdir(new_path): os.mkdir(new_path) def page_loop(page=1): url = 'http://baozoumanhua.com/all/hot/page/%s?sv=1389537379' % page content = urllib2.urlopen(url) soup = BeautifulSoup(content) my_girl = soup.find_all('div',class_='img-wrap') for girl in my_girl: jokes = girl.find('img') link = jokes.get('src') flink = link print flink content2 = urllib2.urlopen(flink).read() #with open(u'暴走漫畫'+'/'+time.strftime('%H-%M-%S')+random.choice('qwertyuiopasdfghjklzxcvbnm')+flink[-5:],'wb') as code: #在OSC上現學的 with open(u'暴走漫畫'+'/'+flink[-11:],'wb') as code: code.write(content2) page = int(page) + 1 print u'開始抓取下一頁' print 'the %s page' % page page_loop(page) page_loop()
#!/usr/bin/env python # -*- coding: utf-8 -*- # by yangyongzhen # 2016-12-06 from bs4 import BeautifulSoup import urllib,urllib2,os,time import re rootpath = os.getcwd()+u'/抓取的模板/' def makedir(path): if not os.path.isdir(path): os.makedirs(path) #建立抓取的根目錄 makedir(rootpath) #顯示下載進度 def Schedule(a,b,c): ''''' a:已經下載的資料塊 b:資料塊的大小 c:遠端檔案的大小 ''' per = 100.0 * a * b / c if per > 100 : per = 100 print '%.2f%%' % per def grabHref(url,listhref,localfile): html = urllib2.urlopen(url).read() html = unicode(html,'gb2312','ignore').encode('utf-8','ignore') content = BeautifulSoup(html).findAll('link') myfile = open(localfile,'w') pat = re.compile(r'href="([^"]*)"') pat2 = re.compile(r'http') for item in content: h = pat.search(str(item)) href = h.group(1) if pat2.search(href): ans = href else: ans = url+href listhref.append(ans) myfile.write(ans) myfile.write('rn') print ans content = BeautifulSoup(html).findAll('script') pat = re.compile(r'src="([^"]*)"') pat2 = re.compile(r'http') for item in content: h = pat.search(str(item)) if h: href = h.group(1) if pat2.search(href): ans = href else: ans = url+href listhref.append(ans) myfile.write(ans) myfile.write('rn') print ans content = BeautifulSoup(html).findAll('a') pat = re.compile(r'href="([^"]*)"') pat2 = re.compile(r'http') for item in content: h = pat.search(str(item)) if h: href = h.group(1) if pat2.search(href): ans = href else: ans = url+href listhref.append(ans) myfile.write(ans) myfile.write('rn') print ans myfile.close() def main(): url = "http://192.168.72.140/qdkj/" #採集網頁的地址 listhref =[] #連結地址 localfile = 'ahref.txt' #儲存連結地址為本地檔案,檔名 grabHref(url,listhref,localfile) listhref = list(set(listhref)) #去除連結中的重複地址 curpath = rootpath start = time.clock() for item in listhref: curpath = rootpath name = item.split('/')[-1] fdir = item.split('/')[3:-1] for i in fdir: curpath += i curpath += '/' print curpath makedir(curpath) local = curpath+name urllib.urlretrieve(item, local,Schedule) # 遠端儲存函數 end = time.clock() print u'模板抓取完成!' print u'一共用時:',end-start,u'秒' if __name__=="__main__": main()
到此這篇關於python常用小指令碼的文章就介紹到這了,更多相關python常用小指令碼內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45