DBInputFormat for å overføre data fra SQL til NoSQL-database



Målet med denne bloggen er å lære hvordan du overfører data fra SQL-databaser til HDFS, hvordan du overfører data fra SQL-databaser til NoSQL-databaser.

I denne bloggen vil vi utforske mulighetene og mulighetene til en av de viktigste komponentene i Hadoop-teknologien, dvs. MapReduce.

I dag vedtar selskaper Hadoop-rammeverket som deres førstevalg for datalagring på grunn av dets evner til å håndtere store data effektivt. Men vi vet også at dataene er allsidige og finnes i forskjellige strukturer og formater. For å kontrollere et så stort utvalg av data og dets forskjellige formater, bør det være en mekanisme for å imøtekomme alle varianter og likevel gi et effektivt og konsistent resultat.





Den kraftigste komponenten i Hadoop-rammeverket er MapReduce, som kan gi kontrollen over dataene og strukturen bedre enn de andre kolleger. Selv om det krever overhead av læringskurve og programmeringskompleksitet, hvis du kan håndtere disse kompleksitetene, kan du sikkert håndtere alle slags data med Hadoop.

MapReduce framework deler alle behandlingsoppgavene i utgangspunktet i to faser: Map og Reduce.



Forberedelse av rådata for disse fasene krever forståelse av noen grunnleggende klasser og grensesnitt. Superklassen for denne etterbehandlingen er InputFormat.

De InputFormat klasse er en av kjerneklassene i Hadoop MapReduce API. Denne klassen er ansvarlig for å definere to hoved ting:

  • Datasplitt
  • Plateleser

Datadeling er et grunnleggende konsept i Hadoop MapReduce-rammeverk som definerer både størrelsen på individuelle kartoppgaver og dens potensielle kjøringsserver. De Ta opp leser er ansvarlig for faktisk lesing av poster fra inndatafilen og å sende dem (som nøkkel / verdipar) til kartleggeren.



Antall kartleggere avgjøres ut fra antall splitter. Det er InputFormats jobb å lage splittene. For det meste tilsvarer delingsstørrelse blokkstørrelse, men det er ikke alltid splittelser vil opprettes basert på HDFS-blokkstørrelse. Det kommer helt an på hvordan getSplits () -metoden til InputFormat er blitt overstyrt.

Det er en grunnleggende forskjell mellom MR split og HDFS block. En blokk er et fysisk stykke data mens en splittelse bare er en logisk del som en kartlegger leser. En splitt inneholder ikke inngangsdata, den inneholder bare en referanse eller adresse til dataene. En splittelse har i utgangspunktet to ting: En lengde i byte og et sett med lagringssteder, som bare er strenger.

dataabstrahering c ++

For å forstå dette bedre, la oss ta et eksempel: Behandle data som er lagret i MySQL ved hjelp av MR. Siden det ikke er noe konsept med blokker i dette tilfellet, blir teorien: 'splitter alltid opprettet basert på HDFS-blokken',mislykkes. En mulighet er å opprette splittelser basert på rekkeområder i MySQL-tabellen din (og dette er hva DBInputFormat gjør, et inndataformat for å lese data fra en relasjonsdatabase). Vi kan ha k antall splitt som består av n rader.

Det er bare for InputFormats basert på FileInputFormat (en InputFormat for håndtering av data som er lagret i filer) at splittene blir opprettet basert på den totale størrelsen, i byte, av inndatafilene. FileSystem-blokkstørrelsen på inndatafilene blir imidlertid behandlet som en øvre grense for inndelinger. Hvis du har en fil som er mindre enn HDFS-blokkstørrelsen, får du bare en kartlegger for den filen. Hvis du vil ha en annen oppførsel, kan du bruke mapred.min.split.size. Men det avhenger igjen bare av getSplits () til InputFormat.

Vi har så mange eksisterende inngangsformater tilgjengelig under pakken org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Standard er TextInputFormat.

På samme måte har vi så mange utdataformater som leser dataene fra reduseringsanordninger og lagrer dem i HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Standard er TextOutputFormat.

selen webdriver testng rammeeksempel

Når du er ferdig med å lese denne bloggen, ville du ha lært:

  • Hvordan skrive et kartreduksjonsprogram
  • Om forskjellige typer InputFormats tilgjengelig i Mapreduce
  • Hva er behovet for InputFormats
  • Hvordan skrive tilpassede InputFormats
  • Hvordan overføre data fra SQL-databaser til HDFS
  • Hvordan overføre data fra SQL (her MySQL) -databaser til NoSQL-databaser (her Hbase)
  • Hvordan overføre data fra en SQL-database til en annen tabell i SQL-databaser (Kanskje dette ikke er så viktig hvis vi gjør dette i samme SQL-database. Det er imidlertid ingenting galt i å ha kunnskap om det samme. Du vet aldri hvordan den kan komme i bruk)

Forutsetning:

  • Hadoop forhåndsinstallert
  • SQL forhåndsinstallert
  • Hbase forhåndsinstallert
  • Java grunnleggende forståelse
  • MapReduser kunnskap
  • Hadoop rammeverk grunnleggende kunnskap

La oss forstå problemstillingen som vi skal løse her:

Vi har en ansattstabell i MySQL DB i vår relasjonsdatabase Edureka. Nå som i henhold til forretningskravet, må vi flytte alle tilgjengelige data i relasjonell DB til Hadoop-filsystem, dvs. HDFS, NoSQL DB kjent som Hbase.

Vi har mange muligheter for å gjøre denne oppgaven:

  • Sqoop
  • Flume
  • Kart reduksjon

Nå vil du ikke installere og konfigurere noe annet verktøy for denne operasjonen. Du sitter igjen med bare ett alternativ som er Hadoops behandlingsramme MapReduce. MapReduce-rammeverk vil gi deg full kontroll over dataene mens du overfører. Du kan manipulere kolonnene og sette direkte på et av de to målstedene.

Merk:

  • Vi må laste ned og plassere MySQL-kontakten i kursstien til Hadoop for å hente tabeller fra MySQL-tabellen. For å gjøre dette, last ned kontakten com.mysql.jdbc_5.1.5.jar og hold den under katalogen Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Nedlastinger / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / del / hadoop / mapreduce / lib /
  • Sett også alle Hbase-krukker under Hadoop-klassen for å få MR-programmet ditt til å gå til Hbase. For å gjøre dette, utfør følgende kommando :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / del / hadoop / mapreduce / lib /

Programvareversjonene jeg har brukt i utførelsen av denne oppgaven er:

  • Hadooop-2.3.0
  • HBase 0,98,9-Hadoop2
  • Formørkelse Månen

For å unngå programmet i kompatibilitetsproblemer, foreskriver jeg leserne mine å kjøre kommandoen med lignende miljø.

Tilpasset DBInputWritable:

pakke com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implementer Writable, DBWritable {private int id private Strengnavn, avd. Public void readFields (DataInput in) kaster IOException {} public void readFields (ResultSet rs) kaster SQLException // Resultset-objekt representerer dataene som returneres fra en SQL-setning {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) kaster IOException { } public void write (PreparedStatement ps) kaster SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Egendefinert DBOutput Skrivbar:

pakke com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implementer Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, Streng dept) {this.name = navngi this.id = id this.dept = dept} public void readFields (DataInput in) kaster IOException {} public void readFields (ResultSet rs) kaster SQLException {} public void write (DataOutput out) kaster IOException {} public void write (PreparedStatement ps) kaster SQLException {ps.setString (1, navn) ps.setInt (2, id) ps.setString (3, avd.)}}

Inngangstabell:

lage database edureka
opprett tabell emp (empid int ikke null, navn varchar (30), avd. varchar (20), primærnøkkel (empid))
sett inn i emp-verdier (1, 'abhay', 'utvikling'), (2, 'brundesh', 'test')
velg * fra emp

Sak 1: Overføring fra MySQL til HDFS

pakke com.inputFormat.copy importerer java.net.URI importerer org.apache.hadoop.conf.Configuration importerer org.apache.hadoop.fs.FileSystem importerer org.apache.hadoop.fs.Path importerer org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat importer org.apache.hadoop .io.Text importerer org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) kaster Unntak {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // driverklasse' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // brukernavn' root ') // passord Jobjobb = ny jobb (conf) jobb .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.classPatformat) ny bane (args [0])) DBInputFormat.setInput (jobb, DBInputWritable.class, 'emp', // input tabellnavn null, null, new String [] {'empid', 'name', 'dept'} / / tabellkolonner) Sti p = ny Sti (args [0]) FileSystem fs = FileSystem.get (ny URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Denne koden lar oss forberede eller konfigurere inngangsformatet for å få tilgang til kilden vår SQL DB. Parameteren inkluderer førerklassen, URL-adressen har adressen til SQL-databasen, brukernavnet og passordet.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // user name 'root') //passord

Denne koden lar oss sende detaljene til tabellene i databasen og sette den i jobbobjektet. Parametrene inkluderer selvfølgelig jobbforekomsten, den tilpassede skrivbare klassen som må implementere DBWritable-grensesnitt, kildetabellnavnet, tilstand hvis noe annet er null, eventuelle sorteringsparametere annet null, listen over henholdsvis tabellkolonner.

DBInputFormat.setInput (jobb, DBInputWritable.class, 'emp', // input tabellnavn null, null, ny streng [] {'empid', 'name', 'dept'} // tabellkolonner)

Kartlegger

pakke com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map utvider Mapper {.
beskyttet ugyldig kart (LongWritable-nøkkel, DBInputWritable-verdi, Context ctx) {prøv {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) Streng dept = value.getDept ()
ctx.write (ny tekst (navn + '' + id + '' + avd.), id)
} fangst (IOException e) {e.printStackTrace ()} fangst (InterruptedException e) {e.printStackTrace ()}}}

Reduksjon: Identitetsreduksjon brukt

Kommando for å løpe:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Utgang: MySQL-tabell overført til HDFS

hadoop dfs -ls / dbtohdfs / *

Sak 2: Overføring fra ett bord i MySQL til et annet i MySQL

lage utgangstabell i MySQL

opprett tabellmedarbeider1 (navn varchar (20), id int, avd. varchar (20))

mysql tutorial for nybegynnere med eksempler
pakke com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job importer org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat importerer org.apache.hadoop.mapreduce.lib.db.DBOutputFormat importerer org.apache.hadoop.io.Text importerer org.apache.hadoop.io.IntWritable importerer org.apache.hadoop.io.NullWritable offentlig klasse Mainonetable_to_other_table {public static void main (String [] args) kaster unntak {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // brukernavn' root ') // passord Jobjobb = ny jobb (conf) jobb.setJarByClass (Mainonetable_to_other_table.class) jobb.setMapperClass (Map.class) jobb .setReducerClass (Reduce.class) jobb.setMapOutputKeyClass (Text.class) jobb.setMapOutputValueClass (IntWritable.class) jobb.setOutputKeyClass (DBOutputWritable.class) jobb.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input tabellnavn null, null, ny String [] {'empid ',' navn ',' avd. '} // tabellkolonner) DBOutputFormat.setOutput (jobb,' ansatt1 ', // navn på utgangstabell ny streng [] {' navn ',' id ',' avd. '} // tabell kolonner) System.exit (job.waitForCompletion (true)? 0: 1)}}

Denne koden lar oss konfigurere navnet på utgangstabellen i SQL DB. Parametrene er henholdsvis jobbinstans, navn på utgangstabell og utdatakolonne.

DBOutputFormat.setOutput (jobb, 'ansatt1', // navn på utgangstabell ny streng [] {'navn', 'id', 'avd.'} // tabellkolonner)

Mapper: Samme som sak 1

Reduksjon:

pakke com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable offentlig klasse Reduser utvider Reduser {beskyttet tomrom redusere (Tekstnøkkel, Iterable verdier, Kontekst ctx) {int sum = 0 Strenglinje [] = key.toString (). Split ('') prøv {ctx.write (ny DBOutputWritable (linje [0] .toString (), Heltall.parseInt (linje [1] .toString ()), linje [2] .toString ()), NullWritable.get ())} fangst (IOException e) {e.printStackTrace ()} fangst (InterruptedException e) {e.printStackTrace ()}}}

Kommando å kjøre:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Output: Overførte data fra EMP-tabellen i MySQL til en annen tabellmedarbeider1 i MySQL

Sak 3: Overføring fra tabell i MySQL til NoSQL (Hbase) tabell

Opprette Hbase-tabell for å imøtekomme utdata fra SQL-tabellen:

opprett 'ansatt', 'offisiell_info'

Førerklasse:

pakke Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat importer org.apache.hadoop.hbase.mapreduce.TableOutputFormat importer org.apache.hadoop.hbase.HBaseConfiguration importer org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text offentlig klasse MainDbToHbase {offentlig statisk ugyldig hoved (String [] args) kaster Unntak {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // brukernavn 'root') // passord Jobjobb = ny jobb (conf, 'dbtohbase') jobb.setJarByClass (MainDbToHbase.class) jobb.s etMapperClass (Map.class) jobb.setMapOutputKeyClass (ImmutableBytesWritable.class) jobb.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('ansatt', Reduser.klasse, jobb) jobb.setInputFormatClass.CormFormatClass. klasse) DBInputFormat.setInput (jobb, DBInputWritable.class, 'emp', // input tabellnavn null, null, ny streng [] {'empid', 'name', 'dept'} // tabellkolonner) System.exit (job.waitForCompletion (true)? 0: 1)}}

Denne koden lar deg konfigurere utgangsnøkkelklassen som i tilfelle hbase er ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Her passerer vi navnet på hbase-tabellen og reduksjonsenheten for å handle på bordet.

TableMapReduceUtil.initTableReducerJob ('ansatt', Reduser.klasse, jobb)

Kartlegger:

pakke Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Kart utvider Mapper {private IntWritable en = ny IntWritable (1) beskyttet ugyldig kart (LongWritable id, DBInputWritable verdi, Context context) {prøv {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ' '+ dept))} fangst (IOException e) {e.printStackTrace ()} fangst (InterruptedException e) {e.printStackTrace ()}}}

I denne koden tar vi verdier fra getters av klassen DBinputwritable og sender dem inn
ImmutableBytes Skrives slik at de når reduksjonsenheten i bytewriatble form som Hbase forstår.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + '' + avd. ))

Reduksjon:

pakke Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes importerer org.apache.hadoop.io.Text public class Reduce utvider TableReducer {public void reduce (ImmutableBytesWritable key, Iterable values, Context context) kaster IOException, InterruptedException {String [] årsak = null // Loop-verdier for (Text val: values) {cause = val.toString (). split ('')} // Sett til HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('navn'), Bytes.toBytes (årsak [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('avdeling'), Bytes.toBytes (årsak [1 ])) context.write (nøkkel, put)}}

Denne koden lar oss bestemme den eksakte raden og kolonnen der vi skal lagre verdier fra reduseringsenheten. Her lagrer vi hver empid i egen rad da vi lagde empid som radnøkkel som ville være unik. I hver rad lagrer vi den offisielle informasjonen til de ansatte under henholdsvis kolonnefamilien “offisiell_info” under kolonnene “navn” og “avdeling”.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (årsak [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (cause [1])) context.write (key, put)

Overførte data i Hbase:

skann ansatt

Som vi ser var vi i stand til å fullføre oppgaven med å overføre forretningsdataene våre fra en relasjonell SQL DB til en NoSQL DB.

I neste blogg lærer vi hvordan du skriver og utfører koder for andre inndata og utdataformater.

Fortsett å legge inn kommentarer, spørsmål eller tilbakemeldinger. Jeg vil gjerne høre fra deg.

Har du spørsmål til oss? Vennligst nevn det i kommentarfeltet, så kommer vi tilbake til deg.

Relaterte innlegg: