Python 中的进程管道


Python 中的进程管道

问题需求:需要在终端执行一条命令,但是输出的时间非常长,为了响应终端的输出又不影响程序的执行,将输出内容进行异步启动并将终端输出的内容存放至管道中,进行读取。
相关文章:https://ld246.com/article/1577762914087

1.Popen方法

使用的是subprocess.Poen()方法,这个模块从python 2.4 的时候开始支持模块中的Popen()方法。用这个模块subprocess产生子进程,并连接到子进程的标准输入/输出/错误中去,还可以得到子进程的返回值。

subprocess 模块主要用于创建子进程,并连接它们的输入、输出和错误管道,获取它们的返回状态。通俗地说就是通过这个模块,你可以在Python的代码里执行操作系统级别的命令,比如ipconfigdu -sh等等。subprocess模块替代了一些老的模块和函数,比如:os.system,os.spawn,os.popen

建议:当执行命令的参数或者返回中包含了中文文字,那么建议使用subprocess。

1.1 基本介绍

源码浏览:

class Popen(Generic[AnyStr]):
    args: _CMD
    stdin: IO[AnyStr] | None
    stdout: IO[AnyStr] | None
    stderr: IO[AnyStr] | None
    pid: int
    returncode: int
    universal_newlines: bool

    # Technically it is wrong that Popen provides __new__ instead of __init__
    # but this shouldn't come up hopefully?

    if sys.version_info >= (3, 7):
        # text is added in 3.7
        @overload
        def __new__(
            cls,
            args: _CMD,
            bufsize: int = ...,
            executable: StrOrBytesPath | None = ...,
            stdin: _FILE | None = ...,
            stdout: _FILE | None = ...,
            stderr: _FILE | None = ...,
            preexec_fn: Callable[[], Any] | None = ...,
            close_fds: bool = ...,
            shell: bool = ...,
            cwd: StrOrBytesPath | None = ...,
            env: _ENV | None = ...,
            universal_newlines: bool = ...,
            startupinfo: Any | None = ...,
            creationflags: int = ...,
            restore_signals: bool = ...,
            start_new_session: bool = ...,
            pass_fds: Any = ...,
            *,
            text: bool | None = ...,
            encoding: str,
            errors: str | None = ...,
        ) -> Popen[str]: ...

简化

subprocess.Popen()(args, *, stdin=None, 
                   input=None, stdout=None, 
                   stderr=None, shell=False, 
                   timeout=None, check=False, 
                   encoding=None, errors=None)

功能:执行*args参数所表示的命令,等待命令结束返回一个CompleteProcess类型的对象。

注意:run()方法返回的不是我们想要的结果或相关信息,而是一个CompleteProcess类型的对象。

参数解释:

args # 表示要执行的命令,必须是字符串,字符串参数列表。
stdin,stderr,stdout # 子进程标准输入输出和错误。其值可以是 subprocess.PIPE、subprocess.DEVNULL、一个已经存在文件描述符、已经打开的文件对象或者None. subprocess.PIPE 表示为子进程创建新的管道。subprocess.DEVNULL 表示使用os.devnull.默认是None表示什么都不做。另外stderr可以合并到stdout里一起输出。
timeout # 设置超时时间。如果执行时间超时,子进程将被杀死,并抛出异常。
check # 如果参数设置为True,并且进程退出状态码不是0,则抛出异常。
encoding #如果指定了该参数,则stdin、stdout和stderr可以接收字符串数据,并以该编码方式编码。否则只接收bytes类型的数据。
shell # 如果参数设置为True,将通过操作系统的shell执行指定命令。

1.2 简单使用

import subprocess
s = subprocess.Popen('pip list',stdout=subprocess.PIPE,stdin=subprocess.PIPE)

print(s.stdout.read()) # 标准输出。
for i in s.stdout.readlines():
    print(i) # 按行输出

1.3 获取进程pid

s = subprocess.Popen('ping 127.0.0.1',shell=False,stdout=subprocess.PIPE,stdin=subprocess.PIPE)
print(s.stdout.read())
print(s.pid) # 获取进程号。

1.4 检查子进程状态

s = subprocess.Popen('ping 127.0.0.1',shell=False,stdout=subprocess.PIPE,stdin=subprocess.PIPE,encoding="gbk")
print(s.poll()) # None 的时候表示正在运行的状态
print(s.stdout.read())
print(s.pid) # 获取进程号。
print(s.poll()) # 0 表示运行结束的状态
s.wait() # 等待运行结束。

1.5 终止任务

print(s.poll())
print(s.terminate()) # 停止子进程
print(s.poll())
print(s.kill()) # 杀死进程
print("zzz",s.poll())

1.6 获取状态信息

print("状态返回值",s.returncode) # None
while s.poll() is None:
    print(s.stdout.readline()) # 输出信息
print("状态返回值",s.returncode) # 0

未结束的时候子进程状态的返回值是None,当进程结束后状态返回值的0;

2.实时读取终端信息

介绍:有些时候我们想在前端界面中中嵌入一个命令行,进行实时交互。或者将执行信息实时的显示在屏幕上,或者在执行终端的命令的时候去执行其他的任务。

2.1 案列:获取 ping 信息

import sys
import subprocess

def run(cmd,shell=False) -> (int,str):
    '''
    开启子进程,执行对应的命令。然后返回进程执行返回的状态码和数据。
    Args:
        cmd:
        shell:

    Returns:

    '''
    print('/033[1;32m************** START **************/033[0m')
    p = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # STDOUT 也可以改为 PIPE,但是是STDOUT的时候,标准错误也会直接重定向到stdout中。
    result = []
    while p.poll() is None:
        line = p.stdout.readline().strip() # 获取每行的输出信息并去除空格。
        if line:
            line = _decode_data(line)
            result.append(line)
            print('/033[1;35m{0}/033[0m'.format(line))
        print('/033[1;35m{0}/033[0m'.format(line))
        # 清空缓存。
        sys.stdout.flush() # 实时显示的核心,
        sys.stderr.flush()
    if p.returncode == 0:
        print('/033[1;32m************** SUCCESS **************/033[0m')
    else:
        print('/033[1;31m************** FAILED **************/033[0m')
    return p.returncode,'/r/n'.join(result)

def _decode_data(byte_data:bytes):
    '''
    解码数据。
    Args:
        byte_data:

    Returns:

    '''
    try:
        return byte_data.decode("UTF-8")
    except UnicodeDecodeError:
        return byte_data.decode("GB18030")

if __name__ == '__main__':
    return_code, data = run('ping www.baidu.com')
    print('return code:', return_code, 'data:', data)

以上部分可以显示出实时获取的数据,因为开启子进程之后去执行相关的命令。

2.2 知识补充sys模块的使用

重点:在读取终端信息中,需要重点学习的方法有sys.stdoutsys.stdin两个部分。

2.2.1 sys 中的常用函数

sys.argv, 实现从程序外部直接向程序传递参数
sys.path, 查看模块的搜索路径
sys.modules, 获取导入到内存中所有模块的名字:内存地址
sys.getdefaultencoding(), 获取python解释器当前编码,python一般默认为utf-8
sys.getfilesystemencoding(), 获取python解释器中文件系统编码
sys.platform, 获取当前系统平台
sys.stdout ,把 print 输出重定向到另一个位置
sys.stdin,标准化输入可以理解为input

2.2.2 sys.path

import sys

# 获取解释器查找的路径
print(sys.path) #当我们导入包的时候需要从这个里面包含的路径中去寻找,当我们导入的包不存在的时候可以向列表中添加元素。
>>> ['xxx/xxx','xxxx/xxx']
sys.path.append('D:/') # 直接将该目录添加至以下列表中。
>>> ['xxx/xxx','xxxx/xxx','D:/'] 

sys.path.append()在 linux 中执行相关单独脚本的时候,导包失败的情况下经常使用,用来添加执行路径,从而导包成功。

相关参考文献:https://zhuanlan.zhihu.com/p/436183856

补充:sys.exit()函数

功能结束的函数:

import sys

# 获取解释器查找的路径
sys.path.append('D:/')
print(sys.path)
# 程序退出
sys.exit() #执行结束。

print("我没有执行") # 此处的代码不会执行。

2.2.3 sys.stdout

import sys
print('奥力给')

oldPrint = sys.stdout   # 用于后期还原

# 把输出重定向到文件
f=open('outfile.log',"a+",encoding="utf-8")
sys.stdout=f
print('给力奥') # 给力奥直接输出到文件中去。

sys.stdout.flush()刷新缓存。

相关文章:https://blog.csdn.net/wuguangbin1230/article/details/77680058

import sys
import time

for i in range(5):
    print(i) # 此处如果是终端的输出
    sys.stdout.flush() # 可以保证实时的刷新显示。立即刷新内存到屏幕显示。
    time.sleep(10)

其他方法参考:https://blog.csdn.net/jjjndk1314/article/details/79634054https://blog.csdn.net/qq_45894553/article/details/104878735

2.3 多线程异步读取

介绍:主要变化就是将,读取部分的内容移动到线程的后续部分进行读取,在线程中开启子进程执行相关的命令。

编写执行终端命令的函数:

import subprocess

def run(cmd,shell=False) -> (int,str):
    '''
    开启子进程,执行对应的命令。然后返回进程执行返回的状态码和数据。
    Args:
        cmd:
        shell:

    Returns:

    '''
    print('/033[1;32m************** START **************/033[0m')
    p = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return p #直接将进程的执行对象进行返回

def _decode_data(byte_data:bytes):
    '''
    解码数据。
    Args:
        byte_data:

    Returns:

    '''
    try:
        return byte_data.decode("UTF-8")
    except UnicodeDecodeError:
        return byte_data.decode("GB18030")

线程部分逻辑的实现

import sys
from concurrent.futures import ThreadPoolExecutor

from test2 import run,_decode_data

def task(cmd):
    print("请求到来>>>")
    # return_code, data = run(cmd)
    p = run(cmd)
    result = []
    while p.poll() is None:
        line = p.stdout.readline().strip()  # 获取每行的输出信息并去除空格。
        if line:
            line = _decode_data(line)
            result.append(line)
            print('/033[1;35m{0}/033[0m'.format(line))
        # 清空缓存。
        sys.stdout.flush()
        sys.stderr.flush()
    if p.returncode == 0:
        print('/033[1;32m************** SUCCESS **************/033[0m')
    else:
        print('/033[1;31m************** FAILED **************/033[0m')
    return p.returncode, '/r/n'.join(result)

    # return "执行中"

pool = ThreadPoolExecutor(10)

if __name__ == '__main__':
    print("开始执行。。。")
    pool.submit(task,"ping -t www.baidu.com")
    print("我正在执行中。。。")

线程中开启进程,将数据的读取直接直接在异步线程中完成,如果要进行通信线程之间可以使用公共变量的方式进行数据共享或者使用队列进行线程或者进程之间的数据传输。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/287863.html

(0)
上一篇 2022年9月6日
下一篇 2022年9月6日

相关推荐

发表回复

登录后才能评论