passive_scan 基于http代理的web漏洞扫描器的实现开源项目

我要开发同款
匿名用户2019年05月05日
46阅读
开发技术GO语言
所属分类Google Go、安全相关、管理和监控
授权协议未知

作品详情

在WEB业务上线前,QA测试阶段,可将QA的浏览器代理设到一个指定的代理中或测试pc拨入特定的vpn中,QA在测试功能的同时,安全测试也会在后台同步完成,其好处不言而喻。

该类扫描器常见的有2种:

代理式vpn+透明代理

Note

本文只讲第1种,第2种的实现方式稍麻烦一些,一天半天的时间内写不出来,留在下篇文章中写。

架构说明

proxy模块的实现用户请求数据抓取

proxy模块是在开源项目 https://github.com/senko/tornado-proxy 的基础上改的,将用户的请求与服务器的响应数据过滤后存入了mongodb中。我新加的代码在30-38行之间。

classProxyHandler(tornado.web.RequestHandler):SUPPORTED_METHODS=['GET','POST','CONNECT']@tornado.web.asynchronousdefget(self):url_info=dict(method=self.request.method,url=self.request.uri)self.request_info=Nonedefhandle_response(response):if(response.errorandnotisinstance(response.error,tornado.httpclient.HTTPError)):self.set_status(500)self.write('Internalservererror:\n'+str(response.error))else:self.set_status(response.code)forheaderin('Date','Cache-Control','Server','Content-Type','Location'):v=response.headers.get(header)ifv:self.set_header(header,v)v=response.headers.get_list('Set-Cookie')ifv:foriinv:self.add_header('Set-Cookie',i)ifresponse.body:self.write(response.body)#Inserthttprequestandresponseintomongodbifself.application.scan:url=url_info.get('url')url_filter=UrlFilter(url)ifurl_filter.filter():http_info=HttpInfo(url_info,self.request_info,response)values=http_info.get_info()mongodb=Mongodb(db_info)mongodb.insert(values)self.finish()body=self.request.bodyself.request_info=self.requestifnotbody:body=Nonetry:fetch_request(self.request.uri,handle_response,method=self.request.method,body=body,headers=self.request.headers,follow_redirects=False,allow_nonstandard_methods=True)excepttornado.httpclient.HTTPErrorase:ifhasattr(e,'response')ande.response:handle_response(e.response)else:self.set_status(500)self.write('Internalservererror:\n'+str(e))self.finish()程序使用方法

Note

代码比较占篇幅,这里不贴了,请参考我的github: https://github.com/netxfly/passive_scan 。

proxy有2个参数:

port,端口不指定的话,默认为8088scan,scan默认为true,表示会将用户信息入库,如果单纯只想作为一个代理,传入false即可。

任务分发模块

Note

任务分发模块会定期检查mongodb中的待扫描列表,根据status字段判断是否有扫描任务,如果有扫描任务就分发给celery的worker执行。

status=0,表示待扫描status=1,表示正在扫描status=2,表示扫描已完成#-*-coding:utf-8-*-__author__='Hartnett'importtimefrompprintimportpprintimportpymongofrombson.objectidimportObjectIdfromconfigimportdb_infofromscan_tasksimportscanclassScheduler(object):def__init__(self,interval=5):self.interval=intervalself.db_info=db_info#connecttodatabaseself.client=pymongo.MongoClient(self.db_info.get('host'),self.db_info.get('port'))self.client.security_detect.authenticate(self.db_info.get('username'),self.db_info.get('password'),source='passive_scan')self.db=self.client["passive_scan"]self.collection=self.db['url_info']def_get_task(self):task_id=Nonetask_info=Nonetasks=self.collection.find({'status':0}).sort("_id",pymongo.ASCENDING).limit(1)fortaskintasks:url=task.get('url')task_id=task.get('_id')domain=task.get('domain')method=task.get('request').get('method')request_data=task.get('request').get('request_data')user_agent=task.get('request').get('headers').get('User-Agent')cookies=task.get('request').get('headers').get('Cookie')task_info=dict(task_id=task_id,url=url,domain=domain,method=method,request_data=request_data,user_agent=user_agent,cookies=cookies)print("task_id:%s,\ntask_info:")%task_idpprint(task_info)returntask_id,task_info#settaskcheckingnowdef_set_checking(self,task_id):self.collection.update({'_id':ObjectId(task_id)},{"$set":{'status':1}})#settaskcheckeddef_set_checked(self,task_id):self.collection.update({'_id':ObjectId(task_id)},{"$set":{'status':2}})#distributiontaskdefdistribution_task(self):task_id,task_info=self._get_task()print"getscantaskdone,sleep%ssecond."%self.intervaliftask_idisnotNone:self._set_checking(ObjectId(task_id))url=task_info.get('url')domain=task_info.get('domain')method=task_info.get('method')request_data=task_info.get('request_data')user_agent=task_info.get('user_agent')cookies=task_info.get('cookies')scan.apply_async((task_id,url,domain,method,request_data,user_agent,cookies,))self._set_checked(ObjectId(task_id))defrun(self):whileTrue:self.distribution_task()time.sleep(self.interval)if__name__=='__main__':scheduler=Scheduler()scheduler.run()

扫描任务执行模块

Note

任务扫描模块是利用celery实现分布式扫描的,可以将worker部署在多台服务器中,后端的扫描器大家根据实现情况加,比如wvs,arachni,wvs或自己写的扫描器,这篇文章的重点在于代理扫描,我图方便就用了arachni。

#-*-coding:utf8-*-__author__='hartnett'fromceleryimportCeleryfromarachniimportarachni_consolefromconfigimportBACKEND_URL,BROKER_URL,db_infofromhelperimportReporter,PassiveReport,TaskStatusapp=Celery('task',backend=BACKEND_URL,broker=BROKER_URL)#scanningurltask#--------------------------------------------------------------------@app.taskdefscan(task_id,task_url,domain,method,request_data,user_agent,cookies):iftask_url:print"starttoscan%s,task_id:%s"%(task_url,task_id)scanner=arachni_console.Arachni_Console(task_url,user_agent,cookies,page_limit=1)report=scanner.get_report()ifreport:reporter=Reporter(report)value=reporter.get_value()ifvalue:#如果存在漏洞则记录到数据库中scan_report=PassiveReport(db_info,value)scan_report.report()task_status=TaskStatus(db_info)#将状态设为已扫描task_status.set_checked(task_id)web管理后台

实现这个demo用了半天时间,写web后台还要处理前端展示,比较麻烦,所以没写,只讲下基于proxy的扫描器的实现思路

查看全文
声明:本文仅代表作者观点,不代表本站立场。如果侵犯到您的合法权益,请联系我们删除侵权资源!如果遇到资源链接失效,请您通过评论或工单的方式通知管理员。未经允许,不得转载,本站所有资源文章禁止商业使用运营!
下载安装【程序员客栈】APP
实时对接需求、及时收发消息、丰富的开放项目需求、随时随地查看项目状态

评论