Kumulativ Stateful Transformation In Apache Spark Streaming



Dette blogginnlegget diskuterer stateful transformasjoner i Spark Streaming. Lær alt om kumulativ sporing og dyktighet for en Hadoop Spark-karriere.

Bidratt av Prithviraj Bose

I min forrige blogg har jeg diskutert stateful transformasjoner ved hjelp av vindusbegrepet Apache Spark Streaming. Du kan lese den her .





I dette innlegget skal jeg diskutere kumulative stateful operasjoner i Apache Spark Streaming. Hvis du er ny i Spark Streaming, anbefaler jeg deg på det sterkeste å lese den forrige bloggen min for å forstå hvordan windowsing fungerer.

Typer av stateful Transformation in Spark Streaming (Fortsatt ...)

> Kumulativ sporing

Vi hadde brukt redusereByKeyAndWindow (...) API for å spore nøklestatusene, men vindu gir imidlertid begrensninger for visse brukstilfeller. Hva om vi vil samle tastenes tilstander i stedet for å begrense det til et tidsvindu? I så fall må vi bruke updateStateByKey (…) BRANN.



Denne API-en ble introdusert i Spark 1.3.0 og har vært veldig populær. Imidlertid har denne API-en noen ytelsesomkostninger, ytelsen degraderer når størrelsen på statene øker over tid. Jeg har skrevet et eksempel for å vise bruken av denne API-en. Du finner koden her .

Spark 1.6.0 introduserte en ny API mapWithState (…) som løser ytelseskostnadene fra updateStateByKey (…) . I denne bloggen skal jeg diskutere dette API-et ved hjelp av et eksempelprogram som jeg har skrevet. Du finner koden her .

beste programvaren for Java-programmering

Før jeg dykker inn i en kodegjennomgang, la oss spare noen ord på sjekkpunktet. For enhver stateful transformasjon er kontrollpunkt obligatorisk. Kontrollpunkt er en mekanisme for å gjenopprette tilstanden til nøklene i tilfelle driverprogrammet mislykkes. Når driveren starter på nytt, blir tastenes tilstand gjenopprettet fra kontrollpunktsfilene. Kontrollpunktsteder er vanligvis HDFS eller Amazon S3 eller en hvilken som helst pålitelig lagring. Mens du tester koden, kan man også lagre i det lokale filsystemet.



I eksempelprogrammet lytter vi til socket-tekststrøm på host = localhost og port = 9999. Den tokeniserer den innkommende strømmen til (ord, antall forekomster) og sporer ordtellingen ved hjelp av 1.6.0 API mapWithState (…) . I tillegg fjernes nøkler uten oppdateringer ved hjelp av StateSpec.timeout API. Vi sjekker i HDFS, og sjekkfrekvensen er hvert 20. sekund.

La oss først lage en Spark Streaming-økt,

Spark-streaming-session

Vi lager en sjekkpunktDir i HDFS og ring deretter objektmetoden getOrCreate (…) . De getOrCreate API sjekker sjekkpunktDir for å se om det er noen tidligere tilstander å gjenopprette, hvis det eksisterer, gjenskaper det Spark Streaming-sesjonen og oppdaterer tilstandene til nøklene fra dataene som er lagret i filene før de går videre med nye data. Ellers oppretter det en ny Spark Streaming-økt.

De getOrCreate tar kontrollpunktsnavnet og en funksjon (som vi har navngitt createFunc ) hvis signatur skal være () => StreamingContext .

La oss undersøke koden inni createFunc .

Linje 2: Vi oppretter en streaming-kontekst med jobbnavn til “TestMapWithStateJob” og batchintervall = 5 sekunder.

Linje nr. 5: Still inn kontrollpunktkatalogen.

hvordan du kan forhindre lås i Java

Linje # 8: Still tilstandsspesifikasjonen ved hjelp av klassen org.apache.streaming.StateSpec gjenstand. Vi setter først funksjonen som skal spore tilstanden, deretter setter vi antall partisjoner for de resulterende DStreams som skal genereres under påfølgende transformasjoner. Til slutt setter vi tidsavbruddet (til 30 sekunder), hvis noen oppdatering for en nøkkel ikke mottas på 30 sekunder, vil nøkkeltilstanden bli fjernet.

Linje 12 #: Sett opp sokkelstrømmen, flate innkommende batchdata, opprett et nøkkelverdipar, ring mapWithState , sett kontrollintervallintervallet til 20-tallet og skriv til slutt ut resultatene.

Spark-rammeverket kaller th e createFunc for hver tast med forrige verdi og gjeldende tilstand. Vi beregner summen og oppdaterer tilstanden med den kumulative summen, og til slutt returnerer vi summen for nøkkelen.

def __init __ (selv) python

Github Kilder -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Har du et spørsmål til oss? Vennligst nevn det i kommentarfeltet, så kommer vi tilbake til deg.

Relaterte innlegg:

Kom i gang med Apache Spark & ​​Scala

Stateful Transformations with Windowing in Spark Streaming