-
當(dāng)前位置:首頁 > 創(chuàng)意學(xué)院 > 技術(shù) > 專題列表 > 正文
請簡述一下HDFS數(shù)據(jù)讀取和寫入流程(請簡述hdfs的讀寫流程)
大家好!今天讓創(chuàng)意嶺的小編來大家介紹下關(guān)于請簡述一下HDFS數(shù)據(jù)讀取和寫入流程的問題,以下是小編對此問題的歸納整理,讓我們一起來看看吧。
開始之前先推薦一個非常厲害的Ai人工智能工具,一鍵生成原創(chuàng)文章、方案、文案、工作計劃、工作報告、論文、代碼、作文、做題和對話答疑等等
只需要輸入關(guān)鍵詞,就能返回你想要的內(nèi)容,越精準(zhǔn),寫出的就越詳細(xì),有微信小程序端、在線網(wǎng)頁版、PC客戶端
官網(wǎng):https://ai.de1919.com。
創(chuàng)意嶺作為行業(yè)內(nèi)優(yōu)秀的企業(yè),服務(wù)客戶遍布全球各地,如需了解SEO相關(guān)業(yè)務(wù)請撥打電話175-8598-2043,或添加微信:1454722008
本文目錄:
一、Hadoop讀寫文件時內(nèi)部工作機(jī)制是怎樣的
客戶端通過調(diào)用FileSystem對象(對應(yīng)于HDFS文件系統(tǒng),調(diào)用DistributedFileSystem對象)的open()方法來打開文件(也即圖中的第一步),DistributedFileSystem通過RPC(Remote Procedure Call)調(diào)用詢問NameNode來得到此文件最開始幾個block的文件位置(第二步)。對每一個block來說,namenode返回?fù)碛写薭lock備份的所有namenode的地址信息(按集群的拓?fù)渚W(wǎng)絡(luò)中與客戶端距離的遠(yuǎn)近排序,關(guān)于在Hadoop集群中如何進(jìn)行網(wǎng)絡(luò)拓?fù)湔埧聪旅娼榻B)。如果客戶端本身就是一個datanode(如客戶端是一個mapreduce任務(wù))并且此datanode本身就有所需文件block的話,客戶端便從本地讀取文件。
以上步驟完成后,DistributedFileSystem會返回一個FSDataInputStream(支持文件seek),客戶端可以從FSDataInputStream中讀取數(shù)據(jù)。FSDataInputStream包裝了一個DFSInputSteam類,用來處理namenode和datanode的I/O操作。
客戶端然后執(zhí)行read()方法(第三步),DFSInputStream(已經(jīng)存儲了欲讀取文件的開始幾個block的位置信息)連接到第一個datanode(也即最近的datanode)來獲取數(shù)據(jù)。通過重復(fù)調(diào)用read()方法(第四、第五步),文件內(nèi)的數(shù)據(jù)就被流式的送到了客戶端。當(dāng)讀到該block的末尾時,DFSInputStream就會關(guān)閉指向該block的流,轉(zhuǎn)而找到下一個block的位置信息然后重復(fù)調(diào)用read()方法繼續(xù)對該block的流式讀取。這些過程對于用戶來說都是透明的,在用戶看來這就是不間斷的流式讀取整個文件。
當(dāng)真?zhèn)€文件讀取完畢時,客戶端調(diào)用FSDataInputSteam中的close()方法關(guān)閉文件輸入流(第六步)。
如果在讀某個block是DFSInputStream檢測到錯誤,DFSInputSteam就會連接下一個datanode以獲取此block的其他備份,同時他會記錄下以前檢測到的壞掉的datanode以免以后再無用的重復(fù)讀取該datanode。DFSInputSteam也會檢查從datanode讀取來的數(shù)據(jù)的校驗和,如果發(fā)現(xiàn)有數(shù)據(jù)損壞,它會把壞掉的block報告給namenode同時重新讀取其他datanode上的其他block備份。
這種設(shè)計模式的一個好處是,文件讀取是遍布這個集群的datanode的,namenode只是提供文件block的位置信息,這些信息所需的帶寬是很少的,這樣便有效的避免了單點瓶頸問題從而可以更大的擴(kuò)充集群的規(guī)模。
Hadoop中的網(wǎng)絡(luò)拓?fù)?/p>
在Hadoop集群中如何衡量兩個節(jié)點的遠(yuǎn)近呢?要知道,在高速處理數(shù)據(jù)時,數(shù)據(jù)處理速率的唯一限制因素就是數(shù)據(jù)在不同節(jié)點間的傳輸速度:這是由帶寬的可怕匱乏引起的。所以我們把帶寬作為衡量兩個節(jié)點距離大小的標(biāo)準(zhǔn)。
但是計算兩個節(jié)點之間的帶寬是比較復(fù)雜的,而且它需要在一個靜態(tài)的集群下才能衡量,但Hadoop集群一般是隨著數(shù)據(jù)處理的規(guī)模動態(tài)變化的(且兩兩節(jié)點直接相連的連接數(shù)是節(jié)點數(shù)的平方)。于是Hadoop使用了一個簡單的方法來衡量距離,它把集群內(nèi)的網(wǎng)絡(luò)表示成一個樹結(jié)構(gòu),兩個節(jié)點之間的距離就是他們離共同祖先節(jié)點的距離之和。樹一般按數(shù)據(jù)中心(datacenter),機(jī)架(rack),計算節(jié)點(datanode)的結(jié)構(gòu)組織。計算節(jié)點上的本地運(yùn)算速度最快,跨數(shù)據(jù)中心的計算速度最慢(現(xiàn)在跨數(shù)據(jù)中心的Hadoop集群用的還很少,一般都是在一個數(shù)據(jù)中心內(nèi)做運(yùn)算的)。
假如有個計算節(jié)點n1處在數(shù)據(jù)中心c1的機(jī)架r1上,它可以表示為/c1/r1/n1,下面是不同情況下兩個節(jié)點的距離:
• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)
如下圖所示:
Hadoop
寫文件
現(xiàn)在我們來看一下Hadoop中的寫文件機(jī)制解析,通過寫文件機(jī)制我們可以更好的了解一下Hadoop中的一致性模型。
Hadoop
上圖為我們展示了一個創(chuàng)建一個新文件并向其中寫數(shù)據(jù)的例子。
首先客戶端通過DistributedFileSystem上的create()方法指明一個欲創(chuàng)建的文件的文件名(第一步),DistributedFileSystem再通過RPC調(diào)用向NameNode申請創(chuàng)建一個新文件(第二步,這時該文件還沒有分配相應(yīng)的block)。namenode檢查是否有同名文件存在以及用戶是否有相應(yīng)的創(chuàng)建權(quán)限,如果檢查通過,namenode會為該文件創(chuàng)建一個新的記錄,否則的話文件創(chuàng)建失敗,客戶端得到一個IOException異常。DistributedFileSystem返回一個FSDataOutputStream以供客戶端寫入數(shù)據(jù),與FSDataInputStream類似,F(xiàn)SDataOutputStream封裝了一個DFSOutputStream用于處理namenode與datanode之間的通信。
當(dāng)客戶端開始寫數(shù)據(jù)時(第三步),DFSOutputStream把寫入的數(shù)據(jù)分成包(packet), 放入一個中間隊列——數(shù)據(jù)隊列(data queue)中去。DataStreamer從數(shù)據(jù)隊列中取數(shù)據(jù),同時向namenode申請一個新的block來存放它已經(jīng)取得的數(shù)據(jù)。namenode選擇一系列合適的datanode(個數(shù)由文件的replica數(shù)決定)構(gòu)成一個管道線(pipeline),這里我們假設(shè)replica為3,所以管道線中就有三個datanode。DataSteamer把數(shù)據(jù)流式的寫入到管道線中的第一個datanode中(第四步),第一個datanode再把接收到的數(shù)據(jù)轉(zhuǎn)到第二個datanode中(第四步),以此類推。
DFSOutputStream同時也維護(hù)著另一個中間隊列——確認(rèn)隊列(ack queue),確認(rèn)隊列中的包只有在得到管道線中所有的datanode的確認(rèn)以后才會被移出確認(rèn)隊列(第五步)。
如果某個datanode在寫數(shù)據(jù)的時候當(dāng)?shù)袅?,下面這些對用戶透明的步驟會被執(zhí)行:
1)管道線關(guān)閉,所有確認(rèn)隊列上的數(shù)據(jù)會被挪到數(shù)據(jù)隊列的首部重新發(fā)送,這樣可以確保管道線中當(dāng)?shù)舻膁atanode下流的datanode不會因為當(dāng)?shù)舻膁atanode而丟失數(shù)據(jù)包。
2)在還在正常運(yùn)行的datanode上的當(dāng)前block上做一個標(biāo)志,這樣當(dāng)當(dāng)?shù)舻膁atanode重新啟動以后namenode就會知道該datanode上哪個block是剛才當(dāng)機(jī)時殘留下的局部損壞block,從而可以把它刪掉。
3)已經(jīng)當(dāng)?shù)舻膁atanode從管道線中被移除,未寫完的block的其他數(shù)據(jù)繼續(xù)被寫入到其他兩個還在正常運(yùn)行的datanode中去,namenode知道這個block還處在under-replicated狀態(tài)(也即備份數(shù)不足的狀態(tài))下,然后他會安排一個新的replica從而達(dá)到要求的備份數(shù),后續(xù)的block寫入方法同前面正常時候一樣。
有可能管道線中的多個datanode當(dāng)?shù)簦m然不太經(jīng)常發(fā)生),但只要dfs.replication.min(默認(rèn)為1)個replica被創(chuàng)建,我們就認(rèn)為該創(chuàng)建成功了。剩余的replica會在以后異步創(chuàng)建以達(dá)到指定的replica數(shù)。
當(dāng)客戶端完成寫數(shù)據(jù)后,它會調(diào)用close()方法(第六步)。這個操作會沖洗(flush)所有剩下的package到pipeline中,等待這些package確認(rèn)成功,然后通知namenode寫入文件成功(第七步)。這時候namenode就知道該文件由哪些block組成(因為DataStreamer向namenode請求分配新block,namenode當(dāng)然會知道它分配過哪些blcok給給定文件),它會等待最少的replica數(shù)被創(chuàng)建,然后成功返回。
replica是如何分布的
Hadoop在創(chuàng)建新文件時是如何選擇block的位置的呢,綜合來說,要考慮以下因素:帶寬(包括寫帶寬和讀帶寬)和數(shù)據(jù)安全性。如果我們把三個備份全部放在一個datanode上,雖然可以避免了寫帶寬的消耗,但幾乎沒有提供數(shù)據(jù)冗余帶來的安全性,因為如果這個datanode當(dāng)機(jī),那么這個文件的所有數(shù)據(jù)就全部丟失了。另一個極端情況是,如果把三個冗余備份全部放在不同的機(jī)架,甚至數(shù)據(jù)中心里面,雖然這樣數(shù)據(jù)會安全,但寫數(shù)據(jù)會消耗很多的帶寬。Hadoop 0.17.0給我們提供了一個默認(rèn)replica分配策略(Hadoop 1.X以后允許replica策略是可插拔的,也就是你可以自己制定自己需要的replica分配策略)。replica的默認(rèn)分配策略是把第一個備份放在與客戶端相同的datanode上(如果客戶端在集群外運(yùn)行,就隨機(jī)選取一個datanode來存放第一個replica),第二個replica放在與第一個replica不同機(jī)架的一個隨機(jī)datanode上,第三個replica放在與第二個replica相同機(jī)架的隨機(jī)datanode上。如果replica數(shù)大于三,則隨后的replica在集群中隨機(jī)存放,Hadoop會盡量避免過多的replica存放在同一個機(jī)架上。選取replica的放置位置后,管道線的網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu)如下所示:
Hadoop
總體來說,上述默認(rèn)的replica分配策略給了我們很好的可用性(blocks放置在兩個rack上,較為安全),寫帶寬優(yōu)化(寫數(shù)據(jù)只需要跨越一個rack),讀帶寬優(yōu)化(你可以從兩個機(jī)架中選擇較近的一個讀取)。
一致性模型
HDFS某些地方為了性能可能會不符合POSIX(是的,你沒有看錯,POSIX不僅僅只適用于linux/unix, Hadoop 使用了POSIX的設(shè)計來實現(xiàn)對文件系統(tǒng)文件流的讀取 ),所以它看起來可能與你所期望的不同,要注意。
創(chuàng)建了一個文件以后,它是可以在命名空間(namespace)中可以看到的:
Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p), is(true));
但是任何向此文件中寫入的數(shù)據(jù)并不能保證是可見的,即使你flush了已經(jīng)寫入的數(shù)據(jù),此文件的長度可能仍然為零:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));
這是因為,在Hadoop中,只有滿一個block數(shù)據(jù)量的數(shù)據(jù)被寫入文件后,此文件中的內(nèi)容才是可見的(即這些數(shù)據(jù)會被寫入到硬盤中去),所以當(dāng)前正在寫的block中的內(nèi)容總是不可見的。
Hadoop提供了一種強(qiáng)制使buffer中的內(nèi)容沖洗到datanode的方法,那就是FSDataOutputStream的sync()方法。調(diào)用了sync()方法后,Hadoop保證所有已經(jīng)被寫入的數(shù)據(jù)都被沖洗到了管道線中的datanode中,并且對所有讀者都可見了:
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
這個方法就像POSIX中的fsync系統(tǒng)調(diào)用(它沖洗給定文件描述符中的所有緩沖數(shù)據(jù)到磁盤中)。例如,使用java API寫一個本地文件,我們可以保證在調(diào)用flush()和同步化后可以看到已寫入的內(nèi)容:
FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operating system
out.getFD().sync(); // sync to disk (getFD()返回與該流所對應(yīng)的文件描述符)
assertThat(localFile.length(), is(((long) "content".length())));
在HDFS中關(guān)閉一個流隱式的調(diào)用了sync()方法:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
由于Hadoop中的一致性模型限制,如果我們不調(diào)用sync()方法的話,我們很可能會丟失多大一個block的數(shù)據(jù)。這是難以接受的,所以我們應(yīng)該使用sync()方法來確保數(shù)據(jù)已經(jīng)寫入磁盤。但頻繁調(diào)用sync()方法也是不好的,因為會造成很多額外開銷。我們可以再寫入一定量數(shù)據(jù)后調(diào)用sync()方法一次,至于這個具體的數(shù)據(jù)量大小就要根據(jù)你的應(yīng)用程序而定了,在不影響你的應(yīng)用程序的性能的情況下,這個數(shù)據(jù)量應(yīng)越大越好。
二、如何使用Hadoop讀寫數(shù)據(jù)庫
我們的一些應(yīng)用程序中,常常避免不了要與數(shù)據(jù)庫進(jìn)行交互,而在我們的hadoop中,有時候也需要和數(shù)據(jù)庫進(jìn)行交互,比如說,數(shù)據(jù)分析的結(jié)果存入數(shù)據(jù)庫,
或者是,讀取數(shù)據(jù)庫的信息寫入HDFS上,不過直接使用MapReduce操作數(shù)據(jù)庫,這種情況在現(xiàn)實開發(fā)還是比較少,一般我們會采用Sqoop來進(jìn)行數(shù)
據(jù)的遷入,遷出,使用Hive分析數(shù)據(jù)集,大多數(shù)情況下,直接使用Hadoop訪問關(guān)系型數(shù)據(jù)庫,可能產(chǎn)生比較大的數(shù)據(jù)訪問壓力,尤其是在數(shù)據(jù)庫還是單機(jī)
的情況下,情況可能更加糟糕,在集群的模式下壓力會相對少一些。
那么,今天散仙就來看下,如何直接使用Hadoop1.2.0的MR來讀寫操作數(shù)據(jù)庫,hadoop的API提供了DBOutputFormat和
DBInputFormat這兩個類,來進(jìn)行與數(shù)據(jù)庫交互,除此之外,我們還需要定義一個類似JAVA
Bean的實體類,來與數(shù)據(jù)庫的每行記錄進(jìn)行對應(yīng),通常這個類要實現(xiàn)Writable和DBWritable接口,來重寫里面的4個方法以對應(yīng)獲取每行記
三、怎樣將數(shù)據(jù)庫數(shù)據(jù)寫入到hdfs中
如下面這個shell腳本:
#Oracle的連接字符串,其中包含了Oracle的地址,SID,和端口號
CONNECTURL=jdbc:oracle:thin:@20.135.60.21:1521:DWRAC2
#使用的用戶名
ORACLENAME=kkaa
#使用的密碼
ORACLEPASSWORD=kkaa123
#需要從Oracle中導(dǎo)入的表名
oralceTableName=tt
#需要從Oracle中導(dǎo)入的表中的字段名
columns=AREA_ID,TEAM_NAME
#將Oracle中的數(shù)據(jù)導(dǎo)入到HDFS后的存放路徑
hdfsPath=apps/as/hive/$oralceTableName
#執(zhí)行導(dǎo)入邏輯。將Oracle中的數(shù)據(jù)導(dǎo)入到HDFS中
sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --target-dir $hdfsPath --num-mappers 1 --table $oralceTableName --columns $columns --fields-terminated-by '\001'
執(zhí)行這個腳本之后,導(dǎo)入程序就完成了。
四、如何使用Java API讀寫HDFS
Java API讀寫HDFS
public class FSOptr {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
// 創(chuàng)建文件目錄
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path dir = new Path("/user/hadoop/data/20140318");
boolean result = fs.mkdirs(dir);// 創(chuàng)建文件夾
System.out.println("make dir :" + result);
// 創(chuàng)建文件,并寫入內(nèi)容
Path dst = new Path("/user/hadoop/data/20140318/tmp");
byte[] buff = "hello,hadoop!".getBytes();
FSDataOutputStream outputStream = fs.create(dst);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(dst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 重命名文件
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
Path newName = new Path("/user/hadoop/data/20140318/2.txt");
fs.rename(oldName, newName);
FileStatus files[] = fs.listStatus(new Path(
"/user/hadoop/data/20140318"));
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 刪除文件
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop/data/20140318");
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files) {
fs.delete(file.getPath());
}
} else {
fs.delete(path);
}
// 或者
fs.delete(path, true);
fs.close();
}
/**
* 下載,將hdfs文件下載到本地磁盤
*
* @param localSrc1
* 本地的文件地址,即文件的路徑
* @param hdfsSrc1
* 存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fs.copyToLocalFile(hdfs_path, local_path);
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* 上傳,將本地文件copy到hdfs系統(tǒng)中
*
* @param localSrc
* 本地的文件地址,即文件的路徑
* @param hdfsSrc
* 存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置對象
FileSystem fs; // 文件系統(tǒng)
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// 輸出流,創(chuàng)建一個輸出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// 重寫progress方法
public void progress() {
// System.out.println("上傳完一個設(shè)定緩存區(qū)大小容量的文件!");
}
});
// 連接兩個流,形成通道,使輸入流向輸出流傳輸數(shù)據(jù),
IOUtils.copyBytes(in, out, 10240, true); // in為輸入流對象,out為輸出流對象,4096為緩沖區(qū)大小,true為上傳后關(guān)閉流
return true;
} catch (IOException e) {
e.printStackTrace();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}
/**
* 移動
*
* @param old_st原來存放的路徑
* @param new_st移動到的路徑
*/
public boolean moveFileName(String old_st, String new_st) {
try {
// 下載到服務(wù)器本地
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文檔/temp");
Configuration conf = new Configuration();
FileSystem fs = null;
// 刪除源文件
try {
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
} catch (IOException e) {
e.printStackTrace();
}
// 從服務(wù)器本地傳到新路徑
new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
boolean uplod_flag = sendToHdfs1("/home/hadoop/文檔/temp", new_st);
if (down_flag && uplod_flag) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// copy本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path src = new Path("/home/hadoop/word.txt");
Path dst = new Path("/user/hadoop/data/");
fs.copyFromLocalFile(src, dst);
fs.close();
}
// 獲取給定目錄下的所有子目錄以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop");
getFile(path, fs);
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i < fileStatus.length; i++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
} else {
System.out.println(fileStatus[i].getPath().toString());
}
}
}
//判斷文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
}
//獲取hdfs集群所有主機(jī)結(jié)點數(shù)據(jù)
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println("list of all the nodes in HDFS cluster:"); //print info
for(int i=0; i < dataNodeStats.length; i++){
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i < blkCount; i++){
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts
System.out.println(hosts);
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
System.out.println(d);
}
}
以上就是關(guān)于請簡述一下HDFS數(shù)據(jù)讀取和寫入流程相關(guān)問題的回答。希望能幫到你,如有更多相關(guān)問題,您也可以聯(lián)系我們的客服進(jìn)行咨詢,客服也會為您講解更多精彩的知識和內(nèi)容。
推薦閱讀:
請簡述網(wǎng)絡(luò)營銷策略有哪幾種(請簡述網(wǎng)絡(luò)營銷策略有哪幾種形式)
簡述推銷與營銷的區(qū)別和聯(lián)系(請簡述推銷與營銷的區(qū)別和聯(lián)系)