-
Notifications
You must be signed in to change notification settings - Fork 89
6.更新记录。
支持运行过程中,随意关闭和启动python程序。无惧反复关闭python和 突然断电导致任务丢失几百个。
之前开100线程/协程的话,随意重启python和断电会导致极大概率丢失200个任务。
官方Threadpoolexecutor是无界队列。使用这个会导致丢失无数个任务,
因为他会迅速把redis的消息全部取出来,添加到自己的queue队列慢慢消费。
因为这个原因所以需要自定义写BoundedThreadpoolexecutor和CustomThreadpoolexecutor。
改版的CustomThreadpoolexecutor修改成了queue最大长度是max_works,自己内部存储100个,
运行中100个,突然关闭python会丢失200个任务。如果queue设置大小为0,则只会丢失100个运行中的任务。
采用的是消费者去除消息时候,用lua脚本同时pop和添加到unacked的独立zset中,函数运行成功后会从set中删除该任务。
同时有一个一直每隔5秒发送心跳到redis服务中的线程,心跳标识中有消费者的唯一标识,绝对不会重复。
如果突然关闭消费者(例如突然断电或者点击关闭python),那么该消费者的心跳将会停止了。这时其他机器的同队列消费者或者当前机器重新启动代码后,在15秒后会
检到被关闭的消费者是非活跃消费者,那么自动将该消费者的unack里面任务全部重新取出返回到待消费队列中。
RedisConsumerAckAble类比RedisConsumer会有一丝丝性能损耗,但python玩redis大部分情况还是python代码本身有性能瓶颈,
而不是造成redis服务端有性能瓶颈,一般只要用在有意义的业务上,就算python很忙把cpu占光了,也不会造成redis服务端达到极限,
python是性能很差的语言,没玩垮redis,自身就把电脑玩死了,所以大部分情况下不要在意加入确认消费后产生额外的对redis服务端的性能压力。
redis要是能直接作为mq使用,redis早就一统天下了,哪里还不断有几十种mq出来。
所以直接基于redis list的如果要做到可靠就必须改进。
基于redisboard,但对redis的list模拟mq功能,进行页面显示优化突出消息队列消费,
加黄显示正在运行中的队列和每10秒的消费速度。每隔10秒自动刷新统计。
由于实时发布和消费,例如10秒内发布20个,消费50个,页面只能显示大小降低了30个,
这个只有专业的mq才能分别显示出来,redis list只是简单数组。
rabbitmq nsq都有官方自带速率显示。
对比concurrent_log_handler包的 ConcurrentRotatingFileHandler ,
windows下性能提高100倍,linux提高10倍,不信的可以测试对比原三方版。
日志用法不变。默默改变多进程文件切片日志,steramhanlder不变,任然是五彩可点击跳转。 强烈建议使用pycharm的 monokai主题颜色,这样日志的颜色符合常规的交通信号灯颜色指示,色彩也非常饱和鲜艳。 设置方式为 打开pycharm的settings -> Editor -> Color Scheme -> Console Font 选择monokai (顺便把ANSi Color的Bright Blue 调成为深蓝色,可以为6.7中说明的print猴子补丁设置更好的底色)
主要在原来基础上实现汉化 彩色 可点击跳转功能。只是放在里面,功能与此框架完全无关。
用法见test_pysnooper.py文件。
可以用来查看执行了哪些代码行 逻辑分支走向,也可以用来度量程序性能,能精确显示运行了多少行python代码。
例如这个可以发现redis.set命令需要执行1000行py代码,
requests.get("https://www.baidu.com")需要执行3万行代码,如果不用工具是万万想不到到底执行了多少行python代码的。
beggar_redis_consumer.py文件的 start_consuming_message函数。
def start_consuming_message(queue_name, consume_function, threads_num):
pool = ThreadPoolExecutor(threads_num)
while True:
try:
redis_task = redis_db_frame.brpop(queue_name, timeout=60)
if redis_task:
task_str = redis_task[1].decode()
print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}')
pool.submit(consume_function, **json.loads(task_str))
else:
print(f'redis的 {queue_name} 队列中没有任务')
except redis.RedisError as e:
print(e)
def add(x, y):
time.sleep(5)
print(f'{x} + {y} 的结果是 {x + y}')
# 推送任务
for i in range(100):
redis_db_frame.lpush('test_beggar_redis_consumer_queue', json.dumps(dict(x=i, y=i * 2)))
# 消费任务
start_consuming_message('test_beggar_redis_consumer_queue', consume_function=add, threads_num=10)
看完整版代码很长很多,是由于控制功能太多,中间件类型多,并发模式多, 所以加入一个最精简版,精简版的本质实现原理和完整版相同。
每个队列是一张表模拟的。
每个任务是表里面的一行记录。
只要导入了此框架,那么你的项目里面所有print的行为都会直接发生改变。
控制台彩色和可点击跳转很重要,原来是必须使用我框架带的日志才能变成五彩可点击,
或者需要使用框架里面的nb_print函数来打印,才能使打印变成彩色可点击。
现在则直接改变项目所有直接使用print的地方。
有的人在项目中疯狂的例如 print(x),结果项目运行一层一层的调用,很难找到当时是哪里打印的x,几乎不可能找得到的。
除非他是这么写代码 print("x的值是:",x) ,只有这样才有可能通过ide的全局搜索找得到print的地方。
再说代码里面疯狂频繁print本来就不是好的习惯,谁让你那么频繁的print呢。
原来是需要手动调用patch_frame_config函数来设置框架中间件配置,现在新增一种方式。
用户运行一次任意导入了function_scheduling_distributed_framework框架的文件,
框架自动寻找用户的项目根目录,并在用户的项目的根目录下生成一个 distributed_frame_config.py的文件。
生成的distributed_frame_config.py文件 中包含了所有默认配置项,但以 # 做了注释。
用户需要按需修改用到的中间件的值。框架自动读取distributed_frame_config.py文件中变量的值作为框架的配置。
也可以将distributed_frame_config.py文件移到你的python文件运行起点所在的目录,
框架会优先读取python文件运行起点所在的目录中的distributed_frame_config.py 作为配置,
没找到则读取项目根目录下的distributed_frame_config.py作为配置。同时兼容以最后一次手动调用patch_frame_config函数作为追踪配置。
例如设置默认需不需要彩色,需不需要大背景彩色色块,需不需要自动拦截转化python内置的print.
将qps按范围分段,采用不同的等待或计数方式。使当qps设置很高的时候,控频更精确。
增加了分布式控频,需要依赖redis中间件。
分布式环境中的控频指的是,假如xx.py文件中有一个consumer,设置func函数的qps为10。
如果在线上部署了三个容器服务,如果不使用分布式控频,则func函数的每秒运行总次数会是30。
即使只有1台机器,如果开多进程,Process运行3个进程,或者把xx.py反复运行启动3个,
也会造成func函数每秒运行总次数是30。
分布式控频主要是解决这种问题。默认不使用分布式控频,
当设置 is_using_distributed_frequency_control为True的时候,使用分布式控频。
这次使用修改你的项目根目录下的自动生成的distributed_frame_config.py配置文件的方式来进行redis rabbitmq等的配置。
不用调用patch_frame_config函数的方式进行配置。
装饰器版,使用方式例如:
from function_scheduling_distributed_framework import task_deco
@task_deco('queue_test_f01', qps=0.2, broker_kind=2)
def add(a, b):
print(a + b)
for i in range(10, 20):
add.pub(dict(a=i, b=i * 2)) # 使用add.pub 发布任务
add.push(i, b=i * 2) # 使用add.push 发布任务
add.consume() # 使用add.consume 消费任务
对比常规方式,常规方式使用方式如下
from function_scheduling_distributed_framework import get_consumer
def add(a, b):
print(a + b)
# 需要手动指定consuming_function入参的值。
consumer = get_consumer('queue_test_f01', consuming_function=add,qps=0.2, broker_kind=2)
for i in range(10, 20):
consumer.publisher_of_same_queue.publish(dict(a=i, b=i * 2))
consumer.start_consuming_message()
装饰器版本的 task_deco 入参 和 get_consumer 入参99%一致,唯一不同的是 装饰器版本加在了函数上自动知道消费函数了,
所以不需要传consuming_function参数。
from function_scheduling_distributed_framework import task_deco, BrokerEnum
@task_deco('queue_test_f03', qps=2, broker_kind=BrokerEnum.ROCKETMQ)
def f(a, b):
print(f'{a} + {b} = {a + b}')
if __name__ == '__main__':
for i in range(100):
f.push(i, i * 2)
f.consume()
之前一直都没支持这种并发模式,异步代码不仅消费函数本身与同步代码很多不同,例如函数的定义和调用以及三方库,
不同于gevent和eventlet打个猴子补丁就可以变并发方式并且代码保持100%原样,asyncio的方式代比同步码真的是要大改特改。
而且在框架层面要支持异步也要增加和修改很多,支持异步并不是很容易。这一点连celery5.0目前都还没支持到(据官方文档说5.0要加入支持,但目前的5.0.3还没加入。)
如果消费函数已经写成了async def这种,那么可以设置 concurrent_mode=ConcurrentModeEnum.ASYNC,
框架会在一个新的线程的loop里面自动运行协程。
from function_scheduling_distributed_framework import task_deco, BrokerEnum,ConcurrentModeEnum
import asyncio
# 此段代码使用的是语言级Queue队列,不需要安装中间件,可以直接复制运行测试。
@task_deco('test_async_queue2', concurrent_mode=ConcurrentModeEnum.ASYNC,
broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE, concurrent_num=500,qps=20)
async def async_f(x):
# 测试异步阻塞并发, 此处不能写成time.sleep(1),否则无论设置多高的并发,1秒钟最多只能运行1次函数。
# 同理asyncio 不能和 requests搭配,要和 aiohttp 搭配。
await asyncio.sleep(1)
print(id(asyncio.get_event_loop()))
#通过 id 可以看到每个并发函数使用的都是同一个loop,而不是采用了愚蠢的临时 asyncio.new_event_loop().run_until_complete(async_f(x)) 方式调度。
print(x)
if __name__ == '__main__':
async_f.clear()
for i in range(100):
async_f.push(i, )
async_f.consume()
比方说汽车的自动挡和手动挡,学了手动挡一定会开自动挡,只学自动挡很难开手动挡。
asyncio方式的代码比正常普通同步思维的代码写法也要难得多了,能玩asyncio的人一定会用threading gevent,
但只用过threading gevent,不去专门学习asyncio的用法,100%是玩不转的。
gevent就像自动挡汽车,自动换挡相当于自动切换阻塞。
asyncio就像手动挡,全要靠自己写 await / async def /loop / run_until_complete /run_forever/
run_coroutine_threadsafe /wait / wait_for /get_event_loop / new_event_loop / get_running_loop
,写法很麻烦很难。异步多了一个loop就像手动挡汽车多了一个离合器一样,十分之难懂。
手动挡玩的溜性能比自动挡高也更省油。asyncio玩的溜那么他的io并发执行速度和效率也会更好,cpu消耗更少。
如果你写一般的代码,那就用同步方式思维来写吧,让分布式函数调度框架来替你自动并发就可以啦。
如果追求更好的控制和性能,不在乎代码写法上的麻烦,并且asyncio技术掌握的很溜,那就用asyncio的方式吧。
异步鬓发模式里面,整个调用链路必须是一旦异步,必须处处异步,在base_consumer.py的AbstractConsumer中,
方法 _async_run_consuming_function_with_confirm_and_retry里面使用的还是操作中间件的同步库,
主要是因为框架目前支持15种中间件,一个一个的使用异步模式的库操作中间件来实现,比现在代码起码要增加80%,无异于重写一个项目了。
异步和同步真的写法语法相差很大的,不信可以比比aiomysql 和pymysql库,aiohttp和requests,如果非常简单能实现异步,
那aiohttp和aiomysql作者为什么要写几万行代码来重新实现,不在原来基础上改造个七八行来实现?
目前此库对 消息确认 消息重新入队 任务过滤 mongo插入,都是采用的同步库,但是使用了 run_in_executor,
把这些操作在异步链路中交给线程池来运行了,同事这个线程池不是官方内置线程池,是智能缩小扩大线程池 ThreadPoolExecutorShrinkAble。
run_in_executor 会把一个同步的操作,sumbit提交给线程池,线程池返回的是一个concurrent.futures包的Future对象,
run_in_executor包装转化了这个Future(此Future不是asyncio的,不是一个awaitable对象)成为了一个asyncio包的Future对象,asyncio的Future对象可以被await,
所以这是非常快捷的同步阻塞函数在异步链路中转同步转异步语法的最佳方式。官方也是这么推荐的。
除了框架内部的阻塞函数是run_in_executor快速转化成非阻塞事件循环的,但是主要的用户的消费函数,是使用的真async模式运行在一个loop循环中的,
也即是单线陈鬓发运行用户的异步函数。
其次框架的同步阻塞函数,都是操作中间件类型的库,异步就是 入队 确认消费 查询是否过滤,这些操作一般都会在1毫秒之内完成,不阻塞太长的事件,
即使不使用run_in_executor,直接在异步链路使用这些同步操作,也没太大问题。一旦异步必须处处异步,说的是不能调用耗时太长的同步阻塞函数,
1毫秒的无伤大雅,因为celery 1秒钟最多能调度300个 def f: print(hello) 这样的无cpu 无io的函数,此框架调度运行速度任然超过celery。
还有一种调度起 async def定义 的消费函数方式是继续开多线程并发,然后使用 临时loop = get_event_loop,loop.run_until_complete,这方式愚蠢了,
相当于只是为了运行起这个函数,但全流程丝毫没有丁点异步。
这个是 redis 的 真消息队列,这次是 真mq,
stream 数据结构功能更加丰富接近 rabbitmq kafka这种真mq的消息队列协议,比 list 做消息队列更强。
需要redis的服务端5.0版本以上才能使用这个数据结构。
代码文件在 function_scheduling_distributed_framework/consumers/redis_stream_consumer.py
这个 REDIS_STREAM 中间件和 REDIS_ACK_ABLE 都支持消费确认,不管客户端怎么掉线关闭,都可以确保消息万无一失。
BrokerEnum.REDIS 中间件 不支持消费确认,随意重启或者断电断线会丢失一批任务。
from function_scheduling_distributed_framework import task_deco, BrokerEnum
@task_deco('queue_test_f01', broker_kind=BrokerEnum.REDIS_STREAM,)
def f(a, b):
print(f'{a} + {b} = {a + b}')
if __name__ == '__main__':
for i in range(100):
f.push(i , b=i * 2)
f.consume()
代码演示省略,设置broker_kind=BrokerEnum.RedisBrpopLpush就行了。
@task_deco('queue_test_f01', broker_kind=BrokerEnum.RedisBrpopLpush,)
zeromq 和rabbbitmq kafka redis都不同,这个不需要安装一个服务端软件,是纯代码的。
zeromq方式是启动一个端口,所以queue_name传一个大于20000小于65535的数字,不能传字母。
import time
from function_scheduling_distributed_framework import task_deco,BrokerEnum
@task_deco('30778',broker_kind=BrokerEnum.ZEROMQ,qps=2)
def f(x):
time.sleep(1)
print(x)
if __name__ == '__main__':
f.consume()
from test_frame.test_broker.test_consume import f
for i in range(100):
f.push(i)
一次性新增操作10种消息队列,.但比较知名的例如rabbitmq redis sqlite3 函数调度框架已经在之前实现了。
使用方式为设置 @task_deco 装饰器的 broker_kind 为 BrokerEnum.KOMBU
在你项目根目录下的 distributed_frame_config.py 文件中设置
KOMBU_URL = 'redis://127.0.0.1:6379/7' 那么就是使用komb 操作redis。
KOMBU_URL = 'amqp://username:[email protected]:5672/',那么就是操纵rabbitmq
KOMBU_URL = 'sqla+sqlite:////dssf_sqlite.sqlite',那么就是在你的代码所在磁盘的根目录创建一个sqlite文件。四个////表示根目,三个///表示当前目录。
其余支持的中间件种类大概有10种,不是很常用,可以百度 google查询kombu或者celery的 broker_url 配置方式。
操作 kombu 包,这个包也是celery的中间件依赖包,这个包可以操作10种中间件(例如rabbitmq redis),
但没包括分布式函数调度框架能支持的kafka nsq zeromq 等。
但是 kombu 包的性能非常差,如何测试对比性能呢?
可以用原生redis的lpush和kombu的publish测试发布
使用brpop 和 kombu 的 drain_events测试消费,对比差距相差了5到10倍。
由于性能差,除非是分布式函数调度框架没实现的中间件才选kombu方式(例如kombu支持亚马逊队列 qpid pyro 队列),
否则强烈建议使用此框架的操作中间件方式而不是使用kombu。
可以把@task_deco装饰器的broker_kind参数 设置为 BrokerEnum.REDIS_ACK_ABLE 和BrokerEnum.KOMBU(配置文件的KOMBU_URL配置为redis),
进行对比,REDIS_ACK_ABLE的消费速度远远超过 BrokerEnum.KOMBU,所以之前专门测试对比celery和此框架的性能,
差距很大,光一个 kombu 就拉了celery大腿很多,再加上celery的除了kombu的执行性能也很低,所以celery比此框架慢很多。
test_frame\test_celery 下面有celery的发布 消费例子,可以测试对比下速度,同样gevent 并发和redis中间件,
celery 执行 print hello 这样的最简单任务,单核单进程每秒执行次数过不了300,celery性能真的是太差了。
import time
from function_scheduling_distributed_framework import task_deco,BrokerEnum
@task_deco('test_kombu2',broker_kind=BrokerEnum.KOMBU,qps=5,)
def f(x):
time.sleep(60)
print(x)
if __name__ == '__main__':
f.consume()
from test_frame.test_broker.test_consume import f
for i in range(10000):
f.push(i)
KOMBU_URL = 'redis://127.0.0.1:6379/7'
# KOMBU_URL = f'amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VIRTUAL_HOST}'
# KOMBU_URL = 'sqla+sqlite:////celery_sqlite3.sqlite' # 4个//// 代表磁盘根目录下生成一个文件。推荐绝对路径。3个///是相对路径。