Programovanie

Vytvorené pre reálny čas: Posielanie veľkých dát pomocou aplikácie Apache Kafka, 1. časť

Keď sa začal pohyb veľkých dát, bol väčšinou zameraný na dávkové spracovanie. Distribuované nástroje na ukladanie a dopytovanie údajov, ako sú MapReduce, Hive a Pig, boli navrhnuté na spracovanie údajov v dávkach a nie kontinuálne. Podniky by každý večer spustili viac úloh, aby extrahovali údaje z databázy, potom ich analyzovali, transformovali a prípadne uložili. Podniky v poslednej dobe objavili silu analyzovať a spracovávať údaje a udalosti ako sa stávajú, nielen raz za niekoľko hodín. Väčšina tradičných systémov na odosielanie správ sa však nezväčšuje, aby zvládla veľké dáta v reálnom čase. Inžinieri v spoločnosti LinkedIn teda vytvorili a otvorili Apache Kafka: rámec distribuovaného zasielania správ, ktorý spĺňa požiadavky veľkých dát škálovaním na komoditný hardvér.

Za posledných pár rokov sa vyvinul Apache Kafka, ktorý rieši rôzne prípady použitia. V najjednoduchšom prípade to môže byť jednoduchá vyrovnávacia pamäť na ukladanie protokolov aplikácií. V kombinácii s technológiou, ako je Spark Streaming, sa dá použiť na sledovanie zmien údajov a vykonanie opatrení na základe týchto údajov pred ich uložením do konečného cieľa. Vďaka prediktívnemu režimu Kafka je výkonným nástrojom na odhaľovanie podvodov, ako je kontrola platnosti transakcie kreditnou kartou, keď k nej dôjde, a nečakanie na dávkové spracovanie o hodiny neskôr.

Tento dvojdielny tutoriál predstavuje Kafku, počnúc tým, ako ju nainštalovať a spustiť vo vašom vývojovom prostredí. Získate prehľad o architektúre spoločnosti Kafka, nasledovaný úvodom do vývoja hotového systému správ Apache Kafka. Na záver vytvoríte vlastnú aplikáciu výrobcu / spotrebiteľa, ktorá odosiela a spotrebúva správy prostredníctvom servera Kafka. V druhej polovici tutoriálu sa dozviete, ako rozdeliť a zoskupiť správy a ako ovládať, ktoré správy bude spotrebiteľ Kafky konzumovať.

Čo je Apache Kafka?

Apache Kafka je systém správ zostavený na mieru pre veľké dáta. Podobne ako Apache ActiveMQ alebo RabbitMq, Kafka umožňuje komunikáciu aplikácií postavených na rôznych platformách prostredníctvom asynchrónneho posielania správ. Ale Kafka sa od týchto tradičnejších systémov zasielania správ líši kľúčovými spôsobmi:

  • Je navrhnutý na horizontálne zväčšenie pridaním ďalších komoditných serverov.
  • Poskytuje oveľa vyššiu priepustnosť pre výrobné aj spotrebiteľské procesy.
  • Môže byť použitý na podporu dávkových aj real-time prípadov použitia.
  • Nepodporuje JMS, prostredníctvo rozhrania API pre správu Java.

Architektúra Apache Kafku

Predtým, ako preskúmame architektúru Kafky, mali by ste poznať jej základnú terminológiu:

  • A producent je proces, ktorý dokáže zverejniť správu k téme.
  • a spotrebiteľ je proces, ktorý sa môže prihlásiť na odber jednej alebo viacerých tém a konzumovať správy zverejnené k týmto témam.
  • A tematická kategória je názov informačného kanála, do ktorého sa správy zverejňujú.
  • A sprostredkovateľ je proces bežiaci na jednom počítači.
  • A zhluk je skupina sprostredkovateľov spolupracujúcich.

Architektúra Apache Kafka je veľmi jednoduchá, čo môže mať v niektorých systémoch za následok lepší výkon a priepustnosť. Každá téma v Kafke je ako jednoduchý súbor denníka. Keď producent zverejní správu, server Kafka ju pripojí na koniec súboru denníka pre danú tému. Server tiež priradí vyrovnať, čo je číslo používané na trvalú identifikáciu každej správy. S rastúcim počtom správ sa zvyšuje hodnota každého posunutia; napríklad ak producent zverejní tri správy, prvá môže získať posun 1, druhá posun 2 a tretia posun 3.

Pri prvom spustení spotrebiteľa Kafka odošle požiadavku na načítanie na server so žiadosťou o načítanie akýchkoľvek správ pre konkrétnu tému s hodnotou posunu vyššou ako 0. Server skontroluje súbor denníka pre túto tému a vráti tri nové správy . Spotrebiteľ spracuje správy a potom pošle žiadosť o správy s posunom vyššie ako 3 a tak ďalej.

V Kafke je klient zodpovedný za zapamätanie počtu ofsetov a načítanie správ. Server Kafka nesleduje ani nespravuje spotrebu správ. Server Kafka predvolene uchová správu sedem dní. Vlákno na pozadí na serveri kontroluje a odstraňuje správy staršie ako sedem dní. Spotrebiteľ má prístup k správam, pokiaľ sú na serveri. Môže prečítať správu niekoľkokrát a dokonca aj správy prečítať v opačnom poradí od prijatia. Ak sa však spotrebiteľovi nepodarí načítať správu skôr, ako uplynie sedem dní, bude mu táto správa chýbať.

Kafkove referenčné hodnoty

Produkčné využitie spoločnosťou LinkedIn a ďalšími podnikmi ukázalo, že pri správnej konfigurácii je Apache Kafka schopný denne spracovať stovky gigabajtov dát. V roku 2011 použili traja inžinieri spoločnosti LinkedIn testovacie testy na preukázanie toho, že Kafka môže dosiahnuť oveľa vyššiu priepustnosť ako ActiveMQ a RabbitMQ.

Rýchle nastavenie a ukážka aplikácie Apache Kafka

V tomto tutoriále si zostavíme vlastnú aplikáciu, ale začnime inštaláciou a testovaním inštancie Kafka s out-of-the-box výrobcom a spotrebiteľom.

  1. Navštívte stránku na stiahnutie Kafky a nainštalujte si najnovšiu verziu (od tohto písania 0,9).
  2. Extrahujte binárne súbory do a softvér / kafka priečinok. Pre aktuálnu verziu je to softvér / kafka_2.11-0.9.0.0.
  3. Zmeňte svoj aktuálny adresár tak, aby ukazoval na nový priečinok.
  4. Spustením servera Zookeeper vykonajte príkaz: bin / zookeeper-server-start.sh config / zookeeper.properties.
  5. Spustite server Kafka vykonaním: bin / kafka-server-start.sh config / server.properties.
  6. Vytvorte testovaciu tému, ktorú môžete použiť na testovanie: bin / kafka-topics.sh --create --zookeeper localhost: 2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. Začnite jednoduchého konzolového spotrebiteľa, ktorý dokáže konzumovať správy zverejnené na danú tému, ako napr javaworld: bin / kafka-console-consumer.sh --zookeeper localhost: 2181 --topický javaworld - od začiatku.
  8. Spustite jednoduchú konzolu producenta, ktorá dokáže publikovať správy k testovacej téme: bin / kafka-console-producer.sh --broker-list localhost: 9092 --topický javaworld.
  9. Skúste napísať jednu alebo dve správy do konzoly výrobcu. Vaše správy by sa mali zobrazovať v spotrebiteľskej konzole.

Príklad aplikácie s Apache Kafkou

Už ste videli, ako Apache Kafka funguje po vybalení z krabice. Ďalej vyvinieme vlastnú aplikáciu výrobcu / spotrebiteľa. Producent načíta vstup používateľa z konzoly a každý nový riadok odošle ako správu na server Kafka. Spotrebiteľ stiahne správy pre danú tému a vytlačí ich na konzolu. Výrobné a spotrebiteľské komponenty sú v tomto prípade vaše vlastné implementácie kafka-console-producer.sh a kafka-console-consumer.sh.

Začnime vytvorením a Producent.java trieda. Táto trieda klienta obsahuje logiku na načítanie vstupu používateľa z konzoly a odoslanie tohto vstupu ako správy na server Kafka.

Konfigurujeme výrobcu vytvorením objektu z java.util.Vlastnosti triedy a nastavenie jej vlastností. Trieda ProducerConfig definuje všetky rôzne dostupné vlastnosti, ale predvolené hodnoty Kafky sú dostatočné na väčšinu použití. Pre predvolenú konfiguráciu musíme nastaviť iba tri povinné vlastnosti:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) nastavuje zoznam párov hostiteľ: port použitých na nadviazanie počiatočných pripojení ku klastru Kakfa v host1: port1, host2: port2, ... formát. Aj keď máme v našom kafkovskom klastri viac ako jedného sprostredkovateľa, stačí zadať hodnotu prvého sprostredkovateľa hostiteľ: port. Klient Kafka použije túto hodnotu na uskutočnenie hovoru sprostredkovateľa, ktorý vráti zoznam všetkých sprostredkovateľov v klastri. Je dobré uviesť v zozname viac ako jedného sprostredkovateľa BOOTSTRAP_SERVERS_CONFIG, takže ak bude prvý sprostredkovateľ nefunkčný, bude môcť klient vyskúšať iných sprostredkovateľov.

Server Kafka očakáva správy v kľúč byte [], hodnota byte [] formát. Namiesto prevodu každého kľúča a hodnoty nám Kafkova knižnica na strane klienta umožňuje používať prívetivejšie typy String a int na odosielanie správ. Knižnica ich prevedie na vhodný typ. Napríklad vzorová aplikácia nemá kľúč pre konkrétnu správu, takže použijeme nulový pre kľúč. Pre hodnotu použijeme a String, čo sú údaje zadané používateľom na konzole.

Ak chcete nakonfigurovať kľúč správy, nastavili sme hodnotu KEY_SERIALIZER_CLASS_CONFIG na org.apache.kafka.common.serialization.ByteArraySerializer. Toto funguje, pretože nulový nie je potrebné prevádzať do byte []. Pre hodnota správy, nastavili sme VALUE_SERIALIZER_CLASS_CONFIG na org.apache.kafka.common.serialization.StringSerializer, pretože táto trieda vie, ako previesť a String do a byte [].

Vlastné objekty kľúč / hodnota

Podobný StringSerializer, Kafka poskytuje serializátory pre ďalšie primitívy ako napr int a dlho. Aby sme mohli pre náš kľúč alebo hodnotu použiť vlastný objekt, museli by sme vytvoriť triedu implementujúcu org.apache.kafka.common.serialization.Serializer. Potom by sme mohli pridať logiku, do ktorej by sme triedu serializovali bajt []. V našom spotrebiteľskom kóde by sme tiež museli použiť zodpovedajúci deserializátor.

Výrobca Kafka

Po naplnení Vlastnosti triedy s potrebnými konfiguračnými vlastnosťami, môžeme ju použiť na vytvorenie objektu KafkaVýrobca. Kedykoľvek potom chceme poslať správu na server Kafka, vytvoríme objekt z ProducerRecord a zavolajte na KafkaVýrobcaje poslať () metóda s týmto záznamom na odoslanie správy. The ProducerRecord má dva parametre: názov témy, do ktorej má byť správa zverejnená, a skutočná správa. Nezabudnite zavolať Producer.close () po dokončení práce s výrobcom:

Zoznam 1. KafkaProducer

 public class Producer {in static static Scanner in; public static void main (String [] argv) vyvolá výnimku {if (argv.length! = 1) {System.err.println ("Prosím, zadajte 1 parameter"); System.exit (-1); } Reťazec topicName = argv [0]; in = nový skener (System.in); System.out.println ("Zadajte správu (pre ukončenie zadajte príkaz exit)"); // Konfigurácia vlastností producenta configProperties = new Properties (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer (configProperties); Reťazec line = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = new ProducerRecord (topicName, line); producent.send (rec); riadok = in.nextLine (); } in.close (); producent.close (); }} 

Konfigurácia spotrebiteľa správy

Ďalej vytvoríme jednoduchého spotrebiteľa, ktorý sa prihlási na odber témy. Kedykoľvek je k téme zverejnená nová správa, správu si prečíta a vytlačí na konzolu. Spotrebiteľský kód je dosť podobný kódu výrobcu. Začneme vytvorením objektu java.util.Vlastnosti, nastavenie jeho vlastností špecifických pre spotrebiteľa a jeho následné použitie na vytvorenie nového objektu Kafkaspotrebiteľ. Trieda ConsumerConfig definuje všetky vlastnosti, ktoré môžeme nastaviť. Existujú iba štyri povinné vlastnosti:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Rovnako ako v prípade triedy producentov, aj my použijeme BOOTSTRAP_SERVERS_CONFIG nakonfigurovať páry hostiteľ / port pre spotrebiteľskú triedu. Táto konfigurácia nám umožňuje nadviazať počiatočné pripojenia k klastru Kakfa v host1: port1, host2: port2, ... formát.

Ako som už poznamenal, server Kafka očakáva správy v byte [] kľúč a bajt [] formátov hodnoty a má vlastnú implementáciu na serializáciu rôznych typov do bajt []. Rovnako ako v prípade výrobcu, aj na strane spotrebiteľa budeme musieť na konverziu použiť vlastný deserializátor byte [] späť do príslušného typu.

V prípade ukážkovej aplikácie vieme, že výrobca používa ByteArraySerializer pre kľúč a StringSerializer pre hodnotu. Na strane klienta teda musíme použiť org.apache.kafka.common.serialization.ByteArrayDeserializer pre kľúč a org.apache.kafka.common.serialization.StringDeserializer pre hodnotu. Nastavenie týchto tried ako hodnôt pre KEY_DESERIALIZER_CLASS_CONFIG a VALUE_DESERIALIZER_CLASS_CONFIG umožní spotrebiteľovi deserializáciu bajt [] kódované typy zasielané výrobcom.

Nakoniec musíme nastaviť hodnotu GROUP_ID_CONFIG. Toto by mal byť názov skupiny vo formáte reťazca. Viac o tejto konfigurácii vysvetlím za minútu. Zatiaľ sa pozrite na spotrebiteľa Kafka so štyrmi nastavenými povinnými vlastnosťami:

$config[zx-auto] not found$config[zx-overlay] not found