Big Data
Big Data is a term for data sets that are so large or complex that traditional data processing application softwares are inadequate to deal with them. Challenges include capture, storage, analysis, data curation, search, sharing, transfer, visualization, querying, updating, and information privacy. The term "big data" often refers simply to the use of predictive analytics, user behaviour analytics, or certain other advanced data analytics methods that extract value from data, and seldom to a particular size of data set.
Contents
Big Data terms and concepts
- Doug Laney's "4 V's of Big Data":[1]
- Volume
- Extremely large volumes of data (i.e., peta- or exa-bytes, as of February 2017)
- Variety
- Various forms of data (structured, semi-structured, and unstructured)
- Velocity
- Real-time (e.g., IoT, social media, sensors, etc.), batch, streams of data. Is usually either human- or machined-generated data.
- Veracity or variability
- Inconsistent, sometimes inaccurate, varying, or missing data
- Format of Big Data:
- Structured
- Data that has a defined length and format (aka "schema"). Examples include numbers, words, dates, etc. Easy to store and analyse. Often managed using SQL.
- Semi-structured
- Between structured and unstructured. Does not conform to a specific format, but is self-describing and involving simple key-value pairs. Examples include JSON, SWIFT (financial transactions), and EDI (healthcare).
- Unstructured
- Data that does not follow a specific format. Examples include audio, video, images, text messages, etc.
- Big Data Analytics:
- Basic analytics
- Reporting, dashboards, simple visualizations, slicing and dicing.
- Advanced analytics
- Complex analytics models using machine learning, statistics, text analytics, neural networks, data mining, etc.
- Operationalized analytics
- Embedded big data analytics in a business process to streamline and increase efficiency.
- Analytics for business decisions
- Implemented for better decision-making, which drives revenue.
- What is IoT?
- Internet of Things
- Physical objects that are connected to the Internet
- Identified by an IP address (IPv4 now; IPv6 in the future)
- Devices communicate with each other and other Internet-enabled devices and systems
- Includes everyday devices that utilize embedded technology to communicate with an external environment by connecting to the Internet
- IoT data is high volume, high velocity, high variety, and high veracity
- Examples of IoT:
- Security systems
- Thermostats (e.g., Nest)
- Vehicles
- Electronic appliances
- Smart-lighting in households or commercial buildings (e.g., Philips Hue)
- Fitness devices (e.g., Fitbit)
- Sensors to measure environmental parameters (e.g., temperature, humidity, wind, etc.)
- Cycle of Big Data management
- Capture data: depending on the problem to be solved, decide on the data sources and the data to be collected.
- Organize: cleanse, organize, and validate data. If data contains sensitive information, implement sufficient levels of security and governance.
- Integrate: integrate with business rules and other relevant systems like data warehouses, CRMs, ERPs (Enterprise Resource Planning), etc.
- Analyze: real-time analysis, batch type analysis, reports, visualizations, advanced analytics, etc.
- Act: use analysis to solve the business problem.
- Components of a Big Data infrastructure
- Redundant physical infrastructure: hardware, storage servers, network, etc.
- Security infrastructure: maintaining security and governance on data is critical to protect from misuse of Big Data.
- Data stores: to capture structured, semi-structured, and un-structured data. Data stores that need to be fast, scalable, and durable.
- Organize and integrate data: stage, clean, organize, normalize, transform, and integrate data.
- Analytics: traditional and including Business Intelligence and advanced analytics.
- Data:
- text, audio, video, etc.
- social media
- machine generated
- human generated
- Capture:
- distributed file systems
- streaming data
- NoSQL
- RDBMS
- Organize and integrate
- Apache Spark SQL
- Hadoop MapReduce
- ETL / ELT
- data warehouse
- Analyze
- predictive analytics
- advanced analytics
- social media and text analytics
- alerts and recommendations
- visualization, reports, dashboards
- Physical infrastructure
- You physical infrastructure can make or break your Big Data implementations. Has to support high-volume, high-velocity, high-variety of Big Data and be highly available, resilient, and redundant.
- Requirements to factor while designing the infrastructure include performance, availability, scalability, flexibility, and costs.
- Networks must be redundant and resilient and have sufficient capacity to accommodate the anticipated volume and velocity of data in addition to normal business data. You infrastructure should be elastic.
- Hardware storage and servers must have sufficient computing power and memory to support analytics requirements.
- Infrastructure operations: Managing and maintaining data centres to avoid catastrophic failure and thus preserve the integrity of data and continuity of business processes.
- Cloud based infrastructures allow outsourcing of building Big Data infrastructure and managing the infrastructure.
- Security infrastructure
- Data access: Same as non-Big Data implementations. Data access is granted only to users who have legitimate business reason to access the data.
- Application access; Accessing data from applications is defined by restrictions imposed by an API.
- Data encryption: Encrypting and decrypting data for high-volume, high-velocity, and high-variability can be expensive (computationally and to your wallet). An alternative is to encrypt only certain elements of the data that are sensitive and critical.
- Threat detection: With exposure to social media and mobile data comes increase exposure to threats. Multiple layers of defence for network security are required to protect from security threats.
- Data stores to capture data
- Data stores are at the core of the Big Data infrastructure and need to be fast, scalable, and highly available (HA).
- A number of different data stores are available and each is suitable for a set of different requirements.
- Example data stores include:
- Distributed file systems (e.g., Hadoop Distributed File System (HDFS))
- NoSQL databases (e.g., Cassandra, MongoDB)
- Traditional RDBMs (e.g., MySQL, Postgres)
- Real-time streaming data can be ingested using Apache Kafka, Apache Storm, Apache Spark streaming, etc.
- Distributed file systems
A shared file system mounted on several nodes of a cluster, that has the following characteristics:
- Access transparency: Can be accessed the same way as local files without knowing its exact location.
- Concurrency transparency: All clients have the same view of the files.
- Failure transparency: Clients should have a correct view after a server failure.
- Scalability transparency: Should work for small loads and scale for bigger loads on addition of nodes.
- Replication transparency: To support fault tolerance and high availability, replication across multiple servers must be transparent to the client.
- Relational Database Management systems (RDBMs)
Relational databases are easy to store and query data and follow the ACID principle:
- Atomicity: A transaction is all-or-nothing. If any part of the transaction fails, the entire transaction fails.
- Consistency: Any transaction will bring a database from one valid state to another.
- Isolation: Ensures that concurrent transactions executed result in a state of the system similar to if transactions were executed serially.
- Durability: Ensures that once a transaction is committed, it remains so, irrespective of a database crash, power, or other errors.
- NoSQL databases
Not only SQL (NoSQL) implies that there is more than one mechanism of storing data depending on the needs. NoSQL databases are mostly open source and built to handle the requirements of fast, scalable processing of unstructured data. Types of NoSQL databases are:
- Document-oriented data stores are mainly designed to store and retrieve collections of documents and support complex data forms in several standard formats, such as JSON, XML, and binary forms (e.g., PDFs or MS Word documents). Examples: MongoDB, CouchDB.
- A column-oriented database stores its content in columns instead of rows, with attribute values belonging to the same column stored contiguously. Examples: Cassandra, Hbase.
- A graph database is designed to store and represent data that utilize a graph model with nodes, edges, and properties related to one another through relations. Examples: Neo4j.
- Key-value stores data blobs referenced by a key. This is the simplest form of NoSQL and is high-performing. Examples: Memcached, Couchbase.
- Extract Transform Load (ETL)
- Extract: Read data from the data source.
- Transform: Convert the format of the extracted data so that it conforms to the requirements of the target database.
- Load: Write data to the target database.
Apache Hadoop's MapReduce is a popular choice for batch processing large volumes of data in a scalable and resilient style. Apache Spark is more suitable for applying complex analytics using machine learning models in an interactive approach.
- Data Warehouse
A data warehouse is a relational database that is designed for query and analysis rather than for transaction processing. It usually contains historical data derived from transaction data, but it can include data from other sources. It separates analysis workloads from transaction workloads and enables an organization to consolidate data from several sources.
Properties of a data warehouse include:
- Organized by subject are. It contains an abstracted view of the business.
- Highly transformed and structured.
- Data loaded in the data warehouse is based on a strictly defined use case.
- Data Lake
A data lake is a storage repository that holds a vast amount of raw data in its native format, including structured, semi-structured, and unstructured data. The data structure and requirements are not defined until the data is needed.
Data lakes can be built on Hadoop's HDFS or in the Cloud using Amazon S3, Google Cloud Storage, etc.
- Differences between a Data Lake and a Data Warehouse
- Data Lake:
- Retains all data.
- Data types consist of data sources, such as web server logs, sensor data, social network activity, text and images, etc.
- Processing of data follows "schema-on-read".
- Extremely agile and can adapt to changes in business requirements quickly.
- Harder to secure a data lake, but a top priority for data lake providers (as of early 2017).
- Hardware to support a data lake consists of commodity servers connected in a cluster.
- Used for advanced analytics, typically by data scientists.
- Data Warehouse:
- Only data that is required for the business use case and data that is highly modelled and structured.
- Consists of data extracted from transactional systems and consist of quantitative metrics and the attributes.
- Processing of data follows "schema-on-write".
- Due to its structured modelling, it is time consuming to modify business processes.
- An older technology and mature in security.
- Hardware consists of enterprise-grade servers and vendor provided data warehousing software.
- Used for operational analytics to report on KPIs and slices of the data.
- Analyzing Big Data
Examples of analytics:
- Predictive analytics: Using statistical models and machine learning algorithms to predict the outcome of a task.
- Advanced analytics: Applying deep learning for applications like speech recognition, face recognition, genomics, etc.
- Social media analytics: Analyze social media to determine market trends, forecast demand, etc.
- Text analytics: Derive insights from text using Natural Language Processing (NLP).
- Alerts and recommendations: Fraud detection and recommendation engines embedded within commerce applications.
- Reports, dashboards, and visualizations: Description analytics providing answers on what, when, where type questions.
- Big Data in the Cloud
Comparison of Big Data Cloud platforms | |||
---|---|---|---|
Amazon | Microsoft | ||
Big Data storage | S3 | Cloud Storage | Azure Storage |
MapReduce | Elastic MapReduce (EMR) | N/A (MapReduce replaced by Cloud Dataflow) | Hadoop on Azure, HDInsight |
Big Data analytics | EMR with Apache Spark and Presto | Cloud Dataproc, Cloud DataLab | HDInsight |
Relational Database | RDS | Cloud SQL | SQL Database as a Service |
NoSQL database | DynamoDB or EMR with Apache Hbase | Cloud BigTable, Cloud Datastore | DocumentDB, TableStorage |
Data warehouse | Redshift | BigQuery | SQL Data warehouse as a Service |
Stream processing | Kinesis | Cloud Dataflow | Azure Stream analytics |
Machine Learning | Amazon ML | Cloud Machine Learning Services, Vision API, Speech API, Natural Language API, Translate API | Azure ML, Azure Cognitive Services |
Big Data tools and technologies
- Apache Hadoop
- Hadoop is an open-source software framework for distributed storage and the distributed processing of very large data sets on compute clusters built from commodity hardware.
- Originally build by Yahoo engineer Doug Cutting in 2005 and named after his son's toy elephant.
- Inspired by Google's MapReduce and Google File System papers.
- Written in Java to implement the MapReduce programming model for scalable, reliable, and distributed computing.
The Apache Hadoop framework is composed of the following modules:
- Hadoop Common: Contains libraries and utilities needed by other Hadoop modules.
- Hadoop Distributed File System (HDFS): A distributed file system that stores data on commodity machines, providing very high aggregate bandwidth across the cluster.
- Hadoop MapReduce: A programming model for large-scale data processing.
- Hadoop YARN: A resource management platform responsible for managing compute resources in clusters and using them for the scheduling of users' applications.
- Hadoop Distributed File System (HDFS)
- Structured like a regular Unix-like file system with data storage distributed across several machines in a cluster.
- A data service that sits atop regular file systems, allowing a fault-tolerant, resilient, clustered approach to storing and processing data.
- Fault-tolerant: Detection of faults and quick automatic recovery is a core architectural goal.
- Tuned to support large files. A typical file size is in the GB or TB and can support tens of millions of files by scaling to hundreds of nodes in a cluster.
- Follow the write once, read multiple time approach, simplifying data coherency issues and enabling high throughput data access. Example: a web crawler application.
- Optimized for throughput rather than latency. Hence, suited for long running batch operations on large scale data rather than interactive analysis on streaming data.
- Moving computation near the data reduces network congestion and latency and increases throughput. HDFS provides interfaces or applications to move closer to where the data is stored.
- HDFS architecture
- Master-slave architecture, where Namenode is the master and Datanodes are the slaves.
- Files are split into blocks, and blocks are stored on Datanodes, which are generally one per node in the cluster.
- Datanodes manage storage attached to the nodes that they run on.
- The Namenode controls all metadata, including what blocks make up a file and which Datanode the blocks are stored on.
- The Namenode executes file system operations like opening, closing, and renaming files and directories.
- Datanodes serve read/write requests from the clients.
- Datanodes also perform block creation, deletion, and replication upon instruction from the Namenode.
- The Namenode and Datanodes are Java software designed to run on commodity hardware that supports Java.
- Usually, a cluster contains a single Namenode and multiple Datanodes; one each for each node in the cluster.
- File system and replication in HDFS
- Supports a traditional hierarchical file system. Users or applications can create directories and store files inside of directories. Can move, rename, or delete files and directories.
- Stores each file as a sequence of blocks, where blocks are replicated for fault-tolerance. Block size and replication factors are configurable per file.
- The Namenode makes all decisions regarding replication of blocks. It periodically receives heartbeat and BlockReport from each of the Datanodes in the cluster. A receipt of a heartbeat implies that the Datanode is functioning properly.
- Hadoop MapReduce
- Software framework:
- For easily writing applications that process vast amount of data (multi-terabyte data sets)
- Enables parallel processing on large clusters of thousands of nodes
- Runs on clusters made of commodity hardware
- Is reliable and fault-tolerant
- The Hadoop MapReduce layer consists of two components:
- An API for writing MapReduce workflows in Java
- A set of services for managing these workflows, providing scheduling, distribution, and parallelizing.
- A MapReduce job consists of the following steps
- Splits the data sets into independent chunks.
- Data sets are processed by map tasks in a parallel manner.
- The MapReduce framework sorts the output of map jobs and feeds them to the reduce tasks.
- Both input and output of map and reducec tasks are stored on the file system.
- The framework takes care of scheduling tasks, monitoring them in re-executing failed tasks.
- The MapReduce framework and HDFS are running on the same set of nodes. Tasks are scheduled on nodes where data is already present, thereby yielding high-bandwidth across the cluster.
- Inputs and outputs in a MapReduce job
- MapReduce operates exclusively on key-value pairs.
- Input is a large-scale data set that benefits from parallel processing and does not fit on a single machine.
- Input is split into multiple independent data sets and the map function produces a key-value pair for each record in the data set.
- The output of mappers are shuffled, sorted, grouped, and passed to the reducers.
- The reducer function is applied to set of key-value pairs that share the same key. The reducer function often aggregates the value for the pairs with the same key.
- Almost all data can be mapped to a key-value pair by using a map function.
- Keys and values can be of any type: string, numeric, or a custom type. Since they have to be serializable, a custom type must implement the writable interface provided by MapReduce.
- MapReduce cannot be used if a computation of a value depends on a previously computed value. Recursive functions like Fibonacci cannot be implemented using MapReduce.
- Applications of MapReduce
- Counting votes across the US by processing data from each polling booth.
- Aggregating electricity consumption from data points collected across a large geographical area.
- Word count applications used for document classification, page rank in web searches, sntiment analysis, etc.
- Used by Google Maps to calculate nearest neighbour (e.g., find the nearest gas station).
- Performing statistical aggregate type functions on large data sets.
- Distributed grep: grepping files across a large cluster.
- Counting the number of href links in web log files for clickstream analysis.
- Writing and running MapReduce jobs
- MapReduce jobs are typically written in Java, but can also be written using:
- Hadoop Streaming: A utility that allows users to create and run MapReduce jobs with any executables (e.g., Linux shell utilities) as the mapper and/or the reducer.
- Hadoop Pipes: A SWIG-compatible C++ API to implement MapReduce applications.
- A Hadoop job configuration consists of:
- Input and output locations on HDFS.
- Map and reduce functions via implementations of interfaces or abstract classes.
- Other job parameters
- A Hadoop job client then submits the job (jar/executable) and configuration to the ResourceManager in YARN, which distributes them to the workers and performs functions like scheduling, monitoring, and providing status and diagnostic information.
- YARN (Yet Another Resource Negotiator)
- Introduced in Hadoop 2.0, YARN provides a more general processing platform that is not constrained to MapReduce.
- It is a global ResourceManager: It is the ultimate authority, which arbitrates resources among all the applications in the system.
- It provides a per-machine NodeManager that is responsible for containers, monitoring their resource usage (CPU, memory, disk, network) and reporting the same to the ResourceManager.
- A "container" is an abstract notion in a YARN platform. It represents a collection of physical resources. This could also mean CPU cores, disk, and RAM.
- The Resource Manager has two components: Scheduler and ApplicationsManager.
- The Scheduler is responsible for allocating resources to the various running applications.
- The ApplicationsManager is responsible for accepting job-submissions, negotiating the first container for executing the application-specific ApplicationMaster and provides the service for restarting the ApplicationMaster container on failure.
- The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress.
- The Hadoop ecosystem
- Apache Hadoop is an open source framework for storing and processing large scale data distributed across clusters of nodes.
- To make Hadoop enterprise-ready, numerous Apache software foundation projects are available to integrate and deploy with Hadoop. Each project has its own community of developers and release cycles.
- The Hadoop ecosystem includes both open source Apache software projects and a wide range of commercial tools and solutions that integrate with Hadoop to extend its capabilities.
- Commercial Hadoop offerings include distributions from vendors like Hortonworks, Cloudera, and MapR, as well as a variety of tools for specific Hadoop development and maintenance tasks.
- Applications:
- HDFS: Hadoop Distributed File system
- YARN Map Reduce v2: distributed processing framework
- Ambari: provisioning, managing, and monitoring Hadoop clusters
- Flume: log collector
- Sqoop: data exchange
- Oozie: workflow
- Pig: scripting
- Mahout: machine learning
- R Connectors: statistics
- Hive: SQL query
- Hbase: columnar store
The 5 functions of the Hadoop ecosystem:
- Data management using HDFS, HBase, and YARN
- Data access with MapReduce, Hive, and Pig.
- Data ingestion and integration using Flume, Sqoop, Kafka, and Storm.
- Data monitoring using Ambari, Zookeeper, and Oozie.
- Data governance and security using Falcon, Ranger, and Knox.
Data access in the Hadoop ecosystem
- MapReduce
- A framework for writing applications that process large amounts of structured and un-structured data in parallel across a cluster of thousands of machines in a reliable and fault-tolerant manner.
- Apache Hive
- A data warehouse, which enables easy data summarization and ad hoc queries via an SQL-like interface for large datasets stored in HDFS.
- Hive evolved as a data warehousing solution built on top of the Hadoop MapReduce framework.
- Hive enables data warehouse tasks like ETL, data summarization, query, and analysis.
- Can access files stored in HDFS or other mechanisms (e.g., HBase).
- Hive provides SQL-like declarative language, called HiveQL, to run distributed queries on large volumes of data.
- In Hive, tables and databases are created first and then data is loaded into these tables for managing and querying structured data.
- Hive comes with a command-line shell interface, which can be used to create tables and execute queries.
- Originally built by Facebook and was open-sourced to the Apache Foundation.
- Hive engine : compiles HiveQL queries into MadReduce jobs to be executed on Hadoop. In addition, custom MapReduce scripts can also be plugged into queries. Components of Hive include:
- HCatalog : A table and storage management layer for Hadoop, which enables users with different data processing tools, including Pig and MapReduce, to more easily read and write data on the grid.
- WebHCat : Provides a service that one can use to run Hadoop MapReduce (or YARN), Pig, Hive jobs, or perform Hive metadata operations using an HTTP (REST-style) interface.
- Apache Pig
- Provides scripting capabilities. Offers an alternative to MapReduce programming in Java. Pig scripts are automatically converted to MapReduce programs by the Pig engine.
- A high-level programming language useful for analyzing large data sets. Enables data analysts to write data analysis programs without the complexities of MapReduce.
- Developed at Yahoo and open-sourced to Apache.
- Pig runs on Apache Hadoop YARN and makes use of MapReduce and the Hadoop Distributed File System (HDFS).
- Pig consists of 2 components:
- Pig Latin: A language to write scripts; and
- Runtime environment: Includes a parser, optimizer, compiler, and execution engine. Convert Pig scripts into a series of MapReduce jobs and executes them on the Hadoop MapReduce engine.
- Pig Latin
- The Pig Latin program consists of a series of operations or transformations, which are applied to the input data to produce output. The languages consists of data types, statements, general and relational operators like join, group, filter, etc.
- Users can extend Pig Latin by writing their own functions using Java, Python, Ruby, or other scripting languages.
- Pig Latin is sometimes extended using User Defined Functions (UDFs), which the user can write in any of the above languages and can then call them directly from within Pig Latin.
- Pig has two execution modes:
- Local mode: Runs in a single JVM and makes use of the local file system. This mode is only suitable for analysis of small data sets using Pig.
- MapReduce mode: In this mode, queries written in Pig Latin are translated into MapReduce jobs and run on a Hadoop cluster. Used for running on large data sets.
- Hive vs. Pig: While Hive is for querying data, Pig is for preparing data to make it suitable for querying.
Comparison of MapReduce, Pig, and Hive | ||
---|---|---|
MapReduce | Apache Pig | Apache Hive |
Compiled language | Scripting language | SQL-like query language |
Lower-level of abstraction | Higher-level of abstraction | High-level of abstraction |
More lines of code | Less lines of code | Less lines of code |
Technically complex and higher development effort required | Easy and lesser development effort required | Easy and lesser development effort required |
Code performance is high | Code performance is lesser than MapReduce | Code performance is lesser than MapReduce and provisioning |
Suitable for complex business logic | Easy for writing joins | Suitable for ad hoc analysis |
For structured and unstructured data | For both structure and unstructured data | For structured data only |
Ingestion and integration of data
- Goal and ingestion and integration of data
- Quickly and easily load and process data from a variety of sources.
- Apache Flume
- Flume allows you to efficiently move large amounts of log data from many different sources to Hadoop.
- Apache Sqoop
- Sqoop is an effective tool for transferring data from RDBMS and NoSQL data stores into Hadoop.
- Apache Kafka
- Publish-subscribe (pub/sub) messaging system for real-time event processing, which offers strong durability, scalability, and fault-tolerance support.
- Apache Storm
- Real-time message computation system for processing fast, large streams of data. It is no a queue.
- Apache Sqoop (SQL-to-Hadoop)
- Designed to support bulk import of data into HDFS from structured data stores, such as relational databases, enterprise data warehouses, and NoSQL systems.
- Also used to export from a Hadoop file system to a relational database.
- Sqoop Import: This tool imports individual tables from RDBMS to HDFS. Each row in a table is treated as a record in HDFS. All records are stored as text data in text files or as binary data in Avro and Sequence files.
- Sqoop Export: This tool exports a set of files from HDFS back to an RDBMS. The files given as input to Sqoop contain records, which are called as rows in a table. Those are read and parsed into a set of records and delimited with a user-specified delimiter.
- Sqoop connector architecture
- Data transfer between Sqoop and an external storage system is made possible with the help of Sqoop's connectors.
- Sqoop has connectors for working with a range of popular relational databases, including MySQL, PostreSQL, Oracle, SQL Server, and DB2.
- A generic JDBC connector for connecting to any database that support Java's JDBC protocol.
- Sqoop provides optimized MySQL and PostreSQL connectors that use database-specific APIs to perform bulk transfer efficiently.
- Various third party connectors for data stores, ranging from enterprise data warehouses (e.g., Netezza, Teradata, and Oracle) to NoSQL stores (e.g., Couchbase) can be downloaded and installed with Sqoop.
- Apache Flume
- Flume is a tool that provides a data ingestion mechanism for collecting, aggregating, and transporting large amounts of streams data, such as log files and events from various sources to a centralized data store.
- Flume supports a large set of source and destination from multiple sources. Including:
-
tail
: pipes data from a local file and writes into HDFS via Flume. Similar to the Unixtail
command. - System logs: Apache log4j (enables Java applications to write events to files in HDFS via Flume).
- Social networking sites (e.g., Twitter, Facebook, etc.)
- IoT streams
- Destinations include HDFS and HBase.
-
- The transactions in Flume are channel-based, where two transactions (one sender; one receiver) are maintained for each message. It guarantees reliable message deliver.
- When the rate of incoming data exceeds the rate at which data can be written to the destination, Flume acts as a mediator between data producers and the centralized stores and provides a steady flow of data between them.
- There are three Flume agents (JVM processes):
- Flume Data Source
- Flume Data Channel
- Flume Data Sink
- Source Media -> [Flume Agents (JVM processes): Flume Data Source -> Flume Data Channel -> Flume Data Sinks] -> HDFS
- Events from external sources are consumed by the Flume Data Source. The external source sends events to Flume in a format that is recognized by the target source.
- The Flume Data Source receives an event and stores in into one or more channels. The channel acts as a store, which keeps the event until it is consumed by the Flume sink. This channel may use a local file system to store these events.
- The Flume sink removes the event from the channel and stores it into an external repository (e.g., HDFS). There can be multiple Flume agents, in which case the Flume sink forwards the event to the Flume source of the next Flume agent in the flow.
- Flume has a flexible design based upon streaming data.
- Flume has its own query processing engine, which makes it easy to transform each new batch of data before it is moved to the intended sink.
- Apache Kafka
- Developed by LinkedIn and later open-sourced to Apache.
- Kafka uses a publish/subscribe (pub/sub) model.
- Kafka is a distributed streaming platform that allows you to:
- Publish and subscribe to streams of records.
- Store streams of records in a fault-tolerant manner.
- Process streams of records as they occur.
- Kafka is meant to run in a cluster on one or more servers.
- The Kafka cluster stores streams of records in categories called topics.
- Each record consists of a key, value, and a timestamp.
- Producers publish their data to Kafka, consumers subscribe to Kafka. Connectors store the Kafka data in DBs and stream processors analyze data stored in Kafka topics.
- Kafka has four core APIs:
- Producer API: Allows an application to publish a stream of records to one or more Kafka topics;
- Consumer API: Allows an application to subscribe to one or more topics and process the stream of records produced to them;
- Stream API: Allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams; and
- Connector API: Allow building and running reusable producers or consumers, which connect Kafka topics to existing applications or data systems.
- Apache Storm
- Real-time message computation system for processing fast, large streams of data.
- It is not a queue.
- It consumes streams of data and processes them in arbitrarily complex ways.
- It can integrate with any queuing and/or any database system.
- Example use case: Kafka provides a distributed and robust queue in a pub/sub model to pass high-volume, real-time data from one end-point to another. Storm extracts data from Kafka messages and allows manipulating data in many ways.
- Real-time data source -> Kafka -> Storm -> NoSQL
- There are 3 main abstractions in Storm:
- Spout: A source of streams in a computation. Typically can read from a queuing broker, such as Kafka or read from somewhere like the Twitter streaming API;
- Bolt: Processes any number of input streams and produces any number of new output streams. Most of the logic of a computation goes into bolts, such as functions, filters, streaming joins, streaming aggregations, talking to databases, etc.; and
- Topology: A network of spouts and bolts, with each edge in the network representing a bolt subscribing to the output stream of some other spout or bolt. A topology is an arbitrarily complex multi-stage stream computation. Topologies run indefinitely when deployed.
- When to use which tool?
- Sqoop: When data is sitting in data stores like RDBMS, data warehouses, or NoSQL data stores.
- Storm: When you need a real-time message computation system for processing fast, large streams of data. It is not a queue.
- Flume: When moving bulk streaming data from various sources, like web servers and social media.
- Pros:
- Flume is tightly integrated with Hadoop. It integrates with HDFS security very well.
- Flume is supported by a number of enterprise Hadoop providers.
- Flume supports built-in sources and sinks out-of-the-box.
- Flume makes event filtering and transforming very easy. For example, you can filter out messages that you are not interested in from the pipeline first before sending it through the network for performance gains.
- Cons:
- Not as scalable for adding additional consumers. Lesser message durability than Kafka.
- Pros:
- Kafka: When needing real-time data pipelines and streaming apps.
- Pros:
- Better in scalability and message durability.
- Easy to add a large number of consumers without affecting performance and down time.
- Cons:
- General purpose and not tightly integrated with Hadoop. You may need to write your own producers and consumers.
- Pros:
- Kafka vs. Flume
- Kafka consumers pull data from a topic. Different consumers can consume the messages at a different pace.
- A Flume sink supports a push model. Does not scale well when producers push massive volumes. Also not easy to add multiple consumers. Tight integration with Hadoop provides in-built sinks and enterprise support.
- Kafka and Flume work well together. Use a Kafka source to write to a Flume agent to take advantage of both.
Provisioning, managing, and monitoring
- Goal
- To provision, manage, monitor, and operate Hadoop clusters at scale.
- Apache Ambari
- Makes Hadoop management simpler by providing software for provisioning, managing, and monitoring Hadoop clusters.
- Provides an intuitive, easy-to-use Hadoop management web UI backed by its RESTful APIs.
- Enables application developers and system integrators to easily integrate Hadoop provisioning, management, and monitoring capabilities to their own applications with the Ambari APIs.
- Ambari enables system administrators to:
- Provision a Hadoop cluster
- Ambari provides a step-by-step wizard for installing Hadoop services across any number of hosts.
- Ambari handles the configuration of Hadoop services for the cluster.
- Manage a Hadoop cluster
- Ambari provides central management for starting, stopping, and re-configuring Hadoop services across the entire cluster.
- Monitor a Hadoop cluster
- Ambari provides a dashboard for monitoring the health and status of the Hadoop cluster.
- Ambari leverages the Ambari Metrics System for metrics collection.
- Ambari leverages the Ambari Alert Framework for system alerting and will notify you when your attention is needed (e.g., a node does down, remaining disk space is low, etc.).
- Provision a Hadoop cluster
- Apache Oozie
- A Java-based Web application that serves as a Workflow engine that schedules, runs, and manages jobs on Hadoop.
- Can combine multiple complex jobs to be run in a sequential order to complete a bigger task (i.e., combine multiple jobs sequentially into one logical unit of work). Within a sequence of tasks, two or more jobs can also be programmed to run in parallel to each other.
- Responsible for triggering the workflow actions, which, in turn, uses the Hadoop execution engine to actually execute the task.
- Tightly integrated with the Hadoop stack supporting various Hadoop jobs, like MapReduce, Hive, Pig, and Sqoop, as well as system-specific jobs like Java and Shell.
- Oozie Workflows:
- A workflow is a collection of action nodes (i.e., MapReduce jobs, Pig jobs) and control flow nodes (start, decision, end, fork) arranged in a control dependency Direct Acyclic Graph (DAG).
- Oozie workflows definitions are written in hPDL (an XML Process Definition Language).
- Oozie workflow actions start jobs in remote systems (i.e., Hadoop, Pig). Upon action completion, the remote systems callback Oozie to notify the action completion, at this point Oozie proceeds to the next action in the workflow.
- Control flow nodes define the beginning and the end of a workflow and provide a mechanism to control the workflow execution path (decision, fork, and join nodes).
- Apache ZooKeeper
- "Because coordinating distributed systems is a zoo"
- A highly available system for coordinating distributed processes. Distributed applications use ZooKeeper to store and mediate updates to important configuration information.
- Open source distributed software that provides coordination and operational services between distributed processes on a Hadoop cluster of nodes.
- ZooKeeper provides a:
- Naming service: Identify nodes in a cluster by name.
- Configuration management: Synchronizes configuration between nodes, ensuring consistent configuration.
- Process synchronization: ZooKeeper coordinates the starting and stopping of multiple nodes in a cluster. This ensures that all processing occurs in the intended order.
- Self-election: ZooKeeper can assign a "leader" role to one of the nodes. This leader/master handles all client requests on behalf of the cluster. If the leader node fails, another leader will be elected from the remaining nodes.
- Reliable messaging: Fulfills need for communication between and among the nodes in the cluster specific to the distributed application. ZooKeeper offers a publish/subscribe capability that allows the creation of a queue. This queue guarantees message delivery, even in the case of a node failure.
Other tools
- Ganglia
- Hue
- Mahout
- Tez
- HBase
- Phoenix
- Presto
- Metastore
- Spark
- Zeppelin
Overview of NoSQL
- What is NoSQL (Not Only SQL)?
- NoSQL does not use the relational model. Data is not stored in tables of rows and columns, but in a variety of different formats.
- Mostly open source.
- Encompasses a wide variety of database technologies that were developed to support the real-time processing needs of high-volume and high-velocity big data.
- Supports agile development, where the nature of data changes rapidly over cycles of a software development project.
- The challenges of relational databases
- Requires a well-defined structure of data, not suitable for the high-variety of Big Data.
- Schema is enforced rigorously. The database schema is defined upfront before building the application. This pattern does not provide flexibility in an agile development project that deals with highly dynamic applications.
- Relational databases can only grow vertically and more resources need to be added to the existing servers.
- Benefits of NoSQL over relational databases
- NoSQL databases are schema-less and do not define a strict data structure.
- Highly agile, can adapt to a variety of data formats including structured, unstructured, and semi-structured.
- Can scale horizontally by adding more servers. Utilizes concepts of sharding and replication.
- Sharding: Distributes data over multiple servers so that a single server acts as a source for a subset of data.
- Replication: Copies data across multiple servers so that data can be found in multiple places and can be recovered in case of server failure.
- Can utilize the Cloud model, which utilizes virtual servers that can be scaled on demand.
- Better performance than relational databases for unstructured data.
- The CAP theorem
- Originally published by Eric Brewer and describes the basic requirements for a distributed system and only 2 of the 3 can be achieved at any point in time and trade offs must be made, depending on the task.
- Consistency: All servers maintain the same state of data. Any queries for data will yield the same answers regardless of which server answers the query.
- Availability: The system is always available to yield answers to queries for data.
- Partition tolerance: The system continues to operate as a whole, even if individual servers fail or crash.
- ACID
- Describes a set of properties that apply to data transactions and which databases can choose to follow.
- Atomicity: All-or-none of a transaction must happen successfully.
- Consistency: Data is committed only if it passes all rules imposed by data types, triggers, constraints, etc.
- Isolation: Transactions are ordered such that one transaction does not get affected by another making changes to data.
- Durability: Once data is committed, it is durably stored and safeguarded against errors, crashes, or any other malfunctions within the database.
- Eventual consistency in NoSQL databases
- The consistency principle (from ACID) is not followed, but provides "eventual consistency".
- As per the CAP theorem, availability and partition tolerance are chosen over consistency.
- Database changes are propagated to all nodes "eventually" (typically within milliseconds), so queries for data might not return updated data immediately or might result in stale data.
- NoSQL databases offer options to tune the database according to application requirements. For example, if a database is read-heavy, eventual consistency is preferred.
- Example: DynamoDB on AWS offers options between eventual consistency and strong consistency.
- Eventually Consistent Reads (default): Maximizes your read throughput. However, an eventually consistent read might not reflect the results of a recently completed write. Consistency across all copies of a data is usually reached within a second. Repeating a read after a short time should return the updated data.
- Strongly Consistent Reads: Returns a result that reflects all writes that received a successful response prior to the read.
- Types of NoSQL databases
- Key-value
- Perform operations based on a unique key, where its value is a blob that can be of any format.
- Can store a blob with a key. Read, update, or delete blob for a given key.
- Scalable and high-performance.
- Fast lookup of values by known keys
- Generally useful for storing session information, user profiles, preferences, shopping cart data, etc.
- Avoid using key-value databases when relationships exist between data and the need to operate on multiple keys at a time.
- Some persist on disk (e.g., Redis), others store in memory (e.g., Memcached). Choose according to application requirements.
- Examples include Redis, MemcacheDB, Amazon DynamoDB, Riak, and Couchbase.
- Document-oriented
- Store a document as a whole and index on its identifier and properties. Allows putting a diverse set of documents together in a collection.
- Based on the main concept of documents.
- Heterogenous data, working object-oriented, agile devlopment
- Documents can be in XML, JSON, EDI, SWIFT, etc.
- Document databases store documents in the value part of the key.
- Documents are often self-describing, hierarchical tree structures consisting of map, collections, and scalar values.
- Documents are indexed using the Btree algorithm and queried using a Javascript query engine.
- Examples include MongoDB and CouchDB.
- Column
- Column family databases store data in column families as rows that have many columns associated with a row key.
- A column family is a container for an ordered collection of rows. Column families are groups of related data that are often associated together. As an example, for a given customer, one would often access their profile information at the same time, but not their orders.
- Each column family can be compared to a container of rows in an RDBMS table, where the key identifies the row and the row consists of multiple columns. The difference is that various rows do not have to have the same columns, and columns can be added to any row at any time without having to add it to other rows.
- When a column consists of a map of columns, then there is a super-column. A super-column consists of a name and a value, which is a map of columns. Think of a super-column as a container of columns.
- Examples include Google BigTable, Apache Cassandra, Hypertable, HBase, etc.
- Graph
- Base on graph theory. Uses nodes, edges, and properties on both nodes and edges.
- Graph databases allow you to store entities and relationships between these entities. Entities are also known as nodes, which have properties.
- Relations are known as "edges" that can have properties. Edges have directional significance; nodes are organized by relationships, which allow one to find interesting patterns between nodes.
- In graph databases, traversing the joins or relationships is very fast. The relationship between nodes is not calculated at query time, but is actually persisted as a relationship. Traversing persisted relationships is fast than caclulating them for every query.
- The power from graph databases comes from the relationships and their properties. Modeling relationships requires thorough design.
- Useful for data mining
- Examples include Neo4j, GiraffeDB, Infinite Graph, OrientDB
Below is an example of a super column family that contains other column families:
UserList={ Bob:{ username:{firstname:"Bob",lastname:"Brown"} address:{city:"Seattle",postcode:"98101"} } Alice:{ username:{firstname:"Alice",lastname:"Green"} account:{bank:"Bank of England",accountid:"123456"} } }
Where "Bob" and "Alice" are row keys; "username", "address", and "account" are super column names; and "firstname", "lastname", "city", etc. are column names.
- Further reading
Apache Spark
- What is Spark?
- Fast and general engine for large-scale data processing and analysis
- Parallel distributed processing on commodity hardware
- Easy to use
- A comprehensive, unified framework for Big Data analytics
- Open source and a top-level Apache project
- Spark use cases
- Big Data use case like intrusion detection, product recommendations, estimating financial risks, detecting genomic associations to a disease, etc. require analysis on large-scale data.
- In depth analysis requires a combination of tools like SQL, statistics, and machine learning to gain meaningful insights from data.
- Historical choices like R and Octave operate only on single node machines and are, therefore, not suitable for large volume data sets.
- Spark allows rich programming APIs like SQL, machine learning, graph processing, etc. to run on clusters of computers to achieve large-scale data processing and analysis.
- Spark and distributed processing
- Challenges of distributed processing:
- Distributed programming is much more complex than single node programming
- Data must be partitioned across servers, increasing the latency if data has to be shared between servers over the network
- Chances of failure increases with the increase in the number of servers
- Spark makes distributed processing easy:
- Provides a distributed and parallel processing framework
- Provides scalability
- Provides fault-tolerance
- Provides a programming paradigm that makes it easy to write code in a parallel manner
- Spark and its speed
- Lightning fast speeds due to in-memory caching and a DAG-based processing engine.
- 100 times faster than Hadoop's MapReduce for in-memory computations and 10 time faster for on-disk.
- Well suited for iterative algorithms in machine learning
- Fast, real-time response to user queries on large in-memory data sets.
- Low latency data analysis applied to processing live data streams
Glossary
- Hadoop
- A software ecosystem that enables massively parallel computations distributed across thousands of (commodity) servers in a cluster
- Data Lake
See also
- Amazon Web Services (AWS)
- Google Cloud Platform (GCP)
References
- ↑ 4 Vs For Big Data Analytics. 2013-06-31.