Flow Analysis & Time-based Bloom Filters »
Created at: 06.01.2010 19:45, source: igvita.com, tagged: Architecture bigdata bloomfilter flow
Working with large streams of data is becoming increasingly widespread, be it for log, user behavior, or raw firehose analysis of user generated content. There is some very interesting academic literature on this type of data crunching, although much of it is focused on query or network packet analysis and is often not directly applicable to the type of data we have to deal with in the social web. For example, if you were tasked to build (a better) "Trending Topics" algorithm for Twitter, how would you do it?
Of course, the challenge is that it has to be practical - it needs to be "real-time" and be able to react to emerging trends in under a minute, all the while using a reasonable amount of CPU and memory. Now, we don't know how the actual system is implemented at Twitter, nor will we look at any specific solutions - I have some ideas, but I am more curious to hear how you would approach it. Instead, I want to revisit the concept of Bloom Filters, because as I am making my way through the literature, it is surprising how sparsely they are employed for these types of tasks. Specifically, a concept I have been thinking of prototyping for some time now: time-based, counting bloom filters!
Bloom Filters: What & Why
A Bloom Filter is a probabilistic data structure which can tell if an element is a member of a set. However, the reason it is interesting is because it accomplishes this task with an incredibly efficient use of memory: instead of storing a full hash map, it is simply a bit vector which guarantees that you may have some small fraction of false positives (the filter will report that a key is in the bloom filter when it is really not), but it will never report a false negative. File system and web caches frequently use bloom filters as the first query to avoid otherwise costly database or file system lookups. There is some math involved in determining the right parameters for your bloom filter, which you can read about in an earlier post.

Of course, as is, the Bloom Filter data structure is not very useful for analyzing continuous data streams - eventually we would fill up the filter and it would begin reporting false positives all the time. But, what if your bloom filter only remembered seen data for a fixed interval of time? Imagine adding time-to-live (TTL) timestamp on each record. All of the sudden, if you knew the approximate number of messages for the interval of time you wanted to analyze, then a bloom filter is once again an incredibly fast and space-efficient (fixed memory footprint) data structure!
Time-based Bloom Filters
Arguably the key feature of bloom filters is their compact representation as a bit vector. By associating a timestamp with each record, the size of the filter immediately expands by an order of magnitude, but even with that, depending on the size of the time window you are analyzing, you could store the TTL's in just a few additional bits. Conversely, if counting bits is not mission critical, you could even used a backend such as Redis or Memcached to drive the filter as well. The direct benefit of such approach is that the data can be shared by many distributed processes. On that note, I have added a prototype Redis backend to the bloomfilter gem which implements a time-based, counting Bloom Filter. Let's take a look at a simple example:
require 'bloomfilter' options = { :size => 100, # size of bit vector :hashes => 4, # number of hash functions :seed => rand(100), # seed value for the filter :bucket => 3 # number of bits for the counting filter } # Regular, in-memory counting bloom filter bf = BloomFilter.new(options) bf.insert("mykey") bf.include?("mykey") # => true bf.include?("mykey1") # => false # # Redis-backed bloom filter, with optional time-based semantics # bf = BloomFilter.new(options.merge({:type => :redis, :ttl => 2, :server => {:host => 'localhost'}})) bf.insert("mykey") bf.include?("mykey") # => true sleep(3) bf.include?("mykey") # => false # custom 5s TTL for a key bf.insert("newkey", nil, 5)
Storing data in Redis or Memcached is roughly an order of magnitude less efficient, but it gives us an easy to use, distributed, and fixed memory filter for analyzing continuous data streams. In other words, a useful tool for applications such as duplicate detection, trends analysis, and many others.
Mechanics of Time-Based Bloom Filters
So how does it work? Given the settings above, we create a fixed memory vector of 100 buckets (or bits in raw C implementation). Then, for each key, we hash it 4 times with different key offsets and increment the counts in those buckets - a non-negative value indicates that one of the hash functions for some key has used that bucket. Then, for a lookup, we reverse the operation: generate the 4 different hash keys and look them up, if all of them are non-zero then either we have seen this key or there has been a collision (false positive). By optimizing the size of the bit vector we can control the false positive rate - you're always trading the of amount of allocated memory vs. collision rate. Finally, we also make use of the native expire functionality in Redis to guarantee that keys are only stored for a bounded amount of time.
Time-based bloom filters have seen a few rogue mentions in the academic literature, but to the best of my knowledge, have not seen wide applications in the real world. However, it is an incredibly powerful data structure, and one that could benefit many modern, big-data applications. Gem install the bloomfilter gem and give it a try, perhaps it will help you build a better trends analysis tool. Speaking of which, what other tools, algorithms, or data structures would you use to build a "Trending Topics" algorithm for a high-velocity stream?
more »
Ruby & WebSockets: TCP for the Browser »
Created at: 22.12.2009 18:39, source: igvita.com, tagged: Architecture html5 realtime websocket
WebSockets are one of the most underappreciated innovations in HTML5. Unlike local storage, canvas, web workers, or even video playback, the benefits of the WebSocket API are not immediately apparent to the end user. In fact, over the course of the past decade we have invented a dozen technologies to solve the problem of asynchronous and bi-directional communication between the browser and the server: AJAX, Comet & HTTP Streaming, BOSH, ReverseHTTP, WebHooks & PubSubHubbub, and Flash sockets amongst many others. Having said that, it does not take much experience with any of the above to realize that each has a weak spot and none solve the fundamental problem: web-browsers of yesterday were not designed for bi-directional communication.
WebSockets in HTML5 change all of that as they were designed from the ground up to be data agnostic (binary or text) with support for full-duplex communication. WebSockets are TCP for the web-browser. Unlike BOSH or equivalents, they require only a single connection, which translates into much better resource utilization for both the server and the client. Likewise, WebSockets are proxy and firewall aware, can operate over SSL and leverage the HTTP channel to accomplish all of the above - your existing load balancers, proxies and routers will work just fine.
WebSockets in the Browser: Chrome, Firefox & Safari
The WebSocket API is still a draft, but the developers of our favorite browsers have already implemented much of the functionality. Chrome’s developer build (4.0.249.0) now officially supports the API and has it enabled by default. Webkit nightly builds also support WebSockets, and Firefox has an outstanding patch under review. In other words, while mainstream adoption is still on the horizon, as developers we can start thinking about much improved architectures that WebSockets enable. A minimal example with the help of jQuery:
<html>
<head>
<script src='http://ajax.googleapis.com/ajax/libs/jquery/1.3.2/jquery.min.js'></script>
<script>
$(document).ready(function(){
function debug(str){ $("#debug").append("<p>"+str+"</p>"); };
ws = new WebSocket("ws://yourservice.com/websocket");
ws.onmessage = function(evt) { $("#msg").append("<p>"+evt.data+"</p>"); };
ws.onclose = function() { debug("socket closed"); };
ws.onopen = function() {
debug("connected...");
ws.send("hello server");
};
});
</script>
</head>
<body>
<div id="debug"></div>
<div id="msg"></div>
</body>
</html>
The above example showcases the bi-directional nature of WebSockets: send pushes data to the server, and onmessage callback is invoked anytime the server pushes data to the client. No need for long-polling, HTTP header overhead, or juggling multiple connections. In fact, you could even deploy the WebSocket API today without waiting for the browser adoption by using a Flash socket as an intermediate step: web-socket-js.
Streaming Data to WebSocket Clients
WebSockets are not the same as raw TCP sockets and for a good reason. While it may seem tempting to be able to open a raw TCP connections from within the browser, the security of the browser would be immediately compromised: any website could then access the network on behalf of the user, within the same security context as the user. For example, a website could open a connection to a remote SMTP server and start delivering spam - a scary thought. Instead, WebSockets extend the HTTP protocol by defining a special handshake in order for the browser to establish a connection. In other words, it is an opt-in protocol which requires a standalone server.

Nothing stops you from talking to an SMTP, AMQP, or any other server via the raw protocol, but you will have to introduce a WebSocket server in between to mediate the connection. Kaazing Gateway already provides adapters for STOMP and Apache ActiveMQ, and you could also implement your own JavaScript wrappers for others. And if a Java based WebSocket server is not for you, Ruby EventMachine also allows us to build a very simple event-driven WebSocket server in just a few lines of code:
require 'em-websocket' EventMachine::WebSocket.start(:host => "0.0.0.0", :port => 8080) do |ws| ws.onopen { ws.send "Hello Client!"} ws.onmessage { |msg| ws.send "Pong: #{msg}" } ws.onclose { puts "WebSocket closed" } end
Consuming WebSocket Services
Support for WebSockets in Chrome and Safari also means that our mobile devices will soon support bi-directional push, which is both easier on the battery, and much more efficient for bandwidth consumption. However, WebSockets can also be utilized outside of the browser (ex: real-time data firehose), which means that a regular Ruby HTTP client should be able to handle WebSockets as well:
require 'eventmachine' EventMachine.run { http = EventMachine::HttpRequest.new("ws://yourservice.com/websocket").get :timeout => 0 http.errback { puts "oops" } http.callback { puts "WebSocket connected!" http.send("Hello client") } http.stream { |msg| puts "Recieved: #{msg}" http.send "Pong: #{msg}" } }
WebSocket support is still an experimental branch within em-http-request, but the aim is to provide a consistent and fully transparent API: simply specify a WebSocket resource and it will do the rest, just as if you were using a streaming HTTP connection! Best of all, HTTP & OAuth authentication, proxies and existing load balancers will all work and play nicely with this new delivery model.
WebHooks, PubSubHubbub, WebSockets, ...
Of course, WebSockets are not the panacea to every problem. WebHooks and PubSubHubbub are great protocols for intermittent push updates where a long-lived TCP connection may prove to be inefficient. Likewise, if you require non-trivial routing then AMQP is a powerful tool, and there is little reason to reinvent the powerful presence model built into XMPP. Right tool for the right job, but WebSockets are without a doubt a much-needed addition to every developers toolkit.
more »
Future of RDBMS is RAM Clouds & SSD »
Created at: 07.12.2009 18:51, source: igvita.com, tagged: Architecture databases database ramcloud ssd
Rumors of the demise of relational database systems are greatly exaggerated. The NoSQL movement is increasingly capturing the mindshare of the developers, all the while the academia have been talking about the move away from "RDBMS as one size fits all" for several years. However, while the new storage engines are exciting to see, it is also important to recognize that relational databases still have a bright future ahead - RDBMS systems are headed into main memory, which changes the playing field all together.
Performance is only one aspect that influences the choice of a database. Tree and graph structures are not easy to model within a relational structure, which in turn leads to complicated schemas and system overhead. For that reason alone, document-stores (Tokyo, CouchDB, MongoDB), graph stores (Neo4J), and other alternative data structure databases (Redis) are finding fertile ground for adoption. However, the end of "RDBMS as one size fits all" does not mean the end of relational systems all together. It is too early to bury RDBMS in favor of No (or Less) SQL. We just need to reset how we think about the RDBMS.
Disks are the New Tape
The evolution of disks has been extremely uneven over the last 25 years: disk capacity has increased 1000x, data transfer speeds increased 50x, while seek and rotational delays have only gone up by a factor of 2. Hence, if we only needed to transfer several hundred kilobytes of data in the mid 80's to achieve good disk utilization, then today we need to read at least 10MB of data to amortize the costs of seeking the data - refresh your memory on seek, rotational, and transfer times of our rusty hard drives.
When the best we can hope for is 100-200 IOPS out of a modern hard drive, the trend towards significantly larger block sizes begins to make a lot more sense. Whereas your local filesystem is likely to use 4 or 8kb blocks, systems such as Google's GFS and Hadoop's HDFS are opting out for 64MB+ blocks in order to amortize the cost of seeking for the data - by using much larger blocks, the cost of seeks and access time is once again brought down to single digit percent figures over the transfer time.
Hence, as we generate and store more and more data, the role of the disks must inevitably become more archival. Batch processing systems such as Map-Reduce are well suited for this world and are quickly replacing the old business intelligence (BI) systems for exactly these reasons. In the meantime, the limitations imposed by the random access to disk mean that we need to reconsider the role of disk in our database systems.
OLTP is Headed Into Main Memory & Flash
An average random seek will take 5-10ms when hitting the physical disk and hundreds of microseconds for accessing data from cache. Compare that to a fixed cost of 5-10 microseconds for accessing data in RAM and the benefits of a 100-1000x speed difference can be transformative. Instead of treating memory as a cache, why not treat it as a primary data store? John Ousterhout and his co-authors outline a compelling argument for "RAMCloud". After all, if Facebook keeps over 80% of their data in memcached, and Google stores entire indexes of the web in memory many times over, then your average database-backed application should easily fit and be able to take advantage of the pure memory model also.
The moment all of the data is available in memory, it is an entirely new game: access time and seek times become irrelevant (no disk seeks), the value of optimizing for locality and access patterns is diminished by orders of magnitude, and in fact, entirely new and much richer query models can enable a new class of data-intensive applications. In a world where the developer's time is orders of magnitude more expensive than the hardware (a recent phenomenon), this also means faster iterations and less data-optimization overhead.
The downside to the RAMCloud is the equivalent order of magnitude increase in costs - RAM prices are dropping, but dollar for dollar, RAMCloud systems are still significantly more expensive. Flash storage is an obvious compromise for both speed and price. Theoretical access time for solid-state devices is on the order of 50 microseconds for reads, and 200 microseconds for writes. However, in reality, wrapping solid-state storage in SATA-like hardware devices brings us back to ~200 microseconds for reads, or ~5000 IOPS. Though, of course, innovation continues and devices such as FusionIO’s PCI-E flash storage controller bring us back to 80 microsecond reads at a cost of ~$11 per Gigabyte.
However, even the significantly higher hardware price point is often quickly offset once you factor in the saved developer time and adjacent benefits such as guaranteed performance independent of access patterns or data locality. Database servers with 32GB and 64GB of RAM are no longer unusual, and when combined with SSDs, such as the system deployed at SmugMug, often offer a much easier upgrade path than switching your underlying database system to a NoSQL alternative.
Database Architecture for the RAMCloud
Migrating your data into RAM or Flash yields significant improvements via pure speedup in hardware, however, "it is time for a complete rewrite" argument still holds: majority of existing database systems are built with implicit assumptions for disk-backed storage. These architectures optimize for disk-based indexing structures, and have to rely on multithreading and locking-based concurrency to hide latency of the underlying storage.
When access time is measured in microseconds, optimistic and lock-free concurrency is fair game, which leads to much better multi-core performance and allows us to drop thousands of lines of code for multi-threaded data structures (concurrent B-Trees, etc). RethinkDB is a drop-in MySQL engine designed for SSD drives leveraging exactly these trends, and Drizzle is a larger fork of the entire MySQL codebase aimed at optimizing the relational model for "cloud and net applications": massively distributed, lightweight kernel and extensible.
Migrating Into Main Memory
Best of all, you can start leveraging the benefits of storing your data in main memory even with the existing MySQL databases - most of them are small enough to make the memory buffers nothing but a leaky abstraction. Enable periodic flush to disk for InnoDB (innodb_flush_log_at_trx_commit=2), and create covering indexes for your data (a covering index is an index which itself contains all the required data to answer the query). Issue a couple of warm-up requests to load the data into memory and you are off to the races.
Of course, the above strategy is at best an intermediate solution, so investigating SSD’s as a primary storage layer, and if you are adventurous, give RethinkDB a try. Also keep an eye on Drizzle as the first production release is aimed for summer of 2010. Alternative data storage engines such as Redis, MongoDB and others are also worth looking into, but let us not forget: laws of physics still apply to NoSQL. There is no magic there. Memory is fast, disks are slow. Nothing is stopping relational systems from taking advantage of main memory or SSD storage.
more »
Consuming XMPP PubSub in Ruby »
Created at: 10.11.2009 19:22, source: igvita.com, tagged: Architecture pubsub realtime xmpp
XMPP is a very versatile protocol with well over several hundred proposed and working extensions, which has also proven itself in production (ex: Google Talk). Presence, roster management, federated and server to server (S2S) messaging are all examples of features that you get for free, which make it a very appealing platform for messaging applications. Combine it with extensions such as XEP-0060 (PubSub), and we have all the relevant buzzwords: pubsub, real-time, federated, and presence.
The PubSub specification within XMPP, as defined in XEP-0060, is definitely not as flexible as that of AMQP, but it is often times enough to cover the most popular use cases. However, technical merits aside, one of the key missing components, especially in Ruby, has been the historical lack of functioning libraries - xmpp4r claims to support it, but examples are lacking. Thankfully, after test driving the latest batch of gems, it looks like we're finally there.
Getting off the ground with XMPP
Without a good toolkit XMPP can be a gnarly protocol to get started with - Pidgin IM client has some great tools for spying on the exchange, but monitoring pages of XML scroll by can only get you so far. Thankfully, Seth Fitzsimmons has built switchboard ("curl for XMPP"), which offers a powerful command line tool to greatly simplify the process. Make sure to read the full tutorial, or jump right into it by testing it with the Wordpress XMPP stream:
# list available options, subscribe to a blog, list subscriptions and then open the stream
switchboard disco --target pubsub.im.wordpress.com info
switchboard pubsub --server pubsub.im.wordpress.com --node /blog/icanhazcheesburger.com subscribe
switchboard pubsub --server pubsub.im.wordpress.com subscriptions
switchboard pubsub --server pubsub.im.wordpress.com listen
Based on xmpp4r, switchboard is also a toolkit for assembling your own XMPP clients, which means that it can be easily customized to power a PubSub consumer. From start to finish, and since examples are still hard to come by:
require 'rubygems' require 'switchboard' class WordpressJack def self.connect(switchboard, settings) switchboard.plug!(PubSubJack) switchboard.hook(:post) switchboard.on_pubsub_event do |event| event.payload.each do |payload| payload.elements.each do |item| on(:post, item) end end end end end settings = Switchboard::Settings.new settings['pubsub.server'] = 'pubsub.im.wordpress.com' settings['jid'] = 'user@im.wordpress.com' settings['password'] = 'password' switchboard = Switchboard::Client.new(settings) switchboard.plug!(WordpressJack) switchboard.on_post do |post| puts "A new post was received:" puts post.methods.sort.uniq exit end switchboard.run!
XMPP with EventMachine and Nokogiri
If you have an EventMachine stack, or looking for a high performance library, Jeff Smick's blather is definitely a gem to investigate. The combination of the asynchronous nature of EventMachine, a SAX parser within Nokogiri, and a great DSL make it very fast and a pleasure to work with:
require 'rubygems' require 'blather/client/client' require 'blather/client/dsl/pubsub' require 'blather' EventMachine.run { host = 'pubsub.im.wordpress.com' node = 'blog/icanhazcheesburger.com' user = 'user@im.wordpress.com' pass = 'pass' jid = Blather::JID.new(user) client = Blather::Client.setup(jid, pass) client.register_handler(:ready) { puts "Connected. Send messages to #{client.jid.inspect}." pub = Blather::DSL::PubSub.new(client, host) } client.register_handler(:pubsub_event) { |event| puts event } client.connect }
PubSub & Event-Driven Architecture
Having personally struggled in the past with XMPP PubSub and Ruby, it's been great to revisit the use case and find a new set of fully functional libraries. The event driven architecture which is enabled by technologies such as XMPP, AMQP, Comet, Webhooks and PubsubHubbub are increasingly becoming the staple of many web applications, and for a good reason. If you haven't already, grab switchboard or blather and take XMPP for a test drive.
more »
Nginx & Comet: Low Latency Server Push »
Created at: 21.10.2009 19:06, source: igvita.com, tagged: Architecture comet nginx push
Server push is the most efficient and low latency way to exchange data. If both the publisher and the receiver are publicly visible then a protocol such as PubSubHubbub or a simpler Webhook will do the job. However, if the receiver is hidden behind a firewall, a NAT, or is a web-browser which is designed to generated outbound requests, not handle incoming traffic, then the implementation gets harder. If you are adventurous, you could setup a ReverseHTTP server. If you are patient, you could wait for the WebSocket's API in HTML5. And if you need an immediate solution, you could compromise: instead of a fully asynchronous push model, you could use Comet, also known as Reverse Ajax, HTTP Server Push, or HTTP Streaming.
Coined by Alex Russell in early 2006, the term Comet is an umbrella term for technologies which take advantage of persistent connections initiated by the client and kept open until data is available (long polling), or kept open indefinitely as the data is pushed to the client (streaming) in chunks. The immediate advantage of both techniques is that the client and server can communicate with minimal latency. For this reason, Comet is widely deployed in chat applications (Facebook, Google, Meebo, etc), and is also commonly used as a firehose delivery mechanism.
Converting Nginx into a Long Polling Comet Server
A large entry barrier to Comet adoption is the implicit requirement for specialized, event driven web servers capable of efficiently handling large numbers of long polling connections. Friendfeed's Tornado server is a good example of an app level server that meets the criteria. However, thanks to Leo Ponomarev's efforts, you can now also turn your Nginx server into a fully functional Comet server with the nginx_http_push_module plugin.

Instead of using a custom framework, Leo's plugin exposes two endpoints on your Nginx server: one for the subscribers, and one for the publisher. The clients open long-polling connections to a channel on the Nginx server and start waiting for data. Meanwhile, the publisher simply POST's the data to Nginx and the plugin then does all the heavy lifting for you by distributing the data to the waiting clients. This means that the publisher never actually serves the data directly, it is simply an event generator! It is hard to make it any simpler then that.
Best of all, it only gets better from here. Both the client and the publisher can create arbitrary channels, and the plugin is also capable of message queuing, which means that the Nginx server will store intermediate messages if the client is offline. Queued messages can be expired based on time, size of the waiting stack, or through a memory limit.
Configuring Nginx & Ruby Demo
To get started you will have to build Nginx from source. Unpack the source tree, grab the plugin repo from GitHub and then build the server with the push module (./configure --add-module=/path/to/plugin && make && make install). Next, consult the readme and the protocol files to learn about all the available options. A simple multi client broadcast configuration looks like the following:
# internal publish endpoint (keep it private / protected) location /publish { set $push_channel_id $arg_id; #/?id=239aff3 or somesuch push_publisher; push_store_messages on; # enable message queueing push_message_timeout 2h; # expire buffered messages after 2 hours push_max_message_buffer_length 10; # store 10 messages push_min_message_recipients 0; # minimum recipients before purge } # public long-polling endpoint location /activity { push_subscriber; # how multiple listener requests to the same channel id are handled # - last: only the most recent listener request is kept, 409 for others. # - first: only the oldest listener request is kept, 409 for others. # - broadcast: any number of listener requests may be long-polling. push_subscriber_concurrency broadcast; set $push_channel_id $arg_id; default_type text/plain; }
Once you have the Nginx server up and running, we can setup a simple broadcast scenario with a single publisher and several subscribers to test-drive our new Comet server:
require 'rubygems' require 'em-http' def subscribe(opts) listener = EventMachine::HttpRequest.new('http://127.0.0.1/activity?id='+ opts[:channel]).get :head => opts[:head] listener.callback { # print recieved message, re-subscribe to channel with # the last-modified header to avoid duplicate messages puts "Listener recieved: " + listener.response + "\\n" modified = listener.response_header['LAST_MODIFIED'] subscribe({:channel => opts[:channel], :head => {'If-Modified-Since' => modified}}) } end EventMachine.run { channel = "pub" # Publish new message every 5 seconds EM.add_periodic_timer(5) do time = Time.now publisher = EventMachine::HttpRequest.new('http://127.0.0.1/publish?id='+channel).post :body => "Hello @ #{time}" publisher.callback { puts "Published message @ #{time}" puts "Response code: " + publisher.response_header.status.to_s puts "Headers: " + publisher.response_header.inspect puts "Body: \\n" + publisher.response puts "\\n" } end # open two listeners (aka broadcast/pubsub distribution) subscribe(:channel => channel) subscribe(:channel => channel) }
In the script above, every five seconds a publisher emits a new event to our Nginx server, which in turn, pushes the data to two subscribers which have long-polling connections open and are waiting for data. Once the message is sent to each subscriber, Nginx closes their connections and the clients then immediately re-establish them to wait for the next available message. End result, a real-time message push between the publisher and the clients via Nginx!
Long Polling, Streaming, and Comet in Production
Leo's module is still very young and is under active development, but it is definitely one to keep an eye on. The upcoming release is focused on bug fixes, but looking ahead there are also plans to add a streaming protocol: instead of closing the connection every time (aka, long polling), Nginx would keep it open and stream the incoming events as chunks of data to the clients in real-time. Having such an option would make it ridiculously easy to deploy your own firehose API's (ex: Twitter streaming).
Last but not least, don't forget about the growing number of other available modules for Nginx, or if you are so inclined, get a head start on building your own by reading Evan Miller's great guide on the subject.
more »
