博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
8-4 如何使用线程本地数据
阅读量:5171 次
发布时间:2019-06-13

本文共 17139 字,大约阅读时间需要 57 分钟。

通常HTTP都是TCP的短连接,但本案例中传输的是视频流,短连接无法实现,可以使用服务器数据推送的方式。也就是在一次HTTP请求当中,一直使用一个TCP连接持续向客户端发送数据。

 

说明本地数据local的特点

例子一:

>>> import threading>>> L = threading.local()        #创建一个本地数据>>> >>> L.x =1                           #创建本地对象L的一个属性x>>> L.x                              #在主线程中可以访问这个属性1>>> def f():    print(L.x)    >>> f()                             #在主线程中可以访问这个属性1>>> threading.Thread(target=f).start()  #新创建一个线程并启动,通过异常可以看出,找不到local这个对象L没有属性xException in thread Thread-15:Traceback (most recent call last):  File "C:\Python27\lib\threading.py", line 801, in __bootstrap_inner    self.run()  File "C:\Python27\lib\threading.py", line 754, in run    self.__target(*self.__args, **self.__kwargs)  File "
", line 3, in fAttributeError: 'thread._local' object has no attribute 'x'

例子二:

>>> import threading>>> L = threading.local()>>> >>> L.x =1>>> L.x1>>> def f():    print(L.x)    >>> f()1>>> def g():    L.x =5    print (L.x)    >>> threading.Thread(target = g).start()5>>> L.x1

说明:同一个本地数据的对象L,但在不同线程中,他的x属性的值不同。

>>> from select import select>>> help(select)Help on built-in function select in module select:select(...)    select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)        Wait until one or more file descriptors are ready for some kind of I/O.    The first three arguments are sequences of file descriptors to be waited for:    rlist -- wait until ready for reading    wlist -- wait until ready for writing    xlist -- wait for an ``exceptional condition''    If only one kind of condition is required, pass [] for the other lists.    A file descriptor is either a socket or file object, or a small integer    gotten from a fileno() method call on one of those.        The optional 4th argument specifies a timeout in seconds; it may be    a floating point number to specify fractions of seconds.  If it is absent    or None, the call will never time out.        The return value is a tuple of three lists corresponding to the first three    arguments; each contains the subset of the corresponding file descriptors    that are ready.        *** IMPORTANT NOTICE ***    On Windows and OpenVMS, only sockets are supported; on Unix, all filedescriptors can be used.
help(select)
>>> d = {0:kobe,1:bob}>>> for x in d.itervalues():    print xkobebob

1、例子实现的代码:

import os, cv2, time, struct, threadingfrom BaseHTTPServer import HTTPServer, BaseHTTPRequestHandlerfrom SocketServer import TCPServer, ThreadingTCPServerfrom threading import Thread, RLockfrom select import selectclass JpegStreamer(Thread):    def __init__(self, camera):        Thread.__init__(self)        self.cap = cv2.VideoCapture(camera)        self.lock = RLock()        self.pipes = {}    def register(self):        pr, pw = os.pipe()        self.lock.acquire()        self.pipes[pr] = pw        self.lock.release()        return pr    def unregister(self, pr):        self.lock.acquire()        self.pipes.pop(pr)        self.lock.release()        pr.close()        pw.close()    def capture(self):        cap = self.cap        while cap.isOpened():            ret, frame = cap.read()            if ret:                #ret, data = cv2.imencode('.jpg', frame)                ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))                yield data.tostring()    def send(self, frame):        n = struct.pack('l', len(frame))        self.lock.acquire()        if len(self.pipes):            _, pipes, _ = select([], self.pipes.itervalues(), [], 1)            for pipe in pipes:                os.write(pipe, n)                os.write(pipe, frame)        self.lock.release()    def run(self):        for frame in self.capture():            self.send(frame)class JpegRetriever(object):    def __init__(self, streamer):        self.streamer = streamer        self.local = threading.local()    def retrieve(self):        while True:            ns = os.read(self.local.pipe, 8)            n = struct.unpack('l', ns)[0]            data = os.read(self.local.pipe, n)            yield data    def __enter__(self):        if hasattr(self.local, 'pipe'):            raise RuntimeError()        self.local.pipe = streamer.register()        return self.retrieve()    def __exit__(self, *args):        self.streamer.unregister(self.local.pipe)        del self.local.pipe        return Trueclass Handler(BaseHTTPRequestHandler):    retriever = None    @staticmethod    def setJpegRetriever(retriever):        Handler.retriever = retriever    def do_GET(self):        if self.retriever is None:            raise RuntimeError('no retriver')        if self.path != '/':            return        self.send_response(200)         self.send_header("Content-type", 'multipart/x-mixed-replace;boundary=abcde')        self.end_headers()        with self.retriever as frames:            for frame in frames:                self.send_frame(frame)    def send_frame(self, frame):        self.wfile.write('--abcde\r\n')        self.wfile.write('Content-Type: image/jpeg\r\n')        self.wfile.write('Content-Length: %d\r\n\r\n' % len(frame))        self.wfile.write(frame)if __name__ == '__main__':    streamer = JpegStreamer(0)    streamer.start()    retriever = JpegRetriever(streamer)    Handler.setJpegRetriever(retriever)    print 'Start server...'    httpd = ThreadingTCPServer(('', 9000), Handler)    httpd.serve_forever()

 

2、通过串口摄像头实现例子:

自己的台式机,不像笔记本自带摄像头,所以通过《python使用ctypes模块调用C语言动态库》中实现了,存储一个串口摄像头的照片。现在修改代码,并在代码中比较详细注释了。

# -*- coding: cp936 -*-import os, cv2, time, struct, threadingfrom BaseHTTPServer import HTTPServer, BaseHTTPRequestHandlerfrom SocketServer import TCPServer, ThreadingTCPServerfrom threading import Thread, RLockfrom select import selectfrom time import sleep#数据源,不断从摄像头获取数据,并发送到管道中去。class JpegStreamer(Thread):    def __init__(self, camera):        Thread.__init__(self)        #self.cap = cv2.VideoCapture(camera)  # 打开摄像头文件 openCV是图像处理,视觉工具,可以打开图像或视频        self.lock = RLock()                    #RLock递归锁RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况                                                #使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐        self.pipes = {}    """    注册接口,想使用源中的数据,需要调用 这个接口。    将数据源写入管道,用户从管道读出数据    """    def register(self):        pr, pw = os.pipe()  # 创建一个管道        self.lock.acquire()     #对列表进行互斥锁操作        self.pipes[pr] = pw  # 维护管道的写端  把写端入在了pipes字典  {pr:pw}        self.lock.release()        return pr  # 把读端返回给用户接口    def unregister(self, pr):        self.lock.acquire()        pw = self.pipes.pop(pr)   #将读的键删除值删除        self.lock.release()        pr.close()                 #关闭读写管道        pw.close()    """    图像采集函数    """    """            #装饰器(decorator)可以给函数动态加上功能            # @property装饰器就是负责把一个方法变成属性调用的            # 把一个getter方法变成属性,只需要加上@property就可以了,            此时,本例中@property本身又创建了另一个装饰器@capture.setter,负责把一个setter方法变成属性赋值,于是,我们就拥有一个可控的属性操作            @capture.setter            def capture(self):                xxxx            只不过这里只可以定义只读属性,只定义getter方法,不定义setter方法就是一个只读属性            因为调用capture()函数时,"()"没有加,就相当于使用他的属性,定义函数时就要使用@property装饰器     """    @property    def capture(self):        '''        cap = self.cap        while cap.isOpened():            ret, frame = cap.read()  # 从摄像头获取一帧数据            if ret:                """                    返回一个生成器对象,                """            # ret, data = cv2.imencode('.jpg', frame)            ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))  # 将获取的数据帧编码成jpg图片            yield data.tostring()        '''        while True:            frame = cv2.imread(r"D:\test123.jpg")  #opencv库函数 从地址文件获取照片数据,解码器以像素BGR顺序存储到矩阵中。通过照片的内容决定照片的格式,而不是通过后缀名实现的            ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))  # 将获取的数据帧编码成jpg图片。 将解码后的数据矩阵编码成相应的格式  返回是一个buffer            yield data.tostring()                  #返回一个生成器,变成generator的函数,在每次调用next()或for循环迭代的时候执行,遇到yield语句返回,再次执行时从上次返回的yield语句处继续执行。                                                    #tostring具体用法不太知道,这是openCV的依赖库numpy模块里的函数。应该是转换将16进制转为相应的字符串  0x16改为”16“,python写文件是写的字符串或者是反过来的?????            sleep(0.2)                                #延时200ms        """        将其中的一帧发送到所有已注册的管道中        """    def send(self, frame):        n = struct.pack('l', len(frame))    #struct模块用于python的值和c结构转换。官方文档https://docs.python.org/3/library/struct.html#format-characters                                            #这里的意思是把frame的长度按long型转换 如3转换为b‘\x03x00\x00\x00’即0x00000003        """            >>> from struct import *            >>> pack('hhl', 1, 2, 3)     #h  对应C_TYPE 为 short            b'\x00\x01\x00\x02\x00\x00\x00\x03'            >>> unpack('hhl', b'\x00\x01\x00\x02\x00\x00\x00\x03')            (1, 2, 3)        """        self.lock.acquire()        if len(self.pipes):             #self.pipes字典里有值时,注册后自然就会有值 了            #_, pipes, _ = select([], self.pipes.itervalues(), [], 1)  #linux里可以用select监测socket和文件的数据,但windows只能监测socket里的,不能监测文件的            sleep(0.2)            """ |  iteritems(...) |      D.iteritems() -> an iterator over the (key, value) items of D |   |  iterkeys(...) |      D.iterkeys() -> an iterator over the keys of D |   |  itervalues(...) |      D.itervalues() -> an iterator over the values of D            """            for pipe in self.pipes.itervalues():    #返回字典值的生成器,pipes的键是读pipe 值 是写pipe                #if(len(pipe)):                os.write(pipe, n)                   #把长度写入pipe                os.write(pipe, frame)               #把图像帧写入pipe        self.lock.release()        """        从capture()获取一帧数据,再发送到管道中去        """    def run(self):        for frame in self.capture:      #循环迭代获取的图片,并把图片数据发送到管道中            self.send(frame)class JpegRetriever(object):    def __init__(self, streamer):        self.streamer = streamer        # 持有一个streamer对象(数据源对象)        self.local = threading.local()  # 需要将每个线程使用的管道,实现线程本地数据,以后每注册一个管道都应该成为这个 self.local的一个属性        # self.pipe = streamer.register()    #多线程就是多线程Retriever来处理,这里用相同的本地数据在不同线程不同值的特点.        # 调用注册register()接口拿道管道    def retrieve(self):        while True:            ns = os.read(self.local.pipe, 8)     #写入的时候明明是4个字节long的长度,读出时为什么读8个长度            n = struct.unpack('l', ns)[0]       #unpack返回的是一个元组,即使只有一个元素如unpack('l','\x03\x00\x00\x00') 返回 (3,)返回这里加了[0]表示返回元组的第0项            data = os.read(self.local.pipe, n)  #从管道中读出图片数据            yield data                          #返回生成器,图片数据的生成器,向Handler返回    """    retriever是单线程时没有—__enter__()。    有注册也有注销,最好实现成上下文管理器。__enter__()    """    """    使用管道时,每次使用都需要注册,使用完后都需要注销,这样最好使用上下文管理    实现上下文管理器 __enter__进入函数    """    def __enter__(self):        # self.pipe = streamer.register()        if hasattr(self.local, 'pipe'):  # 为了避免重复进入enter,判断local下是否有pipe属性,没有时,才注册。hasattr()查看某一对象是否包含某一属性。            raise RuntimeError()        self.local.pipe = streamer.register()  # 注册成为一个本地的管道        return self.retrieve()  # 得到每一帧的生成器    """    def cleanup(self):        self.streamer.unregister(self.pipe)    """    """    上下文管理器的退出函数,这里先不关心异常,用*args收集异常,压制所有异常    """    def __exit__(self, *args):        self.streamer.unregister(self.local.pipe)        del self.local.pipe  # 使用完毕后删除本地属性        return Trueclass Handler(BaseHTTPRequestHandler):         #用于实现HTTP服务器,但自身不能实现,需要通过继承子类,实现do_METHOD(METHOD为具体的方法如GET、POST等)    retriever = None    @staticmethod                       #装饰器 使setJpegRetriever成为静态的方法。参见C++的类里静态方法,只能类调用 ,不能实例调用    def setJpegRetriever(retriever):        Handler.retriever = retriever  #将传入的retriever对象赋值给类的retriever的对象    def do_GET(self):        if self.retriever is None:            raise RuntimeError('no retriver')        """        http响应头部的构造        """        if self.path != '/':            return        self.send_response(200)        self.send_header("Content-type", 'multipart/x-mixed-replace;boundary=abcde')        self.end_headers()        """        #调用retrieve()每次拿到一帧数据并通过send_frame()变成hettp的响应发送出去        for frame in self.retriever.retrieve():            self.send_frame(frame)        """        """        使用上下文管理,得到数据帧生成器        """        with self.retriever as frames:  # 增加上下文管理  打开retriever,循环迭代  得到图到每一帧的数据            for frame in frames:                self.send_frame(frame)   #调用send_frame 将图片的的数据发送到http中去    def send_frame(self, frame):        self.wfile.write('--abcde\r\n')        self.wfile.write('Content-Type: image/jpeg\r\n')        self.wfile.write('Content-Length: %d\r\n\r\n' % len(frame))        self.wfile.write(frame)if __name__ == '__main__':    streamer = JpegStreamer(0)                  #定义JpegStreamer线程类的对象    streamer.start()                            #线程启动调用 run()方法    retriever = JpegRetriever(streamer)         #定义JpegRetriever类的实例,    Handler.setJpegRetriever(retriever)    print 'Start server...'    # httpd = TCPServer(('', 9000), Handler)     #TCPServer和ThreadingTCPServer的差别是。在处理每一次的http请求的时候,ThreadingTCPServer会创建一个独立的线程来执行Handler中的do_GET    httpd = ThreadingTCPServer(('', 9000), Handler)    httpd.serve_forever()                       #启动服务

 只有一个 JpegRetriever对象,也就是只有一个管道,在多链接时,也就是多线程时是不行的。每个客户端(每个线程应该有一个独立的管道)

    使用线程本地数据,可以解决这个问题,一个JpegRetriever对象,创建本地数据管道,这样每个线程的数据都是独立的。

下面是实验中的结果图:

 

3、串口摄像头实现视频播放

上面只是使用现有图片做为测试,下面使用摄像头,使用了ctypes模块调用串口摄像头C语言驱动,再进行测试

import os, cv2, time, struct, threadingfrom BaseHTTPServer import HTTPServer, BaseHTTPRequestHandlerfrom SocketServer import TCPServer, ThreadingTCPServerfrom threading import Thread, RLockfrom select import selectimport ctypesfrom time import sleepclass JpegStreamer(Thread):    def __init__(self, camera):        Thread.__init__(self)        #self.cap = cv2.VideoCapture(camera)        self.testdll = ctypes.CDLL(r"D:\project program\photo\VS_dll\photo_dll\photo_dll.dll")        ret = self.testdll.ComCamInit()        type_uchar_array_20k = ctypes.c_char * (20 * 1024)        self.my_array = type_uchar_array_20k()        self.uwLenPara = ctypes.c_ushort(0)        self.testdll.BSP_CameraGetPhoto.restype = ctypes.c_ubyte        self.lock = RLock()        self.pipes = {}    def register(self):        pr, pw = os.pipe()        self.lock.acquire()        self.pipes[pr] = pw        self.lock.release()        return pr    def unregister(self, pr):        self.lock.acquire()        self.pipes.pop(pr)        self.lock.release()        pr.close()        pw.close()        ret = self.testdll.BSP_RearCommClose()    def capture(self):        """        cap = self.cap        while cap.isOpened():            ret, frame = cap.read()            if ret:                #ret, data = cv2.imencode('.jpg', frame)                ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))                yield data.tostring()        """        while True:            ret = self.testdll.BSP_CameraGetPhoto(1, self.my_array, ctypes.byref(self.uwLenPara))            #yield self.my_array.tostring()            yield self.my_array    def send(self, frame):        n = struct.pack('l', len(frame))        self.lock.acquire()        if len(self.pipes):            #_, pipes, _ = select([], self.pipes.itervalues(), [], 1)            #for pipe in pipes:            #sleep(0.1)            for pipe in self.pipes.itervalues():                os.write(pipe, n)                os.write(pipe, frame)        self.lock.release()    def run(self):        for frame in self.capture():            self.send(frame)class JpegRetriever(object):    def __init__(self, streamer):        self.streamer = streamer        self.local = threading.local()    def retrieve(self):        while True:            ns = os.read(self.local.pipe, 8)            n = struct.unpack('l', ns)[0]            data = os.read(self.local.pipe, n)            yield data    def __enter__(self):        if hasattr(self.local, 'pipe'):            raise RuntimeError()        self.local.pipe = streamer.register()        return self.retrieve()    def __exit__(self, *args):        self.streamer.unregister(self.local.pipe)        del self.local.pipe        return Trueclass Handler(BaseHTTPRequestHandler):    retriever = None    @staticmethod    def setJpegRetriever(retriever):        Handler.retriever = retriever    def do_GET(self):        if self.retriever is None:            raise RuntimeError('no retriver')        if self.path != '/':            return        self.send_response(200)         self.send_header("Content-type", 'multipart/x-mixed-replace;boundary=abcde')        self.end_headers()        with self.retriever as frames:            for frame in frames:                self.send_frame(frame)    def send_frame(self, frame):        self.wfile.write('--abcde\r\n')        self.wfile.write('Content-Type: image/jpeg\r\n')        self.wfile.write('Content-Length: %d\r\n\r\n' % len(frame))        self.wfile.write(frame)if __name__ == '__main__':    streamer = JpegStreamer(0)    streamer.start()    retriever = JpegRetriever(streamer)    Handler.setJpegRetriever(retriever)    print 'Start server...'    httpd = ThreadingTCPServer(('', 9000), Handler)    httpd.serve_forever()

实现结果,延时非常大,串口摄像头获取数据较慢。

已发现的问题就是windows的select的使用和linux的区别,虽然使用的python,但是底层还是调用系统底层的api实现的,并不是通过语言自身实现。所以不同平台,还是会有不同。

另外在已显示的页面,再刷新时会出现异常,这个异常号通过网上查找,是因为客户端断开连接时,出现的。也就是刷新时会出现。也许是正常现象,因为程序里没有对异常进行处理,直接抛给了系统。

 

转载于:https://www.cnblogs.com/smulngy/p/9010189.html

你可能感兴趣的文章
7.28-说说对javaweb的感想吧
查看>>
[九省联考2018] 一双木棋 chess
查看>>
swiper控件(回调函数)
查看>>
Linux串口编程详解(转)
查看>>
.7-Vue源码之AST(3)
查看>>
checkbox:全选、全不选、单选(慕课网题目)
查看>>
supervisor 使用
查看>>
log4j的使用 && slf4j简单介绍
查看>>
只要是[运算] 就会提升数据类型
查看>>
PHP中使用cURL实现Get和Post请求的方法
查看>>
win10下JDK安装,配置环境变量后出现error:could not open '...jvm.cfg'
查看>>
Leetcode 508. Most Frequent Subtree Sum
查看>>
单机配置tomcat 8 集群
查看>>
python-线程互斥锁与递归锁
查看>>
异界冒险
查看>>
Unity3D:UGUI遍历子控件
查看>>
Fizz Buzz 面试题
查看>>
HDU-2027
查看>>
关于 Java 数组的 12 个最佳方法
查看>>
java中强制类型转换
查看>>