在WEB业务上线前,QA测试阶段,可将QA的浏览器代理设到一个指定的代理中或测试pc拨入特定的vp中,QA在测试功能的同时,安全测试也会在后台同步完成,其好处不言而喻。
该类扫描器常见的有2种:
代理式vp+透明代理Note
本文只讲第1种,第2种的实现方式稍麻烦一些,一天半天的时间内写不出来,留在下篇文章中写。
架构说明proxy模块的实现用户请求数据抓取proxy模块是在开源项目 https://github.com/seko/torado-proxy 的基础上改的,将用户的请求与服务器的响应数据过滤后存入了mogodb中。我新加的代码在30-38行之间。
classProxyHadler(torado.web.RequestHadler):SUPPORTED_METHODS=['GET','POST','CONNECT']@torado.web.asychroousdefget(self):url_ifo=dict(method=self.request.method,url=self.request.uri)self.request_ifo=Noedefhadle_respose(respose):if(respose.erroradotisistace(respose.error,torado.httpcliet.HTTPError)):self.set_status(500)self.write('Iteralservererror:\'+str(respose.error))else:self.set_status(respose.code)forheaderi('Date','Cache-Cotrol','Server','Cotet-Type','Locatio'):v=respose.headers.get(header)ifv:self.set_header(header,v)v=respose.headers.get_list('Set-Cookie')ifv:foriiv:self.add_header('Set-Cookie',i)ifrespose.body:self.write(respose.body)#Iserthttprequestadresposeitomogodbifself.applicatio.sca:url=url_ifo.get('url')url_filter=UrlFilter(url)ifurl_filter.filter():http_ifo=HttpIfo(url_ifo,self.request_ifo,respose)values=http_ifo.get_ifo()mogodb=Mogodb(db_ifo)mogodb.isert(values)self.fiish()body=self.request.bodyself.request_ifo=self.requestifotbody:body=Noetry:fetch_request(self.request.uri,hadle_respose,method=self.request.method,body=body,headers=self.request.headers,follow_redirects=False,allow_ostadard_methods=True)excepttorado.httpcliet.HTTPErrorase:ifhasattr(e,'respose')ade.respose:hadle_respose(e.respose)else:self.set_status(500)self.write('Iteralservererror:\'+str(e))self.fiish()程序使用方法Note
代码比较占篇幅,这里不贴了,请参考我的github: https://github.com/etxfly/passive_sca 。
proxy有2个参数:
port,端口不指定的话,默认为8088sca,sca默认为true,表示会将用户信息入库,如果单纯只想作为一个代理,传入false即可。任务分发模块Note
任务分发模块会定期检查mogodb中的待扫描列表,根据status字段判断是否有扫描任务,如果有扫描任务就分发给celery的worker执行。
status=0,表示待扫描status=1,表示正在扫描status=2,表示扫描已完成#-*-codig:utf-8-*-__author__='Hartett'importtimefromppritimportppritimportpymogofrombso.objectidimportObjectIdfromcofigimportdb_ifofromsca_tasksimportscaclassScheduler(object):def__iit__(self,iterval=5):self.iterval=itervalself.db_ifo=db_ifo#coecttodatabaseself.cliet=pymogo.MogoCliet(self.db_ifo.get('host'),self.db_ifo.get('port'))self.cliet.security_detect.autheticate(self.db_ifo.get('userame'),self.db_ifo.get('password'),source='passive_sca')self.db=self.cliet["passive_sca"]self.collectio=self.db['url_ifo']def_get_task(self):task_id=Noetask_ifo=Noetasks=self.collectio.fid({'status':0}).sort("_id",pymogo.ASCENDING).limit(1)fortaskitasks:url=task.get('url')task_id=task.get('_id')domai=task.get('domai')method=task.get('request').get('method')request_data=task.get('request').get('request_data')user_aget=task.get('request').get('headers').get('User-Aget')cookies=task.get('request').get('headers').get('Cookie')task_ifo=dict(task_id=task_id,url=url,domai=domai,method=method,request_data=request_data,user_aget=user_aget,cookies=cookies)prit("task_id:%s,\task_ifo:")%task_idpprit(task_ifo)returtask_id,task_ifo#settaskcheckigowdef_set_checkig(self,task_id):self.collectio.update({'_id':ObjectId(task_id)},{"$set":{'status':1}})#settaskcheckeddef_set_checked(self,task_id):self.collectio.update({'_id':ObjectId(task_id)},{"$set":{'status':2}})#distributiotaskdefdistributio_task(self):task_id,task_ifo=self._get_task()prit"getscataskdoe,sleep%ssecod."%self.itervaliftask_idisotNoe:self._set_checkig(ObjectId(task_id))url=task_ifo.get('url')domai=task_ifo.get('domai')method=task_ifo.get('method')request_data=task_ifo.get('request_data')user_aget=task_ifo.get('user_aget')cookies=task_ifo.get('cookies')sca.apply_asyc((task_id,url,domai,method,request_data,user_aget,cookies,))self._set_checked(ObjectId(task_id))defru(self):whileTrue:self.distributio_task()time.sleep(self.iterval)if__ame__=='__mai__':scheduler=Scheduler()scheduler.ru()扫描任务执行模块Note
任务扫描模块是利用celery实现分布式扫描的,可以将worker部署在多台服务器中,后端的扫描器大家根据实现情况加,比如wvs,arachi,wvs或自己写的扫描器,这篇文章的重点在于代理扫描,我图方便就用了arachi。
#-*-codig:utf8-*-__author__='hartett'fromceleryimportCeleryfromarachiimportarachi_cosolefromcofigimportBACKEND_URL,BROKER_URL,db_ifofromhelperimportReporter,PassiveReport,TaskStatusapp=Celery('task',backed=BACKEND_URL,broker=BROKER_URL)#scaigurltask#--------------------------------------------------------------------@app.taskdefsca(task_id,task_url,domai,method,request_data,user_aget,cookies):iftask_url:prit"starttosca%s,task_id:%s"%(task_url,task_id)scaer=arachi_cosole.Arachi_Cosole(task_url,user_aget,cookies,page_limit=1)report=scaer.get_report()ifreport:reporter=Reporter(report)value=reporter.get_value()ifvalue:#如果存在漏洞则记录到数据库中sca_report=PassiveReport(db_ifo,value)sca_report.report()task_status=TaskStatus(db_ifo)#将状态设为已扫描task_status.set_checked(task_id)web管理后台实现这个demo用了半天时间,写web后台还要处理前端展示,比较麻烦,所以没写,只讲下基于proxy的扫描器的实现思路
评论