Programovanie

Vytvorené pre reálny čas: Posielanie veľkých dát pomocou aplikácie Apache Kafka, časť 2

V prvej polovici tohto úvodu do JavaWorld k Apache Kafka ste pomocou Kafky vyvinuli niekoľko aplikácií pre malých výrobcov / spotrebiteľov. Z týchto cvičení by ste mali ovládať základy systému správ Apache Kafka. V tejto druhej polovici sa naučíte, ako používať oddiely na rozloženie záťaže a horizontálne zväčšenie vašej aplikácie, pričom môžete manipulovať až s miliónmi správ denne. Dozviete sa tiež, ako Kafka využíva kompenzácie správ na sledovanie a správu zložitého spracovania správ a ako chrániť váš systém správ Apache Kafka pred zlyhaním, ak by spotrebiteľ spadol. Vyvinieme ukážkovú aplikáciu z časti 1 pre prípady použitia publikovania a prihlásenia na odber, ako aj použitia typu point-to-point.

Priečky v Apache Kafka

Témy v Kafke je možné rozdeliť na oddiely. Napríklad pri vytváraní témy s názvom Demo ju môžete nakonfigurovať tak, aby mala tri oddiely. Server by vytvoril tri protokolové súbory, jeden pre každú z ukážkových oblastí. Keď producent zverejnil správu k téme, pridelil by tejto správe ID oddielu. Server by potom pripojil správu k súboru protokolu iba pre tento oddiel.

Ak ste potom spustili dvoch spotrebiteľov, server by mohol priradiť oddiely 1 a 2 k prvému spotrebiteľovi a oddiel 3 k druhému spotrebiteľovi. Každý spotrebiteľ by čítal iba zo svojich priradených oddielov. Demo tému nakonfigurovanú pre tri oddiely môžete vidieť na obrázku 1.

Pre rozšírenie scenára si predstavte kafkovský klaster s dvoma maklérmi, ktorý je umiestnený v dvoch strojoch. Keď ste rozdelili ukážkovú tému, nakonfigurovali by ste ju tak, aby mala dve oddiely a dve repliky. Pre tento typ konfigurácie by server Kafka priradil tieto dve oddiely dvom sprostredkovateľom vo vašom klastri. Každý sprostredkovateľ by bol vodcom jednej z priečok.

Keď producent zverejnil správu, šla by vodcovi oddielu. Vedúci vzal správu a pripojil ju k súboru protokolu na lokálnom počítači. Druhý sprostredkovateľ by pasívne replikoval zadaný protokol do svojho vlastného počítača. Keby vedúci oddielu klesol, druhý sprostredkovateľ by sa stal novým vedúcim a začal by obslúžiť požiadavky klientov. Rovnakým spôsobom, keď spotrebiteľ poslal požiadavku na oddiel, táto požiadavka by šla najskôr vedúcemu oddielu, ktorý by vrátil požadované správy.

Výhody rozdelenia

Zvážte výhody rozdelenia systému správ založeného na Kafke:

  1. Škálovateľnosť: V systéme s iba jedným oddielom sú správy publikované k téme uložené v protokolovom súbore, ktorý existuje na jednom počítači. Počet správ pre tému sa musí zmestiť do jedného súboru denníka potvrdenia a veľkosť uložených správ nikdy nemôže byť väčšia ako diskový priestor daného stroja. Rozdelenie na jednu tému vám umožní rozšíriť váš systém ukladaním správ na rôznych počítačoch v klastri. Ak by ste napríklad chceli uložiť 30 gigabajtov (GB) správ pre Demo tému, mohli by ste zostaviť kafkovský klaster z troch strojov, každý s 10 GB miesta na disku. Potom by ste tému nakonfigurovali tak, aby mala tri oddiely.
  2. Vyrovnávanie zaťaženia servera: Viaceré oddiely vám umožňujú rozšíriť žiadosti o správy medzi sprostredkovateľov. Napríklad, ak ste mali tému, ktorá spracovala 1 milión správ za sekundu, môžete ju rozdeliť na 100 oddielov a pridať do svojho klastra 100 sprostredkovateľov. Každý sprostredkovateľ by bol vedúcim samostatného oddielu, ktorý by bol zodpovedný za reagovanie na iba 10 000 požiadaviek klientov za sekundu.
  3. Vyrovnávanie spotrebiteľského zaťaženia: Podobne ako pri vyrovnávaní zaťaženia servera, aj hosťovanie viacerých spotrebiteľov na rôznych počítačoch vám umožňuje rozložiť zaťaženie spotrebiteľov. Povedzme, že ste chceli spotrebovať 1 milión správ za sekundu z témy so 100 oddielmi. Môžete vytvoriť 100 spotrebiteľov a prevádzkovať ich paralelne. Server Kafka by každému spotrebiteľovi pridelil jeden oddiel a každý spotrebiteľ by paralelne spracoval 10 000 správ. Pretože Kafka priraďuje každý oddiel iba jednému spotrebiteľovi, v rámci oddielu by sa každá správa spotrebovala v poradí.

Dva spôsoby rozdelenia

Producent je zodpovedný za rozhodnutie, do ktorej oblasti bude správa smerovať. Výrobca má dve možnosti kontroly tohto zadania:

  • Vlastný rozdeľovač: Môžete vytvoriť triedu implementujúcu org.apache.kafka.clients.producer.Partitioner rozhranie. Tento zvyk Rozdeľovač implementuje obchodnú logiku pri rozhodovaní o tom, kam sa správy budú odosielať.
  • DefaultPartitioner: Ak nevytvoríte vlastnú triedu oddielov, potom je predvolene org.apache.kafka.clients.producer.internals.DefaultPartitioner triedy sa použije. Predvolený oddiel je pre väčšinu prípadov dosť dobrý a poskytuje tri možnosti:
    1. Manuálny: Keď vytvoríte a ProducerRecord, použite preťažený konštruktor nový ProducerRecord (topicName, partitionId, messageKey, message) na určenie ID oddielu.
    2. Hašovanie (citlivé na lokalitu): Keď vytvoríte a ProducerRecord, uveďte a messageKey, volaním nový ProducerRecord (topicName, messageKey, message). DefaultPartitioner použije hash kľúča na zabezpečenie toho, aby všetky správy pre rovnaký kľúč smerovali k rovnakému producentovi. Toto je najjednoduchší a najbežnejší prístup.
    3. Striekanie (náhodné vyvažovanie záťaže): Ak nechcete mať kontrolu nad správami oddielov, do ktorých idete, jednoducho zavolajte nový ProducerRecord (topicName, správa) vytvoriť svoj ProducerRecord. V takom prípade oddeľovač odošle správy na všetky oddiely spôsobom „každý s každým“, čím sa zabezpečí vyvážené zaťaženie servera.

Rozdelenie aplikácie Apache Kafka

Pre jednoduchý príklad výrobcu / spotrebiteľa v časti 1 sme použili a DefaultPartitioner. Teraz skúsime vytvoriť vlastný oddiel. V tomto príklade predpokladajme, že máme maloobchodnú stránku, ktorú môžu spotrebitelia používať na objednávanie produktov kdekoľvek na svete. Podľa používania vieme, že väčšina spotrebiteľov je buď v USA alebo v Indii. Chceme rozdeliť našu aplikáciu na odosielanie objednávok z USA alebo Indie ich vlastným zákazníkom, zatiaľ čo objednávky odkiaľkoľvek pôjdu tretiemu spotrebiteľovi.

Na začiatok vytvoríme CountryPartitioner ktorý vykonáva org.apache.kafka.clients.producer.Partitioner rozhranie. Musíme implementovať nasledujúce metódy:

  1. Ozve sa Kafka Konfigurovať () keď inicializujeme Rozdeľovač triedy, s a Mapa konfiguračných vlastností. Táto metóda inicializuje funkcie špecifické pre obchodnú logiku aplikácie, napríklad pripojenie k databáze. V tomto prípade chceme pomerne všeobecný rozdeľovač, ktorý bude trvať názov krajiny ako majetok. Potom môžeme použiť configProperties.put ("partitions.0", "USA") na zmapovanie toku správ do oddielov. V budúcnosti môžeme pomocou tohto formátu zmeniť, ktoré krajiny dostanú svoj vlastný oddiel.
  2. The Výrobca Hovory API oddiel () raz za každú správu. V takom prípade ho použijeme na prečítanie správy a analýzu názvu krajiny zo správy. Ak je názov krajiny v countryToPartitionMap, vráti sa partitionId uložené v Mapa. Ak nie, bude hashovať hodnotu krajiny a použije ju na výpočet, do ktorého oddielu má ísť.
  3. Voláme Zavrieť() vypnúť prepážku. Použitím tejto metódy sa zabezpečí, že všetky prostriedky získané počas inicializácie sa vyčistia počas vypínania.

Všimnite si, že keď volá Kafka Konfigurovať (), producent Kafka odovzdá všetky vlastnosti, ktoré sme nakonfigurovali pre producenta Rozdeľovač trieda. Je nevyhnutné, aby sme čítali iba tie vlastnosti, ktoré začínajú priečky., rozoberte ich a získajte partitionIda ID uložte do countryToPartitionMap.

Ďalej uvádzame našu vlastnú implementáciu Rozdeľovač rozhranie.

Zoznam 1. CountryPartitioner

 verejná trieda CountryPartitioner implementuje Partitioner {private static Map countryToPartitionMap; public void configure (konfigurácie mapy) {System.out.println ("Inside CountryPartitioner.configure" + konfigurácie); countryToPartitionMap = nový HashMap (); pre (položka Map.Entry: configs.entrySet ()) {if (entry.getKey (). startsWith ("partitions.")) {String keyName = entry.getKey (); Reťazcová hodnota = (Reťazec) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Integer.parseInt (keyName.substring (11)); countryToPartitionMap.put (hodnota, paritionId); }}} verejný int oddiel (téma reťazca, kľúč objektu, byte [] keyBytes, hodnota objektu, byte [] valueBytes, klastrový klaster) {vypísať oddiely = cluster.availablePartitionsForTopic (téma); String valueStr = (String) hodnota; Reťazec countryName = ((hodnota reťazca)) .split (":") [0]; if (countryToPartitionMap.containsKey (countryName)) {// Ak je krajina namapovaná na konkrétny oddiel return, vráti countryToPartitionMap.get (countryName); } else {// Ak na konkrétny oddiel nie je namapovaná žiadna krajina, rozdeľte sa medzi zvyšné oddiely int noOfPartitions = cluster.topics (). size (); návratová hodnota.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} public void close () {}} 

The Výrobca trieda v zozname 2 (nižšie) je veľmi podobná nášmu jednoduchému producentovi z časti 1, s dvoma zmenami vyznačenými tučne:

  1. Nastavili sme vlastnosť config s kľúčom rovným hodnote ProducerConfig.PARTITIONER_CLASS_CONFIG, ktorý sa zhoduje s plne kvalifikovaným názvom nášho CountryPartitioner trieda. Aj sme nastavili názov krajiny do partitionId, a tým mapujeme vlastnosti, ktorým chceme odovzdať CountryPartitioner.
  2. Míňame inštanciu triedy implementujúcej org.apache.kafka.clients.producer.Callback rozhranie ako druhý argument k producent.send () metóda. Klient Kafka zavolá na svoje pri dokončovaní () po úspešnom zverejnení správy pomocou metódy a RecordMetadata objekt. Tento objekt budeme môcť použiť na zistenie, do ktorého oddielu bola správa odoslaná, ako aj posun priradený zverejnenej správe.

Zoznam 2. Rozdelený producent

 public class Producer {in static static Scanner in; public static void main (String [] argv) vyvolá výnimku {if (argv.length! = 1) {System.err.println ("Prosím, zadajte 1 parameter"); System.exit (-1); } Reťazec topicName = argv [0]; in = nový skener (System.in); System.out.println ("Zadajte správu (pre ukončenie zadajte príkaz exit)"); // Konfigurácia vlastností producenta configProperties = new Properties (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");  configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("partition.1", "USA"); configProperties.put ("partition.2", "India");  org.apache.kafka.clients.producer.Producer producer = new KafkaProducer (configProperties); Reťazec line = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = new ProducerRecord (topicName, null, line); producent. poslať (rec, new Callback () {public void onCompletion (RecordMetadata metadata, Exception exception) {System.out.println ("Správa odoslaná do témy ->" + metadata.topic () + ", parition->" + metadata.partition () + "uložené na offset->" + metadata.offset ()); ; }}); riadok = in.nextLine (); } in.close (); producent.close (); }} 

Priradenie oddielov spotrebiteľom

Server Kafka zaručuje, že oddiel je priradený iba jednému spotrebiteľovi, čím zaručuje poradie spotreby správ. Oddiel môžete priradiť manuálne alebo ho môžete nechať priradiť automaticky.

Ak vaša obchodná logika vyžaduje väčšiu kontrolu, budete musieť oddiely priradiť manuálne. V takom prípade by ste použili KafkaConsumer.assign () odovzdať zoznam oddielov, o ktoré sa zaujímal každý spotrebiteľ, na server Kakfa.

Automaticky priradené oddiely sú predvolenou a najbežnejšou voľbou. V takom prípade server Kafka priradí každému spotrebiteľovi oddiel a znova pridelí oddiely v mierke pre nových spotrebiteľov.

Povedzme, že vytvárate novú tému s tromi oddielmi. Keď začnete s prvým spotrebiteľom pre novú tému, Kafka priradí všetky tri oddiely tomu istému spotrebiteľovi. Ak potom spustíte druhého spotrebiteľa, Kafka znova pridelí všetky oddiely, pričom jeden oddiel priradí prvému spotrebiteľovi a zvyšné dva oddiely druhému spotrebiteľovi. Ak pridáte tretieho spotrebiteľa, Kafka znova priradí oddiely, aby mal každý spotrebiteľ priradený jeden oddiel. Nakoniec, ak začnete štvrtého a piateho spotrebiteľa, potom budú mať traja spotrebitelia priradený oddiel, ostatní však nebudú dostávať žiadne správy. Ak jeden z počiatočných troch oddielov vypadne, Kafka použije rovnakú logiku rozdelenia na opätovné priradenie oddielu tohto spotrebiteľa k jednému z ďalších spotrebiteľov.

$config[zx-auto] not found$config[zx-overlay] not found