百木园-与人分享,
就是让自己快乐。

使用Go搭建并行排序处理管道笔记

一、并行管道搭建:

总结下实现思路:

  1. 归并排序:进行集合元素排序(节点),并两两节点归并排序;每个节点元素要求有序的(排序),当然终点最小节点元数个数为1必是有序的;
  2. 节点:任务处理单元,归并排序节点是处理输出有序集合任务的单元;文件过大单台机排不了需要多台机集群;
  3. 根据粒度,单机版:非并发节点可以是排序方法,并发节点可以是一个线程/协程去处理(异步排序),集群版节点是一个主机;
  4. 单机版,不管并发还是非并发,节点采用的是内存共享数据;集群版节点则需要网络连接请求应答来共享数据;
  5. go语言异步数据传输通道通过channel实现的;
  6. 每个节点将处理的数据异步发送到各自channel中,等待一个主节点获取归并,集群版多了网络的数据传输。

 

二、代码实现:

  1. 本地节点 nodes.go:
    package pipeline
    
    import (
    	\"encoding/binary\"
    	\"fmt\"
    	\"io\"
    	\"math/rand\"
    	\"sort\"
    	\"time\"
    )
    
    var startTime time.Time
    
    func Init() {
    	startTime = time.Now()
    }
    
    //内部处理方法
    //这里是排序:异步处理容器元素排序
    func InMemSort(in <-chan int) <-chan int {
    	out := make(chan int, 1024)
    	go func() {
    		a := []int{}
    		for v := range in {
    			a = append(a, v)
    		}
    		fmt.Println(\"Read done:\", time.Since(startTime))
    
    		sort.Ints(a)
    		fmt.Println(\"InMemSort done:\", time.Since(startTime))
    
    		for _, v := range a {
    			out <- v
    		}
    		close(out)
    	}()
    	return out
    }
    
    //两路和并,每路通过内部方法异步处理
    //这里是排序:in1,in2元素需要排好序(经过内部方法InMemSort异步处理)的容器单元(channel 异步容器/队列)
    func Merge(in1, in2 <-chan int) <-chan int {
    	out := make(chan int, 1024)
    	// go func() {
    	// 	v1, ok1 := <-in1
    	// 	v2, ok2 := <-in2
    	// 	for {
    	// 		if ok1 || ok2 {
    	// 			if !ok2 || (ok1 && v1 <= v2) { //v2无值或v1值比v2大
    	// 				out <- v1
    	// 				v1, ok1 = <-in1
    	// 			} else {
    	// 				out <- v2
    	// 				v2, ok2 = <-in2
    	// 			}
    	// 		} else {
    	// 			close(out)
    	// 			break
    	// 		}
    	// 	}
    	// }()
    	go func() {
    		v1, ok1 := <-in1
    		v2, ok2 := <-in2
    		for ok1 || ok2 {
    			if !ok2 || (ok1 && v1 <= v2) { //v2无值或v1值比v2大
    				out <- v1
    				v1, ok1 = <-in1
    			} else {
    				out <- v2
    				v2, ok2 = <-in2
    			}
    		}
    		close(out)
    
    		fmt.Println(\"Merge done:\", time.Since(startTime))
    	}()
    	return out
    }
    
    //读取原数据
    //chunkSize=-1全读
    func ReadSource(r io.Reader, chunkSize int) <-chan int {
    	out := make(chan int, 1024)
    	go func() {
    		buffer := make([]byte, 8) //int长度根据操作系统来的,64位为int64,64位8个字节
    		bytesRead := 0
    		for { //持续读取
    			n, err := r.Read(buffer) //读取一个int 8byte
    			bytesRead += n
    			if n > 0 {
    				out <- int(binary.BigEndian.Uint64(buffer)) //字节数组转int
    			}
    			if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) { //-1全读
    				break
    			}
    		}
    		close(out)
    	}()
    	return out
    }
    
    //写处理后(排序)数据
    func WriteSink(w io.Writer, in <-chan int) {
    	for v := range in {
    		buffer := make([]byte, 8)
    		binary.BigEndian.PutUint64(buffer, uint64(v))
    		w.Write(buffer)
    	}
    }
    
    //随机生成数据源
    func RandomSource(count int) <-chan int {
    	out := make(chan int)
    	go func() {
    		for i := 0; i < count; i++ {
    			out <- rand.Int()
    		}
    		close(out)
    	}()
    	return out
    }
    
    //多路两两归并,每路通过内部方法异步处理
    //这里是排序:ins元素需要排好序(经过内部方法InMemSort异步处理)的容器单元(channel 异步容器/队列)
    func MergeN(ins ...<-chan int) <-chan int {
    	if len(ins) == 1 {
    		return ins[0]
    	}
    	m := len(ins) / 2
    	return Merge(
    		MergeN(ins[:m]...),
    		MergeN(ins[m:]...)) //chennel异步并发归并
    }
    

      

  2. 网络节点:
    package pipeline
    
    import (
    	\"bufio\"
    	\"net\"
    )
    
    //节点服务端数据写入到Network中
    //开启服务后,用goroutine等连接,避免创建pipeline阻塞
    func NetworkSink(addr string, in <-chan int) {
    	//net必须是面向流的网络:\"tcp\"、\"tcp4\"、\"tcp6\"、\"unix\"或\"unixpacket\"
    	listener, err := net.Listen(\"tcp\", addr) //addr ip:port
    	if err != nil {
    		panic(err)
    	}
    	go func() { //不能等待阻塞
    		for {
    			conn, err := listener.Accept()
    			if err != nil {
    				continue
    			}
    			w := bufio.NewWriter(conn)
    			WriteSink(w, in)
    			w.Flush()    //使用bufio Writer最后一定要Flush把缓存数据发出去  defer
    			conn.Close() //关闭
    		}
    		// defer listener.Close()
    		// conn, err := listener.Accept()
    		// if err != nil {
    		// 	panic(err)
    		// }
    		// defer conn.Close()
    		// w := bufio.NewWriter(conn)
    		// WriteSink(w, in)
    		// defer w.Flush()
    	}()
    }
    
    //Network向节点服务端读取数据源
    func NetworkSource(addr string) <-chan int {
    	out := make(chan int)
    	go func() {
    		conn, err := net.Dial(\"tcp\", addr)
    		if err != nil {
    			panic(err)
    		}
    		defer conn.Close()
    
    		r := ReadSource(bufio.NewReader(conn), -1)
    		for v := range r {
    			out <- v
    		}
    		close(out)
    	}()
    	return out
    }
    

      

  3. 创建管道:
    package main
    
    import (
    	\"bufio\"
    	\"fmt\"
    	\"goBase/pipelinedemo/pipeline\"
    	\"os\"
    	\"strconv\"
    )
    
    const sourceFilename = \"../large.in\"
    const resultFilename = \"../large.out\"
    
    //单机版而言,并发使用channel效率肯定是下降的
    //好处,当文件过大,一台机器排不了,多机排序
    func main() {
    	p, files := createNetworkPipeline(sourceFilename, 800000000, 4) //平均每个文件读取int64数:800000000/8/4
    	defer func() {
    		for _, file := range files {
    			file.Close()
    		}
    	}()
    	writeToFile(p, resultFilename) //该方法运行,通道才真正打开
    	printFile(resultFilename)
    }
    
    
    //创建并行处理管道
    //fileSize 文件字节数
    //chunkCount 节点数 读取文件分块数
    func createNetworkPipeline(filename string, fileSize, chunkCount int) (<-chan int, []*os.File) {
    
    	chunkSize := fileSize / chunkCount //每个节点读取文件字节数
    
    	//outs := make([]<-chan int, chunkCount)
    	outs := []<-chan int{}
    	sortAddr := []string{}
    
    	files := []*os.File{}
    
    	pipeline.Init() //开始计时
    
    	//#region 节点服务端工作
    
    	for count := 0; count < chunkCount; count++ {
    		file, err := os.Open(filename) //这里file没有close,需要返回*[]File,在外面close
    		if err != nil {
    			panic(err)
    		}
    		files = append(files, file)
    
    		//Seek设置下一次读/写的位置。offset为相对偏移量,
    		//whence决定相对位置:0为相对文件开头,1为相对当前位置,2为相对文件结尾
    		file.Seek(int64(count*chunkSize), 0) //读文件字节范围
    
    		source := pipeline.ReadSource(bufio.NewReader(file), chunkSize)
    
    		// outs = append(outs, pipeline.InMemSort(source))
    		//本机地址
    		addr := \":\" + strconv.Itoa(7000+count)                 //将数字转换成对应的字符串类型的数字
    		pipeline.NetworkSink(addr, pipeline.InMemSort(source)) //开启节点服务监听,收到请求发送数据将写入到Network,异步不能等待阻塞
    
    		sortAddr = append(sortAddr, addr)
    	}
    
    	//#endregion
    
    
    	//#region Network工作
    
    	for _, addr := range sortAddr {
    		outs = append(outs, pipeline.NetworkSource(addr))
    	}
    	//构建管道,goroutine还没有运行,不能确定InMemSort是否全部排序完成,不能在该方法close file
    	return pipeline.MergeN(outs...), files
    
    	//#endregion
    }
    
    func writeToFile(in <-chan int, filename string) {
    	file, err := os.Create(filename)
    	if err != nil {
    		panic(err)
    	}
    	defer file.Close()
    
    	w := bufio.NewWriter(file)
    	defer w.Flush()
    
    	pipeline.WriteSink(w, in)
    }
    
    func printFile(filename string) {
    	file, err := os.Open(filename)
    	if err != nil {
    		panic(err)
    	}
    	defer file.Close()
    	count := 0
    	all := pipeline.ReadSource(bufio.NewReader(file), -1)
    	for s := range all {
    		fmt.Println(s)
    		count++
    		if count > 100 {
    			break
    		}
    	}
    }

来源:https://www.cnblogs.com/jn-shao/p/16248109.html
本站部分图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » 使用Go搭建并行排序处理管道笔记

相关推荐

  • 暂无文章