admin管理员组

文章数量:1552151

这段时间研究java的io与nio框架,一时兴起决定用java实现一个下载工具,主要有下面几个功能

1)支持多任务下载

2)支持多线程下载

3) 支持断点续传

4)错误线程任务的重新调度

用到的技术点 

1) HTTP协议的Range请求头技术

2)java的多线程

3)java的网络编程,主要是HttpURLConnection类

4)java的io文件操作, 如RandomAccessFile类

速度还不错,基本上和浏览器下载差不多,可能比它还要快,我只开了10个线程。

 
  1. package org.blackfoxer.cat;

  2.  
  3. import java.io.File;

  4. import java.io.IOException;

  5. import java.io.RandomAccessFile;

  6. import java.io.UnsupportedEncodingException;

  7. import java.net.HttpURLConnection;

  8. import java.net.URL;

  9. import java.net.URLDecoder;

  10. import java.text.DecimalFormat;

  11. import java.util.concurrent.CountDownLatch;

  12. import java.util.concurrent.TimeUnit;

  13. import java.util.regex.Matcher;

  14. import java.util.regex.Pattern;

  15.  
  16. public class Job {

  17.  
  18. private int fileSize;

  19. private String fileName;

  20. private int connectTimeout = 10000;

  21. private int readTimeout = 20000;

  22. private String url;

  23. private String storeDir;

  24. private int taskNum;

  25. private String jobId;

  26.  
  27. private int[] startIndexes;

  28. private int[] endIndexes;

  29. private int[] progress;

  30. private Task[] tasks;

  31.  
  32. private File storeDirFile;

  33. private File dtDirFile;

  34. private File localFile;

  35.  
  36. private ThreadLocal<RandomAccessFile> rafLocalTl;

  37. private ThreadLocal<RandomAccessFile> rafOffsetTl;

  38. private CountDownLatch latch;

  39. private ProgressThread pt;

  40.  
  41. public Job(String url, String storeDir, int taskNum) throws IOException {

  42. this.url = url;

  43. this.storeDir = storeDir;

  44. this.taskNum = taskNum;

  45. this.startIndexes = new int[taskNum];

  46. this.endIndexes = new int[taskNum];

  47. this.progress = new int[taskNum];

  48. this.tasks = new Task[taskNum];

  49. this.latch = new CountDownLatch(taskNum);

  50. this.jobId = Math.abs(url.hashCode()) + "_" + taskNum;

  51. this.rafLocalTl = new ThreadLocal<RandomAccessFile>();

  52. this.rafOffsetTl = new ThreadLocal<RandomAccessFile>();

  53. this.pt = new ProgressThread();

  54. }

  55.  
  56. public void startJob() throws Exception {

  57. long start = System.currentTimeMillis();

  58. System.out.println("开始下载文件...");

  59. boolean j = fetchFileMetaInfo();

  60. if (j) {

  61. assignTasks();

  62. createFiles();

  63. startTasks();

  64. openProgressThread();

  65. waitForCompeletion();

  66. long end = System.currentTimeMillis();

  67. System.out.println("下载完毕,全程耗时" + (end - start) + "ms");

  68. } else {

  69. System.out.println("获取文件长度或文件名失败,请重试");

  70. }

  71. }

  72.  
  73. private void openProgressThread() {

  74. this.pt.start();

  75. }

  76.  
  77. private void waitForCompeletion() throws Exception {

  78. latch.await();

  79. deleteFiles();

  80. pt.join();

  81. }

  82.  
  83. private void deleteFiles() {

  84. if (dtDirFile != null) {

  85. File[] subFiles = dtDirFile.listFiles();

  86. for (File subFile : subFiles) {

  87. subFile.delete();

  88. }

  89. dtDirFile.delete();

  90. }

  91. }

  92.  
  93. // 1.fetch file size and file name

  94. private boolean fetchFileMetaInfo() throws IOException {

  95. HttpURLConnection connection = createConnection();

  96. connection.setRequestMethod("GET");

  97. if (connection.getResponseCode() == 200) {

  98. this.fileSize = connection.getContentLength();

  99. String disposition = connection.getHeaderField("Content-Disposition");

  100. if (disposition == null) {

  101. parseFileNameFromUrl(url);

  102. } else {

  103. parseFileNameFromDisposition(disposition);

  104. }

  105. if (this.fileName == null || this.fileSize < 0) {

  106. return false;

  107. }

  108. System.out.println("找到文件资源,长度为" + fileSize + ",资源名称为" + fileName);

  109. return true;

  110. }

  111. return false;

  112. }

  113.  
  114. private void parseFileNameFromUrl(String url) throws UnsupportedEncodingException {

  115. this.fileName = url.substring(url.lastIndexOf("/") + 1, url.length());

  116. if (this.fileName.contains("%")) {

  117. this.fileName = URLDecoder.decode(this.fileName, "UTF-8");

  118. }

  119. }

  120.  
  121. private void parseFileNameFromDisposition(String disposition) throws UnsupportedEncodingException {

  122. Pattern pattern = Patternpile(".+filename=\"(.+?)\".*");

  123. Matcher matcher = pattern.matcher(disposition);

  124. if (matcher.matches()) {

  125. this.fileName = new String(matcher.group(1).getBytes("ISO-8859-1"), "UTF-8");

  126. } else {

  127. parseFileNameFromUrl(url);

  128. }

  129. }

  130.  
  131. public HttpURLConnection createConnection() throws IOException {

  132. URL urlObj = new URL(url);

  133. HttpURLConnection connection = (HttpURLConnection) urlObj.openConnection();

  134. connection.setConnectTimeout(connectTimeout);

  135. connection.setReadTimeout(readTimeout);

  136. connection.setRequestProperty("Accept-Charset", "UTF-8");

  137. connection.setRequestProperty("contentType", "UTF-8");

  138. return connection;

  139. }

  140.  
  141. // 2.assign every task start index and end index out of the file

  142. private void assignTasks() throws IOException {

  143. for (int i = 0; i < taskNum; i++) {

  144. int size = fileSize / taskNum;

  145. int startIndex = i * size;

  146. int endIndex = i == taskNum - 1 ? fileSize - 1 : i * size + size - 1;

  147. this.startIndexes[i] = startIndex;

  148. this.endIndexes[i] = endIndex;

  149. }

  150. }

  151.  
  152. // 3.create the local file and temp directory

  153. private void createFiles() throws IOException {

  154. storeDirFile = new File(storeDir);

  155. storeDirFile.mkdirs();

  156. localFile = new File(storeDirFile, fileName);

  157. dtDirFile = new File(storeDirFile, "." + jobId);

  158. dtDirFile.mkdirs();

  159. if (!localFile.exists()) {

  160. RandomAccessFile raf = new RandomAccessFile(localFile, "rw");

  161. raf.setLength(fileSize);

  162. raf.close();

  163. }

  164. }

  165.  
  166. // 4.let the task start to do their work

  167. private void startTasks() throws IOException {

  168. for (int i = 0; i < taskNum; i++) {

  169. Task task = new Task(this, i);

  170. tasks[i] = task;

  171. task.start();

  172. }

  173. }

  174.  
  175. private int totalReadBytes() {

  176. int totalReadBytes = 0;

  177. for (int i = 0; i < progress.length; i++) {

  178. totalReadBytes += progress[i];

  179. }

  180. return totalReadBytes;

  181. }

  182.  
  183. public int[] getStartIndexes() {

  184. return startIndexes;

  185. }

  186.  
  187. public int[] getEndIndexes() {

  188. return endIndexes;

  189. }

  190.  
  191. public void writeLocalFile(int startIndex, byte[] buf, int off, int len) throws IOException {

  192. if (rafLocalTl.get() == null) {

  193. RandomAccessFile raf = new RandomAccessFile(localFile, "rw");

  194. rafLocalTl.set(raf);

  195. }

  196. RandomAccessFile raf = rafLocalTl.get();

  197. raf.seek(startIndex);

  198. raf.write(buf, off, len);

  199.  
  200. }

  201.  
  202. // 5.let task to report their progress

  203. public void reportProgress(int index, int readBytes) {

  204. progress[index] = readBytes;

  205. }

  206.  
  207. public void closeTaskResource(int index) throws IOException {

  208. RandomAccessFile raf = rafLocalTl.get();

  209. if (raf != null) {

  210. raf.close();

  211. }

  212. raf = rafOffsetTl.get();

  213. if (raf != null) {

  214. raf.close();

  215. }

  216. }

  217.  
  218. public void commitOffset(int index, int offset) throws IOException {

  219. File offsetFile = new File(dtDirFile, String.valueOf(index));

  220. if (rafOffsetTl.get() == null) {

  221. RandomAccessFile raf = new RandomAccessFile(offsetFile, "rw");

  222. rafOffsetTl.set(raf);

  223. }

  224. RandomAccessFile raf = rafOffsetTl.get();

  225. raf.seek(0);

  226. raf.writeInt(offset);

  227. }

  228.  
  229. public int readOffset(int index) throws IOException {

  230. File offsetFile = new File(dtDirFile, String.valueOf(index));

  231. if (offsetFile.exists()) {

  232. RandomAccessFile raf = new RandomAccessFile(offsetFile, "rw");

  233. raf.seek(0);

  234. int offset = raf.readInt();

  235. raf.close();

  236. return offset;

  237. }

  238. return 0;

  239. }

  240.  
  241. public void reStartTask(int index) throws IOException {

  242. Task task = new Task(this, index);

  243. tasks[index] = task;

  244. task.start();

  245. System.out.println("任务" + index + "发生错误,重新调度该任务");

  246. }

  247.  
  248. public void taskFinished() {

  249. latch.countDown();

  250. }

  251.  
  252. private class ProgressThread extends Thread {

  253. private DecimalFormat decimalFormat = new DecimalFormat();

  254.  
  255. public void run() {

  256. decimalFormat.applyPattern("0.0");

  257. while (true) {

  258. try {

  259. int endPointX = totalReadBytes();

  260. TimeUnit.SECONDS.sleep(1);

  261. int endPointY = totalReadBytes();

  262. int waitSeconds = 1;

  263. while (endPointY - endPointX == 0) {

  264. TimeUnit.SECONDS.sleep(1);

  265. waitSeconds++;

  266. endPointY = totalReadBytes();

  267. }

  268. int speed = (endPointY - endPointX) / waitSeconds;

  269. String speedStr = speed > 1024 ? speed/1024+"kb/s":speed+"b/s";

  270. String percent = decimalFormat.format(endPointY * 100.0 / fileSize);

  271. int remainSeconds = (fileSize - endPointY)/speed;

  272. System.out.println("下载完成"+percent+"%,速度"+speedStr+",估计还需要"+remainSeconds+"秒");

  273. if("100.0".equals(percent)) {

  274. break;

  275. }

  276. } catch (InterruptedException e) {

  277. e.printStackTrace();

  278. }

  279. }

  280. }

  281. }

  282.  
  283. }

 
  1. package org.blackfoxer.cat;

  2.  
  3. import java.io.IOException;

  4. import java.io.InputStream;

  5. import java.net.HttpURLConnection;

  6.  
  7. public class Task extends Thread {

  8.  
  9. private Job owner;

  10. private int index;

  11. private int readBytes;

  12. private int startIndex;

  13. private int endIndex;

  14.  
  15. public Task(Job owner,int index) throws IOException {

  16. this.owner = owner;

  17. this.index = index;

  18. if(owner.readOffset(index)!=0) {

  19. this.readBytes = owner.readOffset(index)-owner.getStartIndexes()[index];

  20. owner.reportProgress(index, readBytes);

  21. }

  22. this.startIndex = owner.getStartIndexes()[index]+readBytes;

  23. this.endIndex = owner.getEndIndexes()[index];

  24. }

  25.  
  26. public void run() {

  27. InputStream inputStream = null;

  28. HttpURLConnection connection = null;

  29. try {

  30. if(startIndex > endIndex) {

  31. owner.taskFinished();

  32. return;

  33. }

  34. connection = owner.createConnection();

  35. connection.setRequestMethod("GET");

  36. String range = "bytes="+startIndex+"-"+endIndex ;

  37. connection.setRequestProperty("Range", range);

  38. if(connection.getResponseCode()==206) {

  39. inputStream = connection.getInputStream();

  40. int len = -1;

  41. byte buf[] = new byte[1024];

  42. int offset = startIndex;

  43. while((len=inputStream.read(buf))!=-1) {

  44. owner.writeLocalFile(offset,buf,0,len);

  45. readBytes+=len;

  46. offset+=len;

  47. ownermitOffset(index,offset);

  48. owner.reportProgress(index,readBytes);

  49. }

  50. owner.taskFinished();

  51. }

  52. } catch (IOException e) {

  53. e.printStackTrace();

  54. try {

  55. owner.reStartTask(index);

  56. } catch (IOException e1) {

  57. e1.printStackTrace();

  58. }

  59. } finally {

  60. if(inputStream != null) {

  61. try {

  62. inputStream.close();

  63. } catch (IOException e) {

  64. e.printStackTrace();

  65. }

  66. }

  67. if(connection != null) {

  68. connection.disconnect();

  69. }

  70. try {

  71. owner.closeTaskResource(index);

  72. } catch (IOException e) {

  73. e.printStackTrace();

  74. }

  75. }

  76. }

  77. }

 

 
  1. package org.blackfoxer.cat;

  2.  
  3. import java.io.BufferedReader;

  4. import java.io.IOException;

  5. import java.io.InputStreamReader;

  6.  
  7. public class JavaXunlei {

  8.  
  9. private static final BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

  10. private static String storeDir = null;

  11. public static void main(String args[]) throws Exception {

  12. storeDir = getInput("请先设置你的文件存储目录:");

  13. int taskNum = getIntInput("请输入你的开启下载的线程数:");

  14. while(true) {

  15. String url = getInput("请输入文件链接地址:");

  16. Job job = new Job(url,storeDir,taskNum);

  17. job.startJob();

  18. }

  19. }

  20.  
  21.  
  22. private static int getIntInput(String message) throws IOException {

  23. String number = getInput(message);

  24. while(!number.matches("\\d+")) {

  25. System.out.println("线程数必须是1个整数");

  26. number = getInput(message);

  27. }

  28. return Integer.parseInt(number);

  29. }

  30.  
  31. private static String getInput(String message) throws IOException {

  32. System.out.print(message);

  33. String line = in.readLine();

  34. while(line == null || line.trim().length()<1) {

  35. System.out.print(message);

  36. line = in.readLine();

  37. }

  38. return line.trim();

  39. }

  40. }

本文标签: 迅雷源码Java