600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > 在JavaScript中多线程运行库Nexus.js详解解答(附上部分代码)

在JavaScript中多线程运行库Nexus.js详解解答(附上部分代码)

时间:2023-12-01 19:26:16

相关推荐

在JavaScript中多线程运行库Nexus.js详解解答(附上部分代码)

web前端|js教程

javascript,JavaScript,Nexus.js

web前端-js教程

这篇文章主要介绍了JavaScript多线程运行库Nexus.js的学习心得以及代码分享,有需要的朋友一起参考学习下吧。

团购网址导航源码,vscode新手指南,ubuntu是ssh,企业Tomcat知乎,爬虫周边,php传智百度网盘,枣庄seo推广价格优惠,马克斯网站源码前台不显示,中医药问答网站模板lzw

事件循环

大宗商品源码,离线安装vscode插件命令,ubuntu jre下载,查询Tomcat是否运行,爬虫科研热点,php 数组中随机抽取,谷歌seo优化流程图,学啊网网站源码,dedecms 图片网站模板lzw

没有事件循环

财务管理系统 源码,vscode怎样安装中文包,xmind ubuntu,阿里tomcat容器,sqlite获取时间,网页设计效果图教程,ftp访问阿里云服务器,wp缓存插件,前端股票框架,爬虫怎么采集,免费php空间申请,seo实习生,springboot逻辑代码,直播源码网站,js如何获取当前网页的url,dede家具免费模板,独站宝后台,页面设计模版,集分享管理系统,java游戏程序lzw

有一个带有(无锁)任务对象的线程池

每次调用setTimeout或setImmediate或创建一个Promise时,任务就排队到任务队列钟。

每当计划任务时,第一个可用的线程将选择任务并执行它。

在CPU内核上处理Promise。对Promise.all()的调用将并行的解决Promise。

ES6

支持async/await,并且推荐使用

支持for await(…)

支持解构

支持async try/catch/finally

模块

不支持CommonJS。(require(…)和module.exports)

所有模块使用ES6的import/export语法

支持动态导入通过import(‘file-or-packge’).then(…)

支持import.meta,例如:import.meta.filename以及import.meta.dirname等等

附加功能:支持直接从URL中导入,例如:

import { h } from /preact/dist/preact.esm.js;

EventEmitter

Nexus实现了基于Promise的EventEmitter类

事件处理程序在所有线程上排序,并将并行处理执行。

EventEmitter.emit(…)的返回值是一个Promise,它可以被解析为在事件处理器中返回值所构成的数组。

例如:

class EmitterTest extends Nexus.EventEmitter { constructor() { super(); for(let i = 0; i { console.log(`fired test ${i}!`); console.inspect(value); }); for(let i = 0; i `${v + i}`); }}const test = new EmitterTest();async function start() { await test.emit( est, { payload: est 1 }); console.log(first test done!); await test.emit( est, { payload: est 2 }); console.log(second test done!); const values = await test.emit( eturns-a-value, 10); console.log( hird test done, returned values are:); console.inspect(values);}start().catch(console.error);

I/O

所有输入/输出都通过三个原语完成:Device,Filter和Stream。

所有输入/输出原语都实现了EventEmitter类

要使用Device,你需要在Device之上创建一个ReadableStream或WritableStream

要操作数据,可以将Filters添加到ReadableStream或WritableStream中。

最后,使用source.pipe(…destinationStreams),然后等待source.resume()来处理数据。

所有的输入/输出操作都是使用ArrayBuffer对象完成的。

Filter试了了process(buffer)方法来处理数据。

例如:使用2个独立的输出文件将UTF-8转换为UTF6。

const startTime = Date.now(); try { const device = new Nexus.IO.FilePushDevice(enwik8); const stream = new Nexus.IO.ReadableStream(device); stream.pushFilter(new Nexus.IO.EncodingConversionFilter("UTF-8", "UTF-16LE")); const wstreams = [0,1,2,3] .map(i => new Nexus.IO.WritableStream(new Nexus.IO.FileSinkDevice(enwik16- + i))); console.log(piping...); stream.pipe(...wstreams); console.log(streaming...); await stream.resume(); await stream.close(); await Promise.all(wstreams.map(stream => stream.close())); console.log(`finished in ${(Date.now() * startTime) / 1000} seconds!`); } catch (e) { console.error(An error occurred: , e); }}start().catch(console.error);

TCP/UDP

Nexus.js提供了一个Acceptor类,负责绑定ip地址/端口和监听连接

每次收到一个连接请求,connection事件就会被触发,并且提供一个Socket设备。

每一个Socket实例是全双工的I/O设备。

你可以使用ReadableStream和WritableStream来操作Socket。

最基础的例子:(向客户端发送“Hello World”)

const acceptor = new .TCP.Acceptor();let count = 0;acceptor.on(connection, (socket, endpoint) => { const connId = count++; console.log(`connection #${connId} from ${endpoint.address}:${endpoint.port}`); const rstream = new Nexus.IO.ReadableStream(socket); const wstream = new Nexus.IO.WritableStream(socket); const buffer = new Uint8Array(13); const message = Hello World!\n; for(let i = 0; i console.log(`got message: ${buffer}`)); rstream.resume().catch(e => console.log(`client #${connId} at ${endpoint.address}:${endpoint.port} disconnected!`)); console.log(`sending greeting to #${connId}!`); wstream.write(buffer);});acceptor.bind(127.0.0.1, 10000);acceptor.listen();console.log(server ready);

Http

Nexus提供了一个.HTTP.Server类,该类基本上继承了TCPAcceptor

一些基础接口

当服务器端完成了对传入连接的基本的Http头的解析/校验时,将使用连接和同样的信息触发connection事件

每一个连接实例都又一个request和一个response对象。这些是输入/输出设备。

你可以构造ReadableStream和WritableStream来操纵request/response。

如果你通过管道连接到一个Response对象,输入的流将会使用分块编码的模式。否者,你可以使用response.write()来写入一个常规的字符串。

复杂例子:(基本的Http服务器与块编码,细节省略)

..../** * Creates an input stream from a path. * @param path * @returns {Promise} */async function createInputStream(path) { if (path.startsWith(/)) // If it starts with /, omit it. path = path.substr(1); if (path.startsWith(.)) // If it starts with ., reject it. throw new NotFoundError(path); if (path === / || !path) // If its empty, set to index.html. path = index.html; /** * `import.meta.dirname` and `import.meta.filename` replace the old CommonJS `__dirname` and `__filename`. */ const filePath = Nexus.FileSystem.join(import.meta.dirname, server_root, path); try { // Stat the target path. const {type} = await Nexus.FileSystem.stat(filePath); if (type === Nexus.FileSystem.FileType.Directory) // If its a directory, return its index.html return createInputStream(Nexus.FileSystem.join(filePath, index.html)); else if (type === Nexus.FileSystem.FileType.Unknown || type === Nexus.FileSystem.FileType.NotFound) // If its not found, throw NotFound. throw new NotFoundError(path); } catch(e) { if (e.code) throw e; throw new NotFoundError(path); } try { // First, we create a device. const fileDevice = new Nexus.IO.FilePushDevice(filePath); // Then we return a new ReadableStream created using our source device. return new Nexus.IO.ReadableStream(fileDevice); } catch(e) { throw new InternalServerError(e.message); }}/** * Connections counter. */let connections = 0;/** * Create a new HTTP server. * @type {.HTTP.Server} */const server = new .HTTP.Server();// A server error means an error occurred while the server was listening to connections.// We can mostly ignore such errors, we display them anyway.server.on(error, e => { console.error(FgRed + Bright + Server Error: + e.message + \ + e.stack, Reset);});/** * Listen to connections. */server.on(connection, async (connection, peer) => { // Start with a connection ID of 0, increment with every new connection. const connId = connections++; // Record the start time for this connection. const startTime = Date.now(); // Destructuring is supported, why not use it? const { request, response } = connection; // Parse the URL parts. const { path } = parseURL(request.url); // Here well store any errors that occur during the connection. const errors = []; // inStream is our ReadableStream file source, outStream is our response (device) wrapped in a WritableStream. let inStream, outStream; try { // Log the request. console.log(`> #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${ FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}"`, Reset); // Set the Server header. response.set(Server, `nexus.js/0.1.1`); // Create our input stream. inStream = await createInputStream(path); // Create our output stream. outStream = new Nexus.IO.WritableStream(response); // Hook all `error` events, add any errors to our `errors` array. inStream.on(error, e => { errors.push(e); }); request.on(error, e => { errors.push(e); }); response.on(error, e => { errors.push(e); }); outStream.on(error, e => { errors.push(e); }); // Set content type and request status. response .set(Content-Type, mimeType(path)) .status(200); // Hook input to output(s). const disconnect = inStream.pipe(outStream); try { // Resume our file stream, this causes the stream to switch to HTTP chunked encoding. // This will return a promise that will only resolve after the last byte (HTTP chunk) is written. await inStream.resume(); } catch (e) { // Capture any errors that happen during the streaming. errors.push(e); } // Disconnect all the callbacks created by `.pipe()`. return disconnect(); } catch(e) { // If an error occurred, push it to the array. errors.push(e); // Set the content type, status, and write a basic message. response .set(Content-Type, ext/plain) .status(e.code || 500) .send(e.message || An error has occurred.); } finally { // Close the streams manually. This is important because we may run out of file handles otherwise. if (inStream) await inStream.close(); if (outStream) await outStream.close(); // Close the connection, has no real effect with keep-alive connections. await connection.close(); // Grab the responses status. let status = response.status(); // Determine what colour to output to the terminal. const statusColors = { 200: Bright + FgGreen, // Green for 200 (OK), 404: Bright + FgYellow, // Yellow for 404 (Not Found) 500: Bright + FgRed // Red for 500 (Internal Server Error) }; let statusColor = statusColors[status]; if (statusColor) status = statusColor + status + Reset; // Log the connection (and time to complete) to the console. console.log(` error.message).join(, ) + Reset : Reset)); }});/** * IP and port to listen on. */const ip = .0.0.0, port = 3000;/** * Whether or not to set the `reuse` flag. (optional, default=false) */const portReuse = true;/** * Maximum allowed concurrent connections. Default is 128 on my system. (optional, system specific) * @type {number} */const maxConcurrentConnections = 1000;/** * Bind the selected address and port. */server.bind(ip, port, portReuse);/** * Start listening to requests. */server.listen(maxConcurrentConnections);/** * Happy streaming! */console.log(FgGreen + `Nexus.js HTTP server listening at ${ip}:${port}` + Reset);

基准

我想我已经涵盖了到目前为止所实现的一切。那么现在我们来谈谈性能。

这里是上诉Http服务器的当前基准,有100个并发连接和总共10000个请求:

This is ApacheBench, Version 2.3 Copyright 1996 Adam Twiss, Zeus Technology Ltd, /Licensed to The Apache Software Foundation, /Benchmarking localhost (be patient).....doneServer Software: nexus.js/0.1.1Server Hostname: localhostServer Port:3000Document Path:/Document Length: 8673 bytesConcurrency Level: 100Time taken for tests: 9.991 secondsComplete requests: 10000Failed requests: 0Total transferred: 87880000 bytesHTML transferred: 86730000 bytesRequests per second: 1000.94 [#/sec] (mean)Time per request: 99.906 [ms] (mean)Time per request: 0.999 [ms] (mean, across all concurrent requests)Transfer rate:8590.14 [Kbytes/sec] receivedConnection Times (ms) min mean[+/-sd] median maxConnect: 0 0 0.1 0 1Processing: 6 99 36.6 84 464Waiting: 5 99 36.4 84 463Total:6 100 36.6 84 464Percentage of the requests served within a certain time (ms) 50% 84 66% 97 75% 105 80% 112 90% 134 95% 188 98% 233 99% 238 100% 464 (longest request)

每秒1000个请求。在一个老的i7上,上面运行了包括这个基准测试软件,一个占用了5G内存的IDE,以及服务器本身。

voodooattack@voodooattack:~$ cat /proc/cpuinfo processor : 0vendor_id : GenuineIntelcpu family : 6model : 60model name : Intel(R) Core(TM) i7-4770 CPU @ 3.40GHzstepping : 3microcode : 0x22cpu MHz : 3392.093cache size : 8192 KBphysical id : 0siblings : 8core id : 0cpu cores : 4apicid : 0initial apicid : 0fpu : yesfpu_exception : yescpuid level : 13wp : yesflags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln ptsbugs :bogomips : 6784.18clflush size : 64cache_alignment : 64address sizes : 39 bits physical, 48 bits virtualpower management:

我尝试了1000个并发请求,但是APacheBench由于许多套接字被打开而超时。我尝试了httperf,这里是结果:

voodooattack@voodooattack:~$ httperf --port=3000 --num-conns=10000 --rate=1000httperf --client=0/1 --server=localhost --port=3000 --uri=/ --rate=1000 --send-buffer=4096 --recv-buffer=16384 --num-conns=10000 --num-calls=1httperf: warning: open file limit > FD_SETSIZE; limiting max. # of open files to FD_SETSIZEMaximum connect burst length: 262Total: connections 9779 requests 9779 replies 9779 test-duration 10.029 sConnection rate: 975.1 conn/s (1.0 ms/conn, <=1022 concurrent connections)Connection time [ms]: min 0.5 avg 337.9 max 7191.8 median 79.5 stddev 848.1Connection time [ms]: connect 207.3Connection length [replies/conn]: 1.000Request rate: 975.1 req/s (1.0 ms/req)Request size [B]: 62.0Reply rate [replies/s]: min 903.5 avg 974.6 max 1045.7 stddev 100.5 (2 samples)Reply time [ms]: response 129.5 transfer 1.1Reply size [B]: header 89.0 content 8660.0 footer 2.0 (total 8751.0)Reply status: 1xx=0 2xx=9779 3xx=0 4xx=0 5xx=0CPU time [s]: user 0.35 system 9.67 (user 3.5% system 96.4% total 99.9%)Net I/O: 8389.9 KB/s (68.7*10^6 bps)Errors: total 221 client-timo 0 socket-timo 0 connrefused 0 connreset 0Errors: fd-unavail 221 addrunavail 0 ftab-full 0 other 0

正如你看到的,它任然能工作。尽管由于压力,有些连接会超时。我们依然在研究导致这个问题的原因。

上面是我整理给大家的,希望今后会对大家有帮助。

相关文章:

JS操作DOM树遍历方法总结

JS实现数组去重算法

JS取得最小公倍数与最大公约数

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。