Compare commits
No commits in common. "master" and "main" have entirely different histories.
README.md · Normal file · 65 lines
@@ -0,0 +1,65 @@
# DataCollections

Kafka production, Flume collection, Sqoop extraction into Hive, Spark processing.

# Junior-Year Spring-Semester Final Project

## Project Overview

- Project topic 10

Cloud deployment + domain name + service. A cloud deployment can be reached from anywhere with internet access, just like visiting Baidu: any connected machine room can use it. Compared with topic 1, the environment on the cloud server may have to be configured from scratch, and problems may require contacting the cloud provider's support. Cloud services also cost money, so choose this option carefully. Because a cloud deployment adds considerable extra work, it is graded correspondingly higher. If applying for a domain name takes too long or is too much trouble, you can put nginx in front as a reverse proxy and access the service by IP instead of a domain name.

## Project Name

Orders Information Statistical System (订单信息统计系统)

## Project Requirements

1. Kafka data production

This module simulates a real-time business system in which user data is generated continuously and pushed to the corresponding Kafka topic.

1) Implement a producer in Java that pushes the following data to Kafka's `orders` topic every 5 s.

Generate each record randomly (random numbers over arrays of candidate values), in the following format:

Six fields: order category, order name, order quantity, date, customer rating (loves the item: 100; it's okay: 50; dislikes it: 0), and a validity flag (Y = valid, N = invalid). Join the fields with the `\t` separator.

2) Use Flume to collect the data from the Kafka topic on a schedule and write it to `/home/flume` on HDFS, for example with an agent like the sketch below.
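A minimal Flume agent for this step might look like the following. This is a sketch, not part of the assignment: the agent/component names (`a1`, `r1`, `c1`, `k1`), the broker host, and the roll interval are placeholders to adapt.

```properties
# Kafka source -> memory channel -> HDFS sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = <broker-host>:9092
a1.sources.r1.kafka.topics = orders
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /home/flume
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.channel = c1
```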
3) Use Sqoop to periodically extract all of the result data produced by the Spark data processing step into the central Hive warehouse, creating a database named `orders` for it. One possible invocation is sketched below.
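Assuming the Spark jobs write their results to MySQL (the hint in the next section has the front end read MySQL directly), a periodic extraction can be a cron-scheduled Sqoop import, one per result table. Host, credentials, and the table name here are placeholders:

```bash
sqoop import \
  --connect jdbc:mysql://<mysql-host>:3306/orders \
  --username <user> --password <password> \
  --table valid_invalid_totals \
  --hive-import --hive-database orders \
  --hive-table valid_invalid_totals \
  -m 1
```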
2. Spark data processing

This module mainly uses Spark together with MySQL, Hive, or Kafka to implement the requirements below and write the resulting statistics into the corresponding database tables.

1) Use Spark SQL / Spark Streaming to create a consumer that reads the data from the corresponding topic (a consumer sketch follows this list).

2) Use Spark SQL / Spark Streaming to compute, in real time, the total number of valid orders and the total number of invalid orders.

3) Use Spark SQL / Spark Streaming to compute, in real time, the valid and invalid counts of each order.

4) Use Spark SQL / Spark Streaming to compute, in real time, the count of every order category.

5) Use Spark SQL to compute the number of valid and invalid records for each order.

6) Use Spark Core / RDDs to compute, for each order, the valid and invalid counts of each category.

7) Collaborative-filtering recommendation (steps a and b are sketched after the hint below):

a. Build the order-category relation matrix.

b. Build the category-category relation matrix.

c. Use item-based CF to generate a recommendation list for every item.

d. Use user-based CF to generate a recommendation list for every customer.
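A minimal consumer sketch for requirements 1) and 2), using Spark Structured Streaming in Java (matching the `spark-sql-kafka-0-10` dependency in this repository's pom.xml). The broker address, the local master, and the console sink are illustrative assumptions; a `foreachBatch` JDBC writer would persist each micro-batch to the MySQL result tables instead.

```java
package com.example.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import static org.apache.spark.sql.functions.*;

public class OrdersStreamingJob {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("OrdersStreamingJob")
                .master("local[*]")   // assumption: local run; use the cluster master instead
                .getOrCreate();

        // Requirement 1): consume the `orders` topic.
        Dataset<Row> raw = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "<broker-host>:9092") // placeholder
                .option("subscribe", "orders")
                .load();

        // Split each tab-separated value into the six fields the generator produces.
        Dataset<Row> orders = raw.selectExpr("CAST(value AS STRING) AS line")
                .select(split(col("line"), "\t").alias("f"))
                .select(col("f").getItem(0).alias("category"),
                        col("f").getItem(1).alias("name"),
                        col("f").getItem(2).cast("int").alias("quantity"),
                        col("f").getItem(3).alias("date"),
                        col("f").getItem(4).cast("int").alias("rating"),
                        col("f").getItem(5).alias("valid"));

        // Requirement 2): running totals of valid vs. invalid orders.
        Dataset<Row> totals = orders.groupBy("valid").count();

        StreamingQuery query = totals.writeStream()
                .outputMode("complete")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
```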
Hint: the front-end tool, or the ECharts project, reads MySQL directly; it does not need to live in the same project as the Spark + Kafka back end. Ideally, all of the displayed metrics come from one front-end project.
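Returning to requirement 7), the two relation matrices of steps a and b can be sketched as a batch Spark job. The input path, the positional schema, and the use of the order name as an order identifier are all assumptions (the generated records carry no explicit order ID):

```java
package com.example.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;

public class CategoryCoOccurrence {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("CategoryCoOccurrence").master("local[*]").getOrCreate();

        // Batch view of the generated records (e.g. the files Flume wrote to
        // /home/flume); path and column order are assumptions.
        Dataset<Row> orders = spark.read()
                .option("sep", "\t")
                .csv("hdfs:///home/flume")
                .toDF("category", "name", "quantity", "date", "rating", "valid");

        // a) Order-category relation matrix; `name` stands in for an order ID.
        Dataset<Row> orderCategory = orders.groupBy("name", "category").count();

        // b) Category-category relation matrix: two categories are related when
        // they occur under the same order identifier (self-join on `name`).
        Dataset<Row> left = orderCategory.select(col("name"), col("category").alias("c1"));
        Dataset<Row> right = orderCategory.select(col("name"), col("category").alias("c2"));
        Dataset<Row> coOccurrence = left.join(right, "name").groupBy("c1", "c2").count();

        coOccurrence.show();
        spark.stop();
    }
}
```

Item-based CF (step c) can then rank, for each `c1`, the `c2` rows with the highest normalized co-occurrence as that category's recommendation list.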
3. Data visualization

Present the statistics to the user:

1) Registration and login pages, with user accounts stored in MySQL.

2) After login, the home page uses ECharts to statically display the totals of valid and invalid orders, as well as the valid and invalid counts of each category.

3) Personalized recommendation: show the top-6 recommended categories, 3 item-based and 3 user-based (the lists produced by the collaborative filtering above).

4) Offline recommendation: show the 3 categories with the most valid orders.

5) Search for a target category and show it together with its top-3 most similar categories as recommendations.

The pages read all of these numbers from the MySQL result tables; a minimal JDBC read is sketched below.
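A minimal sketch of that read side, assuming the Spark jobs write a `valid_invalid_totals(valid, total)` table into the MySQL `orders` database; the table, column, host, and credential names are all hypothetical:

```java
package com.example.web;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class TotalsDao {
    // Placeholder connection details; in the web project these would
    // normally come from a configured datasource.
    private static final String URL = "jdbc:mysql://<mysql-host>:3306/orders";

    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection(URL, "<user>", "<password>");
             PreparedStatement ps = conn.prepareStatement(
                     "SELECT valid, total FROM valid_invalid_totals");
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                // Y/N flag and its running total, as written by the Spark job.
                System.out.println(rs.getString("valid") + " -> " + rs.getLong("total"));
            }
        }
    }
}
```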
### Project Structure

### Technology Stack

| Stage | Technology |
| --- | --- |
| Data production | Java + Apache Kafka |
| Data collection | Apache Flume |
| Data extraction | Apache Sqoop |
| Data processing | Apache Spark (Spark SQL / Spark Streaming / Spark Core) |
| Data storage | MySQL + Hive + HDFS |
pom.xml · 92 lines
@@ -1,92 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>OrderInformationData</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>OrderInformationData</name>
    <url>http://maven.apache.org</url>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.13.0</scala.version>
        <spark.version>3.4.3</spark.version>
        <hadoop.version>3.3.0</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark Streaming dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.13</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Kafka connector for Spark Structured Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.13</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.13</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- With no Spring Boot parent POM this plugin has no managed
                 version; an explicit <version> may be needed for the build
                 to resolve it. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
@@ -1,37 +0,0 @@
package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;

public class KafkaProducerApp {

    public static void main(String[] args) {
        // Replace with the remote Kafka broker address. Brokers listen on
        // 9092 by default; 2181 is ZooKeeper's port, not a broker's.
        String bootstrapServers = "172.16.5.3:9092";
        String topic = "orders";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);

        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                String message = OrderDataGenerator.generateOrder();
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
                producer.send(record);
                System.out.println("Sent to Kafka: " + message);
            }
        }, 0, 5000); // send once every 5 seconds
    }
}
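To verify the producer, one can tail the topic with the console consumer shipped with Kafka; the broker host is a placeholder:

```bash
kafka-console-consumer.sh --bootstrap-server <broker-host>:9092 \
  --topic orders --from-beginning
```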
@@ -1,69 +0,0 @@
package com.example.kafka;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class OrderDataGenerator {

    // Order categories
    private static final String[] CATEGORIES = {"电器", "服饰", "食品", "玩具", "手机"};
    // Order names belonging to each category
    private static final String[][] PRODUCT_NAMES = {
            {"电视", "冰箱", "洗衣机", "空调", "吸尘器", "电饭煲", "微波炉", "电磁炉", "热水器", "空气净化器"}, // appliances
            {"T恤", "牛仔裤", "羽绒服", "衬衫", "运动鞋", "夹克", "卫衣", "连衣裙", "短裤", "风衣"}, // clothing
            {"巧克力", "饼干", "方便面", "牛奶", "饮料", "面包", "糖果", "果冻", "薯片", "蛋糕"}, // food
            {"积木", "拼图", "玩偶", "遥控车", "毛绒玩具", "魔方", "乐高", "变形金刚", "洋娃娃", "电子琴"}, // toys
            {"华为", "苹果", "小米", "OPPO", "vivo", "荣耀", "三星", "魅族", "联想", "努比亚"} // phones
    };

    private static final Random random = new Random();
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

    // Tracks the categoryIndex values already used in the current round
    private static final Set<Integer> generatedCategoryIndexes = new HashSet<>();

    /**
     * Generates one order record, keeping categoryIndex unique within a round.
     */
    public static String generateOrder() {
        // Once every category has been used, start a new round; without this
        // the do/while below would loop forever after five records.
        if (generatedCategoryIndexes.size() == CATEGORIES.length) {
            resetCategoryIndexTracker();
        }

        int categoryIndex;
        // Draw until we hit a categoryIndex not yet used in this round
        do {
            categoryIndex = random.nextInt(CATEGORIES.length);
        } while (generatedCategoryIndexes.contains(categoryIndex));

        // Record it as used
        generatedCategoryIndexes.add(categoryIndex);

        String category = CATEGORIES[categoryIndex];
        String name = PRODUCT_NAMES[categoryIndex][random.nextInt(10)];
        int quantity = random.nextInt(10) + 1;
        String date = sdf.format(new Date());
        int rating = random.nextInt(3) * 50; // 0, 50, 100
        String isValid = random.nextBoolean() ? "Y" : "N";

        return String.join("\t",
                category,
                name,
                String.valueOf(quantity),
                date,
                String.valueOf(rating),
                isValid);
    }

    /**
     * Clears the used categoryIndex record so the next round can start.
     */
    public static void resetCategoryIndexTracker() {
        generatedCategoryIndexes.clear();
    }
}
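A record produced by this generator looks like the following tab-separated line (all values are random; this one is purely illustrative):

```
食品\t牛奶\t3\t2025-01-01\t50\tY
```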
@@ -1,934 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.nativeio;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.PerformanceAdvisory;
import org.apache.hadoop.util.Shell;
import sun.misc.Unsafe;

import java.io.*;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * JNI wrappers for various native IO-related calls not available in Java.
 * These functions should generally be used alongside a fallback to another
 * more portable mechanism.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NativeIO {
  public static class POSIX {
    // Flags for open() call from bits/fcntl.h
    public static final int O_RDONLY = 00;
    public static final int O_WRONLY = 01;
    public static final int O_RDWR = 02;
    public static final int O_CREAT = 0100;
    public static final int O_EXCL = 0200;
    public static final int O_NOCTTY = 0400;
    public static final int O_TRUNC = 01000;
    public static final int O_APPEND = 02000;
    public static final int O_NONBLOCK = 04000;
    public static final int O_SYNC = 010000;
    public static final int O_ASYNC = 020000;
    public static final int O_FSYNC = O_SYNC;
    public static final int O_NDELAY = O_NONBLOCK;

    // Flags for posix_fadvise() from bits/fcntl.h
    /* No further special treatment. */
    public static final int POSIX_FADV_NORMAL = 0;
    /* Expect random page references. */
    public static final int POSIX_FADV_RANDOM = 1;
    /* Expect sequential page references. */
    public static final int POSIX_FADV_SEQUENTIAL = 2;
    /* Will need these pages. */
    public static final int POSIX_FADV_WILLNEED = 3;
    /* Don't need these pages. */
    public static final int POSIX_FADV_DONTNEED = 4;
    /* Data will be accessed once. */
    public static final int POSIX_FADV_NOREUSE = 5;

    /* Wait upon writeout of all pages
       in the range before performing the
       write. */
    public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
    /* Initiate writeout of all those
       dirty pages in the range which are
       not presently under writeback. */
    public static final int SYNC_FILE_RANGE_WRITE = 2;
    /* Wait upon writeout of all pages in
       the range after performing the
       write. */
    public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;

    private static final Log LOG = LogFactory.getLog(NativeIO.class);
    private static boolean nativeLoaded = false;
    private static boolean fadvisePossible = true;
    private static boolean syncFileRangePossible = true;
    static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY =
        "hadoop.workaround.non.threadsafe.getpwuid";
    static final boolean WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT = true;
    private static long cacheTimeout = -1;
    private static CacheManipulator cacheManipulator = new CacheManipulator();

    public static CacheManipulator getCacheManipulator() {
      return cacheManipulator;
    }

    public static void setCacheManipulator(CacheManipulator cacheManipulator) {
      POSIX.cacheManipulator = cacheManipulator;
    }

    /**
     * Used to manipulate the operating system cache.
     */
    @VisibleForTesting
    public static class CacheManipulator {
      public void mlock(String identifier, ByteBuffer buffer,
          long len) throws IOException {
        POSIX.mlock(buffer, len);
      }
      public long getMemlockLimit() {
        return NativeIO.getMemlockLimit();
      }
      public long getOperatingSystemPageSize() {
        return NativeIO.getOperatingSystemPageSize();
      }
      public void posixFadviseIfPossible(String identifier,
          FileDescriptor fd, long offset, long len, int flags)
          throws NativeIOException {
        POSIX.posixFadviseIfPossible(identifier, fd, offset,
            len, flags);
      }
      public boolean verifyCanMlock() {
        return NativeIO.isAvailable();
      }
    }

    /**
     * A CacheManipulator used for testing which does not actually call mlock.
     * This allows many tests to be run even when the operating system does not
     * allow mlock, or only allows limited mlocking.
     */
    @VisibleForTesting
    public static class NoMlockCacheManipulator extends CacheManipulator {
      public void mlock(String identifier, ByteBuffer buffer,
          long len) throws IOException {
        LOG.info("mlocking " + identifier);
      }
      public long getMemlockLimit() {
        return 1125899906842624L;
      }
      public long getOperatingSystemPageSize() {
        return 4096;
      }
      public boolean verifyCanMlock() {
        return true;
      }
    }
    static {
      if (NativeCodeLoader.isNativeCodeLoaded()) {
        try {
          Configuration conf = new Configuration();
          workaroundNonThreadSafePasswdCalls = conf.getBoolean(
              WORKAROUND_NON_THREADSAFE_CALLS_KEY,
              WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT);
          initNative();
          nativeLoaded = true;
          cacheTimeout = conf.getLong(
              CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_KEY,
              CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT) *
              1000;
          LOG.debug("Initialized cache for IDs to User/Group mapping with a " +
              " cache timeout of " + cacheTimeout/1000 + " seconds.");
        } catch (Throwable t) {
          // This can happen if the user has an older version of libhadoop.so
          // installed - in this case we can continue without native IO
          // after warning
          PerformanceAdvisory.LOG.debug("Unable to initialize nativeio libraries", t);
        }
      }
    }

    /**
     * Return true if the JNI-based native IO extensions are available.
     */
    public static boolean isAvailable() {
      return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
    }

    private static void assertCodeLoaded() throws IOException {
      if (!isAvailable()) {
        throw new IOException("nativeio was not loaded");
      }
    }

    /** Wrapper around open(2) */
    public static native FileDescriptor open(String path, int flags, int mode) throws IOException;
    /** Wrapper around fstat(2) */
    private static native Stat fstat(FileDescriptor fd) throws IOException;

    /** Native chmod implementation. On UNIX, it is a wrapper around chmod(2) */
    private static native void chmodImpl(String path, int mode) throws IOException;

    public static void chmod(String path, int mode) throws IOException {
      if (!Shell.WINDOWS) {
        chmodImpl(path, mode);
      } else {
        try {
          chmodImpl(path, mode);
        } catch (NativeIOException nioe) {
          if (nioe.getErrorCode() == 3) {
            throw new NativeIOException("No such file or directory",
                Errno.ENOENT);
          } else {
            LOG.warn(String.format("nativeio.chmod error (%d): %s",
                nioe.getErrorCode(), nioe.getMessage()));
            throw new NativeIOException("Unknown error", Errno.UNKNOWN);
          }
        }
      }
    }

    /** Wrapper around posix_fadvise(2) */
    static native void posix_fadvise(
        FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;

    /** Wrapper around sync_file_range(2) */
    static native void sync_file_range(
        FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;

    /**
     * Call posix_fadvise on the given file descriptor. See the manpage
     * for this syscall for more information. On systems where this
     * call is not available, does nothing.
     *
     * @throws NativeIOException if there is an error with the syscall
     */
    static void posixFadviseIfPossible(String identifier,
        FileDescriptor fd, long offset, long len, int flags)
        throws NativeIOException {
      if (nativeLoaded && fadvisePossible) {
        try {
          posix_fadvise(fd, offset, len, flags);
        } catch (UnsupportedOperationException uoe) {
          fadvisePossible = false;
        } catch (UnsatisfiedLinkError ule) {
          fadvisePossible = false;
        }
      }
    }

    /**
     * Call sync_file_range on the given file descriptor. See the manpage
     * for this syscall for more information. On systems where this
     * call is not available, does nothing.
     *
     * @throws NativeIOException if there is an error with the syscall
     */
    public static void syncFileRangeIfPossible(
        FileDescriptor fd, long offset, long nbytes, int flags)
        throws NativeIOException {
      if (nativeLoaded && syncFileRangePossible) {
        try {
          sync_file_range(fd, offset, nbytes, flags);
        } catch (UnsupportedOperationException uoe) {
          syncFileRangePossible = false;
        } catch (UnsatisfiedLinkError ule) {
          syncFileRangePossible = false;
        }
      }
    }

    static native void mlock_native(
        ByteBuffer buffer, long len) throws NativeIOException;

    /**
     * Locks the provided direct ByteBuffer into memory, preventing it from
     * swapping out. After a buffer is locked, future accesses will not incur
     * a page fault.
     *
     * See the mlock(2) man page for more information.
     *
     * @throws NativeIOException
     */
    static void mlock(ByteBuffer buffer, long len)
        throws IOException {
      assertCodeLoaded();
      if (!buffer.isDirect()) {
        throw new IOException("Cannot mlock a non-direct ByteBuffer");
      }
      mlock_native(buffer, len);
    }
    /**
     * Unmaps the block from memory. See munmap(2).
     *
     * There isn't any portable way to unmap a memory region in Java.
     * So we use the sun.nio method here.
     * Note that unmapping a memory region could cause crashes if code
     * continues to reference the unmapped code.  However, if we don't
     * manually unmap the memory, we are dependent on the finalizer to
     * do it, and we have no idea when the finalizer will run.
     *
     * @param buffer    The buffer to unmap.
     */
    public static void munmap(MappedByteBuffer buffer) {
      if (buffer instanceof sun.nio.ch.DirectBuffer) {
        sun.misc.Cleaner cleaner =
            ((sun.nio.ch.DirectBuffer)buffer).cleaner();
        cleaner.clean();
      }
    }

    /** Linux only methods used for getOwner() implementation */
    private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException;
    private static native String getUserName(long uid) throws IOException;

    /**
     * Result type of the fstat call
     */
    public static class Stat {
      private int ownerId, groupId;
      private String owner, group;
      private int mode;

      // Mode constants
      public static final int S_IFMT = 0170000;      /* type of file */
      public static final int S_IFIFO = 0010000;     /* named pipe (fifo) */
      public static final int S_IFCHR = 0020000;     /* character special */
      public static final int S_IFDIR = 0040000;     /* directory */
      public static final int S_IFBLK = 0060000;     /* block special */
      public static final int S_IFREG = 0100000;     /* regular */
      public static final int S_IFLNK = 0120000;     /* symbolic link */
      public static final int S_IFSOCK = 0140000;    /* socket */
      public static final int S_IFWHT = 0160000;     /* whiteout */
      public static final int S_ISUID = 0004000;     /* set user id on execution */
      public static final int S_ISGID = 0002000;     /* set group id on execution */
      public static final int S_ISVTX = 0001000;     /* save swapped text even after use */
      public static final int S_IRUSR = 0000400;     /* read permission, owner */
      public static final int S_IWUSR = 0000200;     /* write permission, owner */
      public static final int S_IXUSR = 0000100;     /* execute/search permission, owner */

      Stat(int ownerId, int groupId, int mode) {
        this.ownerId = ownerId;
        this.groupId = groupId;
        this.mode = mode;
      }

      Stat(String owner, String group, int mode) {
        if (!Shell.WINDOWS) {
          this.owner = owner;
        } else {
          this.owner = stripDomain(owner);
        }
        if (!Shell.WINDOWS) {
          this.group = group;
        } else {
          this.group = stripDomain(group);
        }
        this.mode = mode;
      }

      @Override
      public String toString() {
        return "Stat(owner='" + owner + "', group='" + group + "'" +
            ", mode=" + mode + ")";
      }

      public String getOwner() {
        return owner;
      }
      public String getGroup() {
        return group;
      }
      public int getMode() {
        return mode;
      }
    }

    /**
     * Returns the file stat for a file descriptor.
     *
     * @param fd file descriptor.
     * @return the file descriptor file stat.
     * @throws IOException thrown if there was an IO error while obtaining the file stat.
     */
    public static Stat getFstat(FileDescriptor fd) throws IOException {
      Stat stat = null;
      if (!Shell.WINDOWS) {
        stat = fstat(fd);
        stat.owner = getName(IdCache.USER, stat.ownerId);
        stat.group = getName(IdCache.GROUP, stat.groupId);
      } else {
        try {
          stat = fstat(fd);
        } catch (NativeIOException nioe) {
          if (nioe.getErrorCode() == 6) {
            throw new NativeIOException("The handle is invalid.",
                Errno.EBADF);
          } else {
            LOG.warn(String.format("nativeio.getFstat error (%d): %s",
                nioe.getErrorCode(), nioe.getMessage()));
            throw new NativeIOException("Unknown error", Errno.UNKNOWN);
          }
        }
      }
      return stat;
    }

    private static String getName(IdCache domain, int id) throws IOException {
      Map<Integer, CachedName> idNameCache = (domain == IdCache.USER)
          ? USER_ID_NAME_CACHE : GROUP_ID_NAME_CACHE;
      String name;
      CachedName cachedName = idNameCache.get(id);
      long now = System.currentTimeMillis();
      if (cachedName != null && (cachedName.timestamp + cacheTimeout) > now) {
        name = cachedName.name;
      } else {
        name = (domain == IdCache.USER) ? getUserName(id) : getGroupName(id);
        if (LOG.isDebugEnabled()) {
          String type = (domain == IdCache.USER) ? "UserName" : "GroupName";
          LOG.debug("Got " + type + " " + name + " for ID " + id +
              " from the native implementation");
        }
        cachedName = new CachedName(name, now);
        idNameCache.put(id, cachedName);
      }
      return name;
    }

    static native String getUserName(int uid) throws IOException;
    static native String getGroupName(int uid) throws IOException;

    private static class CachedName {
      final long timestamp;
      final String name;

      public CachedName(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
      }
    }

    private static final Map<Integer, CachedName> USER_ID_NAME_CACHE =
        new ConcurrentHashMap<Integer, CachedName>();

    private static final Map<Integer, CachedName> GROUP_ID_NAME_CACHE =
        new ConcurrentHashMap<Integer, CachedName>();

    private enum IdCache { USER, GROUP }

    public final static int MMAP_PROT_READ = 0x1;
    public final static int MMAP_PROT_WRITE = 0x2;
    public final static int MMAP_PROT_EXEC = 0x4;

    public static native long mmap(FileDescriptor fd, int prot,
        boolean shared, long length) throws IOException;

    public static native void munmap(long addr, long length)
        throws IOException;
  }
  private static boolean workaroundNonThreadSafePasswdCalls = false;

  public static class Windows {
    // Flags for CreateFile() call on Windows
    public static final long GENERIC_READ = 0x80000000L;
    public static final long GENERIC_WRITE = 0x40000000L;

    public static final long FILE_SHARE_READ = 0x00000001L;
    public static final long FILE_SHARE_WRITE = 0x00000002L;
    public static final long FILE_SHARE_DELETE = 0x00000004L;

    public static final long CREATE_NEW = 1;
    public static final long CREATE_ALWAYS = 2;
    public static final long OPEN_EXISTING = 3;
    public static final long OPEN_ALWAYS = 4;
    public static final long TRUNCATE_EXISTING = 5;

    public static final long FILE_BEGIN = 0;
    public static final long FILE_CURRENT = 1;
    public static final long FILE_END = 2;

    public static final long FILE_ATTRIBUTE_NORMAL = 0x00000080L;

    /**
     * Create a directory with permissions set to the specified mode.  By setting
     * permissions at creation time, we avoid issues related to the user lacking
     * WRITE_DAC rights on subsequent chmod calls.  One example where this can
     * occur is writing to an SMB share where the user does not have Full Control
     * rights, and therefore WRITE_DAC is denied.
     *
     * @param path directory to create
     * @param mode permissions of new directory
     * @throws IOException if there is an I/O error
     */
    public static void createDirectoryWithMode(File path, int mode)
        throws IOException {
      createDirectoryWithMode0(path.getAbsolutePath(), mode);
    }

    /** Wrapper around CreateDirectory() on Windows */
    private static native void createDirectoryWithMode0(String path, int mode)
        throws NativeIOException;

    /** Wrapper around CreateFile() on Windows */
    public static native FileDescriptor createFile(String path,
        long desiredAccess, long shareMode, long creationDisposition)
        throws IOException;

    /**
     * Create a file for write with permissions set to the specified mode.  By
     * setting permissions at creation time, we avoid issues related to the user
     * lacking WRITE_DAC rights on subsequent chmod calls.  One example where
     * this can occur is writing to an SMB share where the user does not have
     * Full Control rights, and therefore WRITE_DAC is denied.
     *
     * This method mimics the semantics implemented by the JDK in
     * {@link FileOutputStream}.  The file is opened for truncate or
     * append, the sharing mode allows other readers and writers, and paths
     * longer than MAX_PATH are supported.  (See io_util_md.c in the JDK.)
     *
     * @param path file to create
     * @param append if true, then open file for append
     * @param mode permissions of new directory
     * @return FileOutputStream of opened file
     * @throws IOException if there is an I/O error
     */
    public static FileOutputStream createFileOutputStreamWithMode(File path,
        boolean append, int mode) throws IOException {
      long desiredAccess = GENERIC_WRITE;
      long shareMode = FILE_SHARE_READ | FILE_SHARE_WRITE;
      long creationDisposition = append ? OPEN_ALWAYS : CREATE_ALWAYS;
      return new FileOutputStream(createFileWithMode0(path.getAbsolutePath(),
          desiredAccess, shareMode, creationDisposition, mode));
    }

    /** Wrapper around CreateFile() with security descriptor on Windows */
    private static native FileDescriptor createFileWithMode0(String path,
        long desiredAccess, long shareMode, long creationDisposition, int mode)
        throws NativeIOException;

    /** Wrapper around SetFilePointer() on Windows */
    public static native long setFilePointer(FileDescriptor fd,
        long distanceToMove, long moveMethod) throws IOException;

    /** Windows only methods used for getOwner() implementation */
    private static native String getOwner(FileDescriptor fd) throws IOException;

    /** Supported list of Windows access right flags */
    public static enum AccessRight {
      ACCESS_READ (0x0001),      // FILE_READ_DATA
      ACCESS_WRITE (0x0002),     // FILE_WRITE_DATA
      ACCESS_EXECUTE (0x0020);   // FILE_EXECUTE

      private final int accessRight;
      AccessRight(int access) {
        accessRight = access;
      }

      public int accessRight() {
        return accessRight;
      }
    };

    /** Windows only method used to check if the current process has requested
     *  access rights on the given path. */
    private static native boolean access0(String path, int requestedAccess);

    /**
     * Checks whether the current process has desired access rights on
     * the given path.
     *
     * Longer term this native function can be substituted with JDK7
     * function Files#isReadable, isWritable, isExecutable.
     *
     * @param path input path
     * @param desiredAccess ACCESS_READ, ACCESS_WRITE or ACCESS_EXECUTE
     * @return true if access is allowed
     * @throws IOException I/O exception on error
     */
    public static boolean access(String path, AccessRight desiredAccess)
        throws IOException {
      // Local modification to the upstream Hadoop source: always report access
      // as allowed instead of calling the native access0 below, so that local
      // runs (e.g. Spark on Windows without winutils/native libraries) do not
      // fail on this check.
      return true;
      // return access0(path, desiredAccess.accessRight());
    }

    /**
     * Extends both the minimum and maximum working set size of the current
     * process.  This method gets the current minimum and maximum working set
     * size, adds the requested amount to each and then sets the minimum and
     * maximum working set size to the new values.  Controlling the working set
     * size of the process also controls the amount of memory it can lock.
     *
     * @param delta amount to increment minimum and maximum working set size
     * @throws IOException for any error
     * @see POSIX#mlock(ByteBuffer, long)
     */
    public static native void extendWorkingSetSize(long delta) throws IOException;

    static {
      if (NativeCodeLoader.isNativeCodeLoaded()) {
        try {
          initNative();
          nativeLoaded = true;
        } catch (Throwable t) {
          // This can happen if the user has an older version of libhadoop.so
          // installed - in this case we can continue without native IO
          // after warning
          PerformanceAdvisory.LOG.debug("Unable to initialize nativeio libraries", t);
        }
      }
    }
  }
  private static final Log LOG = LogFactory.getLog(NativeIO.class);

  private static boolean nativeLoaded = false;

  static {
    if (NativeCodeLoader.isNativeCodeLoaded()) {
      try {
        initNative();
        nativeLoaded = true;
      } catch (Throwable t) {
        // This can happen if the user has an older version of libhadoop.so
        // installed - in this case we can continue without native IO
        // after warning
        PerformanceAdvisory.LOG.debug("Unable to initialize nativeio libraries", t);
      }
    }
  }

  /**
   * Return true if the JNI-based native IO extensions are available.
   */
  public static boolean isAvailable() {
    return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
  }

  /** Initialize the JNI method ID and class ID cache */
  private static native void initNative();

  /**
   * Get the maximum number of bytes that can be locked into memory at any
   * given point.
   *
   * @return 0 if no bytes can be locked into memory;
   *         Long.MAX_VALUE if there is no limit;
   *         The number of bytes that can be locked into memory otherwise.
   */
  static long getMemlockLimit() {
    return isAvailable() ? getMemlockLimit0() : 0;
  }

  private static native long getMemlockLimit0();

  /**
   * @return the operating system's page size.
   */
  static long getOperatingSystemPageSize() {
    try {
      Field f = Unsafe.class.getDeclaredField("theUnsafe");
      f.setAccessible(true);
      Unsafe unsafe = (Unsafe)f.get(null);
      return unsafe.pageSize();
    } catch (Throwable e) {
      LOG.warn("Unable to get operating system page size.  Guessing 4096.", e);
      return 4096;
    }
  }

  private static class CachedUid {
    final long timestamp;
    final String username;
    public CachedUid(String username, long timestamp) {
      this.timestamp = timestamp;
      this.username = username;
    }
  }

  private static final Map<Long, CachedUid> uidCache =
      new ConcurrentHashMap<Long, CachedUid>();
  private static long cacheTimeout;
  private static boolean initialized = false;

  /**
   * The Windows logon name has two part, NetBIOS domain name and
   * user account name, of the format DOMAIN\UserName. This method
   * will remove the domain part of the full logon name.
   *
   * @param name the full principal name containing the domain
   * @return name with domain removed
   */
  private static String stripDomain(String name) {
    int i = name.indexOf('\\');
    if (i != -1)
      name = name.substring(i + 1);
    return name;
  }
  public static String getOwner(FileDescriptor fd) throws IOException {
    ensureInitialized();
    if (Shell.WINDOWS) {
      String owner = Windows.getOwner(fd);
      owner = stripDomain(owner);
      return owner;
    } else {
      long uid = POSIX.getUIDforFDOwnerforOwner(fd);
      CachedUid cUid = uidCache.get(uid);
      long now = System.currentTimeMillis();
      if (cUid != null && (cUid.timestamp + cacheTimeout) > now) {
        return cUid.username;
      }
      String user = POSIX.getUserName(uid);
      LOG.info("Got UserName " + user + " for UID " + uid
          + " from the native implementation");
      cUid = new CachedUid(user, now);
      uidCache.put(uid, cUid);
      return user;
    }
  }

  /**
   * Create a FileInputStream that shares delete permission on the
   * file opened, i.e. other process can delete the file the
   * FileInputStream is reading. Only Windows implementation uses
   * the native interface.
   */
  public static FileInputStream getShareDeleteFileInputStream(File f)
      throws IOException {
    if (!Shell.WINDOWS) {
      // On Linux the default FileInputStream shares delete permission
      // on the file opened.
      //
      return new FileInputStream(f);
    } else {
      // Use Windows native interface to create a FileInputStream that
      // shares delete permission on the file opened.
      //
      FileDescriptor fd = Windows.createFile(
          f.getAbsolutePath(),
          Windows.GENERIC_READ,
          Windows.FILE_SHARE_READ |
              Windows.FILE_SHARE_WRITE |
              Windows.FILE_SHARE_DELETE,
          Windows.OPEN_EXISTING);
      return new FileInputStream(fd);
    }
  }

  /**
   * Create a FileInputStream that shares delete permission on the
   * file opened at a given offset, i.e. other process can delete
   * the file the FileInputStream is reading. Only Windows implementation
   * uses the native interface.
   */
  public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
      throws IOException {
    if (!Shell.WINDOWS) {
      RandomAccessFile rf = new RandomAccessFile(f, "r");
      if (seekOffset > 0) {
        rf.seek(seekOffset);
      }
      return new FileInputStream(rf.getFD());
    } else {
      // Use Windows native interface to create a FileInputStream that
      // shares delete permission on the file opened, and set it to the
      // given offset.
      //
      FileDescriptor fd = Windows.createFile(
          f.getAbsolutePath(),
          Windows.GENERIC_READ,
          Windows.FILE_SHARE_READ |
              Windows.FILE_SHARE_WRITE |
              Windows.FILE_SHARE_DELETE,
          Windows.OPEN_EXISTING);
      if (seekOffset > 0)
        Windows.setFilePointer(fd, seekOffset, Windows.FILE_BEGIN);
      return new FileInputStream(fd);
    }
  }

  /**
   * Create the specified File for write access, ensuring that it does not exist.
   * @param f the file that we want to create
   * @param permissions we want to have on the file (if security is enabled)
   *
   * @throws AlreadyExistsException if the file already exists
   * @throws IOException if any other error occurred
   */
  public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions)
      throws IOException {
    if (!Shell.WINDOWS) {
      // Use the native wrapper around open(2)
      try {
        FileDescriptor fd = POSIX.open(f.getAbsolutePath(),
            POSIX.O_WRONLY | POSIX.O_CREAT
                | POSIX.O_EXCL, permissions);
        return new FileOutputStream(fd);
      } catch (NativeIOException nioe) {
        if (nioe.getErrno() == Errno.EEXIST) {
          throw new AlreadyExistsException(nioe);
        }
        throw nioe;
      }
    } else {
      // Use the Windows native APIs to create equivalent FileOutputStream
      try {
        FileDescriptor fd = Windows.createFile(f.getCanonicalPath(),
            Windows.GENERIC_WRITE,
            Windows.FILE_SHARE_DELETE
                | Windows.FILE_SHARE_READ
                | Windows.FILE_SHARE_WRITE,
            Windows.CREATE_NEW);
        POSIX.chmod(f.getCanonicalPath(), permissions);
        return new FileOutputStream(fd);
      } catch (NativeIOException nioe) {
        if (nioe.getErrorCode() == 80) {
          // ERROR_FILE_EXISTS
          // 80 (0x50)
          // The file exists
          throw new AlreadyExistsException(nioe);
        }
        throw nioe;
      }
    }
  }

  private synchronized static void ensureInitialized() {
    if (!initialized) {
      cacheTimeout =
          new Configuration().getLong("hadoop.security.uid.cache.secs",
              4*60*60) * 1000;
      LOG.info("Initialized cache for UID to User mapping with a cache" +
          " timeout of " + cacheTimeout/1000 + " seconds.");
      initialized = true;
    }
  }
  /**
   * A version of renameTo that throws a descriptive exception when it fails.
   *
   * @param src The source path
   * @param dst The destination path
   *
   * @throws NativeIOException On failure.
   */
  public static void renameTo(File src, File dst)
      throws IOException {
    if (!nativeLoaded) {
      if (!src.renameTo(dst)) {
        throw new IOException("renameTo(src=" + src + ", dst=" +
            dst + ") failed.");
      }
    } else {
      renameTo0(src.getAbsolutePath(), dst.getAbsolutePath());
    }
  }

  public static void link(File src, File dst) throws IOException {
    if (!nativeLoaded) {
      HardLink.createHardLink(src, dst);
    } else {
      link0(src.getAbsolutePath(), dst.getAbsolutePath());
    }
  }

  /**
   * A version of renameTo that throws a descriptive exception when it fails.
   *
   * @param src The source path
   * @param dst The destination path
   *
   * @throws NativeIOException On failure.
   */
  private static native void renameTo0(String src, String dst)
      throws NativeIOException;

  private static native void link0(String src, String dst)
      throws NativeIOException;

  /**
   * Unbuffered file copy from src to dst without tainting OS buffer cache
   *
   * In POSIX platform:
   * It uses FileChannel#transferTo() which internally attempts
   * unbuffered IO on OS with native sendfile64() support and falls back to
   * buffered IO otherwise.
   *
   * It minimizes the number of FileChannel#transferTo call by passing the
   * src file size directly instead of a smaller size as the 3rd parameter.
   * This saves the number of sendfile64() system call when native sendfile64()
   * is supported. In the two fall back cases where sendfile is not supported,
   * FileChannel#transferTo already has its own batching of size 8 MB and 8 KB,
   * respectively.
   *
   * In Windows Platform:
   * It uses its own native wrapper of CopyFileEx with COPY_FILE_NO_BUFFERING
   * flag, which is supported on Windows Server 2008 and above.
   *
   * Ideally, we should use FileChannel#transferTo() across both POSIX and Windows
   * platform. Unfortunately, the wrapper(Java_sun_nio_ch_FileChannelImpl_transferTo0)
   * used by FileChannel#transferTo for unbuffered IO is not implemented on Windows.
   * Based on OpenJDK 6/7/8 source code, Java_sun_nio_ch_FileChannelImpl_transferTo0
   * on Windows simply returns IOS_UNSUPPORTED.
   *
   * Note: This simple native wrapper does minimal parameter checking before copy and
   * consistency check (e.g., size) after copy.
   * It is recommended to use wrapper function like
   * the Storage#nativeCopyFileUnbuffered() function in hadoop-hdfs with pre/post copy
   * checks.
   *
   * @param src The source path
   * @param dst The destination path
   * @throws IOException
   */
  public static void copyFileUnbuffered(File src, File dst) throws IOException {
    if (nativeLoaded && Shell.WINDOWS) {
      copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
    } else {
      FileInputStream fis = null;
      FileOutputStream fos = null;
      FileChannel input = null;
      FileChannel output = null;
      try {
        fis = new FileInputStream(src);
        fos = new FileOutputStream(dst);
        input = fis.getChannel();
        output = fos.getChannel();
        long remaining = input.size();
        long position = 0;
        long transferred = 0;
        while (remaining > 0) {
          transferred = input.transferTo(position, remaining, output);
          remaining -= transferred;
          position += transferred;
        }
      } finally {
        IOUtils.cleanup(LOG, output);
        IOUtils.cleanup(LOG, fos);
        IOUtils.cleanup(LOG, input);
        IOUtils.cleanup(LOG, fis);
      }
    }
  }

  private static native void copyFileUnbuffered0(String src, String dst)
      throws NativeIOException;
}
@@ -1,10 +0,0 @@
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Spark log levels
log4j.logger.org.apache.spark=WARN
log4j.logger.org.apache.spark.streaming=INFO