博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
akka分布式爬虫框架(一)——设计思路与demo
阅读量:4224 次
发布时间:2019-05-26

本文共 6212 字,大约阅读时间需要 20 分钟。

最近在学习akka,在读了一下解析actor model的文章以及熟悉了一下官方文档的例子的后
我觉得需要一个项目来帮我进一步熟悉akka与scala编程,进过一番思索,我觉得akka可以用来
实现一个分布式爬虫框架。
   
设计思路

1. 依赖的库,

    http请求方面使用async-http-client,链接:https://github.com/AsyncHttpClient/async-http-client

    分布式框架则是使用akka。

    集中式存储系统使用kafka

    缓存使用redis

2. 运行流程

    1. 用户提交任务,就是提交一个jar包给manager。

    2. manager接收到jar包后对其进行扫描,读取其中的配置文件以及爬虫的具体逻辑,爬虫的接口现在考虑仿照scrapy的来

    3. manager根据jar包的爬虫逻辑类和配置创建相应的检查url的线程与执行任务的actor,并且将初始url推入kafka

    4. 检查url的线程启动,不断的从kafka中读取url,然后封装后投递给actor(此处应该考虑actor的负载均衡问题,暂时打算用平均方法算法,以后考虑支持负载均衡的配置)

    5. actor调用async-http-client异步请求url,同时注册回调时将response封装后投递给处理actor,同时将成功的url异步写入redis(防止爬取重复url)

    6. 处理actor调用用户自定义的处理方法,将解析获得的url集合与redis中已爬取url取差集,然后将未爬取url推入kafka

  核心逻辑测试

    模拟了用户解析过程,直接生成url,同时kafka与redis也舍去,直接用单例的LinkedBlockingQueue替代

    实现

package awmimport scala.collection.JavaConverters._import java.util.concurrent.{BlockingQueue, Future, LinkedBlockingQueue}import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient, Response}case class UrlMessage(url: String, id: Int)object Singleton {  val queue: BlockingQueue[String] = new LinkedBlockingQueue[String]()  val c = new AsyncHttpClient  var count: Long = 0}class CrawlerDemoActor extends Actor with ActorLogging{  override def receive = {    case UrlMessage(url, id) => {      val futureResponse: Future[Response] = Singleton.c.prepareGet(url).execute(new AsyncCompletionHandler[Response] {        override def onCompleted(response: Response) = {          Singleton.queue.addAll(List(url, url).asJava)          Singleton.count += 1          log.info(s"${Singleton.count}, ${Thread.activeCount}")          response        }      })      Singleton.queue.addAll(List(url, url).asJava)    }  }}object CrawlerDemoActor{  def props: Props = Props(new CrawlerDemoActor())}
 测试

import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}import akka.actor.{Actor, ActorSystem, Props, ActorRef}import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}import awm.{CrawlerDemoActor, UrlMessage}import util.control.Breaks._import awm.Singleton._class TestAsyncClientWithAkka(_system: ActorSystem)  extends TestKit(_system)  with Matchers  with FlatSpecLike  with BeforeAndAfterAll{  object N{var i = 0}  def this() = this(ActorSystem("akka-demo"))  override def afterAll(): Unit = {    shutdown(system)  }  "A Demo" should "succesful" in {    var c: List[ActorRef] = List.empty[ActorRef]    for (x <- 1 to 40){      val actor = system.actorOf(CrawlerDemoActor.props)      c = actor::c    }    queue.add("http://www.csdn.net/")    queue.add("http://www.oschina.net/")    val testProbe = TestProbe()    var i = 0    while (true){      val url = queue.poll()      breakable {        if (url == null)          break()        if (i >= 40)          i = 0        c(i) ! UrlMessage(url, N.i)        N.i += 1        i += 1      }    }  }}

pom.xml

org.scala-lang
scala-library
2.11.11
org.scala-lang
scala-reflect
2.11.11
org.scala-lang
scala-reflect
2.11.11
com.ning
async-http-client
1.9.40
junit
junit
4.12
test
com.typesafe.akka
akka-actor_2.11
2.5.6
com.typesafe.akka
akka-testkit_2.11
2.5.6
test
org.scalatest
scalatest_2.11
3.0.4
test
javax.xml.bind
jaxb-api
2.3.0
org.scala-tools
maven-scala-plugin
2.15.2
compile
testCompile

输出结果

[INFO] [11/05/2017 14:07:32.728] [New I/O worker #14] [akka://akka-demo/user/$H] 21823, 47[INFO] [11/05/2017 14:07:32.730] [New I/O worker #5] [akka://akka-demo/user/$z] 21824, 47[INFO] [11/05/2017 14:07:32.731] [New I/O worker #11] [akka://akka-demo/user/$k] 21825, 47[INFO] [11/05/2017 14:07:32.734] [New I/O worker #13] [akka://akka-demo/user/$c] 21826, 47[INFO] [11/05/2017 14:07:32.734] [New I/O worker #13] [akka://akka-demo/user/$n] 21827, 47[INFO] [11/05/2017 14:07:32.736] [New I/O worker #8] [akka://akka-demo/user/$z] 21828, 47[INFO] [11/05/2017 14:07:32.746] [New I/O worker #9] [akka://akka-demo/user/$n] 21829, 47[INFO] [11/05/2017 14:07:32.746] [New I/O worker #6] [akka://akka-demo/user/$A] 21830, 47[INFO] [11/05/2017 14:07:32.754] [New I/O worker #8] [akka://akka-demo/user/$K] 21831, 47[INFO] [11/05/2017 14:07:32.754] [New I/O worker #5] [akka://akka-demo/user/$r] 21832, 47[INFO] [11/05/2017 14:07:32.754] [New I/O worker #8] [akka://akka-demo/user/$o] 21833, 47[INFO] [11/05/2017 14:07:32.754] [New I/O worker #14] [akka://akka-demo/user/$j] 21834, 47[INFO] [11/05/2017 14:07:32.754] [New I/O worker #8] [akka://akka-demo/user/$o] 21835, 47[INFO] [11/05/2017 14:07:32.754] [New I/O worker #10] [akka://akka-demo/user/$r] 21836, 47[INFO] [11/05/2017 14:07:32.754] [New I/O worker #10] [akka://akka-demo/user/$N] 21837, 47[INFO] [11/05/2017 14:07:32.754] [New I/O worker #16] [akka://akka-demo/user/$K] 21838, 47
这个设计思路只是我一点不成熟的想法,欢迎大家提出建议。

转载地址:http://nigmi.baihongyu.com/

你可能感兴趣的文章
CUDA 学习(五)、线程块
查看>>
CUDA 学习(八)、线程块调度
查看>>
CUDA 学习(九)、CUDA 内存
查看>>
CUDA 学习(十一)、共享内存
查看>>
游戏感:虚拟感觉的游戏设计师指南——第十四章 生化尖兵
查看>>
游戏感:虚拟感觉的游戏设计师指南——第十五章 超级马里奥64
查看>>
游戏感:虚拟感觉的游戏设计师指南——第十七章 游戏感的原理
查看>>
游戏感:虚拟感觉的游戏设计师指南——第十八章 我想做的游戏
查看>>
游戏设计的艺术:一本透镜的书——第十章 某些元素是游戏机制
查看>>
游戏设计的艺术:一本透镜的书——第十一章 游戏机制必须平衡
查看>>
游戏设计的艺术:一本透镜的书——第十二章 游戏机制支撑谜题
查看>>
游戏设计的艺术:一本透镜的书——第十三章 玩家通过界面玩游戏
查看>>
编写苹果游戏中心应用程序(翻译 1.3 为iOS应用程序设置游戏中心)
查看>>
编写苹果游戏中心应用程序(翻译 1.4 添加游戏工具包框架)
查看>>
编写苹果游戏中心应用程序(翻译 1.5 在游戏中心验证本地玩家)
查看>>
编写苹果游戏中心应用程序(翻译 1.6 获取本地玩家的信息)
查看>>
编写苹果游戏中心应用程序(翻译 1.7 在游戏中心添加朋友)
查看>>
编写苹果游戏中心应用程序(翻译 1.8 获取本地玩家的好友信息)
查看>>
WebGL自学教程《OpenGL ES 2.0编程指南》翻译——勘误表
查看>>
WebGL自学教程——WebGL示例:13.0 代码整理
查看>>