600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > 迷你无人机 FPV WIFI CAMERA图传破解 mini drone WIFI camera

迷你无人机 FPV WIFI CAMERA图传破解 mini drone WIFI camera

时间:2024-02-12 18:41:48

相关推荐

迷你无人机 FPV WIFI CAMERA图传破解 mini drone WIFI camera

Author: Aric Wang

在闲鱼上买了个迷你无人机的图像模块,介绍说要用wificam app可以查看实时视频和控制。因为我想用电脑显示,或者开发自己的APP来控制和显示实时视频,于是逆向分析了一下在此记录。

首先抓包分析:准备一台有双无线网卡的电脑和安装好wireshark,电脑一张无线网卡设置好AP,另一张无线网卡连FPV WIFI 相机。手机安装好wificam app(apk),并且该手机连到电脑建的热点。

让电脑开起路由功能。此时打开wificam app,就可以看到视频了。 用于wireshark抓一个完整的包,包括,开始,视频传输,结束的。

通过分析抓包可知: UDP 8090用于控制,UDP 8080于来传输视频,probe出来,视频格式为MJPEG的VGA流。使用UDP分片传输JPEG思路挺不错的,效果了不错,毕竟是商用方案。

向UDP 8090,发 AA 80 80 00 80 00 80 55命令,再向UPD 8080发 42 76就开始实时视频。

结束也是类似,先向UDP 8090发命令,再向UDP 8080发 42 77就可以停止视频。

MJPEG通常使用TCP稳定的传输,该项目的创意之处就是UDP分片传输,一帧一帧的传。

抓包可以清析的看到一帧一帧的图片,最后一个包变小了,应该是传输剩余数据。

UDP分片也有引导头,包括JPEG的续号sequence number,1-255循环,只用了一个字节表示。头第二位用来表示该帧的最后一个分片 。

02就是第二张图片的意思,01就是分片的最后一片。

下图可以看到,非最后分片,第二位为0

UDP有大可能丢包,所以尾部也要处理一下,这里我猜了很多,以为是bcc crc8 crc-sum等等,原来只是little endian的unsigned short。那这就简单了!

写个向标准输出,输出视频流的脚本:

udp_decode_std.py

#!/usr/bin/env pythonimport os,sys,time,socketimport selectimport numpy as npimport struct, queue, _threadUDP_PKT_SIZE= 2000out_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)out_sock.bind(('0.0.0.0', 6666))out_sock.sendto(b'\xaa\x80\x80\x00\x80\x00\x80\x55',("192.168.4.153", 8090))out_sock.sendto(b'\x42\x76',("192.168.4.153", 8080))def isValidJPEG():t_len = len(s_buf)if s_buf[0] == 0xff and s_buf[1] == 0xd8 and s_buf[t_len-1] == 0xd9 and s_buf[t_len-2] == 0xff:return Trueelse:return Falsedef chk_tail(jpg_len, b_jpg_len):unpack_len = struct.unpack("<H", b_jpg_len)try:tail_jpeg_len = int(unpack_len[0])if tail_jpeg_len != jpg_len:print("Check tail failed.",tail_jpeg_len, jpg_len)return Falseelse:return Trueexcept:return Falsepasss_buf = b''sn_old = 0b_jpg_len=b''udp_recv_buf_q = queue.Queue()def decode_jpeg_proc():global udp_recv_buf_qwhile 1:if not udp_recv_buf_q.empty():m_item = udp_recv_buf_q.get()rx_buf_len = len(m_item[0])if chk_tail(rx_buf_len, m_item[1]):#decode_jpeg(m_item[0])os.write(1,m_item[0])else:print("#### CHECK FAILED ####")passelse:time.sleep(0.001)pass_thread.start_new_thread(decode_jpeg_proc, (()))while 1:rx_buf, addr = out_sock.recvfrom(UDP_PKT_SIZE)rv_len = len(rx_buf)sn = rx_buf[0]isEof = rx_buf[1]if sn_old != sn:sn_old = snif len(s_buf) == 0 :continue#Finish a whole picture. Decode later.#print("Got a frame, try decode:",len(s_buf))#Decode#decode_jpeg(s_buf)if not udp_recv_buf_q.full():if isValidJPEG():udp_recv_buf_q.put((s_buf, b_jpg_len))else:print("Not valid JPEG.")pass#Clear buffers_buf = b''if isEof != 1:s_buf=s_buf+rx_buf[8:]passelse:s_buf=s_buf+rx_buf[8:rv_len-5]#Featch jpeg length, little endian ushort b_jpg_len = rx_buf[rv_len-4:rv_len-2]time.sleep(0.001)

然后就可以用ffplay ffmpeg gstreamer来播放了

#ffplay 播放,有时延python3 udp_decode_std.py | ffplay -#low latency playpython3 udp_decode_std.py | ffmpeg -threads 1 -re -fflags nobuffer -f mjpeg -i - -pix_fmt yuv420p -f sdl -python3 wificam_udp_cam_stop.py #Gstreamer也可以播放,python3 udp_decode_std.py | gst-launch-1.0 filesrc location=/dev/stdin ! queue ! jpegdec ! autovideosink

当然也可以用opencv来显示实时视频:

#!/usr/bin/env pythonimport os,sys,time,socketimport selectimport cv2import numpy as npimport struct, queue, _threadUDP_PKT_SIZE= 2000out_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)out_sock.bind(('0.0.0.0', 6666))out_sock.sendto(b'\xaa\x80\x80\x00\x80\x00\x80\x55',("192.168.4.153", 8090))out_sock.sendto(b'\x42\x76',("192.168.4.153", 8080))def decode_jpeg(buf):try:img = cv2.imdecode(np.fromstring(buf, dtype=np.uint8) ,cv2.IMREAD_COLOR)cv2.imshow('IMG', img)if cv2.waitKey(1) == 27:exit(0)#img = Image.fromarray(np.fromstring(buf, dtype=np.uint8) )#cv2.imshow(img) except Exception as e:print(">>>>>>>>>> imdecode error!", e)passdef isValidJPEG():t_len = len(s_buf)if s_buf[0] == 0xff and s_buf[1] == 0xd8 and s_buf[t_len-1] == 0xd9 and s_buf[t_len-2] == 0xff:return Trueelse:return Falsedef chk_tail(jpg_len, b_jpg_len):unpack_len = struct.unpack("<H", b_jpg_len)try:tail_jpeg_len = int(unpack_len[0])if tail_jpeg_len != jpg_len:print("Check tail failed.",tail_jpeg_len, jpg_len)return Falseelse:return Trueexcept:return Falsepasss_buf = b''sn_old = 0b_jpg_len=b''udp_recv_buf_q = queue.Queue()def decode_jpeg_proc():global udp_recv_buf_qwhile 1:if not udp_recv_buf_q.empty():m_item = udp_recv_buf_q.get()rx_buf_len = len(m_item[0])if chk_tail(rx_buf_len, m_item[1]):decode_jpeg(m_item[0])else:print("#### CHECK FAILED ####")passelse:time.sleep(0.001)pass_thread.start_new_thread(decode_jpeg_proc, (()))while 1:rx_buf, addr = out_sock.recvfrom(UDP_PKT_SIZE)rv_len = len(rx_buf)sn = rx_buf[0]isEof = rx_buf[1]if sn_old != sn:sn_old = snif len(s_buf) == 0 :continue#Finish a whole picture. Decode later.#print("Got a frame, try decode:",len(s_buf))#Decode#decode_jpeg(s_buf)if not udp_recv_buf_q.full():if isValidJPEG():udp_recv_buf_q.put((s_buf, b_jpg_len))else:print("Not valid JPEG.")pass#Clear buffers_buf = b''if isEof != 1:s_buf=s_buf+rx_buf[8:]passelse:s_buf=s_buf+rx_buf[8:rv_len-5]#Featch jpeg length, little endian ushort b_jpg_len = rx_buf[rv_len-4:rv_len-2]time.sleep(0.001)

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。