Οι ροές στο Node.js μπορεί να είναι περίπλοκες, αλλά αξίζει να αφιερώσετε χρόνο για να τις κατανοήσετε.

Βασικά Takeaways

  • Οι ροές στο Node.js είναι ένα θεμελιώδες εργαλείο για την επεξεργασία και τη μεταφορά δεδομένων, καθιστώντας τα ιδανικά για εφαρμογές σε πραγματικό χρόνο και εφαρμογές που βασίζονται σε συμβάντα.
  • Για να δημιουργήσετε μια εγγράψιμη ροή στο Node.js, μπορείτε να χρησιμοποιήσετε τη συνάρτηση createWriteStream() της μονάδας fs, η οποία εγγράφει δεδομένα σε μια συγκεκριμένη τοποθεσία.
  • Αναγνώσιμο, εγγράψιμο, διπλό και μετασχηματισμένο είναι οι τέσσερις τύποι ροών στο Node.js, ο καθένας με τη δική του περίπτωση χρήσης και τη δική του λειτουργικότητα.

Η ροή είναι ένα θεμελιώδες εργαλείο προγραμματισμού που ασχολείται με τη ροή δεδομένων. Στον πυρήνα του, ένα ρεύμα αντιπροσωπεύει τυπικά τη διαδοχική μεταφορά byte από το ένα σημείο στο άλλο. Η επίσημη τεκμηρίωση του Node.js ορίζει τη ροή ως μια αφηρημένη διεπαφή που μπορείτε να χρησιμοποιήσετε για να εργαστείτε με δεδομένα.

Η μεταφορά δεδομένων σε υπολογιστή ή μέσω δικτύου είναι η ιδανική χρήση μιας ροής.

instagram viewer

Ροές στο Node.js

Οι ροές έχουν παίξει ουσιαστικό ρόλο στην επιτυχία του Node.js. Είναι ιδανικά για επεξεργασία δεδομένων σε πραγματικό χρόνο και εφαρμογές που βασίζονται σε συμβάντα, δύο εξέχοντα χαρακτηριστικά του περιβάλλοντος χρόνου εκτέλεσης Node.js.

Για να δημιουργήσετε μια νέα ροή στο Node.js, θα χρειαστεί να χρησιμοποιήσετε το API ροής, το οποίο λειτουργεί αποκλειστικά με Strings και Δεδομένα buffer Node.js. Το Node.js έχει τέσσερις τύπους ροών: εγγράψιμο, αναγνώσιμο, διπλό και μετασχηματισμένο.

Πώς να δημιουργήσετε και να χρησιμοποιήσετε μια ροή με δυνατότητα εγγραφής

Μια εγγράψιμη ροή σάς επιτρέπει να γράφετε ή να στέλνετε δεδομένα σε μια συγκεκριμένη τοποθεσία. Η λειτουργική μονάδα fs (σύστημα αρχείων) έχει μια κλάση WriteStream, την οποία μπορείτε να χρησιμοποιήσετε για να δημιουργήσετε μια νέα ροή με το fs.createWriteStream() λειτουργία. Αυτή η συνάρτηση δέχεται τη διαδρομή προς το αρχείο στο οποίο θέλετε να γράψετε δεδομένα, καθώς και μια προαιρετική σειρά επιλογών.

const {createWriteStream} = require("fs");

(() => {
const file = "myFile.txt";
const myWriteStream = createWriteStream(file);
let x = 0;
const writeNumber = 10000;

const writeData = () => {
while (x < writeNumber) {
const chunk = Buffer.from(`${x}, `, "utf-8");
if (x writeNumber - 1) return myWriteStream.end(chunk);
if (!myWriteStream.write(chunk)) break;
x++
}
};

writeData();
})();

Αυτός ο κωδικός εισάγει το createWriteStream() λειτουργία, η οποία η ανώνυμη συνάρτηση βέλους στη συνέχεια χρησιμοποιεί για να δημιουργήσει μια ροή που εγγράφει δεδομένα στο myFile.txt. Η ανώνυμη συνάρτηση περιέχει μια εσωτερική συνάρτηση που ονομάζεται writeData() που γράφει δεδομένα.

ο createWriteStream() Η συνάρτηση λειτουργεί με ένα buffer για να γράψει μια συλλογή αριθμών (0–9.999) στο αρχείο προορισμού. Ωστόσο, όταν εκτελείτε το παραπάνω σενάριο, δημιουργεί ένα αρχείο στον ίδιο κατάλογο που περιέχει τα ακόλουθα δεδομένα:

Η τρέχουσα συλλογή αριθμών τελειώνει στους 2.915, αλλά θα έπρεπε να περιλαμβάνει αριθμούς έως και 9.999. Αυτή η ασυμφωνία προκύπτει επειδή κάθε WriteStream χρησιμοποιεί ένα buffer που αποθηκεύει μια σταθερή ποσότητα δεδομένων κάθε φορά. Για να μάθετε ποια είναι αυτή η προεπιλεγμένη τιμή, θα πρέπει να συμβουλευτείτε το highWaterMark επιλογή.

console.log("The highWaterMark value is: " +
myWriteStream.writableHighWaterMark + " bytes.");

Η προσθήκη της γραμμής κώδικα παραπάνω στην ανώνυμη συνάρτηση θα παράγει την ακόλουθη έξοδο στο τερματικό:

Η έξοδος τερματικού δείχνει ότι η προεπιλογή highWaterMark η τιμή (η οποία είναι προσαρμόσιμη) είναι 16.384 byte. Αυτό σημαίνει ότι μπορείτε να αποθηκεύσετε μόνο κάτω από 16.384 byte δεδομένων σε αυτό το buffer κάθε φορά. Έτσι, μέχρι τον αριθμό 2.915 (συν όλα τα κόμματα και τα κενά) αντιπροσωπεύει τη μέγιστη ποσότητα δεδομένων που μπορεί να αποθηκεύσει η προσωρινή μνήμη κάθε φορά.

Η λύση στο σφάλμα buffer είναι η χρήση ενός συμβάντος ροής. Μια ροή συναντά διάφορα συμβάντα σε διαφορετικά στάδια της διαδικασίας μεταφοράς δεδομένων. ο διοχετεύω εκδήλωση είναι η κατάλληλη επιλογή για αυτήν την κατάσταση.

Στο writeData() λειτουργία παραπάνω, η κλήση προς το WriteStream's write() Η συνάρτηση επιστρέφει true εάν το κομμάτι δεδομένων (ή η εσωτερική προσωρινή μνήμη) είναι κάτω από το highWaterMark αξία. Αυτό υποδηλώνει ότι η εφαρμογή μπορεί να στείλει περισσότερα δεδομένα στη ροή. Ωστόσο, μόλις το γράφω() Η συνάρτηση επιστρέφει false ο βρόχος σπάει επειδή πρέπει να αδειάσετε το buffer.

myWriteStream.on('drain', () => {
console.log("a drain has occurred...");
writeData();
});

Εισαγωγή του διοχετεύω Ο κωδικός συμβάντος παραπάνω στην ανώνυμη συνάρτηση θα αδειάσει το Το buffer του WriteStream όταν είναι σε χωρητικότητα. Στη συνέχεια, υπενθυμίζει το writeData() μέθοδο, ώστε να μπορεί να συνεχίσει να γράφει δεδομένα. Η εκτέλεση της ενημερωμένης εφαρμογής θα παράγει την ακόλουθη έξοδο:

Θα πρέπει να σημειώσετε ότι η εφαρμογή έπρεπε να αποστραγγίσει το WriteStream buffer τρεις φορές κατά την εκτέλεσή του. Το αρχείο κειμένου παρουσίασε επίσης ορισμένες αλλαγές:

Πώς να δημιουργήσετε και να χρησιμοποιήσετε μια αναγνώσιμη ροή

Για να διαβάσετε δεδομένα, ξεκινήστε δημιουργώντας μια αναγνώσιμη ροή χρησιμοποιώντας το fs.createReadStream() λειτουργία.

const {createReadStream} = require("fs");

(() => {
const file = "myFile.txt";
const myReadStream = createReadStream(file);

myReadStream.on("open", () => {
console.log(`The read stream has successfully opened ${file}.`);
});

myReadStream.on("data", chunk => {
console.log("The file contains the following data: " + chunk.toString());
});

myReadStream.on("close", () => {
console.log("The file has been successfully closed.");
});
})();

Το παραπάνω σενάριο χρησιμοποιεί το createReadStream() μέθοδος πρόσβασης στο αρχείο που δημιούργησε ο προηγούμενος κώδικας: myFile.txt. ο createReadStream() Η συνάρτηση δέχεται μια διαδρομή αρχείου (η οποία μπορεί να έχει τη μορφή συμβολοσειράς, buffer ή URL) και πολλές προαιρετικές επιλογές ως ορίσματα.

Στην ανώνυμη συνάρτηση, υπάρχουν πολλά σημαντικά συμβάντα ροής. Ωστόσο, δεν υπάρχει κανένα σημάδι του διοχετεύω Εκδήλωση. Αυτό συμβαίνει επειδή μια αναγνώσιμη ροή αποθηκεύει δεδομένα μόνο όταν καλείτε το stream.push (κομμάτι) λειτουργούν ή χρησιμοποιήστε το αναγνώσιμος Εκδήλωση.

ο Άνοιξε Το συμβάν ενεργοποιείται όταν το fs ανοίγει το αρχείο από το οποίο θέλετε να διαβάσετε. Όταν επισυνάψετε το δεδομένα γεγονός σε μια σιωπηρά συνεχή ροή, προκαλεί τη μετάβαση του ρεύματος σε λειτουργία ροής. Αυτό επιτρέπει στα δεδομένα να περάσουν αμέσως μόλις γίνουν διαθέσιμα. Η εκτέλεση της παραπάνω εφαρμογής παράγει την ακόλουθη έξοδο:

Πώς να δημιουργήσετε και να χρησιμοποιήσετε μια ροή διπλής όψης

Μια ροή διπλής όψης υλοποιεί τόσο τις εγγράψιμες όσο και τις αναγνώσιμες διεπαφές ροής, ώστε να μπορείτε να διαβάζετε και να γράφετε σε μια τέτοια ροή. Ένα παράδειγμα είναι μια υποδοχή TCP που βασίζεται στη μονάδα δικτύου για τη δημιουργία της.

Ένας απλός τρόπος για να επιδείξετε τις ιδιότητες μιας ροής διπλής όψης είναι να δημιουργήσετε έναν διακομιστή TCP και έναν πελάτη που μεταφέρει δεδομένα.

Το αρχείο server.js

const net = require('net');
const port = 5000;
const host = '127.0.0.1';

const server = net.createServer();

server.on('connection', (socket)=> {
console.log('Connection established from client.');

socket.on('data', (data) => {
console.log(data.toString());
});

socket.write("Hi client, I am server " + server.address().address);

socket.on('close', ()=> {
console.log('the socket is closed')
});
});

server.listen(port, host, () => {
console.log('TCP server is running on port: ' + port);
});

Το αρχείο client.js

const net = require('net');
const client = new net.Socket();
const port = 5000;
const host = '127.0.0.1';

client.connect(port, host, ()=> {
console.log("connected to server!");
client.write("Hi, I'm client " + client.address().address);
});

client.on('data', (data) => {
console.log(data.toString());
client.write("Goodbye");
client.end();
});

client.on('end', () => {
console.log('disconnected from server.');
});

Θα παρατηρήσετε ότι τόσο τα σενάρια διακομιστή όσο και πελάτη χρησιμοποιούν μια αναγνώσιμη και εγγράψιμη ροή για την επικοινωνία (μεταφορά και λήψη δεδομένων). Φυσικά, η εφαρμογή διακομιστή εκτελείται πρώτη και αρχίζει να ακούει για συνδέσεις. Μόλις ξεκινήσετε τον πελάτη, συνδέεται με τον διακομιστή χρησιμοποιώντας τον αριθμό θύρας TCP.

Μετά τη δημιουργία μιας σύνδεσης, ο πελάτης ξεκινά τη μεταφορά δεδομένων γράφοντας στον διακομιστή χρησιμοποιώντας τον WriteStream. Ο διακομιστής καταγράφει τα δεδομένα που λαμβάνει στο τερματικό και, στη συνέχεια, εγγράφει δεδομένα χρησιμοποιώντας το WriteStream του. Τέλος, ο πελάτης καταγράφει τα δεδομένα που λαμβάνει, γράφει πρόσθετα δεδομένα και στη συνέχεια αποσυνδέεται από τον διακομιστή. Ο διακομιστής παραμένει ανοιχτός για να συνδεθούν άλλοι πελάτες.

Πώς να δημιουργήσετε και να χρησιμοποιήσετε μια ροή μετασχηματισμού

Οι ροές μετασχηματισμού είναι ροές διπλής όψης στις οποίες η έξοδος σχετίζεται, αλλά διαφέρει από την είσοδο. Το Node.js έχει δύο τύπους ροών Transform: zlib και crypto streams. Μια ροή zlib μπορεί να συμπιέσει ένα αρχείο κειμένου και στη συνέχεια να το αποσυμπιέσει μετά τη μεταφορά του αρχείου.

Η εφαρμογή compressFile.js

const zlib = require('zlib');
const { createReadStream, createWriteStream } = require('fs');

(() => {
const source = createReadStream('myFile.txt');
const destination = createWriteStream('myFile.txt.gz');

source.pipe(zlib.createGzip()).pipe(destination);
})();

Αυτό το απλό σενάριο παίρνει το αρχικό αρχείο κειμένου, το συμπιέζει και το αποθηκεύει στον τρέχοντα κατάλογο. Αυτή είναι μια απλή διαδικασία χάρη στην αναγνώσιμη ροή σωλήνας() μέθοδος. Οι αγωγοί ροής αφαιρούν τη χρήση προσωρινών αποθεμάτων και δεδομένων σωλήνων απευθείας από το ένα ρεύμα στο άλλο.

Ωστόσο, πριν τα δεδομένα φτάσουν στην εγγράψιμη ροή στο σενάριο, χρειάζεται μια μικρή παράκαμψη μέσω της μεθόδου createGzip() του zlib. Αυτή η μέθοδος συμπιέζει το αρχείο και επιστρέφει ένα νέο αντικείμενο Gzip το οποίο στη συνέχεια λαμβάνει η ροή εγγραφής.

Η εφαρμογή decompressFile.js

const zlib = require('zlib'); 
const { createReadStream, createWriteStream } = require('fs');
 
(() => {
const source = createReadStream('myFile.txt.gz');
const destination = createWriteStream('myFile2.txt');

source.pipe(zlib.createUnzip()).pipe(destination);
})();

Αυτό το παραπάνω σενάριο παίρνει το συμπιεσμένο αρχείο και το αποσυμπιέζει. Αν ανοίξετε το νέο myFile2.txt αρχείο, θα δείτε ότι περιέχει τα ίδια δεδομένα με το αρχικό αρχείο:

Γιατί είναι σημαντικές οι ροές;

Οι ροές ενισχύουν την αποτελεσματικότητα της μεταφοράς δεδομένων. Οι αναγνώσιμες και εγγράψιμες ροές χρησιμεύουν ως το θεμέλιο που επιτρέπει την επικοινωνία μεταξύ πελατών και διακομιστών, καθώς και τη συμπίεση και μεταφορά μεγάλων αρχείων.

Οι ροές βελτιώνουν επίσης την απόδοση των γλωσσών προγραμματισμού. Χωρίς ροές, η διαδικασία μεταφοράς δεδομένων γίνεται πιο περίπλοκη, απαιτώντας μεγαλύτερη μη αυτόματη εισαγωγή από τους προγραμματιστές και με αποτέλεσμα περισσότερα σφάλματα και προβλήματα απόδοσης.