Friday, 14 August 2020

SQOOP
=====
To check the currently installed Sqoop version, inspect SQOOP_HOME:

-------------------------------------------------------------------------------------------------------------------------------

echo $SQOOP_HOME
/home/gopalkrishna/INSTALL/sqoop-1.4.6.bin__hadoop-2.0.4-alpha

-------------------------------------------------------------------------------------------------------------------------------

To see all the commands available in Sqoop:

-------------------------------------------------------------------------------------------------------------------------------
$ sqoop help
-------------------------------------------------------------------------------------------------------------------------------

SQOOP COMMANDS

--------------

TO CHECK MYSQL "DATABASES", "TABLES" & "TABLE DATA" --- in this case no mapper (MapReduce action) is launched; the results come directly from the RDBMS.

To see all the databases available in the given RDBMS:

-------------------------------------------------------------------------------------------------------------------------------
sqoop list-databases --connect jdbc:mysql://localhost:3306 --username root -password root

-------------------------------------------------------------------------------------------------------------------------------

To see the available tables in the given database:

--------------------------------------------------------------------------------------
sqoop list-tables --connect jdbc:mysql://localhost/nag --username root --password root 

--------------------------------------------------------------------------------------

eval 

----

It evaluates the given SQL statement: if the SQL statement is correct, the results are displayed on the console.


advantage of eval 

------------------

By using the eval command you can preview what type of data you are going to import from the RDBMS table into HDFS.

eval + --query 

--------------

------------------------------------------------------------------------------------------------------------------------------- -------------
sqoop eval --connect jdbc:mysql://localhost/nag --username root -password root --query 'select * from emp limit 3'

sqoop eval --connect jdbc:mysql://localhost:3306/gopaldb --username root --password root --query "select * from emp where esal > 25000"
sqoop eval --connect jdbc:mysql://localhost/nag --username root -password root --query 'show databases'

 ------------------------------------------------------------------------------------------------------------------------------- ----------------

We can perform any DML operation through Sqoop eval commands.

------------------------------------------------------------------------------------------------------------------------------- --------------
sqoop eval --connect jdbc:mysql://localhost/nag --username root -password root --query 'delete from emp where empno=1';

sqoop eval --connect jdbc:mysql://localhost/nag --username root -password root --query "update emp set ename='nag' where ename='nag'"
sqoop eval --connect jdbc:mysql://localhost/nag --username root -password root --query "insert into emp values('nag')"

sqoop eval --connect jdbc:mysql://localhost/nag --username root -password root --query 'drop table emp'

 ------------------------------------------------------------------------------------------------------------------------------- --------------

--OPTION-FILE 

------------------------------------------------------------------------------------------------------------------------------- ---------------

It is the concept of writing a group of Sqoop options in a file, saving that file with some name, and running that file from the command prompt.

By using an options file we can hide some information (such as credentials).
It is used for reusability.
The --options-file path should be a local directory; don't use an HDFS directory.

------------------------------------------------------------------------------------------------------------------------------- --------------
YOU HAVE TO CREATE THE OPTIONS FILE IN THIS MANNER ONLY, OTHERWISE YOU WILL GET AN ERROR

filename: connection.details

 --------------------------
import
--connect
jdbc:mysql://localhost/nag
--username
root
--password
root
--table
emp
--target-dir
/op5

-------------------------------------------------

sqoop --options-file /home/gopalkrishna/connection.details

 -----------------------------------------------------------------
To hide the username and password, keep them in the options file and pass the remaining options on the command line:

------------------------------------------------------------------------------------------------------------------------------- 

-------------
sqoop import --connect jdbc:mysql://localhost/nag --options-file /home/gopalkrishna/mahi/connection.details --table emp1 --target-dir /op3

------------------------------------------------------------------------------------------------------------------------------- ------------

SQOOP IMPORT
-----------
To import data from the RDBMS to an HDFS path:

------------------------------------------------------------------------------------------------------------------------------- --------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /ok1

------------------------------------------------------------------------------------------------------------------------------- -------------

note: in Sqoop the default number of parallel mappers is 4, so the table data will be stored in HDFS across 4 mapper output files:
_SUCCESS
part-m-00000
part-m-00001
part-m-00002
part-m-00003
(the table's rows are distributed across these part-m files in HDFS)
-------------------------------------------------------------------------------------------------------------------------------

If we want to store all the RDBMS data using a single mapper (one output file) in the HDFS path:

------------------------------------------------------------------------------------------------------------------------------- ---------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /ok2 -m 1    (-m is short for --num-mappers)
-------------------------------------------------------------------------------------------------------------------------------

When the RDBMS table being imported doesn't have a primary key, there is a problem because Sqoop cannot split the rows between the mappers, so we get an error. To overcome this problem we need to set the number of mappers to 1 (or use --split-by).

------------------------------------------------------------------------------------------------------------------------------- 

------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp1 --target-dir /ok4

during import: No primary key could be found for table emp1. Please specify one with --split-by or perform a sequential import with '-m 1'.
NOTE: here the emp1 table doesn't have a primary key, so the data cannot be split among the default 4 mappers; in this case you should use --split-by or decrease the number of mappers to 1.

------------------------------------------------------------------------------------------------------------------------------- ---------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp1 --target-dir /ok4 -m 1

 ------------------------------------------------------------------------------------------------------------------------------- ----------------

------------------------------------------------------------------------------------------------------------------------------- --------------

sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /ok8 /ok9
This gives an error because we can't store data into two HDFS paths at a time.

------------------------------------------------------------------------------------------------------------------------------- ---------------

--fields-terminated-by

 ------------------------------------------------------------------------------------------------------------------------------- ---------------
Note: Sqoop by default writes the data into HDFS with a comma as the field delimiter; we can override that delimiter by using --fields-terminated-by.

------------------------------------------------------------------------------------------------------------------------------- --------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /mahi2 --fields-terminated-by '\t'

------------------------------------------------------------------------------------------------------------------------------- ---------------

WHERE 

------

If we want to import only particular records from the RDBMS table into the HDFS path based on a condition, we can use --where.

 ------------------------------------------------------------------------------------------------------------------------------- --------------

sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /mahi7 --fields-terminated-by '\t' --where 'sal>700'
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /mahi9 --fields-terminated-by '\t' --where "sal>700 and deptno=30"

(NOTE QUOTES CAN BE SINGLE OR DOUBLE)

 ------------------------------------------------------------------------------------------------------------------------------- ----------------

COLUMNS

 -------

To import only particular columns' data from the source RDBMS table into the target HDFS path:

------------------------------------------------------------------------------------------------------------------------------- ----------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /nag9 --fields-terminated-by '\t' --where "sal>700 and deptno=30" --columns ename,deptno

------------------------------------------------------------------------------------------------------------------------------- ---------------

APPLYING MATHEMATICAL EXPRESSIONS ON THE COLUMNS

 ------------------------------------------------------------------------------------------------------------------------------- ----------------

options required

------------------------------------------------------------------------------------------------------------------------------- ----------------

--query 'SELECT statement WHERE $CONDITIONS' along with -m 1. NOTE: in this case you should not use the --table option.

------------------------------------------------------------------------------------------------------------------------------- -----------------
sqoop import --connect jdbc:mysql://localhost/nag --username root -password root --query 'select empno,sal+100 from emp where$CONDITIONS' -m 1 --target-dir /az 

------------------------------------------------------------------------------------------------------------------------------- ----------------

sqoop import --connect jdbc:mysql://localhost/nag --username root -password root --query 'select empno,sal+100 from emp
where sal>600 and deptno=30 and $CONDITIONS' -m 1 --columns empno --target-dir /az6 --fields-terminated-by '\t'

sqoop import --connect jdbc:mysql://localhost/nag --username root -password root --query 'select empno,sal+100 from emp
where sal>600 and deptno=30 or $CONDITIONS' -m 1 --columns empno --target-dir /az7 --fields-terminated-by '\t'

note: when you put OR instead of AND in front of $CONDITIONS, the preceding conditions are not enforced; it will display all the records returned by the SELECT statement.

 ------------------------------------------------------------------------------------------------------------------------------- -----------------

SET OPERATORS

 ---------------

UNION     -- eliminates duplicate rows from the given tables in the SELECT statements
UNION ALL -- does not eliminate duplicate rows from the given tables in the SELECT statements

UNION
-----

 ------------------------------------------------------------------------------------------------------------------------------- ----------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --query 'select * from emp where$CONDITIONS
union select * from emp1 where$CONDITIONS' --target-dir /az8 -m 1 --fields-terminated-by '\t'

sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --query 'select * from emp where $CONDITIONS
union select * from emp1 where $CONDITIONS' --target-dir /bz -m 1 --fields-terminated-by '\t' --columns empno 

------------------------------------------------------------------------------------------------------------------------------- --------------

UNION ALL
---------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --query 'select * from emp where $CONDITIONS
union all select * from emp1 where $CONDITIONS' --target-dir /az9 -m 1 --fields-terminated-by '\t'

------------------------------------------------------------------------------------------------------------------------------- ----------------
JOINS
-------

JOINS ARE USED TO RETRIEVE DATA FROM MULTIPLE TABLES BASED ON A COMMON COLUMN NAME WITH THE SAME DATA TYPE

EQUI JOIN (INNER JOIN), LEFT OUTER JOIN, RIGHT OUTER JOIN

EQUI JOIN -- INNER JOIN

----------------------- 

-------------------------------------------------------------------------------------------------------------------------------

 --------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --query 'select * from emp e join emp1 d on(e.empno=d.empno) and $CONDITIONS' --m 1 --target-dir /j

ERROR tool.ImportTool: Imported Failed: Duplicate Column identifier specified: 'empno'

sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --query 'select e.empno,d.deptno from emp e join emp1 d on(e.empno=d.empno) where $CONDITIONS' --m 1 --target-dir /j4

------------------------------------------------------------------------------------------------------------------------------- 

---------------

Joining more than two tables
--------------------------- 

------------------------------------------------------------------------------------------------------------------------------- 

--------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --query 'select e.empno,d.deptno,f.ename from emp e join emp1 d on(e.empno=d.empno) join emp3 f on(e.empno=f.empno) where $CONDITIONS' -m 1 --target-dir /j5

------------------------------------------------------------------------------------------------------------------------------- 

---------------

LEFT OUTER JOIN
------------------ 

------------------------------------------------------------------------------------------------------------------------------- 

--------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --query 'select e.empno,d.deptno,f.ename from emp e left outer join emp1 d on(e.empno=d.empno) left outer join emp3 f on(e.empno=f.empno) where $CONDITIONS' --m 1 --target-dir /j6
-------------------------------------------------------------------------------------------------------------------------------

----------------

RIGHT OUTER JOIN
------------------

 -------------------------------------------------------------------------------------------------------------------------------

 ---------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --query 'select e.empno,d.deptno,f.ename from emp e right outer join emp1 d on(e.empno=d.empno) right outer join emp3 f on(e.empno=f.empno) where $CONDITIONS' --m 1 --target-dir /j6

------------------------------------------------------------------------------------------------------------------------------- -------------------------------

GROUP BY
---------

 ------------------------------------------------------------------------------------------------------------------------------- 

---------------

sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --query 'select deptno,sum(sal) from emp where $CONDITIONS group by deptno' --m 1 --target-dir /j8

note: in this case you should place the GROUP BY clause after $CONDITIONS, otherwise it will give an error

-------------------------------------------------------------------------------------------------------------------------------

 ----------------

SELECT STATMENT IN DOUBLE QUOTES
--------------------------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --query "select * from emp where \$CONDITIONS" -m 1 --target-dir /j9
note: in this case we should use a backslash (\) in front of $CONDITIONS, otherwise it will give an error

 ------------------------------------------------------------------------------------------------------------------------------- 

---------------

APPEND

 -------

Appends data to an existing directory in HDFS; there is no need to create a new directory.

-------------------------------------------------------------------------------------------------------------------------------

 ---------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /k --append
-------------------------------------------------------------------------------------------------------------------------------

IMPORT ALL TABLES FROM RDBMS TO HDFS PATH

IMPORT-ALL-TABLES 

-----------------

------------------------------------------------------------------------------------------------------------------------------- 

---------------
sqoop import-all-tables --connect jdbc:mysql://localhost/nag --username root --password root -m 1

Here you should mention -m 1, and you should not use the --target-dir option, because all the tables cannot be stored in one single HDFS directory.

 ------------------------------------------------------------------------------------------------------------------------------- 

----------------

IMPORT-ALL-TABLES WITH --EXCLUDE-TABLES IN HDFS OR HIVE PATH 

------------------------------------------------------------

-------------------------------------------------------------------------------------------------------------------------------

 ---------------
sqoop import-all-tables --exclude-tables emp,emp1,emp3,avg --connect jdbc:mysql://localhost/nag --username root -password root -m 1 --target-dir /et

note: it is giving an error because all the tables' data cannot be stored in one specific HDFS path

sqoop import-all-tables --exclude-tables emp,emp1,emp3,avg --connect jdbc:mysql://localhost/nag --username root -password root -m 1
Skipping table: emp

Skipping table: emp1
Skipping table: emp3    --- the remaining tables will be stored under the root user's home directory

------------------------------------------------------------------------------------------------------------------------------- 

---------------

TO HIVE PATH 

-------------

In this case the source RDBMS table will be stored in the specified Hive warehouse directory path, but we can't select the imported table from the Hive prompt.

------------------------------------------------------------------------------------------------------------------------------- -----
sqoop import --connect jdbc:mysql://localhost/abc --username root --password root --table emp --warehouse-dir /user/hive/warehouse

sqoop import --connect jdbc:mysql://localhost/abc --username root --password root --table emp --warehouse-dir /user/hive/warehouse/nag.db 

------------------------------------------------------------------------------------------------------------------------------- 

------

--hive-import
--------------
In this case we need not mention the Hive warehouse directory; the imported RDBMS table is automatically stored in the default Hive warehouse path,
and we can select the imported table from the Hive prompt.

-------------------------------------------------------------------------------------------------------------------------------

 ----------------
sqoop import --hive-import --connect jdbc:mysql://localhost/abc --username root --password root --table emp2 

------------------------------------------------------------------------------------------------------------------------------- 

----------------

--hive-import --hive-database

------------------------------

In this case the source RDBMS table will be stored in the specified Hive database, with the same schema as the RDBMS source table.

------------------------------------------------------------------------------------------------------------------------------- 

----------------
sqoop import --hive-import --hive-database nag --connect jdbc:mysql://localhost/nag --username root -password root --table emp -m 1

 ------------------------------------------------------------------------------------------------------------------------------- ---------------

import-all-tables

 -------------------

It is used to import all tables from the RDBMS to the Hive warehouse path, but the RDBMS source tables' schema will not be copied to Hive, so the tables are not visible from the Hive prompt.

-------------------------------------------------------------------------------------------------------------------------------

 ---------------
sqoop import-all-tables --connect jdbc:mysql://localhost/abc --username root --password root --warehouse-dir /user/hive/warehouse

-------------------------------------------------------------------------------------------------------------------------------

 --------------

import-all-tables --hive-import --hive-database
-------------------------------------------------

In this case the RDBMS source tables will be stored in the specified Hive database, with all their data and with the same table names and schema as the source RDBMS tables.

-------------------------------------------------------------------------------------------------------------------------------

 ---------------
sqoop import-all-tables --hive-import --hive-database perfect --connect jdbc:mysql://localhost/nag --username root -password root -m 1

 ------------------------------------------------------------------------------------------------------------------------------- 

---------------

import-all-tables --exclude-tables
------------------------------------ 

------------------------------------------------------------------------------------------------------------------------------- ---------------
sqoop import-all-tables --exclude-tables emp,emp1,emp3,avg,r,o --connect jdbc:mysql://localhost/nag --username root -password root --warehouse-dir /user/hive/warehouse/oops.db

sqoop import-all-tables --hive-import --hive-database nag --exclude-tables emp,emp1,emp3,avg,r,o --connect jdbc:mysql://localhost/nag --username root -password root

-------------------------------------------------------------------------------------------------------------------------------

 ---------------

--boundary-query 

----------------

For example, there is one source RDBMS table having records numbered from 1 to 1 lakh. When we don't want to import all the records from the source RDBMS table to the HDFS path, but only a specified range of records, we can use the --boundary-query option. With this option we specify the range of records to import (where the import should start and where it should end), and only those records are imported from the source RDBMS table to the specified HDFS path.

-------------------------------------------------------------------------------------------------------------------------------

 ------------------------------------------
sqoop import --connect jdbc:mysql://localhost/nag --username root -password root --table emp --boundary-query 'select 3,6 from emp' --target-dir /manju7

NOTE: if you want to use --boundary-query, the imported table should have a primary key, and you should mention only primary key (split column) values in the select statement.

sqoop import --connect jdbc:mysql://localhost/nag --username root -password root --table emp --boundary-query 'select 3,6 from emp3' --target-dir /manju7 -m 1

Here the emp3 table does not have a primary key, so you have to mention -m 1; also, if that column contains duplicate values the duplicates are ignored (it works like UNION in SQL).

------------------------------------------------------------------------------------------------------------------------------- 

------------------------------------------

--split-by 

-----------

--split-by is used to specify the column of the source RDBMS table that is used to generate the splits for the import; it means that column will be used to create the splits while importing the data from the source RDBMS table into your cluster.

Apache Sqoop will create splits based on the values present in the column specified in the --split-by clause of the import command. If the --split-by clause is not specified, then the primary key of the table is used to create the splits during data import.

mysql> select * from emp2;
+----+------+
| id | name |
+----+------+
| 10 | aa   |
| 20 | bb   |
| 30 | cc   |
| 40 | dd   |
| 50 | ee   |
| 50 | ee   |
+----+------+

------------------------------------------------------------------------------------------------------------------------------- 

--
sqoop import --connect jdbc:mysql://localhost/abc --username root --password root --table emp2 --split-by id --target-dir /spli

Duplicate records are also stored; by default they end up in one of the mapper output files.

 ------------------------------------------------------------------------------------------------------------------------------- 

---

sqoop import --connect jdbc:mysql://localhost/abc --username root --password root --table emp2 --split-by id -m 5 --target-dir /spli

The data will be stored in five mapper output files (a sketch of how the split ranges are formed follows below).
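Roughly how the splits are formed (a sketch, assuming the default behaviour; the exact boundary arithmetic Sqoop uses may differ slightly):

select min(id), max(id) from emp2;    (Sqoop's boundary query, returning 10 and 50 here)
The range 10..50 is then divided among the 5 mappers, so each mapper imports one sub-range, roughly 10-17, 18-25, 26-33, 34-41 and 42-50.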

mysql> select * from nul;
+------+-------+
| sno  | ename |
+------+-------+
| NULL | abc   |
| NULL | c     |
|    1 | c     |
|    2 | c     |
|    3 | c     |
|    3 | c     |
|    4 | c     |
+------+-------+

sqoop import --connect jdbc:mysql://localhost/abc --username root --password root --table nul --split-by sno --target-dir /nagu1

Note: in this case the NULL records will not be stored into the HDFS path. To import the NULL records as well:

sqoop import --connect jdbc:mysql://localhost/abc --username root --password root --query 'select ifnull(sno,1),ename from nul where $CONDITIONS' --split-by sno -m 1 --target-dir /ok1

WHAT IS DIFFERENCE BETWEEN --WAREHOUSE-DIR AND --TARGET-DIR 

-------------------------------------------------------------

If you are importing more than one table, then you need to specify --warehouse-dir, not --target-dir; --warehouse-dir is a parent directory under which Sqoop creates one sub-directory per table. But if you want to import a single table, then --target-dir (the exact output directory) is enough. A quick sketch of the difference follows below.
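A minimal sketch of the difference (the directory names /single and /multi are just placeholder paths, not taken from the notes above):

sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /single
(the part-m files land directly in /single)

sqoop import-all-tables --connect jdbc:mysql://localhost/nag --username root --password root --warehouse-dir /multi -m 1
(each table goes to its own sub-directory, e.g. /multi/emp, /multi/emp1)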

IMPORTING THE DATA FROM RDBMS TO HDFS IN DIFFERENT FORMS

 --------------------------------------------------------

-------------------------------------------------------------------------------------------------------------------------------

 -------------------------------------
sqoop import --connect jdbc:mysql://localhost/nag --username root -password root --table emp --as-textfile --target-dir

=============
sqoop import --connect jdbc:mysql://localhost/nag --username root -password root --table emp --as-sequencefile --target-dir /rani

=================
sqoop import --connect jdbc:mysql://localhost/nag --username root -password root --table emp --as-avrodatafile --target-dir /rani
sqoop import --connect jdbc:mysql://localhost/nag --username root -password root --table emp --as-parquetfile --target-dir /rani 

-------------------------------------------------------------------------------------------------------------------------------

IMPORTING THE DATA WITH “COMPRESSION TECHNIQUES”

 -----------------------------------------------

It will reduce the file size and also speed up data transfer across the network. By default, when using the --compress parameter, output files will be compressed using the GZip codec, and all files will end up with a .gz extension. You can choose any other codec using the --compression-codec parameter; for example, the BZip2 codec instead of GZip (files on HDFS will then end up having the .bz2 extension).

--compression-codec GzipCodec

-------------------------------------------------------------------------------------------------------------------------------

 ---------------------------------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp3 --split-by empno -m 1 --target-dir /ab --compress
or
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp3 --split-by empno -m 1 --target-dir /ab11 --compression-codec GzipCodec
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp3 --split-by empno -m 1 --target-dir /ao1 --compression-codec org.apache.hadoop.io.compress.GzipCodec

output file: part-m-00000.gz (.gz extension)

-------------------------------------------------------------------------------------------------------------------------------

 -----------------------------------------

--compression-codec BZip2Codec

 ----------------------------

-------------------------------------------------------------------------------------------------------------------------------

 ----------------------------------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp3 --split-by empno -m 1 --target-dir /ao --compression-codec org.apache.hadoop.io.compress.BZip2Codec

output file: part-m-00000.bz2 (.bz2 extension)

-------------------------------------------------------------------------------------------------------------------------------

 -----------------------------------------

--compression-codec SnappyCodec

-------------------------------

------------------------------------------------------------------------------------------------------------------------------- 

-----------------------------------------
sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp3 --split-by empno -m 1 --target-dir /ab11 --compression-codec SnappyCodec

sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp3 --split-by empno -m 1 --target-dir /ab6 --compression-codec org.apache.hadoop.io.compress.SnappyCodec
Table 2-2. Compression codecs

Splittable     : BZip2, LZO
Not splittable : GZip, Snappy

-------------------------------------------------------------------------------------------------------------------------------

 ------------------------------------------

--delete-target-dir 

------------------

delete the target directory if already available in hdfs path

------------------------------------------------------------------------------------------------------------------------------- 

------------------------------------------
sqoop import --connect jdbc:mysql://localhost/abc --username root --password root --table emp --target-dir /dou --delete-target-dir

-------------------------------------------------------------------------------------------------------------------------------

 ------------------------------------------

SQOOP-JOB

 -----------

It saves the parameters that identify and recall a saved job.

The Sqoop metastore allows you to save all the Sqoop parameters for later reuse and easily run them any time.

TO CREATE JOB 

-------------

-------------------------------------------------------------------------------------------------------------------------------

 ----------------------
sqoop job --create nag -- import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /aaa2 --append

NOTE: you have to keep a space between "--" and "import"

 -------------------------------------------------------------------------------------------------------------------------------

 ----------------------

TO SEE THE LIST OF JOBS

 -----------------------

sqoop job --list;

 -----------------------

To See the Details about a Job 

--------------------------------

------------------------------- 

sqoop job --show nag 

-----------------------------

To execute a job (it will ask for the database password during execution)

-----------------

------------------------------

 sqoop job --exec gopaljob; 

-------------------------------

To Delete a Job

 --------------------------------

-------------------------------- 

sqoop job --delete gopaljob

---------------------------------

to edit sqoop job 

-----------------

steps
---
ls -a
cd .sqoop
nano metastore.db.script
In this file we can modify the saved job parameters (for example the stored incremental last-value); a sketch of locating it follows below.
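A minimal sketch of finding the stored value (assuming the default local HSQLDB metastore; the property name incremental.last.value is an assumption and may vary between Sqoop versions):

cd ~/.sqoop
grep -i 'incremental.last.value' metastore.db.script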

------------------------------------------------------------------------------------------------------------------------------- 

-----------------------------------------

what is the difference between append and incremental append

 ---------------------------------------------------------------

incremental append 

--------------------

-----------------------


Incremental append is used to import only the records newly inserted after a previously imported set of records into HDFS.

append
---------
--append is used to append records to an existing (old) output directory; there is no need to overwrite it or create a new directory, the new files are simply added to the old directory.

That is why --incremental asks for a mode parameter (lastmodified or append): choosing lastmodified picks up updated records (based on a timestamp column), while choosing append picks up newly inserted records. A compact contrast is sketched below.
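A minimal sketch of the two options (the paths and the last-value are placeholders, not from the notes above):

sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /k --append
(simply adds new part-m files to the existing directory /k, re-importing whatever the command selects)

sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp --target-dir /k --incremental append --check-column empno --last-value 0
(imports only rows whose empno is greater than the saved last-value, then reports the new last-value)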

INCREMENTAL MODE

 -----------------

incremental append 

------------------

It is used to import only the records newly inserted since a previously imported set of records into HDFS.

--check-column
-------------
the check-column argument specifies which column needs to be checked during the import operation. The column can not be *CHAR types, like VARCHAR2 or CHAR.

sqoop job --create job1 -- import --connect jdbc:mysql://localhost/nag --username root --password root --table st --target-dir /job1 --incremental append --check-column empno --last-value 0
------

sqoop job --exec job1
Now the /job1 path has six records.

mysql> select * from st;

+-------+---------+---------------------+
| empno | ename   | hdate               |
+-------+---------+---------------------+
|     1 | nag     | 2018-02-12 01:13:06 |
|     2 | mahesh  | 2018-02-12 03:20:02 |
|     3 | malli   | 2018-02-12 01:13:25 |
|     4 | eswar   | 2018-02-12 03:56:34 |
|     5 | konda   | 2018-02-12 03:55:52 |
|     6 | kasturi | 2018-02-12 04:21:45 |
+-------+---------+---------------------+
After inserting two more records into the RDBMS table:

insert into st(empno,ename) values(7,'nagendra');

insert into st(empno,ename) values(8,'sai');

    ----------------------------------------

I want to import only the newly inserted records into the /job1 path, without re-importing the previously imported records:

sqoop job --exec job1

Now the /job1 path has 8 records, because only the newly inserted records were added to /job1.

    gopalkrishna@ubuntu:~$ sqoop job --create kondaa -- import --connect jdbc:mysql://localhost/abc --username root --password root --table ip --target-dir /great --incremental append --check-column ename --last-value 'uma'

    the job will be created but will get a runtime error

    18/03/31 21:30:14 ERROR tool.ImportTool: Error during import: Character column (ename) can not be used to determine which rows to incrementally import.

    ------------------------------------------------------------------------------------------------------------------------------- ----------------------------------------
INCREMENTAL APPEND WILL IMPORT ONLY NEWLY INSERTED RECORDS, BUT IT WILL NOT IMPORT NEWLY UPDATED RECORDS

    ------------------------------------------------------------------------------------------------------------------------------- ----------------------------------------

    incremental lastmodified 

    ------------------------

    “lastmodified,” works on time-stamped data.
    use this when rows of the source table may be updated

mysql> select * from ip;
+------+---------+---------------------+
| emmo | ename   | hdate               |
+------+---------+---------------------+
|    1 | uma     | 2018-03-31 21:18:54 |
|    2 | naga    | 2018-03-31 20:58:29 |
|    3 | konda   | 2018-03-31 20:58:41 |
|    4 | malli   | 2018-03-31 21:09:45 |
|    5 | kasturi | 2018-03-31 21:09:13 |
|    7 | hema    | 2018-03-31 21:19:48 |
+------+---------+---------------------+

-------------------------------------------------------------------------------------------------------------------------------
gopalkrishna@ubuntu:~$ sqoop job --create rani -- import --connect jdbc:mysql://localhost/abc --username root --password root --table ip --append --incremental lastmodified --check-column hdate --last-value '2018-03-31 21:18:54' --target-dir /sex

       ------------------------------------------------------------------------------------------------------------------------------- ------------------------------------

      gopalkrishna@ubuntu:~$ sqoop job --exec rani

THEN THE ABOVE RECORDS WILL BE STORED IN THE DEFAULT 4 MAPPER OUTPUT FILES

       -----------------------------------------------------

LATER, SOME CHANGES HAPPENED IN THE SOURCE RDBMS TABLE

       ---------------------------------------------------

      mysql> update ip set ename='mahi' where ename='uma'; Query OK, 1 row affected (0.00 sec)
      Rows matched: 1 Changed: 1 Warnings: 0

      mysql> insert into ip(emmo,ename)values(6,'varma'); Query OK, 1 row affected (0.00 sec)

      sqoop job --exec rani

THE OLD RECORDS, THE UPDATED RECORD AND THE NEWLY INSERTED RECORD ARE ALL AVAILABLE IN THE PART-M FILES

      -------------------------------------------------------------------------------------------------------------------------------

       -----------------------------------------

BUT IF THE OLD DATA SHOULD NOT BE AVAILABLE, AND ONLY THE UPDATED AND NEWLY INSERTED RECORDS SHOULD BE AVAILABLE (EVEN IF THE SOURCE TABLE DOESN'T HAVE A TIMESTAMP COLUMN):

mysql> select * from st;
+-------+--------+---------------------+
| empno | ename  | hdate               |
+-------+--------+---------------------+
|     1 | nag    | 2018-02-12 01:13:06 |
|     2 | mahesh | 2018-02-12 03:20:02 |
|     3 | malli  | 2018-02-12 01:13:25 |
|     4 | ewar   | 2018-02-12 03:19:13 |
+-------+--------+---------------------+

       import above table to hdfs path 

      ------------------------------

      sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table st --target-dir /s0

LATER, SOME RECORDS WERE INSERTED AND SOME RECORDS WERE UPDATED IN THE SOURCE RDBMS TABLE; THOSE MODIFICATIONS NEED TO BE REFLECTED IN THE HDFS PATH
----------------------------------------------------

      mysql> insert into st(empno,ename)values(5,'konda'); Query OK, 1 row affected (0.01 sec)

      mysql> update st set ename='eswar' where empno=4; Query OK, 1 row affected (0.00 sec)
      Rows matched: 1 Changed: 1 Warning: 0

mysql> select * from st;
+-------+--------+---------------------+
| empno | ename  | hdate               |
+-------+--------+---------------------+
|     1 | nag    | 2018-02-12 01:13:06 |
|     2 | mahesh | 2018-02-12 03:20:02 |
|     3 | malli  | 2018-02-12 01:13:25 |
|     4 | eswar  | 2018-02-12 03:56:34 |
|     5 | konda  | 2018-02-12 03:55:52 |
+-------+--------+---------------------+
5 rows in set (0.00 sec)

        ---------------------------------

note: these modifications should be reflected in the previous HDFS path

         -----------------------------------------------------------------

sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table st --target-dir /s00

         -----------------------------------------------------------------------------------------------------------

You should merge the above two HDFS paths:

        ------------------------------------

sqoop merge --merge-key empno --new-data /s00 --onto /s0 --target-dir /s000 --class-name st --jar-file /tmp/sqoop-gopalkrishna/compile/d704615d8031b4b72beb487318771bc3/st.jar
(--class-name is the generated class of the source table; --jar-file is the jar generated during the /s00 import)

Then in the /s000 path we get the modified records and the newly inserted records.
Here a left outer join is applied between the /s00 and /s0 paths,
and the result is written to one part-m file.

         -------------------------------------------------------------------------------------------------------------------------------

         ----------------------------------------

        -------------------------------------------------------------------------------------------------------------------------------

         ----------------

To handle the NULL values in the HDFS path

        -----------------------------------------

mysql> select * from nul;
+-------+------+-------+
| empno | comm | ename |
+-------+------+-------+
|     2 |    3 | NULL  |
|     3 | NULL | mahi  |
|     5 | NULL | malli |
+-------+------+-------+

        --null-string and --null-non-string

        sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table nul --target-dir /n0 --null-non-string 0 --null-string 'no user' --lines-terminated-by ':' -m 1

While importing the table, user-defined values are inserted in place of the NULL values in the HDFS or Hive path: 2,3,no user:3,0,mahi:5,0,malli:

--null-string : NULL values in string (text) columns are replaced by the user-defined value
--null-non-string : NULL values in non-string (numeric) columns are replaced by the user-defined value

         ------------------------------------------------------------------------------------------------------------------------------- 

        ----------------------------------------

        direct

        ======
You can improve performance by giving the --direct option in Sqoop.

--direct is only supported for MySQL and PostgreSQL.

        Sqoop’s direct mode does not support imports of BLOB, CLOB, or LONGVARBINARY columns.

        sqoop import --connect jdbc:mysql://localhost/nag --username root --password root --table emp3 --target-dir /anju6 -m 1 --direct

        https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html https://www.tutorialspoint.com/sqoop/sqoop_job.htm

        EXPORT

        ========

FROM HDFS PATH TO RDBMS TABLE

        ------------------------------------------------------------------------------------------------------------------------------- 

        -------------
        sqoop export --connect jdbc:mysql://localhost/nag --username root --password root --table emp4 --export-dir /anju8

sqoop export --connect jdbc:mysql://localhost/nag --username root --password root --table emp4 --export-dir /anju8/part-m-00000    (exporting a single part file is also possible)
(the table emp4 should already exist in the RDBMS with the required schema)
sqoop export --connect jdbc:mysql://localhost/nag --username root --password root --table emp4 --export-dir /anju8/part-m-00000

If the export is run again, those records will be appended to the target RDBMS table.
sqoop export --connect jdbc:mysql://localhost/nag --username root --password root --table emp4 --export-dir /anju8/part-m-00000 --fields-terminated-by '\t'

        ------------------------------------------------------------------------------------------------------------------------------- 

        ---------------

        UPDATE-KEY

         ----------

NOTE: the --update-key column should have a primary key or unique constraint

        ---------------------------------------------------------------

INSERT mode -- this is the default mode for export. If you don't specify anything in the Sqoop export command, this is what is picked by default. This mode is useful when you only insert records into the table.

UPDATE mode -- in "update mode," Sqoop generates UPDATE statements that replace existing records in the database. Legal values for --update-mode include `updateonly` (default) and `allowinsert`.

        But, when I need to update already existing records I have to use --update-key

SCENARIO

        ----------

        ALREADY DATA AVAILABLE IN RDBMS TARGET TABLE

mysql> select * from emp1;
+----+------+
| id | name |
+----+------+
| 10 | a    |
| 20 | bb   |
+----+------+
2 rows in set (0.00 sec)

NOTE: in the HDFS export directory path some records are updated and some records are newly inserted; those modifications should be applied to the target RDBMS table.

        cat kl
        10,abc
        20,bb
        30,c
        40,nag
        gopalkrishna@ubuntu:~$ hadoop fs -mkdir /aim

         gopalkrishna@ubuntu:~$ hadoop fs -put kl /aim

        sqoop export --connect jdbc:mysql://localhost/abc --username root --password root --table emp1 \
        --export-dir /kl/kl --update-key id --update-mode allowinsert --verbose

        AFTER RAN THE SQOOP JOB

mysql> select * from emp1;
+----+------+
| id | name |
+----+------+
| 10 | abc  |
| 20 | bb   |
| 30 | c    |
| 40 | nag  |
+----+------+

        important interview question

         ===================================

Data type conversion issue: you will have to be very careful when you import data from an RDBMS into the Hadoop system; you will notice default type conversions happening that are not suitable for the business need. We can use the --map-column-java / --map-column-hive options to handle this issue.

        ex

         ====

mysql> select * from e;
+------+------+
| id   | name |
+------+------+
| 10   | aaa  |
| 20   | bbb  |
| 30   | ccc  |
| 40   | ddd  |
+------+------+
4 rows in set (0.00 sec)

mysql> desc e;
+-------+-------------+------+-----+---------+-------+
| Field | Type        | Null | Key | Default | Extra |
+-------+-------------+------+-----+---------+-------+
| id    | varchar(10) | YES  |     | NULL    |       |
| name  | varchar(10) | YES  |     | NULL    |       |
+-------+-------------+------+-----+---------+-------+
2 rows in set (0.00 sec)

        gopalkrishna@ubuntu:~$ sqoop import --connect jdbc:mysql://localhost:3306/abc --table e --username root --password root --map-column-hive id=int --hive-import --create-hive-table --hive-database prudhvi -m 1

hive> select * from e;
OK
10 aaa
20 bbb
30 ccc
40 ddd
Time taken: 0.756 seconds, Fetched: 4 row(s)

hive> desc e;
OK
id          int
name        string
Time taken: 0.144 seconds, Fetched: 2 row(s)

        Important 

        --------------

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--warehouse-dir /user/dgadiraju/sqoop_import/retail_db \
--split-by order_status

(order_status is not a primary key column and is a text column, so we get an error; we can resolve this problem with the command below)

sqoop import \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--warehouse-dir /user/dgadiraju/sqoop_import/retail_db \
--split-by order_status

(order_status is not a primary key column, but since we use the -Dorg.apache.sqoop.splitter.allow_text_splitter=true property, we won't get any error)

-> -Dorg.apache.sqoop.splitter.allow_text_splitter=true
   This property is used when --split-by <column name> refers to a non-numeric (string) column (which may also contain duplicate values); by using this property we avoid the error.

-> If you use --split-by <column name> on a numeric column (such as the primary key column), these restrictions don't apply and we don't need the above property.

-> If the primary key column, or the column specified in the split-by clause, is of a non-numeric type, then we need to use this additional argument:
-Dorg.apache.sqoop.splitter.allow_text_splitter=true

        --autoreset-to-one-mapper 

        ==================================

--autoreset-to-one-mapper makes the import use one mapper if the table has no primary key and no --split-by column is provided. It cannot be used together with the --split-by <col> option.

        -> If a table does not have a primary key defined and the --split-by <col> is not provided, then import will fail unless the number of mappers is explicitly set to one with the --num-mappers 1 option or the --autoreset-to-one-mapper option is used. The option --autoreset-to-one-mapper is typically used with the import-all-tables tool to automatically handle tables without a primary key in a schema.

sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table order_items_nopk \
--warehouse-dir /user/dgadiraju/sqoop_import/retail_db \
--autoreset-to-one-mapper

Static partition using Sqoop with the Hive warehouse (we can't create dynamic partitions using Sqoop)
========================================================================================================

        sqoop import --connect jdbc:mysql://localhost:3306/abc --username root --password root --hive-import --query "select id,sal from uemp where name = 'ccc' and \$CONDITIONS" --hive-partition-key name --hive-partition-value 'ccc' --split-by id --target-dir /aapp --hive-table VP1 --hive-database prudhvi -m 1

Saturday, 21 March 2020

Easy to Understand Kafka Importance

KAFKA DOCUMENTATION

Apache Kafka is a community distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from messaging queue to a full-fledged event streaming platform.

TOPIC: 
A particular stream of Data

* Similar to a table in a database(without all constraints)
* You can have as many topics  as you want
* A topic is identified by its name
* Topics are split into Partitions


  

* Each partition is ordered
* Each message within a partition gets an incremental id, called an offset

Example:
* Say you have a fleet of trucks, and each truck reports its GPS position to Kafka
* You can have a topic truck-GPS that contains the position of all trucks
* Each truck will send a message to Kafka every 20 seconds; each message will contain the Truck ID & Truck position (latitude & longitude)
* We choose to create that topic with 10 partitions (arbitrary number)
* Offsets only have a meaning for a specific partition, e.g. offset 3 in partition 0 doesn't represent the same data as offset 3 in partition 1
* Order is guaranteed only within a partition (not across partitions)
* Data is kept only for a limited time (default is one week)
* Once the data is written to a partition, it can't be changed (immutability)
* Data is assigned randomly to a partition unless a key is provided (more on this later)

BROKERS:
* A Kafka cluster is composed of multiple brokers (servers)
* Each broker is identified by its ID (integer)
* Each broker contains certain topic partitions.
* After connecting to any broker (called a bootstrap broker), you will be connected to the entire cluster
* A good number to get started is 3 brokers, but some big clusters have over 100 brokers
* In these examples we choose to number brokers starting at 100 (arbitrary)

[Diagram: three brokers in a cluster -- Broker 101, Broker 102 and Broker 103]
BROKER & TOPICS
* Example of Topic-A with 3 Partitions
* Example of Topic-B with 2 Partitions

[Diagram: Topic-A's 3 partitions and Topic-B's 2 partitions distributed across Brokers 101, 102 and 103]

* NOTE: Data is distributed, and Broker 103 doesn't have any Topic-B data.

Topic Replication Factor
* Topics should have a replication factor > 1 (usually between 2 & 3)
* This way, if a broker is down, another broker can serve the data
* Example: Topic-A with 2 partitions & a replication factor of 2
* Example: we lost Broker 102
* Result: Broker 101 & 103 can still serve the data
* At any time only ONE broker can be the leader for a given partition.
* Only that leader can receive & serve data for a partition
* The other brokers will synchronize the data
* Therefore each partition has one leader & multiple ISRs (in-sync replicas)

PRODUCERS
* Producers write data to topics (which are made of partitions)
* Producers automatically know which broker & partition to write to.
* In case of broker failures, producers will automatically recover
* Producers can choose to receive acknowledgment of data writes:
* acks=0: the producer doesn't wait for an acknowledgment (possible data loss)
* acks=1: the producer waits for the leader's acknowledgment (limited data loss)
* acks=all: leader + replicas acknowledgment (no data loss)

Producers: Message keys
* A producer can choose to send a key with the message (string, number, etc.)
* If key=null, data is sent round-robin (broker 101, then 102, then 103...)
* If a key is sent, then all messages for that key will always go to the same partition
* A key is basically sent if you need message ordering for a specific field (ex: truck_id); a console sketch follows below
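A minimal console sketch of sending and reading keyed messages (the key.separator character ':' and the message contents are arbitrary choices here):

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --property parse.key=true --property key.separator=:
>truck_1:position 10,20
>truck_2:position 30,40

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true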

CONSUMERS & CONSUMER GROUPS

CONSUMERS
* Consumers read data from a topic (identified by name)
* Consumers know which broker to read from
* In case of broker failures, consumers know how to recover
* Data is read in order within each partition

CONSUMER GROUP
*Consumers read data in consumer groups
*Each Consumer within a group reads from exclusive Partitions.
*If you have more consumers than Partitions, some consumers will be inactive

CONSUMER OFFSETS
* Kafka stores the offsets at which a consumer group has been reading.
* The committed offsets live in a Kafka topic named __consumer_offsets
* When a consumer in a group has processed data received from Kafka, it should be committing the offsets
* If a consumer dies, it will be able to read back from where it left off, thanks to the committed consumer offsets

Delivery Semantics for Consumers
* Consumers choose when to commit offsets
* There are 3 delivery semantics:
    * At most once:
        > Offsets are committed as soon as the message is received.
        > If the processing goes wrong, the message will be lost (it won't be read again).
    * At least once (usually preferred):
        > Offsets are committed after the message is processed.
        > If the processing goes wrong, the message will be read again.
        > This can result in duplicate processing of messages, so make sure your processing is idempotent (i.e. processing the messages again won't impact your systems).
    * Exactly once:
        > Can be achieved for Kafka => Kafka workflows using the Kafka Streams API.
        > For Kafka => external system workflows, use an idempotent consumer.

 Kafka Broker Discovery
·      Every Kafka broker is also called a “bootstrap server”
·     That means that you only need to connect to one broker, and you will be connected to the entire cluster
·     Each broker knows about all brokers, topics & partitions (metadata)




ZOOKEEPER:
·     Zookeeper manages brokers (keeps a list of them)
·     Zookeeper helps in performing leader election for partitions
·     Zookeeper sends notifications to Kafka in case of changes (e.g. new topic, broker dies, broker comes up, delete topic, etc...)
·     Kafka can't work without Zookeeper.
·     Zookeeper by design operates with an odd number of servers (3, 5, 7)
·     Zookeeper has a leader (handles writes); the rest of the servers are followers
·     Zookeeper does not store consumer offsets with Kafka > v0.10

KAFKA GUARANTEES:
·     Messages are appended to a topic-partition in the order they are sent.
·     Consumers read messages in the order stored in a topic-partition.
·     With a replication factor of N, Producers & Consumers can tolerate up to N-1 brokers being down.
·     This is why a replication factor of 3 is a good idea.
·     Allows for one broker to be taken down for maintenance.
·     Allows for another broker to be taken down unexpectedly.
·     As long as the number of partitions remains constant for a topic (no new partitions), the same key will always go to the same partition

COMMANDS TO START KAFKA
#Start Zookeeper server
zookeeper-server-start.sh config/zookeeper.properties

#Start Kafka broker (server)
 kafka-server-start.sh config/server.properties

#Create Topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic third-topic --create --partitions 3 --replication-factor 1

#To check kafka topic created or not
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

#To check the details of topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --describe

#Delete topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic second_topic --delete

#kafka console producer CLI
$kafka-console-producer.sh    (running it without arguments shows the list of options)

#Create Message in producer
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic

>Hello
>How are You


#Creating producer property

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all

#If you produce to a topic name that doesn't exist, Kafka auto-creates a new topic with that name (after a warning)

kafka-console-producer --broker-list 127.0.0.1:9092 --topic new_topic
>hey this topic does not exist!
    WARN [Producer clientId=console-producer] ...
>another message


#check the list of kafka topics
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

#Default number of partitions for auto-created topics
Set num.partitions (e.g. num.partitions=3) in config/server.properties
nano config/server.properties

#kafka console consumer CLI

kafka-console-consumer.sh

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

#kafka consumers in Group(From IDE)
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-first-application

$kafka-consumer-groups.sh

$kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

my-first-application
console-consumer-10824
my-second-application
console-consumer-1052

#Resetting offsets
Reset options: --to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current

$kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first_topic

$kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application




#Shift-by (shift the committed offsets back by 2 messages)

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first_topic


Kafka Producer:

import json
import logging

import msgpack  # third-party: pip install msgpack
from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

# Asynchronous by default 
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
            record_metadata = future.get(timeout=10)
except KafkaError:
            # Decide what to do if produce request failed...
            log.exception('produce request failed')
            pass

# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')

# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})

# produce asynchronously
for _ in range(100):
            producer.send('my-topic', b'msg')
def on_send_success(record_metadata):
            print(record_metadata.topic)
            print(record_metadata.partition)
            print(record_metadata.offset)

def on_send_error(excp):
            log.error('I am an errback', exc_info=excp)
            # handle exception

# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)

# block until all async messages are sent
producer.flush()

# configure multiple retries
producer = KafkaProducer(retries=5)


Kafka Consumer:

import json

import msgpack  # third-party: pip install msgpack
from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
   # message value and key are raw bytes -- decode if necessary!
   # e.g., for unicode: `message.value.decode('utf-8')`
   print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)

# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)

# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)

# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')

# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')


#acks & min.insync.replicas

Producer acks deep dive (a kafka-python sketch follows this list)

acks=0 (no acks)
·     No response is requested
·     If the broker goes offline or an exception happens, we won't know & will lose data
·     Useful for data where it's okay to potentially lose messages:
            #Metrics collection
            #Log collection

acks=1 (leader acks)
·     Leader response is requested, but replication is not a guarantee (it happens in the background)
·     If an ack is not received, the producer may retry
·     If the leader broker goes offline but replicas haven't replicated the data yet, we have data loss.

acks=all (replicas acks)
·     Leader + replicas ack requested
·     No data loss if used together with min.insync.replicas (e.g. min.insync.replicas=2 means at least the leader and one replica must have the data)
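
A minimal kafka-python sketch of picking an acks level; the broker address and topic are placeholders. kafka-python accepts acks=0, acks=1 or acks='all'.

from kafka import KafkaProducer

# acks=0   : fire-and-forget, highest throughput, possible data loss
# acks=1   : only the partition leader acknowledges the write
# acks='all': leader + in-sync replicas acknowledge (safest; pair with
#             min.insync.replicas on the broker/topic side)
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],  # placeholder broker
                         acks='all')
producer.send('my-topic', b'important message')
producer.flush()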


#Retries & max.in.flight.requests.per.connection

Producer retries
            In case of transient failures, developers are expected to handle exceptions, otherwise the data will be lost
            >Example of a transient failure:
            >NotEnoughReplicasException
There is a "retries" setting
            >defaults to 0
            >you can increase it to a high number, e.g. Integer.MAX_VALUE
·     In case of retries, by default, there is a chance that messages will be sent out of order (if a batch has failed to be sent)
·     If you rely on key-based ordering, that can be an issue.
·     For this, you can set the setting that controls how many produce requests can be made in parallel: max.in.flight.requests.per.connection
            .Default: 5
            .Set it to 1 if you need to ensure ordering (may impact throughput)
·     In Kafka >= 1.0.0, there's a better solution: the idempotent producer!

#Idempotent Producer
*Here's the problem: the producer can introduce duplicate messages in Kafka due to network errors.

*In Kafka >= 0.11 you can define an "idempotent producer" which won't introduce duplicates on network error

*Idempotent producers are great to guarantee a stable & safe pipeline!

*They come with:
            #retries = Integer.MAX_VALUE (2^31 - 1 = 2147483647)
            #max.in.flight.requests.per.connection=1 (Kafka >= 0.11 & < 1.1) or
            #max.in.flight.requests.per.connection=5 (Kafka >= 1.1, for higher performance)
            #acks=all
            #just set:
                        *producerProps.put("enable.idempotence", true);

Safe producer summary & demo (a kafka-python sketch follows)

Kafka < 0.11
            .acks=all (producer level)
            .Ensures data is properly replicated before an ack is received
            .min.insync.replicas=2 (broker/topic level)
            .Ensures at least two brokers in the ISR have the data after an ack
            .retries=MAX_INT (producer level)
            .Ensures transient errors are retried indefinitely
            .max.in.flight.requests.per.connection=1 (producer level)
            .Ensures only one request is tried at any time, preventing message re-ordering in case of retries.

Kafka >= 0.11
            .enable.idempotence=true (producer level) + min.insync.replicas=2 (broker/topic level)
            .Implies acks=all, retries=MAX_INT, max.in.flight.requests.per.connection=5 (default)
            .While keeping ordering guarantees & improving performance!
            .Running a "safe producer" might impact throughput & latency; always test for your use case.
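
A kafka-python sketch of the Kafka < 0.11 style safe settings above; the broker and topic are placeholders. As far as I know, kafka-python does not expose enable.idempotence (that is the Java client property shown earlier), so the individual settings are applied here, and min.insync.replicas=2 has to be configured on the broker/topic side.

from kafka import KafkaProducer

# "Safe producer", Kafka < 0.11 style:
#   acks='all'                                -> wait for leader + in-sync replicas
#   retries=2147483647                        -> retry transient errors (MAX_INT)
#   max_in_flight_requests_per_connection=1   -> keep ordering across retries
# min.insync.replicas=2 is a broker/topic setting, not a producer one.
safe_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],     # placeholder broker
    acks='all',
    retries=2147483647,
    max_in_flight_requests_per_connection=1,
)
safe_producer.send('my-topic', key=b'user-42', value=b'payload')
safe_producer.flush()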

PRODUCER COMPRESSION
Message Compression
*Producers usually send data that is text-based, for example JSON data.
*In this case, it is important to apply compression to the producer
*Compression is enabled at the producer level & doesn't require any configuration change in the brokers or in the consumers
*compression.type can be 'none' (default), 'gzip', 'snappy', 'lz4'
*Compression is more effective the bigger the batch of messages being sent to Kafka
*The compressed batch has the following advantages:
 Much smaller producer request size (compression ratio up to 4x!)
*Faster to transfer data over the network => less latency
*Better throughput
*Better disk utilisation in Kafka (stored messages on disk are smaller)

Disadvantages(very minor):
*Producers must commit some CPU cycles to compression
*Consumers must Commit some CPU cycles to decompression

*Overall:
            *Consider testing snappy for an optimal speed/compression ratio

Message Compression Recommendations
*Find the compression algorithm that gives you the best performance for your specific data; test all of them!
*Always use compression in production & especially if you have high throughput
*Consider tweaking linger.ms & batch.size to have bigger batches & therefore more compression & higher throughput


Linger.ms & batch.size
.By default, Kafka tries to send records as soon as possible
            >It will have up to 5 requests in flight, meaning up to 5 messages individually sent at the same time.
            >After this, if more messages have to be sent while others are in flight, Kafka is smart & will start batching them while they wait, to send them all at once.
            >This smart batching allows Kafka to increase throughput while maintaining very low latency
            >Batches have a higher compression ratio, so better efficiency
            >So how can we control the batching mechanism?
*linger.ms: Number of milliseconds a producer is willing to wait before sending a batch out (default 0)
*By introducing some lag (for example linger.ms=5), we increase the chances of messages being sent together in a batch
*So at the expense of introducing a small delay, we can increase the throughput, compression & efficiency of our producer
*If a batch is full (see batch.size) before the end of the linger.ms period, it will be sent to Kafka right away!

Batch Size
*batch.size:Maximum number of bytes that will be included in a batch. The default is 16 KB
*Increasing the batch size to something like 32 KB or 64 KB can help increase the compression, throughput & efficiency of requests
*Any message that is bigger than the batch size will not be batched
*A batch is allocated per partition, so make sure that you don't set it to a number that's too high, otherwise you'll waste memory!
*(Note: you can monitor the average batch size metric using Kafka producer Metrics)

High Throughput Producer demo 
*we’ll add snappy message compression in our producer
*Snappy is very helpful if your messages are text-based, for example, log lines or JSON documents
*Snappy has a good balance of CPU / compression ratio
*we'll also increase the batch size to 32 KB & introduce a small delay through linger.ms (20 ms)

High throughput at the expense of a bit of latency & CPU usage
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batch size
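
The same high-throughput settings expressed with kafka-python (the broker address is a placeholder); compression_type, linger_ms and batch_size are the Python client's counterparts of the Java properties above, and snappy compression needs the python-snappy package installed.

from kafka import KafkaProducer

# High-throughput producer: snappy compression, a small linger delay,
# and a 32 KB batch size so more messages get compressed together.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],   # placeholder broker
    compression_type='snappy',              # requires python-snappy
    linger_ms=20,                           # wait up to 20 ms to fill a batch
    batch_size=32 * 1024,                   # 32 KB per-partition batch
)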

Producer Default Partition & how keys are hashed        
*By default, your keys are hashed using the "murmur2" algorithm.
*It is most likely preferred to not override the behavior of the partitioner, but it is possible to do so (partitioner.class)
*The formula is (see the sketch below):
targetPartition = Utils.abs(Utils.murmur2(record.key())) % numPartitions;
*This means that the same key will go to the same partition (we already know this),& adding partitions to a topic will completely alter the formula.
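
An illustrative Python sketch of that formula; murmur2_hash here is a hypothetical stand-in supplied by the caller, not a kafka-python API.

def target_partition(key_bytes, num_partitions, murmur2_hash):
    # murmur2_hash is a hypothetical hash function supplied by the caller;
    # the real Kafka clients ship their own murmur2 implementation.
    return abs(murmur2_hash(key_bytes)) % num_partitions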

Max.block.ms & buffer.memory
*If the producer produces faster than the broker can take, the records will be buffered in memory
*buffer.memory=33554432 (32 MB): the size of the send buffer
*That buffer will fill up over time & drain back down as throughput to the broker increases
*If that buffer is full (all 32 MB), then the .send() method will start to block (it won't return right away)
*max.block.ms=60000: the time .send() will block until throwing an exception. Exceptions are basically thrown when:
            >The producer has filled up its buffer
            >The broker is not accepting any new data
            >60 seconds has elapsed
*If you hit that exception, it usually means your brokers are down or overloaded, as they can't respond to requests (a kafka-python sketch follows).
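
A minimal kafka-python sketch of these two settings (the broker is a placeholder); buffer_memory and max_block_ms mirror the buffer.memory and max.block.ms properties described above.

from kafka import KafkaProducer

# Records queue in a 32 MB in-memory buffer if the broker can't keep up;
# once the buffer is full, send() blocks up to max_block_ms, then raises.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],   # placeholder broker
    buffer_memory=32 * 1024 * 1024,         # 32 MB send buffer
    max_block_ms=60000,                     # block up to 60 s before raising
)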



Consumer poll Behavior     
.Kafka consumers have a "poll" model, while many other messaging buses in enterprises have a "push" model
.This allows consumers to control where in the log they want to consume from, how fast, & gives them the ability to replay events.

Consumer poll Behaviour
*fetch.min.bytes (default 1):
            >Controls how much data you want to pull at least on each request
            >Helps improve throughput & decrease the number of requests
            >At the cost of latency
*max.poll.records (default 500):
            >Controls how many records to receive per poll request
            >Increase if your messages are very small & you have a lot of available RAM
            >Good to monitor how many records are polled per request
*max.partition.fetch.bytes (default 1 MB):
            >Maximum data returned by the broker per partition
            >If you read from 100 partitions, you'll need a lot of memory (RAM)
*fetch.max.bytes (default 50 MB):
            >Maximum data returned for each fetch request (covers multiple partitions)
*Change these settings only if your consumer already maxes out on throughput (a kafka-python sketch follows)
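
A minimal kafka-python sketch of these fetch/poll settings (topic, group and broker are placeholders); the snake_case arguments correspond to the dotted property names above.

from kafka import KafkaConsumer

# Consumer fetch tuning; only worth touching if the consumer is already
# maxing out on throughput with the defaults.
consumer = KafkaConsumer(
    'my-topic',                              # placeholder topic
    group_id='my-group',                     # placeholder group
    bootstrap_servers=['localhost:9092'],    # placeholder broker
    fetch_min_bytes=1,                       # min data per fetch request
    max_poll_records=500,                    # records returned per poll()
    max_partition_fetch_bytes=1024 * 1024,   # 1 MB per partition
    fetch_max_bytes=50 * 1024 * 1024,        # 50 MB per fetch request
)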

Consumer offset commits strategies
*These are the two most common patterns for committing offsets in a consumer application
*2 strategies:
            >(easy) enable.auto.commit=true & synchronous processing of batches
            >(medium) enable.auto.commit=false & manual commit of offsets

Strategy 1: enable.auto.commit=true & synchronous processing of batches

while(true){
            List<Records> batch = consumer.poll(Duration.ofMillis(100));
            doSomethingSynchronous(batch);
}

*With auto-commit, offsets will be committed automatically for you at a regular interval
(auto.commit.interval.ms=5000 by default)
every time you call .poll()
*If you don't use synchronous processing, you will be in "at-most-once" behavior because offsets will be committed before your data is processed.

Strategy 2: enable.auto.commit=false & synchronous processing of batches

while(true){
            batch += consumer.poll(Duration.ofMillis(100));
            if (isReady(batch)){
                        doSomethingSynchronous(batch);
                        consumer.commitSync();
            }
}

*You control when you commit offsets & what the condition for committing them is.
*Example: accumulating records into a buffer & then flushing the buffer to a database + committing the offsets then (a kafka-python version follows).
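
A kafka-python version of the manual-commit pattern above (topic, group and broker are placeholders); poll() returns a dict of partition -> records, and commit() performs a synchronous offset commit.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',                              # placeholder topic
    group_id='my-group',                     # placeholder group
    bootstrap_servers=['localhost:9092'],    # placeholder broker
    enable_auto_commit=False,
)

buffer = []
while True:
    records = consumer.poll(timeout_ms=100)  # dict: TopicPartition -> [records]
    for tp_records in records.values():
        buffer.extend(tp_records)
    if len(buffer) >= 100:                   # arbitrary "is ready" condition
        flush_to_database = print            # stand-in for your synchronous sink
        flush_to_database(buffer)
        consumer.commit()                    # commit offsets only after the flush
        buffer.clear()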

Consumer offset Reset Behaviour
*When a consumer has no committed offset (or the committed offset is invalid), its behavior is controlled by auto.offset.reset:
            >auto.offset.reset = latest:will read the end of the log
            >auto.offset.reset = earliest:will read from the start of the log
            >auto.offset.reset = none:will throw exception if no offset is found
*Additionally consumer offsets can be lost:
            >If a consumer hasn’t read new data in 1 day(kafka < 2.0)
            >If a consumer hasn’t read new data in 7 days(kafka>=2.0)
*This can be controlled by the broker setting offsets.retention.minutes
*To replay data for a consumer group:
            >Take all the consumers from a specific group down
            >Use the kafka-consumer-groups command to set the offsets to what you want
            >Restart Consumers

Bottom line
*Set a proper data retention period & offset retention period.
*Ensure the auto offset reset behavior is the one you expect/want

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group kafka-demo-elasticsearch --reset-offsets --execute --to-earliest --topic manju1

Consumer Heartbeat Thread
*Heartbeats are sent periodically to the broker
*session.timeout.ms (default 10 seconds):
            >If no heartbeat is sent during that period, the consumer is considered dead
            >Set it even lower for faster consumer rebalances
*heartbeat.interval.ms (default 3 seconds):
            >How often to send heartbeats
            >Usually set to 1/3rd of session.timeout.ms
*Take-away: This mechanism is used to detect a consumer application being down

Consumer Poll Thread
*max.poll.interval.ms (default 5 minutes):
            *Maximum amount of time between two .poll() calls before declaring the consumer dead.
            *This is particularly relevant for big data frameworks like Spark, in case the processing takes time.
*Take-away: this mechanism is used to detect a data processing issue with the consumer (a kafka-python sketch of these timeouts follows).
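
A minimal kafka-python sketch of these liveness settings (topic, group and broker are placeholders); session_timeout_ms, heartbeat_interval_ms and max_poll_interval_ms map to the dotted properties above.

from kafka import KafkaConsumer

# Liveness tuning: heartbeats detect a dead application, while
# max_poll_interval_ms detects a consumer that polls too slowly.
consumer = KafkaConsumer(
    'my-topic',                              # placeholder topic
    group_id='my-group',                     # placeholder group
    bootstrap_servers=['localhost:9092'],    # placeholder broker
    session_timeout_ms=10000,                # dead if no heartbeat for 10 s
    heartbeat_interval_ms=3000,              # send a heartbeat every 3 s
    max_poll_interval_ms=300000,             # max 5 min between poll() calls
)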