Programovanie

Ako používať Redis na spracovanie toku v reálnom čase

Roshan Kumar je senior produktový manažér v Redis Labs.

Prijímanie údajov v reálnom čase je bežnou požiadavkou pre veľa prípadov použitia veľkých dát. V oblastiach ako IoT, elektronický obchod, bezpečnosť, komunikácia, zábava, financie a maloobchod, kde toľko závisí od včasného a presného rozhodovania na základe údajov, je zber a analýza údajov v reálnom čase v skutočnosti jadrom podnikania.

Zhromažďovanie, ukladanie a spracovanie streamovaných údajov vo veľkých objemoch a vysokou rýchlosťou však predstavuje architektonické výzvy. Dôležitým prvým krokom pri poskytovaní analýzy údajov v reálnom čase je zabezpečenie toho, aby boli k dispozícii adekvátne sieťové, výpočtové, úložné a pamäťové zdroje na zachytenie rýchlych dátových tokov. Softvérový zásobník spoločnosti však musí zodpovedať výkonu jej fyzickej infraštruktúry. V opačnom prípade budú podniky čeliť obrovskému nahromadeniu údajov, v horšom prípade chýbajúcim alebo neúplným údajom.

Redis sa stal populárnou voľbou pre také scenáre rýchleho príjmu dát. Ľahká databázová platforma v pamäti, Redis dosahuje priepustnosť v miliónoch operácií za sekundu s latenciou nižšou ako milisekunda pri súčasnom využití minimálnych zdrojov. Ponúka tiež jednoduché implementácie, ktoré umožňuje viacero dátových štruktúr a funkcií.

V tomto článku ukážem, ako môže Redis Enterprise vyriešiť bežné problémy spojené s prijímaním a spracovaním veľkého množstva údajov vysokej rýchlosti. Prejdeme si tri rôzne prístupy (vrátane kódu) k spracovaniu informačného kanála Twitter v reálnom čase, a to pomocou Redis Pub / Sub, Redis Lists a Redis Sorted Sets. Ako uvidíme, pri rýchlom prijímaní údajov majú v závislosti od prípadu použitia úlohu všetky tri spôsoby.

Výzvy pri navrhovaní rýchlych riešení pre príjem dát

Vysokorýchlostné prijímanie údajov často zahŕňa niekoľko rôznych typov zložitosti:

  • Veľké objemy dát niekedy prichádzajúce nárazovo. Bursty data vyžaduje riešenie, ktoré je schopné spracovať veľké objemy dát s minimálnou latenciou. V ideálnom prípade by malo byť schopné vykonávať milióny zápisov za sekundu s latenciou milisekúnd, s využitím minimálnych zdrojov.
  • Údaje z viacerých zdrojov. Riešenia na príjem dát musia byť dostatočne flexibilné na to, aby zvládli dáta v mnohých rôznych formátoch, v prípade potreby si zachovali identitu zdroja a transformovali sa alebo sa normalizovali v reálnom čase.
  • Údaje, ktoré je potrebné filtrovať, analyzovať alebo poslať ďalej. Väčšina riešení na príjem dát má jedného alebo viacerých predplatiteľov, ktorí dáta konzumujú. Často ide o rôzne aplikácie, ktoré fungujú na rovnakom alebo odlišnom mieste s rôznym súborom predpokladov. V takýchto prípadoch musí databáza nielen transformovať údaje, ale aj filtrovať alebo agregovať v závislosti od požiadaviek náročných aplikácií.
  • Údaje pochádzajú z geograficky distribuovaných zdrojov. V tomto scenári je často vhodné distribuovať uzly zhromažďovania údajov a umiestniť ich blízko k zdrojom. Samotné uzly sa stávajú súčasťou rýchleho riešenia na príjem dát, na zhromažďovanie, spracovanie, preposielanie alebo presmerovanie údajov na príjem.

Spracovanie rýchleho príjmu dát v Redis

Mnoho riešení podporujúcich rýchle prijímanie dát je dnes zložitých, bohatých na funkcie a prepracovaných pre jednoduché požiadavky. Redis je na druhej strane mimoriadne ľahký, rýchly a ľahko použiteľný. Vďaka klientom dostupným vo viac ako 60 jazykoch je možné Redis ľahko integrovať do populárnych softvérových balíkov.

Redis ponúka dátové štruktúry, ako sú zoznamy, sady, zoradené sady a haše, ktoré ponúkajú jednoduché a všestranné spracovanie údajov. Redis poskytuje viac ako milión operácií čítania a zápisu za sekundu s latenciou milisekund pri komoditnej cloudovej inštancii so skromnou veľkosťou, čo ju robí mimoriadne efektívnou pre veľké objemy dát. Redis tiež podporuje služby zasielania správ a klientske knižnice vo všetkých populárnych programovacích jazykoch, takže je vhodný na kombináciu vysokorýchlostného príjmu dát a analýzy v reálnom čase. Príkazy Redis Pub / Sub mu umožňujú hrať úlohu sprostredkovateľa správ medzi vydavateľmi a predplatiteľmi, čo je funkcia často používaná na odosielanie oznámení alebo správ medzi uzlami príjmu distribuovaných údajov.

Redis Enterprise vylepšuje Redis plynulým škálovaním, nepretržitou dostupnosťou, automatickým nasadením a schopnosťou využívať nákladovo efektívnu pamäť typu flash ako predlžovač RAM, aby bolo možné nákladovo efektívne spracovať veľké súbory údajov.

V nasledujúcich častiach načrtnem, ako používať Redis Enterprise na riešenie bežných výziev pri prijímaní údajov.

Znova pri rýchlosti Twitteru

Na ilustráciu jednoduchosti Redisu preskúmame ukážkové rýchle riešenie prijímania údajov, ktoré zhromažďuje správy z informačného kanála Twitter. Cieľom tohto riešenia je spracovať tweety v reálnom čase a pri ich spracovaní ich tlačiť dolu potrubím.

Údaje z Twitteru absorbované riešením sú potom spotrebované viacerými procesormi v rade. Ako je znázornené na obrázku 1, tento príklad sa zaoberá dvoma procesormi - anglickým procesorom Tweet a procesorom Influencer. Každý procesor tweety filtruje a odovzdáva ich príslušnými kanálmi ďalším spotrebiteľom. Tento reťazec môže ísť až tak ďaleko, ako si to vyžaduje riešenie. V našom príklade sa však zastavíme na tretej úrovni, kde agregujeme populárne diskusie medzi anglicky hovoriacimi a najlepšími influencermi.

Redis Labs

Upozorňujeme, že používame príklad spracovania informačných kanálov služby Twitter z dôvodu rýchlosti a príchodu dát. Upozorňujeme tiež, že dáta z Twitteru sa k nášmu rýchlemu príjmu dát dostanú prostredníctvom jedného kanála. V mnohých prípadoch, ako napríklad IoT, môže byť viac zdrojov údajov, ktoré odosielajú údaje do hlavného prijímača.

Existujú tri možné spôsoby, ako implementovať toto riešenie pomocou Redis: ingest s Redis Pub / Sub, ingest s dátovou štruktúrou zoznamu alebo ingest s dátovou štruktúrou zoradenej sady. Pozrime sa na každú z týchto možností.

Požitie s Redis Pub / Sub

Toto je najjednoduchšia implementácia rýchleho príjmu dát. Toto riešenie využíva funkciu Pub / Sub spoločnosti Redis, ktorá umožňuje aplikáciám publikovať a predplatiť správy. Ako je znázornené na obrázku 2, každá fáza spracováva údaje a zverejňuje ich na kanáli. Nasledujúca fáza sa prihlási na odber kanála a prijíma správy na ďalšie spracovanie alebo filtrovanie.

Redis Labs

Pros

  • Ľahko implementovateľné.
  • Funguje dobre, keď sú zdroje údajov a procesory distribuované geograficky.

Zápory

  • Riešenie vyžaduje, aby vydavatelia a predplatitelia boli neustále v strehu. Predplatitelia stratia dáta, keď ich zastavia alebo stratia pripojenie.
  • Vyžaduje viac pripojení. Program nemôže zverejniť a prihlásiť sa na odber rovnakého pripojenia, takže každý sprostredkujúci procesor údajov vyžaduje dve pripojenia - jedno na prihlásenie na odber a druhé na zverejnenie. Ak bežíte Redis na platforme DBaaS, je dôležité overiť, či má váš balíček alebo úroveň služby nejaké obmedzenia počtu pripojení.

Poznámka o spojeniach

Ak sa na odber kanála prihlási viac ako jeden klient, spoločnosť Redis postúpi údaje každému klientovi lineárne, jeden po druhom. Veľké užitočné zaťaženie dát a veľa pripojení môže zaviesť latenciu medzi vydavateľom a jeho predplatiteľmi. Aj keď je predvolený pevný limit pre maximálny počet pripojení 10 000, musíte otestovať a porovnať, koľko pripojení je vhodných pre vaše užitočné zaťaženie.

Redis udržuje výstupnú medzipamäť klienta pre každého klienta. Predvolené limity pre výstupný buffer klienta pre Pub / Sub sú nastavené ako:

limit klientskeho výstupu-medzipamäte pubsub 32mb 8mb 60

S týmto nastavením Redis prinúti klientov odpojiť sa za dvoch podmienok: ak výstupná vyrovnávacia pamäť rastie nad 32 MB alebo ak výstupná vyrovnávacia pamäť uchováva nepretržite 8 MB dát na 60 sekúnd.

To naznačuje, že klienti dáta spotrebúvajú pomalšie, ako sú zverejnené. Ak by takáto situácia nastala, najskôr sa pokúste optimalizovať spotrebiteľov tak, aby pri konzumácii údajov nepridávali latenciu. Ak zistíte, že sa vaši klienti stále odpojujú, môžete zvýšiť limity pre pubsub client-output-buffer-limit pubsub majetok v redis.conf. Nezabudnite, že akékoľvek zmeny nastavení môžu zvýšiť latenciu medzi vydavateľom a predplatiteľom. Všetky zmeny musia byť dôkladne otestované a overené.

Návrh kódu pre riešenie Redis Pub / Sub

Redis Labs

Toto je najjednoduchšie z troch riešení opísaných v tomto dokumente. Tu sú dôležité triedy Java implementované pre toto riešenie. Stiahnite si zdrojový kód s plnou implementáciou tu: //github.com/redislabsdemo/IngestPubSub.

The Predplatiteľ trieda je základnou triedou tohto dizajnu. Každý Predplatiteľ objekt udržuje nové spojenie s Redis.

class Subscriber extends JedisPubSub implementuje Runnable {

súkromné ​​meno reťazca;

private RedisConnection conn = null;

súkromný Jedis jedis = null;

private String subscriberChannel;

public Subscriber (String subscriberName, String channelName) throws Exception {

meno = subscriberName;

subscriberChannel = channelName;

Vlákno t = nové vlákno (toto);

t.start ();

       }

@ Override

public void run () {

skúsiť {

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

while (true) {

jedis.subscribe (this, this.subscriberChannel);

                      }

} úlovok (výnimka e) {

e.printStackTrace ();

              }

       }

@ Override

public void onMessage (reťazcový kanál, reťazcová správa) {

super.onMessage (kanál, správa);

       }

}

The Vydavateľ trieda udržiava samostatné pripojenie k Redisu na publikovanie správ na kanál.

public class Publisher {

RedisConnection conn = null;

Jedis jedis = null;

súkromný reťazcový kanál;

public Publisher (String channelName) vyvolá výnimku {

channel = channelName;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

public void publish (String msg) throws Exception {

jedis.publish (kanál, správa);

       }

}

The EnglishTweetFilter, InfluencerTweetFilter, HashTagCollectora InfluencerCollector filtre rozširujú Predplatiteľ, čo im umožňuje počúvať prichádzajúce kanály. Pretože na prihlásenie a zverejnenie potrebujete samostatné pripojenia Redis, každá trieda filtra má svoje vlastné RedisConnection objekt. Filtre počúvajú nové správy na ich kanáloch v slučke. Tu je vzorový kód EnglishTweetFilter trieda:

verejná trieda EnglishTweetFilter rozširuje predplatiteľa

{

private RedisConnection conn = null;

súkromný Jedis jedis = null;

private String publisherChannel = null;

public EnglishTweetFilter (názov reťazca, reťazec subscriberChannel, reťazec publisherChannel) vyvolá výnimku {

super (meno, subscriberChannel);

this.publisherChannel = publisherChannel;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

@ Override

public void onMessage (String subscriberChannel, String message) {

JsonParser jsonParser = nový JsonParser ();

JsonElement jsonElement = jsonParser.parse (správa);

JsonObject jsonObject = jsonElement.getAsJsonObject ();

// filtrovať správy: zverejňovať iba anglické tweety

if (jsonObject.get („lang“)! = null &&

jsonObject.get („lang“). getAsString (). equals („en“)) {

jedis.publish (publisherChannel, správa);

              }

       }

}

The Vydavateľ trieda má metódu publikovania, ktorá publikuje správy na požadovaný kanál.

public class Publisher {

.

.     

public void publish (String msg) throws Exception {

jedis.publish (kanál, správa);

       }

.

}

Hlavná trieda načíta údaje z toku príjmu a zverejní ich v AllData kanál. Hlavná metóda tejto triedy spúšťa všetky objekty filtra.

verejná trieda IngestPubSub

{

.

public void start () vyvolá výnimku {

       .

       .

publisher = nový vydavateľ („AllData“);

englishFilter = nový EnglishTweetFilter („Anglický filter“, „AllData“,

„EnglishTweets“);

influencerFilter = nový InfluencerTweetFilter („Filter vplyvcov“,

„AllData“, „InfluencerTweets“);

hashtagCollector = nový HashTagCollector („zberateľ Hashtag“,

„EnglishTweets“);

influencerCollector = nový InfluencerCollector („Influencer Collector“,

„InfluencerTweets“);

       .

       .

}

Pohltiť pomocou zoznamov Redis

Vďaka dátovej štruktúre Zoznam v Redis je implementácia riešenia čakania v rade ľahká a jednoduchá. V tomto riešení producent posúva každú správu do zadnej časti frontu a predplatiteľ dopytuje frontu a sťahuje nové správy z druhého konca.

Redis Labs

Pros

  • Táto metóda je spoľahlivá v prípade straty spojenia. Len čo sa údaje vložia do zoznamov, uchová sa ich tam, kým si ich predplatitelia neprečítajú. To platí, aj keď sú predplatitelia zastavení alebo stratia spojenie so serverom Redis.
  • Výrobcovia a spotrebitelia medzi nimi nevyžadujú nijaké spojenie.

Zápory

  • Po vytiahnutí údajov zo zoznamu sa odstránia a nemožno ich znova načítať. Pokiaľ spotrebitelia nepretrvávajú údaje, stratia sa ihneď po ich spotrebovaní.
  • Každý spotrebiteľ vyžaduje samostatný rad, ktorý vyžaduje uloženie viacerých kópií údajov.

Návrh kódu pre riešenie Redis Lists

Redis Labs

Zdrojový kód riešenia Redis Lists si môžete stiahnuť tu: //github.com/redislabsdemo/IngestList. Hlavné triedy tohto riešenia sú vysvetlené nižšie.

MessageList vloží dátovú štruktúru Redis List. The tam() metóda posúva novú správu doľava od poradia a pop () čaká na novú správu sprava, ak je rad prázdny.

verejná trieda MessageList {

protected String name = “MyList”; // Názov

.

.     

public void push (String msg) vyvolá výnimku {

jedis.lpush (meno, správa); // Stlačenie doľava

       }

public String pop () vyvolá výnimku {

návrat jedis.brpop (0, meno) .toString ();

       }

.

.

}

MessageListener je abstraktná trieda, ktorá implementuje logiku poslucháča a vydavateľa. A MessageListener objekt počúva iba jeden zoznam, ale môže publikovať na viacerých kanáloch (MessageFilter predmety). Toto riešenie si vyžaduje samostatný MessageFilter objekt pre každého predplatiteľa.

trieda MessageListener implementuje Runnable {

private String name = null;

private MessageList inboundList = null;

Map outBoundMsgFilters = nový HashMap ();

.

.     

public void registerOutBoundMessageList (MessageFilter msgFilter) {

if (msgFilter! = null) {

if (outBoundMsgFilters.get (msgFilter.name) == null) {

outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

@ Override

public void run () {

.

while (true) {

Reťazcová správa = inboundList.pop ();

processMessage (správa);

                      }                                  

.

       }

.

Protected Void pushMessage (String msg) throws Exception {

Set outBoundMsgNames = outBoundMsgFilters.keySet ();

pre (Názov reťazca: outBoundMsgNames) {

MessageFilter msgList = outBoundMsgFilters.get (name);

msgList.filterAndPush (msg);

              }

       }

}

MessageFilter je rodičovská trieda uľahčujúca filterAndPush () metóda. Keď údaje prechádzajú systémom ingest, často sa filtrujú alebo transformujú pred odoslaním do ďalšej fázy. Triedy, ktoré rozširujú MessageFilter triedy prepísať filterAndPush () metódu a implementovať svoju vlastnú logiku, aby sa filtrovaná správa presunula na ďalší zoznam.

verejná trieda MessageFilter {

MessageList messageList = null;

.

.

public void filterAndPush (správa reťazca) vyvolá výnimku {

messageList.push (správa);

       }

.

.     

}

AllTweetsListener je ukážka implementácie a MessageListener trieda. Toto počúva všetky tweety na serveri AllData kanál a zverejňuje údaje do EnglishTweetsFilter a InfluencerFilter.

verejná trieda AllTweetsListener rozširuje MessageListener {

.

.     

public static void main (String [] args) vyvolá výnimku {

MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (nový

EnglishTweetsFilter („EnglishTweetsFilter“, „EnglishTweets“));

allTweetsProcessor.registerOutBoundMessageList (nový

InfluencerFilter („InfluencerFilter“, „Influencer“));

allTweetsProcessor.start ();

       }

.

.

}

EnglishTweetsFilter predlžuje MessageFilter. Táto trieda implementuje logiku na výber iba tých tweetov, ktoré sú označené ako anglické tweety. Filter zahodí neanglické tweety a anglické tweety presunie na ďalší zoznam.

verejná trieda EnglishTweetsFilter rozširuje MessageFilter {

public EnglishTweetsFilter (názov reťazca, reťazec listName) vyvolá výnimku {

super (meno, listName);

       }

@ Override

public void filterAndPush (reťazcová správa) vyvolá výnimku {

JsonParser jsonParser = nový JsonParser ();

JsonElement jsonElement = jsonParser.parse (správa);

JsonArray jsonArray = jsonElement.getAsJsonArray ();

JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

if (jsonObject.get („lang“)! = null &&

jsonObject.get („lang“). getAsString (). equals („en“)) {

Jedis jedis = super.getJedisInstance ();

if (jedis! = null) {

jedis.lpush (super.name, jsonObject.toString ());

                             }

              }

       }

}

$config[zx-auto] not found$config[zx-overlay] not found