Compare commits
No commits in common. "main" and "spqrk" have entirely different histories.
README.md (65 lines removed)
@@ -1,65 +0,0 @@
# DataCollections

Kafka production, Flume collection, Sqoop extraction into Hive, and Spark processing.

# Final Project, Second Semester of Junior Year

## Project overview

- Project topic 10

Cloud deployment + domain name + service. With cloud deployment the system can be reached from anywhere with an internet connection, just like visiting Baidu: any machine room on the internet can access it. Compared with project option 1, the environment on the cloud server may have to be set up from scratch, and when problems come up you may need to work with the cloud provider's support to resolve them. Cloud services also cost money, so choose this option carefully. Cloud deployment adds a good deal of extra deployment work, so it is scored correspondingly higher. If applying for a domain name takes too long or is too much trouble, you can put nginx in front as a reverse proxy and access the system by IP address without a domain name.

## Project name

Orders Information Statistical System (订单信息统计系统)

## Project requirements

1. Kafka data production

This module simulates a live business system in which user data is produced in real time and pushed to the corresponding Kafka topic.

1) Implement the producer in Java and push the following data to Kafka's orders topic every 5 seconds

Generate the pushed data with random numbers plus arrays; the resulting format is:

Six fields: order category, order name, order quantity, date, customer rating (really likes the item = 100, it's okay = 50, dislikes it = 0), and validity (Y = valid, N = invalid). The fields are joined with a \t separator.

2) Use Flume to periodically collect the data from the corresponding Kafka topic and write it to /home/flume on HDFS (a configuration sketch follows this list)

3) Use Sqoop to periodically extract all of the result data produced by the Spark data processing module into the central Hive warehouse, creating a database named orders (a command sketch follows this list)
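
For 2), a minimal Flume agent sketch; the file name, agent name `a1`, and broker address are all assumptions:

```properties
# orders-hdfs.conf (hypothetical name); start with: flume-ng agent -n a1 -f orders-hdfs.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Kafka source subscribed to the orders topic (broker address is a placeholder)
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 172.16.5.3:9092
a1.sources.r1.kafka.topics = orders
a1.sources.r1.channels = c1

# In-memory buffer between source and sink
a1.channels.c1.type = memory

# HDFS sink writing plain-text files under /home/flume, rolled every 5 minutes
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /home/flume
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.channel = c1
```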
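For 3), a command sketch: Spark writes its result tables to MySQL, and a scheduled Sqoop job copies each one into the orders Hive database. The MySQL host, credentials, and the table name `valid_invalid_counts` are assumptions.

```sh
# Create the target Hive database once
hive -e "CREATE DATABASE IF NOT EXISTS orders;"

# Copy one result table from MySQL into Hive (repeat per result table, e.g. from cron)
sqoop import \
  --connect jdbc:mysql://172.16.5.3:3306/orders \
  --username root -P \
  --table valid_invalid_counts \
  --hive-import --hive-database orders --hive-table valid_invalid_counts \
  -m 1
```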
2. Spark data processing

This module mainly uses Spark together with MySQL, Hive, or Kafka to implement the following requirements and write the statistical results into the corresponding database tables (a consumer sketch follows this list)

1) Use Spark SQL / Spark Streaming to create a consumer that reads the data from the corresponding topic

2) Use Spark SQL / Spark Streaming to compute, in real time, the total counts of valid and invalid orders

3) Use Spark SQL / Spark Streaming to compute, in real time, the valid and invalid counts for each order number

4) Use Spark SQL / Spark Streaming to compute, in real time, the number of orders in each category

5) Use Spark SQL to compute the valid and invalid counts for each order

6) Use Spark Core / RDDs to compute the valid and invalid counts for each category of each order

7) Collaborative filtering recommendation (an RDD sketch follows item d)

a. Build the order-category relation matrix

b. Build the category-category relation matrix

c. Generate a recommendation list for every item with item-based CF

d. Generate a recommendation list for every customer with user-based CF
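
A minimal Spark Core sketch of steps a-c; the inlined (order, category) pairs are placeholders for the real collected records. Grouping categories by order gives the order-category matrix, counting category pairs that share an order gives the category-category co-occurrence matrix, and the largest counts per category form the item-based recommendation list.

```java
package com.example.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CategoryCooccurrence {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("CategoryCooccurrence").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Placeholder (orderId, category) pairs; in the project these come from the collected records.
        JavaPairRDD<String, String> orderCategory = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("o1", "电器"), new Tuple2<>("o1", "手机"),
                new Tuple2<>("o2", "电器"), new Tuple2<>("o2", "食品")));

        // a. Order-category matrix: the set of categories appearing in each order.
        JavaPairRDD<String, Iterable<String>> byOrder = orderCategory.groupByKey();

        // b. Category-category matrix: how often two categories share an order.
        JavaPairRDD<Tuple2<String, String>, Integer> cooc = byOrder.flatMapToPair(e -> {
            List<Tuple2<Tuple2<String, String>, Integer>> pairs = new ArrayList<>();
            for (String a : e._2)
                for (String b : e._2)
                    if (!a.equals(b))
                        pairs.add(new Tuple2<>(new Tuple2<>(a, b), 1));
            return pairs.iterator();
        }).reduceByKey(Integer::sum);

        // c. The highest co-occurrence counts per category are its item-based recommendations.
        cooc.collect().forEach(t -> System.out.println(t._1._1 + " -> " + t._1._2 + ": " + t._2));
        sc.close();
    }
}
```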

Hint: the front-end display tool (or the ECharts project) can read MySQL directly and does not have to live in the same project as the back-end Spark + Kafka code. Ideally, all of the metrics shown on the front end belong to one project.
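
For 1) and 2), a consumer sketch with Structured Streaming (provided by the spark-sql-kafka-0-10 dependency in the pom below); the broker address matches the producer, and the running counts go to the console instead of MySQL for brevity — both assumptions:

```java
package com.example.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.split;

public class OrderStreamConsumer {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("OrderStreamConsumer")
                .master("local[*]") // local mode for testing; an assumption
                .getOrCreate();

        // 1) Subscribe to the orders topic (broker address is a placeholder).
        Dataset<Row> raw = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "172.16.5.3:9092")
                .option("subscribe", "orders")
                .load();

        // Split the six tab-separated fields emitted by OrderDataGenerator.
        Dataset<Row> orders = raw
                .selectExpr("CAST(value AS STRING) AS line")
                .select(split(col("line"), "\t").alias("f"))
                .select(
                        col("f").getItem(0).alias("category"),
                        col("f").getItem(1).alias("name"),
                        col("f").getItem(2).cast("int").alias("quantity"),
                        col("f").getItem(3).alias("order_date"),
                        col("f").getItem(4).cast("int").alias("rating"),
                        col("f").getItem(5).alias("is_valid"));

        // 2) Running totals of valid (Y) vs. invalid (N) orders.
        StreamingQuery query = orders.groupBy("is_valid").count()
                .writeStream()
                .outputMode("complete")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
```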
3. Data visualization

Display student information

1) Registration and login pages; user information is stored in MySQL

2) After the user logs in, the home page uses ECharts to statically display the overall totals of valid and invalid orders, as well as the valid and invalid counts for each category

3) Personalized recommendation: display the top-6 recommended categories, 3 item-based and 3 user-based (taken from the collaborative filtering lists).

4) Offline recommendation: display the top-3 categories with the most valid orders

5) Search for a target category and display it together with the top-3 most similar categories as recommendations

### Project structure

### Technology stack

| Stage | Technology |
| --- | --- |
| Data production | Java + Apache Kafka |
| Data collection | Apache Flume |
| Data extraction | Apache Sqoop |
| Data processing | Apache Spark (Spark SQL / Spark Streaming / Spark Core) |
| Data storage | MySQL + Hive + HDFS |

pom.xml (new file, 92 lines)
@@ -0,0 +1,92 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>OrderInformationData</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>OrderInformationData</name>
  <url>http://maven.apache.org</url>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.version>2.13.0</scala.version>
    <spark.version>3.4.3</spark.version>
    <hadoop.version>3.3.0</hadoop.version>
    <encoding>UTF-8</encoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.0.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.13</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.13</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Kafka connector for Spark Structured Streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.13</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.13</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.22</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
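
Nothing in this project requires a Spring Boot parent, so the unversioned spring-boot-maven-plugin has been dropped; with the compiler plugin above, a plain `mvn -DskipTests package` should be enough to build the producer below, and the Spark jobs can then be run from an IDE or via spark-submit.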

src/main/java/com/example/kafka/KafkaProducerApp.java (new file, 37 lines)
@@ -0,0 +1,37 @@
package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;

public class KafkaProducerApp {

    public static void main(String[] args) {
        // Kafka brokers listen on 9092 by default; 2181 is ZooKeeper's port and
        // cannot be used as a bootstrap server.
        String bootstrapServers = "172.16.5.3:9092"; // replace with your remote Kafka broker address

        String topic = "orders";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);

        // Flush pending records and release resources when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(producer::close));

        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                String message = OrderDataGenerator.generateOrder();
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
                producer.send(record);
                System.out.println("Sent to Kafka: " + message);
            }
        }, 0, 5000); // send once every 5 seconds
    }
}
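
To spot-check the producer end to end, the stock console consumer can tail the topic (broker address as above):

```sh
kafka-console-consumer.sh --bootstrap-server 172.16.5.3:9092 --topic orders --from-beginning
```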

src/main/java/com/example/kafka/OrderDataGenerator.java (new file, 69 lines)
@@ -0,0 +1,69 @@
package com.example.kafka;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class OrderDataGenerator {

    // Order categories
    private static final String[] CATEGORIES = {"电器", "服饰", "食品", "玩具", "手机"};
    // Order names for each category
    private static final String[][] PRODUCT_NAMES = {
            {"电视", "冰箱", "洗衣机", "空调", "吸尘器", "电饭煲", "微波炉", "电磁炉", "热水器", "空气净化器"}, // 电器 (appliances)
            {"T恤", "牛仔裤", "羽绒服", "衬衫", "运动鞋", "夹克", "卫衣", "连衣裙", "短裤", "风衣"}, // 服饰 (clothing)
            {"巧克力", "饼干", "方便面", "牛奶", "饮料", "面包", "糖果", "果冻", "薯片", "蛋糕"}, // 食品 (food)
            {"积木", "拼图", "玩偶", "遥控车", "毛绒玩具", "魔方", "乐高", "变形金刚", "洋娃娃", "电子琴"}, // 玩具 (toys)
            {"华为", "苹果", "小米", "OPPO", "vivo", "荣耀", "三星", "魅族", "联想", "努比亚"} // 手机 (phones)
    };

    private static final Random random = new Random();
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

    // Tracks categoryIndex values already used in the current round, so every
    // category is produced once before any category repeats.
    private static final Set<Integer> generatedCategoryIndexes = new HashSet<>();

    /**
     * Generate one order record, cycling through each category once per round.
     */
    public static String generateOrder() {
        // Once every category has been used, start a new round; without this
        // reset the do/while below would spin forever after 5 records.
        if (generatedCategoryIndexes.size() == CATEGORIES.length) {
            resetCategoryIndexTracker();
        }

        int categoryIndex;
        // Pick a categoryIndex not yet used in this round
        do {
            categoryIndex = random.nextInt(CATEGORIES.length);
        } while (generatedCategoryIndexes.contains(categoryIndex));

        // Mark it as used
        generatedCategoryIndexes.add(categoryIndex);

        String category = CATEGORIES[categoryIndex];
        String name = PRODUCT_NAMES[categoryIndex][random.nextInt(10)];
        int quantity = random.nextInt(10) + 1;
        String date = sdf.format(new Date());
        int rating = random.nextInt(3) * 50; // 0, 50, 100 (reuse the shared Random)
        String isValid = random.nextBoolean() ? "Y" : "N";

        return String.join("\t",
                category,
                name,
                String.valueOf(quantity),
                date,
                String.valueOf(rating),
                isValid);
    }

    /**
     * Clear the used categoryIndex record so the next round can begin.
     */
    public static void resetCategoryIndexTracker() {
        generatedCategoryIndexes.clear();
    }
}
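
For reference, one record from this generator looks like `电器\t电视\t3\t2024-06-20\t100\tY` (tab-separated); the date is whatever day the producer runs, and the other values here are just one possible draw.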

src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java (new file, 934 lines)
@@ -0,0 +1,934 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.nativeio;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.PerformanceAdvisory;
import org.apache.hadoop.util.Shell;
import sun.misc.Unsafe;

import java.io.*;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * JNI wrappers for various native IO-related calls not available in Java.
 * These functions should generally be used alongside a fallback to another
 * more portable mechanism.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NativeIO {
  public static class POSIX {
    // Flags for open() call from bits/fcntl.h
    public static final int O_RDONLY = 00;
    public static final int O_WRONLY = 01;
    public static final int O_RDWR = 02;
    public static final int O_CREAT = 0100;
    public static final int O_EXCL = 0200;
    public static final int O_NOCTTY = 0400;
    public static final int O_TRUNC = 01000;
    public static final int O_APPEND = 02000;
    public static final int O_NONBLOCK = 04000;
    public static final int O_SYNC = 010000;
    public static final int O_ASYNC = 020000;
    public static final int O_FSYNC = O_SYNC;
    public static final int O_NDELAY = O_NONBLOCK;

    // Flags for posix_fadvise() from bits/fcntl.h
    /* No further special treatment. */
    public static final int POSIX_FADV_NORMAL = 0;
    /* Expect random page references. */
    public static final int POSIX_FADV_RANDOM = 1;
    /* Expect sequential page references. */
    public static final int POSIX_FADV_SEQUENTIAL = 2;
    /* Will need these pages. */
    public static final int POSIX_FADV_WILLNEED = 3;
    /* Don't need these pages. */
    public static final int POSIX_FADV_DONTNEED = 4;
    /* Data will be accessed once. */
    public static final int POSIX_FADV_NOREUSE = 5;

    /* Wait upon writeout of all pages in the range
       before performing the write. */
    public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
    /* Initiate writeout of all those dirty pages in the range
       which are not presently under writeback. */
    public static final int SYNC_FILE_RANGE_WRITE = 2;
    /* Wait upon writeout of all pages in the range
       after performing the write. */
    public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;

    private static final Log LOG = LogFactory.getLog(NativeIO.class);

    private static boolean nativeLoaded = false;
    private static boolean fadvisePossible = true;
    private static boolean syncFileRangePossible = true;

    static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY =
        "hadoop.workaround.non.threadsafe.getpwuid";
    static final boolean WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT = true;

    private static long cacheTimeout = -1;

    private static CacheManipulator cacheManipulator = new CacheManipulator();

    public static CacheManipulator getCacheManipulator() {
      return cacheManipulator;
    }

    public static void setCacheManipulator(CacheManipulator cacheManipulator) {
      POSIX.cacheManipulator = cacheManipulator;
    }

    /**
     * Used to manipulate the operating system cache.
     */
    @VisibleForTesting
    public static class CacheManipulator {
      public void mlock(String identifier, ByteBuffer buffer,
          long len) throws IOException {
        POSIX.mlock(buffer, len);
      }

      public long getMemlockLimit() {
        return NativeIO.getMemlockLimit();
      }

      public long getOperatingSystemPageSize() {
        return NativeIO.getOperatingSystemPageSize();
      }

      public void posixFadviseIfPossible(String identifier,
          FileDescriptor fd, long offset, long len, int flags)
          throws NativeIOException {
        POSIX.posixFadviseIfPossible(identifier, fd, offset, len, flags);
      }

      public boolean verifyCanMlock() {
        return NativeIO.isAvailable();
      }
    }

    /**
     * A CacheManipulator used for testing which does not actually call mlock.
     * This allows many tests to be run even when the operating system does not
     * allow mlock, or only allows limited mlocking.
     */
    @VisibleForTesting
    public static class NoMlockCacheManipulator extends CacheManipulator {
      public void mlock(String identifier, ByteBuffer buffer,
          long len) throws IOException {
        LOG.info("mlocking " + identifier);
      }

      public long getMemlockLimit() {
        return 1125899906842624L;
      }

      public long getOperatingSystemPageSize() {
        return 4096;
      }

      public boolean verifyCanMlock() {
        return true;
      }
    }

    static {
      if (NativeCodeLoader.isNativeCodeLoaded()) {
        try {
          Configuration conf = new Configuration();
          workaroundNonThreadSafePasswdCalls = conf.getBoolean(
              WORKAROUND_NON_THREADSAFE_CALLS_KEY,
              WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT);
          initNative();
          nativeLoaded = true;
          cacheTimeout = conf.getLong(
              CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_KEY,
              CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT) *
              1000;
          LOG.debug("Initialized cache for IDs to User/Group mapping with a " +
              " cache timeout of " + cacheTimeout/1000 + " seconds.");
        } catch (Throwable t) {
          // This can happen if the user has an older version of libhadoop.so
          // installed - in this case we can continue without native IO
          // after warning
          PerformanceAdvisory.LOG.debug("Unable to initialize nativeio libraries", t);
        }
      }
    }
    /**
     * Return true if the JNI-based native IO extensions are available.
     */
    public static boolean isAvailable() {
      return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
    }

    private static void assertCodeLoaded() throws IOException {
      if (!isAvailable()) {
        throw new IOException("nativeio was not loaded");
      }
    }

    /** Wrapper around open(2) */
    public static native FileDescriptor open(String path, int flags, int mode) throws IOException;

    /** Wrapper around fstat(2) */
    private static native Stat fstat(FileDescriptor fd) throws IOException;

    /** Native chmod implementation. On UNIX, it is a wrapper around chmod(2) */
    private static native void chmodImpl(String path, int mode) throws IOException;

    public static void chmod(String path, int mode) throws IOException {
      if (!Shell.WINDOWS) {
        chmodImpl(path, mode);
      } else {
        try {
          chmodImpl(path, mode);
        } catch (NativeIOException nioe) {
          if (nioe.getErrorCode() == 3) {
            throw new NativeIOException("No such file or directory",
                Errno.ENOENT);
          } else {
            LOG.warn(String.format("nativeio.chmod error (%d): %s",
                nioe.getErrorCode(), nioe.getMessage()));
            throw new NativeIOException("Unknown error", Errno.UNKNOWN);
          }
        }
      }
    }

    /** Wrapper around posix_fadvise(2) */
    static native void posix_fadvise(
        FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;

    /** Wrapper around sync_file_range(2) */
    static native void sync_file_range(
        FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;

    /**
     * Call posix_fadvise on the given file descriptor. See the manpage
     * for this syscall for more information. On systems where this
     * call is not available, does nothing.
     *
     * @throws NativeIOException if there is an error with the syscall
     */
    static void posixFadviseIfPossible(String identifier,
        FileDescriptor fd, long offset, long len, int flags)
        throws NativeIOException {
      if (nativeLoaded && fadvisePossible) {
        try {
          posix_fadvise(fd, offset, len, flags);
        } catch (UnsupportedOperationException uoe) {
          fadvisePossible = false;
        } catch (UnsatisfiedLinkError ule) {
          fadvisePossible = false;
        }
      }
    }

    /**
     * Call sync_file_range on the given file descriptor. See the manpage
     * for this syscall for more information. On systems where this
     * call is not available, does nothing.
     *
     * @throws NativeIOException if there is an error with the syscall
     */
    public static void syncFileRangeIfPossible(
        FileDescriptor fd, long offset, long nbytes, int flags)
        throws NativeIOException {
      if (nativeLoaded && syncFileRangePossible) {
        try {
          sync_file_range(fd, offset, nbytes, flags);
        } catch (UnsupportedOperationException uoe) {
          syncFileRangePossible = false;
        } catch (UnsatisfiedLinkError ule) {
          syncFileRangePossible = false;
        }
      }
    }

    static native void mlock_native(
        ByteBuffer buffer, long len) throws NativeIOException;

    /**
     * Locks the provided direct ByteBuffer into memory, preventing it from
     * swapping out. After a buffer is locked, future accesses will not incur
     * a page fault.
     *
     * See the mlock(2) man page for more information.
     *
     * @throws NativeIOException
     */
    static void mlock(ByteBuffer buffer, long len)
        throws IOException {
      assertCodeLoaded();
      if (!buffer.isDirect()) {
        throw new IOException("Cannot mlock a non-direct ByteBuffer");
      }
      mlock_native(buffer, len);
    }

    /**
     * Unmaps the block from memory. See munmap(2).
     *
     * There isn't any portable way to unmap a memory region in Java.
     * So we use the sun.nio method here.
     * Note that unmapping a memory region could cause crashes if code
     * continues to reference the unmapped code. However, if we don't
     * manually unmap the memory, we are dependent on the finalizer to
     * do it, and we have no idea when the finalizer will run.
     *
     * @param buffer The buffer to unmap.
     */
    public static void munmap(MappedByteBuffer buffer) {
      if (buffer instanceof sun.nio.ch.DirectBuffer) {
        sun.misc.Cleaner cleaner =
            ((sun.nio.ch.DirectBuffer)buffer).cleaner();
        cleaner.clean();
      }
    }
    /** Linux only methods used for getOwner() implementation */
    private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException;
    private static native String getUserName(long uid) throws IOException;

    /**
     * Result type of the fstat call
     */
    public static class Stat {
      private int ownerId, groupId;
      private String owner, group;
      private int mode;

      // Mode constants
      public static final int S_IFMT = 0170000;   /* type of file */
      public static final int S_IFIFO = 0010000;  /* named pipe (fifo) */
      public static final int S_IFCHR = 0020000;  /* character special */
      public static final int S_IFDIR = 0040000;  /* directory */
      public static final int S_IFBLK = 0060000;  /* block special */
      public static final int S_IFREG = 0100000;  /* regular */
      public static final int S_IFLNK = 0120000;  /* symbolic link */
      public static final int S_IFSOCK = 0140000; /* socket */
      public static final int S_IFWHT = 0160000;  /* whiteout */
      public static final int S_ISUID = 0004000;  /* set user id on execution */
      public static final int S_ISGID = 0002000;  /* set group id on execution */
      public static final int S_ISVTX = 0001000;  /* save swapped text even after use */
      public static final int S_IRUSR = 0000400;  /* read permission, owner */
      public static final int S_IWUSR = 0000200;  /* write permission, owner */
      public static final int S_IXUSR = 0000100;  /* execute/search permission, owner */

      Stat(int ownerId, int groupId, int mode) {
        this.ownerId = ownerId;
        this.groupId = groupId;
        this.mode = mode;
      }

      Stat(String owner, String group, int mode) {
        if (!Shell.WINDOWS) {
          this.owner = owner;
        } else {
          this.owner = stripDomain(owner);
        }
        if (!Shell.WINDOWS) {
          this.group = group;
        } else {
          this.group = stripDomain(group);
        }
        this.mode = mode;
      }

      @Override
      public String toString() {
        return "Stat(owner='" + owner + "', group='" + group + "'" +
            ", mode=" + mode + ")";
      }

      public String getOwner() {
        return owner;
      }

      public String getGroup() {
        return group;
      }

      public int getMode() {
        return mode;
      }
    }
    /**
     * Returns the file stat for a file descriptor.
     *
     * @param fd file descriptor.
     * @return the file descriptor file stat.
     * @throws IOException thrown if there was an IO error while obtaining the file stat.
     */
    public static Stat getFstat(FileDescriptor fd) throws IOException {
      Stat stat = null;
      if (!Shell.WINDOWS) {
        stat = fstat(fd);
        stat.owner = getName(IdCache.USER, stat.ownerId);
        stat.group = getName(IdCache.GROUP, stat.groupId);
      } else {
        try {
          stat = fstat(fd);
        } catch (NativeIOException nioe) {
          if (nioe.getErrorCode() == 6) {
            throw new NativeIOException("The handle is invalid.",
                Errno.EBADF);
          } else {
            LOG.warn(String.format("nativeio.getFstat error (%d): %s",
                nioe.getErrorCode(), nioe.getMessage()));
            throw new NativeIOException("Unknown error", Errno.UNKNOWN);
          }
        }
      }
      return stat;
    }

    private static String getName(IdCache domain, int id) throws IOException {
      Map<Integer, CachedName> idNameCache = (domain == IdCache.USER)
          ? USER_ID_NAME_CACHE : GROUP_ID_NAME_CACHE;
      String name;
      CachedName cachedName = idNameCache.get(id);
      long now = System.currentTimeMillis();
      if (cachedName != null && (cachedName.timestamp + cacheTimeout) > now) {
        name = cachedName.name;
      } else {
        name = (domain == IdCache.USER) ? getUserName(id) : getGroupName(id);
        if (LOG.isDebugEnabled()) {
          String type = (domain == IdCache.USER) ? "UserName" : "GroupName";
          LOG.debug("Got " + type + " " + name + " for ID " + id +
              " from the native implementation");
        }
        cachedName = new CachedName(name, now);
        idNameCache.put(id, cachedName);
      }
      return name;
    }

    static native String getUserName(int uid) throws IOException;
    static native String getGroupName(int uid) throws IOException;

    private static class CachedName {
      final long timestamp;
      final String name;

      public CachedName(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
      }
    }

    private static final Map<Integer, CachedName> USER_ID_NAME_CACHE =
        new ConcurrentHashMap<Integer, CachedName>();

    private static final Map<Integer, CachedName> GROUP_ID_NAME_CACHE =
        new ConcurrentHashMap<Integer, CachedName>();

    private enum IdCache { USER, GROUP }

    public final static int MMAP_PROT_READ = 0x1;
    public final static int MMAP_PROT_WRITE = 0x2;
    public final static int MMAP_PROT_EXEC = 0x4;

    public static native long mmap(FileDescriptor fd, int prot,
        boolean shared, long length) throws IOException;

    public static native void munmap(long addr, long length)
        throws IOException;
  }
  private static boolean workaroundNonThreadSafePasswdCalls = false;

  public static class Windows {
    // Flags for CreateFile() call on Windows
    public static final long GENERIC_READ = 0x80000000L;
    public static final long GENERIC_WRITE = 0x40000000L;

    public static final long FILE_SHARE_READ = 0x00000001L;
    public static final long FILE_SHARE_WRITE = 0x00000002L;
    public static final long FILE_SHARE_DELETE = 0x00000004L;

    public static final long CREATE_NEW = 1;
    public static final long CREATE_ALWAYS = 2;
    public static final long OPEN_EXISTING = 3;
    public static final long OPEN_ALWAYS = 4;
    public static final long TRUNCATE_EXISTING = 5;

    public static final long FILE_BEGIN = 0;
    public static final long FILE_CURRENT = 1;
    public static final long FILE_END = 2;

    public static final long FILE_ATTRIBUTE_NORMAL = 0x00000080L;

    /**
     * Create a directory with permissions set to the specified mode. By setting
     * permissions at creation time, we avoid issues related to the user lacking
     * WRITE_DAC rights on subsequent chmod calls. One example where this can
     * occur is writing to an SMB share where the user does not have Full Control
     * rights, and therefore WRITE_DAC is denied.
     *
     * @param path directory to create
     * @param mode permissions of new directory
     * @throws IOException if there is an I/O error
     */
    public static void createDirectoryWithMode(File path, int mode)
        throws IOException {
      createDirectoryWithMode0(path.getAbsolutePath(), mode);
    }

    /** Wrapper around CreateDirectory() on Windows */
    private static native void createDirectoryWithMode0(String path, int mode)
        throws NativeIOException;

    /** Wrapper around CreateFile() on Windows */
    public static native FileDescriptor createFile(String path,
        long desiredAccess, long shareMode, long creationDisposition)
        throws IOException;

    /**
     * Create a file for write with permissions set to the specified mode. By
     * setting permissions at creation time, we avoid issues related to the user
     * lacking WRITE_DAC rights on subsequent chmod calls. One example where
     * this can occur is writing to an SMB share where the user does not have
     * Full Control rights, and therefore WRITE_DAC is denied.
     *
     * This method mimics the semantics implemented by the JDK in
     * {@link FileOutputStream}. The file is opened for truncate or
     * append, the sharing mode allows other readers and writers, and paths
     * longer than MAX_PATH are supported. (See io_util_md.c in the JDK.)
     *
     * @param path file to create
     * @param append if true, then open file for append
     * @param mode permissions of new directory
     * @return FileOutputStream of opened file
     * @throws IOException if there is an I/O error
     */
    public static FileOutputStream createFileOutputStreamWithMode(File path,
        boolean append, int mode) throws IOException {
      long desiredAccess = GENERIC_WRITE;
      long shareMode = FILE_SHARE_READ | FILE_SHARE_WRITE;
      long creationDisposition = append ? OPEN_ALWAYS : CREATE_ALWAYS;
      return new FileOutputStream(createFileWithMode0(path.getAbsolutePath(),
          desiredAccess, shareMode, creationDisposition, mode));
    }

    /** Wrapper around CreateFile() with security descriptor on Windows */
    private static native FileDescriptor createFileWithMode0(String path,
        long desiredAccess, long shareMode, long creationDisposition, int mode)
        throws NativeIOException;

    /** Wrapper around SetFilePointer() on Windows */
    public static native long setFilePointer(FileDescriptor fd,
        long distanceToMove, long moveMethod) throws IOException;

    /** Windows only methods used for getOwner() implementation */
    private static native String getOwner(FileDescriptor fd) throws IOException;

    /** Supported list of Windows access right flags */
    public static enum AccessRight {
      ACCESS_READ (0x0001),      // FILE_READ_DATA
      ACCESS_WRITE (0x0002),     // FILE_WRITE_DATA
      ACCESS_EXECUTE (0x0020);   // FILE_EXECUTE

      private final int accessRight;
      AccessRight(int access) {
        accessRight = access;
      }

      public int accessRight() {
        return accessRight;
      }
    };

    /** Windows only method used to check if the current process has requested
     *  access rights on the given path. */
    private static native boolean access0(String path, int requestedAccess);

    /**
     * Checks whether the current process has desired access rights on
     * the given path.
     *
     * Longer term this native function can be substituted with JDK7
     * function Files#isReadable, isWritable, isExecutable.
     *
     * @param path input path
     * @param desiredAccess ACCESS_READ, ACCESS_WRITE or ACCESS_EXECUTE
     * @return true if access is allowed
     * @throws IOException I/O exception on error
     */
    public static boolean access(String path, AccessRight desiredAccess)
        throws IOException {
      return true;
      // return access0(path, desiredAccess.accessRight());
    }

    /**
     * Extends both the minimum and maximum working set size of the current
     * process. This method gets the current minimum and maximum working set
     * size, adds the requested amount to each and then sets the minimum and
     * maximum working set size to the new values. Controlling the working set
     * size of the process also controls the amount of memory it can lock.
     *
     * @param delta amount to increment minimum and maximum working set size
     * @throws IOException for any error
     * @see POSIX#mlock(ByteBuffer, long)
     */
    public static native void extendWorkingSetSize(long delta) throws IOException;

    static {
      if (NativeCodeLoader.isNativeCodeLoaded()) {
        try {
          initNative();
          nativeLoaded = true;
        } catch (Throwable t) {
          // This can happen if the user has an older version of libhadoop.so
          // installed - in this case we can continue without native IO
          // after warning
          PerformanceAdvisory.LOG.debug("Unable to initialize nativeio libraries", t);
        }
      }
    }
  }
  private static final Log LOG = LogFactory.getLog(NativeIO.class);

  private static boolean nativeLoaded = false;

  static {
    if (NativeCodeLoader.isNativeCodeLoaded()) {
      try {
        initNative();
        nativeLoaded = true;
      } catch (Throwable t) {
        // This can happen if the user has an older version of libhadoop.so
        // installed - in this case we can continue without native IO
        // after warning
        PerformanceAdvisory.LOG.debug("Unable to initialize nativeio libraries", t);
      }
    }
  }

  /**
   * Return true if the JNI-based native IO extensions are available.
   */
  public static boolean isAvailable() {
    return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
  }

  /** Initialize the JNI method ID and class ID cache */
  private static native void initNative();

  /**
   * Get the maximum number of bytes that can be locked into memory at any
   * given point.
   *
   * @return 0 if no bytes can be locked into memory;
   *         Long.MAX_VALUE if there is no limit;
   *         The number of bytes that can be locked into memory otherwise.
   */
  static long getMemlockLimit() {
    return isAvailable() ? getMemlockLimit0() : 0;
  }

  private static native long getMemlockLimit0();

  /**
   * @return the operating system's page size.
   */
  static long getOperatingSystemPageSize() {
    try {
      Field f = Unsafe.class.getDeclaredField("theUnsafe");
      f.setAccessible(true);
      Unsafe unsafe = (Unsafe)f.get(null);
      return unsafe.pageSize();
    } catch (Throwable e) {
      LOG.warn("Unable to get operating system page size. Guessing 4096.", e);
      return 4096;
    }
  }

  private static class CachedUid {
    final long timestamp;
    final String username;

    public CachedUid(String username, long timestamp) {
      this.timestamp = timestamp;
      this.username = username;
    }
  }

  private static final Map<Long, CachedUid> uidCache =
      new ConcurrentHashMap<Long, CachedUid>();
  private static long cacheTimeout;
  private static boolean initialized = false;
  /**
   * The Windows logon name has two parts, NetBIOS domain name and
   * user account name, of the format DOMAIN\UserName. This method
   * will remove the domain part of the full logon name.
   *
   * @param name the full principal name containing the domain
   * @return name with domain removed
   */
  private static String stripDomain(String name) {
    int i = name.indexOf('\\');
    if (i != -1)
      name = name.substring(i + 1);
    return name;
  }
  public static String getOwner(FileDescriptor fd) throws IOException {
    ensureInitialized();
    if (Shell.WINDOWS) {
      String owner = Windows.getOwner(fd);
      owner = stripDomain(owner);
      return owner;
    } else {
      long uid = POSIX.getUIDforFDOwnerforOwner(fd);
      CachedUid cUid = uidCache.get(uid);
      long now = System.currentTimeMillis();
      if (cUid != null && (cUid.timestamp + cacheTimeout) > now) {
        return cUid.username;
      }
      String user = POSIX.getUserName(uid);
      LOG.info("Got UserName " + user + " for UID " + uid
          + " from the native implementation");
      cUid = new CachedUid(user, now);
      uidCache.put(uid, cUid);
      return user;
    }
  }

  /**
   * Create a FileInputStream that shares delete permission on the
   * file opened, i.e. other process can delete the file the
   * FileInputStream is reading. Only Windows implementation uses
   * the native interface.
   */
  public static FileInputStream getShareDeleteFileInputStream(File f)
      throws IOException {
    if (!Shell.WINDOWS) {
      // On Linux the default FileInputStream shares delete permission
      // on the file opened.
      //
      return new FileInputStream(f);
    } else {
      // Use Windows native interface to create a FileInputStream that
      // shares delete permission on the file opened.
      //
      FileDescriptor fd = Windows.createFile(
          f.getAbsolutePath(),
          Windows.GENERIC_READ,
          Windows.FILE_SHARE_READ |
              Windows.FILE_SHARE_WRITE |
              Windows.FILE_SHARE_DELETE,
          Windows.OPEN_EXISTING);
      return new FileInputStream(fd);
    }
  }

  /**
   * Create a FileInputStream that shares delete permission on the
   * file opened at a given offset, i.e. other process can delete
   * the file the FileInputStream is reading. Only Windows implementation
   * uses the native interface.
   */
  public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
      throws IOException {
    if (!Shell.WINDOWS) {
      RandomAccessFile rf = new RandomAccessFile(f, "r");
      if (seekOffset > 0) {
        rf.seek(seekOffset);
      }
      return new FileInputStream(rf.getFD());
    } else {
      // Use Windows native interface to create a FileInputStream that
      // shares delete permission on the file opened, and set it to the
      // given offset.
      //
      FileDescriptor fd = Windows.createFile(
          f.getAbsolutePath(),
          Windows.GENERIC_READ,
          Windows.FILE_SHARE_READ |
              Windows.FILE_SHARE_WRITE |
              Windows.FILE_SHARE_DELETE,
          Windows.OPEN_EXISTING);
      if (seekOffset > 0)
        Windows.setFilePointer(fd, seekOffset, Windows.FILE_BEGIN);
      return new FileInputStream(fd);
    }
  }
  /**
   * Create the specified File for write access, ensuring that it does not exist.
   * @param f the file that we want to create
   * @param permissions we want to have on the file (if security is enabled)
   *
   * @throws AlreadyExistsException if the file already exists
   * @throws IOException if any other error occurred
   */
  public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions)
      throws IOException {
    if (!Shell.WINDOWS) {
      // Use the native wrapper around open(2)
      try {
        FileDescriptor fd = POSIX.open(f.getAbsolutePath(),
            POSIX.O_WRONLY | POSIX.O_CREAT
                | POSIX.O_EXCL, permissions);
        return new FileOutputStream(fd);
      } catch (NativeIOException nioe) {
        if (nioe.getErrno() == Errno.EEXIST) {
          throw new AlreadyExistsException(nioe);
        }
        throw nioe;
      }
    } else {
      // Use the Windows native APIs to create equivalent FileOutputStream
      try {
        FileDescriptor fd = Windows.createFile(f.getCanonicalPath(),
            Windows.GENERIC_WRITE,
            Windows.FILE_SHARE_DELETE
                | Windows.FILE_SHARE_READ
                | Windows.FILE_SHARE_WRITE,
            Windows.CREATE_NEW);
        POSIX.chmod(f.getCanonicalPath(), permissions);
        return new FileOutputStream(fd);
      } catch (NativeIOException nioe) {
        if (nioe.getErrorCode() == 80) {
          // ERROR_FILE_EXISTS
          // 80 (0x50)
          // The file exists
          throw new AlreadyExistsException(nioe);
        }
        throw nioe;
      }
    }
  }

  private synchronized static void ensureInitialized() {
    if (!initialized) {
      cacheTimeout =
          new Configuration().getLong("hadoop.security.uid.cache.secs",
              4*60*60) * 1000;
      LOG.info("Initialized cache for UID to User mapping with a cache" +
          " timeout of " + cacheTimeout/1000 + " seconds.");
      initialized = true;
    }
  }
  /**
   * A version of renameTo that throws a descriptive exception when it fails.
   *
   * @param src The source path
   * @param dst The destination path
   *
   * @throws NativeIOException On failure.
   */
  public static void renameTo(File src, File dst)
      throws IOException {
    if (!nativeLoaded) {
      if (!src.renameTo(dst)) {
        throw new IOException("renameTo(src=" + src + ", dst=" +
            dst + ") failed.");
      }
    } else {
      renameTo0(src.getAbsolutePath(), dst.getAbsolutePath());
    }
  }

  public static void link(File src, File dst) throws IOException {
    if (!nativeLoaded) {
      HardLink.createHardLink(src, dst);
    } else {
      link0(src.getAbsolutePath(), dst.getAbsolutePath());
    }
  }

  /**
   * A version of renameTo that throws a descriptive exception when it fails.
   *
   * @param src The source path
   * @param dst The destination path
   *
   * @throws NativeIOException On failure.
   */
  private static native void renameTo0(String src, String dst)
      throws NativeIOException;

  private static native void link0(String src, String dst)
      throws NativeIOException;

  /**
   * Unbuffered file copy from src to dst without tainting OS buffer cache
   *
   * In POSIX platform:
   * It uses FileChannel#transferTo() which internally attempts
   * unbuffered IO on OS with native sendfile64() support and falls back to
   * buffered IO otherwise.
   *
   * It minimizes the number of FileChannel#transferTo calls by passing the
   * src file size directly instead of a smaller size as the 3rd parameter.
   * This saves the number of sendfile64() system calls when native sendfile64()
   * is supported. In the two fall back cases where sendfile is not supported,
   * FileChannel#transferTo already has its own batching of size 8 MB and 8 KB,
   * respectively.
   *
   * In Windows Platform:
   * It uses its own native wrapper of CopyFileEx with COPY_FILE_NO_BUFFERING
   * flag, which is supported on Windows Server 2008 and above.
   *
   * Ideally, we should use FileChannel#transferTo() across both POSIX and Windows
   * platform. Unfortunately, the wrapper (Java_sun_nio_ch_FileChannelImpl_transferTo0)
   * used by FileChannel#transferTo for unbuffered IO is not implemented on Windows.
   * Based on OpenJDK 6/7/8 source code, Java_sun_nio_ch_FileChannelImpl_transferTo0
   * on Windows simply returns IOS_UNSUPPORTED.
   *
   * Note: This simple native wrapper does minimal parameter checking before copy and
   * consistency check (e.g., size) after copy.
   * It is recommended to use wrapper function like
   * the Storage#nativeCopyFileUnbuffered() function in hadoop-hdfs with pre/post copy
   * checks.
   *
   * @param src The source path
   * @param dst The destination path
   * @throws IOException
   */
  public static void copyFileUnbuffered(File src, File dst) throws IOException {
    if (nativeLoaded && Shell.WINDOWS) {
      copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
    } else {
      FileInputStream fis = null;
      FileOutputStream fos = null;
      FileChannel input = null;
      FileChannel output = null;
      try {
        fis = new FileInputStream(src);
        fos = new FileOutputStream(dst);
        input = fis.getChannel();
        output = fos.getChannel();
        long remaining = input.size();
        long position = 0;
        long transferred = 0;
        while (remaining > 0) {
          transferred = input.transferTo(position, remaining, output);
          remaining -= transferred;
          position += transferred;
        }
      } finally {
        IOUtils.cleanup(LOG, output);
        IOUtils.cleanup(LOG, fos);
        IOUtils.cleanup(LOG, input);
        IOUtils.cleanup(LOG, fis);
      }
    }
  }

  private static native void copyFileUnbuffered0(String src, String dst)
      throws NativeIOException;
}

src/main/resources/log4j.properties (new file, 10 lines)
@@ -0,0 +1,10 @@
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Spark log levels (keys must use the log4j.logger. prefix to take effect)
log4j.logger.org.apache.spark=WARN
log4j.logger.org.apache.spark.streaming=INFO