一、背景
mysql数据由于自增的bigint 主键,会插入更快,因为能持续往文件末尾插入嘛,因此需要这个东西。
然后呢,服务端有专门生产id的接口,但是数据中心批量插入,肯定会拉暴他们,不让我们一起玩,只能自己玩。
二、方案
1.redis 获取数据段,程序内部自增。
问题:要用外部redis麻烦,而且要持久化
2.python 服务注册的,这个自己网上搜索。
问题:要机器,要服务端和客户端,而且python 我们版本不够高。。。尴尬,不想随便升级整个集群
3.twitter 的 snowflake 算法
参考: https://www.jianshu.com/p/54a87a7c3622
参数:workId datacenterId 唯一
问题:
1.两个ID 我都没法唯一。MAP 阶段可能是在同一个机器上,同时执行,参数不好搞
解决方案:
1.我们场景是只需要同一个任务的主键不重复就行。因此workId 我选取map 的ID,毕竟同一个任务,每个MAP的ID 肯定不同。 datacenterId 可以先默认0.
三.上代码:
public class MagicSnowFlake { //其实时间戳 2017-01-01 00:00:00 private final static long twepoch = 1483200000000l;
// 改到16位 65535,认为MAP的最大数量限制
private final static long ipIdMax = 65535; // 默认1位,我们小,没那么多数据中心,意思一下 private final static long dataCenterIdBits = 1L;
// 9+1 (10 )
private final static long mapIdBits = 9L;
private final static long dataCenterIdMax = ~ (-1L << dataCenterIdBits); //序列在id中占的位数 12bit private final static long seqBits = 12L; //序列最大值 4095 即2的12次方减一。 private final static long seqMax = ~(-1L << seqBits); // 64位的数字:首位0 随后41位表示时间戳 MAP_ID 最后12位序列号 private final static long dataCenterIdLeftShift = seqBits; private final static long mapIdLeftShift = seqBits + dataCenterIdBits; private final static long timeLeftShift = seqBits + dataCenterIdBits + mapIdLeftShift; //IP标识(0~255) private long ipId; // 数据中心ID(0~3) private long dataCenterId; // 毫秒内序列(0~4095) private long seq = 0L; // 上次生成ID的时间截 private long lastTime = -1L; public MagicSnowFlake(long ipId, long dataCenterId) { if(ipId < 0 || ipId > ipIdMax) { System.out.println(" ---------- ipId不在正常范围内(0~"+ipIdMax +") " + ipId); System.exit(0); } if(dataCenterId < 0 || dataCenterId > dataCenterIdMax) { System.out.println(" ---------- dataCenterId不在正常范围内(0~"+dataCenterIdMax +") " + dataCenterId); System.exit(0); } this.ipId = ipId; this.dataCenterId = dataCenterId; } public synchronized long nextId() { long nowTime = System.currentTimeMillis(); if(nowTime < lastTime) { System.out.println(" ---------- 当前时间前于上次操作时间,当前时间有误: " + nowTime); System.exit(0); } if(nowTime == lastTime) { seq = (seq + 1) & seqMax; if(seq == 0) { nowTime = getNextTimeStamp(); } } else { seq = 0L; } lastTime = nowTime; return ((nowTime - twepoch) << timeLeftShift) | (ipId << mapIdLeftShift) | (dataCenterId << dataCenterIdLeftShift) | seq; } private long getNextTimeStamp() { long nowTime; do { nowTime = System.currentTimeMillis(); } while(nowTime <= lastTime); return nowTime; } public static void main(String[] args) { System.out.println(Long.MAX_VALUE); MagicSnowFlake msf = new MagicSnowFlake(1, 1); msf.nextId(); System.out.println(~ (-1L << 15)); } }
UDF 部分
import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; /** * @author <a href="mailto:huoguo@2dfire.com">火锅</a> * @time 18/3/8 */ @UDFType(deterministic = false, stateful = true) public class LongIdUDF extends GenericUDF { private static final char SEPARATOR = '_'; private static final String ATTEMPT = "attempt"; private long mapTaskId = 0l; private int increment = 0; private MagicSnowFlake snowFlake; @Override public void configure(MapredContext context) { increment = context.getJobConf().getNumMapTasks(); if(increment == 0) { throw new IllegalArgumentException("mapred.map.tasks is zero"); } mapTaskId = getInitId(context.getJobConf().get("mapred.task.id"),increment); if(mapTaskId == 0l) { throw new IllegalArgumentException("mapred.task.id"); } } @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { return PrimitiveObjectInspectorFactory.javaLongObjectInspector; } @Override public Long evaluate(DeferredObject[] arguments) throws HiveException { if(snowFlake == null){ int dataCenterId = Integer.parseInt(arguments[0].get().toString()); snowFlake = new MagicSnowFlake(getMapTaskId(),dataCenterId); } return snowFlake.nextId(); } @Override public String getDisplayString(String[] children) { return "getLongId(0)"; } private synchronized long getMapTaskId() { return mapTaskId; } //attempt_1478926768563_0537_m_000004_0 // return 0+1 private long getInitId (String taskAttemptIDstr,int numTasks) throws IllegalArgumentException { try { String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR)); if(parts.length == 6) { if(parts[0].equals(ATTEMPT)) { if(!parts[3].equals("m") && !parts[3].equals("r")) { throw new Exception(); } long result = Long.parseLong(parts[4]); if(result >= numTasks) { //if taskid >= numtasks throw new Exception("TaskAttemptId string : " + taskAttemptIDstr + " parse ID [" + result + "] >= numTasks[" + numTasks + "] .."); } return result + 1; } } } catch (Exception e) {} throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr + " is not properly formed"); } public static void main(String[] args) { String s = "attempt_1478926768563_0537_m_000004_4"; System.out.println(new LongIdUDF().getInitId(s,5)); } }
小结:
1.copy的代码自己改造的,忘记位置了,总的来说是出自twitter 嘛。
2.用了30亿的表进行测试,没重复
3.有问题 请及时提出,
相关推荐
hive编写 udf 至少需要引入的jar包:hive-exec-xxx.jar 和 hadoop-core-xxx.jar
Hive UDF 说明书,官方指定文档。Hive_LanguageManual_UDF
大数据的hive资源的详细代码设计以及分享,望博友相互交流
Hive表生成工具,Hive表生成工具Hive表生成工具
主要介绍了大数据 java hive udf函数(手机号码脱敏),的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
hive的udf函数实现
NexR Hive UDF 关于 NexR Hive UDF是Hive用户定义功能的集合。 执照 快速开始 $ git clone https://github.com/nexr/hive-udf.git $ cd hive-udf $ mvn clean package or $ ant -Dhive.install.dir=../hive/build/...
udf开发–做个简单脱敏udf保留前5位,后面全部替换成*****
详细介绍如何开发hive自定义永久函数,配套有测试数据
通用hive udf 源码,想要开发hadoop hive 的开发者 可以下载试看
Hive UDF UDF 聚合 UDF Finds MIN, MAX and SUM from array of Struct Objects based on a field. 排序 UDF Returns sorted array of Struct objects for an array of Struct Objects based on a field. 日期 ...
hiveUDF-1.0-SNAPSHOT.jar
地址转换成经纬度+两地址间距离计算+省市区位置解析(Java代码) Hive自定义函数的封装
java6 string源码 ...将会在target目录下生成[A=jet-hive-udf-${version}-shaded.jar, B=jet-hive-udf-${version}.jar]文件.其中A是包括所有依赖包的jar, B是最小编译jar文件 你也可以直接在发布页下载打
Hive UDF 项目 介绍 该项目只是一个示例,包含多个 (UDF),用于 Apache Spark。 它旨在演示如何在 Scala 或 Java 中构建 Hive UDF 并在 . 为什么要使用 Hive UDF? Hive UDF 的一个特别好的用途是与 Python 和 ...
Spark Hive UDF示例 建立项目 mvn clean package 将spark-hive-udf-1.0.0-SNAPSHOT.jar复制到边缘节点临时目录 spark-hive-udf]# cp target/spark-hive-udf-1.0.0-SNAPSHOT.jar /tmp 通过提供罐子来启动火花壳 spark...
NULL 博文链接:https://superlxw1234.iteye.com/blog/1586377
HiveUDF 此Hive UDF示例代码包含2个函数:MyUpper()和MyContains() 它们在Hive 0.12、0.13和1.0中进行了测试。 要使Hive UDF在Drill中工作,请关注以下博客:一种。 如何制作罐子mvn package ## b。 准备一个带...
hive元数据生成工具-基于CDH4.7.0版本
Hive 是一个基于 Hadoop 的数据仓库工具,用于处理大规模数据集。它使用类似 SQL 的语言 HiveQL 来查询和管理数据。而自定义用户定义函数(UDF)是 Hive 中的一个重要功能,允许用户根据自己的需求编写自定义函数,...