通常HTTP都是TCP的短连接,但本案例中传输的是视频流,短连接无法实现,可以使用服务器数据推送的方式。也就是在一次HTTP请求当中,一直使用一个TCP连接持续向客户端发送数据。
说明本地数据local的特点
例子一:
>>> import threading>>> L = threading.local() #创建一个本地数据>>> >>> L.x =1 #创建本地对象L的一个属性x>>> L.x #在主线程中可以访问这个属性1>>> def f(): print(L.x) >>> f() #在主线程中可以访问这个属性1>>> threading.Thread(target=f).start() #新创建一个线程并启动,通过异常可以看出,找不到local这个对象L没有属性xException in thread Thread-15:Traceback (most recent call last): File "C:\Python27\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\Python27\lib\threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "", line 3, in fAttributeError: 'thread._local' object has no attribute 'x'
例子二:
>>> import threading>>> L = threading.local()>>> >>> L.x =1>>> L.x1>>> def f(): print(L.x) >>> f()1>>> def g(): L.x =5 print (L.x) >>> threading.Thread(target = g).start()5>>> L.x1
说明:同一个本地数据的对象L,但在不同线程中,他的x属性的值不同。
>>> from select import select>>> help(select)Help on built-in function select in module select:select(...) select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist) Wait until one or more file descriptors are ready for some kind of I/O. The first three arguments are sequences of file descriptors to be waited for: rlist -- wait until ready for reading wlist -- wait until ready for writing xlist -- wait for an ``exceptional condition'' If only one kind of condition is required, pass [] for the other lists. A file descriptor is either a socket or file object, or a small integer gotten from a fileno() method call on one of those. The optional 4th argument specifies a timeout in seconds; it may be a floating point number to specify fractions of seconds. If it is absent or None, the call will never time out. The return value is a tuple of three lists corresponding to the first three arguments; each contains the subset of the corresponding file descriptors that are ready. *** IMPORTANT NOTICE *** On Windows and OpenVMS, only sockets are supported; on Unix, all filedescriptors can be used.
>>> d = {0:kobe,1:bob}>>> for x in d.itervalues(): print xkobebob
1、例子实现的代码:
import os, cv2, time, struct, threadingfrom BaseHTTPServer import HTTPServer, BaseHTTPRequestHandlerfrom SocketServer import TCPServer, ThreadingTCPServerfrom threading import Thread, RLockfrom select import selectclass JpegStreamer(Thread): def __init__(self, camera): Thread.__init__(self) self.cap = cv2.VideoCapture(camera) self.lock = RLock() self.pipes = {} def register(self): pr, pw = os.pipe() self.lock.acquire() self.pipes[pr] = pw self.lock.release() return pr def unregister(self, pr): self.lock.acquire() self.pipes.pop(pr) self.lock.release() pr.close() pw.close() def capture(self): cap = self.cap while cap.isOpened(): ret, frame = cap.read() if ret: #ret, data = cv2.imencode('.jpg', frame) ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40)) yield data.tostring() def send(self, frame): n = struct.pack('l', len(frame)) self.lock.acquire() if len(self.pipes): _, pipes, _ = select([], self.pipes.itervalues(), [], 1) for pipe in pipes: os.write(pipe, n) os.write(pipe, frame) self.lock.release() def run(self): for frame in self.capture(): self.send(frame)class JpegRetriever(object): def __init__(self, streamer): self.streamer = streamer self.local = threading.local() def retrieve(self): while True: ns = os.read(self.local.pipe, 8) n = struct.unpack('l', ns)[0] data = os.read(self.local.pipe, n) yield data def __enter__(self): if hasattr(self.local, 'pipe'): raise RuntimeError() self.local.pipe = streamer.register() return self.retrieve() def __exit__(self, *args): self.streamer.unregister(self.local.pipe) del self.local.pipe return Trueclass Handler(BaseHTTPRequestHandler): retriever = None @staticmethod def setJpegRetriever(retriever): Handler.retriever = retriever def do_GET(self): if self.retriever is None: raise RuntimeError('no retriver') if self.path != '/': return self.send_response(200) self.send_header("Content-type", 'multipart/x-mixed-replace;boundary=abcde') self.end_headers() with self.retriever as frames: for frame in frames: self.send_frame(frame) def send_frame(self, frame): self.wfile.write('--abcde\r\n') self.wfile.write('Content-Type: image/jpeg\r\n') self.wfile.write('Content-Length: %d\r\n\r\n' % len(frame)) self.wfile.write(frame)if __name__ == '__main__': streamer = JpegStreamer(0) streamer.start() retriever = JpegRetriever(streamer) Handler.setJpegRetriever(retriever) print 'Start server...' httpd = ThreadingTCPServer(('', 9000), Handler) httpd.serve_forever()
2、通过串口摄像头实现例子:
自己的台式机,不像笔记本自带摄像头,所以通过《python使用ctypes模块调用C语言动态库》中实现了,存储一个串口摄像头的照片。现在修改代码,并在代码中比较详细注释了。
# -*- coding: cp936 -*-import os, cv2, time, struct, threadingfrom BaseHTTPServer import HTTPServer, BaseHTTPRequestHandlerfrom SocketServer import TCPServer, ThreadingTCPServerfrom threading import Thread, RLockfrom select import selectfrom time import sleep#数据源,不断从摄像头获取数据,并发送到管道中去。class JpegStreamer(Thread): def __init__(self, camera): Thread.__init__(self) #self.cap = cv2.VideoCapture(camera) # 打开摄像头文件 openCV是图像处理,视觉工具,可以打开图像或视频 self.lock = RLock() #RLock递归锁RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况 #使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐 self.pipes = {} """ 注册接口,想使用源中的数据,需要调用 这个接口。 将数据源写入管道,用户从管道读出数据 """ def register(self): pr, pw = os.pipe() # 创建一个管道 self.lock.acquire() #对列表进行互斥锁操作 self.pipes[pr] = pw # 维护管道的写端 把写端入在了pipes字典 {pr:pw} self.lock.release() return pr # 把读端返回给用户接口 def unregister(self, pr): self.lock.acquire() pw = self.pipes.pop(pr) #将读的键删除值删除 self.lock.release() pr.close() #关闭读写管道 pw.close() """ 图像采集函数 """ """ #装饰器(decorator)可以给函数动态加上功能 # @property装饰器就是负责把一个方法变成属性调用的 # 把一个getter方法变成属性,只需要加上@property就可以了, 此时,本例中@property本身又创建了另一个装饰器@capture.setter,负责把一个setter方法变成属性赋值,于是,我们就拥有一个可控的属性操作 @capture.setter def capture(self): xxxx 只不过这里只可以定义只读属性,只定义getter方法,不定义setter方法就是一个只读属性 因为调用capture()函数时,"()"没有加,就相当于使用他的属性,定义函数时就要使用@property装饰器 """ @property def capture(self): ''' cap = self.cap while cap.isOpened(): ret, frame = cap.read() # 从摄像头获取一帧数据 if ret: """ 返回一个生成器对象, """ # ret, data = cv2.imencode('.jpg', frame) ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40)) # 将获取的数据帧编码成jpg图片 yield data.tostring() ''' while True: frame = cv2.imread(r"D:\test123.jpg") #opencv库函数 从地址文件获取照片数据,解码器以像素BGR顺序存储到矩阵中。通过照片的内容决定照片的格式,而不是通过后缀名实现的 ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40)) # 将获取的数据帧编码成jpg图片。 将解码后的数据矩阵编码成相应的格式 返回是一个buffer yield data.tostring() #返回一个生成器,变成generator的函数,在每次调用next()或for循环迭代的时候执行,遇到yield语句返回,再次执行时从上次返回的yield语句处继续执行。 #tostring具体用法不太知道,这是openCV的依赖库numpy模块里的函数。应该是转换将16进制转为相应的字符串 0x16改为”16“,python写文件是写的字符串或者是反过来的????? sleep(0.2) #延时200ms """ 将其中的一帧发送到所有已注册的管道中 """ def send(self, frame): n = struct.pack('l', len(frame)) #struct模块用于python的值和c结构转换。官方文档https://docs.python.org/3/library/struct.html#format-characters #这里的意思是把frame的长度按long型转换 如3转换为b‘\x03x00\x00\x00’即0x00000003 """ >>> from struct import * >>> pack('hhl', 1, 2, 3) #h 对应C_TYPE 为 short b'\x00\x01\x00\x02\x00\x00\x00\x03' >>> unpack('hhl', b'\x00\x01\x00\x02\x00\x00\x00\x03') (1, 2, 3) """ self.lock.acquire() if len(self.pipes): #self.pipes字典里有值时,注册后自然就会有值 了 #_, pipes, _ = select([], self.pipes.itervalues(), [], 1) #linux里可以用select监测socket和文件的数据,但windows只能监测socket里的,不能监测文件的 sleep(0.2) """ | iteritems(...) | D.iteritems() -> an iterator over the (key, value) items of D | | iterkeys(...) | D.iterkeys() -> an iterator over the keys of D | | itervalues(...) | D.itervalues() -> an iterator over the values of D """ for pipe in self.pipes.itervalues(): #返回字典值的生成器,pipes的键是读pipe 值 是写pipe #if(len(pipe)): os.write(pipe, n) #把长度写入pipe os.write(pipe, frame) #把图像帧写入pipe self.lock.release() """ 从capture()获取一帧数据,再发送到管道中去 """ def run(self): for frame in self.capture: #循环迭代获取的图片,并把图片数据发送到管道中 self.send(frame)class JpegRetriever(object): def __init__(self, streamer): self.streamer = streamer # 持有一个streamer对象(数据源对象) self.local = threading.local() # 需要将每个线程使用的管道,实现线程本地数据,以后每注册一个管道都应该成为这个 self.local的一个属性 # self.pipe = streamer.register() #多线程就是多线程Retriever来处理,这里用相同的本地数据在不同线程不同值的特点. # 调用注册register()接口拿道管道 def retrieve(self): while True: ns = os.read(self.local.pipe, 8) #写入的时候明明是4个字节long的长度,读出时为什么读8个长度 n = struct.unpack('l', ns)[0] #unpack返回的是一个元组,即使只有一个元素如unpack('l','\x03\x00\x00\x00') 返回 (3,)返回这里加了[0]表示返回元组的第0项 data = os.read(self.local.pipe, n) #从管道中读出图片数据 yield data #返回生成器,图片数据的生成器,向Handler返回 """ retriever是单线程时没有—__enter__()。 有注册也有注销,最好实现成上下文管理器。__enter__() """ """ 使用管道时,每次使用都需要注册,使用完后都需要注销,这样最好使用上下文管理 实现上下文管理器 __enter__进入函数 """ def __enter__(self): # self.pipe = streamer.register() if hasattr(self.local, 'pipe'): # 为了避免重复进入enter,判断local下是否有pipe属性,没有时,才注册。hasattr()查看某一对象是否包含某一属性。 raise RuntimeError() self.local.pipe = streamer.register() # 注册成为一个本地的管道 return self.retrieve() # 得到每一帧的生成器 """ def cleanup(self): self.streamer.unregister(self.pipe) """ """ 上下文管理器的退出函数,这里先不关心异常,用*args收集异常,压制所有异常 """ def __exit__(self, *args): self.streamer.unregister(self.local.pipe) del self.local.pipe # 使用完毕后删除本地属性 return Trueclass Handler(BaseHTTPRequestHandler): #用于实现HTTP服务器,但自身不能实现,需要通过继承子类,实现do_METHOD(METHOD为具体的方法如GET、POST等) retriever = None @staticmethod #装饰器 使setJpegRetriever成为静态的方法。参见C++的类里静态方法,只能类调用 ,不能实例调用 def setJpegRetriever(retriever): Handler.retriever = retriever #将传入的retriever对象赋值给类的retriever的对象 def do_GET(self): if self.retriever is None: raise RuntimeError('no retriver') """ http响应头部的构造 """ if self.path != '/': return self.send_response(200) self.send_header("Content-type", 'multipart/x-mixed-replace;boundary=abcde') self.end_headers() """ #调用retrieve()每次拿到一帧数据并通过send_frame()变成hettp的响应发送出去 for frame in self.retriever.retrieve(): self.send_frame(frame) """ """ 使用上下文管理,得到数据帧生成器 """ with self.retriever as frames: # 增加上下文管理 打开retriever,循环迭代 得到图到每一帧的数据 for frame in frames: self.send_frame(frame) #调用send_frame 将图片的的数据发送到http中去 def send_frame(self, frame): self.wfile.write('--abcde\r\n') self.wfile.write('Content-Type: image/jpeg\r\n') self.wfile.write('Content-Length: %d\r\n\r\n' % len(frame)) self.wfile.write(frame)if __name__ == '__main__': streamer = JpegStreamer(0) #定义JpegStreamer线程类的对象 streamer.start() #线程启动调用 run()方法 retriever = JpegRetriever(streamer) #定义JpegRetriever类的实例, Handler.setJpegRetriever(retriever) print 'Start server...' # httpd = TCPServer(('', 9000), Handler) #TCPServer和ThreadingTCPServer的差别是。在处理每一次的http请求的时候,ThreadingTCPServer会创建一个独立的线程来执行Handler中的do_GET httpd = ThreadingTCPServer(('', 9000), Handler) httpd.serve_forever() #启动服务
只有一个 JpegRetriever对象,也就是只有一个管道,在多链接时,也就是多线程时是不行的。每个客户端(每个线程应该有一个独立的管道) 使用线程本地数据,可以解决这个问题,一个JpegRetriever对象,创建本地数据管道,这样每个线程的数据都是独立的。
下面是实验中的结果图:
3、串口摄像头实现视频播放
上面只是使用现有图片做为测试,下面使用摄像头,使用了ctypes模块调用串口摄像头C语言驱动,再进行测试
import os, cv2, time, struct, threadingfrom BaseHTTPServer import HTTPServer, BaseHTTPRequestHandlerfrom SocketServer import TCPServer, ThreadingTCPServerfrom threading import Thread, RLockfrom select import selectimport ctypesfrom time import sleepclass JpegStreamer(Thread): def __init__(self, camera): Thread.__init__(self) #self.cap = cv2.VideoCapture(camera) self.testdll = ctypes.CDLL(r"D:\project program\photo\VS_dll\photo_dll\photo_dll.dll") ret = self.testdll.ComCamInit() type_uchar_array_20k = ctypes.c_char * (20 * 1024) self.my_array = type_uchar_array_20k() self.uwLenPara = ctypes.c_ushort(0) self.testdll.BSP_CameraGetPhoto.restype = ctypes.c_ubyte self.lock = RLock() self.pipes = {} def register(self): pr, pw = os.pipe() self.lock.acquire() self.pipes[pr] = pw self.lock.release() return pr def unregister(self, pr): self.lock.acquire() self.pipes.pop(pr) self.lock.release() pr.close() pw.close() ret = self.testdll.BSP_RearCommClose() def capture(self): """ cap = self.cap while cap.isOpened(): ret, frame = cap.read() if ret: #ret, data = cv2.imencode('.jpg', frame) ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40)) yield data.tostring() """ while True: ret = self.testdll.BSP_CameraGetPhoto(1, self.my_array, ctypes.byref(self.uwLenPara)) #yield self.my_array.tostring() yield self.my_array def send(self, frame): n = struct.pack('l', len(frame)) self.lock.acquire() if len(self.pipes): #_, pipes, _ = select([], self.pipes.itervalues(), [], 1) #for pipe in pipes: #sleep(0.1) for pipe in self.pipes.itervalues(): os.write(pipe, n) os.write(pipe, frame) self.lock.release() def run(self): for frame in self.capture(): self.send(frame)class JpegRetriever(object): def __init__(self, streamer): self.streamer = streamer self.local = threading.local() def retrieve(self): while True: ns = os.read(self.local.pipe, 8) n = struct.unpack('l', ns)[0] data = os.read(self.local.pipe, n) yield data def __enter__(self): if hasattr(self.local, 'pipe'): raise RuntimeError() self.local.pipe = streamer.register() return self.retrieve() def __exit__(self, *args): self.streamer.unregister(self.local.pipe) del self.local.pipe return Trueclass Handler(BaseHTTPRequestHandler): retriever = None @staticmethod def setJpegRetriever(retriever): Handler.retriever = retriever def do_GET(self): if self.retriever is None: raise RuntimeError('no retriver') if self.path != '/': return self.send_response(200) self.send_header("Content-type", 'multipart/x-mixed-replace;boundary=abcde') self.end_headers() with self.retriever as frames: for frame in frames: self.send_frame(frame) def send_frame(self, frame): self.wfile.write('--abcde\r\n') self.wfile.write('Content-Type: image/jpeg\r\n') self.wfile.write('Content-Length: %d\r\n\r\n' % len(frame)) self.wfile.write(frame)if __name__ == '__main__': streamer = JpegStreamer(0) streamer.start() retriever = JpegRetriever(streamer) Handler.setJpegRetriever(retriever) print 'Start server...' httpd = ThreadingTCPServer(('', 9000), Handler) httpd.serve_forever()
实现结果,延时非常大,串口摄像头获取数据较慢。
已发现的问题就是windows的select的使用和linux的区别,虽然使用的python,但是底层还是调用系统底层的api实现的,并不是通过语言自身实现。所以不同平台,还是会有不同。
另外在已显示的页面,再刷新时会出现异常,这个异常号通过网上查找,是因为客户端断开连接时,出现的。也就是刷新时会出现。也许是正常现象,因为程序里没有对异常进行处理,直接抛给了系统。