Programovanie

Ako vytvárať stavové streamovacie aplikácie s Apache Flink

Fabian Hueske je členom komisie a členom PMC projektu Apache Flink a spoluzakladateľom spoločnosti Data Artisans.

Apache Flink je rámec na implementáciu stavových aplikácií na spracovanie toku a ich spúšťanie vo veľkom rozsahu na výpočtovom klastri. V predchádzajúcom článku sme skúmali, čo je stavové spracovanie streamu, aké prípady použitia adresuje a prečo by ste mali implementovať a spúšťať svoje streamovacie aplikácie pomocou Apache Flink.

V tomto článku predstavím príklady dvoch bežných prípadov použitia stavového spracovania toku a rozoberiem, ako ich možno implementovať pomocou Flinku. Prvým prípadom použitia sú aplikácie riadené udalosťami, t. J. Aplikácie, ktoré prijímajú nepretržité prúdy udalostí a na tieto udalosti používajú určitú obchodnú logiku. Druhým je prípad použitia streamovacej analýzy, kde predstavím dva analytické dotazy implementované pomocou Flink’s SQL API, ktoré agregujú streamované dáta v reálnom čase. My v spoločnosti Data Artisans poskytujeme zdrojový kód všetkých našich príkladov vo verejnom úložisku GitHub.

Predtým, ako sa ponoríme do podrobností príkladov, uvediem prúd udalostí, ktorý prijímajú príklady aplikácií, a vysvetlím, ako môžete spustiť kód, ktorý poskytujeme.

Prúd udalostí z jazdy taxíkom

Naše vzorové aplikácie sú založené na verejnom súbore údajov o jazdách taxíkom, ku ktorému došlo v New Yorku v roku 2013. Organizátori Veľkej výzvy DEBS (Medzinárodná konferencia ACM o distribuovaných systémoch založených na udalostiach v roku 2015) v roku 2015 usporiadali pôvodný súbor údajov a premenili ho na jediný súbor CSV, z ktorého čítame nasledujúcich deväť polí.

  • Medailón - identifikačné číslo taxíka MD5
  • Hack_license - identifikačné číslo súčtu MD5 taxislužby
  • Pickup_datetime - čas, keď boli cestujúci vyzdvihnutí
  • Dropoff_datetime — čas, kedy boli cestujúci vysadení
  • Pickup_longitude - zemepisná dĺžka miesta vyzdvihnutia
  • Pickup_latitude - zemepisná šírka miesta vyzdvihnutia
  • Dropoff_longitude - zemepisná dĺžka miesta odovzdania
  • Dropoff_latitude - zemepisná šírka miesta odovzdania
  • Total_amount - celková suma vyplatená v dolároch

Súbor CSV ukladá záznamy vo vzostupnom poradí podľa atribútu času odovzdania. Súbor teda možno považovať za usporiadaný denník udalostí, ktoré boli zverejnené po ukončení cesty. Ak chcete spustiť príklady, ktoré poskytujeme na GitHub, musíte si stiahnuť dátovú sadu výzvy DEBS z Disku Google.

Všetky príklady aplikácií postupne načítajú súbor CSV a prijímajú ho ako prúd udalostí jazdy taxíkom. Od tejto chvíle budú aplikácie spracúvať udalosti rovnako ako akýkoľvek iný prúd, t. J. Ako prúd, ktorý je prijímaný zo systému publikovania a prihlasovania na základe denníka, napríklad Apache Kafka alebo Kinesis. V skutočnosti je načítanie súboru (alebo iného typu pretrvávajúcich údajov) a jeho spracovanie ako prúdu základným kameňom prístupu spoločnosti Flink k zjednoteniu dávkového a streamového spracovania.

Spustenie príkladov Flink

Ako sme už spomínali, zdrojový kód našich vzorových aplikácií sme zverejnili v úložisku GitHub. Odporúčame vám rozdeliť a naklonovať úložisko. Príklady je možné ľahko vykonať z vášho IDE podľa vášho výberu; na ich spustenie nemusíte nastavovať a konfigurovať klaster Flink. Najskôr importujte zdrojový kód príkladov ako projekt Maven. Potom vykonajte hlavnú triedu aplikácie a ako programové miesto zadajte umiestnenie dátového súboru (odkaz na stiahnutie dát nájdete vyššie).

Po spustení aplikácie sa v procese JVM aplikácie spustí lokálna vložená inštancia Flink a odošle sa na jej vykonanie. Počas spustenia programu Flink a plánovania úloh úlohy uvidíte množstvo výpisov z denníka. Po spustení aplikácie sa jej výstup zapíše na štandardný výstup.

Budovanie aplikácie riadenej udalosťami vo Flinke

Teraz poďme diskutovať o našom prvom prípade použitia, ktorým je aplikácia založená na udalostiach. Aplikácie založené na udalostiach prijímajú prúdy udalostí, vykonávajú výpočty, keď sú udalosti prijímané, a môžu vydávať nové udalosti alebo spúšťať externé akcie. Je možné vytvoriť viac aplikácií riadených udalosťami ich vzájomným prepojením prostredníctvom systémov denníka udalostí, podobne ako je možné skladať veľké systémy z mikroslužieb. Aplikácie založené na udalostiach, protokoly udalostí a snímky stavu aplikácie (známe ako body uloženia vo Flink) obsahujú veľmi výkonný návrhový vzor, ​​pretože môžete resetovať ich stav a prehrať ich vstup, aby ste sa zotavili zo zlyhania, opravili chybu alebo migrovali aplikáciu do iného klastra.

V tomto článku preskúmame aplikáciu založenú na udalostiach, ktorá podporuje službu sledujúcu pracovnú dobu taxikárov. V roku 2016 sa NYC Taxi and Limousine Commission rozhodla obmedziť pracovný čas taxikárov na 12 hodinové zmeny a vyžadovať prestávku najmenej osem hodín pred začiatkom ďalšej zmeny. Radenie sa začína začiatkom prvej jazdy. Odvtedy môže vodič začať nové jazdy v rozmedzí 12 hodín. Naša aplikácia sleduje jazdy vodičov, označuje čas ich 12-hodinového okna (t. J. Čas, kedy môžu začať poslednú jazdu) a označuje jazdy, ktoré porušili dané nariadenie. Celý zdrojový kód tohto príkladu nájdete v našom úložisku GitHub.

Naša aplikácia je implementovaná pomocou rozhrania DataStream API spoločnosti Flink a KeyedProcessFunction. Rozhranie DataStream API je funkčné rozhranie API a je založené na koncepcii typových dátových tokov. A DataStream je logická reprezentácia toku udalostí typu T. Prúd sa spracuje uplatnením funkcie, ktorá produkuje ďalší dátový tok, pravdepodobne iného typu. Flink spracováva prúdy paralelne distribúciou udalostí do prúdových oddielov a aplikáciou rôznych inštancií funkcií na každý oddiel.

Nasledujúci úryvok kódu zobrazuje tok našej monitorovacej aplikácie na vysokej úrovni.

// nasávať prúd jázd taxíkom.

Jazdy DataStream = TaxiRides.getRides (env, inputPath);

DataStream notifikácie = jazdy

// stream oddielu podľa ID vodičského preukazu

.keyBy (r -> r.licenseId)

// monitoruje jazdné udalosti a generuje oznámenia

.process (new MonitorWorkTime ());

// tlačiť oznámenia

oznámenia.tlač ();

Aplikácia začne prijímať prúd udalostí jazdy taxíkom. V našom príklade sú udalosti načítané z textového súboru, analyzované a uložené v TaxiRide Objekty POJO. Aplikácia v reálnom svete by zvyčajne prijímala udalosti z frontu správ alebo denníka udalostí, napríklad Apache Kafka alebo Pravega. Ďalším krokom je zadanie kľúča TaxiRide udalosti licenseId vodiča. The keyBy prevádzka rozdelí prúd na deklarované pole tak, že všetky udalosti s rovnakým kľúčom sú spracovávané rovnakou paralelnou inštanciou nasledujúcej funkcie. V našom prípade rozdelíme na licenseId pretože chceme sledovať pracovný čas každého jednotlivého vodiča.

Ďalej aplikujeme MonitorWorkTime funkcia na rozdelenom disku TaxiRide diania. Táto funkcia sleduje jazdy na vodiča a sleduje ich posuny a doby brzdenia. Vydáva udalosti typu Tuple2, kde každá n-tica predstavuje oznámenie pozostávajúce z ID preukazu vodiča a správy. Nakoniec naša aplikácia vydá správy tak, že ich vytlačí na štandardný výstup. Aplikácia v reálnom svete by zapísala oznámenia do externej správy alebo do úložného systému, ako je Apache Kafka, HDFS alebo do databázového systému, alebo by spustila externé volanie, aby ich okamžite vytlačila.

Teraz, keď sme diskutovali o celkovom toku aplikácie, sa pozrime na MonitorWorkTime funkcia, ktorá obsahuje väčšinu skutočnej obchodnej logiky aplikácie. The MonitorWorkTime funkcia je stavová KeyedProcessFunction že prijíma TaxiRide udalosti a emituje Tuple2 záznamy. The KeyedProcessFunction rozhranie obsahuje dve metódy spracovania údajov: processElement () a onTimer (). The processElement () metóda sa volá pre každú prichádzajúcu udalosť. The onTimer () metóda sa volá, keď sa spustí predtým registrovaný časovač. Nasledujúci úryvok zobrazuje kostru súboru MonitorWorkTime funkcie a všetkého, čo je deklarované mimo metód spracovania.

verejná statická trieda MonitorWorkTime

rozširuje KeyedProcessFunction {

// časové konštanty v milisekundách

súkromné ​​statické posledné dlhé ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 hodín

súkromné ​​statické konečné dlhé REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 hodín

súkromné ​​statické konečné dlhé CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 hodín

súkromný prechodný formátovač DateTimeFormatter;

// stav handle na uloženie začiatočného času zmeny

ValueState shiftStart;

@ Override

public void open (konfigurácia conf) {

// rukoväť stavu registrácie

shiftStart = getRuntimeContext (). getState (

nový ValueStateDescriptor („shiftStart“, Types.LONG));

// inicializuje formátovač času

this.formatter = DateTimeFormat.forPattern („rrrr-MM-dd HH: mm: ss“);

  }

// processElement () a onTimer () sú podrobne popísané nižšie.

}

Funkcia deklaruje niekoľko konštánt pre časové intervaly v milisekundách, formátovač času a popisovač stavu pre kľúčovaný stav, ktorý spravuje Flink. Spravovaný stav je pravidelne kontrolovaný a automaticky obnovený v prípade poruchy. Stav kľúča je usporiadaný podľa kľúča, čo znamená, že funkcia zachová jednu hodnotu na každý popisovač a kľúč. V našom prípade MonitorWorkTime funkcia udržuje a Dlhé hodnotu pre každý kľúč, t. j. pre každý licenseId. The shiftStart štát ukladá začiatočný čas zmeny vodiča. Stavová rukoväť je inicializovaná v otvorené() metóda, ktorá sa vyvolá raz pred spracovaním prvej udalosti.

Teraz sa pozrime na processElement () metóda.

@ Override

public void processElement (

Jazda taxíkom,

Kontext ctx,

Zberateľ out) hodí výnimku {

// vyhľadať čas začatia poslednej zmeny

Dlhý štartTs = shiftStart.value ();

if (startTs == null ||

startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

// toto je prvá jazda novej zmeny.

startTs = jazda.pickUpTime;

shiftStart.update (startTs);

long endTs = startTs + ALLOWED_WORK_TIME;

out.collect (Tuple2.of (ride.licenseId,

„Môžete prijímať nových cestujúcich až do„ + formatter.print (endTs)));

// časovač registrácie na vyčistenie stavu za 24 hodín

ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

} else if (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

// táto jazda sa začala po skončení povoleného pracovného času.

// je to porušenie predpisov!

out.collect (Tuple2.of (ride.licenseId,

„Táto jazda porušila predpisy o pracovnom čase.“));

  }

}

The processElement () pre každého sa volá metóda TaxiRide udalosť. Najskôr metóda načíta čas začatia posunu vodiča zo stavu rukoväte. Ak štát neobsahuje čas začatia (startTs == null) alebo ak posledná zmena začala viac ako 20 hodín (ALLOWED_WORK_TIME + REQ_BREAK_TIME) skôr ako aktuálna jazda, je súčasná jazda prvou jazdou novej zmeny. V obidvoch prípadoch funkcia spustí nové radenie aktualizáciou času začiatku radenia na začiatočný čas aktuálnej jazdy, vydá vodičovi správu s časom ukončenia nového radenia a zaregistruje časovač na vyčistenie štát do 24 hodín.

Ak súčasná jazda nie je prvou jazdou v novej zmene, funkcia skontroluje, či neporušuje predpis o pracovnom čase, t. J. Či začala o viac ako 12 hodín neskôr ako začiatok aktuálnej zmeny vodiča. Ak je to tak, funkcia vydá správu, ktorá informuje vodiča o priestupku.

The processElement () metóda MonitorWorkTime funkcia zaregistruje časovač na vyčistenie stavu 24 hodín po začiatku zmeny. Odstránenie stavu, ktorý už nie je potrebný, je dôležité, aby sa zabránilo zväčšeniu veľkosti stavu v dôsledku netesného stavu. Časovač sa spustí, keď čas aplikácie prejde časovou značkou časovača. V tom okamihu onTimer () metóda sa volá. Podobne ako v stave, časovače sa udržiavajú pre každý kľúč a funkcia sa dáva do kontextu priradeného kľúča pred onTimer () metóda sa volá. Preto je všetok štátny prístup smerovaný na kľúč, ktorý bol aktívny pri registrácii časovača.

Pozrime sa na onTimer () metóda MonitorWorkTime.

@ Override

public void onTimer (

dlhý časovač,

OnTimerContext ctx,

Zberateľ out) hodí výnimku {

// odstráni stav zmeny, ak už nebol zahájený nový posun.

Dlhý štartTs = shiftStart.value ();

if (startTs == timerTs - CLEAN_UP_INTERVAL) {

shiftStart.clear ();

  }

}

The processElement () metóda registruje časovače na 24 hodín po tom, čo zmena začala vyčistiť stav, ktorý už nie je potrebný. Vyčistenie štátu je jedinou logikou, ktorú onTimer () metóda realizuje. Pri spustení časovača skontrolujeme, či vodič medzičasom začal novú zmenu, t. J. Či sa zmenil čas začiatku zmeny. Ak to tak nie je, vodičovi zrušíme stav radenia.

$config[zx-auto] not found$config[zx-overlay] not found