RDD ved hjelp av Spark: Building Block of Apache Spark



Denne bloggen om RDD ved hjelp av Spark vil gi deg en detaljert og omfattende kunnskap om RDD, som er den grunnleggende enheten til Spark og hvor nyttig den er.

, Selve ordet er nok til å generere en gnist i tankene til enhver Hadoop-ingeniør. TIL n i minnet behandlingsverktøy som er lynrask i klyngedatamaskiner. Sammenlignet med MapReduce lager datadeling i minnet RDDer 10-100x raskere enn nettverks- og diskdeling, og alt dette er mulig på grunn av RDD (Resilient Distribuerte datasett). Nøkkelpunktene vi fokuserer i dag i denne RDD ved hjelp av Spark-artikkelen er:

Trenger du RDD?

Hvorfor trenger vi RDD? -RDD ved hjelp av Spark





Verden utvikler seg med og Datavitenskap på grunn av fremgangen i . Algoritmer basert på Regresjon , , og som går på Distribuert Iterativ Comput ation mote som inkluderer gjenbruk og deling av data mellom flere databehandlingsenheter.

Det tradisjonelle teknikker trengte en stabil mellomliggende og distribuert lagring som HDFS som består av gjentatte beregninger med datareplikasjoner og dataserialisering, noe som gjorde prosessen mye tregere. Det var aldri lett å finne en løsning.



Dette er hvor RDDs (Resilient Distribuerte datasett) kommer til det store bildet.

RDD s er enkle å bruke og enkle å lage ettersom data importeres fra datakilder og slippes inn i RDD. Videre blir operasjonene brukt for å behandle dem. De er en distribuert minnesamling med tillatelser som Skrivebeskyttet og viktigst, det er de Feiltolerant .



Hvis noen datapartisjon av RDD er tapt , kan det regenereres ved å bruke det samme transformasjon operasjon på den tapte partisjonen i avstamning , i stedet for å behandle alle dataene fra bunnen av. Denne typen tilnærming i sanntidsscenarier kan gjøre mirakler i situasjoner med tap av data eller når et system er nede.

Hva er RDD?

RDD eller ( Resilient Distribuert datasett ) er en grunnleggende data struktur i Spark. Begrepet Fleksibel definerer evnen som genererer data automatisk eller data ruller tilbake til opprinnelig tilstand når det oppstår en uventet katastrofe med sannsynlighet for tap av data.

Dataene skrevet inn i RDD er partisjonert og lagret i flere kjørbare noder . Hvis en utførende node mislykkes i kjøretiden, så får den øyeblikkelig ryggen opp fra neste kjørbare node . Dette er grunnen til at RDDs blir sett på som en avansert type datastrukturer sammenlignet med andre tradisjonelle datastrukturer. RDD kan lagre strukturerte, ustrukturerte og semistrukturerte data.

La oss gå videre med vår RDD ved hjelp av Spark-bloggen og lære om de unike egenskapene til RDDs som gir den en fordel i forhold til andre typer datastrukturer.

Funksjoner av RDD

  • I minne (RAM) Beregninger : Konseptet med In-Memory-beregning tar databehandlingen til et raskere og effektivt stadium der det samlede opptreden av systemet er oppgradert.
  • L hans evaluering : Begrepet Lat evaluering sier transformasjoner blir brukt på dataene i RDD, men utdata genereres ikke. I stedet er de anvendte transformasjonene logget.
  • Standhaftighet : De resulterende RDDene er alltid gjenbrukbar.
  • Grovkornede operasjoner : Brukeren kan bruke transformasjoner til alle elementene i datasett gjennom kart, filter eller gruppe av operasjoner.
  • Feiltolerant : Hvis det er tap av data, kan systemet rull tilbake til dens opprinnelig tilstand ved å bruke den loggede transformasjoner .
  • Uforanderlighet : Data som er definert, hentet eller opprettet kan ikke være endret når den er logget inn i systemet. I tilfelle hvis du trenger tilgang til og endre eksisterende RDD, må du opprette en ny RDD ved å bruke et sett med Transformasjon fungerer på gjeldende eller foregående RDD.
  • Oppdeling : Det er den avgjørende enhet av parallellisme i Spark RDD. Antall opprettede partisjoner er som standard basert på datakilden din. Du kan til og med bestemme antall partisjoner du vil lage ved hjelp av tilpasset partisjon funksjoner.

Opprettelse av RDD ved hjelp av Spark

RDD kan opprettes i tre måter:

  1. Lese data fra parallelliserte samlinger
val PCRDD = spark.sparkContext.parallelize (Array ('Man', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Søker transformasjon på tidligere RDD
val ord = spark.sparkContext.parallelize (Seq ('Spark', 'er', 'a', 'veldig', 'kraftig', 'språk')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Lese data fra ekstern lagring eller filstier som HDFS eller HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operasjoner utført på RDD:

Det er hovedsakelig to typer operasjoner som utføres på RDD, nemlig:

  • Transformasjoner
  • Handlinger

Transformasjoner : De operasjoner vi søker på RDDs til filter, tilgang og endre dataene i foreldre RDD for å generere en påfølgende RDD er kalt transformasjon . Den nye RDD returnerer en peker til forrige RDD som sikrer avhengigheten mellom dem.

Transformasjoner er Lat evalueringer, med andre ord, operasjonene som brukes på RDD som du jobber, blir logget, men ikke henrettet. Systemet kaster et resultat eller unntak etter å ha utløst Handling .

Vi kan dele transformasjoner i to typer som nedenfor:

  • Smale transformasjoner
  • Brede transformasjoner

Smale transformasjoner Vi bruker smale transformasjoner på a enkelt partisjon av den overordnede RDD for å generere en ny RDD ettersom data som kreves for å behandle RDD er tilgjengelig på en enkelt partisjon av foreldre ASD . Eksemplene for smale transformasjoner er:

  • kart()
  • filter()
  • flatMap ()
  • skillevegg()
  • mapPartitions ()

Brede transformasjoner: Vi bruker den brede transformasjonen på flere partisjoner for å generere en ny RDD. Dataene som kreves for å behandle RDD er tilgjengelig på flere partisjoner av foreldre ASD . Eksemplene for brede transformasjoner er:

  • redusere av ()
  • fagforening ()

Handlinger : Handlinger instruerer Apache Spark om å søke beregning og send resultatet eller unntaket tilbake til sjåføren RDD. Få av handlingene inkluderer:

hva er forskjellen mellom git og github
  • samle inn()
  • telle()
  • ta()
  • først()

La oss praktisk talt bruke operasjonene på RDD:

IPL (indisk Premier League) er en cricket-turnering med hipe på toppnivå. Så, la oss i dag få tak i IPL-datasettet og utføre vår RDD ved hjelp av Spark.

  • Først, la oss laste ned CSV-matchdata for IPL. Etter at den er lastet ned, begynner den å se ut som en EXCEL-fil med rader og kolonner.

I neste trinn fyrer vi opp gnisten og laster matches.csv-filen fra dens plassering, i mitt tilfelle mincsvfilplassering er “/Bruker / edureka_566977/test/matches.csv”

La oss nå begynne med Transformasjon del først:

  • kart():

Vi bruker Karttransformasjon å bruke en spesifikk transformasjonsoperasjon på hvert element i en RDD. Her lager vi en RDD ved navn CKfile hvor du lagrer vårcsvfil. Vi skal opprette en annen RDD kalt stater til lagre bydetaljene .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • filter():

Filtertransformasjon, selve navnet beskriver bruken. Vi bruker denne transformasjonsoperasjonen for å filtrere ut selektive data fra en gitt datainnsamling. Vi søker filterdrift her for å få oversikten over årets IPL-kamper 2017 og lagre den i fil RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Vi bruker flatMap er en transformasjonsoperasjon til hvert av elementene i en RDD for å lage en newRDD. Det ligner på karttransformasjon. her søker viFlatmaptil spytt ut fyrstikkene i Hyderabad by og lagre dataene ifilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). samle ()

  • skillevegg():

Hver data vi skriver inn i en RDD er delt inn i et visst antall partisjoner. Vi bruker denne transformasjonen til å finne antall partisjoner dataene er faktisk delt inn i.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

passere referanse i java

  • mapPartitions ():

Vi anser MapPatitions som et alternativ til Map () ogfor hver() sammen. Vi bruker mapPartitions her for å finne antall rader vi har i vår fil RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • redusere av ():

Vi brukerReduser av() på Nøkkelverdipar . Vi brukte denne transformasjonen på vårcsvfil for å finne spilleren med kampens høyeste mann .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .kart (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • fagforening ():

Navnet forklarer alt, Vi bruker fagforeningstransformasjon er å klubben to RDDs sammen . Her lager vi to RDD-er, nemlig fil og fil2. fil RDD inneholder postene over 2017 IPL-kamper og fil2 RDD inneholder 2016 IPL-kamprekord.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

La oss starte med Handling del der vi viser faktisk produksjon:

  • samle inn():

Collect er handlingen vi pleier å vise innholdet i RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • telle():

Telleer en handling som vi bruker til å telle antall poster til stede i RDD.Hervi bruker denne operasjonen til å telle det totale antallet poster i matches.csv-filen.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • ta():

Take er en handlingsoperasjon som samler, men den eneste forskjellen er at den kan skrive ut hvilken som helst selektivt antall rader i henhold til brukerforespørsel. Her bruker vi følgende kode for å skrive ut topp ti ledende rapporter.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. ta (10) .foreach (println)

  • først():

First () er en handlingsoperasjon som ligner på collect () og take ()denbrukes til å skrive ut den øverste rapporten s output Her bruker vi den første () operasjonen for å finne maksimalt antall kamper spilt i en bestemt by og vi får Mumbai som utgang.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

For å gjøre prosessen vår til å lære RDD ved hjelp av Spark, enda mer interessant, har jeg kommet med en interessant brukssak.

RDD ved hjelp av Spark: Pokemon Use Case

  • Først, La oss laste ned en Pokemon.csv-fil og laste den til gnistskallet slik vi gjorde det til Matches.csv-filen.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemons er faktisk tilgjengelige i et stort utvalg, la oss finne noen få varianter.

  • Fjerner skjema fra Pokemon.csv-filen

Vi trenger kanskje ikke Skjema av filen Pokemon.csv. Derfor fjerner vi den.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Finne antall skillevegger vår pokemon.csv distribueres i.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Vann Pokémon

Å finne antall vannpokemon

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Brann Pokémon

Å finne antall Fire Pokemon

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Vi kan også oppdage befolkning av en annen type pokemon ved hjelp av tellefunksjonen
WaterRDD.count () FireRDD.count ()

  • Siden jeg liker spillet av defensiv strategi la oss finne pokemon med maksimalt forsvar.
val defenceList = NoHeader.map {x => x.split (',')}. kart {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Vi vet maksimalt forsvarsstyrkeverdi men vi vet ikke hvilken pokemon det er. så la oss finne ut hva som er det pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. kart {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Bestiller [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • La oss nå ordne pokemon med minst forsvar
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • La oss nå se Pokémon med en mindre defensiv strategi.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPeader .kart {x => x.split (',')}. kart {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Bestilling [Dobbelt ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

skannerklassemetoder i java

Så med dette kommer vi til en slutt på denne RDD ved hjelp av Spark-artikkelen. Jeg håper vi fikk litt lys over din kunnskap om RDD, deres funksjoner og de forskjellige typer operasjoner som kan utføres på dem.

Denne artikkelen er basert på er designet for å forberede deg på Cloudera Hadoop og Spark Developer Certification Exam (CCA175). Du vil få en grundig kunnskap om Apache Spark og Spark Ecosystem, som inkluderer Spark RDD, Spark SQL, Spark MLlib og Spark Streaming. Du vil få omfattende kunnskap om Scala programmeringsspråk, HDFS, Sqoop, Flume, Spark GraphX ​​og Messaging System som Kafka.